Compare commits

...

4 commits

44 changed files with 3523 additions and 846 deletions

29
pom.xml
View file

@ -69,6 +69,35 @@
<artifactId>gethostname4j</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>es.blackleg</groupId>
<artifactId>jlibnotify</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>engineering.swat</groupId>
<artifactId>java-watch</artifactId>
<version>0.9.5</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.github.hypfvieh</groupId>-->
<!-- <artifactId>dbus-java-core</artifactId>-->
<!-- <version>5.1.0-SNAPSHOT</version>-->
<!-- </dependency>-->
<!-- &lt;!&ndash; Unixsocket support using native unix socket implementation of Java (since Java 16) &ndash;&gt;-->
<!-- <dependency>-->
<!-- <groupId>com.github.hypfvieh</groupId>-->
<!-- <artifactId>dbus-java-transport-native-unixsocket</artifactId>-->
<!-- <version>5.1.0-SNAPSHOT</version>-->
<!-- </dependency>-->
<!-- &lt;!&ndash; Add this if you want support for TCP based DBus connections &ndash;&gt;-->
<!-- <dependency>-->
<!-- <groupId>com.github.hypfvieh</groupId>-->
<!-- <artifactId>dbus-java-transport-tcp</artifactId>-->
<!-- <version>5.1.0-SNAPSHOT</version>-->
<!-- </dependency>-->
</dependencies>

View file

@ -0,0 +1,411 @@
package moe.nekojimi.friendcloud;
import moe.nekojimi.friendcloud.objects.*;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import moe.nekojimi.friendcloud.storage.CachingDataStore;
import moe.nekojimi.friendcloud.storage.DataStore;
import moe.nekojimi.friendcloud.storage.LocalData;
import moe.nekojimi.friendcloud.storage.Storable;
import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
public class Controller
{
private final CachingDataStore dataStore;
private LocalData localData;
private Set<ObjectChangeRecord> changeHeads = null;
public Controller(DataStore dataStore)
{
this.dataStore = new CachingDataStore(dataStore);
}
public synchronized void init()
{
List<LocalData> localDataList = dataStore.getDAOForClass(LocalData.class).getAll();
if (localDataList.isEmpty())
{
localData = dataStore.getDAOForClass(LocalData.class).create(0);
}
else if (localDataList.size() == 1)
{
localData = localDataList.getFirst();
}
else
{
throw new IllegalStateException("We have more than one LocalData somehow!!");
}
if (localData.getSystemID() == 0)
{
Random ran = new Random();
localData.setSystemID(ran.nextInt() & 0x00FFFFFF);
objectChanged(localData);
}
}
public LocalData getLocalData()
{
return localData;
}
public Peer getLocalPeer()
{
return localData.getLocalPeer();
}
public Peer createLocalPeer(ObjectID id)
{
if (localData.getLocalPeer() == null)
{
localData.setLocalPeer(dataStore.getDAOForClass(Peer.class).create(id.toLong()));
objectChanged(localData);
}
return localData.getLocalPeer();
}
// private Map<Long, Node> nodes = new HashMap<>();
public synchronized ObjectID getNextObjectID(ObjectStatements.ObjectType type)
{
Random ran = new Random();
int randomNumber = ran.nextInt();
ObjectID objectID = new ObjectID(type, localData.getSystemID(), randomNumber);
System.out.println("Assigned new object ID: " + objectID);
return objectID;
}
public static Class<? extends NetworkObject> getNetworkObjectClassByType(ObjectStatements.ObjectType type)
{
return switch (type)
{
case OBJECT_TYPE_FILE -> NetworkFile.class;
case OBJECT_TYPE_FOLDER -> NetworkFolder.class;
case OBJECT_TYPE_PEER -> Peer.class;
case OBJECT_TYPE_UNSPECIFIED, UNRECOGNIZED -> throw new IllegalArgumentException("???");
default -> throw new UnsupportedOperationException("NYI: " + type);
};
}
public synchronized <T extends NetworkObject> T createObjectByID(ObjectID id)
{
if (id.toLong() == 0)
throw new IllegalArgumentException("Cannot create an object with ID=0!");
ObjectStatements.ObjectType type = id.getType();
System.out.println("Creating new object with type: " + type.name());
if (type == ObjectStatements.ObjectType.OBJECT_TYPE_UNSPECIFIED)
throw new IllegalArgumentException();
T ret = (T) dataStore.getDAOForClass(getNetworkObjectClassByType(type)).create(id.toLong());
return ret;
}
public synchronized <T extends NetworkObject> T createObjectByType(ObjectStatements.ObjectType type)
{
return createObjectByID(getNextObjectID(type));
}
public synchronized <T extends NetworkObject> T getObject(ObjectID id)
{
if (id.toLong() == 0)
return null;
Class<T> clazz = (Class<T>) getNetworkObjectClassByType(id.getType());
return dataStore.getDAOForClass(clazz).get(id.toLong());
}
// public synchronized <T extends NetworkObject> T getOrCreateObject(ObjectID id)
// {
// if (id.toLong() == 0)
// return null;
// Class<T> clazz = (Class<T>) getNetworkObjectClassByType(id.getType());
// return dataStore.getDAOForClass(clazz).getOrCreate(id.toLong());
// }
public synchronized List<NetworkObject> listObjects(Set<ObjectStatements.ObjectType> types)
{
List<NetworkObject> ret = new ArrayList<>();
Set<Class<? extends NetworkObject>> classes = types.stream().map(Controller::getNetworkObjectClassByType).collect(Collectors.toSet());
for (Class<? extends NetworkObject> clazz: classes)
{
List<? extends NetworkObject> list = dataStore.getDAOForClass(clazz).getAll();
ret.addAll(list);
}
return ret;
}
public synchronized List<NetworkFSNode> listFSNodes(String path)
{
//TODO: dumbest algorithm in the world
NetworkFolder folder = (NetworkFolder) getFSNode(path);
List<NetworkFSNode> ret = new ArrayList<>();
for (NetworkObject object : listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE, ObjectStatements.ObjectType.OBJECT_TYPE_FOLDER)))
{
NetworkFSNode fsNode = (NetworkFSNode) object;
if (Objects.equals(fsNode.getParent(), folder))
ret.add(fsNode);
}
return ret;
}
public synchronized void addChangeRecord(ObjectChangeRecord record)
{
DataStore.DAO<ObjectChangeRecord> dao = dataStore.getDAOForClass(ObjectChangeRecord.class);
dao.update(record);
// update the change heads; if any of this change's heads are included in ours, then this change replaces them
if (changeHeads != null)
{
boolean recordIsNewHead = changeHeads.isEmpty();
if (!recordIsNewHead)
{
for (long changeHeadID : record.getChangeHeads())
{
ObjectChangeRecord headRecord = dao.get(changeHeadID);
recordIsNewHead = changeHeads.remove(headRecord);
}
}
if (recordIsNewHead)
{
changeHeads.add(record);
System.out.println("Controller: Change heads updated: " + changeHeads.stream().map(objectChangeRecord -> Long.toHexString(objectChangeRecord.getChangeID())).collect(Collectors.toSet()));
}
}
}
public void setCurrentChangeRecord(ObjectChangeRecord record)
{
addChangeRecord(record);
long oldChangeID = localData.getCurrentChangeRecord() == null ? 0L : localData.getCurrentChangeRecord().getChangeID();
localData.setCurrentChangeRecord(record);
objectChanged(localData);
System.out.println("Controller: Change record set; old= " + Long.toHexString(oldChangeID) + " current= " + Long.toHexString(localData.getCurrentChangeRecord().getChangeID()));
}
public ObjectChangeRecord getChangeRecord(long id)
{
if (id == 0)
return null;
return dataStore.getDAOForClass(ObjectChangeRecord.class).get(id);
}
public void addChangeRecords(Set<ObjectChangeRecord> objectChangeRecords)
{
List<ObjectChangeRecord> sorted = ObjectChangeRecord.partiallySort(objectChangeRecords);
for (ObjectChangeRecord record : sorted)
addChangeRecord(record);
}
public void applyChangeRecord(ObjectChangeRecord record)
{
System.out.println("Controller: Applying change record " + record);
long changeID = localData.getCurrentChangeRecord().getChangeID();
if (!record.getChangeHeads().contains(changeID))
throw new IllegalStateException("Change does not apply! Valid change heads=" + record.getChangeHeads().stream().map(Long::toHexString).collect(Collectors.toSet()) + ", we are in state " + Long.toHexString(changeID));
addChangeRecord(record);
record.applyToLocalState();
setCurrentChangeRecord(record);
objectChanged(localData);
// if (record == null)
// throw new IllegalArgumentException("Cannot apply unknown change!");
}
public void applyChangeRecords(Set<ObjectChangeRecord> changes)
{
System.out.println("Controller: applying " + changes.size() + " change records.");
addChangeRecords(changes);
// List<ObjectChangeRecord> record = ObjectChangeRecord.partiallySort(changes);
Set<ObjectChangeRecord> pendingChanges = new HashSet<>(changes);
while (!pendingChanges.isEmpty())
{
// find all changes whose change heads = our current change heads
Set<ObjectChangeRecord> applicableChanges = pendingChanges.stream()
.filter(objectChangeRecord -> objectChangeRecord.getChangeHeads().equals(getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet())))
.collect(Collectors.toSet());
// if there are none, we've reached the present (or a present), break
if (applicableChanges.isEmpty())
break;
// if there's just one, follow that one
// TODO: if there's more than one, that's a fork, we should select a current branch by some logic
ObjectChangeRecord changeToApply = applicableChanges.iterator().next();
applyChangeRecord(changeToApply);
// the change we took (and any untaken forks) are no longer pending
pendingChanges.removeAll(applicableChanges);
}
}
public Set<ObjectChangeRecord> getChangeHeads()
{
// stupid algorithm - start with all of the changes, then remove the ones that are referenced by something
// TODO: better algorithm
if (changeHeads == null)
{
changeHeads = new HashSet<>(dataStore.getDAOForClass(ObjectChangeRecord.class).getAll());
Set<Long> referencedIDs = changeHeads.stream().flatMap(objectChangeRecord -> objectChangeRecord.getChangeHeads().stream()).collect(Collectors.toSet());
changeHeads.removeIf(objectChangeRecord -> referencedIDs.contains(objectChangeRecord.getChangeID()));
System.out.println("Controller: Determined change heads to be " + changeHeads.stream().map(objectChangeRecord -> Long.toHexString(objectChangeRecord.getChangeID())).collect(Collectors.toSet()));
}
return changeHeads;
}
public Set<Peer> listOtherPeers()
{
Set<Peer> ret = new HashSet<>();
for (NetworkObject object : listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_PEER)))
{
Peer peer = (Peer) object;
if (localData.getLocalPeer() == null || !peer.equals(localData.getLocalPeer()))
ret.add(peer);
}
return ret;
}
public <T extends Storable> void objectChanged(T storable)
{
Class<T> clazz = (Class<T>) storable.getClass();
dataStore.getDAOForClass(clazz).update(storable);
}
public DataStore getDataStore()
{
return dataStore;
}
public void clearEverything()
{
dataStore.clear();
}
public void addNetworkObject(NetworkObject object)
{
objectChanged(object);
}
public Set<ObjectChangeRecord> findChangesSince(List<Long> changeIDs)
{
// check that the specified change IDs are actually present in our history
DataStore.DAO<ObjectChangeRecord> ocrDao = dataStore.getDAOForClass(ObjectChangeRecord.class);
if (!changeIDs.stream().allMatch(ocrDao::exists))
return null;
// start with the current change heads
Set<ObjectChangeRecord> ret = new HashSet<>();
Deque<ObjectChangeRecord> openSet = new LinkedList<>(getChangeHeads());
while (!openSet.isEmpty())
{
ObjectChangeRecord record = openSet.poll();
if (changeIDs.contains(record.getChangeID()))
continue;
ret.add(record);
for (long changeHeadID: record.getChangeHeads())
{
openSet.add(ocrDao.get(changeHeadID));
}
}
return ret;
}
/**
* Changes the network name and/or path of a given network FS node.
* Performs both renaming and moving operations. If the destination folder(s) don't exist, they will be created as per makeFolder.
* @param fsNode the file or folder being renamed or moved.
* @param newpath the new path of the node, starting with and separated by /.
*/
public synchronized boolean renameFSNode(NetworkFSNode fsNode, String newpath) throws IOException
{
if (fsNode.getNetworkPath().equals(newpath))
return true;
int lastSlash = newpath.lastIndexOf("/");
String name = newpath.substring(lastSlash+1);
String path = newpath.substring(0, lastSlash);
if (path.isEmpty())
path = "/";
try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), getLocalPeer().getObjectID()))
{
transaction.addObjectBeforeChange(fsNode);
if (!name.equals(fsNode.getName()))
fsNode.setName(name);
NetworkFolder parent = fsNode.getParent();
if (!path.equals(parent != null ? parent.getNetworkPath() : "/"))
{
NetworkFSNode fsn = getFSNode(path);
if (fsn != null && !(fsn instanceof NetworkFolder))
throw new IllegalArgumentException("Given parent path " + fsn.getNetworkPath() + " is a file!");
NetworkFolder newParent = (NetworkFolder) fsn;
if (newParent == null && !path.equals("/"))
{
newParent = makeFSFolder(path);
}
fsNode.setParent(newParent);
}
} catch (IOException e)
{
e.printStackTrace(System.err);
return false;
}
return true;
}
/**
* Creates (or ensures the existence of) a NetworkFolder at the given path, along with all non-existent parent folders.
* @param path the path of the new NetworkFolder; must start with "/"
* @return the new NetworkFolder; or null, if path is "/"
*/
public synchronized NetworkFolder makeFSFolder(String path)
{
if (path.equals("/") || path.isEmpty())
return null;
NetworkFolder ret = null;
try(ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), getLocalPeer().getObjectID()))
{
NetworkFSNode node = getFSNode(path);
if (node != null && !(node instanceof NetworkFolder))
throw new IllegalArgumentException("Given parent path " + node.getNetworkPath() + " is a file!");
ret = (NetworkFolder) node;
if (ret != null)
return ret; // already exists
int lastSlash = path.lastIndexOf("/");
String name = path.substring(lastSlash+1);
String parentPath = path.substring(0, lastSlash);
NetworkFolder parent = makeFSFolder(parentPath);
ret = createObjectByType(ObjectStatements.ObjectType.OBJECT_TYPE_FOLDER);
ret.setParent(parent);
ret.setName(name);
transaction.addNewlyCreatedObject(ret);
return ret;
} catch (IOException e)
{
throw new RuntimeException(e);
}
}
public synchronized NetworkFSNode getFSNode(String path)
{
// TODO: bad algorithm make better
for (NetworkObject object : listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE, ObjectStatements.ObjectType.OBJECT_TYPE_FOLDER)))
{
NetworkFSNode fsNode = (NetworkFSNode) object;
String networkPath = fsNode.getNetworkPath();
if (networkPath.equals(path))
return fsNode;
}
return null;
}
}

View file

