Cyclic Graph Never Terminating

I’m running into an issue where a cyclic graph never seems to terminate. The docs here come close to what I’m looking for, but don’t quite cover my case.

In a nutshell, I have a simple cycle where each time through the cycle the value is decremented by 1. When the value reaches 0, it is ejected from the cycle. The execution appears to complete (all inputs get decremented to zero and ejected) but the program never terminates.

// Using Akka v 2.5.11
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.*;
import akka.stream.javadsl.*;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;

public class Experiment {
    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("QuickStart");

        ActorMaterializerSettings matSettings  = ActorMaterializerSettings.create(system).withDebugLogging(true);

        ActorMaterializer mat = ActorMaterializer.create(matSettings, system);

        Flow<Integer, Integer, NotUsed> printFlow  = Flow.of(Integer.class).map( s -> {
            System.out.println("Processing: " + s);
            return s;
        });

        Flow<Integer, Integer, NotUsed> completePrintFlow  = Flow.of(Integer.class).map( (Integer s) -> {
            System.out.println("Completed: " + s);
            return s;
        });

        Sink<Object, CompletionStage<Done>> ignore = Sink.ignore();
        List<Integer> numbers = Arrays.asList(3, 3);

        RunnableGraph<CompletionStage<Done>> mainGraph = RunnableGraph.fromGraph(GraphDSL.create(ignore, (b, out) -> {
            SourceShape<Integer> s = b.add(Source.from(numbers));
            FlowShape<Integer, Integer> printer = b.add(printFlow);
            FlowShape<Integer, Integer> completePrinter = b.add(completePrintFlow);
            UniformFanOutShape<Integer, Integer> partition = b.add(Partition.create(2, (Integer x) -> (x == 0) ? 0 : 1 ));
            FlowShape<Integer, Integer> decrement = b.add(Flow.of(Integer.class).map( (x) -> x - 1));

            // MergePreferredShape seems to be available only in scaladsl
            UniformFanInShape<Integer, Integer> mergePreferred = b.add(MergePreferred.create(1));

            // s ~> mergePreferred.in(0)                             ~> partition ~> completePrinter ~> out
            //      mergePreferred.preferred <~ decrement <~ printer <~ partition

            b.from(s).toInlet(mergePreferred.in(0));
            b.from(mergePreferred.out()).viaFanOut(partition);
            b.from(partition.out(0)).via(completePrinter).to(out);
            b.to(mergePreferred.in(1)).via(decrement).via(printer).fromFanOut(partition);

            return ClosedShape.getInstance();
        }));
        mainGraph.run(mat).whenComplete((out, error) -> {
            system.terminate();
        });
    }
}

Using the opposite priority (giving the source the higher priority in mergePreferred) doesn’t help: the second element is never pulled from the source.
A regular merge doesn’t work either: it also fails to pull the second element from the source.
Setting eagerCancel and eagerComplete on partition and mergePreferred doesn’t work either: the second element is pulled from the source, but no further elements are processed.

Note: this is a Java port of an experiment written in Kotlin, so there are a few minor changes, but the code seems to behave the same (the main difference is the type of mergePreferred).

Is there a better way to do this? Or some way to determine why this isn’t terminating the system when complete?

I did not look closely at your cyclic graph, but if you run it in a test you can use akka.stream.testkit.Utils#assertAllStagesStopped from akka-stream-testkit to get a Graphviz dump showing which stage is waiting for demand, etc., when the stream does not complete before a timeout.

MergePreferred will pull from both inputs and applies some fairness between them. From the code:

    /*
     * This determines the unfairness of the merge:
     * - at 1 the preferred will grab 40% of the bandwidth against three equally fast secondaries
     * - at 2 the preferred will grab almost all bandwidth against three equally fast secondaries
     * (measured with eventLimit=1 in the GraphInterpreter, so may not be accurate)
     */

And because of the Partition implementation, which only emits an element when the chosen output has demand, your feedback loop will deadlock the cycle.
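
Separately from the demand deadlock, completion itself may be going in a circle here. As far as the operator docs describe it, a merge without eagerComplete completes only once all of its inputs have completed, and Partition completes only once its upstream has. The feedback input sits downstream of the merge itself, so it can never complete first. The toy model below is plain Java, not Akka code: CompletionCycleModel and mergeCompletes are made-up names, and the model only mimics those two completion rules.

```java
final class CompletionCycleModel {

    /**
     * Hypothetical model, not an Akka API: reports whether the merge in the
     * cycle source -> merge -> partition -> feedback -> merge would ever
     * complete, given whether the outer source has completed.
     */
    static boolean mergeCompletes(boolean sourceDone, boolean eagerComplete) {
        boolean feedbackDone = false; // nothing in the loop has completed yet
        boolean mergeDone = false;

        // Three signals depend on each other, so three rounds reach a fixed point.
        for (int round = 0; round < 3; round++) {
            // Assumed rule: the merge needs all inputs done, or any one if eagerComplete is set.
            mergeDone = eagerComplete
                    ? (sourceDone || feedbackDone)
                    : (sourceDone && feedbackDone);
            // Assumed rule: the partition completes when its upstream (the merge) completes.
            boolean partitionDone = mergeDone;
            // The feedback edge completes when the partition feeding it completes.
            feedbackDone = partitionDone;
        }
        return mergeDone;
    }

    public static void main(String[] args) {
        System.out.println(mergeCompletes(true, false)); // prints "false"
        System.out.println(mergeCompletes(true, true));  // prints "true"
    }
}
```

In this model the non-eager merge never completes even though the source has, while the eager variant completes as soon as the source does. That would be consistent with the observation that, with eagerComplete set, elements still circulating in the loop are no longer processed.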

So try to avoid cycles if you are not familiar with the stages involved :smile: (If you have enough time, it’s really fun to get familiar with the implementations of some of the built-in stages.) You can patch this use case with a huge buffer. (Not a great idea, since it can still deadlock…) Or you could write a merge stage that is better suited to your problem. But the best thing you can do is fight against cycles :smiley:
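
One way to take the advice about avoiding cycles literally: since every element is only counted down to 0, each input can be expanded into its own countdown instead of being fed back into the graph. The sketch below uses plain java.util.stream so that it runs without dependencies. AcyclicCountdown and its methods are hypothetical names, and inputs are assumed to be non-negative. In Akka Streams the same shape could presumably be built from flatMapConcat and Source.unfold, but that variant is untested here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class AcyclicCountdown {

    /** Every value one input passes through: start, start - 1, ..., 0 (assumes start >= 0). */
    static List<Integer> countdown(int start) {
        return IntStream.iterate(start, x -> x - 1)
                .limit(start + 1L)
                .boxed()
                .collect(Collectors.toList());
    }

    /** Expands each input into its countdown and keeps only the final values that reached 0. */
    static List<Integer> completed(List<Integer> inputs) {
        return inputs.stream()
                .flatMap(n -> countdown(n).stream())
                // Mirrors the "Processing" printer: log every value that is still above 0.
                .peek(x -> { if (x != 0) System.out.println("Processing: " + x); })
                // Mirrors partition output 0: only values that reached 0 leave the pipeline.
                .filter(x -> x == 0)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Same input as in the question.
        System.out.println("Completed: " + completed(Arrays.asList(3, 3)));
    }
}
```

Because each countdown is finite and nothing is fed back upstream, the pipeline ends as soon as the input list is exhausted, so there is no completion signal that has to travel around a loop.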

P.S.: We have an Akka Streams tag too.