Alpakka Kafka Conditional Publish

@pawelkaczor, thanks for the hint about ProducerMessage.passThrough.

I used it in the test mentioned above, like this:

val control =
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics(topic1, topic2))
    .map { msg =>
      if (msg.record.value().toInt % 2 == 0) {
        // For msg with even numbers: send to the targetTopic
        ProducerMessage.single(
          new ProducerRecord(targetTopic, msg.record.key, msg.record.value),
          // The `passThrough` field may hold any element that is passed through the `Producer.flow`
          // In our case it is the [[ConsumerMessage.CommittableOffset]] which is committed later in the flow
          msg.committableOffset)
      } else {
        // For msg with odd numbers: create a pass-through message not containing any records
        ProducerMessage.passThrough[String, String, ConsumerMessage.CommittableOffset](msg.committableOffset)
      }
    }
    .toMat(Producer.committableSink(producerSettings, committerSettings))(DrainingControl.apply)
    .run()

With this, the targetTopic receives 10 messages, all containing even numbers: 5 from topic1 and 5 from topic2, instead of the 10 from each topic that the original test produces without this conditional filtering. So the messages with the odd numbers are filtered out and we have a way to do a conditional publish, yay!
The offsets of all consumed messages, including the filtered ones, are committed for the consumer topic(s). See also the answer here: Kafka Transactional Producer
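
To make the counts easier to check, here is a minimal plain-Scala sketch of the routing decision. It involves no Kafka or Akka, so it only models the logic. It assumes each source topic holds the values 1 to 10 (the exact test data is not shown in this post), and the names Routed, Publish, SkipButCommit and route are hypothetical stand-ins, not part of the Alpakka Kafka API.

```scala
// Plain-Scala model of the even/odd routing; nothing here talks to Kafka.
object ConditionalPublishSketch {
  sealed trait Routed
  // Stands in for ProducerMessage.single: publish the record, then commit its offset.
  final case class Publish(topic: String, value: String, offset: Long) extends Routed
  // Stands in for ProducerMessage.passThrough: publish nothing, still commit the offset.
  final case class SkipButCommit(offset: Long) extends Routed

  // Same condition as in the stream above: even values are published, odd ones are skipped.
  def route(targetTopic: String, value: String, offset: Long): Routed =
    if (value.toInt % 2 == 0) Publish(targetTopic, value, offset)
    else SkipButCommit(offset)

  // Assumed test data: each of the two source topics holds the values 1 to 10.
  val sourceTopics: Seq[String] = Seq("topic1", "topic2")

  val routed: Seq[Routed] =
    for {
      _ <- sourceTopics
      (value, offset) <- (1 to 10).map(_.toString).zipWithIndex
    } yield route("targetTopic", value, offset.toLong)

  // Records that would reach the target topic.
  val published: Seq[Publish] = routed.collect { case p: Publish => p }

  // Offsets that would reach the committer: one per consumed message, published or not.
  val committedOffsets: Seq[Long] = routed.map {
    case Publish(_, _, offset) => offset
    case SkipButCommit(offset) => offset
  }
}
```

Under these assumptions, published holds 10 entries (5 per source topic) while committedOffsets holds 20, which matches the idea that the skipped messages are still committed.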

Kind regards
Paul