Fixed some stuff

This commit is contained in:
Nekojimi 2025-09-23 12:27:42 +01:00
parent 5b7c6a857a
commit 855bca957f
24 changed files with 576 additions and 257 deletions

34
pom.xml
View file

@ -74,24 +74,24 @@
<artifactId>jlibnotify</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>com.github.hypfvieh</groupId>
<artifactId>dbus-java-core</artifactId>
<version>5.1.0-SNAPSHOT</version>
</dependency>
<!-- Unixsocket support using native unix socket implementation of Java (since Java 16) -->
<dependency>
<groupId>com.github.hypfvieh</groupId>
<artifactId>dbus-java-transport-native-unixsocket</artifactId>
<version>5.1.0-SNAPSHOT</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>com.github.hypfvieh</groupId>-->
<!-- <artifactId>dbus-java-core</artifactId>-->
<!-- <version>5.1.0-SNAPSHOT</version>-->
<!-- </dependency>-->
<!-- &lt;!&ndash; Unixsocket support using native unix socket implementation of Java (since Java 16) &ndash;&gt;-->
<!-- <dependency>-->
<!-- <groupId>com.github.hypfvieh</groupId>-->
<!-- <artifactId>dbus-java-transport-native-unixsocket</artifactId>-->
<!-- <version>5.1.0-SNAPSHOT</version>-->
<!-- </dependency>-->
<!-- Add this if you want support for TCP based DBus connections -->
<dependency>
<groupId>com.github.hypfvieh</groupId>
<artifactId>dbus-java-transport-tcp</artifactId>
<version>5.1.0-SNAPSHOT</version>
</dependency>
<!-- &lt;!&ndash; Add this if you want support for TCP based DBus connections &ndash;&gt;-->
<!-- <dependency>-->
<!-- <groupId>com.github.hypfvieh</groupId>-->
<!-- <artifactId>dbus-java-transport-tcp</artifactId>-->
<!-- <version>5.1.0-SNAPSHOT</version>-->
<!-- </dependency>-->
</dependencies>

View file

@ -48,12 +48,13 @@ public class ConnectionManager extends Thread
{
consumer.accept(nodeTCPConnection);
}
} catch (IOException e)
} catch (Exception e)
{
System.err.println("ConnectionManager TCP experienced exception:" + e.getMessage());
System.err.println("ConnectionManager experienced exception:" + e.getMessage());
e.printStackTrace(System.err);
}
}
System.err.println("ConnectionManager: thread dying!");
}
public PeerConnection getNodeConnection(URI uri) throws IOException
@ -81,13 +82,14 @@ public class ConnectionManager extends Thread
return nodeConnection;
}
public PeerConnection getNodeConnection(Peer peer) throws IOException
public PeerConnection getNodeConnection(Peer peer)
{
// try to find if we already have an active connection to this peer
purgeDeadConnections();
System.out.println("ConnectionManager: trying to get connection to " + peer + " (have " + activeConnections.size() + " connections open)");
for (PeerConnection peerConnection: activeConnections)
{
if (peerConnection.getNode() == peer)
if (peerConnection.getNode().equals(peer))
return peerConnection;
}
@ -95,14 +97,14 @@ public class ConnectionManager extends Thread
{
try
{
return getNodeConnection(address);
return getNodeConnection(address, peer);
}
catch (IOException ex)
{
System.err.println("Couldn't create PeerConnection to " + address + " : " + ex.getMessage());
System.err.println("ConnectionManager: Couldn't create PeerConnection to " + address + " : " + ex.getMessage());
}
}
System.err.println("Failed to create PeerConnection to " + peer + "!");
System.err.println("ConnectionManager: Failed to create PeerConnection to " + peer + "!");
return null;
}
@ -121,7 +123,10 @@ public class ConnectionManager extends Thread
for (PeerConnection peerConnection: activeConnections)
{
if (!peerConnection.isAlive())
{
System.out.println("ConnectionManager: purged dead connection to " + peerConnection.getUri());
deadConnections.add(peerConnection);
}
}
activeConnections.removeAll(deadConnections);
}

View file

@ -21,9 +21,9 @@ public class FilePieceAccess implements Closeable
randomAccessFile.setLength(file.length());
}
public int getPieceOffset(int index)
public long getPieceOffset(int index)
{
return Math.toIntExact(index * networkFile.getPieceSize());
return (index * networkFile.getPieceSize());
}
public int getPieceSize(int index)
@ -47,7 +47,7 @@ public class FilePieceAccess implements Closeable
int pieceSize = getPieceSize(index);
byte[] buffer = new byte[pieceSize];
int pieceOffset = getPieceOffset(index);
long pieceOffset = getPieceOffset(index);
System.out.println("Reading piece " + index + " from file " + file.getName() + " (offset=" + pieceOffset + ", size=" + pieceSize + ")");
randomAccessFile.seek(pieceOffset);
randomAccessFile.read(buffer);

View file

