Change tracking now mostly works probably

This commit is contained in:
Nekojimi 2025-09-30 14:10:06 +01:00
parent 855bca957f
commit cae44b6f85
40 changed files with 2120 additions and 964 deletions

View file

@ -74,6 +74,11 @@
<artifactId>jlibnotify</artifactId> <artifactId>jlibnotify</artifactId>
<version>1.1.0</version> <version>1.1.0</version>
</dependency> </dependency>
<dependency>
<groupId>engineering.swat</groupId>
<artifactId>java-watch</artifactId>
<version>0.9.5</version>
</dependency>
<!-- <dependency>--> <!-- <dependency>-->
<!-- <groupId>com.github.hypfvieh</groupId>--> <!-- <groupId>com.github.hypfvieh</groupId>-->
<!-- <artifactId>dbus-java-core</artifactId>--> <!-- <artifactId>dbus-java-core</artifactId>-->

View file

@ -0,0 +1,411 @@
package moe.nekojimi.friendcloud;
import moe.nekojimi.friendcloud.objects.*;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import moe.nekojimi.friendcloud.storage.CachingDataStore;
import moe.nekojimi.friendcloud.storage.DataStore;
import moe.nekojimi.friendcloud.storage.LocalData;
import moe.nekojimi.friendcloud.storage.Storable;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
public class Controller
{
private final CachingDataStore dataStore;
private LocalData localData;
private Set<ObjectChangeRecord> changeHeads = null;
public Controller(DataStore dataStore)
{
this.dataStore = new CachingDataStore(dataStore);
}
public synchronized void init()
{
List<LocalData> localDataList = dataStore.getDAOForClass(LocalData.class).getAll();
if (localDataList.isEmpty())
{
localData = dataStore.getDAOForClass(LocalData.class).create(0);
}
else if (localDataList.size() == 1)
{
localData = localDataList.getFirst();
}
else
{
throw new IllegalStateException("We have more than one LocalData somehow!!");
}
if (localData.getSystemID() == 0)
{
Random ran = new Random();
localData.setSystemID(ran.nextInt() & 0x00FFFFFF);
objectChanged(localData);
}
}
public LocalData getLocalData()
{
return localData;
}
public Peer getLocalPeer()
{
return localData.getLocalPeer();
}
public Peer createLocalPeer(ObjectID id)
{
if (localData.getLocalPeer() == null)
{
localData.setLocalPeer(dataStore.getDAOForClass(Peer.class).create(id.toLong()));
objectChanged(localData);
}
return localData.getLocalPeer();
}
// private Map<Long, Node> nodes = new HashMap<>();
public synchronized ObjectID getNextObjectID(ObjectStatements.ObjectType type)
{
Random ran = new Random();
int randomNumber = ran.nextInt();
ObjectID objectID = new ObjectID(type, localData.getSystemID(), randomNumber);
System.out.println("Assigned new object ID: " + objectID);
return objectID;
}
public static Class<? extends NetworkObject> getNetworkObjectClassByType(ObjectStatements.ObjectType type)
{
return switch (type)
{
case OBJECT_TYPE_FILE -> NetworkFile.class;
case OBJECT_TYPE_FOLDER -> NetworkFolder.class;
case OBJECT_TYPE_PEER -> Peer.class;
case OBJECT_TYPE_UNSPECIFIED, UNRECOGNIZED -> throw new IllegalArgumentException("???");
default -> throw new UnsupportedOperationException("NYI: " + type);
};
}
public synchronized <T extends NetworkObject> T createObjectByID(ObjectID id)
{
if (id.toLong() == 0)
throw new IllegalArgumentException("Cannot create an object with ID=0!");
ObjectStatements.ObjectType type = id.getType();
System.out.println("Creating new object with type: " + type.name());
if (type == ObjectStatements.ObjectType.OBJECT_TYPE_UNSPECIFIED)
throw new IllegalArgumentException();
T ret = (T) dataStore.getDAOForClass(getNetworkObjectClassByType(type)).create(id.toLong());
return ret;
}
public synchronized <T extends NetworkObject> T createObjectByType(ObjectStatements.ObjectType type)
{
return createObjectByID(getNextObjectID(type));
}
public synchronized <T extends NetworkObject> T getObject(ObjectID id)
{
if (id.toLong() == 0)
return null;
Class<T> clazz = (Class<T>) getNetworkObjectClassByType(id.getType());
return dataStore.getDAOForClass(clazz).get(id.toLong());
}
// public synchronized <T extends NetworkObject> T getOrCreateObject(ObjectID id)
// {
// if (id.toLong() == 0)
// return null;
// Class<T> clazz = (Class<T>) getNetworkObjectClassByType(id.getType());
// return dataStore.getDAOForClass(clazz).getOrCreate(id.toLong());
// }
public synchronized List<NetworkObject> listObjects(Set<ObjectStatements.ObjectType> types)
{
List<NetworkObject> ret = new ArrayList<>();
Set<Class<? extends NetworkObject>> classes = types.stream().map(Controller::getNetworkObjectClassByType).collect(Collectors.toSet());
for (Class<? extends NetworkObject> clazz: classes)
{
List<? extends NetworkObject> list = dataStore.getDAOForClass(clazz).getAll();
ret.addAll(list);
}
return ret;
}
public synchronized List<NetworkFSNode> listFSNodes(String path)
{
//TODO: dumbest algorithm in the world
NetworkFolder folder = (NetworkFolder) getFSNode(path);
List<NetworkFSNode> ret = new ArrayList<>();
for (NetworkObject object : listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE, ObjectStatements.ObjectType.OBJECT_TYPE_FOLDER)))
{
NetworkFSNode fsNode = (NetworkFSNode) object;
if (Objects.equals(fsNode.getParent(), folder))
ret.add(fsNode);
}
return ret;
}
public synchronized void addChangeRecord(ObjectChangeRecord record)
{
DataStore.DAO<ObjectChangeRecord> dao = dataStore.getDAOForClass(ObjectChangeRecord.class);
dao.update(record);
// update the change heads; if any of this change's heads are included in ours, then this change replaces them
if (changeHeads != null)
{
boolean recordIsNewHead = changeHeads.isEmpty();
if (!recordIsNewHead)
{
for (long changeHeadID : record.getChangeHeads())
{
ObjectChangeRecord headRecord = dao.get(changeHeadID);
recordIsNewHead = changeHeads.remove(headRecord);
}
}
if (recordIsNewHead)
{
changeHeads.add(record);
System.out.println("Controller: Change heads updated: " + changeHeads.stream().map(objectChangeRecord -> Long.toHexString(objectChangeRecord.getChangeID())).collect(Collectors.toSet()));
}
}
}
public void setCurrentChangeRecord(ObjectChangeRecord record)
{
addChangeRecord(record);
long oldChangeID = localData.getCurrentChangeRecord() == null ? 0L : localData.getCurrentChangeRecord().getChangeID();
localData.setCurrentChangeRecord(record);
objectChanged(localData);
System.out.println("Controller: Change record set; old= " + Long.toHexString(oldChangeID) + " current= " + Long.toHexString(localData.getCurrentChangeRecord().getChangeID()));
}
public ObjectChangeRecord getChangeRecord(long id)
{
if (id == 0)
return null;
return dataStore.getDAOForClass(ObjectChangeRecord.class).get(id);
}
public void addChangeRecords(Set<ObjectChangeRecord> objectChangeRecords)
{
List<ObjectChangeRecord> sorted = ObjectChangeRecord.partiallySort(objectChangeRecords);
for (ObjectChangeRecord record : sorted)
addChangeRecord(record);
}
public void applyChangeRecord(ObjectChangeRecord record)
{
System.out.println("Controller: Applying change record " + record);
long changeID = localData.getCurrentChangeRecord().getChangeID();
if (!record.getChangeHeads().contains(changeID))
throw new IllegalStateException("Change does not apply! Valid change heads=" + record.getChangeHeads().stream().map(Long::toHexString).collect(Collectors.toSet()) + ", we are in state " + Long.toHexString(changeID));
addChangeRecord(record);
record.applyToLocalState();
setCurrentChangeRecord(record);
objectChanged(localData);
// if (record == null)
// throw new IllegalArgumentException("Cannot apply unknown change!");
}
public void applyChangeRecords(Set<ObjectChangeRecord> changes)
{
System.out.println("Controller: applying " + changes.size() + " change records.");
addChangeRecords(changes);
// List<ObjectChangeRecord> record = ObjectChangeRecord.partiallySort(changes);
Set<ObjectChangeRecord> pendingChanges = new HashSet<>(changes);
while (!pendingChanges.isEmpty())
{
// find all changes whose change heads = our current change heads
Set<ObjectChangeRecord> applicableChanges = pendingChanges.stream()
.filter(objectChangeRecord -> objectChangeRecord.getChangeHeads().equals(getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet())))
.collect(Collectors.toSet());
// if there are none, we've reached the present (or a present), break
if (applicableChanges.isEmpty())
break;
// if there's just one, follow that one
// TODO: if there's more than one, that's a fork, we should select a current branch by some logic
ObjectChangeRecord changeToApply = applicableChanges.iterator().next();
applyChangeRecord(changeToApply);
// the change we took (and any untaken forks) are no longer pending
pendingChanges.removeAll(applicableChanges);
}
}
public Set<ObjectChangeRecord> getChangeHeads()
{
// stupid algorithm - start with all of the changes, then remove the ones that are referenced by something
// TODO: better algorithm
if (changeHeads == null)
{
changeHeads = new HashSet<>(dataStore.getDAOForClass(ObjectChangeRecord.class).getAll());
Set<Long> referencedIDs = changeHeads.stream().flatMap(objectChangeRecord -> objectChangeRecord.getChangeHeads().stream()).collect(Collectors.toSet());
changeHeads.removeIf(objectChangeRecord -> referencedIDs.contains(objectChangeRecord.getChangeID()));
System.out.println("Controller: Determined change heads to be " + changeHeads.stream().map(objectChangeRecord -> Long.toHexString(objectChangeRecord.getChangeID())).collect(Collectors.toSet()));
}
return changeHeads;
}
public Set<Peer> listOtherPeers()
{
Set<Peer> ret = new HashSet<>();
for (NetworkObject object : listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_PEER)))
{
Peer peer = (Peer) object;
if (localData.getLocalPeer() == null || !peer.equals(localData.getLocalPeer()))
ret.add(peer);
}
return ret;
}
public <T extends Storable> void objectChanged(T storable)
{
Class<T> clazz = (Class<T>) storable.getClass();
dataStore.getDAOForClass(clazz).update(storable);
}
public DataStore getDataStore()
{
return dataStore;
}
public void clearEverything()
{
dataStore.clear();
}
public void addNetworkObject(NetworkObject object)
{
objectChanged(object);
}
public Set<ObjectChangeRecord> findChangesSince(List<Long> changeIDs)
{
// check that the specified change IDs are actually present in our history
DataStore.DAO<ObjectChangeRecord> ocrDao = dataStore.getDAOForClass(ObjectChangeRecord.class);
if (!changeIDs.stream().allMatch(ocrDao::exists))
return null;
// start with the current change heads
Set<ObjectChangeRecord> ret = new HashSet<>();
Deque<ObjectChangeRecord> openSet = new LinkedList<>(getChangeHeads());
while (!openSet.isEmpty())
{
ObjectChangeRecord record = openSet.poll();
if (changeIDs.contains(record.getChangeID()))
continue;
ret.add(record);
for (long changeHeadID: record.getChangeHeads())
{
openSet.add(ocrDao.get(changeHeadID));
}
}
return ret;
}
/**
* Changes the network name and/or path of a given network FS node.
* Performs both renaming and moving operations. If the destination folder(s) don't exist, they will be created as per makeFolder.
* @param fsNode the file or folder being renamed or moved.
* @param newpath the new path of the node, starting with and separated by /.
*/
public synchronized boolean renameFSNode(NetworkFSNode fsNode, String newpath) throws IOException
{
if (fsNode.getNetworkPath().equals(newpath))
return true;
int lastSlash = newpath.lastIndexOf("/");
String name = newpath.substring(lastSlash+1);
String path = newpath.substring(0, lastSlash);
if (path.isEmpty())
path = "/";
try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), getLocalPeer().getObjectID()))
{
transaction.addObjectBeforeChange(fsNode);
if (!name.equals(fsNode.getName()))
fsNode.setName(name);
NetworkFolder parent = fsNode.getParent();
if (!path.equals(parent != null ? parent.getNetworkPath() : "/"))
{
NetworkFSNode fsn = getFSNode(path);
if (fsn != null && !(fsn instanceof NetworkFolder))
throw new IllegalArgumentException("Given parent path " + fsn.getNetworkPath() + " is a file!");
NetworkFolder newParent = (NetworkFolder) fsn;
if (newParent == null && !path.equals("/"))
{
newParent = makeFSFolder(path);
}
fsNode.setParent(newParent);
}
} catch (IOException e)
{
e.printStackTrace(System.err);
return false;
}
return true;
}
/**
* Creates (or ensures the existence of) a NetworkFolder at the given path, along with all non-existent parent folders.
* @param path the path of the new NetworkFolder; must start with "/"
* @return the new NetworkFolder; or null, if path is "/"
*/
public synchronized NetworkFolder makeFSFolder(String path)
{
if (path.equals("/") || path.isEmpty())
return null;
NetworkFolder ret = null;
try(ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), getLocalPeer().getObjectID()))
{
NetworkFSNode node = getFSNode(path);
if (node != null && !(node instanceof NetworkFolder))
throw new IllegalArgumentException("Given parent path " + node.getNetworkPath() + " is a file!");
ret = (NetworkFolder) node;
if (ret != null)
return ret; // already exists
int lastSlash = path.lastIndexOf("/");
String name = path.substring(lastSlash+1);
String parentPath = path.substring(0, lastSlash);
NetworkFolder parent = makeFSFolder(parentPath);
ret = createObjectByType(ObjectStatements.ObjectType.OBJECT_TYPE_FOLDER);
ret.setParent(parent);
ret.setName(name);
transaction.addNewlyCreatedObject(ret);
return ret;
} catch (IOException e)
{
throw new RuntimeException(e);
}
}
public synchronized NetworkFSNode getFSNode(String path)
{
// TODO: bad algorithm make better
for (NetworkObject object : listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE, ObjectStatements.ObjectType.OBJECT_TYPE_FOLDER)))
{
NetworkFSNode fsNode = (NetworkFSNode) object;
String networkPath = fsNode.getNetworkPath();
if (networkPath.equals(path))
return fsNode;
}
return null;
}
}

View file

@ -12,12 +12,15 @@ public class FilePieceAccess implements Closeable
private final NetworkFile networkFile; private final NetworkFile networkFile;
private final File file; private final File file;
private final RandomAccessFile randomAccessFile; private final RandomAccessFile randomAccessFile;
private final OpenMode openMode;
public FilePieceAccess(NetworkFile networkFile) throws IOException public FilePieceAccess(NetworkFile networkFile, OpenMode openMode) throws IOException
{ {
this.networkFile = networkFile; this.networkFile = networkFile;
this.file = networkFile.getOrCreateLocalFile(); this.file = networkFile.getOrCreateLocalFile();
this.randomAccessFile = new RandomAccessFile(file,"rw"); this.randomAccessFile = new RandomAccessFile(file,openMode == OpenMode.READ_WRITE ? "rw" : "r");
this.openMode = openMode;
if (openMode == OpenMode.READ_WRITE)
randomAccessFile.setLength(file.length()); randomAccessFile.setLength(file.length());
} }
@ -56,7 +59,9 @@ public class FilePieceAccess implements Closeable
public void writePiece(int index, byte[] buffer) throws IOException public void writePiece(int index, byte[] buffer) throws IOException
{ {
if (buffer.length != getPieceSize(index)) if (openMode == OpenMode.READ_ONLY)
throw new IllegalStateException("File was opened read-only!");
else if (buffer.length != getPieceSize(index))
throw new IllegalArgumentException("Received a file piece that's the wrong size!! Length = " + buffer.length + " != Piece Size = " + getPieceSize(index)); throw new IllegalArgumentException("Received a file piece that's the wrong size!! Length = " + buffer.length + " != Piece Size = " + getPieceSize(index));
else if (index >= networkFile.getPieceCount()) else if (index >= networkFile.getPieceCount())
throw new IllegalArgumentException("Received a file piece with an index past the end of the file!!"); throw new IllegalArgumentException("Received a file piece with an index past the end of the file!!");
@ -72,4 +77,10 @@ public class FilePieceAccess implements Closeable
{ {
randomAccessFile.close(); randomAccessFile.close();
} }
public enum OpenMode
{
READ_ONLY,
READ_WRITE;
}
} }

View file

