Losing reference to materialized value with Sink.lazyInit + SubFlow

I am using DrainingControl.apply to provide a convenient way to shut down an Akka stream that commits Kafka offsets, as described in the Alpakka Kafka Consumer documentation, i.e.

val committerSettings = CommitterSettings(system)

val control: DrainingControl[Done] =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics(topic))
    .mapAsync(1) { msg =>
      business(msg.record.key, msg.record.value)
        .map(_ => msg.committableOffset)
    }
    .toMat(Committer.sink(committerSettings))(DrainingControl.apply)
    .run()

The problem is that I am using a SubFlow in combination with .to, and in doing so I lose the reference to the materialized value (in this case a Consumer.Control, which is required as an argument to DrainingControl.apply).

What I have is something like

val committerSettings = CommitterSettings(system)

val subflow = consumerStream.splitAfter(...)
subflow.to(
  // Lose reference to materialized value i.e. Future[Consumer.Control] when using
  // Sink.lazyInit
  Sink.lazyInit({ startValue =>
    for {
      sink <- createSink(startValue)
    } yield sink
  }, () => ???)
)

def createSink(startValue: CommittableOffset): Sink[???, ???] = {
  Flow[???].toMat(Committer.sink(committerSettings))(Keep.right)
  // Need to use DrainingControl.apply here
}

As the comment shows, I lose the reference to the Consumer.Control materialized value inside the SubFlow. I need that value passed into createSink, since that is where I use DrainingControl.apply.

Ideally I would like to achieve something like this

val committerSettings = CommitterSettings(system)

val subflow = consumerStream.splitAfter(...)
subflow.to(
  val control = ??? // Get current materialized value here
  // Sink.lazyInit
  Sink.lazyInit({ startValue =>
    for {
      sink <- createSink(startValue, control)
    } yield sink
  }, () => ???)
)

def createSink(startValue: CommittableOffset, control: Consumer.Control): Sink[???, ???] = {
  Flow[???].withMaterializedValue(control).toMat(Committer.sink(committerSettings))(DrainingControl.apply)
}

The problem is that the “outer” stream and the subflow streams are not actually part of the same materialization; the substreams are materialized as needed, based on the elements of the outer stream.

I think the easiest way to propagate the materialized value from one materialization to another stream is via a side effect: capture it in a Promise[Consumer.Control] that you complete from mapMaterializedValue in the outer stream, then use Flow.futureFlow or Sink.futureSink to build the substream's flow or sink from the future Consumer.Control, which happens only once it has a value.
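
A minimal sketch of that idea follows, assuming Akka 2.6+ (where Sink.futureSink is available). To keep it runnable without a Kafka broker, a UniqueKillSwitch stands in for Consumer.Control, and PromiseMatValueSketch, createSink and runSketch are hypothetical names chosen for illustration, not part of any library. In the real stream the Promise would be a Promise[Consumer.Control] and createSink would be the place to combine the control with Committer.sink.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.{ExecutionContext, Future, Promise}

object PromiseMatValueSketch {

  // Hypothetical per-substream sink factory. In the question this is where
  // the Consumer.Control would be used; here it is only printed.
  def createSink(control: UniqueKillSwitch): Sink[Int, Future[Done]] =
    Sink.foreach[Int](n => println(s"element $n, control $control"))

  // Runs the stream and returns the outer materialized value together with
  // the future that was completed through the Promise.
  def runSketch()(implicit system: ActorSystem): (UniqueKillSwitch, Future[UniqueKillSwitch]) = {
    implicit val ec: ExecutionContext = system.dispatcher

    // Side channel between the outer materialization and the substreams.
    val controlPromise = Promise[UniqueKillSwitch]()

    // Outer stream: the kill switch is the stand-in for Consumer.Control.
    val outer: Source[Int, UniqueKillSwitch] =
      Source(1 to 6)
        .viaMat(KillSwitches.single)(Keep.right)
        .mapMaterializedValue { ks =>
          // trySuccess, so materializing the blueprint twice does not throw.
          controlPromise.trySuccess(ks)
          ks
        }

    // Each substream materializes its own copy of the future sink, which is
    // built only once the promise has been completed.
    val materialized: UniqueKillSwitch =
      outer
        .splitAfter(_ % 2 == 0)
        .to(Sink.futureSink(controlPromise.future.map(createSink)))
        .run()

    (materialized, controlPromise.future)
  }
}
```

Calling PromiseMatValueSketch.runSketch() with an implicit ActorSystem in scope runs the stream; the returned future resolves to the same instance that run() hands back. Note that the Promise is shared state tied to a single run, so it should be created per materialization, as it is inside runSketch here.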