Losing reference to materialized value with Sink.lazyInit + SubFlow

I am using DrainingControl.apply to provide a convenient way to shut down an akka-stream that commits Kafka cursors, as described in Consumer • Alpakka Kafka Documentation, i.e.

val committerSettings = CommitterSettings(system)

val control: DrainingControl[Done] =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics(topic))
    .mapAsync(1) { msg =>
      business(msg.record.key, msg.record.value)
        .map(_ => msg.committableOffset)
    }
    .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
    .run()

The problem is that I am using a SubFlow in combination with .to, and in doing so I lose the reference to the materialized value (in this case a Consumer.Control, which is required as an argument to DrainingControl.apply).

What I have is something like

val committerSettings = CommitterSettings(system)

val subflow = consumerStream.splitAfter(...)
subflow.to(
  // Lose reference to materialized value i.e. Future[Consumer.Control] when using
  // Sink.lazyInit
  Sink.lazyInit({ startValue =>
    for {
      sink <- createSink(startValue)
    } yield sink
  }, () => ???)
)

def createSink(startValue: CommittableOffset): Sink[???, ???] = {
  Flow[???].toMat(Committer.sink(committerSettings))(Keep.right)
  // Need to use DrainingControl.apply here
}

As can be seen in the comment I lose reference to the Consumer.Control materialized value in the SubFlow which I need to be passed into createSink (since that is where I use DrainingControl.apply).

Ideally I would like to achieve something like this

val committerSettings = CommitterSettings(system)

val subflow = consumerStream.splitAfter(...)
subflow.to(
  val control = ??? // Get current materialized value here
  // Sink.lazyInit
  Sink.lazyInit({ startValue =>
    for {
      sink <- createSink(startValue, control)
    } yield sink
  }, () => ???)
)

def createSink(startValue: CommittableOffset, control: Consumer.Control): Sink[???, ???] = {
  Flow[???].withMaterializedValue(control).toMat(Committer.sink(committerSettings))(DrainingControl.apply)
}

The problem is that the "outer" stream and the substreams are not actually part of the same materialization. The substreams are materialized as needed, based on the elements of the outer stream.

I think the easiest way to propagate the materialized value from one materialization to another stream is via a side effect. Capture it in a Promise[Consumer.Control] that you complete from mapMaterializedValue in the outer stream, then use Flow.futureFlow or Sink.futureSink so that the substream is only built from the Consumer.Control once the future has a value.
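A minimal sketch of that idea, assuming Akka 2.6+ (where Sink.futureSink is available). To keep it runnable without a Kafka broker, a UniqueKillSwitch stands in for Consumer.Control and a plain Int source stands in for the committable source. The names PromiseBridgeSketch and wire are made up for illustration, and createSink here is only a placeholder for the one in the question.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}

import scala.concurrent.{ExecutionContext, Future, Promise}

object PromiseBridgeSketch {

  // Hypothetical per-substream sink that needs the outer materialized value.
  // Here it asks the outer stream to shut down once it sees an element >= 7.
  def createSink(control: UniqueKillSwitch): Sink[Int, Future[Done]] =
    Sink.foreach[Int] { n =>
      if (n >= 7) control.shutdown()
    }

  def wire(controlPromise: Promise[UniqueKillSwitch])(
      implicit ec: ExecutionContext): RunnableGraph[UniqueKillSwitch] =
    Source(1 to 100)
      // Stand-in for the Consumer.Control materialized by the Kafka source.
      .viaMat(KillSwitches.single)(Keep.right)
      .mapMaterializedValue { control =>
        // Side effect: publish the outer materialized value to the substreams.
        controlPromise.trySuccess(control)
        control
      }
      .splitAfter(_ % 3 == 0)
      // Each substream builds its sink only after the promise has a value.
      .to(Sink.futureSink(controlPromise.future.map(createSink)))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("promise-bridge")
    import system.dispatcher

    val control = wire(Promise[UniqueKillSwitch]()).run()
    control.shutdown()
    system.terminate()
  }
}
```

Note that the Promise can only be completed once, so a fresh one is needed for every materialization of the graph. The value materialized by each substream's sink is still dropped by .to, so anything the substream needs to expose would have to go out through a similar side channel.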

Thanks for the reply, after looking into it I came to the same conclusion.

Do you think there is merit in providing a toSubstreamMat method for a SubFlow that implements your solution (i.e. capturing the materialized values from the substream, not the main stream, and feeding that to the sink passed in toSubstreamMat)?

I don’t think it is a common enough use case to warrant specific APIs, but please open up an issue in the Akka issue tracker if you disagree and we can discuss it there.

Okay, I will do some prototyping locally to see if it's even worth it.