Compare commits
9 commits
cae44b6f85
...
8b0d91037c
Author | SHA1 | Date | |
---|---|---|---|
|
8b0d91037c | ||
|
480b6429f9 | ||
|
fee20875a1 | ||
|
a14701f5c5 | ||
|
47b97c0485 | ||
|
6685e6fd7e | ||
|
1131d47dc0 | ||
|
eae105ab61 | ||
|
2d4609e9e8 |
54
README.md
Normal file
54
README.md
Normal file
|
@ -0,0 +1,54 @@
|
||||||
|
# FriendCloud (aka SamoyedShare?)
|
||||||
|
|
||||||
|
🐕☁🐕☁🐕☁🐕☁🐕
|
||||||
|
|
||||||
|
FriendCloud is a decentralised mesh-network-based file-sharing platform for you and your friends!
|
||||||
|
|
||||||
|
This repository is for the **backend** (or the reference implementation, at least). See also the [repository for the frontend]().
|
||||||
|
|
||||||
|
⚠ Currently FriendCloud is in SUPER WOBBLY TURBO-ALPHA. Don't use this! Your files may crumble into dust! Security is an unknown concept, still but a theory in the mind's-eye of divine astrologers! The database is but a collection of JSON files in a worm-infested crypt, left weak and vulnerable to the ravages of entropy!
|
||||||
|
# About FriendCloud
|
||||||
|
|
||||||
|
- It's like NextCloud; you have your own private space for you and your friends to store files, but it's decentralised and no one person controls it!
|
||||||
|
- It's like Syncthing; you can sync your files among all your friend's devices, but not everyone needs to have a copy of every file!
|
||||||
|
- It's like Git; file and object changes are tracked and merged, but (mostly) automagically!
|
||||||
|
- It's like Bittorrent; files are distributed among a bunch of peers, but you can view and open them with your normal file browser!
|
||||||
|
- It's like nothing anyone's ever made! Probably for good reason!
|
||||||
|
|
||||||
|
(For clarity, FriendCloud is not intended as a replacement for the above services, and is aiming for a slightly different use-case)
|
||||||
|
|
||||||
|
# Features
|
||||||
|
|
||||||
|
FriendCloud currently supports:
|
||||||
|
|
||||||
|
- Connecting to a bunch of other peers and synchronizing state (i.e. what peers and files exist)
|
||||||
|
- Mounting the virtual filesystem using FUSE (on Linux & Windows)
|
||||||
|
- Downloading/streaming files by opening them in your file browser
|
||||||
|
- Creating cloud folders and moving files in your file browser
|
||||||
|
- Zero security of any kind!!
|
||||||
|
|
||||||
|
Currently planned:
|
||||||
|
|
||||||
|
- Open API so people can write their own frontend/backend clients if they want (probably better than mine)
|
||||||
|
- File monitoring/tracking
|
||||||
|
- Being able to securely invite your friends to the network
|
||||||
|
- Some kind of public-key-based encrypted transport/user authentication
|
||||||
|
- Optional remote file modification (other people can change the files you've shared)
|
||||||
|
- JavaFX-based frontend for sharing/managing the network
|
||||||
|
- File replication; peers can donate some of their disk space for automatically saving redundant copies of each other's files
|
||||||
|
- And possibly downloading files on each other's behalf, for peers that can't connect to each other (or are in differnt timezones)
|
||||||
|
- KIO integration, for KDE on Linux
|
||||||
|
- One-user, many-peer; you can somehow prove that you control multiple peers and they all share permissions so you can manage your files from each of them
|
||||||
|
|
||||||
|
Maybe one day:
|
||||||
|
- One-peer, many-user; multiple (system) user accounts on the same machine can all have their own FriendCloud user accounts on the same network, even though there's just the one peer (running as a system daemon)
|
||||||
|
- GIO integration, for GNOME on Linux
|
||||||
|
- Windows [this thing](https://learn.microsoft.com/en-us/windows/win32/shell/nse-works) for Wandow 5 Macro suport (it's 3am)
|
||||||
|
- Maybe we can implement CalDAV or something and you can sync your calendar through it or some shit. attack and dethrone NextCloud
|
||||||
|
- I had weird vague dreams of it reading the metadata of your files like artist, album and stuff and it would automatically organise them for you. but you can just do that manually
|
||||||
|
- make wplace in it.
|
||||||
|
- You can be a member of multiple different networks and have the same files shared to each of them
|
||||||
|
|
||||||
|
# Building
|
||||||
|
|
||||||
|
Git! Java! Maven! I believe in you!
|
|
@ -154,6 +154,7 @@ public class Controller
|
||||||
|
|
||||||
public synchronized void addChangeRecord(ObjectChangeRecord record)
|
public synchronized void addChangeRecord(ObjectChangeRecord record)
|
||||||
{
|
{
|
||||||
|
System.out.println("Controller: Adding change record " + record );
|
||||||
DataStore.DAO<ObjectChangeRecord> dao = dataStore.getDAOForClass(ObjectChangeRecord.class);
|
DataStore.DAO<ObjectChangeRecord> dao = dataStore.getDAOForClass(ObjectChangeRecord.class);
|
||||||
dao.update(record);
|
dao.update(record);
|
||||||
// update the change heads; if any of this change's heads are included in ours, then this change replaces them
|
// update the change heads; if any of this change's heads are included in ours, then this change replaces them
|
||||||
|
@ -165,13 +166,13 @@ public class Controller
|
||||||
for (long changeHeadID : record.getChangeHeads())
|
for (long changeHeadID : record.getChangeHeads())
|
||||||
{
|
{
|
||||||
ObjectChangeRecord headRecord = dao.get(changeHeadID);
|
ObjectChangeRecord headRecord = dao.get(changeHeadID);
|
||||||
recordIsNewHead = changeHeads.remove(headRecord);
|
recordIsNewHead |= changeHeads.remove(headRecord);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (recordIsNewHead)
|
if (recordIsNewHead)
|
||||||
{
|
{
|
||||||
changeHeads.add(record);
|
changeHeads.add(record);
|
||||||
System.out.println("Controller: Change heads updated: " + changeHeads.stream().map(objectChangeRecord -> Long.toHexString(objectChangeRecord.getChangeID())).collect(Collectors.toSet()));
|
System.out.println("Controller: Change heads updated: " + changeHeads);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -204,7 +205,7 @@ public class Controller
|
||||||
System.out.println("Controller: Applying change record " + record);
|
System.out.println("Controller: Applying change record " + record);
|
||||||
long changeID = localData.getCurrentChangeRecord().getChangeID();
|
long changeID = localData.getCurrentChangeRecord().getChangeID();
|
||||||
if (!record.getChangeHeads().contains(changeID))
|
if (!record.getChangeHeads().contains(changeID))
|
||||||
throw new IllegalStateException("Change does not apply! Valid change heads=" + record.getChangeHeads().stream().map(Long::toHexString).collect(Collectors.toSet()) + ", we are in state " + Long.toHexString(changeID));
|
throw new IllegalStateException("Change does not apply! Valid change heads=" + record.getChangeHeads() + ", we are in state " + localData.getCurrentChangeRecord());
|
||||||
addChangeRecord(record);
|
addChangeRecord(record);
|
||||||
|
|
||||||
record.applyToLocalState();
|
record.applyToLocalState();
|
||||||
|
@ -256,7 +257,7 @@ public class Controller
|
||||||
changeHeads = new HashSet<>(dataStore.getDAOForClass(ObjectChangeRecord.class).getAll());
|
changeHeads = new HashSet<>(dataStore.getDAOForClass(ObjectChangeRecord.class).getAll());
|
||||||
Set<Long> referencedIDs = changeHeads.stream().flatMap(objectChangeRecord -> objectChangeRecord.getChangeHeads().stream()).collect(Collectors.toSet());
|
Set<Long> referencedIDs = changeHeads.stream().flatMap(objectChangeRecord -> objectChangeRecord.getChangeHeads().stream()).collect(Collectors.toSet());
|
||||||
changeHeads.removeIf(objectChangeRecord -> referencedIDs.contains(objectChangeRecord.getChangeID()));
|
changeHeads.removeIf(objectChangeRecord -> referencedIDs.contains(objectChangeRecord.getChangeID()));
|
||||||
System.out.println("Controller: Determined change heads to be " + changeHeads.stream().map(objectChangeRecord -> Long.toHexString(objectChangeRecord.getChangeID())).collect(Collectors.toSet()));
|
System.out.println("Controller: Determined change heads to be " + changeHeads);
|
||||||
}
|
}
|
||||||
return changeHeads;
|
return changeHeads;
|
||||||
}
|
}
|
||||||
|
@ -335,7 +336,7 @@ public class Controller
|
||||||
String path = newpath.substring(0, lastSlash);
|
String path = newpath.substring(0, lastSlash);
|
||||||
if (path.isEmpty())
|
if (path.isEmpty())
|
||||||
path = "/";
|
path = "/";
|
||||||
try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), getLocalPeer().getObjectID()))
|
try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(getLocalPeer().getObjectID()))
|
||||||
{
|
{
|
||||||
transaction.addObjectBeforeChange(fsNode);
|
transaction.addObjectBeforeChange(fsNode);
|
||||||
if (!name.equals(fsNode.getName()))
|
if (!name.equals(fsNode.getName()))
|
||||||
|
@ -372,7 +373,7 @@ public class Controller
|
||||||
if (path.equals("/") || path.isEmpty())
|
if (path.equals("/") || path.isEmpty())
|
||||||
return null;
|
return null;
|
||||||
NetworkFolder ret = null;
|
NetworkFolder ret = null;
|
||||||
try(ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), getLocalPeer().getObjectID()))
|
try(ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(getLocalPeer().getObjectID()))
|
||||||
{
|
{
|
||||||
NetworkFSNode node = getFSNode(path);
|
NetworkFSNode node = getFSNode(path);
|
||||||
if (node != null && !(node instanceof NetworkFolder))
|
if (node != null && !(node instanceof NetworkFolder))
|
||||||
|
|
|
@ -5,10 +5,9 @@ import moe.nekojimi.friendcloud.objects.NetworkFile;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.RandomAccessFile;
|
import java.io.RandomAccessFile;
|
||||||
|
import java.time.Instant;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.concurrent.FutureTask;
|
|
||||||
|
|
||||||
public class FileRemoteAccess
|
public class FileRemoteAccess
|
||||||
{
|
{
|
||||||
|
@ -61,12 +60,14 @@ public class FileRemoteAccess
|
||||||
else
|
else
|
||||||
piecesToDownload = neededPieces;
|
piecesToDownload = neededPieces;
|
||||||
|
|
||||||
|
CompletableFuture<Void> downloadFuture = null;
|
||||||
if (!piecesToDownload.isEmpty())
|
if (!piecesToDownload.isEmpty())
|
||||||
{
|
{
|
||||||
System.out.println("FRA: will fetch pieces " + piecesToDownload);
|
System.out.println("FRA: will fetch pieces " + piecesToDownload);
|
||||||
|
|
||||||
DownloadManager.getInstance().downloadPieces(file, piecesToDownload)
|
downloadFuture = DownloadManager.getInstance().downloadPieces(file, piecesToDownload)
|
||||||
.thenRun(() -> {
|
.thenRun(() ->
|
||||||
|
{
|
||||||
preemptiveDownloadInProgress = false;
|
preemptiveDownloadInProgress = false;
|
||||||
});
|
});
|
||||||
// Main.getInstance().getExecutor().submit(futureTask);
|
// Main.getInstance().getExecutor().submit(futureTask);
|
||||||
|
@ -74,7 +75,7 @@ public class FileRemoteAccess
|
||||||
|
|
||||||
if (!neededPieces.isEmpty())
|
if (!neededPieces.isEmpty())
|
||||||
{
|
{
|
||||||
boolean ok = waitForPieceRange(neededPieces, 1000);
|
boolean ok = waitForPieceRange(neededPieces, 30_000, downloadFuture);
|
||||||
if (!ok)
|
if (!ok)
|
||||||
{
|
{
|
||||||
System.err.println("FRA: timed out while waiting for pieces " + neededPieces);
|
System.err.println("FRA: timed out while waiting for pieces " + neededPieces);
|
||||||
|
@ -101,14 +102,35 @@ public class FileRemoteAccess
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean waitForPieceRange(Set<Integer> pieces, long pieceTimeoutMs)
|
private boolean waitForPieceRange(Set<Integer> pieces, long pieceTimeoutMs, CompletableFuture<Void> downloadFuture)
|
||||||
{
|
{
|
||||||
for (int pieceIdx : pieces)
|
for (int pieceIdx : pieces)
|
||||||
{
|
{
|
||||||
boolean ok = file.waitForFilePiece(pieceIdx, pieceTimeoutMs);
|
boolean ok = false;
|
||||||
|
while (pieceTimeoutMs > 0 && !ok)
|
||||||
|
{
|
||||||
|
long waitStartTime = Instant.now().toEpochMilli();
|
||||||
|
try
|
||||||
|
{
|
||||||
|
ok = file.waitForFilePiece(pieceIdx, pieceTimeoutMs);
|
||||||
|
}
|
||||||
|
catch (InterruptedException ignored)
|
||||||
|
{
|
||||||
|
ok = false;
|
||||||
|
}
|
||||||
if (!ok)
|
if (!ok)
|
||||||
|
{
|
||||||
|
if (downloadFuture != null && (downloadFuture.isCancelled() || downloadFuture.isCancelled()))
|
||||||
|
{
|
||||||
|
System.err.println("FRA: Download failed.");
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
long waitEndTime = Instant.now().toEpochMilli();
|
||||||
|
long waitTime = waitEndTime - waitStartTime;
|
||||||
|
pieceTimeoutMs -= waitTime;
|
||||||
|
}
|
||||||
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -44,9 +44,20 @@ public class Main
|
||||||
|
|
||||||
public static void main(String[] argv)
|
public static void main(String[] argv)
|
||||||
{
|
{
|
||||||
instance = new Main();
|
|
||||||
Args args = new Args();
|
Args args = new Args();
|
||||||
JCommander.newBuilder().addObject(args).build().parse(argv);
|
JCommander jCommander = JCommander.newBuilder()
|
||||||
|
.addObject(args)
|
||||||
|
.programName("FriendCloud")
|
||||||
|
.build();
|
||||||
|
jCommander.parse(argv);
|
||||||
|
|
||||||
|
if (args.help)
|
||||||
|
{
|
||||||
|
jCommander.usage();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
instance = new Main();
|
||||||
instance.args = args;
|
instance.args = args;
|
||||||
|
|
||||||
System.setProperty(SimpleLogger.DEFAULT_LOG_LEVEL_KEY, "Info");
|
System.setProperty(SimpleLogger.DEFAULT_LOG_LEVEL_KEY, "Info");
|
||||||
|
@ -66,6 +77,7 @@ public class Main
|
||||||
|
|
||||||
private void run() throws IOException
|
private void run() throws IOException
|
||||||
{
|
{
|
||||||
|
|
||||||
DataStore dataStore = new StupidJSONFileStore(new File(args.storageLocation));
|
DataStore dataStore = new StupidJSONFileStore(new File(args.storageLocation));
|
||||||
controller = new Controller(dataStore);
|
controller = new Controller(dataStore);
|
||||||
|
|
||||||
|
@ -216,23 +228,32 @@ public class Main
|
||||||
@SuppressWarnings("FieldCanBeLocal")
|
@SuppressWarnings("FieldCanBeLocal")
|
||||||
public static class Args
|
public static class Args
|
||||||
{
|
{
|
||||||
@Parameter(names="-share")
|
@Parameter(names="-help", help = true, description = "Display this help message and quit.")
|
||||||
|
private boolean help = false;
|
||||||
|
|
||||||
|
@Parameter(names="-share", description = "Add a file path to be shared once the application starts.")
|
||||||
private List<String> sharedFilePaths = new ArrayList<>();
|
private List<String> sharedFilePaths = new ArrayList<>();
|
||||||
|
|
||||||
@Parameter(names="-known-peer")
|
@Parameter(names="-known-peer", description = "Add a URI (e.g. tcp://192.168.1.69:42069) of a known peer that will be connected to on first run.")
|
||||||
private List<String> knownPeers = new ArrayList<>();
|
private List<String> knownPeers = new ArrayList<>();
|
||||||
|
|
||||||
@Parameter(names="-tcp-port")
|
@Parameter(names="-tcp-port", description = "The TCP port to listen for connections on.")
|
||||||
private int tcpPort = 7777;
|
private int tcpPort = 7777;
|
||||||
|
|
||||||
@Parameter(names="-no-upnp")
|
@Parameter(names="-advertise-address", description = "Manually add a URI (e.g. tcp://192.168.1.69:42069) that will be given to other peers to connect to this peer with. Useful to provide your global IP if UPnP doesn't work.")
|
||||||
|
private List<String> advertiseAddresses = new ArrayList<>();
|
||||||
|
|
||||||
|
@Parameter(names="-no-upnp", description = "Disables UPnP.")
|
||||||
private boolean noUpnp = false;
|
private boolean noUpnp = false;
|
||||||
|
|
||||||
@Parameter(names="-create-network")
|
// @Parameter(names="-create-network")
|
||||||
private boolean createNetwork = false;
|
// private boolean createNetwork = false;
|
||||||
|
|
||||||
@Parameter(names = "-storage")
|
@Parameter(names = "-storage", description = "The location on disk where the local copy of the state database will be stored.")
|
||||||
private String storageLocation = ".";
|
private String storageLocation = "./storage";
|
||||||
|
|
||||||
|
@Parameter(names = "-artificial-lag", description = "Set to a value above 0 to introduce artificial delay when sending messages, in milliseconds. Use for testing only!")
|
||||||
|
private int artificialLagMs = 0;
|
||||||
|
|
||||||
public List<String> getSharedFilePaths()
|
public List<String> getSharedFilePaths()
|
||||||
{
|
{
|
||||||
|
@ -254,14 +275,24 @@ public class Main
|
||||||
return noUpnp;
|
return noUpnp;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isCreateNetwork()
|
// public boolean isCreateNetwork()
|
||||||
{
|
// {
|
||||||
return createNetwork;
|
// return createNetwork;
|
||||||
}
|
// }
|
||||||
|
|
||||||
public String getStorageLocation()
|
public String getStorageLocation()
|
||||||
{
|
{
|
||||||
return storageLocation;
|
return storageLocation;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public List<String> getAdvertiseAddresses()
|
||||||
|
{
|
||||||
|
return advertiseAddresses;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getArtificialLagMs()
|
||||||
|
{
|
||||||
|
return artificialLagMs;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -55,13 +55,27 @@ public class ObjectChangeRecord implements Storable
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
MessageDigest digest = MessageDigest.getInstance("SHA-256");
|
MessageDigest digest = MessageDigest.getInstance("SHA-256");
|
||||||
return digest.digest(toString().getBytes(StandardCharsets.UTF_8));
|
return digest.digest(getCanonicalStringRepresentation().getBytes(StandardCharsets.UTF_8));
|
||||||
} catch (NoSuchAlgorithmException e)
|
} catch (NoSuchAlgorithmException e)
|
||||||
{
|
{
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object o)
|
||||||
|
{
|
||||||
|
if (o == null || getClass() != o.getClass()) return false;
|
||||||
|
ObjectChangeRecord record = (ObjectChangeRecord) o;
|
||||||
|
return Objects.equals(getChangeID(), record.getChangeID());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode()
|
||||||
|
{
|
||||||
|
return (int) getChangeID();
|
||||||
|
}
|
||||||
|
|
||||||
public ObjectStatements.ObjectChangeMessage.Builder buildObjectChangeMessage()
|
public ObjectStatements.ObjectChangeMessage.Builder buildObjectChangeMessage()
|
||||||
{
|
{
|
||||||
ObjectStatements.ObjectChangeMessage.Builder builder = ObjectStatements.ObjectChangeMessage.newBuilder();
|
ObjectStatements.ObjectChangeMessage.Builder builder = ObjectStatements.ObjectChangeMessage.newBuilder();
|
||||||
|
@ -108,7 +122,7 @@ public class ObjectChangeRecord implements Storable
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public String toString()
|
public String getCanonicalStringRepresentation()
|
||||||
{
|
{
|
||||||
StringBuilder sb = new StringBuilder();
|
StringBuilder sb = new StringBuilder();
|
||||||
sb.append(creatorPeer).append(",").append(creationTime.toEpochMilli()).append(";");
|
sb.append(creatorPeer).append(",").append(creationTime.toEpochMilli()).append(";");
|
||||||
|
@ -119,11 +133,21 @@ public class ObjectChangeRecord implements Storable
|
||||||
sb.append(";");
|
sb.append(";");
|
||||||
for (Change change: changes.stream().sorted(Comparator.comparingLong(a -> a.objectID.toLong())).toList())
|
for (Change change: changes.stream().sorted(Comparator.comparingLong(a -> a.objectID.toLong())).toList())
|
||||||
{
|
{
|
||||||
sb.append(change.toString()).append(";");
|
sb.append(change.getCanonicalStringRepresentation()).append(";");
|
||||||
}
|
}
|
||||||
return sb.toString();
|
return sb.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString()
|
||||||
|
{
|
||||||
|
return "Change{" + Long.toHexString(getChangeID()) +
|
||||||
|
", heads=" + changeHeads.stream().map(Long::toHexString).collect(Collectors.toSet()) +
|
||||||
|
", time=" + creationTime.toString() +
|
||||||
|
", creator=" + creatorPeer.toString() +
|
||||||
|
"}";
|
||||||
|
}
|
||||||
|
|
||||||
public long getChangeID()
|
public long getChangeID()
|
||||||
{
|
{
|
||||||
MessageDigest digest = null;
|
MessageDigest digest = null;
|
||||||
|
@ -134,7 +158,7 @@ public class ObjectChangeRecord implements Storable
|
||||||
{
|
{
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
String stringVal = toString();
|
String stringVal = getCanonicalStringRepresentation();
|
||||||
byte[] bytes = digest.digest(stringVal.getBytes(StandardCharsets.UTF_8));
|
byte[] bytes = digest.digest(stringVal.getBytes(StandardCharsets.UTF_8));
|
||||||
// System.out.println("ObjectChangeRecord: calculated change ID " + Long.toHexString(ret) + " from string: " + stringVal);
|
// System.out.println("ObjectChangeRecord: calculated change ID " + Long.toHexString(ret) + " from string: " + stringVal);
|
||||||
return Util.xorBytesToLong(bytes);
|
return Util.xorBytesToLong(bytes);
|
||||||
|
@ -195,7 +219,7 @@ public class ObjectChangeRecord implements Storable
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String toString()
|
public String getCanonicalStringRepresentation()
|
||||||
{
|
{
|
||||||
StringBuilder sb = new StringBuilder();
|
StringBuilder sb = new StringBuilder();
|
||||||
sb.append(objectID).append(";"); // The object ID, then ;
|
sb.append(objectID).append(";"); // The object ID, then ;
|
||||||
|
@ -213,7 +237,6 @@ public class ObjectChangeRecord implements Storable
|
||||||
return sb.toString();
|
return sb.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public ObjectStatements.ObjectChange.Builder buildObjectChange()
|
public ObjectStatements.ObjectChange.Builder buildObjectChange()
|
||||||
{
|
{
|
||||||
ObjectStatements.ObjectChange.Builder builder = ObjectStatements.ObjectChange.newBuilder();
|
ObjectStatements.ObjectChange.Builder builder = ObjectStatements.ObjectChange.newBuilder();
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
package moe.nekojimi.friendcloud;
|
package moe.nekojimi.friendcloud;
|
||||||
|
|
||||||
import moe.nekojimi.friendcloud.network.ConnectionManager;
|
|
||||||
import moe.nekojimi.friendcloud.objects.NetworkObject;
|
import moe.nekojimi.friendcloud.objects.NetworkObject;
|
||||||
import moe.nekojimi.friendcloud.objects.ObjectID;
|
import moe.nekojimi.friendcloud.objects.ObjectID;
|
||||||
import moe.nekojimi.friendcloud.protos.ObjectStatements;
|
import moe.nekojimi.friendcloud.protos.ObjectStatements;
|
||||||
|
@ -16,22 +15,19 @@ import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
public class ObjectChangeTransaction implements AutoCloseable
|
public class ObjectChangeTransaction implements AutoCloseable
|
||||||
{
|
{
|
||||||
private final ObjectID creator;
|
private final ObjectID creator;
|
||||||
private final ConnectionManager connectionManager;
|
|
||||||
private final Map<ObjectID, ObjectStatements.ObjectState> beforeStates = new HashMap<>();
|
private final Map<ObjectID, ObjectStatements.ObjectState> beforeStates = new HashMap<>();
|
||||||
private static ObjectChangeTransaction currentTransaction = null;
|
private static ObjectChangeTransaction currentTransaction = null;
|
||||||
private int openCount = 0;
|
private int openCount = 0;
|
||||||
|
|
||||||
private boolean ended = false;
|
private boolean ended = false;
|
||||||
|
|
||||||
private ObjectChangeTransaction(ConnectionManager connectionManager, ObjectID creator)
|
private ObjectChangeTransaction(ObjectID creator)
|
||||||
{
|
{
|
||||||
this.creator = creator;
|
this.creator = creator;
|
||||||
this.connectionManager = connectionManager;
|
|
||||||
|
|
||||||
System.out.println("ObjectChangeTransaction: opening transaction");
|
System.out.println("ObjectChangeTransaction: opening transaction");
|
||||||
|
|
||||||
|
@ -51,10 +47,10 @@ public class ObjectChangeTransaction implements AutoCloseable
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ObjectChangeTransaction startTransaction(ConnectionManager connectionManager, ObjectID creatorPeer)
|
public static ObjectChangeTransaction startTransaction(ObjectID creatorPeer)
|
||||||
{
|
{
|
||||||
if (currentTransaction == null)
|
if (currentTransaction == null)
|
||||||
currentTransaction = new ObjectChangeTransaction(connectionManager, creatorPeer);
|
currentTransaction = new ObjectChangeTransaction(creatorPeer);
|
||||||
currentTransaction.increaseOpenCount();
|
currentTransaction.increaseOpenCount();
|
||||||
return currentTransaction;
|
return currentTransaction;
|
||||||
}
|
}
|
||||||
|
@ -95,7 +91,8 @@ public class ObjectChangeTransaction implements AutoCloseable
|
||||||
|
|
||||||
if (changes.isEmpty())
|
if (changes.isEmpty())
|
||||||
return null;
|
return null;
|
||||||
return ObjectChangeRecord.createFromChanges(creator, Main.getInstance().getModel().getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet()), changes);
|
ObjectChangeRecord currentChangeRecord = Main.getInstance().getModel().getLocalData().getCurrentChangeRecord();
|
||||||
|
return ObjectChangeRecord.createFromChanges(creator, currentChangeRecord != null ? Set.of(currentChangeRecord.getChangeID()) : Set.of(), changes);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void commit()
|
public void commit()
|
||||||
|
|
|
@ -18,7 +18,7 @@ public class SharedFileManager
|
||||||
if (files.isEmpty())
|
if (files.isEmpty())
|
||||||
return;
|
return;
|
||||||
Controller controller = Main.getInstance().getModel();
|
Controller controller = Main.getInstance().getModel();
|
||||||
try(ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), controller.getLocalPeer().getObjectID()))
|
try(ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(controller.getLocalPeer().getObjectID()))
|
||||||
{
|
{
|
||||||
List<NetworkObject> knownFiles = controller.listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE));
|
List<NetworkObject> knownFiles = controller.listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE));
|
||||||
|
|
||||||
|
|
|
@ -73,7 +73,7 @@ public class Util
|
||||||
}
|
}
|
||||||
ret.totalDigest = totalDigest.digest();
|
ret.totalDigest = totalDigest.digest();
|
||||||
System.out.println("Total hash: " + HexFormat.of().formatHex(ret.totalDigest));
|
System.out.println("Total hash: " + HexFormat.of().formatHex(ret.totalDigest));
|
||||||
long pieceCount = file.length() / pieceSize;
|
long pieceCount = Math.ceilDiv(file.length() , pieceSize);
|
||||||
System.out.println("Have " + ret.pieces.cardinality() + " of " + pieceCount + " pieces.");
|
System.out.println("Have " + ret.pieces.cardinality() + " of " + pieceCount + " pieces.");
|
||||||
return ret;
|
return ret;
|
||||||
} catch (NoSuchAlgorithmException | IOException e)
|
} catch (NoSuchAlgorithmException | IOException e)
|
||||||
|
|
|
@ -1,12 +1,17 @@
|
||||||
package moe.nekojimi.friendcloud.network;
|
package moe.nekojimi.friendcloud.network;
|
||||||
|
|
||||||
|
import moe.nekojimi.friendcloud.Main;
|
||||||
import moe.nekojimi.friendcloud.objects.ObjectID;
|
import moe.nekojimi.friendcloud.objects.ObjectID;
|
||||||
import moe.nekojimi.friendcloud.objects.Peer;
|
import moe.nekojimi.friendcloud.objects.Peer;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
public class ConnectionManager
|
public class ConnectionManager
|
||||||
{
|
{
|
||||||
|
@ -31,17 +36,17 @@ public class ConnectionManager
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public PeerConnection getNodeConnection(URI uri) throws IOException
|
public PeerConnection getNodeConnection(URI uri)
|
||||||
{
|
{
|
||||||
return getNodeConnection(uri, null);
|
return getNodeConnection(uri, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public PeerConnection getNodeConnection(URI uri, Peer peer) throws IOException
|
public PeerConnection getNodeConnection(URI uri, Peer peer)
|
||||||
{
|
{
|
||||||
purgeDeadConnections();
|
purgeDeadConnections();
|
||||||
for (PeerConnection peerConnection: activeConnections)
|
for (PeerConnection peerConnection: activeConnections)
|
||||||
{
|
{
|
||||||
if (peerConnection.getUri() == uri)
|
if (peerConnection.getUri().equals(uri))
|
||||||
return peerConnection;
|
return peerConnection;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -68,19 +73,19 @@ public class ConnectionManager
|
||||||
|
|
||||||
for (URI address : peer.getAddresses())
|
for (URI address : peer.getAddresses())
|
||||||
{
|
{
|
||||||
try
|
PeerConnection connection = getNodeConnection(address, peer);
|
||||||
{
|
if (connection != null)
|
||||||
return getNodeConnection(address, peer);
|
return connection;
|
||||||
}
|
|
||||||
catch (IOException ex)
|
|
||||||
{
|
|
||||||
System.err.println("ConnectionManager: Couldn't create PeerConnection to " + address + " : " + ex.getMessage());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
System.err.println("ConnectionManager: Failed to create PeerConnection to " + peer + "!");
|
System.err.println("ConnectionManager: Failed to create PeerConnection to " + peer + "!");
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public CompletableFuture<Optional<PeerConnection>> getNodeConnectionAsync(Peer peer)
|
||||||
|
{
|
||||||
|
return CompletableFuture.supplyAsync(() -> Optional.ofNullable(getNodeConnection(peer)), Main.getInstance().getExecutor());
|
||||||
|
}
|
||||||
|
|
||||||
public void shutdown()
|
public void shutdown()
|
||||||
{
|
{
|
||||||
for (ConnectionBackend<?> backend : backends.values())
|
for (ConnectionBackend<?> backend : backends.values())
|
||||||
|
@ -123,9 +128,24 @@ public class ConnectionManager
|
||||||
|
|
||||||
public List<URI> getURIs()
|
public List<URI> getURIs()
|
||||||
{
|
{
|
||||||
return backends.values().stream()
|
List<URI> ret = new ArrayList<>();
|
||||||
.filter(ConnectionBackend::isListening)
|
for (String advertiseAddress : Main.getInstance().getArgs().getAdvertiseAddresses())
|
||||||
.flatMap(connectionBackend -> connectionBackend.getURIs().stream())
|
{
|
||||||
.toList();
|
try
|
||||||
|
{
|
||||||
|
URI uri = new URI(advertiseAddress);
|
||||||
|
ret.add(uri);
|
||||||
|
} catch (URISyntaxException e)
|
||||||
|
{
|
||||||
|
System.err.println("ERROR: " + advertiseAddress + " isn't a valid URI: " + e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (ConnectionBackend<?> backend: backends.values())
|
||||||
|
{
|
||||||
|
if (!backend.isListening())
|
||||||
|
continue;
|
||||||
|
ret.addAll(backend.getURIs());
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,6 +2,7 @@ package moe.nekojimi.friendcloud.network;
|
||||||
|
|
||||||
|
|
||||||
import com.google.protobuf.*;
|
import com.google.protobuf.*;
|
||||||
|
import moe.nekojimi.friendcloud.Controller;
|
||||||
import moe.nekojimi.friendcloud.FilePieceAccess;
|
import moe.nekojimi.friendcloud.FilePieceAccess;
|
||||||
import moe.nekojimi.friendcloud.Main;
|
import moe.nekojimi.friendcloud.Main;
|
||||||
import moe.nekojimi.friendcloud.ObjectChangeRecord;
|
import moe.nekojimi.friendcloud.ObjectChangeRecord;
|
||||||
|
@ -20,6 +21,7 @@ import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
public abstract class PeerConnection extends Thread
|
public abstract class PeerConnection extends Thread
|
||||||
{
|
{
|
||||||
|
@ -27,22 +29,18 @@ public abstract class PeerConnection extends Thread
|
||||||
private ObjectID peerID = new ObjectID(0);
|
private ObjectID peerID = new ObjectID(0);
|
||||||
private long nextMessageId = 1;
|
private long nextMessageId = 1;
|
||||||
private final URI uri;
|
private final URI uri;
|
||||||
private long artificalDelayMs = 0;
|
private long artificalDelayMs;
|
||||||
|
|
||||||
private final Map<String, MessageHandler<?>> messageHandlers = new HashMap<>();
|
private final Map<String, MessageHandler<?>> messageHandlers = new HashMap<>();
|
||||||
|
|
||||||
public PeerConnection()
|
public PeerConnection(@NotNull URI uri)
|
||||||
{
|
|
||||||
this(null);
|
|
||||||
}
|
|
||||||
|
|
||||||
public PeerConnection(URI uri)
|
|
||||||
{
|
{
|
||||||
this.uri = uri;
|
this.uri = uri;
|
||||||
installDefaultMessageHandlers();
|
installDefaultMessageHandlers();
|
||||||
|
artificalDelayMs = Main.getInstance().getArgs().getArtificialLagMs();
|
||||||
}
|
}
|
||||||
|
|
||||||
public PeerConnection(URI uri, @NotNull ObjectID peerID)
|
public PeerConnection(@NotNull URI uri, @NotNull ObjectID peerID)
|
||||||
{
|
{
|
||||||
this(uri);
|
this(uri);
|
||||||
this.peerID = peerID;
|
this.peerID = peerID;
|
||||||
|
@ -58,19 +56,18 @@ public abstract class PeerConnection extends Thread
|
||||||
public synchronized <T> CompletableFuture<T> makeRequest(Request<?, T> request)
|
public synchronized <T> CompletableFuture<T> makeRequest(Request<?, T> request)
|
||||||
{
|
{
|
||||||
if (!isAlive())
|
if (!isAlive())
|
||||||
throw new IllegalStateException("Request made to PeerConnection that isn't running!");
|
throw new IllegalStateException("PeerConnection (" + getUri() + "): Request made to PeerConnection that isn't running!");
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
Message message = request.buildMessage();
|
Message message = request.buildMessage();
|
||||||
|
|
||||||
CommonMessages.FriendCloudMessage wrappedMessage = wrapMessage(message);
|
CommonMessages.FriendCloudMessage wrappedMessage = wrapMessage(message);
|
||||||
pendingRequests.put(wrappedMessage.getHeader().getMessageId(), request);
|
|
||||||
sendMessage(wrappedMessage);
|
sendMessage(wrappedMessage);
|
||||||
|
pendingRequests.put(wrappedMessage.getHeader().getMessageId(), request);
|
||||||
return request.getFuture();
|
return request.getFuture();
|
||||||
} catch (Exception e)
|
} catch (Exception e)
|
||||||
{
|
{
|
||||||
System.err.println("Request failed!");
|
System.err.println("PeerConnection (" + getUri() + "): Request failed!");
|
||||||
e.printStackTrace(System.err);
|
e.printStackTrace(System.err);
|
||||||
return CompletableFuture.failedFuture(e);
|
return CompletableFuture.failedFuture(e);
|
||||||
}
|
}
|
||||||
|
@ -107,7 +104,7 @@ public abstract class PeerConnection extends Thread
|
||||||
|
|
||||||
private void replyWithError(CommonMessages.Error error, CommonMessages.MessageHeader replyHeader) throws IOException
|
private void replyWithError(CommonMessages.Error error, CommonMessages.MessageHeader replyHeader) throws IOException
|
||||||
{
|
{
|
||||||
System.err.println("Sending error reply: " + error.name() + " to message ID " + replyHeader.getReplyToMessageId());
|
System.err.println("PeerConnection (" + getUri() + "): Sending error reply: " + error.name() + " to message ID " + replyHeader.getReplyToMessageId());
|
||||||
CommonMessages.ErrorMessage errorMessage = CommonMessages.ErrorMessage.newBuilder().setError(error).build();
|
CommonMessages.ErrorMessage errorMessage = CommonMessages.ErrorMessage.newBuilder().setError(error).build();
|
||||||
sendMessage(wrapMessage(errorMessage, replyHeader));
|
sendMessage(wrapMessage(errorMessage, replyHeader));
|
||||||
}
|
}
|
||||||
|
@ -118,7 +115,7 @@ public abstract class PeerConnection extends Thread
|
||||||
Any body = message.getBody();
|
Any body = message.getBody();
|
||||||
long replyToMessageId = header.getReplyToMessageId();
|
long replyToMessageId = header.getReplyToMessageId();
|
||||||
ObjectID senderID = new ObjectID(header.getSenderId());
|
ObjectID senderID = new ObjectID(header.getSenderId());
|
||||||
System.out.println("Received message! type=" + body.getTypeUrl() + ", sender=" + senderID + ", ID=" + header.getMessageId() + ", reply_to=" + replyToMessageId);
|
System.out.println("PeerConnection (" + getUri() + "): Received message! type=" + body.getTypeUrl() + ", sender=" + senderID + ", ID=" + header.getMessageId() + ", reply_to=" + replyToMessageId);
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
|
@ -127,7 +124,7 @@ public abstract class PeerConnection extends Thread
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
System.err.println("WARNING: artifical lag activated! Waiting " + artificalDelayMs + "ms...");
|
System.err.println("WARNING: artificial lag activated! Waiting " + artificalDelayMs + "ms...");
|
||||||
Thread.sleep(artificalDelayMs);
|
Thread.sleep(artificalDelayMs);
|
||||||
} catch (InterruptedException e)
|
} catch (InterruptedException e)
|
||||||
{
|
{
|
||||||
|
@ -137,9 +134,15 @@ public abstract class PeerConnection extends Thread
|
||||||
|
|
||||||
if (!senderID.isNull())
|
if (!senderID.isNull())
|
||||||
{
|
{
|
||||||
if (peerID.isNull())
|
Peer localPeer = Main.getInstance().getModel().getLocalPeer();
|
||||||
|
if (localPeer != null && Objects.equals(senderID, localPeer.getObjectID()))
|
||||||
{
|
{
|
||||||
System.out.println("PeerConnection: Identified sender as " + senderID);
|
System.err.println("PeerConnection (" + getUri() + "): Connected to ourselves, terminating connection!");
|
||||||
|
shutdown();
|
||||||
|
}
|
||||||
|
else if (peerID.isNull())
|
||||||
|
{
|
||||||
|
System.out.println("PeerConnection (" + getUri() + "): Identified sender as " + senderID);
|
||||||
peerID = senderID;
|
peerID = senderID;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -184,7 +187,7 @@ public abstract class PeerConnection extends Thread
|
||||||
|
|
||||||
private void handleErrorToUnsolicitedMessage(CommonMessages.MessageHeader header, CommonMessages.ErrorMessage body)
|
private void handleErrorToUnsolicitedMessage(CommonMessages.MessageHeader header, CommonMessages.ErrorMessage body)
|
||||||
{
|
{
|
||||||
throw new RuntimeException("Our message ID " + header.getReplyToMessageId() + " caused a remote error: " + body.getError().name());
|
throw new RuntimeException("PeerConnection (" + getUri() + "): Our message ID " + header.getReplyToMessageId() + " caused a remote error: " + body.getError().name());
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||||
|
@ -200,7 +203,7 @@ public abstract class PeerConnection extends Thread
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
System.err.println("PeerConnection: don't have a MessageHandler for message type " + typeUrl + "!");
|
System.err.println("PeerConnection (" + getUri() + "): don't have a MessageHandler for message type " + typeUrl + "!");
|
||||||
replyWithError(CommonMessages.Error.ERROR_MESSAGE_BODY_UNKNOWN, header);
|
replyWithError(CommonMessages.Error.ERROR_MESSAGE_BODY_UNKNOWN, header);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -208,7 +211,7 @@ public abstract class PeerConnection extends Thread
|
||||||
private void handleReplyMessage(CommonMessages.MessageHeader header, Any body) throws InvalidProtocolBufferException, ReplyWithErrorException
|
private void handleReplyMessage(CommonMessages.MessageHeader header, Any body) throws InvalidProtocolBufferException, ReplyWithErrorException
|
||||||
{
|
{
|
||||||
long replyToMessageId = header.getReplyToMessageId();
|
long replyToMessageId = header.getReplyToMessageId();
|
||||||
System.out.println("Received reply to message ID " + replyToMessageId);
|
System.out.println("PeerConnection (" + getUri() + "): Received reply to message ID " + replyToMessageId);
|
||||||
Request<?, ?> request = pendingRequests.get(replyToMessageId);
|
Request<?, ?> request = pendingRequests.get(replyToMessageId);
|
||||||
boolean doneWithRequest = request.handleReply(body);
|
boolean doneWithRequest = request.handleReply(body);
|
||||||
if (doneWithRequest)
|
if (doneWithRequest)
|
||||||
|
@ -236,14 +239,17 @@ public abstract class PeerConnection extends Thread
|
||||||
|
|
||||||
private void installDefaultMessageHandlers()
|
private void installDefaultMessageHandlers()
|
||||||
{
|
{
|
||||||
|
Controller controller = Main.getInstance().getModel();
|
||||||
installMessageHandler(new MessageHandler<>(ObjectStatements.ObjectListRequest.class)
|
installMessageHandler(new MessageHandler<>(ObjectStatements.ObjectListRequest.class)
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectListRequest message) throws IOException
|
protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectListRequest message) throws IOException
|
||||||
{
|
{
|
||||||
List<NetworkObject> objects = Main.getInstance().getModel().listObjects(new HashSet<>(message.getTypesList()));
|
List<NetworkObject> objects = controller.listObjects(new HashSet<>(message.getTypesList()));
|
||||||
|
|
||||||
ObjectStatements.ObjectList.Builder objectList = ObjectStatements.ObjectList.newBuilder();
|
ObjectStatements.ObjectList.Builder objectList = ObjectStatements.ObjectList.newBuilder();
|
||||||
|
ObjectChangeRecord currentChangeRecord = controller.getLocalData().getCurrentChangeRecord();
|
||||||
|
objectList.setChangeHead(currentChangeRecord == null ? 0L : currentChangeRecord.getChangeID());
|
||||||
for (NetworkObject object : objects)
|
for (NetworkObject object : objects)
|
||||||
{
|
{
|
||||||
objectList.addStates(object.buildObjectState());
|
objectList.addStates(object.buildObjectState());
|
||||||
|
@ -263,7 +269,7 @@ public abstract class PeerConnection extends Thread
|
||||||
replyWithError(CommonMessages.Error.ERROR_INVALID_ARGUMENT, header);
|
replyWithError(CommonMessages.Error.ERROR_INVALID_ARGUMENT, header);
|
||||||
}
|
}
|
||||||
|
|
||||||
NetworkFile networkFile = Main.getInstance().getModel().getObject(new ObjectID(message.getFileId()));
|
NetworkFile networkFile = controller.getObject(new ObjectID(message.getFileId()));
|
||||||
if (networkFile == null)
|
if (networkFile == null)
|
||||||
{
|
{
|
||||||
replyWithError(CommonMessages.Error.ERROR_OBJECT_NOT_FOUND, header);
|
replyWithError(CommonMessages.Error.ERROR_OBJECT_NOT_FOUND, header);
|
||||||
|
@ -273,7 +279,7 @@ public abstract class PeerConnection extends Thread
|
||||||
{
|
{
|
||||||
int startIndex = message.getStartPieceIndex();
|
int startIndex = message.getStartPieceIndex();
|
||||||
int endIndex = (message.getStartPieceIndex() + message.getPieceCount()) - 1;
|
int endIndex = (message.getStartPieceIndex() + message.getPieceCount()) - 1;
|
||||||
System.out.println("Been asked for pieces from " + startIndex + " to " + endIndex);
|
System.out.println("PeerConnection (" + getUri() + "): Been asked for pieces from " + startIndex + " to " + endIndex);
|
||||||
List<Long> indices = new ArrayList<>();
|
List<Long> indices = new ArrayList<>();
|
||||||
for (int index = startIndex; index <= endIndex; index += message.getPieceMod())
|
for (int index = startIndex; index <= endIndex; index += message.getPieceMod())
|
||||||
{
|
{
|
||||||
|
@ -286,7 +292,7 @@ public abstract class PeerConnection extends Thread
|
||||||
byte[] buffer = filePieceAccess.readPiece(Math.toIntExact(index));
|
byte[] buffer = filePieceAccess.readPiece(Math.toIntExact(index));
|
||||||
if (buffer != null)
|
if (buffer != null)
|
||||||
{
|
{
|
||||||
System.out.println("Replying to file piece request with piece " + index);
|
System.out.println("PeerConnection (" + getUri() + "): Replying to file piece request with piece " + index);
|
||||||
PieceMessages.FilePieceMessage filePieceMessage = PieceMessages.FilePieceMessage.newBuilder()
|
PieceMessages.FilePieceMessage filePieceMessage = PieceMessages.FilePieceMessage.newBuilder()
|
||||||
.setPieceIndex(Math.toIntExact(index))
|
.setPieceIndex(Math.toIntExact(index))
|
||||||
.setFileId(networkFile.getObjectID().toLong())
|
.setFileId(networkFile.getObjectID().toLong())
|
||||||
|
@ -296,7 +302,7 @@ public abstract class PeerConnection extends Thread
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
System.err.println("Don't have requested piece " + index + "!");
|
System.err.println("PeerConnection (" + getUri() + "): Don't have requested piece " + index + "!");
|
||||||
replyWithError(CommonMessages.Error.ERROR_PIECE_NOT_POSSESSED, header);
|
replyWithError(CommonMessages.Error.ERROR_PIECE_NOT_POSSESSED, header);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -310,8 +316,8 @@ public abstract class PeerConnection extends Thread
|
||||||
protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectChangeRequest message) throws IOException
|
protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectChangeRequest message) throws IOException
|
||||||
{
|
{
|
||||||
List<Long> changesSinceList = message.getChangesSinceList();
|
List<Long> changesSinceList = message.getChangesSinceList();
|
||||||
System.out.println("PeerConnection: Been asked for all changes since " + changesSinceList.stream().map(Long::toHexString));
|
System.out.println("PeerConnection (" + getUri() + "): Been asked for all changes since " + changesSinceList.stream().map(Long::toHexString).collect(Collectors.toSet()));
|
||||||
Set<ObjectChangeRecord> changes = Main.getInstance().getModel().findChangesSince(changesSinceList);
|
Set<ObjectChangeRecord> changes = controller.findChangesSince(changesSinceList);
|
||||||
if (changes == null)
|
if (changes == null)
|
||||||
{
|
{
|
||||||
replyWithError(CommonMessages.Error.ERROR_END_OF_HISTORY, header);
|
replyWithError(CommonMessages.Error.ERROR_END_OF_HISTORY, header);
|
||||||
|
@ -323,7 +329,7 @@ public abstract class PeerConnection extends Thread
|
||||||
{
|
{
|
||||||
reply.addChangeMessages(change.buildObjectChangeMessage());
|
reply.addChangeMessages(change.buildObjectChangeMessage());
|
||||||
}
|
}
|
||||||
System.out.println("PeerConnection: Replying with " + reply.getChangeMessagesCount() + " changes");
|
System.out.println("PeerConnection (" + getUri() + "): Replying with " + reply.getChangeMessagesCount() + " changes");
|
||||||
sendMessage(wrapMessage(reply.build(), header));
|
sendMessage(wrapMessage(reply.build(), header));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -334,19 +340,26 @@ public abstract class PeerConnection extends Thread
|
||||||
protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectChangeMessage message)
|
protected void handle(CommonMessages.MessageHeader header, ObjectStatements.ObjectChangeMessage message)
|
||||||
{
|
{
|
||||||
ObjectChangeRecord record = ObjectChangeRecord.createFromChangeMessage(message);
|
ObjectChangeRecord record = ObjectChangeRecord.createFromChangeMessage(message);
|
||||||
Main.getInstance().getModel().applyChangeRecord(record);
|
controller.applyChangeRecord(record);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
installMessageHandler(new MessageHandler<>(CommonMessages.CheckInMessage.class)
|
installMessageHandler(new MessageHandler<>(CommonMessages.CheckInMessage.class)
|
||||||
{
|
{
|
||||||
@Override
|
@Override
|
||||||
protected void handle(CommonMessages.MessageHeader header, CommonMessages.CheckInMessage message)
|
protected void handle(CommonMessages.MessageHeader header, CommonMessages.CheckInMessage message) throws IOException
|
||||||
{
|
{
|
||||||
List<Long> remoteChangeHeads = message.getCurrentChangeHeadsList();
|
Peer peer = controller.getObject(peerID);
|
||||||
|
if (peer != null)
|
||||||
|
{
|
||||||
|
peer.setLastKnownChangeID(message.getCurrentChange());
|
||||||
|
controller.objectChanged(peer);
|
||||||
|
}
|
||||||
|
|
||||||
|
Set<Long> remoteChangeHeads = new HashSet<>(message.getCurrentChangeHeadsList());
|
||||||
boolean potentialNewChanges = false;
|
boolean potentialNewChanges = false;
|
||||||
for (long remoteChangeHead : remoteChangeHeads)
|
for (long remoteChangeHead : remoteChangeHeads)
|
||||||
{
|
{
|
||||||
boolean exists = Main.getInstance().getModel().getDataStore().getDAOForClass(ObjectChangeRecord.class).exists(remoteChangeHead);
|
boolean exists = controller.getDataStore().getDAOForClass(ObjectChangeRecord.class).exists(remoteChangeHead);
|
||||||
if (!exists)
|
if (!exists)
|
||||||
{
|
{
|
||||||
potentialNewChanges = true;
|
potentialNewChanges = true;
|
||||||
|
@ -355,10 +368,30 @@ public abstract class PeerConnection extends Thread
|
||||||
}
|
}
|
||||||
if (potentialNewChanges)
|
if (potentialNewChanges)
|
||||||
{
|
{
|
||||||
PullChangesTask task = new PullChangesTask(Set.of(Main.getInstance().getModel().getObject(peerID)));
|
if (peer != null)
|
||||||
|
{
|
||||||
|
PullChangesTask task = new PullChangesTask(Set.of(peer));
|
||||||
Main.getInstance().getExecutor().submit(task);
|
Main.getInstance().getExecutor().submit(task);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
Set<Long> changeHeadIDs = controller.getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet());
|
||||||
|
// if there's no new changes then we know all the changes that the remote has
|
||||||
|
// so if they're not the same as our latest changes then they don't know about something we do
|
||||||
|
// so send them a checkin
|
||||||
|
boolean remoteOutOfDate = !changeHeadIDs.equals(remoteChangeHeads);
|
||||||
|
|
||||||
|
if (remoteOutOfDate)
|
||||||
|
{
|
||||||
|
CommonMessages.CheckInMessage checkInMessage = CommonMessages.CheckInMessage.newBuilder()
|
||||||
|
.addAllCurrentChangeHeads(changeHeadIDs)
|
||||||
|
|
||||||
|
.build();
|
||||||
|
sendMessage(wrapMessage(checkInMessage));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -13,11 +13,10 @@ import moe.nekojimi.friendcloud.objects.ObjectID;
|
||||||
import moe.nekojimi.friendcloud.objects.Peer;
|
import moe.nekojimi.friendcloud.objects.Peer;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetAddress;
|
import java.net.*;
|
||||||
import java.net.ServerSocket;
|
|
||||||
import java.net.Socket;
|
|
||||||
import java.net.URI;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Enumeration;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@ -30,22 +29,57 @@ public class TCPConnectionBackend extends ConnectionBackend<TCPPeerConnection>
|
||||||
List<URI> getURIs()
|
List<URI> getURIs()
|
||||||
{
|
{
|
||||||
List<URI> ret = new ArrayList<>();
|
List<URI> ret = new ArrayList<>();
|
||||||
ret.add(makeURI(serverSocket.getInetAddress().getHostAddress(), serverSocket.getLocalPort()));
|
InetAddress inetAddress = serverSocket.getInetAddress();
|
||||||
ret.add(makeURI(serverSocket.getInetAddress().getHostName(), serverSocket.getLocalPort()));
|
addAddressToList(inetAddress, ret);
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
|
||||||
|
for (Iterator<NetworkInterface> it = networkInterfaces.asIterator(); it.hasNext(); )
|
||||||
|
{
|
||||||
|
NetworkInterface networkInterface = it.next();
|
||||||
|
if (networkInterface.isLoopback())
|
||||||
|
continue;
|
||||||
|
if (!networkInterface.isUp())
|
||||||
|
continue;
|
||||||
|
for (InterfaceAddress interfaceAddress : networkInterface.getInterfaceAddresses())
|
||||||
|
addAddressToList(interfaceAddress.getAddress(), ret);
|
||||||
|
}
|
||||||
|
} catch (SocketException e)
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
if (mappedPort != null)
|
if (mappedPort != null)
|
||||||
{
|
{
|
||||||
ret.add(makeURI(mappedPort.getExternalAddress().getHostAddress(), mappedPort.getExternalPort()));
|
addAddressToList(mappedPort.getExternalAddress(), ret);
|
||||||
ret.add(makeURI(mappedPort.getExternalAddress().getCanonicalHostName(), mappedPort.getExternalPort()));
|
|
||||||
}
|
}
|
||||||
|
System.out.println("Local addresses: " + ret);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* utility method for processing URIs
|
||||||
|
*/
|
||||||
|
private void addAddressToList(InetAddress address, List<URI> list)
|
||||||
|
{
|
||||||
|
if (address.isAnyLocalAddress())
|
||||||
|
return;
|
||||||
|
String hostAddress = address.getHostAddress();
|
||||||
|
if (!hostAddress.isEmpty())
|
||||||
|
list.add(makeURI(hostAddress, serverSocket.getLocalPort()));
|
||||||
|
String hostName = address.getHostName();
|
||||||
|
if (!hostName.isEmpty() && !hostName.equals(hostAddress))
|
||||||
|
list.add(makeURI(hostName, serverSocket.getLocalPort()));
|
||||||
|
}
|
||||||
|
|
||||||
public TCPConnectionBackend(int port, ConnectionManager connectionManager) throws IOException
|
public TCPConnectionBackend(int port, ConnectionManager connectionManager) throws IOException
|
||||||
{
|
{
|
||||||
super("TCP Listen Thread", "tcp", connectionManager);
|
super("TCP Listen Thread", "tcp", connectionManager);
|
||||||
serverSocket = new ServerSocket(port);
|
serverSocket = new ServerSocket(port);
|
||||||
|
|
||||||
// setupIGP(port);
|
if (!Main.getInstance().getArgs().isNoUpnp())
|
||||||
|
setupIGP(port);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setupIGP(int port)
|
private void setupIGP(int port)
|
||||||
|
@ -60,6 +94,9 @@ public class TCPConnectionBackend extends ConnectionBackend<TCPPeerConnection>
|
||||||
// Discover port forwarding devices and take the first one found
|
// Discover port forwarding devices and take the first one found
|
||||||
System.out.println("Discovering port mappers...");
|
System.out.println("Discovering port mappers...");
|
||||||
List<PortMapper> mappers = PortMapperFactory.discover(networkBus, processBus);
|
List<PortMapper> mappers = PortMapperFactory.discover(networkBus, processBus);
|
||||||
|
|
||||||
|
if (mappers.isEmpty())
|
||||||
|
return;
|
||||||
;
|
;
|
||||||
PortMapper mapper = mappers.getFirst();
|
PortMapper mapper = mappers.getFirst();
|
||||||
System.out.println("Got mapper " + mapper + ", mapping port...");
|
System.out.println("Got mapper " + mapper + ", mapping port...");
|
||||||
|
@ -102,6 +139,7 @@ public class TCPConnectionBackend extends ConnectionBackend<TCPPeerConnection>
|
||||||
protected TCPPeerConnection getConnection() throws IOException
|
protected TCPPeerConnection getConnection() throws IOException
|
||||||
{
|
{
|
||||||
Socket socket = serverSocket.accept();
|
Socket socket = serverSocket.accept();
|
||||||
|
System.out.println("TCPConnectionBackend: Received connection from " + socket.getRemoteSocketAddress());
|
||||||
return new TCPPeerConnection(socket);
|
return new TCPPeerConnection(socket);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,12 +2,12 @@ package moe.nekojimi.friendcloud.network;
|
||||||
|
|
||||||
import moe.nekojimi.friendcloud.objects.ObjectID;
|
import moe.nekojimi.friendcloud.objects.ObjectID;
|
||||||
import moe.nekojimi.friendcloud.protos.CommonMessages;
|
import moe.nekojimi.friendcloud.protos.CommonMessages;
|
||||||
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.net.Socket;
|
import java.net.*;
|
||||||
import java.net.URI;
|
|
||||||
|
|
||||||
public class TCPPeerConnection extends PeerConnection
|
public class TCPPeerConnection extends PeerConnection
|
||||||
{
|
{
|
||||||
|
@ -18,24 +18,37 @@ public class TCPPeerConnection extends PeerConnection
|
||||||
{
|
{
|
||||||
super(tcpURL, peer);
|
super(tcpURL, peer);
|
||||||
socket = new Socket(tcpURL.getHost(), tcpURL.getPort());
|
socket = new Socket(tcpURL.getHost(), tcpURL.getPort());
|
||||||
System.out.println("TCP Connection: connected to " + tcpURL + " OK!");
|
System.out.println("TCPPeerConnection: connected to " + tcpURL + " OK!");
|
||||||
}
|
}
|
||||||
|
|
||||||
public TCPPeerConnection(Socket openSocket)
|
public TCPPeerConnection(Socket openSocket)
|
||||||
{
|
{
|
||||||
super();
|
super(getSocketURI(openSocket.getInetAddress(), openSocket.getPort()));
|
||||||
socket = openSocket;
|
socket = openSocket;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static URI getSocketURI(@NotNull InetAddress address, int port)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
return new URI("tcp://" + address.getHostAddress() + ":" + port);
|
||||||
|
} catch (URISyntaxException e)
|
||||||
|
{
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run()
|
public void run()
|
||||||
{
|
{
|
||||||
super.run();
|
super.run();
|
||||||
|
|
||||||
try
|
try(InputStream inputStream = socket.getInputStream())
|
||||||
{
|
{
|
||||||
InputStream inputStream = socket.getInputStream();
|
socket.setKeepAlive(true);
|
||||||
while (!socket.isClosed())
|
socket.setSoTimeout(keepAliveTimeS * 1000);
|
||||||
|
|
||||||
|
while (!socket.isClosed() && !socket.isInputShutdown())
|
||||||
{
|
{
|
||||||
CommonMessages.FriendCloudMessage message = CommonMessages.FriendCloudMessage.parseDelimitedFrom(inputStream);
|
CommonMessages.FriendCloudMessage message = CommonMessages.FriendCloudMessage.parseDelimitedFrom(inputStream);
|
||||||
// Any any = Any.parseDelimitedFrom(inputStream);
|
// Any any = Any.parseDelimitedFrom(inputStream);
|
||||||
|
@ -45,23 +58,39 @@ public class TCPPeerConnection extends PeerConnection
|
||||||
messageReceived(message);
|
messageReceived(message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (Exception ex)
|
}
|
||||||
|
catch (SocketTimeoutException ex)
|
||||||
|
{
|
||||||
|
System.out.println("TCPPeerConnection (" + getUri() + "): Read timed out, closing connection.");
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
// fuck
|
// fuck
|
||||||
ex.printStackTrace(System.err);
|
ex.printStackTrace(System.err);
|
||||||
}
|
}
|
||||||
|
|
||||||
System.out.println("TCP Connection: connection closed");
|
System.out.println("TCPPeerConnection (" + getUri() + "): connection closed");
|
||||||
|
shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void sendMessage(CommonMessages.FriendCloudMessage message) throws IOException
|
protected void sendMessage(CommonMessages.FriendCloudMessage message) throws IOException
|
||||||
|
{
|
||||||
|
try
|
||||||
{
|
{
|
||||||
OutputStream outputStream = socket.getOutputStream();
|
OutputStream outputStream = socket.getOutputStream();
|
||||||
System.out.println("Sending message " + message.getHeader().getMessageId() + ": " + message.getBody().getTypeUrl());
|
System.out.println("TCPPeerConnection (" + getUri() + "): Sending message " + message.getHeader().getMessageId() + ": " + message.getBody().getTypeUrl());
|
||||||
message.writeDelimitedTo(outputStream);
|
message.writeDelimitedTo(outputStream);
|
||||||
outputStream.flush();
|
outputStream.flush();
|
||||||
}
|
}
|
||||||
|
catch (SocketException ex)
|
||||||
|
{
|
||||||
|
// handle this type of exception by closing the connection
|
||||||
|
System.err.println("TCPPeerConnection (" + getUri() + "): Failed to send, closing connection:" + ex.getMessage());
|
||||||
|
shutdown();
|
||||||
|
throw ex; // upper layer needs to know it failed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void shutdown()
|
public synchronized void shutdown()
|
||||||
|
@ -69,9 +98,10 @@ public class TCPPeerConnection extends PeerConnection
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
socket.close();
|
socket.close();
|
||||||
|
interrupt();
|
||||||
} catch (IOException e)
|
} catch (IOException e)
|
||||||
{
|
{
|
||||||
System.err.println("TCPPeerConnection: failed to shut down!");
|
System.err.println("TCPPeerConnection (" + getUri() + "): failed to shut down!");
|
||||||
e.printStackTrace(System.err);
|
e.printStackTrace(System.err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,7 +11,7 @@ import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
public class ObjectListRequest extends Request<ObjectStatements.ObjectListRequest, List<NetworkObject>>
|
public class ObjectListRequest extends Request<ObjectStatements.ObjectListRequest, ObjectListRequest.ObjectListRequestResult>
|
||||||
{
|
{
|
||||||
private final Set<ObjectStatements.ObjectType> types;
|
private final Set<ObjectStatements.ObjectType> types;
|
||||||
|
|
||||||
|
@ -39,7 +39,10 @@ public class ObjectListRequest extends Request<ObjectStatements.ObjectListReques
|
||||||
|
|
||||||
// System.out.println("Received ObjectList, objects=" + objectList.getStatesList());
|
// System.out.println("Received ObjectList, objects=" + objectList.getStatesList());
|
||||||
|
|
||||||
List<NetworkObject> ret = new ArrayList<>();
|
ObjectListRequestResult ret = new ObjectListRequestResult();
|
||||||
|
|
||||||
|
ret.objects = new ArrayList<>();
|
||||||
|
ret.changeID = objectList.getChangeHead();
|
||||||
|
|
||||||
for (ObjectStatements.ObjectState objectState : objectList.getStatesList())
|
for (ObjectStatements.ObjectState objectState : objectList.getStatesList())
|
||||||
{
|
{
|
||||||
|
@ -47,7 +50,7 @@ public class ObjectListRequest extends Request<ObjectStatements.ObjectListReques
|
||||||
System.out.println("ObjectListRequest: Received state of object " + objectID);
|
System.out.println("ObjectListRequest: Received state of object " + objectID);
|
||||||
NetworkObject object = NetworkObject.createByID(objectID);
|
NetworkObject object = NetworkObject.createByID(objectID);
|
||||||
object.updateFromStateMessage(objectState);
|
object.updateFromStateMessage(objectState);
|
||||||
ret.add(object);
|
ret.objects.add(object);
|
||||||
}
|
}
|
||||||
|
|
||||||
future.complete(ret);
|
future.complete(ret);
|
||||||
|
@ -63,4 +66,10 @@ public class ObjectListRequest extends Request<ObjectStatements.ObjectListReques
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class ObjectListRequestResult
|
||||||
|
{
|
||||||
|
public List<NetworkObject> objects;
|
||||||
|
public long changeID;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -132,7 +132,9 @@ public class NetworkFile extends NetworkFSNode
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Retreives the local file used for storing file data, or if it doesn't exist, creates it.
|
* Retreives the local file used for storing file data, or if it doesn't exist, creates it.
|
||||||
|
*
|
||||||
* @return a File object for the (potentially new) file storage.
|
* @return a File object for the (potentially new) file storage.
|
||||||
|
*
|
||||||
* @throws IOException if the file could not be created due to an IO error.
|
* @throws IOException if the file could not be created due to an IO error.
|
||||||
*/
|
*/
|
||||||
public File getOrCreateLocalFile() throws IOException
|
public File getOrCreateLocalFile() throws IOException
|
||||||
|
@ -146,7 +148,9 @@ public class NetworkFile extends NetworkFSNode
|
||||||
{
|
{
|
||||||
localFile = new File(tempDirectory, getName());
|
localFile = new File(tempDirectory, getName());
|
||||||
// localFile = File.createTempFile("FriendCloud", getNetworkPath());
|
// localFile = File.createTempFile("FriendCloud", getNetworkPath());
|
||||||
localFile.createNewFile();
|
boolean created = localFile.createNewFile();
|
||||||
|
if (created)
|
||||||
|
System.out.println("NetworkFile: Created local file " + localFile.getAbsolutePath() + " for " + getObjectID());
|
||||||
}
|
}
|
||||||
return localFile;
|
return localFile;
|
||||||
}
|
}
|
||||||
|
@ -237,21 +241,19 @@ public class NetworkFile extends NetworkFSNode
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Waits for a certain piece of the file to become available, or for a timeout to expire.
|
* Waits for a certain piece of the file to become available, or for a timeout to expire.
|
||||||
|
*
|
||||||
* @param pieceIdx the index of the piece to wait for. Note: <=0 means "don't wait at all", not "wait forever".
|
* @param pieceIdx the index of the piece to wait for. Note: <=0 means "don't wait at all", not "wait forever".
|
||||||
* @param timeoutMs the amount of time, in milliseconds, to wait.
|
* @param timeoutMs the amount of time, in milliseconds, to wait.
|
||||||
* @return true if the piece is available (haspiece(pieceIdx) will return true), false if the timeout was reached.
|
* @throws InterruptedException if the waiting was interrupted.
|
||||||
|
*
|
||||||
|
* @return true if the piece is available (hasPiece(pieceIdx) will return true), false if the timeout was reached.
|
||||||
*/
|
*/
|
||||||
public boolean waitForFilePiece(int pieceIdx, long timeoutMs)
|
public boolean waitForFilePiece(int pieceIdx, long timeoutMs) throws InterruptedException
|
||||||
{
|
{
|
||||||
synchronized (pieces)
|
synchronized (pieces)
|
||||||
{
|
{
|
||||||
if (hasPiece(pieceIdx))
|
if (hasPiece(pieceIdx))
|
||||||
return true;
|
return true;
|
||||||
while (timeoutMs > 0)
|
|
||||||
{
|
|
||||||
long startTime = System.currentTimeMillis();
|
|
||||||
try
|
|
||||||
{
|
|
||||||
System.out.println("NetworkFile: waiting " + timeoutMs + "ms for piece " + pieceIdx + " of file " + name);
|
System.out.println("NetworkFile: waiting " + timeoutMs + "ms for piece " + pieceIdx + " of file " + name);
|
||||||
pieces.wait(timeoutMs);
|
pieces.wait(timeoutMs);
|
||||||
if (hasPiece(pieceIdx))
|
if (hasPiece(pieceIdx))
|
||||||
|
@ -259,13 +261,6 @@ public class NetworkFile extends NetworkFSNode
|
||||||
System.out.println("NetworkFile: got piece we were waiting for.");
|
System.out.println("NetworkFile: got piece we were waiting for.");
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
} catch (InterruptedException ignored)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
long endTime = System.currentTimeMillis();
|
|
||||||
long timeWaited = (endTime - startTime);
|
|
||||||
timeoutMs -= timeWaited;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
System.err.println("Timed out waiting for piece!");
|
System.err.println("Timed out waiting for piece!");
|
||||||
return false;
|
return false;
|
||||||
|
@ -286,11 +281,17 @@ public class NetworkFile extends NetworkFSNode
|
||||||
|
|
||||||
public enum StorageType
|
public enum StorageType
|
||||||
{
|
{
|
||||||
/** The file will be stored as a complete file in the storage directory under it's own name and file path. */
|
/**
|
||||||
|
* The file will be stored as a complete file in the storage directory under it's own name and file path.
|
||||||
|
*/
|
||||||
COMPLETE,
|
COMPLETE,
|
||||||
/** The file will be stored as a complete file in a temporary directory. It will be deleted when the computer restarts. */
|
/**
|
||||||
|
* The file will be stored as a complete file in a temporary directory. It will be deleted when the computer restarts.
|
||||||
|
*/
|
||||||
TEMPORARY_COMPLETE,
|
TEMPORARY_COMPLETE,
|
||||||
/** Each piece of the file will be stored permanently in an individual piece file, in the pieces subdirectory. */
|
/**
|
||||||
|
* Each piece of the file will be stored permanently in an individual piece file, in the pieces subdirectory.
|
||||||
|
*/
|
||||||
PIECES,
|
PIECES,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package moe.nekojimi.friendcloud.objects;
|
package moe.nekojimi.friendcloud.objects;
|
||||||
|
|
||||||
|
import moe.nekojimi.friendcloud.Util;
|
||||||
import moe.nekojimi.friendcloud.protos.ObjectStatements;
|
import moe.nekojimi.friendcloud.protos.ObjectStatements;
|
||||||
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
@ -13,6 +14,8 @@ public class Peer extends NetworkObject
|
||||||
private String userName = "";
|
private String userName = "";
|
||||||
private String systemName = "";
|
private String systemName = "";
|
||||||
|
|
||||||
|
private long lastKnownChangeID = 0L;
|
||||||
|
|
||||||
public Peer(ObjectID objectID)
|
public Peer(ObjectID objectID)
|
||||||
{
|
{
|
||||||
super(objectID);
|
super(objectID);
|
||||||
|
@ -71,6 +74,7 @@ public class Peer extends NetworkObject
|
||||||
ret.put("userName", userName);
|
ret.put("userName", userName);
|
||||||
ret.put("systemName", systemName);
|
ret.put("systemName", systemName);
|
||||||
ret.put("addresses", addresses);
|
ret.put("addresses", addresses);
|
||||||
|
ret.put("lastKnownChangeID", lastKnownChangeID);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -81,6 +85,7 @@ public class Peer extends NetworkObject
|
||||||
systemName = map.get("systemName").toString();
|
systemName = map.get("systemName").toString();
|
||||||
addresses.clear();
|
addresses.clear();
|
||||||
addresses.addAll((Collection<? extends URI>) map.get("addresses"));
|
addresses.addAll((Collection<? extends URI>) map.get("addresses"));
|
||||||
|
lastKnownChangeID = Util.unconditionalNumberToLong(map.getOrDefault("lastKnownChangeID", 0L));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addAddress(URI address)
|
public void addAddress(URI address)
|
||||||
|
@ -129,6 +134,16 @@ public class Peer extends NetworkObject
|
||||||
this.systemName = systemName;
|
this.systemName = systemName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getLastKnownChangeID()
|
||||||
|
{
|
||||||
|
return lastKnownChangeID;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setLastKnownChangeID(long lastKnownChangeID)
|
||||||
|
{
|
||||||
|
this.lastKnownChangeID = lastKnownChangeID;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getFriendlyName()
|
public String getFriendlyName()
|
||||||
{
|
{
|
||||||
|
|
|
@ -20,7 +20,7 @@ public class FileDownloadTask implements RunnableFuture<File>
|
||||||
private final NetworkFile file;
|
private final NetworkFile file;
|
||||||
private final ConnectionManager connectionManager;
|
private final ConnectionManager connectionManager;
|
||||||
|
|
||||||
private final long timeoutPerPieceMs = 1_000;
|
private final long timeoutPerPieceMs = 30_000;
|
||||||
private static final int MAX_DOWNLOAD_PIECES_PER_ROUND = 128;
|
private static final int MAX_DOWNLOAD_PIECES_PER_ROUND = 128;
|
||||||
|
|
||||||
private final SortedSet<Integer> missingPieceIndices = new TreeSet<>();
|
private final SortedSet<Integer> missingPieceIndices = new TreeSet<>();
|
||||||
|
@ -65,23 +65,28 @@ public class FileDownloadTask implements RunnableFuture<File>
|
||||||
Peer selfPeer = Main.getInstance().getModel().getLocalPeer();
|
Peer selfPeer = Main.getInstance().getModel().getLocalPeer();
|
||||||
while (!missingPieceIndices.isEmpty() && !cancelled && !failed && !done)
|
while (!missingPieceIndices.isEmpty() && !cancelled && !failed && !done)
|
||||||
{
|
{
|
||||||
System.out.println("Need to get " + missingPieceIndices.size() + " missing pieces.");
|
System.out.println("FileDownloadTask: Need to get " + missingPieceIndices.size() + " missing pieces.");
|
||||||
|
|
||||||
// Map<Peer, PeerFileState> fileStates = file.getFileStates();
|
// Map<Peer, PeerFileState> fileStates = file.getFileStates();
|
||||||
|
|
||||||
// determine what nodes we can connect to
|
// determine what nodes we can connect to
|
||||||
List<PeerConnection> connections = new ArrayList<>();
|
List<PeerConnection> connections = new ArrayList<>(file.getPeersWithCopy().stream()
|
||||||
for (Peer peer : file.getPeersWithCopy())
|
.filter(peer -> peer != selfPeer)
|
||||||
{
|
.map(connectionManager::getNodeConnectionAsync)
|
||||||
if (peer == selfPeer)
|
.map(CompletableFuture::join)
|
||||||
continue; // yeah that's us dipshit
|
.flatMap(Optional::stream)
|
||||||
PeerConnection connection = connectionManager.getNodeConnection(peer);
|
.toList());
|
||||||
if (connection != null)
|
// for (Peer peer : file.getPeersWithCopy())
|
||||||
{
|
// {
|
||||||
System.out.println("FileDownloadTask: Will download from " + peer.getNodeName());
|
// if (peer == selfPeer)
|
||||||
connections.add(connection);
|
// continue; // yeah that's us dipshit
|
||||||
}
|
// PeerConnection connection = connectionManager.getNodeConnection(peer);
|
||||||
}
|
// if (connection != null)
|
||||||
|
// {
|
||||||
|
// System.out.println("FileDownloadTask: Will download from " + peer.getNodeName());
|
||||||
|
// connections.add(connection);
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
// connectionLine = "Connected to " + connections.size() + " peers.";
|
// connectionLine = "Connected to " + connections.size() + " peers.";
|
||||||
// notification.setBody(connectionLine + "\n" + progressLine);
|
// notification.setBody(connectionLine + "\n" + progressLine);
|
||||||
|
@ -90,6 +95,7 @@ public class FileDownloadTask implements RunnableFuture<File>
|
||||||
// shuffle the connections list
|
// shuffle the connections list
|
||||||
Collections.shuffle(connections);
|
Collections.shuffle(connections);
|
||||||
|
|
||||||
|
|
||||||
if (connections.isEmpty())
|
if (connections.isEmpty())
|
||||||
{
|
{
|
||||||
System.err.println("FileDownloadTask: No peers have the file, download failed!");
|
System.err.println("FileDownloadTask: No peers have the file, download failed!");
|
||||||
|
@ -98,6 +104,8 @@ public class FileDownloadTask implements RunnableFuture<File>
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
System.out.println("FileDownloadTask: Will download from " + connections.size() + " peers: " + connections.stream().map(PeerConnection::getUri).toList());
|
||||||
|
|
||||||
// find a continuous run of pieces to download
|
// find a continuous run of pieces to download
|
||||||
// TODO: allow for runs with regular gaps (e.g. every 2) to account for previous failed download attempts
|
// TODO: allow for runs with regular gaps (e.g. every 2) to account for previous failed download attempts
|
||||||
int runStart = -1;
|
int runStart = -1;
|
||||||
|
@ -156,7 +164,7 @@ public class FileDownloadTask implements RunnableFuture<File>
|
||||||
|
|
||||||
if (file.getDownloadPercentage() >= 100.0)
|
if (file.getDownloadPercentage() >= 100.0)
|
||||||
{
|
{
|
||||||
try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(connectionManager, selfPeer.getObjectID()))
|
try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(selfPeer.getObjectID()))
|
||||||
{
|
{
|
||||||
transaction.addObjectBeforeChange(file);
|
transaction.addObjectBeforeChange(file);
|
||||||
file.addPeerWithCopy(selfPeer);
|
file.addPeerWithCopy(selfPeer);
|
||||||
|
|
|
@ -53,7 +53,7 @@ public class JoinNetworkTask implements Runnable
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
// create our local peer object
|
// create our local peer object
|
||||||
try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(connectionManager, peerID))
|
try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(peerID))
|
||||||
{
|
{
|
||||||
// create and submit our Peer object if it doesn't exist
|
// create and submit our Peer object if it doesn't exist
|
||||||
Peer selfPeer = controller.getLocalData().getLocalPeer();
|
Peer selfPeer = controller.getLocalData().getLocalPeer();
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package moe.nekojimi.friendcloud.tasks;
|
package moe.nekojimi.friendcloud.tasks;
|
||||||
|
|
||||||
import moe.nekojimi.friendcloud.Main;
|
import moe.nekojimi.friendcloud.Main;
|
||||||
|
import moe.nekojimi.friendcloud.ObjectChangeRecord;
|
||||||
import moe.nekojimi.friendcloud.Util;
|
import moe.nekojimi.friendcloud.Util;
|
||||||
import moe.nekojimi.friendcloud.network.PeerConnection;
|
import moe.nekojimi.friendcloud.network.PeerConnection;
|
||||||
import moe.nekojimi.friendcloud.network.requests.ObjectChangeRequest;
|
import moe.nekojimi.friendcloud.network.requests.ObjectChangeRequest;
|
||||||
|
@ -31,6 +32,8 @@ public class PullStateTask implements Runnable
|
||||||
System.out.println("PullStateTask: Pulling state from peers...");
|
System.out.println("PullStateTask: Pulling state from peers...");
|
||||||
Set<PeerConnection> connections = new HashSet<>();
|
Set<PeerConnection> connections = new HashSet<>();
|
||||||
|
|
||||||
|
// TODO: pull changes first, then select the peer(s) with the latest changes to pull state from
|
||||||
|
|
||||||
for (String knownPeerAddress : Main.getInstance().getArgs().getKnownPeers())
|
for (String knownPeerAddress : Main.getInstance().getArgs().getKnownPeers())
|
||||||
{
|
{
|
||||||
String[] split = knownPeerAddress.split(":");
|
String[] split = knownPeerAddress.split(":");
|
||||||
|
@ -50,7 +53,7 @@ public class PullStateTask implements Runnable
|
||||||
{
|
{
|
||||||
connections.add(nodeConnection);
|
connections.add(nodeConnection);
|
||||||
}
|
}
|
||||||
} catch (URISyntaxException | IOException e)
|
} catch (URISyntaxException e)
|
||||||
{
|
{
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
|
@ -80,24 +83,33 @@ public class PullStateTask implements Runnable
|
||||||
|
|
||||||
for (PeerConnection connection : connections)
|
for (PeerConnection connection : connections)
|
||||||
{
|
{
|
||||||
futures.add(connection.makeRequest(new ObjectListRequest(Set.of(
|
CompletableFuture<Void> changeFuture = connection.makeRequest(new ObjectChangeRequest(Set.of())).thenAccept(objectChangeRecords ->
|
||||||
|
{
|
||||||
|
System.out.println("PullStateTask: got " + objectChangeRecords.size() + " change records.");
|
||||||
|
Main.getInstance().getModel().addChangeRecords(objectChangeRecords);
|
||||||
|
});
|
||||||
|
|
||||||
|
CompletableFuture<Long> objectFuture = connection.makeRequest(new ObjectListRequest(Set.of(
|
||||||
ObjectStatements.ObjectType.OBJECT_TYPE_FILE,
|
ObjectStatements.ObjectType.OBJECT_TYPE_FILE,
|
||||||
ObjectStatements.ObjectType.OBJECT_TYPE_FOLDER,
|
ObjectStatements.ObjectType.OBJECT_TYPE_FOLDER,
|
||||||
ObjectStatements.ObjectType.OBJECT_TYPE_PEER))).thenAccept(networkObjects ->
|
ObjectStatements.ObjectType.OBJECT_TYPE_PEER))).thenApply(objectListRequestResult ->
|
||||||
{
|
{
|
||||||
|
List<NetworkObject> networkObjects = objectListRequestResult.objects;
|
||||||
System.out.println("PullStateTask: got state of " + networkObjects.size() + " objects.");
|
System.out.println("PullStateTask: got state of " + networkObjects.size() + " objects.");
|
||||||
for (NetworkObject object : networkObjects)
|
for (NetworkObject object : networkObjects)
|
||||||
{
|
{
|
||||||
Main.getInstance().getModel().addNetworkObject(object);
|
Main.getInstance().getModel().addNetworkObject(object);
|
||||||
}
|
}
|
||||||
}));
|
return objectListRequestResult.changeID;
|
||||||
|
});
|
||||||
|
|
||||||
futures.add(connection.makeRequest(new ObjectChangeRequest(Set.of())).thenAccept(objectChangeRecords ->
|
CompletableFuture<Void> bothFuture = changeFuture.thenAcceptBoth(objectFuture, (unused, changeID) ->
|
||||||
{
|
{
|
||||||
System.out.println("PullStateTask: got " + objectChangeRecords.size() + " change records.");
|
ObjectChangeRecord record = Main.getInstance().getModel().getChangeRecord(changeID);
|
||||||
Main.getInstance().getModel().addChangeRecords(objectChangeRecords);
|
System.out.println("PullStateTask: updating current change ID to " + record);
|
||||||
}));
|
Main.getInstance().getModel().setCurrentChangeRecord(record);
|
||||||
|
});
|
||||||
|
futures.add(bothFuture);
|
||||||
}
|
}
|
||||||
|
|
||||||
Util.collectFutures(futures).join();
|
Util.collectFutures(futures).join();
|
||||||
|
|
|
@ -21,6 +21,7 @@ message HelloMessage {
|
||||||
|
|
||||||
message CheckInMessage {
|
message CheckInMessage {
|
||||||
repeated uint64 current_change_heads = 1;
|
repeated uint64 current_change_heads = 1;
|
||||||
|
uint64 current_change = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
enum Error {
|
enum Error {
|
||||||
|
|
Loading…
Reference in a new issue