MergeHub not completing stream

We are having a problem with distributed streams that use a MergeHub. We are trying to create a stream that has a single Source and Sink running locally, and to configure multiple remote streams which read from the local Source and write to the local Sink. We are using a PartitionHub to distribute the elements from the Source and a MergeHub to collate the results back to the Sink. The problem we are seeing is that the third segment of our stream (the one with the sink) never completes. In other words, we are not seeing the onUpstreamFinish() method get called within our custom Sink stage. This is causing problems because we need to close resources and report back that our stream is done. I do see the postStop() method called when the process is shut down, but not when the stream is completed. I assume this is because the actor shuts down when the process shuts down.

I have included a sample test case which demonstrates the issue.

Rick

import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.*;
import akka.stream.*;
import akka.stream.javadsl.*;
import akka.stream.stage.*;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/**
 * Minimal reproduction of a hub-based stream whose final sink never observes completion.
 *
 * <p>{@link #main(String[])} materializes three stream segments: a range source feeding a
 * {@code PartitionHub}, a {@code MergeHub} feeding {@link MySinkStage}, and a "remote" flow that
 * connects the two through a {@code SourceRef}/{@code SinkRef} pair. The actor behavior itself
 * handles no messages; it is only used as the guardian of the {@code ActorSystem}.
 */
public class MergeHubRunner extends AbstractBehavior<MergeHubRunner.Command>
{
  /**
   * Creates the guardian behavior for the actor system.
   *
   * @return a behavior that instantiates a {@code MergeHubRunner} on setup
   */
  public static Behavior<Command> create()
  {
    return Behaviors.setup(MergeHubRunner::new);
  }

  /**
   * @param context the actor context handed in by {@code Behaviors.setup}
   */
  public MergeHubRunner(ActorContext<Command> context)
  {
    super(context);
  }

  /**
   * Builds a receive with no message handlers registered.
   */
  @Override
  public Receive<Command> createReceive()
  {
    return newReceiveBuilder().build();
  }

  /** Marker type for messages of this actor; no implementations are defined in this file. */
  interface Command extends Serializable {}

  /**
   * Wires up the three stream segments, lets them run for five seconds and then terminates the
   * actor system.
   *
   * @param args unused
   * @throws InterruptedException if the thread is interrupted while waiting for the streams
   */
  public static void main(String[] args) throws InterruptedException
  {
    Config config = ConfigFactory.load("simple.conf");
    ActorSystem<MergeHubRunner.Command> system = ActorSystem.create(MergeHubRunner.create(), "MergeHubRunner", config);

    // source: the range 100..110 is routed to hub consumers by element hash modulo consumer count
    System.out.println("Creating source");
    Source<Integer, NotUsed> source = Source.range(100, 110)
      .toMat(PartitionHub.of(
          Integer.class, (size, elem) -> Math.abs(elem.hashCode() % size), 1, 256),
          Keep.right())
        .run(system);

    // sink: everything attached to the MergeHub ends up in MySinkStage. The consumer is a
    // deliberate no-op (the stage still counts every element); the callback announces the report.
    System.out.println("Creating sink");
    Sink<Integer, NotUsed> sink = MergeHub.of(Integer.class)
        .to(MySinkStage.create(
            i -> { /* element intentionally ignored */ },
            results -> System.out.println("MYSINK: DONE")))
        .run(system);

    // source/sink refs
    System.out.println("Creating sourceRef and sinkRef");
    SourceRef<Integer> sourceRef = source.runWith(StreamRefs.sourceRef(), system);

    //Source<Integer, NotUsed> source2 = sourceRef.getSource().map(i -> {System.out.println("->" + i); return i;});
    //SourceRef<Integer> sourceRef2 = source2.runWith(StreamRefs.sourceRef(), system);
    //sourceRef2.getSource().to(sink).run(system);

    SinkRef<Integer> sinkRef = StreamRefs.<Integer>sinkRef().to(sink).run(system);

    // create remote flow: read from the source ref, add one, write to the sink ref
    System.out.println("Creating remote flow");
    RunnableGraph<NotUsed> flow = sourceRef.getSource().map(i -> i + 1).to(sinkRef.getSink());
    flow.run(system);

    System.out.println("Running...");
    Thread.sleep(5000);
    System.out.println("Shutting down");

    system.terminate();
  }