@ -2,37 +2,17 @@ package moe.nekojimi.friendcloud;
import com.beust.jcommander.JCommander; import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter; import com.beust.jcommander.Parameter;
import com.kstruct.gethostname4j.Hostname;
import com.offbynull.portmapper.PortMapperFactory;
import com.offbynull.portmapper.gateway.Bus;
import com.offbynull.portmapper.gateway.Gateway;
import com.offbynull.portmapper.gateways.network.NetworkGateway;
import com.offbynull.portmapper.gateways.process.ProcessGateway;
import com.offbynull.portmapper.mapper.MappedPort;
import com.offbynull.portmapper.mapper.PortMapper;
import com.offbynull.portmapper.mapper.PortType;
import es.blackleg.jlibnotify.JLibnotify;
import es.blackleg.jlibnotify.JLibnotifyNotification;
import es.blackleg.jlibnotify.core.DefaultJLibnotify;
import es.blackleg.jlibnotify.core.DefaultJLibnotifyLoader;
import es.blackleg.jlibnotify.exception.JLibnotifyInitException;
import es.blackleg.jlibnotify.exception.JLibnotifyLoadException;
import jnr.ffi.Platform; import jnr.ffi.Platform;
import moe.nekojimi.friendcloud.filesystem.FUSEAccess; import moe.nekojimi.friendcloud.filesystem.FUSEAccess;
import moe.nekojimi.friendcloud.network.PeerConnection; import moe.nekojimi.friendcloud.network.ConnectionManager;
import moe.nekojimi.friendcloud.network.requests.ObjectListRequest; import moe.nekojimi.friendcloud.network.TCPConnectionBackend;
import moe.nekojimi.friendcloud.objects.NetworkFile; import moe.nekojimi.friendcloud.protos.CommonMessages;
import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.objects.Peer;
import moe.nekojimi.friendcloud.objects.PeerFileState;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import moe.nekojimi.friendcloud.storage.DataStore; import moe.nekojimi.friendcloud.storage.DataStore;
import moe.nekojimi.friendcloud.storage.Model;
import moe.nekojimi.friendcloud.storage.StupidJSONFileStore; import moe.nekojimi.friendcloud.storage.StupidJSONFileStore;
import moe.nekojimi.friendcloud.tasks.JoinNetworkTask; import moe.nekojimi.friendcloud.tasks.JoinNetworkTask;
import moe.nekojimi.friendcloud.tasks.PropagateMessageTask;
import org.slf4j.simple.SimpleLogger; import org.slf4j.simple.SimpleLogger;
import java.awt.*;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.*; import java.net.*;
@ -43,41 +23,31 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.stream.Collectors;
public class Main public class Main
{ {
private static Main instance; private static Main instance;
@Parameter(names="-share")
private List<String> sharedFilePaths = new ArrayList<>();
@Parameter(names="-known-peer")
private List<String> knownPeers = new ArrayList<>();
@Parameter(names="-tcp-port")
private int tcpPort = 7777;
@Parameter(names="-no-upnp")
private boolean noUpnp = false;
@Parameter(names="-create-network")
private boolean createNetwork = false;
@Parameter(names = "-storage")
private String storageLocation = ".";
// @Parameter(names="-file") // @Parameter(names="-file")
private Args args;
private ConnectionManager connectionManager; private ConnectionManager connectionManager;
private final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(16); private final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(16);
private final Set<Future<?>> scheduledFutures = new HashSet<>();
private final FUSEAccess fuseAccess = new FUSEAccess(); private final FUSEAccess fuseAccess = new FUSEAccess();
private Model model; private Controller controller;
private final NotificationManager notificationManager = new NotificationManager(); private final NotificationManager notificationManager = new NotificationManager();
private final SharedFileManager sharedFileManager = new SharedFileManager();
public static void main(String[] args) public static void main(String[] argv)
{ {
instance = new Main(); instance = new Main();
JCommander.newBuilder().addObject(instance).build().parse(args); Args args = new Args();
JCommander.newBuilder().addObject(args).build().parse(argv);
instance.args = args;
System.setProperty(SimpleLogger.DEFAULT_LOG_LEVEL_KEY, "Info"); System.setProperty(SimpleLogger.DEFAULT_LOG_LEVEL_KEY, "Info");
@ -88,25 +58,19 @@ public class Main
{ {
System.err.println("main() received exception, dying horribly!!"); System.err.println("main() received exception, dying horribly!!");
e.printStackTrace(System.err); e.printStackTrace(System.err);
try
{
instance.shutdown(); instance.shutdown();
} catch (IOException f)
{
throw new RuntimeException(f);
}
System.exit(1); System.exit(1);
} }
// TestMessage.SearchRequest request = TestMessage.SearchRequest.newBuilder().setQuery("bees!").setPageNumber(316).setResultsPerPage(42069).build(); // TestMessage.SearchRequest request = TestMessage.SearchRequest.newBuilder().setQuery("bees!").setPageNumber(316).setResultsPerPage(42069).build();
} }
private void run() throws IOException, InterruptedException, JLibnotifyLoadException, JLibnotifyInitException private void run() throws IOException
{ {
DataStore dataStore = new StupidJSONFileStore(new File(storageLocation)); DataStore dataStore = new StupidJSONFileStore(new File(args.storageLocation));
model = new Model(dataStore); controller = new Controller(dataStore);
model.init(); controller.init();
connectionManager = new ConnectionManager(tcpPort); connectionManager = new ConnectionManager();
Path mountPoint; Path mountPoint;
if (Platform.getNativePlatform().getOS() == Platform.OS.WINDOWS) if (Platform.getNativePlatform().getOS() == Platform.OS.WINDOWS)
@ -115,23 +79,14 @@ public class Main
} }
else else
{ {
mountPoint = Path.of(System.getProperty("user.dir") + "/fuse-mount-" + tcpPort); mountPoint = Path.of(System.getProperty("user.dir") + "/fuse-mount-" + args.tcpPort);
boolean created = mountPoint.toFile().mkdirs(); mountPoint.toFile().mkdirs();
System.out.println("Created FUSE mount point " + mountPoint); System.out.println("Created FUSE mount point " + mountPoint);
} }
fuseAccess.mount(mountPoint); fuseAccess.mount(mountPoint);
System.out.println("Mounted virtual filesystem at " + mountPoint); System.out.println("Mounted virtual filesystem at " + mountPoint);
Runtime.getRuntime().addShutdownHook(new Thread(() -> Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
{
try
{
shutdown();
} catch (IOException e)
{
throw new RuntimeException(e);
}
}));
// if (Desktop.isDesktopSupported()) // if (Desktop.isDesktopSupported())
// { // {
@ -139,15 +94,9 @@ public class Main
// desktop.browse(mountPoint.toFile().toURI()); // desktop.browse(mountPoint.toFile().toURI());
// } // }
connectionManager.addNewConnectionConsumer(this::requestCompleteState); // connectionManager.addNewConnectionConsumer(this::requestCompleteState);
connectionManager.start(); connectionManager.addConnectionBackend(new TCPConnectionBackend(args.tcpPort, connectionManager));
String hostname = Hostname.getHostname();
model.getSelfPeer().setSystemName(hostname);
model.getSelfPeer().setUserName(System.getProperty("user.name") + "-" + tcpPort);
addHostAddress(InetAddress.getLocalHost());
model.objectChanged(model.getSelfPeer());
/* /*
Startup procedure: Startup procedure:
@ -159,138 +108,56 @@ public class Main
- Publish local file changes - Publish local file changes
*/ */
if (!noUpnp) JoinNetworkTask joinNetworkTask = new JoinNetworkTask();
setupIGP(); CompletableFuture.runAsync(joinNetworkTask,executor)
.thenRun(this::shareInitialFiles)
.thenRun(this::scheduleCheckins)
.handle((unused, throwable) ->
{
if (throwable != null)
{
System.err.println("Error in initial task!");
throwable.printStackTrace(System.err);
shutdown();
}
return null;
});
}
private void scheduleCheckins()
{
executor.scheduleWithFixedDelay(() -> {
System.out.println("Checking in with friends...");
CommonMessages.CheckInMessage checkInMessage = CommonMessages.CheckInMessage.newBuilder()
.addAllCurrentChangeHeads(controller.getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet()))
.build();
PropagateMessageTask propagateMessageTask = new PropagateMessageTask(checkInMessage);
executor.submit(propagateMessageTask);
}, 0,5, TimeUnit.MINUTES);
}
private void shareInitialFiles()
{
System.out.println("Sharing files given on command line...");
Set<File> sharedFiles = new HashSet<>(); Set<File> sharedFiles = new HashSet<>();
for (String sharedFilePath: sharedFilePaths) for (String sharedFilePath: args.sharedFilePaths)
{ {
sharedFiles.add(new File(sharedFilePath)); sharedFiles.add(new File(sharedFilePath));
} }
List<NetworkObject> knownFiles = model.listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE)); sharedFileManager.addSharedFiles(sharedFiles);
for (NetworkObject knownFile: knownFiles)
{
NetworkFile f = (NetworkFile) knownFile;
boolean removed = sharedFiles.remove(f.getLocalFile());
if (removed)
System.out.println("Identified known local file " + f.getObjectID() + " = " + f.getLocalFile());
} }
for (File sharedFile: sharedFiles) private void shutdown()
{
if (sharedFile.exists())
{
System.out.println("Adding shared network file: " + sharedFile.getAbsolutePath());
NetworkFile networkFile = (NetworkFile) model.createObjectByType(ObjectStatements.ObjectType.OBJECT_TYPE_FILE);
networkFile.updateFromLocalFile(sharedFile);
model.objectChanged(networkFile);
// PeerFileState peerFileState = (PeerFileState) model.createObjectByType(ObjectStatements.ObjectType.OBJECT_TYPE_PEER_FILE_STATE);
// peerFileState.setNode(model.getSelfPeer());
// peerFileState.setFile(networkFile);
// peerFileState.setProgress(100);
// model.objectChanged(peerFileState);
}
}
// JoinNetworkTask joinNetworkTask = new JoinNetworkTask();
// executor.submit(joinNetworkTask);
for (String knownPeerAddress : knownPeers)
{
String[] split = knownPeerAddress.split(":");
if (split.length != 2)
{
System.err.println("ERROR: " + knownPeerAddress + " isn't a valid address.");
continue;
}
InetSocketAddress address = new InetSocketAddress(split[0],Integer.parseInt(split[1]));
try
{
URI uri = new URI("tcp", null, address.getHostString(), address.getPort(), null, null, null);
PeerConnection nodeConnection = connectionManager.getNodeConnection(uri);
requestCompleteState(nodeConnection);
} catch (ConnectException ex)
{
System.out.println("Couldn't connect to host " + address);
}
catch (URISyntaxException e)
{
throw new RuntimeException(e);
}
}
}
private void requestCompleteState(PeerConnection nodeConnection)
{
CompletableFuture<List<NetworkObject>> objectListFuture = nodeConnection.makeRequest(new ObjectListRequest(Set.of(
ObjectStatements.ObjectType.OBJECT_TYPE_FILE,
ObjectStatements.ObjectType.OBJECT_TYPE_PEER_FILE_STATE,
ObjectStatements.ObjectType.OBJECT_TYPE_PEER)));
}
private void addHostAddress(InetAddress address)
{
String host = address.getCanonicalHostName();
Peer selfNode = model.getSelfPeer();
try
{
URI uri = new URI("tcp", null, host, tcpPort, null, null, null);
System.out.println("Added local address " + uri);
selfNode.addAddress(uri);
} catch (URISyntaxException e)
{
throw new RuntimeException(e);
}
}
private void setupIGP() throws InterruptedException
{ {
try try
{
// Start gateways
Gateway network = NetworkGateway.create();
Gateway process = ProcessGateway.create();
Bus networkBus = network.getBus();
Bus processBus = process.getBus();
// Discover port forwarding devices and take the first one found
System.out.println("Discovering port mappers...");
List<PortMapper> mappers = PortMapperFactory.discover(networkBus, processBus);;
PortMapper mapper = mappers.getFirst();
System.out.println("Got mapper " + mapper + ", mapping port...");
MappedPort mappedPort = mapper.mapPort(PortType.TCP, tcpPort, tcpPort, 60);
System.out.println("Port mapping added: " + mappedPort);
addHostAddress(mappedPort.getExternalAddress());
}
catch (IllegalStateException ex)
{
System.err.println("Failed to map port! error=" + ex.getMessage());
ex.printStackTrace(System.err);
}
}
private void shutdown() throws IOException
{ {
fuseAccess.umount(); fuseAccess.umount();
connectionManager.shutdown(); connectionManager.shutdown();
executor.shutdown(); executor.shutdown();
System.out.println("Waiting 10 seconds to complete tasks..."); System.out.println("Waiting 10 seconds to complete tasks...");
boolean terminated = false; boolean terminated = executor.awaitTermination(10, TimeUnit.SECONDS);
try
{
terminated = executor.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e)
{
throw new RuntimeException(e);
}
if (!terminated) if (!terminated)
{ {
System.out.println("Timed out, ending tasks now. Goodbye!"); System.out.println("Timed out, ending tasks now. Goodbye!");
@ -300,6 +167,10 @@ public class Main
{ {
System.out.println("Finished everything. Goodbye!"); System.out.println("Finished everything. Goodbye!");
} }
} catch (Exception e)
{
throw new RuntimeException(e);
}
} }
public static Main getInstance() public static Main getInstance()
@ -317,13 +188,80 @@ public class Main
return connectionManager; return connectionManager;
} }
public Model getModel() public Controller getModel()
{ {
return model; return controller;
} }
public NotificationManager getNotificationManager() public NotificationManager getNotificationManager()
{ {
return notificationManager; return notificationManager;
} }
public Controller getController()
{
return controller;
}
public Args getArgs()
{
return args;
}
public SharedFileManager getSharedFileManager()
{
return sharedFileManager;
}
@SuppressWarnings("FieldCanBeLocal")
public static class Args
{
@Parameter(names="-share")
private List<String> sharedFilePaths = new ArrayList<>();
@Parameter(names="-known-peer")
private List<String> knownPeers = new ArrayList<>();
@Parameter(names="-tcp-port")
private int tcpPort = 7777;
@Parameter(names="-no-upnp")
private boolean noUpnp = false;
@Parameter(names="-create-network")
private boolean createNetwork = false;
@Parameter(names = "-storage")
private String storageLocation = ".";
public List<String> getSharedFilePaths()
{
return sharedFilePaths;
}
public List<String> getKnownPeers()
{
return knownPeers;
}
public int getTcpPort()
{
return tcpPort;
}
public boolean isNoUpnp()
{
return noUpnp;
}
public boolean isCreateNetwork()
{
return createNetwork;
}
public String getStorageLocation()
{
return storageLocation;
}
}
} }

View file

