|
|
You can use a pair of SynchronousQueues to build a coroutine-style framework. Untested code:
public interface CoroutineInvoker<T, U> {
U invoke(T) throws InterruptedException;
}
public interface Coroutine<T, U> {
public void run(CoroutineInvoker<T, U> invoker);
}
public class CoroutinePair<T, U> implements Runnable {
private final BlockingQueue<T> tq = new SynchronousQueue<T>();
private final BlockingQueue<U> uq = new SynchronousQueue<U>();
private final ExecutorService exec = Executors.newFixedThreadPool(2);
public CoroutinePair(Coroutine<T, U> first, Coroutine<U, T> second) {
this.first = first;
this.second = second;
}
public void run() {
exec.execute( new Runnable() {
public void run() {
first.run(new InvokerImpl<T, U>(false, tq, uq));
}
});
exec.execute(new Runnable() {
public void run() {
second.run(new InvokerImpl<U, T>(true, uq, tq));
}
});
// doesn't block, doesn't deal with termination or pool shutdown
}
private static class InvokerImpl<X, Y> implements CoroutineInvoker<X, Y> {
private boolean ignoreArgument;
private final BlockingQueue<X> xq;
private final BlockingQueue<Y> yq;
InvokerImpl(boolean ignoreFirstArgument, BlockingQueue<X> xq, BlockingQueue<Y> yq) {
ignoreArgument = ignoreFirstArgument;
this.xq = xq;
this.yq = yq;
}
public Y invoke(X x) throws InterruptedException {
if (ignoreArgument) {
ignoreArgument = false;
} else {
xq.put(x);
}
return yq.take();
}
}
}
I have neither toy nor practical applications of this. Feel free to tack some on here.