You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
208 lines
5.4 KiB
208 lines
5.4 KiB
/*
 * Copyright (C) 2023 jimj316
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
|
|
package moe.nekojimi.chords;
|
|
|
|
import java.util.ArrayList;
|
|
import java.util.LinkedList;
|
|
import java.util.List;
|
|
import java.util.Queue;
|
|
import java.util.function.Consumer;
|
|
|
|
/**
 * A processing stage that buffers input items of type {@code I} and, on
 * request, turns them into output items of type {@code O} delivered to a
 * {@link Consumer}.
 * <p>
 * Stages are chained with {@link #addSource(QueueThing)}. A stage is itself a
 * {@code Consumer<I>}, so when it demands input it passes itself to its
 * sources as the destination and receives the results through
 * {@link #accept(Object)}.
 * <p>
 * Subclasses supply the actual conversion in
 * {@link #completePromise(Promise)}.
 * <p>
 * NOTE(review): nothing in this class synchronises access to the queue or the
 * promise list - confirm how callers are expected to handle threading.
 *
 * @param <I> the type of item this stage takes as input
 * @param <O> the type of item this stage produces
 * @author jimj316
 */
public abstract class QueueThing<I, O> implements Consumer<I>
{
    // TODO: better name

    /** Input items that have arrived but have not yet been attached to a promise. */
    protected final Queue<I> inputQueue;
    /** Upstream stages this stage asks for input, in the order they were added. */
    protected final List<QueueThing<?, I>> sources = new ArrayList<>();
    /** Downstream stages that are notified when this stage receives input. */
    protected final List<QueueThing<O, ?>> sinks = new ArrayList<>();
    /** Promises handed out by {@link #request(int, Consumer)} that are still waiting for an input item. */
    protected final Queue<Promise<I, O>> pendingPromises = new LinkedList<>();

    /** Number of items this stage tries to keep in {@link #inputQueue}; defaults to 0. */
    protected int queueTargetSize = 0;

    /**
     * Creates a stage backed by the given queue.
     *
     * @param inputQueue the queue used to buffer incoming items; stored as-is,
     *                   not copied
     */
    protected QueueThing(Queue<I> inputQueue)
    {
        this.inputQueue = inputQueue;
    }

    /**
     * Receives one input item.
     * <p>
     * If a promise is pending, the oldest one is given this item and passed to
     * {@link #completePromise(Promise)}; the result of that call is ignored.
     * Otherwise the item is appended to the input queue and
     * {@link #notifyNewInput()} is called.
     *
     * @param t the input item
     */
    @Override
    public void accept(I t)
    {
        Promise<I, O> waiting = pendingPromises.poll();
        if (waiting == null)
        {
            // nobody is waiting: buffer the item and let the sinks know
            inputQueue.add(t);
            notifyNewInput();
            return;
        }

        waiting.setInput(t);
        handlePromise(waiting);
    }

    /**
     * Called to notify downstream modules that there's input upstream -
     * consider requesting it to pull it through.
     * <p>
     * If the input queue is below {@link #queueTargetSize}, the shortfall is
     * demanded from the sources first; the notification is then passed on to
     * every sink.
     */
    protected void notifyNewInput()
    {
        int shortfall = queueTargetSize - inputQueue.size();
        if (shortfall > 0)
        {
            demandInput(shortfall);
        }

        for (QueueThing<O, ?> sink : sinks)
        {
            sink.notifyNewInput();
        }
    }

    /**
     * Asks this stage for output items.
     * <p>
     * For each of the {@code count} requested items a promise is built from
     * the head of the input queue (or from {@code null} if the queue is empty)
     * and passed to {@link #completePromise(Promise)}. Promises that are
     * accepted are returned. For the rest, plus whatever is needed to bring the
     * queue back to {@link #queueTargetSize}, input is demanded from the
     * sources, and one pending promise is registered and returned per promise
     * of input received.
     *
     * @param count       the number of output items wanted
     * @param destination the consumer that completed promises deliver to
     * @return the promises issued; since the number of pending promises
     *         follows what the sources return, the list is not guaranteed to
     *         hold exactly {@code count} entries
     */
    public List<Promise<I, O>> request(int count, Consumer<O> destination)
    {
        List<Promise<I, O>> issued = new ArrayList<>();
        int demands = 0;

        for (int i = 0; i < count; i++)
        {
            // poll() yields null when the queue is empty, which is exactly the
            // "no input yet" value the promise is expected to carry
            Promise<I, O> promise = new Promise<>(inputQueue.poll(), destination);

            if (handlePromise(promise))
            {
                // we got a promise of output so we can tell the client
                issued.add(promise);
            } else
            {
                // we need to get more input from sources
                // NOTE(review): if an item was polled for this promise it is not
                // put back - confirm whether completePromise() can refuse a
                // promise that has input.
                demands++;
            }
        }

        // also top the queue back up to its target size; a queue above target
        // makes this term negative and offsets the demands counted above
        demands += queueTargetSize - inputQueue.size();

        if (demands > 0)
        {
            // try to get more input from sources
            int sourcePromises = demandInput(demands);
            // each promise of input we get represents a promise of output we can give
            // NOTE(review): a source that calls accept() during demandInput()
            // does so before these promises are registered, so that item lands
            // in inputQueue instead - confirm this ordering is intended.
            for (int i = 0; i < sourcePromises; i++)
            {
                Promise<I, O> pending = new Promise<>(destination);
                pendingPromises.add(pending);
                issued.add(pending);
            }
        }
        return issued;
    }