@ -74,7 +74,7 @@ public class FileRemoteAccess
if (!neededPieces.isEmpty())
{
boolean ok = waitForPieceRange(neededPieces, 10000);
boolean ok = waitForPieceRange(neededPieces, 1000);
if (!ok)
{
System.err.println("FRA: timed out while waiting for pieces " + neededPieces);

View file

@ -24,12 +24,15 @@ import moe.nekojimi.friendcloud.network.requests.ObjectListRequest;
import moe.nekojimi.friendcloud.objects.NetworkFile;
import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.objects.Peer;
import moe.nekojimi.friendcloud.objects.PeerFileState;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import moe.nekojimi.friendcloud.storage.DataStore;
import moe.nekojimi.friendcloud.storage.Model;
import moe.nekojimi.friendcloud.storage.StupidJSONFileStore;
import moe.nekojimi.friendcloud.tasks.JoinNetworkTask;
import org.slf4j.simple.SimpleLogger;
import java.awt.*;
import java.io.File;
import java.io.IOException;
import java.net.*;
@ -60,12 +63,16 @@ public class Main
@Parameter(names="-create-network")
private boolean createNetwork = false;
@Parameter(names = "-storage")
private String storageLocation = ".";
// @Parameter(names="-file")
private ConnectionManager connectionManager;
private final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(16);
private final FUSEAccess fuseAccess = new FUSEAccess();
private final Model model = new Model(new StupidJSONFileStore(new File("storage")));
private Model model;
private final NotificationManager notificationManager = new NotificationManager();
public static void main(String[] args)
{
@ -77,8 +84,9 @@ public class Main
try
{
instance.run();
} catch (IOException | InterruptedException | JLibnotifyLoadException | JLibnotifyInitException e)
} catch (Exception e)
{
System.err.println("main() received exception, dying horribly!!");
e.printStackTrace(System.err);
try
{
@ -94,6 +102,10 @@ public class Main
private void run() throws IOException, InterruptedException, JLibnotifyLoadException, JLibnotifyInitException
{
DataStore dataStore = new StupidJSONFileStore(new File(storageLocation));
model = new Model(dataStore);
model.init();
connectionManager = new ConnectionManager(tcpPort);
Path mountPoint;
@ -106,7 +118,6 @@ public class Main
mountPoint = Path.of(System.getProperty("user.dir") + "/fuse-mount-" + tcpPort);
boolean created = mountPoint.toFile().mkdirs();
System.out.println("Created FUSE mount point " + mountPoint);
}
fuseAccess.mount(mountPoint);
System.out.println("Mounted virtual filesystem at " + mountPoint);
@ -122,10 +133,11 @@ public class Main
}
}));
DefaultJLibnotify libnotify = (DefaultJLibnotify) DefaultJLibnotifyLoader.init().load();
libnotify.init("FriendCloud");
JLibnotifyNotification notification = libnotify.createNotification("Holy balls a notification!", "Woah!!!", "dialog-information");
notification.show();
// if (Desktop.isDesktopSupported())
// {
// Desktop desktop = Desktop.getDesktop();
// desktop.browse(mountPoint.toFile().toURI());
// }
connectionManager.addNewConnectionConsumer(this::requestCompleteState);
@ -184,8 +196,8 @@ public class Main
}
}
JoinNetworkTask joinNetworkTask = new JoinNetworkTask();
executor.submit(joinNetworkTask);
// JoinNetworkTask joinNetworkTask = new JoinNetworkTask();
// executor.submit(joinNetworkTask);
for (String knownPeerAddress : knownPeers)
{
@ -309,4 +321,9 @@ public class Main
{
return model;
}
public NotificationManager getNotificationManager()
{
return notificationManager;
}
}

View file

@ -0,0 +1,105 @@
package moe.nekojimi.friendcloud;
import es.blackleg.jlibnotify.JLibnotify;
import es.blackleg.jlibnotify.JLibnotifyNotification;
import es.blackleg.jlibnotify.core.DefaultJLibnotifyLoader;
import es.blackleg.jlibnotify.exception.JLibnotifyInitException;
import es.blackleg.jlibnotify.exception.JLibnotifyLoadException;
public class NotificationManager
{
private final JLibnotify libnotify;
public NotificationManager()
{
JLibnotify n;
try
{
n = DefaultJLibnotifyLoader.init().load();
n.init("FriendCloud");
System.out.println("Libnotify capabilities detected: " + n.getServerCapabilities());
JLibnotifyNotification notification = n.createNotification("FriendCloud started", "It works! Cool!", "dialog-information");
notification.show();
} catch (JLibnotifyLoadException | JLibnotifyInitException e)
{
n = null;
System.err.println("Failed to initialise notification manager.");
e.printStackTrace(System.err);
}
libnotify = n;
}
public enum NotificationType
{
HELLO,
TRANSFER_IN_PROGRESS,
TRANSFER_DONE,
}
protected String getNotificationIcon(NotificationType type)
{
return switch (type)
{
default -> "dialog-information";
};
}
public Notification createNotification(String heading, String body, NotificationType type)
{
try
{
return new Notification(heading, body, type);
} catch (Exception e)
{
e.printStackTrace(System.err);
return null;
}
}
public class Notification
{
private String heading;
private String body;
private NotificationType type;
private JLibnotifyNotification notification;
public Notification(String heading, String body, NotificationType type)
{
this.heading = heading;
this.body = body;
this.type = type;
if (libnotify != null)
{
notification = libnotify.createNotification(heading, body, getNotificationIcon(type));
// notification.setTimeOut(10);
notification.show();
}
}
public void setHeading(String heading)
{
this.heading = heading;
update();
}
public void setBody(String body)
{
this.body = body;
update();
}
public void setType(NotificationType type)
{
this.type = type;
update();
}
public void update()
{
notification.update(heading, body, getNotificationIcon(type));
// notification.setTimeOut(10);
}
}
}

View file

@ -1,6 +1,6 @@
package moe.nekojimi.friendcloud;
import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import moe.nekojimi.friendcloud.storage.Storable;
@ -13,18 +13,13 @@ public class ObjectChangeRecord implements Storable
{
// private final long changeID;
private NetworkObject.ObjectID creatorPeer;
private ObjectID creatorPeer;
private Set<Long> changeHeads = new HashSet<>();
private Set<Change> changes = new HashSet<>();
public ObjectChangeRecord(NetworkObject.ObjectID creatorPeer)
{
this.creatorPeer = creatorPeer;
}
public static ObjectChangeRecord createFromChangeMessage(ObjectStatements.ObjectChangeMessage objectChangeMessage)
{
ObjectChangeRecord record = new ObjectChangeRecord(new NetworkObject.ObjectID(0)); // TODO: decode creator
ObjectChangeRecord record = new ObjectChangeRecord(); // TODO: decode creator
record.changeHeads.addAll(objectChangeMessage.getChangeHeadsList());
for (ObjectStatements.ObjectChange objectChange : objectChangeMessage.getChangesList())
{
@ -39,9 +34,10 @@ public class ObjectChangeRecord implements Storable
return record;
}
public static ObjectChangeRecord createFromChanges(NetworkObject.ObjectID creator, Set<Change> changes)
public static ObjectChangeRecord createFromChanges(ObjectID creator, Set<Change> changes)
{
ObjectChangeRecord record = new ObjectChangeRecord(creator);
ObjectChangeRecord record = new ObjectChangeRecord();
record.creatorPeer = creator;
record.changes.addAll(changes);
return record;
}
@ -83,7 +79,7 @@ public class ObjectChangeRecord implements Storable
{
changeHeads = new HashSet<>((Collection) map.get("changeHeads"));
changes = new HashSet<>((Collection) map.get("changes"));
creatorPeer = new NetworkObject.ObjectID((Long) map.get("creator"));
creatorPeer = new ObjectID((Long) map.get("creator"));
}
public String toString()
@ -115,7 +111,7 @@ public class ObjectChangeRecord implements Storable
return Util.xorBytesToLong(bytes);
}
public NetworkObject.ObjectID getCreatorPeer()
public ObjectID getCreatorPeer()
{
return creatorPeer;
}
@ -131,12 +127,12 @@ public class ObjectChangeRecord implements Storable
return changeHeads;
}
public record Change(NetworkObject.ObjectID objectID, Map<String, String> beforeValues, Map<String, String> afterValues)
public record Change(ObjectID objectID, Map<String, String> beforeValues, Map<String, String> afterValues)
{
public static Change createFromObjectChange(ObjectStatements.ObjectChange change)
{
return new Change(new NetworkObject.ObjectID(change.getObjectId()), change.getBeforeMap(), change.getAfterMap());
return new Change(new ObjectID(change.getObjectId()), change.getBeforeMap(), change.getAfterMap());
}
public static Change createFromObjectStates(ObjectStatements.ObjectState before, ObjectStatements.ObjectState after)
@ -155,7 +151,7 @@ public class ObjectChangeRecord implements Storable
}
if (!afterValues.isEmpty())
{
return new Change(new NetworkObject.ObjectID(before.getObjectId()), beforeValues, afterValues);
return new Change(new ObjectID(before.getObjectId()), beforeValues, afterValues);
}
return null;
}

