Are produced messages ordered when using alpakka-kafka's `flexiFlow` and default settings?

Hello!

I’m using alpakka-kafka to consume from one Kafka cluster and produce to another. I’m using independent streams for each consumed partition, and a single producer flexiFlow is shared by all the streams. Ordering is important to me, meaning that, for a given consumed partition, messages should be published in the order they were consumed. I assumed this was the case, but re-reading the docs, I’m a bit unsure.

Specifically, I’m looking at the akka.kafka.producer.parallelism setting which defaults to 100. In the code, I see mapAsync used with this parallelism. It has me wondering whether a stream of ordered messages sent to a flexiFlow could be published out of order unless that setting is set to 1.

Here’s some code:

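// A single flexiFlow definition, reused by every per-partition stream below.
val producerFlow = Producer.flexiFlow(producerSettings)

val control = Consumer
  .committablePartitionedSource(consumerSettings, Subscriptions.topics("topic1"))
  .map {
    case (topicPartition, source) =>
      // Each assigned partition is run as its own independent stream.
      source
        .via(producerMessageMakerFlow)
        .via(producerFlow)
        // Commit offsets one at a time, in the order the producer results are emitted.
        .mapAsync(1)(_.committableOffset.commitScaladsl())
        .runWith(Sink.ignore)
  }
  .to(Sink.ignore)
  .run()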

Specifically this is the code that got me wondering: https://github.com/akka/alpakka-kafka/blob/557927ae8083a2db494e64bee1e0f60523dbb310/core/src/main/scala/akka/kafka/scaladsl/Producer.scala#L158.

Thanks!

Hi David,

Maybe the discussion in "Throughput and batching with Alpakka Kafka" is helpful to you?

Short summary:

  • The mapAsync part that you found is independent of the order of publishing. In itself, it only guarantees that you will receive the Results (i.e. the success notifications of the publishing process) in the same order as the messages you put into it. (If the Producer were using mapAsyncUnordered instead, this would not be guaranteed. The parallelism setting affects the size of the internal buffer that mapAsync uses, but that buffer uses head-of-line blocking to guarantee ordering.)

  • The order of publishing is managed by the underlying KafkaProducer from the kafka-clients library. The alpakka flexiFlow will pass messages to the KafkaProducer in the same order they arrive, then collect the futures that track publishing success for each message and hand the results back to you once they’re done. The KafkaProducer will usually keep the order, but make sure to read the documentation for the producer configs retries and max.in.flight.requests.per.connection at https://kafka.apache.org/documentation/! In particular, the default for max.in.flight.requests.per.connection is 5 and I don’t think alpakka overrides this by default, so a failed batch that gets retried can end up behind a later one; you need to adjust your configuration to retain ordering.

  • Finally - although I’m certain you’re aware of that - “publishing in order” only preserves order for downstream consumers if your partitioning is the same for the source and target topic, as two messages consumed from the source will inherently lose their order if they are being published to two different partitions downstream.
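
To make the second point concrete, here is a rough sketch of how the producer settings could be adjusted. It is only an illustration: the helper name OrderedProducerSettings, the String key/value types, and the bootstrapServers parameter are placeholders I made up, and you should check the Kafka producer config documentation for your client version before relying on it.

```scala
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer

// Hypothetical helper, not part of alpakka-kafka; key/value types are placeholders.
object OrderedProducerSettings {
  def apply(system: ActorSystem, bootstrapServers: String): ProducerSettings[String, String] =
    ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers(bootstrapServers)
      // At most one unacknowledged request per broker connection, so a
      // retried batch cannot be overtaken by a batch that was sent later.
      .withProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
      // Alternative (assumption: your client and brokers support it): leave
      // max.in.flight at 5 or lower and enable the idempotent producer instead,
      // which the Kafka docs describe as preserving order across retries.
      // .withProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
}
```

The resulting settings would then be passed to Producer.flexiFlow as in the question. Limiting in-flight requests to 1 is the simpler option but costs throughput; the idempotence route keeps more requests in flight.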

Hope that helps!

Thanks! That’s very helpful, including the linked discussion.

After spending a bit more time with the code yesterday I actually came to the same conclusion as you. I meant to post back here but you beat me to it :-).