Throughput and batching with Alpakka Kafka

I’m trying to migrate an ETL pipeline from a plain Java solution to Akka Streams, but I’m hitting some sticking points.
Input comes from a UDP socket and output goes to Kafka.
One main WTF right now is batching at the producer. I’m used to being able to set the maximum batch size (batch.size) and linger.ms on the producer and have it handle batching automatically. With Alpakka Kafka as a sink, the batch size seems to be ignored, and all linger.ms does is backpressure on each message. Is this the intended behavior? I’ve partially worked around this with the MultiMessage construct. How does this compare to native Kafka producer batching?

I have no idea how Kafka or the Kafka sink works, but from your description of the problem you may want to use groupedWithin.
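For illustration, a rough sketch of what that could look like (the source `records`, the topic name, and the batch parameters are placeholders, and this assumes an Alpakka Kafka version that has `ProducerMessage.multi` and `Producer.flexiFlow`, i.e. 1.0+):

```scala
import scala.concurrent.duration._
import akka.NotUsed
import akka.kafka.ProducerMessage
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.{Sink, Source}
import org.apache.kafka.clients.producer.ProducerRecord

// `records` stands in for your UDP-fed source of payload strings.
val records: Source[String, NotUsed] = ???

records
  // collect up to 1000 elements, or whatever has arrived within 20 ms
  .groupedWithin(1000, 20.millis)
  // wrap each batch in a single multi-record envelope
  .map(batch => ProducerMessage.multi(
    batch.map(value => new ProducerRecord[String, String]("my-topic", value))))
  .via(Producer.flexiFlow(producerSettings))
  .runWith(Sink.ignore)
```

This way each envelope handed to the producer stage already carries a whole batch, so the per-element backpressure you observed applies per batch rather than per record.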

I have seen an issue that sounds very much like yours: the throughput of a single Alpakka Kafka sink is very low, while running many parallel streams, each with its own Kafka sink, scales basically linearly.

I’m not sure about your observation of linger.ms backpressuring on every message, though; if that were the case, the throughput should be much lower than what I am currently seeing.

I’ll keep you updated if I find anything. My impression so far was that Alpakka Kafka uses a standard Kafka producer and should benefit from its batching, but our observations call for inspecting how that producer is actually used internally, I think…

Right, so: all producers have ProducerSettings::parallelism, which is the maximum number of in-flight objects (these objects may be MultiMessages or Messages). This maximum is enforced by an internal buffer in the mapAsync stage that is part of every producer.
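To make that concrete, the shape is roughly the following. This is a simplified sketch, not the actual library internals, and `sendToKafka` is a made-up stand-in for the stage’s real send logic:

```scala
// Simplified sketch, NOT the actual Alpakka Kafka implementation:
// every producer flow/sink funnels envelopes through a mapAsync stage,
// so at most `producerSettings.parallelism` envelopes (Messages or
// MultiMessages) are in flight at once.
Flow[ProducerMessage.Envelope[K, V, P]]
  .mapAsync(producerSettings.parallelism) { envelope =>
    // returns a Future that completes once Kafka has acknowledged
    // every record in the envelope
    sendToKafka(envelope)
  }
```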

Note that it’s a mapAsync rather than a mapAsyncUnordered, which means that results are emitted strictly in order: the buffer only completes the current head once it has been acknowledged by Kafka. This is probably a conscious design decision to support the commitableSink case (Kafka-to-Kafka streams): producing results out of order would commit offsets in the upstream consumer without guaranteeing that all records in between have been processed.

This means:

  • We are still using native Kafka producer batching. But with a default flow using Messages (default parallelism is 100), we only allow 100 in-flight records at a time, which may be well below the configured batch sizes in Kafka, and we may incur additional latency from the ordering guarantee. In particular, if 100 messages don’t fill the batch.size in your KafkaProducer settings, you may wait linger.ms for every batch of 100 records, since the latency of talking to Kafka may well be much higher than the time it takes to pull new elements into the flow stage.
  • Increasing ProducerSettings::parallelism should alleviate this problem.
  • Using MultiMessages should similarly alleviate the problem.
  • At first glance, it seems that the performance of the latter should be slightly better: the former keeps two futures for every record, while the latter keeps one per record plus one per batch.
  • Arguably, though, it might be easier to see at a glance what the maximum number of in-flight records is if we only adjust parallelism?
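As a concrete starting point for the parallelism route, the knob lives on the settings. The values below are made up for illustration, and this assumes a reasonably recent Alpakka Kafka where `ProducerSettings.withParallelism` exists (it can also be set via the `akka.kafka.producer.parallelism` config key):

```scala
import akka.kafka.ProducerSettings
import org.apache.kafka.common.serialization.StringSerializer

val producerSettings =
  ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")
    .withParallelism(10000)           // default is 100
    .withProperty("linger.ms", "20")  // plain Kafka producer config
    .withProperty("batch.size", "65536")
```

With a higher parallelism, enough records can be in flight at once for the native batch.size/linger.ms batching to actually kick in.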

@extantpedant, does this help you at all?

For anyone with deeper knowledge of alpakka-kafka:

  • Does this sound like a sensible assessment?
  • Any comments on MultiMessage vs increasing parallelism?
  • Would it be sensible to provide producers that do mapAsyncUnordered in the library code?