Compare commits

...

9 commits

19 changed files with 471 additions and 176 deletions

54
README.md Normal file
View file

@ -0,0 +1,54 @@
# FriendCloud (aka SamoyedShare?)
🐕☁🐕☁🐕☁🐕☁🐕
FriendCloud is a decentralised mesh-network-based file-sharing platform for you and your friends!
This repository is for the **backend** (or the reference implementation, at least). See also the [repository for the frontend]().
⚠ Currently FriendCloud is in SUPER WOBBLY TURBO-ALPHA. Don't use this! Your files may crumble into dust! Security is an unknown concept, still but a theory in the mind's-eye of divine astrologers! The database is but a collection of JSON files in a worm-infested crypt, left weak and vulnerable to the ravages of entropy!
# About FriendCloud
- It's like NextCloud; you have your own private space for you and your friends to store files, but it's decentralised and no one person controls it!
- It's like Syncthing; you can sync your files among all your friend's devices, but not everyone needs to have a copy of every file!
- It's like Git; file and object changes are tracked and merged, but (mostly) automagically!
- It's like Bittorrent; files are distributed among a bunch of peers, but you can view and open them with your normal file browser!
- It's like nothing anyone's ever made! Probably for good reason!
(For clarity, FriendCloud is not intended as a replacement for the above services, and is aiming for a slightly different use-case)
# Features
FriendCloud currently supports:
- Connecting to a bunch of other peers and synchronizing state (i.e. what peers and files exist)
- Mounting the virtual filesystem using FUSE (on Linux & Windows)
- Downloading/streaming files by opening them in your file browser
- Creating cloud folders and moving files in your file browser
- Zero security of any kind!!
Currently planned:
- Open API so people can write their own frontend/backend clients if they want (probably better than mine)
- File monitoring/tracking
- Being able to securely invite your friends to the network
- Some kind of public-key-based encrypted transport/user authentication
- Optional remote file modification (other people can change the files you've shared)
- JavaFX-based frontend for sharing/managing the network
- File replication; peers can donate some of their disk space for automatically saving redundant copies of each other's files
- And possibly downloading files on each other's behalf, for peers that can't connect to each other (or are in differnt timezones)
- KIO integration, for KDE on Linux
- One-user, many-peer; you can somehow prove that you control multiple peers and they all share permissions so you can manage your files from each of them
Maybe one day:
- One-peer, many-user; multiple (system) user accounts on the same machine can all have their own FriendCloud user accounts on the same network, even though there's just the one peer (running as a system daemon)
- GIO integration, for GNOME on Linux
- Windows [this thing](https://learn.microsoft.com/en-us/windows/win32/shell/nse-works) for Wandow 5 Macro suport (it's 3am)
- Maybe we can implement CalDAV or something and you can sync your calendar through it or some shit. attack and dethrone NextCloud
- I had weird vague dreams of it reading the metadata of your files like artist, album and stuff and it would automatically organise them for you. but you can just do that manually
- make wplace in it.
- You can be a member of multiple different networks and have the same files shared to each of them
# Building
Git! Java! Maven! I believe in you!

View file

@ -154,6 +154,7 @@ public class Controller
public synchronized void addChangeRecord(ObjectChangeRecord record)
{
System.out.println("Controller: Adding change record " + record );
DataStore.DAO<ObjectChangeRecord> dao = dataStore.getDAOForClass(ObjectChangeRecord.class);
dao.update(record);
// update the change heads; if any of this change's heads are included in ours, then this change replaces them
@ -165,13 +166,13 @@ public class Controller
for (long changeHeadID : record.getChangeHeads())
{
ObjectChangeRecord headRecord = dao.get(changeHeadID);
recordIsNewHead = changeHeads.remove(headRecord);
recordIsNewHead |= changeHeads.remove(headRecord);
}
}
if (recordIsNewHead)
{
changeHeads.add(record);
System.out.println("Controller: Change heads updated: " + changeHeads.stream().map(objectChangeRecord -> Long.toHexString(objectChangeRecord.getChangeID())).collect(Collectors.toSet()));
System.out.println("Controller: Change heads updated: " + changeHeads);
}
}
}
@ -204,7 +205,7 @@ public class Controller
System.out.println("Controller: Applying change record " + record);
long changeID = localData.getCurrentChangeRecord().getChangeID();
if (!record.getChangeHeads().contains(changeID))
throw new IllegalStateException("Change does not apply! Valid change heads=" + record.getChangeHeads().stream().map(Long::toHexString).collect(Collectors.toSet()) + ", we are in state " + Long.toHexString(changeID));
throw new IllegalStateException("Change does not apply! Valid change heads=" + record.getChangeHeads() + ", we are in state " + localData.getCurrentChangeRecord());
addChangeRecord(record);
record.applyToLocalState();
@ -256,7 +257,7 @@ public class Controller
changeHeads = new HashSet<>(dataStore.getDAOForClass(ObjectChangeRecord.class).getAll());
Set<Long> referencedIDs = changeHeads.stream().flatMap(objectChangeRecord -> objectChangeRecord.getChangeHeads().stream()).collect(Collectors.toSet());
changeHeads.removeIf(objectChangeRecord -> referencedIDs.contains(objectChangeRecord.getChangeID()));
System.out.println("Controller: Determined change heads to be " + changeHeads.stream().map(objectChangeRecord -> Long.toHexString(objectChangeRecord.getChangeID())).collect(Collectors.toSet()));
System.out.println("Controller: Determined change heads to be " + changeHeads);
}
return changeHeads;
}
@ -335,7 +336,7 @@ public class Controller
String path = newpath.substring(0, lastSlash);
if (path.isEmpty())
path = "/";
try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), getLocalPeer().getObjectID()))
try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(getLocalPeer().getObjectID()))
{
transaction.addObjectBeforeChange(fsNode);
if (!name.equals(fsNode.getName()))
@ -372,7 +373,7 @@ public class Controller
if (path.equals("/") || path.isEmpty())
return null;
NetworkFolder ret = null;
try(ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), getLocalPeer().getObjectID()))
try(ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(getLocalPeer().getObjectID()))
{
NetworkFSNode node = getFSNode(path);
if (node != null && !(node instanceof NetworkFolder))

View file

@ -5,10 +5,9 @@ import moe.nekojimi.friendcloud.objects.NetworkFile;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class FileRemoteAccess
{
@ -61,12 +60,14 @@ public class FileRemoteAccess
else
piecesToDownload = neededPieces;
CompletableFuture<Void> downloadFuture = null;
if (!piecesToDownload.isEmpty())
{
System.out.println("FRA: will fetch pieces " + piecesToDownload);
DownloadManager.getInstance().downloadPieces(file, piecesToDownload)
.thenRun(() -> {
downloadFuture = DownloadManager.getInstance().downloadPieces(file, piecesToDownload)
.thenRun(() ->
{
preemptiveDownloadInProgress = false;
});
// Main.getInstance().getExecutor().submit(futureTask);
@ -74,7 +75,7 @@ public class FileRemoteAccess
if (!neededPieces.isEmpty())
{
boolean ok = waitForPieceRange(neededPieces, 1000);
boolean ok = waitForPieceRange(neededPieces, 30_000, downloadFuture);
if (!ok)
{
System.err.println("FRA: timed out while waiting for pieces " + neededPieces);
@ -101,14 +102,35 @@ public class FileRemoteAccess
return ret;
}
private boolean waitForPieceRange(Set<Integer> pieces, long pieceTimeoutMs)
private boolean waitForPieceRange(Set<Integer> pieces, long pieceTimeoutMs, CompletableFuture<Void> downloadFuture)
{
for (int pieceIdx : pieces)
{
boolean ok = file.waitForFilePiece(pieceIdx, pieceTimeoutMs);
boolean ok = false;
while (pieceTimeoutMs > 0 && !ok)
{
long waitStartTime = Instant.now().toEpochMilli();
try
{
ok = file.waitForFilePiece(pieceIdx, pieceTimeoutMs);
}
catch (InterruptedException ignored)
{
ok = false;
}
if (!ok)
{
if (downloadFuture != null && (downloadFuture.isCancelled() || downloadFuture.isCancelled()))
{
System.err.println("FRA: Download failed.");
return false;
}
}
long waitEndTime = Instant.now().toEpochMilli();
long waitTime = waitEndTime - waitStartTime;
pieceTimeoutMs -= waitTime;
}
}
return true;
}

View file

@ -44,9 +44,20 @@ public class Main
public static void main(String[] argv)
{
instance = new Main();
Args args = new Args();
JCommander.newBuilder().addObject(args).build().parse(argv);
JCommander jCommander = JCommander.newBuilder()
.addObject(args)
.programName("FriendCloud")
.build();
jCommander.parse(argv);
if (args.help)
{
jCommander.usage();
return;
}
instance = new Main();
instance.args = args;
System.setProperty(SimpleLogger.DEFAULT_LOG_LEVEL_KEY, "Info");
@ -66,6 +77,7 @@ public class Main
private void run() throws IOException
{
DataStore dataStore = new StupidJSONFileStore(new File(args.storageLocation));
controller = new Controller(dataStore);
@ -216,23 +228,32 @@ public class Main
@SuppressWarnings("FieldCanBeLocal")
public static class Args
{
@Parameter(names="-share")
@Parameter(names="-help", help = true, description = "Display this help message and quit.")
private boolean help = false;
@Parameter(names="-share", description = "Add a file path to be shared once the application starts.")
private List<String> sharedFilePaths = new ArrayList<>();
@Parameter(names="-known-peer")
@Parameter(names="-known-peer", description = "Add a URI (e.g. tcp://192.168.1.69:42069) of a known peer that will be connected to on first run.")
private List<String> knownPeers = new ArrayList<>();
@Parameter(names="-tcp-port")
@Parameter(names="-tcp-port", description = "The TCP port to listen for connections on.")
private int tcpPort = 7777;
@Parameter(names="-no-upnp")
@Parameter(names="-advertise-address", description = "Manually add a URI (e.g. tcp://192.168.1.69:42069) that will be given to other peers to connect to this peer with. Useful to provide your global IP if UPnP doesn't work.")
private List<String> advertiseAddresses = new ArrayList<>();
@Parameter(names="-no-upnp", description = "Disables UPnP.")
private boolean noUpnp = false;
@Parameter(names="-create-network")
private boolean createNetwork = false;
// @Parameter(names="-create-network")
// private boolean createNetwork = false;
@Parameter(names = "-storage")
private String storageLocation = ".";
@Parameter(names = "-storage", description = "The location on disk where the local copy of the state database will be stored.")
private String storageLocation = "./storage";
@Parameter(names = "-artificial-lag", description = "Set to a value above 0 to introduce artificial delay when sending messages, in milliseconds. Use for testing only!")
private int artificialLagMs = 0;
public List<String> getSharedFilePaths()
{
@ -254,14 +275,24 @@ public class Main
return noUpnp;
}
public boolean isCreateNetwork()
{
return createNetwork;
}
// public boolean isCreateNetwork()
// {
// return createNetwork;
// }
public String getStorageLocation()
{
return storageLocation;
}
public List<String> getAdvertiseAddresses()
{
return advertiseAddresses;
}
public int getArtificialLagMs()
{
return artificialLagMs;
}
}
}

View file

@ -55,13 +55,27 @@ public class ObjectChangeRecord implements Storable
try
{
MessageDigest digest = MessageDigest.getInstance("SHA-256");
return digest.digest(toString().getBytes(StandardCharsets.UTF_8));
return digest.digest(getCanonicalStringRepresentation().getBytes(StandardCharsets.UTF_8));
} catch (NoSuchAlgorithmException e)
{
throw new RuntimeException(e);
}
}
@Override
public boolean equals(Object o)
{
if (o == null || getClass() != o.getClass()) return false;
ObjectChangeRecord record = (ObjectChangeRecord) o;
return Objects.equals(getChangeID(), record.getChangeID());
}
@Override
public int hashCode()
{
return (int) getChangeID();
}
public ObjectStatements.ObjectChangeMessage.Builder buildObjectChangeMessage()
{
ObjectStatements.ObjectChangeMessage.Builder builder = ObjectStatements.ObjectChangeMessage.newBuilder();
@ -108,7 +122,7 @@ public class ObjectChangeRecord implements Storable
}
}
public String toString()
public String getCanonicalStringRepresentation()
{
StringBuilder sb = new StringBuilder();
sb.append(creatorPeer).append(",").append(creationTime.toEpochMilli()).append(";");
@ -119,11 +133,21 @@ public class ObjectChangeRecord implements Storable
sb.append(";");
for (Change change: changes.stream().sorted(Comparator.comparingLong(a -> a.objectID.toLong())).toList())
{
sb.append(change.toString()).append(";");
sb.append(change.getCanonicalStringRepresentation()).append(";");
}
return sb.toString();
}
@Override
public String toString()
{
return "Change{" + Long.toHexString(getChangeID()) +
", heads=" + changeHeads.stream().map(Long::toHexString).collect(Collectors.toSet()) +
", time=" + creationTime.toString() +
", creator=" + creatorPeer.toString() +
"}";
}
public long getChangeID()
{
MessageDigest digest = null;
@ -134,7 +158,7 @@ public class ObjectChangeRecord implements Storable
{
throw new RuntimeException(e);
}
String stringVal = toString();
String stringVal = getCanonicalStringRepresentation();
byte[] bytes = digest.digest(stringVal.getBytes(StandardCharsets.UTF_8));
// System.out.println("ObjectChangeRecord: calculated change ID " + Long.toHexString(ret) + " from string: " + stringVal);
return Util.xorBytesToLong(bytes);
@ -195,7 +219,7 @@ public class ObjectChangeRecord implements Storable
return null;
}
public String toString()
public String getCanonicalStringRepresentation()
{
StringBuilder sb = new StringBuilder();
sb.append(objectID).append(";"); // The object ID, then ;
@ -213,7 +237,6 @@ public class ObjectChangeRecord implements Storable
return sb.toString();
}
public ObjectStatements.ObjectChange.Builder buildObjectChange()
{
ObjectStatements.ObjectChange.Builder builder = ObjectStatements.ObjectChange.newBuilder();

View file

@ -1,6 +1,5 @@
package moe.nekojimi.friendcloud;
import moe.nekojimi.friendcloud.network.ConnectionManager;
import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
@ -16,22 +15,19 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
public class ObjectChangeTransaction implements AutoCloseable
{
private final ObjectID creator;
private final ConnectionManager connectionManager;
private final Map<ObjectID, ObjectStatements.ObjectState> beforeStates = new HashMap<>();
private static ObjectChangeTransaction currentTransaction = null;
private int openCount = 0;
private boolean ended = false;
private ObjectChangeTransaction(ConnectionManager connectionManager, ObjectID creator)
private ObjectChangeTransaction(ObjectID creator)
{
this.creator = creator;
this.connectionManager = connectionManager;
System.out.println("ObjectChangeTransaction: opening transaction");
@ -51,10 +47,10 @@ public class ObjectChangeTransaction implements AutoCloseable
}
}
public static ObjectChangeTransaction startTransaction(ConnectionManager connectionManager, ObjectID creatorPeer)
public static ObjectChangeTransaction startTransaction(ObjectID creatorPeer)
{
if (currentTransaction == null)
currentTransaction = new ObjectChangeTransaction(connectionManager, creatorPeer);
currentTransaction = new ObjectChangeTransaction(creatorPeer);
currentTransaction.increaseOpenCount();
return currentTransaction;
}
@ -95,7 +91,8 @@ public class ObjectChangeTransaction implements AutoCloseable
if (changes.isEmpty())
return null;
return ObjectChangeRecord.createFromChanges(creator, Main.getInstance().getModel().getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet()), changes);
ObjectChangeRecord currentChangeRecord = Main.getInstance().getModel().getLocalData().getCurrentChangeRecord();
return ObjectChangeRecord.createFromChanges(creator, currentChangeRecord != null ? Set.of(currentChangeRecord.getChangeID()) : Set.of(), changes);
}
public void commit()

View file

@ -18,7 +18,7 @@ public class SharedFileManager
if (files.isEmpty())
return;
Controller controller = Main.getInstance().getModel();
try(ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), controller.getLocalPeer().getObjectID()))
try(ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(controller.getLocalPeer().getObjectID()))
{
List<NetworkObject> knownFiles = controller.listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE));

View file

@ -73,7 +73,7 @@ public class Util
}
ret.totalDigest = totalDigest.digest();
System.out.println("Total hash: " + HexFormat.of().formatHex(ret.totalDigest));
long pieceCount = file.length() / pieceSize;
long pieceCount = Math.ceilDiv(file.length() , pieceSize);
System.out.println("Have " + ret.pieces.cardinality() + " of " + pieceCount + " pieces.");
return ret;
} catch (NoSuchAlgorithmException | IOException e)

