Inconsistency with subscriber service and parallelism levels

Lagom’s Kafka subscriber assumes that you emit exactly one Done for each incoming element, and relies on this assumption to align and commit the Kafka offsets correctly. Using flatMapConcat is therefore unsafe: you’ll end up with more output elements than input elements, and you’ll commit offsets for elements that you haven’t successfully processed yet.
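To make the mismatch concrete, here’s a small plain-Scala sketch (no Akka or Lagom APIs; the `Record` type and the payloads are hypothetical). The subscriber effectively pairs the i-th completion with the i-th input offset, and a flatMapConcat-style expansion produces more completions than there are offsets:

```scala
// Hypothetical simulation of the subscriber's offset pairing (plain Scala).
case class Record(offset: Long, payload: String)

val incoming = Seq(Record(0, "a"), Record(1, "b"), Record(2, "c"))

// One output per input: the i-th "Done" matches the i-th offset.
val oneToOne = incoming.map(_ => "Done")

// flatMapConcat-style expansion: two outputs per input, so by the time the
// third "Done" arrives the subscriber would commit offset 2 even though
// only the second record is half-processed.
val expanded = incoming.flatMap(_ => Seq("Done", "Done"))

println(oneToOne.size) // 3 — matches the 3 offsets
println(expanded.size) // 6 — commits race ahead of real progress
```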

For a similar reason, mapAsyncUnordered is not safe to use: you could end up committing an offset for a later element before the processing of an earlier element has completed. Because Kafka treats the committed offset as a high-water mark (i.e., it assumes that you’ve completed processing everything prior to the committed offset), this could also result in missed elements if the stream is restarted for any reason.
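A quick sketch of the high-water-mark consequence (plain Scala, with made-up offsets for illustration): if an out-of-order completion commits offset 5 while offset 3 is still in flight, a restart resumes at 5 and the record at offset 3 is silently skipped.

```scala
// Out-of-order completion has committed offset 5...
val committed = 5L
// ...while an earlier element is still being processed.
val stillInFlight = Set(3L)

// On restart, consumption resumes at the committed offset,
// so any earlier, unfinished offsets are never reprocessed.
val resumeFrom = committed
val lost = stillInFlight.filter(_ < resumeFrom)

println(lost) // Set(3)
```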

Instead, you’ll need to make sure that you process each batch as a unit, emit exactly one Done when the whole batch is complete, and preserve the ordering. You can use mapAsync to preserve the ordering, and nest a substream inside the mapAsync to iterate through the grouped elements. Something like this:

val parallelism = 16

kafkaProxy.dataTopic.subscribe.atLeastOnce(
  Flow[Data]
    .mapAsync(parallelism) { data =>
      Source(groupBySession(data))
        .mapAsync(parallelism) { sd =>
          entityRefFor(sd.entityId).ask(RecordSessionData(sd.sessionId, sd))
        }
        .runWith(Sink.ignore)
    }
  )

(You might want to adjust this, for example by using different parallelism values for the inner vs. outer mapAsync.)
You might not get the same level of throughput, but that’s a necessary sacrifice to maintain ordering.
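As a side note on why mapAsync is safe here even with parallelism > 1: it may run futures concurrently, but it emits results in input order. A rough plain-Scala analogy (using Future.sequence, which gives the same order guarantee; the sleep durations are arbitrary):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// The futures complete in whatever order the sleeps allow, but the
// collected results follow the input order, not the completion order.
val inputs = Seq(30, 10, 20)
val results = Await.result(
  Future.sequence(inputs.map(ms => Future { Thread.sleep(ms); ms })),
  2.seconds
)

println(results) // List(30, 10, 20) — input order preserved
```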
