Change downloads to use new DownloadManager class, which prevents redundant downloads. Also add support for waiting for file pieces independently of a FileDownloadTask.

This commit is contained in:
Nekojimi 2025-09-03 11:48:45 +01:00
parent fa687c2968
commit 6f8f424d8d
9 changed files with 260 additions and 44 deletions

View file

@ -0,0 +1,69 @@
package moe.nekojimi.friendcloud;
import moe.nekojimi.friendcloud.objects.NetworkFile;
import java.io.File;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.FutureTask;
public class DownloadManager
{
private static DownloadManager instance = null;
public static DownloadManager getInstance()
{
if(instance == null)
instance = new DownloadManager();
return instance;
}
private DownloadManager()
{
}
public Map<FileDownloadTask, CompletableFuture<Void>> activeDownloads = new HashMap<>();
public CompletableFuture<?> downloadPieces(NetworkFile file, SortedSet<Integer> pieces)
{
cullCompletedDownloads();
Set<CompletableFuture<Void>> tasksToWaitFor = new HashSet<>();
SortedSet<Integer> inactivePieces = new TreeSet<>(pieces);
for (Map.Entry<FileDownloadTask, CompletableFuture<Void>> e : activeDownloads.entrySet())
{
if (!Collections.disjoint(inactivePieces, e.getKey().getMissingPieceIndices()))
{
inactivePieces.removeAll(e.getKey().getMissingPieceIndices());
tasksToWaitFor.add(e.getValue());
}
}
if (!inactivePieces.isEmpty())
{
FileDownloadTask downloadTask = new FileDownloadTask(file, Main.getInstance().getConnectionManager(), pieces);
CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(downloadTask,Main.getInstance().getExecutor());
activeDownloads.put(downloadTask,completableFuture);
return completableFuture;
}
return CompletableFuture.allOf(tasksToWaitFor.toArray(new CompletableFuture[0]));
}
private void cullCompletedDownloads()
{
Set<FileDownloadTask> toRemove = new HashSet<>();
for (Map.Entry<FileDownloadTask, CompletableFuture<Void>> e : activeDownloads.entrySet())
{
if (e.getValue().isDone())
toRemove.add(e.getKey());
}
for (FileDownloadTask t: toRemove)
activeDownloads.remove(t);
}
public Map<FileDownloadTask, CompletableFuture<Void>> getActiveDownloads()
{
return Collections.unmodifiableMap(activeDownloads);
}
}

View file

@ -6,13 +6,14 @@ import moe.nekojimi.friendcloud.network.requests.FilePiecesRequest;
import moe.nekojimi.friendcloud.objects.NetworkFile;
import moe.nekojimi.friendcloud.objects.Peer;
import moe.nekojimi.friendcloud.objects.PeerFileState;
import org.jetbrains.annotations.NotNull;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
public class FileDownloadTask implements Callable<File>
public class FileDownloadTask implements RunnableFuture<File>
{
private final NetworkFile file;
private final ConnectionManager manager;
@ -21,6 +22,10 @@ public class FileDownloadTask implements Callable<File>
private static final int MAX_DOWNLOAD_PIECES_PER_ROUND = 128;
private final SortedSet<Integer> missingPieceIndices = new TreeSet<>();
private boolean cancelled = false;
private boolean done = false;
private boolean failed = false;
private final Object waitObject = new Object();
public FileDownloadTask(NetworkFile file, ConnectionManager manager)
{
@ -32,7 +37,7 @@ public class FileDownloadTask implements Callable<File>
}
}
public FileDownloadTask(NetworkFile file, ConnectionManager manager, List<Integer> missingPieces)
public FileDownloadTask(NetworkFile file, ConnectionManager manager, SortedSet<Integer> missingPieces)
{
this.file = file;
this.manager = manager;
@ -45,11 +50,11 @@ public class FileDownloadTask implements Callable<File>
}
@Override
public File call() throws Exception
public void run()
{
System.out.println("Starting download of file " + file.getName());
while (!missingPieceIndices.isEmpty())
while (!missingPieceIndices.isEmpty() && !cancelled && !failed && !done)
{
System.out.println("Need to get " + missingPieceIndices.size() + " missing pieces.");
@ -73,11 +78,14 @@ public class FileDownloadTask implements Callable<File>
}
}
// shuffle the connections list
Collections.shuffle(connections);
if (connections.isEmpty())
{
System.err.println("FileDownloadTask: No peers have the file, download failed!");
return null;
failed = true;
break;
}
// find a continuous run of pieces to download
@ -133,6 +141,63 @@ public class FileDownloadTask implements Callable<File>
}
System.out.println("FileDownloadTask: finished downloading " + file.getName() + "!");
done = true;
synchronized (waitObject)
{
waitObject.notifyAll();
}
}
public SortedSet<Integer> getMissingPieceIndices()
{
return missingPieceIndices;
}
@Override
public boolean cancel(boolean b)
{
cancelled = true;
return true;
}
@Override
public boolean isCancelled()
{
return cancelled;
}
@Override
public boolean isDone()
{
return done;
}
@Override
public File get() throws InterruptedException, ExecutionException
{
try
{
return get(0, TimeUnit.SECONDS);
} catch (TimeoutException e)
{
throw new RuntimeException(e);
}
}
@Override
public File get(long l, @NotNull TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException
{
if (failed)
return null;
if (cancelled)
return null;
if (!done)
{
synchronized (waitObject)
{
waitObject.wait(timeUnit.toMillis(l));
}
}
return file.getLocalFile();
}
}

