From cae44b6f853a57f7ac2c00171fb44b450e043e01 Mon Sep 17 00:00:00 2001 From: Nekojimi Date: Tue, 30 Sep 2025 14:10:06 +0100 Subject: [PATCH] Change tracking now mostly works probably --- pom.xml | 5 + .../moe/nekojimi/friendcloud/Controller.java | 411 ++++++++++++++++++ .../nekojimi/friendcloud/FilePieceAccess.java | 19 +- .../java/moe/nekojimi/friendcloud/Main.java | 346 ++++++--------- .../friendcloud/ObjectChangeRecord.java | 228 +++++++--- .../friendcloud/ObjectChangeTransaction.java | 90 +++- .../nekojimi/friendcloud/SharedDirectory.java | 55 +++ .../friendcloud/SharedFileManager.java | 112 +++++ .../java/moe/nekojimi/friendcloud/Util.java | 106 ++++- .../friendcloud/filesystem/FUSEAccess.java | 37 +- .../network/ConnectionBackend.java | 70 +++ .../{ => network}/ConnectionManager.java | 79 ++-- .../friendcloud/network/PeerConnection.java | 265 +++++++---- .../network/TCPConnectionBackend.java | 125 ++++++ ...Connection.java => TCPPeerConnection.java} | 23 +- .../network/requests/FilePiecesRequest.java | 29 +- .../network/requests/ObjectChangeRequest.java | 35 +- .../network/requests/ObjectListRequest.java | 7 +- .../friendcloud/network/requests/Request.java | 8 +- .../RequestReceivedErrorException.java | 19 + .../friendcloud/objects/NetworkFSNode.java | 35 +- .../friendcloud/objects/NetworkFile.java | 108 ++--- .../friendcloud/objects/NetworkObject.java | 44 +- .../friendcloud/objects/ObjectID.java | 11 + .../nekojimi/friendcloud/objects/Peer.java | 40 +- .../friendcloud/objects/PeerFileState.java | 92 ---- .../friendcloud/storage/CachingDataStore.java | 30 +- .../friendcloud/storage/DataStore.java | 15 +- .../friendcloud/storage/LocalData.java | 2 +- .../nekojimi/friendcloud/storage/Model.java | 202 --------- .../storage/StupidJSONFileStore.java | 75 +++- .../friendcloud/tasks/FileDownloadTask.java | 20 +- .../friendcloud/tasks/JoinNetworkTask.java | 71 ++- .../tasks/PropagateMessageTask.java | 7 +- .../friendcloud/tasks/PullChangesTask.java | 95 ++++ .../friendcloud/tasks/PullStateTask.java | 106 +++++ .../tasks/SyncWithNetworkTask.java | 38 -- src/main/protobuf/CommonMessages.proto | 10 +- src/main/protobuf/ObjectStatements.proto | 5 +- src/main/protobuf/TestMessage.proto | 9 - 40 files changed, 2120 insertions(+), 964 deletions(-) create mode 100644 src/main/java/moe/nekojimi/friendcloud/Controller.java create mode 100644 src/main/java/moe/nekojimi/friendcloud/SharedDirectory.java create mode 100644 src/main/java/moe/nekojimi/friendcloud/SharedFileManager.java create mode 100644 src/main/java/moe/nekojimi/friendcloud/network/ConnectionBackend.java rename src/main/java/moe/nekojimi/friendcloud/{ => network}/ConnectionManager.java (62%) create mode 100644 src/main/java/moe/nekojimi/friendcloud/network/TCPConnectionBackend.java rename src/main/java/moe/nekojimi/friendcloud/network/{PeerTCPConnection.java => TCPPeerConnection.java} (73%) create mode 100644 src/main/java/moe/nekojimi/friendcloud/network/requests/RequestReceivedErrorException.java delete mode 100644 src/main/java/moe/nekojimi/friendcloud/objects/PeerFileState.java delete mode 100644 src/main/java/moe/nekojimi/friendcloud/storage/Model.java create mode 100644 src/main/java/moe/nekojimi/friendcloud/tasks/PullChangesTask.java create mode 100644 src/main/java/moe/nekojimi/friendcloud/tasks/PullStateTask.java delete mode 100644 src/main/java/moe/nekojimi/friendcloud/tasks/SyncWithNetworkTask.java delete mode 100644 src/main/protobuf/TestMessage.proto diff --git a/pom.xml b/pom.xml index ea57383..3d2b4e5 100644 --- a/pom.xml +++ b/pom.xml @@ -74,6 +74,11 @@ jlibnotify 1.1.0 + + engineering.swat + java-watch + 0.9.5 + diff --git a/src/main/java/moe/nekojimi/friendcloud/Controller.java b/src/main/java/moe/nekojimi/friendcloud/Controller.java new file mode 100644 index 0000000..84a2137 --- /dev/null +++ b/src/main/java/moe/nekojimi/friendcloud/Controller.java @@ -0,0 +1,411 @@ +package moe.nekojimi.friendcloud; + +import moe.nekojimi.friendcloud.objects.*; +import moe.nekojimi.friendcloud.protos.ObjectStatements; +import moe.nekojimi.friendcloud.storage.CachingDataStore; +import moe.nekojimi.friendcloud.storage.DataStore; +import moe.nekojimi.friendcloud.storage.LocalData; +import moe.nekojimi.friendcloud.storage.Storable; + +import java.io.IOException; +import java.util.*; +import java.util.stream.Collectors; + +public class Controller +{ + + private final CachingDataStore dataStore; + private LocalData localData; + + private Set changeHeads = null; + + public Controller(DataStore dataStore) + { + this.dataStore = new CachingDataStore(dataStore); + } + + public synchronized void init() + { + + List localDataList = dataStore.getDAOForClass(LocalData.class).getAll(); + if (localDataList.isEmpty()) + { + localData = dataStore.getDAOForClass(LocalData.class).create(0); + } + else if (localDataList.size() == 1) + { + localData = localDataList.getFirst(); + } + else + { + throw new IllegalStateException("We have more than one LocalData somehow!!"); + } + if (localData.getSystemID() == 0) + { + Random ran = new Random(); + localData.setSystemID(ran.nextInt() & 0x00FFFFFF); + objectChanged(localData); + } + } + + public LocalData getLocalData() + { + return localData; + } + + public Peer getLocalPeer() + { + return localData.getLocalPeer(); + } + + public Peer createLocalPeer(ObjectID id) + { + if (localData.getLocalPeer() == null) + { + localData.setLocalPeer(dataStore.getDAOForClass(Peer.class).create(id.toLong())); + objectChanged(localData); + } + return localData.getLocalPeer(); + } + + // private Map nodes = new HashMap<>(); + + public synchronized ObjectID getNextObjectID(ObjectStatements.ObjectType type) + { + Random ran = new Random(); + int randomNumber = ran.nextInt(); + ObjectID objectID = new ObjectID(type, localData.getSystemID(), randomNumber); + System.out.println("Assigned new object ID: " + objectID); + return objectID; + } + + public static Class getNetworkObjectClassByType(ObjectStatements.ObjectType type) + { + return switch (type) + { + case OBJECT_TYPE_FILE -> NetworkFile.class; + case OBJECT_TYPE_FOLDER -> NetworkFolder.class; + case OBJECT_TYPE_PEER -> Peer.class; + case OBJECT_TYPE_UNSPECIFIED, UNRECOGNIZED -> throw new IllegalArgumentException("???"); + default -> throw new UnsupportedOperationException("NYI: " + type); + }; + } + + public synchronized T createObjectByID(ObjectID id) + { + if (id.toLong() == 0) + throw new IllegalArgumentException("Cannot create an object with ID=0!"); + ObjectStatements.ObjectType type = id.getType(); + System.out.println("Creating new object with type: " + type.name()); + if (type == ObjectStatements.ObjectType.OBJECT_TYPE_UNSPECIFIED) + throw new IllegalArgumentException(); + T ret = (T) dataStore.getDAOForClass(getNetworkObjectClassByType(type)).create(id.toLong()); + return ret; + } + + public synchronized T createObjectByType(ObjectStatements.ObjectType type) + { + return createObjectByID(getNextObjectID(type)); + } + + public synchronized T getObject(ObjectID id) + { + if (id.toLong() == 0) + return null; + Class clazz = (Class) getNetworkObjectClassByType(id.getType()); + return dataStore.getDAOForClass(clazz).get(id.toLong()); + } + +// public synchronized T getOrCreateObject(ObjectID id) +// { +// if (id.toLong() == 0) +// return null; +// Class clazz = (Class) getNetworkObjectClassByType(id.getType()); +// return dataStore.getDAOForClass(clazz).getOrCreate(id.toLong()); +// } + + public synchronized List listObjects(Set types) + { + List ret = new ArrayList<>(); + Set> classes = types.stream().map(Controller::getNetworkObjectClassByType).collect(Collectors.toSet()); + for (Class clazz: classes) + { + List list = dataStore.getDAOForClass(clazz).getAll(); + ret.addAll(list); + } + return ret; + } + + public synchronized List listFSNodes(String path) + { + //TODO: dumbest algorithm in the world + + NetworkFolder folder = (NetworkFolder) getFSNode(path); + + List ret = new ArrayList<>(); + for (NetworkObject object : listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE, ObjectStatements.ObjectType.OBJECT_TYPE_FOLDER))) + { + NetworkFSNode fsNode = (NetworkFSNode) object; + if (Objects.equals(fsNode.getParent(), folder)) + ret.add(fsNode); + } + return ret; + } + + public synchronized void addChangeRecord(ObjectChangeRecord record) + { + DataStore.DAO dao = dataStore.getDAOForClass(ObjectChangeRecord.class); + dao.update(record); + // update the change heads; if any of this change's heads are included in ours, then this change replaces them + if (changeHeads != null) + { + boolean recordIsNewHead = changeHeads.isEmpty(); + if (!recordIsNewHead) + { + for (long changeHeadID : record.getChangeHeads()) + { + ObjectChangeRecord headRecord = dao.get(changeHeadID); + recordIsNewHead = changeHeads.remove(headRecord); + } + } + if (recordIsNewHead) + { + changeHeads.add(record); + System.out.println("Controller: Change heads updated: " + changeHeads.stream().map(objectChangeRecord -> Long.toHexString(objectChangeRecord.getChangeID())).collect(Collectors.toSet())); + } + } + } + + public void setCurrentChangeRecord(ObjectChangeRecord record) + { + addChangeRecord(record); + long oldChangeID = localData.getCurrentChangeRecord() == null ? 0L : localData.getCurrentChangeRecord().getChangeID(); + localData.setCurrentChangeRecord(record); + objectChanged(localData); + System.out.println("Controller: Change record set; old= " + Long.toHexString(oldChangeID) + " current= " + Long.toHexString(localData.getCurrentChangeRecord().getChangeID())); + } + + public ObjectChangeRecord getChangeRecord(long id) + { + if (id == 0) + return null; + return dataStore.getDAOForClass(ObjectChangeRecord.class).get(id); + } + + public void addChangeRecords(Set objectChangeRecords) + { + List sorted = ObjectChangeRecord.partiallySort(objectChangeRecords); + for (ObjectChangeRecord record : sorted) + addChangeRecord(record); + } + + public void applyChangeRecord(ObjectChangeRecord record) + { + System.out.println("Controller: Applying change record " + record); + long changeID = localData.getCurrentChangeRecord().getChangeID(); + if (!record.getChangeHeads().contains(changeID)) + throw new IllegalStateException("Change does not apply! Valid change heads=" + record.getChangeHeads().stream().map(Long::toHexString).collect(Collectors.toSet()) + ", we are in state " + Long.toHexString(changeID)); + addChangeRecord(record); + + record.applyToLocalState(); + + setCurrentChangeRecord(record); + objectChanged(localData); + +// if (record == null) +// throw new IllegalArgumentException("Cannot apply unknown change!"); + } + + public void applyChangeRecords(Set changes) + { + System.out.println("Controller: applying " + changes.size() + " change records."); + addChangeRecords(changes); + +// List record = ObjectChangeRecord.partiallySort(changes); + + + Set pendingChanges = new HashSet<>(changes); + + while (!pendingChanges.isEmpty()) + { + // find all changes whose change heads = our current change heads + Set applicableChanges = pendingChanges.stream() + .filter(objectChangeRecord -> objectChangeRecord.getChangeHeads().equals(getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet()))) + .collect(Collectors.toSet()); + + // if there are none, we've reached the present (or a present), break + if (applicableChanges.isEmpty()) + break; + + // if there's just one, follow that one + // TODO: if there's more than one, that's a fork, we should select a current branch by some logic + ObjectChangeRecord changeToApply = applicableChanges.iterator().next(); + applyChangeRecord(changeToApply); + + // the change we took (and any untaken forks) are no longer pending + pendingChanges.removeAll(applicableChanges); + } + } + + public Set getChangeHeads() + { + // stupid algorithm - start with all of the changes, then remove the ones that are referenced by something + // TODO: better algorithm + if (changeHeads == null) + { + changeHeads = new HashSet<>(dataStore.getDAOForClass(ObjectChangeRecord.class).getAll()); + Set referencedIDs = changeHeads.stream().flatMap(objectChangeRecord -> objectChangeRecord.getChangeHeads().stream()).collect(Collectors.toSet()); + changeHeads.removeIf(objectChangeRecord -> referencedIDs.contains(objectChangeRecord.getChangeID())); + System.out.println("Controller: Determined change heads to be " + changeHeads.stream().map(objectChangeRecord -> Long.toHexString(objectChangeRecord.getChangeID())).collect(Collectors.toSet())); + } + return changeHeads; + } + + public Set listOtherPeers() + { + Set ret = new HashSet<>(); + for (NetworkObject object : listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_PEER))) + { + Peer peer = (Peer) object; + if (localData.getLocalPeer() == null || !peer.equals(localData.getLocalPeer())) + ret.add(peer); + } + return ret; + } + + public void objectChanged(T storable) + { + Class clazz = (Class) storable.getClass(); + dataStore.getDAOForClass(clazz).update(storable); + } + + public DataStore getDataStore() + { + return dataStore; + } + + public void clearEverything() + { + dataStore.clear(); + } + + public void addNetworkObject(NetworkObject object) + { + objectChanged(object); + } + + public Set findChangesSince(List changeIDs) + { + // check that the specified change IDs are actually present in our history + DataStore.DAO ocrDao = dataStore.getDAOForClass(ObjectChangeRecord.class); + if (!changeIDs.stream().allMatch(ocrDao::exists)) + return null; + + // start with the current change heads + Set ret = new HashSet<>(); + + Deque openSet = new LinkedList<>(getChangeHeads()); + while (!openSet.isEmpty()) + { + ObjectChangeRecord record = openSet.poll(); + if (changeIDs.contains(record.getChangeID())) + continue; + ret.add(record); + for (long changeHeadID: record.getChangeHeads()) + { + openSet.add(ocrDao.get(changeHeadID)); + } + } + + return ret; + } + + /** + * Changes the network name and/or path of a given network FS node. + * Performs both renaming and moving operations. If the destination folder(s) don't exist, they will be created as per makeFolder. + * @param fsNode the file or folder being renamed or moved. + * @param newpath the new path of the node, starting with and separated by /. + */ + public synchronized boolean renameFSNode(NetworkFSNode fsNode, String newpath) throws IOException + { + if (fsNode.getNetworkPath().equals(newpath)) + return true; + int lastSlash = newpath.lastIndexOf("/"); + String name = newpath.substring(lastSlash+1); + String path = newpath.substring(0, lastSlash); + if (path.isEmpty()) + path = "/"; + try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), getLocalPeer().getObjectID())) + { + transaction.addObjectBeforeChange(fsNode); + if (!name.equals(fsNode.getName())) + fsNode.setName(name); + NetworkFolder parent = fsNode.getParent(); + if (!path.equals(parent != null ? parent.getNetworkPath() : "/")) + { + NetworkFSNode fsn = getFSNode(path); + if (fsn != null && !(fsn instanceof NetworkFolder)) + throw new IllegalArgumentException("Given parent path " + fsn.getNetworkPath() + " is a file!"); + NetworkFolder newParent = (NetworkFolder) fsn; + if (newParent == null && !path.equals("/")) + { + newParent = makeFSFolder(path); + } + fsNode.setParent(newParent); + } + + } catch (IOException e) + { + e.printStackTrace(System.err); + return false; + } + return true; + } + + /** + * Creates (or ensures the existence of) a NetworkFolder at the given path, along with all non-existent parent folders. + * @param path the path of the new NetworkFolder; must start with "/" + * @return the new NetworkFolder; or null, if path is "/" + */ + public synchronized NetworkFolder makeFSFolder(String path) + { + if (path.equals("/") || path.isEmpty()) + return null; + NetworkFolder ret = null; + try(ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), getLocalPeer().getObjectID())) + { + NetworkFSNode node = getFSNode(path); + if (node != null && !(node instanceof NetworkFolder)) + throw new IllegalArgumentException("Given parent path " + node.getNetworkPath() + " is a file!"); + ret = (NetworkFolder) node; + if (ret != null) + return ret; // already exists + + int lastSlash = path.lastIndexOf("/"); + String name = path.substring(lastSlash+1); + String parentPath = path.substring(0, lastSlash); + NetworkFolder parent = makeFSFolder(parentPath); + ret = createObjectByType(ObjectStatements.ObjectType.OBJECT_TYPE_FOLDER); + ret.setParent(parent); + ret.setName(name); + transaction.addNewlyCreatedObject(ret); + return ret; + } catch (IOException e) + { + throw new RuntimeException(e); + } + } + + public synchronized NetworkFSNode getFSNode(String path) + { + // TODO: bad algorithm make better + for (NetworkObject object : listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE, ObjectStatements.ObjectType.OBJECT_TYPE_FOLDER))) + { + NetworkFSNode fsNode = (NetworkFSNode) object; + String networkPath = fsNode.getNetworkPath(); + if (networkPath.equals(path)) + return fsNode; + } + return null; + } +} diff --git a/src/main/java/moe/nekojimi/friendcloud/FilePieceAccess.java b/src/main/java/moe/nekojimi/friendcloud/FilePieceAccess.java index 80cecb1..f03175a 100644 --- a/src/main/java/moe/nekojimi/friendcloud/FilePieceAccess.java +++ b/src/main/java/moe/nekojimi/friendcloud/FilePieceAccess.java @@ -12,13 +12,16 @@ public class FilePieceAccess implements Closeable private final NetworkFile networkFile; private final File file; private final RandomAccessFile randomAccessFile; + private final OpenMode openMode; - public FilePieceAccess(NetworkFile networkFile) throws IOException + public FilePieceAccess(NetworkFile networkFile, OpenMode openMode) throws IOException { this.networkFile = networkFile; this.file = networkFile.getOrCreateLocalFile(); - this.randomAccessFile = new RandomAccessFile(file,"rw"); - randomAccessFile.setLength(file.length()); + this.randomAccessFile = new RandomAccessFile(file,openMode == OpenMode.READ_WRITE ? "rw" : "r"); + this.openMode = openMode; + if (openMode == OpenMode.READ_WRITE) + randomAccessFile.setLength(file.length()); } public long getPieceOffset(int index) @@ -56,7 +59,9 @@ public class FilePieceAccess implements Closeable public void writePiece(int index, byte[] buffer) throws IOException { - if (buffer.length != getPieceSize(index)) + if (openMode == OpenMode.READ_ONLY) + throw new IllegalStateException("File was opened read-only!"); + else if (buffer.length != getPieceSize(index)) throw new IllegalArgumentException("Received a file piece that's the wrong size!! Length = " + buffer.length + " != Piece Size = " + getPieceSize(index)); else if (index >= networkFile.getPieceCount()) throw new IllegalArgumentException("Received a file piece with an index past the end of the file!!"); @@ -72,4 +77,10 @@ public class FilePieceAccess implements Closeable { randomAccessFile.close(); } + + public enum OpenMode + { + READ_ONLY, + READ_WRITE; + } } diff --git a/src/main/java/moe/nekojimi/friendcloud/Main.java b/src/main/java/moe/nekojimi/friendcloud/Main.java index 6f19f50..b1802d5 100644 --- a/src/main/java/moe/nekojimi/friendcloud/Main.java +++ b/src/main/java/moe/nekojimi/friendcloud/Main.java @@ -2,37 +2,17 @@ package moe.nekojimi.friendcloud; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; -import com.kstruct.gethostname4j.Hostname; -import com.offbynull.portmapper.PortMapperFactory; -import com.offbynull.portmapper.gateway.Bus; -import com.offbynull.portmapper.gateway.Gateway; -import com.offbynull.portmapper.gateways.network.NetworkGateway; -import com.offbynull.portmapper.gateways.process.ProcessGateway; -import com.offbynull.portmapper.mapper.MappedPort; -import com.offbynull.portmapper.mapper.PortMapper; -import com.offbynull.portmapper.mapper.PortType; -import es.blackleg.jlibnotify.JLibnotify; -import es.blackleg.jlibnotify.JLibnotifyNotification; -import es.blackleg.jlibnotify.core.DefaultJLibnotify; -import es.blackleg.jlibnotify.core.DefaultJLibnotifyLoader; -import es.blackleg.jlibnotify.exception.JLibnotifyInitException; -import es.blackleg.jlibnotify.exception.JLibnotifyLoadException; import jnr.ffi.Platform; import moe.nekojimi.friendcloud.filesystem.FUSEAccess; -import moe.nekojimi.friendcloud.network.PeerConnection; -import moe.nekojimi.friendcloud.network.requests.ObjectListRequest; -import moe.nekojimi.friendcloud.objects.NetworkFile; -import moe.nekojimi.friendcloud.objects.NetworkObject; -import moe.nekojimi.friendcloud.objects.Peer; -import moe.nekojimi.friendcloud.objects.PeerFileState; -import moe.nekojimi.friendcloud.protos.ObjectStatements; +import moe.nekojimi.friendcloud.network.ConnectionManager; +import moe.nekojimi.friendcloud.network.TCPConnectionBackend; +import moe.nekojimi.friendcloud.protos.CommonMessages; import moe.nekojimi.friendcloud.storage.DataStore; -import moe.nekojimi.friendcloud.storage.Model; import moe.nekojimi.friendcloud.storage.StupidJSONFileStore; import moe.nekojimi.friendcloud.tasks.JoinNetworkTask; +import moe.nekojimi.friendcloud.tasks.PropagateMessageTask; import org.slf4j.simple.SimpleLogger; -import java.awt.*; import java.io.File; import java.io.IOException; import java.net.*; @@ -43,41 +23,31 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.*; +import java.util.stream.Collectors; public class Main { private static Main instance; - @Parameter(names="-share") - private List sharedFilePaths = new ArrayList<>(); - @Parameter(names="-known-peer") - private List knownPeers = new ArrayList<>(); - - @Parameter(names="-tcp-port") - private int tcpPort = 7777; - - @Parameter(names="-no-upnp") - private boolean noUpnp = false; - - @Parameter(names="-create-network") - private boolean createNetwork = false; - - @Parameter(names = "-storage") - private String storageLocation = "."; // @Parameter(names="-file") + private Args args; private ConnectionManager connectionManager; private final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(16); + private final Set> scheduledFutures = new HashSet<>(); private final FUSEAccess fuseAccess = new FUSEAccess(); - private Model model; + private Controller controller; private final NotificationManager notificationManager = new NotificationManager(); + private final SharedFileManager sharedFileManager = new SharedFileManager(); - public static void main(String[] args) + public static void main(String[] argv) { instance = new Main(); - JCommander.newBuilder().addObject(instance).build().parse(args); + Args args = new Args(); + JCommander.newBuilder().addObject(args).build().parse(argv); + instance.args = args; System.setProperty(SimpleLogger.DEFAULT_LOG_LEVEL_KEY, "Info"); @@ -88,25 +58,19 @@ public class Main { System.err.println("main() received exception, dying horribly!!"); e.printStackTrace(System.err); - try - { - instance.shutdown(); - } catch (IOException f) - { - throw new RuntimeException(f); - } + instance.shutdown(); System.exit(1); } // TestMessage.SearchRequest request = TestMessage.SearchRequest.newBuilder().setQuery("bees!").setPageNumber(316).setResultsPerPage(42069).build(); } - private void run() throws IOException, InterruptedException, JLibnotifyLoadException, JLibnotifyInitException + private void run() throws IOException { - DataStore dataStore = new StupidJSONFileStore(new File(storageLocation)); - model = new Model(dataStore); + DataStore dataStore = new StupidJSONFileStore(new File(args.storageLocation)); + controller = new Controller(dataStore); - model.init(); - connectionManager = new ConnectionManager(tcpPort); + controller.init(); + connectionManager = new ConnectionManager(); Path mountPoint; if (Platform.getNativePlatform().getOS() == Platform.OS.WINDOWS) @@ -115,23 +79,14 @@ public class Main } else { - mountPoint = Path.of(System.getProperty("user.dir") + "/fuse-mount-" + tcpPort); - boolean created = mountPoint.toFile().mkdirs(); + mountPoint = Path.of(System.getProperty("user.dir") + "/fuse-mount-" + args.tcpPort); + mountPoint.toFile().mkdirs(); System.out.println("Created FUSE mount point " + mountPoint); } fuseAccess.mount(mountPoint); System.out.println("Mounted virtual filesystem at " + mountPoint); - Runtime.getRuntime().addShutdownHook(new Thread(() -> - { - try - { - shutdown(); - } catch (IOException e) - { - throw new RuntimeException(e); - } - })); + Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown)); // if (Desktop.isDesktopSupported()) // { @@ -139,15 +94,9 @@ public class Main // desktop.browse(mountPoint.toFile().toURI()); // } - connectionManager.addNewConnectionConsumer(this::requestCompleteState); +// connectionManager.addNewConnectionConsumer(this::requestCompleteState); - connectionManager.start(); - - String hostname = Hostname.getHostname(); - model.getSelfPeer().setSystemName(hostname); - model.getSelfPeer().setUserName(System.getProperty("user.name") + "-" + tcpPort); - addHostAddress(InetAddress.getLocalHost()); - model.objectChanged(model.getSelfPeer()); + connectionManager.addConnectionBackend(new TCPConnectionBackend(args.tcpPort, connectionManager)); /* Startup procedure: @@ -159,149 +108,71 @@ public class Main - Publish local file changes */ - if (!noUpnp) - setupIGP(); + JoinNetworkTask joinNetworkTask = new JoinNetworkTask(); + CompletableFuture.runAsync(joinNetworkTask,executor) + .thenRun(this::shareInitialFiles) + .thenRun(this::scheduleCheckins) + .handle((unused, throwable) -> + { + if (throwable != null) + { + System.err.println("Error in initial task!"); + throwable.printStackTrace(System.err); + shutdown(); + } + return null; + }); + } + private void scheduleCheckins() + { + executor.scheduleWithFixedDelay(() -> { + System.out.println("Checking in with friends..."); + CommonMessages.CheckInMessage checkInMessage = CommonMessages.CheckInMessage.newBuilder() + .addAllCurrentChangeHeads(controller.getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet())) + .build(); + PropagateMessageTask propagateMessageTask = new PropagateMessageTask(checkInMessage); + executor.submit(propagateMessageTask); + }, 0,5, TimeUnit.MINUTES); + } + + private void shareInitialFiles() + { + System.out.println("Sharing files given on command line..."); Set sharedFiles = new HashSet<>(); - for (String sharedFilePath: sharedFilePaths) + for (String sharedFilePath: args.sharedFilePaths) { sharedFiles.add(new File(sharedFilePath)); } - List knownFiles = model.listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE)); - - for (NetworkObject knownFile: knownFiles) - { - NetworkFile f = (NetworkFile) knownFile; - boolean removed = sharedFiles.remove(f.getLocalFile()); - if (removed) - System.out.println("Identified known local file " + f.getObjectID() + " = " + f.getLocalFile()); - } - - for (File sharedFile: sharedFiles) - { - if (sharedFile.exists()) - { - System.out.println("Adding shared network file: " + sharedFile.getAbsolutePath()); - - NetworkFile networkFile = (NetworkFile) model.createObjectByType(ObjectStatements.ObjectType.OBJECT_TYPE_FILE); - networkFile.updateFromLocalFile(sharedFile); - model.objectChanged(networkFile); - -// PeerFileState peerFileState = (PeerFileState) model.createObjectByType(ObjectStatements.ObjectType.OBJECT_TYPE_PEER_FILE_STATE); -// peerFileState.setNode(model.getSelfPeer()); -// peerFileState.setFile(networkFile); -// peerFileState.setProgress(100); -// model.objectChanged(peerFileState); - } - } - -// JoinNetworkTask joinNetworkTask = new JoinNetworkTask(); -// executor.submit(joinNetworkTask); - - for (String knownPeerAddress : knownPeers) - { - String[] split = knownPeerAddress.split(":"); - if (split.length != 2) - { - System.err.println("ERROR: " + knownPeerAddress + " isn't a valid address."); - continue; - } - InetSocketAddress address = new InetSocketAddress(split[0],Integer.parseInt(split[1])); - - try - { - URI uri = new URI("tcp", null, address.getHostString(), address.getPort(), null, null, null); - PeerConnection nodeConnection = connectionManager.getNodeConnection(uri); - - requestCompleteState(nodeConnection); - } catch (ConnectException ex) - { - System.out.println("Couldn't connect to host " + address); - } - catch (URISyntaxException e) - { - throw new RuntimeException(e); - } - } - } - - private void requestCompleteState(PeerConnection nodeConnection) - { - CompletableFuture> objectListFuture = nodeConnection.makeRequest(new ObjectListRequest(Set.of( - ObjectStatements.ObjectType.OBJECT_TYPE_FILE, - ObjectStatements.ObjectType.OBJECT_TYPE_PEER_FILE_STATE, - ObjectStatements.ObjectType.OBJECT_TYPE_PEER))); + sharedFileManager.addSharedFiles(sharedFiles); } - private void addHostAddress(InetAddress address) + private void shutdown() { - String host = address.getCanonicalHostName(); - Peer selfNode = model.getSelfPeer(); try { - URI uri = new URI("tcp", null, host, tcpPort, null, null, null); - System.out.println("Added local address " + uri); - selfNode.addAddress(uri); - } catch (URISyntaxException e) + fuseAccess.umount(); + connectionManager.shutdown(); + executor.shutdown(); + System.out.println("Waiting 10 seconds to complete tasks..."); + boolean terminated = executor.awaitTermination(10, TimeUnit.SECONDS); + + if (!terminated) + { + System.out.println("Timed out, ending tasks now. Goodbye!"); + executor.shutdownNow(); + } + else + { + System.out.println("Finished everything. Goodbye!"); + } + } catch (Exception e) { throw new RuntimeException(e); } } - private void setupIGP() throws InterruptedException - { - try - { - // Start gateways - Gateway network = NetworkGateway.create(); - Gateway process = ProcessGateway.create(); - Bus networkBus = network.getBus(); - Bus processBus = process.getBus(); - - // Discover port forwarding devices and take the first one found - System.out.println("Discovering port mappers..."); - List mappers = PortMapperFactory.discover(networkBus, processBus);; - PortMapper mapper = mappers.getFirst(); - System.out.println("Got mapper " + mapper + ", mapping port..."); - - MappedPort mappedPort = mapper.mapPort(PortType.TCP, tcpPort, tcpPort, 60); - System.out.println("Port mapping added: " + mappedPort); - - addHostAddress(mappedPort.getExternalAddress()); - } - catch (IllegalStateException ex) - { - System.err.println("Failed to map port! error=" + ex.getMessage()); - ex.printStackTrace(System.err); - } - } - - private void shutdown() throws IOException - { - fuseAccess.umount(); - connectionManager.shutdown(); - executor.shutdown(); - System.out.println("Waiting 10 seconds to complete tasks..."); - boolean terminated = false; - try - { - terminated = executor.awaitTermination(10, TimeUnit.SECONDS); - } catch (InterruptedException e) - { - throw new RuntimeException(e); - } - if (!terminated) - { - System.out.println("Timed out, ending tasks now. Goodbye!"); - executor.shutdownNow(); - } - else - { - System.out.println("Finished everything. Goodbye!"); - } - } - public static Main getInstance() { return instance; @@ -317,13 +188,80 @@ public class Main return connectionManager; } - public Model getModel() + public Controller getModel() { - return model; + return controller; } public NotificationManager getNotificationManager() { return notificationManager; } + + public Controller getController() + { + return controller; + } + + public Args getArgs() + { + return args; + } + + public SharedFileManager getSharedFileManager() + { + return sharedFileManager; + } + + @SuppressWarnings("FieldCanBeLocal") + public static class Args + { + @Parameter(names="-share") + private List sharedFilePaths = new ArrayList<>(); + + @Parameter(names="-known-peer") + private List knownPeers = new ArrayList<>(); + + @Parameter(names="-tcp-port") + private int tcpPort = 7777; + + @Parameter(names="-no-upnp") + private boolean noUpnp = false; + + @Parameter(names="-create-network") + private boolean createNetwork = false; + + @Parameter(names = "-storage") + private String storageLocation = "."; + + public List getSharedFilePaths() + { + return sharedFilePaths; + } + + public List getKnownPeers() + { + return knownPeers; + } + + public int getTcpPort() + { + return tcpPort; + } + + public boolean isNoUpnp() + { + return noUpnp; + } + + public boolean isCreateNetwork() + { + return createNetwork; + } + + public String getStorageLocation() + { + return storageLocation; + } + } } \ No newline at end of file diff --git a/src/main/java/moe/nekojimi/friendcloud/ObjectChangeRecord.java b/src/main/java/moe/nekojimi/friendcloud/ObjectChangeRecord.java index aee582f..812a8d9 100644 --- a/src/main/java/moe/nekojimi/friendcloud/ObjectChangeRecord.java +++ b/src/main/java/moe/nekojimi/friendcloud/ObjectChangeRecord.java @@ -1,5 +1,6 @@ package moe.nekojimi.friendcloud; +import moe.nekojimi.friendcloud.objects.NetworkObject; import moe.nekojimi.friendcloud.objects.ObjectID; import moe.nekojimi.friendcloud.protos.ObjectStatements; import moe.nekojimi.friendcloud.storage.Storable; @@ -7,7 +8,9 @@ import moe.nekojimi.friendcloud.storage.Storable; import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; +import java.time.Instant; import java.util.*; +import java.util.stream.Collectors; public class ObjectChangeRecord implements Storable { @@ -16,11 +19,14 @@ public class ObjectChangeRecord implements Storable private ObjectID creatorPeer; private Set changeHeads = new HashSet<>(); private Set changes = new HashSet<>(); + private Instant creationTime; public static ObjectChangeRecord createFromChangeMessage(ObjectStatements.ObjectChangeMessage objectChangeMessage) { - ObjectChangeRecord record = new ObjectChangeRecord(); // TODO: decode creator + ObjectChangeRecord record = new ObjectChangeRecord(); record.changeHeads.addAll(objectChangeMessage.getChangeHeadsList()); + record.creatorPeer = new ObjectID(objectChangeMessage.getCreatorId()); + record.creationTime = Instant.ofEpochMilli(objectChangeMessage.getTimestampMs()); for (ObjectStatements.ObjectChange objectChange : objectChangeMessage.getChangesList()) { record.changes.add(Change.createFromObjectChange(objectChange)); @@ -29,16 +35,18 @@ public class ObjectChangeRecord implements Storable long specifiedID = objectChangeMessage.getChangeId(); if (calculatedID != specifiedID) { - System.err.println("WARNING: didn't decode change ID correctly!"); + throw new RuntimeException("Failed to verify change ID! specified=" + Long.toHexString(specifiedID) + " != calculated=" + Long.toHexString(calculatedID)); } return record; } - public static ObjectChangeRecord createFromChanges(ObjectID creator, Set changes) + public static ObjectChangeRecord createFromChanges(ObjectID creator, Set changeHeads, Set changes) { ObjectChangeRecord record = new ObjectChangeRecord(); record.creatorPeer = creator; + record.creationTime = Instant.now(); record.changes.addAll(changes); + record.changeHeads = changeHeads; return record; } @@ -59,6 +67,8 @@ public class ObjectChangeRecord implements Storable ObjectStatements.ObjectChangeMessage.Builder builder = ObjectStatements.ObjectChangeMessage.newBuilder(); builder.setChangeId(getChangeID()); builder.addAllChangeHeads(changeHeads); + builder.setCreatorId(creatorPeer.toLong()); + builder.setTimestampMs(creationTime.toEpochMilli()); for (Change change : changes) { builder.addChanges(change.buildObjectChange()); @@ -71,26 +81,43 @@ public class ObjectChangeRecord implements Storable { return Map.of("changeHeads", changeHeads, "changes", changes, - "creator", creatorPeer.toLong()); + "creator", creatorPeer.toLong(), + "creationTime", creationTime.toEpochMilli() + ); } + @SuppressWarnings("unchecked") @Override public void updateFromStateMap(Map map) { changeHeads = new HashSet<>((Collection) map.get("changeHeads")); changes = new HashSet<>((Collection) map.get("changes")); creatorPeer = new ObjectID((Long) map.get("creator")); + creationTime = Instant.ofEpochMilli((Long) map.get("creationTime")); + } + + public void applyToLocalState() + { + for (Change change: changes) + { + NetworkObject object = Main.getInstance().getModel().getObject(change.objectID); + if (object == null) + object = Main.getInstance().getController().createObjectByID(change.objectID); + object.updateFromChange(change); + Main.getInstance().getModel().objectChanged(object); + } } public String toString() { StringBuilder sb = new StringBuilder(); - for (long changeHeadId: changeHeads) + sb.append(creatorPeer).append(",").append(creationTime.toEpochMilli()).append(";"); + for (long changeHeadId: changeHeads.stream().sorted().toList()) { sb.append(changeHeadId).append(","); } sb.append(";"); - for (Change change: changes) + for (Change change: changes.stream().sorted(Comparator.comparingLong(a -> a.objectID.toLong())).toList()) { sb.append(change.toString()).append(";"); } @@ -107,7 +134,9 @@ public class ObjectChangeRecord implements Storable { throw new RuntimeException(e); } - byte[] bytes = digest.digest(toString().getBytes(StandardCharsets.UTF_8)); + String stringVal = toString(); + byte[] bytes = digest.digest(stringVal.getBytes(StandardCharsets.UTF_8)); + // System.out.println("ObjectChangeRecord: calculated change ID " + Long.toHexString(ret) + " from string: " + stringVal); return Util.xorBytesToLong(bytes); } @@ -127,58 +156,139 @@ public class ObjectChangeRecord implements Storable return changeHeads; } - public record Change(ObjectID objectID, Map beforeValues, Map afterValues) + public static final class Change + { + private final ObjectID objectID; + private final Map beforeValues; + private final Map afterValues; + + public Change(ObjectID objectID, Map beforeValues, Map afterValues) { - - public static Change createFromObjectChange(ObjectStatements.ObjectChange change) - { - return new Change(new ObjectID(change.getObjectId()), change.getBeforeMap(), change.getAfterMap()); - } - - public static Change createFromObjectStates(ObjectStatements.ObjectState before, ObjectStatements.ObjectState after) - { - Map beforeValues = new HashMap<>(); - Map afterValues = new HashMap<>(); - for (String key : after.getValuesMap().keySet()) - { - String beforeValue = before.getValuesOrDefault(key, null); - String afterValue = after.getValuesOrDefault(key, null); - if (!afterValue.equals(beforeValue)) - { - beforeValues.put(key, beforeValue); - afterValues.put(key, afterValue); - } - } - if (!afterValues.isEmpty()) - { - return new Change(new ObjectID(before.getObjectId()), beforeValues, afterValues); - } - return null; - } - - public String toString() - { - StringBuilder sb = new StringBuilder(); - sb.append(objectID.toLong()).append(";"); // The object ID, then ; - // now all key-value pairs in alphabetical order - List keys = new ArrayList<>(beforeValues.keySet()); - Collections.sort(keys); - for (String key : keys) - { - sb.append(key).append(":").append(afterValues.get(key)); - } - sb.append(";"); - return sb.toString(); - } - - - public ObjectStatements.ObjectChange.Builder buildObjectChange() - { - ObjectStatements.ObjectChange.Builder builder = ObjectStatements.ObjectChange.newBuilder(); - builder.putAllBefore(beforeValues); - builder.putAllAfter(afterValues); - builder.setObjectId(objectID.toLong()); - return builder; - } + this.objectID = objectID; + this.beforeValues = beforeValues; + this.afterValues = afterValues; } + + public static Change createFromObjectChange(ObjectStatements.ObjectChange change) + { + return new Change(new ObjectID(change.getObjectId()), change.getBeforeMap(), change.getAfterMap()); + } + + public static Change createFromObjectStates(ObjectStatements.ObjectState before, ObjectStatements.ObjectState after) + { + Map beforeValues = new HashMap<>(); + Map afterValues = new HashMap<>(); + for (String key : after.getValuesMap().keySet()) + { + String beforeValue = before.getValuesOrDefault(key, ""); + String afterValue = after.getValuesOrDefault(key, ""); + if (!afterValue.equals(beforeValue)) + { + beforeValues.put(key, beforeValue); + afterValues.put(key, afterValue); + } + } + if (!afterValues.isEmpty()) + { + return new Change(new ObjectID(after.getObjectId()), beforeValues, afterValues); + } + return null; + } + + public String toString() + { + StringBuilder sb = new StringBuilder(); + sb.append(objectID).append(";"); // The object ID, then ; + // now all key-value pairs in alphabetical order + Set keySet = new HashSet<>(beforeValues.keySet()); + keySet.addAll(afterValues.keySet()); + for (String key : keySet.stream().sorted().toList()) + { + sb.append(key).append(":") + .append(beforeValues.getOrDefault(key,"")) + .append("->") + .append(afterValues.getOrDefault(key,"")) + .append(","); + } + return sb.toString(); + } + + + public ObjectStatements.ObjectChange.Builder buildObjectChange() + { + ObjectStatements.ObjectChange.Builder builder = ObjectStatements.ObjectChange.newBuilder(); + builder.putAllBefore(Util.mapWithoutNullValues(beforeValues)); + builder.putAllAfter(Util.mapWithoutNullValues(afterValues)); + builder.setObjectId(objectID.toLong()); + return builder; + } + + public ObjectID objectID() + { + return objectID; + } + + public Map beforeValues() + { + return beforeValues; + } + + public Map afterValues() + { + return afterValues; + } + + @Override + public boolean equals(Object obj) + { + if (obj == this) return true; + if (obj == null || obj.getClass() != this.getClass()) return false; + var that = (Change) obj; + return Objects.equals(this.objectID, that.objectID) && + Objects.equals(this.beforeValues, that.beforeValues) && + Objects.equals(this.afterValues, that.afterValues); + } + + @Override + public int hashCode() + { + return Objects.hash(objectID, beforeValues, afterValues); + } + + } + + public static List partiallySort(Set changes) + { + LinkedList ret = new LinkedList<>(); + + Map idMap = new HashMap<>(); + for (ObjectChangeRecord record : changes) + idMap.put(record.getChangeID(), record); + Set pointedIds = changes.stream() + .flatMap((ObjectChangeRecord objectChangeRecord1) -> objectChangeRecord1.getChangeHeads().stream()) + .collect(Collectors.toSet()); + Set group = changes.stream() + .filter(objectChangeRecord -> !pointedIds.contains(objectChangeRecord.getChangeID())) + .collect(Collectors.toSet()); + + while (!group.isEmpty()) + { + // add all members of the group to the start of the list + for (ObjectChangeRecord record : group) + { + ret.addFirst(record); + } + // replace the group with all the objects pointed to by the group + group = group.stream() + .flatMap(objectChangeRecord -> objectChangeRecord.getChangeHeads().stream()) + .map(idMap::get) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + } + + System.out.println("ObjectChangeRecord: Partially sorted changes: " + changes); + System.out.println("\tInto list: " + ret); + return ret; + + } } diff --git a/src/main/java/moe/nekojimi/friendcloud/ObjectChangeTransaction.java b/src/main/java/moe/nekojimi/friendcloud/ObjectChangeTransaction.java index 0ebefb9..3e1fe93 100644 --- a/src/main/java/moe/nekojimi/friendcloud/ObjectChangeTransaction.java +++ b/src/main/java/moe/nekojimi/friendcloud/ObjectChangeTransaction.java @@ -1,47 +1,77 @@ package moe.nekojimi.friendcloud; +import moe.nekojimi.friendcloud.network.ConnectionManager; import moe.nekojimi.friendcloud.objects.NetworkObject; import moe.nekojimi.friendcloud.objects.ObjectID; import moe.nekojimi.friendcloud.protos.ObjectStatements; import moe.nekojimi.friendcloud.tasks.PropagateMessageTask; +import moe.nekojimi.friendcloud.tasks.PullChangesTask; -import java.io.Closeable; import java.io.IOException; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; -public class ObjectChangeTransaction implements Closeable +public class ObjectChangeTransaction implements AutoCloseable { private final ObjectID creator; private final ConnectionManager connectionManager; private final Map beforeStates = new HashMap<>(); + private static ObjectChangeTransaction currentTransaction = null; + private int openCount = 0; private boolean ended = false; - ObjectChangeTransaction(ConnectionManager connectionManager, ObjectID creator) + private ObjectChangeTransaction(ConnectionManager connectionManager, ObjectID creator) { this.creator = creator; this.connectionManager = connectionManager; - } - public static ObjectChangeTransaction startTransaction(ConnectionManager connectionManager, ObjectID creatorPeer, ObjectID... objects) - { - ObjectChangeTransaction builder = new ObjectChangeTransaction(connectionManager, creatorPeer); - for (ObjectID id : objects) + System.out.println("ObjectChangeTransaction: opening transaction"); + + // attempt to pull changes from the network + Future future = Main.getInstance().getExecutor().submit(new PullChangesTask()); + try { - builder.addObjectBeforeChange(id); + future.get(10, TimeUnit.SECONDS); + } catch (InterruptedException | TimeoutException e) + { + // this is fine +// throw new RuntimeException(e); + } catch (ExecutionException e) + { + e.printStackTrace(System.err); + throw new RuntimeException(e); } - return builder; } - public ObjectChangeTransaction addObjectBeforeChange(ObjectID id) + public static ObjectChangeTransaction startTransaction(ConnectionManager connectionManager, ObjectID creatorPeer) { - NetworkObject object = Main.getInstance().getModel().getObject(id); - if (object != null) - beforeStates.put(id, object.buildObjectState().build()); - return this; + if (currentTransaction == null) + currentTransaction = new ObjectChangeTransaction(connectionManager, creatorPeer); + currentTransaction.increaseOpenCount(); + return currentTransaction; + } + + private void increaseOpenCount() + { + openCount++; + } + + public void addObjectBeforeChange(NetworkObject object) + { + beforeStates.putIfAbsent(object.getObjectID(), object.buildObjectState().build()); + } + + public void addNewlyCreatedObject(NetworkObject object) + { + beforeStates.putIfAbsent(object.getObjectID(), ObjectStatements.ObjectState.getDefaultInstance()); } public ObjectChangeRecord endTransaction() @@ -53,21 +83,43 @@ public class ObjectChangeTransaction implements Closeable for (Map.Entry entry : beforeStates.entrySet()) { - ObjectStatements.ObjectState afterState = Main.getInstance().getModel().getObject(entry.getKey()).buildObjectState().build(); + NetworkObject object = Main.getInstance().getModel().getObject(entry.getKey()); + ObjectStatements.ObjectState afterState = object.buildObjectState().build(); ObjectChangeRecord.Change change = ObjectChangeRecord.Change.createFromObjectStates(entry.getValue(), afterState); - changes.add(change); + if (change != null) + { + Main.getInstance().getModel().objectChanged(object); + changes.add(change); + } } - return ObjectChangeRecord.createFromChanges(creator, changes); + if (changes.isEmpty()) + return null; + return ObjectChangeRecord.createFromChanges(creator, Main.getInstance().getModel().getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet()), changes); + } + + public void commit() + { + // TODO: make this actually perform the changes and mark the transaction as complete; if the transaction closes before commit() make it roll back the changes } @Override public void close() throws IOException { + openCount--; + if (openCount > 0) + return; // still open elsewhere // end the transaction and get the change object ObjectChangeRecord objectChangeRecord = endTransaction(); + currentTransaction = null; + if (objectChangeRecord == null) + { + System.out.println("ObjectChangeTransaction: closing transaction, no changes."); + return; + } + System.out.println("ObjectChangeTransaction: closing transaction, submitting change record " + Long.toHexString(objectChangeRecord.getChangeID())); // add the new change to the model - Main.getInstance().getModel().addChangeRecord(objectChangeRecord); + Main.getInstance().getModel().setCurrentChangeRecord(objectChangeRecord); // create a task to propagate the change to other peers Main.getInstance().getExecutor().submit(new PropagateMessageTask(objectChangeRecord.buildObjectChangeMessage().build())); } diff --git a/src/main/java/moe/nekojimi/friendcloud/SharedDirectory.java b/src/main/java/moe/nekojimi/friendcloud/SharedDirectory.java new file mode 100644 index 0000000..405ab1b --- /dev/null +++ b/src/main/java/moe/nekojimi/friendcloud/SharedDirectory.java @@ -0,0 +1,55 @@ +package moe.nekojimi.friendcloud; + +import moe.nekojimi.friendcloud.objects.ObjectID; +import moe.nekojimi.friendcloud.storage.Storable; + +import java.io.File; +import java.io.IOException; +import java.util.Map; + +public class SharedDirectory implements Storable +{ + private final long storageID; + private File directory; + private ObjectID networkFolderID; + + public SharedDirectory(long storageID) + { + this.storageID = storageID; + } + + @Override + public long getStorageID() + { + return storageID; + } + + @Override + public Map getStateMap() + { + String canonicalPath; + try + { + canonicalPath = directory.getCanonicalPath(); + } catch (IOException e) + { + throw new RuntimeException(e); + } + return Map.of("directory", canonicalPath, "networkFolderID", networkFolderID.toLong()); + } + + @Override + public void updateFromStateMap(Map map) + { + } + + public File getDirectory() + { + return directory; + } + + public ObjectID getNetworkFolderID() + { + return networkFolderID; + } +} diff --git a/src/main/java/moe/nekojimi/friendcloud/SharedFileManager.java b/src/main/java/moe/nekojimi/friendcloud/SharedFileManager.java new file mode 100644 index 0000000..0dab24e --- /dev/null +++ b/src/main/java/moe/nekojimi/friendcloud/SharedFileManager.java @@ -0,0 +1,112 @@ +package moe.nekojimi.friendcloud; + +import engineering.swat.watch.Watch; +import moe.nekojimi.friendcloud.objects.NetworkFile; +import moe.nekojimi.friendcloud.objects.NetworkObject; +import moe.nekojimi.friendcloud.protos.ObjectStatements; + +import java.io.File; +import java.io.IOException; +import java.util.*; + +public class SharedFileManager +{ + private final List watches = new ArrayList<>(); + + public void addSharedFiles(Set files) + { + if (files.isEmpty()) + return; + Controller controller = Main.getInstance().getModel(); + try(ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), controller.getLocalPeer().getObjectID())) + { + List knownFiles = controller.listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE)); + + for (NetworkObject knownFile: knownFiles) + { + NetworkFile f = (NetworkFile) knownFile; + boolean removed = files.remove(f.getLocalFile()); + if (removed) + System.out.println("Identified known local file " + f.getObjectID() + " = " + f.getLocalFile()); + } + + for (File sharedFile: files) + { + if (sharedFile.exists()) + { + System.out.println("Adding shared network file: " + sharedFile.getAbsolutePath()); + + NetworkFile networkFile = controller.createObjectByType(ObjectStatements.ObjectType.OBJECT_TYPE_FILE); + transaction.addObjectBeforeChange(networkFile); + networkFile.updateFromLocalFile(sharedFile); + controller.objectChanged(networkFile); + } + } + } catch (IOException e) + { + throw new RuntimeException(e); + } + } + + public void scanForFileChanges() + { + List objects = Main.getInstance().getModel().listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE)); + scanForFileChanges(objects); + } + + public void scanForFileChanges(List objects) + { + // check every NetworkFile to see if any are different from our records + for (NetworkObject object: objects) + { + NetworkFile file = (NetworkFile) object; + if (!file.hasLocalFile()) + continue; + File localFile = file.getLocalFile(); + boolean changed = (localFile.length() != file.getSize()); + if (!changed) + { + Util.HashOutput hashOutput = Util.hashFile(localFile, file.getPieceSize()); + changed = ! Arrays.equals(hashOutput.totalDigest, file.getHash()); + } + } + } + + public void scanForDirectoryChanges() + { + List directories = Main.getInstance().getModel().getDataStore().getDAOForClass(SharedDirectory.class).getAll(); + for (SharedDirectory directory: directories) + { + for (File file : Objects.requireNonNull(directory.getDirectory().listFiles())) + { + + } + } + } + + public boolean startWatchingFiles() + { +// if (watchService == null) +// { +// try +// { +// watchService = FileSystems.getDefault().newWatchService(); +// } catch (IOException e) +// { +// e.printStackTrace(System.err); +// return false; +// } +// } + throw new UnsupportedOperationException("NYI"); + } + + private void filesChanged(Collection changed) + { + + } + + private void newFiles(Collection newFiles) + { + + } +} diff --git a/src/main/java/moe/nekojimi/friendcloud/Util.java b/src/main/java/moe/nekojimi/friendcloud/Util.java index f000a5f..5a74c3e 100644 --- a/src/main/java/moe/nekojimi/friendcloud/Util.java +++ b/src/main/java/moe/nekojimi/friendcloud/Util.java @@ -1,18 +1,27 @@ package moe.nekojimi.friendcloud; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; import java.nio.ByteBuffer; import java.nio.LongBuffer; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.*; +import java.util.concurrent.CompletableFuture; public class Util { public static long xorBytesToLong(byte[] bytes) { - ByteBuffer buf = ByteBuffer.wrap(bytes); + ByteBuffer buf = ByteBuffer.allocate(bytes.length); + buf.put(bytes); + buf.rewind(); LongBuffer longs = buf.asLongBuffer(); long ret = 0xBEEFCAFEF00DBABEL; - for (long l: longs.array()) + while (longs.hasRemaining()) { - ret = ret ^ l; + ret = ret ^ longs.get(); } return ret; } @@ -20,13 +29,98 @@ public class Util public static long unconditionalNumberToLong(Object number) { assert (number instanceof Number); - return ((Number)number).longValue(); + return ((Number) number).longValue(); + } + + public static HashOutput hashFile(File file) + { + return hashFile(file, 0x100000); + } + + public static HashOutput hashFile(File file, long pieceSize) + { + HashOutput ret = new HashOutput(); + System.out.println("Calculating hashes for file " + file.getName() + "(Piece size: " + pieceSize + ")"); + try (FileInputStream input = new FileInputStream(file)) + { + MessageDigest totalDigest = MessageDigest.getInstance("SHA-256"); + byte[] pieceBuf = new byte[Math.toIntExact(pieceSize)]; + int pieceIdx = 0; + while (true) + { + int bytesRead = input.read(pieceBuf); + if (bytesRead <= 0) + break; + + // check to see if this piece is just zeroes, if so, assume it's a missing piece + boolean allZero = true; + for (byte b: pieceBuf) + { + if (b != 0) + { + allZero = false; + break; + } + } + ret.pieces.set(pieceIdx, !allZero); + + MessageDigest pieceDigest = MessageDigest.getInstance("SHA-256"); + pieceDigest.update(pieceBuf, 0, bytesRead); + ret.pieceDigests.add(pieceDigest.digest()); + totalDigest.update(pieceBuf, 0, bytesRead); + + pieceIdx++; + } + ret.totalDigest = totalDigest.digest(); + System.out.println("Total hash: " + HexFormat.of().formatHex(ret.totalDigest)); + long pieceCount = file.length() / pieceSize; + System.out.println("Have " + ret.pieces.cardinality() + " of " + pieceCount + " pieces."); + return ret; + } catch (NoSuchAlgorithmException | IOException e) + { + throw new RuntimeException(e); + } + } + + public static class HashOutput + { + public byte[] totalDigest; + public List pieceDigests = new ArrayList<>(); + public BitSet pieces = new BitSet(); } public static double unconditionalNumberToDouble(Object number) { assert (number instanceof Number); - return ((Number)number).doubleValue(); + return ((Number) number).doubleValue(); } -} + public static Map stringifyMap(Map map) + { + Map ret = new HashMap<>(); + for (Map.Entry e : map.entrySet()) + { + ret.put(e.getKey().toString(), e.getValue().toString()); + } + return ret; + } + + public static Map mapWithoutNullValues(Map map) + { + Map ret = new HashMap<>(); + for (Map.Entry e: map.entrySet()) + { + if (e.getKey() != null && e.getValue() != null) + ret.put(e.getKey(),e.getValue()); + } + return ret; + } + + public static CompletableFuture> collectFutures(Collection> futures) + { + // TODO: should handle timeouts / CancellationException + return CompletableFuture + .allOf(futures.toArray(new CompletableFuture[0])) + .thenApply(unused -> futures.stream().map(CompletableFuture::join).toList()); + } +} \ No newline at end of file diff --git a/src/main/java/moe/nekojimi/friendcloud/filesystem/FUSEAccess.java b/src/main/java/moe/nekojimi/friendcloud/filesystem/FUSEAccess.java index aeca1c3..a1bd1fd 100644 --- a/src/main/java/moe/nekojimi/friendcloud/filesystem/FUSEAccess.java +++ b/src/main/java/moe/nekojimi/friendcloud/filesystem/FUSEAccess.java @@ -12,7 +12,9 @@ import ru.serce.jnrfuse.FuseStubFS; import ru.serce.jnrfuse.struct.FileStat; import ru.serce.jnrfuse.struct.FuseFileInfo; +import java.io.IOException; import java.util.HashMap; +import java.util.List; import java.util.Map; public class FUSEAccess extends FuseStubFS @@ -27,14 +29,15 @@ public class FUSEAccess extends FuseStubFS @Override public int readdir(String path, Pointer buf, FuseFillDir filter, long offset, FuseFileInfo fi) { - System.out.println("FUSE: listing contents of directory " + path); + List networkFSNodes = Main.getInstance().getModel().listFSNodes(path); + System.out.println("FUSE: listing contents of directory " + path + ": " + networkFSNodes.size() + " nodes."); int ret = 0; filter.apply(buf, ".", null, 0); filter.apply(buf, "..", null, 0); // filter.apply(buf,"hello", null, 0); - for (NetworkFSNode fsNode : Main.getInstance().getModel().listFSNodes(path)) + for (NetworkFSNode fsNode : networkFSNodes) { filter.apply(buf, fsNode.getName(), null, 0); } @@ -42,6 +45,15 @@ public class FUSEAccess extends FuseStubFS return ret; } + @Override + public int mkdir(String path, long mode) + { + NetworkFolder networkFolder = Main.getInstance().getModel().makeFSFolder(path); + if (networkFolder == null) + return -ErrorCodes.EIO(); + return 0; + } + @Override public int getattr(String path, FileStat stat) { @@ -156,4 +168,25 @@ public class FUSEAccess extends FuseStubFS // System.out.println("FUSE: Read " + bytes.length + " bytes."); } } + + @Override + public int rename(String oldpath, String newpath) + { + NetworkFSNode fsNode = Main.getInstance().getModel().getFSNode(oldpath); + if (fsNode == null) + { + System.err.println("FUSE: Failed to rename file " + oldpath + ": not found"); + return -ErrorCodes.ENOENT(); + } + try + { + System.out.println("FUSE: Renaming " + oldpath + " to " + newpath); + Main.getInstance().getModel().renameFSNode(fsNode, newpath); + return 0; + } catch (Exception e) + { + e.printStackTrace(System.err); + return -ErrorCodes.EIO(); + } + } } diff --git a/src/main/java/moe/nekojimi/friendcloud/network/ConnectionBackend.java b/src/main/java/moe/nekojimi/friendcloud/network/ConnectionBackend.java new file mode 100644 index 0000000..77caa88 --- /dev/null +++ b/src/main/java/moe/nekojimi/friendcloud/network/ConnectionBackend.java @@ -0,0 +1,70 @@ +package moe.nekojimi.friendcloud.network; + +import moe.nekojimi.friendcloud.objects.ObjectID; +import org.jetbrains.annotations.NotNull; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; + +public abstract class ConnectionBackend extends Thread +{ + private final ConnectionManager connectionManager; + protected final String uriScheme; + abstract List getURIs(); + + public ConnectionBackend(@NotNull String name, @NotNull String uriScheme, ConnectionManager connectionManager) + { + super(name); + this.uriScheme = uriScheme; + this.connectionManager = connectionManager; + } + + @Override + public final void run() + { + super.run(); + + while(isListening()) + { + try + { + PeerConnection connection = getConnection(); + if (connection == null) + break; + connectionManager.receiveConnection(connection); + } + catch (InterruptedException ex) + { + + } + catch (IOException ex) + { + break; + } + } + } + + protected abstract ConnectionType makeConnection(URI uri, ObjectID peer); + + protected URI makeURI(String hostPart, int port) + { + try + { + return new URI(uriScheme, null, hostPart, port, null, null, null); + } catch (URISyntaxException e) + { + throw new RuntimeException(e); + } + } + protected abstract ConnectionType getConnection() throws IOException, InterruptedException; + protected abstract boolean isListening(); + + public String getUriScheme() + { + return uriScheme; + } + + public abstract void shutdown(); +} diff --git a/src/main/java/moe/nekojimi/friendcloud/ConnectionManager.java b/src/main/java/moe/nekojimi/friendcloud/network/ConnectionManager.java similarity index 62% rename from src/main/java/moe/nekojimi/friendcloud/ConnectionManager.java rename to src/main/java/moe/nekojimi/friendcloud/network/ConnectionManager.java index ab7e645..e86c32a 100644 --- a/src/main/java/moe/nekojimi/friendcloud/ConnectionManager.java +++ b/src/main/java/moe/nekojimi/friendcloud/network/ConnectionManager.java @@ -1,60 +1,34 @@ -package moe.nekojimi.friendcloud; +package moe.nekojimi.friendcloud.network; -import moe.nekojimi.friendcloud.network.PeerConnection; -import moe.nekojimi.friendcloud.network.PeerTCPConnection; +import moe.nekojimi.friendcloud.objects.ObjectID; import moe.nekojimi.friendcloud.objects.Peer; import java.io.IOException; -import java.net.ServerSocket; -import java.net.Socket; import java.net.URI; import java.util.*; import java.util.function.Consumer; -public class ConnectionManager extends Thread +public class ConnectionManager { // private final Executor executor = new ThreadPoolExecutor() - //TODO: move the TCP stuff to it's own thread, which sends NodeTCPConnections to this thread - - private final ServerSocket serverSocket; - + private final Map> backends = new HashMap<>(); private final Set activeConnections = new HashSet<>(); - private final Set> newConnectionConsumers = new HashSet<>(); - public ConnectionManager(int portNumber) throws IOException + public ConnectionManager() { - serverSocket = new ServerSocket(portNumber); // serverSocket.bind(new InetSocketAddress()); } - @Override - public void run() + void receiveConnection(PeerConnection connection) { - super.run(); - while (!serverSocket.isClosed()) + activeConnections.add(connection); + connection.start(); + + for (Consumer consumer: newConnectionConsumers) { - try - { - Socket socket = serverSocket.accept(); - - System.out.println("TCP Connection Manager: accepted connection from " + socket.getRemoteSocketAddress()); - - PeerTCPConnection nodeTCPConnection = new PeerTCPConnection(socket); - activeConnections.add(nodeTCPConnection); - nodeTCPConnection.start(); - - for (Consumer consumer: newConnectionConsumers) - { - consumer.accept(nodeTCPConnection); - } - } catch (Exception e) - { - System.err.println("ConnectionManager experienced exception:" + e.getMessage()); - e.printStackTrace(System.err); - } + consumer.accept(connection); } - System.err.println("ConnectionManager: thread dying!"); } public PeerConnection getNodeConnection(URI uri) throws IOException @@ -71,14 +45,13 @@ public class ConnectionManager extends Thread return peerConnection; } - PeerConnection nodeConnection = null; - if (Objects.equals(uri.getScheme(), "tcp")) - { - nodeConnection = new PeerTCPConnection(uri, peer); - nodeConnection.start(); - } + ConnectionBackend backend = backends.get(uri.getScheme()); + PeerConnection nodeConnection = backend.makeConnection(uri, peer == null ? new ObjectID(0L) : peer.getObjectID()); if (nodeConnection != null) + { + nodeConnection.start(); activeConnections.add(nodeConnection); + } return nodeConnection; } @@ -89,7 +62,7 @@ public class ConnectionManager extends Thread System.out.println("ConnectionManager: trying to get connection to " + peer + " (have " + activeConnections.size() + " connections open)"); for (PeerConnection peerConnection: activeConnections) { - if (peerConnection.getNode().equals(peer)) + if (peerConnection.getPeerID() != null && peerConnection.getPeerID().equals(peer.getObjectID())) return peerConnection; } @@ -108,9 +81,10 @@ public class ConnectionManager extends Thread return null; } - public void shutdown() throws IOException + public void shutdown() { - serverSocket.close(); + for (ConnectionBackend backend : backends.values()) + backend.shutdown(); for (PeerConnection nc: activeConnections) { nc.shutdown(); @@ -131,6 +105,12 @@ public class ConnectionManager extends Thread activeConnections.removeAll(deadConnections); } + public void addConnectionBackend(ConnectionBackend backend) + { + backends.put(backend.getUriScheme(), backend); + backend.start(); + } + public void addNewConnectionConsumer(Consumer consumer) { newConnectionConsumers.add(consumer); @@ -141,4 +121,11 @@ public class ConnectionManager extends Thread newConnectionConsumers.remove(consumer); } + public List getURIs() + { + return backends.values().stream() + .filter(ConnectionBackend::isListening) + .flatMap(connectionBackend -> connectionBackend.getURIs().stream()) + .toList(); + } } diff --git a/src/main/java/moe/nekojimi/friendcloud/network/PeerConnection.java b/src/main/java/moe/nekojimi/friendcloud/network/PeerConnection.java index ed99ba7..e3765f0 100644 --- a/src/main/java/moe/nekojimi/friendcloud/network/PeerConnection.java +++ b/src/main/java/moe/nekojimi/friendcloud/network/PeerConnection.java @@ -1,12 +1,10 @@ package moe.nekojimi.friendcloud.network; -import com.google.protobuf.Any; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Message; +import com.google.protobuf.*; import moe.nekojimi.friendcloud.FilePieceAccess; import moe.nekojimi.friendcloud.Main; +import moe.nekojimi.friendcloud.ObjectChangeRecord; import moe.nekojimi.friendcloud.objects.NetworkFile; import moe.nekojimi.friendcloud.objects.NetworkObject; import moe.nekojimi.friendcloud.objects.ObjectID; @@ -15,23 +13,24 @@ import moe.nekojimi.friendcloud.protos.CommonMessages; import moe.nekojimi.friendcloud.protos.ObjectStatements; import moe.nekojimi.friendcloud.network.requests.Request; import moe.nekojimi.friendcloud.protos.PieceMessages; +import moe.nekojimi.friendcloud.tasks.PullChangesTask; +import org.jetbrains.annotations.NotNull; import java.io.IOException; import java.net.URI; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.CompletableFuture; public abstract class PeerConnection extends Thread { private final Map> pendingRequests = new HashMap<>(); - private Peer peer; + private ObjectID peerID = new ObjectID(0); private long nextMessageId = 1; private final URI uri; private long artificalDelayMs = 0; + private final Map> messageHandlers = new HashMap<>(); + public PeerConnection() { this(null); @@ -40,12 +39,13 @@ public abstract class PeerConnection extends Thread public PeerConnection(URI uri) { this.uri = uri; + installDefaultMessageHandlers(); } - public PeerConnection(URI uri, Peer peer) + public PeerConnection(URI uri, @NotNull ObjectID peerID) { this(uri); - this.peer = peer; + this.peerID = peerID; } @Override @@ -85,9 +85,10 @@ public abstract class PeerConnection extends Thread private CommonMessages.FriendCloudMessage wrapMessage(Message message, CommonMessages.MessageHeader inReplyTo) { + Peer localPeer = Main.getInstance().getModel().getLocalPeer(); CommonMessages.MessageHeader.Builder headerBuilder = CommonMessages.MessageHeader.newBuilder() .setMessageId(nextMessageId) - .setSenderId(Main.getInstance().getModel().getSelfPeer().getObjectID().toLong()); + .setSenderId(localPeer != null ? localPeer.getObjectID().toLong() : 0L); if (inReplyTo != null) headerBuilder.setReplyToMessageId(inReplyTo.getMessageId()); @@ -114,6 +115,10 @@ public abstract class PeerConnection extends Thread protected void messageReceived(@org.jetbrains.annotations.NotNull CommonMessages.FriendCloudMessage message) { CommonMessages.MessageHeader header = message.getHeader(); + Any body = message.getBody(); + long replyToMessageId = header.getReplyToMessageId(); + ObjectID senderID = new ObjectID(header.getSenderId()); + System.out.println("Received message! type=" + body.getTypeUrl() + ", sender=" + senderID + ", ID=" + header.getMessageId() + ", reply_to=" + replyToMessageId); try { try @@ -130,21 +135,20 @@ public abstract class PeerConnection extends Thread } } - ObjectID senderID = new ObjectID(header.getSenderId()); - if (peer == null) - peer = Main.getInstance().getModel().getOrCreateObject(senderID); - else + if (!senderID.isNull()) { - if (!senderID.equals(peer.getObjectID())) - throw new ReplyWithErrorException(CommonMessages.Error.ERROR_WHO_THE_FUCK_ARE_YOU); + if (peerID.isNull()) + { + System.out.println("PeerConnection: Identified sender as " + senderID); + peerID = senderID; + } + else + { + if (!senderID.equals(peerID)) + throw new ReplyWithErrorException(CommonMessages.Error.ERROR_WHO_THE_FUCK_ARE_YOU); + } } - Any body = message.getBody(); - - long replyToMessageId = header.getReplyToMessageId(); - System.out.println("Received message! type=" + body.getTypeUrl() + ", ID=" + header.getMessageId() + ", reply_to=" + replyToMessageId); - - if (replyToMessageId != 0) { if (pendingRequests.containsKey(replyToMessageId)) @@ -183,61 +187,21 @@ public abstract class PeerConnection extends Thread throw new RuntimeException("Our message ID " + header.getReplyToMessageId() + " caused a remote error: " + body.getError().name()); } + @SuppressWarnings({"rawtypes", "unchecked"}) private void handleUnsolicitedMessage(CommonMessages.MessageHeader header, Any body) throws IOException, ReplyWithErrorException { - if (body.is(ObjectStatements.ObjectListRequest.class)) + String typeUrl = body.getTypeUrl(); + if (messageHandlers.containsKey(typeUrl)) { - ObjectStatements.ObjectListRequest objectListRequest = body.unpack(ObjectStatements.ObjectListRequest.class); - List objects = Main.getInstance().getModel().listObjects(new HashSet<>(objectListRequest.getTypesList())); - - ObjectStatements.ObjectList.Builder objectList = ObjectStatements.ObjectList.newBuilder(); - for (NetworkObject object : objects) - { - objectList.addStates(object.buildObjectState()); - } -// System.out.println("Replying to ObjectListRequest with ObjectList, objects=" + objectList.getStatesList()); - sendMessage(wrapMessage(objectList.build(), header)); + MessageHandler handler = messageHandlers.get(typeUrl); + assert (body.is(handler.clazz)); + Message unpack = body.unpack((Class) handler.clazz); + handler.handle(header, unpack); } - else if (body.is(PieceMessages.FilePiecesRequestMessage.class)) + else { - PieceMessages.FilePiecesRequestMessage filePiecesRequestMessage = body.unpack(PieceMessages.FilePiecesRequestMessage.class); - if (filePiecesRequestMessage.getPieceMod() == 0) - { - replyWithError(CommonMessages.Error.ERROR_INVALID_ARGUMENT, header); - } - - NetworkFile networkFile = (NetworkFile) Main.getInstance().getModel().getObject(new ObjectID(filePiecesRequestMessage.getFileId())); - if (networkFile == null) - { - replyWithError(CommonMessages.Error.ERROR_OBJECT_NOT_FOUND, header); - } - assert networkFile != null; - try (FilePieceAccess filePieceAccess = new FilePieceAccess(networkFile)) - { - int startIndex = filePiecesRequestMessage.getStartPieceIndex(); - int endIndex = (filePiecesRequestMessage.getStartPieceIndex() + filePiecesRequestMessage.getPieceCount()) - 1; - System.out.println("Been asked for pieces from " + startIndex + " to " + endIndex); - for (int index = startIndex; index <= endIndex; index += filePiecesRequestMessage.getPieceMod()) - { - byte[] buffer = filePieceAccess.readPiece(index); - if (buffer != null) - { - System.out.println("Replying to file piece request with piece " + index); - PieceMessages.FilePieceMessage filePieceMessage = PieceMessages.FilePieceMessage.newBuilder() - .setPieceIndex(index) - .setFileId(networkFile.getObjectID().toLong()) - .setData(ByteString.copyFrom(buffer)) - .build(); - sendMessage(wrapMessage(filePieceMessage, header)); - } - else - { - System.err.println("Don't have requested piece " + index + "!"); - replyWithError(CommonMessages.Error.ERROR_PIECE_NOT_POSSESSED, header); - break; - } - } - } + System.err.println("PeerConnection: don't have a MessageHandler for message type " + typeUrl + "!"); + replyWithError(CommonMessages.Error.ERROR_MESSAGE_BODY_UNKNOWN, header); } } @@ -251,15 +215,162 @@ public abstract class PeerConnection extends Thread pendingRequests.remove(replyToMessageId); } - public abstract void shutdown() throws IOException; + public abstract void shutdown(); - public synchronized Peer getNode() + public ObjectID getPeerID() { - return peer; + return peerID; } public synchronized URI getUri() { return uri; } + + public void installMessageHandler(MessageHandler handler) + { + String typeUrl = "type.googleapis.com/" + Internal.getDefaultInstance(handler.clazz).getDescriptorForType().getFullName(); + messageHandlers.put(typeUrl, handler); +// System.out.println("PeerConnection: Installed message handler for type " + typeUrl); + } + + private void installDefaultMessageHandlers() + { + installMessageHandler(new MessageHandler<>(ObjectStatements.ObjectListRequest.class) + { + @Override + protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectListRequest message) throws IOException + { + List objects = Main.getInstance().getModel().listObjects(new HashSet<>(message.getTypesList())); + + ObjectStatements.ObjectList.Builder objectList = ObjectStatements.ObjectList.newBuilder(); + for (NetworkObject object : objects) + { + objectList.addStates(object.buildObjectState()); + } + +// System.out.println("Replying to ObjectListRequest with ObjectList, objects=" + objectList.getStatesList()); + sendMessage(wrapMessage(objectList.build(), header)); + } + }); + installMessageHandler(new MessageHandler<>(PieceMessages.FilePiecesRequestMessage.class) + { + @Override + protected void handle(CommonMessages.MessageHeader header, PieceMessages.FilePiecesRequestMessage message) throws IOException + { + if (message.getPieceMod() == 0) + { + replyWithError(CommonMessages.Error.ERROR_INVALID_ARGUMENT, header); + } + + NetworkFile networkFile = Main.getInstance().getModel().getObject(new ObjectID(message.getFileId())); + if (networkFile == null) + { + replyWithError(CommonMessages.Error.ERROR_OBJECT_NOT_FOUND, header); + } + assert networkFile != null; + try (FilePieceAccess filePieceAccess = new FilePieceAccess(networkFile, FilePieceAccess.OpenMode.READ_ONLY)) + { + int startIndex = message.getStartPieceIndex(); + int endIndex = (message.getStartPieceIndex() + message.getPieceCount()) - 1; + System.out.println("Been asked for pieces from " + startIndex + " to " + endIndex); + List indices = new ArrayList<>(); + for (int index = startIndex; index <= endIndex; index += message.getPieceMod()) + { + indices.add((long) index); + } + CommonMessages.MultiObjectConfirmationMessage multiObjectConfirmationMessage = CommonMessages.MultiObjectConfirmationMessage.newBuilder().addAllExpectedReturnId(indices).build(); + sendMessage(wrapMessage(multiObjectConfirmationMessage, header)); + for (Long index : indices) + { + byte[] buffer = filePieceAccess.readPiece(Math.toIntExact(index)); + if (buffer != null) + { + System.out.println("Replying to file piece request with piece " + index); + PieceMessages.FilePieceMessage filePieceMessage = PieceMessages.FilePieceMessage.newBuilder() + .setPieceIndex(Math.toIntExact(index)) + .setFileId(networkFile.getObjectID().toLong()) + .setData(ByteString.copyFrom(buffer)) + .build(); + sendMessage(wrapMessage(filePieceMessage, header)); + } + else + { + System.err.println("Don't have requested piece " + index + "!"); + replyWithError(CommonMessages.Error.ERROR_PIECE_NOT_POSSESSED, header); + break; + } + } + } + } + }); + installMessageHandler(new MessageHandler<>(ObjectStatements.ObjectChangeRequest.class) + { + @Override + protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectChangeRequest message) throws IOException + { + List changesSinceList = message.getChangesSinceList(); + System.out.println("PeerConnection: Been asked for all changes since " + changesSinceList.stream().map(Long::toHexString)); + Set changes = Main.getInstance().getModel().findChangesSince(changesSinceList); + if (changes == null) + { + replyWithError(CommonMessages.Error.ERROR_END_OF_HISTORY, header); + } + else + { + ObjectStatements.ObjectChangeListMessage.Builder reply = ObjectStatements.ObjectChangeListMessage.newBuilder(); + for (ObjectChangeRecord change : changes) + { + reply.addChangeMessages(change.buildObjectChangeMessage()); + } + System.out.println("PeerConnection: Replying with " + reply.getChangeMessagesCount() + " changes"); + sendMessage(wrapMessage(reply.build(), header)); + } + } + }); + installMessageHandler(new MessageHandler<>(ObjectStatements.ObjectChangeMessage.class) + { + @Override + protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectChangeMessage message) + { + ObjectChangeRecord record = ObjectChangeRecord.createFromChangeMessage(message); + Main.getInstance().getModel().applyChangeRecord(record); + } + }); + installMessageHandler(new MessageHandler<>(CommonMessages.CheckInMessage.class) + { + @Override + protected void handle(CommonMessages.MessageHeader header, CommonMessages.CheckInMessage message) + { + List remoteChangeHeads = message.getCurrentChangeHeadsList(); + boolean potentialNewChanges = false; + for (long remoteChangeHead : remoteChangeHeads) + { + boolean exists = Main.getInstance().getModel().getDataStore().getDAOForClass(ObjectChangeRecord.class).exists(remoteChangeHead); + if (!exists) + { + potentialNewChanges = true; + break; + } + } + if (potentialNewChanges) + { + PullChangesTask task = new PullChangesTask(Set.of(Main.getInstance().getModel().getObject(peerID))); + Main.getInstance().getExecutor().submit(task); + } + } + }); + } + + public abstract static class MessageHandler + { + private final Class clazz; + + public MessageHandler(Class clazz) + { + this.clazz = clazz; + } + + protected abstract void handle(CommonMessages.MessageHeader header, MessageType message) throws IOException; + } } diff --git a/src/main/java/moe/nekojimi/friendcloud/network/TCPConnectionBackend.java b/src/main/java/moe/nekojimi/friendcloud/network/TCPConnectionBackend.java new file mode 100644 index 0000000..a3c629a --- /dev/null +++ b/src/main/java/moe/nekojimi/friendcloud/network/TCPConnectionBackend.java @@ -0,0 +1,125 @@ +package moe.nekojimi.friendcloud.network; + +import com.offbynull.portmapper.PortMapperFactory; +import com.offbynull.portmapper.gateway.Bus; +import com.offbynull.portmapper.gateway.Gateway; +import com.offbynull.portmapper.gateways.network.NetworkGateway; +import com.offbynull.portmapper.gateways.process.ProcessGateway; +import com.offbynull.portmapper.mapper.MappedPort; +import com.offbynull.portmapper.mapper.PortMapper; +import com.offbynull.portmapper.mapper.PortType; +import moe.nekojimi.friendcloud.Main; +import moe.nekojimi.friendcloud.objects.ObjectID; +import moe.nekojimi.friendcloud.objects.Peer; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +public class TCPConnectionBackend extends ConnectionBackend +{ + private final ServerSocket serverSocket; + private MappedPort mappedPort; + + @Override + List getURIs() + { + List ret = new ArrayList<>(); + ret.add(makeURI(serverSocket.getInetAddress().getHostAddress(), serverSocket.getLocalPort())); + ret.add(makeURI(serverSocket.getInetAddress().getHostName(), serverSocket.getLocalPort())); + if (mappedPort != null) + { + ret.add(makeURI(mappedPort.getExternalAddress().getHostAddress(), mappedPort.getExternalPort())); + ret.add(makeURI(mappedPort.getExternalAddress().getCanonicalHostName(), mappedPort.getExternalPort())); + } + return ret; + } + + public TCPConnectionBackend(int port, ConnectionManager connectionManager) throws IOException + { + super("TCP Listen Thread", "tcp", connectionManager); + serverSocket = new ServerSocket(port); + +// setupIGP(port); + } + + private void setupIGP(int port) + { + try + { // Start gateways + Gateway network = NetworkGateway.create(); + Gateway process = ProcessGateway.create(); + Bus networkBus = network.getBus(); + Bus processBus = process.getBus(); + + // Discover port forwarding devices and take the first one found + System.out.println("Discovering port mappers..."); + List mappers = PortMapperFactory.discover(networkBus, processBus); + ; + PortMapper mapper = mappers.getFirst(); + System.out.println("Got mapper " + mapper + ", mapping port..."); + + mappedPort = mapper.mapPort(PortType.TCP, port, port, 3600); + System.out.println("Port mapping added: " + mappedPort); + + long refreshDelay = (long) (mappedPort.getLifetime() * 0.9); + Main.getInstance().getExecutor().scheduleWithFixedDelay(() -> { + try + { + System.out.println("Refreshing UPnP port mapping."); + mapper.refreshPort(mappedPort, mappedPort.getLifetime()); + } catch (InterruptedException e) + { + e.printStackTrace(System.err); + } + }, refreshDelay, refreshDelay, TimeUnit.SECONDS); + + } catch (InterruptedException | IllegalStateException ignored) + { + } + } + + @Override + protected TCPPeerConnection makeConnection(URI uri, ObjectID peer) + { + try + { + return new TCPPeerConnection(uri, peer); + } catch (IOException e) + { + System.out.println("TCPConnectionBackend: failed to connect to " + uri + ": " + e.getMessage()); +// e.printStackTrace(System.err); + return null; + } + } + + @Override + protected TCPPeerConnection getConnection() throws IOException + { + Socket socket = serverSocket.accept(); + return new TCPPeerConnection(socket); + } + + @Override + protected boolean isListening() + { + return !serverSocket.isClosed(); + } + + @Override + public void shutdown() + { + try + { + serverSocket.close(); + } catch (IOException e) + { + throw new RuntimeException(e); + } + } +} diff --git a/src/main/java/moe/nekojimi/friendcloud/network/PeerTCPConnection.java b/src/main/java/moe/nekojimi/friendcloud/network/TCPPeerConnection.java similarity index 73% rename from src/main/java/moe/nekojimi/friendcloud/network/PeerTCPConnection.java rename to src/main/java/moe/nekojimi/friendcloud/network/TCPPeerConnection.java index 8abe51a..04a3f0c 100644 --- a/src/main/java/moe/nekojimi/friendcloud/network/PeerTCPConnection.java +++ b/src/main/java/moe/nekojimi/friendcloud/network/TCPPeerConnection.java @@ -1,6 +1,6 @@ package moe.nekojimi.friendcloud.network; -import moe.nekojimi.friendcloud.objects.Peer; +import moe.nekojimi.friendcloud.objects.ObjectID; import moe.nekojimi.friendcloud.protos.CommonMessages; import java.io.IOException; @@ -9,19 +9,19 @@ import java.io.OutputStream; import java.net.Socket; import java.net.URI; -public class PeerTCPConnection extends PeerConnection +public class TCPPeerConnection extends PeerConnection { private final Socket socket; private final int keepAliveTimeS = 300; - public PeerTCPConnection(URI tcpURL, Peer peer) throws IOException + public TCPPeerConnection(URI tcpURL, ObjectID peer) throws IOException { super(tcpURL, peer); socket = new Socket(tcpURL.getHost(), tcpURL.getPort()); System.out.println("TCP Connection: connected to " + tcpURL + " OK!"); } - public PeerTCPConnection(Socket openSocket) + public TCPPeerConnection(Socket openSocket) { super(); socket = openSocket; @@ -42,8 +42,6 @@ public class PeerTCPConnection extends PeerConnection if (message != null) { - System.out.println("TCP Connection: read data"); - messageReceived(message); } } @@ -60,14 +58,21 @@ public class PeerTCPConnection extends PeerConnection protected void sendMessage(CommonMessages.FriendCloudMessage message) throws IOException { OutputStream outputStream = socket.getOutputStream(); - System.out.println("Sending message " + message.getHeader().getMessageId()); + System.out.println("Sending message " + message.getHeader().getMessageId() + ": " + message.getBody().getTypeUrl()); message.writeDelimitedTo(outputStream); outputStream.flush(); } @Override - public synchronized void shutdown() throws IOException + public synchronized void shutdown() { - socket.close(); + try + { + socket.close(); + } catch (IOException e) + { + System.err.println("TCPPeerConnection: failed to shut down!"); + e.printStackTrace(System.err); + } } } diff --git a/src/main/java/moe/nekojimi/friendcloud/network/requests/FilePiecesRequest.java b/src/main/java/moe/nekojimi/friendcloud/network/requests/FilePiecesRequest.java index 14e3ea3..b1477ee 100644 --- a/src/main/java/moe/nekojimi/friendcloud/network/requests/FilePiecesRequest.java +++ b/src/main/java/moe/nekojimi/friendcloud/network/requests/FilePiecesRequest.java @@ -4,13 +4,15 @@ import com.google.protobuf.Any; import com.google.protobuf.InvalidProtocolBufferException; import moe.nekojimi.friendcloud.FilePieceAccess; import moe.nekojimi.friendcloud.objects.NetworkFile; +import moe.nekojimi.friendcloud.protos.CommonMessages; import moe.nekojimi.friendcloud.protos.PieceMessages; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; -public class FilePiecesRequest extends Request> +public class FilePiecesRequest extends Request> { private final NetworkFile file; private final int startPiece; @@ -18,9 +20,9 @@ public class FilePiecesRequest extends Request expectedPieces = new ArrayList<>(); - private final List receivedPieces = new ArrayList<>(); +// private int expectedPieceCount = 0; + private Set expectedPieces = new HashSet<>(); + private final Set receivedPieces = new HashSet<>(); public FilePiecesRequest(NetworkFile file, int startPiece, int pieceCount, int pieceMod) { @@ -33,7 +35,7 @@ public class FilePiecesRequest extends Request> +public class ObjectChangeRequest extends Request> { private final Set changesSinceIDs; + private Set expectedIDs = new HashSet<>(); + private final Set receivedIDs = new HashSet<>(); + private final Set receivedRecords = new HashSet<>(); public ObjectChangeRequest(Set changesSinceIDs) { @@ -26,14 +30,35 @@ public class ObjectChangeRequest extends Request(msg.getExpectedReturnIdList()); + } + else if (reply.is(ObjectStatements.ObjectChangeListMessage.class)) + { + ObjectStatements.ObjectChangeListMessage msg = reply.unpack(ObjectStatements.ObjectChangeListMessage.class); + for (ObjectStatements.ObjectChangeMessage m : msg.getChangeMessagesList()) + { + ObjectChangeRecord objectChangeRecord = ObjectChangeRecord.createFromChangeMessage(m); + receivedRecords.add(objectChangeRecord); + } + future.complete(receivedRecords); + return true; + } + + if (receivedIDs.equals(expectedIDs)) + { + future.complete(receivedRecords); return true; } diff --git a/src/main/java/moe/nekojimi/friendcloud/network/requests/ObjectListRequest.java b/src/main/java/moe/nekojimi/friendcloud/network/requests/ObjectListRequest.java index d1bfe53..d8ed429 100644 --- a/src/main/java/moe/nekojimi/friendcloud/network/requests/ObjectListRequest.java +++ b/src/main/java/moe/nekojimi/friendcloud/network/requests/ObjectListRequest.java @@ -2,7 +2,6 @@ package moe.nekojimi.friendcloud.network.requests; import com.google.protobuf.Any; import com.google.protobuf.InvalidProtocolBufferException; -import moe.nekojimi.friendcloud.Main; import moe.nekojimi.friendcloud.objects.NetworkObject; import moe.nekojimi.friendcloud.objects.ObjectID; import moe.nekojimi.friendcloud.protos.ObjectStatements; @@ -44,10 +43,10 @@ public class ObjectListRequest extends Request public abstract MessageType buildMessage(); + /** + * Handle a message that was received in reply to the request message. + * @param reply the reply message. May be one of many. + * @return true if no further replies are expected (or able to be processed), false if more replies are coming. + * @throws InvalidProtocolBufferException if reply cannot be decoded. + */ public boolean handleReply(Any reply) throws InvalidProtocolBufferException { if (reply.is(CommonMessages.ErrorMessage.class)) { CommonMessages.ErrorMessage errorMessage = reply.unpack(CommonMessages.ErrorMessage.class); - future.completeExceptionally(new RuntimeException("Request received error response: " + errorMessage.getError().name())); + future.completeExceptionally(new RequestReceivedErrorException(errorMessage)); return true; } return false; diff --git a/src/main/java/moe/nekojimi/friendcloud/network/requests/RequestReceivedErrorException.java b/src/main/java/moe/nekojimi/friendcloud/network/requests/RequestReceivedErrorException.java new file mode 100644 index 0000000..83802bf --- /dev/null +++ b/src/main/java/moe/nekojimi/friendcloud/network/requests/RequestReceivedErrorException.java @@ -0,0 +1,19 @@ +package moe.nekojimi.friendcloud.network.requests; + +import moe.nekojimi.friendcloud.protos.CommonMessages; + +public class RequestReceivedErrorException extends Exception +{ + private final CommonMessages.ErrorMessage errorMessage; + + public RequestReceivedErrorException(CommonMessages.ErrorMessage errorMessage) + { + super("Request received error:" + errorMessage.getError().name() + "; " + errorMessage.getText()); + this.errorMessage = errorMessage; + } + + public CommonMessages.ErrorMessage getErrorMessage() + { + return errorMessage; + } +} diff --git a/src/main/java/moe/nekojimi/friendcloud/objects/NetworkFSNode.java b/src/main/java/moe/nekojimi/friendcloud/objects/NetworkFSNode.java index c9ec1d3..ce0c255 100644 --- a/src/main/java/moe/nekojimi/friendcloud/objects/NetworkFSNode.java +++ b/src/main/java/moe/nekojimi/friendcloud/objects/NetworkFSNode.java @@ -17,16 +17,16 @@ public abstract class NetworkFSNode extends NetworkObject } @Override - public synchronized void updateFromStateMessage(ObjectStatements.ObjectState state) + public synchronized void updateFromMessageMap(Map after, Map before) { - super.updateFromStateMessage(state); - if (state.containsValues("name")) - name = state.getValuesOrThrow("name"); - if (state.containsValues("parent")) + super.updateFromMessageMap(after,before); + if (after.containsKey("name")) + name = after.get("name"); + if (after.containsKey("parent")) { - long parentID = Long.parseLong(state.getValuesOrThrow("parent")); + long parentID = Long.parseLong(after.get("parent")); if (parentID != 0) - parent = (NetworkFolder) Main.getInstance().getModel().getObject(new ObjectID(parentID)); + parent = Main.getInstance().getModel().getObject(new ObjectID(parentID)); else parent = null; } @@ -36,7 +36,8 @@ public abstract class NetworkFSNode extends NetworkObject public ObjectStatements.ObjectState.Builder buildObjectState() { return super.buildObjectState() - .putValues("name", getName()); + .putValues("name", getName()) + .putValues("parent", parent != null ? Long.toString(parent.getStorageID()) : "0"); } @Override @@ -52,7 +53,7 @@ public abstract class NetworkFSNode extends NetworkObject public void updateFromStateMap(Map map) { name = map.get("name").toString(); - parent = (NetworkFolder) Main.getInstance().getModel().getObject(new ObjectID(((Number)map.get("parent")).longValue())); + parent = Main.getInstance().getModel().getObject(new ObjectID(((Number)map.get("parent")).longValue())); } public String getName() @@ -65,8 +66,24 @@ public abstract class NetworkFSNode extends NetworkObject this.name = name; } + @Override + public String getFriendlyName() + { + return getName(); + } + public String getNetworkPath() { return (parent != null ? parent.getNetworkPath() : "") + "/" + name; } + + public NetworkFolder getParent() + { + return parent; + } + + public void setParent(NetworkFolder newParent) + { + parent = newParent; + } } diff --git a/src/main/java/moe/nekojimi/friendcloud/objects/NetworkFile.java b/src/main/java/moe/nekojimi/friendcloud/objects/NetworkFile.java index 9872ae1..2e90dfa 100644 --- a/src/main/java/moe/nekojimi/friendcloud/objects/NetworkFile.java +++ b/src/main/java/moe/nekojimi/friendcloud/objects/NetworkFile.java @@ -5,11 +5,8 @@ import moe.nekojimi.friendcloud.Util; import moe.nekojimi.friendcloud.protos.ObjectStatements; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.nio.file.Files; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; import java.util.*; import java.util.stream.Collectors; @@ -43,7 +40,6 @@ public class NetworkFile extends NetworkFSNode this.localFile = localFile; name = localFile.getName(); size = localFile.length(); - pieceSize = MIN_PIECE_SIZE; for (pieceSize = MIN_PIECE_SIZE; pieceSize < MAX_PIECE_SIZE; pieceSize *= 2) { long pieceCount = size / pieceSize; @@ -51,81 +47,34 @@ public class NetworkFile extends NetworkFSNode break; } - pieces = new BitSet(Math.toIntExact(getPieceCount())); + Util.HashOutput hashOutput = Util.hashFile(localFile, pieceSize); + pieces = hashOutput.pieces; + hash = hashOutput.totalDigest; - long offset = 0L; - System.out.println("Calculating hashes for file " + localFile.getName() + "(Piece size: " + pieceSize + ")"); - try (FileInputStream input = new FileInputStream(localFile)) + System.out.println(); + setLocalFile(localFile); + size = localFile.length(); + + if (pieces.cardinality() >= getPieceCount()) { - MessageDigest totalDigest = MessageDigest.getInstance("SHA-256"); - byte[] pieceBuf = new byte[Math.toIntExact(pieceSize)]; - int pieceIdx = 0; -// List pieces = new ArrayList<>(); - while(true) - { - int bytesRead = input.read(pieceBuf); - if (bytesRead <= 0) - break; - - // check to see if this piece is just zeroes, if so, assume it's a missing piece - boolean allZero = true; - for (byte b: pieceBuf) - { - if (b != 0) - { - allZero = false; - break; - } - } - pieces.set(pieceIdx, !allZero); - -// MessageDigest pieceDigest = MessageDigest.getInstance("SHA-256"); -// pieceDigest.update(pieceBuf, 0, bytesRead); -// byte[] pieceHash = pieceDigest.digest(); - totalDigest.update(pieceBuf, 0, bytesRead); - -// FilePiece piece = new FilePiece(pieceHash, bytesRead, localFile, offset); -// System.out.print(HexFormat.of().formatHex(pieceHash) + ", "); -// pieces.add(piece); - - pieceIdx++; - } - System.out.println(); - setLocalFile(localFile); - size = localFile.length(); -// setPieces(pieces); - setHash(totalDigest.digest()); - - System.out.println("Total hash: " + HexFormat.of().formatHex(hash)); - System.out.println("Have " + pieces.cardinality() + " of " + getPieceCount() + " pieces."); - - if (pieces.cardinality() >= getPieceCount()) - { - peersWithCopy.add(Main.getInstance().getModel().getSelfPeer().getObjectID()); - } - - } catch (NoSuchAlgorithmException e) - { - throw new RuntimeException(e); + peersWithCopy.add(Main.getInstance().getModel().getLocalPeer().getObjectID()); } } @Override - public void updateFromStateMessage(ObjectStatements.ObjectState state) + public synchronized void updateFromMessageMap(Map after, Map before) { - super.updateFromStateMessage(state); -// if (state.containsValues("path")) -// path = state.getValuesOrThrow("path"); - if (state.containsValues("size")) - size = Long.parseLong(state.getValuesOrThrow("size")); - if (state.containsValues("hash")) - hash = HexFormat.of().parseHex(state.getValuesOrThrow("hash")); - if (state.containsValues("pieceSize")) - pieceSize = Long.parseLong(state.getValuesOrThrow("pieceSize")); - if (state.containsValues("peersWithCopy")) + super.updateFromMessageMap(after, before); + if (after.containsKey("size")) + size = Long.parseLong(after.get("size")); + if (after.containsKey("hash")) + hash = HexFormat.of().parseHex(after.get("hash")); + if (after.containsKey("pieceSize")) + pieceSize = Long.parseLong(after.get("pieceSize")); + if (after.containsKey("peersWithCopy")) { peersWithCopy.clear(); - String[] peers = state.getValuesOrThrow("peersWithCopy").split(","); + String[] peers = after.get("peersWithCopy").split(","); for (String peer: peers) { peersWithCopy.add(new ObjectID(Long.parseUnsignedLong(peer,16))); @@ -133,6 +82,7 @@ public class NetworkFile extends NetworkFSNode } } + @Override public ObjectStatements.ObjectState.Builder buildObjectState() { @@ -157,6 +107,7 @@ public class NetworkFile extends NetworkFSNode return ret; } + @SuppressWarnings("unchecked") @Override public void updateFromStateMap(Map map) { @@ -220,12 +171,7 @@ public class NetworkFile extends NetworkFSNode return pieceSize; } - private void setPieceSize(long pieceSize) - { - this.pieceSize = pieceSize; - } - -// public List getPieces() + // public List getPieces() // { // return pieces; // } @@ -240,11 +186,6 @@ public class NetworkFile extends NetworkFSNode return hash; } - private void setHash(byte[] hash) - { - this.hash = hash; - } - public int getPieceCount() { return Math.toIntExact(Math.ceilDiv(size, pieceSize)); @@ -338,6 +279,11 @@ public class NetworkFile extends NetworkFSNode } } + public boolean hasLocalFile() + { + return localFile != null; + } + public enum StorageType { /** The file will be stored as a complete file in the storage directory under it's own name and file path. */ diff --git a/src/main/java/moe/nekojimi/friendcloud/objects/NetworkObject.java b/src/main/java/moe/nekojimi/friendcloud/objects/NetworkObject.java index 62e6b2d..7fd2a55 100644 --- a/src/main/java/moe/nekojimi/friendcloud/objects/NetworkObject.java +++ b/src/main/java/moe/nekojimi/friendcloud/objects/NetworkObject.java @@ -1,5 +1,6 @@ package moe.nekojimi.friendcloud.objects; +import moe.nekojimi.friendcloud.ObjectChangeRecord; import moe.nekojimi.friendcloud.protos.ObjectStatements; import moe.nekojimi.friendcloud.storage.Storable; import org.jetbrains.annotations.NotNull; @@ -17,6 +18,18 @@ public abstract class NetworkObject implements Storable, Comparable new Peer(objectID); + case OBJECT_TYPE_FILE -> new NetworkFile(objectID); + case OBJECT_TYPE_FOLDER -> new NetworkFolder(objectID); + default -> throw new IllegalArgumentException("Unrecognised object type!"); + }; + } + public ObjectID getObjectID() { return objectID; @@ -28,7 +41,6 @@ public abstract class NetworkObject implements Storable, Comparable getStateMap() { @@ -37,15 +49,31 @@ public abstract class NetworkObject implements Storable, Comparable currentValues) + { + updateFromMessageMap(currentValues,Map.of()); + } + + protected synchronized void updateFromMessageMap(Map currentValues, Map beforeValues) + { + + } + + public synchronized final void updateFromStateMessage(ObjectStatements.ObjectState state) { if (state.getObjectId() != objectID.toLong()) throw new IllegalArgumentException("Wrong object!"); + updateFromMessageMap(state.getValuesMap()); } - public synchronized ObjectStatements.ObjectState mergeChanges(ObjectStatements.ObjectState a, ObjectStatements.ObjectState b) + public synchronized ObjectStatements.ObjectState mergeChanges(ObjectStatements.ObjectChange a, ObjectStatements.ObjectChange b) { - return null; + throw new UnsupportedOperationException("NYI"); } public ObjectStatements.ObjectState.Builder buildObjectState() @@ -73,4 +101,12 @@ public abstract class NetworkObject implements Storable, Comparable return typePart | systemPart | uniquePart; } + public static ObjectID nullValue() + { + return new ObjectID(0L); + } + public moe.nekojimi.friendcloud.protos.ObjectStatements.ObjectType getType() { return type; @@ -72,4 +78,9 @@ public class ObjectID implements Comparable { return Long.compare(toLong(), objectID.toLong()); } + + public boolean isNull() + { + return type == ObjectStatements.ObjectType.OBJECT_TYPE_UNSPECIFIED; + } } diff --git a/src/main/java/moe/nekojimi/friendcloud/objects/Peer.java b/src/main/java/moe/nekojimi/friendcloud/objects/Peer.java index 46d91c7..5702683 100644 --- a/src/main/java/moe/nekojimi/friendcloud/objects/Peer.java +++ b/src/main/java/moe/nekojimi/friendcloud/objects/Peer.java @@ -13,10 +13,6 @@ public class Peer extends NetworkObject private String userName = ""; private String systemName = ""; -// private Map fileStates = new HashMap<>(); - - private volatile int lastTriedAddressIdx = -1; - public Peer(ObjectID objectID) { super(objectID); @@ -27,20 +23,19 @@ public class Peer extends NetworkObject return userName + "@" + systemName; } - @Override - public synchronized void updateFromStateMessage(ObjectStatements.ObjectState state) - { - super.updateFromStateMessage(state); + @Override + protected synchronized void updateFromMessageMap(Map after, Map before) + { + super.updateFromMessageMap(after, before); - Map values = state.getValuesMap(); - if (values.containsKey("userName")) - userName = values.get("userName"); - if (values.containsKey("systemName")) - systemName = values.get("systemName"); - if (values.containsKey("addresses")) + if (after.containsKey("userName")) + userName = after.get("userName"); + if (after.containsKey("systemName")) + systemName = after.get("systemName"); + if (after.containsKey("addresses")) { addresses.clear(); - String[] split = values.get("addresses").split(","); + String[] split = after.get("addresses").split(","); for (String s: split) { try @@ -52,8 +47,7 @@ public class Peer extends NetworkObject } } } -// if (values.containsKey("files")) - } + } @Override public ObjectStatements.ObjectState.Builder buildObjectState() @@ -104,6 +98,12 @@ public class Peer extends NetworkObject // return fileStates; // } + public void setAddresses(Collection urIs) + { + addresses.clear(); + addresses.addAll(urIs); + } + public SortedSet getAddresses() { return addresses; @@ -128,4 +128,10 @@ public class Peer extends NetworkObject { this.systemName = systemName; } + + @Override + public String getFriendlyName() + { + return getNodeName(); + } } diff --git a/src/main/java/moe/nekojimi/friendcloud/objects/PeerFileState.java b/src/main/java/moe/nekojimi/friendcloud/objects/PeerFileState.java deleted file mode 100644 index 56d61ad..0000000 --- a/src/main/java/moe/nekojimi/friendcloud/objects/PeerFileState.java +++ /dev/null @@ -1,92 +0,0 @@ -package moe.nekojimi.friendcloud.objects; - -import moe.nekojimi.friendcloud.Util; -import moe.nekojimi.friendcloud.protos.ObjectStatements; - -import java.util.Map; - -public class PeerFileState extends NetworkObject -{ - private ObjectID peerID; - private ObjectID fileID; - - private double progress = 0; - - public PeerFileState(ObjectID objectID) - { - super(objectID); - } - - @Override - public void updateFromStateMessage(ObjectStatements.ObjectState state) - { - super.updateFromStateMessage(state); - peerID = new ObjectID(Long.parseLong(state.getValuesOrThrow("peer"))); - fileID = new ObjectID(Long.parseLong(state.getValuesOrThrow("file"))); - if (state.containsValues("progress")) - progress = Double.parseDouble(state.getValuesOrThrow("progress")); - } - - @Override - public ObjectStatements.ObjectState mergeChanges(ObjectStatements.ObjectState a, ObjectStatements.ObjectState b) - { - return super.mergeChanges(a, b); - } - - @Override - public ObjectStatements.ObjectState.Builder buildObjectState() - { - return super.buildObjectState() -// .putValues("peer", Long.toString(peer.getObjectID().toLong())) -// .putValues("file", Long.toString(file.getObjectID().toLong())) - .putValues("progress", Double.toString(progress)); - } - -// public void setNode(Peer peer) -// { -// this.peer = peer; -// } -// -// public void setFile(NetworkFile file) -// { -// this.file = file; -// } - - public double getProgress() - { - return progress; - } - - public void setProgress(double progress) - { - this.progress = progress; - } - -// public NetworkFile getFile() -// { -// return file; -// } -// -// public Peer getNode() -// { -// return peer; -// } - - @Override - public Map getStateMap() - { - Map ret = super.getStateMap(); -// ret.put("peer", peer == null ? 0L : peer.getObjectID().toLong()); -// ret.put("file", file == null? 0L : file.getObjectID().toLong()); - ret.put("progress", progress); - return ret; - } - - @Override - public void updateFromStateMap(Map map) - { -// peer = Main.getInstance().getModel().getObject(new ObjectID(Util.unconditionalNumberToLong(map.get("peer")))); -// file = Main.getInstance().getModel().getObject(new ObjectID(Util.unconditionalNumberToLong(map.get("file")))); - progress = Util.unconditionalNumberToDouble( map.get("progress")); - } -} diff --git a/src/main/java/moe/nekojimi/friendcloud/storage/CachingDataStore.java b/src/main/java/moe/nekojimi/friendcloud/storage/CachingDataStore.java index 6beb1ba..9ddc152 100644 --- a/src/main/java/moe/nekojimi/friendcloud/storage/CachingDataStore.java +++ b/src/main/java/moe/nekojimi/friendcloud/storage/CachingDataStore.java @@ -1,6 +1,5 @@ package moe.nekojimi.friendcloud.storage; -import java.lang.reflect.Modifier; import java.util.*; public class CachingDataStore extends DataStore @@ -16,6 +15,7 @@ public class CachingDataStore extends DataStore public synchronized DAO getDAOForClass(Class clazz) { if (daos.containsKey(clazz)) + //noinspection unchecked return (DAO) daos.get(clazz); else { @@ -25,29 +25,41 @@ public class CachingDataStore extends DataStore } } + @Override + public void clear() + { + daos.clear(); + backend.clear(); + } + @Override public FSNodeDAO getFSDAO() { return backend.getFSDAO(); } - private Map, CachingDAO> daos = new HashMap<>(); + private final Map, CachingDAO> daos = new HashMap<>(); public class CachingDAO implements DAO { private final DAO backendDao; - private WeakHashMap cache = new WeakHashMap<>(); + private final WeakHashMap cache = new WeakHashMap<>(); public CachingDAO(Class clazz) { this.backendDao = backend.getDAOForClass(clazz); } + @Override + public List list() + { + return backendDao.list(); + } + @Override public synchronized List getAll() { List ret = new ArrayList<>(); - ret.addAll(cache.values()); for (T t : backendDao.getAll()) { if (!cache.containsKey(t.getStorageID())) @@ -55,6 +67,10 @@ public class CachingDataStore extends DataStore ret.add(t); cache.put(t.getStorageID(), t); } + else + { + ret.add(cache.get(t.getStorageID())); + } } return ret; } @@ -76,9 +92,9 @@ public class CachingDataStore extends DataStore @Override public T get(long id) { - T t = backendDao.get(id); - cache.put(id, t); - return t; + if (!cache.containsKey(id)) + cache.put(id, backendDao.get(id)); + return cache.get(id); } @Override diff --git a/src/main/java/moe/nekojimi/friendcloud/storage/DataStore.java b/src/main/java/moe/nekojimi/friendcloud/storage/DataStore.java index 2b33099..b2dc902 100644 --- a/src/main/java/moe/nekojimi/friendcloud/storage/DataStore.java +++ b/src/main/java/moe/nekojimi/friendcloud/storage/DataStore.java @@ -3,7 +3,6 @@ package moe.nekojimi.friendcloud.storage; import moe.nekojimi.friendcloud.objects.NetworkFSNode; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; import java.util.Set; @@ -12,10 +11,13 @@ public abstract class DataStore public abstract DAO getDAOForClass(Class clazz); public abstract FSNodeDAO getFSDAO(); + public abstract void clear(); + public interface DAO { + default boolean exists(long id) {return list().contains(id);} + default List list() {return getAll().stream().map(Storable::getStorageID).toList();} List getAll(); - boolean exists(long id); T create(long id); T get(long id); default T getOrCreate(long id) @@ -47,6 +49,15 @@ public abstract class DataStore this.subclasses = Set.of(subclasses); } + @Override + public List list() + { + List ret = new ArrayList<>(); + for (Class subclass : subclasses) + ret.addAll(getDAOForClass(subclass).list()); + return ret; + } + @Override public List getAll() { diff --git a/src/main/java/moe/nekojimi/friendcloud/storage/LocalData.java b/src/main/java/moe/nekojimi/friendcloud/storage/LocalData.java index 41cc4fd..b8f6820 100644 --- a/src/main/java/moe/nekojimi/friendcloud/storage/LocalData.java +++ b/src/main/java/moe/nekojimi/friendcloud/storage/LocalData.java @@ -64,6 +64,6 @@ public class LocalData implements Storable localPeer = Main.getInstance().getModel().getObject(new ObjectID(Util.unconditionalNumberToLong(map.getOrDefault("localPeer",0)))); currentChangeRecord = Main.getInstance().getModel().getChangeRecord(Util.unconditionalNumberToLong(map.getOrDefault("currentChangeRecord",0))); systemID = (int) map.getOrDefault("systemID", 0); - System.out.println("LocalData: resumed state, localPeer=" + localPeer + ", currentChangeRecord=" + currentChangeRecord + ", systemID=" + Integer.toHexString(systemID)); + System.out.println("LocalData: resumed state, localPeer=" + localPeer + ", currentChangeRecord=" + Long.toHexString(currentChangeRecord.getChangeID()) + ", systemID=" + Integer.toHexString(systemID)); } } diff --git a/src/main/java/moe/nekojimi/friendcloud/storage/Model.java b/src/main/java/moe/nekojimi/friendcloud/storage/Model.java deleted file mode 100644 index 9d982a9..0000000 --- a/src/main/java/moe/nekojimi/friendcloud/storage/Model.java +++ /dev/null @@ -1,202 +0,0 @@ -package moe.nekojimi.friendcloud.storage; - -import moe.nekojimi.friendcloud.ObjectChangeRecord; -import moe.nekojimi.friendcloud.objects.*; -import moe.nekojimi.friendcloud.protos.ObjectStatements; - -import java.util.*; -import java.util.stream.Collectors; - -public class Model -{ - - private final CachingDataStore dataStore; - private LocalData localData; - - public Model(DataStore dataStore) - { - this.dataStore = new CachingDataStore(dataStore); - } - - public synchronized void init() - { - - List localDataList = dataStore.getDAOForClass(LocalData.class).getAll(); - if (localDataList.isEmpty()) - { - localData = dataStore.getDAOForClass(LocalData.class).create(0); - } - else if (localDataList.size() == 1) - { - localData = localDataList.getFirst(); - } - else - { - throw new IllegalStateException("We have more than one LocalData somehow!!"); - } - if (localData.getSystemID() == 0) - { - Random ran = new Random(); - localData.setSystemID(ran.nextInt() & 0x00FFFFFF); - objectChanged(localData); - } - } - - public synchronized Peer getSelfPeer() - { - if (localData.getLocalPeer() == null) - { - localData.setLocalPeer(createObjectByType(ObjectStatements.ObjectType.OBJECT_TYPE_PEER)); - objectChanged(localData); - } - return localData.getLocalPeer(); - } - // private Map nodes = new HashMap<>(); - - public synchronized ObjectID getNextObjectID(ObjectStatements.ObjectType type) - { - Random ran = new Random(); - int randomNumber = ran.nextInt(); - ObjectID objectID = new ObjectID(type, localData.getSystemID(), randomNumber); - System.out.println("Assigned new object ID: " + objectID); - return objectID; - } - - public static Class getNetworkObjectClassByType(ObjectStatements.ObjectType type) - { - return switch (type) - { - case OBJECT_TYPE_FILE -> NetworkFile.class; - case OBJECT_TYPE_FOLDER -> NetworkFolder.class; - case OBJECT_TYPE_PEER -> Peer.class; - case OBJECT_TYPE_PEER_FILE_STATE -> PeerFileState.class; - case OBJECT_TYPE_UNSPECIFIED, UNRECOGNIZED -> throw new IllegalArgumentException("???"); - default -> throw new UnsupportedOperationException("NYI"); - }; - } - - public synchronized T createObjectByID(ObjectID id) - { - if (id.toLong() == 0) - throw new IllegalArgumentException("Cannot create an object with ID=0!"); - ObjectStatements.ObjectType type = id.getType(); - System.out.println("Creating new object with type: " + type.name()); - if (type == ObjectStatements.ObjectType.OBJECT_TYPE_UNSPECIFIED) - throw new IllegalArgumentException(); - T ret = (T) dataStore.getDAOForClass(getNetworkObjectClassByType(type)).create(id.toLong()); - return ret; - } - - public synchronized T createObjectByType(ObjectStatements.ObjectType type) - { - return createObjectByID(getNextObjectID(type)); - } - - public synchronized T getObject(ObjectID id) - { - if (id.toLong() == 0) - return null; - Class clazz = (Class) getNetworkObjectClassByType(id.getType()); - return dataStore.getDAOForClass(clazz).get(id.toLong()); - } - - public synchronized T getOrCreateObject(ObjectID id) - { - if (id.toLong() == 0) - return null; - Class clazz = (Class) getNetworkObjectClassByType(id.getType()); - return dataStore.getDAOForClass(clazz).getOrCreate(id.toLong()); - } - - public synchronized List listObjects(Set types) - { - List ret = new ArrayList<>(); - Set> classes = types.stream().map(Model::getNetworkObjectClassByType).collect(Collectors.toSet()); - for (Class clazz: classes) - { - List list = dataStore.getDAOForClass(clazz).getAll(); - ret.addAll(list); - } - return ret; - } - - public synchronized List listFSNodes(String path) - { - //TODO: dumbest algorithm in the world - - List ret = new ArrayList<>(); - for (NetworkObject object : listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE, ObjectStatements.ObjectType.OBJECT_TYPE_FOLDER))) - { - NetworkFSNode fsNode = (NetworkFSNode) object; - String networkPath = fsNode.getNetworkPath(); - if (networkPath.substring(0, networkPath.lastIndexOf("/")+1).equals(path)) - ret.add(fsNode); - } - return ret; - } - - public synchronized NetworkFSNode getFSNode(String path) - { - for (NetworkObject object : listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE, ObjectStatements.ObjectType.OBJECT_TYPE_FOLDER))) - { - NetworkFSNode fsNode = (NetworkFSNode) object; - String networkPath = fsNode.getNetworkPath(); - if (networkPath.equals(path)) - return fsNode; - } - return null; - } - - public synchronized void addChangeRecord(ObjectChangeRecord record) - { - dataStore.getDAOForClass(ObjectChangeRecord.class).update(record); - } - - public ObjectChangeRecord getChangeRecord(long id) - { - if (id == 0) - return null; - return dataStore.getDAOForClass(ObjectChangeRecord.class).get(id); - } - - public void applyChangeRecord(ObjectChangeRecord record) - { - if (!record.getChangeHeads().contains(localData.getCurrentChangeRecord().getChangeID())) - throw new IllegalStateException("Change does not apply! Valid change heads=" + record.getChangeHeads() + ", we are in state " + localData.getCurrentChangeRecord().getChangeID()); - addChangeRecord(record); - - -// if (record == null) -// throw new IllegalArgumentException("Cannot apply unknown change!"); - } - - public Set getChangeHeads() - { - // stupid algorithm - start with all of the changes, then remove the ones that are referenced by something - // TODO: better algorithm - Set ret = new HashSet<>(dataStore.getDAOForClass(ObjectChangeRecord.class).getAll()); -// for (ObjectChangeRecord record : changeRecords.values()) -// { -// throw new UnsupportedOperationException("NYI"); -// } - throw new UnsupportedOperationException("NYI"); - } - - public Set listOtherPeers() - { - Set ret = new HashSet<>(); - for (NetworkObject object : listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_PEER))) - { - Peer peer = (Peer) object; - if (peer != getSelfPeer()) - ret.add(peer); - } - return ret; - } - - public void objectChanged(T storable) - { - Class clazz = (Class) storable.getClass(); - dataStore.getDAOForClass(clazz).update(storable); - } -} diff --git a/src/main/java/moe/nekojimi/friendcloud/storage/StupidJSONFileStore.java b/src/main/java/moe/nekojimi/friendcloud/storage/StupidJSONFileStore.java index 4ea043f..42feebc 100644 --- a/src/main/java/moe/nekojimi/friendcloud/storage/StupidJSONFileStore.java +++ b/src/main/java/moe/nekojimi/friendcloud/storage/StupidJSONFileStore.java @@ -1,6 +1,7 @@ package moe.nekojimi.friendcloud.storage; import moe.nekojimi.friendcloud.ObjectChangeRecord; +import moe.nekojimi.friendcloud.Util; import moe.nekojimi.friendcloud.objects.*; import org.jetbrains.annotations.NotNull; import org.json.JSONArray; @@ -31,6 +32,14 @@ public class StupidJSONFileStore extends DataStore } } + @Override + public void clear() + { + daos.clear(); + storageDirectory.delete(); + storageDirectory.mkdirs(); + } + @Override public DAO getDAOForClass(Class clazz) { @@ -44,8 +53,6 @@ public class StupidJSONFileStore extends DataStore ret = new NetworkFolderDAO(); else if (clazz.equals(Peer.class)) ret = new PeerDAO(); - else if (clazz.equals(PeerFileState.class)) - ret = new PeerFileStateDAO(); else if (clazz.equals(NetworkFSNode.class)) ret = new NetworkFSNodeDAO(); else if (clazz.equals(LocalData.class)) @@ -74,7 +81,8 @@ public class StupidJSONFileStore extends DataStore { File ret = new File(storageDirectory, getNamespace()); if (!ret.exists()) - ret.mkdir(); + ret.mkdirs(); + assert (ret.exists() && ret.isDirectory()); return ret; } protected abstract T makeBlank(long id); @@ -174,6 +182,20 @@ public class StupidJSONFileStore extends DataStore return file.exists(); } + @Override + public List list() + { + List ret = new ArrayList<>(); + // get all files in the storage directory + for (File file : Objects.requireNonNull(getNamespaceDirectory().listFiles())) + { + String name = file.getName(); + String nameWithoutExt = name.substring(0, name.indexOf('.')); + ret.add(Long.parseLong(nameWithoutExt, 16)); + } + return ret; + } + @Override public List getAll() { @@ -301,9 +323,8 @@ public class StupidJSONFileStore extends DataStore @Override protected JSONObject serialiseWeirdObject(Object value) throws IllegalArgumentException { - if (value instanceof URI) + if (value instanceof URI uri) { - URI uri = (URI) value; return new JSONObject().put("weirdObjectClass", URI.class.getCanonicalName()).put("uri",uri.toString()); } return super.serialiseWeirdObject(value); @@ -330,20 +351,6 @@ public class StupidJSONFileStore extends DataStore } } - private class PeerFileStateDAO extends NetworkObjectDAO - { - @Override - protected String getNamespace() - { - return super.getNamespace() + "/peerFileStates"; - } - @Override - protected PeerFileState makeBlank(long id) - { - return new PeerFileState(new ObjectID(id)); - } - } - private class LocalDataDAO extends JSONObjectDAO { @Override @@ -372,5 +379,35 @@ public class StupidJSONFileStore extends DataStore { return new ObjectChangeRecord(); } + + @Override + protected Object deserialiseWeirdObject(JSONObject json) + { + String weirdType = json.getString("weirdObjectClass"); +// Class weirdClass = Class.forName(weirdType); + if (weirdType.equals(ObjectChangeRecord.Change.class.getCanonicalName())) + { + return new ObjectChangeRecord.Change( + new ObjectID(json.getLong("objectID")), + Util.stringifyMap(json.getJSONObject("before").toMap()), + Util.stringifyMap(json.getJSONObject("after").toMap())); + } + return super.deserialiseWeirdObject(json); + } + + @Override + protected JSONObject serialiseWeirdObject(Object value) throws IllegalArgumentException + { + if (value instanceof ObjectChangeRecord.Change change) + { + JSONObject ret = new JSONObject(); + ret.put("weirdObjectClass", ObjectChangeRecord.Change.class.getCanonicalName()); + ret.put("objectID", change.objectID().toLong()); + ret.put("before", change.beforeValues()); + ret.put("after", change.afterValues()); + return ret; + } + return super.serialiseWeirdObject(value); + } } } diff --git a/src/main/java/moe/nekojimi/friendcloud/tasks/FileDownloadTask.java b/src/main/java/moe/nekojimi/friendcloud/tasks/FileDownloadTask.java index 4584a4b..d0ce01c 100644 --- a/src/main/java/moe/nekojimi/friendcloud/tasks/FileDownloadTask.java +++ b/src/main/java/moe/nekojimi/friendcloud/tasks/FileDownloadTask.java @@ -1,9 +1,8 @@ package moe.nekojimi.friendcloud.tasks; -import moe.nekojimi.friendcloud.ConnectionManager; +import moe.nekojimi.friendcloud.network.ConnectionManager; import moe.nekojimi.friendcloud.Main; -import moe.nekojimi.friendcloud.NotificationManager; import moe.nekojimi.friendcloud.ObjectChangeTransaction; import moe.nekojimi.friendcloud.network.PeerConnection; import moe.nekojimi.friendcloud.network.requests.FilePiecesRequest; @@ -15,7 +14,6 @@ import java.io.File; import java.io.IOException; import java.util.*; import java.util.concurrent.*; -import java.util.stream.Collectors; public class FileDownloadTask implements RunnableFuture { @@ -64,7 +62,7 @@ public class FileDownloadTask implements RunnableFuture String connectionLine = ""; String progressLine = ""; - Peer selfPeer = Main.getInstance().getModel().getSelfPeer(); + Peer selfPeer = Main.getInstance().getModel().getLocalPeer(); while (!missingPieceIndices.isEmpty() && !cancelled && !failed && !done) { System.out.println("Need to get " + missingPieceIndices.size() + " missing pieces."); @@ -122,11 +120,11 @@ public class FileDownloadTask implements RunnableFuture System.out.println("FileDownloadTask: Will download pieces from " + runStart + " to " + runEnd); // make one request per connectable peer, striping the needed pieces among them - List>> fileFutures = new ArrayList<>(); + List>> fileFutures = new ArrayList<>(); int offset = 0; for (PeerConnection connection : connections) { - CompletableFuture> future = connection.makeRequest(new FilePiecesRequest(file, runStart+offset, (runEnd-runStart)+1, connections.size())); + CompletableFuture> future = connection.makeRequest(new FilePiecesRequest(file, runStart+offset, (runEnd-runStart)+1, connections.size())); fileFutures.add(future); offset++; } @@ -134,12 +132,12 @@ public class FileDownloadTask implements RunnableFuture long timeout = timeoutPerPieceMs * (missingPieceIndices.size() / connections.size()); // wait for all the requests to complete - for (CompletableFuture> future : fileFutures) + for (CompletableFuture> future : fileFutures) { try { - List receivedPieces = future.get(timeout, TimeUnit.MILLISECONDS); - receivedPieces.forEach(missingPieceIndices::remove); + Set receivedPieces = future.get(timeout, TimeUnit.MILLISECONDS); + missingPieceIndices.removeAll(receivedPieces); } catch (InterruptedException e) { future.cancel(true); @@ -160,10 +158,10 @@ public class FileDownloadTask implements RunnableFuture { try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(connectionManager, selfPeer.getObjectID())) { - transaction.addObjectBeforeChange(file.getObjectID()); + transaction.addObjectBeforeChange(file); file.addPeerWithCopy(selfPeer); } - catch (IOException ex) + catch (IOException ignored) { } diff --git a/src/main/java/moe/nekojimi/friendcloud/tasks/JoinNetworkTask.java b/src/main/java/moe/nekojimi/friendcloud/tasks/JoinNetworkTask.java index 1858310..4427279 100644 --- a/src/main/java/moe/nekojimi/friendcloud/tasks/JoinNetworkTask.java +++ b/src/main/java/moe/nekojimi/friendcloud/tasks/JoinNetworkTask.java @@ -1,12 +1,19 @@ package moe.nekojimi.friendcloud.tasks; +import com.kstruct.gethostname4j.Hostname; import moe.nekojimi.friendcloud.Main; import moe.nekojimi.friendcloud.ObjectChangeTransaction; +import moe.nekojimi.friendcloud.network.ConnectionManager; import moe.nekojimi.friendcloud.objects.ObjectID; import moe.nekojimi.friendcloud.objects.Peer; import moe.nekojimi.friendcloud.protos.ObjectStatements; +import moe.nekojimi.friendcloud.Controller; import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; public class JoinNetworkTask implements Runnable { @@ -14,25 +21,57 @@ public class JoinNetworkTask implements Runnable @Override public void run() { - // generate new peer ID - ObjectID peerID = null; - try (ObjectChangeTransaction builder = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), peerID)) + System.out.println("JoinNetworkTask: Joining the network!"); + Controller controller = Main.getInstance().getModel(); + + boolean firstJoin = false; + + ObjectID peerID; + if (controller.getLocalPeer() != null) + peerID = controller.getLocalPeer().getObjectID(); + else { - Peer selfPeer = Main.getInstance().getModel().getSelfPeer(); - if (selfPeer != null) - peerID = selfPeer.getObjectID(); - else - peerID = Main.getInstance().getModel().getNextObjectID(ObjectStatements.ObjectType.OBJECT_TYPE_PEER); + peerID = controller.getNextObjectID(ObjectStatements.ObjectType.OBJECT_TYPE_PEER); + firstJoin = true; + } - // synchronise with the network - SyncWithNetworkTask syncWithNetworkTask = new SyncWithNetworkTask(); - syncWithNetworkTask.run(); - - - - } catch (IOException e) + ConnectionManager connectionManager = Main.getInstance().getConnectionManager(); + if (firstJoin) { - throw new RuntimeException(e); + System.out.println("JoinNetworkTask: Performing first time setup..."); + // download the entire state + PullStateTask pullStateTask = new PullStateTask(); + Future future = Main.getInstance().getExecutor().submit(pullStateTask); + try + { + future.get(30, TimeUnit.SECONDS); + } catch (InterruptedException | TimeoutException e) + { +// throw new RuntimeException(e); + } catch (ExecutionException e) + { + throw new RuntimeException(e); + } + // create our local peer object + try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(connectionManager, peerID)) + { + // create and submit our Peer object if it doesn't exist + Peer selfPeer = controller.getLocalData().getLocalPeer(); + if (selfPeer == null) + { + selfPeer = controller.createLocalPeer(peerID); + selfPeer.setUserName(System.getProperty("user.name")); + String hostname = Hostname.getHostname(); + selfPeer.setSystemName(hostname); + selfPeer.setAddresses(connectionManager.getURIs()); + controller.objectChanged(selfPeer); + controller.getLocalData().setLocalPeer(selfPeer); + transaction.addNewlyCreatedObject(selfPeer); + } + } catch (IOException e) + { + throw new RuntimeException(e); + } } } } diff --git a/src/main/java/moe/nekojimi/friendcloud/tasks/PropagateMessageTask.java b/src/main/java/moe/nekojimi/friendcloud/tasks/PropagateMessageTask.java index 48a52f8..19b7205 100644 --- a/src/main/java/moe/nekojimi/friendcloud/tasks/PropagateMessageTask.java +++ b/src/main/java/moe/nekojimi/friendcloud/tasks/PropagateMessageTask.java @@ -1,7 +1,7 @@ package moe.nekojimi.friendcloud.tasks; import com.google.protobuf.Message; -import moe.nekojimi.friendcloud.ConnectionManager; +import moe.nekojimi.friendcloud.network.ConnectionManager; import moe.nekojimi.friendcloud.Main; import moe.nekojimi.friendcloud.network.PeerConnection; import moe.nekojimi.friendcloud.objects.Peer; @@ -21,17 +21,22 @@ public class PropagateMessageTask implements Runnable public void run() { ConnectionManager connectionManager = Main.getInstance().getConnectionManager(); + int messagesSent = 0; for (Peer peer: Main.getInstance().getModel().listOtherPeers()) { try { PeerConnection connection = connectionManager.getNodeConnection(peer); if (connection != null) + { connection.sendUnsolicitedMessage(message); + messagesSent++; + } } catch (IOException e) { throw new RuntimeException(e); } } + System.out.println("PropagateMessageTask: Sent " + message.getDescriptorForType().getFullName() + " to " + messagesSent + " peers."); } } diff --git a/src/main/java/moe/nekojimi/friendcloud/tasks/PullChangesTask.java b/src/main/java/moe/nekojimi/friendcloud/tasks/PullChangesTask.java new file mode 100644 index 0000000..dbb3585 --- /dev/null +++ b/src/main/java/moe/nekojimi/friendcloud/tasks/PullChangesTask.java @@ -0,0 +1,95 @@ +package moe.nekojimi.friendcloud.tasks; + +import moe.nekojimi.friendcloud.Main; +import moe.nekojimi.friendcloud.ObjectChangeRecord; +import moe.nekojimi.friendcloud.Util; +import moe.nekojimi.friendcloud.network.PeerConnection; +import moe.nekojimi.friendcloud.network.requests.ObjectChangeRequest; +import moe.nekojimi.friendcloud.network.requests.RequestReceivedErrorException; +import moe.nekojimi.friendcloud.objects.Peer; +import moe.nekojimi.friendcloud.protos.CommonMessages; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.*; +import java.util.stream.Collectors; + +public class PullChangesTask implements Callable +{ + private final Set peers; + + public PullChangesTask() + { + this(Main.getInstance().getModel().listOtherPeers()); + } + + public PullChangesTask(Set peers) + { + this.peers = peers; + } + + @Override + public Boolean call() + { + // for each other peer: + + List> futures = new ArrayList<>(); + Set changesSinceIDs = Main.getInstance().getModel().getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet()); + System.out.println("PullChangesTask: Requesting changes since: " + changesSinceIDs.stream().map(Long::toHexString).collect(Collectors.toSet())); + + for (Peer peer : peers) + { + System.out.println("PullChangesTask: Attempting to pull changes from " + peer); + // open a connection + PeerConnection connection = Main.getInstance().getConnectionManager().getNodeConnection(peer); + if (connection == null) + continue; + // send a ObjectChangeRequest + ObjectChangeRequest objectChangeRequest = new ObjectChangeRequest(changesSinceIDs); + CompletableFuture future = connection.makeRequest(objectChangeRequest) + .handle((changes, ex) -> + { + // integrate the returned changes with our change graph + if (ex == null) + { + Main.getInstance().getModel().applyChangeRecords(changes); + return PullResult.OK; + } + else + { + if (ex instanceof RequestReceivedErrorException re) + { + if (re.getErrorMessage().getError() == CommonMessages.Error.ERROR_END_OF_HISTORY) + { + return PullResult.END_OF_HISTORY; + } + } + ex.printStackTrace(System.err); + return PullResult.FAILED; + } + }); + futures.add(future); + } + + List results = Util.collectFutures(futures).join(); + + long peersGotChanges = results.stream().filter(pullResult -> pullResult == PullResult.OK).count(); + long peersAtEndOfHistory = results.stream().filter(pullResult -> pullResult == PullResult.END_OF_HISTORY).count(); + // if no peers could be contacted: + if (peersGotChanges == 0) + { + // if everyone reported end of history, we aren't synced (need to do a state pull) + // otherwise everyone else is offline so we might as well be synced (we'll cause a fork if we change anything but that's fine) + return peersAtEndOfHistory <= 0; + } + return true; + } + + protected enum PullResult + { + OK, + END_OF_HISTORY, + FAILED; + } +} diff --git a/src/main/java/moe/nekojimi/friendcloud/tasks/PullStateTask.java b/src/main/java/moe/nekojimi/friendcloud/tasks/PullStateTask.java new file mode 100644 index 0000000..097dd7d --- /dev/null +++ b/src/main/java/moe/nekojimi/friendcloud/tasks/PullStateTask.java @@ -0,0 +1,106 @@ +package moe.nekojimi.friendcloud.tasks; + +import moe.nekojimi.friendcloud.Main; +import moe.nekojimi.friendcloud.Util; +import moe.nekojimi.friendcloud.network.PeerConnection; +import moe.nekojimi.friendcloud.network.requests.ObjectChangeRequest; +import moe.nekojimi.friendcloud.network.requests.ObjectListRequest; +import moe.nekojimi.friendcloud.objects.NetworkObject; +import moe.nekojimi.friendcloud.objects.Peer; +import moe.nekojimi.friendcloud.protos.ObjectStatements; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.*; + +/** + * Task to clear our entire local state and re-download it from peers. + */ +public class PullStateTask implements Runnable +{ + + @Override + public void run() + { + System.out.println("PullStateTask: Pulling state from peers..."); + Set connections = new HashSet<>(); + + for (String knownPeerAddress : Main.getInstance().getArgs().getKnownPeers()) + { + String[] split = knownPeerAddress.split(":"); + if (split.length != 2) + { + System.err.println("ERROR: " + knownPeerAddress + " isn't a valid address."); + continue; + } + InetSocketAddress address = new InetSocketAddress(split[0], Integer.parseInt(split[1])); + + try + { + URI uri = new URI("tcp", null, address.getHostString(), address.getPort(), null, null, null); + PeerConnection nodeConnection = Main.getInstance().getConnectionManager().getNodeConnection(uri); + + if (nodeConnection != null) + { + connections.add(nodeConnection); + } + } catch (URISyntaxException | IOException e) + { + throw new RuntimeException(e); + } + } + + for (Peer peer : Main.getInstance().getModel().listOtherPeers()) + { + PeerConnection connection = Main.getInstance().getConnectionManager().getNodeConnection(peer); + if (connection != null) + { + connections.add(connection); + } + } + + if (connections.isEmpty()) + { + // if we can't connect to anyone, don't replace our state since we can't get a new one + System.out.println("PullStateTask: Have no peers to connect to, giving up."); + return; + } + + System.out.println("PullStateTask: Have " + connections.size() + " peers to get state from."); + + Main.getInstance().getModel().clearEverything(); + + List> futures = new ArrayList<>(); + + for (PeerConnection connection : connections) + { + futures.add(connection.makeRequest(new ObjectListRequest(Set.of( + ObjectStatements.ObjectType.OBJECT_TYPE_FILE, + ObjectStatements.ObjectType.OBJECT_TYPE_FOLDER, + ObjectStatements.ObjectType.OBJECT_TYPE_PEER))).thenAccept(networkObjects -> + { + + System.out.println("PullStateTask: got state of " + networkObjects.size() + " objects."); + for (NetworkObject object : networkObjects) + { + Main.getInstance().getModel().addNetworkObject(object); + } + })); + + futures.add(connection.makeRequest(new ObjectChangeRequest(Set.of())).thenAccept(objectChangeRecords -> + { + System.out.println("PullStateTask: got " + objectChangeRecords.size() + " change records."); + Main.getInstance().getModel().addChangeRecords(objectChangeRecords); + })); + } + + Util.collectFutures(futures).join(); + + } +} diff --git a/src/main/java/moe/nekojimi/friendcloud/tasks/SyncWithNetworkTask.java b/src/main/java/moe/nekojimi/friendcloud/tasks/SyncWithNetworkTask.java deleted file mode 100644 index d6f01db..0000000 --- a/src/main/java/moe/nekojimi/friendcloud/tasks/SyncWithNetworkTask.java +++ /dev/null @@ -1,38 +0,0 @@ -package moe.nekojimi.friendcloud.tasks; - -import moe.nekojimi.friendcloud.Main; -import moe.nekojimi.friendcloud.ObjectChangeRecord; -import moe.nekojimi.friendcloud.network.PeerConnection; -import moe.nekojimi.friendcloud.network.requests.ObjectChangeRequest; -import moe.nekojimi.friendcloud.objects.Peer; -import moe.nekojimi.friendcloud.protos.ObjectStatements; - -import java.io.IOException; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; - -public class SyncWithNetworkTask implements Runnable -{ - - @Override - public void run() - { - // for each other peer: - for (Peer peer : Main.getInstance().getModel().listOtherPeers()) - { - // open a connection - PeerConnection connection = Main.getInstance().getConnectionManager().getNodeConnection(peer); - // send a ObjectChangeRequest - ObjectChangeRequest objectChangeRequest = new ObjectChangeRequest(Main.getInstance().getModel().getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet())); - CompletableFuture> future = connection.makeRequest(objectChangeRequest); - - // integrate the returned changes with our change graph - } - // if no peers could be contacted: - // return success (everyone's offline) - // if everyone reported end of history: - // delete our change graph - // perform a full sync - } -} diff --git a/src/main/protobuf/CommonMessages.proto b/src/main/protobuf/CommonMessages.proto index 879790d..dedfdbd 100644 --- a/src/main/protobuf/CommonMessages.proto +++ b/src/main/protobuf/CommonMessages.proto @@ -17,10 +17,10 @@ message FriendCloudMessage { message HelloMessage { uint32 protocol_version = 1; // this is the version of the FriendCloud protocol I speak - } -message LoginMessage { +message CheckInMessage { + repeated uint64 current_change_heads = 1; } enum Error { @@ -44,8 +44,6 @@ message ErrorMessage { string text = 2; } -message PingMessage { -} - -message PongMessage { +message MultiObjectConfirmationMessage { + repeated uint64 expected_return_id = 1; } \ No newline at end of file diff --git a/src/main/protobuf/ObjectStatements.proto b/src/main/protobuf/ObjectStatements.proto index 75cbf9d..e81f05e 100644 --- a/src/main/protobuf/ObjectStatements.proto +++ b/src/main/protobuf/ObjectStatements.proto @@ -10,7 +10,6 @@ enum ObjectType { OBJECT_TYPE_PEER = 2; OBJECT_TYPE_FILE = 3; OBJECT_TYPE_FOLDER = 4; - OBJECT_TYPE_PEER_FILE_STATE = 5; } message ObjectState { @@ -38,6 +37,8 @@ message ObjectChangeMessage { uint64 change_id = 1; repeated uint64 change_heads = 2; repeated ObjectChange changes = 3; + uint64 creator_id = 4; + uint64 timestamp_ms = 5; } message ObjectChangeListMessage { @@ -49,7 +50,7 @@ message ObjectChangeRequest { } message ObjectList { - uint64 change_heads = 1; + uint64 change_head = 1; repeated ObjectState states = 2; } diff --git a/src/main/protobuf/TestMessage.proto b/src/main/protobuf/TestMessage.proto deleted file mode 100644 index a253a7a..0000000 --- a/src/main/protobuf/TestMessage.proto +++ /dev/null @@ -1,9 +0,0 @@ -syntax = "proto3"; - -option java_package = "moe.nekojimi.friendcloud.protos"; - -message SearchRequest { - string query = 1; - int32 page_number = 2; - int32 results_per_page = 3; -} \ No newline at end of file