Compare commits

...

9 commits

19 changed files with 471 additions and 176 deletions

54
README.md Normal file
View file

@ -0,0 +1,54 @@
# FriendCloud (aka SamoyedShare?)
🐕☁🐕☁🐕☁🐕☁🐕
FriendCloud is a decentralised mesh-network-based file-sharing platform for you and your friends!
This repository is for the **backend** (or the reference implementation, at least). See also the [repository for the frontend]().
⚠ Currently FriendCloud is in SUPER WOBBLY TURBO-ALPHA. Don't use this! Your files may crumble into dust! Security is an unknown concept, still but a theory in the mind's-eye of divine astrologers! The database is but a collection of JSON files in a worm-infested crypt, left weak and vulnerable to the ravages of entropy!
# About FriendCloud
- It's like NextCloud; you have your own private space for you and your friends to store files, but it's decentralised and no one person controls it!
- It's like Syncthing; you can sync your files among all your friend's devices, but not everyone needs to have a copy of every file!
- It's like Git; file and object changes are tracked and merged, but (mostly) automagically!
- It's like Bittorrent; files are distributed among a bunch of peers, but you can view and open them with your normal file browser!
- It's like nothing anyone's ever made! Probably for good reason!
(For clarity, FriendCloud is not intended as a replacement for the above services, and is aiming for a slightly different use-case)
# Features
FriendCloud currently supports:
- Connecting to a bunch of other peers and synchronizing state (i.e. what peers and files exist)
- Mounting the virtual filesystem using FUSE (on Linux & Windows)
- Downloading/streaming files by opening them in your file browser
- Creating cloud folders and moving files in your file browser
- Zero security of any kind!!
Currently planned:
- Open API so people can write their own frontend/backend clients if they want (probably better than mine)
- File monitoring/tracking
- Being able to securely invite your friends to the network
- Some kind of public-key-based encrypted transport/user authentication
- Optional remote file modification (other people can change the files you've shared)
- JavaFX-based frontend for sharing/managing the network
- File replication; peers can donate some of their disk space for automatically saving redundant copies of each other's files
- And possibly downloading files on each other's behalf, for peers that can't connect to each other (or are in differnt timezones)
- KIO integration, for KDE on Linux
- One-user, many-peer; you can somehow prove that you control multiple peers and they all share permissions so you can manage your files from each of them
Maybe one day:
- One-peer, many-user; multiple (system) user accounts on the same machine can all have their own FriendCloud user accounts on the same network, even though there's just the one peer (running as a system daemon)
- GIO integration, for GNOME on Linux
- Windows [this thing](https://learn.microsoft.com/en-us/windows/win32/shell/nse-works) for Wandow 5 Macro suport (it's 3am)
- Maybe we can implement CalDAV or something and you can sync your calendar through it or some shit. attack and dethrone NextCloud
- I had weird vague dreams of it reading the metadata of your files like artist, album and stuff and it would automatically organise them for you. but you can just do that manually
- make wplace in it.
- You can be a member of multiple different networks and have the same files shared to each of them
# Building
Git! Java! Maven! I believe in you!

View file

@ -154,6 +154,7 @@ public class Controller
public synchronized void addChangeRecord(ObjectChangeRecord record) public synchronized void addChangeRecord(ObjectChangeRecord record)
{ {
System.out.println("Controller: Adding change record " + record );
DataStore.DAO<ObjectChangeRecord> dao = dataStore.getDAOForClass(ObjectChangeRecord.class); DataStore.DAO<ObjectChangeRecord> dao = dataStore.getDAOForClass(ObjectChangeRecord.class);
dao.update(record); dao.update(record);
// update the change heads; if any of this change's heads are included in ours, then this change replaces them // update the change heads; if any of this change's heads are included in ours, then this change replaces them
@ -165,13 +166,13 @@ public class Controller
for (long changeHeadID : record.getChangeHeads()) for (long changeHeadID : record.getChangeHeads())
{ {
ObjectChangeRecord headRecord = dao.get(changeHeadID); ObjectChangeRecord headRecord = dao.get(changeHeadID);
recordIsNewHead = changeHeads.remove(headRecord); recordIsNewHead |= changeHeads.remove(headRecord);
} }
} }
if (recordIsNewHead) if (recordIsNewHead)
{ {
changeHeads.add(record); changeHeads.add(record);
System.out.println("Controller: Change heads updated: " + changeHeads.stream().map(objectChangeRecord -> Long.toHexString(objectChangeRecord.getChangeID())).collect(Collectors.toSet())); System.out.println("Controller: Change heads updated: " + changeHeads);
} }
} }
} }
@ -204,7 +205,7 @@ public class Controller
System.out.println("Controller: Applying change record " + record); System.out.println("Controller: Applying change record " + record);
long changeID = localData.getCurrentChangeRecord().getChangeID(); long changeID = localData.getCurrentChangeRecord().getChangeID();
if (!record.getChangeHeads().contains(changeID)) if (!record.getChangeHeads().contains(changeID))
throw new IllegalStateException("Change does not apply! Valid change heads=" + record.getChangeHeads().stream().map(Long::toHexString).collect(Collectors.toSet()) + ", we are in state " + Long.toHexString(changeID)); throw new IllegalStateException("Change does not apply! Valid change heads=" + record.getChangeHeads() + ", we are in state " + localData.getCurrentChangeRecord());
addChangeRecord(record); addChangeRecord(record);
record.applyToLocalState(); record.applyToLocalState();
@ -256,7 +257,7 @@ public class Controller
changeHeads = new HashSet<>(dataStore.getDAOForClass(ObjectChangeRecord.class).getAll()); changeHeads = new HashSet<>(dataStore.getDAOForClass(ObjectChangeRecord.class).getAll());
Set<Long> referencedIDs = changeHeads.stream().flatMap(objectChangeRecord -> objectChangeRecord.getChangeHeads().stream()).collect(Collectors.toSet()); Set<Long> referencedIDs = changeHeads.stream().flatMap(objectChangeRecord -> objectChangeRecord.getChangeHeads().stream()).collect(Collectors.toSet());
changeHeads.removeIf(objectChangeRecord -> referencedIDs.contains(objectChangeRecord.getChangeID())); changeHeads.removeIf(objectChangeRecord -> referencedIDs.contains(objectChangeRecord.getChangeID()));
System.out.println("Controller: Determined change heads to be " + changeHeads.stream().map(objectChangeRecord -> Long.toHexString(objectChangeRecord.getChangeID())).collect(Collectors.toSet())); System.out.println("Controller: Determined change heads to be " + changeHeads);
} }
return changeHeads; return changeHeads;
} }
@ -335,7 +336,7 @@ public class Controller
String path = newpath.substring(0, lastSlash); String path = newpath.substring(0, lastSlash);
if (path.isEmpty()) if (path.isEmpty())
path = "/"; path = "/";
try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), getLocalPeer().getObjectID())) try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(getLocalPeer().getObjectID()))
{ {
transaction.addObjectBeforeChange(fsNode); transaction.addObjectBeforeChange(fsNode);
if (!name.equals(fsNode.getName())) if (!name.equals(fsNode.getName()))
@ -372,7 +373,7 @@ public class Controller
if (path.equals("/") || path.isEmpty()) if (path.equals("/") || path.isEmpty())
return null; return null;
NetworkFolder ret = null; NetworkFolder ret = null;
try(ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), getLocalPeer().getObjectID())) try(ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(getLocalPeer().getObjectID()))
{ {
NetworkFSNode node = getFSNode(path); NetworkFSNode node = getFSNode(path);
if (node != null && !(node instanceof NetworkFolder)) if (node != null && !(node instanceof NetworkFolder))

View file

@ -5,10 +5,9 @@ import moe.nekojimi.friendcloud.objects.NetworkFile;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.time.Instant;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class FileRemoteAccess public class FileRemoteAccess
{ {
@ -61,20 +60,22 @@ public class FileRemoteAccess
else else
piecesToDownload = neededPieces; piecesToDownload = neededPieces;
CompletableFuture<Void> downloadFuture = null;
if (!piecesToDownload.isEmpty()) if (!piecesToDownload.isEmpty())
{ {
System.out.println("FRA: will fetch pieces " + piecesToDownload); System.out.println("FRA: will fetch pieces " + piecesToDownload);
DownloadManager.getInstance().downloadPieces(file, piecesToDownload) downloadFuture = DownloadManager.getInstance().downloadPieces(file, piecesToDownload)
.thenRun(() -> { .thenRun(() ->
preemptiveDownloadInProgress = false; {
}); preemptiveDownloadInProgress = false;
});
// Main.getInstance().getExecutor().submit(futureTask); // Main.getInstance().getExecutor().submit(futureTask);
} }
if (!neededPieces.isEmpty()) if (!neededPieces.isEmpty())
{ {
boolean ok = waitForPieceRange(neededPieces, 1000); boolean ok = waitForPieceRange(neededPieces, 30_000, downloadFuture);
if (!ok) if (!ok)
{ {
System.err.println("FRA: timed out while waiting for pieces " + neededPieces); System.err.println("FRA: timed out while waiting for pieces " + neededPieces);
@ -101,13 +102,34 @@ public class FileRemoteAccess
return ret; return ret;
} }
private boolean waitForPieceRange(Set<Integer> pieces, long pieceTimeoutMs) private boolean waitForPieceRange(Set<Integer> pieces, long pieceTimeoutMs, CompletableFuture<Void> downloadFuture)
{ {
for (int pieceIdx : pieces) for (int pieceIdx : pieces)
{ {
boolean ok = file.waitForFilePiece(pieceIdx, pieceTimeoutMs); boolean ok = false;
if (!ok) while (pieceTimeoutMs > 0 && !ok)
return false; {
long waitStartTime = Instant.now().toEpochMilli();
try
{
ok = file.waitForFilePiece(pieceIdx, pieceTimeoutMs);
}
catch (InterruptedException ignored)
{
ok = false;
}
if (!ok)
{
if (downloadFuture != null && (downloadFuture.isCancelled() || downloadFuture.isCancelled()))
{
System.err.println("FRA: Download failed.");
return false;
}
}
long waitEndTime = Instant.now().toEpochMilli();
long waitTime = waitEndTime - waitStartTime;
pieceTimeoutMs -= waitTime;
}
} }
return true; return true;
} }

View file

@ -44,9 +44,20 @@ public class Main
public static void main(String[] argv) public static void main(String[] argv)
{ {
instance = new Main();
Args args = new Args(); Args args = new Args();
JCommander.newBuilder().addObject(args).build().parse(argv); JCommander jCommander = JCommander.newBuilder()
.addObject(args)
.programName("FriendCloud")
.build();
jCommander.parse(argv);
if (args.help)
{
jCommander.usage();
return;
}
instance = new Main();
instance.args = args; instance.args = args;
System.setProperty(SimpleLogger.DEFAULT_LOG_LEVEL_KEY, "Info"); System.setProperty(SimpleLogger.DEFAULT_LOG_LEVEL_KEY, "Info");
@ -66,6 +77,7 @@ public class Main
private void run() throws IOException private void run() throws IOException
{ {
DataStore dataStore = new StupidJSONFileStore(new File(args.storageLocation)); DataStore dataStore = new StupidJSONFileStore(new File(args.storageLocation));
controller = new Controller(dataStore); controller = new Controller(dataStore);
@ -216,23 +228,32 @@ public class Main
@SuppressWarnings("FieldCanBeLocal") @SuppressWarnings("FieldCanBeLocal")
public static class Args public static class Args
{ {
@Parameter(names="-share") @Parameter(names="-help", help = true, description = "Display this help message and quit.")
private boolean help = false;
@Parameter(names="-share", description = "Add a file path to be shared once the application starts.")
private List<String> sharedFilePaths = new ArrayList<>(); private List<String> sharedFilePaths = new ArrayList<>();
@Parameter(names="-known-peer") @Parameter(names="-known-peer", description = "Add a URI (e.g. tcp://192.168.1.69:42069) of a known peer that will be connected to on first run.")
private List<String> knownPeers = new ArrayList<>(); private List<String> knownPeers = new ArrayList<>();
@Parameter(names="-tcp-port") @Parameter(names="-tcp-port", description = "The TCP port to listen for connections on.")
private int tcpPort = 7777; private int tcpPort = 7777;
@Parameter(names="-no-upnp") @Parameter(names="-advertise-address", description = "Manually add a URI (e.g. tcp://192.168.1.69:42069) that will be given to other peers to connect to this peer with. Useful to provide your global IP if UPnP doesn't work.")
private List<String> advertiseAddresses = new ArrayList<>();
@Parameter(names="-no-upnp", description = "Disables UPnP.")
private boolean noUpnp = false; private boolean noUpnp = false;
@Parameter(names="-create-network") // @Parameter(names="-create-network")
private boolean createNetwork = false; // private boolean createNetwork = false;
@Parameter(names = "-storage") @Parameter(names = "-storage", description = "The location on disk where the local copy of the state database will be stored.")
private String storageLocation = "."; private String storageLocation = "./storage";
@Parameter(names = "-artificial-lag", description = "Set to a value above 0 to introduce artificial delay when sending messages, in milliseconds. Use for testing only!")
private int artificialLagMs = 0;
public List<String> getSharedFilePaths() public List<String> getSharedFilePaths()
{ {
@ -254,14 +275,24 @@ public class Main
return noUpnp; return noUpnp;
} }
public boolean isCreateNetwork() // public boolean isCreateNetwork()
{ // {
return createNetwork; // return createNetwork;
} // }
public String getStorageLocation() public String getStorageLocation()
{ {
return storageLocation; return storageLocation;
} }
public List<String> getAdvertiseAddresses()
{
return advertiseAddresses;
}
public int getArtificialLagMs()
{
return artificialLagMs;
}
} }
} }

View file

@ -55,13 +55,27 @@ public class ObjectChangeRecord implements Storable
try try
{ {
MessageDigest digest = MessageDigest.getInstance("SHA-256"); MessageDigest digest = MessageDigest.getInstance("SHA-256");
return digest.digest(toString().getBytes(StandardCharsets.UTF_8)); return digest.digest(getCanonicalStringRepresentation().getBytes(StandardCharsets.UTF_8));
} catch (NoSuchAlgorithmException e) } catch (NoSuchAlgorithmException e)
{ {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
@Override
public boolean equals(Object o)
{
if (o == null || getClass() != o.getClass()) return false;
ObjectChangeRecord record = (ObjectChangeRecord) o;
return Objects.equals(getChangeID(), record.getChangeID());
}
@Override
public int hashCode()
{
return (int) getChangeID();
}
public ObjectStatements.ObjectChangeMessage.Builder buildObjectChangeMessage() public ObjectStatements.ObjectChangeMessage.Builder buildObjectChangeMessage()
{ {
ObjectStatements.ObjectChangeMessage.Builder builder = ObjectStatements.ObjectChangeMessage.newBuilder(); ObjectStatements.ObjectChangeMessage.Builder builder = ObjectStatements.ObjectChangeMessage.newBuilder();
@ -108,7 +122,7 @@ public class ObjectChangeRecord implements Storable
} }
} }
public String toString() public String getCanonicalStringRepresentation()
{ {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append(creatorPeer).append(",").append(creationTime.toEpochMilli()).append(";"); sb.append(creatorPeer).append(",").append(creationTime.toEpochMilli()).append(";");
@ -119,11 +133,21 @@ public class ObjectChangeRecord implements Storable
sb.append(";"); sb.append(";");
for (Change change: changes.stream().sorted(Comparator.comparingLong(a -> a.objectID.toLong())).toList()) for (Change change: changes.stream().sorted(Comparator.comparingLong(a -> a.objectID.toLong())).toList())
{ {
sb.append(change.toString()).append(";"); sb.append(change.getCanonicalStringRepresentation()).append(";");
} }
return sb.toString(); return sb.toString();
} }
@Override
public String toString()
{
return "Change{" + Long.toHexString(getChangeID()) +
", heads=" + changeHeads.stream().map(Long::toHexString).collect(Collectors.toSet()) +
", time=" + creationTime.toString() +
", creator=" + creatorPeer.toString() +
"}";
}
public long getChangeID() public long getChangeID()
{ {
MessageDigest digest = null; MessageDigest digest = null;
@ -134,7 +158,7 @@ public class ObjectChangeRecord implements Storable
{ {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
String stringVal = toString(); String stringVal = getCanonicalStringRepresentation();
byte[] bytes = digest.digest(stringVal.getBytes(StandardCharsets.UTF_8)); byte[] bytes = digest.digest(stringVal.getBytes(StandardCharsets.UTF_8));
// System.out.println("ObjectChangeRecord: calculated change ID " + Long.toHexString(ret) + " from string: " + stringVal); // System.out.println("ObjectChangeRecord: calculated change ID " + Long.toHexString(ret) + " from string: " + stringVal);
return Util.xorBytesToLong(bytes); return Util.xorBytesToLong(bytes);
@ -195,7 +219,7 @@ public class ObjectChangeRecord implements Storable
return null; return null;
} }
public String toString() public String getCanonicalStringRepresentation()
{ {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append(objectID).append(";"); // The object ID, then ; sb.append(objectID).append(";"); // The object ID, then ;
@ -213,7 +237,6 @@ public class ObjectChangeRecord implements Storable
return sb.toString(); return sb.toString();
} }
public ObjectStatements.ObjectChange.Builder buildObjectChange() public ObjectStatements.ObjectChange.Builder buildObjectChange()
{ {
ObjectStatements.ObjectChange.Builder builder = ObjectStatements.ObjectChange.newBuilder(); ObjectStatements.ObjectChange.Builder builder = ObjectStatements.ObjectChange.newBuilder();

View file

@ -1,6 +1,5 @@
package moe.nekojimi.friendcloud; package moe.nekojimi.friendcloud;
import moe.nekojimi.friendcloud.network.ConnectionManager;
import moe.nekojimi.friendcloud.objects.NetworkObject; import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.objects.ObjectID; import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.protos.ObjectStatements; import moe.nekojimi.friendcloud.protos.ObjectStatements;
@ -16,22 +15,19 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
public class ObjectChangeTransaction implements AutoCloseable public class ObjectChangeTransaction implements AutoCloseable
{ {
private final ObjectID creator; private final ObjectID creator;
private final ConnectionManager connectionManager;
private final Map<ObjectID, ObjectStatements.ObjectState> beforeStates = new HashMap<>(); private final Map<ObjectID, ObjectStatements.ObjectState> beforeStates = new HashMap<>();
private static ObjectChangeTransaction currentTransaction = null; private static ObjectChangeTransaction currentTransaction = null;
private int openCount = 0; private int openCount = 0;
private boolean ended = false; private boolean ended = false;
private ObjectChangeTransaction(ConnectionManager connectionManager, ObjectID creator) private ObjectChangeTransaction(ObjectID creator)
{ {
this.creator = creator; this.creator = creator;
this.connectionManager = connectionManager;
System.out.println("ObjectChangeTransaction: opening transaction"); System.out.println("ObjectChangeTransaction: opening transaction");
@ -51,10 +47,10 @@ public class ObjectChangeTransaction implements AutoCloseable
} }
} }
public static ObjectChangeTransaction startTransaction(ConnectionManager connectionManager, ObjectID creatorPeer) public static ObjectChangeTransaction startTransaction(ObjectID creatorPeer)
{ {
if (currentTransaction == null) if (currentTransaction == null)
currentTransaction = new ObjectChangeTransaction(connectionManager, creatorPeer); currentTransaction = new ObjectChangeTransaction(creatorPeer);
currentTransaction.increaseOpenCount(); currentTransaction.increaseOpenCount();
return currentTransaction; return currentTransaction;
} }
@ -95,7 +91,8 @@ public class ObjectChangeTransaction implements AutoCloseable
if (changes.isEmpty()) if (changes.isEmpty())
return null; return null;
return ObjectChangeRecord.createFromChanges(creator, Main.getInstance().getModel().getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet()), changes); ObjectChangeRecord currentChangeRecord = Main.getInstance().getModel().getLocalData().getCurrentChangeRecord();
return ObjectChangeRecord.createFromChanges(creator, currentChangeRecord != null ? Set.of(currentChangeRecord.getChangeID()) : Set.of(), changes);
} }
public void commit() public void commit()

View file

@ -18,7 +18,7 @@ public class SharedFileManager
if (files.isEmpty()) if (files.isEmpty())
return; return;
Controller controller = Main.getInstance().getModel(); Controller controller = Main.getInstance().getModel();
try(ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), controller.getLocalPeer().getObjectID())) try(ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(controller.getLocalPeer().getObjectID()))
{ {
List<NetworkObject> knownFiles = controller.listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE)); List<NetworkObject> knownFiles = controller.listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE));

View file

@ -73,7 +73,7 @@ public class Util
} }
ret.totalDigest = totalDigest.digest(); ret.totalDigest = totalDigest.digest();
System.out.println("Total hash: " + HexFormat.of().formatHex(ret.totalDigest)); System.out.println("Total hash: " + HexFormat.of().formatHex(ret.totalDigest));
long pieceCount = file.length() / pieceSize; long pieceCount = Math.ceilDiv(file.length() , pieceSize);
System.out.println("Have " + ret.pieces.cardinality() + " of " + pieceCount + " pieces."); System.out.println("Have " + ret.pieces.cardinality() + " of " + pieceCount + " pieces.");
return ret; return ret;
} catch (NoSuchAlgorithmException | IOException e) } catch (NoSuchAlgorithmException | IOException e)

View file

@ -1,12 +1,17 @@
package moe.nekojimi.friendcloud.network; package moe.nekojimi.friendcloud.network;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.objects.ObjectID; import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.objects.Peer; import moe.nekojimi.friendcloud.objects.Peer;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.stream.Stream;
public class ConnectionManager public class ConnectionManager
{ {
@ -31,17 +36,17 @@ public class ConnectionManager
} }
} }
public PeerConnection getNodeConnection(URI uri) throws IOException public PeerConnection getNodeConnection(URI uri)
{ {
return getNodeConnection(uri, null); return getNodeConnection(uri, null);
} }
public PeerConnection getNodeConnection(URI uri, Peer peer) throws IOException public PeerConnection getNodeConnection(URI uri, Peer peer)
{ {
purgeDeadConnections(); purgeDeadConnections();
for (PeerConnection peerConnection: activeConnections) for (PeerConnection peerConnection: activeConnections)
{ {
if (peerConnection.getUri() == uri) if (peerConnection.getUri().equals(uri))
return peerConnection; return peerConnection;
} }
@ -68,19 +73,19 @@ public class ConnectionManager
for (URI address : peer.getAddresses()) for (URI address : peer.getAddresses())
{ {
try PeerConnection connection = getNodeConnection(address, peer);
{ if (connection != null)
return getNodeConnection(address, peer); return connection;
}
catch (IOException ex)
{
System.err.println("ConnectionManager: Couldn't create PeerConnection to " + address + " : " + ex.getMessage());
}
} }
System.err.println("ConnectionManager: Failed to create PeerConnection to " + peer + "!"); System.err.println("ConnectionManager: Failed to create PeerConnection to " + peer + "!");
return null; return null;
} }
public CompletableFuture<Optional<PeerConnection>> getNodeConnectionAsync(Peer peer)
{
return CompletableFuture.supplyAsync(() -> Optional.ofNullable(getNodeConnection(peer)), Main.getInstance().getExecutor());
}
public void shutdown() public void shutdown()
{ {
for (ConnectionBackend<?> backend : backends.values()) for (ConnectionBackend<?> backend : backends.values())
@ -123,9 +128,24 @@ public class ConnectionManager
public List<URI> getURIs() public List<URI> getURIs()
{ {
return backends.values().stream() List<URI> ret = new ArrayList<>();
.filter(ConnectionBackend::isListening) for (String advertiseAddress : Main.getInstance().getArgs().getAdvertiseAddresses())
.flatMap(connectionBackend -> connectionBackend.getURIs().stream()) {
.toList(); try
{
URI uri = new URI(advertiseAddress);
ret.add(uri);
} catch (URISyntaxException e)
{
System.err.println("ERROR: " + advertiseAddress + " isn't a valid URI: " + e.getMessage());
}
}
for (ConnectionBackend<?> backend: backends.values())
{
if (!backend.isListening())
continue;
ret.addAll(backend.getURIs());
}
return ret;
} }
} }