@ -1,5 +1,6 @@
package moe.nekojimi.friendcloud; package moe.nekojimi.friendcloud;
import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.objects.ObjectID; import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.protos.ObjectStatements; import moe.nekojimi.friendcloud.protos.ObjectStatements;
import moe.nekojimi.friendcloud.storage.Storable; import moe.nekojimi.friendcloud.storage.Storable;
@ -7,7 +8,9 @@ import moe.nekojimi.friendcloud.storage.Storable;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.security.MessageDigest; import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.util.*; import java.util.*;
import java.util.stream.Collectors;
public class ObjectChangeRecord implements Storable public class ObjectChangeRecord implements Storable
{ {
@ -16,11 +19,14 @@ public class ObjectChangeRecord implements Storable
private ObjectID creatorPeer; private ObjectID creatorPeer;
private Set<Long> changeHeads = new HashSet<>(); private Set<Long> changeHeads = new HashSet<>();
private Set<Change> changes = new HashSet<>(); private Set<Change> changes = new HashSet<>();
private Instant creationTime;
public static ObjectChangeRecord createFromChangeMessage(ObjectStatements.ObjectChangeMessage objectChangeMessage) public static ObjectChangeRecord createFromChangeMessage(ObjectStatements.ObjectChangeMessage objectChangeMessage)
{ {
ObjectChangeRecord record = new ObjectChangeRecord(); // TODO: decode creator ObjectChangeRecord record = new ObjectChangeRecord();
record.changeHeads.addAll(objectChangeMessage.getChangeHeadsList()); record.changeHeads.addAll(objectChangeMessage.getChangeHeadsList());
record.creatorPeer = new ObjectID(objectChangeMessage.getCreatorId());
record.creationTime = Instant.ofEpochMilli(objectChangeMessage.getTimestampMs());
for (ObjectStatements.ObjectChange objectChange : objectChangeMessage.getChangesList()) for (ObjectStatements.ObjectChange objectChange : objectChangeMessage.getChangesList())
{ {
record.changes.add(Change.createFromObjectChange(objectChange)); record.changes.add(Change.createFromObjectChange(objectChange));
@ -29,16 +35,18 @@ public class ObjectChangeRecord implements Storable
long specifiedID = objectChangeMessage.getChangeId(); long specifiedID = objectChangeMessage.getChangeId();
if (calculatedID != specifiedID) if (calculatedID != specifiedID)
{ {
System.err.println("WARNING: didn't decode change ID correctly!"); throw new RuntimeException("Failed to verify change ID! specified=" + Long.toHexString(specifiedID) + " != calculated=" + Long.toHexString(calculatedID));
} }
return record; return record;
} }
public static ObjectChangeRecord createFromChanges(ObjectID creator, Set<Change> changes) public static ObjectChangeRecord createFromChanges(ObjectID creator, Set<Long> changeHeads, Set<Change> changes)
{ {
ObjectChangeRecord record = new ObjectChangeRecord(); ObjectChangeRecord record = new ObjectChangeRecord();
record.creatorPeer = creator; record.creatorPeer = creator;
record.creationTime = Instant.now();
record.changes.addAll(changes); record.changes.addAll(changes);
record.changeHeads = changeHeads;
return record; return record;
} }
@ -59,6 +67,8 @@ public class ObjectChangeRecord implements Storable
ObjectStatements.ObjectChangeMessage.Builder builder = ObjectStatements.ObjectChangeMessage.newBuilder(); ObjectStatements.ObjectChangeMessage.Builder builder = ObjectStatements.ObjectChangeMessage.newBuilder();
builder.setChangeId(getChangeID()); builder.setChangeId(getChangeID());
builder.addAllChangeHeads(changeHeads); builder.addAllChangeHeads(changeHeads);
builder.setCreatorId(creatorPeer.toLong());
builder.setTimestampMs(creationTime.toEpochMilli());
for (Change change : changes) for (Change change : changes)
{ {
builder.addChanges(change.buildObjectChange()); builder.addChanges(change.buildObjectChange());
@ -71,26 +81,43 @@ public class ObjectChangeRecord implements Storable
{ {
return Map.of("changeHeads", changeHeads, return Map.of("changeHeads", changeHeads,
"changes", changes, "changes", changes,
"creator", creatorPeer.toLong()); "creator", creatorPeer.toLong(),
"creationTime", creationTime.toEpochMilli()
);
} }
@SuppressWarnings("unchecked")
@Override @Override
public void updateFromStateMap(Map<String, Object> map) public void updateFromStateMap(Map<String, Object> map)
{ {
changeHeads = new HashSet<>((Collection) map.get("changeHeads")); changeHeads = new HashSet<>((Collection) map.get("changeHeads"));
changes = new HashSet<>((Collection) map.get("changes")); changes = new HashSet<>((Collection) map.get("changes"));
creatorPeer = new ObjectID((Long) map.get("creator")); creatorPeer = new ObjectID((Long) map.get("creator"));
creationTime = Instant.ofEpochMilli((Long) map.get("creationTime"));
}
public void applyToLocalState()
{
for (Change change: changes)
{
NetworkObject object = Main.getInstance().getModel().getObject(change.objectID);
if (object == null)
object = Main.getInstance().getController().createObjectByID(change.objectID);
object.updateFromChange(change);
Main.getInstance().getModel().objectChanged(object);
}
} }
public String toString() public String toString()
{ {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
for (long changeHeadId: changeHeads) sb.append(creatorPeer).append(",").append(creationTime.toEpochMilli()).append(";");
for (long changeHeadId: changeHeads.stream().sorted().toList())
{ {
sb.append(changeHeadId).append(","); sb.append(changeHeadId).append(",");
} }
sb.append(";"); sb.append(";");
for (Change change: changes) for (Change change: changes.stream().sorted(Comparator.comparingLong(a -> a.objectID.toLong())).toList())
{ {
sb.append(change.toString()).append(";"); sb.append(change.toString()).append(";");
} }
@ -107,7 +134,9 @@ public class ObjectChangeRecord implements Storable
{ {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
byte[] bytes = digest.digest(toString().getBytes(StandardCharsets.UTF_8)); String stringVal = toString();
byte[] bytes = digest.digest(stringVal.getBytes(StandardCharsets.UTF_8));
// System.out.println("ObjectChangeRecord: calculated change ID " + Long.toHexString(ret) + " from string: " + stringVal);
return Util.xorBytesToLong(bytes); return Util.xorBytesToLong(bytes);
} }
@ -127,8 +156,18 @@ public class ObjectChangeRecord implements Storable
return changeHeads; return changeHeads;
} }
public record Change(ObjectID objectID, Map<String, String> beforeValues, Map<String, String> afterValues) public static final class Change
{ {
private final ObjectID objectID;
private final Map<String, String> beforeValues;
private final Map<String, String> afterValues;
public Change(ObjectID objectID, Map<String, String> beforeValues, Map<String, String> afterValues)
{
this.objectID = objectID;
this.beforeValues = beforeValues;
this.afterValues = afterValues;
}
public static Change createFromObjectChange(ObjectStatements.ObjectChange change) public static Change createFromObjectChange(ObjectStatements.ObjectChange change)
{ {
@ -141,8 +180,8 @@ public class ObjectChangeRecord implements Storable
Map<String, String> afterValues = new HashMap<>(); Map<String, String> afterValues = new HashMap<>();
for (String key : after.getValuesMap().keySet()) for (String key : after.getValuesMap().keySet())
{ {
String beforeValue = before.getValuesOrDefault(key, null); String beforeValue = before.getValuesOrDefault(key, "");
String afterValue = after.getValuesOrDefault(key, null); String afterValue = after.getValuesOrDefault(key, "");
if (!afterValue.equals(beforeValue)) if (!afterValue.equals(beforeValue))
{ {
beforeValues.put(key, beforeValue); beforeValues.put(key, beforeValue);
@ -151,7 +190,7 @@ public class ObjectChangeRecord implements Storable
} }
if (!afterValues.isEmpty()) if (!afterValues.isEmpty())
{ {
return new Change(new ObjectID(before.getObjectId()), beforeValues, afterValues); return new Change(new ObjectID(after.getObjectId()), beforeValues, afterValues);
} }
return null; return null;
} }
@ -159,15 +198,18 @@ public class ObjectChangeRecord implements Storable
public String toString() public String toString()
{ {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append(objectID.toLong()).append(";"); // The object ID, then ; sb.append(objectID).append(";"); // The object ID, then ;
// now all key-value pairs in alphabetical order // now all key-value pairs in alphabetical order
List<String> keys = new ArrayList<>(beforeValues.keySet()); Set<String> keySet = new HashSet<>(beforeValues.keySet());
Collections.sort(keys); keySet.addAll(afterValues.keySet());
for (String key : keys) for (String key : keySet.stream().sorted().toList())
{ {
sb.append(key).append(":").append(afterValues.get(key)); sb.append(key).append(":")
.append(beforeValues.getOrDefault(key,""))
.append("->")
.append(afterValues.getOrDefault(key,""))
.append(",");
} }
sb.append(";");
return sb.toString(); return sb.toString();
} }
@ -175,10 +217,78 @@ public class ObjectChangeRecord implements Storable
public ObjectStatements.ObjectChange.Builder buildObjectChange() public ObjectStatements.ObjectChange.Builder buildObjectChange()
{ {
ObjectStatements.ObjectChange.Builder builder = ObjectStatements.ObjectChange.newBuilder(); ObjectStatements.ObjectChange.Builder builder = ObjectStatements.ObjectChange.newBuilder();
builder.putAllBefore(beforeValues); builder.putAllBefore(Util.mapWithoutNullValues(beforeValues));
builder.putAllAfter(afterValues); builder.putAllAfter(Util.mapWithoutNullValues(afterValues));
builder.setObjectId(objectID.toLong()); builder.setObjectId(objectID.toLong());
return builder; return builder;
} }
public ObjectID objectID()
{
return objectID;
}
public Map<String, String> beforeValues()
{
return beforeValues;
}
public Map<String, String> afterValues()
{
return afterValues;
}
@Override
public boolean equals(Object obj)
{
if (obj == this) return true;
if (obj == null || obj.getClass() != this.getClass()) return false;
var that = (Change) obj;
return Objects.equals(this.objectID, that.objectID) &&
Objects.equals(this.beforeValues, that.beforeValues) &&
Objects.equals(this.afterValues, that.afterValues);
}
@Override
public int hashCode()
{
return Objects.hash(objectID, beforeValues, afterValues);
}
}
public static List<ObjectChangeRecord> partiallySort(Set<ObjectChangeRecord> changes)
{
LinkedList<ObjectChangeRecord> ret = new LinkedList<>();
Map<Long,ObjectChangeRecord> idMap = new HashMap<>();
for (ObjectChangeRecord record : changes)
idMap.put(record.getChangeID(), record);
Set<Long> pointedIds = changes.stream()
.flatMap((ObjectChangeRecord objectChangeRecord1) -> objectChangeRecord1.getChangeHeads().stream())
.collect(Collectors.toSet());
Set<ObjectChangeRecord> group = changes.stream()
.filter(objectChangeRecord -> !pointedIds.contains(objectChangeRecord.getChangeID()))
.collect(Collectors.toSet());
while (!group.isEmpty())
{
// add all members of the group to the start of the list
for (ObjectChangeRecord record : group)
{
ret.addFirst(record);
}
// replace the group with all the objects pointed to by the group
group = group.stream()
.flatMap(objectChangeRecord -> objectChangeRecord.getChangeHeads().stream())
.map(idMap::get)
.filter(Objects::nonNull)
.collect(Collectors.toSet());
}
System.out.println("ObjectChangeRecord: Partially sorted changes: " + changes);
System.out.println("\tInto list: " + ret);
return ret;
} }
} }

View file

@ -1,47 +1,77 @@
package moe.nekojimi.friendcloud; package moe.nekojimi.friendcloud;
import moe.nekojimi.friendcloud.network.ConnectionManager;
import moe.nekojimi.friendcloud.objects.NetworkObject; import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.objects.ObjectID; import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.protos.ObjectStatements; import moe.nekojimi.friendcloud.protos.ObjectStatements;
import moe.nekojimi.friendcloud.tasks.PropagateMessageTask; import moe.nekojimi.friendcloud.tasks.PropagateMessageTask;
import moe.nekojimi.friendcloud.tasks.PullChangesTask;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
public class ObjectChangeTransaction implements Closeable public class ObjectChangeTransaction implements AutoCloseable
{ {
private final ObjectID creator; private final ObjectID creator;
private final ConnectionManager connectionManager; private final ConnectionManager connectionManager;
private final Map<ObjectID, ObjectStatements.ObjectState> beforeStates = new HashMap<>(); private final Map<ObjectID, ObjectStatements.ObjectState> beforeStates = new HashMap<>();
private static ObjectChangeTransaction currentTransaction = null;
private int openCount = 0;
private boolean ended = false; private boolean ended = false;
ObjectChangeTransaction(ConnectionManager connectionManager, ObjectID creator) private ObjectChangeTransaction(ConnectionManager connectionManager, ObjectID creator)
{ {
this.creator = creator; this.creator = creator;
this.connectionManager = connectionManager; this.connectionManager = connectionManager;
System.out.println("ObjectChangeTransaction: opening transaction");
// attempt to pull changes from the network
Future<?> future = Main.getInstance().getExecutor().submit(new PullChangesTask());
try
{
future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | TimeoutException e)
{
// this is fine
// throw new RuntimeException(e);
} catch (ExecutionException e)
{
e.printStackTrace(System.err);
throw new RuntimeException(e);
}
} }
public static ObjectChangeTransaction startTransaction(ConnectionManager connectionManager, ObjectID creatorPeer, ObjectID... objects) public static ObjectChangeTransaction startTransaction(ConnectionManager connectionManager, ObjectID creatorPeer)
{ {
ObjectChangeTransaction builder = new ObjectChangeTransaction(connectionManager, creatorPeer); if (currentTransaction == null)
for (ObjectID id : objects) currentTransaction = new ObjectChangeTransaction(connectionManager, creatorPeer);
{ currentTransaction.increaseOpenCount();
builder.addObjectBeforeChange(id); return currentTransaction;
}
return builder;
} }
public ObjectChangeTransaction addObjectBeforeChange(ObjectID id) private void increaseOpenCount()
{ {
NetworkObject object = Main.getInstance().getModel().getObject(id); openCount++;
if (object != null) }
beforeStates.put(id, object.buildObjectState().build());
return this; public void addObjectBeforeChange(NetworkObject object)
{
beforeStates.putIfAbsent(object.getObjectID(), object.buildObjectState().build());
}
public void addNewlyCreatedObject(NetworkObject object)
{
beforeStates.putIfAbsent(object.getObjectID(), ObjectStatements.ObjectState.getDefaultInstance());
} }
public ObjectChangeRecord endTransaction() public ObjectChangeRecord endTransaction()
@ -53,21 +83,43 @@ public class ObjectChangeTransaction implements Closeable
for (Map.Entry<ObjectID, ObjectStatements.ObjectState> entry : beforeStates.entrySet()) for (Map.Entry<ObjectID, ObjectStatements.ObjectState> entry : beforeStates.entrySet())
{ {
ObjectStatements.ObjectState afterState = Main.getInstance().getModel().getObject(entry.getKey()).buildObjectState().build(); NetworkObject object = Main.getInstance().getModel().getObject(entry.getKey());
ObjectStatements.ObjectState afterState = object.buildObjectState().build();
ObjectChangeRecord.Change change = ObjectChangeRecord.Change.createFromObjectStates(entry.getValue(), afterState); ObjectChangeRecord.Change change = ObjectChangeRecord.Change.createFromObjectStates(entry.getValue(), afterState);
if (change != null)
{
Main.getInstance().getModel().objectChanged(object);
changes.add(change); changes.add(change);
} }
}
return ObjectChangeRecord.createFromChanges(creator, changes); if (changes.isEmpty())
return null;
return ObjectChangeRecord.createFromChanges(creator, Main.getInstance().getModel().getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet()), changes);
}
public void commit()
{
// TODO: make this actually perform the changes and mark the transaction as complete; if the transaction closes before commit() make it roll back the changes
} }
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
openCount--;
if (openCount > 0)
return; // still open elsewhere
// end the transaction and get the change object // end the transaction and get the change object
ObjectChangeRecord objectChangeRecord = endTransaction(); ObjectChangeRecord objectChangeRecord = endTransaction();
currentTransaction = null;
if (objectChangeRecord == null)
{
System.out.println("ObjectChangeTransaction: closing transaction, no changes.");
return;
}
System.out.println("ObjectChangeTransaction: closing transaction, submitting change record " + Long.toHexString(objectChangeRecord.getChangeID()));
// add the new change to the model // add the new change to the model
Main.getInstance().getModel().addChangeRecord(objectChangeRecord); Main.getInstance().getModel().setCurrentChangeRecord(objectChangeRecord);
// create a task to propagate the change to other peers // create a task to propagate the change to other peers
Main.getInstance().getExecutor().submit(new PropagateMessageTask(objectChangeRecord.buildObjectChangeMessage().build())); Main.getInstance().getExecutor().submit(new PropagateMessageTask(objectChangeRecord.buildObjectChangeMessage().build()));
} }

View file

@ -0,0 +1,55 @@
package moe.nekojimi.friendcloud;
import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.storage.Storable;
import java.io.File;
import java.io.IOException;
import java.util.Map;
public class SharedDirectory implements Storable
{
private final long storageID;
private File directory;
private ObjectID networkFolderID;
public SharedDirectory(long storageID)
{
this.storageID = storageID;
}
@Override
public long getStorageID()
{
return storageID;
}
@Override
public Map<String, Object> getStateMap()
{
String canonicalPath;
try
{
canonicalPath = directory.getCanonicalPath();
} catch (IOException e)
{
throw new RuntimeException(e);
}
return Map.of("directory", canonicalPath, "networkFolderID", networkFolderID.toLong());
}
@Override
public void updateFromStateMap(Map<String, Object> map)
{
}
public File getDirectory()
{
return directory;
}
public ObjectID getNetworkFolderID()
{
return networkFolderID;
}
}

View file

@ -0,0 +1,112 @@
package moe.nekojimi.friendcloud;
import engineering.swat.watch.Watch;
import moe.nekojimi.friendcloud.objects.NetworkFile;
import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import java.io.File;
import java.io.IOException;
import java.util.*;
public class SharedFileManager
{
private final List<Watch> watches = new ArrayList<>();
public void addSharedFiles(Set<File> files)
{
if (files.isEmpty())
return;
Controller controller = Main.getInstance().getModel();
try(ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), controller.getLocalPeer().getObjectID()))
{
List<NetworkObject> knownFiles = controller.listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE));
for (NetworkObject knownFile: knownFiles)
{
NetworkFile f = (NetworkFile) knownFile;
boolean removed = files.remove(f.getLocalFile());
if (removed)
System.out.println("Identified known local file " + f.getObjectID() + " = " + f.getLocalFile());
}
for (File sharedFile: files)
{
if (sharedFile.exists())
{
System.out.println("Adding shared network file: " + sharedFile.getAbsolutePath());
NetworkFile networkFile = controller.createObjectByType(ObjectStatements.ObjectType.OBJECT_TYPE_FILE);
transaction.addObjectBeforeChange(networkFile);
networkFile.updateFromLocalFile(sharedFile);
controller.objectChanged(networkFile);
}
}
} catch (IOException e)
{
throw new RuntimeException(e);
}
}
public void scanForFileChanges()
{
List<NetworkObject> objects = Main.getInstance().getModel().listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE));
scanForFileChanges(objects);
}
public void scanForFileChanges(List<NetworkObject> objects)
{
// check every NetworkFile to see if any are different from our records
for (NetworkObject object: objects)
{
NetworkFile file = (NetworkFile) object;
if (!file.hasLocalFile())
continue;
File localFile = file.getLocalFile();
boolean changed = (localFile.length() != file.getSize());
if (!changed)
{
Util.HashOutput hashOutput = Util.hashFile(localFile, file.getPieceSize());
changed = ! Arrays.equals(hashOutput.totalDigest, file.getHash());
}
}
}
public void scanForDirectoryChanges()
{
List<SharedDirectory> directories = Main.getInstance().getModel().getDataStore().getDAOForClass(SharedDirectory.class).getAll();
for (SharedDirectory directory: directories)
{
for (File file : Objects.requireNonNull(directory.getDirectory().listFiles()))
{
}
}
}
public boolean startWatchingFiles()
{
// if (watchService == null)
// {
// try
// {
// watchService = FileSystems.getDefault().newWatchService();
// } catch (IOException e)
// {
// e.printStackTrace(System.err);
// return false;
// }
// }
throw new UnsupportedOperationException("NYI");
}
private void filesChanged(Collection<NetworkFile> changed)
{
}
private void newFiles(Collection<File> newFiles)
{
}
}

View file

@ -1,18 +1,27 @@
package moe.nekojimi.friendcloud; package moe.nekojimi.friendcloud;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.LongBuffer; import java.nio.LongBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
public class Util public class Util
{ {
public static long xorBytesToLong(byte[] bytes) public static long xorBytesToLong(byte[] bytes)
{ {
ByteBuffer buf = ByteBuffer.wrap(bytes); ByteBuffer buf = ByteBuffer.allocate(bytes.length);
buf.put(bytes);
buf.rewind();
LongBuffer longs = buf.asLongBuffer(); LongBuffer longs = buf.asLongBuffer();
long ret = 0xBEEFCAFEF00DBABEL; long ret = 0xBEEFCAFEF00DBABEL;
for (long l: longs.array()) while (longs.hasRemaining())
{ {
ret = ret ^ l; ret = ret ^ longs.get();
} }
return ret; return ret;
} }
@ -23,10 +32,95 @@ public class Util
return ((Number) number).longValue(); return ((Number) number).longValue();
} }
public static HashOutput hashFile(File file)
{
return hashFile(file, 0x100000);
}
public static HashOutput hashFile(File file, long pieceSize)
{
HashOutput ret = new HashOutput();
System.out.println("Calculating hashes for file " + file.getName() + "(Piece size: " + pieceSize + ")");
try (FileInputStream input = new FileInputStream(file))
{
MessageDigest totalDigest = MessageDigest.getInstance("SHA-256");
byte[] pieceBuf = new byte[Math.toIntExact(pieceSize)];
int pieceIdx = 0;
while (true)
{
int bytesRead = input.read(pieceBuf);
if (bytesRead <= 0)
break;
// check to see if this piece is just zeroes, if so, assume it's a missing piece
boolean allZero = true;
for (byte b: pieceBuf)
{
if (b != 0)
{
allZero = false;
break;
}
}
ret.pieces.set(pieceIdx, !allZero);
MessageDigest pieceDigest = MessageDigest.getInstance("SHA-256");
pieceDigest.update(pieceBuf, 0, bytesRead);
ret.pieceDigests.add(pieceDigest.digest());
totalDigest.update(pieceBuf, 0, bytesRead);
pieceIdx++;
}
ret.totalDigest = totalDigest.digest();
System.out.println("Total hash: " + HexFormat.of().formatHex(ret.totalDigest));
long pieceCount = file.length() / pieceSize;
System.out.println("Have " + ret.pieces.cardinality() + " of " + pieceCount + " pieces.");
return ret;
} catch (NoSuchAlgorithmException | IOException e)
{
throw new RuntimeException(e);
}
}
public static class HashOutput
{
public byte[] totalDigest;
public List<byte[]> pieceDigests = new ArrayList<>();
public BitSet pieces = new BitSet();
}
public static double unconditionalNumberToDouble(Object number) public static double unconditionalNumberToDouble(Object number)
{ {
assert (number instanceof Number); assert (number instanceof Number);
return ((Number) number).doubleValue(); return ((Number) number).doubleValue();
} }
public static Map<String,String> stringifyMap(Map<?,?> map)
{
Map<String,String> ret = new HashMap<>();
for (Map.Entry<?, ?> e : map.entrySet())
{
ret.put(e.getKey().toString(), e.getValue().toString());
}
return ret;
}
public static <K,V> Map<K,V> mapWithoutNullValues(Map<K,V> map)
{
Map<K,V> ret = new HashMap<>();
for (Map.Entry<K,V> e: map.entrySet())
{
if (e.getKey() != null && e.getValue() != null)
ret.put(e.getKey(),e.getValue());
}
return ret;
}
public static <T> CompletableFuture<List<T>> collectFutures(Collection<CompletableFuture<T>> futures)
{
// TODO: should handle timeouts / CancellationException
return CompletableFuture
.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(unused -> futures.stream().map(CompletableFuture::join).toList());
}
} }

