How do I pause consumption from a Kafka source when using groupBy/groupedWithin?

I have a flow that looks something like the code below, and the problem is that I don't have great control over memory usage. What are some options to cause backpressure and pause retrieving more data from the source?

        commitableKafkaSource
            // TODO: add some sort of stoplight that causes backpressure under certain condition
            .map(messageIntake::convert)
            .filter(Optional::isPresent)
            .map(Optional::get)
            .groupBy(Integer.MAX_VALUE, Pair::first)
            .map(Pair::second)
            .groupedWithin(config.getMaxBatchSizeInAkkaStream(),
                Duration.ofSeconds(config.getMaxTimeWindowBySeconds()))
            .map(batchProcessor::processBatch)
            .filter(Optional::isPresent)
            .map(Optional::get)
            .mergeSubstreams()
            .toMat(KafkaSink.committableSink(config.getOutputStream()), Keep.both())
            .run(materializer);

In general, that stream will only backpressure if there’s not much variety in the firsts of the pairs and processBatch takes longer than getMaxTimeWindowBySeconds to process a batch (or if messageIntake::convert is slow, but that doesn’t seem to be the case).

May I ask what message-processing guarantee you're expecting? committableSink at the end suggests at-least-once processing, but groupBy -> ... -> mergeSubstreams is "at-least-never" except when grouping by the Kafka partition (in which case Integer.MAX_VALUE seems misplaced: the number of Kafka partitions would be a better choice).

Can you describe what processBatch does? If it's scheduling work to be done asynchronously (e.g. sending a message to an actor) and immediately returning the CommittableOffset, then not much work is actually being done inside processBatch. I'd investigate implementing it in terms of mapAsync, which will eventually backpressure based on when the scheduled work completes.
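For illustration, a rough sketch of that, assuming processBatch can be reworked into a hypothetical processBatchAsync that returns a CompletionStage which only completes when the batch has actually been handled (the parallelism of 4 is likewise just illustrative):

    // replaces .map(batchProcessor::processBatch) after the groupedWithin
    .mapAsync(4, batch -> batchProcessor.processBatchAsync(batch))
    .filter(Optional::isPresent)
    .map(Optional::get)

mapAsync keeps at most 4 batches in flight here; once they're all pending it stops demanding from upstream, and that backpressure propagates back to the Kafka source.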

If you're on Akka 2.8, and what you're storing in Pair::first is not the topic and partition, it might be worth investigating mapAsyncPartitioned (rather than plain mapAsync) and dispensing entirely with groupBy -> ... -> mergeSubstreams: that will preserve an at-least-once guarantee.
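Very roughly, and assuming the Akka 2.8 javadsl shape of mapAsyncPartitioned(parallelism, perPartition, partitioner, function) (worth double-checking against the API docs for your exact version), with a made-up per-element async method processOneAsync:

    // replaces .groupBy(...) ... .mergeSubstreams()
    .mapAsyncPartitioned(
        16,              // overall parallelism (illustrative)
        1,               // parallelism per destination (illustrative)
        Pair::first,     // the "partition" key, i.e. the destination
        (pair, destination) ->
            // hypothetical: must only complete once the element has really been handled
            batchProcessor.processOneAsync(destination, pair.second()))

Note that this processes element-by-element rather than in batches, so it only applies if the batching can move elsewhere (or isn't needed).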

Thanks for the response. This is used in a service intended to reduce the number of writes to a datastore. We allow any number of services (S) to write to a Kafka topic (SourceTopic, with ~20 partitions). The message they send is basically an instruction to write record R to destination D. Once we have collected a set of records intended for a destination, we store them temporarily and emit a Kafka message to another topic (DestinationTopic) to read from the temporary location and perform the bulk append. We unfortunately don't really have control over the number of destinations. So essentially what we are doing is saying:

.groupBy(Integer.MAX_VALUE, Destination)
.groupedWithin(10000, 15 minutes)

Up until recently this worked with some tuning of the time window, but we have a new application that changes the pattern of how many destinations there are and how many records go to each destination (essentially many more destinations and far fewer records per destination).

The service is supposed to provide at-least-once semantics, and we are slow to upgrade; we finally got off Akka 2.5 last week.

From my observations and testing since recently joining the team, the current implementation seems to give an at-least-once processing guarantee (e.g. restarting pods in the middle of collection reprocesses the messages from the source topic). Perhaps what you are describing is something I hadn't yet considered to be a problem: if we get past the .mergeSubstreams() and a restart happens before we are able to write the message to the DestinationTopic, then we will drop messages. I unfortunately don't have the best knowledge of how Akka works and have had difficulty finding the best place to learn more about it.

My assumption upon reading our code was that the materializer would be responsible for committing the source messages, and that this would happen right after the output message was written to the output topic. The code also uses the deprecated commitJavaDsl method to commit source messages that shouldn't be processed (hence the .filter(Optional::isPresent).map(Optional::get)). I wasn't quite sure how to rewrite it to avoid the deprecated methods.

mergeSubstreams doesn't guarantee to preserve the ordering of events that groupBy routed into different substreams. That means that if your groupBy puts the message at partition 1, offset 1 into substream A, and the message at partition 1, offset 2 into substream B, the downstream might see (and commit) offset 2 before it sees (and commits) offset 1. On its own that's still at-least-once: if the stream failed immediately after committing offset 1, the offset 2 message would simply be reprocessed on restart. If, however, processing of the offset 1 message fails (failing the stream) after offset 2 has been committed, the stream will restart at offset 3, i.e. the offset 1 message never successfully processes.

The committer sink batches offset commits, which (not intentionally, as far as I can tell) can ameliorate this, but there are entirely plausible scenarios where this approach fails to guarantee at-least-once.
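For reference, that batching is driven by the CommitterSettings the committer sink is built with; something along these lines (the values are placeholders, not recommendations):

    // offsets are committed once maxBatch offsets have accumulated, or when
    // maxInterval has elapsed, whichever comes first
    CommitterSettings committerSettings =
        CommitterSettings.create(system)
            .withMaxBatch(1000)
            .withMaxInterval(Duration.ofSeconds(5));

    // used with e.g. Committer.sink(committerSettings) at the end of the stream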

Each substream adds (if nothing else) a fair amount of memory overhead to manage the substream. So cutting down on the number of substreams should go a long way to reducing the memory blow-up:

.groupBy(64, pair -> Math.floorMod(pair.first().hashCode(), 64))  // at most 64 substreams; floorMod avoids negative keys

That will put messages for multiple destinations into each substream, and they will get batched up together. A mapConcat stage right after the groupedWithin can break up the batch and emit one sub-batch per destination (probably a good idea to sort them so the sub-batch whose latest offset is earliest gets emitted first); see the sketch below.
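A rough sketch of that mapConcat, assuming each element reaching the groupedWithin is still a Pair of destination and record (i.e. the .map(Pair::second) is dropped), and that Destination, Record and the offsetOf(record) helper are made-up stand-ins for your actual types:

    // the element here is a List<Pair<Destination, Record>> (hypothetical types)
    .mapConcat(batch -> {
        Map<Destination, List<Pair<Destination, Record>>> byDestination =
            batch.stream().collect(Collectors.groupingBy(Pair::first));
        return byDestination.values().stream()
            // emit the per-destination batch whose latest offset is earliest first,
            // so commits happen as close to offset order as possible
            .sorted(Comparator.comparingLong((List<Pair<Destination, Record>> perDestination) ->
                perDestination.stream().mapToLong(p -> offsetOf(p.second())).max().orElse(Long.MAX_VALUE)))
            .collect(Collectors.toList());
    })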

If you're using the standard Kafka produce API, that's async, so your processBatch likely returns before the message to be produced has actually been produced. It may be worth restructuring things so you use the Producer.flexiFlow method (see the docs for the latest version of Alpakka Kafka that supports your Akka release): you could then drop the commitJavaDsl call entirely and just put the offset in as the ProducerMessage.passThrough. For the multi-record batch, you would use the latest offset in the batch as the passThrough in a ProducerMessage.multi. flexiFlow will then emit the passThrough offsets downstream, so there's no need to unwrap Optionals.
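A minimal sketch of that shape, assuming processBatch can be split into building the ProducerRecords for a batch and extracting the latest CommittableOffset in it (buildRecords and latestOffsetIn are made-up names, as are the String key/value types), with producerSettings and committerSettings already defined:

    .map(batch -> {
        ConsumerMessage.CommittableOffset latestOffset = latestOffsetIn(batch); // hypothetical
        List<ProducerRecord<String, String>> records = buildRecords(batch);     // hypothetical
        return records.isEmpty()
            // nothing to produce for this batch, but still commit its offset
            ? ProducerMessage.<String, String, ConsumerMessage.CommittableOffset>passThrough(latestOffset)
            // produce the records, carrying the offset through as the passThrough
            : ProducerMessage.multi(records, latestOffset);
    })
    .via(Producer.flexiFlow(producerSettings))
    .map(results -> results.passThrough())   // the offsets come out the other side
    .toMat(Committer.sink(committerSettings), Keep.both())
    .run(materializer);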

Slap a reasonably sized buffer with a backpressure overflow strategy in front of the groupBy.
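Something like the following, just upstream of the groupBy (1000 is a placeholder to tune):

    .buffer(1000, OverflowStrategy.backpressure())

Bear in mind a backpressuring buffer only delays the backpressure by its own size; it won't by itself cap the memory the substreams hold.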