diff --git a/pom.xml b/pom.xml index 052e69d..ea57383 100644 --- a/pom.xml +++ b/pom.xml @@ -74,24 +74,24 @@ jlibnotify 1.1.0 - - com.github.hypfvieh - dbus-java-core - 5.1.0-SNAPSHOT - - - - com.github.hypfvieh - dbus-java-transport-native-unixsocket - 5.1.0-SNAPSHOT - + + + + + + + + + + + - - - com.github.hypfvieh - dbus-java-transport-tcp - 5.1.0-SNAPSHOT - + + + + + + diff --git a/src/main/java/moe/nekojimi/friendcloud/ConnectionManager.java b/src/main/java/moe/nekojimi/friendcloud/ConnectionManager.java index ff4f0b7..ab7e645 100644 --- a/src/main/java/moe/nekojimi/friendcloud/ConnectionManager.java +++ b/src/main/java/moe/nekojimi/friendcloud/ConnectionManager.java @@ -48,12 +48,13 @@ public class ConnectionManager extends Thread { consumer.accept(nodeTCPConnection); } - } catch (IOException e) + } catch (Exception e) { - System.err.println("ConnectionManager TCP experienced exception:" + e.getMessage()); + System.err.println("ConnectionManager experienced exception:" + e.getMessage()); e.printStackTrace(System.err); } } + System.err.println("ConnectionManager: thread dying!"); } public PeerConnection getNodeConnection(URI uri) throws IOException @@ -81,13 +82,14 @@ public class ConnectionManager extends Thread return nodeConnection; } - public PeerConnection getNodeConnection(Peer peer) throws IOException + public PeerConnection getNodeConnection(Peer peer) { // try to find if we already have an active connection to this peer purgeDeadConnections(); + System.out.println("ConnectionManager: trying to get connection to " + peer + " (have " + activeConnections.size() + " connections open)"); for (PeerConnection peerConnection: activeConnections) { - if (peerConnection.getNode() == peer) + if (peerConnection.getNode().equals(peer)) return peerConnection; } @@ -95,14 +97,14 @@ public class ConnectionManager extends Thread { try { - return getNodeConnection(address); + return getNodeConnection(address, peer); } catch (IOException ex) { - System.err.println("Couldn't create PeerConnection to " + address + " : " + ex.getMessage()); + System.err.println("ConnectionManager: Couldn't create PeerConnection to " + address + " : " + ex.getMessage()); } } - System.err.println("Failed to create PeerConnection to " + peer + "!"); + System.err.println("ConnectionManager: Failed to create PeerConnection to " + peer + "!"); return null; } @@ -121,7 +123,10 @@ public class ConnectionManager extends Thread for (PeerConnection peerConnection: activeConnections) { if (!peerConnection.isAlive()) + { + System.out.println("ConnectionManager: purged dead connection to " + peerConnection.getUri()); deadConnections.add(peerConnection); + } } activeConnections.removeAll(deadConnections); } diff --git a/src/main/java/moe/nekojimi/friendcloud/FilePieceAccess.java b/src/main/java/moe/nekojimi/friendcloud/FilePieceAccess.java index 6c6caa9..80cecb1 100644 --- a/src/main/java/moe/nekojimi/friendcloud/FilePieceAccess.java +++ b/src/main/java/moe/nekojimi/friendcloud/FilePieceAccess.java @@ -21,9 +21,9 @@ public class FilePieceAccess implements Closeable randomAccessFile.setLength(file.length()); } - public int getPieceOffset(int index) + public long getPieceOffset(int index) { - return Math.toIntExact(index * networkFile.getPieceSize()); + return (index * networkFile.getPieceSize()); } public int getPieceSize(int index) @@ -47,7 +47,7 @@ public class FilePieceAccess implements Closeable int pieceSize = getPieceSize(index); byte[] buffer = new byte[pieceSize]; - int pieceOffset = getPieceOffset(index); + long pieceOffset = getPieceOffset(index); System.out.println("Reading piece " + index + " from file " + file.getName() + " (offset=" + pieceOffset + ", size=" + pieceSize + ")"); randomAccessFile.seek(pieceOffset); randomAccessFile.read(buffer); diff --git a/src/main/java/moe/nekojimi/friendcloud/FileRemoteAccess.java b/src/main/java/moe/nekojimi/friendcloud/FileRemoteAccess.java index 5b53987..c0c493a 100644 --- a/src/main/java/moe/nekojimi/friendcloud/FileRemoteAccess.java +++ b/src/main/java/moe/nekojimi/friendcloud/FileRemoteAccess.java @@ -74,7 +74,7 @@ public class FileRemoteAccess if (!neededPieces.isEmpty()) { - boolean ok = waitForPieceRange(neededPieces, 10000); + boolean ok = waitForPieceRange(neededPieces, 1000); if (!ok) { System.err.println("FRA: timed out while waiting for pieces " + neededPieces); diff --git a/src/main/java/moe/nekojimi/friendcloud/Main.java b/src/main/java/moe/nekojimi/friendcloud/Main.java index 0bcbce1..6f19f50 100644 --- a/src/main/java/moe/nekojimi/friendcloud/Main.java +++ b/src/main/java/moe/nekojimi/friendcloud/Main.java @@ -24,12 +24,15 @@ import moe.nekojimi.friendcloud.network.requests.ObjectListRequest; import moe.nekojimi.friendcloud.objects.NetworkFile; import moe.nekojimi.friendcloud.objects.NetworkObject; import moe.nekojimi.friendcloud.objects.Peer; +import moe.nekojimi.friendcloud.objects.PeerFileState; import moe.nekojimi.friendcloud.protos.ObjectStatements; +import moe.nekojimi.friendcloud.storage.DataStore; import moe.nekojimi.friendcloud.storage.Model; import moe.nekojimi.friendcloud.storage.StupidJSONFileStore; import moe.nekojimi.friendcloud.tasks.JoinNetworkTask; import org.slf4j.simple.SimpleLogger; +import java.awt.*; import java.io.File; import java.io.IOException; import java.net.*; @@ -60,12 +63,16 @@ public class Main @Parameter(names="-create-network") private boolean createNetwork = false; + @Parameter(names = "-storage") + private String storageLocation = "."; + // @Parameter(names="-file") private ConnectionManager connectionManager; private final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(16); private final FUSEAccess fuseAccess = new FUSEAccess(); - private final Model model = new Model(new StupidJSONFileStore(new File("storage"))); + private Model model; + private final NotificationManager notificationManager = new NotificationManager(); public static void main(String[] args) { @@ -77,8 +84,9 @@ public class Main try { instance.run(); - } catch (IOException | InterruptedException | JLibnotifyLoadException | JLibnotifyInitException e) + } catch (Exception e) { + System.err.println("main() received exception, dying horribly!!"); e.printStackTrace(System.err); try { @@ -94,6 +102,10 @@ public class Main private void run() throws IOException, InterruptedException, JLibnotifyLoadException, JLibnotifyInitException { + DataStore dataStore = new StupidJSONFileStore(new File(storageLocation)); + model = new Model(dataStore); + + model.init(); connectionManager = new ConnectionManager(tcpPort); Path mountPoint; @@ -106,7 +118,6 @@ public class Main mountPoint = Path.of(System.getProperty("user.dir") + "/fuse-mount-" + tcpPort); boolean created = mountPoint.toFile().mkdirs(); System.out.println("Created FUSE mount point " + mountPoint); - } fuseAccess.mount(mountPoint); System.out.println("Mounted virtual filesystem at " + mountPoint); @@ -122,10 +133,11 @@ public class Main } })); - DefaultJLibnotify libnotify = (DefaultJLibnotify) DefaultJLibnotifyLoader.init().load(); - libnotify.init("FriendCloud"); - JLibnotifyNotification notification = libnotify.createNotification("Holy balls a notification!", "Woah!!!", "dialog-information"); - notification.show(); +// if (Desktop.isDesktopSupported()) +// { +// Desktop desktop = Desktop.getDesktop(); +// desktop.browse(mountPoint.toFile().toURI()); +// } connectionManager.addNewConnectionConsumer(this::requestCompleteState); @@ -184,8 +196,8 @@ public class Main } } - JoinNetworkTask joinNetworkTask = new JoinNetworkTask(); - executor.submit(joinNetworkTask); +// JoinNetworkTask joinNetworkTask = new JoinNetworkTask(); +// executor.submit(joinNetworkTask); for (String knownPeerAddress : knownPeers) { @@ -309,4 +321,9 @@ public class Main { return model; } + + public NotificationManager getNotificationManager() + { + return notificationManager; + } } \ No newline at end of file diff --git a/src/main/java/moe/nekojimi/friendcloud/NotificationManager.java b/src/main/java/moe/nekojimi/friendcloud/NotificationManager.java new file mode 100644 index 0000000..48caaa1 --- /dev/null +++ b/src/main/java/moe/nekojimi/friendcloud/NotificationManager.java @@ -0,0 +1,105 @@ +package moe.nekojimi.friendcloud; + +import es.blackleg.jlibnotify.JLibnotify; +import es.blackleg.jlibnotify.JLibnotifyNotification; +import es.blackleg.jlibnotify.core.DefaultJLibnotifyLoader; +import es.blackleg.jlibnotify.exception.JLibnotifyInitException; +import es.blackleg.jlibnotify.exception.JLibnotifyLoadException; + +public class NotificationManager +{ + private final JLibnotify libnotify; + + public NotificationManager() + { + JLibnotify n; + try + { + n = DefaultJLibnotifyLoader.init().load(); + n.init("FriendCloud"); + System.out.println("Libnotify capabilities detected: " + n.getServerCapabilities()); + JLibnotifyNotification notification = n.createNotification("FriendCloud started", "It works! Cool!", "dialog-information"); + notification.show(); + } catch (JLibnotifyLoadException | JLibnotifyInitException e) + { + n = null; + System.err.println("Failed to initialise notification manager."); + e.printStackTrace(System.err); + } + libnotify = n; + } + + public enum NotificationType + { + HELLO, + TRANSFER_IN_PROGRESS, + TRANSFER_DONE, + } + + protected String getNotificationIcon(NotificationType type) + { + return switch (type) + { + default -> "dialog-information"; + }; + } + + public Notification createNotification(String heading, String body, NotificationType type) + { + try + { + return new Notification(heading, body, type); + } catch (Exception e) + { + e.printStackTrace(System.err); + return null; + } + } + + public class Notification + { + private String heading; + private String body; + private NotificationType type; + + private JLibnotifyNotification notification; + + public Notification(String heading, String body, NotificationType type) + { + this.heading = heading; + this.body = body; + this.type = type; + + if (libnotify != null) + { + notification = libnotify.createNotification(heading, body, getNotificationIcon(type)); +// notification.setTimeOut(10); + notification.show(); + } + } + + public void setHeading(String heading) + { + this.heading = heading; + update(); + } + + public void setBody(String body) + { + this.body = body; + update(); + } + + public void setType(NotificationType type) + { + this.type = type; + update(); + } + + public void update() + { + notification.update(heading, body, getNotificationIcon(type)); +// notification.setTimeOut(10); + } + } +} diff --git a/src/main/java/moe/nekojimi/friendcloud/ObjectChangeRecord.java b/src/main/java/moe/nekojimi/friendcloud/ObjectChangeRecord.java index 973e010..aee582f 100644 --- a/src/main/java/moe/nekojimi/friendcloud/ObjectChangeRecord.java +++ b/src/main/java/moe/nekojimi/friendcloud/ObjectChangeRecord.java @@ -1,6 +1,6 @@ package moe.nekojimi.friendcloud; -import moe.nekojimi.friendcloud.objects.NetworkObject; +import moe.nekojimi.friendcloud.objects.ObjectID; import moe.nekojimi.friendcloud.protos.ObjectStatements; import moe.nekojimi.friendcloud.storage.Storable; @@ -13,18 +13,13 @@ public class ObjectChangeRecord implements Storable { // private final long changeID; - private NetworkObject.ObjectID creatorPeer; + private ObjectID creatorPeer; private Set changeHeads = new HashSet<>(); private Set changes = new HashSet<>(); - public ObjectChangeRecord(NetworkObject.ObjectID creatorPeer) - { - this.creatorPeer = creatorPeer; - } - public static ObjectChangeRecord createFromChangeMessage(ObjectStatements.ObjectChangeMessage objectChangeMessage) { - ObjectChangeRecord record = new ObjectChangeRecord(new NetworkObject.ObjectID(0)); // TODO: decode creator + ObjectChangeRecord record = new ObjectChangeRecord(); // TODO: decode creator record.changeHeads.addAll(objectChangeMessage.getChangeHeadsList()); for (ObjectStatements.ObjectChange objectChange : objectChangeMessage.getChangesList()) { @@ -39,9 +34,10 @@ public class ObjectChangeRecord implements Storable return record; } - public static ObjectChangeRecord createFromChanges(NetworkObject.ObjectID creator, Set changes) + public static ObjectChangeRecord createFromChanges(ObjectID creator, Set changes) { - ObjectChangeRecord record = new ObjectChangeRecord(creator); + ObjectChangeRecord record = new ObjectChangeRecord(); + record.creatorPeer = creator; record.changes.addAll(changes); return record; } @@ -83,7 +79,7 @@ public class ObjectChangeRecord implements Storable { changeHeads = new HashSet<>((Collection) map.get("changeHeads")); changes = new HashSet<>((Collection) map.get("changes")); - creatorPeer = new NetworkObject.ObjectID((Long) map.get("creator")); + creatorPeer = new ObjectID((Long) map.get("creator")); } public String toString() @@ -115,7 +111,7 @@ public class ObjectChangeRecord implements Storable return Util.xorBytesToLong(bytes); } - public NetworkObject.ObjectID getCreatorPeer() + public ObjectID getCreatorPeer() { return creatorPeer; } @@ -131,12 +127,12 @@ public class ObjectChangeRecord implements Storable return changeHeads; } - public record Change(NetworkObject.ObjectID objectID, Map beforeValues, Map afterValues) + public record Change(ObjectID objectID, Map beforeValues, Map afterValues) { public static Change createFromObjectChange(ObjectStatements.ObjectChange change) { - return new Change(new NetworkObject.ObjectID(change.getObjectId()), change.getBeforeMap(), change.getAfterMap()); + return new Change(new ObjectID(change.getObjectId()), change.getBeforeMap(), change.getAfterMap()); } public static Change createFromObjectStates(ObjectStatements.ObjectState before, ObjectStatements.ObjectState after) @@ -155,7 +151,7 @@ public class ObjectChangeRecord implements Storable } if (!afterValues.isEmpty()) { - return new Change(new NetworkObject.ObjectID(before.getObjectId()), beforeValues, afterValues); + return new Change(new ObjectID(before.getObjectId()), beforeValues, afterValues); } return null; } diff --git a/src/main/java/moe/nekojimi/friendcloud/ObjectChangeTransaction.java b/src/main/java/moe/nekojimi/friendcloud/ObjectChangeTransaction.java index 5788394..0ebefb9 100644 --- a/src/main/java/moe/nekojimi/friendcloud/ObjectChangeTransaction.java +++ b/src/main/java/moe/nekojimi/friendcloud/ObjectChangeTransaction.java @@ -1,6 +1,7 @@ package moe.nekojimi.friendcloud; import moe.nekojimi.friendcloud.objects.NetworkObject; +import moe.nekojimi.friendcloud.objects.ObjectID; import moe.nekojimi.friendcloud.protos.ObjectStatements; import moe.nekojimi.friendcloud.tasks.PropagateMessageTask; @@ -13,29 +14,29 @@ import java.util.Set; public class ObjectChangeTransaction implements Closeable { - private final NetworkObject.ObjectID creator; + private final ObjectID creator; private final ConnectionManager connectionManager; - private final Map beforeStates = new HashMap<>(); + private final Map beforeStates = new HashMap<>(); private boolean ended = false; - ObjectChangeTransaction(ConnectionManager connectionManager, NetworkObject.ObjectID creator) + ObjectChangeTransaction(ConnectionManager connectionManager, ObjectID creator) { this.creator = creator; this.connectionManager = connectionManager; } - public static ObjectChangeTransaction startTransaction(ConnectionManager connectionManager, NetworkObject.ObjectID creatorPeer, NetworkObject.ObjectID... objects) + public static ObjectChangeTransaction startTransaction(ConnectionManager connectionManager, ObjectID creatorPeer, ObjectID... objects) { ObjectChangeTransaction builder = new ObjectChangeTransaction(connectionManager, creatorPeer); - for (NetworkObject.ObjectID id : objects) + for (ObjectID id : objects) { builder.addObjectBeforeChange(id); } return builder; } - public ObjectChangeTransaction addObjectBeforeChange(NetworkObject.ObjectID id) + public ObjectChangeTransaction addObjectBeforeChange(ObjectID id) { NetworkObject object = Main.getInstance().getModel().getObject(id); if (object != null) @@ -50,7 +51,7 @@ public class ObjectChangeTransaction implements Closeable ended = true; Set changes = new HashSet<>(); - for (Map.Entry entry : beforeStates.entrySet()) + for (Map.Entry entry : beforeStates.entrySet()) { ObjectStatements.ObjectState afterState = Main.getInstance().getModel().getObject(entry.getKey()).buildObjectState().build(); ObjectChangeRecord.Change change = ObjectChangeRecord.Change.createFromObjectStates(entry.getValue(), afterState); diff --git a/src/main/java/moe/nekojimi/friendcloud/Util.java b/src/main/java/moe/nekojimi/friendcloud/Util.java index 4534dbe..f000a5f 100644 --- a/src/main/java/moe/nekojimi/friendcloud/Util.java +++ b/src/main/java/moe/nekojimi/friendcloud/Util.java @@ -17,4 +17,16 @@ public class Util return ret; } + public static long unconditionalNumberToLong(Object number) + { + assert (number instanceof Number); + return ((Number)number).longValue(); + } + + public static double unconditionalNumberToDouble(Object number) + { + assert (number instanceof Number); + return ((Number)number).doubleValue(); + } + } diff --git a/src/main/java/moe/nekojimi/friendcloud/network/PeerConnection.java b/src/main/java/moe/nekojimi/friendcloud/network/PeerConnection.java index 574eff1..ed99ba7 100644 --- a/src/main/java/moe/nekojimi/friendcloud/network/PeerConnection.java +++ b/src/main/java/moe/nekojimi/friendcloud/network/PeerConnection.java @@ -9,6 +9,7 @@ import moe.nekojimi.friendcloud.FilePieceAccess; import moe.nekojimi.friendcloud.Main; import moe.nekojimi.friendcloud.objects.NetworkFile; import moe.nekojimi.friendcloud.objects.NetworkObject; +import moe.nekojimi.friendcloud.objects.ObjectID; import moe.nekojimi.friendcloud.objects.Peer; import moe.nekojimi.friendcloud.protos.CommonMessages; import moe.nekojimi.friendcloud.protos.ObjectStatements; @@ -129,9 +130,9 @@ public abstract class PeerConnection extends Thread } } - NetworkObject.ObjectID senderID = new NetworkObject.ObjectID(header.getSenderId()); + ObjectID senderID = new ObjectID(header.getSenderId()); if (peer == null) - peer = (Peer) Main.getInstance().getModel().getOrCreateObject(senderID); + peer = Main.getInstance().getModel().getOrCreateObject(senderID); else { if (!senderID.equals(peer.getObjectID())) @@ -194,7 +195,7 @@ public abstract class PeerConnection extends Thread { objectList.addStates(object.buildObjectState()); } - System.out.println("Replying to ObjectListRequest with ObjectList, objects=" + objectList.getStatesList()); +// System.out.println("Replying to ObjectListRequest with ObjectList, objects=" + objectList.getStatesList()); sendMessage(wrapMessage(objectList.build(), header)); } else if (body.is(PieceMessages.FilePiecesRequestMessage.class)) @@ -205,7 +206,7 @@ public abstract class PeerConnection extends Thread replyWithError(CommonMessages.Error.ERROR_INVALID_ARGUMENT, header); } - NetworkFile networkFile = (NetworkFile) Main.getInstance().getModel().getObject(new NetworkObject.ObjectID(filePiecesRequestMessage.getFileId())); + NetworkFile networkFile = (NetworkFile) Main.getInstance().getModel().getObject(new ObjectID(filePiecesRequestMessage.getFileId())); if (networkFile == null) { replyWithError(CommonMessages.Error.ERROR_OBJECT_NOT_FOUND, header); diff --git a/src/main/java/moe/nekojimi/friendcloud/network/PeerTCPConnection.java b/src/main/java/moe/nekojimi/friendcloud/network/PeerTCPConnection.java index d63e071..8abe51a 100644 --- a/src/main/java/moe/nekojimi/friendcloud/network/PeerTCPConnection.java +++ b/src/main/java/moe/nekojimi/friendcloud/network/PeerTCPConnection.java @@ -47,10 +47,13 @@ public class PeerTCPConnection extends PeerConnection messageReceived(message); } } - } catch (IOException ex) + } catch (Exception ex) { // fuck + ex.printStackTrace(System.err); } + + System.out.println("TCP Connection: connection closed"); } @Override diff --git a/src/main/java/moe/nekojimi/friendcloud/network/requests/ObjectListRequest.java b/src/main/java/moe/nekojimi/friendcloud/network/requests/ObjectListRequest.java index db8fbb3..d1bfe53 100644 --- a/src/main/java/moe/nekojimi/friendcloud/network/requests/ObjectListRequest.java +++ b/src/main/java/moe/nekojimi/friendcloud/network/requests/ObjectListRequest.java @@ -4,6 +4,7 @@ import com.google.protobuf.Any; import com.google.protobuf.InvalidProtocolBufferException; import moe.nekojimi.friendcloud.Main; import moe.nekojimi.friendcloud.objects.NetworkObject; +import moe.nekojimi.friendcloud.objects.ObjectID; import moe.nekojimi.friendcloud.protos.ObjectStatements; import java.io.IOException; @@ -37,15 +38,16 @@ public class ObjectListRequest extends Request fileStates = new HashMap<>(); + private final SortedSet peersWithCopy = new TreeSet<>(); +// private final Map fileStates = new HashMap<>(); private BitSet pieces = new BitSet(); // private List pieces = new ArrayList<>(); @@ -97,6 +99,11 @@ public class NetworkFile extends NetworkFSNode System.out.println("Total hash: " + HexFormat.of().formatHex(hash)); System.out.println("Have " + pieces.cardinality() + " of " + getPieceCount() + " pieces."); + if (pieces.cardinality() >= getPieceCount()) + { + peersWithCopy.add(Main.getInstance().getModel().getSelfPeer().getObjectID()); + } + } catch (NoSuchAlgorithmException e) { throw new RuntimeException(e); @@ -115,6 +122,15 @@ public class NetworkFile extends NetworkFSNode hash = HexFormat.of().parseHex(state.getValuesOrThrow("hash")); if (state.containsValues("pieceSize")) pieceSize = Long.parseLong(state.getValuesOrThrow("pieceSize")); + if (state.containsValues("peersWithCopy")) + { + peersWithCopy.clear(); + String[] peers = state.getValuesOrThrow("peersWithCopy").split(","); + for (String peer: peers) + { + peersWithCopy.add(new ObjectID(Long.parseUnsignedLong(peer,16))); + } + } } @Override @@ -124,7 +140,8 @@ public class NetworkFile extends NetworkFSNode // .putValues("path", path) .putValues("size", Long.toString(size)) .putValues("hash", HexFormat.of().formatHex(hash)) - .putValues("pieceSize", Long.toString(pieceSize)); + .putValues("pieceSize", Long.toString(pieceSize)) + .putValues("peersWithCopy", peersWithCopy.stream().map(ObjectID::toLong).map(Long::toHexString).collect(Collectors.joining(","))); } @Override @@ -136,6 +153,7 @@ public class NetworkFile extends NetworkFSNode ret.put("pieceSize", pieceSize); ret.put("pieces", Arrays.stream(pieces.toLongArray()).boxed().toList()); ret.put("localFile", localFile != null ? localFile.getAbsolutePath() : ""); + ret.put("peersWithCopy", peersWithCopy.stream().map(ObjectID::toLong).toList()); return ret; } @@ -146,13 +164,14 @@ public class NetworkFile extends NetworkFSNode size = ((Number) map.get("size")).longValue(); hash = HexFormat.of().parseHex((CharSequence) map.get("hash")); pieceSize = ((Number) map.get("pieceSize")).longValue(); - ArrayList pieces1 = (ArrayList) map.get("pieces"); - pieces = BitSet.valueOf(pieces1.stream().mapToLong(Number::longValue).toArray()); + pieces = BitSet.valueOf(((ArrayList) map.get("pieces")).stream().mapToLong(Number::longValue).toArray()); String localFilePath = (String) map.get("localFile"); if (localFilePath.isEmpty()) localFile = null; else localFile = new File(localFilePath); + peersWithCopy.clear(); + peersWithCopy.addAll(((List)map.get("peersWithCopy")).stream().map(Util::unconditionalNumberToLong).map(ObjectID::new).toList()); } public File getLocalFile() @@ -241,7 +260,7 @@ public class NetworkFile extends NetworkFSNode synchronized (pieces) { pieces.set(pieceIdx, has); - pieces.notifyAll(); + notifyPieceWaiters(); } } @@ -255,14 +274,24 @@ public class NetworkFile extends NetworkFSNode return size; } - void addFileState(PeerFileState peerFileState) +// void addFileState(PeerFileState peerFileState) +// { +// fileStates.put(peerFileState.getNode(), peerFileState); +// } +// +// public Map getFileStates() +// { +// return fileStates; +// } + + public void addPeerWithCopy(Peer selfPeer) { - fileStates.put(peerFileState.getNode(), peerFileState); + peersWithCopy.add(selfPeer.getObjectID()); } - public Map getFileStates() + public List getPeersWithCopy() { - return fileStates; + return peersWithCopy.stream().map(objectID -> (Peer) Main.getInstance().getModel().getObject(objectID)).toList(); } /** @@ -301,6 +330,14 @@ public class NetworkFile extends NetworkFSNode return false; } + public void notifyPieceWaiters() + { + synchronized (pieces) + { + pieces.notifyAll(); + } + } + public enum StorageType { /** The file will be stored as a complete file in the storage directory under it's own name and file path. */ diff --git a/src/main/java/moe/nekojimi/friendcloud/objects/NetworkFolder.java b/src/main/java/moe/nekojimi/friendcloud/objects/NetworkFolder.java index b8f02c3..7330fe7 100644 --- a/src/main/java/moe/nekojimi/friendcloud/objects/NetworkFolder.java +++ b/src/main/java/moe/nekojimi/friendcloud/objects/NetworkFolder.java @@ -3,9 +3,6 @@ package moe.nekojimi.friendcloud.objects; import moe.nekojimi.friendcloud.protos.ObjectStatements; import java.util.Map; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.WeakHashMap; public class NetworkFolder extends NetworkFSNode { diff --git a/src/main/java/moe/nekojimi/friendcloud/objects/NetworkObject.java b/src/main/java/moe/nekojimi/friendcloud/objects/NetworkObject.java index 07fcc1c..62e6b2d 100644 --- a/src/main/java/moe/nekojimi/friendcloud/objects/NetworkObject.java +++ b/src/main/java/moe/nekojimi/friendcloud/objects/NetworkObject.java @@ -2,12 +2,13 @@ package moe.nekojimi.friendcloud.objects; import moe.nekojimi.friendcloud.protos.ObjectStatements; import moe.nekojimi.friendcloud.storage.Storable; +import org.jetbrains.annotations.NotNull; import java.util.HashMap; import java.util.Map; import java.util.Objects; -public abstract class NetworkObject implements Storable +public abstract class NetworkObject implements Storable, Comparable { private final ObjectID objectID; @@ -53,67 +54,23 @@ public abstract class NetworkObject implements Storable .setObjectId(objectID.toLong()); } - public static class ObjectID + @Override + public int compareTo(@NotNull NetworkObject networkObject) { - private final ObjectStatements.ObjectType type; - private final int systemID; - private final int uniqueID; + return Long.compare(getObjectID().toLong(), networkObject.getObjectID().toLong()); + } - public ObjectID(long id) - { - uniqueID = (int)(0x00000000_FFFFFFFFL & id); - systemID = Math.toIntExact((0x00FFFFFF_00000000L & id) >>> 32); - type = ObjectStatements.ObjectType.forNumber(Math.toIntExact(((0xFF000000_00000000L & id) >>> 56))); - } + @Override + public boolean equals(Object o) + { + if (o == null || getClass() != o.getClass()) return false; + NetworkObject that = (NetworkObject) o; + return Objects.equals(objectID, that.objectID); + } - public ObjectID(ObjectStatements.ObjectType type, int systemID, int uniqueID) - { - this.type = type; - this.systemID = systemID; - this.uniqueID = uniqueID; - } - - public long toLong() - { - long uniquePart = Integer.toUnsignedLong(uniqueID); - long systemPart = Integer.toUnsignedLong(systemID) << 32; - long typePart = ((long) type.getNumber()) << 56; - return typePart | systemPart | uniquePart; - } - - public ObjectStatements.ObjectType getType() - { - return type; - } - - public int getSystemID() - { - return systemID; - } - - public int getUniqueID() - { - return uniqueID; - } - - @Override - public String toString() - { - return "OBJ{" + Long.toHexString(toLong()) + "}"; - } - - @Override - public boolean equals(Object o) - { - if (o == null || getClass() != o.getClass()) return false; - ObjectID objectID = (ObjectID) o; - return systemID == objectID.systemID && uniqueID == objectID.uniqueID && type == objectID.type; - } - - @Override - public int hashCode() - { - return Objects.hash(type, systemID, uniqueID); - } + @Override + public int hashCode() + { + return Objects.hashCode(objectID); } } diff --git a/src/main/java/moe/nekojimi/friendcloud/objects/ObjectID.java b/src/main/java/moe/nekojimi/friendcloud/objects/ObjectID.java new file mode 100644 index 0000000..bbe332e --- /dev/null +++ b/src/main/java/moe/nekojimi/friendcloud/objects/ObjectID.java @@ -0,0 +1,75 @@ +package moe.nekojimi.friendcloud.objects; + +import org.jetbrains.annotations.NotNull; + +import java.util.Objects; + +public class ObjectID implements Comparable +{ + private final moe.nekojimi.friendcloud.protos.ObjectStatements.ObjectType type; + private final int systemID; + private final int uniqueID; + + public ObjectID(long id) + { + uniqueID = (int) (0x00000000_FFFFFFFFL & id); + systemID = Math.toIntExact((0x00FFFFFF_00000000L & id) >>> 32); + type = moe.nekojimi.friendcloud.protos.ObjectStatements.ObjectType.forNumber(Math.toIntExact(((0xFF000000_00000000L & id) >>> 56))); + } + + public ObjectID(moe.nekojimi.friendcloud.protos.ObjectStatements.ObjectType type, int systemID, int uniqueID) + { + this.type = type; + this.systemID = systemID; + this.uniqueID = uniqueID; + } + + public long toLong() + { + long uniquePart = Integer.toUnsignedLong(uniqueID); + long systemPart = Integer.toUnsignedLong(systemID) << 32; + long typePart = ((long) type.getNumber()) << 56; + return typePart | systemPart | uniquePart; + } + + public moe.nekojimi.friendcloud.protos.ObjectStatements.ObjectType getType() + { + return type; + } + + public int getSystemID() + { + return systemID; + } + + public int getUniqueID() + { + return uniqueID; + } + + @Override + public String toString() + { + return "OBJ{" + Integer.toHexString(type.getNumber()) + "-" + Integer.toHexString(systemID) + "-" + Integer.toHexString(uniqueID) + "}"; + } + + @Override + public boolean equals(Object o) + { + if (o == null || getClass() != o.getClass()) return false; + ObjectID objectID = (ObjectID) o; + return systemID == objectID.systemID && uniqueID == objectID.uniqueID && type == objectID.type; + } + + @Override + public int hashCode() + { + return Objects.hash(type, systemID, uniqueID); + } + + @Override + public int compareTo(@NotNull ObjectID objectID) + { + return Long.compare(toLong(), objectID.toLong()); + } +} diff --git a/src/main/java/moe/nekojimi/friendcloud/objects/Peer.java b/src/main/java/moe/nekojimi/friendcloud/objects/Peer.java index ca67e8d..46d91c7 100644 --- a/src/main/java/moe/nekojimi/friendcloud/objects/Peer.java +++ b/src/main/java/moe/nekojimi/friendcloud/objects/Peer.java @@ -9,11 +9,11 @@ import java.util.stream.Collectors; public class Peer extends NetworkObject { - private final List addresses = new ArrayList<>(); + private final SortedSet addresses = new TreeSet<>(); private String userName = ""; private String systemName = ""; - private Map fileStates = new HashMap<>(); +// private Map fileStates = new HashMap<>(); private volatile int lastTriedAddressIdx = -1; @@ -94,25 +94,17 @@ public class Peer extends NetworkObject addresses.add(address); } - public URI getNextAddress() - { - lastTriedAddressIdx++; - if (lastTriedAddressIdx >= addresses.size()) - lastTriedAddressIdx = 0; - return addresses.get(lastTriedAddressIdx); - } +// void addFileState(PeerFileState peerFileState) +// { +// fileStates.put(peerFileState.getFile(), peerFileState); +// } +// +// public Map getFileStates() +// { +// return fileStates; +// } - void addFileState(PeerFileState peerFileState) - { - fileStates.put(peerFileState.getFile(), peerFileState); - } - - public Map getFileStates() - { - return fileStates; - } - - public List getAddresses() + public SortedSet getAddresses() { return addresses; } diff --git a/src/main/java/moe/nekojimi/friendcloud/objects/PeerFileState.java b/src/main/java/moe/nekojimi/friendcloud/objects/PeerFileState.java index 79b80c6..56d61ad 100644 --- a/src/main/java/moe/nekojimi/friendcloud/objects/PeerFileState.java +++ b/src/main/java/moe/nekojimi/friendcloud/objects/PeerFileState.java @@ -1,14 +1,14 @@ package moe.nekojimi.friendcloud.objects; -import moe.nekojimi.friendcloud.Main; +import moe.nekojimi.friendcloud.Util; import moe.nekojimi.friendcloud.protos.ObjectStatements; import java.util.Map; public class PeerFileState extends NetworkObject { - private Peer peer; - private NetworkFile file; + private ObjectID peerID; + private ObjectID fileID; private double progress = 0; @@ -21,13 +21,10 @@ public class PeerFileState extends NetworkObject public void updateFromStateMessage(ObjectStatements.ObjectState state) { super.updateFromStateMessage(state); - peer = (Peer) Main.getInstance().getModel().getOrCreateObject(new ObjectID(Long.parseLong(state.getValuesOrThrow("peer")))); - file = (NetworkFile) Main.getInstance().getModel().getOrCreateObject(new ObjectID(Long.parseLong(state.getValuesOrThrow("file")))); + peerID = new ObjectID(Long.parseLong(state.getValuesOrThrow("peer"))); + fileID = new ObjectID(Long.parseLong(state.getValuesOrThrow("file"))); if (state.containsValues("progress")) progress = Double.parseDouble(state.getValuesOrThrow("progress")); - - peer.addFileState(this); - file.addFileState(this); } @Override @@ -40,20 +37,20 @@ public class PeerFileState extends NetworkObject public ObjectStatements.ObjectState.Builder buildObjectState() { return super.buildObjectState() - .putValues("peer", Long.toString(peer.getObjectID().toLong())) - .putValues("file", Long.toString(file.getObjectID().toLong())) +// .putValues("peer", Long.toString(peer.getObjectID().toLong())) +// .putValues("file", Long.toString(file.getObjectID().toLong())) .putValues("progress", Double.toString(progress)); } - public void setNode(Peer peer) - { - this.peer = peer; - } - - public void setFile(NetworkFile file) - { - this.file = file; - } +// public void setNode(Peer peer) +// { +// this.peer = peer; +// } +// +// public void setFile(NetworkFile file) +// { +// this.file = file; +// } public double getProgress() { @@ -65,19 +62,31 @@ public class PeerFileState extends NetworkObject this.progress = progress; } - public NetworkFile getFile() - { - return file; - } +// public NetworkFile getFile() +// { +// return file; +// } +// +// public Peer getNode() +// { +// return peer; +// } - public Peer getNode() + @Override + public Map getStateMap() { - return peer; + Map ret = super.getStateMap(); +// ret.put("peer", peer == null ? 0L : peer.getObjectID().toLong()); +// ret.put("file", file == null? 0L : file.getObjectID().toLong()); + ret.put("progress", progress); + return ret; } @Override public void updateFromStateMap(Map map) { - +// peer = Main.getInstance().getModel().getObject(new ObjectID(Util.unconditionalNumberToLong(map.get("peer")))); +// file = Main.getInstance().getModel().getObject(new ObjectID(Util.unconditionalNumberToLong(map.get("file")))); + progress = Util.unconditionalNumberToDouble( map.get("progress")); } } diff --git a/src/main/java/moe/nekojimi/friendcloud/storage/LocalData.java b/src/main/java/moe/nekojimi/friendcloud/storage/LocalData.java index e159629..41cc4fd 100644 --- a/src/main/java/moe/nekojimi/friendcloud/storage/LocalData.java +++ b/src/main/java/moe/nekojimi/friendcloud/storage/LocalData.java @@ -1,7 +1,9 @@ package moe.nekojimi.friendcloud.storage; import moe.nekojimi.friendcloud.Main; -import moe.nekojimi.friendcloud.objects.NetworkObject; +import moe.nekojimi.friendcloud.ObjectChangeRecord; +import moe.nekojimi.friendcloud.Util; +import moe.nekojimi.friendcloud.objects.ObjectID; import moe.nekojimi.friendcloud.objects.Peer; import java.util.Map; @@ -9,6 +11,8 @@ import java.util.Map; public class LocalData implements Storable { private Peer localPeer; + private ObjectChangeRecord currentChangeRecord; + private int systemID; public Peer getLocalPeer() { @@ -20,6 +24,26 @@ public class LocalData implements Storable this.localPeer = localPeer; } + public ObjectChangeRecord getCurrentChangeRecord() + { + return currentChangeRecord; + } + + public void setCurrentChangeRecord(ObjectChangeRecord currentChangeRecord) + { + this.currentChangeRecord = currentChangeRecord; + } + + public int getSystemID() + { + return systemID; + } + + public void setSystemID(int systemID) + { + this.systemID = systemID; + } + @Override public long getStorageID() { @@ -29,12 +53,17 @@ public class LocalData implements Storable @Override public Map getStateMap() { - return Map.of("localPeer", localPeer.getObjectID().toLong()); + return Map.of("localPeer", localPeer == null ? 0L : localPeer.getObjectID().toLong(), + "currentChangeRecord", currentChangeRecord == null ? 0L : currentChangeRecord.getChangeID(), + "systemID", systemID); } @Override public void updateFromStateMap(Map map) { - localPeer = (Peer) Main.getInstance().getModel().getObject(new NetworkObject.ObjectID((Long) map.get("localPeer"))); + localPeer = Main.getInstance().getModel().getObject(new ObjectID(Util.unconditionalNumberToLong(map.getOrDefault("localPeer",0)))); + currentChangeRecord = Main.getInstance().getModel().getChangeRecord(Util.unconditionalNumberToLong(map.getOrDefault("currentChangeRecord",0))); + systemID = (int) map.getOrDefault("systemID", 0); + System.out.println("LocalData: resumed state, localPeer=" + localPeer + ", currentChangeRecord=" + currentChangeRecord + ", systemID=" + Integer.toHexString(systemID)); } } diff --git a/src/main/java/moe/nekojimi/friendcloud/storage/Model.java b/src/main/java/moe/nekojimi/friendcloud/storage/Model.java index 1078689..9d982a9 100644 --- a/src/main/java/moe/nekojimi/friendcloud/storage/Model.java +++ b/src/main/java/moe/nekojimi/friendcloud/storage/Model.java @@ -11,37 +11,53 @@ public class Model { private final CachingDataStore dataStore; - - private final int systemID; - - private Peer selfPeer = null; - private ObjectChangeRecord currentChange; + private LocalData localData; public Model(DataStore dataStore) { this.dataStore = new CachingDataStore(dataStore); - Random ran = new Random(); - systemID = ran.nextInt() & 0x00FFFFFF; } - public void setSelfPeer(Peer selfPeer) + public synchronized void init() { - this.selfPeer = selfPeer; + + List localDataList = dataStore.getDAOForClass(LocalData.class).getAll(); + if (localDataList.isEmpty()) + { + localData = dataStore.getDAOForClass(LocalData.class).create(0); + } + else if (localDataList.size() == 1) + { + localData = localDataList.getFirst(); + } + else + { + throw new IllegalStateException("We have more than one LocalData somehow!!"); + } + if (localData.getSystemID() == 0) + { + Random ran = new Random(); + localData.setSystemID(ran.nextInt() & 0x00FFFFFF); + objectChanged(localData); + } } public synchronized Peer getSelfPeer() { - if (selfPeer == null) - selfPeer = createObjectByType(ObjectStatements.ObjectType.OBJECT_TYPE_PEER); - return selfPeer; + if (localData.getLocalPeer() == null) + { + localData.setLocalPeer(createObjectByType(ObjectStatements.ObjectType.OBJECT_TYPE_PEER)); + objectChanged(localData); + } + return localData.getLocalPeer(); } // private Map nodes = new HashMap<>(); - public synchronized NetworkObject.ObjectID getNextObjectID(ObjectStatements.ObjectType type) + public synchronized ObjectID getNextObjectID(ObjectStatements.ObjectType type) { Random ran = new Random(); int randomNumber = ran.nextInt(); - NetworkObject.ObjectID objectID = new NetworkObject.ObjectID(type, systemID, randomNumber); + ObjectID objectID = new ObjectID(type, localData.getSystemID(), randomNumber); System.out.println("Assigned new object ID: " + objectID); return objectID; } @@ -59,7 +75,7 @@ public class Model }; } - public synchronized T createObjectByID(NetworkObject.ObjectID id) + public synchronized T createObjectByID(ObjectID id) { if (id.toLong() == 0) throw new IllegalArgumentException("Cannot create an object with ID=0!"); @@ -76,7 +92,7 @@ public class Model return createObjectByID(getNextObjectID(type)); } - public synchronized T getObject(NetworkObject.ObjectID id) + public synchronized T getObject(ObjectID id) { if (id.toLong() == 0) return null; @@ -84,7 +100,7 @@ public class Model return dataStore.getDAOForClass(clazz).get(id.toLong()); } - public synchronized T getOrCreateObject(NetworkObject.ObjectID id) + public synchronized T getOrCreateObject(ObjectID id) { if (id.toLong() == 0) return null; @@ -138,13 +154,15 @@ public class Model public ObjectChangeRecord getChangeRecord(long id) { - return dataStore.getDAOForClass(ObjectChangeRecord.class).get(id); + if (id == 0) + return null; + return dataStore.getDAOForClass(ObjectChangeRecord.class).get(id); } public void applyChangeRecord(ObjectChangeRecord record) { - if (!record.getChangeHeads().contains(currentChange.getChangeID())) - throw new IllegalStateException("Change does not apply! Valid change heads=" + record.getChangeHeads() + ", we are in state " + currentChange.getChangeID()); + if (!record.getChangeHeads().contains(localData.getCurrentChangeRecord().getChangeID())) + throw new IllegalStateException("Change does not apply! Valid change heads=" + record.getChangeHeads() + ", we are in state " + localData.getCurrentChangeRecord().getChangeID()); addChangeRecord(record); diff --git a/src/main/java/moe/nekojimi/friendcloud/storage/StupidJSONFileStore.java b/src/main/java/moe/nekojimi/friendcloud/storage/StupidJSONFileStore.java index 37c1a9c..4ea043f 100644 --- a/src/main/java/moe/nekojimi/friendcloud/storage/StupidJSONFileStore.java +++ b/src/main/java/moe/nekojimi/friendcloud/storage/StupidJSONFileStore.java @@ -1,5 +1,6 @@ package moe.nekojimi.friendcloud.storage; +import moe.nekojimi.friendcloud.ObjectChangeRecord; import moe.nekojimi.friendcloud.objects.*; import org.jetbrains.annotations.NotNull; import org.json.JSONArray; @@ -8,10 +9,10 @@ import org.json.JSONObject; import java.io.File; import java.io.FileWriter; import java.io.IOException; -import java.lang.reflect.Modifier; import java.net.URI; import java.net.URISyntaxException; import java.nio.file.Files; +import java.nio.file.NoSuchFileException; import java.util.*; public class StupidJSONFileStore extends DataStore @@ -36,17 +37,21 @@ public class StupidJSONFileStore extends DataStore if (daos.containsKey(clazz)) return (DAO) daos.get(clazz); - DAO ret; + DAO ret; if (clazz.equals(NetworkFile.class)) - ret = (DAO) new NetworkFileDAO(); + ret = new NetworkFileDAO(); else if (clazz.equals(NetworkFolder.class)) - ret = (DAO) new NetworkFolderDAO(); + ret = new NetworkFolderDAO(); else if (clazz.equals(Peer.class)) - ret = (DAO) new PeerDAO(); + ret = new PeerDAO(); else if (clazz.equals(PeerFileState.class)) - ret = (DAO) new PeerFileStateDAO(); + ret = new PeerFileStateDAO(); else if (clazz.equals(NetworkFSNode.class)) - ret = (DAO) new NetworkFSNodeDAO(); + ret = new NetworkFSNodeDAO(); + else if (clazz.equals(LocalData.class)) + ret = new LocalDataDAO(); + else if (clazz.equals(ObjectChangeRecord.class)) + ret = new ObjectChangeRecordDAO(); else throw new UnsupportedOperationException("Requested DAO for unsupported type " + clazz.getCanonicalName()); @@ -202,11 +207,17 @@ public class StupidJSONFileStore extends DataStore public T get(long id) { File file = new File(getNamespaceDirectory(), Long.toHexString(id) + ".json"); + file.getParentFile().mkdirs(); try { JSONObject json = new JSONObject(Files.readString(file.toPath())); return jsonToObject(json); - } catch (IOException e) + } catch (NoSuchFileException ex) + { + System.err.println("JSONFileStore: failed to find object with ID=" + id + ", expected in " + file.getAbsolutePath()); + return null; + } + catch (IOException e) { throw new RuntimeException(e); } @@ -216,6 +227,7 @@ public class StupidJSONFileStore extends DataStore public void update(T object) { File file = new File(getNamespaceDirectory(), Long.toHexString(object.getStorageID()) + ".json"); + file.getParentFile().mkdirs(); try(FileWriter writer = new FileWriter(file, false)) { objectToJson(object).write(writer); @@ -255,7 +267,7 @@ public class StupidJSONFileStore extends DataStore @Override protected NetworkFile makeBlank(long id) { - return new NetworkFile(new NetworkObject.ObjectID(id)); + return new NetworkFile(new ObjectID(id)); } } @@ -269,7 +281,7 @@ public class StupidJSONFileStore extends DataStore @Override protected NetworkFolder makeBlank(long id) { - return new NetworkFolder(new NetworkObject.ObjectID(id)); + return new NetworkFolder(new ObjectID(id)); } } @@ -283,7 +295,7 @@ public class StupidJSONFileStore extends DataStore @Override protected Peer makeBlank(long id) { - return new Peer(new NetworkObject.ObjectID(id)); + return new Peer(new ObjectID(id)); } @Override @@ -328,7 +340,37 @@ public class StupidJSONFileStore extends DataStore @Override protected PeerFileState makeBlank(long id) { - return new PeerFileState(new NetworkObject.ObjectID(id)); + return new PeerFileState(new ObjectID(id)); + } + } + + private class LocalDataDAO extends JSONObjectDAO + { + @Override + protected String getNamespace() + { + return "localData"; + } + + @Override + protected LocalData makeBlank(long id) + { + return new LocalData(); + } + } + + private class ObjectChangeRecordDAO extends JSONObjectDAO + { + @Override + protected String getNamespace() + { + return "changes"; + } + + @Override + protected ObjectChangeRecord makeBlank(long id) + { + return new ObjectChangeRecord(); } } } diff --git a/src/main/java/moe/nekojimi/friendcloud/tasks/FileDownloadTask.java b/src/main/java/moe/nekojimi/friendcloud/tasks/FileDownloadTask.java index a54eeeb..4584a4b 100644 --- a/src/main/java/moe/nekojimi/friendcloud/tasks/FileDownloadTask.java +++ b/src/main/java/moe/nekojimi/friendcloud/tasks/FileDownloadTask.java @@ -2,24 +2,27 @@ package moe.nekojimi.friendcloud.tasks; import moe.nekojimi.friendcloud.ConnectionManager; +import moe.nekojimi.friendcloud.Main; +import moe.nekojimi.friendcloud.NotificationManager; +import moe.nekojimi.friendcloud.ObjectChangeTransaction; import moe.nekojimi.friendcloud.network.PeerConnection; import moe.nekojimi.friendcloud.network.requests.FilePiecesRequest; import moe.nekojimi.friendcloud.objects.NetworkFile; import moe.nekojimi.friendcloud.objects.Peer; -import moe.nekojimi.friendcloud.objects.PeerFileState; import org.jetbrains.annotations.NotNull; import java.io.File; import java.io.IOException; import java.util.*; import java.util.concurrent.*; +import java.util.stream.Collectors; public class FileDownloadTask implements RunnableFuture { private final NetworkFile file; - private final ConnectionManager manager; + private final ConnectionManager connectionManager; - private final long timeoutPerPieceMs = 10_000; + private final long timeoutPerPieceMs = 1_000; private static final int MAX_DOWNLOAD_PIECES_PER_ROUND = 128; private final SortedSet missingPieceIndices = new TreeSet<>(); @@ -28,20 +31,20 @@ public class FileDownloadTask implements RunnableFuture private boolean failed = false; private final Object waitObject = new Object(); - public FileDownloadTask(NetworkFile file, ConnectionManager manager) + public FileDownloadTask(NetworkFile file, ConnectionManager connectionManager) { this.file = file; - this.manager = manager; + this.connectionManager = connectionManager; for (int i = 0; i < file.getPieceCount(); i++) { missingPieceIndices.add(i); } } - public FileDownloadTask(NetworkFile file, ConnectionManager manager, SortedSet missingPieces) + public FileDownloadTask(NetworkFile file, ConnectionManager connectionManager, SortedSet missingPieces) { this.file = file; - this.manager = manager; + this.connectionManager = connectionManager; missingPieceIndices.addAll(missingPieces); } @@ -53,38 +56,46 @@ public class FileDownloadTask implements RunnableFuture @Override public void run() { - System.out.println("Starting download of file " + file.getName()); + System.out.println("Starting download of file " + file.getName() + " (pieces: " + missingPieceIndices + ")"); +// NotificationManager.Notification notification = Main.getInstance().getNotificationManager().createNotification("Streaming " + file.getName(), "Starting download...", NotificationManager.NotificationType.TRANSFER_IN_PROGRESS); + int startingPieces = missingPieceIndices.size(); + + String connectionLine = ""; + String progressLine = ""; + + Peer selfPeer = Main.getInstance().getModel().getSelfPeer(); while (!missingPieceIndices.isEmpty() && !cancelled && !failed && !done) { System.out.println("Need to get " + missingPieceIndices.size() + " missing pieces."); - Map fileStates = file.getFileStates(); +// Map fileStates = file.getFileStates(); // determine what nodes we can connect to List connections = new ArrayList<>(); - for (PeerFileState peerFileState : fileStates.values()) + for (Peer peer : file.getPeersWithCopy()) { - if (peerFileState.getProgress() >= 100.0) + if (peer == selfPeer) + continue; // yeah that's us dipshit + PeerConnection connection = connectionManager.getNodeConnection(peer); + if (connection != null) { - try - { - PeerConnection connection = manager.getNodeConnection(peerFileState.getNode()); - System.out.println("FileDownloadTask: Will download from " + peerFileState.getNode().getNodeName()); - connections.add(connection); - } catch (IOException ex) - { - System.err.println("Failed to connect to peer " + peerFileState.getNode().getNodeName() + ": " + ex.getMessage()); - } + System.out.println("FileDownloadTask: Will download from " + peer.getNodeName()); + connections.add(connection); } } +// connectionLine = "Connected to " + connections.size() + " peers."; +// notification.setBody(connectionLine + "\n" + progressLine); +// connectionLine = "Connected to " + connections.stream().map(PeerConnection::getNode).map(Peer::getNodeName).collect(Collectors.joining(", ")); + // shuffle the connections list Collections.shuffle(connections); if (connections.isEmpty()) { System.err.println("FileDownloadTask: No peers have the file, download failed!"); + file.notifyPieceWaiters(); failed = true; break; } @@ -136,11 +147,29 @@ public class FileDownloadTask implements RunnableFuture System.err.println("FileDownloadTask: Request timed out."); } catch (ExecutionException | TimeoutException e) { + future.cancel(true); e.printStackTrace(System.err); } + +// progressLine = "Have " + (startingPieces - missingPieceIndices.size()) + " / " + missingPieceIndices.size() + " pieces. (" + file.getDownloadPercentage() + "%)"; +// notification.setBody(connectionLine + "\n" + progressLine); } } + if (file.getDownloadPercentage() >= 100.0) + { + try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(connectionManager, selfPeer.getObjectID())) + { + transaction.addObjectBeforeChange(file.getObjectID()); + file.addPeerWithCopy(selfPeer); + } + catch (IOException ex) + { + + } + } + +// notification.setBody("Finished downloading!"); System.out.println("FileDownloadTask: finished downloading " + file.getName() + "!"); done = true; synchronized (waitObject) diff --git a/src/main/java/moe/nekojimi/friendcloud/tasks/JoinNetworkTask.java b/src/main/java/moe/nekojimi/friendcloud/tasks/JoinNetworkTask.java index a6b1bd2..1858310 100644 --- a/src/main/java/moe/nekojimi/friendcloud/tasks/JoinNetworkTask.java +++ b/src/main/java/moe/nekojimi/friendcloud/tasks/JoinNetworkTask.java @@ -2,7 +2,7 @@ package moe.nekojimi.friendcloud.tasks; import moe.nekojimi.friendcloud.Main; import moe.nekojimi.friendcloud.ObjectChangeTransaction; -import moe.nekojimi.friendcloud.objects.NetworkObject; +import moe.nekojimi.friendcloud.objects.ObjectID; import moe.nekojimi.friendcloud.objects.Peer; import moe.nekojimi.friendcloud.protos.ObjectStatements; @@ -15,7 +15,7 @@ public class JoinNetworkTask implements Runnable public void run() { // generate new peer ID - NetworkObject.ObjectID peerID = null; + ObjectID peerID = null; try (ObjectChangeTransaction builder = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), peerID)) { Peer selfPeer = Main.getInstance().getModel().getSelfPeer(); diff --git a/src/main/java/moe/nekojimi/friendcloud/tasks/SyncWithNetworkTask.java b/src/main/java/moe/nekojimi/friendcloud/tasks/SyncWithNetworkTask.java index a396b94..d6f01db 100644 --- a/src/main/java/moe/nekojimi/friendcloud/tasks/SyncWithNetworkTask.java +++ b/src/main/java/moe/nekojimi/friendcloud/tasks/SyncWithNetworkTask.java @@ -22,20 +22,12 @@ public class SyncWithNetworkTask implements Runnable for (Peer peer : Main.getInstance().getModel().listOtherPeers()) { // open a connection - try - { - PeerConnection connection = Main.getInstance().getConnectionManager().getNodeConnection(peer); - // send a ObjectChangeRequest - ObjectChangeRequest objectChangeRequest = new ObjectChangeRequest(Main.getInstance().getModel().getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet())); - CompletableFuture> future = connection.makeRequest(objectChangeRequest); - - // integrate the returned changes with our change graph - } catch (IOException e) - { - System.err.println("SyncWithNetworkTask: Couldn't connect to " + peer + ": " + e.getMessage()); - continue; - } + PeerConnection connection = Main.getInstance().getConnectionManager().getNodeConnection(peer); + // send a ObjectChangeRequest + ObjectChangeRequest objectChangeRequest = new ObjectChangeRequest(Main.getInstance().getModel().getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet())); + CompletableFuture> future = connection.makeRequest(objectChangeRequest); + // integrate the returned changes with our change graph } // if no peers could be contacted: // return success (everyone's offline)