diff --git a/src/main/java/moe/nekojimi/friendcloud/DownloadManager.java b/src/main/java/moe/nekojimi/friendcloud/DownloadManager.java new file mode 100644 index 0000000..6f53547 --- /dev/null +++ b/src/main/java/moe/nekojimi/friendcloud/DownloadManager.java @@ -0,0 +1,69 @@ +package moe.nekojimi.friendcloud; + +import moe.nekojimi.friendcloud.objects.NetworkFile; + +import java.io.File; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.FutureTask; + +public class DownloadManager +{ + private static DownloadManager instance = null; + public static DownloadManager getInstance() + { + if(instance == null) + instance = new DownloadManager(); + return instance; + } + private DownloadManager() + { + + } + + public Map> activeDownloads = new HashMap<>(); + + public CompletableFuture downloadPieces(NetworkFile file, SortedSet pieces) + { + cullCompletedDownloads(); + + Set> tasksToWaitFor = new HashSet<>(); + SortedSet inactivePieces = new TreeSet<>(pieces); + for (Map.Entry> e : activeDownloads.entrySet()) + { + if (!Collections.disjoint(inactivePieces, e.getKey().getMissingPieceIndices())) + { + inactivePieces.removeAll(e.getKey().getMissingPieceIndices()); + tasksToWaitFor.add(e.getValue()); + } + } + + if (!inactivePieces.isEmpty()) + { + FileDownloadTask downloadTask = new FileDownloadTask(file, Main.getInstance().getConnectionManager(), pieces); + CompletableFuture completableFuture = CompletableFuture.runAsync(downloadTask,Main.getInstance().getExecutor()); + activeDownloads.put(downloadTask,completableFuture); + return completableFuture; + } + + return CompletableFuture.allOf(tasksToWaitFor.toArray(new CompletableFuture[0])); + + } + + private void cullCompletedDownloads() + { + Set toRemove = new HashSet<>(); + for (Map.Entry> e : activeDownloads.entrySet()) + { + if (e.getValue().isDone()) + toRemove.add(e.getKey()); + } + for (FileDownloadTask t: toRemove) + activeDownloads.remove(t); + } + + public Map> getActiveDownloads() + { + return Collections.unmodifiableMap(activeDownloads); + } +} diff --git a/src/main/java/moe/nekojimi/friendcloud/FileDownloadTask.java b/src/main/java/moe/nekojimi/friendcloud/FileDownloadTask.java index f776e1d..8cf706a 100644 --- a/src/main/java/moe/nekojimi/friendcloud/FileDownloadTask.java +++ b/src/main/java/moe/nekojimi/friendcloud/FileDownloadTask.java @@ -6,13 +6,14 @@ import moe.nekojimi.friendcloud.network.requests.FilePiecesRequest; import moe.nekojimi.friendcloud.objects.NetworkFile; import moe.nekojimi.friendcloud.objects.Peer; import moe.nekojimi.friendcloud.objects.PeerFileState; +import org.jetbrains.annotations.NotNull; import java.io.File; import java.io.IOException; import java.util.*; import java.util.concurrent.*; -public class FileDownloadTask implements Callable +public class FileDownloadTask implements RunnableFuture { private final NetworkFile file; private final ConnectionManager manager; @@ -21,6 +22,10 @@ public class FileDownloadTask implements Callable private static final int MAX_DOWNLOAD_PIECES_PER_ROUND = 128; private final SortedSet missingPieceIndices = new TreeSet<>(); + private boolean cancelled = false; + private boolean done = false; + private boolean failed = false; + private final Object waitObject = new Object(); public FileDownloadTask(NetworkFile file, ConnectionManager manager) { @@ -32,7 +37,7 @@ public class FileDownloadTask implements Callable } } - public FileDownloadTask(NetworkFile file, ConnectionManager manager, List missingPieces) + public FileDownloadTask(NetworkFile file, ConnectionManager manager, SortedSet missingPieces) { this.file = file; this.manager = manager; @@ -45,11 +50,11 @@ public class FileDownloadTask implements Callable } @Override - public File call() throws Exception + public void run() { System.out.println("Starting download of file " + file.getName()); - while (!missingPieceIndices.isEmpty()) + while (!missingPieceIndices.isEmpty() && !cancelled && !failed && !done) { System.out.println("Need to get " + missingPieceIndices.size() + " missing pieces."); @@ -73,11 +78,14 @@ public class FileDownloadTask implements Callable } } + // shuffle the connections list + Collections.shuffle(connections); if (connections.isEmpty()) { System.err.println("FileDownloadTask: No peers have the file, download failed!"); - return null; + failed = true; + break; } // find a continuous run of pieces to download @@ -133,6 +141,63 @@ public class FileDownloadTask implements Callable } System.out.println("FileDownloadTask: finished downloading " + file.getName() + "!"); + done = true; + synchronized (waitObject) + { + waitObject.notifyAll(); + } + } + + public SortedSet getMissingPieceIndices() + { + return missingPieceIndices; + } + + @Override + public boolean cancel(boolean b) + { + cancelled = true; + return true; + } + + @Override + public boolean isCancelled() + { + return cancelled; + } + + @Override + public boolean isDone() + { + return done; + } + + @Override + public File get() throws InterruptedException, ExecutionException + { + try + { + return get(0, TimeUnit.SECONDS); + } catch (TimeoutException e) + { + throw new RuntimeException(e); + } + } + + @Override + public File get(long l, @NotNull TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException + { + if (failed) + return null; + if (cancelled) + return null; + if (!done) + { + synchronized (waitObject) + { + waitObject.wait(timeUnit.toMillis(l)); + } + } return file.getLocalFile(); } } diff --git a/src/main/java/moe/nekojimi/friendcloud/FileRemoteAccess.java b/src/main/java/moe/nekojimi/friendcloud/FileRemoteAccess.java index 88f26b4..5b53987 100644 --- a/src/main/java/moe/nekojimi/friendcloud/FileRemoteAccess.java +++ b/src/main/java/moe/nekojimi/friendcloud/FileRemoteAccess.java @@ -5,8 +5,8 @@ import moe.nekojimi.friendcloud.objects.NetworkFile; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; -import java.util.ArrayList; -import java.util.List; +import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; @@ -15,6 +15,9 @@ public class FileRemoteAccess private final NetworkFile file; private static final double PREEMPTIVE_DOWNLOAD_THRESHOLD = 5; + private boolean preemptiveDownloadInProgress = false; +// private Set> activeTasks = new HashSet<>() + // private FilePieceAccess access; public FileRemoteAccess(NetworkFile file) @@ -25,8 +28,8 @@ public class FileRemoteAccess public synchronized byte[] read(long offset, long size) throws IOException { if (offset >= file.getSize()) - return new byte[0]; - List missingPieces = new ArrayList<>(); + return null; + SortedSet neededPieces = new TreeSet<>(); long pieceSize = file.getPieceSize(); int startPieceIdx = Math.toIntExact(Math.floorDiv(offset, pieceSize)); long endOffset = (offset + size); @@ -36,33 +39,49 @@ public class FileRemoteAccess for (int pieceIdx = startPieceIdx; pieceIdx <= endPieceIdx; pieceIdx++) { if (!file.hasPiece(pieceIdx)) - missingPieces.add(pieceIdx); + neededPieces.add(pieceIdx); } System.out.println("FRA: offset=" + offset + ", endOffset=" + endOffset + ", startPieceIdx=" + startPieceIdx + ", endPieceIdx=" + endPieceIdx); - if (!missingPieces.isEmpty()) + SortedSet piecesToDownload = new TreeSet<>(); + if (file.getDownloadPercentage() >= PREEMPTIVE_DOWNLOAD_THRESHOLD) { - System.out.println("FRA: need to get missing pieces " + missingPieces); - FileDownloadTask downloadTask = new FileDownloadTask(file, Main.getInstance().getConnectionManager(), missingPieces); - FutureTask futureTask = new FutureTask<>(downloadTask); - Main.getInstance().getExecutor().submit(futureTask); + if (!preemptiveDownloadInProgress) + { + System.out.println("FRA: downloaded " + file.getDownloadPercentage() + "% > " + PREEMPTIVE_DOWNLOAD_THRESHOLD + "% ! Threshold reached, starting preemptive download..."); + for (int pieceIdx = 0; pieceIdx < file.getPieceCount(); pieceIdx++) + { + if (!file.hasPiece(pieceIdx)) + piecesToDownload.add(pieceIdx); + } + } + preemptiveDownloadInProgress = true; + } + else + piecesToDownload = neededPieces; - try + if (!piecesToDownload.isEmpty()) + { + System.out.println("FRA: will fetch pieces " + piecesToDownload); + + DownloadManager.getInstance().downloadPieces(file, piecesToDownload) + .thenRun(() -> { + preemptiveDownloadInProgress = false; + }); +// Main.getInstance().getExecutor().submit(futureTask); + } + + if (!neededPieces.isEmpty()) + { + boolean ok = waitForPieceRange(neededPieces, 10000); + if (!ok) { - futureTask.get(); - } catch (InterruptedException | ExecutionException e) - { - e.printStackTrace(System.err); + System.err.println("FRA: timed out while waiting for pieces " + neededPieces); return null; } } -// if (file.getDownloadPercentage() >= PREEMPTIVE_DOWNLOAD_THRESHOLD) -// { -// FileDownloadTask preemptiveDownloadTask = new FileDownloadTask(file, Main.getInstance().getConnectionManager(), ) -// } - File localFile = file.getLocalFile(); if (localFile == null) { @@ -82,4 +101,15 @@ public class FileRemoteAccess return ret; } + private boolean waitForPieceRange(Set pieces, long pieceTimeoutMs) + { + for (int pieceIdx : pieces) + { + boolean ok = file.waitForFilePiece(pieceIdx, pieceTimeoutMs); + if (!ok) + return false; + } + return true; + } + } diff --git a/src/main/java/moe/nekojimi/friendcloud/filesystem/FUSEAccess.java b/src/main/java/moe/nekojimi/friendcloud/filesystem/FUSEAccess.java index d1c62e8..4ca96be 100644 --- a/src/main/java/moe/nekojimi/friendcloud/filesystem/FUSEAccess.java +++ b/src/main/java/moe/nekojimi/friendcloud/filesystem/FUSEAccess.java @@ -12,7 +12,6 @@ import ru.serce.jnrfuse.FuseStubFS; import ru.serce.jnrfuse.struct.FileStat; import ru.serce.jnrfuse.struct.FuseFileInfo; -import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -97,7 +96,7 @@ public class FUSEAccess extends FuseStubFS System.err.println("FUSE: Failed to open file " + path + ": is a directory"); return -ErrorCodes.EISDIR(); } - long fileID = fsNode.getObjectID().getId(); + long fileID = fsNode.getObjectID().toLong(); synchronized (fileLock) { int openCount = fileOpenCounts.getOrDefault(fileID, 0) + 1; @@ -148,12 +147,13 @@ public class FUSEAccess extends FuseStubFS } if (bytes == null) { - System.err.println("FUSE: failed to read: buffer empty"); - return -ErrorCodes.EIO(); +// System.err.println("FUSE: failed to read: buffer empty"); + return 0; +// return -ErrorCodes.EIO(); } buf.put(0, bytes, 0, bytes.length); + return bytes.length; // System.out.println("FUSE: Read " + bytes.length + " bytes."); } - return Math.toIntExact(size); } } diff --git a/src/main/java/moe/nekojimi/friendcloud/network/PeerConnection.java b/src/main/java/moe/nekojimi/friendcloud/network/PeerConnection.java index 59cf0c2..8a7000e 100644 --- a/src/main/java/moe/nekojimi/friendcloud/network/PeerConnection.java +++ b/src/main/java/moe/nekojimi/friendcloud/network/PeerConnection.java @@ -81,7 +81,7 @@ public abstract class PeerConnection extends Thread { CommonMessages.MessageHeader.Builder headerBuilder = CommonMessages.MessageHeader.newBuilder() .setMessageId(nextMessageId) - .setSenderId(Model.getInstance().getSelfNode().getObjectID().getId()); + .setSenderId(Model.getInstance().getSelfNode().getObjectID().toLong()); if (inReplyTo != null) headerBuilder.setReplyToMessageId(inReplyTo.getMessageId()); @@ -218,7 +218,7 @@ public abstract class PeerConnection extends Thread System.out.println("Replying to file piece request with piece " + index); PieceMessages.FilePieceMessage filePieceMessage = PieceMessages.FilePieceMessage.newBuilder() .setPieceIndex(index) - .setFileId(networkFile.getObjectID().getId()) + .setFileId(networkFile.getObjectID().toLong()) .setData(ByteString.copyFrom(buffer)) .build(); sendMessage(wrapMessage(filePieceMessage, header)); diff --git a/src/main/java/moe/nekojimi/friendcloud/network/requests/FilePiecesRequest.java b/src/main/java/moe/nekojimi/friendcloud/network/requests/FilePiecesRequest.java index 02daf24..14e3ea3 100644 --- a/src/main/java/moe/nekojimi/friendcloud/network/requests/FilePiecesRequest.java +++ b/src/main/java/moe/nekojimi/friendcloud/network/requests/FilePiecesRequest.java @@ -4,13 +4,9 @@ import com.google.protobuf.Any; import com.google.protobuf.InvalidProtocolBufferException; import moe.nekojimi.friendcloud.FilePieceAccess; import moe.nekojimi.friendcloud.objects.NetworkFile; -import moe.nekojimi.friendcloud.objects.NetworkObject; import moe.nekojimi.friendcloud.protos.PieceMessages; -import java.io.File; import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; @@ -39,7 +35,7 @@ public class FilePiecesRequest extends Request 0) + { + long startTime = System.currentTimeMillis(); + try + { + System.out.println("NetworkFile: waiting "+ timeoutMs + "ms for piece " + pieceIdx + " of file " + name); + pieces.wait(timeoutMs); + if (hasPiece(pieceIdx)) + { + System.out.println("NetworkFile: got piece we were waiting for."); + return true; + } + } catch (InterruptedException ignored) + { + } + long endTime = System.currentTimeMillis(); + long timeWaited = (endTime - startTime); + timeoutMs -= timeWaited; + } + } + System.err.println("Timed out waiting for piece!"); + return false; + } + + public enum StorageType + { + /** The file will be stored as a complete file in the storage directory under it's own name and file path. */ + COMPLETE, + /** The file will be stored as a complete file in a temporary directory. It will be deleted when the computer restarts. */ + TEMPORARY_COMPLETE, + /** Each piece of the file will be stored permanently in an individual piece file, in the pieces subdirectory. */ + PIECES, + } } diff --git a/src/main/java/moe/nekojimi/friendcloud/objects/NetworkObject.java b/src/main/java/moe/nekojimi/friendcloud/objects/NetworkObject.java index b483f38..f0897ef 100644 --- a/src/main/java/moe/nekojimi/friendcloud/objects/NetworkObject.java +++ b/src/main/java/moe/nekojimi/friendcloud/objects/NetworkObject.java @@ -20,7 +20,7 @@ public abstract class NetworkObject public synchronized void updateFromStateMessage(ObjectStatements.ObjectState state) { - if (state.getObjectId() != objectID.getId()) + if (state.getObjectId() != objectID.toLong()) throw new IllegalArgumentException("Wrong object!"); } @@ -32,7 +32,7 @@ public abstract class NetworkObject public ObjectStatements.ObjectState.Builder buildObjectState() { return ObjectStatements.ObjectState.newBuilder() - .setObjectId(objectID.getId()); + .setObjectId(objectID.toLong()); } public static class ObjectID @@ -55,7 +55,7 @@ public abstract class NetworkObject this.uniqueID = uniqueID; } - public long getId() + public long toLong() { long uniquePart = Integer.toUnsignedLong(uniqueID); long systemPart = Integer.toUnsignedLong(systemID) << 32; @@ -81,7 +81,7 @@ public abstract class NetworkObject @Override public String toString() { - return "OBJ{" + Long.toHexString(getId()) + "}"; + return "OBJ{" + Long.toHexString(toLong()) + "}"; } @Override diff --git a/src/main/java/moe/nekojimi/friendcloud/objects/PeerFileState.java b/src/main/java/moe/nekojimi/friendcloud/objects/PeerFileState.java index 6936dc4..813c557 100644 --- a/src/main/java/moe/nekojimi/friendcloud/objects/PeerFileState.java +++ b/src/main/java/moe/nekojimi/friendcloud/objects/PeerFileState.java @@ -38,8 +38,8 @@ public class PeerFileState extends NetworkObject public ObjectStatements.ObjectState.Builder buildObjectState() { return super.buildObjectState() - .putValues("peer", Long.toString(peer.getObjectID().getId())) - .putValues("file", Long.toString(file.getObjectID().getId())) + .putValues("peer", Long.toString(peer.getObjectID().toLong())) + .putValues("file", Long.toString(file.getObjectID().toLong())) .putValues("progress", Double.toString(progress)); }