Initial work on object change records

This commit is contained in:
Nekojimi 2025-09-17 12:16:15 +01:00
parent ad75b3f29c
commit 54b31ac7d1
15 changed files with 504 additions and 60 deletions

View file

@ -1,11 +1,10 @@
package moe.nekojimi.friendcloud; package moe.nekojimi.friendcloud;
import moe.nekojimi.friendcloud.objects.NetworkFile; import moe.nekojimi.friendcloud.objects.NetworkFile;
import moe.nekojimi.friendcloud.tasks.FileDownloadTask;
import java.io.File;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.FutureTask;
public class DownloadManager public class DownloadManager
{ {

View file

@ -21,9 +21,7 @@ import moe.nekojimi.friendcloud.objects.Peer;
import moe.nekojimi.friendcloud.objects.PeerFileState; import moe.nekojimi.friendcloud.objects.PeerFileState;
import moe.nekojimi.friendcloud.protos.ObjectStatements; import moe.nekojimi.friendcloud.protos.ObjectStatements;
import org.slf4j.simple.SimpleLogger; import org.slf4j.simple.SimpleLogger;
import org.xml.sax.SAXException;
import javax.xml.parsers.ParserConfigurationException;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.*; import java.net.*;
@ -118,8 +116,8 @@ public class Main
connectionManager.start(); connectionManager.start();
String hostname = Hostname.getHostname(); String hostname = Hostname.getHostname();
Model.getInstance().getSelfNode().setSystemName(hostname); Model.getInstance().getSelfPeer().setSystemName(hostname);
Model.getInstance().getSelfNode().setUserName(System.getProperty("user.name") + "-" + tcpPort); Model.getInstance().getSelfPeer().setUserName(System.getProperty("user.name") + "-" + tcpPort);
addHostAddress(InetAddress.getLocalHost()); addHostAddress(InetAddress.getLocalHost());
/* /*
@ -146,7 +144,7 @@ public class Main
networkFile.updateFromLocalFile(file); networkFile.updateFromLocalFile(file);
PeerFileState peerFileState = (PeerFileState) Model.getInstance().createObjectByType(ObjectStatements.ObjectType.OBJECT_TYPE_PEER_FILE_STATE); PeerFileState peerFileState = (PeerFileState) Model.getInstance().createObjectByType(ObjectStatements.ObjectType.OBJECT_TYPE_PEER_FILE_STATE);
peerFileState.setNode(Model.getInstance().getSelfNode()); peerFileState.setNode(Model.getInstance().getSelfPeer());
peerFileState.setFile(networkFile); peerFileState.setFile(networkFile);
peerFileState.setProgress(100); peerFileState.setProgress(100);
} }
@ -204,7 +202,7 @@ public class Main
private void addHostAddress(InetAddress address) private void addHostAddress(InetAddress address)
{ {
String host = address.getCanonicalHostName(); String host = address.getCanonicalHostName();
Peer selfNode = Model.getInstance().getSelfNode(); Peer selfNode = Model.getInstance().getSelfPeer();
try try
{ {
URI uri = new URI("tcp", null, host, tcpPort, null, null, null); URI uri = new URI("tcp", null, host, tcpPort, null, null, null);

View file

@ -19,7 +19,7 @@ public class Model
private final int systemID; private final int systemID;
private Peer selfPeer = null; private Peer selfPeer = null;
private ObjectChangeRecord changeHead; private ObjectChangeRecord currentChange;
private final Map<Long, ObjectChangeRecord> changeRecords = new HashMap<>(); private final Map<Long, ObjectChangeRecord> changeRecords = new HashMap<>();
private Model() private Model()
@ -28,10 +28,15 @@ public class Model
systemID = ran.nextInt() & 0x00FFFFFF; systemID = ran.nextInt() & 0x00FFFFFF;
} }
public synchronized Peer getSelfNode() public void setSelfPeer(Peer selfPeer)
{ {
if (selfPeer == null) this.selfPeer = selfPeer;
selfPeer = (Peer) createObjectByType(ObjectStatements.ObjectType.OBJECT_TYPE_PEER); }
public synchronized Peer getSelfPeer()
{
// if (selfPeer == null)
// selfPeer = (Peer) createObjectByType(ObjectStatements.ObjectType.OBJECT_TYPE_PEER);
return selfPeer; return selfPeer;
} }
// private Map<Long, Node> nodes = new HashMap<>(); // private Map<Long, Node> nodes = new HashMap<>();
@ -127,8 +132,8 @@ public class Model
public void applyChangeRecord(ObjectChangeRecord record) public void applyChangeRecord(ObjectChangeRecord record)
{ {
if (!record.getChangeHeads().contains(changeHead)) if (!record.getChangeHeads().contains(currentChange))
throw new IllegalStateException("Change does not apply! Valid change heads=" + record.getChangeHeads() + ", we are in state " + changeHead.getChangeID()); throw new IllegalStateException("Change does not apply! Valid change heads=" + record.getChangeHeads() + ", we are in state " + currentChange.getChangeID());
if (!changeRecords.containsKey(record.getChangeID())) if (!changeRecords.containsKey(record.getChangeID()))
addChangeRecord(record); addChangeRecord(record);
@ -136,4 +141,27 @@ public class Model
// if (record == null) // if (record == null)
// throw new IllegalArgumentException("Cannot apply unknown change!"); // throw new IllegalArgumentException("Cannot apply unknown change!");
} }
public Set<ObjectChangeRecord> getChangeHeads()
{
// stupid algorithm - start with all of the changes, then remove the ones that are referenced by something
// TODO: better algorithm
Set<ObjectChangeRecord> ret = new HashSet<>(changeRecords.values());
for (ObjectChangeRecord record : changeRecords.values())
{
}
}
public Set<Peer> listOtherPeers()
{
Set<Peer> ret = new HashSet<>();
for (NetworkObject.ObjectID peerID : listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_PEER)))
{
Peer peer = (Peer) getObject(peerID);
if (peer != getSelfPeer())
ret.add(peer);
}
return ret;
}
} }

View file

@ -1,39 +1,170 @@
package moe.nekojimi.friendcloud; package moe.nekojimi.friendcloud;
import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.protos.ObjectStatements; import moe.nekojimi.friendcloud.protos.ObjectStatements;
import java.util.Collections; import java.nio.charset.StandardCharsets;
import java.util.HashSet; import java.security.MessageDigest;
import java.util.Set; import java.security.NoSuchAlgorithmException;
import java.util.*;
public class ObjectChangeRecord public class ObjectChangeRecord
{ {
private final long changeID; // private final long changeID;
private ObjectChangeRecord(long changeID) private final NetworkObject.ObjectID creatorPeer;
private final Set<Long> changeHeads = new HashSet<>();
private final Set<Change> changes = new HashSet<>();
public ObjectChangeRecord(NetworkObject.ObjectID creatorPeer)
{ {
this.changeID = changeID; this.creatorPeer = creatorPeer;
} }
private final Set<ObjectChangeRecord> changeHeads = new HashSet<>(); public static ObjectChangeRecord createFromChangeMessage(ObjectStatements.ObjectChangeMessage objectChangeMessage)
public static ObjectChangeRecord createFromChangeMessage(ObjectStatements.ObjectChange objectChange)
{ {
throw new UnsupportedOperationException("NYI!"); ObjectChangeRecord record = new ObjectChangeRecord(new NetworkObject.ObjectID(0)); // TODO: decode creator
record.changeHeads.addAll(objectChangeMessage.getChangeHeadsList());
for (ObjectStatements.ObjectChange objectChange : objectChangeMessage.getChangesList())
{
record.changes.add(Change.createFromObjectChange(objectChange));
}
long calculatedID = record.getChangeID();
long specifiedID = objectChangeMessage.getChangeId();
if (calculatedID != specifiedID)
{
System.err.println("WARNING: didn't decode change ID correctly!");
}
return record;
} }
public static ObjectChangeRecord createFromObjectStates(ObjectStatements.ObjectState before, ObjectStatements.ObjectState after) public static ObjectChangeRecord createFromChanges(NetworkObject.ObjectID creator, Set<Change> changes)
{ {
throw new UnsupportedOperationException("NYI!"); ObjectChangeRecord record = new ObjectChangeRecord(creator);
record.changes.addAll(changes);
return record;
}
public byte[] getHash()
{
try
{
MessageDigest digest = MessageDigest.getInstance("SHA-256");
return digest.digest(toString().getBytes(StandardCharsets.UTF_8));
} catch (NoSuchAlgorithmException e)
{
throw new RuntimeException(e);
}
}
public ObjectStatements.ObjectChangeMessage.Builder buildObjectChangeMessage()
{
ObjectStatements.ObjectChangeMessage.Builder builder = ObjectStatements.ObjectChangeMessage.newBuilder();
builder.setChangeId(getChangeID());
builder.addAllChangeHeads(changeHeads);
for (Change change : changes)
{
builder.addChanges(change.buildObjectChange());
}
return builder;
}
public String toString()
{
StringBuilder sb = new StringBuilder();
for (long changeHeadId: changeHeads)
{
sb.append(changeHeadId).append(",");
}
sb.append(";");
for (Change change: changes)
{
sb.append(change.toString()).append(";");
}
return sb.toString();
} }
public long getChangeID() public long getChangeID()
{ {
return changeID; MessageDigest digest = null;
try
{
digest = MessageDigest.getInstance("SHA-1");
} catch (NoSuchAlgorithmException e)
{
throw new RuntimeException(e);
}
byte[] bytes = digest.digest(toString().getBytes(StandardCharsets.UTF_8));
return Util.xorBytesToLong(bytes);
} }
public Set<ObjectChangeRecord> getChangeHeads() public NetworkObject.ObjectID getCreatorPeer()
{ {
return Collections.unmodifiableSet(changeHeads); return creatorPeer;
}
public static class Change
{
private final NetworkObject.ObjectID objectID;
private final Map<String,String> beforeValues;
private final Map<String,String> afterValues;
public Change(NetworkObject.ObjectID objectID, Map<String, String> before, Map<String, String> after)
{
this.objectID = objectID;
this.beforeValues = before;
this.afterValues = after;
}
public static Change createFromObjectChange(ObjectStatements.ObjectChange change)
{
return new Change(new NetworkObject.ObjectID(change.getObjectId()), change.getBeforeMap(), change.getAfterMap());
}
public static Change createFromObjectStates(ObjectStatements.ObjectState before, ObjectStatements.ObjectState after)
{
Map<String,String> beforeValues = new HashMap<>();
Map<String,String> afterValues = new HashMap<>();
for (String key: after.getValuesMap().keySet())
{
String beforeValue = before.getValuesOrDefault(key, null);
String afterValue = after.getValuesOrDefault(key, null);
if (!afterValue.equals(beforeValue))
{
beforeValues.put(key,beforeValue);
afterValues.put(key,afterValue);
}
}
if (!afterValues.isEmpty())
{
return new Change(new NetworkObject.ObjectID(before.getObjectId()), beforeValues, afterValues);
}
return null;
}
public String toString()
{
StringBuilder sb = new StringBuilder();
sb.append(objectID.toLong()).append(";"); // The object ID, then ;
// now all key-value pairs in alphabetical order
List<String> keys = new ArrayList<>(beforeValues.keySet());
Collections.sort(keys);
for (String key : keys)
{
sb.append(key).append(":").append(afterValues.get(key));
}
sb.append(";");
return sb.toString();
}
public ObjectStatements.ObjectChange.Builder buildObjectChange()
{
ObjectStatements.ObjectChange.Builder builder = ObjectStatements.ObjectChange.newBuilder();
builder.putAllBefore(beforeValues);
builder.putAllAfter(afterValues);
builder.setObjectId(objectID.toLong());
return builder;
}
} }
} }

View file

@ -0,0 +1,73 @@
package moe.nekojimi.friendcloud;
import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import moe.nekojimi.friendcloud.tasks.PropagateMessageTask;
import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public class ObjectChangeTransaction implements Closeable
{
private final NetworkObject.ObjectID creator;
private final ConnectionManager connectionManager;
private final Map<NetworkObject.ObjectID, ObjectStatements.ObjectState> beforeStates = new HashMap<>();
private boolean ended = false;
ObjectChangeTransaction(ConnectionManager connectionManager, NetworkObject.ObjectID creator)
{
this.creator = creator;
this.connectionManager = connectionManager;
}
public static ObjectChangeTransaction startTransaction(ConnectionManager connectionManager, NetworkObject.ObjectID creatorPeer, NetworkObject.ObjectID... objects)
{
ObjectChangeTransaction builder = new ObjectChangeTransaction(connectionManager, creatorPeer);
for (NetworkObject.ObjectID id : objects)
{
builder.addObjectBeforeChange(id);
}
return builder;
}
public ObjectChangeTransaction addObjectBeforeChange(NetworkObject.ObjectID id)
{
NetworkObject object = Model.getInstance().getObject(id);
if (object != null)
beforeStates.put(id, object.buildObjectState().build());
return this;
}
public ObjectChangeRecord endTransaction()
{
if (ended)
throw new IllegalStateException("Transaction already ended!");
ended = true;
Set<ObjectChangeRecord.Change> changes = new HashSet<>();
for (Map.Entry<NetworkObject.ObjectID, ObjectStatements.ObjectState> entry : beforeStates.entrySet())
{
ObjectStatements.ObjectState afterState = Model.getInstance().getObject(entry.getKey()).buildObjectState().build();
ObjectChangeRecord.Change change = ObjectChangeRecord.Change.createFromObjectStates(entry.getValue(), afterState);
changes.add(change);
}
return ObjectChangeRecord.createFromChanges(creator, changes);
}
@Override
public void close() throws IOException
{
// end the transaction and get the change object
ObjectChangeRecord objectChangeRecord = endTransaction();
// add the new change to the model
Model.getInstance().addChangeRecord(objectChangeRecord);
// create a task to propagate the change to other peers
Main.getInstance().getExecutor().submit(new PropagateMessageTask(objectChangeRecord.buildObjectChangeMessage().build()));
}
}

View file

@ -0,0 +1,19 @@
package moe.nekojimi.friendcloud;
import java.nio.ByteBuffer;
import java.nio.LongBuffer;
public class Util
{
public static long xorBytesToLong(byte[] bytes)
{
ByteBuffer buf = ByteBuffer.wrap(bytes);
LongBuffer longs = buf.asLongBuffer();
long ret = 0xBEEFCAFEF00DBABEL;
for (long l: longs.array())
{
ret = ret ^ l;
}
return ret;
}
}

View file

@ -75,13 +75,18 @@ public abstract class PeerConnection extends Thread
} }
} }
public synchronized void sendUnsolicitedMessage(Message message) throws IOException
{
sendMessage(wrapMessage(message));
}
protected abstract void sendMessage(CommonMessages.FriendCloudMessage message) throws IOException; protected abstract void sendMessage(CommonMessages.FriendCloudMessage message) throws IOException;
private CommonMessages.FriendCloudMessage wrapMessage(Message message, CommonMessages.MessageHeader inReplyTo) private CommonMessages.FriendCloudMessage wrapMessage(Message message, CommonMessages.MessageHeader inReplyTo)
{ {
CommonMessages.MessageHeader.Builder headerBuilder = CommonMessages.MessageHeader.newBuilder() CommonMessages.MessageHeader.Builder headerBuilder = CommonMessages.MessageHeader.newBuilder()
.setMessageId(nextMessageId) .setMessageId(nextMessageId)
.setSenderId(Model.getInstance().getSelfNode().getObjectID().toLong()); .setSenderId(Model.getInstance().getSelfPeer().getObjectID().toLong());
if (inReplyTo != null) if (inReplyTo != null)
headerBuilder.setReplyToMessageId(inReplyTo.getMessageId()); headerBuilder.setReplyToMessageId(inReplyTo.getMessageId());
@ -102,37 +107,43 @@ public abstract class PeerConnection extends Thread
{ {
System.err.println("Sending error reply: " + error.name() + " to message ID " + replyHeader.getReplyToMessageId()); System.err.println("Sending error reply: " + error.name() + " to message ID " + replyHeader.getReplyToMessageId());
CommonMessages.ErrorMessage errorMessage = CommonMessages.ErrorMessage.newBuilder().setError(error).build(); CommonMessages.ErrorMessage errorMessage = CommonMessages.ErrorMessage.newBuilder().setError(error).build();
sendMessage(wrapMessage(errorMessage,replyHeader)); sendMessage(wrapMessage(errorMessage, replyHeader));
} }
protected void messageReceived(@org.jetbrains.annotations.NotNull CommonMessages.FriendCloudMessage message) protected void messageReceived(@org.jetbrains.annotations.NotNull CommonMessages.FriendCloudMessage message)
{ {
if (artificalDelayMs > 0)
{
try
{
System.err.println("WARNING: artifical lag activated! Waiting " + artificalDelayMs + "ms...");
Thread.sleep(artificalDelayMs);
} catch (InterruptedException e)
{
// well never mind then
}
}
CommonMessages.MessageHeader header = message.getHeader(); CommonMessages.MessageHeader header = message.getHeader();
NetworkObject.ObjectID senderID = new NetworkObject.ObjectID(header.getSenderId());
peer = (Peer) Model.getInstance().getOrCreateObject(senderID);
Any body = message.getBody();
long replyToMessageId = header.getReplyToMessageId();
System.out.println("Received message! type=" + body.getTypeUrl() + ", ID=" + header.getMessageId() + ", reply_to=" + replyToMessageId );
try try
{ {
try try
{ {
if (artificalDelayMs > 0)
{
try
{
System.err.println("WARNING: artifical lag activated! Waiting " + artificalDelayMs + "ms...");
Thread.sleep(artificalDelayMs);
} catch (InterruptedException e)
{
// well never mind then
}
}
NetworkObject.ObjectID senderID = new NetworkObject.ObjectID(header.getSenderId());
if (peer == null)
peer = (Peer) Model.getInstance().getOrCreateObject(senderID);
else
{
if (!senderID.equals(peer.getObjectID()))
throw new ReplyWithErrorException(CommonMessages.Error.ERROR_WHO_THE_FUCK_ARE_YOU);
}
Any body = message.getBody();
long replyToMessageId = header.getReplyToMessageId();
System.out.println("Received message! type=" + body.getTypeUrl() + ", ID=" + header.getMessageId() + ", reply_to=" + replyToMessageId);
if (replyToMessageId != 0) if (replyToMessageId != 0)
{ {
if (pendingRequests.containsKey(replyToMessageId)) if (pendingRequests.containsKey(replyToMessageId))
@ -146,13 +157,11 @@ public abstract class PeerConnection extends Thread
{ {
handleUnsolicitedMessage(header, body); handleUnsolicitedMessage(header, body);
} }
} } catch (ReplyWithErrorException ex)
catch (ReplyWithErrorException ex)
{ {
ex.printStackTrace(System.err); ex.printStackTrace(System.err);
replyWithError(ex.getError(), header); replyWithError(ex.getError(), header);
} } catch (IllegalArgumentException ex)
catch (IllegalArgumentException ex)
{ {
ex.printStackTrace(System.err); ex.printStackTrace(System.err);
replyWithError(CommonMessages.Error.ERROR_INVALID_ARGUMENT, header); replyWithError(CommonMessages.Error.ERROR_INVALID_ARGUMENT, header);

View file

@ -0,0 +1,43 @@
package moe.nekojimi.friendcloud.network.requests;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import moe.nekojimi.friendcloud.Model;
import moe.nekojimi.friendcloud.ObjectChangeRecord;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import java.util.Set;
public class ObjectChangeRequest extends Request<ObjectStatements.ObjectChangeRequest, Set<ObjectStatements.ObjectChange>>
{
private final Set<Long> changesSinceIDs;
public ObjectChangeRequest(Set<Long> changesSinceIDs)
{
this.changesSinceIDs = changesSinceIDs;
}
@Override
public ObjectStatements.ObjectChangeRequest buildMessage()
{
return ObjectStatements.ObjectChangeRequest.newBuilder().addAllChangesSince(changesSinceIDs).build();
}
@Override
public boolean handleReply(Any reply) throws InvalidProtocolBufferException
{
if (!super.handleReply(reply))
return false;
if (reply.is(ObjectStatements.ObjectChangeMessage.class))
{
ObjectStatements.ObjectChangeMessage objectChangeMessage = reply.unpack(ObjectStatements.ObjectChangeMessage.class);
ObjectChangeRecord objectChangeRecord = ObjectChangeRecord.createFromChangeMessage(objectChangeMessage);
Model.getInstance().applyChangeRecord(objectChangeRecord);
return true;
}
return false;
}
}

View file

@ -24,7 +24,7 @@ public abstract class NetworkFSNode extends NetworkObject
{ {
long parentID = Long.parseLong(state.getValuesOrThrow("parent")); long parentID = Long.parseLong(state.getValuesOrThrow("parent"));
if (parentID != 0) if (parentID != 0)
parent = (NetworkFolder) Model.getInstance().getOrCreateObject(new ObjectID(parentID)); parent = (NetworkFolder) Model.getInstance().getObject(new ObjectID(parentID));
else else
parent = null; parent = null;
} }

View file

@ -1,6 +1,7 @@
package moe.nekojimi.friendcloud; package moe.nekojimi.friendcloud.tasks;
import moe.nekojimi.friendcloud.ConnectionManager;
import moe.nekojimi.friendcloud.network.PeerConnection; import moe.nekojimi.friendcloud.network.PeerConnection;
import moe.nekojimi.friendcloud.network.requests.FilePiecesRequest; import moe.nekojimi.friendcloud.network.requests.FilePiecesRequest;
import moe.nekojimi.friendcloud.objects.NetworkFile; import moe.nekojimi.friendcloud.objects.NetworkFile;

View file

@ -0,0 +1,39 @@
package moe.nekojimi.friendcloud.tasks;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.Model;
import moe.nekojimi.friendcloud.ObjectChangeTransaction;
import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.objects.Peer;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import java.io.IOException;
public class JoinNetworkTask implements Runnable
{
@Override
public void run()
{
// generate new peer ID
NetworkObject.ObjectID peerID = null;
try (ObjectChangeTransaction builder = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), peerID))
{
Peer selfPeer = Model.getInstance().getSelfPeer();
if (selfPeer != null)
peerID = selfPeer.getObjectID();
else
peerID = Model.getInstance().getNextObjectID(ObjectStatements.ObjectType.OBJECT_TYPE_PEER);
// synchronise with the network
SyncWithNetworkTask syncWithNetworkTask = new SyncWithNetworkTask();
syncWithNetworkTask.run();
} catch (IOException e)
{
throw new RuntimeException(e);
}
}
}

View file

@ -0,0 +1,39 @@
package moe.nekojimi.friendcloud.tasks;
import com.google.protobuf.Message;
import moe.nekojimi.friendcloud.ConnectionManager;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.Model;
import moe.nekojimi.friendcloud.network.PeerConnection;
import moe.nekojimi.friendcloud.objects.Peer;
import moe.nekojimi.friendcloud.protos.CommonMessages;
import java.io.IOException;
public class PropagateMessageTask implements Runnable
{
private final Message message;
public PropagateMessageTask(Message message)
{
this.message = message;
}
@Override
public void run()
{
ConnectionManager connectionManager = Main.getInstance().getConnectionManager();
for (Peer peer: Model.getInstance().listOtherPeers())
{
try
{
PeerConnection connection = connectionManager.getNodeConnection(peer);
if (connection != null)
connection.sendUnsolicitedMessage(message);
} catch (IOException e)
{
throw new RuntimeException(e);
}
}
}
}

View file

@ -0,0 +1,47 @@
package moe.nekojimi.friendcloud.tasks;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.Model;
import moe.nekojimi.friendcloud.ObjectChangeRecord;
import moe.nekojimi.friendcloud.network.PeerConnection;
import moe.nekojimi.friendcloud.network.requests.ObjectChangeRequest;
import moe.nekojimi.friendcloud.objects.Peer;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
public class SyncWithNetworkTask implements Runnable
{
@Override
public void run()
{
// for each other peer:
for (Peer peer : Model.getInstance().listOtherPeers())
{
// open a connection
try
{
PeerConnection connection = Main.getInstance().getConnectionManager().getNodeConnection(peer);
// send a ObjectChangeRequest
ObjectChangeRequest objectChangeRequest = new ObjectChangeRequest(Model.getInstance().getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet()));
CompletableFuture<Set<ObjectStatements.ObjectChange>> future = connection.makeRequest(objectChangeRequest);
// integrate the returned changes with our change graph
} catch (IOException e)
{
System.err.println("SyncWithNetworkTask: Couldn't connect to " + peer + ": " + e.getMessage());
continue;
}
}
// if no peers could be contacted:
// return success (everyone's offline)
// if everyone reported end of history:
// delete our change graph
// perform a full sync
}
}

View file

@ -15,6 +15,11 @@ message FriendCloudMessage {
google.protobuf.Any body = 2; google.protobuf.Any body = 2;
} }
message HelloMessage {
uint32 protocol_version = 1; // this is the version of the FriendCloud protocol I speak
}
message LoginMessage { message LoginMessage {
} }
@ -22,7 +27,7 @@ enum Error {
ERROR_UNSPECIFIED = 0; ERROR_UNSPECIFIED = 0;
ERROR_WHO_THE_FUCK_ARE_YOU = 1; // sender unidentified or unauthenticated ERROR_WHO_THE_FUCK_ARE_YOU = 1; // sender unidentified or unauthenticated
ERROR_PERMISSION_DENIED = 2; // you can't do that ERROR_PERMISSION_DENIED = 2; // you can't do that
ERROR_OBJECT_NOT_FOUND = 3; // one or more object(s) specified don't exist ERROR_OBJECT_NOT_FOUND = 3; // one or more object(s) referenced don't exist
ERROR_INTERNAL = 4; // internal error ERROR_INTERNAL = 4; // internal error
ERROR_OUT_OF_DATE = 5; // your action is impossible because you have an out-of-date state (in a way that matters) ERROR_OUT_OF_DATE = 5; // your action is impossible because you have an out-of-date state (in a way that matters)
ERROR_CHECKSUM_FAILURE = 6; // a supplied checksum didn't match the relevant data ERROR_CHECKSUM_FAILURE = 6; // a supplied checksum didn't match the relevant data
@ -30,10 +35,13 @@ enum Error {
ERROR_INVALID_ARGUMENT = 8; // an argument specified is outside the expected range ERROR_INVALID_ARGUMENT = 8; // an argument specified is outside the expected range
ERROR_NOT_EXPECTING_REPLY = 9; // you sent a reply to a message that I wasn't expecting a reply to ERROR_NOT_EXPECTING_REPLY = 9; // you sent a reply to a message that I wasn't expecting a reply to
ERROR_INVALID_PROTOBUF = 10; // your message couldn't be decoded at all ERROR_INVALID_PROTOBUF = 10; // your message couldn't be decoded at all
ERROR_END_OF_HISTORY = 11; // you're referencing a change ID that I've forgotten (or never had)
ERROR_MESSAGE_BODY_UNKNOWN = 12; // I don't know how to handle the type of message in your message body; please stop sending that one
} }
message ErrorMessage { message ErrorMessage {
Error error = 1; Error error = 1;
string text = 2;
} }
message PingMessage { message PingMessage {

View file

@ -29,9 +29,19 @@ message ObjectStateRequest {
} }
message ObjectChange { message ObjectChange {
uint64 object_id = 1;
map<string,string> before = 3;
map<string,string> after = 4;
}
message ObjectChangeMessage {
uint64 change_id = 1; uint64 change_id = 1;
repeated uint64 change_heads = 2; repeated uint64 change_heads = 2;
repeated ObjectState states = 3; repeated ObjectChange changes = 3;
}
message ObjectChangeListMessage {
repeated ObjectChangeMessage change_messages = 1;
} }
message ObjectChangeRequest { message ObjectChangeRequest {