Failure in one branch of an Akka stream does not fail the other branch

I have a fairly simple Akka graph with one source and two sinks.
I expect that if the sink in one branch fails, the second branch stops executing too, but that is not what I see. In reality the second branch keeps consuming the rest of the elements. Am I missing something?

    Sink<Integer, CompletionStage<Done>> topHeadSink = Sink.foreach(o -> {
      System.out.println("Upper sink " + o);
      throw new RuntimeException("UPS");
    });
    Sink<Integer, CompletionStage<Done>> bottomHeadSink = Sink.foreach(o -> System.out.println("Bottom sink " + o));

    final RunnableGraph<Pair<CompletionStage<Done>, CompletionStage<Done>>> g =
        RunnableGraph.<Pair<CompletionStage<Done>, CompletionStage<Done>>>fromGraph(
            GraphDSL.create(
                topHeadSink, // import this sink into the graph
                bottomHeadSink, // and this as well
                Keep.both(),
                (b, top, bottom) -> {
                  final UniformFanOutShape<Integer, Integer> bcast = b.add(Broadcast.create(2));

                  b.from(b.add(Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))))
                      .viaFanOut(bcast)
                      .to(top);
                  b.from(bcast).toInlet(bottom.in());
                  return ClosedShape.getInstance();
                }));

    g.run(_runner.actorSystem());

The result of executing this graph is:

Upper sink 1
Bottom sink 1
Bottom sink 2
Bottom sink 3
Bottom sink 4
Bottom sink 5
Bottom sink 6
Bottom sink 7
Bottom sink 8
Bottom sink 9
Bottom sink 10

When Broadcast cancels depends on the eagerCancel flag: if it is true, it cancels as soon as any downstream cancels; if it is false, it cancels only when all downstreams cancel.

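You can create the Broadcast with the eagerCancel flag; if you set it to true, the Broadcast stops as soon as any one of its outputs stops. The flag defaults to false, and a failing sink cancels its upstream, which is why only the top branch stopped in your run while the bottom one kept going.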
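
As a rough sketch of that suggestion, here is the graph from the question with the Broadcast created via Broadcast.create(2, true). It assumes Akka 2.6 or later (for run(system)); the class name EagerCancelSketch and the locally created ActorSystem are placeholders I made up, standing in for whatever _runner.actorSystem() provides. I have not shown expected output, because the interleaving of the two sinks is not guaranteed and the bottom sink may still print an element or so before the cancellation reaches it.

    import akka.Done;
    import akka.actor.ActorSystem;
    import akka.japi.Pair;
    import akka.stream.ClosedShape;
    import akka.stream.UniformFanOutShape;
    import akka.stream.javadsl.Broadcast;
    import akka.stream.javadsl.GraphDSL;
    import akka.stream.javadsl.Keep;
    import akka.stream.javadsl.RunnableGraph;
    import akka.stream.javadsl.Sink;
    import akka.stream.javadsl.Source;

    import java.util.Arrays;
    import java.util.concurrent.CompletionStage;

    // Hypothetical class name, used only for this sketch.
    public class EagerCancelSketch {
      public static void main(String[] args) {
        // Assumption: a local ActorSystem replaces _runner.actorSystem() from the question.
        final ActorSystem system = ActorSystem.create("eager-cancel-sketch");

        // Same sinks as in the question: the top one fails on the first element.
        Sink<Integer, CompletionStage<Done>> topHeadSink = Sink.foreach(o -> {
          System.out.println("Upper sink " + o);
          throw new RuntimeException("UPS");
        });
        Sink<Integer, CompletionStage<Done>> bottomHeadSink =
            Sink.foreach(o -> System.out.println("Bottom sink " + o));

        final RunnableGraph<Pair<CompletionStage<Done>, CompletionStage<Done>>> g =
            RunnableGraph.<Pair<CompletionStage<Done>, CompletionStage<Done>>>fromGraph(
                GraphDSL.create(
                    topHeadSink,
                    bottomHeadSink,
                    Keep.both(),
                    (b, top, bottom) -> {
                      // The only change from the question: eagerCancel = true.
                      final UniformFanOutShape<Integer, Integer> bcast =
                          b.add(Broadcast.create(2, true));

                      b.from(b.add(Source.from(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))))
                          .viaFanOut(bcast)
                          .to(top);
                      b.from(bcast).toInlet(bottom.in());
                      return ClosedShape.getInstance();
                    }));

        final Pair<CompletionStage<Done>, CompletionStage<Done>> done = g.run(system);

        // Shut down once the bottom branch has terminated, whether it completed or failed.
        done.second().whenComplete((d, err) -> system.terminate());
      }
    }

Note that the top sink's CompletionStage is the one that carries the RuntimeException. Whether the bottom sink's CompletionStage completes normally or fails after the eager cancel may depend on the Akka version and its cancellation settings, so check both stages if you need to detect the failure.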