View file

@ -1,12 +1,17 @@
package moe.nekojimi.friendcloud.network;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.objects.Peer;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.stream.Stream;
public class ConnectionManager
{
@ -31,17 +36,17 @@ public class ConnectionManager
}
}
public PeerConnection getNodeConnection(URI uri) throws IOException
public PeerConnection getNodeConnection(URI uri)
{
return getNodeConnection(uri, null);
}
public PeerConnection getNodeConnection(URI uri, Peer peer) throws IOException
public PeerConnection getNodeConnection(URI uri, Peer peer)
{
purgeDeadConnections();
for (PeerConnection peerConnection: activeConnections)
{
if (peerConnection.getUri() == uri)
if (peerConnection.getUri().equals(uri))
return peerConnection;
}
@ -68,19 +73,19 @@ public class ConnectionManager
for (URI address : peer.getAddresses())
{
try
{
return getNodeConnection(address, peer);
}
catch (IOException ex)
{
System.err.println("ConnectionManager: Couldn't create PeerConnection to " + address + " : " + ex.getMessage());
}
PeerConnection connection = getNodeConnection(address, peer);
if (connection != null)
return connection;
}
System.err.println("ConnectionManager: Failed to create PeerConnection to " + peer + "!");
return null;
}
public CompletableFuture<Optional<PeerConnection>> getNodeConnectionAsync(Peer peer)
{
return CompletableFuture.supplyAsync(() -> Optional.ofNullable(getNodeConnection(peer)), Main.getInstance().getExecutor());
}
public void shutdown()
{
for (ConnectionBackend<?> backend : backends.values())
@ -123,9 +128,24 @@ public class ConnectionManager
public List<URI> getURIs()
{
return backends.values().stream()
.filter(ConnectionBackend::isListening)
.flatMap(connectionBackend -> connectionBackend.getURIs().stream())
.toList();
List<URI> ret = new ArrayList<>();
for (String advertiseAddress : Main.getInstance().getArgs().getAdvertiseAddresses())
{
try
{
URI uri = new URI(advertiseAddress);
ret.add(uri);
} catch (URISyntaxException e)
{
System.err.println("ERROR: " + advertiseAddress + " isn't a valid URI: " + e.getMessage());
}
}
for (ConnectionBackend<?> backend: backends.values())
{
if (!backend.isListening())
continue;
ret.addAll(backend.getURIs());
}
return ret;
}
}