View file

@ -1,6 +1,7 @@
package moe.nekojimi.friendcloud;
import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import moe.nekojimi.friendcloud.tasks.PropagateMessageTask;
@ -13,29 +14,29 @@ import java.util.Set;
public class ObjectChangeTransaction implements Closeable
{
private final NetworkObject.ObjectID creator;
private final ObjectID creator;
private final ConnectionManager connectionManager;
private final Map<NetworkObject.ObjectID, ObjectStatements.ObjectState> beforeStates = new HashMap<>();
private final Map<ObjectID, ObjectStatements.ObjectState> beforeStates = new HashMap<>();
private boolean ended = false;
ObjectChangeTransaction(ConnectionManager connectionManager, NetworkObject.ObjectID creator)
ObjectChangeTransaction(ConnectionManager connectionManager, ObjectID creator)
{
this.creator = creator;
this.connectionManager = connectionManager;
}
public static ObjectChangeTransaction startTransaction(ConnectionManager connectionManager, NetworkObject.ObjectID creatorPeer, NetworkObject.ObjectID... objects)
public static ObjectChangeTransaction startTransaction(ConnectionManager connectionManager, ObjectID creatorPeer, ObjectID... objects)
{
ObjectChangeTransaction builder = new ObjectChangeTransaction(connectionManager, creatorPeer);
for (NetworkObject.ObjectID id : objects)
for (ObjectID id : objects)
{
builder.addObjectBeforeChange(id);
}
return builder;
}
public ObjectChangeTransaction addObjectBeforeChange(NetworkObject.ObjectID id)
public ObjectChangeTransaction addObjectBeforeChange(ObjectID id)
{
NetworkObject object = Main.getInstance().getModel().getObject(id);
if (object != null)
@ -50,7 +51,7 @@ public class ObjectChangeTransaction implements Closeable
ended = true;
Set<ObjectChangeRecord.Change> changes = new HashSet<>();
for (Map.Entry<NetworkObject.ObjectID, ObjectStatements.ObjectState> entry : beforeStates.entrySet())
for (Map.Entry<ObjectID, ObjectStatements.ObjectState> entry : beforeStates.entrySet())
{
ObjectStatements.ObjectState afterState = Main.getInstance().getModel().getObject(entry.getKey()).buildObjectState().build();
ObjectChangeRecord.Change change = ObjectChangeRecord.Change.createFromObjectStates(entry.getValue(), afterState);

View file

@ -17,4 +17,16 @@ public class Util
return ret;
}
public static long unconditionalNumberToLong(Object number)
{
assert (number instanceof Number);
return ((Number)number).longValue();
}
public static double unconditionalNumberToDouble(Object number)
{
assert (number instanceof Number);
return ((Number)number).doubleValue();
}
}

View file

@ -9,6 +9,7 @@ import moe.nekojimi.friendcloud.FilePieceAccess;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.objects.NetworkFile;
import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.objects.Peer;
import moe.nekojimi.friendcloud.protos.CommonMessages;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
@ -129,9 +130,9 @@ public abstract class PeerConnection extends Thread
}
}
NetworkObject.ObjectID senderID = new NetworkObject.ObjectID(header.getSenderId());
ObjectID senderID = new ObjectID(header.getSenderId());
if (peer == null)
peer = (Peer) Main.getInstance().getModel().getOrCreateObject(senderID);
peer = Main.getInstance().getModel().getOrCreateObject(senderID);
else
{
if (!senderID.equals(peer.getObjectID()))
@ -194,7 +195,7 @@ public abstract class PeerConnection extends Thread
{
objectList.addStates(object.buildObjectState());
}
System.out.println("Replying to ObjectListRequest with ObjectList, objects=" + objectList.getStatesList());
// System.out.println("Replying to ObjectListRequest with ObjectList, objects=" + objectList.getStatesList());
sendMessage(wrapMessage(objectList.build(), header));
}
else if (body.is(PieceMessages.FilePiecesRequestMessage.class))
@ -205,7 +206,7 @@ public abstract class PeerConnection extends Thread
replyWithError(CommonMessages.Error.ERROR_INVALID_ARGUMENT, header);
}
NetworkFile networkFile = (NetworkFile) Main.getInstance().getModel().getObject(new NetworkObject.ObjectID(filePiecesRequestMessage.getFileId()));
NetworkFile networkFile = (NetworkFile) Main.getInstance().getModel().getObject(new ObjectID(filePiecesRequestMessage.getFileId()));
if (networkFile == null)
{
replyWithError(CommonMessages.Error.ERROR_OBJECT_NOT_FOUND, header);

View file

@ -47,10 +47,13 @@ public class PeerTCPConnection extends PeerConnection
messageReceived(message);
}
}
} catch (IOException ex)
} catch (Exception ex)
{
// fuck
ex.printStackTrace(System.err);
}
System.out.println("TCP Connection: connection closed");
}
@Override

View file

