/* * Copyright (C) 2023 jimj316 * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ package moe.nekojimi.chords; import java.util.ArrayList; import java.util.LinkedList; import java.util.List; import java.util.Queue; import java.util.function.Consumer; /** * * @author jimj316 */ public abstract class QueueThing implements Consumer { // TODO: better name protected final Queue inputQueue; protected final List> sources = new ArrayList<>(); protected final List> sinks = new ArrayList<>(); protected final Queue> pendingPromises = new LinkedList<>(); protected int queueTargetSize = 0; protected QueueThing(Queue inputQueue) { this.inputQueue = inputQueue; } @Override public void accept(I t) { // FIXME: this causes queue-jumping // if we've got pending promises, fullfill them, otherwise just put it in the queue and notify the sinks if (pendingPromises.isEmpty()) { inputQueue.add(t); notifyNewInput(); } else { Promise promise = pendingPromises.poll(); System.out.println(this.getClass().getSimpleName() + " now has " + pendingPromises + " promises."); promise.setInput(t); handlePromise(promise); } } /** * Called to notify downstream modules that there's input upstream - * consider requesting it to pull it through */ protected void notifyNewInput() { if (inputQueue.size() < queueTargetSize) demandInput(queueTargetSize - inputQueue.size()); for (QueueThing sink : sinks) { sink.notifyNewInput(); } } public List> request(int count, Consumer destination) { List> ret = new ArrayList<>(); int demandsForClient = 0; for (int i = 0; i < count; i++) { I input = null; if (!inputQueue.isEmpty()) { input = inputQueue.poll(); } Promise promise = new Promise<>(input, destination); boolean ok = handlePromise(promise); if (ok) { // we got a promise of output so we can tell the client ret.add(promise); } else { // we need to get more input from sources demandsForClient++; } } int demandsForMe = queueTargetSize - inputQueue.size(); int demandsTotal = demandsForMe + demandsForClient; if (demandsTotal > 0) { // try to get more input from sources int sourcePromises = demandInput(demandsForClient); // each promise of input we get represents a promise of output we can give for (int i = 0; i < sourcePromises && i < demandsForClient; i++) { Promise myPromise = new Promise<>(destination); pendingPromises.add(myPromise); System.out.println(this.getClass().getSimpleName() + " has made " + pendingPromises + " promises."); ret.add(myPromise); } } return ret; } private boolean handlePromise(Promise promise) { boolean ok; ok = completePromise(promise); return ok; } protected abstract boolean completePromise(Promise request); /** * Requests from sources a certain about of input, to be provided later. * * @param count the number of input items to request. * @return a number Promises of input items. May be more or less than count. */ protected int demandInput(int count) { int ret = 0; for (QueueThing source : sources) { List> promises = (List>) source.request(count - ret, this); ret += promises.size(); if (ret >= count) break; } return ret; } public void addSource(QueueThing source) { sources.add(source); source.addSink(this); } public void removeSource(QueueThing source) { sources.remove(source); source.removeSink(this); } private void addSink(QueueThing sink) { sinks.add(sink); } private void removeSink(QueueThing sink) { sinks.remove(sink); } public static class Promise { private I input; private final Consumer output; public Promise(I input, Consumer output) { this.input = input; this.output = output; } public Promise(Consumer output) { this.output = output; } public void setInput(I input) { this.input = input; } public void complete(O out) { output.accept(out); } public I getInput() { return input; } } // protected void submit(O output) // { // if (nextStage != null) // nextStage.accept(output); // } }