  /**
   * Sink stage that hands every element to a {@link Consumer}, counts the elements, and reports
   * the final count through a {@link Callback} when the stage finishes or is stopped.
   */
  static class MySinkStage extends GraphStage<SinkShape<Integer>>
  {
    private static final String SINK_IN = "Sink.in";

    private final Consumer<Integer> consumer;
    private final Callback callback;

    private final Inlet<Integer> in = Inlet.create(SINK_IN);
    private final SinkShape<Integer> shape = SinkShape.of(in);

    /**
     * Static factory for the stage.
     *
     * @param consumer invoked with every element pushed into the sink
     * @param callback receives the element count when the stage ends; may be {@code null}
     * @return a new stage instance
     */
    public static MySinkStage create(Consumer<Integer> consumer, Callback callback)
    {
      return new MySinkStage(consumer, callback);
    }

    private MySinkStage(Consumer<Integer> consumer, Callback callback)
    {
      this.consumer = consumer;
      this.callback = callback;
    }

    /**
     * Creates the per-materialization logic: pull on start, consume and re-pull on every push,
     * and report the count once when the upstream finishes or the stage is stopped.
     */
    @Override
    public GraphStageLogic createLogic(Attributes inheritedAttributes) throws Exception
    {
      return new GraphStageLogic(shape)
      {
        // Number of elements received so far. Deliberately NOT exposed by overriding inCount():
        // that accessor belongs to GraphStageLogic and must keep the value the framework set.
        private final AtomicInteger count = new AtomicInteger(0);

        // True once the callback has been delivered, so that onUpstreamFinish followed by
        // postStop does not report the same result twice.
        private boolean reported;

        // instance initializer: register the inlet handler
        {
          setHandler(in, new AbstractInHandler()
          {
            @Override
            public void onPush()
            {
              Integer data = grab(in);
              count.incrementAndGet();
              consumer.accept(data);
              pull(in);
            }

            @Override
            public void onUpstreamFinish() throws Exception
            {
              System.out.println("MYSINK: Closing");
              reportOnce();
              super.onUpstreamFinish();
            }

            @Override
            public void onUpstreamFailure(Throwable ex) throws Exception
            {
              ex.printStackTrace();
              super.onUpstreamFailure(ex);
            }
          });
        }

        @Override
        public void preStart() throws Exception
        {
          // signal initial demand so the first element can arrive
          pull(in);
        }

        @Override
        public void postStop() throws Exception
        {
          System.out.println("MYSINK: postStop()");
          //new Exception().printStackTrace();
          // covers the case where the stage is stopped without onUpstreamFinish having run
          reportOnce();
          super.postStop();
        }

        /** Delivers the current count to the callback, at most once per materialization. */
        private void reportOnce()
        {
          if (reported || callback == null)
          {
            return;
          }
          reported = true;
          callback.callback(new Results(count.get()));
        }
      };
    }

    @Override
    public SinkShape<Integer> shape()
    {
      return shape;
    }

    /** Immutable summary handed to the {@link Callback}. */
    public static class Results
    {
      private final int count;

      /**
       * @param count number of elements the sink received
       */
      public Results(int count)
      {
        this.count = count;
      }

      /**
       * @return number of elements the sink received
       */
      public int getCount()
      {
        return count;
      }
    }

    /** Receives the final {@link Results} of a {@link MySinkStage} materialization. */
    public interface Callback extends Serializable
    {
      /**
       * @param results summary of what the sink received
       */
      void callback(Results results);
    }
  }
}

The MergeHub does not complete when an upstream completes, because it is meant to be materialized once and then have multiple incoming streams attached over time, and to survive periods when there are no running upstreams. The only way to stop it is to cancel its source from downstream; in your case that would be done by your custom graph stage, or by something like a .takeWhile or a KillSwitch between the MergeHub source and your custom sink.

Thanks. This is what I am doing now. I appreciate your feedback.

Rick