PeerConnection: display URI during debug prints. Also change error handling.

This commit is contained in:
Nekojimi 2025-10-02 00:08:51 +01:00
parent eae105ab61
commit 1131d47dc0
2 changed files with 111 additions and 48 deletions

View file

@ -2,6 +2,7 @@ package moe.nekojimi.friendcloud.network;
import com.google.protobuf.*;
import moe.nekojimi.friendcloud.Controller;
import moe.nekojimi.friendcloud.FilePieceAccess;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.ObjectChangeRecord;
@ -20,6 +21,7 @@ import java.io.IOException;
import java.net.URI;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
public abstract class PeerConnection extends Thread
{
@ -27,22 +29,18 @@ public abstract class PeerConnection extends Thread
private ObjectID peerID = new ObjectID(0);
private long nextMessageId = 1;
private final URI uri;
private long artificalDelayMs = 0;
private long artificalDelayMs;
private final Map<String, MessageHandler<?>> messageHandlers = new HashMap<>();
public PeerConnection()
{
this(null);
}
public PeerConnection(URI uri)
public PeerConnection(@NotNull URI uri)
{
this.uri = uri;
installDefaultMessageHandlers();
artificalDelayMs = Main.getInstance().getArgs().getArtificialLagMs();
}
public PeerConnection(URI uri, @NotNull ObjectID peerID)
public PeerConnection(@NotNull URI uri, @NotNull ObjectID peerID)
{
this(uri);
this.peerID = peerID;
@ -58,19 +56,18 @@ public abstract class PeerConnection extends Thread
public synchronized <T> CompletableFuture<T> makeRequest(Request<?, T> request)
{
if (!isAlive())
throw new IllegalStateException("Request made to PeerConnection that isn't running!");
throw new IllegalStateException("PeerConnection (" + getUri() + "): Request made to PeerConnection that isn't running!");
try
{
Message message = request.buildMessage();
CommonMessages.FriendCloudMessage wrappedMessage = wrapMessage(message);
pendingRequests.put(wrappedMessage.getHeader().getMessageId(), request);
sendMessage(wrappedMessage);
pendingRequests.put(wrappedMessage.getHeader().getMessageId(), request);
return request.getFuture();
} catch (Exception e)
{
System.err.println("Request failed!");
System.err.println("PeerConnection (" + getUri() + "): Request failed!");
e.printStackTrace(System.err);
return CompletableFuture.failedFuture(e);
}
@ -107,7 +104,7 @@ public abstract class PeerConnection extends Thread
private void replyWithError(CommonMessages.Error error, CommonMessages.MessageHeader replyHeader) throws IOException
{
System.err.println("Sending error reply: " + error.name() + " to message ID " + replyHeader.getReplyToMessageId());
System.err.println("PeerConnection (" + getUri() + "): Sending error reply: " + error.name() + " to message ID " + replyHeader.getReplyToMessageId());
CommonMessages.ErrorMessage errorMessage = CommonMessages.ErrorMessage.newBuilder().setError(error).build();
sendMessage(wrapMessage(errorMessage, replyHeader));
}
@ -118,7 +115,7 @@ public abstract class PeerConnection extends Thread
Any body = message.getBody();
long replyToMessageId = header.getReplyToMessageId();
ObjectID senderID = new ObjectID(header.getSenderId());
System.out.println("Received message! type=" + body.getTypeUrl() + ", sender=" + senderID + ", ID=" + header.getMessageId() + ", reply_to=" + replyToMessageId);
System.out.println("PeerConnection (" + getUri() + "): Received message! type=" + body.getTypeUrl() + ", sender=" + senderID + ", ID=" + header.getMessageId() + ", reply_to=" + replyToMessageId);
try
{
try
@ -127,7 +124,7 @@ public abstract class PeerConnection extends Thread
{
try
{
System.err.println("WARNING: artifical lag activated! Waiting " + artificalDelayMs + "ms...");
System.err.println("WARNING: artificial lag activated! Waiting " + artificalDelayMs + "ms...");
Thread.sleep(artificalDelayMs);
} catch (InterruptedException e)
{
@ -137,9 +134,15 @@ public abstract class PeerConnection extends Thread
if (!senderID.isNull())
{
if (peerID.isNull())
Peer localPeer = Main.getInstance().getModel().getLocalPeer();
if (localPeer != null && Objects.equals(senderID, localPeer.getObjectID()))
{
System.out.println("PeerConnection: Identified sender as " + senderID);
System.err.println("PeerConnection (" + getUri() + "): Connected to ourselves, terminating connection!");
shutdown();
}
else if (peerID.isNull())
{
System.out.println("PeerConnection (" + getUri() + "): Identified sender as " + senderID);
peerID = senderID;
}
else
@ -184,7 +187,7 @@ public abstract class PeerConnection extends Thread
private void handleErrorToUnsolicitedMessage(CommonMessages.MessageHeader header, CommonMessages.ErrorMessage body)
{
throw new RuntimeException("Our message ID " + header.getReplyToMessageId() + " caused a remote error: " + body.getError().name());
throw new RuntimeException("PeerConnection (" + getUri() + "): Our message ID " + header.getReplyToMessageId() + " caused a remote error: " + body.getError().name());
}
@SuppressWarnings({"rawtypes", "unchecked"})
@ -200,7 +203,7 @@ public abstract class PeerConnection extends Thread
}
else
{
System.err.println("PeerConnection: don't have a MessageHandler for message type " + typeUrl + "!");
System.err.println("PeerConnection (" + getUri() + "): don't have a MessageHandler for message type " + typeUrl + "!");
replyWithError(CommonMessages.Error.ERROR_MESSAGE_BODY_UNKNOWN, header);
}
}
@ -208,7 +211,7 @@ public abstract class PeerConnection extends Thread
private void handleReplyMessage(CommonMessages.MessageHeader header, Any body) throws InvalidProtocolBufferException, ReplyWithErrorException
{
long replyToMessageId = header.getReplyToMessageId();
System.out.println("Received reply to message ID " + replyToMessageId);
System.out.println("PeerConnection (" + getUri() + "): Received reply to message ID " + replyToMessageId);
Request<?, ?> request = pendingRequests.get(replyToMessageId);
boolean doneWithRequest = request.handleReply(body);
if (doneWithRequest)
@ -236,14 +239,17 @@ public abstract class PeerConnection extends Thread
private void installDefaultMessageHandlers()
{
Controller controller = Main.getInstance().getModel();
installMessageHandler(new MessageHandler<>(ObjectStatements.ObjectListRequest.class)
{
@Override
protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectListRequest message) throws IOException
{
List<NetworkObject> objects = Main.getInstance().getModel().listObjects(new HashSet<>(message.getTypesList()));
List<NetworkObject> objects = controller.listObjects(new HashSet<>(message.getTypesList()));
ObjectStatements.ObjectList.Builder objectList = ObjectStatements.ObjectList.newBuilder();
ObjectChangeRecord currentChangeRecord = controller.getLocalData().getCurrentChangeRecord();
objectList.setChangeHead(currentChangeRecord == null ? 0L : currentChangeRecord.getChangeID());
for (NetworkObject object : objects)
{
objectList.addStates(object.buildObjectState());
@ -263,7 +269,7 @@ public abstract class PeerConnection extends Thread
replyWithError(CommonMessages.Error.ERROR_INVALID_ARGUMENT, header);
}
NetworkFile networkFile = Main.getInstance().getModel().getObject(new ObjectID(message.getFileId()));
NetworkFile networkFile = controller.getObject(new ObjectID(message.getFileId()));
if (networkFile == null)
{
replyWithError(CommonMessages.Error.ERROR_OBJECT_NOT_FOUND, header);
@ -273,7 +279,7 @@ public abstract class PeerConnection extends Thread
{
int startIndex = message.getStartPieceIndex();
int endIndex = (message.getStartPieceIndex() + message.getPieceCount()) - 1;
System.out.println("Been asked for pieces from " + startIndex + " to " + endIndex);
System.out.println("PeerConnection (" + getUri() + "): Been asked for pieces from " + startIndex + " to " + endIndex);
List<Long> indices = new ArrayList<>();
for (int index = startIndex; index <= endIndex; index += message.getPieceMod())
{
@ -286,7 +292,7 @@ public abstract class PeerConnection extends Thread
byte[] buffer = filePieceAccess.readPiece(Math.toIntExact(index));
if (buffer != null)
{
System.out.println("Replying to file piece request with piece " + index);
System.out.println("PeerConnection (" + getUri() + "): Replying to file piece request with piece " + index);
PieceMessages.FilePieceMessage filePieceMessage = PieceMessages.FilePieceMessage.newBuilder()
.setPieceIndex(Math.toIntExact(index))
.setFileId(networkFile.getObjectID().toLong())
@ -296,7 +302,7 @@ public abstract class PeerConnection extends Thread
}
else
{
System.err.println("Don't have requested piece " + index + "!");
System.err.println("PeerConnection (" + getUri() + "): Don't have requested piece " + index + "!");
replyWithError(CommonMessages.Error.ERROR_PIECE_NOT_POSSESSED, header);
break;
}
@ -310,8 +316,8 @@ public abstract class PeerConnection extends Thread
protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectChangeRequest message) throws IOException
{
List<Long> changesSinceList = message.getChangesSinceList();
System.out.println("PeerConnection: Been asked for all changes since " + changesSinceList.stream().map(Long::toHexString));
Set<ObjectChangeRecord> changes = Main.getInstance().getModel().findChangesSince(changesSinceList);
System.out.println("PeerConnection (" + getUri() + "): Been asked for all changes since " + changesSinceList.stream().map(Long::toHexString).collect(Collectors.toSet()));
Set<ObjectChangeRecord> changes = controller.findChangesSince(changesSinceList);
if (changes == null)
{
replyWithError(CommonMessages.Error.ERROR_END_OF_HISTORY, header);
@ -323,7 +329,7 @@ public abstract class PeerConnection extends Thread
{
reply.addChangeMessages(change.buildObjectChangeMessage());
}
System.out.println("PeerConnection: Replying with " + reply.getChangeMessagesCount() + " changes");
System.out.println("PeerConnection (" + getUri() + "): Replying with " + reply.getChangeMessagesCount() + " changes");
sendMessage(wrapMessage(reply.build(), header));
}
}
@ -334,19 +340,26 @@ public abstract class PeerConnection extends Thread
protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectChangeMessage message)
{
ObjectChangeRecord record = ObjectChangeRecord.createFromChangeMessage(message);
Main.getInstance().getModel().applyChangeRecord(record);
controller.applyChangeRecord(record);
}
});
installMessageHandler(new MessageHandler<>(CommonMessages.CheckInMessage.class)
{
@Override
protected void handle(CommonMessages.MessageHeader header, CommonMessages.CheckInMessage message)
protected void handle(CommonMessages.MessageHeader header, CommonMessages.CheckInMessage message) throws IOException
{
List<Long> remoteChangeHeads = message.getCurrentChangeHeadsList();
Peer peer = controller.getObject(peerID);
if (peer != null)
{
peer.setLastKnownChangeID(message.getCurrentChange());
controller.objectChanged(peer);
}
Set<Long> remoteChangeHeads = new HashSet<>(message.getCurrentChangeHeadsList());
boolean potentialNewChanges = false;
for (long remoteChangeHead : remoteChangeHeads)
{
boolean exists = Main.getInstance().getModel().getDataStore().getDAOForClass(ObjectChangeRecord.class).exists(remoteChangeHead);
boolean exists = controller.getDataStore().getDAOForClass(ObjectChangeRecord.class).exists(remoteChangeHead);
if (!exists)
{
potentialNewChanges = true;
@ -355,10 +368,30 @@ public abstract class PeerConnection extends Thread
}
if (potentialNewChanges)
{
PullChangesTask task = new PullChangesTask(Set.of(Main.getInstance().getModel().getObject(peerID)));
if (peer != null)
{
PullChangesTask task = new PullChangesTask(Set.of(peer));
Main.getInstance().getExecutor().submit(task);
}
}
else
{
Set<Long> changeHeadIDs = controller.getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet());
// if there's no new changes then we know all the changes that the remote has
// so if they're not the same as our latest changes then they don't know about something we do
// so send them a checkin
boolean remoteOutOfDate = !changeHeadIDs.equals(remoteChangeHeads);
if (remoteOutOfDate)
{
CommonMessages.CheckInMessage checkInMessage = CommonMessages.CheckInMessage.newBuilder()
.addAllCurrentChangeHeads(changeHeadIDs)
.build();
sendMessage(wrapMessage(checkInMessage));
}
}
}
});
}

View file

@ -2,12 +2,12 @@ package moe.nekojimi.friendcloud.network;
import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.protos.CommonMessages;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.URI;
import java.net.*;
public class TCPPeerConnection extends PeerConnection
{
@ -18,24 +18,37 @@ public class TCPPeerConnection extends PeerConnection
{
super(tcpURL, peer);
socket = new Socket(tcpURL.getHost(), tcpURL.getPort());
System.out.println("TCP Connection: connected to " + tcpURL + " OK!");
System.out.println("TCPPeerConnection: connected to " + tcpURL + " OK!");
}
public TCPPeerConnection(Socket openSocket)
{
super();
super(getSocketURI(openSocket.getInetAddress(), openSocket.getPort()));
socket = openSocket;
}
private static URI getSocketURI(@NotNull InetAddress address, int port)
{
try
{
return new URI("tcp://" + address.getHostAddress() + ":" + port);
} catch (URISyntaxException e)
{
throw new RuntimeException(e);
}
}
@Override
public void run()
{
super.run();
try
try(InputStream inputStream = socket.getInputStream())
{
InputStream inputStream = socket.getInputStream();
while (!socket.isClosed())
socket.setKeepAlive(true);
socket.setSoTimeout(keepAliveTimeS * 1000);
while (!socket.isClosed() && !socket.isInputShutdown())
{
CommonMessages.FriendCloudMessage message = CommonMessages.FriendCloudMessage.parseDelimitedFrom(inputStream);
// Any any = Any.parseDelimitedFrom(inputStream);
@ -45,23 +58,39 @@ public class TCPPeerConnection extends PeerConnection
messageReceived(message);
}
}
} catch (Exception ex)
}
catch (SocketTimeoutException ex)
{
System.out.println("TCPPeerConnection (" + getUri() + "): Read timed out, closing connection.");
}
catch (Exception ex)
{
// fuck
ex.printStackTrace(System.err);
}
System.out.println("TCP Connection: connection closed");
System.out.println("TCPPeerConnection (" + getUri() + "): connection closed");
shutdown();
}
@Override
protected void sendMessage(CommonMessages.FriendCloudMessage message) throws IOException
{
try
{
OutputStream outputStream = socket.getOutputStream();
System.out.println("Sending message " + message.getHeader().getMessageId() + ": " + message.getBody().getTypeUrl());
System.out.println("TCPPeerConnection (" + getUri() + "): Sending message " + message.getHeader().getMessageId() + ": " + message.getBody().getTypeUrl());
message.writeDelimitedTo(outputStream);
outputStream.flush();
}
catch (SocketException ex)
{
// handle this type of exception by closing the connection
System.err.println("TCPPeerConnection (" + getUri() + "): Failed to send, closing connection:" + ex.getMessage());
shutdown();
throw ex; // upper layer needs to know it failed
}
}
@Override
public synchronized void shutdown()
@ -69,9 +98,10 @@ public class TCPPeerConnection extends PeerConnection
try
{
socket.close();
interrupt();
} catch (IOException e)
{
System.err.println("TCPPeerConnection: failed to shut down!");
System.err.println("TCPPeerConnection (" + getUri() + "): failed to shut down!");
e.printStackTrace(System.err);
}
}