View file

@ -12,7 +12,9 @@ import ru.serce.jnrfuse.FuseStubFS;
import ru.serce.jnrfuse.struct.FileStat; import ru.serce.jnrfuse.struct.FileStat;
import ru.serce.jnrfuse.struct.FuseFileInfo; import ru.serce.jnrfuse.struct.FuseFileInfo;
import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
public class FUSEAccess extends FuseStubFS public class FUSEAccess extends FuseStubFS
@ -27,14 +29,15 @@ public class FUSEAccess extends FuseStubFS
@Override @Override
public int readdir(String path, Pointer buf, FuseFillDir filter, long offset, FuseFileInfo fi) public int readdir(String path, Pointer buf, FuseFillDir filter, long offset, FuseFileInfo fi)
{ {
System.out.println("FUSE: listing contents of directory " + path); List<NetworkFSNode> networkFSNodes = Main.getInstance().getModel().listFSNodes(path);
System.out.println("FUSE: listing contents of directory " + path + ": " + networkFSNodes.size() + " nodes.");
int ret = 0; int ret = 0;
filter.apply(buf, ".", null, 0); filter.apply(buf, ".", null, 0);
filter.apply(buf, "..", null, 0); filter.apply(buf, "..", null, 0);
// filter.apply(buf,"hello", null, 0); // filter.apply(buf,"hello", null, 0);
for (NetworkFSNode fsNode : Main.getInstance().getModel().listFSNodes(path)) for (NetworkFSNode fsNode : networkFSNodes)
{ {
filter.apply(buf, fsNode.getName(), null, 0); filter.apply(buf, fsNode.getName(), null, 0);
} }
@ -42,6 +45,15 @@ public class FUSEAccess extends FuseStubFS
return ret; return ret;
} }
@Override
public int mkdir(String path, long mode)
{
NetworkFolder networkFolder = Main.getInstance().getModel().makeFSFolder(path);
if (networkFolder == null)
return -ErrorCodes.EIO();
return 0;
}
@Override @Override
public int getattr(String path, FileStat stat) public int getattr(String path, FileStat stat)
{ {
@ -156,4 +168,25 @@ public class FUSEAccess extends FuseStubFS
// System.out.println("FUSE: Read " + bytes.length + " bytes."); // System.out.println("FUSE: Read " + bytes.length + " bytes.");
} }
} }
@Override
public int rename(String oldpath, String newpath)
{
NetworkFSNode fsNode = Main.getInstance().getModel().getFSNode(oldpath);
if (fsNode == null)
{
System.err.println("FUSE: Failed to rename file " + oldpath + ": not found");
return -ErrorCodes.ENOENT();
}
try
{
System.out.println("FUSE: Renaming " + oldpath + " to " + newpath);
Main.getInstance().getModel().renameFSNode(fsNode, newpath);
return 0;
} catch (Exception e)
{
e.printStackTrace(System.err);
return -ErrorCodes.EIO();
}
}
} }

View file

@ -0,0 +1,70 @@
package moe.nekojimi.friendcloud.network;
import moe.nekojimi.friendcloud.objects.ObjectID;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
public abstract class ConnectionBackend<ConnectionType extends PeerConnection> extends Thread
{
private final ConnectionManager connectionManager;
protected final String uriScheme;
abstract List<URI> getURIs();
public ConnectionBackend(@NotNull String name, @NotNull String uriScheme, ConnectionManager connectionManager)
{
super(name);
this.uriScheme = uriScheme;
this.connectionManager = connectionManager;
}
@Override
public final void run()
{
super.run();
while(isListening())
{
try
{
PeerConnection connection = getConnection();
if (connection == null)
break;
connectionManager.receiveConnection(connection);
}
catch (InterruptedException ex)
{
}
catch (IOException ex)
{
break;
}
}
}
protected abstract ConnectionType makeConnection(URI uri, ObjectID peer);
protected URI makeURI(String hostPart, int port)
{
try
{
return new URI(uriScheme, null, hostPart, port, null, null, null);
} catch (URISyntaxException e)
{
throw new RuntimeException(e);
}
}
protected abstract ConnectionType getConnection() throws IOException, InterruptedException;
protected abstract boolean isListening();
public String getUriScheme()
{
return uriScheme;
}
public abstract void shutdown();
}

View file

@ -1,60 +1,34 @@
package moe.nekojimi.friendcloud; package moe.nekojimi.friendcloud.network;
import moe.nekojimi.friendcloud.network.PeerConnection; import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.network.PeerTCPConnection;
import moe.nekojimi.friendcloud.objects.Peer; import moe.nekojimi.friendcloud.objects.Peer;
import java.io.IOException; import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URI; import java.net.URI;
import java.util.*; import java.util.*;
import java.util.function.Consumer; import java.util.function.Consumer;
public class ConnectionManager extends Thread public class ConnectionManager
{ {
// private final Executor executor = new ThreadPoolExecutor() // private final Executor executor = new ThreadPoolExecutor()
//TODO: move the TCP stuff to it's own thread, which sends NodeTCPConnections to this thread private final Map<String, ConnectionBackend<?>> backends = new HashMap<>();
private final ServerSocket serverSocket;
private final Set<PeerConnection> activeConnections = new HashSet<>(); private final Set<PeerConnection> activeConnections = new HashSet<>();
private final Set<Consumer<PeerConnection>> newConnectionConsumers = new HashSet<>(); private final Set<Consumer<PeerConnection>> newConnectionConsumers = new HashSet<>();
public ConnectionManager(int portNumber) throws IOException public ConnectionManager()
{ {
serverSocket = new ServerSocket(portNumber);
// serverSocket.bind(new InetSocketAddress()); // serverSocket.bind(new InetSocketAddress());
} }
@Override void receiveConnection(PeerConnection connection)
public void run()
{ {
super.run(); activeConnections.add(connection);
while (!serverSocket.isClosed()) connection.start();
{
try
{
Socket socket = serverSocket.accept();
System.out.println("TCP Connection Manager: accepted connection from " + socket.getRemoteSocketAddress());
PeerTCPConnection nodeTCPConnection = new PeerTCPConnection(socket);
activeConnections.add(nodeTCPConnection);
nodeTCPConnection.start();
for (Consumer<PeerConnection> consumer: newConnectionConsumers) for (Consumer<PeerConnection> consumer: newConnectionConsumers)
{ {
consumer.accept(nodeTCPConnection); consumer.accept(connection);
} }
} catch (Exception e)
{
System.err.println("ConnectionManager experienced exception:" + e.getMessage());
e.printStackTrace(System.err);
}
}
System.err.println("ConnectionManager: thread dying!");
} }
public PeerConnection getNodeConnection(URI uri) throws IOException public PeerConnection getNodeConnection(URI uri) throws IOException
@ -71,14 +45,13 @@ public class ConnectionManager extends Thread
return peerConnection; return peerConnection;
} }
PeerConnection nodeConnection = null; ConnectionBackend<?> backend = backends.get(uri.getScheme());
if (Objects.equals(uri.getScheme(), "tcp")) PeerConnection nodeConnection = backend.makeConnection(uri, peer == null ? new ObjectID(0L) : peer.getObjectID());
{
nodeConnection = new PeerTCPConnection(uri, peer);
nodeConnection.start();
}
if (nodeConnection != null) if (nodeConnection != null)
{
nodeConnection.start();
activeConnections.add(nodeConnection); activeConnections.add(nodeConnection);
}
return nodeConnection; return nodeConnection;
} }
@ -89,7 +62,7 @@ public class ConnectionManager extends Thread
System.out.println("ConnectionManager: trying to get connection to " + peer + " (have " + activeConnections.size() + " connections open)"); System.out.println("ConnectionManager: trying to get connection to " + peer + " (have " + activeConnections.size() + " connections open)");
for (PeerConnection peerConnection: activeConnections) for (PeerConnection peerConnection: activeConnections)
{ {
if (peerConnection.getNode().equals(peer)) if (peerConnection.getPeerID() != null && peerConnection.getPeerID().equals(peer.getObjectID()))
return peerConnection; return peerConnection;
} }
@ -108,9 +81,10 @@ public class ConnectionManager extends Thread
return null; return null;
} }
public void shutdown() throws IOException public void shutdown()
{ {
serverSocket.close(); for (ConnectionBackend<?> backend : backends.values())
backend.shutdown();
for (PeerConnection nc: activeConnections) for (PeerConnection nc: activeConnections)
{ {
nc.shutdown(); nc.shutdown();
@ -131,6 +105,12 @@ public class ConnectionManager extends Thread
activeConnections.removeAll(deadConnections); activeConnections.removeAll(deadConnections);
} }
public void addConnectionBackend(ConnectionBackend<?> backend)
{
backends.put(backend.getUriScheme(), backend);
backend.start();
}
public void addNewConnectionConsumer(Consumer<PeerConnection> consumer) public void addNewConnectionConsumer(Consumer<PeerConnection> consumer)
{ {
newConnectionConsumers.add(consumer); newConnectionConsumers.add(consumer);
@ -141,4 +121,11 @@ public class ConnectionManager extends Thread
newConnectionConsumers.remove(consumer); newConnectionConsumers.remove(consumer);
} }
public List<URI> getURIs()
{
return backends.values().stream()
.filter(ConnectionBackend::isListening)
.flatMap(connectionBackend -> connectionBackend.getURIs().stream())
.toList();
}
} }

View file