@ -1,11 +1,10 @@
package moe.nekojimi.friendcloud;
import moe.nekojimi.friendcloud.objects.NetworkFile;
import moe.nekojimi.friendcloud.tasks.FileDownloadTask;
import java.io.File;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.FutureTask;
public class DownloadManager
{

View file

@ -12,18 +12,21 @@ public class FilePieceAccess implements Closeable
private final NetworkFile networkFile;
private final File file;
private final RandomAccessFile randomAccessFile;
private final OpenMode openMode;
public FilePieceAccess(NetworkFile networkFile) throws IOException
public FilePieceAccess(NetworkFile networkFile, OpenMode openMode) throws IOException
{
this.networkFile = networkFile;
this.file = networkFile.getOrCreateLocalFile();
this.randomAccessFile = new RandomAccessFile(file,"rw");
randomAccessFile.setLength(file.length());
this.randomAccessFile = new RandomAccessFile(file,openMode == OpenMode.READ_WRITE ? "rw" : "r");
this.openMode = openMode;
if (openMode == OpenMode.READ_WRITE)
randomAccessFile.setLength(file.length());
}
public int getPieceOffset(int index)
public long getPieceOffset(int index)
{
return Math.toIntExact(index * networkFile.getPieceSize());
return (index * networkFile.getPieceSize());
}
public int getPieceSize(int index)
@ -47,7 +50,7 @@ public class FilePieceAccess implements Closeable
int pieceSize = getPieceSize(index);
byte[] buffer = new byte[pieceSize];
int pieceOffset = getPieceOffset(index);
long pieceOffset = getPieceOffset(index);
System.out.println("Reading piece " + index + " from file " + file.getName() + " (offset=" + pieceOffset + ", size=" + pieceSize + ")");
randomAccessFile.seek(pieceOffset);
randomAccessFile.read(buffer);
@ -56,7 +59,9 @@ public class FilePieceAccess implements Closeable
public void writePiece(int index, byte[] buffer) throws IOException
{
if (buffer.length != getPieceSize(index))
if (openMode == OpenMode.READ_ONLY)
throw new IllegalStateException("File was opened read-only!");
else if (buffer.length != getPieceSize(index))
throw new IllegalArgumentException("Received a file piece that's the wrong size!! Length = " + buffer.length + " != Piece Size = " + getPieceSize(index));
else if (index >= networkFile.getPieceCount())
throw new IllegalArgumentException("Received a file piece with an index past the end of the file!!");
@ -72,4 +77,10 @@ public class FilePieceAccess implements Closeable
{
randomAccessFile.close();
}
public enum OpenMode
{
READ_ONLY,
READ_WRITE;
}
}

View file

@ -74,7 +74,7 @@ public class FileRemoteAccess
if (!neededPieces.isEmpty())
{
boolean ok = waitForPieceRange(neededPieces, 10000);
boolean ok = waitForPieceRange(neededPieces, 1000);
if (!ok)
{
System.err.println("FRA: timed out while waiting for pieces " + neededPieces);

View file

@ -2,90 +2,75 @@ package moe.nekojimi.friendcloud;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.kstruct.gethostname4j.Hostname;
import com.offbynull.portmapper.PortMapperFactory;
import com.offbynull.portmapper.gateway.Bus;
import com.offbynull.portmapper.gateway.Gateway;
import com.offbynull.portmapper.gateways.network.NetworkGateway;
import com.offbynull.portmapper.gateways.process.ProcessGateway;
import com.offbynull.portmapper.mapper.MappedPort;
import com.offbynull.portmapper.mapper.PortMapper;
import com.offbynull.portmapper.mapper.PortType;
import jnr.ffi.Platform;
import moe.nekojimi.friendcloud.filesystem.FUSEAccess;
import moe.nekojimi.friendcloud.network.PeerConnection;
import moe.nekojimi.friendcloud.network.requests.ObjectListRequest;
import moe.nekojimi.friendcloud.objects.NetworkFile;
import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.objects.Peer;
import moe.nekojimi.friendcloud.objects.PeerFileState;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import moe.nekojimi.friendcloud.network.ConnectionManager;
import moe.nekojimi.friendcloud.network.TCPConnectionBackend;
import moe.nekojimi.friendcloud.protos.CommonMessages;
import moe.nekojimi.friendcloud.storage.DataStore;
import moe.nekojimi.friendcloud.storage.StupidJSONFileStore;
import moe.nekojimi.friendcloud.tasks.JoinNetworkTask;
import moe.nekojimi.friendcloud.tasks.PropagateMessageTask;
import org.slf4j.simple.SimpleLogger;
import org.xml.sax.SAXException;
import javax.xml.parsers.ParserConfigurationException;
import java.io.File;
import java.io.IOException;
import java.net.*;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;
import java.util.stream.Collectors;
public class Main
{
private static Main instance;
@Parameter(names="-share")
private List<String> sharedFiles = new ArrayList<>();
@Parameter(names="-known-peer")
private List<String> knownPeers = new ArrayList<>();
@Parameter(names="-tcp-port")
private int tcpPort = 7777;
@Parameter(names="-no-upnp")
private boolean noUpnp = false;
// @Parameter(names="-file")
private Args args;
private ConnectionManager connectionManager;
private final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(16);
private final Set<Future<?>> scheduledFutures = new HashSet<>();
private final FUSEAccess fuseAccess = new FUSEAccess();
private Controller controller;
private final NotificationManager notificationManager = new NotificationManager();
private final SharedFileManager sharedFileManager = new SharedFileManager();
public static void main(String[] args)
public static void main(String[] argv)
{
instance = new Main();
JCommander.newBuilder().addObject(instance).build().parse(args);
Args args = new Args();
JCommander.newBuilder().addObject(args).build().parse(argv);
instance.args = args;
System.setProperty(SimpleLogger.DEFAULT_LOG_LEVEL_KEY, "Info");
try
{
instance.run();
} catch (IOException | InterruptedException e)
} catch (Exception e)
{
System.err.println("main() received exception, dying horribly!!");
e.printStackTrace(System.err);
try
{
instance.shutdown();
} catch (IOException f)
{
throw new RuntimeException(f);
}
instance.shutdown();
System.exit(1);
}
// TestMessage.SearchRequest request = TestMessage.SearchRequest.newBuilder().setQuery("bees!").setPageNumber(316).setResultsPerPage(42069).build();
}
private void run() throws IOException, InterruptedException
private void run() throws IOException
{
connectionManager = new ConnectionManager(tcpPort);
DataStore dataStore = new StupidJSONFileStore(new File(args.storageLocation));
controller = new Controller(dataStore);
controller.init();
connectionManager = new ConnectionManager();
Path mountPoint;
if (Platform.getNativePlatform().getOS() == Platform.OS.WINDOWS)
@ -94,33 +79,24 @@ public class Main
}
else
{
mountPoint = Path.of(System.getProperty("user.dir") + "/fuse-mount-" + tcpPort);
boolean created = mountPoint.toFile().mkdirs();
mountPoint = Path.of(System.getProperty("user.dir") + "/fuse-mount-" + args.tcpPort);
mountPoint.toFile().mkdirs();
System.out.println("Created FUSE mount point " + mountPoint);
}
fuseAccess.mount(mountPoint);
System.out.println("Mounted virtual filesystem at " + mountPoint);
Runtime.getRuntime().addShutdownHook(new Thread(() ->
{
try
{
shutdown();
} catch (IOException e)
{
throw new RuntimeException(e);
}
}));
Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
connectionManager.addNewConnectionConsumer(this::resquestCompleteState);
// if (Desktop.isDesktopSupported())
// {
// Desktop desktop = Desktop.getDesktop();
// desktop.browse(mountPoint.toFile().toURI());
// }
connectionManager.start();
// connectionManager.addNewConnectionConsumer(this::requestCompleteState);
String hostname = Hostname.getHostname();
Model.getInstance().getSelfNode().setSystemName(hostname);
Model.getInstance().getSelfNode().setUserName(System.getProperty("user.name") + "-" + tcpPort);
addHostAddress(InetAddress.getLocalHost());
connectionManager.addConnectionBackend(new TCPConnectionBackend(args.tcpPort, connectionManager));
/*
Startup procedure:
@ -132,143 +108,71 @@ public class Main
- Publish local file changes
*/
if (!noUpnp)
setupIGP();
for (String sharedFilePath: sharedFiles)
{
File file = new File(sharedFilePath);
if (file.exists())
{
System.out.println("Adding shared network file: " + file.getAbsolutePath());
NetworkFile networkFile = (NetworkFile) Model.getInstance().createObjectByType(ObjectStatements.ObjectType.OBJECT_TYPE_FILE);
networkFile.updateFromLocalFile(file);
PeerFileState peerFileState = (PeerFileState) Model.getInstance().createObjectByType(ObjectStatements.ObjectType.OBJECT_TYPE_PEER_FILE_STATE);
peerFileState.setNode(Model.getInstance().getSelfNode());
peerFileState.setFile(networkFile);
peerFileState.setProgress(100);
}
}
for (String knownPeerAddress : knownPeers)
{
String[] split = knownPeerAddress.split(":");
if (split.length != 2)
{
System.err.println("ERROR: " + knownPeerAddress + " isn't a valid address.");
continue;
}
InetSocketAddress address = new InetSocketAddress(split[0],Integer.parseInt(split[1]));
try
{
URI uri = new URI("tcp", null, address.getHostString(), address.getPort(), null, null, null);
PeerConnection nodeConnection = connectionManager.getNodeConnection(uri);
resquestCompleteState(nodeConnection);
// objectListFuture.whenComplete((networkObjects, throwable) -> {
// for (NetworkObject networkObject: networkObjects)
// {
// if (networkObject instanceof NetworkFile)
// {
// System.out.println("Heard about NetworkFile " + networkObject + ", creating download task!");
// FileDownloadTask fileDownloadTask = new FileDownloadTask((NetworkFile) networkObject, connectionManager);
// executor.submit(fileDownloadTask);
// }
// }
// });
} catch (ConnectException ex)
{
System.out.println("Couldn't connect to host " + address);
}
catch (URISyntaxException e)
{
throw new RuntimeException(e);
}
}
JoinNetworkTask joinNetworkTask = new JoinNetworkTask();
CompletableFuture.runAsync(joinNetworkTask,executor)
.thenRun(this::shareInitialFiles)
.thenRun(this::scheduleCheckins)
.handle((unused, throwable) ->
{
if (throwable != null)
{
System.err.println("Error in initial task!");
throwable.printStackTrace(System.err);
shutdown();
}
return null;
});
}
private void resquestCompleteState(PeerConnection nodeConnection)
private void scheduleCheckins()
{
CompletableFuture<List<NetworkObject>> objectListFuture = nodeConnection.makeRequest(new ObjectListRequest(Set.of(
ObjectStatements.ObjectType.OBJECT_TYPE_FILE,
ObjectStatements.ObjectType.OBJECT_TYPE_PEER_FILE_STATE,
ObjectStatements.ObjectType.OBJECT_TYPE_PEER)));
executor.scheduleWithFixedDelay(() -> {
System.out.println("Checking in with friends...");
CommonMessages.CheckInMessage checkInMessage = CommonMessages.CheckInMessage.newBuilder()
.addAllCurrentChangeHeads(controller.getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet()))
.build();
PropagateMessageTask propagateMessageTask = new PropagateMessageTask(checkInMessage);
executor.submit(propagateMessageTask);
}, 0,5, TimeUnit.MINUTES);
}
private void addHostAddress(InetAddress address)
private void shareInitialFiles()
{
System.out.println("Sharing files given on command line...");
Set<File> sharedFiles = new HashSet<>();
for (String sharedFilePath: args.sharedFilePaths)
{
sharedFiles.add(new File(sharedFilePath));
}
sharedFileManager.addSharedFiles(sharedFiles);
}
private void shutdown()
{
String host = address.getCanonicalHostName();
Peer selfNode = Model.getInstance().getSelfNode();
try
{
URI uri = new URI("tcp", null, host, tcpPort, null, null, null);
System.out.println("Added local address " + uri);
selfNode.addAddress(uri);
} catch (URISyntaxException e)
fuseAccess.umount();
connectionManager.shutdown();
executor.shutdown();
System.out.println("Waiting 10 seconds to complete tasks...");
boolean terminated = executor.awaitTermination(10, TimeUnit.SECONDS);
if (!terminated)
{
System.out.println("Timed out, ending tasks now. Goodbye!");
executor.shutdownNow();
}
else
{
System.out.println("Finished everything. Goodbye!");
}
} catch (Exception e)
{
throw new RuntimeException(e);
}
}
private void setupIGP() throws InterruptedException
{
try
{
// Start gateways
Gateway network = NetworkGateway.create();
Gateway process = ProcessGateway.create();
Bus networkBus = network.getBus();
Bus processBus = process.getBus();
// Discover port forwarding devices and take the first one found
System.out.println("Discovering port mappers...");
List<PortMapper> mappers = PortMapperFactory.discover(networkBus, processBus);;
PortMapper mapper = mappers.getFirst();
System.out.println("Got mapper " + mapper + ", mapping port...");
MappedPort mappedPort = mapper.mapPort(PortType.TCP, tcpPort, tcpPort, 60);
System.out.println("Port mapping added: " + mappedPort);
addHostAddress(mappedPort.getExternalAddress());
}
catch (IllegalStateException ex)
{
System.err.println("Failed to map port! error=" + ex.getMessage());
ex.printStackTrace(System.err);
}
}
private void shutdown() throws IOException
{
fuseAccess.umount();
connectionManager.shutdown();
executor.shutdown();
System.out.println("Waiting 10 seconds to complete tasks...");
boolean terminated = false;
try
{
terminated = executor.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e)
{
throw new RuntimeException(e);
}
if (!terminated)
{
System.out.println("Timed out, ending tasks now. Goodbye!");
executor.shutdownNow();
}
else
{
System.out.println("Finished everything. Goodbye!");
}
}
public static Main getInstance()
{
return instance;
@ -283,4 +187,81 @@ public class Main
{
return connectionManager;
}
public Controller getModel()
{
return controller;
}
public NotificationManager getNotificationManager()
{
return notificationManager;
}
public Controller getController()
{
return controller;
}
public Args getArgs()
{
return args;
}
public SharedFileManager getSharedFileManager()
{
return sharedFileManager;
}
@SuppressWarnings("FieldCanBeLocal")
public static class Args
{
@Parameter(names="-share")
private List<String> sharedFilePaths = new ArrayList<>();
@Parameter(names="-known-peer")
private List<String> knownPeers = new ArrayList<>();
@Parameter(names="-tcp-port")
private int tcpPort = 7777;
@Parameter(names="-no-upnp")
private boolean noUpnp = false;
@Parameter(names="-create-network")
private boolean createNetwork = false;
@Parameter(names = "-storage")
private String storageLocation = ".";
public List<String> getSharedFilePaths()
{
return sharedFilePaths;
}
public List<String> getKnownPeers()
{
return knownPeers;
}
public int getTcpPort()
{
return tcpPort;
}
public boolean isNoUpnp()
{
return noUpnp;
}
public boolean isCreateNetwork()
{
return createNetwork;
}
public String getStorageLocation()
{
return storageLocation;
}
}
}

View file

@ -1,139 +0,0 @@
package moe.nekojimi.friendcloud;
import moe.nekojimi.friendcloud.objects.*;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import java.util.*;
public class Model
{
private static Model instance = null;
public static Model getInstance()
{
if (instance == null)
instance = new Model();
return instance;
}
private final Map<NetworkObject.ObjectID, NetworkObject> objects = new HashMap<>();
private final int systemID;
private Peer selfPeer = null;
private ObjectChangeRecord changeHead;
private final Map<Long, ObjectChangeRecord> changeRecords = new HashMap<>();
private Model()
{
Random ran = new Random();
systemID = ran.nextInt() & 0x00FFFFFF;
}
public synchronized Peer getSelfNode()
{
if (selfPeer == null)
selfPeer = (Peer) createObjectByType(ObjectStatements.ObjectType.OBJECT_TYPE_PEER);
return selfPeer;
}
// private Map<Long, Node> nodes = new HashMap<>();
public synchronized NetworkObject.ObjectID getNextObjectID(ObjectStatements.ObjectType type)
{
Random ran = new Random();
int randomNumber = ran.nextInt();
NetworkObject.ObjectID objectID = new NetworkObject.ObjectID(type, systemID, randomNumber);
System.out.println("Assigned new object ID: " + objectID);
return objectID;
}
public synchronized NetworkObject createObjectByID(NetworkObject.ObjectID id)
{
ObjectStatements.ObjectType type = id.getType();
System.out.println("Creating new object with type: " + type.name());
NetworkObject ret = switch (type)
{
// case UNRECOGNIZED -> ;
case OBJECT_TYPE_FILE -> new NetworkFile(id);
case OBJECT_TYPE_UNSPECIFIED -> throw new IllegalArgumentException();
// case OBJECT_TYPE_USER -> null;
case OBJECT_TYPE_FOLDER -> new NetworkFolder(id);
case OBJECT_TYPE_PEER -> new Peer(id);
case OBJECT_TYPE_PEER_FILE_STATE -> new PeerFileState(id);
default -> throw new UnsupportedOperationException("NYI");
};
objects.put(id, ret);
return ret;
}
public synchronized NetworkObject createObjectByType(ObjectStatements.ObjectType type)
{
return createObjectByID(getNextObjectID(type));
}
public synchronized NetworkObject getOrCreateObject(NetworkObject.ObjectID id)
{
if (!objects.containsKey(id))
{
objects.put(id, createObjectByID(id));
}
return objects.get(id);
}
public synchronized List<NetworkObject.ObjectID> listObjects(Set<ObjectStatements.ObjectType> types)
{
return objects.keySet().stream().filter((id)->(types.contains(id.getType()))).toList();
}
public synchronized NetworkObject getObject(NetworkObject.ObjectID objectID)
{
return objects.get(objectID);
}
public synchronized List<NetworkFSNode> listFSNodes(String path)
{
//TODO: dumbest algorithm in the world
List<NetworkFSNode> ret = new ArrayList<>();
for (NetworkObject.ObjectID nodeID : listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE, ObjectStatements.ObjectType.OBJECT_TYPE_FOLDER)))
{
NetworkFSNode fsNode = (NetworkFSNode) getObject(nodeID);
String networkPath = fsNode.getNetworkPath();
if (networkPath.substring(0, networkPath.lastIndexOf("/")+1).equals(path))
ret.add(fsNode);
}
return ret;
}
public synchronized NetworkFSNode getFSNode(String path)
{
for (NetworkObject.ObjectID nodeID : listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE, ObjectStatements.ObjectType.OBJECT_TYPE_FOLDER)))
{
NetworkFSNode fsNode = (NetworkFSNode) getObject(nodeID);
String networkPath = fsNode.getNetworkPath();
if (networkPath.equals(path))
return fsNode;
}
return null;
}
public synchronized void addChangeRecord(ObjectChangeRecord record)
{
changeRecords.put(record.getChangeID(), record);
}
public ObjectChangeRecord getChangeRecord(long id)
{
return changeRecords.get(id);
}
public void applyChangeRecord(ObjectChangeRecord record)
{
if (!record.getChangeHeads().contains(changeHead))
throw new IllegalStateException("Change does not apply! Valid change heads=" + record.getChangeHeads() + ", we are in state " + changeHead.getChangeID());
if (!changeRecords.containsKey(record.getChangeID()))
addChangeRecord(record);
// if (record == null)
// throw new IllegalArgumentException("Cannot apply unknown change!");
}
}

View file

@ -0,0 +1,105 @@
package moe.nekojimi.friendcloud;
import es.blackleg.jlibnotify.JLibnotify;
import es.blackleg.jlibnotify.JLibnotifyNotification;
import es.blackleg.jlibnotify.core.DefaultJLibnotifyLoader;
import es.blackleg.jlibnotify.exception.JLibnotifyInitException;
import es.blackleg.jlibnotify.exception.JLibnotifyLoadException;
public class NotificationManager
{
private final JLibnotify libnotify;
public NotificationManager()
{
JLibnotify n;
try
{
n = DefaultJLibnotifyLoader.init().load();
n.init("FriendCloud");
System.out.println("Libnotify capabilities detected: " + n.getServerCapabilities());
JLibnotifyNotification notification = n.createNotification("FriendCloud started", "It works! Cool!", "dialog-information");
notification.show();
} catch (JLibnotifyLoadException | JLibnotifyInitException e)
{
n = null;
System.err.println("Failed to initialise notification manager.");
e.printStackTrace(System.err);
}
libnotify = n;
}
public enum NotificationType
{
HELLO,
TRANSFER_IN_PROGRESS,
TRANSFER_DONE,
}
protected String getNotificationIcon(NotificationType type)
{
return switch (type)
{
default -> "dialog-information";
};
}
public Notification createNotification(String heading, String body, NotificationType type)
{
try
{
return new Notification(heading, body, type);
} catch (Exception e)
{
e.printStackTrace(System.err);
return null;
}
}
public class Notification
{
private String heading;
private String body;
private NotificationType type;
private JLibnotifyNotification notification;
public Notification(String heading, String body, NotificationType type)
{
this.heading = heading;
this.body = body;
this.type = type;
if (libnotify != null)
{
notification = libnotify.createNotification(heading, body, getNotificationIcon(type));
// notification.setTimeOut(10);
notification.show();
}
}
public void setHeading(String heading)
{
this.heading = heading;
update();
}
public void setBody(String body)
{
this.body = body;
update();
}
public void setType(NotificationType type)
{
this.type = type;
update();
}
public void update()
{
notification.update(heading, body, getNotificationIcon(type));
// notification.setTimeOut(10);
}
}
}

View file

