Losing console output with use of subflows

I’m using version 0.22 of akka-stream-kafka_2.12 (the Alpakka Kafka connector).

I have two competing implementations of a particular Flow in Akka Streams:

  • One that takes a batched-up sequence of messages, groups them by what is effectively the Kafka message key, and then does the application processing sequentially for each per-key group.
  • Another that does a groupBy on the Kafka message key within each batched-up sequence so that the application processing can run in parallel for each per-key group.

I’m able to confirm through a smoke test that both implementations are functionally working as expected.

I don’t have my logging configured correctly yet, so, in addition to logging, I’m producing some console output with printlns similar to what’s shown in the Akka Streams Cookbook. The console output is showing up just fine with the sequential implementation, but with the implementation that uses groupBy and subflows, the console output is not showing up.
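
The logAfterProcessing helper isn’t shown in the snippets below. For context, it’s essentially a pass-through along these lines (illustration only; the exact body isn’t important):

  // Illustration only: println for the interim console output, then hand the
  // element back unchanged so the Flow's output type is preserved.
  private def logAfterProcessing(
      messages: CommittableMessagesWithTiming): CommittableMessagesWithTiming = {
    println(s"Finished processing batch: $messages")
    messages
  }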

Here’s the sequential implementation where the output is being produced by logAfterProcessing:

  private def createFlowToProcessPerPartitionBatchOfEvents()(
      implicit ec: ExecutionContext): Flow[Seq[CommittableMessageType], CommittableMessagesWithTiming, NotUsed] = {
    Flow[Seq[CommittableMessageType]]
      .mapAsync(1) { messages =>
        streamedKafkaIOEventProcessorApplication.processCommittableMessages(messages)
      }
      .async
      .map(messages => logAfterProcessing(messages))
      .withAttributes(attributesFactory.createAttributesForEventProcessor())
  }

Here’s the implementation that uses subflows. Note that the call to the method that produces the console output, logAfterProcessing, happens after the subflows have been merged back into a single flow:

  private def createFlowUsingSubFlowsToProcessPerPartitionBatchOfEvents()(
      implicit ec: ExecutionContext): Flow[Seq[CommittableMessageType], CommittableMessagesWithTiming, NotUsed] = {
    val mergedFlow: Flow[Seq[CommittableMessageType], CommittableMessagesWithTiming, NotUsed] =
      Flow[Seq[CommittableMessageType]]
        .groupBy(maxAccountNumbersPerAppServerWhenGroupingEventProcessingByAccountNumber, { messages =>
          messages.groupBy { message =>
            message.record.key()
          }
        })
        .mapAsync(1) { messages =>
          streamedKafkaIOEventProcessorApplication.processCommittableMessages(messages)
        }
        .async
        .mergeSubstreams
        .reduce((left, right) => combine(left, right))
        .map(messages => logAfterProcessing(messages))
    mergedFlow.withAttributes(attributesFactory.createAttributesForEventProcessor())
  }

Does anyone have any idea why the console output is getting lost? What am I missing here?

I have a better handle on this problem now. As I should have realized sooner, it’s not specific to logging or console output: none of the real work after the mapAsync is happening, even though the mapAsync itself does go through. I only realized this once I stopped the Kafka consumer and restarted it with the configuration switched to not use groupBy; on startup it processed a whole backlog of messages that the groupBy-configured consumer had failed to process. Part of the downstream work that wasn’t happening with groupBy was the batch commit back to Kafka.

Since the work I’m trying to parallelize is being invoked by a mapAsync between the groupBy and mergeSubstreams, I thought perhaps the problem had something to do with the mapAsync/async combination itself. So, for the sake of isolating the problem, I tried replacing that with a map:

  .map { messages =>
    streamedKafkaIOEventProcessorApplication.processCommittableMessagesSynchronously(messages)
  }

Still the same problem—the downstream work didn’t happen. Same if I did the mapAsync without the subsequent async.

I also tried inserting log and recover stages between each of the stages in the flow to try to capture some extra output. The recover calls produced no output; presumably there was nothing to recover from. The log calls produced no output either, not even, inexplicably, the log calls around the mapAsync, even though the logging is by now otherwise configured correctly.
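
The instrumentation looked roughly like this, shown here on a trivial Int flow since the exact element types don’t matter; the stage name, log levels, and fallback element are arbitrary:

  import akka.NotUsed
  import akka.event.Logging
  import akka.stream.Attributes
  import akka.stream.scaladsl.Flow

  // Illustration only: the shape of the log/recover stages I inserted between
  // the real stages of the flow.
  val instrumented: Flow[Int, Int, NotUsed] =
    Flow[Int]
      .log("after-mapAsync")
      .withAttributes(Attributes.logLevels(
        onElement = Logging.InfoLevel,
        onFinish = Logging.InfoLevel,
        onFailure = Logging.ErrorLevel))
      .recover { case ex =>
        println(s"recovered: ${ex.getMessage}") // never fired in my runs
        -1 // placeholder fallback element
      }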

I also tried an arbitrary Sink to do the logging in addition to the map that does the logging:

        .alsoTo(Sink.foreach(messages => logAfterProcessing(messages)))
        .map(messages => logAfterProcessing(messages))

No output there either.

For no good reason, I also tried putting an arbitrary async boundary after the reduce. No difference.

Here’s the one interesting finding that gives a bit of a hint as to what may be going on. When I finally shut down the Kafka consumer, the logging output downstream from the mapAsync suddenly goes through. At this time, I haven’t distinguished whether that’s from the alsoTo to the Sink or the map that’s part of the Flow, or even both. (The batch commit back to Kafka that is part of the larger workflow does not happen here.)

This suggests to me that the demand is not percolating up from downstream the way it should be, although I don’t know that there’s anything else I can change to make that happen. Otherwise, the rest of the RunnableGraph is wired together the same whether or not I’m using groupBy.

Just one observation: you seem to use this as the key extraction function of the stream’s groupBy:

  messages =>
    messages.groupBy { message =>
      message.record.key()
    }

The result is that you use a Map[..., Seq[CommittableMessageType]] as the key for the stream grouping. That doesn’t seem to make sense.

Johannes, thanks for your response. I see now what you’re saying: what I’m doing doesn’t make sense. As in, how does a Map[String, Seq[TypedCommittableMessage]] relate back to a Seq[TypedCommittableMessage]? Thinking through your response finally led me to realize that groupBy is the wrong solution to my problem.

What’s maybe a bit different about my problem is that my input is not a stream of TypedCommittableMessage instances; it’s a stream of Seq[TypedCommittableMessage] instances because I’ve already batched them up. What I want to do is break up the single Seq[TypedCommittableMessage] input into multiple Seq[TypedCommittableMessage] outputs where every element in each sequence has the same key.

sequence 1:

key=123456789, value=…
key=123456789, value=…
key=123456789, value=…

sequence 2:

key=987654321, value=…
key=987654321, value=…

Instead of grouping by a Map[String, Seq[TypedCommittableMessage]], what I really wanted was to group by TypedCommittableMessage.key. Then it occurred to me, as a bit of devil’s advocacy, that maybe this is a case where I need to use mapConcat to get from a Seq[TypedCommittableMessage] down to individual TypedCommittableMessage elements. That’s what finally led me to realize that groupBy is the wrong solution to my problem: I don’t need to maintain potentially thousands of unique substreams cordoned off from each other when all I want is some parallelism within the current batch of messages.
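
For what it’s worth, here’s a minimal sketch of the splitting step described above, written generically since the exact types don’t matter; Msg and keyOf stand in for TypedCommittableMessage and message.record.key():

  import akka.NotUsed
  import akka.stream.scaladsl.Flow

  // One incoming batch becomes one outgoing sub-batch per Kafka key,
  // with no substream bookkeeping.
  def splitBatchByKey[Msg](keyOf: Msg => String): Flow[Seq[Msg], Seq[Msg], NotUsed] =
    Flow[Seq[Msg]].mapConcat { batch =>
      batch.groupBy(keyOf).values.toList // every element of each emitted Seq shares one key
    }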

FWIW, my interest in solving this problem had already shifted into the academic. I’d already come around to the notion that, if I want to break up each sequence by message key so that the work can be parallelized, I would be better off using the Partition construct from the GraphDSL. That way I can maintain a fixed degree of parallelism, say 10, and get out of the business of keeping track of all the incoming keys in memory. What you’re making me realize now is that groupBy is not only less pragmatic, it’s also incorrect.
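
For completeness, here’s roughly the shape I have in mind for the Partition approach. This is a hedged sketch, not code I’ve run: keyOf and process are placeholders for extracting a key from a batch and doing the application processing, and the lane is chosen by hashing the key so that batches with the same key always land on the same lane:

  import akka.NotUsed
  import akka.stream.FlowShape
  import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition}

  // Fan batches out to a fixed number of parallel lanes, process each lane
  // asynchronously, and merge the results back into a single flow.
  def partitionedProcessingFlow[In, Out](parallelism: Int)(
      keyOf: In => String)(process: In => Out): Flow[In, Out, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val fanOut = builder.add(
        Partition[In](parallelism, in => math.abs(keyOf(in).hashCode % parallelism)))
      val fanIn = builder.add(Merge[Out](parallelism))

      for (i <- 0 until parallelism)
        fanOut.out(i) ~> Flow[In].map(process).async ~> fanIn.in(i)

      FlowShape(fanOut.in, fanIn.out)
    })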