@ -4,6 +4,7 @@ import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import java.io.IOException;
@ -37,15 +38,16 @@ public class ObjectListRequest extends Request<ObjectStatements.ObjectListReques
{
ObjectStatements.ObjectList objectList = reply.unpack(ObjectStatements.ObjectList.class);
System.out.println("Received ObjectList, objects=" + objectList.getStatesList());
// System.out.println("Received ObjectList, objects=" + objectList.getStatesList());
List<NetworkObject> ret = new ArrayList<>();
for (ObjectStatements.ObjectState objectState : objectList.getStatesList())
{
System.out.println("Received state of object " + objectState.getObjectId());
NetworkObject object = Main.getInstance().getModel().getOrCreateObject(new NetworkObject.ObjectID(objectState.getObjectId()));
NetworkObject object = Main.getInstance().getModel().getOrCreateObject(new ObjectID(objectState.getObjectId()));
object.updateFromStateMessage(objectState);
Main.getInstance().getModel().objectChanged(object);
ret.add(object);
}

View file

@ -1,21 +1,22 @@
package moe.nekojimi.friendcloud.objects;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.Util;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class NetworkFile extends NetworkFSNode
{
private static final int MIN_PIECE_SIZE = 0x400; // 1KiB
private static final int MAX_PIECE_SIZE = 0x100000; // 1 MiB
private static final int MAX_PIECE_SIZE = 0x100000; // 1MiB
private static final int IDEAL_PIECE_COUNT = 1024;
private static File tempDirectory = null;
@ -26,7 +27,8 @@ public class NetworkFile extends NetworkFSNode
private File localFile = null;
private final Map<Peer, PeerFileState> fileStates = new HashMap<>();
private final SortedSet<ObjectID> peersWithCopy = new TreeSet<>();
// private final Map<Peer, PeerFileState> fileStates = new HashMap<>();
private BitSet pieces = new BitSet();
// private List<FilePiece> pieces = new ArrayList<>();
@ -97,6 +99,11 @@ public class NetworkFile extends NetworkFSNode
System.out.println("Total hash: " + HexFormat.of().formatHex(hash));
System.out.println("Have " + pieces.cardinality() + " of " + getPieceCount() + " pieces.");
if (pieces.cardinality() >= getPieceCount())
{
peersWithCopy.add(Main.getInstance().getModel().getSelfPeer().getObjectID());
}
} catch (NoSuchAlgorithmException e)
{
throw new RuntimeException(e);
@ -115,6 +122,15 @@ public class NetworkFile extends NetworkFSNode
hash = HexFormat.of().parseHex(state.getValuesOrThrow("hash"));
if (state.containsValues("pieceSize"))
pieceSize = Long.parseLong(state.getValuesOrThrow("pieceSize"));
if (state.containsValues("peersWithCopy"))
{
peersWithCopy.clear();
String[] peers = state.getValuesOrThrow("peersWithCopy").split(",");
for (String peer: peers)
{
peersWithCopy.add(new ObjectID(Long.parseUnsignedLong(peer,16)));
}
}
}
@Override
@ -124,7 +140,8 @@ public class NetworkFile extends NetworkFSNode
// .putValues("path", path)
.putValues("size", Long.toString(size))
.putValues("hash", HexFormat.of().formatHex(hash))
.putValues("pieceSize", Long.toString(pieceSize));
.putValues("pieceSize", Long.toString(pieceSize))
.putValues("peersWithCopy", peersWithCopy.stream().map(ObjectID::toLong).map(Long::toHexString).collect(Collectors.joining(",")));
}
@Override
@ -136,6 +153,7 @@ public class NetworkFile extends NetworkFSNode
ret.put("pieceSize", pieceSize);
ret.put("pieces", Arrays.stream(pieces.toLongArray()).boxed().toList());
ret.put("localFile", localFile != null ? localFile.getAbsolutePath() : "");
ret.put("peersWithCopy", peersWithCopy.stream().map(ObjectID::toLong).toList());
return ret;
}
@ -146,13 +164,14 @@ public class NetworkFile extends NetworkFSNode
size = ((Number) map.get("size")).longValue();
hash = HexFormat.of().parseHex((CharSequence) map.get("hash"));
pieceSize = ((Number) map.get("pieceSize")).longValue();
ArrayList<Number> pieces1 = (ArrayList<Number>) map.get("pieces");
pieces = BitSet.valueOf(pieces1.stream().mapToLong(Number::longValue).toArray());
pieces = BitSet.valueOf(((ArrayList<Number>) map.get("pieces")).stream().mapToLong(Number::longValue).toArray());
String localFilePath = (String) map.get("localFile");
if (localFilePath.isEmpty())
localFile = null;
else
localFile = new File(localFilePath);
peersWithCopy.clear();
peersWithCopy.addAll(((List<Object>)map.get("peersWithCopy")).stream().map(Util::unconditionalNumberToLong).map(ObjectID::new).toList());
}
public File getLocalFile()
@ -241,7 +260,7 @@ public class NetworkFile extends NetworkFSNode
synchronized (pieces)
{
pieces.set(pieceIdx, has);
pieces.notifyAll();
notifyPieceWaiters();
}
}
@ -255,14 +274,24 @@ public class NetworkFile extends NetworkFSNode
return size;
}
void addFileState(PeerFileState peerFileState)
// void addFileState(PeerFileState peerFileState)
// {
// fileStates.put(peerFileState.getNode(), peerFileState);
// }
//
// public Map<Peer, PeerFileState> getFileStates()
// {
// return fileStates;
// }
public void addPeerWithCopy(Peer selfPeer)
{
fileStates.put(peerFileState.getNode(), peerFileState);
peersWithCopy.add(selfPeer.getObjectID());
}
public Map<Peer, PeerFileState> getFileStates()
public List<Peer> getPeersWithCopy()
{
return fileStates;
return peersWithCopy.stream().map(objectID -> (Peer) Main.getInstance().getModel().getObject(objectID)).toList();
}
/**
@ -301,6 +330,14 @@ public class NetworkFile extends NetworkFSNode
return false;
}
public void notifyPieceWaiters()
{
synchronized (pieces)
{
pieces.notifyAll();
}
}
public enum StorageType
{
/** The file will be stored as a complete file in the storage directory under it's own name and file path. */

View file

