From 480b6429f944008bf9c462dc10acba218d44a9b4 Mon Sep 17 00:00:00 2001 From: Nekojimi Date: Thu, 2 Oct 2025 00:13:49 +0100 Subject: [PATCH] Various other changes idk --- .../moe/nekojimi/friendcloud/Controller.java | 13 +++-- .../friendcloud/FileRemoteAccess.java | 44 +++++++++++---- .../friendcloud/ObjectChangeRecord.java | 35 ++++++++++-- .../friendcloud/ObjectChangeTransaction.java | 13 ++--- .../friendcloud/SharedFileManager.java | 2 +- .../network/TCPConnectionBackend.java | 56 ++++++++++++++++--- .../network/requests/ObjectListRequest.java | 15 ++++- .../friendcloud/tasks/FileDownloadTask.java | 38 ++++++++----- .../friendcloud/tasks/JoinNetworkTask.java | 2 +- .../friendcloud/tasks/PullStateTask.java | 30 +++++++--- 10 files changed, 179 insertions(+), 69 deletions(-) diff --git a/src/main/java/moe/nekojimi/friendcloud/Controller.java b/src/main/java/moe/nekojimi/friendcloud/Controller.java index 84a2137..1b78e68 100644 --- a/src/main/java/moe/nekojimi/friendcloud/Controller.java +++ b/src/main/java/moe/nekojimi/friendcloud/Controller.java @@ -154,6 +154,7 @@ public class Controller public synchronized void addChangeRecord(ObjectChangeRecord record) { + System.out.println("Controller: Adding change record " + record ); DataStore.DAO dao = dataStore.getDAOForClass(ObjectChangeRecord.class); dao.update(record); // update the change heads; if any of this change's heads are included in ours, then this change replaces them @@ -165,13 +166,13 @@ public class Controller for (long changeHeadID : record.getChangeHeads()) { ObjectChangeRecord headRecord = dao.get(changeHeadID); - recordIsNewHead = changeHeads.remove(headRecord); + recordIsNewHead |= changeHeads.remove(headRecord); } } if (recordIsNewHead) { changeHeads.add(record); - System.out.println("Controller: Change heads updated: " + changeHeads.stream().map(objectChangeRecord -> Long.toHexString(objectChangeRecord.getChangeID())).collect(Collectors.toSet())); + System.out.println("Controller: Change heads updated: " + changeHeads); } } } @@ -204,7 +205,7 @@ public class Controller System.out.println("Controller: Applying change record " + record); long changeID = localData.getCurrentChangeRecord().getChangeID(); if (!record.getChangeHeads().contains(changeID)) - throw new IllegalStateException("Change does not apply! Valid change heads=" + record.getChangeHeads().stream().map(Long::toHexString).collect(Collectors.toSet()) + ", we are in state " + Long.toHexString(changeID)); + throw new IllegalStateException("Change does not apply! Valid change heads=" + record.getChangeHeads() + ", we are in state " + localData.getCurrentChangeRecord()); addChangeRecord(record); record.applyToLocalState(); @@ -256,7 +257,7 @@ public class Controller changeHeads = new HashSet<>(dataStore.getDAOForClass(ObjectChangeRecord.class).getAll()); Set referencedIDs = changeHeads.stream().flatMap(objectChangeRecord -> objectChangeRecord.getChangeHeads().stream()).collect(Collectors.toSet()); changeHeads.removeIf(objectChangeRecord -> referencedIDs.contains(objectChangeRecord.getChangeID())); - System.out.println("Controller: Determined change heads to be " + changeHeads.stream().map(objectChangeRecord -> Long.toHexString(objectChangeRecord.getChangeID())).collect(Collectors.toSet())); + System.out.println("Controller: Determined change heads to be " + changeHeads); } return changeHeads; } @@ -335,7 +336,7 @@ public class Controller String path = newpath.substring(0, lastSlash); if (path.isEmpty()) path = "/"; - try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), getLocalPeer().getObjectID())) + try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(getLocalPeer().getObjectID())) { transaction.addObjectBeforeChange(fsNode); if (!name.equals(fsNode.getName())) @@ -372,7 +373,7 @@ public class Controller if (path.equals("/") || path.isEmpty()) return null; NetworkFolder ret = null; - try(ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), getLocalPeer().getObjectID())) + try(ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(getLocalPeer().getObjectID())) { NetworkFSNode node = getFSNode(path); if (node != null && !(node instanceof NetworkFolder)) diff --git a/src/main/java/moe/nekojimi/friendcloud/FileRemoteAccess.java b/src/main/java/moe/nekojimi/friendcloud/FileRemoteAccess.java index c0c493a..98c194c 100644 --- a/src/main/java/moe/nekojimi/friendcloud/FileRemoteAccess.java +++ b/src/main/java/moe/nekojimi/friendcloud/FileRemoteAccess.java @@ -5,10 +5,9 @@ import moe.nekojimi.friendcloud.objects.NetworkFile; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; +import java.time.Instant; import java.util.*; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.FutureTask; public class FileRemoteAccess { @@ -61,20 +60,22 @@ public class FileRemoteAccess else piecesToDownload = neededPieces; + CompletableFuture downloadFuture = null; if (!piecesToDownload.isEmpty()) { System.out.println("FRA: will fetch pieces " + piecesToDownload); - DownloadManager.getInstance().downloadPieces(file, piecesToDownload) - .thenRun(() -> { - preemptiveDownloadInProgress = false; - }); + downloadFuture = DownloadManager.getInstance().downloadPieces(file, piecesToDownload) + .thenRun(() -> + { + preemptiveDownloadInProgress = false; + }); // Main.getInstance().getExecutor().submit(futureTask); } if (!neededPieces.isEmpty()) { - boolean ok = waitForPieceRange(neededPieces, 1000); + boolean ok = waitForPieceRange(neededPieces, 30_000, downloadFuture); if (!ok) { System.err.println("FRA: timed out while waiting for pieces " + neededPieces); @@ -101,13 +102,34 @@ public class FileRemoteAccess return ret; } - private boolean waitForPieceRange(Set pieces, long pieceTimeoutMs) + private boolean waitForPieceRange(Set pieces, long pieceTimeoutMs, CompletableFuture downloadFuture) { for (int pieceIdx : pieces) { - boolean ok = file.waitForFilePiece(pieceIdx, pieceTimeoutMs); - if (!ok) - return false; + boolean ok = false; + while (pieceTimeoutMs > 0 && !ok) + { + long waitStartTime = Instant.now().toEpochMilli(); + try + { + ok = file.waitForFilePiece(pieceIdx, pieceTimeoutMs); + } + catch (InterruptedException ignored) + { + ok = false; + } + if (!ok) + { + if (downloadFuture != null && (downloadFuture.isCancelled() || downloadFuture.isCancelled())) + { + System.err.println("FRA: Download failed."); + return false; + } + } + long waitEndTime = Instant.now().toEpochMilli(); + long waitTime = waitEndTime - waitStartTime; + pieceTimeoutMs -= waitTime; + } } return true; } diff --git a/src/main/java/moe/nekojimi/friendcloud/ObjectChangeRecord.java b/src/main/java/moe/nekojimi/friendcloud/ObjectChangeRecord.java index 812a8d9..e27ff11 100644 --- a/src/main/java/moe/nekojimi/friendcloud/ObjectChangeRecord.java +++ b/src/main/java/moe/nekojimi/friendcloud/ObjectChangeRecord.java @@ -55,13 +55,27 @@ public class ObjectChangeRecord implements Storable try { MessageDigest digest = MessageDigest.getInstance("SHA-256"); - return digest.digest(toString().getBytes(StandardCharsets.UTF_8)); + return digest.digest(getCanonicalStringRepresentation().getBytes(StandardCharsets.UTF_8)); } catch (NoSuchAlgorithmException e) { throw new RuntimeException(e); } } + @Override + public boolean equals(Object o) + { + if (o == null || getClass() != o.getClass()) return false; + ObjectChangeRecord record = (ObjectChangeRecord) o; + return Objects.equals(getChangeID(), record.getChangeID()); + } + + @Override + public int hashCode() + { + return (int) getChangeID(); + } + public ObjectStatements.ObjectChangeMessage.Builder buildObjectChangeMessage() { ObjectStatements.ObjectChangeMessage.Builder builder = ObjectStatements.ObjectChangeMessage.newBuilder(); @@ -108,7 +122,7 @@ public class ObjectChangeRecord implements Storable } } - public String toString() + public String getCanonicalStringRepresentation() { StringBuilder sb = new StringBuilder(); sb.append(creatorPeer).append(",").append(creationTime.toEpochMilli()).append(";"); @@ -119,11 +133,21 @@ public class ObjectChangeRecord implements Storable sb.append(";"); for (Change change: changes.stream().sorted(Comparator.comparingLong(a -> a.objectID.toLong())).toList()) { - sb.append(change.toString()).append(";"); + sb.append(change.getCanonicalStringRepresentation()).append(";"); } return sb.toString(); } + @Override + public String toString() + { + return "Change{" + Long.toHexString(getChangeID()) + + ", heads=" + changeHeads.stream().map(Long::toHexString).collect(Collectors.toSet()) + + ", time=" + creationTime.toString() + + ", creator=" + creatorPeer.toString() + + "}"; + } + public long getChangeID() { MessageDigest digest = null; @@ -134,7 +158,7 @@ public class ObjectChangeRecord implements Storable { throw new RuntimeException(e); } - String stringVal = toString(); + String stringVal = getCanonicalStringRepresentation(); byte[] bytes = digest.digest(stringVal.getBytes(StandardCharsets.UTF_8)); // System.out.println("ObjectChangeRecord: calculated change ID " + Long.toHexString(ret) + " from string: " + stringVal); return Util.xorBytesToLong(bytes); @@ -195,7 +219,7 @@ public class ObjectChangeRecord implements Storable return null; } - public String toString() + public String getCanonicalStringRepresentation() { StringBuilder sb = new StringBuilder(); sb.append(objectID).append(";"); // The object ID, then ; @@ -213,7 +237,6 @@ public class ObjectChangeRecord implements Storable return sb.toString(); } - public ObjectStatements.ObjectChange.Builder buildObjectChange() { ObjectStatements.ObjectChange.Builder builder = ObjectStatements.ObjectChange.newBuilder(); diff --git a/src/main/java/moe/nekojimi/friendcloud/ObjectChangeTransaction.java b/src/main/java/moe/nekojimi/friendcloud/ObjectChangeTransaction.java index 3e1fe93..ef04689 100644 --- a/src/main/java/moe/nekojimi/friendcloud/ObjectChangeTransaction.java +++ b/src/main/java/moe/nekojimi/friendcloud/ObjectChangeTransaction.java @@ -1,6 +1,5 @@ package moe.nekojimi.friendcloud; -import moe.nekojimi.friendcloud.network.ConnectionManager; import moe.nekojimi.friendcloud.objects.NetworkObject; import moe.nekojimi.friendcloud.objects.ObjectID; import moe.nekojimi.friendcloud.protos.ObjectStatements; @@ -16,22 +15,19 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; public class ObjectChangeTransaction implements AutoCloseable { private final ObjectID creator; - private final ConnectionManager connectionManager; private final Map beforeStates = new HashMap<>(); private static ObjectChangeTransaction currentTransaction = null; private int openCount = 0; private boolean ended = false; - private ObjectChangeTransaction(ConnectionManager connectionManager, ObjectID creator) + private ObjectChangeTransaction(ObjectID creator) { this.creator = creator; - this.connectionManager = connectionManager; System.out.println("ObjectChangeTransaction: opening transaction"); @@ -51,10 +47,10 @@ public class ObjectChangeTransaction implements AutoCloseable } } - public static ObjectChangeTransaction startTransaction(ConnectionManager connectionManager, ObjectID creatorPeer) + public static ObjectChangeTransaction startTransaction(ObjectID creatorPeer) { if (currentTransaction == null) - currentTransaction = new ObjectChangeTransaction(connectionManager, creatorPeer); + currentTransaction = new ObjectChangeTransaction(creatorPeer); currentTransaction.increaseOpenCount(); return currentTransaction; } @@ -95,7 +91,8 @@ public class ObjectChangeTransaction implements AutoCloseable if (changes.isEmpty()) return null; - return ObjectChangeRecord.createFromChanges(creator, Main.getInstance().getModel().getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet()), changes); + ObjectChangeRecord currentChangeRecord = Main.getInstance().getModel().getLocalData().getCurrentChangeRecord(); + return ObjectChangeRecord.createFromChanges(creator, currentChangeRecord != null ? Set.of(currentChangeRecord.getChangeID()) : Set.of(), changes); } public void commit() diff --git a/src/main/java/moe/nekojimi/friendcloud/SharedFileManager.java b/src/main/java/moe/nekojimi/friendcloud/SharedFileManager.java index 0dab24e..4ac09ac 100644 --- a/src/main/java/moe/nekojimi/friendcloud/SharedFileManager.java +++ b/src/main/java/moe/nekojimi/friendcloud/SharedFileManager.java @@ -18,7 +18,7 @@ public class SharedFileManager if (files.isEmpty()) return; Controller controller = Main.getInstance().getModel(); - try(ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), controller.getLocalPeer().getObjectID())) + try(ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(controller.getLocalPeer().getObjectID())) { List knownFiles = controller.listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE)); diff --git a/src/main/java/moe/nekojimi/friendcloud/network/TCPConnectionBackend.java b/src/main/java/moe/nekojimi/friendcloud/network/TCPConnectionBackend.java index a3c629a..5432d4f 100644 --- a/src/main/java/moe/nekojimi/friendcloud/network/TCPConnectionBackend.java +++ b/src/main/java/moe/nekojimi/friendcloud/network/TCPConnectionBackend.java @@ -13,11 +13,10 @@ import moe.nekojimi.friendcloud.objects.ObjectID; import moe.nekojimi.friendcloud.objects.Peer; import java.io.IOException; -import java.net.InetAddress; -import java.net.ServerSocket; -import java.net.Socket; -import java.net.URI; +import java.net.*; import java.util.ArrayList; +import java.util.Enumeration; +import java.util.Iterator; import java.util.List; import java.util.concurrent.TimeUnit; @@ -30,22 +29,57 @@ public class TCPConnectionBackend extends ConnectionBackend List getURIs() { List ret = new ArrayList<>(); - ret.add(makeURI(serverSocket.getInetAddress().getHostAddress(), serverSocket.getLocalPort())); - ret.add(makeURI(serverSocket.getInetAddress().getHostName(), serverSocket.getLocalPort())); + InetAddress inetAddress = serverSocket.getInetAddress(); + addAddressToList(inetAddress, ret); + + try + { + Enumeration networkInterfaces = NetworkInterface.getNetworkInterfaces(); + for (Iterator it = networkInterfaces.asIterator(); it.hasNext(); ) + { + NetworkInterface networkInterface = it.next(); + if (networkInterface.isLoopback()) + continue; + if (!networkInterface.isUp()) + continue; + for (InterfaceAddress interfaceAddress : networkInterface.getInterfaceAddresses()) + addAddressToList(interfaceAddress.getAddress(), ret); + } + } catch (SocketException e) + { + + } + if (mappedPort != null) { - ret.add(makeURI(mappedPort.getExternalAddress().getHostAddress(), mappedPort.getExternalPort())); - ret.add(makeURI(mappedPort.getExternalAddress().getCanonicalHostName(), mappedPort.getExternalPort())); + addAddressToList(mappedPort.getExternalAddress(), ret); } + System.out.println("Local addresses: " + ret); return ret; } + /** + * utility method for processing URIs + */ + private void addAddressToList(InetAddress address, List list) + { + if (address.isAnyLocalAddress()) + return; + String hostAddress = address.getHostAddress(); + if (!hostAddress.isEmpty()) + list.add(makeURI(hostAddress, serverSocket.getLocalPort())); + String hostName = address.getHostName(); + if (!hostName.isEmpty() && !hostName.equals(hostAddress)) + list.add(makeURI(hostName, serverSocket.getLocalPort())); + } + public TCPConnectionBackend(int port, ConnectionManager connectionManager) throws IOException { super("TCP Listen Thread", "tcp", connectionManager); serverSocket = new ServerSocket(port); -// setupIGP(port); + if (!Main.getInstance().getArgs().isNoUpnp()) + setupIGP(port); } private void setupIGP(int port) @@ -60,6 +94,9 @@ public class TCPConnectionBackend extends ConnectionBackend // Discover port forwarding devices and take the first one found System.out.println("Discovering port mappers..."); List mappers = PortMapperFactory.discover(networkBus, processBus); + + if (mappers.isEmpty()) + return; ; PortMapper mapper = mappers.getFirst(); System.out.println("Got mapper " + mapper + ", mapping port..."); @@ -102,6 +139,7 @@ public class TCPConnectionBackend extends ConnectionBackend protected TCPPeerConnection getConnection() throws IOException { Socket socket = serverSocket.accept(); + System.out.println("TCPConnectionBackend: Received connection from " + socket.getRemoteSocketAddress()); return new TCPPeerConnection(socket); } diff --git a/src/main/java/moe/nekojimi/friendcloud/network/requests/ObjectListRequest.java b/src/main/java/moe/nekojimi/friendcloud/network/requests/ObjectListRequest.java index d8ed429..ddcc04b 100644 --- a/src/main/java/moe/nekojimi/friendcloud/network/requests/ObjectListRequest.java +++ b/src/main/java/moe/nekojimi/friendcloud/network/requests/ObjectListRequest.java @@ -11,7 +11,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; -public class ObjectListRequest extends Request> +public class ObjectListRequest extends Request { private final Set types; @@ -39,7 +39,10 @@ public class ObjectListRequest extends Request objects; + public long changeID; + } + } diff --git a/src/main/java/moe/nekojimi/friendcloud/tasks/FileDownloadTask.java b/src/main/java/moe/nekojimi/friendcloud/tasks/FileDownloadTask.java index d0ce01c..5b06199 100644 --- a/src/main/java/moe/nekojimi/friendcloud/tasks/FileDownloadTask.java +++ b/src/main/java/moe/nekojimi/friendcloud/tasks/FileDownloadTask.java @@ -20,7 +20,7 @@ public class FileDownloadTask implements RunnableFuture private final NetworkFile file; private final ConnectionManager connectionManager; - private final long timeoutPerPieceMs = 1_000; + private final long timeoutPerPieceMs = 30_000; private static final int MAX_DOWNLOAD_PIECES_PER_ROUND = 128; private final SortedSet missingPieceIndices = new TreeSet<>(); @@ -65,23 +65,28 @@ public class FileDownloadTask implements RunnableFuture Peer selfPeer = Main.getInstance().getModel().getLocalPeer(); while (!missingPieceIndices.isEmpty() && !cancelled && !failed && !done) { - System.out.println("Need to get " + missingPieceIndices.size() + " missing pieces."); + System.out.println("FileDownloadTask: Need to get " + missingPieceIndices.size() + " missing pieces."); // Map fileStates = file.getFileStates(); // determine what nodes we can connect to - List connections = new ArrayList<>(); - for (Peer peer : file.getPeersWithCopy()) - { - if (peer == selfPeer) - continue; // yeah that's us dipshit - PeerConnection connection = connectionManager.getNodeConnection(peer); - if (connection != null) - { - System.out.println("FileDownloadTask: Will download from " + peer.getNodeName()); - connections.add(connection); - } - } + List connections = new ArrayList<>(file.getPeersWithCopy().stream() + .filter(peer -> peer != selfPeer) + .map(connectionManager::getNodeConnectionAsync) + .map(CompletableFuture::join) + .flatMap(Optional::stream) + .toList()); +// for (Peer peer : file.getPeersWithCopy()) +// { +// if (peer == selfPeer) +// continue; // yeah that's us dipshit +// PeerConnection connection = connectionManager.getNodeConnection(peer); +// if (connection != null) +// { +// System.out.println("FileDownloadTask: Will download from " + peer.getNodeName()); +// connections.add(connection); +// } +// } // connectionLine = "Connected to " + connections.size() + " peers."; // notification.setBody(connectionLine + "\n" + progressLine); @@ -90,6 +95,7 @@ public class FileDownloadTask implements RunnableFuture // shuffle the connections list Collections.shuffle(connections); + if (connections.isEmpty()) { System.err.println("FileDownloadTask: No peers have the file, download failed!"); @@ -98,6 +104,8 @@ public class FileDownloadTask implements RunnableFuture break; } + System.out.println("FileDownloadTask: Will download from " + connections.size() + " peers: " + connections.stream().map(PeerConnection::getUri).toList()); + // find a continuous run of pieces to download // TODO: allow for runs with regular gaps (e.g. every 2) to account for previous failed download attempts int runStart = -1; @@ -156,7 +164,7 @@ public class FileDownloadTask implements RunnableFuture if (file.getDownloadPercentage() >= 100.0) { - try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(connectionManager, selfPeer.getObjectID())) + try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(selfPeer.getObjectID())) { transaction.addObjectBeforeChange(file); file.addPeerWithCopy(selfPeer); diff --git a/src/main/java/moe/nekojimi/friendcloud/tasks/JoinNetworkTask.java b/src/main/java/moe/nekojimi/friendcloud/tasks/JoinNetworkTask.java index 4427279..4da1e28 100644 --- a/src/main/java/moe/nekojimi/friendcloud/tasks/JoinNetworkTask.java +++ b/src/main/java/moe/nekojimi/friendcloud/tasks/JoinNetworkTask.java @@ -53,7 +53,7 @@ public class JoinNetworkTask implements Runnable throw new RuntimeException(e); } // create our local peer object - try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(connectionManager, peerID)) + try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(peerID)) { // create and submit our Peer object if it doesn't exist Peer selfPeer = controller.getLocalData().getLocalPeer(); diff --git a/src/main/java/moe/nekojimi/friendcloud/tasks/PullStateTask.java b/src/main/java/moe/nekojimi/friendcloud/tasks/PullStateTask.java index 097dd7d..053dbc3 100644 --- a/src/main/java/moe/nekojimi/friendcloud/tasks/PullStateTask.java +++ b/src/main/java/moe/nekojimi/friendcloud/tasks/PullStateTask.java @@ -1,6 +1,7 @@ package moe.nekojimi.friendcloud.tasks; import moe.nekojimi.friendcloud.Main; +import moe.nekojimi.friendcloud.ObjectChangeRecord; import moe.nekojimi.friendcloud.Util; import moe.nekojimi.friendcloud.network.PeerConnection; import moe.nekojimi.friendcloud.network.requests.ObjectChangeRequest; @@ -31,6 +32,8 @@ public class PullStateTask implements Runnable System.out.println("PullStateTask: Pulling state from peers..."); Set connections = new HashSet<>(); + // TODO: pull changes first, then select the peer(s) with the latest changes to pull state from + for (String knownPeerAddress : Main.getInstance().getArgs().getKnownPeers()) { String[] split = knownPeerAddress.split(":"); @@ -50,7 +53,7 @@ public class PullStateTask implements Runnable { connections.add(nodeConnection); } - } catch (URISyntaxException | IOException e) + } catch (URISyntaxException e) { throw new RuntimeException(e); } @@ -80,24 +83,33 @@ public class PullStateTask implements Runnable for (PeerConnection connection : connections) { - futures.add(connection.makeRequest(new ObjectListRequest(Set.of( + CompletableFuture changeFuture = connection.makeRequest(new ObjectChangeRequest(Set.of())).thenAccept(objectChangeRecords -> + { + System.out.println("PullStateTask: got " + objectChangeRecords.size() + " change records."); + Main.getInstance().getModel().addChangeRecords(objectChangeRecords); + }); + + CompletableFuture objectFuture = connection.makeRequest(new ObjectListRequest(Set.of( ObjectStatements.ObjectType.OBJECT_TYPE_FILE, ObjectStatements.ObjectType.OBJECT_TYPE_FOLDER, - ObjectStatements.ObjectType.OBJECT_TYPE_PEER))).thenAccept(networkObjects -> + ObjectStatements.ObjectType.OBJECT_TYPE_PEER))).thenApply(objectListRequestResult -> { - + List networkObjects = objectListRequestResult.objects; System.out.println("PullStateTask: got state of " + networkObjects.size() + " objects."); for (NetworkObject object : networkObjects) { Main.getInstance().getModel().addNetworkObject(object); } - })); + return objectListRequestResult.changeID; + }); - futures.add(connection.makeRequest(new ObjectChangeRequest(Set.of())).thenAccept(objectChangeRecords -> + CompletableFuture bothFuture = changeFuture.thenAcceptBoth(objectFuture, (unused, changeID) -> { - System.out.println("PullStateTask: got " + objectChangeRecords.size() + " change records."); - Main.getInstance().getModel().addChangeRecords(objectChangeRecords); - })); + ObjectChangeRecord record = Main.getInstance().getModel().getChangeRecord(changeID); + System.out.println("PullStateTask: updating current change ID to " + record); + Main.getInstance().getModel().setCurrentChangeRecord(record); + }); + futures.add(bothFuture); } Util.collectFutures(futures).join();