Various other changes idk

This commit is contained in:
Nekojimi 2025-10-02 00:13:49 +01:00
parent fee20875a1
commit 480b6429f9
10 changed files with 179 additions and 69 deletions

View file

@ -154,6 +154,7 @@ public class Controller
public synchronized void addChangeRecord(ObjectChangeRecord record)
{
System.out.println("Controller: Adding change record " + record );
DataStore.DAO<ObjectChangeRecord> dao = dataStore.getDAOForClass(ObjectChangeRecord.class);
dao.update(record);
// update the change heads; if any of this change's heads are included in ours, then this change replaces them
@ -165,13 +166,13 @@ public class Controller
for (long changeHeadID : record.getChangeHeads())
{
ObjectChangeRecord headRecord = dao.get(changeHeadID);
recordIsNewHead = changeHeads.remove(headRecord);
recordIsNewHead |= changeHeads.remove(headRecord);
}
}
if (recordIsNewHead)
{
changeHeads.add(record);
System.out.println("Controller: Change heads updated: " + changeHeads.stream().map(objectChangeRecord -> Long.toHexString(objectChangeRecord.getChangeID())).collect(Collectors.toSet()));
System.out.println("Controller: Change heads updated: " + changeHeads);
}
}
}
@ -204,7 +205,7 @@ public class Controller
System.out.println("Controller: Applying change record " + record);
long changeID = localData.getCurrentChangeRecord().getChangeID();
if (!record.getChangeHeads().contains(changeID))
throw new IllegalStateException("Change does not apply! Valid change heads=" + record.getChangeHeads().stream().map(Long::toHexString).collect(Collectors.toSet()) + ", we are in state " + Long.toHexString(changeID));
throw new IllegalStateException("Change does not apply! Valid change heads=" + record.getChangeHeads() + ", we are in state " + localData.getCurrentChangeRecord());
addChangeRecord(record);
record.applyToLocalState();
@ -256,7 +257,7 @@ public class Controller
changeHeads = new HashSet<>(dataStore.getDAOForClass(ObjectChangeRecord.class).getAll());
Set<Long> referencedIDs = changeHeads.stream().flatMap(objectChangeRecord -> objectChangeRecord.getChangeHeads().stream()).collect(Collectors.toSet());
changeHeads.removeIf(objectChangeRecord -> referencedIDs.contains(objectChangeRecord.getChangeID()));
System.out.println("Controller: Determined change heads to be " + changeHeads.stream().map(objectChangeRecord -> Long.toHexString(objectChangeRecord.getChangeID())).collect(Collectors.toSet()));
System.out.println("Controller: Determined change heads to be " + changeHeads);
}
return changeHeads;
}
@ -335,7 +336,7 @@ public class Controller
String path = newpath.substring(0, lastSlash);
if (path.isEmpty())
path = "/";
try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), getLocalPeer().getObjectID()))
try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(getLocalPeer().getObjectID()))
{
transaction.addObjectBeforeChange(fsNode);
if (!name.equals(fsNode.getName()))
@ -372,7 +373,7 @@ public class Controller
if (path.equals("/") || path.isEmpty())
return null;
NetworkFolder ret = null;
try(ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), getLocalPeer().getObjectID()))
try(ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(getLocalPeer().getObjectID()))
{
NetworkFSNode node = getFSNode(path);
if (node != null && !(node instanceof NetworkFolder))

View file

@ -5,10 +5,9 @@ import moe.nekojimi.friendcloud.objects.NetworkFile;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class FileRemoteAccess
{
@ -61,12 +60,14 @@ public class FileRemoteAccess
else
piecesToDownload = neededPieces;
CompletableFuture<Void> downloadFuture = null;
if (!piecesToDownload.isEmpty())
{
System.out.println("FRA: will fetch pieces " + piecesToDownload);
DownloadManager.getInstance().downloadPieces(file, piecesToDownload)
.thenRun(() -> {
downloadFuture = DownloadManager.getInstance().downloadPieces(file, piecesToDownload)
.thenRun(() ->
{
preemptiveDownloadInProgress = false;
});
// Main.getInstance().getExecutor().submit(futureTask);
@ -74,7 +75,7 @@ public class FileRemoteAccess
if (!neededPieces.isEmpty())
{
boolean ok = waitForPieceRange(neededPieces, 1000);
boolean ok = waitForPieceRange(neededPieces, 30_000, downloadFuture);
if (!ok)
{
System.err.println("FRA: timed out while waiting for pieces " + neededPieces);
@ -101,14 +102,35 @@ public class FileRemoteAccess
return ret;
}
private boolean waitForPieceRange(Set<Integer> pieces, long pieceTimeoutMs)
private boolean waitForPieceRange(Set<Integer> pieces, long pieceTimeoutMs, CompletableFuture<Void> downloadFuture)
{
for (int pieceIdx : pieces)
{
boolean ok = file.waitForFilePiece(pieceIdx, pieceTimeoutMs);
boolean ok = false;
while (pieceTimeoutMs > 0 && !ok)
{
long waitStartTime = Instant.now().toEpochMilli();
try
{
ok = file.waitForFilePiece(pieceIdx, pieceTimeoutMs);
}
catch (InterruptedException ignored)
{
ok = false;
}
if (!ok)
{
if (downloadFuture != null && (downloadFuture.isCancelled() || downloadFuture.isCancelled()))
{
System.err.println("FRA: Download failed.");
return false;
}
}
long waitEndTime = Instant.now().toEpochMilli();
long waitTime = waitEndTime - waitStartTime;
pieceTimeoutMs -= waitTime;
}
}
return true;
}

View file

@ -55,13 +55,27 @@ public class ObjectChangeRecord implements Storable
try
{
MessageDigest digest = MessageDigest.getInstance("SHA-256");
return digest.digest(toString().getBytes(StandardCharsets.UTF_8));
return digest.digest(getCanonicalStringRepresentation().getBytes(StandardCharsets.UTF_8));
} catch (NoSuchAlgorithmException e)
{
throw new RuntimeException(e);
}
}
@Override
public boolean equals(Object o)
{
if (o == null || getClass() != o.getClass()) return false;
ObjectChangeRecord record = (ObjectChangeRecord) o;
return Objects.equals(getChangeID(), record.getChangeID());
}
@Override
public int hashCode()
{
return (int) getChangeID();
}
public ObjectStatements.ObjectChangeMessage.Builder buildObjectChangeMessage()
{
ObjectStatements.ObjectChangeMessage.Builder builder = ObjectStatements.ObjectChangeMessage.newBuilder();
@ -108,7 +122,7 @@ public class ObjectChangeRecord implements Storable
}
}
public String toString()
public String getCanonicalStringRepresentation()
{
StringBuilder sb = new StringBuilder();
sb.append(creatorPeer).append(",").append(creationTime.toEpochMilli()).append(";");
@ -119,11 +133,21 @@ public class ObjectChangeRecord implements Storable
sb.append(";");
for (Change change: changes.stream().sorted(Comparator.comparingLong(a -> a.objectID.toLong())).toList())
{
sb.append(change.toString()).append(";");
sb.append(change.getCanonicalStringRepresentation()).append(";");
}
return sb.toString();
}
@Override
public String toString()
{
return "Change{" + Long.toHexString(getChangeID()) +
", heads=" + changeHeads.stream().map(Long::toHexString).collect(Collectors.toSet()) +
", time=" + creationTime.toString() +
", creator=" + creatorPeer.toString() +
"}";
}
public long getChangeID()
{
MessageDigest digest = null;
@ -134,7 +158,7 @@ public class ObjectChangeRecord implements Storable
{
throw new RuntimeException(e);
}
String stringVal = toString();
String stringVal = getCanonicalStringRepresentation();
byte[] bytes = digest.digest(stringVal.getBytes(StandardCharsets.UTF_8));
// System.out.println("ObjectChangeRecord: calculated change ID " + Long.toHexString(ret) + " from string: " + stringVal);
return Util.xorBytesToLong(bytes);
@ -195,7 +219,7 @@ public class ObjectChangeRecord implements Storable
return null;
}
public String toString()
public String getCanonicalStringRepresentation()
{
StringBuilder sb = new StringBuilder();
sb.append(objectID).append(";"); // The object ID, then ;
@ -213,7 +237,6 @@ public class ObjectChangeRecord implements Storable
return sb.toString();
}
public ObjectStatements.ObjectChange.Builder buildObjectChange()
{
ObjectStatements.ObjectChange.Builder builder = ObjectStatements.ObjectChange.newBuilder();

View file

@ -1,6 +1,5 @@
package moe.nekojimi.friendcloud;
import moe.nekojimi.friendcloud.network.ConnectionManager;
import moe.nekojimi.friendcloud.objects.NetworkObject;
import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.protos.ObjectStatements;
@ -16,22 +15,19 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
public class ObjectChangeTransaction implements AutoCloseable
{
private final ObjectID creator;
private final ConnectionManager connectionManager;
private final Map<ObjectID, ObjectStatements.ObjectState> beforeStates = new HashMap<>();
private static ObjectChangeTransaction currentTransaction = null;
private int openCount = 0;
private boolean ended = false;
private ObjectChangeTransaction(ConnectionManager connectionManager, ObjectID creator)
private ObjectChangeTransaction(ObjectID creator)
{
this.creator = creator;
this.connectionManager = connectionManager;
System.out.println("ObjectChangeTransaction: opening transaction");
@ -51,10 +47,10 @@ public class ObjectChangeTransaction implements AutoCloseable
}
}
public static ObjectChangeTransaction startTransaction(ConnectionManager connectionManager, ObjectID creatorPeer)
public static ObjectChangeTransaction startTransaction(ObjectID creatorPeer)
{
if (currentTransaction == null)
currentTransaction = new ObjectChangeTransaction(connectionManager, creatorPeer);
currentTransaction = new ObjectChangeTransaction(creatorPeer);
currentTransaction.increaseOpenCount();
return currentTransaction;
}
@ -95,7 +91,8 @@ public class ObjectChangeTransaction implements AutoCloseable
if (changes.isEmpty())
return null;
return ObjectChangeRecord.createFromChanges(creator, Main.getInstance().getModel().getChangeHeads().stream().map(ObjectChangeRecord::getChangeID).collect(Collectors.toSet()), changes);
ObjectChangeRecord currentChangeRecord = Main.getInstance().getModel().getLocalData().getCurrentChangeRecord();
return ObjectChangeRecord.createFromChanges(creator, currentChangeRecord != null ? Set.of(currentChangeRecord.getChangeID()) : Set.of(), changes);
}
public void commit()

View file

@ -18,7 +18,7 @@ public class SharedFileManager
if (files.isEmpty())
return;
Controller controller = Main.getInstance().getModel();
try(ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(Main.getInstance().getConnectionManager(), controller.getLocalPeer().getObjectID()))
try(ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(controller.getLocalPeer().getObjectID()))
{
List<NetworkObject> knownFiles = controller.listObjects(Set.of(ObjectStatements.ObjectType.OBJECT_TYPE_FILE));

View file

@ -13,11 +13,10 @@ import moe.nekojimi.friendcloud.objects.ObjectID;
import moe.nekojimi.friendcloud.objects.Peer;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URI;
import java.net.*;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
@ -30,22 +29,57 @@ public class TCPConnectionBackend extends ConnectionBackend<TCPPeerConnection>
List<URI> getURIs()
{
List<URI> ret = new ArrayList<>();
ret.add(makeURI(serverSocket.getInetAddress().getHostAddress(), serverSocket.getLocalPort()));
ret.add(makeURI(serverSocket.getInetAddress().getHostName(), serverSocket.getLocalPort()));
InetAddress inetAddress = serverSocket.getInetAddress();
addAddressToList(inetAddress, ret);
try
{
Enumeration<NetworkInterface> networkInterfaces = NetworkInterface.getNetworkInterfaces();
for (Iterator<NetworkInterface> it = networkInterfaces.asIterator(); it.hasNext(); )
{
NetworkInterface networkInterface = it.next();
if (networkInterface.isLoopback())
continue;
if (!networkInterface.isUp())
continue;
for (InterfaceAddress interfaceAddress : networkInterface.getInterfaceAddresses())
addAddressToList(interfaceAddress.getAddress(), ret);
}
} catch (SocketException e)
{
}
if (mappedPort != null)
{
ret.add(makeURI(mappedPort.getExternalAddress().getHostAddress(), mappedPort.getExternalPort()));
ret.add(makeURI(mappedPort.getExternalAddress().getCanonicalHostName(), mappedPort.getExternalPort()));
addAddressToList(mappedPort.getExternalAddress(), ret);
}
System.out.println("Local addresses: " + ret);
return ret;
}
/**
* utility method for processing URIs
*/
private void addAddressToList(InetAddress address, List<URI> list)
{
if (address.isAnyLocalAddress())
return;
String hostAddress = address.getHostAddress();
if (!hostAddress.isEmpty())
list.add(makeURI(hostAddress, serverSocket.getLocalPort()));
String hostName = address.getHostName();
if (!hostName.isEmpty() && !hostName.equals(hostAddress))
list.add(makeURI(hostName, serverSocket.getLocalPort()));
}
public TCPConnectionBackend(int port, ConnectionManager connectionManager) throws IOException
{
super("TCP Listen Thread", "tcp", connectionManager);
serverSocket = new ServerSocket(port);
// setupIGP(port);
if (!Main.getInstance().getArgs().isNoUpnp())
setupIGP(port);
}
private void setupIGP(int port)
@ -60,6 +94,9 @@ public class TCPConnectionBackend extends ConnectionBackend<TCPPeerConnection>
// Discover port forwarding devices and take the first one found
System.out.println("Discovering port mappers...");
List<PortMapper> mappers = PortMapperFactory.discover(networkBus, processBus);
if (mappers.isEmpty())
return;
;
PortMapper mapper = mappers.getFirst();
System.out.println("Got mapper " + mapper + ", mapping port...");
@ -102,6 +139,7 @@ public class TCPConnectionBackend extends ConnectionBackend<TCPPeerConnection>
protected TCPPeerConnection getConnection() throws IOException
{
Socket socket = serverSocket.accept();
System.out.println("TCPConnectionBackend: Received connection from " + socket.getRemoteSocketAddress());
return new TCPPeerConnection(socket);
}

View file

@ -11,7 +11,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Set;
public class ObjectListRequest extends Request<ObjectStatements.ObjectListRequest, List<NetworkObject>>
public class ObjectListRequest extends Request<ObjectStatements.ObjectListRequest, ObjectListRequest.ObjectListRequestResult>
{
private final Set<ObjectStatements.ObjectType> types;
@ -39,7 +39,10 @@ public class ObjectListRequest extends Request<ObjectStatements.ObjectListReques
// System.out.println("Received ObjectList, objects=" + objectList.getStatesList());
List<NetworkObject> ret = new ArrayList<>();
ObjectListRequestResult ret = new ObjectListRequestResult();
ret.objects = new ArrayList<>();
ret.changeID = objectList.getChangeHead();
for (ObjectStatements.ObjectState objectState : objectList.getStatesList())
{
@ -47,7 +50,7 @@ public class ObjectListRequest extends Request<ObjectStatements.ObjectListReques
System.out.println("ObjectListRequest: Received state of object " + objectID);
NetworkObject object = NetworkObject.createByID(objectID);
object.updateFromStateMessage(objectState);
ret.add(object);
ret.objects.add(object);
}
future.complete(ret);
@ -63,4 +66,10 @@ public class ObjectListRequest extends Request<ObjectStatements.ObjectListReques
return false;
}
public static class ObjectListRequestResult
{
public List<NetworkObject> objects;
public long changeID;
}
}

View file

@ -20,7 +20,7 @@ public class FileDownloadTask implements RunnableFuture<File>
private final NetworkFile file;
private final ConnectionManager connectionManager;
private final long timeoutPerPieceMs = 1_000;
private final long timeoutPerPieceMs = 30_000;
private static final int MAX_DOWNLOAD_PIECES_PER_ROUND = 128;
private final SortedSet<Integer> missingPieceIndices = new TreeSet<>();
@ -65,23 +65,28 @@ public class FileDownloadTask implements RunnableFuture<File>
Peer selfPeer = Main.getInstance().getModel().getLocalPeer();
while (!missingPieceIndices.isEmpty() && !cancelled && !failed && !done)
{
System.out.println("Need to get " + missingPieceIndices.size() + " missing pieces.");
System.out.println("FileDownloadTask: Need to get " + missingPieceIndices.size() + " missing pieces.");
// Map<Peer, PeerFileState> fileStates = file.getFileStates();
// determine what nodes we can connect to
List<PeerConnection> connections = new ArrayList<>();
for (Peer peer : file.getPeersWithCopy())
{
if (peer == selfPeer)
continue; // yeah that's us dipshit
PeerConnection connection = connectionManager.getNodeConnection(peer);
if (connection != null)
{
System.out.println("FileDownloadTask: Will download from " + peer.getNodeName());
connections.add(connection);
}
}
List<PeerConnection> connections = new ArrayList<>(file.getPeersWithCopy().stream()
.filter(peer -> peer != selfPeer)
.map(connectionManager::getNodeConnectionAsync)
.map(CompletableFuture::join)
.flatMap(Optional::stream)
.toList());
// for (Peer peer : file.getPeersWithCopy())
// {
// if (peer == selfPeer)
// continue; // yeah that's us dipshit
// PeerConnection connection = connectionManager.getNodeConnection(peer);
// if (connection != null)
// {
// System.out.println("FileDownloadTask: Will download from " + peer.getNodeName());
// connections.add(connection);
// }
// }
// connectionLine = "Connected to " + connections.size() + " peers.";
// notification.setBody(connectionLine + "\n" + progressLine);
@ -90,6 +95,7 @@ public class FileDownloadTask implements RunnableFuture<File>
// shuffle the connections list
Collections.shuffle(connections);
if (connections.isEmpty())
{
System.err.println("FileDownloadTask: No peers have the file, download failed!");
@ -98,6 +104,8 @@ public class FileDownloadTask implements RunnableFuture<File>
break;
}
System.out.println("FileDownloadTask: Will download from " + connections.size() + " peers: " + connections.stream().map(PeerConnection::getUri).toList());
// find a continuous run of pieces to download
// TODO: allow for runs with regular gaps (e.g. every 2) to account for previous failed download attempts
int runStart = -1;
@ -156,7 +164,7 @@ public class FileDownloadTask implements RunnableFuture<File>
if (file.getDownloadPercentage() >= 100.0)
{
try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(connectionManager, selfPeer.getObjectID()))
try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(selfPeer.getObjectID()))
{
transaction.addObjectBeforeChange(file);
file.addPeerWithCopy(selfPeer);

View file

@ -53,7 +53,7 @@ public class JoinNetworkTask implements Runnable
throw new RuntimeException(e);
}
// create our local peer object
try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(connectionManager, peerID))
try (ObjectChangeTransaction transaction = ObjectChangeTransaction.startTransaction(peerID))
{
// create and submit our Peer object if it doesn't exist
Peer selfPeer = controller.getLocalData().getLocalPeer();

View file

@ -1,6 +1,7 @@
package moe.nekojimi.friendcloud.tasks;
import moe.nekojimi.friendcloud.Main;
import moe.nekojimi.friendcloud.ObjectChangeRecord;
import moe.nekojimi.friendcloud.Util;
import moe.nekojimi.friendcloud.network.PeerConnection;
import moe.nekojimi.friendcloud.network.requests.ObjectChangeRequest;
@ -31,6 +32,8 @@ public class PullStateTask implements Runnable
System.out.println("PullStateTask: Pulling state from peers...");
Set<PeerConnection> connections = new HashSet<>();
// TODO: pull changes first, then select the peer(s) with the latest changes to pull state from
for (String knownPeerAddress : Main.getInstance().getArgs().getKnownPeers())
{
String[] split = knownPeerAddress.split(":");
@ -50,7 +53,7 @@ public class PullStateTask implements Runnable
{
connections.add(nodeConnection);
}
} catch (URISyntaxException | IOException e)
} catch (URISyntaxException e)
{
throw new RuntimeException(e);
}
@ -80,24 +83,33 @@ public class PullStateTask implements Runnable
for (PeerConnection connection : connections)
{
futures.add(connection.makeRequest(new ObjectListRequest(Set.of(
CompletableFuture<Void> changeFuture = connection.makeRequest(new ObjectChangeRequest(Set.of())).thenAccept(objectChangeRecords ->
{
System.out.println("PullStateTask: got " + objectChangeRecords.size() + " change records.");
Main.getInstance().getModel().addChangeRecords(objectChangeRecords);
});
CompletableFuture<Long> objectFuture = connection.makeRequest(new ObjectListRequest(Set.of(
ObjectStatements.ObjectType.OBJECT_TYPE_FILE,
ObjectStatements.ObjectType.OBJECT_TYPE_FOLDER,
ObjectStatements.ObjectType.OBJECT_TYPE_PEER))).thenAccept(networkObjects ->
ObjectStatements.ObjectType.OBJECT_TYPE_PEER))).thenApply(objectListRequestResult ->
{
List<NetworkObject> networkObjects = objectListRequestResult.objects;
System.out.println("PullStateTask: got state of " + networkObjects.size() + " objects.");
for (NetworkObject object : networkObjects)
{
Main.getInstance().getModel().addNetworkObject(object);
}
}));
return objectListRequestResult.changeID;
});
futures.add(connection.makeRequest(new ObjectChangeRequest(Set.of())).thenAccept(objectChangeRecords ->
CompletableFuture<Void> bothFuture = changeFuture.thenAcceptBoth(objectFuture, (unused, changeID) ->
{
System.out.println("PullStateTask: got " + objectChangeRecords.size() + " change records.");
Main.getInstance().getModel().addChangeRecords(objectChangeRecords);
}));
ObjectChangeRecord record = Main.getInstance().getModel().getChangeRecord(changeID);
System.out.println("PullStateTask: updating current change ID to " + record);
Main.getInstance().getModel().setCurrentChangeRecord(record);
});
futures.add(bothFuture);
}
Util.collectFutures(futures).join();