@ -1,39 +1,294 @@
package moe.nekojimi.friendcloud;
import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import moe.nekojimi.friendcloud.storage.Storable;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;
public class ObjectChangeRecord
public class ObjectChangeRecord implements Storable
{
private final long changeID;
// private final long changeID;
private ObjectChangeRecord(long changeID)
private ObjectID creatorPeer;
private Set<Long> changeHeads = new HashSet<>();
private Set<Change> changes = new HashSet<>();
private Instant creationTime;
public static ObjectChangeRecord createFromChangeMessage(ObjectStatements.ObjectChangeMessage objectChangeMessage)
{
this.changeID = changeID;
ObjectChangeRecord record = new ObjectChangeRecord();
record.changeHeads.addAll(objectChangeMessage.getChangeHeadsList());
record.creatorPeer = new ObjectID(objectChangeMessage.getCreatorId());
record.creationTime = Instant.ofEpochMilli(objectChangeMessage.getTimestampMs());
for (ObjectStatements.ObjectChange objectChange : objectChangeMessage.getChangesList())
{
record.changes.add(Change.createFromObjectChange(objectChange));
}
long calculatedID = record.getChangeID();
long specifiedID = objectChangeMessage.getChangeId();
if (calculatedID != specifiedID)
{
throw new RuntimeException("Failed to verify change ID! specified=" + Long.toHexString(specifiedID) + " != calculated=" + Long.toHexString(calculatedID));
}
return record;
}
private final Set<ObjectChangeRecord> changeHeads = new HashSet<>();
public static ObjectChangeRecord createFromChangeMessage(ObjectStatements.ObjectChange objectChange)
public static ObjectChangeRecord createFromChanges(ObjectID creator, Set<Long> changeHeads, Set<Change> changes)
{
throw new UnsupportedOperationException("NYI!");
ObjectChangeRecord record = new ObjectChangeRecord();
record.creatorPeer = creator;
record.creationTime = Instant.now();
record.changes.addAll(changes);
record.changeHeads = changeHeads;
return record;
}
public static ObjectChangeRecord createFromObjectStates(ObjectStatements.ObjectState before, ObjectStatements.ObjectState after)
public byte[] getHash()
{
throw new UnsupportedOperationException("NYI!");
try
{
MessageDigest digest = MessageDigest.getInstance("SHA-256");
return digest.digest(toString().getBytes(StandardCharsets.UTF_8));
} catch (NoSuchAlgorithmException e)
{
throw new RuntimeException(e);
}
}
public ObjectStatements.ObjectChangeMessage.Builder buildObjectChangeMessage()
{
ObjectStatements.ObjectChangeMessage.Builder builder = ObjectStatements.ObjectChangeMessage.newBuilder();
builder.setChangeId(getChangeID());
builder.addAllChangeHeads(changeHeads);
builder.setCreatorId(creatorPeer.toLong());
builder.setTimestampMs(creationTime.toEpochMilli());
for (Change change : changes)
{
builder.addChanges(change.buildObjectChange());
}
return builder;
}
@Override
public Map<String, Object> getStateMap()
{
return Map.of("changeHeads", changeHeads,
"changes", changes,
"creator", creatorPeer.toLong(),
"creationTime", creationTime.toEpochMilli()
);
}
@SuppressWarnings("unchecked")
@Override
public void updateFromStateMap(Map<String, Object> map)
{
changeHeads = new HashSet<>((Collection) map.get("changeHeads"));
changes = new HashSet<>((Collection) map.get("changes"));
creatorPeer = new ObjectID((Long) map.get("creator"));
creationTime = Instant.ofEpochMilli((Long) map.get("creationTime"));
}
public void applyToLocalState()
{
for (Change change: changes)
{
NetworkObject object = Main.getInstance().getModel().getObject(change.objectID);
if (object == null)
object = Main.getInstance().getController().createObjectByID(change.objectID);
object.updateFromChange(change);
Main.getInstance().getModel().objectChanged(object);
}
}
public String toString()
{
StringBuilder sb = new StringBuilder();
sb.append(creatorPeer).append(",").append(creationTime.toEpochMilli()).append(";");
for (long changeHeadId: changeHeads.stream().sorted().toList())
{
sb.append(changeHeadId).append(",");
}
sb.append(";");
for (Change change: changes.stream().sorted(Comparator.comparingLong(a -> a.objectID.toLong())).toList())
{
sb.append(change.toString()).append(";");
}
return sb.toString();
}
public long getChangeID()
{
return changeID;
MessageDigest digest = null;
try
{
digest = MessageDigest.getInstance("SHA-1");
} catch (NoSuchAlgorithmException e)
{
throw new RuntimeException(e);
}
String stringVal = toString();
byte[] bytes = digest.digest(stringVal.getBytes(StandardCharsets.UTF_8));
// System.out.println("ObjectChangeRecord: calculated change ID " + Long.toHexString(ret) + " from string: " + stringVal);
return Util.xorBytesToLong(bytes);
}
public Set<ObjectChangeRecord> getChangeHeads()
public ObjectID getCreatorPeer()
{
return Collections.unmodifiableSet(changeHeads);
return creatorPeer;
}
@Override
public long getStorageID()
{
return getChangeID();
}
public Set<Long> getChangeHeads()
{
return changeHeads;
}
public static final class Change
{
private final ObjectID objectID;
private final Map<String, String> beforeValues;
private final Map<String, String> afterValues;
public Change(ObjectID objectID, Map<String, String> beforeValues, Map<String, String> afterValues)
{
this.objectID = objectID;
this.beforeValues = beforeValues;
this.afterValues = afterValues;
}
public static Change createFromObjectChange(ObjectStatements.ObjectChange change)
{
return new Change(new ObjectID(change.getObjectId()), change.getBeforeMap(), change.getAfterMap());
}
public static Change createFromObjectStates(ObjectStatements.ObjectState before, ObjectStatements.ObjectState after)
{
Map<String, String> beforeValues = new HashMap<>();
Map<String, String> afterValues = new HashMap<>();
for (String key : after.getValuesMap().keySet())
{
String beforeValue = before.getValuesOrDefault(key, "");
String afterValue = after.getValuesOrDefault(key, "");
if (!afterValue.equals(beforeValue))
{
beforeValues.put(key, beforeValue);
afterValues.put(key, afterValue);
}
}
if (!afterValues.isEmpty())
{
return new Change(new ObjectID(after.getObjectId()), beforeValues, afterValues);
}
return null;
}
public String toString()
{
StringBuilder sb = new StringBuilder();
sb.append(objectID).append(";"); // The object ID, then ;
// now all key-value pairs in alphabetical order
Set<String> keySet = new HashSet<>(beforeValues.keySet());
keySet.addAll(afterValues.keySet());
for (String key : keySet.stream().sorted().toList())
{
sb.append(key).append(":")
.append(beforeValues.getOrDefault(key,""))
.append("->")
.append(afterValues.getOrDefault(key,""))
.append(",");
}
return sb.toString();
}
public ObjectStatements.ObjectChange.Builder buildObjectChange()
{
ObjectStatements.ObjectChange.Builder builder = ObjectStatements.ObjectChange.newBuilder();
builder.putAllBefore(Util.mapWithoutNullValues(beforeValues));
builder.putAllAfter(Util.mapWithoutNullValues(afterValues));
builder.setObjectId(objectID.toLong());
return builder;
}
public ObjectID objectID()
{
return objectID;
}
public Map<String, String> beforeValues()
{
return beforeValues;
}
public Map<String, String> afterValues()
{
return afterValues;
}
@Override
public boolean equals(Object obj)
{
if (obj == this) return true;
if (obj == null || obj.getClass() != this.getClass()) return false;
var that = (Change) obj;
return Objects.equals(this.objectID, that.objectID) &&
Objects.equals(this.beforeValues, that.beforeValues) &&
Objects.equals(this.afterValues, that.afterValues);
}
@Override
public int hashCode()
{
return Objects.hash(objectID, beforeValues, afterValues);
}
}
public static List<ObjectChangeRecord> partiallySort(Set<ObjectChangeRecord> changes)
{
LinkedList<ObjectChangeRecord> ret = new LinkedList<>();
Map<Long,ObjectChangeRecord> idMap = new HashMap<>();
for (ObjectChangeRecord record : changes)
idMap.put(record.getChangeID(), record);
Set<Long> pointedIds = changes.stream()
.flatMap((ObjectChangeRecord objectChangeRecord1) -> objectChangeRecord1.getChangeHeads().stream())
.collect(Collectors.toSet());
Set<ObjectChangeRecord> group = changes.stream()
.filter(objectChangeRecord -> !pointedIds.contains(objectChangeRecord.getChangeID()))
.collect(Collectors.toSet());
while (!group.isEmpty())
{
// add all members of the group to the start of the list
for (ObjectChangeRecord record : group)
{
ret.addFirst(record);
}
// replace the group with all the objects pointed to by the group
group = group.stream()
.flatMap(objectChangeRecord -> objectChangeRecord.getChangeHeads().stream())
.map(idMap::get)
.filter(Objects::nonNull)
.collect(Collectors.toSet());
}
System.out.println("ObjectChangeRecord: Partially sorted changes: " + changes);
System.out.println("\tInto list: " + ret);
return ret;
}
}

View file

@ -0,0 +1,126 @@
package moe.nekojimi.friendcloud;
import moe.nekojimi.friendcloud.network.ConnectionManager;
import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import moe.nekojimi.friendcloud.tasks.PropagateMessageTask;
import moe.nekojimi.friendcloud.tasks.PullChangesTask;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
public class ObjectChangeTransaction implements AutoCloseable
{
private final ObjectID creator;
private final ConnectionManager connectionManager;
private final Map<ObjectID, ObjectStatements.ObjectState> beforeStates = new HashMap<>();
private static ObjectChangeTransaction currentTransaction = null;
private int openCount = 0;
private boolean ended = false;
private ObjectChangeTransaction(ConnectionManager connectionManager, ObjectID creator)
{
this.creator = creator;
this.connectionManager = connectionManager;
System.out.println("ObjectChangeTransaction: opening transaction");
// attempt to pull changes from the network
Future<?> future = Main.getInstance().getExecutor().submit(new PullChangesTask());
try
{
future.get(10, TimeUnit.SECONDS);
} catch (InterruptedException | TimeoutException e)
{
// this is fine
// throw new RuntimeException(e);
} catch (ExecutionException e)
{
e.printStackTrace(System.err);
throw new RuntimeException(e);
}
}
public static ObjectChangeTransaction startTransaction(ConnectionManager connectionManager, ObjectID creatorPeer)
{
if (currentTransaction == null)
currentTransaction = new ObjectChangeTransaction(connectionManager, creatorPeer);
currentTransaction.increaseOpenCount();
return currentTransaction;
}
private void increaseOpenCount()
{
openCount++;
}
public void addObjectBeforeChange(NetworkObject object)
{
beforeStates.putIfAbsent(object.getObjectID(), object.buildObjectState().build());
}
public void addNewlyCreatedObject(NetworkObject object)
{
beforeStates.putIfAbsent(object.getObjectID(), ObjectStatements.ObjectState.getDefaultInstance());
}
public ObjectChangeRecord endTransaction()
{
if (ended)
throw new IllegalStateException("Transaction already ended!");
ended = true;
Set<ObjectChangeRecord.Change> changes = new HashSet<>();
for (Map.Entry<ObjectID, ObjectStatements.ObjectState> entry : beforeStates.entrySet())
{
NetworkObject object = Main.getInstance().getModel().getObject(entry.getKey());
ObjectStatements.ObjectState afterState = object.buildObjectState().build();
ObjectChangeRecord.Change change = ObjectChangeRecord.Change.createFromObjectStates(entry.getValue(), afterState);
if (change != null)
{
Main.getInstance().getModel().objectChanged(object);
changes.add(change);
}
}
if (changes.isEmpty())
return null;
return ObjectChangeRecord.createFromChanges(creator, Main.getInstance().getModel().getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet()), changes);
}
public void commit()
{
// TODO: make this actually perform the changes and mark the transaction as complete; if the transaction closes before commit() make it roll back the changes
}
@Override
public void close() throws IOException
{
openCount--;
if (openCount > 0)
return; // still open elsewhere
// end the transaction and get the change object
ObjectChangeRecord objectChangeRecord = endTransaction();
currentTransaction = null;
if (objectChangeRecord == null)
{
System.out.println("ObjectChangeTransaction: closing transaction, no changes.");
return;
}
System.out.println("ObjectChangeTransaction: closing transaction, submitting change record " + Long.toHexString(objectChangeRecord.getChangeID()));
// add the new change to the model
Main.getInstance().getModel().setCurrentChangeRecord(objectChangeRecord);
// create a task to propagate the change to other peers
Main.getInstance().getExecutor().submit(new PropagateMessageTask(objectChangeRecord.buildObjectChangeMessage().build()));
}
}

View file

@ -0,0 +1,55 @@
package moe.nekojimi.friendcloud;
import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.storage.Storable;
import java.io.File;
import java.io.IOException;
import java.util.Map;
public class SharedDirectory implements Storable
{
private final long storageID;
private File directory;
private ObjectID networkFolderID;
public SharedDirectory(long storageID)
{
this.storageID = storageID;
}
@Override
public long getStorageID()
{
return storageID;
}
@Override
public Map<String, Object> getStateMap()
{
String canonicalPath;
try
{
canonicalPath = directory.getCanonicalPath();
} catch (IOException e)
{
throw new RuntimeException(e);
}
return Map.of("directory", canonicalPath, "networkFolderID", networkFolderID.toLong());
}
@Override
public void updateFromStateMap(Map<String, Object> map)
{
}
public File getDirectory()
{
return directory;
}
public ObjectID getNetworkFolderID()
{
return networkFolderID;
}
}

View file

@ -0,0 +1,112 @@
package moe.nekojimi.friendcloud;
import engineering.swat.watch.Watch;
import moe.nekojimi.friendcloud.objects.NetworkFile;
import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import java.io.File;
import java.io.IOException;
import java.util.*;
public class SharedFileManager
{
private final List<Watch> watches = new ArrayList<>();
public void addSharedFiles(Set<File> files)
{
if (files.isEmpty())
return;
Controller controller = Main.getInstance().getModel();
try(ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), controller.getLocalPeer().getObjectID()))
{
List<NetworkObject> knownFiles = controller.listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE));
for (NetworkObject knownFile: knownFiles)
{
NetworkFile f = (NetworkFile) knownFile;
boolean removed = files.remove(f.getLocalFile());
if (removed)
System.out.println("Identified known local file " + f.getObjectID() + " = " + f.getLocalFile());
}
for (File sharedFile: files)
{
if (sharedFile.exists())
{
System.out.println("Adding shared network file: " + sharedFile.getAbsolutePath());
NetworkFile networkFile = controller.createObjectByType(ObjectStatements.ObjectType.OBJECT_TYPE_FILE);
transaction.addObjectBeforeChange(networkFile);
networkFile.updateFromLocalFile(sharedFile);
controller.objectChanged(networkFile);
}
}
} catch (IOException e)
{
throw new RuntimeException(e);
}
}
public void scanForFileChanges()
{
List<NetworkObject> objects = Main.getInstance().getModel().listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE));
scanForFileChanges(objects);
}
public void scanForFileChanges(List<NetworkObject> objects)
{
// check every NetworkFile to see if any are different from our records
for (NetworkObject object: objects)
{
NetworkFile file = (NetworkFile) object;
if (!file.hasLocalFile())
continue;
File localFile = file.getLocalFile();
boolean changed = (localFile.length() != file.getSize());
if (!changed)
{
Util.HashOutput hashOutput = Util.hashFile(localFile, file.getPieceSize());
changed = ! Arrays.equals(hashOutput.totalDigest, file.getHash());
}
}
}
public void scanForDirectoryChanges()
{
List<SharedDirectory> directories = Main.getInstance().getModel().getDataStore().getDAOForClass(SharedDirectory.class).getAll();
for (SharedDirectory directory: directories)
{
for (File file : Objects.requireNonNull(directory.getDirectory().listFiles()))
{
}
}
}
public boolean startWatchingFiles()
{
// if (watchService == null)
// {
// try
// {
// watchService = FileSystems.getDefault().newWatchService();
// } catch (IOException e)
// {
// e.printStackTrace(System.err);
// return false;
// }
// }
throw new UnsupportedOperationException("NYI");
}
private void filesChanged(Collection<NetworkFile> changed)
{
}
private void newFiles(Collection<File> newFiles)
{
}
}

View file

@ -0,0 +1,126 @@
package moe.nekojimi.friendcloud;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.LongBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
public class Util
{
public static long xorBytesToLong(byte[] bytes)
{
ByteBuffer buf = ByteBuffer.allocate(bytes.length);
buf.put(bytes);
buf.rewind();
LongBuffer longs = buf.asLongBuffer();
long ret = 0xBEEFCAFEF00DBABEL;
while (longs.hasRemaining())
{
ret = ret ^ longs.get();
}
return ret;
}
public static long unconditionalNumberToLong(Object number)
{
assert (number instanceof Number);
return ((Number) number).longValue();
}
public static HashOutput hashFile(File file)
{
return hashFile(file, 0x100000);
}
public static HashOutput hashFile(File file, long pieceSize)
{
HashOutput ret = new HashOutput();
System.out.println("Calculating hashes for file " + file.getName() + "(Piece size: " + pieceSize + ")");
try (FileInputStream input = new FileInputStream(file))
{
MessageDigest totalDigest = MessageDigest.getInstance("SHA-256");
byte[] pieceBuf = new byte[Math.toIntExact(pieceSize)];
int pieceIdx = 0;
while (true)
{
int bytesRead = input.read(pieceBuf);
if (bytesRead <= 0)
break;
// check to see if this piece is just zeroes, if so, assume it's a missing piece
boolean allZero = true;
for (byte b: pieceBuf)
{
if (b != 0)
{
allZero = false;
break;
}
}
ret.pieces.set(pieceIdx, !allZero);
MessageDigest pieceDigest = MessageDigest.getInstance("SHA-256");
pieceDigest.update(pieceBuf, 0, bytesRead);
ret.pieceDigests.add(pieceDigest.digest());
totalDigest.update(pieceBuf, 0, bytesRead);
pieceIdx++;
}
ret.totalDigest = totalDigest.digest();
System.out.println("Total hash: " + HexFormat.of().formatHex(ret.totalDigest));
long pieceCount = file.length() / pieceSize;
System.out.println("Have " + ret.pieces.cardinality() + " of " + pieceCount + " pieces.");
return ret;
} catch (NoSuchAlgorithmException | IOException e)
{
throw new RuntimeException(e);
}
}
public static class HashOutput
{
public byte[] totalDigest;
public List<byte[]> pieceDigests = new ArrayList<>();
public BitSet pieces = new BitSet();
}
public static double unconditionalNumberToDouble(Object number)
{
assert (number instanceof Number);
return ((Number) number).doubleValue();
}
public static Map<String,String> stringifyMap(Map<?,?> map)
{
Map<String,String> ret = new HashMap<>();
for (Map.Entry<?, ?> e : map.entrySet())
{
ret.put(e.getKey().toString(), e.getValue().toString());
}
return ret;
}
public static <K,V> Map<K,V> mapWithoutNullValues(Map<K,V> map)
{
Map<K,V> ret = new HashMap<>();
for (Map.Entry<K,V> e: map.entrySet())
{
if (e.getKey() != null && e.getValue() != null)
ret.put(e.getKey(),e.getValue());
}
return ret;
}
public static <T> CompletableFuture<List<T>> collectFutures(Collection<CompletableFuture<T>> futures)
{
// TODO: should handle timeouts / CancellationException
return CompletableFuture
.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(unused -> futures.stream().map(CompletableFuture::join).toList());
}
}

View file