    /**
     * Passes a promise to the subclass for completion.
     *
     * @param promise the promise to complete
     * @return the value returned by {@link #completePromise(Promise)}
     */
    private boolean handlePromise(Promise<I, O> promise)
    {
        return completePromise(promise);
    }

    /**
     * Attempts to turn the given promise's input into output.
     * <p>
     * {@link #request(int, Consumer)} may pass a promise whose input is
     * {@code null} when the input queue is empty.
     *
     * @param request the promise to complete
     * @return {@code true} if the promise was accepted; {@code false} makes
     *         {@link #request(int, Consumer)} demand more input from the
     *         sources instead of returning this promise
     */
    protected abstract boolean completePromise(Promise<I, O> request);

    /**
     * Requests from sources a certain amount of input, to be provided later.
     * Sources are asked in order, each for whatever is still outstanding,
     * until enough promises have been collected or the sources run out.
     *
     * @param count the number of input items to request.
     * @return a number of Promises of input items. May be more or less than count.
     */
    protected int demandInput(int count)
    {
        int promised = 0;
        for (QueueThing<?, I> source : sources)
        {
            // only the number of promises matters here, so the returned list
            // is never stored
            promised += source.request(count - promised, this).size();
            if (promised >= count)
            {
                break;
            }
        }
        return promised;
    }

    /**
     * Connects an upstream stage: it is added to this stage's sources and this
     * stage is added to its sinks.
     *
     * @param source the stage to pull input from
     * @throws NullPointerException if {@code source} is {@code null}; nothing
     *                              is modified in that case
     */
    public void addSource(QueueThing<?, I> source)
    {
        // validate before touching any state, so a bad argument cannot leave a
        // null entry behind in the sources list
        if (source == null)
        {
            throw new NullPointerException("source");
        }
        sources.add(source);
        source.addSink(this);
    }

    /**
     * Disconnects an upstream stage: it is removed from this stage's sources
     * and this stage is removed from its sinks.
     *
     * @param source the stage to disconnect
     * @throws NullPointerException if {@code source} is {@code null}
     */
    public void removeSource(QueueThing<?, I> source)
    {
        if (source == null)
        {
            throw new NullPointerException("source");
        }
        sources.remove(source);
        source.removeSink(this);
    }

    /**
     * Registers a downstream stage; called by {@link #addSource(QueueThing)}
     * on the source.
     *
     * @param sink the downstream stage
     */
    private void addSink(QueueThing<O, ?> sink)
    {
        sinks.add(sink);
    }

    /**
     * Unregisters a downstream stage; called by
     * {@link #removeSource(QueueThing)} on the source.
     *
     * @param sink the downstream stage
     */
    private void removeSink(QueueThing<O, ?> sink)
    {
        sinks.remove(sink);
    }

    /**
     * Pairs an input item, which may not have arrived yet, with the consumer
     * that the resulting output is delivered to.
     *
     * @param <I> the input type
     * @param <O> the output type
     */
    public static class Promise<I, O>
    {

        private I input;
        private final Consumer<O> output;

        /**
         * Creates a promise with its input already attached.
         *
         * @param input  the input item; may be {@code null}
         * @param output the consumer that {@link #complete(Object)} delivers to
         */
        public Promise(I input, Consumer<O> output)
        {
            this.input = input;
            this.output = output;
        }

        /**
         * Creates a promise whose input is {@code null} until
         * {@link #setInput(Object)} is called.
         *
         * @param output the consumer that {@link #complete(Object)} delivers to
         */
        public Promise(Consumer<O> output)
        {
            this(null, output);
        }

        /**
         * Attaches or replaces the input item.
         *
         * @param input the input item
         */
        public void setInput(I input)
        {
            this.input = input;
        }

        /**
         * Delivers the finished output to this promise's consumer.
         *
         * @param out the output item
         */
        public void complete(O out)
        {
            output.accept(out);
        }

        /**
         * @return the input item, or {@code null} if none has been attached
         */
        public I getInput()
        {
            return input;
        }

    }
}
|
|
|