@ -1,12 +1,10 @@
package moe.nekojimi.friendcloud.network; package moe.nekojimi.friendcloud.network;
import com.google.protobuf.Any; import com.google.protobuf.*;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import moe.nekojimi.friendcloud.FilePieceAccess; import moe.nekojimi.friendcloud.FilePieceAccess;
import moe.nekojimi.friendcloud.Main; import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.ObjectChangeRecord;
import moe.nekojimi.friendcloud.objects.NetworkFile; import moe.nekojimi.friendcloud.objects.NetworkFile;
import moe.nekojimi.friendcloud.objects.NetworkObject; import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.objects.ObjectID; import moe.nekojimi.friendcloud.objects.ObjectID;
@ -15,23 +13,24 @@ import moe.nekojimi.friendcloud.protos.CommonMessages;
import moe.nekojimi.friendcloud.protos.ObjectStatements; import moe.nekojimi.friendcloud.protos.ObjectStatements;
import moe.nekojimi.friendcloud.network.requests.Request; import moe.nekojimi.friendcloud.network.requests.Request;
import moe.nekojimi.friendcloud.protos.PieceMessages; import moe.nekojimi.friendcloud.protos.PieceMessages;
import moe.nekojimi.friendcloud.tasks.PullChangesTask;
import org.jetbrains.annotations.NotNull;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.HashMap; import java.util.*;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
public abstract class PeerConnection extends Thread public abstract class PeerConnection extends Thread
{ {
private final Map<Long, Request<?, ?>> pendingRequests = new HashMap<>(); private final Map<Long, Request<?, ?>> pendingRequests = new HashMap<>();
private Peer peer; private ObjectID peerID = new ObjectID(0);
private long nextMessageId = 1; private long nextMessageId = 1;
private final URI uri; private final URI uri;
private long artificalDelayMs = 0; private long artificalDelayMs = 0;
private final Map<String, MessageHandler<?>> messageHandlers = new HashMap<>();
public PeerConnection() public PeerConnection()
{ {
this(null); this(null);
@ -40,12 +39,13 @@ public abstract class PeerConnection extends Thread
public PeerConnection(URI uri) public PeerConnection(URI uri)
{ {
this.uri = uri; this.uri = uri;
installDefaultMessageHandlers();
} }
public PeerConnection(URI uri, Peer peer) public PeerConnection(URI uri, @NotNull ObjectID peerID)
{ {
this(uri); this(uri);
this.peer = peer; this.peerID = peerID;
} }
@Override @Override
@ -85,9 +85,10 @@ public abstract class PeerConnection extends Thread
private CommonMessages.FriendCloudMessage wrapMessage(Message message, CommonMessages.MessageHeader inReplyTo) private CommonMessages.FriendCloudMessage wrapMessage(Message message, CommonMessages.MessageHeader inReplyTo)
{ {
Peer localPeer = Main.getInstance().getModel().getLocalPeer();
CommonMessages.MessageHeader.Builder headerBuilder = CommonMessages.MessageHeader.newBuilder() CommonMessages.MessageHeader.Builder headerBuilder = CommonMessages.MessageHeader.newBuilder()
.setMessageId(nextMessageId) .setMessageId(nextMessageId)
.setSenderId(Main.getInstance().getModel().getSelfPeer().getObjectID().toLong()); .setSenderId(localPeer != null ? localPeer.getObjectID().toLong() : 0L);
if (inReplyTo != null) if (inReplyTo != null)
headerBuilder.setReplyToMessageId(inReplyTo.getMessageId()); headerBuilder.setReplyToMessageId(inReplyTo.getMessageId());
@ -114,6 +115,10 @@ public abstract class PeerConnection extends Thread
protected void messageReceived(@org.jetbrains.annotations.NotNull CommonMessages.FriendCloudMessage message) protected void messageReceived(@org.jetbrains.annotations.NotNull CommonMessages.FriendCloudMessage message)
{ {
CommonMessages.MessageHeader header = message.getHeader(); CommonMessages.MessageHeader header = message.getHeader();
Any body = message.getBody();
long replyToMessageId = header.getReplyToMessageId();
ObjectID senderID = new ObjectID(header.getSenderId());
System.out.println("Received message! type=" + body.getTypeUrl() + ", sender=" + senderID + ", ID=" + header.getMessageId() + ", reply_to=" + replyToMessageId);
try try
{ {
try try
@ -130,20 +135,19 @@ public abstract class PeerConnection extends Thread
} }
} }
ObjectID senderID = new ObjectID(header.getSenderId()); if (!senderID.isNull())
if (peer == null) {
peer = Main.getInstance().getModel().getOrCreateObject(senderID); if (peerID.isNull())
{
System.out.println("PeerConnection: Identified sender as " + senderID);
peerID = senderID;
}
else else
{ {
if (!senderID.equals(peer.getObjectID())) if (!senderID.equals(peerID))
throw new ReplyWithErrorException(CommonMessages.Error.ERROR_WHO_THE_FUCK_ARE_YOU); throw new ReplyWithErrorException(CommonMessages.Error.ERROR_WHO_THE_FUCK_ARE_YOU);
} }
}
Any body = message.getBody();
long replyToMessageId = header.getReplyToMessageId();
System.out.println("Received message! type=" + body.getTypeUrl() + ", ID=" + header.getMessageId() + ", reply_to=" + replyToMessageId);
if (replyToMessageId != 0) if (replyToMessageId != 0)
{ {
@ -183,48 +187,108 @@ public abstract class PeerConnection extends Thread
throw new RuntimeException("Our message ID " + header.getReplyToMessageId() + " caused a remote error: " + body.getError().name()); throw new RuntimeException("Our message ID " + header.getReplyToMessageId() + " caused a remote error: " + body.getError().name());
} }
@SuppressWarnings({"rawtypes", "unchecked"})
private void handleUnsolicitedMessage(CommonMessages.MessageHeader header, Any body) throws IOException, ReplyWithErrorException private void handleUnsolicitedMessage(CommonMessages.MessageHeader header, Any body) throws IOException, ReplyWithErrorException
{ {
if (body.is(ObjectStatements.ObjectListRequest.class)) String typeUrl = body.getTypeUrl();
if (messageHandlers.containsKey(typeUrl))
{ {
ObjectStatements.ObjectListRequest objectListRequest = body.unpack(ObjectStatements.ObjectListRequest.class); MessageHandler handler = messageHandlers.get(typeUrl);
List<NetworkObject> objects = Main.getInstance().getModel().listObjects(new HashSet<>(objectListRequest.getTypesList())); assert (body.is(handler.clazz));
Message unpack = body.unpack((Class<Message>) handler.clazz);
handler.handle(header, unpack);
}
else
{
System.err.println("PeerConnection: don't have a MessageHandler for message type " + typeUrl + "!");
replyWithError(CommonMessages.Error.ERROR_MESSAGE_BODY_UNKNOWN, header);
}
}
private void handleReplyMessage(CommonMessages.MessageHeader header, Any body) throws InvalidProtocolBufferException, ReplyWithErrorException
{
long replyToMessageId = header.getReplyToMessageId();
System.out.println("Received reply to message ID " + replyToMessageId);
Request<?, ?> request = pendingRequests.get(replyToMessageId);
boolean doneWithRequest = request.handleReply(body);
if (doneWithRequest)
pendingRequests.remove(replyToMessageId);
}
public abstract void shutdown();
public ObjectID getPeerID()
{
return peerID;
}
public synchronized URI getUri()
{
return uri;
}
public <T extends Message> void installMessageHandler(MessageHandler<T> handler)
{
String typeUrl = "type.googleapis.com/" + Internal.getDefaultInstance(handler.clazz).getDescriptorForType().getFullName();
messageHandlers.put(typeUrl, handler);
// System.out.println("PeerConnection: Installed message handler for type " + typeUrl);
}
private void installDefaultMessageHandlers()
{
installMessageHandler(new MessageHandler<>(ObjectStatements.ObjectListRequest.class)
{
@Override
protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectListRequest message) throws IOException
{
List<NetworkObject> objects = Main.getInstance().getModel().listObjects(new HashSet<>(message.getTypesList()));
ObjectStatements.ObjectList.Builder objectList = ObjectStatements.ObjectList.newBuilder(); ObjectStatements.ObjectList.Builder objectList = ObjectStatements.ObjectList.newBuilder();
for (NetworkObject object : objects) for (NetworkObject object : objects)
{ {
objectList.addStates(object.buildObjectState()); objectList.addStates(object.buildObjectState());
} }
// System.out.println("Replying to ObjectListRequest with ObjectList, objects=" + objectList.getStatesList()); // System.out.println("Replying to ObjectListRequest with ObjectList, objects=" + objectList.getStatesList());
sendMessage(wrapMessage(objectList.build(), header)); sendMessage(wrapMessage(objectList.build(), header));
} }
else if (body.is(PieceMessages.FilePiecesRequestMessage.class)) });
installMessageHandler(new MessageHandler<>(PieceMessages.FilePiecesRequestMessage.class)
{ {
PieceMessages.FilePiecesRequestMessage filePiecesRequestMessage = body.unpack(PieceMessages.FilePiecesRequestMessage.class); @Override
if (filePiecesRequestMessage.getPieceMod() == 0) protected void handle(CommonMessages.MessageHeader header, PieceMessages.FilePiecesRequestMessage message) throws IOException
{
if (message.getPieceMod() == 0)
{ {
replyWithError(CommonMessages.Error.ERROR_INVALID_ARGUMENT, header); replyWithError(CommonMessages.Error.ERROR_INVALID_ARGUMENT, header);
} }
NetworkFile networkFile = (NetworkFile) Main.getInstance().getModel().getObject(new ObjectID(filePiecesRequestMessage.getFileId())); NetworkFile networkFile = Main.getInstance().getModel().getObject(new ObjectID(message.getFileId()));
if (networkFile == null) if (networkFile == null)
{ {
replyWithError(CommonMessages.Error.ERROR_OBJECT_NOT_FOUND, header); replyWithError(CommonMessages.Error.ERROR_OBJECT_NOT_FOUND, header);
} }
assert networkFile != null; assert networkFile != null;
try (FilePieceAccess filePieceAccess = new FilePieceAccess(networkFile)) try (FilePieceAccess filePieceAccess = new FilePieceAccess(networkFile, FilePieceAccess.OpenMode.READ_ONLY))
{ {
int startIndex = filePiecesRequestMessage.getStartPieceIndex(); int startIndex = message.getStartPieceIndex();
int endIndex = (filePiecesRequestMessage.getStartPieceIndex() + filePiecesRequestMessage.getPieceCount()) - 1; int endIndex = (message.getStartPieceIndex() + message.getPieceCount()) - 1;
System.out.println("Been asked for pieces from " + startIndex + " to " + endIndex); System.out.println("Been asked for pieces from " + startIndex + " to " + endIndex);
for (int index = startIndex; index <= endIndex; index += filePiecesRequestMessage.getPieceMod()) List<Long> indices = new ArrayList<>();
for (int index = startIndex; index <= endIndex; index += message.getPieceMod())
{ {
byte[] buffer = filePieceAccess.readPiece(index); indices.add((long) index);
}
CommonMessages.MultiObjectConfirmationMessage multiObjectConfirmationMessage = CommonMessages.MultiObjectConfirmationMessage.newBuilder().addAllExpectedReturnId(indices).build();
sendMessage(wrapMessage(multiObjectConfirmationMessage, header));
for (Long index : indices)
{
byte[] buffer = filePieceAccess.readPiece(Math.toIntExact(index));
if (buffer != null) if (buffer != null)
{ {
System.out.println("Replying to file piece request with piece " + index); System.out.println("Replying to file piece request with piece " + index);
PieceMessages.FilePieceMessage filePieceMessage = PieceMessages.FilePieceMessage.newBuilder() PieceMessages.FilePieceMessage filePieceMessage = PieceMessages.FilePieceMessage.newBuilder()
.setPieceIndex(index) .setPieceIndex(Math.toIntExact(index))
.setFileId(networkFile.getObjectID().toLong()) .setFileId(networkFile.getObjectID().toLong())
.setData(ByteString.copyFrom(buffer)) .setData(ByteString.copyFrom(buffer))
.build(); .build();
@ -239,27 +303,74 @@ public abstract class PeerConnection extends Thread
} }
} }
} }
} });
installMessageHandler(new MessageHandler<>(ObjectStatements.ObjectChangeRequest.class)
private void handleReplyMessage(CommonMessages.MessageHeader header, Any body) throws InvalidProtocolBufferException, ReplyWithErrorException
{ {
long replyToMessageId = header.getReplyToMessageId(); @Override
System.out.println("Received reply to message ID " + replyToMessageId); protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectChangeRequest message) throws IOException
Request<?, ?> request = pendingRequests.get(replyToMessageId);
boolean doneWithRequest = request.handleReply(body);
if (doneWithRequest)
pendingRequests.remove(replyToMessageId);
}
public abstract void shutdown() throws IOException;
public synchronized Peer getNode()
{ {
return peer; List<Long> changesSinceList = message.getChangesSinceList();
System.out.println("PeerConnection: Been asked for all changes since " + changesSinceList.stream().map(Long::toHexString));
Set<ObjectChangeRecord> changes = Main.getInstance().getModel().findChangesSince(changesSinceList);
if (changes == null)
{
replyWithError(CommonMessages.Error.ERROR_END_OF_HISTORY, header);
}
else
{
ObjectStatements.ObjectChangeListMessage.Builder reply = ObjectStatements.ObjectChangeListMessage.newBuilder();
for (ObjectChangeRecord change : changes)
{
reply.addChangeMessages(change.buildObjectChangeMessage());
}
System.out.println("PeerConnection: Replying with " + reply.getChangeMessagesCount() + " changes");
sendMessage(wrapMessage(reply.build(), header));
}
}
});
installMessageHandler(new MessageHandler<>(ObjectStatements.ObjectChangeMessage.class)
{
@Override
protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectChangeMessage message)
{
ObjectChangeRecord record = ObjectChangeRecord.createFromChangeMessage(message);
Main.getInstance().getModel().applyChangeRecord(record);
}
});
installMessageHandler(new MessageHandler<>(CommonMessages.CheckInMessage.class)
{
@Override
protected void handle(CommonMessages.MessageHeader header, CommonMessages.CheckInMessage message)
{
List<Long> remoteChangeHeads = message.getCurrentChangeHeadsList();
boolean potentialNewChanges = false;
for (long remoteChangeHead : remoteChangeHeads)
{
boolean exists = Main.getInstance().getModel().getDataStore().getDAOForClass(ObjectChangeRecord.class).exists(remoteChangeHead);
if (!exists)
{
potentialNewChanges = true;
break;
}
}
if (potentialNewChanges)
{
PullChangesTask task = new PullChangesTask(Set.of(Main.getInstance().getModel().getObject(peerID)));
Main.getInstance().getExecutor().submit(task);
}
}
});
} }
public synchronized URI getUri() public abstract static class MessageHandler<MessageType extends Message>
{ {
return uri; private final Class<MessageType> clazz;
public MessageHandler(Class<MessageType> clazz)
{
this.clazz = clazz;
}
protected abstract void handle(CommonMessages.MessageHeader header, MessageType message) throws IOException;
} }
} }

View file

@ -0,0 +1,125 @@
package moe.nekojimi.friendcloud.network;
import com.offbynull.portmapper.PortMapperFactory;
import com.offbynull.portmapper.gateway.Bus;
import com.offbynull.portmapper.gateway.Gateway;
import com.offbynull.portmapper.gateways.network.NetworkGateway;
import com.offbynull.portmapper.gateways.process.ProcessGateway;
import com.offbynull.portmapper.mapper.MappedPort;
import com.offbynull.portmapper.mapper.PortMapper;
import com.offbynull.portmapper.mapper.PortType;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.objects.Peer;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class TCPConnectionBackend extends ConnectionBackend<TCPPeerConnection>
{
private final ServerSocket serverSocket;
private MappedPort mappedPort;
@Override
List<URI> getURIs()
{
List<URI> ret = new ArrayList<>();
ret.add(makeURI(serverSocket.getInetAddress().getHostAddress(), serverSocket.getLocalPort()));
ret.add(makeURI(serverSocket.getInetAddress().getHostName(), serverSocket.getLocalPort()));
if (mappedPort != null)
{
ret.add(makeURI(mappedPort.getExternalAddress().getHostAddress(), mappedPort.getExternalPort()));
ret.add(makeURI(mappedPort.getExternalAddress().getCanonicalHostName(), mappedPort.getExternalPort()));
}
return ret;
}
public TCPConnectionBackend(int port, ConnectionManager connectionManager) throws IOException
{
super("TCP Listen Thread", "tcp", connectionManager);
serverSocket = new ServerSocket(port);
// setupIGP(port);
}
private void setupIGP(int port)
{
try
{ // Start gateways
Gateway network = NetworkGateway.create();
Gateway process = ProcessGateway.create();
Bus networkBus = network.getBus();
Bus processBus = process.getBus();
// Discover port forwarding devices and take the first one found
System.out.println("Discovering port mappers...");
List<PortMapper> mappers = PortMapperFactory.discover(networkBus, processBus);
;
PortMapper mapper = mappers.getFirst();
System.out.println("Got mapper " + mapper + ", mapping port...");
mappedPort = mapper.mapPort(PortType.TCP, port, port, 3600);
System.out.println("Port mapping added: " + mappedPort);
long refreshDelay = (long) (mappedPort.getLifetime() * 0.9);
Main.getInstance().getExecutor().scheduleWithFixedDelay(() -> {
try
{
System.out.println("Refreshing UPnP port mapping.");
mapper.refreshPort(mappedPort, mappedPort.getLifetime());
} catch (InterruptedException e)
{
e.printStackTrace(System.err);
}
}, refreshDelay, refreshDelay, TimeUnit.SECONDS);
} catch (InterruptedException | IllegalStateException ignored)
{
}
}
@Override
protected TCPPeerConnection makeConnection(URI uri, ObjectID peer)
{
try
{
return new TCPPeerConnection(uri, peer);
} catch (IOException e)
{
System.out.println("TCPConnectionBackend: failed to connect to " + uri + ": " + e.getMessage());
// e.printStackTrace(System.err);
return null;
}
}
@Override
protected TCPPeerConnection getConnection() throws IOException
{
Socket socket = serverSocket.accept();
return new TCPPeerConnection(socket);
}
@Override
protected boolean isListening()
{
return !serverSocket.isClosed();
}
@Override
public void shutdown()
{
try
{
serverSocket.close();
} catch (IOException e)
{
throw new RuntimeException(e);
}
}
}

View file

@ -1,6 +1,6 @@
package moe.nekojimi.friendcloud.network; package moe.nekojimi.friendcloud.network;
import moe.nekojimi.friendcloud.objects.Peer; import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.protos.CommonMessages; import moe.nekojimi.friendcloud.protos.CommonMessages;
import java.io.IOException; import java.io.IOException;
@ -9,19 +9,19 @@ import java.io.OutputStream;
import java.net.Socket; import java.net.Socket;
import java.net.URI; import java.net.URI;
public class PeerTCPConnection extends PeerConnection public class TCPPeerConnection extends PeerConnection
{ {
private final Socket socket; private final Socket socket;
private final int keepAliveTimeS = 300; private final int keepAliveTimeS = 300;
public PeerTCPConnection(URI tcpURL, Peer peer) throws IOException public TCPPeerConnection(URI tcpURL, ObjectID peer) throws IOException
{ {
super(tcpURL, peer); super(tcpURL, peer);
socket = new Socket(tcpURL.getHost(), tcpURL.getPort()); socket = new Socket(tcpURL.getHost(), tcpURL.getPort());
System.out.println("TCP Connection: connected to " + tcpURL + " OK!"); System.out.println("TCP Connection: connected to " + tcpURL + " OK!");
} }
public PeerTCPConnection(Socket openSocket) public TCPPeerConnection(Socket openSocket)
{ {
super(); super();
socket = openSocket; socket = openSocket;
@ -42,8 +42,6 @@ public class PeerTCPConnection extends PeerConnection
if (message != null) if (message != null)
{ {
System.out.println("TCP Connection: read data");
messageReceived(message); messageReceived(message);
} }
} }
@ -60,14 +58,21 @@ public class PeerTCPConnection extends PeerConnection
protected void sendMessage(CommonMessages.FriendCloudMessage message) throws IOException protected void sendMessage(CommonMessages.FriendCloudMessage message) throws IOException
{ {
OutputStream outputStream = socket.getOutputStream(); OutputStream outputStream = socket.getOutputStream();
System.out.println("Sending message " + message.getHeader().getMessageId()); System.out.println("Sending message " + message.getHeader().getMessageId() + ": " + message.getBody().getTypeUrl());
message.writeDelimitedTo(outputStream); message.writeDelimitedTo(outputStream);
outputStream.flush(); outputStream.flush();
} }
@Override @Override
public synchronized void shutdown() throws IOException public synchronized void shutdown()
{
try
{ {
socket.close(); socket.close();
} catch (IOException e)
{
System.err.println("TCPPeerConnection: failed to shut down!");
e.printStackTrace(System.err);
}
} }
} }

View file

@ -4,13 +4,15 @@ import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.InvalidProtocolBufferException;
import moe.nekojimi.friendcloud.FilePieceAccess; import moe.nekojimi.friendcloud.FilePieceAccess;
import moe.nekojimi.friendcloud.objects.NetworkFile; import moe.nekojimi.friendcloud.objects.NetworkFile;
import moe.nekojimi.friendcloud.protos.CommonMessages;
import moe.nekojimi.friendcloud.protos.PieceMessages; import moe.nekojimi.friendcloud.protos.PieceMessages;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.HashSet;
import java.util.List; import java.util.Set;
import java.util.stream.Collectors;
public class FilePiecesRequest extends Request<PieceMessages.FilePiecesRequestMessage, List<Integer>> public class FilePiecesRequest extends Request<PieceMessages.FilePiecesRequestMessage, Set<Integer>>
{ {
private final NetworkFile file; private final NetworkFile file;
private final int startPiece; private final int startPiece;
@ -18,9 +20,9 @@ public class FilePiecesRequest extends Request<PieceMessages.FilePiecesRequestMe
private final int pieceMod; private final int pieceMod;
private FilePieceAccess filePieceAccess; private FilePieceAccess filePieceAccess;
private int expectedPieceCount = 0; // private int expectedPieceCount = 0;
// private List<Long> expectedPieces = new ArrayList<>(); private Set<Integer> expectedPieces = new HashSet<>();
private final List<Integer> receivedPieces = new ArrayList<>(); private final Set<Integer> receivedPieces = new HashSet<>();
public FilePiecesRequest(NetworkFile file, int startPiece, int pieceCount, int pieceMod) public FilePiecesRequest(NetworkFile file, int startPiece, int pieceCount, int pieceMod)
{ {
@ -33,7 +35,7 @@ public class FilePiecesRequest extends Request<PieceMessages.FilePiecesRequestMe
@Override @Override
public PieceMessages.FilePiecesRequestMessage buildMessage() public PieceMessages.FilePiecesRequestMessage buildMessage()
{ {
expectedPieceCount = Math.toIntExact(pieceCount / pieceMod); // expectedPieceCount = Math.toIntExact(pieceCount / pieceMod);
return PieceMessages.FilePiecesRequestMessage.newBuilder() return PieceMessages.FilePiecesRequestMessage.newBuilder()
.setFileId(file.getObjectID().toLong()) .setFileId(file.getObjectID().toLong())
.setPieceCount(pieceCount) .setPieceCount(pieceCount)
@ -51,29 +53,34 @@ public class FilePiecesRequest extends Request<PieceMessages.FilePiecesRequestMe
{ {
if (reply.is(PieceMessages.FilePieceMessage.class)) if (reply.is(PieceMessages.FilePieceMessage.class))
{ {
expectedPieceCount--;
PieceMessages.FilePieceMessage filePieceMessage = reply.unpack(PieceMessages.FilePieceMessage.class); PieceMessages.FilePieceMessage filePieceMessage = reply.unpack(PieceMessages.FilePieceMessage.class);
byte[] buffer = filePieceMessage.getData().toByteArray(); byte[] buffer = filePieceMessage.getData().toByteArray();
int index = Math.toIntExact(filePieceMessage.getPieceIndex()); int index = Math.toIntExact(filePieceMessage.getPieceIndex());
if (filePieceAccess == null) if (filePieceAccess == null)
filePieceAccess = new FilePieceAccess(file); filePieceAccess = new FilePieceAccess(file, FilePieceAccess.OpenMode.READ_WRITE);
filePieceAccess.writePiece((int) index, buffer); filePieceAccess.writePiece((int) index, buffer);
receivedPieces.add(index); receivedPieces.add(index);
} }
else if (reply.is(CommonMessages.MultiObjectConfirmationMessage.class))
{
CommonMessages.MultiObjectConfirmationMessage msg = reply.unpack(CommonMessages.MultiObjectConfirmationMessage.class);
expectedPieces = msg.getExpectedReturnIdList().stream().map(Math::toIntExact).collect(Collectors.toSet());
}
} catch (IOException ex) } catch (IOException ex)
{ {
future.completeExceptionally(ex); future.completeExceptionally(ex);
return true; return true;
} }
if (expectedPieceCount <= 0) if (receivedPieces.equals(expectedPieces))
{ {
future.complete(receivedPieces); future.complete(receivedPieces);
return true;
} }
return expectedPieceCount == 0; return false;
} }
} }

View file

@ -2,15 +2,19 @@ package moe.nekojimi.friendcloud.network.requests;
import com.google.protobuf.Any; import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.InvalidProtocolBufferException;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.ObjectChangeRecord; import moe.nekojimi.friendcloud.ObjectChangeRecord;
import moe.nekojimi.friendcloud.protos.CommonMessages;
import moe.nekojimi.friendcloud.protos.ObjectStatements; import moe.nekojimi.friendcloud.protos.ObjectStatements;
import java.util.HashSet;
import java.util.Set; import java.util.Set;
public class ObjectChangeRequest extends Request<ObjectStatements.ObjectChangeRequest, Set<ObjectStatements.ObjectChange>> public class ObjectChangeRequest extends Request<ObjectStatements.ObjectChangeRequest, Set<ObjectChangeRecord>>
{ {
private final Set<Long> changesSinceIDs; private final Set<Long> changesSinceIDs;
private Set<Long> expectedIDs = new HashSet<>();
private final Set<Long> receivedIDs = new HashSet<>();
private final Set<ObjectChangeRecord> receivedRecords = new HashSet<>();
public ObjectChangeRequest(Set<Long> changesSinceIDs) public ObjectChangeRequest(Set<Long> changesSinceIDs)
{ {
@ -26,14 +30,35 @@ public class ObjectChangeRequest extends Request<ObjectStatements.ObjectChangeRe
@Override @Override
public boolean handleReply(Any reply) throws InvalidProtocolBufferException public boolean handleReply(Any reply) throws InvalidProtocolBufferException
{ {
if (!super.handleReply(reply)) if (super.handleReply(reply))
return false; return true;
if (reply.is(ObjectStatements.ObjectChangeMessage.class)) if (reply.is(ObjectStatements.ObjectChangeMessage.class))
{ {
ObjectStatements.ObjectChangeMessage objectChangeMessage = reply.unpack(ObjectStatements.ObjectChangeMessage.class); ObjectStatements.ObjectChangeMessage objectChangeMessage = reply.unpack(ObjectStatements.ObjectChangeMessage.class);
ObjectChangeRecord objectChangeRecord = ObjectChangeRecord.createFromChangeMessage(objectChangeMessage); ObjectChangeRecord objectChangeRecord = ObjectChangeRecord.createFromChangeMessage(objectChangeMessage);
Main.getInstance().getModel().applyChangeRecord(objectChangeRecord); receivedRecords.add(objectChangeRecord);
}
else if (reply.is(CommonMessages.MultiObjectConfirmationMessage.class))
{
CommonMessages.MultiObjectConfirmationMessage msg = reply.unpack(CommonMessages.MultiObjectConfirmationMessage.class);
expectedIDs = new HashSet<>(msg.getExpectedReturnIdList());
}
else if (reply.is(ObjectStatements.ObjectChangeListMessage.class))
{
ObjectStatements.ObjectChangeListMessage msg = reply.unpack(ObjectStatements.ObjectChangeListMessage.class);
for (ObjectStatements.ObjectChangeMessage m : msg.getChangeMessagesList())
{
ObjectChangeRecord objectChangeRecord = ObjectChangeRecord.createFromChangeMessage(m);
receivedRecords.add(objectChangeRecord);
}
future.complete(receivedRecords);
return true;
}
if (receivedIDs.equals(expectedIDs))
{
future.complete(receivedRecords);
return true; return true;
} }

View file

@ -2,7 +2,6 @@ package moe.nekojimi.friendcloud.network.requests;
import com.google.protobuf.Any; import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.InvalidProtocolBufferException;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.objects.NetworkObject; import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.objects.ObjectID; import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.protos.ObjectStatements; import moe.nekojimi.friendcloud.protos.ObjectStatements;
@ -44,10 +43,10 @@ public class ObjectListRequest extends Request<ObjectStatements.ObjectListReques
for (ObjectStatements.ObjectState objectState : objectList.getStatesList()) for (ObjectStatements.ObjectState objectState : objectList.getStatesList())
{ {
System.out.println("Received state of object " + objectState.getObjectId()); ObjectID objectID = new ObjectID(objectState.getObjectId());
NetworkObject object = Main.getInstance().getModel().getOrCreateObject(new ObjectID(objectState.getObjectId())); System.out.println("ObjectListRequest: Received state of object " + objectID);
NetworkObject object = NetworkObject.createByID(objectID);
object.updateFromStateMessage(objectState); object.updateFromStateMessage(objectState);
Main.getInstance().getModel().objectChanged(object);
ret.add(object); ret.add(object);
} }

View file

@ -13,12 +13,18 @@ public abstract class Request<MessageType extends Message, ReturnType>
public abstract MessageType buildMessage(); public abstract MessageType buildMessage();
/**
* Handle a message that was received in reply to the request message.
* @param reply the reply message. May be one of many.
* @return true if no further replies are expected (or able to be processed), false if more replies are coming.
* @throws InvalidProtocolBufferException if reply cannot be decoded.
*/
public boolean handleReply(Any reply) throws InvalidProtocolBufferException public boolean handleReply(Any reply) throws InvalidProtocolBufferException
{ {
if (reply.is(CommonMessages.ErrorMessage.class)) if (reply.is(CommonMessages.ErrorMessage.class))
{ {
CommonMessages.ErrorMessage errorMessage = reply.unpack(CommonMessages.ErrorMessage.class); CommonMessages.ErrorMessage errorMessage = reply.unpack(CommonMessages.ErrorMessage.class);
future.completeExceptionally(new RuntimeException("Request received error response: " + errorMessage.getError().name())); future.completeExceptionally(new RequestReceivedErrorException(errorMessage));
return true; return true;
} }
return false; return false;

View file

@ -0,0 +1,19 @@
package moe.nekojimi.friendcloud.network.requests;
import moe.nekojimi.friendcloud.protos.CommonMessages;
public class RequestReceivedErrorException extends Exception
{
private final CommonMessages.ErrorMessage errorMessage;
public RequestReceivedErrorException(CommonMessages.ErrorMessage errorMessage)
{
super("Request received error:" + errorMessage.getError().name() + "; " + errorMessage.getText());
this.errorMessage = errorMessage;
}
public CommonMessages.ErrorMessage getErrorMessage()
{
return errorMessage;
}
}

View file

@ -17,16 +17,16 @@ public abstract class NetworkFSNode extends NetworkObject
} }
@Override @Override
public synchronized void updateFromStateMessage(ObjectStatements.ObjectState state) public synchronized void updateFromMessageMap(Map<String,String> after, Map<String,String> before)
{ {
super.updateFromStateMessage(state); super.updateFromMessageMap(after,before);
if (state.containsValues("name")) if (after.containsKey("name"))
name = state.getValuesOrThrow("name"); name = after.get("name");
if (state.containsValues("parent")) if (after.containsKey("parent"))
{ {
long parentID = Long.parseLong(state.getValuesOrThrow("parent")); long parentID = Long.parseLong(after.get("parent"));
if (parentID != 0) if (parentID != 0)
parent = (NetworkFolder) Main.getInstance().getModel().getObject(new ObjectID(parentID)); parent = Main.getInstance().getModel().getObject(new ObjectID(parentID));
else else
parent = null; parent = null;
} }
@ -36,7 +36,8 @@ public abstract class NetworkFSNode extends NetworkObject
public ObjectStatements.ObjectState.Builder buildObjectState() public ObjectStatements.ObjectState.Builder buildObjectState()
{ {
return super.buildObjectState() return super.buildObjectState()
.putValues("name", getName()); .putValues("name", getName())
.putValues("parent", parent != null ? Long.toString(parent.getStorageID()) : "0");
} }
@Override @Override
@ -52,7 +53,7 @@ public abstract class NetworkFSNode extends NetworkObject
public void updateFromStateMap(Map<String, Object> map) public void updateFromStateMap(Map<String, Object> map)
{ {
name = map.get("name").toString(); name = map.get("name").toString();
parent = (NetworkFolder) Main.getInstance().getModel().getObject(new ObjectID(((Number)map.get("parent")).longValue())); parent = Main.getInstance().getModel().getObject(new ObjectID(((Number)map.get("parent")).longValue()));
} }
public String getName() public String getName()
@ -65,8 +66,24 @@ public abstract class NetworkFSNode extends NetworkObject
this.name = name; this.name = name;
} }
@Override
public String getFriendlyName()
{
return getName();
}
public String getNetworkPath() public String getNetworkPath()
{ {
return (parent != null ? parent.getNetworkPath() : "") + "/" + name; return (parent != null ? parent.getNetworkPath() : "") + "/" + name;
} }
public NetworkFolder getParent()
{
return parent;
}
public void setParent(NetworkFolder newParent)
{
parent = newParent;
}
} }

View file

@ -5,11 +5,8 @@ import moe.nekojimi.friendcloud.Util;
import moe.nekojimi.friendcloud.protos.ObjectStatements; import moe.nekojimi.friendcloud.protos.ObjectStatements;
import java.io.File; import java.io.File;
import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*; import java.util.*;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -43,7 +40,6 @@ public class NetworkFile extends NetworkFSNode
this.localFile = localFile; this.localFile = localFile;
name = localFile.getName(); name = localFile.getName();
size = localFile.length(); size = localFile.length();
pieceSize = MIN_PIECE_SIZE;
for (pieceSize = MIN_PIECE_SIZE; pieceSize < MAX_PIECE_SIZE; pieceSize *= 2) for (pieceSize = MIN_PIECE_SIZE; pieceSize < MAX_PIECE_SIZE; pieceSize *= 2)
{ {
long pieceCount = size / pieceSize; long pieceCount = size / pieceSize;
@ -51,81 +47,34 @@ public class NetworkFile extends NetworkFSNode
break; break;
} }
pieces = new BitSet(Math.toIntExact(getPieceCount())); Util.HashOutput hashOutput = Util.hashFile(localFile, pieceSize);
pieces = hashOutput.pieces;
hash = hashOutput.totalDigest;
long offset = 0L;
System.out.println("Calculating hashes for file " + localFile.getName() + "(Piece size: " + pieceSize + ")");
try (FileInputStream input = new FileInputStream(localFile))
{
MessageDigest totalDigest = MessageDigest.getInstance("SHA-256");
byte[] pieceBuf = new byte[Math.toIntExact(pieceSize)];
int pieceIdx = 0;
// List<FilePiece> pieces = new ArrayList<>();
while(true)
{
int bytesRead = input.read(pieceBuf);
if (bytesRead <= 0)
break;
// check to see if this piece is just zeroes, if so, assume it's a missing piece
boolean allZero = true;
for (byte b: pieceBuf)
{
if (b != 0)
{
allZero = false;
break;
}
}
pieces.set(pieceIdx, !allZero);
// MessageDigest pieceDigest = MessageDigest.getInstance("SHA-256");
// pieceDigest.update(pieceBuf, 0, bytesRead);
// byte[] pieceHash = pieceDigest.digest();
totalDigest.update(pieceBuf, 0, bytesRead);
// FilePiece piece = new FilePiece(pieceHash, bytesRead, localFile, offset);
// System.out.print(HexFormat.of().formatHex(pieceHash) + ", ");
// pieces.add(piece);
pieceIdx++;
}
System.out.println(); System.out.println();
setLocalFile(localFile); setLocalFile(localFile);
size = localFile.length(); size = localFile.length();
// setPieces(pieces);
setHash(totalDigest.digest());
System.out.println("Total hash: " + HexFormat.of().formatHex(hash));
System.out.println("Have " + pieces.cardinality() + " of " + getPieceCount() + " pieces.");
if (pieces.cardinality() >= getPieceCount()) if (pieces.cardinality() >= getPieceCount())
{ {
peersWithCopy.add(Main.getInstance().getModel().getSelfPeer().getObjectID()); peersWithCopy.add(Main.getInstance().getModel().getLocalPeer().getObjectID());
}
} catch (NoSuchAlgorithmException e)
{
throw new RuntimeException(e);
} }
} }
@Override @Override
public void updateFromStateMessage(ObjectStatements.ObjectState state) public synchronized void updateFromMessageMap(Map<String, String> after, Map<String, String> before)
{ {
super.updateFromStateMessage(state); super.updateFromMessageMap(after, before);
// if (state.containsValues("path")) if (after.containsKey("size"))
// path = state.getValuesOrThrow("path"); size = Long.parseLong(after.get("size"));
if (state.containsValues("size")) if (after.containsKey("hash"))
size = Long.parseLong(state.getValuesOrThrow("size")); hash = HexFormat.of().parseHex(after.get("hash"));
if (state.containsValues("hash")) if (after.containsKey("pieceSize"))
hash = HexFormat.of().parseHex(state.getValuesOrThrow("hash")); pieceSize = Long.parseLong(after.get("pieceSize"));
if (state.containsValues("pieceSize")) if (after.containsKey("peersWithCopy"))
pieceSize = Long.parseLong(state.getValuesOrThrow("pieceSize"));
if (state.containsValues("peersWithCopy"))
{ {
peersWithCopy.clear(); peersWithCopy.clear();
String[] peers = state.getValuesOrThrow("peersWithCopy").split(","); String[] peers = after.get("peersWithCopy").split(",");
for (String peer: peers) for (String peer: peers)
{ {
peersWithCopy.add(new ObjectID(Long.parseUnsignedLong(peer,16))); peersWithCopy.add(new ObjectID(Long.parseUnsignedLong(peer,16)));
@ -133,6 +82,7 @@ public class NetworkFile extends NetworkFSNode
} }
} }
@Override @Override
public ObjectStatements.ObjectState.Builder buildObjectState() public ObjectStatements.ObjectState.Builder buildObjectState()
{ {
@ -157,6 +107,7 @@ public class NetworkFile extends NetworkFSNode
return ret; return ret;
} }
@SuppressWarnings("unchecked")
@Override @Override
public void updateFromStateMap(Map<String, Object> map) public void updateFromStateMap(Map<String, Object> map)
{ {
@ -220,11 +171,6 @@ public class NetworkFile extends NetworkFSNode
return pieceSize; return pieceSize;
} }
private void setPieceSize(long pieceSize)
{
this.pieceSize = pieceSize;
}
// public List<FilePiece> getPieces() // public List<FilePiece> getPieces()
// { // {
// return pieces; // return pieces;
@ -240,11 +186,6 @@ public class NetworkFile extends NetworkFSNode
return hash; return hash;
} }
private void setHash(byte[] hash)
{
this.hash = hash;
}
public int getPieceCount() public int getPieceCount()
{ {
return Math.toIntExact(Math.ceilDiv(size, pieceSize)); return Math.toIntExact(Math.ceilDiv(size, pieceSize));
@ -338,6 +279,11 @@ public class NetworkFile extends NetworkFSNode
} }
} }
public boolean hasLocalFile()
{
return localFile != null;
}
public enum StorageType public enum StorageType
{ {
/** The file will be stored as a complete file in the storage directory under it's own name and file path. */ /** The file will be stored as a complete file in the storage directory under it's own name and file path. */

View file

@ -1,5 +1,6 @@
package moe.nekojimi.friendcloud.objects; package moe.nekojimi.friendcloud.objects;
import moe.nekojimi.friendcloud.ObjectChangeRecord;
import moe.nekojimi.friendcloud.protos.ObjectStatements; import moe.nekojimi.friendcloud.protos.ObjectStatements;
import moe.nekojimi.friendcloud.storage.Storable; import moe.nekojimi.friendcloud.storage.Storable;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
@ -17,6 +18,18 @@ public abstract class NetworkObject implements Storable, Comparable<NetworkObjec
this.objectID = objectID; this.objectID = objectID;
} }
public static NetworkObject createByID(ObjectID objectID)
{
ObjectStatements.ObjectType type = objectID.getType();
return switch (type)
{
case OBJECT_TYPE_PEER -> new Peer(objectID);
case OBJECT_TYPE_FILE -> new NetworkFile(objectID);
case OBJECT_TYPE_FOLDER -> new NetworkFolder(objectID);
default -> throw new IllegalArgumentException("Unrecognised object type!");
};
}
public ObjectID getObjectID() public ObjectID getObjectID()
{ {
return objectID; return objectID;
@ -28,7 +41,6 @@ public abstract class NetworkObject implements Storable, Comparable<NetworkObjec
return getObjectID().toLong(); return getObjectID().toLong();
} }
@Override @Override
public Map<String, Object> getStateMap() public Map<String, Object> getStateMap()
{ {
@ -37,15 +49,31 @@ public abstract class NetworkObject implements Storable, Comparable<NetworkObjec
return ret; return ret;
} }
public synchronized void updateFromStateMessage(ObjectStatements.ObjectState state) public final void updateFromChange(ObjectChangeRecord.Change change)
{
updateFromMessageMap(change.afterValues(), change.beforeValues());
}
private synchronized void updateFromMessageMap(Map<String,String> currentValues)
{
updateFromMessageMap(currentValues,Map.of());
}
protected synchronized void updateFromMessageMap(Map<String,String> currentValues, Map<String,String> beforeValues)
{
}
public synchronized final void updateFromStateMessage(ObjectStatements.ObjectState state)
{ {
if (state.getObjectId() != objectID.toLong()) if (state.getObjectId() != objectID.toLong())
throw new IllegalArgumentException("Wrong object!"); throw new IllegalArgumentException("Wrong object!");
updateFromMessageMap(state.getValuesMap());
} }
public synchronized ObjectStatements.ObjectState mergeChanges(ObjectStatements.ObjectState a, ObjectStatements.ObjectState b) public synchronized ObjectStatements.ObjectState mergeChanges(ObjectStatements.ObjectChange a, ObjectStatements.ObjectChange b)
{ {
return null; throw new UnsupportedOperationException("NYI");
} }
public ObjectStatements.ObjectState.Builder buildObjectState() public ObjectStatements.ObjectState.Builder buildObjectState()
@ -73,4 +101,12 @@ public abstract class NetworkObject implements Storable, Comparable<NetworkObjec
{ {
return Objects.hashCode(objectID); return Objects.hashCode(objectID);
} }
public abstract String getFriendlyName();
@Override
public String toString()
{
return this.getClass().getName() + "{" + objectID.toString() + " (" + getFriendlyName() + ")" + "}";
}
} }

View file

@ -1,5 +1,6 @@
package moe.nekojimi.friendcloud.objects; package moe.nekojimi.friendcloud.objects;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import java.util.Objects; import java.util.Objects;
@ -32,6 +33,11 @@ public class ObjectID implements Comparable<ObjectID>
return typePart | systemPart | uniquePart; return typePart | systemPart | uniquePart;
} }
public static ObjectID nullValue()
{
return new ObjectID(0L);
}
public moe.nekojimi.friendcloud.protos.ObjectStatements.ObjectType getType() public moe.nekojimi.friendcloud.protos.ObjectStatements.ObjectType getType()
{ {
return type; return type;
@ -72,4 +78,9 @@ public class ObjectID implements Comparable<ObjectID>
{ {
return Long.compare(toLong(), objectID.toLong()); return Long.compare(toLong(), objectID.toLong());
} }
public boolean isNull()
{
return type == ObjectStatements.ObjectType.OBJECT_TYPE_UNSPECIFIED;
}
} }

View file

@ -13,10 +13,6 @@ public class Peer extends NetworkObject
private String userName = ""; private String userName = "";
private String systemName = ""; private String systemName = "";
// private Map<NetworkFile, PeerFileState> fileStates = new HashMap<>();
private volatile int lastTriedAddressIdx = -1;
public Peer(ObjectID objectID) public Peer(ObjectID objectID)
{ {
super(objectID); super(objectID);
@ -28,19 +24,18 @@ public class Peer extends NetworkObject
} }
@Override @Override
public synchronized void updateFromStateMessage(ObjectStatements.ObjectState state) protected synchronized void updateFromMessageMap(Map<String, String> after, Map<String, String> before)
{ {
super.updateFromStateMessage(state); super.updateFromMessageMap(after, before);
Map<String,String> values = state.getValuesMap(); if (after.containsKey("userName"))
if (values.containsKey("userName")) userName = after.get("userName");
userName = values.get("userName"); if (after.containsKey("systemName"))
if (values.containsKey("systemName")) systemName = after.get("systemName");
systemName = values.get("systemName"); if (after.containsKey("addresses"))
if (values.containsKey("addresses"))
{ {
addresses.clear(); addresses.clear();
String[] split = values.get("addresses").split(","); String[] split = after.get("addresses").split(",");
for (String s: split) for (String s: split)
{ {
try try
@ -52,7 +47,6 @@ public class Peer extends NetworkObject
} }
} }
} }
// if (values.containsKey("files"))
} }
@Override @Override
@ -104,6 +98,12 @@ public class Peer extends NetworkObject
// return fileStates; // return fileStates;
// } // }
public void setAddresses(Collection<URI> urIs)
{
addresses.clear();
addresses.addAll(urIs);
}
public SortedSet<URI> getAddresses() public SortedSet<URI> getAddresses()
{ {
return addresses; return addresses;
@ -128,4 +128,10 @@ public class Peer extends NetworkObject
{ {
this.systemName = systemName; this.systemName = systemName;
} }
@Override
public String getFriendlyName()
{
return getNodeName();
}
} }