@ -3,9 +3,6 @@ package moe.nekojimi.friendcloud.objects;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.WeakHashMap;
public class NetworkFolder extends NetworkFSNode
{

View file

@ -2,12 +2,13 @@ package moe.nekojimi.friendcloud.objects;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import moe.nekojimi.friendcloud.storage.Storable;
import org.jetbrains.annotations.NotNull;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
public abstract class NetworkObject implements Storable
public abstract class NetworkObject implements Storable, Comparable<NetworkObject>
{
private final ObjectID objectID;
@ -53,67 +54,23 @@ public abstract class NetworkObject implements Storable
.setObjectId(objectID.toLong());
}
public static class ObjectID
@Override
public int compareTo(@NotNull NetworkObject networkObject)
{
private final ObjectStatements.ObjectType type;
private final int systemID;
private final int uniqueID;
return Long.compare(getObjectID().toLong(), networkObject.getObjectID().toLong());
}
public ObjectID(long id)
{
uniqueID = (int)(0x00000000_FFFFFFFFL & id);
systemID = Math.toIntExact((0x00FFFFFF_00000000L & id) >>> 32);
type = ObjectStatements.ObjectType.forNumber(Math.toIntExact(((0xFF000000_00000000L & id) >>> 56)));
}
@Override
public boolean equals(Object o)
{
if (o == null || getClass() != o.getClass()) return false;
NetworkObject that = (NetworkObject) o;
return Objects.equals(objectID, that.objectID);
}
public ObjectID(ObjectStatements.ObjectType type, int systemID, int uniqueID)
{
this.type = type;
this.systemID = systemID;
this.uniqueID = uniqueID;
}
public long toLong()
{
long uniquePart = Integer.toUnsignedLong(uniqueID);
long systemPart = Integer.toUnsignedLong(systemID) << 32;
long typePart = ((long) type.getNumber()) << 56;
return typePart | systemPart | uniquePart;
}
public ObjectStatements.ObjectType getType()
{
return type;
}
public int getSystemID()
{
return systemID;
}
public int getUniqueID()
{
return uniqueID;
}
@Override
public String toString()
{
return "OBJ{" + Long.toHexString(toLong()) + "}";
}
@Override
public boolean equals(Object o)
{
if (o == null || getClass() != o.getClass()) return false;
ObjectID objectID = (ObjectID) o;
return systemID == objectID.systemID && uniqueID == objectID.uniqueID && type == objectID.type;
}
@Override
public int hashCode()
{
return Objects.hash(type, systemID, uniqueID);
}
@Override
public int hashCode()
{
return Objects.hashCode(objectID);
}
}

View file

@ -0,0 +1,75 @@
package moe.nekojimi.friendcloud.objects;
import org.jetbrains.annotations.NotNull;
import java.util.Objects;
public class ObjectID implements Comparable<ObjectID>
{
private final moe.nekojimi.friendcloud.protos.ObjectStatements.ObjectType type;
private final int systemID;
private final int uniqueID;
public ObjectID(long id)
{
uniqueID = (int) (0x00000000_FFFFFFFFL & id);
systemID = Math.toIntExact((0x00FFFFFF_00000000L & id) >>> 32);
type = moe.nekojimi.friendcloud.protos.ObjectStatements.ObjectType.forNumber(Math.toIntExact(((0xFF000000_00000000L & id) >>> 56)));
}
public ObjectID(moe.nekojimi.friendcloud.protos.ObjectStatements.ObjectType type, int systemID, int uniqueID)
{
this.type = type;
this.systemID = systemID;
this.uniqueID = uniqueID;
}
public long toLong()
{
long uniquePart = Integer.toUnsignedLong(uniqueID);
long systemPart = Integer.toUnsignedLong(systemID) << 32;
long typePart = ((long) type.getNumber()) << 56;
return typePart | systemPart | uniquePart;
}
public moe.nekojimi.friendcloud.protos.ObjectStatements.ObjectType getType()
{
return type;
}
public int getSystemID()
{
return systemID;
}
public int getUniqueID()
{
return uniqueID;
}
@Override
public String toString()
{
return "OBJ{" + Integer.toHexString(type.getNumber()) + "-" + Integer.toHexString(systemID) + "-" + Integer.toHexString(uniqueID) + "}";
}
@Override
public boolean equals(Object o)
{
if (o == null || getClass() != o.getClass()) return false;
ObjectID objectID = (ObjectID) o;
return systemID == objectID.systemID && uniqueID == objectID.uniqueID && type == objectID.type;
}
@Override
public int hashCode()
{
return Objects.hash(type, systemID, uniqueID);
}
@Override
public int compareTo(@NotNull ObjectID objectID)
{
return Long.compare(toLong(), objectID.toLong());
}
}

View file

@ -9,11 +9,11 @@ import java.util.stream.Collectors;
public class Peer extends NetworkObject
{
private final List<URI> addresses = new ArrayList<>();
private final SortedSet<URI> addresses = new TreeSet<>();
private String userName = "";
private String systemName = "";
private Map<NetworkFile, PeerFileState> fileStates = new HashMap<>();
// private Map<NetworkFile, PeerFileState> fileStates = new HashMap<>();
private volatile int lastTriedAddressIdx = -1;
@ -94,25 +94,17 @@ public class Peer extends NetworkObject
addresses.add(address);
}
public URI getNextAddress()
{
lastTriedAddressIdx++;
if (lastTriedAddressIdx >= addresses.size())
lastTriedAddressIdx = 0;
return addresses.get(lastTriedAddressIdx);
}
// void addFileState(PeerFileState peerFileState)
// {
// fileStates.put(peerFileState.getFile(), peerFileState);
// }
//
// public Map<NetworkFile, PeerFileState> getFileStates()
// {
// return fileStates;
// }
void addFileState(PeerFileState peerFileState)
{
fileStates.put(peerFileState.getFile(), peerFileState);
}
public Map<NetworkFile, PeerFileState> getFileStates()
{
return fileStates;
}
public List<URI> getAddresses()
public SortedSet<URI> getAddresses()
{
return addresses;
}

View file