View file

@ -5,8 +5,8 @@ import moe.nekojimi.friendcloud.objects.NetworkFile;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.List;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
@ -15,6 +15,9 @@ public class FileRemoteAccess
private final NetworkFile file;
private static final double PREEMPTIVE_DOWNLOAD_THRESHOLD = 5;
private boolean preemptiveDownloadInProgress = false;
// private Set<FutureTask<File>> activeTasks = new HashSet<>()
// private FilePieceAccess access;
public FileRemoteAccess(NetworkFile file)
@ -25,8 +28,8 @@ public class FileRemoteAccess
public synchronized byte[] read(long offset, long size) throws IOException
{
if (offset >= file.getSize())
return new byte[0];
List<Integer> missingPieces = new ArrayList<>();
return null;
SortedSet<Integer> neededPieces = new TreeSet<>();
long pieceSize = file.getPieceSize();
int startPieceIdx = Math.toIntExact(Math.floorDiv(offset, pieceSize));
long endOffset = (offset + size);
@ -36,33 +39,49 @@ public class FileRemoteAccess
for (int pieceIdx = startPieceIdx; pieceIdx <= endPieceIdx; pieceIdx++)
{
if (!file.hasPiece(pieceIdx))
missingPieces.add(pieceIdx);
neededPieces.add(pieceIdx);
}
System.out.println("FRA: offset=" + offset + ", endOffset=" + endOffset + ", startPieceIdx=" + startPieceIdx + ", endPieceIdx=" + endPieceIdx);
if (!missingPieces.isEmpty())
SortedSet<Integer> piecesToDownload = new TreeSet<>();
if (file.getDownloadPercentage() >= PREEMPTIVE_DOWNLOAD_THRESHOLD)
{
System.out.println("FRA: need to get missing pieces " + missingPieces);
FileDownloadTask downloadTask = new FileDownloadTask(file, Main.getInstance().getConnectionManager(), missingPieces);
FutureTask<File> futureTask = new FutureTask<>(downloadTask);
Main.getInstance().getExecutor().submit(futureTask);
if (!preemptiveDownloadInProgress)
{
System.out.println("FRA: downloaded " + file.getDownloadPercentage() + "% > " + PREEMPTIVE_DOWNLOAD_THRESHOLD + "% ! Threshold reached, starting preemptive download...");
for (int pieceIdx = 0; pieceIdx < file.getPieceCount(); pieceIdx++)
{
if (!file.hasPiece(pieceIdx))
piecesToDownload.add(pieceIdx);
}
}
preemptiveDownloadInProgress = true;
}
else
piecesToDownload = neededPieces;
try
if (!piecesToDownload.isEmpty())
{
System.out.println("FRA: will fetch pieces " + piecesToDownload);
DownloadManager.getInstance().downloadPieces(file, piecesToDownload)
.thenRun(() -> {
preemptiveDownloadInProgress = false;
});
// Main.getInstance().getExecutor().submit(futureTask);
}
if (!neededPieces.isEmpty())
{
boolean ok = waitForPieceRange(neededPieces, 10000);
if (!ok)
{
futureTask.get();
} catch (InterruptedException | ExecutionException e)
{
e.printStackTrace(System.err);
System.err.println("FRA: timed out while waiting for pieces " + neededPieces);
return null;
}
}
// if (file.getDownloadPercentage() >= PREEMPTIVE_DOWNLOAD_THRESHOLD)
// {
// FileDownloadTask preemptiveDownloadTask = new FileDownloadTask(file, Main.getInstance().getConnectionManager(), )
// }
File localFile = file.getLocalFile();
if (localFile == null)
{
@ -82,4 +101,15 @@ public class FileRemoteAccess
return ret;
}
private boolean waitForPieceRange(Set<Integer> pieces, long pieceTimeoutMs)
{
for (int pieceIdx : pieces)
{
boolean ok = file.waitForFilePiece(pieceIdx, pieceTimeoutMs);
if (!ok)
return false;
}
return true;
}
}