@ -2,7 +2,7 @@ package moe.nekojimi.friendcloud.filesystem;
import jnr.ffi.Pointer;
import moe.nekojimi.friendcloud.FileRemoteAccess;
import moe.nekojimi.friendcloud.Model;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.objects.NetworkFSNode;
import moe.nekojimi.friendcloud.objects.NetworkFile;
import moe.nekojimi.friendcloud.objects.NetworkFolder;
@ -12,7 +12,9 @@ import ru.serce.jnrfuse.FuseStubFS;
import ru.serce.jnrfuse.struct.FileStat;
import ru.serce.jnrfuse.struct.FuseFileInfo;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class FUSEAccess extends FuseStubFS
@ -27,14 +29,15 @@ public class FUSEAccess extends FuseStubFS
@Override
public int readdir(String path, Pointer buf, FuseFillDir filter, long offset, FuseFileInfo fi)
{
System.out.println("FUSE: listing contents of directory " + path);
List<NetworkFSNode> networkFSNodes = Main.getInstance().getModel().listFSNodes(path);
System.out.println("FUSE: listing contents of directory " + path + ": " + networkFSNodes.size() + " nodes.");
int ret = 0;
filter.apply(buf, ".", null, 0);
filter.apply(buf, "..", null, 0);
// filter.apply(buf,"hello", null, 0);
for (NetworkFSNode fsNode : Model.getInstance().listFSNodes(path))
for (NetworkFSNode fsNode : networkFSNodes)
{
filter.apply(buf, fsNode.getName(), null, 0);
}
@ -42,6 +45,15 @@ public class FUSEAccess extends FuseStubFS
return ret;
}
@Override
public int mkdir(String path, long mode)
{
NetworkFolder networkFolder = Main.getInstance().getModel().makeFSFolder(path);
if (networkFolder == null)
return -ErrorCodes.EIO();
return 0;
}
@Override
public int getattr(String path, FileStat stat)
{
@ -54,7 +66,7 @@ public class FUSEAccess extends FuseStubFS
}
else
{
NetworkFSNode fsNode = Model.getInstance().getFSNode(path);
NetworkFSNode fsNode = Main.getInstance().getModel().getFSNode(path);
switch (fsNode)
{
case null ->
@ -85,7 +97,7 @@ public class FUSEAccess extends FuseStubFS
public int open(String path, FuseFileInfo fi)
{
System.out.println("FUSE: Opening file " + path);
NetworkFSNode fsNode = Model.getInstance().getFSNode(path);
NetworkFSNode fsNode = Main.getInstance().getModel().getFSNode(path);
if (fsNode == null)
{
System.err.println("FUSE: Failed to open file " + path + ": not found");
@ -156,4 +168,25 @@ public class FUSEAccess extends FuseStubFS
// System.out.println("FUSE: Read " + bytes.length + " bytes.");
}
}
@Override
public int rename(String oldpath, String newpath)
{
NetworkFSNode fsNode = Main.getInstance().getModel().getFSNode(oldpath);
if (fsNode == null)
{
System.err.println("FUSE: Failed to rename file " + oldpath + ": not found");
return -ErrorCodes.ENOENT();
}
try
{
System.out.println("FUSE: Renaming " + oldpath + " to " + newpath);
Main.getInstance().getModel().renameFSNode(fsNode, newpath);
return 0;
} catch (Exception e)
{
e.printStackTrace(System.err);
return -ErrorCodes.EIO();
}
}
}

View file

@ -0,0 +1,70 @@
package moe.nekojimi.friendcloud.network;
import moe.nekojimi.friendcloud.objects.ObjectID;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
public abstract class ConnectionBackend<ConnectionType extends PeerConnection> extends Thread
{
private final ConnectionManager connectionManager;
protected final String uriScheme;
abstract List<URI> getURIs();
public ConnectionBackend(@NotNull String name, @NotNull String uriScheme, ConnectionManager connectionManager)
{
super(name);
this.uriScheme = uriScheme;
this.connectionManager = connectionManager;
}
@Override
public final void run()
{
super.run();
while(isListening())
{
try
{
PeerConnection connection = getConnection();
if (connection == null)
break;
connectionManager.receiveConnection(connection);
}
catch (InterruptedException ex)
{
}
catch (IOException ex)
{
break;
}
}
}
protected abstract ConnectionType makeConnection(URI uri, ObjectID peer);
protected URI makeURI(String hostPart, int port)
{
try
{
return new URI(uriScheme, null, hostPart, port, null, null, null);
} catch (URISyntaxException e)
{
throw new RuntimeException(e);
}
}
protected abstract ConnectionType getConnection() throws IOException, InterruptedException;
protected abstract boolean isListening();
public String getUriScheme()
{
return uriScheme;
}
public abstract void shutdown();
}

View file

@ -1,58 +1,33 @@
package moe.nekojimi.friendcloud;
package moe.nekojimi.friendcloud.network;
import moe.nekojimi.friendcloud.network.PeerConnection;
import moe.nekojimi.friendcloud.network.PeerTCPConnection;
import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.objects.Peer;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URI;
import java.util.*;
import java.util.function.Consumer;
public class ConnectionManager extends Thread
public class ConnectionManager
{
// private final Executor executor = new ThreadPoolExecutor()
//TODO: move the TCP stuff to it's own thread, which sends NodeTCPConnections to this thread
private final ServerSocket serverSocket;
private final Map<String, ConnectionBackend<?>> backends = new HashMap<>();
private final Set<PeerConnection> activeConnections = new HashSet<>();
private final Set<Consumer<PeerConnection>> newConnectionConsumers = new HashSet<>();
public ConnectionManager(int portNumber) throws IOException
public ConnectionManager()
{
serverSocket = new ServerSocket(portNumber);
// serverSocket.bind(new InetSocketAddress());
}
@Override
public void run()
void receiveConnection(PeerConnection connection)
{
super.run();
while (!serverSocket.isClosed())
activeConnections.add(connection);
connection.start();
for (Consumer<PeerConnection> consumer: newConnectionConsumers)
{
try
{
Socket socket = serverSocket.accept();
System.out.println("TCP Connection Manager: accepted connection from " + socket.getRemoteSocketAddress());
PeerTCPConnection nodeTCPConnection = new PeerTCPConnection(socket);
activeConnections.add(nodeTCPConnection);
nodeTCPConnection.start();
for (Consumer<PeerConnection> consumer: newConnectionConsumers)
{
consumer.accept(nodeTCPConnection);
}
} catch (IOException e)
{
System.err.println("ConnectionManager TCP experienced exception:" + e.getMessage());
e.printStackTrace(System.err);
}
consumer.accept(connection);
}
}
@ -70,24 +45,24 @@ public class ConnectionManager extends Thread
return peerConnection;
}
PeerConnection nodeConnection = null;
if (Objects.equals(uri.getScheme(), "tcp"))
{
nodeConnection = new PeerTCPConnection(uri, peer);
nodeConnection.start();
}
ConnectionBackend<?> backend = backends.get(uri.getScheme());
PeerConnection nodeConnection = backend.makeConnection(uri, peer == null ? new ObjectID(0L) : peer.getObjectID());
if (nodeConnection != null)
{
nodeConnection.start();
activeConnections.add(nodeConnection);
}
return nodeConnection;
}
public PeerConnection getNodeConnection(Peer peer) throws IOException
public PeerConnection getNodeConnection(Peer peer)
{
// try to find if we already have an active connection to this peer
purgeDeadConnections();
System.out.println("ConnectionManager: trying to get connection to " + peer + " (have " + activeConnections.size() + " connections open)");
for (PeerConnection peerConnection: activeConnections)
{
if (peerConnection.getNode() == peer)
if (peerConnection.getPeerID() != null && peerConnection.getPeerID().equals(peer.getObjectID()))
return peerConnection;
}
@ -95,20 +70,21 @@ public class ConnectionManager extends Thread
{
try
{
return getNodeConnection(address);
return getNodeConnection(address, peer);
}
catch (IOException ex)
{
System.err.println("Couldn't create PeerConnection to " + address + " : " + ex.getMessage());
System.err.println("ConnectionManager: Couldn't create PeerConnection to " + address + " : " + ex.getMessage());
}
}
System.err.println("Failed to create PeerConnection to " + peer + "!");
System.err.println("ConnectionManager: Failed to create PeerConnection to " + peer + "!");
return null;
}
public void shutdown() throws IOException
public void shutdown()
{
serverSocket.close();
for (ConnectionBackend<?> backend : backends.values())
backend.shutdown();
for (PeerConnection nc: activeConnections)
{
nc.shutdown();
@ -121,11 +97,20 @@ public class ConnectionManager extends Thread
for (PeerConnection peerConnection: activeConnections)
{
if (!peerConnection.isAlive())
{
System.out.println("ConnectionManager: purged dead connection to " + peerConnection.getUri());
deadConnections.add(peerConnection);
}
}
activeConnections.removeAll(deadConnections);
}
public void addConnectionBackend(ConnectionBackend<?> backend)
{
backends.put(backend.getUriScheme(), backend);
backend.start();
}
public void addNewConnectionConsumer(Consumer<PeerConnection> consumer)
{
newConnectionConsumers.add(consumer);
@ -136,4 +121,11 @@ public class ConnectionManager extends Thread
newConnectionConsumers.remove(consumer);
}
public List<URI> getURIs()
{
return backends.values().stream()
.filter(ConnectionBackend::isListening)
.flatMap(connectionBackend -> connectionBackend.getURIs().stream())
.toList();
}
}

View file

@ -1,36 +1,36 @@
package moe.nekojimi.friendcloud.network;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import com.google.protobuf.*;
import moe.nekojimi.friendcloud.FilePieceAccess;
import moe.nekojimi.friendcloud.Model;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.ObjectChangeRecord;
import moe.nekojimi.friendcloud.objects.NetworkFile;
import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.objects.Peer;
import moe.nekojimi.friendcloud.protos.CommonMessages;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import moe.nekojimi.friendcloud.network.requests.Request;
import moe.nekojimi.friendcloud.protos.PieceMessages;
import moe.nekojimi.friendcloud.tasks.PullChangesTask;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.CompletableFuture;
public abstract class PeerConnection extends Thread
{
private final Map<Long, Request<?, ?>> pendingRequests = new HashMap<>();
private Peer peer;
private ObjectID peerID = new ObjectID(0);
private long nextMessageId = 1;
private final URI uri;
private long artificalDelayMs = 0;
private final Map<String, MessageHandler<?>> messageHandlers = new HashMap<>();
public PeerConnection()
{
this(null);
@ -39,12 +39,13 @@ public abstract class PeerConnection extends Thread
public PeerConnection(URI uri)
{
this.uri = uri;
installDefaultMessageHandlers();
}
public PeerConnection(URI uri, Peer peer)
public PeerConnection(URI uri, @NotNull ObjectID peerID)
{
this(uri);
this.peer = peer;
this.peerID = peerID;
}
@Override
@ -75,13 +76,19 @@ public abstract class PeerConnection extends Thread
}
}
public synchronized void sendUnsolicitedMessage(Message message) throws IOException
{
sendMessage(wrapMessage(message));
}
protected abstract void sendMessage(CommonMessages.FriendCloudMessage message) throws IOException;
private CommonMessages.FriendCloudMessage wrapMessage(Message message, CommonMessages.MessageHeader inReplyTo)
{
Peer localPeer = Main.getInstance().getModel().getLocalPeer();
CommonMessages.MessageHeader.Builder headerBuilder = CommonMessages.MessageHeader.newBuilder()
.setMessageId(nextMessageId)
.setSenderId(Model.getInstance().getSelfNode().getObjectID().toLong());
.setSenderId(localPeer != null ? localPeer.getObjectID().toLong() : 0L);
if (inReplyTo != null)
headerBuilder.setReplyToMessageId(inReplyTo.getMessageId());
@ -102,37 +109,46 @@ public abstract class PeerConnection extends Thread
{
System.err.println("Sending error reply: " + error.name() + " to message ID " + replyHeader.getReplyToMessageId());
CommonMessages.ErrorMessage errorMessage = CommonMessages.ErrorMessage.newBuilder().setError(error).build();
sendMessage(wrapMessage(errorMessage,replyHeader));
sendMessage(wrapMessage(errorMessage, replyHeader));
}
protected void messageReceived(@org.jetbrains.annotations.NotNull CommonMessages.FriendCloudMessage message)
{
if (artificalDelayMs > 0)
{
try
{
System.err.println("WARNING: artifical lag activated! Waiting " + artificalDelayMs + "ms...");
Thread.sleep(artificalDelayMs);
} catch (InterruptedException e)
{
// well never mind then
}
}
CommonMessages.MessageHeader header = message.getHeader();
NetworkObject.ObjectID senderID = new NetworkObject.ObjectID(header.getSenderId());
peer = (Peer) Model.getInstance().getOrCreateObject(senderID);
Any body = message.getBody();
long replyToMessageId = header.getReplyToMessageId();
System.out.println("Received message! type=" + body.getTypeUrl() + ", ID=" + header.getMessageId() + ", reply_to=" + replyToMessageId );
ObjectID senderID = new ObjectID(header.getSenderId());
System.out.println("Received message! type=" + body.getTypeUrl() + ", sender=" + senderID + ", ID=" + header.getMessageId() + ", reply_to=" + replyToMessageId);
try
{
try
{
if (artificalDelayMs > 0)
{
try
{
System.err.println("WARNING: artifical lag activated! Waiting " + artificalDelayMs + "ms...");
Thread.sleep(artificalDelayMs);
} catch (InterruptedException e)
{
// well never mind then
}
}
if (!senderID.isNull())
{
if (peerID.isNull())
{
System.out.println("PeerConnection: Identified sender as " + senderID);
peerID = senderID;
}
else
{
if (!senderID.equals(peerID))
throw new ReplyWithErrorException(CommonMessages.Error.ERROR_WHO_THE_FUCK_ARE_YOU);
}
}
if (replyToMessageId != 0)
{
if (pendingRequests.containsKey(replyToMessageId))
@ -146,13 +162,11 @@ public abstract class PeerConnection extends Thread
{
handleUnsolicitedMessage(header, body);
}
}
catch (ReplyWithErrorException ex)
} catch (ReplyWithErrorException ex)
{
ex.printStackTrace(System.err);
replyWithError(ex.getError(), header);
}
catch (IllegalArgumentException ex)
} catch (IllegalArgumentException ex)
{
ex.printStackTrace(System.err);
replyWithError(CommonMessages.Error.ERROR_INVALID_ARGUMENT, header);
@ -173,64 +187,21 @@ public abstract class PeerConnection extends Thread
throw new RuntimeException("Our message ID " + header.getReplyToMessageId() + " caused a remote error: " + body.getError().name());
}
@SuppressWarnings({"rawtypes", "unchecked"})
private void handleUnsolicitedMessage(CommonMessages.MessageHeader header, Any body) throws IOException, ReplyWithErrorException
{
if (body.is(ObjectStatements.ObjectListRequest.class))
String typeUrl = body.getTypeUrl();
if (messageHandlers.containsKey(typeUrl))
{
ObjectStatements.ObjectListRequest objectListRequest = body.unpack(ObjectStatements.ObjectListRequest.class);
List<NetworkObject.ObjectID> objectIDS = Model.getInstance().listObjects(new HashSet<>(objectListRequest.getTypesList()));
ObjectStatements.ObjectList.Builder objectList = ObjectStatements.ObjectList.newBuilder();
for (NetworkObject.ObjectID objectID : objectIDS)
{
NetworkObject networkObject = Model.getInstance().getOrCreateObject(objectID);
objectList.addStates(networkObject.buildObjectState());
// networkObject.updateFromStateMessage();
// objectList.addState(networkObject.buildObjectState());
}
System.out.println("Replying to ObjectListRequest with ObjectList, objects=" + objectList.getStatesList());
sendMessage(wrapMessage(objectList.build(), header));
MessageHandler handler = messageHandlers.get(typeUrl);
assert (body.is(handler.clazz));
Message unpack = body.unpack((Class<Message>) handler.clazz);
handler.handle(header, unpack);
}
else if (body.is(PieceMessages.FilePiecesRequestMessage.class))
else
{
PieceMessages.FilePiecesRequestMessage filePiecesRequestMessage = body.unpack(PieceMessages.FilePiecesRequestMessage.class);
if (filePiecesRequestMessage.getPieceMod() == 0)
{
replyWithError(CommonMessages.Error.ERROR_INVALID_ARGUMENT, header);
}
NetworkFile networkFile = (NetworkFile) Model.getInstance().getObject(new NetworkObject.ObjectID(filePiecesRequestMessage.getFileId()));
if (networkFile == null)
{
replyWithError(CommonMessages.Error.ERROR_OBJECT_NOT_FOUND, header);
}
assert networkFile != null;
try (FilePieceAccess filePieceAccess = new FilePieceAccess(networkFile))
{
int startIndex = filePiecesRequestMessage.getStartPieceIndex();
int endIndex = (filePiecesRequestMessage.getStartPieceIndex() + filePiecesRequestMessage.getPieceCount()) - 1;
System.out.println("Been asked for pieces from " + startIndex + " to " + endIndex);
for (int index = startIndex; index <= endIndex; index += filePiecesRequestMessage.getPieceMod())
{
byte[] buffer = filePieceAccess.readPiece(index);
if (buffer != null)
{
System.out.println("Replying to file piece request with piece " + index);
PieceMessages.FilePieceMessage filePieceMessage = PieceMessages.FilePieceMessage.newBuilder()
.setPieceIndex(index)
.setFileId(networkFile.getObjectID().toLong())
.setData(ByteString.copyFrom(buffer))
.build();
sendMessage(wrapMessage(filePieceMessage, header));
}
else
{
System.err.println("Don't have requested piece " + index + "!");
replyWithError(CommonMessages.Error.ERROR_PIECE_NOT_POSSESSED, header);
break;
}
}
}
System.err.println("PeerConnection: don't have a MessageHandler for message type " + typeUrl + "!");
replyWithError(CommonMessages.Error.ERROR_MESSAGE_BODY_UNKNOWN, header);
}
}
@ -244,15 +215,162 @@ public abstract class PeerConnection extends Thread
pendingRequests.remove(replyToMessageId);
}
public abstract void shutdown() throws IOException;
public abstract void shutdown();
public synchronized Peer getNode()
public ObjectID getPeerID()
{
return peer;
return peerID;
}
public synchronized URI getUri()
{
return uri;
}
public <T extends Message> void installMessageHandler(MessageHandler<T> handler)
{
String typeUrl = "type.googleapis.com/" + Internal.getDefaultInstance(handler.clazz).getDescriptorForType().getFullName();
messageHandlers.put(typeUrl, handler);
// System.out.println("PeerConnection: Installed message handler for type " + typeUrl);
}
private void installDefaultMessageHandlers()
{
installMessageHandler(new MessageHandler<>(ObjectStatements.ObjectListRequest.class)
{
@Override
protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectListRequest message) throws IOException
{
List<NetworkObject> objects = Main.getInstance().getModel().listObjects(new HashSet<>(message.getTypesList()));
ObjectStatements.ObjectList.Builder objectList = ObjectStatements.ObjectList.newBuilder();
for (NetworkObject object : objects)
{
objectList.addStates(object.buildObjectState());
}
// System.out.println("Replying to ObjectListRequest with ObjectList, objects=" + objectList.getStatesList());
sendMessage(wrapMessage(objectList.build(), header));
}
});
installMessageHandler(new MessageHandler<>(PieceMessages.FilePiecesRequestMessage.class)
{
@Override
protected void handle(CommonMessages.MessageHeader header, PieceMessages.FilePiecesRequestMessage message) throws IOException
{
if (message.getPieceMod() == 0)
{
replyWithError(CommonMessages.Error.ERROR_INVALID_ARGUMENT, header);
}
NetworkFile networkFile = Main.getInstance().getModel().getObject(new ObjectID(message.getFileId()));
if (networkFile == null)
{
replyWithError(CommonMessages.Error.ERROR_OBJECT_NOT_FOUND, header);
}
assert networkFile != null;
try (FilePieceAccess filePieceAccess = new FilePieceAccess(networkFile, FilePieceAccess.OpenMode.READ_ONLY))
{
int startIndex = message.getStartPieceIndex();
int endIndex = (message.getStartPieceIndex() + message.getPieceCount()) - 1;
System.out.println("Been asked for pieces from " + startIndex + " to " + endIndex);
List<Long> indices = new ArrayList<>();
for (int index = startIndex; index <= endIndex; index += message.getPieceMod())
{
indices.add((long) index);
}
CommonMessages.MultiObjectConfirmationMessage multiObjectConfirmationMessage = CommonMessages.MultiObjectConfirmationMessage.newBuilder().addAllExpectedReturnId(indices).build();
sendMessage(wrapMessage(multiObjectConfirmationMessage, header));
for (Long index : indices)
{
byte[] buffer = filePieceAccess.readPiece(Math.toIntExact(index));
if (buffer != null)
{
System.out.println("Replying to file piece request with piece " + index);
PieceMessages.FilePieceMessage filePieceMessage = PieceMessages.FilePieceMessage.newBuilder()
.setPieceIndex(Math.toIntExact(index))
.setFileId(networkFile.getObjectID().toLong())
.setData(ByteString.copyFrom(buffer))
.build();
sendMessage(wrapMessage(filePieceMessage, header));
}
else
{
System.err.println("Don't have requested piece " + index + "!");
replyWithError(CommonMessages.Error.ERROR_PIECE_NOT_POSSESSED, header);
break;
}
}
}
}
});
installMessageHandler(new MessageHandler<>(ObjectStatements.ObjectChangeRequest.class)
{
@Override
protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectChangeRequest message) throws IOException
{
List<Long> changesSinceList = message.getChangesSinceList();
System.out.println("PeerConnection: Been asked for all changes since " + changesSinceList.stream().map(Long::toHexString));
Set<ObjectChangeRecord> changes = Main.getInstance().getModel().findChangesSince(changesSinceList);
if (changes == null)
{
replyWithError(CommonMessages.Error.ERROR_END_OF_HISTORY, header);
}
else
{
ObjectStatements.ObjectChangeListMessage.Builder reply = ObjectStatements.ObjectChangeListMessage.newBuilder();
for (ObjectChangeRecord change : changes)
{
reply.addChangeMessages(change.buildObjectChangeMessage());
}
System.out.println("PeerConnection: Replying with " + reply.getChangeMessagesCount() + " changes");
sendMessage(wrapMessage(reply.build(), header));
}
}
});
installMessageHandler(new MessageHandler<>(ObjectStatements.ObjectChangeMessage.class)
{
@Override
protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectChangeMessage message)
{
ObjectChangeRecord record = ObjectChangeRecord.createFromChangeMessage(message);
Main.getInstance().getModel().applyChangeRecord(record);
}
});
installMessageHandler(new MessageHandler<>(CommonMessages.CheckInMessage.class)
{
@Override
protected void handle(CommonMessages.MessageHeader header, CommonMessages.CheckInMessage message)
{
List<Long> remoteChangeHeads = message.getCurrentChangeHeadsList();
boolean potentialNewChanges = false;
for (long remoteChangeHead : remoteChangeHeads)
{
boolean exists = Main.getInstance().getModel().getDataStore().getDAOForClass(ObjectChangeRecord.class).exists(remoteChangeHead);
if (!exists)
{
potentialNewChanges = true;
break;
}
}
if (potentialNewChanges)
{
PullChangesTask task = new PullChangesTask(Set.of(Main.getInstance().getModel().getObject(peerID)));
Main.getInstance().getExecutor().submit(task);
}
}
});
}
public abstract static class MessageHandler<MessageType extends Message>
{
private final Class<MessageType> clazz;
public MessageHandler(Class<MessageType> clazz)
{
this.clazz = clazz;
}
protected abstract void handle(CommonMessages.MessageHeader header, MessageType message) throws IOException;
}
}