View file

@ -1,92 +0,0 @@
package moe.nekojimi.friendcloud.objects;
import moe.nekojimi.friendcloud.Util;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import java.util.Map;
public class PeerFileState extends NetworkObject
{
private ObjectID peerID;
private ObjectID fileID;
private double progress = 0;
public PeerFileState(ObjectID objectID)
{
super(objectID);
}
@Override
public void updateFromStateMessage(ObjectStatements.ObjectState state)
{
super.updateFromStateMessage(state);
peerID = new ObjectID(Long.parseLong(state.getValuesOrThrow("peer")));
fileID = new ObjectID(Long.parseLong(state.getValuesOrThrow("file")));
if (state.containsValues("progress"))
progress = Double.parseDouble(state.getValuesOrThrow("progress"));
}
@Override
public ObjectStatements.ObjectState mergeChanges(ObjectStatements.ObjectState a, ObjectStatements.ObjectState b)
{
return super.mergeChanges(a, b);
}
@Override
public ObjectStatements.ObjectState.Builder buildObjectState()
{
return super.buildObjectState()
// .putValues("peer", Long.toString(peer.getObjectID().toLong()))
// .putValues("file", Long.toString(file.getObjectID().toLong()))
.putValues("progress", Double.toString(progress));
}
// public void setNode(Peer peer)
// {
// this.peer = peer;
// }
//
// public void setFile(NetworkFile file)
// {
// this.file = file;
// }
public double getProgress()
{
return progress;
}
public void setProgress(double progress)
{
this.progress = progress;
}
// public NetworkFile getFile()
// {
// return file;
// }
//
// public Peer getNode()
// {
// return peer;
// }
@Override
public Map<String, Object> getStateMap()
{
Map<String, Object> ret = super.getStateMap();
// ret.put("peer", peer == null ? 0L : peer.getObjectID().toLong());
// ret.put("file", file == null? 0L : file.getObjectID().toLong());
ret.put("progress", progress);
return ret;
}
@Override
public void updateFromStateMap(Map<String, Object> map)
{
// peer = Main.getInstance().getModel().getObject(new ObjectID(Util.unconditionalNumberToLong(map.get("peer"))));
// file = Main.getInstance().getModel().getObject(new ObjectID(Util.unconditionalNumberToLong(map.get("file"))));
progress = Util.unconditionalNumberToDouble( map.get("progress"));
}
}

