From 1131d47dc0a62d2af2c19f1c5fd62c5dcbdac8aa Mon Sep 17 00:00:00 2001 From: Nekojimi Date: Thu, 2 Oct 2025 00:08:51 +0100 Subject: [PATCH] PeerConnection: display URI during debug prints. Also change error handling. --- .../friendcloud/network/PeerConnection.java | 101 ++++++++++++------ .../network/TCPPeerConnection.java | 58 +++++++--- 2 files changed, 111 insertions(+), 48 deletions(-) diff --git a/src/main/java/moe/nekojimi/friendcloud/network/PeerConnection.java b/src/main/java/moe/nekojimi/friendcloud/network/PeerConnection.java index e3765f0..28e4f53 100644 --- a/src/main/java/moe/nekojimi/friendcloud/network/PeerConnection.java +++ b/src/main/java/moe/nekojimi/friendcloud/network/PeerConnection.java @@ -2,6 +2,7 @@ package moe.nekojimi.friendcloud.network; import com.google.protobuf.*; +import moe.nekojimi.friendcloud.Controller; import moe.nekojimi.friendcloud.FilePieceAccess; import moe.nekojimi.friendcloud.Main; import moe.nekojimi.friendcloud.ObjectChangeRecord; @@ -20,6 +21,7 @@ import java.io.IOException; import java.net.URI; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; public abstract class PeerConnection extends Thread { @@ -27,22 +29,18 @@ public abstract class PeerConnection extends Thread private ObjectID peerID = new ObjectID(0); private long nextMessageId = 1; private final URI uri; - private long artificalDelayMs = 0; + private long artificalDelayMs; private final Map> messageHandlers = new HashMap<>(); - public PeerConnection() - { - this(null); - } - - public PeerConnection(URI uri) + public PeerConnection(@NotNull URI uri) { this.uri = uri; installDefaultMessageHandlers(); + artificalDelayMs = Main.getInstance().getArgs().getArtificialLagMs(); } - public PeerConnection(URI uri, @NotNull ObjectID peerID) + public PeerConnection(@NotNull URI uri, @NotNull ObjectID peerID) { this(uri); this.peerID = peerID; @@ -58,19 +56,18 @@ public abstract class PeerConnection extends Thread public synchronized CompletableFuture makeRequest(Request request) { if (!isAlive()) - throw new IllegalStateException("Request made to PeerConnection that isn't running!"); + throw new IllegalStateException("PeerConnection (" + getUri() + "): Request made to PeerConnection that isn't running!"); try { Message message = request.buildMessage(); CommonMessages.FriendCloudMessage wrappedMessage = wrapMessage(message); - pendingRequests.put(wrappedMessage.getHeader().getMessageId(), request); sendMessage(wrappedMessage); - + pendingRequests.put(wrappedMessage.getHeader().getMessageId(), request); return request.getFuture(); } catch (Exception e) { - System.err.println("Request failed!"); + System.err.println("PeerConnection (" + getUri() + "): Request failed!"); e.printStackTrace(System.err); return CompletableFuture.failedFuture(e); } @@ -107,7 +104,7 @@ public abstract class PeerConnection extends Thread private void replyWithError(CommonMessages.Error error, CommonMessages.MessageHeader replyHeader) throws IOException { - System.err.println("Sending error reply: " + error.name() + " to message ID " + replyHeader.getReplyToMessageId()); + System.err.println("PeerConnection (" + getUri() + "): Sending error reply: " + error.name() + " to message ID " + replyHeader.getReplyToMessageId()); CommonMessages.ErrorMessage errorMessage = CommonMessages.ErrorMessage.newBuilder().setError(error).build(); sendMessage(wrapMessage(errorMessage, replyHeader)); } @@ -118,7 +115,7 @@ public abstract class PeerConnection extends Thread Any body = message.getBody(); long replyToMessageId = header.getReplyToMessageId(); ObjectID senderID = new ObjectID(header.getSenderId()); - System.out.println("Received message! type=" + body.getTypeUrl() + ", sender=" + senderID + ", ID=" + header.getMessageId() + ", reply_to=" + replyToMessageId); + System.out.println("PeerConnection (" + getUri() + "): Received message! type=" + body.getTypeUrl() + ", sender=" + senderID + ", ID=" + header.getMessageId() + ", reply_to=" + replyToMessageId); try { try @@ -127,7 +124,7 @@ public abstract class PeerConnection extends Thread { try { - System.err.println("WARNING: artifical lag activated! Waiting " + artificalDelayMs + "ms..."); + System.err.println("WARNING: artificial lag activated! Waiting " + artificalDelayMs + "ms..."); Thread.sleep(artificalDelayMs); } catch (InterruptedException e) { @@ -137,9 +134,15 @@ public abstract class PeerConnection extends Thread if (!senderID.isNull()) { - if (peerID.isNull()) + Peer localPeer = Main.getInstance().getModel().getLocalPeer(); + if (localPeer != null && Objects.equals(senderID, localPeer.getObjectID())) { - System.out.println("PeerConnection: Identified sender as " + senderID); + System.err.println("PeerConnection (" + getUri() + "): Connected to ourselves, terminating connection!"); + shutdown(); + } + else if (peerID.isNull()) + { + System.out.println("PeerConnection (" + getUri() + "): Identified sender as " + senderID); peerID = senderID; } else @@ -184,7 +187,7 @@ public abstract class PeerConnection extends Thread private void handleErrorToUnsolicitedMessage(CommonMessages.MessageHeader header, CommonMessages.ErrorMessage body) { - throw new RuntimeException("Our message ID " + header.getReplyToMessageId() + " caused a remote error: " + body.getError().name()); + throw new RuntimeException("PeerConnection (" + getUri() + "): Our message ID " + header.getReplyToMessageId() + " caused a remote error: " + body.getError().name()); } @SuppressWarnings({"rawtypes", "unchecked"}) @@ -200,7 +203,7 @@ public abstract class PeerConnection extends Thread } else { - System.err.println("PeerConnection: don't have a MessageHandler for message type " + typeUrl + "!"); + System.err.println("PeerConnection (" + getUri() + "): don't have a MessageHandler for message type " + typeUrl + "!"); replyWithError(CommonMessages.Error.ERROR_MESSAGE_BODY_UNKNOWN, header); } } @@ -208,7 +211,7 @@ public abstract class PeerConnection extends Thread private void handleReplyMessage(CommonMessages.MessageHeader header, Any body) throws InvalidProtocolBufferException, ReplyWithErrorException { long replyToMessageId = header.getReplyToMessageId(); - System.out.println("Received reply to message ID " + replyToMessageId); + System.out.println("PeerConnection (" + getUri() + "): Received reply to message ID " + replyToMessageId); Request request = pendingRequests.get(replyToMessageId); boolean doneWithRequest = request.handleReply(body); if (doneWithRequest) @@ -236,14 +239,17 @@ public abstract class PeerConnection extends Thread private void installDefaultMessageHandlers() { + Controller controller = Main.getInstance().getModel(); installMessageHandler(new MessageHandler<>(ObjectStatements.ObjectListRequest.class) { @Override protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectListRequest message) throws IOException { - List objects = Main.getInstance().getModel().listObjects(new HashSet<>(message.getTypesList())); + List objects = controller.listObjects(new HashSet<>(message.getTypesList())); ObjectStatements.ObjectList.Builder objectList = ObjectStatements.ObjectList.newBuilder(); + ObjectChangeRecord currentChangeRecord = controller.getLocalData().getCurrentChangeRecord(); + objectList.setChangeHead(currentChangeRecord == null ? 0L : currentChangeRecord.getChangeID()); for (NetworkObject object : objects) { objectList.addStates(object.buildObjectState()); @@ -263,7 +269,7 @@ public abstract class PeerConnection extends Thread replyWithError(CommonMessages.Error.ERROR_INVALID_ARGUMENT, header); } - NetworkFile networkFile = Main.getInstance().getModel().getObject(new ObjectID(message.getFileId())); + NetworkFile networkFile = controller.getObject(new ObjectID(message.getFileId())); if (networkFile == null) { replyWithError(CommonMessages.Error.ERROR_OBJECT_NOT_FOUND, header); @@ -273,7 +279,7 @@ public abstract class PeerConnection extends Thread { int startIndex = message.getStartPieceIndex(); int endIndex = (message.getStartPieceIndex() + message.getPieceCount()) - 1; - System.out.println("Been asked for pieces from " + startIndex + " to " + endIndex); + System.out.println("PeerConnection (" + getUri() + "): Been asked for pieces from " + startIndex + " to " + endIndex); List indices = new ArrayList<>(); for (int index = startIndex; index <= endIndex; index += message.getPieceMod()) { @@ -286,7 +292,7 @@ public abstract class PeerConnection extends Thread byte[] buffer = filePieceAccess.readPiece(Math.toIntExact(index)); if (buffer != null) { - System.out.println("Replying to file piece request with piece " + index); + System.out.println("PeerConnection (" + getUri() + "): Replying to file piece request with piece " + index); PieceMessages.FilePieceMessage filePieceMessage = PieceMessages.FilePieceMessage.newBuilder() .setPieceIndex(Math.toIntExact(index)) .setFileId(networkFile.getObjectID().toLong()) @@ -296,7 +302,7 @@ public abstract class PeerConnection extends Thread } else { - System.err.println("Don't have requested piece " + index + "!"); + System.err.println("PeerConnection (" + getUri() + "): Don't have requested piece " + index + "!"); replyWithError(CommonMessages.Error.ERROR_PIECE_NOT_POSSESSED, header); break; } @@ -310,8 +316,8 @@ public abstract class PeerConnection extends Thread protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectChangeRequest message) throws IOException { List changesSinceList = message.getChangesSinceList(); - System.out.println("PeerConnection: Been asked for all changes since " + changesSinceList.stream().map(Long::toHexString)); - Set changes = Main.getInstance().getModel().findChangesSince(changesSinceList); + System.out.println("PeerConnection (" + getUri() + "): Been asked for all changes since " + changesSinceList.stream().map(Long::toHexString).collect(Collectors.toSet())); + Set changes = controller.findChangesSince(changesSinceList); if (changes == null) { replyWithError(CommonMessages.Error.ERROR_END_OF_HISTORY, header); @@ -323,7 +329,7 @@ public abstract class PeerConnection extends Thread { reply.addChangeMessages(change.buildObjectChangeMessage()); } - System.out.println("PeerConnection: Replying with " + reply.getChangeMessagesCount() + " changes"); + System.out.println("PeerConnection (" + getUri() + "): Replying with " + reply.getChangeMessagesCount() + " changes"); sendMessage(wrapMessage(reply.build(), header)); } } @@ -334,19 +340,26 @@ public abstract class PeerConnection extends Thread protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectChangeMessage message) { ObjectChangeRecord record = ObjectChangeRecord.createFromChangeMessage(message); - Main.getInstance().getModel().applyChangeRecord(record); + controller.applyChangeRecord(record); } }); installMessageHandler(new MessageHandler<>(CommonMessages.CheckInMessage.class) { @Override - protected void handle(CommonMessages.MessageHeader header, CommonMessages.CheckInMessage message) + protected void handle(CommonMessages.MessageHeader header, CommonMessages.CheckInMessage message) throws IOException { - List remoteChangeHeads = message.getCurrentChangeHeadsList(); + Peer peer = controller.getObject(peerID); + if (peer != null) + { + peer.setLastKnownChangeID(message.getCurrentChange()); + controller.objectChanged(peer); + } + + Set remoteChangeHeads = new HashSet<>(message.getCurrentChangeHeadsList()); boolean potentialNewChanges = false; for (long remoteChangeHead : remoteChangeHeads) { - boolean exists = Main.getInstance().getModel().getDataStore().getDAOForClass(ObjectChangeRecord.class).exists(remoteChangeHead); + boolean exists = controller.getDataStore().getDAOForClass(ObjectChangeRecord.class).exists(remoteChangeHead); if (!exists) { potentialNewChanges = true; @@ -355,8 +368,28 @@ public abstract class PeerConnection extends Thread } if (potentialNewChanges) { - PullChangesTask task = new PullChangesTask(Set.of(Main.getInstance().getModel().getObject(peerID))); - Main.getInstance().getExecutor().submit(task); + if (peer != null) + { + PullChangesTask task = new PullChangesTask(Set.of(peer)); + Main.getInstance().getExecutor().submit(task); + } + } + else + { + Set changeHeadIDs = controller.getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet()); + // if there's no new changes then we know all the changes that the remote has + // so if they're not the same as our latest changes then they don't know about something we do + // so send them a checkin + boolean remoteOutOfDate = !changeHeadIDs.equals(remoteChangeHeads); + + if (remoteOutOfDate) + { + CommonMessages.CheckInMessage checkInMessage = CommonMessages.CheckInMessage.newBuilder() + .addAllCurrentChangeHeads(changeHeadIDs) + + .build(); + sendMessage(wrapMessage(checkInMessage)); + } } } }); diff --git a/src/main/java/moe/nekojimi/friendcloud/network/TCPPeerConnection.java b/src/main/java/moe/nekojimi/friendcloud/network/TCPPeerConnection.java index 04a3f0c..97ab03f 100644 --- a/src/main/java/moe/nekojimi/friendcloud/network/TCPPeerConnection.java +++ b/src/main/java/moe/nekojimi/friendcloud/network/TCPPeerConnection.java @@ -2,12 +2,12 @@ package moe.nekojimi.friendcloud.network; import moe.nekojimi.friendcloud.objects.ObjectID; import moe.nekojimi.friendcloud.protos.CommonMessages; +import org.jetbrains.annotations.NotNull; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.net.Socket; -import java.net.URI; +import java.net.*; public class TCPPeerConnection extends PeerConnection { @@ -18,24 +18,37 @@ public class TCPPeerConnection extends PeerConnection { super(tcpURL, peer); socket = new Socket(tcpURL.getHost(), tcpURL.getPort()); - System.out.println("TCP Connection: connected to " + tcpURL + " OK!"); + System.out.println("TCPPeerConnection: connected to " + tcpURL + " OK!"); } public TCPPeerConnection(Socket openSocket) { - super(); + super(getSocketURI(openSocket.getInetAddress(), openSocket.getPort())); socket = openSocket; } + private static URI getSocketURI(@NotNull InetAddress address, int port) + { + try + { + return new URI("tcp://" + address.getHostAddress() + ":" + port); + } catch (URISyntaxException e) + { + throw new RuntimeException(e); + } + } + @Override public void run() { super.run(); - try + try(InputStream inputStream = socket.getInputStream()) { - InputStream inputStream = socket.getInputStream(); - while (!socket.isClosed()) + socket.setKeepAlive(true); + socket.setSoTimeout(keepAliveTimeS * 1000); + + while (!socket.isClosed() && !socket.isInputShutdown()) { CommonMessages.FriendCloudMessage message = CommonMessages.FriendCloudMessage.parseDelimitedFrom(inputStream); // Any any = Any.parseDelimitedFrom(inputStream); @@ -45,22 +58,38 @@ public class TCPPeerConnection extends PeerConnection messageReceived(message); } } - } catch (Exception ex) + } + catch (SocketTimeoutException ex) + { + System.out.println("TCPPeerConnection (" + getUri() + "): Read timed out, closing connection."); + } + catch (Exception ex) { // fuck ex.printStackTrace(System.err); } - System.out.println("TCP Connection: connection closed"); + System.out.println("TCPPeerConnection (" + getUri() + "): connection closed"); + shutdown(); } @Override protected void sendMessage(CommonMessages.FriendCloudMessage message) throws IOException { - OutputStream outputStream = socket.getOutputStream(); - System.out.println("Sending message " + message.getHeader().getMessageId() + ": " + message.getBody().getTypeUrl()); - message.writeDelimitedTo(outputStream); - outputStream.flush(); + try + { + OutputStream outputStream = socket.getOutputStream(); + System.out.println("TCPPeerConnection (" + getUri() + "): Sending message " + message.getHeader().getMessageId() + ": " + message.getBody().getTypeUrl()); + message.writeDelimitedTo(outputStream); + outputStream.flush(); + } + catch (SocketException ex) + { + // handle this type of exception by closing the connection + System.err.println("TCPPeerConnection (" + getUri() + "): Failed to send, closing connection:" + ex.getMessage()); + shutdown(); + throw ex; // upper layer needs to know it failed + } } @Override @@ -69,9 +98,10 @@ public class TCPPeerConnection extends PeerConnection try { socket.close(); + interrupt(); } catch (IOException e) { - System.err.println("TCPPeerConnection: failed to shut down!"); + System.err.println("TCPPeerConnection (" + getUri() + "): failed to shut down!"); e.printStackTrace(System.err); } }