View file

@ -0,0 +1,125 @@
package moe.nekojimi.friendcloud.network;
import com.offbynull.portmapper.PortMapperFactory;
import com.offbynull.portmapper.gateway.Bus;
import com.offbynull.portmapper.gateway.Gateway;
import com.offbynull.portmapper.gateways.network.NetworkGateway;
import com.offbynull.portmapper.gateways.process.ProcessGateway;
import com.offbynull.portmapper.mapper.MappedPort;
import com.offbynull.portmapper.mapper.PortMapper;
import com.offbynull.portmapper.mapper.PortType;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.objects.Peer;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class TCPConnectionBackend extends ConnectionBackend<TCPPeerConnection>
{
private final ServerSocket serverSocket;
private MappedPort mappedPort;
@Override
List<URI> getURIs()
{
List<URI> ret = new ArrayList<>();
ret.add(makeURI(serverSocket.getInetAddress().getHostAddress(), serverSocket.getLocalPort()));
ret.add(makeURI(serverSocket.getInetAddress().getHostName(), serverSocket.getLocalPort()));
if (mappedPort != null)
{
ret.add(makeURI(mappedPort.getExternalAddress().getHostAddress(), mappedPort.getExternalPort()));
ret.add(makeURI(mappedPort.getExternalAddress().getCanonicalHostName(), mappedPort.getExternalPort()));
}
return ret;
}
public TCPConnectionBackend(int port, ConnectionManager connectionManager) throws IOException
{
super("TCP Listen Thread", "tcp", connectionManager);
serverSocket = new ServerSocket(port);
// setupIGP(port);
}
private void setupIGP(int port)
{
try
{ // Start gateways
Gateway network = NetworkGateway.create();
Gateway process = ProcessGateway.create();
Bus networkBus = network.getBus();
Bus processBus = process.getBus();
// Discover port forwarding devices and take the first one found
System.out.println("Discovering port mappers...");
List<PortMapper> mappers = PortMapperFactory.discover(networkBus, processBus);
;
PortMapper mapper = mappers.getFirst();
System.out.println("Got mapper " + mapper + ", mapping port...");
mappedPort = mapper.mapPort(PortType.TCP, port, port, 3600);
System.out.println("Port mapping added: " + mappedPort);
long refreshDelay = (long) (mappedPort.getLifetime() * 0.9);
Main.getInstance().getExecutor().scheduleWithFixedDelay(() -> {
try
{
System.out.println("Refreshing UPnP port mapping.");
mapper.refreshPort(mappedPort, mappedPort.getLifetime());
} catch (InterruptedException e)
{
e.printStackTrace(System.err);
}
}, refreshDelay, refreshDelay, TimeUnit.SECONDS);
} catch (InterruptedException | IllegalStateException ignored)
{
}
}
@Override
protected TCPPeerConnection makeConnection(URI uri, ObjectID peer)
{
try
{
return new TCPPeerConnection(uri, peer);
} catch (IOException e)
{
System.out.println("TCPConnectionBackend: failed to connect to " + uri + ": " + e.getMessage());
// e.printStackTrace(System.err);
return null;
}
}
@Override
protected TCPPeerConnection getConnection() throws IOException
{
Socket socket = serverSocket.accept();
return new TCPPeerConnection(socket);
}
@Override
protected boolean isListening()
{
return !serverSocket.isClosed();
}
@Override
public void shutdown()
{
try
{
serverSocket.close();
} catch (IOException e)
{
throw new RuntimeException(e);
}
}
}

View file

@ -1,6 +1,6 @@
package moe.nekojimi.friendcloud.network;
import moe.nekojimi.friendcloud.objects.Peer;
import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.protos.CommonMessages;
import java.io.IOException;
@ -9,19 +9,19 @@ import java.io.OutputStream;
import java.net.Socket;
import java.net.URI;
public class PeerTCPConnection extends PeerConnection
public class TCPPeerConnection extends PeerConnection
{
private final Socket socket;
private final int keepAliveTimeS = 300;
public PeerTCPConnection(URI tcpURL, Peer peer) throws IOException
public TCPPeerConnection(URI tcpURL, ObjectID peer) throws IOException
{
super(tcpURL, peer);
socket = new Socket(tcpURL.getHost(), tcpURL.getPort());
System.out.println("TCP Connection: connected to " + tcpURL + " OK!");
}
public PeerTCPConnection(Socket openSocket)
public TCPPeerConnection(Socket openSocket)
{
super();
socket = openSocket;
@ -42,29 +42,37 @@ public class PeerTCPConnection extends PeerConnection
if (message != null)
{
System.out.println("TCP Connection: read data");
messageReceived(message);
}
}
} catch (IOException ex)
} catch (Exception ex)
{
// fuck
ex.printStackTrace(System.err);
}
System.out.println("TCP Connection: connection closed");
}
@Override
protected void sendMessage(CommonMessages.FriendCloudMessage message) throws IOException
{
OutputStream outputStream = socket.getOutputStream();
System.out.println("Sending message " + message.getHeader().getMessageId());
System.out.println("Sending message " + message.getHeader().getMessageId() + ": " + message.getBody().getTypeUrl());
message.writeDelimitedTo(outputStream);
outputStream.flush();
}
@Override
public synchronized void shutdown() throws IOException
public synchronized void shutdown()
{
socket.close();
try
{
socket.close();
} catch (IOException e)
{
System.err.println("TCPPeerConnection: failed to shut down!");
e.printStackTrace(System.err);
}
}
}

View file

@ -4,13 +4,15 @@ import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import moe.nekojimi.friendcloud.FilePieceAccess;
import moe.nekojimi.friendcloud.objects.NetworkFile;
import moe.nekojimi.friendcloud.protos.CommonMessages;
import moe.nekojimi.friendcloud.protos.PieceMessages;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
public class FilePiecesRequest extends Request<PieceMessages.FilePiecesRequestMessage, List<Integer>>
public class FilePiecesRequest extends Request<PieceMessages.FilePiecesRequestMessage, Set<Integer>>
{
private final NetworkFile file;
private final int startPiece;
@ -18,9 +20,9 @@ public class FilePiecesRequest extends Request<PieceMessages.FilePiecesRequestMe
private final int pieceMod;
private FilePieceAccess filePieceAccess;
private int expectedPieceCount = 0;
// private List<Long> expectedPieces = new ArrayList<>();
private final List<Integer> receivedPieces = new ArrayList<>();
// private int expectedPieceCount = 0;
private Set<Integer> expectedPieces = new HashSet<>();
private final Set<Integer> receivedPieces = new HashSet<>();
public FilePiecesRequest(NetworkFile file, int startPiece, int pieceCount, int pieceMod)
{
@ -33,7 +35,7 @@ public class FilePiecesRequest extends Request<PieceMessages.FilePiecesRequestMe
@Override
public PieceMessages.FilePiecesRequestMessage buildMessage()
{
expectedPieceCount = Math.toIntExact(pieceCount / pieceMod);
// expectedPieceCount = Math.toIntExact(pieceCount / pieceMod);
return PieceMessages.FilePiecesRequestMessage.newBuilder()
.setFileId(file.getObjectID().toLong())
.setPieceCount(pieceCount)
@ -51,29 +53,34 @@ public class FilePiecesRequest extends Request<PieceMessages.FilePiecesRequestMe
{
if (reply.is(PieceMessages.FilePieceMessage.class))
{
expectedPieceCount--;
PieceMessages.FilePieceMessage filePieceMessage = reply.unpack(PieceMessages.FilePieceMessage.class);
byte[] buffer = filePieceMessage.getData().toByteArray();
int index = Math.toIntExact(filePieceMessage.getPieceIndex());
if (filePieceAccess == null)
filePieceAccess = new FilePieceAccess(file);
filePieceAccess = new FilePieceAccess(file, FilePieceAccess.OpenMode.READ_WRITE);
filePieceAccess.writePiece((int) index, buffer);
receivedPieces.add(index);
}
else if (reply.is(CommonMessages.MultiObjectConfirmationMessage.class))
{
CommonMessages.MultiObjectConfirmationMessage msg = reply.unpack(CommonMessages.MultiObjectConfirmationMessage.class);
expectedPieces = msg.getExpectedReturnIdList().stream().map(Math::toIntExact).collect(Collectors.toSet());
}
} catch (IOException ex)
{
future.completeExceptionally(ex);
return true;
}
if (expectedPieceCount <= 0)
if (receivedPieces.equals(expectedPieces))
{
future.complete(receivedPieces);
return true;
}
return expectedPieceCount == 0;
return false;
}
}

View file

@ -0,0 +1,67 @@
package moe.nekojimi.friendcloud.network.requests;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import moe.nekojimi.friendcloud.ObjectChangeRecord;
import moe.nekojimi.friendcloud.protos.CommonMessages;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import java.util.HashSet;
import java.util.Set;
public class ObjectChangeRequest extends Request<ObjectStatements.ObjectChangeRequest, Set<ObjectChangeRecord>>
{
private final Set<Long> changesSinceIDs;
private Set<Long> expectedIDs = new HashSet<>();
private final Set<Long> receivedIDs = new HashSet<>();
private final Set<ObjectChangeRecord> receivedRecords = new HashSet<>();
public ObjectChangeRequest(Set<Long> changesSinceIDs)
{
this.changesSinceIDs = changesSinceIDs;
}
@Override
public ObjectStatements.ObjectChangeRequest buildMessage()
{
return ObjectStatements.ObjectChangeRequest.newBuilder().addAllChangesSince(changesSinceIDs).build();
}
@Override
public boolean handleReply(Any reply) throws InvalidProtocolBufferException
{
if (super.handleReply(reply))
return true;
if (reply.is(ObjectStatements.ObjectChangeMessage.class))
{
ObjectStatements.ObjectChangeMessage objectChangeMessage = reply.unpack(ObjectStatements.ObjectChangeMessage.class);
ObjectChangeRecord objectChangeRecord = ObjectChangeRecord.createFromChangeMessage(objectChangeMessage);
receivedRecords.add(objectChangeRecord);
}
else if (reply.is(CommonMessages.MultiObjectConfirmationMessage.class))
{
CommonMessages.MultiObjectConfirmationMessage msg = reply.unpack(CommonMessages.MultiObjectConfirmationMessage.class);
expectedIDs = new HashSet<>(msg.getExpectedReturnIdList());
}
else if (reply.is(ObjectStatements.ObjectChangeListMessage.class))
{
ObjectStatements.ObjectChangeListMessage msg = reply.unpack(ObjectStatements.ObjectChangeListMessage.class);
for (ObjectStatements.ObjectChangeMessage m : msg.getChangeMessagesList())
{
ObjectChangeRecord objectChangeRecord = ObjectChangeRecord.createFromChangeMessage(m);
receivedRecords.add(objectChangeRecord);
}
future.complete(receivedRecords);
return true;
}
if (receivedIDs.equals(expectedIDs))
{
future.complete(receivedRecords);
return true;
}
return false;
}
}

View file

@ -2,8 +2,8 @@ package moe.nekojimi.friendcloud.network.requests;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import moe.nekojimi.friendcloud.Model;
import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import java.io.IOException;
@ -11,7 +11,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Set;
public class ObjectListRequest extends Request<ObjectStatements.ObjectListRequest, List<NetworkObject>>
public class ObjectListRequest extends Request<ObjectStatements.ObjectListRequest, List<NetworkObject>>
{
private final Set<ObjectStatements.ObjectType> types;
@ -37,14 +37,15 @@ public class ObjectListRequest extends Request<ObjectStatements.ObjectListReques
{
ObjectStatements.ObjectList objectList = reply.unpack(ObjectStatements.ObjectList.class);
System.out.println("Received ObjectList, objects=" + objectList.getStatesList());
// System.out.println("Received ObjectList, objects=" + objectList.getStatesList());
List<NetworkObject> ret = new ArrayList<>();
for (ObjectStatements.ObjectState objectState : objectList.getStatesList())
{
System.out.println("Received state of object " + objectState.getObjectId());
NetworkObject object = Model.getInstance().getOrCreateObject(new NetworkObject.ObjectID(objectState.getObjectId()));
ObjectID objectID = new ObjectID(objectState.getObjectId());
System.out.println("ObjectListRequest: Received state of object " + objectID);
NetworkObject object = NetworkObject.createByID(objectID);
object.updateFromStateMessage(objectState);
ret.add(object);
}

View file

@ -13,12 +13,18 @@ public abstract class Request<MessageType extends Message, ReturnType>
public abstract MessageType buildMessage();
/**
* Handle a message that was received in reply to the request message.
* @param reply the reply message. May be one of many.
* @return true if no further replies are expected (or able to be processed), false if more replies are coming.
* @throws InvalidProtocolBufferException if reply cannot be decoded.
*/
public boolean handleReply(Any reply) throws InvalidProtocolBufferException
{
if (reply.is(CommonMessages.ErrorMessage.class))
{
CommonMessages.ErrorMessage errorMessage = reply.unpack(CommonMessages.ErrorMessage.class);
future.completeExceptionally(new RuntimeException("Request received error response: " + errorMessage.getError().name()));
future.completeExceptionally(new RequestReceivedErrorException(errorMessage));
return true;
}
return false;

View file

@ -0,0 +1,19 @@
package moe.nekojimi.friendcloud.network.requests;
import moe.nekojimi.friendcloud.protos.CommonMessages;
public class RequestReceivedErrorException extends Exception
{
private final CommonMessages.ErrorMessage errorMessage;
public RequestReceivedErrorException(CommonMessages.ErrorMessage errorMessage)
{
super("Request received error:" + errorMessage.getError().name() + "; " + errorMessage.getText());
this.errorMessage = errorMessage;
}
public CommonMessages.ErrorMessage getErrorMessage()
{
return errorMessage;
}
}

View file

@ -1,8 +1,10 @@
package moe.nekojimi.friendcloud.objects;
import moe.nekojimi.friendcloud.Model;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import java.util.Map;
public abstract class NetworkFSNode extends NetworkObject
{
// private String path = "";
@ -15,16 +17,16 @@ public abstract class NetworkFSNode extends NetworkObject
}
@Override
public synchronized void updateFromStateMessage(ObjectStatements.ObjectState state)
public synchronized void updateFromMessageMap(Map<String,String> after, Map<String,String> before)
{
super.updateFromStateMessage(state);
if (state.containsValues("name"))
name = state.getValuesOrThrow("name");
if (state.containsValues("parent"))
super.updateFromMessageMap(after,before);
if (after.containsKey("name"))
name = after.get("name");
if (after.containsKey("parent"))
{
long parentID = Long.parseLong(state.getValuesOrThrow("parent"));
long parentID = Long.parseLong(after.get("parent"));
if (parentID != 0)
parent = (NetworkFolder) Model.getInstance().getOrCreateObject(new ObjectID(parentID));
parent = Main.getInstance().getModel().getObject(new ObjectID(parentID));
else
parent = null;
}
@ -34,7 +36,24 @@ public abstract class NetworkFSNode extends NetworkObject
public ObjectStatements.ObjectState.Builder buildObjectState()
{
return super.buildObjectState()
.putValues("name", getName());
.putValues("name", getName())
.putValues("parent", parent != null ? Long.toString(parent.getStorageID()) : "0");
}
@Override
public Map<String, Object> getStateMap()
{
Map<String, Object> ret = super.getStateMap();
ret.put("name", name);
ret.put("parent", parent != null ? parent.getStorageID() : 0L);
return ret;
}
@Override
public void updateFromStateMap(Map<String, Object> map)
{
name = map.get("name").toString();
parent = Main.getInstance().getModel().getObject(new ObjectID(((Number)map.get("parent")).longValue()));
}
public String getName()
@ -47,8 +66,24 @@ public abstract class NetworkFSNode extends NetworkObject
this.name = name;
}
@Override
public String getFriendlyName()
{
return getName();
}
public String getNetworkPath()
{
return (parent != null ? parent.getNetworkPath() : "") + "/" + name;
}
public NetworkFolder getParent()
{
return parent;
}
public void setParent(NetworkFolder newParent)
{
parent = newParent;
}
}

View file

@ -1,25 +1,21 @@
package moe.nekojimi.friendcloud.objects;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.Util;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.BitSet;
import java.util.HashMap;
import java.util.HexFormat;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.*;
import java.util.stream.Collectors;
public class NetworkFile extends NetworkFSNode
{
private static final int MIN_PIECE_SIZE = 0x400; // 1KiB
private static final int MAX_PIECE_SIZE = 0x100000; // 1 MiB
private static final int MAX_PIECE_SIZE = 0x100000; // 1MiB
private static final int IDEAL_PIECE_COUNT = 1024;
private static File tempDirectory = null;
private long size = 0;
private long pieceSize = 0;
@ -28,10 +24,10 @@ public class NetworkFile extends NetworkFSNode
private File localFile = null;
private final Map<Peer, PeerFileState> fileStates = new HashMap<>();
private final SortedSet<ObjectID> peersWithCopy = new TreeSet<>();
// private final Map<Peer, PeerFileState> fileStates = new HashMap<>();
private BitSet pieces = new BitSet();
private static File tempDirectory = null;
// private List<FilePiece> pieces = new ArrayList<>();
public NetworkFile(ObjectID objectID)
@ -44,7 +40,6 @@ public class NetworkFile extends NetworkFSNode
this.localFile = localFile;
name = localFile.getName();
size = localFile.length();
pieceSize = MIN_PIECE_SIZE;
for (pieceSize = MIN_PIECE_SIZE; pieceSize < MAX_PIECE_SIZE; pieceSize *= 2)
{
long pieceCount = size / pieceSize;
@ -52,75 +47,42 @@ public class NetworkFile extends NetworkFSNode
break;
}
pieces = new BitSet(Math.toIntExact(getPieceCount()));
Util.HashOutput hashOutput = Util.hashFile(localFile, pieceSize);
pieces = hashOutput.pieces;
hash = hashOutput.totalDigest;
long offset = 0L;
System.out.println("Calculating hashes for file " + localFile.getName() + "(Piece size: " + pieceSize + ")");
try (FileInputStream input = new FileInputStream(localFile))
System.out.println();
setLocalFile(localFile);
size = localFile.length();
if (pieces.cardinality() >= getPieceCount())
{
MessageDigest totalDigest = MessageDigest.getInstance("SHA-256");
byte[] pieceBuf = new byte[Math.toIntExact(pieceSize)];
int pieceIdx = 0;
// List<FilePiece> pieces = new ArrayList<>();
while(true)
{
int bytesRead = input.read(pieceBuf);
if (bytesRead <= 0)
break;
// check to see if this piece is just zeroes, if so, assume it's a missing piece
boolean allZero = true;
for (byte b: pieceBuf)
{
if (b != 0)
{
allZero = false;
break;
}
}
pieces.set(pieceIdx, !allZero);
// MessageDigest pieceDigest = MessageDigest.getInstance("SHA-256");
// pieceDigest.update(pieceBuf, 0, bytesRead);
// byte[] pieceHash = pieceDigest.digest();
totalDigest.update(pieceBuf, 0, bytesRead);
// FilePiece piece = new FilePiece(pieceHash, bytesRead, localFile, offset);
// System.out.print(HexFormat.of().formatHex(pieceHash) + ", ");
// pieces.add(piece);
pieceIdx++;
}
System.out.println();
setLocalFile(localFile);
size = localFile.length();
// setPieces(pieces);
setHash(totalDigest.digest());
System.out.println("Total hash: " + HexFormat.of().formatHex(hash));
System.out.println("Have " + pieces.cardinality() + " of " + getPieceCount() + " pieces.");
} catch (NoSuchAlgorithmException e)
{
throw new RuntimeException(e);
peersWithCopy.add(Main.getInstance().getModel().getLocalPeer().getObjectID());
}
}
@Override
public void updateFromStateMessage(ObjectStatements.ObjectState state)
public synchronized void updateFromMessageMap(Map<String, String> after, Map<String, String> before)
{
super.updateFromStateMessage(state);
// if (state.containsValues("path"))
// path = state.getValuesOrThrow("path");
if (state.containsValues("size"))
size = Long.parseLong(state.getValuesOrThrow("size"));
if (state.containsValues("hash"))
hash = HexFormat.of().parseHex(state.getValuesOrThrow("hash"));
if (state.containsValues("pieceSize"))
pieceSize = Long.parseLong(state.getValuesOrThrow("pieceSize"));
super.updateFromMessageMap(after, before);
if (after.containsKey("size"))
size = Long.parseLong(after.get("size"));
if (after.containsKey("hash"))
hash = HexFormat.of().parseHex(after.get("hash"));
if (after.containsKey("pieceSize"))
pieceSize = Long.parseLong(after.get("pieceSize"));
if (after.containsKey("peersWithCopy"))
{
peersWithCopy.clear();
String[] peers = after.get("peersWithCopy").split(",");
for (String peer: peers)
{
peersWithCopy.add(new ObjectID(Long.parseUnsignedLong(peer,16)));
}
}
}
@Override
public ObjectStatements.ObjectState.Builder buildObjectState()
{
@ -128,7 +90,39 @@ public class NetworkFile extends NetworkFSNode
// .putValues("path", path)
.putValues("size", Long.toString(size))
.putValues("hash", HexFormat.of().formatHex(hash))
.putValues("pieceSize", Long.toString(pieceSize));
.putValues("pieceSize", Long.toString(pieceSize))
.putValues("peersWithCopy", peersWithCopy.stream().map(ObjectID::toLong).map(Long::toHexString).collect(Collectors.joining(",")));
}
@Override
public Map<String, Object> getStateMap()
{
Map<String, Object> ret = super.getStateMap();
ret.put("size", size);
ret.put("hash", HexFormat.of().formatHex(hash));
ret.put("pieceSize", pieceSize);
ret.put("pieces", Arrays.stream(pieces.toLongArray()).boxed().toList());
ret.put("localFile", localFile != null ? localFile.getAbsolutePath() : "");
ret.put("peersWithCopy", peersWithCopy.stream().map(ObjectID::toLong).toList());
return ret;
}
@SuppressWarnings("unchecked")
@Override
public void updateFromStateMap(Map<String, Object> map)
{
super.updateFromStateMap(map);
size = ((Number) map.get("size")).longValue();
hash = HexFormat.of().parseHex((CharSequence) map.get("hash"));
pieceSize = ((Number) map.get("pieceSize")).longValue();
pieces = BitSet.valueOf(((ArrayList<Number>) map.get("pieces")).stream().mapToLong(Number::longValue).toArray());
String localFilePath = (String) map.get("localFile");
if (localFilePath.isEmpty())
localFile = null;
else
localFile = new File(localFilePath);
peersWithCopy.clear();
peersWithCopy.addAll(((List<Object>)map.get("peersWithCopy")).stream().map(Util::unconditionalNumberToLong).map(ObjectID::new).toList());
}
public File getLocalFile()
@ -177,12 +171,7 @@ public class NetworkFile extends NetworkFSNode
return pieceSize;
}
private void setPieceSize(long pieceSize)
{
this.pieceSize = pieceSize;
}
// public List<FilePiece> getPieces()
// public List<FilePiece> getPieces()
// {
// return pieces;
// }
@ -197,11 +186,6 @@ public class NetworkFile extends NetworkFSNode
return hash;
}
private void setHash(byte[] hash)
{
this.hash = hash;
}
public int getPieceCount()
{
return Math.toIntExact(Math.ceilDiv(size, pieceSize));
@ -217,7 +201,7 @@ public class NetworkFile extends NetworkFSNode
synchronized (pieces)
{
pieces.set(pieceIdx, has);
pieces.notifyAll();
notifyPieceWaiters();
}
}
@ -231,14 +215,24 @@ public class NetworkFile extends NetworkFSNode
return size;
}
void addFileState(PeerFileState peerFileState)
// void addFileState(PeerFileState peerFileState)
// {
// fileStates.put(peerFileState.getNode(), peerFileState);
// }
//
// public Map<Peer, PeerFileState> getFileStates()
// {
// return fileStates;
// }
public void addPeerWithCopy(Peer selfPeer)
{
fileStates.put(peerFileState.getNode(), peerFileState);
peersWithCopy.add(selfPeer.getObjectID());
}
public Map<Peer, PeerFileState> getFileStates()
public List<Peer> getPeersWithCopy()
{
return fileStates;
return peersWithCopy.stream().map(objectID -> (Peer) Main.getInstance().getModel().getObject(objectID)).toList();
}
/**
@ -277,6 +271,19 @@ public class NetworkFile extends NetworkFSNode
return false;
}
public void notifyPieceWaiters()
{
synchronized (pieces)
{
pieces.notifyAll();
}
}
public boolean hasLocalFile()
{
return localFile != null;
}
public enum StorageType
{
/** The file will be stored as a complete file in the storage directory under it's own name and file path. */