View file

@ -1,6 +1,5 @@
package moe.nekojimi.friendcloud.storage; package moe.nekojimi.friendcloud.storage;
import java.lang.reflect.Modifier;
import java.util.*; import java.util.*;
public class CachingDataStore extends DataStore public class CachingDataStore extends DataStore
@ -16,6 +15,7 @@ public class CachingDataStore extends DataStore
public synchronized <T extends Storable> DAO<T> getDAOForClass(Class<T> clazz) public synchronized <T extends Storable> DAO<T> getDAOForClass(Class<T> clazz)
{ {
if (daos.containsKey(clazz)) if (daos.containsKey(clazz))
//noinspection unchecked
return (DAO<T>) daos.get(clazz); return (DAO<T>) daos.get(clazz);
else else
{ {
@ -25,29 +25,41 @@ public class CachingDataStore extends DataStore
} }
} }
@Override
public void clear()
{
daos.clear();
backend.clear();
}
@Override @Override
public FSNodeDAO getFSDAO() public FSNodeDAO getFSDAO()
{ {
return backend.getFSDAO(); return backend.getFSDAO();
} }
private Map<Class<? extends Storable>, CachingDAO<?>> daos = new HashMap<>(); private final Map<Class<? extends Storable>, CachingDAO<?>> daos = new HashMap<>();
public class CachingDAO<T extends Storable> implements DAO<T> public class CachingDAO<T extends Storable> implements DAO<T>
{ {
private final DAO<T> backendDao; private final DAO<T> backendDao;
private WeakHashMap<Long, T> cache = new WeakHashMap<>(); private final WeakHashMap<Long, T> cache = new WeakHashMap<>();
public CachingDAO(Class<T> clazz) public CachingDAO(Class<T> clazz)
{ {
this.backendDao = backend.getDAOForClass(clazz); this.backendDao = backend.getDAOForClass(clazz);
} }
@Override
public List<Long> list()
{
return backendDao.list();
}
@Override @Override
public synchronized List<T> getAll() public synchronized List<T> getAll()
{ {
List<T> ret = new ArrayList<>(); List<T> ret = new ArrayList<>();
ret.addAll(cache.values());
for (T t : backendDao.getAll()) for (T t : backendDao.getAll())
{ {
if (!cache.containsKey(t.getStorageID())) if (!cache.containsKey(t.getStorageID()))
@ -55,6 +67,10 @@ public class CachingDataStore extends DataStore
ret.add(t); ret.add(t);
cache.put(t.getStorageID(), t); cache.put(t.getStorageID(), t);
} }
else
{
ret.add(cache.get(t.getStorageID()));
}
} }
return ret; return ret;
} }
@ -76,9 +92,9 @@ public class CachingDataStore extends DataStore
@Override @Override
public T get(long id) public T get(long id)
{ {
T t = backendDao.get(id); if (!cache.containsKey(id))
cache.put(id, t); cache.put(id, backendDao.get(id));
return t; return cache.get(id);
} }
@Override @Override

View file

@ -3,7 +3,6 @@ package moe.nekojimi.friendcloud.storage;
import moe.nekojimi.friendcloud.objects.NetworkFSNode; import moe.nekojimi.friendcloud.objects.NetworkFSNode;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@ -12,10 +11,13 @@ public abstract class DataStore
public abstract <T extends Storable> DAO<T> getDAOForClass(Class<T> clazz); public abstract <T extends Storable> DAO<T> getDAOForClass(Class<T> clazz);
public abstract FSNodeDAO getFSDAO(); public abstract FSNodeDAO getFSDAO();
public abstract void clear();
public interface DAO<T extends Storable> public interface DAO<T extends Storable>
{ {
default boolean exists(long id) {return list().contains(id);}
default List<Long> list() {return getAll().stream().map(Storable::getStorageID).toList();}
List<T> getAll(); List<T> getAll();
boolean exists(long id);
T create(long id); T create(long id);
T get(long id); T get(long id);
default T getOrCreate(long id) default T getOrCreate(long id)
@ -47,6 +49,15 @@ public abstract class DataStore
this.subclasses = Set.of(subclasses); this.subclasses = Set.of(subclasses);
} }
@Override
public List<Long> list()
{
List<Long> ret = new ArrayList<>();
for (Class<? extends T> subclass : subclasses)
ret.addAll(getDAOForClass(subclass).list());
return ret;
}
@Override @Override
public List<T> getAll() public List<T> getAll()
{ {

View file

@ -64,6 +64,6 @@ public class LocalData implements Storable
localPeer = Main.getInstance().getModel().getObject(new ObjectID(Util.unconditionalNumberToLong(map.getOrDefault("localPeer",0)))); localPeer = Main.getInstance().getModel().getObject(new ObjectID(Util.unconditionalNumberToLong(map.getOrDefault("localPeer",0))));
currentChangeRecord = Main.getInstance().getModel().getChangeRecord(Util.unconditionalNumberToLong(map.getOrDefault("currentChangeRecord",0))); currentChangeRecord = Main.getInstance().getModel().getChangeRecord(Util.unconditionalNumberToLong(map.getOrDefault("currentChangeRecord",0)));
systemID = (int) map.getOrDefault("systemID", 0); systemID = (int) map.getOrDefault("systemID", 0);
System.out.println("LocalData: resumed state, localPeer=" + localPeer + ", currentChangeRecord=" + currentChangeRecord + ", systemID=" + Integer.toHexString(systemID)); System.out.println("LocalData: resumed state, localPeer=" + localPeer + ", currentChangeRecord=" + Long.toHexString(currentChangeRecord.getChangeID()) + ", systemID=" + Integer.toHexString(systemID));
} }
} }

View file

@ -1,202 +0,0 @@
package moe.nekojimi.friendcloud.storage;
import moe.nekojimi.friendcloud.ObjectChangeRecord;
import moe.nekojimi.friendcloud.objects.*;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import java.util.*;
import java.util.stream.Collectors;
public class Model
{
private final CachingDataStore dataStore;
private LocalData localData;
public Model(DataStore dataStore)
{
this.dataStore = new CachingDataStore(dataStore);
}
public synchronized void init()
{
List<LocalData> localDataList = dataStore.getDAOForClass(LocalData.class).getAll();
if (localDataList.isEmpty())
{
localData = dataStore.getDAOForClass(LocalData.class).create(0);
}
else if (localDataList.size() == 1)
{
localData = localDataList.getFirst();
}
else
{
throw new IllegalStateException("We have more than one LocalData somehow!!");
}
if (localData.getSystemID() == 0)
{
Random ran = new Random();
localData.setSystemID(ran.nextInt() & 0x00FFFFFF);
objectChanged(localData);
}
}
public synchronized Peer getSelfPeer()
{
if (localData.getLocalPeer() == null)
{
localData.setLocalPeer(createObjectByType(ObjectStatements.ObjectType.OBJECT_TYPE_PEER));
objectChanged(localData);
}
return localData.getLocalPeer();
}
// private Map<Long, Node> nodes = new HashMap<>();
public synchronized ObjectID getNextObjectID(ObjectStatements.ObjectType type)
{
Random ran = new Random();
int randomNumber = ran.nextInt();
ObjectID objectID = new ObjectID(type, localData.getSystemID(), randomNumber);
System.out.println("Assigned new object ID: " + objectID);
return objectID;
}
public static Class<? extends NetworkObject> getNetworkObjectClassByType(ObjectStatements.ObjectType type)
{
return switch (type)
{
case OBJECT_TYPE_FILE -> NetworkFile.class;
case OBJECT_TYPE_FOLDER -> NetworkFolder.class;
case OBJECT_TYPE_PEER -> Peer.class;
case OBJECT_TYPE_PEER_FILE_STATE -> PeerFileState.class;
case OBJECT_TYPE_UNSPECIFIED, UNRECOGNIZED -> throw new IllegalArgumentException("???");
default -> throw new UnsupportedOperationException("NYI");
};
}
public synchronized <T extends NetworkObject> T createObjectByID(ObjectID id)
{
if (id.toLong() == 0)
throw new IllegalArgumentException("Cannot create an object with ID=0!");
ObjectStatements.ObjectType type = id.getType();
System.out.println("Creating new object with type: " + type.name());
if (type == ObjectStatements.ObjectType.OBJECT_TYPE_UNSPECIFIED)
throw new IllegalArgumentException();
T ret = (T) dataStore.getDAOForClass(getNetworkObjectClassByType(type)).create(id.toLong());
return ret;
}
public synchronized <T extends NetworkObject> T createObjectByType(ObjectStatements.ObjectType type)
{
return createObjectByID(getNextObjectID(type));
}
public synchronized <T extends NetworkObject> T getObject(ObjectID id)
{
if (id.toLong() == 0)
return null;
Class<T> clazz = (Class<T>) getNetworkObjectClassByType(id.getType());
return dataStore.getDAOForClass(clazz).get(id.toLong());
}
public synchronized <T extends NetworkObject> T getOrCreateObject(ObjectID id)
{
if (id.toLong() == 0)
return null;
Class<T> clazz = (Class<T>) getNetworkObjectClassByType(id.getType());
return dataStore.getDAOForClass(clazz).getOrCreate(id.toLong());
}
public synchronized List<NetworkObject> listObjects(Set<ObjectStatements.ObjectType> types)
{
List<NetworkObject> ret = new ArrayList<>();
Set<Class<? extends NetworkObject>> classes = types.stream().map(Model::getNetworkObjectClassByType).collect(Collectors.toSet());
for (Class<? extends NetworkObject> clazz: classes)
{
List<? extends NetworkObject> list = dataStore.getDAOForClass(clazz).getAll();
ret.addAll(list);
}
return ret;
}
public synchronized List<NetworkFSNode> listFSNodes(String path)
{
//TODO: dumbest algorithm in the world
List<NetworkFSNode> ret = new ArrayList<>();
for (NetworkObject object : listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE, ObjectStatements.ObjectType.OBJECT_TYPE_FOLDER)))
{
NetworkFSNode fsNode = (NetworkFSNode) object;
String networkPath = fsNode.getNetworkPath();
if (networkPath.substring(0, networkPath.lastIndexOf("/")+1).equals(path))
ret.add(fsNode);
}
return ret;
}
public synchronized NetworkFSNode getFSNode(String path)
{
for (NetworkObject object : listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE, ObjectStatements.ObjectType.OBJECT_TYPE_FOLDER)))
{
NetworkFSNode fsNode = (NetworkFSNode) object;
String networkPath = fsNode.getNetworkPath();
if (networkPath.equals(path))
return fsNode;
}
return null;
}
public synchronized void addChangeRecord(ObjectChangeRecord record)
{
dataStore.getDAOForClass(ObjectChangeRecord.class).update(record);
}
public ObjectChangeRecord getChangeRecord(long id)
{
if (id == 0)
return null;
return dataStore.getDAOForClass(ObjectChangeRecord.class).get(id);
}
public void applyChangeRecord(ObjectChangeRecord record)
{
if (!record.getChangeHeads().contains(localData.getCurrentChangeRecord().getChangeID()))
throw new IllegalStateException("Change does not apply! Valid change heads=" + record.getChangeHeads() + ", we are in state " + localData.getCurrentChangeRecord().getChangeID());
addChangeRecord(record);
// if (record == null)
// throw new IllegalArgumentException("Cannot apply unknown change!");
}
public Set<ObjectChangeRecord> getChangeHeads()
{
// stupid algorithm - start with all of the changes, then remove the ones that are referenced by something
// TODO: better algorithm
Set<ObjectChangeRecord> ret = new HashSet<>(dataStore.getDAOForClass(ObjectChangeRecord.class).getAll());
// for (ObjectChangeRecord record : changeRecords.values())
// {
// throw new UnsupportedOperationException("NYI");
// }
throw new UnsupportedOperationException("NYI");
}
public Set<Peer> listOtherPeers()
{
Set<Peer> ret = new HashSet<>();
for (NetworkObject object : listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_PEER)))
{
Peer peer = (Peer) object;
if (peer != getSelfPeer())
ret.add(peer);
}
return ret;
}
public <T extends Storable> void objectChanged(T storable)
{
Class<T> clazz = (Class<T>) storable.getClass();
dataStore.getDAOForClass(clazz).update(storable);
}
}

View file

@ -1,6 +1,7 @@
package moe.nekojimi.friendcloud.storage; package moe.nekojimi.friendcloud.storage;
import moe.nekojimi.friendcloud.ObjectChangeRecord; import moe.nekojimi.friendcloud.ObjectChangeRecord;
import moe.nekojimi.friendcloud.Util;
import moe.nekojimi.friendcloud.objects.*; import moe.nekojimi.friendcloud.objects.*;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.json.JSONArray; import org.json.JSONArray;
@ -31,6 +32,14 @@ public class StupidJSONFileStore extends DataStore
} }
} }
@Override
public void clear()
{
daos.clear();
storageDirectory.delete();
storageDirectory.mkdirs();
}
@Override @Override
public <T extends Storable> DAO<T> getDAOForClass(Class<T> clazz) public <T extends Storable> DAO<T> getDAOForClass(Class<T> clazz)
{ {
@ -44,8 +53,6 @@ public class StupidJSONFileStore extends DataStore
ret = new NetworkFolderDAO(); ret = new NetworkFolderDAO();
else if (clazz.equals(Peer.class)) else if (clazz.equals(Peer.class))
ret = new PeerDAO(); ret = new PeerDAO();
else if (clazz.equals(PeerFileState.class))
ret = new PeerFileStateDAO();
else if (clazz.equals(NetworkFSNode.class)) else if (clazz.equals(NetworkFSNode.class))
ret = new NetworkFSNodeDAO(); ret = new NetworkFSNodeDAO();
else if (clazz.equals(LocalData.class)) else if (clazz.equals(LocalData.class))
@ -74,7 +81,8 @@ public class StupidJSONFileStore extends DataStore
{ {
File ret = new File(storageDirectory, getNamespace()); File ret = new File(storageDirectory, getNamespace());
if (!ret.exists()) if (!ret.exists())
ret.mkdir(); ret.mkdirs();
assert (ret.exists() && ret.isDirectory());
return ret; return ret;
} }
protected abstract T makeBlank(long id); protected abstract T makeBlank(long id);
@ -174,6 +182,20 @@ public class StupidJSONFileStore extends DataStore
return file.exists(); return file.exists();
} }
@Override
public List<Long> list()
{
List<Long> ret = new ArrayList<>();
// get all files in the storage directory
for (File file : Objects.requireNonNull(getNamespaceDirectory().listFiles()))
{
String name = file.getName();
String nameWithoutExt = name.substring(0, name.indexOf('.'));
ret.add(Long.parseLong(nameWithoutExt, 16));
}
return ret;
}
@Override @Override
public List<T> getAll() public List<T> getAll()
{ {
@ -301,9 +323,8 @@ public class StupidJSONFileStore extends DataStore
@Override @Override
protected JSONObject serialiseWeirdObject(Object value) throws IllegalArgumentException protected JSONObject serialiseWeirdObject(Object value) throws IllegalArgumentException
{ {
if (value instanceof URI) if (value instanceof URI uri)
{ {
URI uri = (URI) value;
return new JSONObject().put("weirdObjectClass", URI.class.getCanonicalName()).put("uri",uri.toString()); return new JSONObject().put("weirdObjectClass", URI.class.getCanonicalName()).put("uri",uri.toString());
} }
return super.serialiseWeirdObject(value); return super.serialiseWeirdObject(value);
@ -330,20 +351,6 @@ public class StupidJSONFileStore extends DataStore
} }
} }
private class PeerFileStateDAO extends NetworkObjectDAO<PeerFileState>
{
@Override
protected String getNamespace()
{
return super.getNamespace() + "/peerFileStates";
}
@Override
protected PeerFileState makeBlank(long id)
{
return new PeerFileState(new ObjectID(id));
}
}
private class LocalDataDAO extends JSONObjectDAO<LocalData> private class LocalDataDAO extends JSONObjectDAO<LocalData>
{ {
@Override @Override
@ -372,5 +379,35 @@ public class StupidJSONFileStore extends DataStore
{ {
return new ObjectChangeRecord(); return new ObjectChangeRecord();
} }
@Override
protected Object deserialiseWeirdObject(JSONObject json)
{
String weirdType = json.getString("weirdObjectClass");
// Class<?> weirdClass = Class.forName(weirdType);
if (weirdType.equals(ObjectChangeRecord.Change.class.getCanonicalName()))
{
return new ObjectChangeRecord.Change(
new ObjectID(json.getLong("objectID")),
Util.stringifyMap(json.getJSONObject("before").toMap()),
Util.stringifyMap(json.getJSONObject("after").toMap()));
}
return super.deserialiseWeirdObject(json);
}
@Override
protected JSONObject serialiseWeirdObject(Object value) throws IllegalArgumentException
{
if (value instanceof ObjectChangeRecord.Change change)
{
JSONObject ret = new JSONObject();
ret.put("weirdObjectClass", ObjectChangeRecord.Change.class.getCanonicalName());
ret.put("objectID", change.objectID().toLong());
ret.put("before", change.beforeValues());
ret.put("after", change.afterValues());
return ret;
}
return super.serialiseWeirdObject(value);
}
} }
} }

