Concat + Partition doesn't appear to work when combining sinks

I am currently working on a PR to add SinkWithContext into akka-streams, and I came across a blocker when creating a helper function called SinkWithContext.fromDataAndContext. Below is the definition of the function:

def fromDataAndContext[In, CtxIn, MatIn, MatCtx, Mat3](
    dataSink: Graph[SinkShape[In], MatIn],
    contextSink: Graph[SinkShape[CtxIn], MatCtx],
    combine: (MatIn, MatCtx) => Mat3,
    dataComesFirst: Boolean = true)(
    strategy: Int => Graph[UniformFanInShape[Either[In, CtxIn], Either[In, CtxIn]], NotUsed]): SinkWithContext[In, CtxIn, Mat3] = {
  SinkWithContext.fromTuples(Sink.fromGraph(GraphDSL.createGraph(dataSink, contextSink)(combine) {
    implicit b => (dSink, ctxSink) =>
      import GraphDSL.Implicits._

      val unzip = b.add(Unzip[In, CtxIn]())

      // Tag each half of the pair as Left (data) or Right (context), funnel both
      // through the user-supplied fan-in strategy, then partition them back out
      // to the two sinks.
      val c = b.add(strategy(2))
      val p = b.add(new Partition[Either[In, CtxIn]](outputPorts = 2, in => if (in.isLeft) 0 else 1, eagerCancel = true))

      // dataComesFirst decides which Unzip output feeds the strategy's first port.
      if (dataComesFirst) {
        unzip.out0.map(Left.apply) ~> c.in(0)
        unzip.out1.map(Right.apply) ~> c.in(1)
      } else {
        unzip.out1.map(Right.apply) ~> c.in(0)
        unzip.out0.map(Left.apply) ~> c.in(1)
      }

      c.out ~> p.in

      // Unwrap the Either before handing each element to its sink.
      p.out(0).map(_.asInstanceOf[Left[In, CtxIn]].value) ~> dSink.in
      p.out(1).map(_.asInstanceOf[Right[In, CtxIn]].value) ~> ctxSink.in

      SinkShape(unzip.in)
  }))
}

The general idea is this: given a SourceWithContext/FlowWithContext that you want to pipe into a SinkWithContext, you may want to build that SinkWithContext out of two already existing Sinks.

An immediate example that comes to mind: you have a Kafka consumer with explicit offset committing (i.e. Consumer.sourceWithOffsetContext(consumerSettings, subscriptions)), and for each Kafka message you want to persist it to a file and, only if that succeeds, commit the cursor. Using SinkWithContext.fromDataAndContext this would be quite easy, i.e. you would do something like:

val sourceWithContext = Consumer.sourceWithOffsetContext(consumerSettings, subscriptions)
val sinkWithContext = SinkWithContext.fromDataAndContext(FileIO.toPath(...), Committer.sink(committerSettings), Keep.none)(Concat(_))
sourceWithContext.runWith(sinkWithContext)

The purpose of Concat(_) (i.e. the strategy parameter) is to make the first sink, FileIO.toPath(...), run before Committer.sink(committerSettings), so that Committer.sink(committerSettings) only executes if FileIO.toPath(...) is successful.
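To make the role of the strategy parameter concrete: it only has to produce a uniform fan-in stage for the requested number of input ports, so the existing akka-stream fan-in stages slot in interchangeably. A sketch reusing the Kafka example above (Concat is what this use case needs; Merge and Interleave are shown purely for illustration):

// Sequential: emit everything from in(0), then everything from in(1).
SinkWithContext.fromDataAndContext(FileIO.toPath(...), Committer.sink(committerSettings), Keep.none)(Concat(_))

// Concurrent: elements from both ports are emitted as they arrive.
SinkWithContext.fromDataAndContext(FileIO.toPath(...), Committer.sink(committerSettings), Keep.none)(Merge(_))

// Alternating: one element from each port in turn.
SinkWithContext.fromDataAndContext(FileIO.toPath(...), Committer.sink(committerSettings), Keep.none)(Interleave(_, 1))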

The problem I am getting is that for some reason the above implementation doesn't work. For example, with a simple test:

val dataSink = Sink.collection[Int, List[Int]]
val contextSink = Sink.collection[Int, List[Int]]

val sink = SinkWithContext.fromDataAndContext(dataSink, contextSink, (left: Future[List[Int]], right: Future[List[Int]]) => {
  implicit val ec = akka.dispatch.ExecutionContexts.parasitic

  // Zip the two collected lists back into a list of pairs.
  for {
    l <- left
    r <- right
  } yield l zip r
})(Concat(_))

val source = SourceWithContext.fromTuples(Source(List(
  (1,2),
  (3,4),
  (5,6)
)))

source.runWith(sink).futureValue shouldEqual List(
  (1,2),
  (3,4),
  (5,6)
)

The materialized Future from the SinkWithContext.fromDataAndContext function never finishes (the test actually times out because the Future from source.runWith(sink) never completes). Interestingly, if I remove the entire strategy portion from SinkWithContext.fromDataAndContext (i.e. the Concat(_)) then the stream completes as expected. The problem then, however, is that it's no longer possible to control the ordering/strategy of the dataSink and the contextSink: while in this specific Kafka example you want the dataSink to execute before the contextSink, there are situations where you want the opposite, or where a strategy like Merge is appropriate because it's fine for the dataSink/contextSink to execute concurrently.
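For reference, the strategy-free variant that does complete is just the Unzip wired straight into the two sinks, roughly like this (a sketch of what I tested, not the exact PR code):

SinkWithContext.fromTuples(Sink.fromGraph(GraphDSL.createGraph(dataSink, contextSink)(combine) {
  implicit b => (dSink, ctxSink) =>
    import GraphDSL.Implicits._

    val unzip = b.add(Unzip[In, CtxIn]())

    // No Concat/Partition round-trip: each half of the pair goes straight to its sink.
    unzip.out0 ~> dSink.in
    unzip.out1 ~> ctxSink.in

    SinkShape(unzip.in)
}))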

I have tried different parameters for eagerCancel/detachedInputs on Concat/Partition and none of them have any effect. From some initial debugging with print statements, what appears to happen is that the data part of the first source element (i.e. 1 in the test) goes through correctly all the way to the data sink, but the context part of that first element (i.e. 2) never gets propagated.

Note that in the above example SinkWithContext doesn't actually exist yet; you can instead modify the example to just deal with tuples + Sink. Alternatively, you can check the in-progress PR here that contains the work for SinkWithContext, which you can check out and run the test against.
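For completeness, here is roughly what that tuple + plain Sink reduction looks like as a self-contained app (a sketch with the same wiring as the helper above; the object and sink names are made up for the repro). Running it should reproduce the hang without any SinkWithContext code:

import akka.actor.ActorSystem
import akka.stream.scaladsl._

object ConcatPartitionRepro extends App {
  implicit val system: ActorSystem = ActorSystem("repro")
  import system.dispatcher

  val dataSink = Sink.foreach[Int](n => println(s"data sink got: $n"))
  val contextSink = Sink.foreach[Int](n => println(s"context sink got: $n"))

  val sink = Sink.fromGraph(GraphDSL.createGraph(dataSink, contextSink)(Keep.both) {
    implicit b => (dSink, ctxSink) =>
      import GraphDSL.Implicits._

      val unzip = b.add(Unzip[Int, Int]())
      val c = b.add(Concat[Either[Int, Int]](2))
      val p = b.add(new Partition[Either[Int, Int]](outputPorts = 2, in => if (in.isLeft) 0 else 1, eagerCancel = true))

      unzip.out0.map(in => Left(in): Either[Int, Int]) ~> c.in(0)
      unzip.out1.map(ctx => Right(ctx): Either[Int, Int]) ~> c.in(1)

      // Print what actually makes it past the strategy stage.
      c.out.map { e => println(s"after Concat: $e"); e } ~> p.in

      p.out(0).map(_.asInstanceOf[Left[Int, Int]].value) ~> dSink.in
      p.out(1).map(_.asInstanceOf[Right[Int, Int]].value) ~> ctxSink.in

      SinkShape(unzip.in)
  })

  val (dataDone, ctxDone) = Source(List((1, 2), (3, 4), (5, 6))).runWith(sink)

  // Expected: both sink futures complete; observed: they never do, matching the test above.
  dataDone.onComplete(r => println(s"data sink completed: $r"))
  ctxDone.onComplete(r => println(s"context sink completed: $r"))
}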