I am currently in the process of using DrainingControl.apply
in order to provide a convenient way to shut down an akka-stream that is committing Kafka cursors as described in Consumer • Alpakka Kafka Documentation i.e.
val committerSettings = CommitterSettings(system)
val control: DrainingControl[Done] =
Consumer
.committableSource(consumerSettings, Subscriptions.topics(topic))
.mapAsync(1) { msg =>
business(msg.record.key, msg.record.value)
.map(_ => msg.committableOffset)
}
.toMat(Committer.sink(committerSettings))(DrainingControl.apply)
.run()
The problem I am currently having is that I am using a SubFlow
in combination with .to
and in doing so I am losing the reference to the materialization value (in this case a Consumer.Control
which is required as an argument to DrainingControl.apply
).
What I have have is something like
val committerSettings = CommitterSettings(system)
val sublfow = consumerStream.splitAfter(...)
subflow.to(
// Lose reference to materialized value i.e. Future[Consumer.Control] when using
// Sink.lazyInit
Sink.lazyInit({ startValue =>
for {
sink <- createSink(startValue)
} yield sink
}, () => ???)
)
def createSink(startValue: CommittableOffset): Sink[???, ???] = {
Flow[???].toMat(Committer.sink(committerSettings))(Keep.right)
// Need to use DrainingControl.apply here
}
As can be seen in the comment I lose reference to the Consumer.Control
materialized value in the SubFlow
which I need to be passed into createSink
(since that is where I use DrainingControl.apply
).
Ideally I would like to achieve something like this
val committerSettings = CommitterSettings(system)
val sublfow = consumerStream.splitAfter(...)
subflow.to(
val control = ??? // Get current materialized value here
// Sink.lazyInit
Sink.lazyInit({ startValue =>
for {
sink <- createSink(startValue, control)
} yield sink
}, () => ???)
)
def createSink(startValue: CommittableOffset, control: Consumer.Control): Sink[???, ???] = {
Flow[???].withMaterializedValue(control).toMat(Committer.sink(committerSettings))(DrainingControl.apply)
}