View file

@ -2,13 +2,10 @@ package moe.nekojimi.friendcloud.objects;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.WeakHashMap;
import java.util.Map;
public class NetworkFolder extends NetworkFSNode
{
// private final SortedSet<ObjectID> children = new TreeSet<>((a,b)->Long.compare(a.getId(),b.getId()));
public NetworkFolder(ObjectID objectID)
{
@ -18,8 +15,18 @@ public class NetworkFolder extends NetworkFSNode
@Override
public ObjectStatements.ObjectState.Builder buildObjectState()
{
return super.buildObjectState().putValues("name", name);
return super.buildObjectState();
}
@Override
public Map<String, Object> getStateMap()
{
return super.getStateMap();
}
@Override
public void updateFromStateMap(Map<String, Object> map)
{
super.updateFromStateMap(map);
}
}

View file

@ -1,10 +1,15 @@
package moe.nekojimi.friendcloud.objects;
import moe.nekojimi.friendcloud.ObjectChangeRecord;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import moe.nekojimi.friendcloud.storage.Storable;
import org.jetbrains.annotations.NotNull;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
public abstract class NetworkObject
public abstract class NetworkObject implements Storable, Comparable<NetworkObject>
{
private final ObjectID objectID;
@ -13,20 +18,62 @@ public abstract class NetworkObject
this.objectID = objectID;
}
public static NetworkObject createByID(ObjectID objectID)
{
ObjectStatements.ObjectType type = objectID.getType();
return switch (type)
{
case OBJECT_TYPE_PEER -> new Peer(objectID);
case OBJECT_TYPE_FILE -> new NetworkFile(objectID);
case OBJECT_TYPE_FOLDER -> new NetworkFolder(objectID);
default -> throw new IllegalArgumentException("Unrecognised object type!");
};
}
public ObjectID getObjectID()
{
return objectID;
}
public synchronized void updateFromStateMessage(ObjectStatements.ObjectState state)
@Override
public long getStorageID()
{
return getObjectID().toLong();
}
@Override
public Map<String, Object> getStateMap()
{
Map<String, Object> ret = new HashMap<>();
ret.put("id", objectID.toLong());
return ret;
}
public final void updateFromChange(ObjectChangeRecord.Change change)
{
updateFromMessageMap(change.afterValues(), change.beforeValues());
}
private synchronized void updateFromMessageMap(Map<String,String> currentValues)
{
updateFromMessageMap(currentValues,Map.of());
}
protected synchronized void updateFromMessageMap(Map<String,String> currentValues, Map<String,String> beforeValues)
{
}
public synchronized final void updateFromStateMessage(ObjectStatements.ObjectState state)
{
if (state.getObjectId() != objectID.toLong())
throw new IllegalArgumentException("Wrong object!");
updateFromMessageMap(state.getValuesMap());
}
public synchronized ObjectStatements.ObjectState mergeChanges(ObjectStatements.ObjectState a, ObjectStatements.ObjectState b)
public synchronized ObjectStatements.ObjectState mergeChanges(ObjectStatements.ObjectChange a, ObjectStatements.ObjectChange b)
{
return null;
throw new UnsupportedOperationException("NYI");
}
public ObjectStatements.ObjectState.Builder buildObjectState()
@ -35,67 +82,31 @@ public abstract class NetworkObject
.setObjectId(objectID.toLong());
}
public static class ObjectID
@Override
public int compareTo(@NotNull NetworkObject networkObject)
{
private final ObjectStatements.ObjectType type;
private final int systemID;
private final int uniqueID;
return Long.compare(getObjectID().toLong(), networkObject.getObjectID().toLong());
}
public ObjectID(long id)
{
uniqueID = (int)(0x00000000_FFFFFFFFL & id);
systemID = Math.toIntExact((0x00FFFFFF_00000000L & id) >>> 32);
type = ObjectStatements.ObjectType.forNumber(Math.toIntExact(((0xFF000000_00000000L & id) >>> 56)));
}
@Override
public boolean equals(Object o)
{
if (o == null || getClass() != o.getClass()) return false;
NetworkObject that = (NetworkObject) o;
return Objects.equals(objectID, that.objectID);
}
public ObjectID(ObjectStatements.ObjectType type, int systemID, int uniqueID)
{
this.type = type;
this.systemID = systemID;
this.uniqueID = uniqueID;
}
@Override
public int hashCode()
{
return Objects.hashCode(objectID);
}
public long toLong()
{
long uniquePart = Integer.toUnsignedLong(uniqueID);
long systemPart = Integer.toUnsignedLong(systemID) << 32;
long typePart = ((long) type.getNumber()) << 56;
return typePart | systemPart | uniquePart;
}
public abstract String getFriendlyName();
public ObjectStatements.ObjectType getType()
{
return type;
}
public int getSystemID()
{
return systemID;
}
public int getUniqueID()
{
return uniqueID;
}
@Override
public String toString()
{
return "OBJ{" + Long.toHexString(toLong()) + "}";
}
@Override
public boolean equals(Object o)
{
if (o == null || getClass() != o.getClass()) return false;
ObjectID objectID = (ObjectID) o;
return systemID == objectID.systemID && uniqueID == objectID.uniqueID && type == objectID.type;
}
@Override
public int hashCode()
{
return Objects.hash(type, systemID, uniqueID);
}
@Override
public String toString()
{
return this.getClass().getName() + "{" + objectID.toString() + " (" + getFriendlyName() + ")" + "}";
}
}

View file

@ -0,0 +1,86 @@
package moe.nekojimi.friendcloud.objects;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import org.jetbrains.annotations.NotNull;
import java.util.Objects;
public class ObjectID implements Comparable<ObjectID>
{
private final moe.nekojimi.friendcloud.protos.ObjectStatements.ObjectType type;
private final int systemID;
private final int uniqueID;
public ObjectID(long id)
{
uniqueID = (int) (0x00000000_FFFFFFFFL & id);
systemID = Math.toIntExact((0x00FFFFFF_00000000L & id) >>> 32);
type = moe.nekojimi.friendcloud.protos.ObjectStatements.ObjectType.forNumber(Math.toIntExact(((0xFF000000_00000000L & id) >>> 56)));
}
public ObjectID(moe.nekojimi.friendcloud.protos.ObjectStatements.ObjectType type, int systemID, int uniqueID)
{
this.type = type;
this.systemID = systemID;
this.uniqueID = uniqueID;
}
public long toLong()
{
long uniquePart = Integer.toUnsignedLong(uniqueID);
long systemPart = Integer.toUnsignedLong(systemID) << 32;
long typePart = ((long) type.getNumber()) << 56;
return typePart | systemPart | uniquePart;
}
public static ObjectID nullValue()
{
return new ObjectID(0L);
}
public moe.nekojimi.friendcloud.protos.ObjectStatements.ObjectType getType()
{
return type;
}
public int getSystemID()
{
return systemID;
}
public int getUniqueID()
{
return uniqueID;
}
@Override
public String toString()
{
return "OBJ{" + Integer.toHexString(type.getNumber()) + "-" + Integer.toHexString(systemID) + "-" + Integer.toHexString(uniqueID) + "}";
}
@Override
public boolean equals(Object o)
{
if (o == null || getClass() != o.getClass()) return false;
ObjectID objectID = (ObjectID) o;
return systemID == objectID.systemID && uniqueID == objectID.uniqueID && type == objectID.type;
}
@Override
public int hashCode()
{
return Objects.hash(type, systemID, uniqueID);
}
@Override
public int compareTo(@NotNull ObjectID objectID)
{
return Long.compare(toLong(), objectID.toLong());
}
public boolean isNull()
{
return type == ObjectStatements.ObjectType.OBJECT_TYPE_UNSPECIFIED;
}
}

View file

@ -4,22 +4,15 @@ import moe.nekojimi.friendcloud.protos.ObjectStatements;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
public class Peer extends NetworkObject
{
private final List<URI> addresses = new ArrayList<>();
private final SortedSet<URI> addresses = new TreeSet<>();
private String userName = "";
private String systemName = "";
private Map<NetworkFile, PeerFileState> fileStates = new HashMap<>();
private volatile int lastTriedAddressIdx = -1;
public Peer(ObjectID objectID)
{
super(objectID);
@ -30,20 +23,19 @@ public class Peer extends NetworkObject
return userName + "@" + systemName;
}
@Override
public synchronized void updateFromStateMessage(ObjectStatements.ObjectState state)
{
super.updateFromStateMessage(state);
@Override
protected synchronized void updateFromMessageMap(Map<String, String> after, Map<String, String> before)
{
super.updateFromMessageMap(after, before);
Map<String,String> values = state.getValuesMap();
if (values.containsKey("userName"))
userName = values.get("userName");
if (values.containsKey("systemName"))
systemName = values.get("systemName");
if (values.containsKey("addresses"))
if (after.containsKey("userName"))
userName = after.get("userName");
if (after.containsKey("systemName"))
systemName = after.get("systemName");
if (after.containsKey("addresses"))
{
addresses.clear();
String[] split = values.get("addresses").split(",");
String[] split = after.get("addresses").split(",");
for (String s: split)
{
try
@ -55,8 +47,7 @@ public class Peer extends NetworkObject
}
}
}
// if (values.containsKey("files"))
}
}
@Override
public ObjectStatements.ObjectState.Builder buildObjectState()
@ -73,30 +64,47 @@ public class Peer extends NetworkObject
return builder;
}
public void addAddress(URI address)
@Override
public Map<String, Object> getStateMap()
{
Map<String, Object> ret = super.getStateMap();
ret.put("userName", userName);
ret.put("systemName", systemName);
ret.put("addresses", addresses);
return ret;
}
@Override
public void updateFromStateMap(Map<String, Object> map)
{
userName = map.get("userName").toString();
systemName = map.get("systemName").toString();
addresses.clear();
addresses.addAll((Collection<? extends URI>) map.get("addresses"));
}
public void addAddress(URI address)
{
addresses.add(address);
}
public URI getNextAddress()
{
lastTriedAddressIdx++;
if (lastTriedAddressIdx >= addresses.size())
lastTriedAddressIdx = 0;
return addresses.get(lastTriedAddressIdx);
}
// void addFileState(PeerFileState peerFileState)
// {
// fileStates.put(peerFileState.getFile(), peerFileState);
// }
//
// public Map<NetworkFile, PeerFileState> getFileStates()
// {
// return fileStates;
// }
void addFileState(PeerFileState peerFileState)
public void setAddresses(Collection<URI> urIs)
{
fileStates.put(peerFileState.getFile(), peerFileState);
addresses.clear();
addresses.addAll(urIs);
}
public Map<NetworkFile, PeerFileState> getFileStates()
{
return fileStates;
}
public List<URI> getAddresses()
public SortedSet<URI> getAddresses()
{
return addresses;
}
@ -120,4 +128,10 @@ public class Peer extends NetworkObject
{
this.systemName = systemName;
}
@Override
public String getFriendlyName()
{
return getNodeName();
}
}