View file

@ -2,6 +2,7 @@ package moe.nekojimi.friendcloud.network;
import com.google.protobuf.*; import com.google.protobuf.*;
import moe.nekojimi.friendcloud.Controller;
import moe.nekojimi.friendcloud.FilePieceAccess; import moe.nekojimi.friendcloud.FilePieceAccess;
import moe.nekojimi.friendcloud.Main; import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.ObjectChangeRecord; import moe.nekojimi.friendcloud.ObjectChangeRecord;
@ -20,6 +21,7 @@ import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
public abstract class PeerConnection extends Thread public abstract class PeerConnection extends Thread
{ {
@ -27,22 +29,18 @@ public abstract class PeerConnection extends Thread
private ObjectID peerID = new ObjectID(0); private ObjectID peerID = new ObjectID(0);
private long nextMessageId = 1; private long nextMessageId = 1;
private final URI uri; private final URI uri;
private long artificalDelayMs = 0; private long artificalDelayMs;
private final Map<String, MessageHandler<?>> messageHandlers = new HashMap<>(); private final Map<String, MessageHandler<?>> messageHandlers = new HashMap<>();
public PeerConnection() public PeerConnection(@NotNull URI uri)
{
this(null);
}
public PeerConnection(URI uri)
{ {
this.uri = uri; this.uri = uri;
installDefaultMessageHandlers(); installDefaultMessageHandlers();
artificalDelayMs = Main.getInstance().getArgs().getArtificialLagMs();
} }
public PeerConnection(URI uri, @NotNull ObjectID peerID) public PeerConnection(@NotNull URI uri, @NotNull ObjectID peerID)
{ {
this(uri); this(uri);
this.peerID = peerID; this.peerID = peerID;
@ -58,19 +56,18 @@ public abstract class PeerConnection extends Thread
public synchronized <T> CompletableFuture<T> makeRequest(Request<?, T> request) public synchronized <T> CompletableFuture<T> makeRequest(Request<?, T> request)
{ {
if (!isAlive()) if (!isAlive())
throw new IllegalStateException("Request made to PeerConnection that isn't running!"); throw new IllegalStateException("PeerConnection (" + getUri() + "): Request made to PeerConnection that isn't running!");
try try
{ {
Message message = request.buildMessage(); Message message = request.buildMessage();
CommonMessages.FriendCloudMessage wrappedMessage = wrapMessage(message); CommonMessages.FriendCloudMessage wrappedMessage = wrapMessage(message);
pendingRequests.put(wrappedMessage.getHeader().getMessageId(), request);
sendMessage(wrappedMessage); sendMessage(wrappedMessage);
pendingRequests.put(wrappedMessage.getHeader().getMessageId(), request);
return request.getFuture(); return request.getFuture();
} catch (Exception e) } catch (Exception e)
{ {
System.err.println("Request failed!"); System.err.println("PeerConnection (" + getUri() + "): Request failed!");
e.printStackTrace(System.err); e.printStackTrace(System.err);
return CompletableFuture.failedFuture(e); return CompletableFuture.failedFuture(e);
} }
@ -107,7 +104,7 @@ public abstract class PeerConnection extends Thread
private void replyWithError(CommonMessages.Error error, CommonMessages.MessageHeader replyHeader) throws IOException private void replyWithError(CommonMessages.Error error, CommonMessages.MessageHeader replyHeader) throws IOException
{ {
System.err.println("Sending error reply: " + error.name() + " to message ID " + replyHeader.getReplyToMessageId()); System.err.println("PeerConnection (" + getUri() + "): Sending error reply: " + error.name() + " to message ID " + replyHeader.getReplyToMessageId());
CommonMessages.ErrorMessage errorMessage = CommonMessages.ErrorMessage.newBuilder().setError(error).build(); CommonMessages.ErrorMessage errorMessage = CommonMessages.ErrorMessage.newBuilder().setError(error).build();
sendMessage(wrapMessage(errorMessage, replyHeader)); sendMessage(wrapMessage(errorMessage, replyHeader));
} }
@ -118,7 +115,7 @@ public abstract class PeerConnection extends Thread
Any body = message.getBody(); Any body = message.getBody();
long replyToMessageId = header.getReplyToMessageId(); long replyToMessageId = header.getReplyToMessageId();
ObjectID senderID = new ObjectID(header.getSenderId()); ObjectID senderID = new ObjectID(header.getSenderId());
System.out.println("Received message! type=" + body.getTypeUrl() + ", sender=" + senderID + ", ID=" + header.getMessageId() + ", reply_to=" + replyToMessageId); System.out.println("PeerConnection (" + getUri() + "): Received message! type=" + body.getTypeUrl() + ", sender=" + senderID + ", ID=" + header.getMessageId() + ", reply_to=" + replyToMessageId);
try try
{ {
try try
@ -127,7 +124,7 @@ public abstract class PeerConnection extends Thread
{ {
try try
{ {
System.err.println("WARNING: artifical lag activated! Waiting " + artificalDelayMs + "ms..."); System.err.println("WARNING: artificial lag activated! Waiting " + artificalDelayMs + "ms...");
Thread.sleep(artificalDelayMs); Thread.sleep(artificalDelayMs);
} catch (InterruptedException e) } catch (InterruptedException e)
{ {
@ -137,9 +134,15 @@ public abstract class PeerConnection extends Thread
if (!senderID.isNull()) if (!senderID.isNull())
{ {
if (peerID.isNull()) Peer localPeer = Main.getInstance().getModel().getLocalPeer();
if (localPeer != null && Objects.equals(senderID, localPeer.getObjectID()))
{ {
System.out.println("PeerConnection: Identified sender as " + senderID); System.err.println("PeerConnection (" + getUri() + "): Connected to ourselves, terminating connection!");
shutdown();
}
else if (peerID.isNull())
{
System.out.println("PeerConnection (" + getUri() + "): Identified sender as " + senderID);
peerID = senderID; peerID = senderID;
} }
else else
@ -184,7 +187,7 @@ public abstract class PeerConnection extends Thread
private void handleErrorToUnsolicitedMessage(CommonMessages.MessageHeader header, CommonMessages.ErrorMessage body) private void handleErrorToUnsolicitedMessage(CommonMessages.MessageHeader header, CommonMessages.ErrorMessage body)
{ {
throw new RuntimeException("Our message ID " + header.getReplyToMessageId() + " caused a remote error: " + body.getError().name()); throw new RuntimeException("PeerConnection (" + getUri() + "): Our message ID " + header.getReplyToMessageId() + " caused a remote error: " + body.getError().name());
} }
@SuppressWarnings({"rawtypes", "unchecked"}) @SuppressWarnings({"rawtypes", "unchecked"})
@ -200,7 +203,7 @@ public abstract class PeerConnection extends Thread
} }
else else
{ {
System.err.println("PeerConnection: don't have a MessageHandler for message type " + typeUrl + "!"); System.err.println("PeerConnection (" + getUri() + "): don't have a MessageHandler for message type " + typeUrl + "!");
replyWithError(CommonMessages.Error.ERROR_MESSAGE_BODY_UNKNOWN, header); replyWithError(CommonMessages.Error.ERROR_MESSAGE_BODY_UNKNOWN, header);
} }
} }
@ -208,7 +211,7 @@ public abstract class PeerConnection extends Thread
private void handleReplyMessage(CommonMessages.MessageHeader header, Any body) throws InvalidProtocolBufferException, ReplyWithErrorException private void handleReplyMessage(CommonMessages.MessageHeader header, Any body) throws InvalidProtocolBufferException, ReplyWithErrorException
{ {
long replyToMessageId = header.getReplyToMessageId(); long replyToMessageId = header.getReplyToMessageId();
System.out.println("Received reply to message ID " + replyToMessageId); System.out.println("PeerConnection (" + getUri() + "): Received reply to message ID " + replyToMessageId);
Request<?, ?> request = pendingRequests.get(replyToMessageId); Request<?, ?> request = pendingRequests.get(replyToMessageId);
boolean doneWithRequest = request.handleReply(body); boolean doneWithRequest = request.handleReply(body);
if (doneWithRequest) if (doneWithRequest)
@ -236,14 +239,17 @@ public abstract class PeerConnection extends Thread
private void installDefaultMessageHandlers() private void installDefaultMessageHandlers()
{ {
Controller controller = Main.getInstance().getModel();
installMessageHandler(new MessageHandler<>(ObjectStatements.ObjectListRequest.class) installMessageHandler(new MessageHandler<>(ObjectStatements.ObjectListRequest.class)
{ {
@Override @Override
protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectListRequest message) throws IOException protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectListRequest message) throws IOException
{ {
List<NetworkObject> objects = Main.getInstance().getModel().listObjects(new HashSet<>(message.getTypesList())); List<NetworkObject> objects = controller.listObjects(new HashSet<>(message.getTypesList()));
ObjectStatements.ObjectList.Builder objectList = ObjectStatements.ObjectList.newBuilder(); ObjectStatements.ObjectList.Builder objectList = ObjectStatements.ObjectList.newBuilder();
ObjectChangeRecord currentChangeRecord = controller.getLocalData().getCurrentChangeRecord();
objectList.setChangeHead(currentChangeRecord == null ? 0L : currentChangeRecord.getChangeID());
for (NetworkObject object : objects) for (NetworkObject object : objects)
{ {
objectList.addStates(object.buildObjectState()); objectList.addStates(object.buildObjectState());
@ -263,7 +269,7 @@ public abstract class PeerConnection extends Thread
replyWithError(CommonMessages.Error.ERROR_INVALID_ARGUMENT, header); replyWithError(CommonMessages.Error.ERROR_INVALID_ARGUMENT, header);
} }
NetworkFile networkFile = Main.getInstance().getModel().getObject(new ObjectID(message.getFileId())); NetworkFile networkFile = controller.getObject(new ObjectID(message.getFileId()));
if (networkFile == null) if (networkFile == null)
{ {
replyWithError(CommonMessages.Error.ERROR_OBJECT_NOT_FOUND, header); replyWithError(CommonMessages.Error.ERROR_OBJECT_NOT_FOUND, header);
@ -273,7 +279,7 @@ public abstract class PeerConnection extends Thread
{ {
int startIndex = message.getStartPieceIndex(); int startIndex = message.getStartPieceIndex();
int endIndex = (message.getStartPieceIndex() + message.getPieceCount()) - 1; int endIndex = (message.getStartPieceIndex() + message.getPieceCount()) - 1;
System.out.println("Been asked for pieces from " + startIndex + " to " + endIndex); System.out.println("PeerConnection (" + getUri() + "): Been asked for pieces from " + startIndex + " to " + endIndex);
List<Long> indices = new ArrayList<>(); List<Long> indices = new ArrayList<>();
for (int index = startIndex; index <= endIndex; index += message.getPieceMod()) for (int index = startIndex; index <= endIndex; index += message.getPieceMod())
{ {
@ -286,7 +292,7 @@ public abstract class PeerConnection extends Thread
byte[] buffer = filePieceAccess.readPiece(Math.toIntExact(index)); byte[] buffer = filePieceAccess.readPiece(Math.toIntExact(index));
if (buffer != null) if (buffer != null)
{ {
System.out.println("Replying to file piece request with piece " + index); System.out.println("PeerConnection (" + getUri() + "): Replying to file piece request with piece " + index);
PieceMessages.FilePieceMessage filePieceMessage = PieceMessages.FilePieceMessage.newBuilder() PieceMessages.FilePieceMessage filePieceMessage = PieceMessages.FilePieceMessage.newBuilder()
.setPieceIndex(Math.toIntExact(index)) .setPieceIndex(Math.toIntExact(index))
.setFileId(networkFile.getObjectID().toLong()) .setFileId(networkFile.getObjectID().toLong())
@ -296,7 +302,7 @@ public abstract class PeerConnection extends Thread
} }
else else
{ {
System.err.println("Don't have requested piece " + index + "!"); System.err.println("PeerConnection (" + getUri() + "): Don't have requested piece " + index + "!");
replyWithError(CommonMessages.Error.ERROR_PIECE_NOT_POSSESSED, header); replyWithError(CommonMessages.Error.ERROR_PIECE_NOT_POSSESSED, header);
break; break;
} }
@ -310,8 +316,8 @@ public abstract class PeerConnection extends Thread
protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectChangeRequest message) throws IOException protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectChangeRequest message) throws IOException
{ {
List<Long> changesSinceList = message.getChangesSinceList(); List<Long> changesSinceList = message.getChangesSinceList();
System.out.println("PeerConnection: Been asked for all changes since " + changesSinceList.stream().map(Long::toHexString)); System.out.println("PeerConnection (" + getUri() + "): Been asked for all changes since " + changesSinceList.stream().map(Long::toHexString).collect(Collectors.toSet()));
Set<ObjectChangeRecord> changes = Main.getInstance().getModel().findChangesSince(changesSinceList); Set<ObjectChangeRecord> changes = controller.findChangesSince(changesSinceList);
if (changes == null) if (changes == null)
{ {
replyWithError(CommonMessages.Error.ERROR_END_OF_HISTORY, header); replyWithError(CommonMessages.Error.ERROR_END_OF_HISTORY, header);
@ -323,7 +329,7 @@ public abstract class PeerConnection extends Thread
{ {
reply.addChangeMessages(change.buildObjectChangeMessage()); reply.addChangeMessages(change.buildObjectChangeMessage());
} }
System.out.println("PeerConnection: Replying with " + reply.getChangeMessagesCount() + " changes"); System.out.println("PeerConnection (" + getUri() + "): Replying with " + reply.getChangeMessagesCount() + " changes");
sendMessage(wrapMessage(reply.build(), header)); sendMessage(wrapMessage(reply.build(), header));
} }
} }
@ -334,19 +340,26 @@ public abstract class PeerConnection extends Thread
protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectChangeMessage message) protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectChangeMessage message)
{ {
ObjectChangeRecord record = ObjectChangeRecord.createFromChangeMessage(message); ObjectChangeRecord record = ObjectChangeRecord.createFromChangeMessage(message);
Main.getInstance().getModel().applyChangeRecord(record); controller.applyChangeRecord(record);
} }
}); });
installMessageHandler(new MessageHandler<>(CommonMessages.CheckInMessage.class) installMessageHandler(new MessageHandler<>(CommonMessages.CheckInMessage.class)
{ {
@Override @Override
protected void handle(CommonMessages.MessageHeader header, CommonMessages.CheckInMessage message) protected void handle(CommonMessages.MessageHeader header, CommonMessages.CheckInMessage message) throws IOException
{ {
List<Long> remoteChangeHeads = message.getCurrentChangeHeadsList(); Peer peer = controller.getObject(peerID);
if (peer != null)
{
peer.setLastKnownChangeID(message.getCurrentChange());
controller.objectChanged(peer);
}
Set<Long> remoteChangeHeads = new HashSet<>(message.getCurrentChangeHeadsList());
boolean potentialNewChanges = false; boolean potentialNewChanges = false;
for (long remoteChangeHead : remoteChangeHeads) for (long remoteChangeHead : remoteChangeHeads)
{ {
boolean exists = Main.getInstance().getModel().getDataStore().getDAOForClass(ObjectChangeRecord.class).exists(remoteChangeHead); boolean exists = controller.getDataStore().getDAOForClass(ObjectChangeRecord.class).exists(remoteChangeHead);
if (!exists) if (!exists)
{ {
potentialNewChanges = true; potentialNewChanges = true;
@ -355,8 +368,28 @@ public abstract class PeerConnection extends Thread
} }
if (potentialNewChanges) if (potentialNewChanges)
{ {
PullChangesTask task = new PullChangesTask(Set.of(Main.getInstance().getModel().getObject(peerID))); if (peer != null)
Main.getInstance().getExecutor().submit(task); {
PullChangesTask task = new PullChangesTask(Set.of(peer));
Main.getInstance().getExecutor().submit(task);
}
}
else
{
Set<Long> changeHeadIDs = controller.getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet());
// if there's no new changes then we know all the changes that the remote has
// so if they're not the same as our latest changes then they don't know about something we do
// so send them a checkin
boolean remoteOutOfDate = !changeHeadIDs.equals(remoteChangeHeads);
if (remoteOutOfDate)
{
CommonMessages.CheckInMessage checkInMessage = CommonMessages.CheckInMessage.newBuilder()
.addAllCurrentChangeHeads(changeHeadIDs)
.build();
sendMessage(wrapMessage(checkInMessage));
}
} }
} }
}); });

View file

@ -13,11 +13,10 @@ import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.objects.Peer; import moe.nekojimi.friendcloud.objects.Peer;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -30,22 +29,57 @@ public class TCPConnectionBackend extends ConnectionBackend<TCPPeerConnection>
List<URI> getURIs() List<URI> getURIs()
{ {
List<URI> ret = new ArrayList<>(); List<URI> ret = new ArrayList<>();
ret.add(makeURI(serverSocket.getInetAddress().getHostAddress(), serverSocket.getLocalPort())); InetAddress inetAddress = serverSocket.getInetAddress();
ret.add(makeURI(serverSocket.getInetAddress().getHostName(), serverSocket.getLocalPort())); addAddressToList(inetAddress, ret);
try
{
Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
for (Iterator<NetworkInterface> it = networkInterfaces.asIterator(); it.hasNext(); )
{
NetworkInterface networkInterface = it.next();
if (networkInterface.isLoopback())
continue;
if (!networkInterface.isUp())
continue;
for (InterfaceAddress interfaceAddress : networkInterface.getInterfaceAddresses())
addAddressToList(interfaceAddress.getAddress(), ret);
}
} catch (SocketException e)
{
}
if (mappedPort != null) if (mappedPort != null)
{ {
ret.add(makeURI(mappedPort.getExternalAddress().getHostAddress(), mappedPort.getExternalPort())); addAddressToList(mappedPort.getExternalAddress(), ret);
ret.add(makeURI(mappedPort.getExternalAddress().getCanonicalHostName(), mappedPort.getExternalPort()));
} }
System.out.println("Local addresses: " + ret);
return ret; return ret;
} }
/**
* utility method for processing URIs
*/
private void addAddressToList(InetAddress address, List<URI> list)
{
if (address.isAnyLocalAddress())
return;
String hostAddress = address.getHostAddress();
if (!hostAddress.isEmpty())
list.add(makeURI(hostAddress, serverSocket.getLocalPort()));
String hostName = address.getHostName();
if (!hostName.isEmpty() && !hostName.equals(hostAddress))
list.add(makeURI(hostName, serverSocket.getLocalPort()));
}
public TCPConnectionBackend(int port, ConnectionManager connectionManager) throws IOException public TCPConnectionBackend(int port, ConnectionManager connectionManager) throws IOException
{ {
super("TCP Listen Thread", "tcp", connectionManager); super("TCP Listen Thread", "tcp", connectionManager);
serverSocket = new ServerSocket(port); serverSocket = new ServerSocket(port);
// setupIGP(port); if (!Main.getInstance().getArgs().isNoUpnp())
setupIGP(port);
} }
private void setupIGP(int port) private void setupIGP(int port)
@ -60,6 +94,9 @@ public class TCPConnectionBackend extends ConnectionBackend<TCPPeerConnection>
// Discover port forwarding devices and take the first one found // Discover port forwarding devices and take the first one found
System.out.println("Discovering port mappers..."); System.out.println("Discovering port mappers...");
List<PortMapper> mappers = PortMapperFactory.discover(networkBus, processBus); List<PortMapper> mappers = PortMapperFactory.discover(networkBus, processBus);
if (mappers.isEmpty())
return;
; ;
PortMapper mapper = mappers.getFirst(); PortMapper mapper = mappers.getFirst();
System.out.println("Got mapper " + mapper + ", mapping port..."); System.out.println("Got mapper " + mapper + ", mapping port...");
@ -102,6 +139,7 @@ public class TCPConnectionBackend extends ConnectionBackend<TCPPeerConnection>
protected TCPPeerConnection getConnection() throws IOException protected TCPPeerConnection getConnection() throws IOException
{ {
Socket socket = serverSocket.accept(); Socket socket = serverSocket.accept();
System.out.println("TCPConnectionBackend: Received connection from " + socket.getRemoteSocketAddress());
return new TCPPeerConnection(socket); return new TCPPeerConnection(socket);
} }

View file

@ -2,12 +2,12 @@ package moe.nekojimi.friendcloud.network;
import moe.nekojimi.friendcloud.objects.ObjectID; import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.protos.CommonMessages; import moe.nekojimi.friendcloud.protos.CommonMessages;
import org.jetbrains.annotations.NotNull;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.Socket; import java.net.*;
import java.net.URI;
public class TCPPeerConnection extends PeerConnection public class TCPPeerConnection extends PeerConnection
{ {
@ -18,24 +18,37 @@ public class TCPPeerConnection extends PeerConnection
{ {
super(tcpURL, peer); super(tcpURL, peer);
socket = new Socket(tcpURL.getHost(), tcpURL.getPort()); socket = new Socket(tcpURL.getHost(), tcpURL.getPort());
System.out.println("TCP Connection: connected to " + tcpURL + " OK!"); System.out.println("TCPPeerConnection: connected to " + tcpURL + " OK!");
} }
public TCPPeerConnection(Socket openSocket) public TCPPeerConnection(Socket openSocket)
{ {
super(); super(getSocketURI(openSocket.getInetAddress(), openSocket.getPort()));
socket = openSocket; socket = openSocket;
} }
private static URI getSocketURI(@NotNull InetAddress address, int port)
{
try
{
return new URI("tcp://" + address.getHostAddress() + ":" + port);
} catch (URISyntaxException e)
{
throw new RuntimeException(e);
}
}
@Override @Override
public void run() public void run()
{ {
super.run(); super.run();
try try(InputStream inputStream = socket.getInputStream())
{ {
InputStream inputStream = socket.getInputStream(); socket.setKeepAlive(true);
while (!socket.isClosed()) socket.setSoTimeout(keepAliveTimeS * 1000);
while (!socket.isClosed() && !socket.isInputShutdown())
{ {
CommonMessages.FriendCloudMessage message = CommonMessages.FriendCloudMessage.parseDelimitedFrom(inputStream); CommonMessages.FriendCloudMessage message = CommonMessages.FriendCloudMessage.parseDelimitedFrom(inputStream);
// Any any = Any.parseDelimitedFrom(inputStream); // Any any = Any.parseDelimitedFrom(inputStream);
@ -45,22 +58,38 @@ public class TCPPeerConnection extends PeerConnection
messageReceived(message); messageReceived(message);
} }
} }
} catch (Exception ex) }
catch (SocketTimeoutException ex)
{
System.out.println("TCPPeerConnection (" + getUri() + "): Read timed out, closing connection.");
}
catch (Exception ex)
{ {
// fuck // fuck
ex.printStackTrace(System.err); ex.printStackTrace(System.err);
} }
System.out.println("TCP Connection: connection closed"); System.out.println("TCPPeerConnection (" + getUri() + "): connection closed");
shutdown();
} }
@Override @Override
protected void sendMessage(CommonMessages.FriendCloudMessage message) throws IOException protected void sendMessage(CommonMessages.FriendCloudMessage message) throws IOException
{ {
OutputStream outputStream = socket.getOutputStream(); try
System.out.println("Sending message " + message.getHeader().getMessageId() + ": " + message.getBody().getTypeUrl()); {
message.writeDelimitedTo(outputStream); OutputStream outputStream = socket.getOutputStream();
outputStream.flush(); System.out.println("TCPPeerConnection (" + getUri() + "): Sending message " + message.getHeader().getMessageId() + ": " + message.getBody().getTypeUrl());
message.writeDelimitedTo(outputStream);
outputStream.flush();
}
catch (SocketException ex)
{
// handle this type of exception by closing the connection
System.err.println("TCPPeerConnection (" + getUri() + "): Failed to send, closing connection:" + ex.getMessage());
shutdown();
throw ex; // upper layer needs to know it failed
}
} }
@Override @Override
@ -69,9 +98,10 @@ public class TCPPeerConnection extends PeerConnection
try try
{ {
socket.close(); socket.close();
interrupt();
} catch (IOException e) } catch (IOException e)
{ {
System.err.println("TCPPeerConnection: failed to shut down!"); System.err.println("TCPPeerConnection (" + getUri() + "): failed to shut down!");
e.printStackTrace(System.err); e.printStackTrace(System.err);
} }
} }

View file

@ -11,7 +11,7 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
public class ObjectListRequest extends Request<ObjectStatements.ObjectListRequest, List<NetworkObject>> public class ObjectListRequest extends Request<ObjectStatements.ObjectListRequest, ObjectListRequest.ObjectListRequestResult>
{ {
private final Set<ObjectStatements.ObjectType> types; private final Set<ObjectStatements.ObjectType> types;
@ -39,7 +39,10 @@ public class ObjectListRequest extends Request<ObjectStatements.ObjectListReques
// System.out.println("Received ObjectList, objects=" + objectList.getStatesList()); // System.out.println("Received ObjectList, objects=" + objectList.getStatesList());
List<NetworkObject> ret = new ArrayList<>(); ObjectListRequestResult ret = new ObjectListRequestResult();
ret.objects = new ArrayList<>();
ret.changeID = objectList.getChangeHead();
for (ObjectStatements.ObjectState objectState : objectList.getStatesList()) for (ObjectStatements.ObjectState objectState : objectList.getStatesList())
{ {
@ -47,7 +50,7 @@ public class ObjectListRequest extends Request<ObjectStatements.ObjectListReques
System.out.println("ObjectListRequest: Received state of object " + objectID); System.out.println("ObjectListRequest: Received state of object " + objectID);
NetworkObject object = NetworkObject.createByID(objectID); NetworkObject object = NetworkObject.createByID(objectID);
object.updateFromStateMessage(objectState); object.updateFromStateMessage(objectState);
ret.add(object); ret.objects.add(object);
} }
future.complete(ret); future.complete(ret);
@ -63,4 +66,10 @@ public class ObjectListRequest extends Request<ObjectStatements.ObjectListReques
return false; return false;
} }
public static class ObjectListRequestResult
{
public List<NetworkObject> objects;
public long changeID;
}
} }

View file

@ -25,7 +25,7 @@ public class NetworkFile extends NetworkFSNode
private File localFile = null; private File localFile = null;
private final SortedSet<ObjectID> peersWithCopy = new TreeSet<>(); private final SortedSet<ObjectID> peersWithCopy = new TreeSet<>();
// private final Map<Peer, PeerFileState> fileStates = new HashMap<>(); // private final Map<Peer, PeerFileState> fileStates = new HashMap<>();
private BitSet pieces = new BitSet(); private BitSet pieces = new BitSet();
// private List<FilePiece> pieces = new ArrayList<>(); // private List<FilePiece> pieces = new ArrayList<>();
@ -70,14 +70,14 @@ public class NetworkFile extends NetworkFSNode
if (after.containsKey("hash")) if (after.containsKey("hash"))
hash = HexFormat.of().parseHex(after.get("hash")); hash = HexFormat.of().parseHex(after.get("hash"));
if (after.containsKey("pieceSize")) if (after.containsKey("pieceSize"))
pieceSize = Long.parseLong(after.get("pieceSize")); pieceSize = Long.parseLong(after.get("pieceSize"));
if (after.containsKey("peersWithCopy")) if (after.containsKey("peersWithCopy"))
{ {
peersWithCopy.clear(); peersWithCopy.clear();
String[] peers = after.get("peersWithCopy").split(","); String[] peers = after.get("peersWithCopy").split(",");
for (String peer: peers) for (String peer : peers)
{ {
peersWithCopy.add(new ObjectID(Long.parseUnsignedLong(peer,16))); peersWithCopy.add(new ObjectID(Long.parseUnsignedLong(peer, 16)));
} }
} }
} }
@ -122,7 +122,7 @@ public class NetworkFile extends NetworkFSNode
else else
localFile = new File(localFilePath); localFile = new File(localFilePath);
peersWithCopy.clear(); peersWithCopy.clear();
peersWithCopy.addAll(((List<Object>)map.get("peersWithCopy")).stream().map(Util::unconditionalNumberToLong).map(ObjectID::new).toList()); peersWithCopy.addAll(((List<Object>) map.get("peersWithCopy")).stream().map(Util::unconditionalNumberToLong).map(ObjectID::new).toList());
} }
public File getLocalFile() public File getLocalFile()
@ -132,7 +132,9 @@ public class NetworkFile extends NetworkFSNode
/** /**
* Retreives the local file used for storing file data, or if it doesn't exist, creates it. * Retreives the local file used for storing file data, or if it doesn't exist, creates it.
*
* @return a File object for the (potentially new) file storage. * @return a File object for the (potentially new) file storage.
*
* @throws IOException if the file could not be created due to an IO error. * @throws IOException if the file could not be created due to an IO error.
*/ */
public File getOrCreateLocalFile() throws IOException public File getOrCreateLocalFile() throws IOException
@ -146,7 +148,9 @@ public class NetworkFile extends NetworkFSNode
{ {
localFile = new File(tempDirectory, getName()); localFile = new File(tempDirectory, getName());
// localFile = File.createTempFile("FriendCloud", getNetworkPath()); // localFile = File.createTempFile("FriendCloud", getNetworkPath());
localFile.createNewFile(); boolean created = localFile.createNewFile();
if (created)
System.out.println("NetworkFile: Created local file " + localFile.getAbsolutePath() + " for " + getObjectID());
} }
return localFile; return localFile;
} }
@ -237,34 +241,25 @@ public class NetworkFile extends NetworkFSNode
/** /**
* Waits for a certain piece of the file to become available, or for a timeout to expire. * Waits for a certain piece of the file to become available, or for a timeout to expire.
* @param pieceIdx the index of the piece to wait for. Note: <=0 means "don't wait at all", not "wait forever". *
* @param pieceIdx the index of the piece to wait for. Note: <=0 means "don't wait at all", not "wait forever".
* @param timeoutMs the amount of time, in milliseconds, to wait. * @param timeoutMs the amount of time, in milliseconds, to wait.
* @return true if the piece is available (haspiece(pieceIdx) will return true), false if the timeout was reached. * @throws InterruptedException if the waiting was interrupted.
*
* @return true if the piece is available (hasPiece(pieceIdx) will return true), false if the timeout was reached.
*/ */
public boolean waitForFilePiece(int pieceIdx, long timeoutMs) public boolean waitForFilePiece(int pieceIdx, long timeoutMs) throws InterruptedException
{ {
synchronized (pieces) synchronized (pieces)
{ {
if (hasPiece(pieceIdx)) if (hasPiece(pieceIdx))
return true; return true;
while (timeoutMs > 0) System.out.println("NetworkFile: waiting " + timeoutMs + "ms for piece " + pieceIdx + " of file " + name);
pieces.wait(timeoutMs);
if (hasPiece(pieceIdx))
{ {
long startTime = System.currentTimeMillis(); System.out.println("NetworkFile: got piece we were waiting for.");
try return true;
{
System.out.println("NetworkFile: waiting "+ timeoutMs + "ms for piece " + pieceIdx + " of file " + name);
pieces.wait(timeoutMs);
if (hasPiece(pieceIdx))
{
System.out.println("NetworkFile: got piece we were waiting for.");
return true;
}
} catch (InterruptedException ignored)
{
}
long endTime = System.currentTimeMillis();
long timeWaited = (endTime - startTime);
timeoutMs -= timeWaited;
} }
} }
System.err.println("Timed out waiting for piece!"); System.err.println("Timed out waiting for piece!");
@ -286,11 +281,17 @@ public class NetworkFile extends NetworkFSNode
public enum StorageType public enum StorageType
{ {
/** The file will be stored as a complete file in the storage directory under it's own name and file path. */ /**
* The file will be stored as a complete file in the storage directory under it's own name and file path.
*/
COMPLETE, COMPLETE,
/** The file will be stored as a complete file in a temporary directory. It will be deleted when the computer restarts. */ /**
* The file will be stored as a complete file in a temporary directory. It will be deleted when the computer restarts.
*/
TEMPORARY_COMPLETE, TEMPORARY_COMPLETE,
/** Each piece of the file will be stored permanently in an individual piece file, in the pieces subdirectory. */ /**
* Each piece of the file will be stored permanently in an individual piece file, in the pieces subdirectory.
*/
PIECES, PIECES,
} }
} }

View file

@ -1,5 +1,6 @@
package moe.nekojimi.friendcloud.objects; package moe.nekojimi.friendcloud.objects;
import moe.nekojimi.friendcloud.Util;
import moe.nekojimi.friendcloud.protos.ObjectStatements; import moe.nekojimi.friendcloud.protos.ObjectStatements;
import java.net.URI; import java.net.URI;
@ -13,6 +14,8 @@ public class Peer extends NetworkObject
private String userName = ""; private String userName = "";
private String systemName = ""; private String systemName = "";
private long lastKnownChangeID = 0L;
public Peer(ObjectID objectID) public Peer(ObjectID objectID)
{ {
super(objectID); super(objectID);
@ -71,6 +74,7 @@ public class Peer extends NetworkObject
ret.put("userName", userName); ret.put("userName", userName);
ret.put("systemName", systemName); ret.put("systemName", systemName);
ret.put("addresses", addresses); ret.put("addresses", addresses);
ret.put("lastKnownChangeID", lastKnownChangeID);
return ret; return ret;
} }
@ -81,6 +85,7 @@ public class Peer extends NetworkObject
systemName = map.get("systemName").toString(); systemName = map.get("systemName").toString();
addresses.clear(); addresses.clear();
addresses.addAll((Collection<? extends URI>) map.get("addresses")); addresses.addAll((Collection<? extends URI>) map.get("addresses"));
lastKnownChangeID = Util.unconditionalNumberToLong(map.getOrDefault("lastKnownChangeID", 0L));
} }
public void addAddress(URI address) public void addAddress(URI address)
@ -129,6 +134,16 @@ public class Peer extends NetworkObject
this.systemName = systemName; this.systemName = systemName;
} }
public long getLastKnownChangeID()
{
return lastKnownChangeID;
}
public void setLastKnownChangeID(long lastKnownChangeID)
{
this.lastKnownChangeID = lastKnownChangeID;
}
@Override @Override
public String getFriendlyName() public String getFriendlyName()
{ {

View file

@ -20,7 +20,7 @@ public class FileDownloadTask implements RunnableFuture<File>
private final NetworkFile file; private final NetworkFile file;
private final ConnectionManager connectionManager; private final ConnectionManager connectionManager;
private final long timeoutPerPieceMs = 1_000; private final long timeoutPerPieceMs = 30_000;
private static final int MAX_DOWNLOAD_PIECES_PER_ROUND = 128; private static final int MAX_DOWNLOAD_PIECES_PER_ROUND = 128;
private final SortedSet<Integer> missingPieceIndices = new TreeSet<>(); private final SortedSet<Integer> missingPieceIndices = new TreeSet<>();
@ -65,23 +65,28 @@ public class FileDownloadTask implements RunnableFuture<File>
Peer selfPeer = Main.getInstance().getModel().getLocalPeer(); Peer selfPeer = Main.getInstance().getModel().getLocalPeer();
while (!missingPieceIndices.isEmpty() && !cancelled && !failed && !done) while (!missingPieceIndices.isEmpty() && !cancelled && !failed && !done)
{ {
System.out.println("Need to get " + missingPieceIndices.size() + " missing pieces."); System.out.println("FileDownloadTask: Need to get " + missingPieceIndices.size() + " missing pieces.");
// Map<Peer, PeerFileState> fileStates = file.getFileStates(); // Map<Peer, PeerFileState> fileStates = file.getFileStates();
// determine what nodes we can connect to // determine what nodes we can connect to
List<PeerConnection> connections = new ArrayList<>(); List<PeerConnection> connections = new ArrayList<>(file.getPeersWithCopy().stream()
for (Peer peer : file.getPeersWithCopy()) .filter(peer -> peer != selfPeer)
{ .map(connectionManager::getNodeConnectionAsync)
if (peer == selfPeer) .map(CompletableFuture::join)
continue; // yeah that's us dipshit .flatMap(Optional::stream)
PeerConnection connection = connectionManager.getNodeConnection(peer); .toList());
if (connection != null) // for (Peer peer : file.getPeersWithCopy())
{ // {
System.out.println("FileDownloadTask: Will download from " + peer.getNodeName()); // if (peer == selfPeer)
connections.add(connection); // continue; // yeah that's us dipshit
} // PeerConnection connection = connectionManager.getNodeConnection(peer);
} // if (connection != null)
// {
// System.out.println("FileDownloadTask: Will download from " + peer.getNodeName());
// connections.add(connection);
// }
// }
// connectionLine = "Connected to " + connections.size() + " peers."; // connectionLine = "Connected to " + connections.size() + " peers.";
// notification.setBody(connectionLine + "\n" + progressLine); // notification.setBody(connectionLine + "\n" + progressLine);
@ -90,6 +95,7 @@ public class FileDownloadTask implements RunnableFuture<File>
// shuffle the connections list // shuffle the connections list
Collections.shuffle(connections); Collections.shuffle(connections);
if (connections.isEmpty()) if (connections.isEmpty())
{ {
System.err.println("FileDownloadTask: No peers have the file, download failed!"); System.err.println("FileDownloadTask: No peers have the file, download failed!");
@ -98,6 +104,8 @@ public class FileDownloadTask implements RunnableFuture<File>
break; break;
} }
System.out.println("FileDownloadTask: Will download from " + connections.size() + " peers: " + connections.stream().map(PeerConnection::getUri).toList());
// find a continuous run of pieces to download // find a continuous run of pieces to download
// TODO: allow for runs with regular gaps (e.g. every 2) to account for previous failed download attempts // TODO: allow for runs with regular gaps (e.g. every 2) to account for previous failed download attempts
int runStart = -1; int runStart = -1;
@ -156,7 +164,7 @@ public class FileDownloadTask implements RunnableFuture<File>
if (file.getDownloadPercentage() >= 100.0) if (file.getDownloadPercentage() >= 100.0)
{ {
try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(connectionManager, selfPeer.getObjectID())) try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(selfPeer.getObjectID()))
{ {
transaction.addObjectBeforeChange(file); transaction.addObjectBeforeChange(file);
file.addPeerWithCopy(selfPeer); file.addPeerWithCopy(selfPeer);

View file

@ -53,7 +53,7 @@ public class JoinNetworkTask implements Runnable
throw new RuntimeException(e); throw new RuntimeException(e);
} }
// create our local peer object // create our local peer object
try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(connectionManager, peerID)) try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(peerID))
{ {
// create and submit our Peer object if it doesn't exist // create and submit our Peer object if it doesn't exist
Peer selfPeer = controller.getLocalData().getLocalPeer(); Peer selfPeer = controller.getLocalData().getLocalPeer();

View file

@ -1,6 +1,7 @@
package moe.nekojimi.friendcloud.tasks; package moe.nekojimi.friendcloud.tasks;
import moe.nekojimi.friendcloud.Main; import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.ObjectChangeRecord;
import moe.nekojimi.friendcloud.Util; import moe.nekojimi.friendcloud.Util;
import moe.nekojimi.friendcloud.network.PeerConnection; import moe.nekojimi.friendcloud.network.PeerConnection;
import moe.nekojimi.friendcloud.network.requests.ObjectChangeRequest; import moe.nekojimi.friendcloud.network.requests.ObjectChangeRequest;
@ -31,6 +32,8 @@ public class PullStateTask implements Runnable
System.out.println("PullStateTask: Pulling state from peers..."); System.out.println("PullStateTask: Pulling state from peers...");
Set<PeerConnection> connections = new HashSet<>(); Set<PeerConnection> connections = new HashSet<>();
// TODO: pull changes first, then select the peer(s) with the latest changes to pull state from
for (String knownPeerAddress : Main.getInstance().getArgs().getKnownPeers()) for (String knownPeerAddress : Main.getInstance().getArgs().getKnownPeers())
{ {
String[] split = knownPeerAddress.split(":"); String[] split = knownPeerAddress.split(":");
@ -50,7 +53,7 @@ public class PullStateTask implements Runnable
{ {
connections.add(nodeConnection); connections.add(nodeConnection);
} }
} catch (URISyntaxException | IOException e) } catch (URISyntaxException e)
{ {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
@ -80,24 +83,33 @@ public class PullStateTask implements Runnable
for (PeerConnection connection : connections) for (PeerConnection connection : connections)
{ {
futures.add(connection.makeRequest(new ObjectListRequest(Set.of( CompletableFuture<Void> changeFuture = connection.makeRequest(new ObjectChangeRequest(Set.of())).thenAccept(objectChangeRecords ->
{
System.out.println("PullStateTask: got " + objectChangeRecords.size() + " change records.");
Main.getInstance().getModel().addChangeRecords(objectChangeRecords);
});
CompletableFuture<Long> objectFuture = connection.makeRequest(new ObjectListRequest(Set.of(
ObjectStatements.ObjectType.OBJECT_TYPE_FILE, ObjectStatements.ObjectType.OBJECT_TYPE_FILE,
ObjectStatements.ObjectType.OBJECT_TYPE_FOLDER, ObjectStatements.ObjectType.OBJECT_TYPE_FOLDER,
ObjectStatements.ObjectType.OBJECT_TYPE_PEER))).thenAccept(networkObjects -> ObjectStatements.ObjectType.OBJECT_TYPE_PEER))).thenApply(objectListRequestResult ->
{ {
List<NetworkObject> networkObjects = objectListRequestResult.objects;
System.out.println("PullStateTask: got state of " + networkObjects.size() + " objects."); System.out.println("PullStateTask: got state of " + networkObjects.size() + " objects.");
for (NetworkObject object : networkObjects) for (NetworkObject object : networkObjects)
{ {
Main.getInstance().getModel().addNetworkObject(object); Main.getInstance().getModel().addNetworkObject(object);
} }
})); return objectListRequestResult.changeID;
});
futures.add(connection.makeRequest(new ObjectChangeRequest(Set.of())).thenAccept(objectChangeRecords -> CompletableFuture<Void> bothFuture = changeFuture.thenAcceptBoth(objectFuture, (unused, changeID) ->
{ {
System.out.println("PullStateTask: got " + objectChangeRecords.size() + " change records."); ObjectChangeRecord record = Main.getInstance().getModel().getChangeRecord(changeID);
Main.getInstance().getModel().addChangeRecords(objectChangeRecords); System.out.println("PullStateTask: updating current change ID to " + record);
})); Main.getInstance().getModel().setCurrentChangeRecord(record);
});
futures.add(bothFuture);
} }
Util.collectFutures(futures).join(); Util.collectFutures(futures).join();

View file

@ -21,6 +21,7 @@ message HelloMessage {
message CheckInMessage { message CheckInMessage {
repeated uint64 current_change_heads = 1; repeated uint64 current_change_heads = 1;
uint64 current_change = 2;
} }
enum Error { enum Error {