diff --git a/src/main/java/moe/nekojimi/friendcloud/DownloadManager.java b/src/main/java/moe/nekojimi/friendcloud/DownloadManager.java index 6f53547..6250f90 100644 --- a/src/main/java/moe/nekojimi/friendcloud/DownloadManager.java +++ b/src/main/java/moe/nekojimi/friendcloud/DownloadManager.java @@ -1,11 +1,10 @@ package moe.nekojimi.friendcloud; import moe.nekojimi.friendcloud.objects.NetworkFile; +import moe.nekojimi.friendcloud.tasks.FileDownloadTask; -import java.io.File; import java.util.*; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.FutureTask; public class DownloadManager { diff --git a/src/main/java/moe/nekojimi/friendcloud/Main.java b/src/main/java/moe/nekojimi/friendcloud/Main.java index 6b7f723..bf99255 100644 --- a/src/main/java/moe/nekojimi/friendcloud/Main.java +++ b/src/main/java/moe/nekojimi/friendcloud/Main.java @@ -21,9 +21,7 @@ import moe.nekojimi.friendcloud.objects.Peer; import moe.nekojimi.friendcloud.objects.PeerFileState; import moe.nekojimi.friendcloud.protos.ObjectStatements; import org.slf4j.simple.SimpleLogger; -import org.xml.sax.SAXException; -import javax.xml.parsers.ParserConfigurationException; import java.io.File; import java.io.IOException; import java.net.*; @@ -118,8 +116,8 @@ public class Main connectionManager.start(); String hostname = Hostname.getHostname(); - Model.getInstance().getSelfNode().setSystemName(hostname); - Model.getInstance().getSelfNode().setUserName(System.getProperty("user.name") + "-" + tcpPort); + Model.getInstance().getSelfPeer().setSystemName(hostname); + Model.getInstance().getSelfPeer().setUserName(System.getProperty("user.name") + "-" + tcpPort); addHostAddress(InetAddress.getLocalHost()); /* @@ -146,7 +144,7 @@ public class Main networkFile.updateFromLocalFile(file); PeerFileState peerFileState = (PeerFileState) Model.getInstance().createObjectByType(ObjectStatements.ObjectType.OBJECT_TYPE_PEER_FILE_STATE); - peerFileState.setNode(Model.getInstance().getSelfNode()); + peerFileState.setNode(Model.getInstance().getSelfPeer()); peerFileState.setFile(networkFile); peerFileState.setProgress(100); } @@ -204,7 +202,7 @@ public class Main private void addHostAddress(InetAddress address) { String host = address.getCanonicalHostName(); - Peer selfNode = Model.getInstance().getSelfNode(); + Peer selfNode = Model.getInstance().getSelfPeer(); try { URI uri = new URI("tcp", null, host, tcpPort, null, null, null); diff --git a/src/main/java/moe/nekojimi/friendcloud/Model.java b/src/main/java/moe/nekojimi/friendcloud/Model.java index 6089c89..a77caaa 100644 --- a/src/main/java/moe/nekojimi/friendcloud/Model.java +++ b/src/main/java/moe/nekojimi/friendcloud/Model.java @@ -19,7 +19,7 @@ public class Model private final int systemID; private Peer selfPeer = null; - private ObjectChangeRecord changeHead; + private ObjectChangeRecord currentChange; private final Map changeRecords = new HashMap<>(); private Model() @@ -28,10 +28,15 @@ public class Model systemID = ran.nextInt() & 0x00FFFFFF; } - public synchronized Peer getSelfNode() + public void setSelfPeer(Peer selfPeer) { - if (selfPeer == null) - selfPeer = (Peer) createObjectByType(ObjectStatements.ObjectType.OBJECT_TYPE_PEER); + this.selfPeer = selfPeer; + } + + public synchronized Peer getSelfPeer() + { +// if (selfPeer == null) +// selfPeer = (Peer) createObjectByType(ObjectStatements.ObjectType.OBJECT_TYPE_PEER); return selfPeer; } // private Map nodes = new HashMap<>(); @@ -127,8 +132,8 @@ public class Model public void applyChangeRecord(ObjectChangeRecord record) { - if (!record.getChangeHeads().contains(changeHead)) - throw new IllegalStateException("Change does not apply! Valid change heads=" + record.getChangeHeads() + ", we are in state " + changeHead.getChangeID()); + if (!record.getChangeHeads().contains(currentChange)) + throw new IllegalStateException("Change does not apply! Valid change heads=" + record.getChangeHeads() + ", we are in state " + currentChange.getChangeID()); if (!changeRecords.containsKey(record.getChangeID())) addChangeRecord(record); @@ -136,4 +141,27 @@ public class Model // if (record == null) // throw new IllegalArgumentException("Cannot apply unknown change!"); } + + public Set getChangeHeads() + { + // stupid algorithm - start with all of the changes, then remove the ones that are referenced by something + // TODO: better algorithm + Set ret = new HashSet<>(changeRecords.values()); + for (ObjectChangeRecord record : changeRecords.values()) + { + + } + } + + public Set listOtherPeers() + { + Set ret = new HashSet<>(); + for (NetworkObject.ObjectID peerID : listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_PEER))) + { + Peer peer = (Peer) getObject(peerID); + if (peer != getSelfPeer()) + ret.add(peer); + } + return ret; + } } diff --git a/src/main/java/moe/nekojimi/friendcloud/ObjectChangeRecord.java b/src/main/java/moe/nekojimi/friendcloud/ObjectChangeRecord.java index d5f6b6d..637567b 100644 --- a/src/main/java/moe/nekojimi/friendcloud/ObjectChangeRecord.java +++ b/src/main/java/moe/nekojimi/friendcloud/ObjectChangeRecord.java @@ -1,39 +1,170 @@ package moe.nekojimi.friendcloud; +import moe.nekojimi.friendcloud.objects.NetworkObject; import moe.nekojimi.friendcloud.protos.ObjectStatements; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.*; public class ObjectChangeRecord { - private final long changeID; +// private final long changeID; - private ObjectChangeRecord(long changeID) + private final NetworkObject.ObjectID creatorPeer; + private final Set changeHeads = new HashSet<>(); + private final Set changes = new HashSet<>(); + + public ObjectChangeRecord(NetworkObject.ObjectID creatorPeer) { - this.changeID = changeID; + this.creatorPeer = creatorPeer; } - private final Set changeHeads = new HashSet<>(); - - public static ObjectChangeRecord createFromChangeMessage(ObjectStatements.ObjectChange objectChange) + public static ObjectChangeRecord createFromChangeMessage(ObjectStatements.ObjectChangeMessage objectChangeMessage) { - throw new UnsupportedOperationException("NYI!"); + ObjectChangeRecord record = new ObjectChangeRecord(new NetworkObject.ObjectID(0)); // TODO: decode creator + record.changeHeads.addAll(objectChangeMessage.getChangeHeadsList()); + for (ObjectStatements.ObjectChange objectChange : objectChangeMessage.getChangesList()) + { + record.changes.add(Change.createFromObjectChange(objectChange)); + } + long calculatedID = record.getChangeID(); + long specifiedID = objectChangeMessage.getChangeId(); + if (calculatedID != specifiedID) + { + System.err.println("WARNING: didn't decode change ID correctly!"); + } + return record; } - public static ObjectChangeRecord createFromObjectStates(ObjectStatements.ObjectState before, ObjectStatements.ObjectState after) + public static ObjectChangeRecord createFromChanges(NetworkObject.ObjectID creator, Set changes) { - throw new UnsupportedOperationException("NYI!"); + ObjectChangeRecord record = new ObjectChangeRecord(creator); + record.changes.addAll(changes); + return record; + } + + public byte[] getHash() + { + try + { + MessageDigest digest = MessageDigest.getInstance("SHA-256"); + return digest.digest(toString().getBytes(StandardCharsets.UTF_8)); + } catch (NoSuchAlgorithmException e) + { + throw new RuntimeException(e); + } + } + + public ObjectStatements.ObjectChangeMessage.Builder buildObjectChangeMessage() + { + ObjectStatements.ObjectChangeMessage.Builder builder = ObjectStatements.ObjectChangeMessage.newBuilder(); + builder.setChangeId(getChangeID()); + builder.addAllChangeHeads(changeHeads); + for (Change change : changes) + { + builder.addChanges(change.buildObjectChange()); + } + return builder; + } + + public String toString() + { + StringBuilder sb = new StringBuilder(); + for (long changeHeadId: changeHeads) + { + sb.append(changeHeadId).append(","); + } + sb.append(";"); + for (Change change: changes) + { + sb.append(change.toString()).append(";"); + } + return sb.toString(); } public long getChangeID() { - return changeID; + MessageDigest digest = null; + try + { + digest = MessageDigest.getInstance("SHA-1"); + } catch (NoSuchAlgorithmException e) + { + throw new RuntimeException(e); + } + byte[] bytes = digest.digest(toString().getBytes(StandardCharsets.UTF_8)); + return Util.xorBytesToLong(bytes); } - public Set getChangeHeads() + public NetworkObject.ObjectID getCreatorPeer() { - return Collections.unmodifiableSet(changeHeads); + return creatorPeer; + } + + public static class Change + { + private final NetworkObject.ObjectID objectID; + private final Map beforeValues; + private final Map afterValues; + + public Change(NetworkObject.ObjectID objectID, Map before, Map after) + { + this.objectID = objectID; + this.beforeValues = before; + this.afterValues = after; + } + + public static Change createFromObjectChange(ObjectStatements.ObjectChange change) + { + return new Change(new NetworkObject.ObjectID(change.getObjectId()), change.getBeforeMap(), change.getAfterMap()); + } + + public static Change createFromObjectStates(ObjectStatements.ObjectState before, ObjectStatements.ObjectState after) + { + Map beforeValues = new HashMap<>(); + Map afterValues = new HashMap<>(); + for (String key: after.getValuesMap().keySet()) + { + String beforeValue = before.getValuesOrDefault(key, null); + String afterValue = after.getValuesOrDefault(key, null); + if (!afterValue.equals(beforeValue)) + { + beforeValues.put(key,beforeValue); + afterValues.put(key,afterValue); + } + } + if (!afterValues.isEmpty()) + { + return new Change(new NetworkObject.ObjectID(before.getObjectId()), beforeValues, afterValues); + } + return null; + } + + public String toString() + { + StringBuilder sb = new StringBuilder(); + sb.append(objectID.toLong()).append(";"); // The object ID, then ; + // now all key-value pairs in alphabetical order + List keys = new ArrayList<>(beforeValues.keySet()); + Collections.sort(keys); + for (String key : keys) + { + sb.append(key).append(":").append(afterValues.get(key)); + } + sb.append(";"); + return sb.toString(); + } + + + public ObjectStatements.ObjectChange.Builder buildObjectChange() + { + ObjectStatements.ObjectChange.Builder builder = ObjectStatements.ObjectChange.newBuilder(); + builder.putAllBefore(beforeValues); + builder.putAllAfter(afterValues); + builder.setObjectId(objectID.toLong()); + return builder; + } } } diff --git a/src/main/java/moe/nekojimi/friendcloud/ObjectChangeTransaction.java b/src/main/java/moe/nekojimi/friendcloud/ObjectChangeTransaction.java new file mode 100644 index 0000000..fb7596f --- /dev/null +++ b/src/main/java/moe/nekojimi/friendcloud/ObjectChangeTransaction.java @@ -0,0 +1,73 @@ +package moe.nekojimi.friendcloud; + +import moe.nekojimi.friendcloud.objects.NetworkObject; +import moe.nekojimi.friendcloud.protos.ObjectStatements; +import moe.nekojimi.friendcloud.tasks.PropagateMessageTask; + +import java.io.Closeable; +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class ObjectChangeTransaction implements Closeable +{ + private final NetworkObject.ObjectID creator; + private final ConnectionManager connectionManager; + private final Map beforeStates = new HashMap<>(); + + private boolean ended = false; + + ObjectChangeTransaction(ConnectionManager connectionManager, NetworkObject.ObjectID creator) + { + this.creator = creator; + this.connectionManager = connectionManager; + } + + public static ObjectChangeTransaction startTransaction(ConnectionManager connectionManager, NetworkObject.ObjectID creatorPeer, NetworkObject.ObjectID... objects) + { + ObjectChangeTransaction builder = new ObjectChangeTransaction(connectionManager, creatorPeer); + for (NetworkObject.ObjectID id : objects) + { + builder.addObjectBeforeChange(id); + } + return builder; + } + + public ObjectChangeTransaction addObjectBeforeChange(NetworkObject.ObjectID id) + { + NetworkObject object = Model.getInstance().getObject(id); + if (object != null) + beforeStates.put(id, object.buildObjectState().build()); + return this; + } + + public ObjectChangeRecord endTransaction() + { + if (ended) + throw new IllegalStateException("Transaction already ended!"); + ended = true; + Set changes = new HashSet<>(); + + for (Map.Entry entry : beforeStates.entrySet()) + { + ObjectStatements.ObjectState afterState = Model.getInstance().getObject(entry.getKey()).buildObjectState().build(); + ObjectChangeRecord.Change change = ObjectChangeRecord.Change.createFromObjectStates(entry.getValue(), afterState); + changes.add(change); + } + + return ObjectChangeRecord.createFromChanges(creator, changes); + } + + @Override + public void close() throws IOException + { + // end the transaction and get the change object + ObjectChangeRecord objectChangeRecord = endTransaction(); + // add the new change to the model + Model.getInstance().addChangeRecord(objectChangeRecord); + // create a task to propagate the change to other peers + Main.getInstance().getExecutor().submit(new PropagateMessageTask(objectChangeRecord.buildObjectChangeMessage().build())); + } +} diff --git a/src/main/java/moe/nekojimi/friendcloud/Util.java b/src/main/java/moe/nekojimi/friendcloud/Util.java new file mode 100644 index 0000000..5e8a9ab --- /dev/null +++ b/src/main/java/moe/nekojimi/friendcloud/Util.java @@ -0,0 +1,19 @@ +package moe.nekojimi.friendcloud; + +import java.nio.ByteBuffer; +import java.nio.LongBuffer; + +public class Util +{ + public static long xorBytesToLong(byte[] bytes) + { + ByteBuffer buf = ByteBuffer.wrap(bytes); + LongBuffer longs = buf.asLongBuffer(); + long ret = 0xBEEFCAFEF00DBABEL; + for (long l: longs.array()) + { + ret = ret ^ l; + } + return ret; + } +} diff --git a/src/main/java/moe/nekojimi/friendcloud/network/PeerConnection.java b/src/main/java/moe/nekojimi/friendcloud/network/PeerConnection.java index 8a7000e..bdc5d04 100644 --- a/src/main/java/moe/nekojimi/friendcloud/network/PeerConnection.java +++ b/src/main/java/moe/nekojimi/friendcloud/network/PeerConnection.java @@ -75,13 +75,18 @@ public abstract class PeerConnection extends Thread } } + public synchronized void sendUnsolicitedMessage(Message message) throws IOException + { + sendMessage(wrapMessage(message)); + } + protected abstract void sendMessage(CommonMessages.FriendCloudMessage message) throws IOException; private CommonMessages.FriendCloudMessage wrapMessage(Message message, CommonMessages.MessageHeader inReplyTo) { CommonMessages.MessageHeader.Builder headerBuilder = CommonMessages.MessageHeader.newBuilder() .setMessageId(nextMessageId) - .setSenderId(Model.getInstance().getSelfNode().getObjectID().toLong()); + .setSenderId(Model.getInstance().getSelfPeer().getObjectID().toLong()); if (inReplyTo != null) headerBuilder.setReplyToMessageId(inReplyTo.getMessageId()); @@ -102,37 +107,43 @@ public abstract class PeerConnection extends Thread { System.err.println("Sending error reply: " + error.name() + " to message ID " + replyHeader.getReplyToMessageId()); CommonMessages.ErrorMessage errorMessage = CommonMessages.ErrorMessage.newBuilder().setError(error).build(); - sendMessage(wrapMessage(errorMessage,replyHeader)); + sendMessage(wrapMessage(errorMessage, replyHeader)); } protected void messageReceived(@org.jetbrains.annotations.NotNull CommonMessages.FriendCloudMessage message) { - if (artificalDelayMs > 0) - { - try - { - System.err.println("WARNING: artifical lag activated! Waiting " + artificalDelayMs + "ms..."); - Thread.sleep(artificalDelayMs); - } catch (InterruptedException e) - { - // well never mind then - } - } - CommonMessages.MessageHeader header = message.getHeader(); - - NetworkObject.ObjectID senderID = new NetworkObject.ObjectID(header.getSenderId()); - peer = (Peer) Model.getInstance().getOrCreateObject(senderID); - - Any body = message.getBody(); - - long replyToMessageId = header.getReplyToMessageId(); - System.out.println("Received message! type=" + body.getTypeUrl() + ", ID=" + header.getMessageId() + ", reply_to=" + replyToMessageId ); - try { try { + if (artificalDelayMs > 0) + { + try + { + System.err.println("WARNING: artifical lag activated! Waiting " + artificalDelayMs + "ms..."); + Thread.sleep(artificalDelayMs); + } catch (InterruptedException e) + { + // well never mind then + } + } + + NetworkObject.ObjectID senderID = new NetworkObject.ObjectID(header.getSenderId()); + if (peer == null) + peer = (Peer) Model.getInstance().getOrCreateObject(senderID); + else + { + if (!senderID.equals(peer.getObjectID())) + throw new ReplyWithErrorException(CommonMessages.Error.ERROR_WHO_THE_FUCK_ARE_YOU); + } + + Any body = message.getBody(); + + long replyToMessageId = header.getReplyToMessageId(); + System.out.println("Received message! type=" + body.getTypeUrl() + ", ID=" + header.getMessageId() + ", reply_to=" + replyToMessageId); + + if (replyToMessageId != 0) { if (pendingRequests.containsKey(replyToMessageId)) @@ -146,13 +157,11 @@ public abstract class PeerConnection extends Thread { handleUnsolicitedMessage(header, body); } - } - catch (ReplyWithErrorException ex) + } catch (ReplyWithErrorException ex) { ex.printStackTrace(System.err); replyWithError(ex.getError(), header); - } - catch (IllegalArgumentException ex) + } catch (IllegalArgumentException ex) { ex.printStackTrace(System.err); replyWithError(CommonMessages.Error.ERROR_INVALID_ARGUMENT, header); diff --git a/src/main/java/moe/nekojimi/friendcloud/network/requests/ObjectChangeRequest.java b/src/main/java/moe/nekojimi/friendcloud/network/requests/ObjectChangeRequest.java new file mode 100644 index 0000000..5135b66 --- /dev/null +++ b/src/main/java/moe/nekojimi/friendcloud/network/requests/ObjectChangeRequest.java @@ -0,0 +1,43 @@ +package moe.nekojimi.friendcloud.network.requests; + +import com.google.protobuf.Any; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import moe.nekojimi.friendcloud.Model; +import moe.nekojimi.friendcloud.ObjectChangeRecord; +import moe.nekojimi.friendcloud.protos.ObjectStatements; + +import java.util.Set; + +public class ObjectChangeRequest extends Request> +{ + private final Set changesSinceIDs; + + public ObjectChangeRequest(Set changesSinceIDs) + { + this.changesSinceIDs = changesSinceIDs; + } + + @Override + public ObjectStatements.ObjectChangeRequest buildMessage() + { + return ObjectStatements.ObjectChangeRequest.newBuilder().addAllChangesSince(changesSinceIDs).build(); + } + + @Override + public boolean handleReply(Any reply) throws InvalidProtocolBufferException + { + if (!super.handleReply(reply)) + return false; + + if (reply.is(ObjectStatements.ObjectChangeMessage.class)) + { + ObjectStatements.ObjectChangeMessage objectChangeMessage = reply.unpack(ObjectStatements.ObjectChangeMessage.class); + ObjectChangeRecord objectChangeRecord = ObjectChangeRecord.createFromChangeMessage(objectChangeMessage); + Model.getInstance().applyChangeRecord(objectChangeRecord); + return true; + } + + return false; + } +} diff --git a/src/main/java/moe/nekojimi/friendcloud/objects/NetworkFSNode.java b/src/main/java/moe/nekojimi/friendcloud/objects/NetworkFSNode.java index 8bf14c2..c7e8116 100644 --- a/src/main/java/moe/nekojimi/friendcloud/objects/NetworkFSNode.java +++ b/src/main/java/moe/nekojimi/friendcloud/objects/NetworkFSNode.java @@ -24,7 +24,7 @@ public abstract class NetworkFSNode extends NetworkObject { long parentID = Long.parseLong(state.getValuesOrThrow("parent")); if (parentID != 0) - parent = (NetworkFolder) Model.getInstance().getOrCreateObject(new ObjectID(parentID)); + parent = (NetworkFolder) Model.getInstance().getObject(new ObjectID(parentID)); else parent = null; } diff --git a/src/main/java/moe/nekojimi/friendcloud/FileDownloadTask.java b/src/main/java/moe/nekojimi/friendcloud/tasks/FileDownloadTask.java similarity index 98% rename from src/main/java/moe/nekojimi/friendcloud/FileDownloadTask.java rename to src/main/java/moe/nekojimi/friendcloud/tasks/FileDownloadTask.java index 8cf706a..a54eeeb 100644 --- a/src/main/java/moe/nekojimi/friendcloud/FileDownloadTask.java +++ b/src/main/java/moe/nekojimi/friendcloud/tasks/FileDownloadTask.java @@ -1,6 +1,7 @@ -package moe.nekojimi.friendcloud; +package moe.nekojimi.friendcloud.tasks; +import moe.nekojimi.friendcloud.ConnectionManager; import moe.nekojimi.friendcloud.network.PeerConnection; import moe.nekojimi.friendcloud.network.requests.FilePiecesRequest; import moe.nekojimi.friendcloud.objects.NetworkFile; diff --git a/src/main/java/moe/nekojimi/friendcloud/tasks/JoinNetworkTask.java b/src/main/java/moe/nekojimi/friendcloud/tasks/JoinNetworkTask.java new file mode 100644 index 0000000..857653a --- /dev/null +++ b/src/main/java/moe/nekojimi/friendcloud/tasks/JoinNetworkTask.java @@ -0,0 +1,39 @@ +package moe.nekojimi.friendcloud.tasks; + +import moe.nekojimi.friendcloud.Main; +import moe.nekojimi.friendcloud.Model; +import moe.nekojimi.friendcloud.ObjectChangeTransaction; +import moe.nekojimi.friendcloud.objects.NetworkObject; +import moe.nekojimi.friendcloud.objects.Peer; +import moe.nekojimi.friendcloud.protos.ObjectStatements; + +import java.io.IOException; + +public class JoinNetworkTask implements Runnable +{ + + @Override + public void run() + { + // generate new peer ID + NetworkObject.ObjectID peerID = null; + try (ObjectChangeTransaction builder = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), peerID)) + { + Peer selfPeer = Model.getInstance().getSelfPeer(); + if (selfPeer != null) + peerID = selfPeer.getObjectID(); + else + peerID = Model.getInstance().getNextObjectID(ObjectStatements.ObjectType.OBJECT_TYPE_PEER); + + // synchronise with the network + SyncWithNetworkTask syncWithNetworkTask = new SyncWithNetworkTask(); + syncWithNetworkTask.run(); + + + + } catch (IOException e) + { + throw new RuntimeException(e); + } + } +} diff --git a/src/main/java/moe/nekojimi/friendcloud/tasks/PropagateMessageTask.java b/src/main/java/moe/nekojimi/friendcloud/tasks/PropagateMessageTask.java new file mode 100644 index 0000000..1253510 --- /dev/null +++ b/src/main/java/moe/nekojimi/friendcloud/tasks/PropagateMessageTask.java @@ -0,0 +1,39 @@ +package moe.nekojimi.friendcloud.tasks; + +import com.google.protobuf.Message; +import moe.nekojimi.friendcloud.ConnectionManager; +import moe.nekojimi.friendcloud.Main; +import moe.nekojimi.friendcloud.Model; +import moe.nekojimi.friendcloud.network.PeerConnection; +import moe.nekojimi.friendcloud.objects.Peer; +import moe.nekojimi.friendcloud.protos.CommonMessages; + +import java.io.IOException; + +public class PropagateMessageTask implements Runnable +{ + private final Message message; + + public PropagateMessageTask(Message message) + { + this.message = message; + } + + @Override + public void run() + { + ConnectionManager connectionManager = Main.getInstance().getConnectionManager(); + for (Peer peer: Model.getInstance().listOtherPeers()) + { + try + { + PeerConnection connection = connectionManager.getNodeConnection(peer); + if (connection != null) + connection.sendUnsolicitedMessage(message); + } catch (IOException e) + { + throw new RuntimeException(e); + } + } + } +} diff --git a/src/main/java/moe/nekojimi/friendcloud/tasks/SyncWithNetworkTask.java b/src/main/java/moe/nekojimi/friendcloud/tasks/SyncWithNetworkTask.java new file mode 100644 index 0000000..ff5d66f --- /dev/null +++ b/src/main/java/moe/nekojimi/friendcloud/tasks/SyncWithNetworkTask.java @@ -0,0 +1,47 @@ +package moe.nekojimi.friendcloud.tasks; + +import moe.nekojimi.friendcloud.Main; +import moe.nekojimi.friendcloud.Model; +import moe.nekojimi.friendcloud.ObjectChangeRecord; +import moe.nekojimi.friendcloud.network.PeerConnection; +import moe.nekojimi.friendcloud.network.requests.ObjectChangeRequest; +import moe.nekojimi.friendcloud.objects.Peer; +import moe.nekojimi.friendcloud.protos.ObjectStatements; + +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +public class SyncWithNetworkTask implements Runnable +{ + + @Override + public void run() + { + // for each other peer: + for (Peer peer : Model.getInstance().listOtherPeers()) + { + // open a connection + try + { + PeerConnection connection = Main.getInstance().getConnectionManager().getNodeConnection(peer); + // send a ObjectChangeRequest + ObjectChangeRequest objectChangeRequest = new ObjectChangeRequest(Model.getInstance().getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet())); + CompletableFuture> future = connection.makeRequest(objectChangeRequest); + + // integrate the returned changes with our change graph + } catch (IOException e) + { + System.err.println("SyncWithNetworkTask: Couldn't connect to " + peer + ": " + e.getMessage()); + continue; + } + + } + // if no peers could be contacted: + // return success (everyone's offline) + // if everyone reported end of history: + // delete our change graph + // perform a full sync + } +} diff --git a/src/main/protobuf/CommonMessages.proto b/src/main/protobuf/CommonMessages.proto index d4ecb02..879790d 100644 --- a/src/main/protobuf/CommonMessages.proto +++ b/src/main/protobuf/CommonMessages.proto @@ -15,6 +15,11 @@ message FriendCloudMessage { google.protobuf.Any body = 2; } +message HelloMessage { + uint32 protocol_version = 1; // this is the version of the FriendCloud protocol I speak + +} + message LoginMessage { } @@ -22,7 +27,7 @@ enum Error { ERROR_UNSPECIFIED = 0; ERROR_WHO_THE_FUCK_ARE_YOU = 1; // sender unidentified or unauthenticated ERROR_PERMISSION_DENIED = 2; // you can't do that - ERROR_OBJECT_NOT_FOUND = 3; // one or more object(s) specified don't exist + ERROR_OBJECT_NOT_FOUND = 3; // one or more object(s) referenced don't exist ERROR_INTERNAL = 4; // internal error ERROR_OUT_OF_DATE = 5; // your action is impossible because you have an out-of-date state (in a way that matters) ERROR_CHECKSUM_FAILURE = 6; // a supplied checksum didn't match the relevant data @@ -30,10 +35,13 @@ enum Error { ERROR_INVALID_ARGUMENT = 8; // an argument specified is outside the expected range ERROR_NOT_EXPECTING_REPLY = 9; // you sent a reply to a message that I wasn't expecting a reply to ERROR_INVALID_PROTOBUF = 10; // your message couldn't be decoded at all + ERROR_END_OF_HISTORY = 11; // you're referencing a change ID that I've forgotten (or never had) + ERROR_MESSAGE_BODY_UNKNOWN = 12; // I don't know how to handle the type of message in your message body; please stop sending that one } message ErrorMessage { Error error = 1; + string text = 2; } message PingMessage { diff --git a/src/main/protobuf/ObjectStatements.proto b/src/main/protobuf/ObjectStatements.proto index c097e12..75cbf9d 100644 --- a/src/main/protobuf/ObjectStatements.proto +++ b/src/main/protobuf/ObjectStatements.proto @@ -29,9 +29,19 @@ message ObjectStateRequest { } message ObjectChange { + uint64 object_id = 1; + map before = 3; + map after = 4; +} + +message ObjectChangeMessage { uint64 change_id = 1; repeated uint64 change_heads = 2; - repeated ObjectState states = 3; + repeated ObjectChange changes = 3; +} + +message ObjectChangeListMessage { + repeated ObjectChangeMessage change_messages = 1; } message ObjectChangeRequest {