@ -1,14 +1,14 @@
package moe.nekojimi.friendcloud.objects;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.Util;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
import java.util.Map;
public class PeerFileState extends NetworkObject
{
private Peer peer;
private NetworkFile file;
private ObjectID peerID;
private ObjectID fileID;
private double progress = 0;
@ -21,13 +21,10 @@ public class PeerFileState extends NetworkObject
public void updateFromStateMessage(ObjectStatements.ObjectState state)
{
super.updateFromStateMessage(state);
peer = (Peer) Main.getInstance().getModel().getOrCreateObject(new ObjectID(Long.parseLong(state.getValuesOrThrow("peer"))));
file = (NetworkFile) Main.getInstance().getModel().getOrCreateObject(new ObjectID(Long.parseLong(state.getValuesOrThrow("file"))));
peerID = new ObjectID(Long.parseLong(state.getValuesOrThrow("peer")));
fileID = new ObjectID(Long.parseLong(state.getValuesOrThrow("file")));
if (state.containsValues("progress"))
progress = Double.parseDouble(state.getValuesOrThrow("progress"));
peer.addFileState(this);
file.addFileState(this);
}
@Override
@ -40,20 +37,20 @@ public class PeerFileState extends NetworkObject
public ObjectStatements.ObjectState.Builder buildObjectState()
{
return super.buildObjectState()
.putValues("peer", Long.toString(peer.getObjectID().toLong()))
.putValues("file", Long.toString(file.getObjectID().toLong()))
// .putValues("peer", Long.toString(peer.getObjectID().toLong()))
// .putValues("file", Long.toString(file.getObjectID().toLong()))
.putValues("progress", Double.toString(progress));
}
public void setNode(Peer peer)
{
this.peer = peer;
}
public void setFile(NetworkFile file)
{
this.file = file;
}
// public void setNode(Peer peer)
// {
// this.peer = peer;
// }
//
// public void setFile(NetworkFile file)
// {
// this.file = file;
// }
public double getProgress()
{
@ -65,19 +62,31 @@ public class PeerFileState extends NetworkObject
this.progress = progress;
}
public NetworkFile getFile()
{
return file;
}
// public NetworkFile getFile()
// {
// return file;
// }
//
// public Peer getNode()
// {
// return peer;
// }
public Peer getNode()
@Override
public Map<String, Object> getStateMap()
{
return peer;
Map<String, Object> ret = super.getStateMap();
// ret.put("peer", peer == null ? 0L : peer.getObjectID().toLong());
// ret.put("file", file == null? 0L : file.getObjectID().toLong());
ret.put("progress", progress);
return ret;
}
@Override
public void updateFromStateMap(Map<String, Object> map)
{
// peer = Main.getInstance().getModel().getObject(new ObjectID(Util.unconditionalNumberToLong(map.get("peer"))));
// file = Main.getInstance().getModel().getObject(new ObjectID(Util.unconditionalNumberToLong(map.get("file"))));
progress = Util.unconditionalNumberToDouble( map.get("progress"));
}
}

View file