View file

@ -1,75 +0,0 @@
package moe.nekojimi.friendcloud.objects;
import moe.nekojimi.friendcloud.Model;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
public class PeerFileState extends NetworkObject
{
private Peer peer;
private NetworkFile file;
private double progress = 0;
public PeerFileState(ObjectID objectID)
{
super(objectID);
}
@Override
public void updateFromStateMessage(ObjectStatements.ObjectState state)
{
super.updateFromStateMessage(state);
peer = (Peer) Model.getInstance().getOrCreateObject(new ObjectID(Long.parseLong(state.getValuesOrThrow("peer"))));
file = (NetworkFile) Model.getInstance().getOrCreateObject(new ObjectID(Long.parseLong(state.getValuesOrThrow("file"))));
if (state.containsValues("progress"))
progress = Double.parseDouble(state.getValuesOrThrow("progress"));
peer.addFileState(this);
file.addFileState(this);
}
@Override
public ObjectStatements.ObjectState mergeChanges(ObjectStatements.ObjectState a, ObjectStatements.ObjectState b)
{
return super.mergeChanges(a, b);
}
@Override
public ObjectStatements.ObjectState.Builder buildObjectState()
{
return super.buildObjectState()
.putValues("peer", Long.toString(peer.getObjectID().toLong()))
.putValues("file", Long.toString(file.getObjectID().toLong()))
.putValues("progress", Double.toString(progress));
}
public void setNode(Peer peer)
{
this.peer = peer;
}
public void setFile(NetworkFile file)
{
this.file = file;
}
public double getProgress()
{
return progress;
}
public void setProgress(double progress)
{
this.progress = progress;
}
public NetworkFile getFile()
{
return file;
}
public Peer getNode()
{
return peer;
}
}

View file

@ -0,0 +1,108 @@
package moe.nekojimi.friendcloud.storage;
import java.util.*;
public class CachingDataStore extends DataStore
{
private final DataStore backend;
public CachingDataStore(DataStore backend)
{
this.backend = backend;
}
@Override
public synchronized <T extends Storable> DAO<T> getDAOForClass(Class<T> clazz)
{
if (daos.containsKey(clazz))
//noinspection unchecked
return (DAO<T>) daos.get(clazz);
else
{
CachingDAO<T> ret = new CachingDAO<>(clazz);
daos.put(clazz, ret);
return ret;
}
}
@Override
public void clear()
{
daos.clear();
backend.clear();
}
@Override
public FSNodeDAO getFSDAO()
{
return backend.getFSDAO();
}
private final Map<Class<? extends Storable>, CachingDAO<?>> daos = new HashMap<>();
public class CachingDAO<T extends Storable> implements DAO<T>
{
private final DAO<T> backendDao;
private final WeakHashMap<Long, T> cache = new WeakHashMap<>();
public CachingDAO(Class<T> clazz)
{
this.backendDao = backend.getDAOForClass(clazz);
}
@Override
public List<Long> list()
{
return backendDao.list();
}
@Override
public synchronized List<T> getAll()
{
List<T> ret = new ArrayList<>();
for (T t : backendDao.getAll())
{
if (!cache.containsKey(t.getStorageID()))
{
ret.add(t);
cache.put(t.getStorageID(), t);
}
else
{
ret.add(cache.get(t.getStorageID()));
}
}
return ret;
}
@Override
public boolean exists(long id)
{
return backendDao.exists(id);
}
@Override
public T create(long id)
{
T t = backendDao.create(id);
cache.put(id, t);
return t;
}
@Override
public T get(long id)
{
if (!cache.containsKey(id))
cache.put(id, backendDao.get(id));
return cache.get(id);
}
@Override
public void update(T object)
{
long id = object.getStorageID();
backendDao.update(object);
cache.put(id, object);
}
}
}

View file

@ -0,0 +1,100 @@
package moe.nekojimi.friendcloud.storage;
import moe.nekojimi.friendcloud.objects.NetworkFSNode;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
public abstract class DataStore
{
public abstract <T extends Storable> DAO<T> getDAOForClass(Class<T> clazz);
public abstract FSNodeDAO getFSDAO();
public abstract void clear();
public interface DAO<T extends Storable>
{
default boolean exists(long id) {return list().contains(id);}
default List<Long> list() {return getAll().stream().map(Storable::getStorageID).toList();}
List<T> getAll();
T create(long id);
T get(long id);
default T getOrCreate(long id)
{
T old = get(id);
if (old != null)
return old;
return create(id);
}
void update(T object);
}
public interface FSNodeDAO extends DAO<NetworkFSNode>
{
default List<NetworkFSNode> getPathChildren(String path)
{
System.err.println("WARNING: using violently cursed implementation of getPathChildren, make a better one now!!!");
return getAll().stream().filter(networkFSNode -> networkFSNode.getNetworkPath().startsWith(path)).toList();
}
}
public class ClassFusionDAO<T extends Storable> implements DAO<T>
{
private final Set<Class<? extends T>> subclasses;
@SafeVarargs
public ClassFusionDAO(Class<? extends T>... subclasses)
{
this.subclasses = Set.of(subclasses);
}
@Override
public List<Long> list()
{
List<Long> ret = new ArrayList<>();
for (Class<? extends T> subclass : subclasses)
ret.addAll(getDAOForClass(subclass).list());
return ret;
}
@Override
public List<T> getAll()
{
List<T> ret = new ArrayList<>();
for (Class<? extends T> subclass : subclasses)
ret.addAll(getDAOForClass(subclass).getAll());
return ret;
}
@Override
public boolean exists(long id)
{
for (Class<? extends T> subclass : subclasses)
if (getDAOForClass(subclass).exists(id))
return true;
return false;
}
@Override
public T create(long id)
{
throw new UnsupportedOperationException("That type's abstract, can't create it");
}
@Override
public T get(long id)
{
for (Class<? extends T> subclass : subclasses)
if (getDAOForClass(subclass).exists(id))
return getDAOForClass(subclass).get(id);
return null;
}
@Override
public void update(T object)
{
throw new UnsupportedOperationException("That type's abstract, can't update it");
}
}
}

View file

@ -0,0 +1,69 @@
package moe.nekojimi.friendcloud.storage;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.ObjectChangeRecord;
import moe.nekojimi.friendcloud.Util;
import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.objects.Peer;
import java.util.Map;
public class LocalData implements Storable
{
private Peer localPeer;
private ObjectChangeRecord currentChangeRecord;
private int systemID;
public Peer getLocalPeer()
{
return localPeer;
}
public void setLocalPeer(Peer localPeer)
{
this.localPeer = localPeer;
}
public ObjectChangeRecord getCurrentChangeRecord()
{
return currentChangeRecord;
}
public void setCurrentChangeRecord(ObjectChangeRecord currentChangeRecord)
{
this.currentChangeRecord = currentChangeRecord;
}
public int getSystemID()
{
return systemID;
}
public void setSystemID(int systemID)
{
this.systemID = systemID;
}
@Override
public long getStorageID()
{
return 0;
}
@Override
public Map<String, Object> getStateMap()
{
return Map.of("localPeer", localPeer == null ? 0L : localPeer.getObjectID().toLong(),
"currentChangeRecord", currentChangeRecord == null ? 0L : currentChangeRecord.getChangeID(),
"systemID", systemID);
}
@Override
public void updateFromStateMap(Map<String, Object> map)
{
localPeer = Main.getInstance().getModel().getObject(new ObjectID(Util.unconditionalNumberToLong(map.getOrDefault("localPeer",0))));
currentChangeRecord = Main.getInstance().getModel().getChangeRecord(Util.unconditionalNumberToLong(map.getOrDefault("currentChangeRecord",0)));
systemID = (int) map.getOrDefault("systemID", 0);
System.out.println("LocalData: resumed state, localPeer=" + localPeer + ", currentChangeRecord=" + Long.toHexString(currentChangeRecord.getChangeID()) + ", systemID=" + Integer.toHexString(systemID));
}
}

View file

@ -0,0 +1,29 @@
package moe.nekojimi.friendcloud.storage;
import java.util.Map;
public interface Storable
{
/**
* Gets the unique ID used for persisting the object on disk. Must remain constant for an object's lifetime.
* @return the unique ID.
*/
long getStorageID();
// /**
// * Gets a constant string specifying the namespace that the ID numbers returned by getStorageID() are unique within. Defaults to the class name.
// * @return the namespace name.
// */
// default String getStorageNamespace()
// {
// return this.getClass().getName().toLowerCase();
// }
/**
* Gets a map representing all serializable (i.e. to be saved) fields of this object, by name.
* Fields may be any of the following types: String, Integer, Long, Double, Collection\<X\>, Boolean, Map\<String, X\>
*/
Map<String, Object> getStateMap();
void updateFromStateMap(Map<String, Object> map);
}

View file