View file

@ -1,9 +1,8 @@
package moe.nekojimi.friendcloud.tasks; package moe.nekojimi.friendcloud.tasks;
import moe.nekojimi.friendcloud.ConnectionManager; import moe.nekojimi.friendcloud.network.ConnectionManager;
import moe.nekojimi.friendcloud.Main; import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.NotificationManager;
import moe.nekojimi.friendcloud.ObjectChangeTransaction; import moe.nekojimi.friendcloud.ObjectChangeTransaction;
import moe.nekojimi.friendcloud.network.PeerConnection; import moe.nekojimi.friendcloud.network.PeerConnection;
import moe.nekojimi.friendcloud.network.requests.FilePiecesRequest; import moe.nekojimi.friendcloud.network.requests.FilePiecesRequest;
@ -15,7 +14,6 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.*; import java.util.*;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.stream.Collectors;
public class FileDownloadTask implements RunnableFuture<File> public class FileDownloadTask implements RunnableFuture<File>
{ {
@ -64,7 +62,7 @@ public class FileDownloadTask implements RunnableFuture<File>
String connectionLine = ""; String connectionLine = "";
String progressLine = ""; String progressLine = "";
Peer selfPeer = Main.getInstance().getModel().getSelfPeer(); Peer selfPeer = Main.getInstance().getModel().getLocalPeer();
while (!missingPieceIndices.isEmpty() && !cancelled && !failed && !done) while (!missingPieceIndices.isEmpty() && !cancelled && !failed && !done)
{ {
System.out.println("Need to get " + missingPieceIndices.size() + " missing pieces."); System.out.println("Need to get " + missingPieceIndices.size() + " missing pieces.");
@ -122,11 +120,11 @@ public class FileDownloadTask implements RunnableFuture<File>
System.out.println("FileDownloadTask: Will download pieces from " + runStart + " to " + runEnd); System.out.println("FileDownloadTask: Will download pieces from " + runStart + " to " + runEnd);
// make one request per connectable peer, striping the needed pieces among them // make one request per connectable peer, striping the needed pieces among them
List<CompletableFuture<List<Integer>>> fileFutures = new ArrayList<>(); List<CompletableFuture<Set<Integer>>> fileFutures = new ArrayList<>();
int offset = 0; int offset = 0;
for (PeerConnection connection : connections) for (PeerConnection connection : connections)
{ {
CompletableFuture<List<Integer>> future = connection.makeRequest(new FilePiecesRequest(file, runStart+offset, (runEnd-runStart)+1, connections.size())); CompletableFuture<Set<Integer>> future = connection.makeRequest(new FilePiecesRequest(file, runStart+offset, (runEnd-runStart)+1, connections.size()));
fileFutures.add(future); fileFutures.add(future);
offset++; offset++;
} }
@ -134,12 +132,12 @@ public class FileDownloadTask implements RunnableFuture<File>
long timeout = timeoutPerPieceMs * (missingPieceIndices.size() / connections.size()); long timeout = timeoutPerPieceMs * (missingPieceIndices.size() / connections.size());
// wait for all the requests to complete // wait for all the requests to complete
for (CompletableFuture<List<Integer>> future : fileFutures) for (CompletableFuture<Set<Integer>> future : fileFutures)
{ {
try try
{ {
List<Integer> receivedPieces = future.get(timeout, TimeUnit.MILLISECONDS); Set<Integer> receivedPieces = future.get(timeout, TimeUnit.MILLISECONDS);
receivedPieces.forEach(missingPieceIndices::remove); missingPieceIndices.removeAll(receivedPieces);
} catch (InterruptedException e) } catch (InterruptedException e)
{ {
future.cancel(true); future.cancel(true);
@ -160,10 +158,10 @@ public class FileDownloadTask implements RunnableFuture<File>
{ {
try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(connectionManager, selfPeer.getObjectID())) try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(connectionManager, selfPeer.getObjectID()))
{ {
transaction.addObjectBeforeChange(file.getObjectID()); transaction.addObjectBeforeChange(file);
file.addPeerWithCopy(selfPeer); file.addPeerWithCopy(selfPeer);
} }
catch (IOException ex) catch (IOException ignored)
{ {
} }

View file

@ -1,12 +1,19 @@
package moe.nekojimi.friendcloud.tasks; package moe.nekojimi.friendcloud.tasks;
import com.kstruct.gethostname4j.Hostname;
import moe.nekojimi.friendcloud.Main; import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.ObjectChangeTransaction; import moe.nekojimi.friendcloud.ObjectChangeTransaction;
import moe.nekojimi.friendcloud.network.ConnectionManager;
import moe.nekojimi.friendcloud.objects.ObjectID; import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.objects.Peer; import moe.nekojimi.friendcloud.objects.Peer;
import moe.nekojimi.friendcloud.protos.ObjectStatements; import moe.nekojimi.friendcloud.protos.ObjectStatements;
import moe.nekojimi.friendcloud.Controller;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class JoinNetworkTask implements Runnable public class JoinNetworkTask implements Runnable
{ {
@ -14,25 +21,57 @@ public class JoinNetworkTask implements Runnable
@Override @Override
public void run() public void run()
{ {
// generate new peer ID System.out.println("JoinNetworkTask: Joining the network!");
ObjectID peerID = null; Controller controller = Main.getInstance().getModel();
try (ObjectChangeTransaction builder = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), peerID))
{ boolean firstJoin = false;
Peer selfPeer = Main.getInstance().getModel().getSelfPeer();
if (selfPeer != null) ObjectID peerID;
peerID = selfPeer.getObjectID(); if (controller.getLocalPeer() != null)
peerID = controller.getLocalPeer().getObjectID();
else else
peerID = Main.getInstance().getModel().getNextObjectID(ObjectStatements.ObjectType.OBJECT_TYPE_PEER); {
peerID = controller.getNextObjectID(ObjectStatements.ObjectType.OBJECT_TYPE_PEER);
// synchronise with the network firstJoin = true;
SyncWithNetworkTask syncWithNetworkTask = new SyncWithNetworkTask(); }
syncWithNetworkTask.run();
ConnectionManager connectionManager = Main.getInstance().getConnectionManager();
if (firstJoin)
{
System.out.println("JoinNetworkTask: Performing first time setup...");
// download the entire state
PullStateTask pullStateTask = new PullStateTask();
Future<?> future = Main.getInstance().getExecutor().submit(pullStateTask);
try
{
future.get(30, TimeUnit.SECONDS);
} catch (InterruptedException | TimeoutException e)
{
// throw new RuntimeException(e);
} catch (ExecutionException e)
{
throw new RuntimeException(e);
}
// create our local peer object
try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(connectionManager, peerID))
{
// create and submit our Peer object if it doesn't exist
Peer selfPeer = controller.getLocalData().getLocalPeer();
if (selfPeer == null)
{
selfPeer = controller.createLocalPeer(peerID);
selfPeer.setUserName(System.getProperty("user.name"));
String hostname = Hostname.getHostname();
selfPeer.setSystemName(hostname);
selfPeer.setAddresses(connectionManager.getURIs());
controller.objectChanged(selfPeer);
controller.getLocalData().setLocalPeer(selfPeer);
transaction.addNewlyCreatedObject(selfPeer);
}
} catch (IOException e) } catch (IOException e)
{ {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
} }
}

View file

@ -1,7 +1,7 @@
package moe.nekojimi.friendcloud.tasks; package moe.nekojimi.friendcloud.tasks;
import com.google.protobuf.Message; import com.google.protobuf.Message;
import moe.nekojimi.friendcloud.ConnectionManager; import moe.nekojimi.friendcloud.network.ConnectionManager;
import moe.nekojimi.friendcloud.Main; import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.network.PeerConnection; import moe.nekojimi.friendcloud.network.PeerConnection;
import moe.nekojimi.friendcloud.objects.Peer; import moe.nekojimi.friendcloud.objects.Peer;
@ -21,17 +21,22 @@ public class PropagateMessageTask implements Runnable
public void run() public void run()
{ {
ConnectionManager connectionManager = Main.getInstance().getConnectionManager(); ConnectionManager connectionManager = Main.getInstance().getConnectionManager();
int messagesSent = 0;
for (Peer peer: Main.getInstance().getModel().listOtherPeers()) for (Peer peer: Main.getInstance().getModel().listOtherPeers())
{ {
try try
{ {
PeerConnection connection = connectionManager.getNodeConnection(peer); PeerConnection connection = connectionManager.getNodeConnection(peer);
if (connection != null) if (connection != null)
{
connection.sendUnsolicitedMessage(message); connection.sendUnsolicitedMessage(message);
messagesSent++;
}
} catch (IOException e) } catch (IOException e)
{ {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
System.out.println("PropagateMessageTask: Sent " + message.getDescriptorForType().getFullName() + " to " + messagesSent + " peers.");
} }
} }

View file

@ -0,0 +1,95 @@
package moe.nekojimi.friendcloud.tasks;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.ObjectChangeRecord;
import moe.nekojimi.friendcloud.Util;
import moe.nekojimi.friendcloud.network.PeerConnection;
import moe.nekojimi.friendcloud.network.requests.ObjectChangeRequest;
import moe.nekojimi.friendcloud.network.requests.RequestReceivedErrorException;
import moe.nekojimi.friendcloud.objects.Peer;
import moe.nekojimi.friendcloud.protos.CommonMessages;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;
import java.util.stream.Collectors;
public class PullChangesTask implements Callable<Boolean>
{
private final Set<Peer> peers;
public PullChangesTask()
{
this(Main.getInstance().getModel().listOtherPeers());
}
public PullChangesTask(Set<Peer> peers)
{
this.peers = peers;
}
@Override
public Boolean call()
{
// for each other peer:
List<CompletableFuture<PullResult>> futures = new ArrayList<>();
Set<Long> changesSinceIDs = Main.getInstance().getModel().getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet());
System.out.println("PullChangesTask: Requesting changes since: " + changesSinceIDs.stream().map(Long::toHexString).collect(Collectors.toSet()));
for (Peer peer : peers)
{
System.out.println("PullChangesTask: Attempting to pull changes from " + peer);
// open a connection
PeerConnection connection = Main.getInstance().getConnectionManager().getNodeConnection(peer);
if (connection == null)
continue;
// send a ObjectChangeRequest
ObjectChangeRequest objectChangeRequest = new ObjectChangeRequest(changesSinceIDs);
CompletableFuture<PullResult> future = connection.makeRequest(objectChangeRequest)
.handle((changes, ex) ->
{
// integrate the returned changes with our change graph
if (ex == null)
{
Main.getInstance().getModel().applyChangeRecords(changes);
return PullResult.OK;
}
else
{
if (ex instanceof RequestReceivedErrorException re)
{
if (re.getErrorMessage().getError() == CommonMessages.Error.ERROR_END_OF_HISTORY)
{
return PullResult.END_OF_HISTORY;
}
}
ex.printStackTrace(System.err);
return PullResult.FAILED;
}
});
futures.add(future);
}
List<PullResult> results = Util.collectFutures(futures).join();
long peersGotChanges = results.stream().filter(pullResult -> pullResult == PullResult.OK).count();
long peersAtEndOfHistory = results.stream().filter(pullResult -> pullResult == PullResult.END_OF_HISTORY).count();
// if no peers could be contacted:
if (peersGotChanges == 0)
{
// if everyone reported end of history, we aren't synced (need to do a state pull)
// otherwise everyone else is offline so we might as well be synced (we'll cause a fork if we change anything but that's fine)
return peersAtEndOfHistory <= 0;
}
return true;
}
protected enum PullResult
{
OK,
END_OF_HISTORY,
FAILED;
}
}

View file

@ -0,0 +1,106 @@
package moe.nekojimi.friendcloud.tasks;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.Util;
import moe.nekojimi.friendcloud.network.PeerConnection;
import moe.nekojimi.friendcloud.network.requests.ObjectChangeRequest;
import moe.nekojimi.friendcloud.network.requests.ObjectListRequest;
import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.objects.Peer;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;
/**
* Task to clear our entire local state and re-download it from peers.
*/
public class PullStateTask implements Runnable
{
@Override
public void run()
{
System.out.println("PullStateTask: Pulling state from peers...");
Set<PeerConnection> connections = new HashSet<>();
for (String knownPeerAddress : Main.getInstance().getArgs().getKnownPeers())
{
String[] split = knownPeerAddress.split(":");
if (split.length != 2)
{
System.err.println("ERROR: " + knownPeerAddress + " isn't a valid address.");
continue;
}
InetSocketAddress address = new InetSocketAddress(split[0], Integer.parseInt(split[1]));
try
{
URI uri = new URI("tcp", null, address.getHostString(), address.getPort(), null, null, null);
PeerConnection nodeConnection = Main.getInstance().getConnectionManager().getNodeConnection(uri);
if (nodeConnection != null)
{
connections.add(nodeConnection);
}
} catch (URISyntaxException | IOException e)
{
throw new RuntimeException(e);
}
}
for (Peer peer : Main.getInstance().getModel().listOtherPeers())
{
PeerConnection connection = Main.getInstance().getConnectionManager().getNodeConnection(peer);
if (connection != null)
{
connections.add(connection);
}
}
if (connections.isEmpty())
{
// if we can't connect to anyone, don't replace our state since we can't get a new one
System.out.println("PullStateTask: Have no peers to connect to, giving up.");
return;
}
System.out.println("PullStateTask: Have " + connections.size() + " peers to get state from.");
Main.getInstance().getModel().clearEverything();
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (PeerConnection connection : connections)
{
futures.add(connection.makeRequest(new ObjectListRequest(Set.of(
ObjectStatements.ObjectType.OBJECT_TYPE_FILE,
ObjectStatements.ObjectType.OBJECT_TYPE_FOLDER,
ObjectStatements.ObjectType.OBJECT_TYPE_PEER))).thenAccept(networkObjects ->
{
System.out.println("PullStateTask: got state of " + networkObjects.size() + " objects.");
for (NetworkObject object : networkObjects)
{
Main.getInstance().getModel().addNetworkObject(object);
}
}));
futures.add(connection.makeRequest(new ObjectChangeRequest(Set.of())).thenAccept(objectChangeRecords ->
{
System.out.println("PullStateTask: got " + objectChangeRecords.size() + " change records.");
Main.getInstance().getModel().addChangeRecords(objectChangeRecords);
}));
}
Util.collectFutures(futures).join();
}
}

View file

@ -1,38 +0,0 @@
package moe.nekojimi.friendcloud.tasks;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.ObjectChangeRecord;
import moe.nekojimi.friendcloud.network.PeerConnection;
import moe.nekojimi.friendcloud.network.requests.ObjectChangeRequest;
import moe.nekojimi.friendcloud.objects.Peer;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
public class SyncWithNetworkTask implements Runnable
{
@Override
public void run()
{
// for each other peer:
for (Peer peer : Main.getInstance().getModel().listOtherPeers())
{
// open a connection
PeerConnection connection = Main.getInstance().getConnectionManager().getNodeConnection(peer);
// send a ObjectChangeRequest
ObjectChangeRequest objectChangeRequest = new ObjectChangeRequest(Main.getInstance().getModel().getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet()));
CompletableFuture<Set<ObjectStatements.ObjectChange>> future = connection.makeRequest(objectChangeRequest);
// integrate the returned changes with our change graph
}
// if no peers could be contacted:
// return success (everyone's offline)
// if everyone reported end of history:
// delete our change graph
// perform a full sync
}
}

View file

@ -17,10 +17,10 @@ message FriendCloudMessage {
message HelloMessage { message HelloMessage {
uint32 protocol_version = 1; // this is the version of the FriendCloud protocol I speak uint32 protocol_version = 1; // this is the version of the FriendCloud protocol I speak
} }
message LoginMessage { message CheckInMessage {
repeated uint64 current_change_heads = 1;
} }
enum Error { enum Error {
@ -44,8 +44,6 @@ message ErrorMessage {
string text = 2; string text = 2;
} }
message PingMessage { message MultiObjectConfirmationMessage {
} repeated uint64 expected_return_id = 1;
message PongMessage {
} }

View file

@ -10,7 +10,6 @@ enum ObjectType {
OBJECT_TYPE_PEER = 2; OBJECT_TYPE_PEER = 2;
OBJECT_TYPE_FILE = 3; OBJECT_TYPE_FILE = 3;
OBJECT_TYPE_FOLDER = 4; OBJECT_TYPE_FOLDER = 4;
OBJECT_TYPE_PEER_FILE_STATE = 5;
} }
message ObjectState { message ObjectState {
@ -38,6 +37,8 @@ message ObjectChangeMessage {
uint64 change_id = 1; uint64 change_id = 1;
repeated uint64 change_heads = 2; repeated uint64 change_heads = 2;
repeated ObjectChange changes = 3; repeated ObjectChange changes = 3;
uint64 creator_id = 4;
uint64 timestamp_ms = 5;
} }
message ObjectChangeListMessage { message ObjectChangeListMessage {
@ -49,7 +50,7 @@ message ObjectChangeRequest {
} }
message ObjectList { message ObjectList {
uint64 change_heads = 1; uint64 change_head = 1;
repeated ObjectState states = 2; repeated ObjectState states = 2;
} }

View file

@ -1,9 +0,0 @@
syntax = "proto3";
option java_package = "moe.nekojimi.friendcloud.protos";
message SearchRequest {
string query = 1;
int32 page_number = 2;
int32 results_per_page = 3;
}