@ -1,7 +1,9 @@
package moe.nekojimi.friendcloud.storage;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.ObjectChangeRecord;
import moe.nekojimi.friendcloud.Util;
import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.objects.Peer;
import java.util.Map;
@ -9,6 +11,8 @@ import java.util.Map;
public class LocalData implements Storable
{
private Peer localPeer;
private ObjectChangeRecord currentChangeRecord;
private int systemID;
public Peer getLocalPeer()
{
@ -20,6 +24,26 @@ public class LocalData implements Storable
this.localPeer = localPeer;
}
public ObjectChangeRecord getCurrentChangeRecord()
{
return currentChangeRecord;
}
public void setCurrentChangeRecord(ObjectChangeRecord currentChangeRecord)
{
this.currentChangeRecord = currentChangeRecord;
}
public int getSystemID()
{
return systemID;
}
public void setSystemID(int systemID)
{
this.systemID = systemID;
}
@Override
public long getStorageID()
{
@ -29,12 +53,17 @@ public class LocalData implements Storable
@Override
public Map<String, Object> getStateMap()
{
return Map.of("localPeer", localPeer.getObjectID().toLong());
return Map.of("localPeer", localPeer == null ? 0L : localPeer.getObjectID().toLong(),
"currentChangeRecord", currentChangeRecord == null ? 0L : currentChangeRecord.getChangeID(),
"systemID", systemID);
}
@Override
public void updateFromStateMap(Map<String, Object> map)
{
localPeer = (Peer) Main.getInstance().getModel().getObject(new NetworkObject.ObjectID((Long) map.get("localPeer")));
localPeer = Main.getInstance().getModel().getObject(new ObjectID(Util.unconditionalNumberToLong(map.getOrDefault("localPeer",0))));
currentChangeRecord = Main.getInstance().getModel().getChangeRecord(Util.unconditionalNumberToLong(map.getOrDefault("currentChangeRecord",0)));
systemID = (int) map.getOrDefault("systemID", 0);
System.out.println("LocalData: resumed state, localPeer=" + localPeer + ", currentChangeRecord=" + currentChangeRecord + ", systemID=" + Integer.toHexString(systemID));
}
}

View file

@ -11,37 +11,53 @@ public class Model
{
private final CachingDataStore dataStore;
private final int systemID;
private Peer selfPeer = null;
private ObjectChangeRecord currentChange;
private LocalData localData;
public Model(DataStore dataStore)
{
this.dataStore = new CachingDataStore(dataStore);
Random ran = new Random();
systemID = ran.nextInt() & 0x00FFFFFF;
}
public void setSelfPeer(Peer selfPeer)
public synchronized void init()
{
this.selfPeer = selfPeer;
List<LocalData> localDataList = dataStore.getDAOForClass(LocalData.class).getAll();
if (localDataList.isEmpty())
{
localData = dataStore.getDAOForClass(LocalData.class).create(0);
}
else if (localDataList.size() == 1)
{
localData = localDataList.getFirst();
}
else
{
throw new IllegalStateException("We have more than one LocalData somehow!!");
}
if (localData.getSystemID() == 0)
{
Random ran = new Random();
localData.setSystemID(ran.nextInt() & 0x00FFFFFF);
objectChanged(localData);
}
}
public synchronized Peer getSelfPeer()
{
if (selfPeer == null)
selfPeer = createObjectByType(ObjectStatements.ObjectType.OBJECT_TYPE_PEER);
return selfPeer;
if (localData.getLocalPeer() == null)
{
localData.setLocalPeer(createObjectByType(ObjectStatements.ObjectType.OBJECT_TYPE_PEER));
objectChanged(localData);
}
return localData.getLocalPeer();
}
// private Map<Long, Node> nodes = new HashMap<>();
public synchronized NetworkObject.ObjectID getNextObjectID(ObjectStatements.ObjectType type)
public synchronized ObjectID getNextObjectID(ObjectStatements.ObjectType type)
{
Random ran = new Random();
int randomNumber = ran.nextInt();
NetworkObject.ObjectID objectID = new NetworkObject.ObjectID(type, systemID, randomNumber);
ObjectID objectID = new ObjectID(type, localData.getSystemID(), randomNumber);
System.out.println("Assigned new object ID: " + objectID);
return objectID;
}
@ -59,7 +75,7 @@ public class Model
};
}
public synchronized <T extends NetworkObject> T createObjectByID(NetworkObject.ObjectID id)
public synchronized <T extends NetworkObject> T createObjectByID(ObjectID id)
{
if (id.toLong() == 0)
throw new IllegalArgumentException("Cannot create an object with ID=0!");
@ -76,7 +92,7 @@ public class Model
return createObjectByID(getNextObjectID(type));
}
public synchronized <T extends NetworkObject> T getObject(NetworkObject.ObjectID id)
public synchronized <T extends NetworkObject> T getObject(ObjectID id)
{
if (id.toLong() == 0)
return null;
@ -84,7 +100,7 @@ public class Model
return dataStore.getDAOForClass(clazz).get(id.toLong());
}
public synchronized <T extends NetworkObject> T getOrCreateObject(NetworkObject.ObjectID id)
public synchronized <T extends NetworkObject> T getOrCreateObject(ObjectID id)
{
if (id.toLong() == 0)
return null;
@ -138,13 +154,15 @@ public class Model
public ObjectChangeRecord getChangeRecord(long id)
{
return dataStore.getDAOForClass(ObjectChangeRecord.class).get(id);
if (id == 0)
return null;
return dataStore.getDAOForClass(ObjectChangeRecord.class).get(id);
}
public void applyChangeRecord(ObjectChangeRecord record)
{
if (!record.getChangeHeads().contains(currentChange.getChangeID()))
throw new IllegalStateException("Change does not apply! Valid change heads=" + record.getChangeHeads() + ", we are in state " + currentChange.getChangeID());
if (!record.getChangeHeads().contains(localData.getCurrentChangeRecord().getChangeID()))
throw new IllegalStateException("Change does not apply! Valid change heads=" + record.getChangeHeads() + ", we are in state " + localData.getCurrentChangeRecord().getChangeID());
addChangeRecord(record);

View file

@ -1,5 +1,6 @@
package moe.nekojimi.friendcloud.storage;
import moe.nekojimi.friendcloud.ObjectChangeRecord;
import moe.nekojimi.friendcloud.objects.*;
import org.jetbrains.annotations.NotNull;
import org.json.JSONArray;
@ -8,10 +9,10 @@ import org.json.JSONObject;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.lang.reflect.Modifier;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.util.*;
public class StupidJSONFileStore extends DataStore
@ -36,17 +37,21 @@ public class StupidJSONFileStore extends DataStore
if (daos.containsKey(clazz))
return (DAO<T>) daos.get(clazz);
DAO<T> ret;
DAO<?> ret;
if (clazz.equals(NetworkFile.class))
ret = (DAO<T>) new NetworkFileDAO();
ret = new NetworkFileDAO();
else if (clazz.equals(NetworkFolder.class))
ret = (DAO<T>) new NetworkFolderDAO();
ret = new NetworkFolderDAO();
else if (clazz.equals(Peer.class))
ret = (DAO<T>) new PeerDAO();
ret = new PeerDAO();
else if (clazz.equals(PeerFileState.class))
ret = (DAO<T>) new PeerFileStateDAO();
ret = new PeerFileStateDAO();
else if (clazz.equals(NetworkFSNode.class))
ret = (DAO<T>) new NetworkFSNodeDAO();
ret = new NetworkFSNodeDAO();
else if (clazz.equals(LocalData.class))
ret = new LocalDataDAO();
else if (clazz.equals(ObjectChangeRecord.class))
ret = new ObjectChangeRecordDAO();
else
throw new UnsupportedOperationException("Requested DAO for unsupported type " + clazz.getCanonicalName());
@ -202,11 +207,17 @@ public class StupidJSONFileStore extends DataStore
public T get(long id)
{
File file = new File(getNamespaceDirectory(), Long.toHexString(id) + ".json");
file.getParentFile().mkdirs();
try
{
JSONObject json = new JSONObject(Files.readString(file.toPath()));
return jsonToObject(json);
} catch (IOException e)
} catch (NoSuchFileException ex)
{
System.err.println("JSONFileStore: failed to find object with ID=" + id + ", expected in " + file.getAbsolutePath());
return null;
}
catch (IOException e)
{
throw new RuntimeException(e);
}
@ -216,6 +227,7 @@ public class StupidJSONFileStore extends DataStore
public void update(T object)
{
File file = new File(getNamespaceDirectory(), Long.toHexString(object.getStorageID()) + ".json");
file.getParentFile().mkdirs();
try(FileWriter writer = new FileWriter(file, false))
{
objectToJson(object).write(writer);
@ -255,7 +267,7 @@ public class StupidJSONFileStore extends DataStore
@Override
protected NetworkFile makeBlank(long id)
{
return new NetworkFile(new NetworkObject.ObjectID(id));
return new NetworkFile(new ObjectID(id));
}
}
@ -269,7 +281,7 @@ public class StupidJSONFileStore extends DataStore
@Override
protected NetworkFolder makeBlank(long id)
{
return new NetworkFolder(new NetworkObject.ObjectID(id));
return new NetworkFolder(new ObjectID(id));
}
}
@ -283,7 +295,7 @@ public class StupidJSONFileStore extends DataStore
@Override
protected Peer makeBlank(long id)
{
return new Peer(new NetworkObject.ObjectID(id));
return new Peer(new ObjectID(id));
}
@Override
@ -328,7 +340,37 @@ public class StupidJSONFileStore extends DataStore
@Override
protected PeerFileState makeBlank(long id)
{
return new PeerFileState(new NetworkObject.ObjectID(id));
return new PeerFileState(new ObjectID(id));
}
}
private class LocalDataDAO extends JSONObjectDAO<LocalData>
{
@Override
protected String getNamespace()
{
return "localData";
}
@Override
protected LocalData makeBlank(long id)
{
return new LocalData();
}
}
private class ObjectChangeRecordDAO extends JSONObjectDAO<ObjectChangeRecord>
{
@Override
protected String getNamespace()
{
return "changes";
}
@Override
protected ObjectChangeRecord makeBlank(long id)
{
return new ObjectChangeRecord();
}
}
}

View file