@ -0,0 +1,413 @@
package moe.nekojimi.friendcloud.storage;
import moe.nekojimi.friendcloud.ObjectChangeRecord;
import moe.nekojimi.friendcloud.Util;
import moe.nekojimi.friendcloud.objects.*;
import org.jetbrains.annotations.NotNull;
import org.json.JSONArray;
import org.json.JSONObject;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.util.*;
public class StupidJSONFileStore extends DataStore
{
private final File storageDirectory;
private final Map<Class<? extends Storable>, DAO<? extends Storable>> daos = new HashMap<>();
public StupidJSONFileStore(File storageDirectory)
{
this.storageDirectory = storageDirectory;
if (!storageDirectory.exists())
{
if (!storageDirectory.mkdirs())
throw new RuntimeException("Unable to create storage directory " + storageDirectory.getAbsolutePath());
}
}
@Override
public void clear()
{
daos.clear();
storageDirectory.delete();
storageDirectory.mkdirs();
}
@Override
public <T extends Storable> DAO<T> getDAOForClass(Class<T> clazz)
{
if (daos.containsKey(clazz))
return (DAO<T>) daos.get(clazz);
DAO<?> ret;
if (clazz.equals(NetworkFile.class))
ret = new NetworkFileDAO();
else if (clazz.equals(NetworkFolder.class))
ret = new NetworkFolderDAO();
else if (clazz.equals(Peer.class))
ret = new PeerDAO();
else if (clazz.equals(NetworkFSNode.class))
ret = new NetworkFSNodeDAO();
else if (clazz.equals(LocalData.class))
ret = new LocalDataDAO();
else if (clazz.equals(ObjectChangeRecord.class))
ret = new ObjectChangeRecordDAO();
else
throw new UnsupportedOperationException("Requested DAO for unsupported type " + clazz.getCanonicalName());
daos.put(clazz, ret);
return (DAO<T>) ret;
}
@Override
public FSNodeDAO getFSDAO()
{
return new NetworkFSNodeDAO();
}
private abstract class JSONObjectDAO<T extends Storable> implements DAO<T>
{
protected final Set<@NotNull Class<?>> VALID_JSON_CLASSES = Set.of(new Class[]{Boolean.class, Double.class, Integer.class, Long.class, String.class, JSONArray.class, JSONObject.class, JSONObject.NULL.getClass()});
protected abstract String getNamespace();
protected File getNamespaceDirectory()
{
File ret = new File(storageDirectory, getNamespace());
if (!ret.exists())
ret.mkdirs();
assert (ret.exists() && ret.isDirectory());
return ret;
}
protected abstract T makeBlank(long id);
protected T jsonToObject(JSONObject json)
{
long id = json.getLong("storageID");
T ret = makeBlank(id);
ret.updateFromStateMap(jsonToMap(json));
return ret;
}
protected JSONObject objectToJson(T object)
{
Map<String, Object> stateMap = object.getStateMap();
return mapToJson(stateMap).put("storageID", object.getStorageID());
}
private JSONObject mapToJson(Map<String, Object> stateMap)
{
JSONObject ret = new JSONObject();
for (Map.Entry<String, Object> e : stateMap.entrySet())
{
Class<?> valueClass = e.getValue().getClass();
if (VALID_JSON_CLASSES.contains(valueClass))
ret.put(e.getKey(), e.getValue());
else if (Collection.class.isAssignableFrom(valueClass))
{
Collection<?> valueCollection = (Collection<?>) e.getValue();
List<Object> writeList = new ArrayList<>(valueCollection.size());
for (Object item: valueCollection)
if (VALID_JSON_CLASSES.contains(item.getClass()))
writeList.add(item);
else
writeList.add(serialiseWeirdObject(item));
ret.put(e.getKey(), writeList);
}
else if (Map.class.isAssignableFrom(valueClass))
{
Map<String, Object> valueMap = (Map<String, Object>) e.getValue();
return mapToJson(valueMap);
}
else
{
ret.put(e.getKey(), serialiseWeirdObject(e.getValue()));
}
}
return ret;
}
private Map<String,Object> jsonToMap(JSONObject json)
{
Map<String, Object> ret = new HashMap<>();
for (String key : json.keySet())
{
ret.put(key,deserialiseSomething(json.get(key)));
}
return ret;
}
protected Object deserialiseSomething(Object value)
{
Object ret = value;
if (value instanceof JSONObject)
{
JSONObject valueJsonObject = (JSONObject) value;
if (valueJsonObject.has("weirdObjectClass"))
ret = deserialiseWeirdObject(valueJsonObject);
else
ret = jsonToMap(valueJsonObject);
}
else if (value instanceof JSONArray)
{
JSONArray valueJsonArray = (JSONArray) value;
List<Object> readList = new ArrayList<Object>(valueJsonArray.length());
for (int i = 0; i < valueJsonArray.length(); i++)
{
Object item = deserialiseSomething(valueJsonArray.get(i));
readList.add(item);
}
return readList;
}
return ret;
}
protected JSONObject serialiseWeirdObject(Object value) throws IllegalArgumentException
{
throw new IllegalArgumentException("Don't know how to serialise a " + value.getClass().getCanonicalName() );
}
protected Object deserialiseWeirdObject(JSONObject json)
{
throw new IllegalArgumentException("Don't know how to deserialise that.");
}
@Override
public boolean exists(long id)
{
File file = new File(getNamespaceDirectory(), Long.toHexString(id) + ".json");
return file.exists();
}
@Override
public List<Long> list()
{
List<Long> ret = new ArrayList<>();
// get all files in the storage directory
for (File file : Objects.requireNonNull(getNamespaceDirectory().listFiles()))
{
String name = file.getName();
String nameWithoutExt = name.substring(0, name.indexOf('.'));
ret.add(Long.parseLong(nameWithoutExt, 16));
}
return ret;
}
@Override
public List<T> getAll()
{
List<T> ret = new ArrayList<>();
// get all files in the storage directory
for (File file : Objects.requireNonNull(getNamespaceDirectory().listFiles()))
{
try
{
JSONObject json = new JSONObject(Files.readString(file.toPath()));
ret.add(jsonToObject(json));
} catch (IOException e)
{
e.printStackTrace(System.err);
}
}
return ret;
}
@Override
public T create(long id)
{
T ret = makeBlank(id);
update(ret);
return ret;
}
@Override
public T get(long id)
{
File file = new File(getNamespaceDirectory(), Long.toHexString(id) + ".json");
file.getParentFile().mkdirs();
try
{
JSONObject json = new JSONObject(Files.readString(file.toPath()));
return jsonToObject(json);
} catch (NoSuchFileException ex)
{
System.err.println("JSONFileStore: failed to find object with ID=" + id + ", expected in " + file.getAbsolutePath());
return null;
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}
@Override
public void update(T object)
{
File file = new File(getNamespaceDirectory(), Long.toHexString(object.getStorageID()) + ".json");
file.getParentFile().mkdirs();
try(FileWriter writer = new FileWriter(file, false))
{
objectToJson(object).write(writer);
writer.flush();
} catch (IOException e)
{
throw new RuntimeException(e);
}
}
}
private abstract class NetworkObjectDAO<T extends NetworkObject> extends JSONObjectDAO<T>
{
@Override
protected String getNamespace()
{
return "networkObjects";
}
}
private class NetworkFSNodeDAO extends ClassFusionDAO<NetworkFSNode> implements FSNodeDAO
{
public NetworkFSNodeDAO()
{
super(NetworkFile.class, NetworkFolder.class);
}
}
private class NetworkFileDAO extends NetworkObjectDAO<NetworkFile>
{
@Override
protected String getNamespace()
{
return super.getNamespace() + "/files";
}
@Override
protected NetworkFile makeBlank(long id)
{
return new NetworkFile(new ObjectID(id));
}
}
private class NetworkFolderDAO extends NetworkObjectDAO<NetworkFolder>
{
@Override
protected String getNamespace()
{
return super.getNamespace() + "/folders";
}
@Override
protected NetworkFolder makeBlank(long id)
{
return new NetworkFolder(new ObjectID(id));
}
}
private class PeerDAO extends NetworkObjectDAO<Peer>
{
@Override
protected String getNamespace()
{
return super.getNamespace() + "/peers";
}
@Override
protected Peer makeBlank(long id)
{
return new Peer(new ObjectID(id));
}
@Override
protected JSONObject serialiseWeirdObject(Object value) throws IllegalArgumentException
{
if (value instanceof URI uri)
{
return new JSONObject().put("weirdObjectClass", URI.class.getCanonicalName()).put("uri",uri.toString());
}
return super.serialiseWeirdObject(value);
}
@Override
protected Object deserialiseWeirdObject(JSONObject json)
{
String weirdType = json.getString("weirdObjectClass");
try
{
Class<?> weirdClass = Class.forName(weirdType);
if (weirdClass == URI.class)
return new URI(json.getString("uri"));
} catch (ClassNotFoundException e)
{
throw new IllegalArgumentException(e);
} catch (URISyntaxException e)
{
throw new RuntimeException(e);
}
return super.deserialiseWeirdObject(json);
}
}
private class LocalDataDAO extends JSONObjectDAO<LocalData>
{
@Override
protected String getNamespace()
{
return "localData";
}
@Override
protected LocalData makeBlank(long id)
{
return new LocalData();
}
}
private class ObjectChangeRecordDAO extends JSONObjectDAO<ObjectChangeRecord>
{
@Override
protected String getNamespace()
{
return "changes";
}
@Override
protected ObjectChangeRecord makeBlank(long id)
{
return new ObjectChangeRecord();
}
@Override
protected Object deserialiseWeirdObject(JSONObject json)
{
String weirdType = json.getString("weirdObjectClass");
// Class<?> weirdClass = Class.forName(weirdType);
if (weirdType.equals(ObjectChangeRecord.Change.class.getCanonicalName()))
{
return new ObjectChangeRecord.Change(
new ObjectID(json.getLong("objectID")),
Util.stringifyMap(json.getJSONObject("before").toMap()),
Util.stringifyMap(json.getJSONObject("after").toMap()));
}
return super.deserialiseWeirdObject(json);
}
@Override
protected JSONObject serialiseWeirdObject(Object value) throws IllegalArgumentException
{
if (value instanceof ObjectChangeRecord.Change change)
{
JSONObject ret = new JSONObject();
ret.put("weirdObjectClass", ObjectChangeRecord.Change.class.getCanonicalName());
ret.put("objectID", change.objectID().toLong());
ret.put("before", change.beforeValues());
ret.put("after", change.afterValues());
return ret;
}
return super.serialiseWeirdObject(value);
}
}
}

View file

@ -1,11 +1,13 @@
package moe.nekojimi.friendcloud;
package moe.nekojimi.friendcloud.tasks;
import moe.nekojimi.friendcloud.network.ConnectionManager;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.ObjectChangeTransaction;
import moe.nekojimi.friendcloud.network.PeerConnection;
import moe.nekojimi.friendcloud.network.requests.FilePiecesRequest;
import moe.nekojimi.friendcloud.objects.NetworkFile;
import moe.nekojimi.friendcloud.objects.Peer;
import moe.nekojimi.friendcloud.objects.PeerFileState;
import org.jetbrains.annotations.NotNull;
import java.io.File;
@ -16,9 +18,9 @@ import java.util.concurrent.*;
public class FileDownloadTask implements RunnableFuture<File>
{
private final NetworkFile file;
private final ConnectionManager manager;
private final ConnectionManager connectionManager;
private final long timeoutPerPieceMs = 10_000;
private final long timeoutPerPieceMs = 1_000;
private static final int MAX_DOWNLOAD_PIECES_PER_ROUND = 128;
private final SortedSet<Integer> missingPieceIndices = new TreeSet<>();
@ -27,20 +29,20 @@ public class FileDownloadTask implements RunnableFuture<File>
private boolean failed = false;
private final Object waitObject = new Object();
public FileDownloadTask(NetworkFile file, ConnectionManager manager)
public FileDownloadTask(NetworkFile file, ConnectionManager connectionManager)
{
this.file = file;
this.manager = manager;
this.connectionManager = connectionManager;
for (int i = 0; i < file.getPieceCount(); i++)
{
missingPieceIndices.add(i);
}
}
public FileDownloadTask(NetworkFile file, ConnectionManager manager, SortedSet<Integer> missingPieces)
public FileDownloadTask(NetworkFile file, ConnectionManager connectionManager, SortedSet<Integer> missingPieces)
{
this.file = file;
this.manager = manager;
this.connectionManager = connectionManager;
missingPieceIndices.addAll(missingPieces);
}
@ -52,38 +54,46 @@ public class FileDownloadTask implements RunnableFuture<File>
@Override
public void run()
{
System.out.println("Starting download of file " + file.getName());
System.out.println("Starting download of file " + file.getName() + " (pieces: " + missingPieceIndices + ")");
// NotificationManager.Notification notification = Main.getInstance().getNotificationManager().createNotification("Streaming " + file.getName(), "Starting download...", NotificationManager.NotificationType.TRANSFER_IN_PROGRESS);
int startingPieces = missingPieceIndices.size();
String connectionLine = "";
String progressLine = "";
Peer selfPeer = Main.getInstance().getModel().getLocalPeer();
while (!missingPieceIndices.isEmpty() && !cancelled && !failed && !done)
{
System.out.println("Need to get " + missingPieceIndices.size() + " missing pieces.");
Map<Peer, PeerFileState> fileStates = file.getFileStates();
// Map<Peer, PeerFileState> fileStates = file.getFileStates();
// determine what nodes we can connect to
List<PeerConnection> connections = new ArrayList<>();
for (PeerFileState peerFileState : fileStates.values())
for (Peer peer : file.getPeersWithCopy())
{
if (peerFileState.getProgress() >= 100.0)
if (peer == selfPeer)
continue; // yeah that's us dipshit
PeerConnection connection = connectionManager.getNodeConnection(peer);
if (connection != null)
{
try
{
PeerConnection connection = manager.getNodeConnection(peerFileState.getNode());
System.out.println("FileDownloadTask: Will download from " + peerFileState.getNode().getNodeName());
connections.add(connection);
} catch (IOException ex)
{
System.err.println("Failed to connect to peer " + peerFileState.getNode().getNodeName() + ": " + ex.getMessage());
}
System.out.println("FileDownloadTask: Will download from " + peer.getNodeName());
connections.add(connection);
}
}
// connectionLine = "Connected to " + connections.size() + " peers.";
// notification.setBody(connectionLine + "\n" + progressLine);
// connectionLine = "Connected to " + connections.stream().map(PeerConnection::getNode).map(Peer::getNodeName).collect(Collectors.joining(", "));
// shuffle the connections list
Collections.shuffle(connections);
if (connections.isEmpty())
{
System.err.println("FileDownloadTask: No peers have the file, download failed!");
file.notifyPieceWaiters();
failed = true;
break;
}
@ -110,11 +120,11 @@ public class FileDownloadTask implements RunnableFuture<File>
System.out.println("FileDownloadTask: Will download pieces from " + runStart + " to " + runEnd);
// make one request per connectable peer, striping the needed pieces among them
List<CompletableFuture<List<Integer>>> fileFutures = new ArrayList<>();
List<CompletableFuture<Set<Integer>>> fileFutures = new ArrayList<>();
int offset = 0;
for (PeerConnection connection : connections)
{
CompletableFuture<List<Integer>> future = connection.makeRequest(new FilePiecesRequest(file, runStart+offset, (runEnd-runStart)+1, connections.size()));
CompletableFuture<Set<Integer>> future = connection.makeRequest(new FilePiecesRequest(file, runStart+offset, (runEnd-runStart)+1, connections.size()));
fileFutures.add(future);
offset++;
}
@ -122,12 +132,12 @@ public class FileDownloadTask implements RunnableFuture<File>
long timeout = timeoutPerPieceMs * (missingPieceIndices.size() / connections.size());
// wait for all the requests to complete
for (CompletableFuture<List<Integer>> future : fileFutures)
for (CompletableFuture<Set<Integer>> future : fileFutures)
{
try
{
List<Integer> receivedPieces = future.get(timeout, TimeUnit.MILLISECONDS);
receivedPieces.forEach(missingPieceIndices::remove);
Set<Integer> receivedPieces = future.get(timeout, TimeUnit.MILLISECONDS);
missingPieceIndices.removeAll(receivedPieces);
} catch (InterruptedException e)
{
future.cancel(true);
@ -135,11 +145,29 @@ public class FileDownloadTask implements RunnableFuture<File>
System.err.println("FileDownloadTask: Request timed out.");
} catch (ExecutionException | TimeoutException e)
{
future.cancel(true);
e.printStackTrace(System.err);
}
// progressLine = "Have " + (startingPieces - missingPieceIndices.size()) + " / " + missingPieceIndices.size() + " pieces. (" + file.getDownloadPercentage() + "%)";
// notification.setBody(connectionLine + "\n" + progressLine);
}
}
if (file.getDownloadPercentage() >= 100.0)
{
try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(connectionManager, selfPeer.getObjectID()))
{
transaction.addObjectBeforeChange(file);
file.addPeerWithCopy(selfPeer);
}
catch (IOException ignored)
{
}
}
// notification.setBody("Finished downloading!");
System.out.println("FileDownloadTask: finished downloading " + file.getName() + "!");
done = true;
synchronized (waitObject)

View file

@ -0,0 +1,77 @@
package moe.nekojimi.friendcloud.tasks;
import com.kstruct.gethostname4j.Hostname;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.ObjectChangeTransaction;
import moe.nekojimi.friendcloud.network.ConnectionManager;
import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.objects.Peer;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import moe.nekojimi.friendcloud.Controller;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class JoinNetworkTask implements Runnable
{
@Override
public void run()
{
System.out.println("JoinNetworkTask: Joining the network!");
Controller controller = Main.getInstance().getModel();
boolean firstJoin = false;
ObjectID peerID;
if (controller.getLocalPeer() != null)
peerID = controller.getLocalPeer().getObjectID();
else
{
peerID = controller.getNextObjectID(ObjectStatements.ObjectType.OBJECT_TYPE_PEER);
firstJoin = true;
}
ConnectionManager connectionManager = Main.getInstance().getConnectionManager();
if (firstJoin)
{
System.out.println("JoinNetworkTask: Performing first time setup...");
// download the entire state
PullStateTask pullStateTask = new PullStateTask();
Future<?> future = Main.getInstance().getExecutor().submit(pullStateTask);
try
{
future.get(30, TimeUnit.SECONDS);
} catch (InterruptedException | TimeoutException e)
{
// throw new RuntimeException(e);
} catch (ExecutionException e)
{
throw new RuntimeException(e);
}
// create our local peer object
try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(connectionManager, peerID))
{
// create and submit our Peer object if it doesn't exist
Peer selfPeer = controller.getLocalData().getLocalPeer();
if (selfPeer == null)
{
selfPeer = controller.createLocalPeer(peerID);
selfPeer.setUserName(System.getProperty("user.name"));
String hostname = Hostname.getHostname();
selfPeer.setSystemName(hostname);
selfPeer.setAddresses(connectionManager.getURIs());
controller.objectChanged(selfPeer);
controller.getLocalData().setLocalPeer(selfPeer);
transaction.addNewlyCreatedObject(selfPeer);
}
} catch (IOException e)
{
throw new RuntimeException(e);
}
}
}
}

View file

@ -0,0 +1,42 @@
package moe.nekojimi.friendcloud.tasks;
import com.google.protobuf.Message;
import moe.nekojimi.friendcloud.network.ConnectionManager;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.network.PeerConnection;
import moe.nekojimi.friendcloud.objects.Peer;
import java.io.IOException;
public class PropagateMessageTask implements Runnable
{
private final Message message;
public PropagateMessageTask(Message message)
{
this.message = message;
}
@Override
public void run()
{
ConnectionManager connectionManager = Main.getInstance().getConnectionManager();
int messagesSent = 0;
for (Peer peer: Main.getInstance().getModel().listOtherPeers())
{
try
{
PeerConnection connection = connectionManager.getNodeConnection(peer);
if (connection != null)
{
connection.sendUnsolicitedMessage(message);
messagesSent++;
}
} catch (IOException e)
{
throw new RuntimeException(e);
}
}
System.out.println("PropagateMessageTask: Sent " + message.getDescriptorForType().getFullName() + " to " + messagesSent + " peers.");
}
}

View file

@ -0,0 +1,95 @@
package moe.nekojimi.friendcloud.tasks;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.ObjectChangeRecord;
import moe.nekojimi.friendcloud.Util;
import moe.nekojimi.friendcloud.network.PeerConnection;
import moe.nekojimi.friendcloud.network.requests.ObjectChangeRequest;
import moe.nekojimi.friendcloud.network.requests.RequestReceivedErrorException;
import moe.nekojimi.friendcloud.objects.Peer;
import moe.nekojimi.friendcloud.protos.CommonMessages;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;
import java.util.stream.Collectors;
public class PullChangesTask implements Callable<Boolean>
{
private final Set<Peer> peers;
public PullChangesTask()
{
this(Main.getInstance().getModel().listOtherPeers());
}
public PullChangesTask(Set<Peer> peers)
{
this.peers = peers;
}
@Override
public Boolean call()
{
// for each other peer:
List<CompletableFuture<PullResult>> futures = new ArrayList<>();
Set<Long> changesSinceIDs = Main.getInstance().getModel().getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet());
System.out.println("PullChangesTask: Requesting changes since: " + changesSinceIDs.stream().map(Long::toHexString).collect(Collectors.toSet()));
for (Peer peer : peers)
{
System.out.println("PullChangesTask: Attempting to pull changes from " + peer);
// open a connection
PeerConnection connection = Main.getInstance().getConnectionManager().getNodeConnection(peer);
if (connection == null)
continue;
// send a ObjectChangeRequest
ObjectChangeRequest objectChangeRequest = new ObjectChangeRequest(changesSinceIDs);
CompletableFuture<PullResult> future = connection.makeRequest(objectChangeRequest)
.handle((changes, ex) ->
{
// integrate the returned changes with our change graph
if (ex == null)
{
Main.getInstance().getModel().applyChangeRecords(changes);
return PullResult.OK;
}
else
{
if (ex instanceof RequestReceivedErrorException re)
{
if (re.getErrorMessage().getError() == CommonMessages.Error.ERROR_END_OF_HISTORY)
{
return PullResult.END_OF_HISTORY;
}
}
ex.printStackTrace(System.err);
return PullResult.FAILED;
}
});
futures.add(future);
}
List<PullResult> results = Util.collectFutures(futures).join();
long peersGotChanges = results.stream().filter(pullResult -> pullResult == PullResult.OK).count();
long peersAtEndOfHistory = results.stream().filter(pullResult -> pullResult == PullResult.END_OF_HISTORY).count();
// if no peers could be contacted:
if (peersGotChanges == 0)
{
// if everyone reported end of history, we aren't synced (need to do a state pull)
// otherwise everyone else is offline so we might as well be synced (we'll cause a fork if we change anything but that's fine)
return peersAtEndOfHistory <= 0;
}
return true;
}
protected enum PullResult
{
OK,
END_OF_HISTORY,
FAILED;
}
}

View file

@ -0,0 +1,106 @@
package moe.nekojimi.friendcloud.tasks;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.Util;
import moe.nekojimi.friendcloud.network.PeerConnection;
import moe.nekojimi.friendcloud.network.requests.ObjectChangeRequest;
import moe.nekojimi.friendcloud.network.requests.ObjectListRequest;
import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.objects.Peer;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;
/**
* Task to clear our entire local state and re-download it from peers.
*/
public class PullStateTask implements Runnable
{
@Override
public void run()
{
System.out.println("PullStateTask: Pulling state from peers...");
Set<PeerConnection> connections = new HashSet<>();
for (String knownPeerAddress : Main.getInstance().getArgs().getKnownPeers())
{
String[] split = knownPeerAddress.split(":");
if (split.length != 2)
{
System.err.println("ERROR: " + knownPeerAddress + " isn't a valid address.");
continue;
}
InetSocketAddress address = new InetSocketAddress(split[0], Integer.parseInt(split[1]));
try
{
URI uri = new URI("tcp", null, address.getHostString(), address.getPort(), null, null, null);
PeerConnection nodeConnection = Main.getInstance().getConnectionManager().getNodeConnection(uri);
if (nodeConnection != null)
{
connections.add(nodeConnection);
}
} catch (URISyntaxException | IOException e)
{
throw new RuntimeException(e);
}
}
for (Peer peer : Main.getInstance().getModel().listOtherPeers())
{
PeerConnection connection = Main.getInstance().getConnectionManager().getNodeConnection(peer);
if (connection != null)
{
connections.add(connection);
}
}
if (connections.isEmpty())
{
// if we can't connect to anyone, don't replace our state since we can't get a new one
System.out.println("PullStateTask: Have no peers to connect to, giving up.");
return;
}
System.out.println("PullStateTask: Have " + connections.size() + " peers to get state from.");
Main.getInstance().getModel().clearEverything();
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (PeerConnection connection : connections)
{
futures.add(connection.makeRequest(new ObjectListRequest(Set.of(
ObjectStatements.ObjectType.OBJECT_TYPE_FILE,
ObjectStatements.ObjectType.OBJECT_TYPE_FOLDER,
ObjectStatements.ObjectType.OBJECT_TYPE_PEER))).thenAccept(networkObjects ->
{
System.out.println("PullStateTask: got state of " + networkObjects.size() + " objects.");
for (NetworkObject object : networkObjects)
{
Main.getInstance().getModel().addNetworkObject(object);
}
}));
futures.add(connection.makeRequest(new ObjectChangeRequest(Set.of())).thenAccept(objectChangeRecords ->
{
System.out.println("PullStateTask: got " + objectChangeRecords.size() + " change records.");
Main.getInstance().getModel().addChangeRecords(objectChangeRecords);
}));
}
Util.collectFutures(futures).join();
}
}

View file

@ -15,14 +15,19 @@ message FriendCloudMessage {
google.protobuf.Any body = 2;
}
message LoginMessage {
message HelloMessage {
uint32 protocol_version = 1; // this is the version of the FriendCloud protocol I speak
}
message CheckInMessage {
repeated uint64 current_change_heads = 1;
}
enum Error {
ERROR_UNSPECIFIED = 0;
ERROR_WHO_THE_FUCK_ARE_YOU = 1; // sender unidentified or unauthenticated
ERROR_PERMISSION_DENIED = 2; // you can't do that
ERROR_OBJECT_NOT_FOUND = 3; // one or more object(s) specified don't exist
ERROR_OBJECT_NOT_FOUND = 3; // one or more object(s) referenced don't exist
ERROR_INTERNAL = 4; // internal error
ERROR_OUT_OF_DATE = 5; // your action is impossible because you have an out-of-date state (in a way that matters)
ERROR_CHECKSUM_FAILURE = 6; // a supplied checksum didn't match the relevant data
@ -30,14 +35,15 @@ enum Error {
ERROR_INVALID_ARGUMENT = 8; // an argument specified is outside the expected range
ERROR_NOT_EXPECTING_REPLY = 9; // you sent a reply to a message that I wasn't expecting a reply to
ERROR_INVALID_PROTOBUF = 10; // your message couldn't be decoded at all
ERROR_END_OF_HISTORY = 11; // you're referencing a change ID that I've forgotten (or never had)
ERROR_MESSAGE_BODY_UNKNOWN = 12; // I don't know how to handle the type of message in your message body; please stop sending that one
}
message ErrorMessage {
Error error = 1;
string text = 2;
}
message PingMessage {
}
message PongMessage {
message MultiObjectConfirmationMessage {
repeated uint64 expected_return_id = 1;
}

View file

@ -10,7 +10,6 @@ enum ObjectType {
OBJECT_TYPE_PEER = 2;
OBJECT_TYPE_FILE = 3;
OBJECT_TYPE_FOLDER = 4;
OBJECT_TYPE_PEER_FILE_STATE = 5;
}
message ObjectState {
@ -29,9 +28,21 @@ message ObjectStateRequest {
}
message ObjectChange {
uint64 object_id = 1;
map<string,string> before = 3;
map<string,string> after = 4;
}
message ObjectChangeMessage {
uint64 change_id = 1;
repeated uint64 change_heads = 2;
repeated ObjectState states = 3;
repeated ObjectChange changes = 3;
uint64 creator_id = 4;
uint64 timestamp_ms = 5;
}
message ObjectChangeListMessage {
repeated ObjectChangeMessage change_messages = 1;
}
message ObjectChangeRequest {
@ -39,7 +50,7 @@ message ObjectChangeRequest {
}
message ObjectList {
uint64 change_heads = 1;
uint64 change_head = 1;
repeated ObjectState states = 2;
}

View file

@ -1,9 +0,0 @@
syntax = "proto3";
option java_package = "moe.nekojimi.friendcloud.protos";
message SearchRequest {
string query = 1;
int32 page_number = 2;
int32 results_per_page = 3;
}