diff --git a/pom.xml b/pom.xml
index ea57383..3d2b4e5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -74,6 +74,11 @@
jlibnotify
1.1.0
+
+ engineering.swat
+ java-watch
+ 0.9.5
+
diff --git a/src/main/java/moe/nekojimi/friendcloud/Controller.java b/src/main/java/moe/nekojimi/friendcloud/Controller.java
new file mode 100644
index 0000000..84a2137
--- /dev/null
+++ b/src/main/java/moe/nekojimi/friendcloud/Controller.java
@@ -0,0 +1,411 @@
+package moe.nekojimi.friendcloud;
+
+import moe.nekojimi.friendcloud.objects.*;
+import moe.nekojimi.friendcloud.protos.ObjectStatements;
+import moe.nekojimi.friendcloud.storage.CachingDataStore;
+import moe.nekojimi.friendcloud.storage.DataStore;
+import moe.nekojimi.friendcloud.storage.LocalData;
+import moe.nekojimi.friendcloud.storage.Storable;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class Controller
+{
+
+ private final CachingDataStore dataStore;
+ private LocalData localData;
+
+ private Set changeHeads = null;
+
+ public Controller(DataStore dataStore)
+ {
+ this.dataStore = new CachingDataStore(dataStore);
+ }
+
+ public synchronized void init()
+ {
+
+ List localDataList = dataStore.getDAOForClass(LocalData.class).getAll();
+ if (localDataList.isEmpty())
+ {
+ localData = dataStore.getDAOForClass(LocalData.class).create(0);
+ }
+ else if (localDataList.size() == 1)
+ {
+ localData = localDataList.getFirst();
+ }
+ else
+ {
+ throw new IllegalStateException("We have more than one LocalData somehow!!");
+ }
+ if (localData.getSystemID() == 0)
+ {
+ Random ran = new Random();
+ localData.setSystemID(ran.nextInt() & 0x00FFFFFF);
+ objectChanged(localData);
+ }
+ }
+
+ public LocalData getLocalData()
+ {
+ return localData;
+ }
+
+ public Peer getLocalPeer()
+ {
+ return localData.getLocalPeer();
+ }
+
+ public Peer createLocalPeer(ObjectID id)
+ {
+ if (localData.getLocalPeer() == null)
+ {
+ localData.setLocalPeer(dataStore.getDAOForClass(Peer.class).create(id.toLong()));
+ objectChanged(localData);
+ }
+ return localData.getLocalPeer();
+ }
+
+ // private Map nodes = new HashMap<>();
+
+ public synchronized ObjectID getNextObjectID(ObjectStatements.ObjectType type)
+ {
+ Random ran = new Random();
+ int randomNumber = ran.nextInt();
+ ObjectID objectID = new ObjectID(type, localData.getSystemID(), randomNumber);
+ System.out.println("Assigned new object ID: " + objectID);
+ return objectID;
+ }
+
+ public static Class extends NetworkObject> getNetworkObjectClassByType(ObjectStatements.ObjectType type)
+ {
+ return switch (type)
+ {
+ case OBJECT_TYPE_FILE -> NetworkFile.class;
+ case OBJECT_TYPE_FOLDER -> NetworkFolder.class;
+ case OBJECT_TYPE_PEER -> Peer.class;
+ case OBJECT_TYPE_UNSPECIFIED, UNRECOGNIZED -> throw new IllegalArgumentException("???");
+ default -> throw new UnsupportedOperationException("NYI: " + type);
+ };
+ }
+
+ public synchronized T createObjectByID(ObjectID id)
+ {
+ if (id.toLong() == 0)
+ throw new IllegalArgumentException("Cannot create an object with ID=0!");
+ ObjectStatements.ObjectType type = id.getType();
+ System.out.println("Creating new object with type: " + type.name());
+ if (type == ObjectStatements.ObjectType.OBJECT_TYPE_UNSPECIFIED)
+ throw new IllegalArgumentException();
+ T ret = (T) dataStore.getDAOForClass(getNetworkObjectClassByType(type)).create(id.toLong());
+ return ret;
+ }
+
+ public synchronized T createObjectByType(ObjectStatements.ObjectType type)
+ {
+ return createObjectByID(getNextObjectID(type));
+ }
+
+ public synchronized T getObject(ObjectID id)
+ {
+ if (id.toLong() == 0)
+ return null;
+ Class clazz = (Class) getNetworkObjectClassByType(id.getType());
+ return dataStore.getDAOForClass(clazz).get(id.toLong());
+ }
+
+// public synchronized T getOrCreateObject(ObjectID id)
+// {
+// if (id.toLong() == 0)
+// return null;
+// Class clazz = (Class) getNetworkObjectClassByType(id.getType());
+// return dataStore.getDAOForClass(clazz).getOrCreate(id.toLong());
+// }
+
+ public synchronized List listObjects(Set types)
+ {
+ List ret = new ArrayList<>();
+ Set> classes = types.stream().map(Controller::getNetworkObjectClassByType).collect(Collectors.toSet());
+ for (Class extends NetworkObject> clazz: classes)
+ {
+ List extends NetworkObject> list = dataStore.getDAOForClass(clazz).getAll();
+ ret.addAll(list);
+ }
+ return ret;
+ }
+
+ public synchronized List listFSNodes(String path)
+ {
+ //TODO: dumbest algorithm in the world
+
+ NetworkFolder folder = (NetworkFolder) getFSNode(path);
+
+ List ret = new ArrayList<>();
+ for (NetworkObject object : listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE, ObjectStatements.ObjectType.OBJECT_TYPE_FOLDER)))
+ {
+ NetworkFSNode fsNode = (NetworkFSNode) object;
+ if (Objects.equals(fsNode.getParent(), folder))
+ ret.add(fsNode);
+ }
+ return ret;
+ }
+
+ public synchronized void addChangeRecord(ObjectChangeRecord record)
+ {
+ DataStore.DAO dao = dataStore.getDAOForClass(ObjectChangeRecord.class);
+ dao.update(record);
+ // update the change heads; if any of this change's heads are included in ours, then this change replaces them
+ if (changeHeads != null)
+ {
+ boolean recordIsNewHead = changeHeads.isEmpty();
+ if (!recordIsNewHead)
+ {
+ for (long changeHeadID : record.getChangeHeads())
+ {
+ ObjectChangeRecord headRecord = dao.get(changeHeadID);
+ recordIsNewHead = changeHeads.remove(headRecord);
+ }
+ }
+ if (recordIsNewHead)
+ {
+ changeHeads.add(record);
+ System.out.println("Controller: Change heads updated: " + changeHeads.stream().map(objectChangeRecord -> Long.toHexString(objectChangeRecord.getChangeID())).collect(Collectors.toSet()));
+ }
+ }
+ }
+
+ public void setCurrentChangeRecord(ObjectChangeRecord record)
+ {
+ addChangeRecord(record);
+ long oldChangeID = localData.getCurrentChangeRecord() == null ? 0L : localData.getCurrentChangeRecord().getChangeID();
+ localData.setCurrentChangeRecord(record);
+ objectChanged(localData);
+ System.out.println("Controller: Change record set; old= " + Long.toHexString(oldChangeID) + " current= " + Long.toHexString(localData.getCurrentChangeRecord().getChangeID()));
+ }
+
+ public ObjectChangeRecord getChangeRecord(long id)
+ {
+ if (id == 0)
+ return null;
+ return dataStore.getDAOForClass(ObjectChangeRecord.class).get(id);
+ }
+
+ public void addChangeRecords(Set objectChangeRecords)
+ {
+ List sorted = ObjectChangeRecord.partiallySort(objectChangeRecords);
+ for (ObjectChangeRecord record : sorted)
+ addChangeRecord(record);
+ }
+
+ public void applyChangeRecord(ObjectChangeRecord record)
+ {
+ System.out.println("Controller: Applying change record " + record);
+ long changeID = localData.getCurrentChangeRecord().getChangeID();
+ if (!record.getChangeHeads().contains(changeID))
+ throw new IllegalStateException("Change does not apply! Valid change heads=" + record.getChangeHeads().stream().map(Long::toHexString).collect(Collectors.toSet()) + ", we are in state " + Long.toHexString(changeID));
+ addChangeRecord(record);
+
+ record.applyToLocalState();
+
+ setCurrentChangeRecord(record);
+ objectChanged(localData);
+
+// if (record == null)
+// throw new IllegalArgumentException("Cannot apply unknown change!");
+ }
+
+ public void applyChangeRecords(Set changes)
+ {
+ System.out.println("Controller: applying " + changes.size() + " change records.");
+ addChangeRecords(changes);
+
+// List record = ObjectChangeRecord.partiallySort(changes);
+
+
+ Set pendingChanges = new HashSet<>(changes);
+
+ while (!pendingChanges.isEmpty())
+ {
+ // find all changes whose change heads = our current change heads
+ Set applicableChanges = pendingChanges.stream()
+ .filter(objectChangeRecord -> objectChangeRecord.getChangeHeads().equals(getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet())))
+ .collect(Collectors.toSet());
+
+ // if there are none, we've reached the present (or a present), break
+ if (applicableChanges.isEmpty())
+ break;
+
+ // if there's just one, follow that one
+ // TODO: if there's more than one, that's a fork, we should select a current branch by some logic
+ ObjectChangeRecord changeToApply = applicableChanges.iterator().next();
+ applyChangeRecord(changeToApply);
+
+ // the change we took (and any untaken forks) are no longer pending
+ pendingChanges.removeAll(applicableChanges);
+ }
+ }
+
+ public Set getChangeHeads()
+ {
+ // stupid algorithm - start with all of the changes, then remove the ones that are referenced by something
+ // TODO: better algorithm
+ if (changeHeads == null)
+ {
+ changeHeads = new HashSet<>(dataStore.getDAOForClass(ObjectChangeRecord.class).getAll());
+ Set referencedIDs = changeHeads.stream().flatMap(objectChangeRecord -> objectChangeRecord.getChangeHeads().stream()).collect(Collectors.toSet());
+ changeHeads.removeIf(objectChangeRecord -> referencedIDs.contains(objectChangeRecord.getChangeID()));
+ System.out.println("Controller: Determined change heads to be " + changeHeads.stream().map(objectChangeRecord -> Long.toHexString(objectChangeRecord.getChangeID())).collect(Collectors.toSet()));
+ }
+ return changeHeads;
+ }
+
+ public Set listOtherPeers()
+ {
+ Set ret = new HashSet<>();
+ for (NetworkObject object : listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_PEER)))
+ {
+ Peer peer = (Peer) object;
+ if (localData.getLocalPeer() == null || !peer.equals(localData.getLocalPeer()))
+ ret.add(peer);
+ }
+ return ret;
+ }
+
+ public void objectChanged(T storable)
+ {
+ Class clazz = (Class) storable.getClass();
+ dataStore.getDAOForClass(clazz).update(storable);
+ }
+
+ public DataStore getDataStore()
+ {
+ return dataStore;
+ }
+
+ public void clearEverything()
+ {
+ dataStore.clear();
+ }
+
+ public void addNetworkObject(NetworkObject object)
+ {
+ objectChanged(object);
+ }
+
+ public Set findChangesSince(List changeIDs)
+ {
+ // check that the specified change IDs are actually present in our history
+ DataStore.DAO ocrDao = dataStore.getDAOForClass(ObjectChangeRecord.class);
+ if (!changeIDs.stream().allMatch(ocrDao::exists))
+ return null;
+
+ // start with the current change heads
+ Set ret = new HashSet<>();
+
+ Deque openSet = new LinkedList<>(getChangeHeads());
+ while (!openSet.isEmpty())
+ {
+ ObjectChangeRecord record = openSet.poll();
+ if (changeIDs.contains(record.getChangeID()))
+ continue;
+ ret.add(record);
+ for (long changeHeadID: record.getChangeHeads())
+ {
+ openSet.add(ocrDao.get(changeHeadID));
+ }
+ }
+
+ return ret;
+ }
+
+ /**
+ * Changes the network name and/or path of a given network FS node.
+ * Performs both renaming and moving operations. If the destination folder(s) don't exist, they will be created as per makeFolder.
+ * @param fsNode the file or folder being renamed or moved.
+ * @param newpath the new path of the node, starting with and separated by /.
+ */
+ public synchronized boolean renameFSNode(NetworkFSNode fsNode, String newpath) throws IOException
+ {
+ if (fsNode.getNetworkPath().equals(newpath))
+ return true;
+ int lastSlash = newpath.lastIndexOf("/");
+ String name = newpath.substring(lastSlash+1);
+ String path = newpath.substring(0, lastSlash);
+ if (path.isEmpty())
+ path = "/";
+ try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), getLocalPeer().getObjectID()))
+ {
+ transaction.addObjectBeforeChange(fsNode);
+ if (!name.equals(fsNode.getName()))
+ fsNode.setName(name);
+ NetworkFolder parent = fsNode.getParent();
+ if (!path.equals(parent != null ? parent.getNetworkPath() : "/"))
+ {
+ NetworkFSNode fsn = getFSNode(path);
+ if (fsn != null && !(fsn instanceof NetworkFolder))
+ throw new IllegalArgumentException("Given parent path " + fsn.getNetworkPath() + " is a file!");
+ NetworkFolder newParent = (NetworkFolder) fsn;
+ if (newParent == null && !path.equals("/"))
+ {
+ newParent = makeFSFolder(path);
+ }
+ fsNode.setParent(newParent);
+ }
+
+ } catch (IOException e)
+ {
+ e.printStackTrace(System.err);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Creates (or ensures the existence of) a NetworkFolder at the given path, along with all non-existent parent folders.
+ * @param path the path of the new NetworkFolder; must start with "/"
+ * @return the new NetworkFolder; or null, if path is "/"
+ */
+ public synchronized NetworkFolder makeFSFolder(String path)
+ {
+ if (path.equals("/") || path.isEmpty())
+ return null;
+ NetworkFolder ret = null;
+ try(ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), getLocalPeer().getObjectID()))
+ {
+ NetworkFSNode node = getFSNode(path);
+ if (node != null && !(node instanceof NetworkFolder))
+ throw new IllegalArgumentException("Given parent path " + node.getNetworkPath() + " is a file!");
+ ret = (NetworkFolder) node;
+ if (ret != null)
+ return ret; // already exists
+
+ int lastSlash = path.lastIndexOf("/");
+ String name = path.substring(lastSlash+1);
+ String parentPath = path.substring(0, lastSlash);
+ NetworkFolder parent = makeFSFolder(parentPath);
+ ret = createObjectByType(ObjectStatements.ObjectType.OBJECT_TYPE_FOLDER);
+ ret.setParent(parent);
+ ret.setName(name);
+ transaction.addNewlyCreatedObject(ret);
+ return ret;
+ } catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public synchronized NetworkFSNode getFSNode(String path)
+ {
+ // TODO: bad algorithm make better
+ for (NetworkObject object : listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE, ObjectStatements.ObjectType.OBJECT_TYPE_FOLDER)))
+ {
+ NetworkFSNode fsNode = (NetworkFSNode) object;
+ String networkPath = fsNode.getNetworkPath();
+ if (networkPath.equals(path))
+ return fsNode;
+ }
+ return null;
+ }
+}
diff --git a/src/main/java/moe/nekojimi/friendcloud/FilePieceAccess.java b/src/main/java/moe/nekojimi/friendcloud/FilePieceAccess.java
index 80cecb1..f03175a 100644
--- a/src/main/java/moe/nekojimi/friendcloud/FilePieceAccess.java
+++ b/src/main/java/moe/nekojimi/friendcloud/FilePieceAccess.java
@@ -12,13 +12,16 @@ public class FilePieceAccess implements Closeable
private final NetworkFile networkFile;
private final File file;
private final RandomAccessFile randomAccessFile;
+ private final OpenMode openMode;
- public FilePieceAccess(NetworkFile networkFile) throws IOException
+ public FilePieceAccess(NetworkFile networkFile, OpenMode openMode) throws IOException
{
this.networkFile = networkFile;
this.file = networkFile.getOrCreateLocalFile();
- this.randomAccessFile = new RandomAccessFile(file,"rw");
- randomAccessFile.setLength(file.length());
+ this.randomAccessFile = new RandomAccessFile(file,openMode == OpenMode.READ_WRITE ? "rw" : "r");
+ this.openMode = openMode;
+ if (openMode == OpenMode.READ_WRITE)
+ randomAccessFile.setLength(file.length());
}
public long getPieceOffset(int index)
@@ -56,7 +59,9 @@ public class FilePieceAccess implements Closeable
public void writePiece(int index, byte[] buffer) throws IOException
{
- if (buffer.length != getPieceSize(index))
+ if (openMode == OpenMode.READ_ONLY)
+ throw new IllegalStateException("File was opened read-only!");
+ else if (buffer.length != getPieceSize(index))
throw new IllegalArgumentException("Received a file piece that's the wrong size!! Length = " + buffer.length + " != Piece Size = " + getPieceSize(index));
else if (index >= networkFile.getPieceCount())
throw new IllegalArgumentException("Received a file piece with an index past the end of the file!!");
@@ -72,4 +77,10 @@ public class FilePieceAccess implements Closeable
{
randomAccessFile.close();
}
+
+ public enum OpenMode
+ {
+ READ_ONLY,
+ READ_WRITE;
+ }
}
diff --git a/src/main/java/moe/nekojimi/friendcloud/Main.java b/src/main/java/moe/nekojimi/friendcloud/Main.java
index 6f19f50..b1802d5 100644
--- a/src/main/java/moe/nekojimi/friendcloud/Main.java
+++ b/src/main/java/moe/nekojimi/friendcloud/Main.java
@@ -2,37 +2,17 @@ package moe.nekojimi.friendcloud;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
-import com.kstruct.gethostname4j.Hostname;
-import com.offbynull.portmapper.PortMapperFactory;
-import com.offbynull.portmapper.gateway.Bus;
-import com.offbynull.portmapper.gateway.Gateway;
-import com.offbynull.portmapper.gateways.network.NetworkGateway;
-import com.offbynull.portmapper.gateways.process.ProcessGateway;
-import com.offbynull.portmapper.mapper.MappedPort;
-import com.offbynull.portmapper.mapper.PortMapper;
-import com.offbynull.portmapper.mapper.PortType;
-import es.blackleg.jlibnotify.JLibnotify;
-import es.blackleg.jlibnotify.JLibnotifyNotification;
-import es.blackleg.jlibnotify.core.DefaultJLibnotify;
-import es.blackleg.jlibnotify.core.DefaultJLibnotifyLoader;
-import es.blackleg.jlibnotify.exception.JLibnotifyInitException;
-import es.blackleg.jlibnotify.exception.JLibnotifyLoadException;
import jnr.ffi.Platform;
import moe.nekojimi.friendcloud.filesystem.FUSEAccess;
-import moe.nekojimi.friendcloud.network.PeerConnection;
-import moe.nekojimi.friendcloud.network.requests.ObjectListRequest;
-import moe.nekojimi.friendcloud.objects.NetworkFile;
-import moe.nekojimi.friendcloud.objects.NetworkObject;
-import moe.nekojimi.friendcloud.objects.Peer;
-import moe.nekojimi.friendcloud.objects.PeerFileState;
-import moe.nekojimi.friendcloud.protos.ObjectStatements;
+import moe.nekojimi.friendcloud.network.ConnectionManager;
+import moe.nekojimi.friendcloud.network.TCPConnectionBackend;
+import moe.nekojimi.friendcloud.protos.CommonMessages;
import moe.nekojimi.friendcloud.storage.DataStore;
-import moe.nekojimi.friendcloud.storage.Model;
import moe.nekojimi.friendcloud.storage.StupidJSONFileStore;
import moe.nekojimi.friendcloud.tasks.JoinNetworkTask;
+import moe.nekojimi.friendcloud.tasks.PropagateMessageTask;
import org.slf4j.simple.SimpleLogger;
-import java.awt.*;
import java.io.File;
import java.io.IOException;
import java.net.*;
@@ -43,41 +23,31 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;
+import java.util.stream.Collectors;
public class Main
{
private static Main instance;
- @Parameter(names="-share")
- private List sharedFilePaths = new ArrayList<>();
- @Parameter(names="-known-peer")
- private List knownPeers = new ArrayList<>();
-
- @Parameter(names="-tcp-port")
- private int tcpPort = 7777;
-
- @Parameter(names="-no-upnp")
- private boolean noUpnp = false;
-
- @Parameter(names="-create-network")
- private boolean createNetwork = false;
-
- @Parameter(names = "-storage")
- private String storageLocation = ".";
// @Parameter(names="-file")
+ private Args args;
private ConnectionManager connectionManager;
private final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(16);
+ private final Set> scheduledFutures = new HashSet<>();
private final FUSEAccess fuseAccess = new FUSEAccess();
- private Model model;
+ private Controller controller;
private final NotificationManager notificationManager = new NotificationManager();
+ private final SharedFileManager sharedFileManager = new SharedFileManager();
- public static void main(String[] args)
+ public static void main(String[] argv)
{
instance = new Main();
- JCommander.newBuilder().addObject(instance).build().parse(args);
+ Args args = new Args();
+ JCommander.newBuilder().addObject(args).build().parse(argv);
+ instance.args = args;
System.setProperty(SimpleLogger.DEFAULT_LOG_LEVEL_KEY, "Info");
@@ -88,25 +58,19 @@ public class Main
{
System.err.println("main() received exception, dying horribly!!");
e.printStackTrace(System.err);
- try
- {
- instance.shutdown();
- } catch (IOException f)
- {
- throw new RuntimeException(f);
- }
+ instance.shutdown();
System.exit(1);
}
// TestMessage.SearchRequest request = TestMessage.SearchRequest.newBuilder().setQuery("bees!").setPageNumber(316).setResultsPerPage(42069).build();
}
- private void run() throws IOException, InterruptedException, JLibnotifyLoadException, JLibnotifyInitException
+ private void run() throws IOException
{
- DataStore dataStore = new StupidJSONFileStore(new File(storageLocation));
- model = new Model(dataStore);
+ DataStore dataStore = new StupidJSONFileStore(new File(args.storageLocation));
+ controller = new Controller(dataStore);
- model.init();
- connectionManager = new ConnectionManager(tcpPort);
+ controller.init();
+ connectionManager = new ConnectionManager();
Path mountPoint;
if (Platform.getNativePlatform().getOS() == Platform.OS.WINDOWS)
@@ -115,23 +79,14 @@ public class Main
}
else
{
- mountPoint = Path.of(System.getProperty("user.dir") + "/fuse-mount-" + tcpPort);
- boolean created = mountPoint.toFile().mkdirs();
+ mountPoint = Path.of(System.getProperty("user.dir") + "/fuse-mount-" + args.tcpPort);
+ mountPoint.toFile().mkdirs();
System.out.println("Created FUSE mount point " + mountPoint);
}
fuseAccess.mount(mountPoint);
System.out.println("Mounted virtual filesystem at " + mountPoint);
- Runtime.getRuntime().addShutdownHook(new Thread(() ->
- {
- try
- {
- shutdown();
- } catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }));
+ Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
// if (Desktop.isDesktopSupported())
// {
@@ -139,15 +94,9 @@ public class Main
// desktop.browse(mountPoint.toFile().toURI());
// }
- connectionManager.addNewConnectionConsumer(this::requestCompleteState);
+// connectionManager.addNewConnectionConsumer(this::requestCompleteState);
- connectionManager.start();
-
- String hostname = Hostname.getHostname();
- model.getSelfPeer().setSystemName(hostname);
- model.getSelfPeer().setUserName(System.getProperty("user.name") + "-" + tcpPort);
- addHostAddress(InetAddress.getLocalHost());
- model.objectChanged(model.getSelfPeer());
+ connectionManager.addConnectionBackend(new TCPConnectionBackend(args.tcpPort, connectionManager));
/*
Startup procedure:
@@ -159,149 +108,71 @@ public class Main
- Publish local file changes
*/
- if (!noUpnp)
- setupIGP();
+ JoinNetworkTask joinNetworkTask = new JoinNetworkTask();
+ CompletableFuture.runAsync(joinNetworkTask,executor)
+ .thenRun(this::shareInitialFiles)
+ .thenRun(this::scheduleCheckins)
+ .handle((unused, throwable) ->
+ {
+ if (throwable != null)
+ {
+ System.err.println("Error in initial task!");
+ throwable.printStackTrace(System.err);
+ shutdown();
+ }
+ return null;
+ });
+ }
+ private void scheduleCheckins()
+ {
+ executor.scheduleWithFixedDelay(() -> {
+ System.out.println("Checking in with friends...");
+ CommonMessages.CheckInMessage checkInMessage = CommonMessages.CheckInMessage.newBuilder()
+ .addAllCurrentChangeHeads(controller.getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet()))
+ .build();
+ PropagateMessageTask propagateMessageTask = new PropagateMessageTask(checkInMessage);
+ executor.submit(propagateMessageTask);
+ }, 0,5, TimeUnit.MINUTES);
+ }
+
+ private void shareInitialFiles()
+ {
+ System.out.println("Sharing files given on command line...");
Set sharedFiles = new HashSet<>();
- for (String sharedFilePath: sharedFilePaths)
+ for (String sharedFilePath: args.sharedFilePaths)
{
sharedFiles.add(new File(sharedFilePath));
}
- List knownFiles = model.listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE));
-
- for (NetworkObject knownFile: knownFiles)
- {
- NetworkFile f = (NetworkFile) knownFile;
- boolean removed = sharedFiles.remove(f.getLocalFile());
- if (removed)
- System.out.println("Identified known local file " + f.getObjectID() + " = " + f.getLocalFile());
- }
-
- for (File sharedFile: sharedFiles)
- {
- if (sharedFile.exists())
- {
- System.out.println("Adding shared network file: " + sharedFile.getAbsolutePath());
-
- NetworkFile networkFile = (NetworkFile) model.createObjectByType(ObjectStatements.ObjectType.OBJECT_TYPE_FILE);
- networkFile.updateFromLocalFile(sharedFile);
- model.objectChanged(networkFile);
-
-// PeerFileState peerFileState = (PeerFileState) model.createObjectByType(ObjectStatements.ObjectType.OBJECT_TYPE_PEER_FILE_STATE);
-// peerFileState.setNode(model.getSelfPeer());
-// peerFileState.setFile(networkFile);
-// peerFileState.setProgress(100);
-// model.objectChanged(peerFileState);
- }
- }
-
-// JoinNetworkTask joinNetworkTask = new JoinNetworkTask();
-// executor.submit(joinNetworkTask);
-
- for (String knownPeerAddress : knownPeers)
- {
- String[] split = knownPeerAddress.split(":");
- if (split.length != 2)
- {
- System.err.println("ERROR: " + knownPeerAddress + " isn't a valid address.");
- continue;
- }
- InetSocketAddress address = new InetSocketAddress(split[0],Integer.parseInt(split[1]));
-
- try
- {
- URI uri = new URI("tcp", null, address.getHostString(), address.getPort(), null, null, null);
- PeerConnection nodeConnection = connectionManager.getNodeConnection(uri);
-
- requestCompleteState(nodeConnection);
- } catch (ConnectException ex)
- {
- System.out.println("Couldn't connect to host " + address);
- }
- catch (URISyntaxException e)
- {
- throw new RuntimeException(e);
- }
- }
- }
-
- private void requestCompleteState(PeerConnection nodeConnection)
- {
- CompletableFuture> objectListFuture = nodeConnection.makeRequest(new ObjectListRequest(Set.of(
- ObjectStatements.ObjectType.OBJECT_TYPE_FILE,
- ObjectStatements.ObjectType.OBJECT_TYPE_PEER_FILE_STATE,
- ObjectStatements.ObjectType.OBJECT_TYPE_PEER)));
+ sharedFileManager.addSharedFiles(sharedFiles);
}
- private void addHostAddress(InetAddress address)
+ private void shutdown()
{
- String host = address.getCanonicalHostName();
- Peer selfNode = model.getSelfPeer();
try
{
- URI uri = new URI("tcp", null, host, tcpPort, null, null, null);
- System.out.println("Added local address " + uri);
- selfNode.addAddress(uri);
- } catch (URISyntaxException e)
+ fuseAccess.umount();
+ connectionManager.shutdown();
+ executor.shutdown();
+ System.out.println("Waiting 10 seconds to complete tasks...");
+ boolean terminated = executor.awaitTermination(10, TimeUnit.SECONDS);
+
+ if (!terminated)
+ {
+ System.out.println("Timed out, ending tasks now. Goodbye!");
+ executor.shutdownNow();
+ }
+ else
+ {
+ System.out.println("Finished everything. Goodbye!");
+ }
+ } catch (Exception e)
{
throw new RuntimeException(e);
}
}
- private void setupIGP() throws InterruptedException
- {
- try
- {
- // Start gateways
- Gateway network = NetworkGateway.create();
- Gateway process = ProcessGateway.create();
- Bus networkBus = network.getBus();
- Bus processBus = process.getBus();
-
- // Discover port forwarding devices and take the first one found
- System.out.println("Discovering port mappers...");
- List mappers = PortMapperFactory.discover(networkBus, processBus);;
- PortMapper mapper = mappers.getFirst();
- System.out.println("Got mapper " + mapper + ", mapping port...");
-
- MappedPort mappedPort = mapper.mapPort(PortType.TCP, tcpPort, tcpPort, 60);
- System.out.println("Port mapping added: " + mappedPort);
-
- addHostAddress(mappedPort.getExternalAddress());
- }
- catch (IllegalStateException ex)
- {
- System.err.println("Failed to map port! error=" + ex.getMessage());
- ex.printStackTrace(System.err);
- }
- }
-
- private void shutdown() throws IOException
- {
- fuseAccess.umount();
- connectionManager.shutdown();
- executor.shutdown();
- System.out.println("Waiting 10 seconds to complete tasks...");
- boolean terminated = false;
- try
- {
- terminated = executor.awaitTermination(10, TimeUnit.SECONDS);
- } catch (InterruptedException e)
- {
- throw new RuntimeException(e);
- }
- if (!terminated)
- {
- System.out.println("Timed out, ending tasks now. Goodbye!");
- executor.shutdownNow();
- }
- else
- {
- System.out.println("Finished everything. Goodbye!");
- }
- }
-
public static Main getInstance()
{
return instance;
@@ -317,13 +188,80 @@ public class Main
return connectionManager;
}
- public Model getModel()
+ public Controller getModel()
{
- return model;
+ return controller;
}
public NotificationManager getNotificationManager()
{
return notificationManager;
}
+
+ public Controller getController()
+ {
+ return controller;
+ }
+
+ public Args getArgs()
+ {
+ return args;
+ }
+
+ public SharedFileManager getSharedFileManager()
+ {
+ return sharedFileManager;
+ }
+
+ @SuppressWarnings("FieldCanBeLocal")
+ public static class Args
+ {
+ @Parameter(names="-share")
+ private List sharedFilePaths = new ArrayList<>();
+
+ @Parameter(names="-known-peer")
+ private List knownPeers = new ArrayList<>();
+
+ @Parameter(names="-tcp-port")
+ private int tcpPort = 7777;
+
+ @Parameter(names="-no-upnp")
+ private boolean noUpnp = false;
+
+ @Parameter(names="-create-network")
+ private boolean createNetwork = false;
+
+ @Parameter(names = "-storage")
+ private String storageLocation = ".";
+
+ public List getSharedFilePaths()
+ {
+ return sharedFilePaths;
+ }
+
+ public List getKnownPeers()
+ {
+ return knownPeers;
+ }
+
+ public int getTcpPort()
+ {
+ return tcpPort;
+ }
+
+ public boolean isNoUpnp()
+ {
+ return noUpnp;
+ }
+
+ public boolean isCreateNetwork()
+ {
+ return createNetwork;
+ }
+
+ public String getStorageLocation()
+ {
+ return storageLocation;
+ }
+ }
}
\ No newline at end of file
diff --git a/src/main/java/moe/nekojimi/friendcloud/ObjectChangeRecord.java b/src/main/java/moe/nekojimi/friendcloud/ObjectChangeRecord.java
index aee582f..812a8d9 100644
--- a/src/main/java/moe/nekojimi/friendcloud/ObjectChangeRecord.java
+++ b/src/main/java/moe/nekojimi/friendcloud/ObjectChangeRecord.java
@@ -1,5 +1,6 @@
package moe.nekojimi.friendcloud;
+import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import moe.nekojimi.friendcloud.storage.Storable;
@@ -7,7 +8,9 @@ import moe.nekojimi.friendcloud.storage.Storable;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
+import java.time.Instant;
import java.util.*;
+import java.util.stream.Collectors;
public class ObjectChangeRecord implements Storable
{
@@ -16,11 +19,14 @@ public class ObjectChangeRecord implements Storable
private ObjectID creatorPeer;
private Set changeHeads = new HashSet<>();
private Set changes = new HashSet<>();
+ private Instant creationTime;
public static ObjectChangeRecord createFromChangeMessage(ObjectStatements.ObjectChangeMessage objectChangeMessage)
{
- ObjectChangeRecord record = new ObjectChangeRecord(); // TODO: decode creator
+ ObjectChangeRecord record = new ObjectChangeRecord();
record.changeHeads.addAll(objectChangeMessage.getChangeHeadsList());
+ record.creatorPeer = new ObjectID(objectChangeMessage.getCreatorId());
+ record.creationTime = Instant.ofEpochMilli(objectChangeMessage.getTimestampMs());
for (ObjectStatements.ObjectChange objectChange : objectChangeMessage.getChangesList())
{
record.changes.add(Change.createFromObjectChange(objectChange));
@@ -29,16 +35,18 @@ public class ObjectChangeRecord implements Storable
long specifiedID = objectChangeMessage.getChangeId();
if (calculatedID != specifiedID)
{
- System.err.println("WARNING: didn't decode change ID correctly!");
+ throw new RuntimeException("Failed to verify change ID! specified=" + Long.toHexString(specifiedID) + " != calculated=" + Long.toHexString(calculatedID));
}
return record;
}
- public static ObjectChangeRecord createFromChanges(ObjectID creator, Set changes)
+ public static ObjectChangeRecord createFromChanges(ObjectID creator, Set changeHeads, Set changes)
{
ObjectChangeRecord record = new ObjectChangeRecord();
record.creatorPeer = creator;
+ record.creationTime = Instant.now();
record.changes.addAll(changes);
+ record.changeHeads = changeHeads;
return record;
}
@@ -59,6 +67,8 @@ public class ObjectChangeRecord implements Storable
ObjectStatements.ObjectChangeMessage.Builder builder = ObjectStatements.ObjectChangeMessage.newBuilder();
builder.setChangeId(getChangeID());
builder.addAllChangeHeads(changeHeads);
+ builder.setCreatorId(creatorPeer.toLong());
+ builder.setTimestampMs(creationTime.toEpochMilli());
for (Change change : changes)
{
builder.addChanges(change.buildObjectChange());
@@ -71,26 +81,43 @@ public class ObjectChangeRecord implements Storable
{
return Map.of("changeHeads", changeHeads,
"changes", changes,
- "creator", creatorPeer.toLong());
+ "creator", creatorPeer.toLong(),
+ "creationTime", creationTime.toEpochMilli()
+ );
}
+ @SuppressWarnings("unchecked")
@Override
public void updateFromStateMap(Map map)
{
changeHeads = new HashSet<>((Collection) map.get("changeHeads"));
changes = new HashSet<>((Collection) map.get("changes"));
creatorPeer = new ObjectID((Long) map.get("creator"));
+ creationTime = Instant.ofEpochMilli((Long) map.get("creationTime"));
+ }
+
+ public void applyToLocalState()
+ {
+ for (Change change: changes)
+ {
+ NetworkObject object = Main.getInstance().getModel().getObject(change.objectID);
+ if (object == null)
+ object = Main.getInstance().getController().createObjectByID(change.objectID);
+ object.updateFromChange(change);
+ Main.getInstance().getModel().objectChanged(object);
+ }
}
public String toString()
{
StringBuilder sb = new StringBuilder();
- for (long changeHeadId: changeHeads)
+ sb.append(creatorPeer).append(",").append(creationTime.toEpochMilli()).append(";");
+ for (long changeHeadId: changeHeads.stream().sorted().toList())
{
sb.append(changeHeadId).append(",");
}
sb.append(";");
- for (Change change: changes)
+ for (Change change: changes.stream().sorted(Comparator.comparingLong(a -> a.objectID.toLong())).toList())
{
sb.append(change.toString()).append(";");
}
@@ -107,7 +134,9 @@ public class ObjectChangeRecord implements Storable
{
throw new RuntimeException(e);
}
- byte[] bytes = digest.digest(toString().getBytes(StandardCharsets.UTF_8));
+ String stringVal = toString();
+ byte[] bytes = digest.digest(stringVal.getBytes(StandardCharsets.UTF_8));
+ // System.out.println("ObjectChangeRecord: calculated change ID " + Long.toHexString(ret) + " from string: " + stringVal);
return Util.xorBytesToLong(bytes);
}
@@ -127,58 +156,139 @@ public class ObjectChangeRecord implements Storable
return changeHeads;
}
- public record Change(ObjectID objectID, Map beforeValues, Map afterValues)
+ public static final class Change
+ {
+ private final ObjectID objectID;
+ private final Map beforeValues;
+ private final Map afterValues;
+
+ public Change(ObjectID objectID, Map beforeValues, Map afterValues)
{
-
- public static Change createFromObjectChange(ObjectStatements.ObjectChange change)
- {
- return new Change(new ObjectID(change.getObjectId()), change.getBeforeMap(), change.getAfterMap());
- }
-
- public static Change createFromObjectStates(ObjectStatements.ObjectState before, ObjectStatements.ObjectState after)
- {
- Map beforeValues = new HashMap<>();
- Map afterValues = new HashMap<>();
- for (String key : after.getValuesMap().keySet())
- {
- String beforeValue = before.getValuesOrDefault(key, null);
- String afterValue = after.getValuesOrDefault(key, null);
- if (!afterValue.equals(beforeValue))
- {
- beforeValues.put(key, beforeValue);
- afterValues.put(key, afterValue);
- }
- }
- if (!afterValues.isEmpty())
- {
- return new Change(new ObjectID(before.getObjectId()), beforeValues, afterValues);
- }
- return null;
- }
-
- public String toString()
- {
- StringBuilder sb = new StringBuilder();
- sb.append(objectID.toLong()).append(";"); // The object ID, then ;
- // now all key-value pairs in alphabetical order
- List keys = new ArrayList<>(beforeValues.keySet());
- Collections.sort(keys);
- for (String key : keys)
- {
- sb.append(key).append(":").append(afterValues.get(key));
- }
- sb.append(";");
- return sb.toString();
- }
-
-
- public ObjectStatements.ObjectChange.Builder buildObjectChange()
- {
- ObjectStatements.ObjectChange.Builder builder = ObjectStatements.ObjectChange.newBuilder();
- builder.putAllBefore(beforeValues);
- builder.putAllAfter(afterValues);
- builder.setObjectId(objectID.toLong());
- return builder;
- }
+ this.objectID = objectID;
+ this.beforeValues = beforeValues;
+ this.afterValues = afterValues;
}
+
+ public static Change createFromObjectChange(ObjectStatements.ObjectChange change)
+ {
+ return new Change(new ObjectID(change.getObjectId()), change.getBeforeMap(), change.getAfterMap());
+ }
+
+ public static Change createFromObjectStates(ObjectStatements.ObjectState before, ObjectStatements.ObjectState after)
+ {
+ Map beforeValues = new HashMap<>();
+ Map afterValues = new HashMap<>();
+ for (String key : after.getValuesMap().keySet())
+ {
+ String beforeValue = before.getValuesOrDefault(key, "");
+ String afterValue = after.getValuesOrDefault(key, "");
+ if (!afterValue.equals(beforeValue))
+ {
+ beforeValues.put(key, beforeValue);
+ afterValues.put(key, afterValue);
+ }
+ }
+ if (!afterValues.isEmpty())
+ {
+ return new Change(new ObjectID(after.getObjectId()), beforeValues, afterValues);
+ }
+ return null;
+ }
+
+ public String toString()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(objectID).append(";"); // The object ID, then ;
+ // now all key-value pairs in alphabetical order
+ Set keySet = new HashSet<>(beforeValues.keySet());
+ keySet.addAll(afterValues.keySet());
+ for (String key : keySet.stream().sorted().toList())
+ {
+ sb.append(key).append(":")
+ .append(beforeValues.getOrDefault(key,""))
+ .append("->")
+ .append(afterValues.getOrDefault(key,""))
+ .append(",");
+ }
+ return sb.toString();
+ }
+
+
+ public ObjectStatements.ObjectChange.Builder buildObjectChange()
+ {
+ ObjectStatements.ObjectChange.Builder builder = ObjectStatements.ObjectChange.newBuilder();
+ builder.putAllBefore(Util.mapWithoutNullValues(beforeValues));
+ builder.putAllAfter(Util.mapWithoutNullValues(afterValues));
+ builder.setObjectId(objectID.toLong());
+ return builder;
+ }
+
+ public ObjectID objectID()
+ {
+ return objectID;
+ }
+
+ public Map beforeValues()
+ {
+ return beforeValues;
+ }
+
+ public Map afterValues()
+ {
+ return afterValues;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (obj == this) return true;
+ if (obj == null || obj.getClass() != this.getClass()) return false;
+ var that = (Change) obj;
+ return Objects.equals(this.objectID, that.objectID) &&
+ Objects.equals(this.beforeValues, that.beforeValues) &&
+ Objects.equals(this.afterValues, that.afterValues);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(objectID, beforeValues, afterValues);
+ }
+
+ }
+
+ public static List partiallySort(Set changes)
+ {
+ LinkedList ret = new LinkedList<>();
+
+ Map idMap = new HashMap<>();
+ for (ObjectChangeRecord record : changes)
+ idMap.put(record.getChangeID(), record);
+ Set pointedIds = changes.stream()
+ .flatMap((ObjectChangeRecord objectChangeRecord1) -> objectChangeRecord1.getChangeHeads().stream())
+ .collect(Collectors.toSet());
+ Set group = changes.stream()
+ .filter(objectChangeRecord -> !pointedIds.contains(objectChangeRecord.getChangeID()))
+ .collect(Collectors.toSet());
+
+ while (!group.isEmpty())
+ {
+ // add all members of the group to the start of the list
+ for (ObjectChangeRecord record : group)
+ {
+ ret.addFirst(record);
+ }
+ // replace the group with all the objects pointed to by the group
+ group = group.stream()
+ .flatMap(objectChangeRecord -> objectChangeRecord.getChangeHeads().stream())
+ .map(idMap::get)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+ }
+
+ System.out.println("ObjectChangeRecord: Partially sorted changes: " + changes);
+ System.out.println("\tInto list: " + ret);
+ return ret;
+
+ }
}
diff --git a/src/main/java/moe/nekojimi/friendcloud/ObjectChangeTransaction.java b/src/main/java/moe/nekojimi/friendcloud/ObjectChangeTransaction.java
index 0ebefb9..3e1fe93 100644
--- a/src/main/java/moe/nekojimi/friendcloud/ObjectChangeTransaction.java
+++ b/src/main/java/moe/nekojimi/friendcloud/ObjectChangeTransaction.java
@@ -1,47 +1,77 @@
package moe.nekojimi.friendcloud;
+import moe.nekojimi.friendcloud.network.ConnectionManager;
import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import moe.nekojimi.friendcloud.tasks.PropagateMessageTask;
+import moe.nekojimi.friendcloud.tasks.PullChangesTask;
-import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
-public class ObjectChangeTransaction implements Closeable
+public class ObjectChangeTransaction implements AutoCloseable
{
private final ObjectID creator;
private final ConnectionManager connectionManager;
private final Map beforeStates = new HashMap<>();
+ private static ObjectChangeTransaction currentTransaction = null;
+ private int openCount = 0;
private boolean ended = false;
- ObjectChangeTransaction(ConnectionManager connectionManager, ObjectID creator)
+ private ObjectChangeTransaction(ConnectionManager connectionManager, ObjectID creator)
{
this.creator = creator;
this.connectionManager = connectionManager;
- }
- public static ObjectChangeTransaction startTransaction(ConnectionManager connectionManager, ObjectID creatorPeer, ObjectID... objects)
- {
- ObjectChangeTransaction builder = new ObjectChangeTransaction(connectionManager, creatorPeer);
- for (ObjectID id : objects)
+ System.out.println("ObjectChangeTransaction: opening transaction");
+
+ // attempt to pull changes from the network
+ Future> future = Main.getInstance().getExecutor().submit(new PullChangesTask());
+ try
{
- builder.addObjectBeforeChange(id);
+ future.get(10, TimeUnit.SECONDS);
+ } catch (InterruptedException | TimeoutException e)
+ {
+ // this is fine
+// throw new RuntimeException(e);
+ } catch (ExecutionException e)
+ {
+ e.printStackTrace(System.err);
+ throw new RuntimeException(e);
}
- return builder;
}
- public ObjectChangeTransaction addObjectBeforeChange(ObjectID id)
+ public static ObjectChangeTransaction startTransaction(ConnectionManager connectionManager, ObjectID creatorPeer)
{
- NetworkObject object = Main.getInstance().getModel().getObject(id);
- if (object != null)
- beforeStates.put(id, object.buildObjectState().build());
- return this;
+ if (currentTransaction == null)
+ currentTransaction = new ObjectChangeTransaction(connectionManager, creatorPeer);
+ currentTransaction.increaseOpenCount();
+ return currentTransaction;
+ }
+
+ private void increaseOpenCount()
+ {
+ openCount++;
+ }
+
+ public void addObjectBeforeChange(NetworkObject object)
+ {
+ beforeStates.putIfAbsent(object.getObjectID(), object.buildObjectState().build());
+ }
+
+ public void addNewlyCreatedObject(NetworkObject object)
+ {
+ beforeStates.putIfAbsent(object.getObjectID(), ObjectStatements.ObjectState.getDefaultInstance());
}
public ObjectChangeRecord endTransaction()
@@ -53,21 +83,43 @@ public class ObjectChangeTransaction implements Closeable
for (Map.Entry entry : beforeStates.entrySet())
{
- ObjectStatements.ObjectState afterState = Main.getInstance().getModel().getObject(entry.getKey()).buildObjectState().build();
+ NetworkObject object = Main.getInstance().getModel().getObject(entry.getKey());
+ ObjectStatements.ObjectState afterState = object.buildObjectState().build();
ObjectChangeRecord.Change change = ObjectChangeRecord.Change.createFromObjectStates(entry.getValue(), afterState);
- changes.add(change);
+ if (change != null)
+ {
+ Main.getInstance().getModel().objectChanged(object);
+ changes.add(change);
+ }
}
- return ObjectChangeRecord.createFromChanges(creator, changes);
+ if (changes.isEmpty())
+ return null;
+ return ObjectChangeRecord.createFromChanges(creator, Main.getInstance().getModel().getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet()), changes);
+ }
+
+ public void commit()
+ {
+ // TODO: make this actually perform the changes and mark the transaction as complete; if the transaction closes before commit() make it roll back the changes
}
@Override
public void close() throws IOException
{
+ openCount--;
+ if (openCount > 0)
+ return; // still open elsewhere
// end the transaction and get the change object
ObjectChangeRecord objectChangeRecord = endTransaction();
+ currentTransaction = null;
+ if (objectChangeRecord == null)
+ {
+ System.out.println("ObjectChangeTransaction: closing transaction, no changes.");
+ return;
+ }
+ System.out.println("ObjectChangeTransaction: closing transaction, submitting change record " + Long.toHexString(objectChangeRecord.getChangeID()));
// add the new change to the model
- Main.getInstance().getModel().addChangeRecord(objectChangeRecord);
+ Main.getInstance().getModel().setCurrentChangeRecord(objectChangeRecord);
// create a task to propagate the change to other peers
Main.getInstance().getExecutor().submit(new PropagateMessageTask(objectChangeRecord.buildObjectChangeMessage().build()));
}
diff --git a/src/main/java/moe/nekojimi/friendcloud/SharedDirectory.java b/src/main/java/moe/nekojimi/friendcloud/SharedDirectory.java
new file mode 100644
index 0000000..405ab1b
--- /dev/null
+++ b/src/main/java/moe/nekojimi/friendcloud/SharedDirectory.java
@@ -0,0 +1,55 @@
+package moe.nekojimi.friendcloud;
+
+import moe.nekojimi.friendcloud.objects.ObjectID;
+import moe.nekojimi.friendcloud.storage.Storable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+public class SharedDirectory implements Storable
+{
+ private final long storageID;
+ private File directory;
+ private ObjectID networkFolderID;
+
+ public SharedDirectory(long storageID)
+ {
+ this.storageID = storageID;
+ }
+
+ @Override
+ public long getStorageID()
+ {
+ return storageID;
+ }
+
+ @Override
+ public Map getStateMap()
+ {
+ String canonicalPath;
+ try
+ {
+ canonicalPath = directory.getCanonicalPath();
+ } catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ return Map.of("directory", canonicalPath, "networkFolderID", networkFolderID.toLong());
+ }
+
+ @Override
+ public void updateFromStateMap(Map map)
+ {
+ }
+
+ public File getDirectory()
+ {
+ return directory;
+ }
+
+ public ObjectID getNetworkFolderID()
+ {
+ return networkFolderID;
+ }
+}
diff --git a/src/main/java/moe/nekojimi/friendcloud/SharedFileManager.java b/src/main/java/moe/nekojimi/friendcloud/SharedFileManager.java
new file mode 100644
index 0000000..0dab24e
--- /dev/null
+++ b/src/main/java/moe/nekojimi/friendcloud/SharedFileManager.java
@@ -0,0 +1,112 @@
+package moe.nekojimi.friendcloud;
+
+import engineering.swat.watch.Watch;
+import moe.nekojimi.friendcloud.objects.NetworkFile;
+import moe.nekojimi.friendcloud.objects.NetworkObject;
+import moe.nekojimi.friendcloud.protos.ObjectStatements;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+public class SharedFileManager
+{
+ private final List watches = new ArrayList<>();
+
+ public void addSharedFiles(Set files)
+ {
+ if (files.isEmpty())
+ return;
+ Controller controller = Main.getInstance().getModel();
+ try(ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), controller.getLocalPeer().getObjectID()))
+ {
+ List knownFiles = controller.listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE));
+
+ for (NetworkObject knownFile: knownFiles)
+ {
+ NetworkFile f = (NetworkFile) knownFile;
+ boolean removed = files.remove(f.getLocalFile());
+ if (removed)
+ System.out.println("Identified known local file " + f.getObjectID() + " = " + f.getLocalFile());
+ }
+
+ for (File sharedFile: files)
+ {
+ if (sharedFile.exists())
+ {
+ System.out.println("Adding shared network file: " + sharedFile.getAbsolutePath());
+
+ NetworkFile networkFile = controller.createObjectByType(ObjectStatements.ObjectType.OBJECT_TYPE_FILE);
+ transaction.addObjectBeforeChange(networkFile);
+ networkFile.updateFromLocalFile(sharedFile);
+ controller.objectChanged(networkFile);
+ }
+ }
+ } catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void scanForFileChanges()
+ {
+ List objects = Main.getInstance().getModel().listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE));
+ scanForFileChanges(objects);
+ }
+
+ public void scanForFileChanges(List objects)
+ {
+ // check every NetworkFile to see if any are different from our records
+ for (NetworkObject object: objects)
+ {
+ NetworkFile file = (NetworkFile) object;
+ if (!file.hasLocalFile())
+ continue;
+ File localFile = file.getLocalFile();
+ boolean changed = (localFile.length() != file.getSize());
+ if (!changed)
+ {
+ Util.HashOutput hashOutput = Util.hashFile(localFile, file.getPieceSize());
+ changed = ! Arrays.equals(hashOutput.totalDigest, file.getHash());
+ }
+ }
+ }
+
+ public void scanForDirectoryChanges()
+ {
+ List directories = Main.getInstance().getModel().getDataStore().getDAOForClass(SharedDirectory.class).getAll();
+ for (SharedDirectory directory: directories)
+ {
+ for (File file : Objects.requireNonNull(directory.getDirectory().listFiles()))
+ {
+
+ }
+ }
+ }
+
+ public boolean startWatchingFiles()
+ {
+// if (watchService == null)
+// {
+// try
+// {
+// watchService = FileSystems.getDefault().newWatchService();
+// } catch (IOException e)
+// {
+// e.printStackTrace(System.err);
+// return false;
+// }
+// }
+ throw new UnsupportedOperationException("NYI");
+ }
+
+ private void filesChanged(Collection changed)
+ {
+
+ }
+
+ private void newFiles(Collection newFiles)
+ {
+
+ }
+}
diff --git a/src/main/java/moe/nekojimi/friendcloud/Util.java b/src/main/java/moe/nekojimi/friendcloud/Util.java
index f000a5f..5a74c3e 100644
--- a/src/main/java/moe/nekojimi/friendcloud/Util.java
+++ b/src/main/java/moe/nekojimi/friendcloud/Util.java
@@ -1,18 +1,27 @@
package moe.nekojimi.friendcloud;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.LongBuffer;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
public class Util
{
public static long xorBytesToLong(byte[] bytes)
{
- ByteBuffer buf = ByteBuffer.wrap(bytes);
+ ByteBuffer buf = ByteBuffer.allocate(bytes.length);
+ buf.put(bytes);
+ buf.rewind();
LongBuffer longs = buf.asLongBuffer();
long ret = 0xBEEFCAFEF00DBABEL;
- for (long l: longs.array())
+ while (longs.hasRemaining())
{
- ret = ret ^ l;
+ ret = ret ^ longs.get();
}
return ret;
}
@@ -20,13 +29,98 @@ public class Util
public static long unconditionalNumberToLong(Object number)
{
assert (number instanceof Number);
- return ((Number)number).longValue();
+ return ((Number) number).longValue();
+ }
+
+ public static HashOutput hashFile(File file)
+ {
+ return hashFile(file, 0x100000);
+ }
+
+ public static HashOutput hashFile(File file, long pieceSize)
+ {
+ HashOutput ret = new HashOutput();
+ System.out.println("Calculating hashes for file " + file.getName() + "(Piece size: " + pieceSize + ")");
+ try (FileInputStream input = new FileInputStream(file))
+ {
+ MessageDigest totalDigest = MessageDigest.getInstance("SHA-256");
+ byte[] pieceBuf = new byte[Math.toIntExact(pieceSize)];
+ int pieceIdx = 0;
+ while (true)
+ {
+ int bytesRead = input.read(pieceBuf);
+ if (bytesRead <= 0)
+ break;
+
+ // check to see if this piece is just zeroes, if so, assume it's a missing piece
+ boolean allZero = true;
+ for (byte b: pieceBuf)
+ {
+ if (b != 0)
+ {
+ allZero = false;
+ break;
+ }
+ }
+ ret.pieces.set(pieceIdx, !allZero);
+
+ MessageDigest pieceDigest = MessageDigest.getInstance("SHA-256");
+ pieceDigest.update(pieceBuf, 0, bytesRead);
+ ret.pieceDigests.add(pieceDigest.digest());
+ totalDigest.update(pieceBuf, 0, bytesRead);
+
+ pieceIdx++;
+ }
+ ret.totalDigest = totalDigest.digest();
+ System.out.println("Total hash: " + HexFormat.of().formatHex(ret.totalDigest));
+ long pieceCount = file.length() / pieceSize;
+ System.out.println("Have " + ret.pieces.cardinality() + " of " + pieceCount + " pieces.");
+ return ret;
+ } catch (NoSuchAlgorithmException | IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static class HashOutput
+ {
+ public byte[] totalDigest;
+ public List pieceDigests = new ArrayList<>();
+ public BitSet pieces = new BitSet();
}
public static double unconditionalNumberToDouble(Object number)
{
assert (number instanceof Number);
- return ((Number)number).doubleValue();
+ return ((Number) number).doubleValue();
}
-}
+ public static Map stringifyMap(Map,?> map)
+ {
+ Map ret = new HashMap<>();
+ for (Map.Entry, ?> e : map.entrySet())
+ {
+ ret.put(e.getKey().toString(), e.getValue().toString());
+ }
+ return ret;
+ }
+
+ public static Map mapWithoutNullValues(Map map)
+ {
+ Map ret = new HashMap<>();
+ for (Map.Entry e: map.entrySet())
+ {
+ if (e.getKey() != null && e.getValue() != null)
+ ret.put(e.getKey(),e.getValue());
+ }
+ return ret;
+ }
+
+ public static CompletableFuture> collectFutures(Collection> futures)
+ {
+ // TODO: should handle timeouts / CancellationException
+ return CompletableFuture
+ .allOf(futures.toArray(new CompletableFuture[0]))
+ .thenApply(unused -> futures.stream().map(CompletableFuture::join).toList());
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/moe/nekojimi/friendcloud/filesystem/FUSEAccess.java b/src/main/java/moe/nekojimi/friendcloud/filesystem/FUSEAccess.java
index aeca1c3..a1bd1fd 100644
--- a/src/main/java/moe/nekojimi/friendcloud/filesystem/FUSEAccess.java
+++ b/src/main/java/moe/nekojimi/friendcloud/filesystem/FUSEAccess.java
@@ -12,7 +12,9 @@ import ru.serce.jnrfuse.FuseStubFS;
import ru.serce.jnrfuse.struct.FileStat;
import ru.serce.jnrfuse.struct.FuseFileInfo;
+import java.io.IOException;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
public class FUSEAccess extends FuseStubFS
@@ -27,14 +29,15 @@ public class FUSEAccess extends FuseStubFS
@Override
public int readdir(String path, Pointer buf, FuseFillDir filter, long offset, FuseFileInfo fi)
{
- System.out.println("FUSE: listing contents of directory " + path);
+ List networkFSNodes = Main.getInstance().getModel().listFSNodes(path);
+ System.out.println("FUSE: listing contents of directory " + path + ": " + networkFSNodes.size() + " nodes.");
int ret = 0;
filter.apply(buf, ".", null, 0);
filter.apply(buf, "..", null, 0);
// filter.apply(buf,"hello", null, 0);
- for (NetworkFSNode fsNode : Main.getInstance().getModel().listFSNodes(path))
+ for (NetworkFSNode fsNode : networkFSNodes)
{
filter.apply(buf, fsNode.getName(), null, 0);
}
@@ -42,6 +45,15 @@ public class FUSEAccess extends FuseStubFS
return ret;
}
+ @Override
+ public int mkdir(String path, long mode)
+ {
+ NetworkFolder networkFolder = Main.getInstance().getModel().makeFSFolder(path);
+ if (networkFolder == null)
+ return -ErrorCodes.EIO();
+ return 0;
+ }
+
@Override
public int getattr(String path, FileStat stat)
{
@@ -156,4 +168,25 @@ public class FUSEAccess extends FuseStubFS
// System.out.println("FUSE: Read " + bytes.length + " bytes.");
}
}
+
+ @Override
+ public int rename(String oldpath, String newpath)
+ {
+ NetworkFSNode fsNode = Main.getInstance().getModel().getFSNode(oldpath);
+ if (fsNode == null)
+ {
+ System.err.println("FUSE: Failed to rename file " + oldpath + ": not found");
+ return -ErrorCodes.ENOENT();
+ }
+ try
+ {
+ System.out.println("FUSE: Renaming " + oldpath + " to " + newpath);
+ Main.getInstance().getModel().renameFSNode(fsNode, newpath);
+ return 0;
+ } catch (Exception e)
+ {
+ e.printStackTrace(System.err);
+ return -ErrorCodes.EIO();
+ }
+ }
}
diff --git a/src/main/java/moe/nekojimi/friendcloud/network/ConnectionBackend.java b/src/main/java/moe/nekojimi/friendcloud/network/ConnectionBackend.java
new file mode 100644
index 0000000..77caa88
--- /dev/null
+++ b/src/main/java/moe/nekojimi/friendcloud/network/ConnectionBackend.java
@@ -0,0 +1,70 @@
+package moe.nekojimi.friendcloud.network;
+
+import moe.nekojimi.friendcloud.objects.ObjectID;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+
+public abstract class ConnectionBackend extends Thread
+{
+ private final ConnectionManager connectionManager;
+ protected final String uriScheme;
+ abstract List getURIs();
+
+ public ConnectionBackend(@NotNull String name, @NotNull String uriScheme, ConnectionManager connectionManager)
+ {
+ super(name);
+ this.uriScheme = uriScheme;
+ this.connectionManager = connectionManager;
+ }
+
+ @Override
+ public final void run()
+ {
+ super.run();
+
+ while(isListening())
+ {
+ try
+ {
+ PeerConnection connection = getConnection();
+ if (connection == null)
+ break;
+ connectionManager.receiveConnection(connection);
+ }
+ catch (InterruptedException ex)
+ {
+
+ }
+ catch (IOException ex)
+ {
+ break;
+ }
+ }
+ }
+
+ protected abstract ConnectionType makeConnection(URI uri, ObjectID peer);
+
+ protected URI makeURI(String hostPart, int port)
+ {
+ try
+ {
+ return new URI(uriScheme, null, hostPart, port, null, null, null);
+ } catch (URISyntaxException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ protected abstract ConnectionType getConnection() throws IOException, InterruptedException;
+ protected abstract boolean isListening();
+
+ public String getUriScheme()
+ {
+ return uriScheme;
+ }
+
+ public abstract void shutdown();
+}
diff --git a/src/main/java/moe/nekojimi/friendcloud/ConnectionManager.java b/src/main/java/moe/nekojimi/friendcloud/network/ConnectionManager.java
similarity index 62%
rename from src/main/java/moe/nekojimi/friendcloud/ConnectionManager.java
rename to src/main/java/moe/nekojimi/friendcloud/network/ConnectionManager.java
index ab7e645..e86c32a 100644
--- a/src/main/java/moe/nekojimi/friendcloud/ConnectionManager.java
+++ b/src/main/java/moe/nekojimi/friendcloud/network/ConnectionManager.java
@@ -1,60 +1,34 @@
-package moe.nekojimi.friendcloud;
+package moe.nekojimi.friendcloud.network;
-import moe.nekojimi.friendcloud.network.PeerConnection;
-import moe.nekojimi.friendcloud.network.PeerTCPConnection;
+import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.objects.Peer;
import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.Socket;
import java.net.URI;
import java.util.*;
import java.util.function.Consumer;
-public class ConnectionManager extends Thread
+public class ConnectionManager
{
// private final Executor executor = new ThreadPoolExecutor()
- //TODO: move the TCP stuff to it's own thread, which sends NodeTCPConnections to this thread
-
- private final ServerSocket serverSocket;
-
+ private final Map> backends = new HashMap<>();
private final Set activeConnections = new HashSet<>();
-
private final Set> newConnectionConsumers = new HashSet<>();
- public ConnectionManager(int portNumber) throws IOException
+ public ConnectionManager()
{
- serverSocket = new ServerSocket(portNumber);
// serverSocket.bind(new InetSocketAddress());
}
- @Override
- public void run()
+ void receiveConnection(PeerConnection connection)
{
- super.run();
- while (!serverSocket.isClosed())
+ activeConnections.add(connection);
+ connection.start();
+
+ for (Consumer consumer: newConnectionConsumers)
{
- try
- {
- Socket socket = serverSocket.accept();
-
- System.out.println("TCP Connection Manager: accepted connection from " + socket.getRemoteSocketAddress());
-
- PeerTCPConnection nodeTCPConnection = new PeerTCPConnection(socket);
- activeConnections.add(nodeTCPConnection);
- nodeTCPConnection.start();
-
- for (Consumer consumer: newConnectionConsumers)
- {
- consumer.accept(nodeTCPConnection);
- }
- } catch (Exception e)
- {
- System.err.println("ConnectionManager experienced exception:" + e.getMessage());
- e.printStackTrace(System.err);
- }
+ consumer.accept(connection);
}
- System.err.println("ConnectionManager: thread dying!");
}
public PeerConnection getNodeConnection(URI uri) throws IOException
@@ -71,14 +45,13 @@ public class ConnectionManager extends Thread
return peerConnection;
}
- PeerConnection nodeConnection = null;
- if (Objects.equals(uri.getScheme(), "tcp"))
- {
- nodeConnection = new PeerTCPConnection(uri, peer);
- nodeConnection.start();
- }
+ ConnectionBackend> backend = backends.get(uri.getScheme());
+ PeerConnection nodeConnection = backend.makeConnection(uri, peer == null ? new ObjectID(0L) : peer.getObjectID());
if (nodeConnection != null)
+ {
+ nodeConnection.start();
activeConnections.add(nodeConnection);
+ }
return nodeConnection;
}
@@ -89,7 +62,7 @@ public class ConnectionManager extends Thread
System.out.println("ConnectionManager: trying to get connection to " + peer + " (have " + activeConnections.size() + " connections open)");
for (PeerConnection peerConnection: activeConnections)
{
- if (peerConnection.getNode().equals(peer))
+ if (peerConnection.getPeerID() != null && peerConnection.getPeerID().equals(peer.getObjectID()))
return peerConnection;
}
@@ -108,9 +81,10 @@ public class ConnectionManager extends Thread
return null;
}
- public void shutdown() throws IOException
+ public void shutdown()
{
- serverSocket.close();
+ for (ConnectionBackend> backend : backends.values())
+ backend.shutdown();
for (PeerConnection nc: activeConnections)
{
nc.shutdown();
@@ -131,6 +105,12 @@ public class ConnectionManager extends Thread
activeConnections.removeAll(deadConnections);
}
+ public void addConnectionBackend(ConnectionBackend> backend)
+ {
+ backends.put(backend.getUriScheme(), backend);
+ backend.start();
+ }
+
public void addNewConnectionConsumer(Consumer consumer)
{
newConnectionConsumers.add(consumer);
@@ -141,4 +121,11 @@ public class ConnectionManager extends Thread
newConnectionConsumers.remove(consumer);
}
+ public List getURIs()
+ {
+ return backends.values().stream()
+ .filter(ConnectionBackend::isListening)
+ .flatMap(connectionBackend -> connectionBackend.getURIs().stream())
+ .toList();
+ }
}
diff --git a/src/main/java/moe/nekojimi/friendcloud/network/PeerConnection.java b/src/main/java/moe/nekojimi/friendcloud/network/PeerConnection.java
index ed99ba7..e3765f0 100644
--- a/src/main/java/moe/nekojimi/friendcloud/network/PeerConnection.java
+++ b/src/main/java/moe/nekojimi/friendcloud/network/PeerConnection.java
@@ -1,12 +1,10 @@
package moe.nekojimi.friendcloud.network;
-import com.google.protobuf.Any;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
+import com.google.protobuf.*;
import moe.nekojimi.friendcloud.FilePieceAccess;
import moe.nekojimi.friendcloud.Main;
+import moe.nekojimi.friendcloud.ObjectChangeRecord;
import moe.nekojimi.friendcloud.objects.NetworkFile;
import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.objects.ObjectID;
@@ -15,23 +13,24 @@ import moe.nekojimi.friendcloud.protos.CommonMessages;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import moe.nekojimi.friendcloud.network.requests.Request;
import moe.nekojimi.friendcloud.protos.PieceMessages;
+import moe.nekojimi.friendcloud.tasks.PullChangesTask;
+import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.net.URI;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.CompletableFuture;
public abstract class PeerConnection extends Thread
{
private final Map> pendingRequests = new HashMap<>();
- private Peer peer;
+ private ObjectID peerID = new ObjectID(0);
private long nextMessageId = 1;
private final URI uri;
private long artificalDelayMs = 0;
+ private final Map> messageHandlers = new HashMap<>();
+
public PeerConnection()
{
this(null);
@@ -40,12 +39,13 @@ public abstract class PeerConnection extends Thread
public PeerConnection(URI uri)
{
this.uri = uri;
+ installDefaultMessageHandlers();
}
- public PeerConnection(URI uri, Peer peer)
+ public PeerConnection(URI uri, @NotNull ObjectID peerID)
{
this(uri);
- this.peer = peer;
+ this.peerID = peerID;
}
@Override
@@ -85,9 +85,10 @@ public abstract class PeerConnection extends Thread
private CommonMessages.FriendCloudMessage wrapMessage(Message message, CommonMessages.MessageHeader inReplyTo)
{
+ Peer localPeer = Main.getInstance().getModel().getLocalPeer();
CommonMessages.MessageHeader.Builder headerBuilder = CommonMessages.MessageHeader.newBuilder()
.setMessageId(nextMessageId)
- .setSenderId(Main.getInstance().getModel().getSelfPeer().getObjectID().toLong());
+ .setSenderId(localPeer != null ? localPeer.getObjectID().toLong() : 0L);
if (inReplyTo != null)
headerBuilder.setReplyToMessageId(inReplyTo.getMessageId());
@@ -114,6 +115,10 @@ public abstract class PeerConnection extends Thread
protected void messageReceived(@org.jetbrains.annotations.NotNull CommonMessages.FriendCloudMessage message)
{
CommonMessages.MessageHeader header = message.getHeader();
+ Any body = message.getBody();
+ long replyToMessageId = header.getReplyToMessageId();
+ ObjectID senderID = new ObjectID(header.getSenderId());
+ System.out.println("Received message! type=" + body.getTypeUrl() + ", sender=" + senderID + ", ID=" + header.getMessageId() + ", reply_to=" + replyToMessageId);
try
{
try
@@ -130,21 +135,20 @@ public abstract class PeerConnection extends Thread
}
}
- ObjectID senderID = new ObjectID(header.getSenderId());
- if (peer == null)
- peer = Main.getInstance().getModel().getOrCreateObject(senderID);
- else
+ if (!senderID.isNull())
{
- if (!senderID.equals(peer.getObjectID()))
- throw new ReplyWithErrorException(CommonMessages.Error.ERROR_WHO_THE_FUCK_ARE_YOU);
+ if (peerID.isNull())
+ {
+ System.out.println("PeerConnection: Identified sender as " + senderID);
+ peerID = senderID;
+ }
+ else
+ {
+ if (!senderID.equals(peerID))
+ throw new ReplyWithErrorException(CommonMessages.Error.ERROR_WHO_THE_FUCK_ARE_YOU);
+ }
}
- Any body = message.getBody();
-
- long replyToMessageId = header.getReplyToMessageId();
- System.out.println("Received message! type=" + body.getTypeUrl() + ", ID=" + header.getMessageId() + ", reply_to=" + replyToMessageId);
-
-
if (replyToMessageId != 0)
{
if (pendingRequests.containsKey(replyToMessageId))
@@ -183,61 +187,21 @@ public abstract class PeerConnection extends Thread
throw new RuntimeException("Our message ID " + header.getReplyToMessageId() + " caused a remote error: " + body.getError().name());
}
+ @SuppressWarnings({"rawtypes", "unchecked"})
private void handleUnsolicitedMessage(CommonMessages.MessageHeader header, Any body) throws IOException, ReplyWithErrorException
{
- if (body.is(ObjectStatements.ObjectListRequest.class))
+ String typeUrl = body.getTypeUrl();
+ if (messageHandlers.containsKey(typeUrl))
{
- ObjectStatements.ObjectListRequest objectListRequest = body.unpack(ObjectStatements.ObjectListRequest.class);
- List objects = Main.getInstance().getModel().listObjects(new HashSet<>(objectListRequest.getTypesList()));
-
- ObjectStatements.ObjectList.Builder objectList = ObjectStatements.ObjectList.newBuilder();
- for (NetworkObject object : objects)
- {
- objectList.addStates(object.buildObjectState());
- }
-// System.out.println("Replying to ObjectListRequest with ObjectList, objects=" + objectList.getStatesList());
- sendMessage(wrapMessage(objectList.build(), header));
+ MessageHandler handler = messageHandlers.get(typeUrl);
+ assert (body.is(handler.clazz));
+ Message unpack = body.unpack((Class) handler.clazz);
+ handler.handle(header, unpack);
}
- else if (body.is(PieceMessages.FilePiecesRequestMessage.class))
+ else
{
- PieceMessages.FilePiecesRequestMessage filePiecesRequestMessage = body.unpack(PieceMessages.FilePiecesRequestMessage.class);
- if (filePiecesRequestMessage.getPieceMod() == 0)
- {
- replyWithError(CommonMessages.Error.ERROR_INVALID_ARGUMENT, header);
- }
-
- NetworkFile networkFile = (NetworkFile) Main.getInstance().getModel().getObject(new ObjectID(filePiecesRequestMessage.getFileId()));
- if (networkFile == null)
- {
- replyWithError(CommonMessages.Error.ERROR_OBJECT_NOT_FOUND, header);
- }
- assert networkFile != null;
- try (FilePieceAccess filePieceAccess = new FilePieceAccess(networkFile))
- {
- int startIndex = filePiecesRequestMessage.getStartPieceIndex();
- int endIndex = (filePiecesRequestMessage.getStartPieceIndex() + filePiecesRequestMessage.getPieceCount()) - 1;
- System.out.println("Been asked for pieces from " + startIndex + " to " + endIndex);
- for (int index = startIndex; index <= endIndex; index += filePiecesRequestMessage.getPieceMod())
- {
- byte[] buffer = filePieceAccess.readPiece(index);
- if (buffer != null)
- {
- System.out.println("Replying to file piece request with piece " + index);
- PieceMessages.FilePieceMessage filePieceMessage = PieceMessages.FilePieceMessage.newBuilder()
- .setPieceIndex(index)
- .setFileId(networkFile.getObjectID().toLong())
- .setData(ByteString.copyFrom(buffer))
- .build();
- sendMessage(wrapMessage(filePieceMessage, header));
- }
- else
- {
- System.err.println("Don't have requested piece " + index + "!");
- replyWithError(CommonMessages.Error.ERROR_PIECE_NOT_POSSESSED, header);
- break;
- }
- }
- }
+ System.err.println("PeerConnection: don't have a MessageHandler for message type " + typeUrl + "!");
+ replyWithError(CommonMessages.Error.ERROR_MESSAGE_BODY_UNKNOWN, header);
}
}
@@ -251,15 +215,162 @@ public abstract class PeerConnection extends Thread
pendingRequests.remove(replyToMessageId);
}
- public abstract void shutdown() throws IOException;
+ public abstract void shutdown();
- public synchronized Peer getNode()
+ public ObjectID getPeerID()
{
- return peer;
+ return peerID;
}
public synchronized URI getUri()
{
return uri;
}
+
+ public void installMessageHandler(MessageHandler handler)
+ {
+ String typeUrl = "type.googleapis.com/" + Internal.getDefaultInstance(handler.clazz).getDescriptorForType().getFullName();
+ messageHandlers.put(typeUrl, handler);
+// System.out.println("PeerConnection: Installed message handler for type " + typeUrl);
+ }
+
+ private void installDefaultMessageHandlers()
+ {
+ installMessageHandler(new MessageHandler<>(ObjectStatements.ObjectListRequest.class)
+ {
+ @Override
+ protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectListRequest message) throws IOException
+ {
+ List objects = Main.getInstance().getModel().listObjects(new HashSet<>(message.getTypesList()));
+
+ ObjectStatements.ObjectList.Builder objectList = ObjectStatements.ObjectList.newBuilder();
+ for (NetworkObject object : objects)
+ {
+ objectList.addStates(object.buildObjectState());
+ }
+
+// System.out.println("Replying to ObjectListRequest with ObjectList, objects=" + objectList.getStatesList());
+ sendMessage(wrapMessage(objectList.build(), header));
+ }
+ });
+ installMessageHandler(new MessageHandler<>(PieceMessages.FilePiecesRequestMessage.class)
+ {
+ @Override
+ protected void handle(CommonMessages.MessageHeader header, PieceMessages.FilePiecesRequestMessage message) throws IOException
+ {
+ if (message.getPieceMod() == 0)
+ {
+ replyWithError(CommonMessages.Error.ERROR_INVALID_ARGUMENT, header);
+ }
+
+ NetworkFile networkFile = Main.getInstance().getModel().getObject(new ObjectID(message.getFileId()));
+ if (networkFile == null)
+ {
+ replyWithError(CommonMessages.Error.ERROR_OBJECT_NOT_FOUND, header);
+ }
+ assert networkFile != null;
+ try (FilePieceAccess filePieceAccess = new FilePieceAccess(networkFile, FilePieceAccess.OpenMode.READ_ONLY))
+ {
+ int startIndex = message.getStartPieceIndex();
+ int endIndex = (message.getStartPieceIndex() + message.getPieceCount()) - 1;
+ System.out.println("Been asked for pieces from " + startIndex + " to " + endIndex);
+ List indices = new ArrayList<>();
+ for (int index = startIndex; index <= endIndex; index += message.getPieceMod())
+ {
+ indices.add((long) index);
+ }
+ CommonMessages.MultiObjectConfirmationMessage multiObjectConfirmationMessage = CommonMessages.MultiObjectConfirmationMessage.newBuilder().addAllExpectedReturnId(indices).build();
+ sendMessage(wrapMessage(multiObjectConfirmationMessage, header));
+ for (Long index : indices)
+ {
+ byte[] buffer = filePieceAccess.readPiece(Math.toIntExact(index));
+ if (buffer != null)
+ {
+ System.out.println("Replying to file piece request with piece " + index);
+ PieceMessages.FilePieceMessage filePieceMessage = PieceMessages.FilePieceMessage.newBuilder()
+ .setPieceIndex(Math.toIntExact(index))
+ .setFileId(networkFile.getObjectID().toLong())
+ .setData(ByteString.copyFrom(buffer))
+ .build();
+ sendMessage(wrapMessage(filePieceMessage, header));
+ }
+ else
+ {
+ System.err.println("Don't have requested piece " + index + "!");
+ replyWithError(CommonMessages.Error.ERROR_PIECE_NOT_POSSESSED, header);
+ break;
+ }
+ }
+ }
+ }
+ });
+ installMessageHandler(new MessageHandler<>(ObjectStatements.ObjectChangeRequest.class)
+ {
+ @Override
+ protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectChangeRequest message) throws IOException
+ {
+ List changesSinceList = message.getChangesSinceList();
+ System.out.println("PeerConnection: Been asked for all changes since " + changesSinceList.stream().map(Long::toHexString));
+ Set changes = Main.getInstance().getModel().findChangesSince(changesSinceList);
+ if (changes == null)
+ {
+ replyWithError(CommonMessages.Error.ERROR_END_OF_HISTORY, header);
+ }
+ else
+ {
+ ObjectStatements.ObjectChangeListMessage.Builder reply = ObjectStatements.ObjectChangeListMessage.newBuilder();
+ for (ObjectChangeRecord change : changes)
+ {
+ reply.addChangeMessages(change.buildObjectChangeMessage());
+ }
+ System.out.println("PeerConnection: Replying with " + reply.getChangeMessagesCount() + " changes");
+ sendMessage(wrapMessage(reply.build(), header));
+ }
+ }
+ });
+ installMessageHandler(new MessageHandler<>(ObjectStatements.ObjectChangeMessage.class)
+ {
+ @Override
+ protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectChangeMessage message)
+ {
+ ObjectChangeRecord record = ObjectChangeRecord.createFromChangeMessage(message);
+ Main.getInstance().getModel().applyChangeRecord(record);
+ }
+ });
+ installMessageHandler(new MessageHandler<>(CommonMessages.CheckInMessage.class)
+ {
+ @Override
+ protected void handle(CommonMessages.MessageHeader header, CommonMessages.CheckInMessage message)
+ {
+ List remoteChangeHeads = message.getCurrentChangeHeadsList();
+ boolean potentialNewChanges = false;
+ for (long remoteChangeHead : remoteChangeHeads)
+ {
+ boolean exists = Main.getInstance().getModel().getDataStore().getDAOForClass(ObjectChangeRecord.class).exists(remoteChangeHead);
+ if (!exists)
+ {
+ potentialNewChanges = true;
+ break;
+ }
+ }
+ if (potentialNewChanges)
+ {
+ PullChangesTask task = new PullChangesTask(Set.of(Main.getInstance().getModel().getObject(peerID)));
+ Main.getInstance().getExecutor().submit(task);
+ }
+ }
+ });
+ }
+
+ public abstract static class MessageHandler
+ {
+ private final Class clazz;
+
+ public MessageHandler(Class clazz)
+ {
+ this.clazz = clazz;
+ }
+
+ protected abstract void handle(CommonMessages.MessageHeader header, MessageType message) throws IOException;
+ }
}
diff --git a/src/main/java/moe/nekojimi/friendcloud/network/TCPConnectionBackend.java b/src/main/java/moe/nekojimi/friendcloud/network/TCPConnectionBackend.java
new file mode 100644
index 0000000..a3c629a
--- /dev/null
+++ b/src/main/java/moe/nekojimi/friendcloud/network/TCPConnectionBackend.java
@@ -0,0 +1,125 @@
+package moe.nekojimi.friendcloud.network;
+
+import com.offbynull.portmapper.PortMapperFactory;
+import com.offbynull.portmapper.gateway.Bus;
+import com.offbynull.portmapper.gateway.Gateway;
+import com.offbynull.portmapper.gateways.network.NetworkGateway;
+import com.offbynull.portmapper.gateways.process.ProcessGateway;
+import com.offbynull.portmapper.mapper.MappedPort;
+import com.offbynull.portmapper.mapper.PortMapper;
+import com.offbynull.portmapper.mapper.PortType;
+import moe.nekojimi.friendcloud.Main;
+import moe.nekojimi.friendcloud.objects.ObjectID;
+import moe.nekojimi.friendcloud.objects.Peer;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class TCPConnectionBackend extends ConnectionBackend
+{
+ private final ServerSocket serverSocket;
+ private MappedPort mappedPort;
+
+ @Override
+ List getURIs()
+ {
+ List ret = new ArrayList<>();
+ ret.add(makeURI(serverSocket.getInetAddress().getHostAddress(), serverSocket.getLocalPort()));
+ ret.add(makeURI(serverSocket.getInetAddress().getHostName(), serverSocket.getLocalPort()));
+ if (mappedPort != null)
+ {
+ ret.add(makeURI(mappedPort.getExternalAddress().getHostAddress(), mappedPort.getExternalPort()));
+ ret.add(makeURI(mappedPort.getExternalAddress().getCanonicalHostName(), mappedPort.getExternalPort()));
+ }
+ return ret;
+ }
+
+ public TCPConnectionBackend(int port, ConnectionManager connectionManager) throws IOException
+ {
+ super("TCP Listen Thread", "tcp", connectionManager);
+ serverSocket = new ServerSocket(port);
+
+// setupIGP(port);
+ }
+
+ private void setupIGP(int port)
+ {
+ try
+ { // Start gateways
+ Gateway network = NetworkGateway.create();
+ Gateway process = ProcessGateway.create();
+ Bus networkBus = network.getBus();
+ Bus processBus = process.getBus();
+
+ // Discover port forwarding devices and take the first one found
+ System.out.println("Discovering port mappers...");
+ List mappers = PortMapperFactory.discover(networkBus, processBus);
+ ;
+ PortMapper mapper = mappers.getFirst();
+ System.out.println("Got mapper " + mapper + ", mapping port...");
+
+ mappedPort = mapper.mapPort(PortType.TCP, port, port, 3600);
+ System.out.println("Port mapping added: " + mappedPort);
+
+ long refreshDelay = (long) (mappedPort.getLifetime() * 0.9);
+ Main.getInstance().getExecutor().scheduleWithFixedDelay(() -> {
+ try
+ {
+ System.out.println("Refreshing UPnP port mapping.");
+ mapper.refreshPort(mappedPort, mappedPort.getLifetime());
+ } catch (InterruptedException e)
+ {
+ e.printStackTrace(System.err);
+ }
+ }, refreshDelay, refreshDelay, TimeUnit.SECONDS);
+
+ } catch (InterruptedException | IllegalStateException ignored)
+ {
+ }
+ }
+
+ @Override
+ protected TCPPeerConnection makeConnection(URI uri, ObjectID peer)
+ {
+ try
+ {
+ return new TCPPeerConnection(uri, peer);
+ } catch (IOException e)
+ {
+ System.out.println("TCPConnectionBackend: failed to connect to " + uri + ": " + e.getMessage());
+// e.printStackTrace(System.err);
+ return null;
+ }
+ }
+
+ @Override
+ protected TCPPeerConnection getConnection() throws IOException
+ {
+ Socket socket = serverSocket.accept();
+ return new TCPPeerConnection(socket);
+ }
+
+ @Override
+ protected boolean isListening()
+ {
+ return !serverSocket.isClosed();
+ }
+
+ @Override
+ public void shutdown()
+ {
+ try
+ {
+ serverSocket.close();
+ } catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/src/main/java/moe/nekojimi/friendcloud/network/PeerTCPConnection.java b/src/main/java/moe/nekojimi/friendcloud/network/TCPPeerConnection.java
similarity index 73%
rename from src/main/java/moe/nekojimi/friendcloud/network/PeerTCPConnection.java
rename to src/main/java/moe/nekojimi/friendcloud/network/TCPPeerConnection.java
index 8abe51a..04a3f0c 100644
--- a/src/main/java/moe/nekojimi/friendcloud/network/PeerTCPConnection.java
+++ b/src/main/java/moe/nekojimi/friendcloud/network/TCPPeerConnection.java
@@ -1,6 +1,6 @@
package moe.nekojimi.friendcloud.network;
-import moe.nekojimi.friendcloud.objects.Peer;
+import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.protos.CommonMessages;
import java.io.IOException;
@@ -9,19 +9,19 @@ import java.io.OutputStream;
import java.net.Socket;
import java.net.URI;
-public class PeerTCPConnection extends PeerConnection
+public class TCPPeerConnection extends PeerConnection
{
private final Socket socket;
private final int keepAliveTimeS = 300;
- public PeerTCPConnection(URI tcpURL, Peer peer) throws IOException
+ public TCPPeerConnection(URI tcpURL, ObjectID peer) throws IOException
{
super(tcpURL, peer);
socket = new Socket(tcpURL.getHost(), tcpURL.getPort());
System.out.println("TCP Connection: connected to " + tcpURL + " OK!");
}
- public PeerTCPConnection(Socket openSocket)
+ public TCPPeerConnection(Socket openSocket)
{
super();
socket = openSocket;
@@ -42,8 +42,6 @@ public class PeerTCPConnection extends PeerConnection
if (message != null)
{
- System.out.println("TCP Connection: read data");
-
messageReceived(message);
}
}
@@ -60,14 +58,21 @@ public class PeerTCPConnection extends PeerConnection
protected void sendMessage(CommonMessages.FriendCloudMessage message) throws IOException
{
OutputStream outputStream = socket.getOutputStream();
- System.out.println("Sending message " + message.getHeader().getMessageId());
+ System.out.println("Sending message " + message.getHeader().getMessageId() + ": " + message.getBody().getTypeUrl());
message.writeDelimitedTo(outputStream);
outputStream.flush();
}
@Override
- public synchronized void shutdown() throws IOException
+ public synchronized void shutdown()
{
- socket.close();
+ try
+ {
+ socket.close();
+ } catch (IOException e)
+ {
+ System.err.println("TCPPeerConnection: failed to shut down!");
+ e.printStackTrace(System.err);
+ }
}
}
diff --git a/src/main/java/moe/nekojimi/friendcloud/network/requests/FilePiecesRequest.java b/src/main/java/moe/nekojimi/friendcloud/network/requests/FilePiecesRequest.java
index 14e3ea3..b1477ee 100644
--- a/src/main/java/moe/nekojimi/friendcloud/network/requests/FilePiecesRequest.java
+++ b/src/main/java/moe/nekojimi/friendcloud/network/requests/FilePiecesRequest.java
@@ -4,13 +4,15 @@ import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import moe.nekojimi.friendcloud.FilePieceAccess;
import moe.nekojimi.friendcloud.objects.NetworkFile;
+import moe.nekojimi.friendcloud.protos.CommonMessages;
import moe.nekojimi.friendcloud.protos.PieceMessages;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
-public class FilePiecesRequest extends Request>
+public class FilePiecesRequest extends Request>
{
private final NetworkFile file;
private final int startPiece;
@@ -18,9 +20,9 @@ public class FilePiecesRequest extends Request expectedPieces = new ArrayList<>();
- private final List receivedPieces = new ArrayList<>();
+// private int expectedPieceCount = 0;
+ private Set expectedPieces = new HashSet<>();
+ private final Set receivedPieces = new HashSet<>();
public FilePiecesRequest(NetworkFile file, int startPiece, int pieceCount, int pieceMod)
{
@@ -33,7 +35,7 @@ public class FilePiecesRequest extends Request>
+public class ObjectChangeRequest extends Request>
{
private final Set changesSinceIDs;
+ private Set expectedIDs = new HashSet<>();
+ private final Set receivedIDs = new HashSet<>();
+ private final Set receivedRecords = new HashSet<>();
public ObjectChangeRequest(Set changesSinceIDs)
{
@@ -26,14 +30,35 @@ public class ObjectChangeRequest extends Request(msg.getExpectedReturnIdList());
+ }
+ else if (reply.is(ObjectStatements.ObjectChangeListMessage.class))
+ {
+ ObjectStatements.ObjectChangeListMessage msg = reply.unpack(ObjectStatements.ObjectChangeListMessage.class);
+ for (ObjectStatements.ObjectChangeMessage m : msg.getChangeMessagesList())
+ {
+ ObjectChangeRecord objectChangeRecord = ObjectChangeRecord.createFromChangeMessage(m);
+ receivedRecords.add(objectChangeRecord);
+ }
+ future.complete(receivedRecords);
+ return true;
+ }
+
+ if (receivedIDs.equals(expectedIDs))
+ {
+ future.complete(receivedRecords);
return true;
}
diff --git a/src/main/java/moe/nekojimi/friendcloud/network/requests/ObjectListRequest.java b/src/main/java/moe/nekojimi/friendcloud/network/requests/ObjectListRequest.java
index d1bfe53..d8ed429 100644
--- a/src/main/java/moe/nekojimi/friendcloud/network/requests/ObjectListRequest.java
+++ b/src/main/java/moe/nekojimi/friendcloud/network/requests/ObjectListRequest.java
@@ -2,7 +2,6 @@ package moe.nekojimi.friendcloud.network.requests;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
-import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
@@ -44,10 +43,10 @@ public class ObjectListRequest extends Request
public abstract MessageType buildMessage();
+ /**
+ * Handle a message that was received in reply to the request message.
+ * @param reply the reply message. May be one of many.
+ * @return true if no further replies are expected (or able to be processed), false if more replies are coming.
+ * @throws InvalidProtocolBufferException if reply cannot be decoded.
+ */
public boolean handleReply(Any reply) throws InvalidProtocolBufferException
{
if (reply.is(CommonMessages.ErrorMessage.class))
{
CommonMessages.ErrorMessage errorMessage = reply.unpack(CommonMessages.ErrorMessage.class);
- future.completeExceptionally(new RuntimeException("Request received error response: " + errorMessage.getError().name()));
+ future.completeExceptionally(new RequestReceivedErrorException(errorMessage));
return true;
}
return false;
diff --git a/src/main/java/moe/nekojimi/friendcloud/network/requests/RequestReceivedErrorException.java b/src/main/java/moe/nekojimi/friendcloud/network/requests/RequestReceivedErrorException.java
new file mode 100644
index 0000000..83802bf
--- /dev/null
+++ b/src/main/java/moe/nekojimi/friendcloud/network/requests/RequestReceivedErrorException.java
@@ -0,0 +1,19 @@
+package moe.nekojimi.friendcloud.network.requests;
+
+import moe.nekojimi.friendcloud.protos.CommonMessages;
+
+public class RequestReceivedErrorException extends Exception
+{
+ private final CommonMessages.ErrorMessage errorMessage;
+
+ public RequestReceivedErrorException(CommonMessages.ErrorMessage errorMessage)
+ {
+ super("Request received error:" + errorMessage.getError().name() + "; " + errorMessage.getText());
+ this.errorMessage = errorMessage;
+ }
+
+ public CommonMessages.ErrorMessage getErrorMessage()
+ {
+ return errorMessage;
+ }
+}
diff --git a/src/main/java/moe/nekojimi/friendcloud/objects/NetworkFSNode.java b/src/main/java/moe/nekojimi/friendcloud/objects/NetworkFSNode.java
index c9ec1d3..ce0c255 100644
--- a/src/main/java/moe/nekojimi/friendcloud/objects/NetworkFSNode.java
+++ b/src/main/java/moe/nekojimi/friendcloud/objects/NetworkFSNode.java
@@ -17,16 +17,16 @@ public abstract class NetworkFSNode extends NetworkObject
}
@Override
- public synchronized void updateFromStateMessage(ObjectStatements.ObjectState state)
+ public synchronized void updateFromMessageMap(Map after, Map before)
{
- super.updateFromStateMessage(state);
- if (state.containsValues("name"))
- name = state.getValuesOrThrow("name");
- if (state.containsValues("parent"))
+ super.updateFromMessageMap(after,before);
+ if (after.containsKey("name"))
+ name = after.get("name");
+ if (after.containsKey("parent"))
{
- long parentID = Long.parseLong(state.getValuesOrThrow("parent"));
+ long parentID = Long.parseLong(after.get("parent"));
if (parentID != 0)
- parent = (NetworkFolder) Main.getInstance().getModel().getObject(new ObjectID(parentID));
+ parent = Main.getInstance().getModel().getObject(new ObjectID(parentID));
else
parent = null;
}
@@ -36,7 +36,8 @@ public abstract class NetworkFSNode extends NetworkObject
public ObjectStatements.ObjectState.Builder buildObjectState()
{
return super.buildObjectState()
- .putValues("name", getName());
+ .putValues("name", getName())
+ .putValues("parent", parent != null ? Long.toString(parent.getStorageID()) : "0");
}
@Override
@@ -52,7 +53,7 @@ public abstract class NetworkFSNode extends NetworkObject
public void updateFromStateMap(Map map)
{
name = map.get("name").toString();
- parent = (NetworkFolder) Main.getInstance().getModel().getObject(new ObjectID(((Number)map.get("parent")).longValue()));
+ parent = Main.getInstance().getModel().getObject(new ObjectID(((Number)map.get("parent")).longValue()));
}
public String getName()
@@ -65,8 +66,24 @@ public abstract class NetworkFSNode extends NetworkObject
this.name = name;
}
+ @Override
+ public String getFriendlyName()
+ {
+ return getName();
+ }
+
public String getNetworkPath()
{
return (parent != null ? parent.getNetworkPath() : "") + "/" + name;
}
+
+ public NetworkFolder getParent()
+ {
+ return parent;
+ }
+
+ public void setParent(NetworkFolder newParent)
+ {
+ parent = newParent;
+ }
}
diff --git a/src/main/java/moe/nekojimi/friendcloud/objects/NetworkFile.java b/src/main/java/moe/nekojimi/friendcloud/objects/NetworkFile.java
index 9872ae1..2e90dfa 100644
--- a/src/main/java/moe/nekojimi/friendcloud/objects/NetworkFile.java
+++ b/src/main/java/moe/nekojimi/friendcloud/objects/NetworkFile.java
@@ -5,11 +5,8 @@ import moe.nekojimi.friendcloud.Util;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
import java.util.*;
import java.util.stream.Collectors;
@@ -43,7 +40,6 @@ public class NetworkFile extends NetworkFSNode
this.localFile = localFile;
name = localFile.getName();
size = localFile.length();
- pieceSize = MIN_PIECE_SIZE;
for (pieceSize = MIN_PIECE_SIZE; pieceSize < MAX_PIECE_SIZE; pieceSize *= 2)
{
long pieceCount = size / pieceSize;
@@ -51,81 +47,34 @@ public class NetworkFile extends NetworkFSNode
break;
}
- pieces = new BitSet(Math.toIntExact(getPieceCount()));
+ Util.HashOutput hashOutput = Util.hashFile(localFile, pieceSize);
+ pieces = hashOutput.pieces;
+ hash = hashOutput.totalDigest;
- long offset = 0L;
- System.out.println("Calculating hashes for file " + localFile.getName() + "(Piece size: " + pieceSize + ")");
- try (FileInputStream input = new FileInputStream(localFile))
+ System.out.println();
+ setLocalFile(localFile);
+ size = localFile.length();
+
+ if (pieces.cardinality() >= getPieceCount())
{
- MessageDigest totalDigest = MessageDigest.getInstance("SHA-256");
- byte[] pieceBuf = new byte[Math.toIntExact(pieceSize)];
- int pieceIdx = 0;
-// List pieces = new ArrayList<>();
- while(true)
- {
- int bytesRead = input.read(pieceBuf);
- if (bytesRead <= 0)
- break;
-
- // check to see if this piece is just zeroes, if so, assume it's a missing piece
- boolean allZero = true;
- for (byte b: pieceBuf)
- {
- if (b != 0)
- {
- allZero = false;
- break;
- }
- }
- pieces.set(pieceIdx, !allZero);
-
-// MessageDigest pieceDigest = MessageDigest.getInstance("SHA-256");
-// pieceDigest.update(pieceBuf, 0, bytesRead);
-// byte[] pieceHash = pieceDigest.digest();
- totalDigest.update(pieceBuf, 0, bytesRead);
-
-// FilePiece piece = new FilePiece(pieceHash, bytesRead, localFile, offset);
-// System.out.print(HexFormat.of().formatHex(pieceHash) + ", ");
-// pieces.add(piece);
-
- pieceIdx++;
- }
- System.out.println();
- setLocalFile(localFile);
- size = localFile.length();
-// setPieces(pieces);
- setHash(totalDigest.digest());
-
- System.out.println("Total hash: " + HexFormat.of().formatHex(hash));
- System.out.println("Have " + pieces.cardinality() + " of " + getPieceCount() + " pieces.");
-
- if (pieces.cardinality() >= getPieceCount())
- {
- peersWithCopy.add(Main.getInstance().getModel().getSelfPeer().getObjectID());
- }
-
- } catch (NoSuchAlgorithmException e)
- {
- throw new RuntimeException(e);
+ peersWithCopy.add(Main.getInstance().getModel().getLocalPeer().getObjectID());
}
}
@Override
- public void updateFromStateMessage(ObjectStatements.ObjectState state)
+ public synchronized void updateFromMessageMap(Map after, Map before)
{
- super.updateFromStateMessage(state);
-// if (state.containsValues("path"))
-// path = state.getValuesOrThrow("path");
- if (state.containsValues("size"))
- size = Long.parseLong(state.getValuesOrThrow("size"));
- if (state.containsValues("hash"))
- hash = HexFormat.of().parseHex(state.getValuesOrThrow("hash"));
- if (state.containsValues("pieceSize"))
- pieceSize = Long.parseLong(state.getValuesOrThrow("pieceSize"));
- if (state.containsValues("peersWithCopy"))
+ super.updateFromMessageMap(after, before);
+ if (after.containsKey("size"))
+ size = Long.parseLong(after.get("size"));
+ if (after.containsKey("hash"))
+ hash = HexFormat.of().parseHex(after.get("hash"));
+ if (after.containsKey("pieceSize"))
+ pieceSize = Long.parseLong(after.get("pieceSize"));
+ if (after.containsKey("peersWithCopy"))
{
peersWithCopy.clear();
- String[] peers = state.getValuesOrThrow("peersWithCopy").split(",");
+ String[] peers = after.get("peersWithCopy").split(",");
for (String peer: peers)
{
peersWithCopy.add(new ObjectID(Long.parseUnsignedLong(peer,16)));
@@ -133,6 +82,7 @@ public class NetworkFile extends NetworkFSNode
}
}
+
@Override
public ObjectStatements.ObjectState.Builder buildObjectState()
{
@@ -157,6 +107,7 @@ public class NetworkFile extends NetworkFSNode
return ret;
}
+ @SuppressWarnings("unchecked")
@Override
public void updateFromStateMap(Map map)
{
@@ -220,12 +171,7 @@ public class NetworkFile extends NetworkFSNode
return pieceSize;
}
- private void setPieceSize(long pieceSize)
- {
- this.pieceSize = pieceSize;
- }
-
-// public List getPieces()
+ // public List getPieces()
// {
// return pieces;
// }
@@ -240,11 +186,6 @@ public class NetworkFile extends NetworkFSNode
return hash;
}
- private void setHash(byte[] hash)
- {
- this.hash = hash;
- }
-
public int getPieceCount()
{
return Math.toIntExact(Math.ceilDiv(size, pieceSize));
@@ -338,6 +279,11 @@ public class NetworkFile extends NetworkFSNode
}
}
+ public boolean hasLocalFile()
+ {
+ return localFile != null;
+ }
+
public enum StorageType
{
/** The file will be stored as a complete file in the storage directory under it's own name and file path. */
diff --git a/src/main/java/moe/nekojimi/friendcloud/objects/NetworkObject.java b/src/main/java/moe/nekojimi/friendcloud/objects/NetworkObject.java
index 62e6b2d..7fd2a55 100644
--- a/src/main/java/moe/nekojimi/friendcloud/objects/NetworkObject.java
+++ b/src/main/java/moe/nekojimi/friendcloud/objects/NetworkObject.java
@@ -1,5 +1,6 @@
package moe.nekojimi.friendcloud.objects;
+import moe.nekojimi.friendcloud.ObjectChangeRecord;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import moe.nekojimi.friendcloud.storage.Storable;
import org.jetbrains.annotations.NotNull;
@@ -17,6 +18,18 @@ public abstract class NetworkObject implements Storable, Comparable new Peer(objectID);
+ case OBJECT_TYPE_FILE -> new NetworkFile(objectID);
+ case OBJECT_TYPE_FOLDER -> new NetworkFolder(objectID);
+ default -> throw new IllegalArgumentException("Unrecognised object type!");
+ };
+ }
+
public ObjectID getObjectID()
{
return objectID;
@@ -28,7 +41,6 @@ public abstract class NetworkObject implements Storable, Comparable getStateMap()
{
@@ -37,15 +49,31 @@ public abstract class NetworkObject implements Storable, Comparable currentValues)
+ {
+ updateFromMessageMap(currentValues,Map.of());
+ }
+
+ protected synchronized void updateFromMessageMap(Map currentValues, Map beforeValues)
+ {
+
+ }
+
+ public synchronized final void updateFromStateMessage(ObjectStatements.ObjectState state)
{
if (state.getObjectId() != objectID.toLong())
throw new IllegalArgumentException("Wrong object!");
+ updateFromMessageMap(state.getValuesMap());
}
- public synchronized ObjectStatements.ObjectState mergeChanges(ObjectStatements.ObjectState a, ObjectStatements.ObjectState b)
+ public synchronized ObjectStatements.ObjectState mergeChanges(ObjectStatements.ObjectChange a, ObjectStatements.ObjectChange b)
{
- return null;
+ throw new UnsupportedOperationException("NYI");
}
public ObjectStatements.ObjectState.Builder buildObjectState()
@@ -73,4 +101,12 @@ public abstract class NetworkObject implements Storable, Comparable
return typePart | systemPart | uniquePart;
}
+ public static ObjectID nullValue()
+ {
+ return new ObjectID(0L);
+ }
+
public moe.nekojimi.friendcloud.protos.ObjectStatements.ObjectType getType()
{
return type;
@@ -72,4 +78,9 @@ public class ObjectID implements Comparable
{
return Long.compare(toLong(), objectID.toLong());
}
+
+ public boolean isNull()
+ {
+ return type == ObjectStatements.ObjectType.OBJECT_TYPE_UNSPECIFIED;
+ }
}
diff --git a/src/main/java/moe/nekojimi/friendcloud/objects/Peer.java b/src/main/java/moe/nekojimi/friendcloud/objects/Peer.java
index 46d91c7..5702683 100644
--- a/src/main/java/moe/nekojimi/friendcloud/objects/Peer.java
+++ b/src/main/java/moe/nekojimi/friendcloud/objects/Peer.java
@@ -13,10 +13,6 @@ public class Peer extends NetworkObject
private String userName = "";
private String systemName = "";
-// private Map fileStates = new HashMap<>();
-
- private volatile int lastTriedAddressIdx = -1;
-
public Peer(ObjectID objectID)
{
super(objectID);
@@ -27,20 +23,19 @@ public class Peer extends NetworkObject
return userName + "@" + systemName;
}
- @Override
- public synchronized void updateFromStateMessage(ObjectStatements.ObjectState state)
- {
- super.updateFromStateMessage(state);
+ @Override
+ protected synchronized void updateFromMessageMap(Map after, Map before)
+ {
+ super.updateFromMessageMap(after, before);
- Map values = state.getValuesMap();
- if (values.containsKey("userName"))
- userName = values.get("userName");
- if (values.containsKey("systemName"))
- systemName = values.get("systemName");
- if (values.containsKey("addresses"))
+ if (after.containsKey("userName"))
+ userName = after.get("userName");
+ if (after.containsKey("systemName"))
+ systemName = after.get("systemName");
+ if (after.containsKey("addresses"))
{
addresses.clear();
- String[] split = values.get("addresses").split(",");
+ String[] split = after.get("addresses").split(",");
for (String s: split)
{
try
@@ -52,8 +47,7 @@ public class Peer extends NetworkObject
}
}
}
-// if (values.containsKey("files"))
- }
+ }
@Override
public ObjectStatements.ObjectState.Builder buildObjectState()
@@ -104,6 +98,12 @@ public class Peer extends NetworkObject
// return fileStates;
// }
+ public void setAddresses(Collection urIs)
+ {
+ addresses.clear();
+ addresses.addAll(urIs);
+ }
+
public SortedSet getAddresses()
{
return addresses;
@@ -128,4 +128,10 @@ public class Peer extends NetworkObject
{
this.systemName = systemName;
}
+
+ @Override
+ public String getFriendlyName()
+ {
+ return getNodeName();
+ }
}
diff --git a/src/main/java/moe/nekojimi/friendcloud/objects/PeerFileState.java b/src/main/java/moe/nekojimi/friendcloud/objects/PeerFileState.java
deleted file mode 100644
index 56d61ad..0000000
--- a/src/main/java/moe/nekojimi/friendcloud/objects/PeerFileState.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package moe.nekojimi.friendcloud.objects;
-
-import moe.nekojimi.friendcloud.Util;
-import moe.nekojimi.friendcloud.protos.ObjectStatements;
-
-import java.util.Map;
-
-public class PeerFileState extends NetworkObject
-{
- private ObjectID peerID;
- private ObjectID fileID;
-
- private double progress = 0;
-
- public PeerFileState(ObjectID objectID)
- {
- super(objectID);
- }
-
- @Override
- public void updateFromStateMessage(ObjectStatements.ObjectState state)
- {
- super.updateFromStateMessage(state);
- peerID = new ObjectID(Long.parseLong(state.getValuesOrThrow("peer")));
- fileID = new ObjectID(Long.parseLong(state.getValuesOrThrow("file")));
- if (state.containsValues("progress"))
- progress = Double.parseDouble(state.getValuesOrThrow("progress"));
- }
-
- @Override
- public ObjectStatements.ObjectState mergeChanges(ObjectStatements.ObjectState a, ObjectStatements.ObjectState b)
- {
- return super.mergeChanges(a, b);
- }
-
- @Override
- public ObjectStatements.ObjectState.Builder buildObjectState()
- {
- return super.buildObjectState()
-// .putValues("peer", Long.toString(peer.getObjectID().toLong()))
-// .putValues("file", Long.toString(file.getObjectID().toLong()))
- .putValues("progress", Double.toString(progress));
- }
-
-// public void setNode(Peer peer)
-// {
-// this.peer = peer;
-// }
-//
-// public void setFile(NetworkFile file)
-// {
-// this.file = file;
-// }
-
- public double getProgress()
- {
- return progress;
- }
-
- public void setProgress(double progress)
- {
- this.progress = progress;
- }
-
-// public NetworkFile getFile()
-// {
-// return file;
-// }
-//
-// public Peer getNode()
-// {
-// return peer;
-// }
-
- @Override
- public Map getStateMap()
- {
- Map ret = super.getStateMap();
-// ret.put("peer", peer == null ? 0L : peer.getObjectID().toLong());
-// ret.put("file", file == null? 0L : file.getObjectID().toLong());
- ret.put("progress", progress);
- return ret;
- }
-
- @Override
- public void updateFromStateMap(Map map)
- {
-// peer = Main.getInstance().getModel().getObject(new ObjectID(Util.unconditionalNumberToLong(map.get("peer"))));
-// file = Main.getInstance().getModel().getObject(new ObjectID(Util.unconditionalNumberToLong(map.get("file"))));
- progress = Util.unconditionalNumberToDouble( map.get("progress"));
- }
-}
diff --git a/src/main/java/moe/nekojimi/friendcloud/storage/CachingDataStore.java b/src/main/java/moe/nekojimi/friendcloud/storage/CachingDataStore.java
index 6beb1ba..9ddc152 100644
--- a/src/main/java/moe/nekojimi/friendcloud/storage/CachingDataStore.java
+++ b/src/main/java/moe/nekojimi/friendcloud/storage/CachingDataStore.java
@@ -1,6 +1,5 @@
package moe.nekojimi.friendcloud.storage;
-import java.lang.reflect.Modifier;
import java.util.*;
public class CachingDataStore extends DataStore
@@ -16,6 +15,7 @@ public class CachingDataStore extends DataStore
public synchronized DAO getDAOForClass(Class clazz)
{
if (daos.containsKey(clazz))
+ //noinspection unchecked
return (DAO) daos.get(clazz);
else
{
@@ -25,29 +25,41 @@ public class CachingDataStore extends DataStore
}
}
+ @Override
+ public void clear()
+ {
+ daos.clear();
+ backend.clear();
+ }
+
@Override
public FSNodeDAO getFSDAO()
{
return backend.getFSDAO();
}
- private Map, CachingDAO>> daos = new HashMap<>();
+ private final Map, CachingDAO>> daos = new HashMap<>();
public class CachingDAO implements DAO
{
private final DAO backendDao;
- private WeakHashMap cache = new WeakHashMap<>();
+ private final WeakHashMap cache = new WeakHashMap<>();
public CachingDAO(Class clazz)
{
this.backendDao = backend.getDAOForClass(clazz);
}
+ @Override
+ public List list()
+ {
+ return backendDao.list();
+ }
+
@Override
public synchronized List getAll()
{
List ret = new ArrayList<>();
- ret.addAll(cache.values());
for (T t : backendDao.getAll())
{
if (!cache.containsKey(t.getStorageID()))
@@ -55,6 +67,10 @@ public class CachingDataStore extends DataStore
ret.add(t);
cache.put(t.getStorageID(), t);
}
+ else
+ {
+ ret.add(cache.get(t.getStorageID()));
+ }
}
return ret;
}
@@ -76,9 +92,9 @@ public class CachingDataStore extends DataStore
@Override
public T get(long id)
{
- T t = backendDao.get(id);
- cache.put(id, t);
- return t;
+ if (!cache.containsKey(id))
+ cache.put(id, backendDao.get(id));
+ return cache.get(id);
}
@Override
diff --git a/src/main/java/moe/nekojimi/friendcloud/storage/DataStore.java b/src/main/java/moe/nekojimi/friendcloud/storage/DataStore.java
index 2b33099..b2dc902 100644
--- a/src/main/java/moe/nekojimi/friendcloud/storage/DataStore.java
+++ b/src/main/java/moe/nekojimi/friendcloud/storage/DataStore.java
@@ -3,7 +3,6 @@ package moe.nekojimi.friendcloud.storage;
import moe.nekojimi.friendcloud.objects.NetworkFSNode;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -12,10 +11,13 @@ public abstract class DataStore
public abstract DAO getDAOForClass(Class clazz);
public abstract FSNodeDAO getFSDAO();
+ public abstract void clear();
+
public interface DAO
{
+ default boolean exists(long id) {return list().contains(id);}
+ default List list() {return getAll().stream().map(Storable::getStorageID).toList();}
List getAll();
- boolean exists(long id);
T create(long id);
T get(long id);
default T getOrCreate(long id)
@@ -47,6 +49,15 @@ public abstract class DataStore
this.subclasses = Set.of(subclasses);
}
+ @Override
+ public List list()
+ {
+ List ret = new ArrayList<>();
+ for (Class extends T> subclass : subclasses)
+ ret.addAll(getDAOForClass(subclass).list());
+ return ret;
+ }
+
@Override
public List getAll()
{
diff --git a/src/main/java/moe/nekojimi/friendcloud/storage/LocalData.java b/src/main/java/moe/nekojimi/friendcloud/storage/LocalData.java
index 41cc4fd..b8f6820 100644
--- a/src/main/java/moe/nekojimi/friendcloud/storage/LocalData.java
+++ b/src/main/java/moe/nekojimi/friendcloud/storage/LocalData.java
@@ -64,6 +64,6 @@ public class LocalData implements Storable
localPeer = Main.getInstance().getModel().getObject(new ObjectID(Util.unconditionalNumberToLong(map.getOrDefault("localPeer",0))));
currentChangeRecord = Main.getInstance().getModel().getChangeRecord(Util.unconditionalNumberToLong(map.getOrDefault("currentChangeRecord",0)));
systemID = (int) map.getOrDefault("systemID", 0);
- System.out.println("LocalData: resumed state, localPeer=" + localPeer + ", currentChangeRecord=" + currentChangeRecord + ", systemID=" + Integer.toHexString(systemID));
+ System.out.println("LocalData: resumed state, localPeer=" + localPeer + ", currentChangeRecord=" + Long.toHexString(currentChangeRecord.getChangeID()) + ", systemID=" + Integer.toHexString(systemID));
}
}
diff --git a/src/main/java/moe/nekojimi/friendcloud/storage/Model.java b/src/main/java/moe/nekojimi/friendcloud/storage/Model.java
deleted file mode 100644
index 9d982a9..0000000
--- a/src/main/java/moe/nekojimi/friendcloud/storage/Model.java
+++ /dev/null
@@ -1,202 +0,0 @@
-package moe.nekojimi.friendcloud.storage;
-
-import moe.nekojimi.friendcloud.ObjectChangeRecord;
-import moe.nekojimi.friendcloud.objects.*;
-import moe.nekojimi.friendcloud.protos.ObjectStatements;
-
-import java.util.*;
-import java.util.stream.Collectors;
-
-public class Model
-{
-
- private final CachingDataStore dataStore;
- private LocalData localData;
-
- public Model(DataStore dataStore)
- {
- this.dataStore = new CachingDataStore(dataStore);
- }
-
- public synchronized void init()
- {
-
- List localDataList = dataStore.getDAOForClass(LocalData.class).getAll();
- if (localDataList.isEmpty())
- {
- localData = dataStore.getDAOForClass(LocalData.class).create(0);
- }
- else if (localDataList.size() == 1)
- {
- localData = localDataList.getFirst();
- }
- else
- {
- throw new IllegalStateException("We have more than one LocalData somehow!!");
- }
- if (localData.getSystemID() == 0)
- {
- Random ran = new Random();
- localData.setSystemID(ran.nextInt() & 0x00FFFFFF);
- objectChanged(localData);
- }
- }
-
- public synchronized Peer getSelfPeer()
- {
- if (localData.getLocalPeer() == null)
- {
- localData.setLocalPeer(createObjectByType(ObjectStatements.ObjectType.OBJECT_TYPE_PEER));
- objectChanged(localData);
- }
- return localData.getLocalPeer();
- }
- // private Map nodes = new HashMap<>();
-
- public synchronized ObjectID getNextObjectID(ObjectStatements.ObjectType type)
- {
- Random ran = new Random();
- int randomNumber = ran.nextInt();
- ObjectID objectID = new ObjectID(type, localData.getSystemID(), randomNumber);
- System.out.println("Assigned new object ID: " + objectID);
- return objectID;
- }
-
- public static Class extends NetworkObject> getNetworkObjectClassByType(ObjectStatements.ObjectType type)
- {
- return switch (type)
- {
- case OBJECT_TYPE_FILE -> NetworkFile.class;
- case OBJECT_TYPE_FOLDER -> NetworkFolder.class;
- case OBJECT_TYPE_PEER -> Peer.class;
- case OBJECT_TYPE_PEER_FILE_STATE -> PeerFileState.class;
- case OBJECT_TYPE_UNSPECIFIED, UNRECOGNIZED -> throw new IllegalArgumentException("???");
- default -> throw new UnsupportedOperationException("NYI");
- };
- }
-
- public synchronized T createObjectByID(ObjectID id)
- {
- if (id.toLong() == 0)
- throw new IllegalArgumentException("Cannot create an object with ID=0!");
- ObjectStatements.ObjectType type = id.getType();
- System.out.println("Creating new object with type: " + type.name());
- if (type == ObjectStatements.ObjectType.OBJECT_TYPE_UNSPECIFIED)
- throw new IllegalArgumentException();
- T ret = (T) dataStore.getDAOForClass(getNetworkObjectClassByType(type)).create(id.toLong());
- return ret;
- }
-
- public synchronized T createObjectByType(ObjectStatements.ObjectType type)
- {
- return createObjectByID(getNextObjectID(type));
- }
-
- public synchronized T getObject(ObjectID id)
- {
- if (id.toLong() == 0)
- return null;
- Class clazz = (Class) getNetworkObjectClassByType(id.getType());
- return dataStore.getDAOForClass(clazz).get(id.toLong());
- }
-
- public synchronized T getOrCreateObject(ObjectID id)
- {
- if (id.toLong() == 0)
- return null;
- Class clazz = (Class) getNetworkObjectClassByType(id.getType());
- return dataStore.getDAOForClass(clazz).getOrCreate(id.toLong());
- }
-
- public synchronized List listObjects(Set types)
- {
- List ret = new ArrayList<>();
- Set> classes = types.stream().map(Model::getNetworkObjectClassByType).collect(Collectors.toSet());
- for (Class extends NetworkObject> clazz: classes)
- {
- List extends NetworkObject> list = dataStore.getDAOForClass(clazz).getAll();
- ret.addAll(list);
- }
- return ret;
- }
-
- public synchronized List listFSNodes(String path)
- {
- //TODO: dumbest algorithm in the world
-
- List ret = new ArrayList<>();
- for (NetworkObject object : listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE, ObjectStatements.ObjectType.OBJECT_TYPE_FOLDER)))
- {
- NetworkFSNode fsNode = (NetworkFSNode) object;
- String networkPath = fsNode.getNetworkPath();
- if (networkPath.substring(0, networkPath.lastIndexOf("/")+1).equals(path))
- ret.add(fsNode);
- }
- return ret;
- }
-
- public synchronized NetworkFSNode getFSNode(String path)
- {
- for (NetworkObject object : listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE, ObjectStatements.ObjectType.OBJECT_TYPE_FOLDER)))
- {
- NetworkFSNode fsNode = (NetworkFSNode) object;
- String networkPath = fsNode.getNetworkPath();
- if (networkPath.equals(path))
- return fsNode;
- }
- return null;
- }
-
- public synchronized void addChangeRecord(ObjectChangeRecord record)
- {
- dataStore.getDAOForClass(ObjectChangeRecord.class).update(record);
- }
-
- public ObjectChangeRecord getChangeRecord(long id)
- {
- if (id == 0)
- return null;
- return dataStore.getDAOForClass(ObjectChangeRecord.class).get(id);
- }
-
- public void applyChangeRecord(ObjectChangeRecord record)
- {
- if (!record.getChangeHeads().contains(localData.getCurrentChangeRecord().getChangeID()))
- throw new IllegalStateException("Change does not apply! Valid change heads=" + record.getChangeHeads() + ", we are in state " + localData.getCurrentChangeRecord().getChangeID());
- addChangeRecord(record);
-
-
-// if (record == null)
-// throw new IllegalArgumentException("Cannot apply unknown change!");
- }
-
- public Set getChangeHeads()
- {
- // stupid algorithm - start with all of the changes, then remove the ones that are referenced by something
- // TODO: better algorithm
- Set ret = new HashSet<>(dataStore.getDAOForClass(ObjectChangeRecord.class).getAll());
-// for (ObjectChangeRecord record : changeRecords.values())
-// {
-// throw new UnsupportedOperationException("NYI");
-// }
- throw new UnsupportedOperationException("NYI");
- }
-
- public Set listOtherPeers()
- {
- Set ret = new HashSet<>();
- for (NetworkObject object : listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_PEER)))
- {
- Peer peer = (Peer) object;
- if (peer != getSelfPeer())
- ret.add(peer);
- }
- return ret;
- }
-
- public void objectChanged(T storable)
- {
- Class clazz = (Class) storable.getClass();
- dataStore.getDAOForClass(clazz).update(storable);
- }
-}
diff --git a/src/main/java/moe/nekojimi/friendcloud/storage/StupidJSONFileStore.java b/src/main/java/moe/nekojimi/friendcloud/storage/StupidJSONFileStore.java
index 4ea043f..42feebc 100644
--- a/src/main/java/moe/nekojimi/friendcloud/storage/StupidJSONFileStore.java
+++ b/src/main/java/moe/nekojimi/friendcloud/storage/StupidJSONFileStore.java
@@ -1,6 +1,7 @@
package moe.nekojimi.friendcloud.storage;
import moe.nekojimi.friendcloud.ObjectChangeRecord;
+import moe.nekojimi.friendcloud.Util;
import moe.nekojimi.friendcloud.objects.*;
import org.jetbrains.annotations.NotNull;
import org.json.JSONArray;
@@ -31,6 +32,14 @@ public class StupidJSONFileStore extends DataStore
}
}
+ @Override
+ public void clear()
+ {
+ daos.clear();
+ storageDirectory.delete();
+ storageDirectory.mkdirs();
+ }
+
@Override
public DAO getDAOForClass(Class clazz)
{
@@ -44,8 +53,6 @@ public class StupidJSONFileStore extends DataStore
ret = new NetworkFolderDAO();
else if (clazz.equals(Peer.class))
ret = new PeerDAO();
- else if (clazz.equals(PeerFileState.class))
- ret = new PeerFileStateDAO();
else if (clazz.equals(NetworkFSNode.class))
ret = new NetworkFSNodeDAO();
else if (clazz.equals(LocalData.class))
@@ -74,7 +81,8 @@ public class StupidJSONFileStore extends DataStore
{
File ret = new File(storageDirectory, getNamespace());
if (!ret.exists())
- ret.mkdir();
+ ret.mkdirs();
+ assert (ret.exists() && ret.isDirectory());
return ret;
}
protected abstract T makeBlank(long id);
@@ -174,6 +182,20 @@ public class StupidJSONFileStore extends DataStore
return file.exists();
}
+ @Override
+ public List list()
+ {
+ List ret = new ArrayList<>();
+ // get all files in the storage directory
+ for (File file : Objects.requireNonNull(getNamespaceDirectory().listFiles()))
+ {
+ String name = file.getName();
+ String nameWithoutExt = name.substring(0, name.indexOf('.'));
+ ret.add(Long.parseLong(nameWithoutExt, 16));
+ }
+ return ret;
+ }
+
@Override
public List getAll()
{
@@ -301,9 +323,8 @@ public class StupidJSONFileStore extends DataStore
@Override
protected JSONObject serialiseWeirdObject(Object value) throws IllegalArgumentException
{
- if (value instanceof URI)
+ if (value instanceof URI uri)
{
- URI uri = (URI) value;
return new JSONObject().put("weirdObjectClass", URI.class.getCanonicalName()).put("uri",uri.toString());
}
return super.serialiseWeirdObject(value);
@@ -330,20 +351,6 @@ public class StupidJSONFileStore extends DataStore
}
}
- private class PeerFileStateDAO extends NetworkObjectDAO
- {
- @Override
- protected String getNamespace()
- {
- return super.getNamespace() + "/peerFileStates";
- }
- @Override
- protected PeerFileState makeBlank(long id)
- {
- return new PeerFileState(new ObjectID(id));
- }
- }
-
private class LocalDataDAO extends JSONObjectDAO
{
@Override
@@ -372,5 +379,35 @@ public class StupidJSONFileStore extends DataStore
{
return new ObjectChangeRecord();
}
+
+ @Override
+ protected Object deserialiseWeirdObject(JSONObject json)
+ {
+ String weirdType = json.getString("weirdObjectClass");
+// Class> weirdClass = Class.forName(weirdType);
+ if (weirdType.equals(ObjectChangeRecord.Change.class.getCanonicalName()))
+ {
+ return new ObjectChangeRecord.Change(
+ new ObjectID(json.getLong("objectID")),
+ Util.stringifyMap(json.getJSONObject("before").toMap()),
+ Util.stringifyMap(json.getJSONObject("after").toMap()));
+ }
+ return super.deserialiseWeirdObject(json);
+ }
+
+ @Override
+ protected JSONObject serialiseWeirdObject(Object value) throws IllegalArgumentException
+ {
+ if (value instanceof ObjectChangeRecord.Change change)
+ {
+ JSONObject ret = new JSONObject();
+ ret.put("weirdObjectClass", ObjectChangeRecord.Change.class.getCanonicalName());
+ ret.put("objectID", change.objectID().toLong());
+ ret.put("before", change.beforeValues());
+ ret.put("after", change.afterValues());
+ return ret;
+ }
+ return super.serialiseWeirdObject(value);
+ }
}
}
diff --git a/src/main/java/moe/nekojimi/friendcloud/tasks/FileDownloadTask.java b/src/main/java/moe/nekojimi/friendcloud/tasks/FileDownloadTask.java
index 4584a4b..d0ce01c 100644
--- a/src/main/java/moe/nekojimi/friendcloud/tasks/FileDownloadTask.java
+++ b/src/main/java/moe/nekojimi/friendcloud/tasks/FileDownloadTask.java
@@ -1,9 +1,8 @@
package moe.nekojimi.friendcloud.tasks;
-import moe.nekojimi.friendcloud.ConnectionManager;
+import moe.nekojimi.friendcloud.network.ConnectionManager;
import moe.nekojimi.friendcloud.Main;
-import moe.nekojimi.friendcloud.NotificationManager;
import moe.nekojimi.friendcloud.ObjectChangeTransaction;
import moe.nekojimi.friendcloud.network.PeerConnection;
import moe.nekojimi.friendcloud.network.requests.FilePiecesRequest;
@@ -15,7 +14,6 @@ import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
-import java.util.stream.Collectors;
public class FileDownloadTask implements RunnableFuture
{
@@ -64,7 +62,7 @@ public class FileDownloadTask implements RunnableFuture
String connectionLine = "";
String progressLine = "";
- Peer selfPeer = Main.getInstance().getModel().getSelfPeer();
+ Peer selfPeer = Main.getInstance().getModel().getLocalPeer();
while (!missingPieceIndices.isEmpty() && !cancelled && !failed && !done)
{
System.out.println("Need to get " + missingPieceIndices.size() + " missing pieces.");
@@ -122,11 +120,11 @@ public class FileDownloadTask implements RunnableFuture
System.out.println("FileDownloadTask: Will download pieces from " + runStart + " to " + runEnd);
// make one request per connectable peer, striping the needed pieces among them
- List>> fileFutures = new ArrayList<>();
+ List>> fileFutures = new ArrayList<>();
int offset = 0;
for (PeerConnection connection : connections)
{
- CompletableFuture> future = connection.makeRequest(new FilePiecesRequest(file, runStart+offset, (runEnd-runStart)+1, connections.size()));
+ CompletableFuture> future = connection.makeRequest(new FilePiecesRequest(file, runStart+offset, (runEnd-runStart)+1, connections.size()));
fileFutures.add(future);
offset++;
}
@@ -134,12 +132,12 @@ public class FileDownloadTask implements RunnableFuture
long timeout = timeoutPerPieceMs * (missingPieceIndices.size() / connections.size());
// wait for all the requests to complete
- for (CompletableFuture> future : fileFutures)
+ for (CompletableFuture> future : fileFutures)
{
try
{
- List receivedPieces = future.get(timeout, TimeUnit.MILLISECONDS);
- receivedPieces.forEach(missingPieceIndices::remove);
+ Set receivedPieces = future.get(timeout, TimeUnit.MILLISECONDS);
+ missingPieceIndices.removeAll(receivedPieces);
} catch (InterruptedException e)
{
future.cancel(true);
@@ -160,10 +158,10 @@ public class FileDownloadTask implements RunnableFuture
{
try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(connectionManager, selfPeer.getObjectID()))
{
- transaction.addObjectBeforeChange(file.getObjectID());
+ transaction.addObjectBeforeChange(file);
file.addPeerWithCopy(selfPeer);
}
- catch (IOException ex)
+ catch (IOException ignored)
{
}
diff --git a/src/main/java/moe/nekojimi/friendcloud/tasks/JoinNetworkTask.java b/src/main/java/moe/nekojimi/friendcloud/tasks/JoinNetworkTask.java
index 1858310..4427279 100644
--- a/src/main/java/moe/nekojimi/friendcloud/tasks/JoinNetworkTask.java
+++ b/src/main/java/moe/nekojimi/friendcloud/tasks/JoinNetworkTask.java
@@ -1,12 +1,19 @@
package moe.nekojimi.friendcloud.tasks;
+import com.kstruct.gethostname4j.Hostname;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.ObjectChangeTransaction;
+import moe.nekojimi.friendcloud.network.ConnectionManager;
import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.objects.Peer;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
+import moe.nekojimi.friendcloud.Controller;
import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
public class JoinNetworkTask implements Runnable
{
@@ -14,25 +21,57 @@ public class JoinNetworkTask implements Runnable
@Override
public void run()
{
- // generate new peer ID
- ObjectID peerID = null;
- try (ObjectChangeTransaction builder = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), peerID))
+ System.out.println("JoinNetworkTask: Joining the network!");
+ Controller controller = Main.getInstance().getModel();
+
+ boolean firstJoin = false;
+
+ ObjectID peerID;
+ if (controller.getLocalPeer() != null)
+ peerID = controller.getLocalPeer().getObjectID();
+ else
{
- Peer selfPeer = Main.getInstance().getModel().getSelfPeer();
- if (selfPeer != null)
- peerID = selfPeer.getObjectID();
- else
- peerID = Main.getInstance().getModel().getNextObjectID(ObjectStatements.ObjectType.OBJECT_TYPE_PEER);
+ peerID = controller.getNextObjectID(ObjectStatements.ObjectType.OBJECT_TYPE_PEER);
+ firstJoin = true;
+ }
- // synchronise with the network
- SyncWithNetworkTask syncWithNetworkTask = new SyncWithNetworkTask();
- syncWithNetworkTask.run();
-
-
-
- } catch (IOException e)
+ ConnectionManager connectionManager = Main.getInstance().getConnectionManager();
+ if (firstJoin)
{
- throw new RuntimeException(e);
+ System.out.println("JoinNetworkTask: Performing first time setup...");
+ // download the entire state
+ PullStateTask pullStateTask = new PullStateTask();
+ Future> future = Main.getInstance().getExecutor().submit(pullStateTask);
+ try
+ {
+ future.get(30, TimeUnit.SECONDS);
+ } catch (InterruptedException | TimeoutException e)
+ {
+// throw new RuntimeException(e);
+ } catch (ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
+ // create our local peer object
+ try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(connectionManager, peerID))
+ {
+ // create and submit our Peer object if it doesn't exist
+ Peer selfPeer = controller.getLocalData().getLocalPeer();
+ if (selfPeer == null)
+ {
+ selfPeer = controller.createLocalPeer(peerID);
+ selfPeer.setUserName(System.getProperty("user.name"));
+ String hostname = Hostname.getHostname();
+ selfPeer.setSystemName(hostname);
+ selfPeer.setAddresses(connectionManager.getURIs());
+ controller.objectChanged(selfPeer);
+ controller.getLocalData().setLocalPeer(selfPeer);
+ transaction.addNewlyCreatedObject(selfPeer);
+ }
+ } catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
}
}
}
diff --git a/src/main/java/moe/nekojimi/friendcloud/tasks/PropagateMessageTask.java b/src/main/java/moe/nekojimi/friendcloud/tasks/PropagateMessageTask.java
index 48a52f8..19b7205 100644
--- a/src/main/java/moe/nekojimi/friendcloud/tasks/PropagateMessageTask.java
+++ b/src/main/java/moe/nekojimi/friendcloud/tasks/PropagateMessageTask.java
@@ -1,7 +1,7 @@
package moe.nekojimi.friendcloud.tasks;
import com.google.protobuf.Message;
-import moe.nekojimi.friendcloud.ConnectionManager;
+import moe.nekojimi.friendcloud.network.ConnectionManager;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.network.PeerConnection;
import moe.nekojimi.friendcloud.objects.Peer;
@@ -21,17 +21,22 @@ public class PropagateMessageTask implements Runnable
public void run()
{
ConnectionManager connectionManager = Main.getInstance().getConnectionManager();
+ int messagesSent = 0;
for (Peer peer: Main.getInstance().getModel().listOtherPeers())
{
try
{
PeerConnection connection = connectionManager.getNodeConnection(peer);
if (connection != null)
+ {
connection.sendUnsolicitedMessage(message);
+ messagesSent++;
+ }
} catch (IOException e)
{
throw new RuntimeException(e);
}
}
+ System.out.println("PropagateMessageTask: Sent " + message.getDescriptorForType().getFullName() + " to " + messagesSent + " peers.");
}
}
diff --git a/src/main/java/moe/nekojimi/friendcloud/tasks/PullChangesTask.java b/src/main/java/moe/nekojimi/friendcloud/tasks/PullChangesTask.java
new file mode 100644
index 0000000..dbb3585
--- /dev/null
+++ b/src/main/java/moe/nekojimi/friendcloud/tasks/PullChangesTask.java
@@ -0,0 +1,95 @@
+package moe.nekojimi.friendcloud.tasks;
+
+import moe.nekojimi.friendcloud.Main;
+import moe.nekojimi.friendcloud.ObjectChangeRecord;
+import moe.nekojimi.friendcloud.Util;
+import moe.nekojimi.friendcloud.network.PeerConnection;
+import moe.nekojimi.friendcloud.network.requests.ObjectChangeRequest;
+import moe.nekojimi.friendcloud.network.requests.RequestReceivedErrorException;
+import moe.nekojimi.friendcloud.objects.Peer;
+import moe.nekojimi.friendcloud.protos.CommonMessages;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.*;
+import java.util.stream.Collectors;
+
+public class PullChangesTask implements Callable
+{
+ private final Set peers;
+
+ public PullChangesTask()
+ {
+ this(Main.getInstance().getModel().listOtherPeers());
+ }
+
+ public PullChangesTask(Set peers)
+ {
+ this.peers = peers;
+ }
+
+ @Override
+ public Boolean call()
+ {
+ // for each other peer:
+
+ List> futures = new ArrayList<>();
+ Set changesSinceIDs = Main.getInstance().getModel().getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet());
+ System.out.println("PullChangesTask: Requesting changes since: " + changesSinceIDs.stream().map(Long::toHexString).collect(Collectors.toSet()));
+
+ for (Peer peer : peers)
+ {
+ System.out.println("PullChangesTask: Attempting to pull changes from " + peer);
+ // open a connection
+ PeerConnection connection = Main.getInstance().getConnectionManager().getNodeConnection(peer);
+ if (connection == null)
+ continue;
+ // send a ObjectChangeRequest
+ ObjectChangeRequest objectChangeRequest = new ObjectChangeRequest(changesSinceIDs);
+ CompletableFuture future = connection.makeRequest(objectChangeRequest)
+ .handle((changes, ex) ->
+ {
+ // integrate the returned changes with our change graph
+ if (ex == null)
+ {
+ Main.getInstance().getModel().applyChangeRecords(changes);
+ return PullResult.OK;
+ }
+ else
+ {
+ if (ex instanceof RequestReceivedErrorException re)
+ {
+ if (re.getErrorMessage().getError() == CommonMessages.Error.ERROR_END_OF_HISTORY)
+ {
+ return PullResult.END_OF_HISTORY;
+ }
+ }
+ ex.printStackTrace(System.err);
+ return PullResult.FAILED;
+ }
+ });
+ futures.add(future);
+ }
+
+ List results = Util.collectFutures(futures).join();
+
+ long peersGotChanges = results.stream().filter(pullResult -> pullResult == PullResult.OK).count();
+ long peersAtEndOfHistory = results.stream().filter(pullResult -> pullResult == PullResult.END_OF_HISTORY).count();
+ // if no peers could be contacted:
+ if (peersGotChanges == 0)
+ {
+ // if everyone reported end of history, we aren't synced (need to do a state pull)
+ // otherwise everyone else is offline so we might as well be synced (we'll cause a fork if we change anything but that's fine)
+ return peersAtEndOfHistory <= 0;
+ }
+ return true;
+ }
+
+ protected enum PullResult
+ {
+ OK,
+ END_OF_HISTORY,
+ FAILED;
+ }
+}
diff --git a/src/main/java/moe/nekojimi/friendcloud/tasks/PullStateTask.java b/src/main/java/moe/nekojimi/friendcloud/tasks/PullStateTask.java
new file mode 100644
index 0000000..097dd7d
--- /dev/null
+++ b/src/main/java/moe/nekojimi/friendcloud/tasks/PullStateTask.java
@@ -0,0 +1,106 @@
+package moe.nekojimi.friendcloud.tasks;
+
+import moe.nekojimi.friendcloud.Main;
+import moe.nekojimi.friendcloud.Util;
+import moe.nekojimi.friendcloud.network.PeerConnection;
+import moe.nekojimi.friendcloud.network.requests.ObjectChangeRequest;
+import moe.nekojimi.friendcloud.network.requests.ObjectListRequest;
+import moe.nekojimi.friendcloud.objects.NetworkObject;
+import moe.nekojimi.friendcloud.objects.Peer;
+import moe.nekojimi.friendcloud.protos.ObjectStatements;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.*;
+
+/**
+ * Task to clear our entire local state and re-download it from peers.
+ */
+public class PullStateTask implements Runnable
+{
+
+ @Override
+ public void run()
+ {
+ System.out.println("PullStateTask: Pulling state from peers...");
+ Set connections = new HashSet<>();
+
+ for (String knownPeerAddress : Main.getInstance().getArgs().getKnownPeers())
+ {
+ String[] split = knownPeerAddress.split(":");
+ if (split.length != 2)
+ {
+ System.err.println("ERROR: " + knownPeerAddress + " isn't a valid address.");
+ continue;
+ }
+ InetSocketAddress address = new InetSocketAddress(split[0], Integer.parseInt(split[1]));
+
+ try
+ {
+ URI uri = new URI("tcp", null, address.getHostString(), address.getPort(), null, null, null);
+ PeerConnection nodeConnection = Main.getInstance().getConnectionManager().getNodeConnection(uri);
+
+ if (nodeConnection != null)
+ {
+ connections.add(nodeConnection);
+ }
+ } catch (URISyntaxException | IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ for (Peer peer : Main.getInstance().getModel().listOtherPeers())
+ {
+ PeerConnection connection = Main.getInstance().getConnectionManager().getNodeConnection(peer);
+ if (connection != null)
+ {
+ connections.add(connection);
+ }
+ }
+
+ if (connections.isEmpty())
+ {
+ // if we can't connect to anyone, don't replace our state since we can't get a new one
+ System.out.println("PullStateTask: Have no peers to connect to, giving up.");
+ return;
+ }
+
+ System.out.println("PullStateTask: Have " + connections.size() + " peers to get state from.");
+
+ Main.getInstance().getModel().clearEverything();
+
+ List> futures = new ArrayList<>();
+
+ for (PeerConnection connection : connections)
+ {
+ futures.add(connection.makeRequest(new ObjectListRequest(Set.of(
+ ObjectStatements.ObjectType.OBJECT_TYPE_FILE,
+ ObjectStatements.ObjectType.OBJECT_TYPE_FOLDER,
+ ObjectStatements.ObjectType.OBJECT_TYPE_PEER))).thenAccept(networkObjects ->
+ {
+
+ System.out.println("PullStateTask: got state of " + networkObjects.size() + " objects.");
+ for (NetworkObject object : networkObjects)
+ {
+ Main.getInstance().getModel().addNetworkObject(object);
+ }
+ }));
+
+ futures.add(connection.makeRequest(new ObjectChangeRequest(Set.of())).thenAccept(objectChangeRecords ->
+ {
+ System.out.println("PullStateTask: got " + objectChangeRecords.size() + " change records.");
+ Main.getInstance().getModel().addChangeRecords(objectChangeRecords);
+ }));
+ }
+
+ Util.collectFutures(futures).join();
+
+ }
+}
diff --git a/src/main/java/moe/nekojimi/friendcloud/tasks/SyncWithNetworkTask.java b/src/main/java/moe/nekojimi/friendcloud/tasks/SyncWithNetworkTask.java
deleted file mode 100644
index d6f01db..0000000
--- a/src/main/java/moe/nekojimi/friendcloud/tasks/SyncWithNetworkTask.java
+++ /dev/null
@@ -1,38 +0,0 @@
-package moe.nekojimi.friendcloud.tasks;
-
-import moe.nekojimi.friendcloud.Main;
-import moe.nekojimi.friendcloud.ObjectChangeRecord;
-import moe.nekojimi.friendcloud.network.PeerConnection;
-import moe.nekojimi.friendcloud.network.requests.ObjectChangeRequest;
-import moe.nekojimi.friendcloud.objects.Peer;
-import moe.nekojimi.friendcloud.protos.ObjectStatements;
-
-import java.io.IOException;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
-
-public class SyncWithNetworkTask implements Runnable
-{
-
- @Override
- public void run()
- {
- // for each other peer:
- for (Peer peer : Main.getInstance().getModel().listOtherPeers())
- {
- // open a connection
- PeerConnection connection = Main.getInstance().getConnectionManager().getNodeConnection(peer);
- // send a ObjectChangeRequest
- ObjectChangeRequest objectChangeRequest = new ObjectChangeRequest(Main.getInstance().getModel().getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet()));
- CompletableFuture> future = connection.makeRequest(objectChangeRequest);
-
- // integrate the returned changes with our change graph
- }
- // if no peers could be contacted:
- // return success (everyone's offline)
- // if everyone reported end of history:
- // delete our change graph
- // perform a full sync
- }
-}
diff --git a/src/main/protobuf/CommonMessages.proto b/src/main/protobuf/CommonMessages.proto
index 879790d..dedfdbd 100644
--- a/src/main/protobuf/CommonMessages.proto
+++ b/src/main/protobuf/CommonMessages.proto
@@ -17,10 +17,10 @@ message FriendCloudMessage {
message HelloMessage {
uint32 protocol_version = 1; // this is the version of the FriendCloud protocol I speak
-
}
-message LoginMessage {
+message CheckInMessage {
+ repeated uint64 current_change_heads = 1;
}
enum Error {
@@ -44,8 +44,6 @@ message ErrorMessage {
string text = 2;
}
-message PingMessage {
-}
-
-message PongMessage {
+message MultiObjectConfirmationMessage {
+ repeated uint64 expected_return_id = 1;
}
\ No newline at end of file
diff --git a/src/main/protobuf/ObjectStatements.proto b/src/main/protobuf/ObjectStatements.proto
index 75cbf9d..e81f05e 100644
--- a/src/main/protobuf/ObjectStatements.proto
+++ b/src/main/protobuf/ObjectStatements.proto
@@ -10,7 +10,6 @@ enum ObjectType {
OBJECT_TYPE_PEER = 2;
OBJECT_TYPE_FILE = 3;
OBJECT_TYPE_FOLDER = 4;
- OBJECT_TYPE_PEER_FILE_STATE = 5;
}
message ObjectState {
@@ -38,6 +37,8 @@ message ObjectChangeMessage {
uint64 change_id = 1;
repeated uint64 change_heads = 2;
repeated ObjectChange changes = 3;
+ uint64 creator_id = 4;
+ uint64 timestamp_ms = 5;
}
message ObjectChangeListMessage {
@@ -49,7 +50,7 @@ message ObjectChangeRequest {
}
message ObjectList {
- uint64 change_heads = 1;
+ uint64 change_head = 1;
repeated ObjectState states = 2;
}
diff --git a/src/main/protobuf/TestMessage.proto b/src/main/protobuf/TestMessage.proto
deleted file mode 100644
index a253a7a..0000000
--- a/src/main/protobuf/TestMessage.proto
+++ /dev/null
@@ -1,9 +0,0 @@
-syntax = "proto3";
-
-option java_package = "moe.nekojimi.friendcloud.protos";
-
-message SearchRequest {
- string query = 1;
- int32 page_number = 2;
- int32 results_per_page = 3;
-}
\ No newline at end of file