I’m using version 0.22 of akka-stream-kafka_2.12 (the Alpakka Kafka connector).
I have two competing implementations of a particular Flow in Akka Streams:
- One that takes a batched-up sequence of messages, groups them by what is effectively the key of the Kafka message, and then sequentially runs the application processing against each per-key group.
- Another that does a groupBy on the Kafka message key for each batched-up sequence so that the application processing can run in parallel across the per-key groups.
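For context, the per-key grouping of a batch in the first implementation amounts to something like the following plain-Scala sketch. The KafkaMessage case class, the key values, and the processing step are hypothetical stand-ins for my real types, just to illustrate the shape of the grouping:

```scala
// Hypothetical stand-in for a committable Kafka message; the real type
// wraps a ConsumerRecord whose key we group on.
case class KafkaMessage(key: String, payload: String)

object GroupingSketch {
  // Group one batched-up sequence of messages by key, then process each
  // group sequentially (the first implementation's approach).
  def processBatchSequentially(batch: Seq[KafkaMessage]): Map[String, Seq[String]] =
    batch.groupBy(_.key).map { case (key, group) =>
      // The real application processing would happen here, one group at a time.
      key -> group.map(_.payload)
    }
}
```

Calling GroupingSketch.processBatchSequentially on a batch with keys "acct-1", "acct-2", "acct-1" yields one entry per key, with each group's payloads in their original batch order.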
I’m able to confirm through a smoke test that both implementations are functionally working as expected.
I don’t have my logging configured correctly yet, so, in addition to logging, I’m producing some console output with printlns similar to what’s shown in the Akka Streams Cookbook. The console output shows up just fine with the sequential implementation, but with the implementation that uses groupBy and subflows, the console output is not showing up.
Here’s the sequential implementation, where the output is produced by logAfterProcessing:
private def createFlowToProcessPerPartitionBatchOfEvents()(
    implicit ec: ExecutionContext): Flow[Seq[CommittableMessageType], CommittableMessagesWithTiming, NotUsed] = {
  Flow[Seq[CommittableMessageType]]
    .mapAsync(1) { messages =>
      streamedKafkaIOEventProcessorApplication.processCommittableMessages(messages)
    }
    .async
    .map(messages => logAfterProcessing(messages))
    .withAttributes(attributesFactory.createAttributesForEventProcessor())
}
Here’s the implementation that uses subflows. Note that the call to the method that produces the console output, logAfterProcessing, happens after the subflows have been merged back into a single flow:
private def createFlowUsingSubFlowsToProcessPerPartitionBatchOfEvents()(
    implicit ec: ExecutionContext): Flow[Seq[CommittableMessageType], CommittableMessagesWithTiming, NotUsed] = {
  val mergedFlow: Flow[Seq[CommittableMessageType], CommittableMessagesWithTiming, NotUsed] =
    Flow[Seq[CommittableMessageType]]
      .groupBy(maxAccountNumbersPerAppServerWhenGroupingEventProcessingByAccountNumber, { messages =>
        messages.groupBy { message =>
          message.record.key()
        }
      })
      .mapAsync(1) { messages =>
        streamedKafkaIOEventProcessorApplication.processCommittableMessages(messages)
      }
      .async
      .mergeSubstreams
      .reduce((left, right) => combine(left, right))
      .map(messages => logAfterProcessing(messages))
  mergedFlow.withAttributes(attributesFactory.createAttributesForEventProcessor())
}
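For readers unfamiliar with my combine step: after mergeSubstreams, the per-substream results are folded pairwise by the reduce. Over a hypothetical result type (ProcessedBatch and its take-the-larger-timing policy below are stand-ins I made up for illustration, not my real CommittableMessagesWithTiming), the folding looks like this:

```scala
// Hypothetical stand-in for CommittableMessagesWithTiming: the processed
// messages plus an elapsed-time measurement.
case class ProcessedBatch(messages: Seq[String], elapsedMillis: Long)

object CombineSketch {
  // Pairwise combine, as the stream-level reduce would apply it: concatenate
  // the messages and keep the larger elapsed time (one plausible policy).
  def combine(left: ProcessedBatch, right: ProcessedBatch): ProcessedBatch =
    ProcessedBatch(left.messages ++ right.messages,
                   left.elapsedMillis max right.elapsedMillis)
}
```

Folding a plain Seq of these results with Seq#reduce(CombineSketch.combine) mirrors what the stream-level reduce does over the merged substream outputs.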
Does anyone have any idea why the console output is getting lost? What am I missing here?