package moe.nekojimi.friendcloud; import moe.nekojimi.friendcloud.network.PeerConnection; import moe.nekojimi.friendcloud.network.requests.FilePiecesRequest; import moe.nekojimi.friendcloud.objects.NetworkFile; import moe.nekojimi.friendcloud.objects.Peer; import moe.nekojimi.friendcloud.objects.PeerFileState; import java.io.File; import java.io.IOException; import java.util.*; import java.util.concurrent.*; public class FileDownloadTask implements Callable { private final NetworkFile file; private final ConnectionManager manager; private final long timeoutPerPieceMs = 10_000; private static final int MAX_DOWNLOAD_PIECES_PER_ROUND = 128; private final SortedSet missingPieceIndices = new TreeSet<>(); public FileDownloadTask(NetworkFile file, ConnectionManager manager) { this.file = file; this.manager = manager; for (int i = 0; i < file.getPieceCount(); i++) { missingPieceIndices.add(i); } } public FileDownloadTask(NetworkFile file, ConnectionManager manager, List missingPieces) { this.file = file; this.manager = manager; missingPieceIndices.addAll(missingPieces); } public NetworkFile getFile() { return file; } @Override public File call() throws Exception { System.out.println("Starting download of file " + file.getName()); while (!missingPieceIndices.isEmpty()) { System.out.println("Need to get " + missingPieceIndices.size() + " missing pieces."); Map fileStates = file.getFileStates(); // determine what nodes we can connect to List connections = new ArrayList<>(); for (PeerFileState peerFileState : fileStates.values()) { if (peerFileState.getProgress() >= 100.0) { try { PeerConnection connection = manager.getNodeConnection(peerFileState.getNode()); System.out.println("FileDownloadTask: Will download from " + peerFileState.getNode().getNodeName()); connections.add(connection); } catch (IOException ex) { System.err.println("Failed to connect to peer " + peerFileState.getNode().getNodeName() + ": " + ex.getMessage()); } } } if (connections.isEmpty()) { System.err.println("FileDownloadTask: No peers have the file, download failed!"); return null; } // find a continuous run of pieces to download // TODO: allow for runs with regular gaps (e.g. every 2) to account for previous failed download attempts int runStart = -1; int runEnd = -1; for (int pieceIdx: missingPieceIndices) { int runLength = runEnd - runStart; if (runLength >= MAX_DOWNLOAD_PIECES_PER_ROUND) break; else if (runStart == -1) { runStart = pieceIdx; runEnd = pieceIdx; } else if (pieceIdx == runEnd + 1) runEnd = pieceIdx; else break; } System.out.println("FileDownloadTask: Will download pieces from " + runStart + " to " + runEnd); // make one request per connectable peer, striping the needed pieces among them List>> fileFutures = new ArrayList<>(); int offset = 0; for (PeerConnection connection : connections) { CompletableFuture> future = connection.makeRequest(new FilePiecesRequest(file, runStart+offset, (runEnd-runStart)+1, connections.size())); fileFutures.add(future); offset++; } long timeout = timeoutPerPieceMs * (missingPieceIndices.size() / connections.size()); // wait for all the requests to complete for (CompletableFuture> future : fileFutures) { try { List receivedPieces = future.get(timeout, TimeUnit.MILLISECONDS); receivedPieces.forEach(missingPieceIndices::remove); } catch (InterruptedException e) { future.cancel(true); timeout = 1_000; System.err.println("FileDownloadTask: Request timed out."); } catch (ExecutionException | TimeoutException e) { e.printStackTrace(System.err); } } } System.out.println("FileDownloadTask: finished downloading " + file.getName() + "!"); return file.getLocalFile(); } }