View file

@ -12,7 +12,6 @@ import ru.serce.jnrfuse.FuseStubFS;
import ru.serce.jnrfuse.struct.FileStat;
import ru.serce.jnrfuse.struct.FuseFileInfo;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@ -97,7 +96,7 @@ public class FUSEAccess extends FuseStubFS
System.err.println("FUSE: Failed to open file " + path + ": is a directory");
return -ErrorCodes.EISDIR();
}
long fileID = fsNode.getObjectID().getId();
long fileID = fsNode.getObjectID().toLong();
synchronized (fileLock)
{
int openCount = fileOpenCounts.getOrDefault(fileID, 0) + 1;
@ -148,12 +147,13 @@ public class FUSEAccess extends FuseStubFS
}
if (bytes == null)
{
System.err.println("FUSE: failed to read: buffer empty");
return -ErrorCodes.EIO();
// System.err.println("FUSE: failed to read: buffer empty");
return 0;
// return -ErrorCodes.EIO();
}
buf.put(0, bytes, 0, bytes.length);
return bytes.length;
// System.out.println("FUSE: Read " + bytes.length + " bytes.");
}
return Math.toIntExact(size);
}
}

View file

@ -81,7 +81,7 @@ public abstract class PeerConnection extends Thread
{
CommonMessages.MessageHeader.Builder headerBuilder = CommonMessages.MessageHeader.newBuilder()
.setMessageId(nextMessageId)
.setSenderId(Model.getInstance().getSelfNode().getObjectID().getId());
.setSenderId(Model.getInstance().getSelfNode().getObjectID().toLong());
if (inReplyTo != null)
headerBuilder.setReplyToMessageId(inReplyTo.getMessageId());
@ -218,7 +218,7 @@ public abstract class PeerConnection extends Thread
System.out.println("Replying to file piece request with piece " + index);
PieceMessages.FilePieceMessage filePieceMessage = PieceMessages.FilePieceMessage.newBuilder()
.setPieceIndex(index)
.setFileId(networkFile.getObjectID().getId())
.setFileId(networkFile.getObjectID().toLong())
.setData(ByteString.copyFrom(buffer))
.build();
sendMessage(wrapMessage(filePieceMessage, header));

View file

@ -4,13 +4,9 @@ import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import moe.nekojimi.friendcloud.FilePieceAccess;
import moe.nekojimi.friendcloud.objects.NetworkFile;
import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.protos.PieceMessages;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@ -39,7 +35,7 @@ public class FilePiecesRequest extends Request<PieceMessages.FilePiecesRequestMe
{
expectedPieceCount = Math.toIntExact(pieceCount / pieceMod);
return PieceMessages.FilePiecesRequestMessage.newBuilder()
.setFileId(file.getObjectID().getId())
.setFileId(file.getObjectID().toLong())
.setPieceCount(pieceCount)
.setPieceMod(pieceMod)
.setStartPieceIndex(startPiece)

View file

@ -13,6 +13,7 @@ import java.util.BitSet;
import java.util.HashMap;
import java.util.HexFormat;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class NetworkFile extends NetworkFSNode
{
@ -135,6 +136,11 @@ public class NetworkFile extends NetworkFSNode
return localFile;
}
/**
* Retreives the local file used for storing file data, or if it doesn't exist, creates it.
* @return a File object for the (potentially new) file storage.
* @throws IOException if the file could not be created due to an IO error.
*/
public File getOrCreateLocalFile() throws IOException
{
if (tempDirectory == null)
@ -208,7 +214,11 @@ public class NetworkFile extends NetworkFSNode
public void setHasPiece(int pieceIdx, boolean has)
{
pieces.set(pieceIdx, has);
synchronized (pieces)
{
pieces.set(pieceIdx, has);
pieces.notifyAll();
}
}
public double getDownloadPercentage()
@ -230,4 +240,50 @@ public class NetworkFile extends NetworkFSNode
{
return fileStates;
}
/**
* Waits for a certain piece of the file to become available, or for a timeout to expire.
* @param pieceIdx the index of the piece to wait for. Note: <=0 means "don't wait at all", not "wait forever".
* @param timeoutMs the amount of time, in milliseconds, to wait.
* @return true if the piece is available (haspiece(pieceIdx) will return true), false if the timeout was reached.
*/
public boolean waitForFilePiece(int pieceIdx, long timeoutMs)
{
synchronized (pieces)
{
if (hasPiece(pieceIdx))
return true;
while (timeoutMs > 0)
{
long startTime = System.currentTimeMillis();
try
{
System.out.println("NetworkFile: waiting "+ timeoutMs + "ms for piece " + pieceIdx + " of file " + name);
pieces.wait(timeoutMs);
if (hasPiece(pieceIdx))
{
System.out.println("NetworkFile: got piece we were waiting for.");
return true;
}
} catch (InterruptedException ignored)
{
}
long endTime = System.currentTimeMillis();
long timeWaited = (endTime - startTime);
timeoutMs -= timeWaited;
}
}
System.err.println("Timed out waiting for piece!");
return false;
}
public enum StorageType
{
/** The file will be stored as a complete file in the storage directory under it's own name and file path. */
COMPLETE,
/** The file will be stored as a complete file in a temporary directory. It will be deleted when the computer restarts. */
TEMPORARY_COMPLETE,
/** Each piece of the file will be stored permanently in an individual piece file, in the pieces subdirectory. */
PIECES,
}
}

View file

@ -20,7 +20,7 @@ public abstract class NetworkObject
public synchronized void updateFromStateMessage(ObjectStatements.ObjectState state)
{
if (state.getObjectId() != objectID.getId())
if (state.getObjectId() != objectID.toLong())
throw new IllegalArgumentException("Wrong object!");
}
@ -32,7 +32,7 @@ public abstract class NetworkObject
public ObjectStatements.ObjectState.Builder buildObjectState()
{
return ObjectStatements.ObjectState.newBuilder()
.setObjectId(objectID.getId());
.setObjectId(objectID.toLong());
}
public static class ObjectID
@ -55,7 +55,7 @@ public abstract class NetworkObject
this.uniqueID = uniqueID;
}
public long getId()
public long toLong()
{
long uniquePart = Integer.toUnsignedLong(uniqueID);
long systemPart = Integer.toUnsignedLong(systemID) << 32;
@ -81,7 +81,7 @@ public abstract class NetworkObject
@Override
public String toString()
{
return "OBJ{" + Long.toHexString(getId()) + "}";
return "OBJ{" + Long.toHexString(toLong()) + "}";
}
@Override

View file

@ -38,8 +38,8 @@ public class PeerFileState extends NetworkObject
public ObjectStatements.ObjectState.Builder buildObjectState()
{
return super.buildObjectState()
.putValues("peer", Long.toString(peer.getObjectID().getId()))
.putValues("file", Long.toString(file.getObjectID().getId()))
.putValues("peer", Long.toString(peer.getObjectID().toLong()))
.putValues("file", Long.toString(file.getObjectID().toLong()))
.putValues("progress", Double.toString(progress));
}