pawelkaczor Thanks for the hint about the ProducerMessage.passThrough
I used it in the test mentioned above, like:
val control =
Consumer
.committableSource(consumerSettings, Subscriptions.topics(topic1, topic2))
.map { msg =>
if (msg.record.value().toInt % 2 == 0) {
// For msg with even numbers: send to the targetTopic
ProducerMessage.single(
new ProducerRecord(targetTopic, msg.record.key, msg.record.value),
// The `passThrough` field may hold any element that is passed through the `Producer.flow`
// In our case it is the [[ConsumerMessage.CommittableOffset]] which is committed later in the flow
msg.committableOffset)
} else {
// For msg with odd numbers: Create a pass-through message not containing any records
ProducerMessage.passThrough[String,String,ConsumerMessage.CommittableOffset](msg.committableOffset)
}
}
.toMat(Producer.committableSink(producerSettings, committerSettings))(DrainingControl.apply)
.run()
With this, the targetTopic yields 10 messages containing even numbers (5 from topic1, 5 from topic2 instead of 10 each as in the original test without this conditional filtering). So the messages with the odd numbers are filtered and we have a way to do a conditional publish, yay!
The commits go to the Consumer topic(s). See also answer here: Kafka Transactional Producer
Kind regards
Paul