@ -2,24 +2,27 @@
package moe.nekojimi.friendcloud.tasks;
import moe.nekojimi.friendcloud.ConnectionManager;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.NotificationManager;
import moe.nekojimi.friendcloud.ObjectChangeTransaction;
import moe.nekojimi.friendcloud.network.PeerConnection;
import moe.nekojimi.friendcloud.network.requests.FilePiecesRequest;
import moe.nekojimi.friendcloud.objects.NetworkFile;
import moe.nekojimi.friendcloud.objects.Peer;
import moe.nekojimi.friendcloud.objects.PeerFileState;
import org.jetbrains.annotations.NotNull;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
public class FileDownloadTask implements RunnableFuture<File>
{
private final NetworkFile file;
private final ConnectionManager manager;
private final ConnectionManager connectionManager;
private final long timeoutPerPieceMs = 10_000;
private final long timeoutPerPieceMs = 1_000;
private static final int MAX_DOWNLOAD_PIECES_PER_ROUND = 128;
private final SortedSet<Integer> missingPieceIndices = new TreeSet<>();
@ -28,20 +31,20 @@ public class FileDownloadTask implements RunnableFuture<File>
private boolean failed = false;
private final Object waitObject = new Object();
public FileDownloadTask(NetworkFile file, ConnectionManager manager)
public FileDownloadTask(NetworkFile file, ConnectionManager connectionManager)
{
this.file = file;
this.manager = manager;
this.connectionManager = connectionManager;
for (int i = 0; i < file.getPieceCount(); i++)
{
missingPieceIndices.add(i);
}
}
public FileDownloadTask(NetworkFile file, ConnectionManager manager, SortedSet<Integer> missingPieces)
public FileDownloadTask(NetworkFile file, ConnectionManager connectionManager, SortedSet<Integer> missingPieces)
{
this.file = file;
this.manager = manager;
this.connectionManager = connectionManager;
missingPieceIndices.addAll(missingPieces);
}
@ -53,38 +56,46 @@ public class FileDownloadTask implements RunnableFuture<File>
@Override
public void run()
{
System.out.println("Starting download of file " + file.getName());
System.out.println("Starting download of file " + file.getName() + " (pieces: " + missingPieceIndices + ")");
// NotificationManager.Notification notification = Main.getInstance().getNotificationManager().createNotification("Streaming " + file.getName(), "Starting download...", NotificationManager.NotificationType.TRANSFER_IN_PROGRESS);
int startingPieces = missingPieceIndices.size();
String connectionLine = "";
String progressLine = "";
Peer selfPeer = Main.getInstance().getModel().getSelfPeer();
while (!missingPieceIndices.isEmpty() && !cancelled && !failed && !done)
{
System.out.println("Need to get " + missingPieceIndices.size() + " missing pieces.");
Map<Peer, PeerFileState> fileStates = file.getFileStates();
// Map<Peer, PeerFileState> fileStates = file.getFileStates();
// determine what nodes we can connect to
List<PeerConnection> connections = new ArrayList<>();
for (PeerFileState peerFileState : fileStates.values())
for (Peer peer : file.getPeersWithCopy())
{
if (peerFileState.getProgress() >= 100.0)
if (peer == selfPeer)
continue; // yeah that's us dipshit
PeerConnection connection = connectionManager.getNodeConnection(peer);
if (connection != null)
{
try
{
PeerConnection connection = manager.getNodeConnection(peerFileState.getNode());
System.out.println("FileDownloadTask: Will download from " + peerFileState.getNode().getNodeName());
connections.add(connection);
} catch (IOException ex)
{
System.err.println("Failed to connect to peer " + peerFileState.getNode().getNodeName() + ": " + ex.getMessage());
}
System.out.println("FileDownloadTask: Will download from " + peer.getNodeName());
connections.add(connection);
}
}
// connectionLine = "Connected to " + connections.size() + " peers.";
// notification.setBody(connectionLine + "\n" + progressLine);
// connectionLine = "Connected to " + connections.stream().map(PeerConnection::getNode).map(Peer::getNodeName).collect(Collectors.joining(", "));
// shuffle the connections list
Collections.shuffle(connections);
if (connections.isEmpty())
{
System.err.println("FileDownloadTask: No peers have the file, download failed!");
file.notifyPieceWaiters();
failed = true;
break;
}
@ -136,11 +147,29 @@ public class FileDownloadTask implements RunnableFuture<File>
System.err.println("FileDownloadTask: Request timed out.");
} catch (ExecutionException | TimeoutException e)
{
future.cancel(true);
e.printStackTrace(System.err);
}
// progressLine = "Have " + (startingPieces - missingPieceIndices.size()) + " / " + missingPieceIndices.size() + " pieces. (" + file.getDownloadPercentage() + "%)";
// notification.setBody(connectionLine + "\n" + progressLine);
}
}
if (file.getDownloadPercentage() >= 100.0)
{
try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(connectionManager, selfPeer.getObjectID()))
{
transaction.addObjectBeforeChange(file.getObjectID());
file.addPeerWithCopy(selfPeer);
}
catch (IOException ex)
{
}
}
// notification.setBody("Finished downloading!");
System.out.println("FileDownloadTask: finished downloading " + file.getName() + "!");
done = true;
synchronized (waitObject)

View file

@ -2,7 +2,7 @@ package moe.nekojimi.friendcloud.tasks;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.ObjectChangeTransaction;
import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.objects.Peer;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
@ -15,7 +15,7 @@ public class JoinNetworkTask implements Runnable
public void run()
{
// generate new peer ID
NetworkObject.ObjectID peerID = null;
ObjectID peerID = null;
try (ObjectChangeTransaction builder = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), peerID))
{
Peer selfPeer = Main.getInstance().getModel().getSelfPeer();

View file

@ -22,20 +22,12 @@ public class SyncWithNetworkTask implements Runnable
for (Peer peer : Main.getInstance().getModel().listOtherPeers())
{
// open a connection
try
{
PeerConnection connection = Main.getInstance().getConnectionManager().getNodeConnection(peer);
// send a ObjectChangeRequest
ObjectChangeRequest objectChangeRequest = new ObjectChangeRequest(Main.getInstance().getModel().getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet()));
CompletableFuture<Set<ObjectStatements.ObjectChange>> future = connection.makeRequest(objectChangeRequest);
// integrate the returned changes with our change graph
} catch (IOException e)
{
System.err.println("SyncWithNetworkTask: Couldn't connect to " + peer + ": " + e.getMessage());
continue;
}
PeerConnection connection = Main.getInstance().getConnectionManager().getNodeConnection(peer);
// send a ObjectChangeRequest
ObjectChangeRequest objectChangeRequest = new ObjectChangeRequest(Main.getInstance().getModel().getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet()));
CompletableFuture<Set<ObjectStatements.ObjectChange>> future = connection.makeRequest(objectChangeRequest);
// integrate the returned changes with our change graph
}
// if no peers could be contacted:
// return success (everyone's offline)