Throughput and batching with Alpakka Kafka

All producers have ProducerSettings::parallelism, which is the maximum number of in-flight elements (each element may be a Message or a MultiMessage). This maximum is enforced by the internal buffer of the MapAsync stage that is part of all producers.

Note that it’s a MapAsync rather than a MapAsyncUnordered, which means results are emitted in upstream order: a buffer slot is only freed, and a new element pulled, once the current head is finished (acknowledged by Kafka). This is probably a conscious design decision to support the commitableSink cases (Kafka-to-Kafka streams): out-of-order production of results would commit offsets in the upstream consumer without guaranteeing that all records in between have been processed.
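
To make the head-of-line effect concrete, here is a rough sketch in plain Java. It does not use Alpakka or Akka Streams at all; the class and method names (OrderedWindowSketch, simulate, and so on) are made up for illustration, and the "buffer" is just a queue of futures. It only models the point above: with ordered emission, completed elements behind an unfinished head cannot free their slots.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of an in-flight buffer; not Alpakka code.
class OrderedWindowSketch {

    // Ordered (mapAsync-like): only completed futures at the head can be
    // emitted, so counting stops at the first unfinished element.
    static int freeableOrdered(Deque<CompletableFuture<Integer>> inFlight) {
        int freed = 0;
        for (CompletableFuture<Integer> f : inFlight) {
            if (!f.isDone()) {
                break; // head-of-line blocking
            }
            freed++;
        }
        return freed;
    }

    // Unordered (mapAsyncUnordered-like): any completed future frees a slot.
    static int freeableUnordered(Deque<CompletableFuture<Integer>> inFlight) {
        int freed = 0;
        for (CompletableFuture<Integer> f : inFlight) {
            if (f.isDone()) {
                freed++;
            }
        }
        return freed;
    }

    // Fills a buffer of size `parallelism`, completes every element except
    // the one at `stuckIndex`, and returns {ordered, unordered} free slots.
    static int[] simulate(int parallelism, int stuckIndex) {
        Deque<CompletableFuture<Integer>> inFlight = new ArrayDeque<>();
        for (int i = 0; i < parallelism; i++) {
            CompletableFuture<Integer> f = new CompletableFuture<>();
            if (i != stuckIndex) {
                f.complete(i); // pretend Kafka acknowledged this element
            }
            inFlight.addLast(f);
        }
        return new int[] {freeableOrdered(inFlight), freeableUnordered(inFlight)};
    }

    public static void main(String[] args) {
        int[] r = simulate(3, 0); // the oldest element is still waiting
        System.out.println("ordered frees " + r[0] + ", unordered frees " + r[1]);
    }
}
```

With three elements in flight and the oldest one still unacknowledged, the ordered variant frees no slots while the unordered variant frees two. That is the price paid for emitting results in upstream order.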

This means:

  • We are still using native Kafka producer batching. But: with a default flow using Messages (default parallelism is 100), we only allow 100 in-flight records at a time, which may be well below what fits in a Kafka batch, and the ordering guarantee may add further latency. In particular, if 100 messages don’t fill the batch.size (a limit in bytes) in your KafkaProducer settings, then you may wait linger.ms for every batch of 100 records, since the latency of talking to Kafka may well be much higher than the time it takes to pull new elements into the flow stage.
  • Increasing ProducerSettings::parallelism should alleviate this problem.
  • Using MultiMessages should similarly alleviate the problem.
  • At first glance, it seems that the performance of MultiMessages should be slightly better: increasing parallelism keeps two futures for every record, whereas MultiMessages keep one future for every record and one for every batch.
  • Arguably, though, it might be easier to see at a glance what the maximum number of in-flight records is if we only adjust parallelism?
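
As a back-of-the-envelope check on the bullets above, the following plain-Java sketch estimates whether the in-flight records can fill a producer batch at all. Everything here is an assumption for illustration: the class and method names are hypothetical, records are assumed to be spread evenly over partitions, and per-record overhead and compression are ignored. The 16384 used in main is Kafka's default batch.size in bytes.

```java
// Hypothetical estimate; not part of Alpakka or the Kafka client.
class InFlightEstimate {

    // Rough upper bound on the bytes that can be waiting in one partition's
    // batch, given how many elements the stage allows in flight.
    // recordsPerElement is 1 for a Message, n for a MultiMessage of n records.
    static long inFlightBytesPerPartition(int parallelism, int recordsPerElement,
                                          int avgRecordBytes, int partitions) {
        return (long) parallelism * recordsPerElement * avgRecordBytes / partitions;
    }

    // If the bound is below batch.size, a batch can never fill up from
    // in-flight records alone, so sends would be triggered by linger.ms.
    static boolean canFillBatch(int parallelism, int recordsPerElement,
                                int avgRecordBytes, int partitions, int batchSizeBytes) {
        return inFlightBytesPerPartition(parallelism, recordsPerElement,
                avgRecordBytes, partitions) >= batchSizeBytes;
    }

    public static void main(String[] args) {
        int batchSize = 16384; // Kafka default batch.size, in bytes
        // Assumed workload: 100-byte records, a single partition.
        System.out.println("Message, parallelism 100: "
                + canFillBatch(100, 1, 100, 1, batchSize));
        System.out.println("Message, parallelism 500: "
                + canFillBatch(500, 1, 100, 1, batchSize));
        System.out.println("MultiMessage of 10, parallelism 100: "
                + canFillBatch(100, 10, 100, 1, batchSize));
    }
}
```

Under these assumptions, the default of 100 single-record Messages of 100 bytes each tops out at 10,000 bytes, below the default batch.size. Raising parallelism or packing several records into each MultiMessage both lift the bound above it.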

@extantpedant, does this help you at all?

For anyone with deeper knowledge of alpakka-kafka:

  • Does this sound like a sensible assessment?
  • Any comments on MultiMessage vs increasing parallelism?
  • Would it be sensible to provide producers that do mapAsyncUnordered in the library code?