View file

@ -2,6 +2,7 @@ package moe.nekojimi.friendcloud.network;
import com.google.protobuf.*;
import moe.nekojimi.friendcloud.Controller;
import moe.nekojimi.friendcloud.FilePieceAccess;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.ObjectChangeRecord;
@ -20,6 +21,7 @@ import java.io.IOException;
import java.net.URI;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
public abstract class PeerConnection extends Thread
{
@ -27,22 +29,18 @@ public abstract class PeerConnection extends Thread
private ObjectID peerID = new ObjectID(0);
private long nextMessageId = 1;
private final URI uri;
private long artificalDelayMs = 0;
private long artificalDelayMs;
private final Map<String, MessageHandler<?>> messageHandlers = new HashMap<>();
public PeerConnection()
{
this(null);
}
public PeerConnection(URI uri)
public PeerConnection(@NotNull URI uri)
{
this.uri = uri;
installDefaultMessageHandlers();
artificalDelayMs = Main.getInstance().getArgs().getArtificialLagMs();
}
public PeerConnection(URI uri, @NotNull ObjectID peerID)
public PeerConnection(@NotNull URI uri, @NotNull ObjectID peerID)
{
this(uri);
this.peerID = peerID;
@ -58,19 +56,18 @@ public abstract class PeerConnection extends Thread
public synchronized <T> CompletableFuture<T> makeRequest(Request<?, T> request)
{
if (!isAlive())
throw new IllegalStateException("Request made to PeerConnection that isn't running!");
throw new IllegalStateException("PeerConnection (" + getUri() + "): Request made to PeerConnection that isn't running!");
try
{
Message message = request.buildMessage();
CommonMessages.FriendCloudMessage wrappedMessage = wrapMessage(message);
pendingRequests.put(wrappedMessage.getHeader().getMessageId(), request);
sendMessage(wrappedMessage);
pendingRequests.put(wrappedMessage.getHeader().getMessageId(), request);
return request.getFuture();
} catch (Exception e)
{
System.err.println("Request failed!");
System.err.println("PeerConnection (" + getUri() + "): Request failed!");
e.printStackTrace(System.err);
return CompletableFuture.failedFuture(e);
}
@ -107,7 +104,7 @@ public abstract class PeerConnection extends Thread
private void replyWithError(CommonMessages.Error error, CommonMessages.MessageHeader replyHeader) throws IOException
{
System.err.println("Sending error reply: " + error.name() + " to message ID " + replyHeader.getReplyToMessageId());
System.err.println("PeerConnection (" + getUri() + "): Sending error reply: " + error.name() + " to message ID " + replyHeader.getReplyToMessageId());
CommonMessages.ErrorMessage errorMessage = CommonMessages.ErrorMessage.newBuilder().setError(error).build();
sendMessage(wrapMessage(errorMessage, replyHeader));
}
@ -118,7 +115,7 @@ public abstract class PeerConnection extends Thread
Any body = message.getBody();
long replyToMessageId = header.getReplyToMessageId();
ObjectID senderID = new ObjectID(header.getSenderId());
System.out.println("Received message! type=" + body.getTypeUrl() + ", sender=" + senderID + ", ID=" + header.getMessageId() + ", reply_to=" + replyToMessageId);
System.out.println("PeerConnection (" + getUri() + "): Received message! type=" + body.getTypeUrl() + ", sender=" + senderID + ", ID=" + header.getMessageId() + ", reply_to=" + replyToMessageId);
try
{
try
@ -127,7 +124,7 @@ public abstract class PeerConnection extends Thread
{
try
{
System.err.println("WARNING: artifical lag activated! Waiting " + artificalDelayMs + "ms...");
System.err.println("WARNING: artificial lag activated! Waiting " + artificalDelayMs + "ms...");
Thread.sleep(artificalDelayMs);
} catch (InterruptedException e)
{
@ -137,9 +134,15 @@ public abstract class PeerConnection extends Thread
if (!senderID.isNull())
{
if (peerID.isNull())
Peer localPeer = Main.getInstance().getModel().getLocalPeer();
if (localPeer != null && Objects.equals(senderID, localPeer.getObjectID()))
{
System.out.println("PeerConnection: Identified sender as " + senderID);
System.err.println("PeerConnection (" + getUri() + "): Connected to ourselves, terminating connection!");
shutdown();
}
else if (peerID.isNull())
{
System.out.println("PeerConnection (" + getUri() + "): Identified sender as " + senderID);
peerID = senderID;
}
else
@ -184,7 +187,7 @@ public abstract class PeerConnection extends Thread
private void handleErrorToUnsolicitedMessage(CommonMessages.MessageHeader header, CommonMessages.ErrorMessage body)
{
throw new RuntimeException("Our message ID " + header.getReplyToMessageId() + " caused a remote error: " + body.getError().name());
throw new RuntimeException("PeerConnection (" + getUri() + "): Our message ID " + header.getReplyToMessageId() + " caused a remote error: " + body.getError().name());
}
@SuppressWarnings({"rawtypes", "unchecked"})
@ -200,7 +203,7 @@ public abstract class PeerConnection extends Thread
}
else
{
System.err.println("PeerConnection: don't have a MessageHandler for message type " + typeUrl + "!");
System.err.println("PeerConnection (" + getUri() + "): don't have a MessageHandler for message type " + typeUrl + "!");
replyWithError(CommonMessages.Error.ERROR_MESSAGE_BODY_UNKNOWN, header);
}
}
@ -208,7 +211,7 @@ public abstract class PeerConnection extends Thread
private void handleReplyMessage(CommonMessages.MessageHeader header, Any body) throws InvalidProtocolBufferException, ReplyWithErrorException
{
long replyToMessageId = header.getReplyToMessageId();
System.out.println("Received reply to message ID " + replyToMessageId);
System.out.println("PeerConnection (" + getUri() + "): Received reply to message ID " + replyToMessageId);
Request<?, ?> request = pendingRequests.get(replyToMessageId);
boolean doneWithRequest = request.handleReply(body);
if (doneWithRequest)
@ -236,14 +239,17 @@ public abstract class PeerConnection extends Thread
private void installDefaultMessageHandlers()
{
Controller controller = Main.getInstance().getModel();
installMessageHandler(new MessageHandler<>(ObjectStatements.ObjectListRequest.class)
{
@Override
protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectListRequest message) throws IOException
{
List<NetworkObject> objects = Main.getInstance().getModel().listObjects(new HashSet<>(message.getTypesList()));
List<NetworkObject> objects = controller.listObjects(new HashSet<>(message.getTypesList()));
ObjectStatements.ObjectList.Builder objectList = ObjectStatements.ObjectList.newBuilder();
ObjectChangeRecord currentChangeRecord = controller.getLocalData().getCurrentChangeRecord();
objectList.setChangeHead(currentChangeRecord == null ? 0L : currentChangeRecord.getChangeID());
for (NetworkObject object : objects)
{
objectList.addStates(object.buildObjectState());
@ -263,7 +269,7 @@ public abstract class PeerConnection extends Thread
replyWithError(CommonMessages.Error.ERROR_INVALID_ARGUMENT, header);
}
NetworkFile networkFile = Main.getInstance().getModel().getObject(new ObjectID(message.getFileId()));
NetworkFile networkFile = controller.getObject(new ObjectID(message.getFileId()));
if (networkFile == null)
{
replyWithError(CommonMessages.Error.ERROR_OBJECT_NOT_FOUND, header);
@ -273,7 +279,7 @@ public abstract class PeerConnection extends Thread
{
int startIndex = message.getStartPieceIndex();
int endIndex = (message.getStartPieceIndex() + message.getPieceCount()) - 1;
System.out.println("Been asked for pieces from " + startIndex + " to " + endIndex);
System.out.println("PeerConnection (" + getUri() + "): Been asked for pieces from " + startIndex + " to " + endIndex);
List<Long> indices = new ArrayList<>();
for (int index = startIndex; index <= endIndex; index += message.getPieceMod())
{
@ -286,7 +292,7 @@ public abstract class PeerConnection extends Thread
byte[] buffer = filePieceAccess.readPiece(Math.toIntExact(index));
if (buffer != null)
{
System.out.println("Replying to file piece request with piece " + index);
System.out.println("PeerConnection (" + getUri() + "): Replying to file piece request with piece " + index);
PieceMessages.FilePieceMessage filePieceMessage = PieceMessages.FilePieceMessage.newBuilder()
.setPieceIndex(Math.toIntExact(index))
.setFileId(networkFile.getObjectID().toLong())
@ -296,7 +302,7 @@ public abstract class PeerConnection extends Thread
}
else
{
System.err.println("Don't have requested piece " + index + "!");
System.err.println("PeerConnection (" + getUri() + "): Don't have requested piece " + index + "!");
replyWithError(CommonMessages.Error.ERROR_PIECE_NOT_POSSESSED, header);
break;
}
@ -310,8 +316,8 @@ public abstract class PeerConnection extends Thread
protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectChangeRequest message) throws IOException
{
List<Long> changesSinceList = message.getChangesSinceList();
System.out.println("PeerConnection: Been asked for all changes since " + changesSinceList.stream().map(Long::toHexString));
Set<ObjectChangeRecord> changes = Main.getInstance().getModel().findChangesSince(changesSinceList);
System.out.println("PeerConnection (" + getUri() + "): Been asked for all changes since " + changesSinceList.stream().map(Long::toHexString).collect(Collectors.toSet()));
Set<ObjectChangeRecord> changes = controller.findChangesSince(changesSinceList);
if (changes == null)
{
replyWithError(CommonMessages.Error.ERROR_END_OF_HISTORY, header);
@ -323,7 +329,7 @@ public abstract class PeerConnection extends Thread
{
reply.addChangeMessages(change.buildObjectChangeMessage());
}
System.out.println("PeerConnection: Replying with " + reply.getChangeMessagesCount() + " changes");
System.out.println("PeerConnection (" + getUri() + "): Replying with " + reply.getChangeMessagesCount() + " changes");
sendMessage(wrapMessage(reply.build(), header));
}
}
@ -334,19 +340,26 @@ public abstract class PeerConnection extends Thread
protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectChangeMessage message)
{
ObjectChangeRecord record = ObjectChangeRecord.createFromChangeMessage(message);
Main.getInstance().getModel().applyChangeRecord(record);
controller.applyChangeRecord(record);
}
});
installMessageHandler(new MessageHandler<>(CommonMessages.CheckInMessage.class)
{
@Override
protected void handle(CommonMessages.MessageHeader header, CommonMessages.CheckInMessage message)
protected void handle(CommonMessages.MessageHeader header, CommonMessages.CheckInMessage message) throws IOException
{
List<Long> remoteChangeHeads = message.getCurrentChangeHeadsList();
Peer peer = controller.getObject(peerID);
if (peer != null)
{
peer.setLastKnownChangeID(message.getCurrentChange());
controller.objectChanged(peer);
}
Set<Long> remoteChangeHeads = new HashSet<>(message.getCurrentChangeHeadsList());
boolean potentialNewChanges = false;
for (long remoteChangeHead : remoteChangeHeads)
{
boolean exists = Main.getInstance().getModel().getDataStore().getDAOForClass(ObjectChangeRecord.class).exists(remoteChangeHead);
boolean exists = controller.getDataStore().getDAOForClass(ObjectChangeRecord.class).exists(remoteChangeHead);
if (!exists)
{
potentialNewChanges = true;
@ -355,10 +368,30 @@ public abstract class PeerConnection extends Thread
}
if (potentialNewChanges)
{
PullChangesTask task = new PullChangesTask(Set.of(Main.getInstance().getModel().getObject(peerID)));
if (peer != null)
{
PullChangesTask task = new PullChangesTask(Set.of(peer));
Main.getInstance().getExecutor().submit(task);
}
}
else
{
Set<Long> changeHeadIDs = controller.getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet());
// if there's no new changes then we know all the changes that the remote has
// so if they're not the same as our latest changes then they don't know about something we do
// so send them a checkin
boolean remoteOutOfDate = !changeHeadIDs.equals(remoteChangeHeads);
if (remoteOutOfDate)
{
CommonMessages.CheckInMessage checkInMessage = CommonMessages.CheckInMessage.newBuilder()
.addAllCurrentChangeHeads(changeHeadIDs)
.build();
sendMessage(wrapMessage(checkInMessage));
}
}
}
});
}

View file

@ -13,11 +13,10 @@ import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.objects.Peer;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URI;
import java.net.*;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -30,22 +29,57 @@ public class TCPConnectionBackend extends ConnectionBackend<TCPPeerConnection>
List<URI> getURIs()
{
List<URI> ret = new ArrayList<>();
ret.add(makeURI(serverSocket.getInetAddress().getHostAddress(), serverSocket.getLocalPort()));
ret.add(makeURI(serverSocket.getInetAddress().getHostName(), serverSocket.getLocalPort()));
InetAddress inetAddress = serverSocket.getInetAddress();
addAddressToList(inetAddress, ret);
try
{
Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
for (Iterator<NetworkInterface> it = networkInterfaces.asIterator(); it.hasNext(); )
{
NetworkInterface networkInterface = it.next();
if (networkInterface.isLoopback())
continue;
if (!networkInterface.isUp())
continue;
for (InterfaceAddress interfaceAddress : networkInterface.getInterfaceAddresses())
addAddressToList(interfaceAddress.getAddress(), ret);
}
} catch (SocketException e)
{
}
if (mappedPort != null)
{
ret.add(makeURI(mappedPort.getExternalAddress().getHostAddress(), mappedPort.getExternalPort()));
ret.add(makeURI(mappedPort.getExternalAddress().getCanonicalHostName(), mappedPort.getExternalPort()));
addAddressToList(mappedPort.getExternalAddress(), ret);
}
System.out.println("Local addresses: " + ret);
return ret;
}
/**
* utility method for processing URIs
*/
private void addAddressToList(InetAddress address, List<URI> list)
{
if (address.isAnyLocalAddress())
return;
String hostAddress = address.getHostAddress();
if (!hostAddress.isEmpty())
list.add(makeURI(hostAddress, serverSocket.getLocalPort()));
String hostName = address.getHostName();
if (!hostName.isEmpty() && !hostName.equals(hostAddress))
list.add(makeURI(hostName, serverSocket.getLocalPort()));
}
public TCPConnectionBackend(int port, ConnectionManager connectionManager) throws IOException
{
super("TCP Listen Thread", "tcp", connectionManager);
serverSocket = new ServerSocket(port);
// setupIGP(port);
if (!Main.getInstance().getArgs().isNoUpnp())
setupIGP(port);
}
private void setupIGP(int port)
@ -60,6 +94,9 @@ public class TCPConnectionBackend extends ConnectionBackend<TCPPeerConnection>
// Discover port forwarding devices and take the first one found
System.out.println("Discovering port mappers...");
List<PortMapper> mappers = PortMapperFactory.discover(networkBus, processBus);
if (mappers.isEmpty())
return;
;
PortMapper mapper = mappers.getFirst();
System.out.println("Got mapper " + mapper + ", mapping port...");
@ -102,6 +139,7 @@ public class TCPConnectionBackend extends ConnectionBackend<TCPPeerConnection>
protected TCPPeerConnection getConnection() throws IOException
{
Socket socket = serverSocket.accept();
System.out.println("TCPConnectionBackend: Received connection from " + socket.getRemoteSocketAddress());
return new TCPPeerConnection(socket);
}

View file

@ -2,12 +2,12 @@ package moe.nekojimi.friendcloud.network;
import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.protos.CommonMessages;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.URI;
import java.net.*;
public class TCPPeerConnection extends PeerConnection
{
@ -18,24 +18,37 @@ public class TCPPeerConnection extends PeerConnection
{
super(tcpURL, peer);
socket = new Socket(tcpURL.getHost(), tcpURL.getPort());
System.out.println("TCP Connection: connected to " + tcpURL + " OK!");
System.out.println("TCPPeerConnection: connected to " + tcpURL + " OK!");
}
public TCPPeerConnection(Socket openSocket)
{
super();
super(getSocketURI(openSocket.getInetAddress(), openSocket.getPort()));
socket = openSocket;
}
private static URI getSocketURI(@NotNull InetAddress address, int port)
{
try
{
return new URI("tcp://" + address.getHostAddress() + ":" + port);
} catch (URISyntaxException e)
{
throw new RuntimeException(e);
}
}
@Override
public void run()
{
super.run();
try
try(InputStream inputStream = socket.getInputStream())
{
InputStream inputStream = socket.getInputStream();
while (!socket.isClosed())
socket.setKeepAlive(true);
socket.setSoTimeout(keepAliveTimeS * 1000);
while (!socket.isClosed() && !socket.isInputShutdown())
{
CommonMessages.FriendCloudMessage message = CommonMessages.FriendCloudMessage.parseDelimitedFrom(inputStream);
// Any any = Any.parseDelimitedFrom(inputStream);
@ -45,23 +58,39 @@ public class TCPPeerConnection extends PeerConnection
messageReceived(message);
}
}
} catch (Exception ex)
}
catch (SocketTimeoutException ex)
{
System.out.println("TCPPeerConnection (" + getUri() + "): Read timed out, closing connection.");
}
catch (Exception ex)
{
// fuck
ex.printStackTrace(System.err);
}
System.out.println("TCP Connection: connection closed");
System.out.println("TCPPeerConnection (" + getUri() + "): connection closed");
shutdown();
}
@Override
protected void sendMessage(CommonMessages.FriendCloudMessage message) throws IOException
{
try
{
OutputStream outputStream = socket.getOutputStream();
System.out.println("Sending message " + message.getHeader().getMessageId() + ": " + message.getBody().getTypeUrl());
System.out.println("TCPPeerConnection (" + getUri() + "): Sending message " + message.getHeader().getMessageId() + ": " + message.getBody().getTypeUrl());
message.writeDelimitedTo(outputStream);
outputStream.flush();
}
catch (SocketException ex)
{
// handle this type of exception by closing the connection
System.err.println("TCPPeerConnection (" + getUri() + "): Failed to send, closing connection:" + ex.getMessage());
shutdown();
throw ex; // upper layer needs to know it failed
}
}
@Override
public synchronized void shutdown()
@ -69,9 +98,10 @@ public class TCPPeerConnection extends PeerConnection
try
{
socket.close();
interrupt();
} catch (IOException e)
{
System.err.println("TCPPeerConnection: failed to shut down!");
System.err.println("TCPPeerConnection (" + getUri() + "): failed to shut down!");
e.printStackTrace(System.err);
}
}

View file

@ -11,7 +11,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Set;
public class ObjectListRequest extends Request<ObjectStatements.ObjectListRequest, List<NetworkObject>>
public class ObjectListRequest extends Request<ObjectStatements.ObjectListRequest, ObjectListRequest.ObjectListRequestResult>
{
private final Set<ObjectStatements.ObjectType> types;
@ -39,7 +39,10 @@ public class ObjectListRequest extends Request<ObjectStatements.ObjectListReques
// System.out.println("Received ObjectList, objects=" + objectList.getStatesList());
List<NetworkObject> ret = new ArrayList<>();
ObjectListRequestResult ret = new ObjectListRequestResult();
ret.objects = new ArrayList<>();
ret.changeID = objectList.getChangeHead();
for (ObjectStatements.ObjectState objectState : objectList.getStatesList())
{
@ -47,7 +50,7 @@ public class ObjectListRequest extends Request<ObjectStatements.ObjectListReques
System.out.println("ObjectListRequest: Received state of object " + objectID);
NetworkObject object = NetworkObject.createByID(objectID);
object.updateFromStateMessage(objectState);
ret.add(object);
ret.objects.add(object);
}
future.complete(ret);
@ -63,4 +66,10 @@ public class ObjectListRequest extends Request<ObjectStatements.ObjectListReques
return false;
}
public static class ObjectListRequestResult
{
public List<NetworkObject> objects;
public long changeID;
}
}

View file

@ -25,7 +25,7 @@ public class NetworkFile extends NetworkFSNode
private File localFile = null;
private final SortedSet<ObjectID> peersWithCopy = new TreeSet<>();
// private final Map<Peer, PeerFileState> fileStates = new HashMap<>();
// private final Map<Peer, PeerFileState> fileStates = new HashMap<>();
private BitSet pieces = new BitSet();
// private List<FilePiece> pieces = new ArrayList<>();
@ -75,9 +75,9 @@ public class NetworkFile extends NetworkFSNode
{
peersWithCopy.clear();
String[] peers = after.get("peersWithCopy").split(",");
for (String peer: peers)
for (String peer : peers)
{
peersWithCopy.add(new ObjectID(Long.parseUnsignedLong(peer,16)));
peersWithCopy.add(new ObjectID(Long.parseUnsignedLong(peer, 16)));
}
}
}
@ -122,7 +122,7 @@ public class NetworkFile extends NetworkFSNode
else
localFile = new File(localFilePath);
peersWithCopy.clear();
peersWithCopy.addAll(((List<Object>)map.get("peersWithCopy")).stream().map(Util::unconditionalNumberToLong).map(ObjectID::new).toList());
peersWithCopy.addAll(((List<Object>) map.get("peersWithCopy")).stream().map(Util::unconditionalNumberToLong).map(ObjectID::new).toList());
}
public File getLocalFile()
@ -132,7 +132,9 @@ public class NetworkFile extends NetworkFSNode
/**
* Retreives the local file used for storing file data, or if it doesn't exist, creates it.
*
* @return a File object for the (potentially new) file storage.
*
* @throws IOException if the file could not be created due to an IO error.
*/
public File getOrCreateLocalFile() throws IOException
@ -146,7 +148,9 @@ public class NetworkFile extends NetworkFSNode
{
localFile = new File(tempDirectory, getName());
// localFile = File.createTempFile("FriendCloud", getNetworkPath());
localFile.createNewFile();
boolean created = localFile.createNewFile();
if (created)
System.out.println("NetworkFile: Created local file " + localFile.getAbsolutePath() + " for " + getObjectID());
}
return localFile;
}
@ -237,35 +241,26 @@ public class NetworkFile extends NetworkFSNode
/**
* Waits for a certain piece of the file to become available, or for a timeout to expire.
*
* @param pieceIdx the index of the piece to wait for. Note: <=0 means "don't wait at all", not "wait forever".
* @param timeoutMs the amount of time, in milliseconds, to wait.
* @return true if the piece is available (haspiece(pieceIdx) will return true), false if the timeout was reached.
* @throws InterruptedException if the waiting was interrupted.
*
* @return true if the piece is available (hasPiece(pieceIdx) will return true), false if the timeout was reached.
*/
public boolean waitForFilePiece(int pieceIdx, long timeoutMs)
public boolean waitForFilePiece(int pieceIdx, long timeoutMs) throws InterruptedException
{
synchronized (pieces)
{
if (hasPiece(pieceIdx))
return true;
while (timeoutMs > 0)
{
long startTime = System.currentTimeMillis();
try
{
System.out.println("NetworkFile: waiting "+ timeoutMs + "ms for piece " + pieceIdx + " of file " + name);
System.out.println("NetworkFile: waiting " + timeoutMs + "ms for piece " + pieceIdx + " of file " + name);
pieces.wait(timeoutMs);
if (hasPiece(pieceIdx))
{
System.out.println("NetworkFile: got piece we were waiting for.");
return true;
}
} catch (InterruptedException ignored)
{
}
long endTime = System.currentTimeMillis();
long timeWaited = (endTime - startTime);
timeoutMs -= timeWaited;
}
}
System.err.println("Timed out waiting for piece!");
return false;
@ -286,11 +281,17 @@ public class NetworkFile extends NetworkFSNode
public enum StorageType
{
/** The file will be stored as a complete file in the storage directory under it's own name and file path. */
/**
* The file will be stored as a complete file in the storage directory under it's own name and file path.
*/
COMPLETE,
/** The file will be stored as a complete file in a temporary directory. It will be deleted when the computer restarts. */
/**
* The file will be stored as a complete file in a temporary directory. It will be deleted when the computer restarts.
*/
TEMPORARY_COMPLETE,
/** Each piece of the file will be stored permanently in an individual piece file, in the pieces subdirectory. */
/**
* Each piece of the file will be stored permanently in an individual piece file, in the pieces subdirectory.
*/
PIECES,
}
}

View file

@ -1,5 +1,6 @@
package moe.nekojimi.friendcloud.objects;
import moe.nekojimi.friendcloud.Util;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import java.net.URI;
@ -13,6 +14,8 @@ public class Peer extends NetworkObject
private String userName = "";
private String systemName = "";
private long lastKnownChangeID = 0L;
public Peer(ObjectID objectID)
{
super(objectID);
@ -71,6 +74,7 @@ public class Peer extends NetworkObject
ret.put("userName", userName);
ret.put("systemName", systemName);
ret.put("addresses", addresses);
ret.put("lastKnownChangeID", lastKnownChangeID);
return ret;
}
@ -81,6 +85,7 @@ public class Peer extends NetworkObject
systemName = map.get("systemName").toString();
addresses.clear();
addresses.addAll((Collection<? extends URI>) map.get("addresses"));
lastKnownChangeID = Util.unconditionalNumberToLong(map.getOrDefault("lastKnownChangeID", 0L));
}
public void addAddress(URI address)
@ -129,6 +134,16 @@ public class Peer extends NetworkObject
this.systemName = systemName;
}
public long getLastKnownChangeID()
{
return lastKnownChangeID;
}
public void setLastKnownChangeID(long lastKnownChangeID)
{
this.lastKnownChangeID = lastKnownChangeID;
}
@Override
public String getFriendlyName()
{

View file

@ -20,7 +20,7 @@ public class FileDownloadTask implements RunnableFuture<File>
private final NetworkFile file;
private final ConnectionManager connectionManager;
private final long timeoutPerPieceMs = 1_000;
private final long timeoutPerPieceMs = 30_000;
private static final int MAX_DOWNLOAD_PIECES_PER_ROUND = 128;
private final SortedSet<Integer> missingPieceIndices = new TreeSet<>();
@ -65,23 +65,28 @@ public class FileDownloadTask implements RunnableFuture<File>
Peer selfPeer = Main.getInstance().getModel().getLocalPeer();
while (!missingPieceIndices.isEmpty() && !cancelled && !failed && !done)
{
System.out.println("Need to get " + missingPieceIndices.size() + " missing pieces.");
System.out.println("FileDownloadTask: Need to get " + missingPieceIndices.size() + " missing pieces.");
// Map<Peer, PeerFileState> fileStates = file.getFileStates();
// determine what nodes we can connect to
List<PeerConnection> connections = new ArrayList<>();
for (Peer peer : file.getPeersWithCopy())
{
if (peer == selfPeer)
continue; // yeah that's us dipshit
PeerConnection connection = connectionManager.getNodeConnection(peer);
if (connection != null)
{
System.out.println("FileDownloadTask: Will download from " + peer.getNodeName());
connections.add(connection);
}
}
List<PeerConnection> connections = new ArrayList<>(file.getPeersWithCopy().stream()
.filter(peer -> peer != selfPeer)
.map(connectionManager::getNodeConnectionAsync)
.map(CompletableFuture::join)
.flatMap(Optional::stream)
.toList());
// for (Peer peer : file.getPeersWithCopy())
// {
// if (peer == selfPeer)
// continue; // yeah that's us dipshit
// PeerConnection connection = connectionManager.getNodeConnection(peer);
// if (connection != null)
// {
// System.out.println("FileDownloadTask: Will download from " + peer.getNodeName());
// connections.add(connection);
// }
// }
// connectionLine = "Connected to " + connections.size() + " peers.";
// notification.setBody(connectionLine + "\n" + progressLine);
@ -90,6 +95,7 @@ public class FileDownloadTask implements RunnableFuture<File>
// shuffle the connections list
Collections.shuffle(connections);
if (connections.isEmpty())
{
System.err.println("FileDownloadTask: No peers have the file, download failed!");
@ -98,6 +104,8 @@ public class FileDownloadTask implements RunnableFuture<File>
break;
}
System.out.println("FileDownloadTask: Will download from " + connections.size() + " peers: " + connections.stream().map(PeerConnection::getUri).toList());
// find a continuous run of pieces to download
// TODO: allow for runs with regular gaps (e.g. every 2) to account for previous failed download attempts
int runStart = -1;
@ -156,7 +164,7 @@ public class FileDownloadTask implements RunnableFuture<File>
if (file.getDownloadPercentage() >= 100.0)
{
try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(connectionManager, selfPeer.getObjectID()))
try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(selfPeer.getObjectID()))
{
transaction.addObjectBeforeChange(file);
file.addPeerWithCopy(selfPeer);

View file

@ -53,7 +53,7 @@ public class JoinNetworkTask implements Runnable
throw new RuntimeException(e);
}
// create our local peer object
try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(connectionManager, peerID))
try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(peerID))
{
// create and submit our Peer object if it doesn't exist
Peer selfPeer = controller.getLocalData().getLocalPeer();

View file

@ -1,6 +1,7 @@
package moe.nekojimi.friendcloud.tasks;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.ObjectChangeRecord;
import moe.nekojimi.friendcloud.Util;
import moe.nekojimi.friendcloud.network.PeerConnection;
import moe.nekojimi.friendcloud.network.requests.ObjectChangeRequest;
@ -31,6 +32,8 @@ public class PullStateTask implements Runnable
System.out.println("PullStateTask: Pulling state from peers...");
Set<PeerConnection> connections = new HashSet<>();
// TODO: pull changes first, then select the peer(s) with the latest changes to pull state from
for (String knownPeerAddress : Main.getInstance().getArgs().getKnownPeers())
{
String[] split = knownPeerAddress.split(":");
@ -50,7 +53,7 @@ public class PullStateTask implements Runnable
{
connections.add(nodeConnection);
}
} catch (URISyntaxException | IOException e)
} catch (URISyntaxException e)
{
throw new RuntimeException(e);
}
@ -80,24 +83,33 @@ public class PullStateTask implements Runnable
for (PeerConnection connection : connections)
{
futures.add(connection.makeRequest(new ObjectListRequest(Set.of(
CompletableFuture<Void> changeFuture = connection.makeRequest(new ObjectChangeRequest(Set.of())).thenAccept(objectChangeRecords ->
{
System.out.println("PullStateTask: got " + objectChangeRecords.size() + " change records.");
Main.getInstance().getModel().addChangeRecords(objectChangeRecords);
});
CompletableFuture<Long> objectFuture = connection.makeRequest(new ObjectListRequest(Set.of(
ObjectStatements.ObjectType.OBJECT_TYPE_FILE,
ObjectStatements.ObjectType.OBJECT_TYPE_FOLDER,
ObjectStatements.ObjectType.OBJECT_TYPE_PEER))).thenAccept(networkObjects ->
ObjectStatements.ObjectType.OBJECT_TYPE_PEER))).thenApply(objectListRequestResult ->
{
List<NetworkObject> networkObjects = objectListRequestResult.objects;
System.out.println("PullStateTask: got state of " + networkObjects.size() + " objects.");
for (NetworkObject object : networkObjects)
{
Main.getInstance().getModel().addNetworkObject(object);
}
}));
return objectListRequestResult.changeID;
});
futures.add(connection.makeRequest(new ObjectChangeRequest(Set.of())).thenAccept(objectChangeRecords ->
CompletableFuture<Void> bothFuture = changeFuture.thenAcceptBoth(objectFuture, (unused, changeID) ->
{
System.out.println("PullStateTask: got " + objectChangeRecords.size() + " change records.");
Main.getInstance().getModel().addChangeRecords(objectChangeRecords);
}));
ObjectChangeRecord record = Main.getInstance().getModel().getChangeRecord(changeID);
System.out.println("PullStateTask: updating current change ID to " + record);
Main.getInstance().getModel().setCurrentChangeRecord(record);
});
futures.add(bothFuture);
}
Util.collectFutures(futures).join();

View file

@ -21,6 +21,7 @@ message HelloMessage {
message CheckInMessage {
repeated uint64 current_change_heads = 1;
uint64 current_change = 2;
}
enum Error {