Inconsistency with subscriber service and parallelism levels

I have a subscriber service which listens on a Kafka topic for events.
It takes those events, and transforms them to commands for my entity.

val parallelism = 16

kafkaProxy.dataTopic.subscribe.atLeastOnce(
    Flow[Data]
      .flatMapConcat(data => Source(groupBySession(data)))
      .mapAsyncUnordered(parallelism)(sd => entityRefFor(sd.entityId).ask(RecordSessionData(sd.sessionId, sd)))
  )

groupBySession is basically

list.groupBy(_.session).map(/* wrap to domain type */).toList

I also have a read side, which reacts to the events my service emits when it handles these commands.

When I publish data to Kafka (20 sessions), depending on parallelism levels, only a portion of sessions is actually processed.

For example, with parallelism level 16, 18 sessions are processed and recorded.
With 4, 6 sessions.
With 1, 3 sessions.
With 32, all 20 sessions.

I am running it in dev mode (via sbt lagom:runAll).
I also have another Lagom service subscribed to the same Kafka topic, but with a different group id.

service1/src/main/application.conf

akka.kafka.consumer {
  kafka-clients {
    group.id = "service-1-group"
  }
}

service2/src/main/application.conf

akka.kafka.consumer {
  kafka-clients {
    group.id = "service-2-group"
  }
}

Am I doing something wrong here? Could someone shed some light?

Lagom’s Kafka subscriber assumes that you’re going to emit exactly one Done for each incoming element, and relies on this assumption to correctly align and commit the Kafka offset. Using flatMapConcat is therefore unsafe, as you’ll end up with more output elements than input elements and you’ll commit offsets that you haven’t successfully processed yet.
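
To make the misalignment concrete, here is a minimal sketch in plain Scala (no Lagom or Akka involved). It assumes a simplified model in which the n-th element leaving your flow is paired with the n-th incoming offset; that pairing is my reading of the description above, not Lagom's actual code, and `OffsetAlignmentSketch`, `alignment` and `groupSizes` are hypothetical names.

```scala
object OffsetAlignmentSketch {
  // groupSizes(i) = number of elements flatMapConcat emits for the message at offset i.
  // Returns pairs of (offset that gets committed, offset of the message the output came from).
  // Assumption: outputs are paired one-to-one with incoming offsets, in arrival order.
  def alignment(groupSizes: Seq[Int]): Seq[(Int, Int)] = {
    // Origin offset of every output element, in emission order.
    val outputOrigins = groupSizes.zipWithIndex.flatMap {
      case (size, offset) => Seq.fill(size)(offset)
    }
    // zip stops at the shorter side: at most one commit per incoming message.
    groupSizes.indices.zip(outputOrigins)
  }
}
```

For three messages that expand into 3, 2 and 1 sessions, `OffsetAlignmentSketch.alignment(Seq(3, 2, 1))` yields `(0,0), (1,0), (2,0)`: under this model, offsets 1 and 2 are committed on the strength of outputs that all came from message 0.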

For a similar reason, mapAsyncUnordered is not safe to use: you could end up committing an offset for a later element before the processing of an earlier element has completed. Because Kafka treats the committed offset as a “high water mark” (i.e., assumes that you’ve completed processing everything prior to the committed offset), this could also result in missed elements if the stream is restarted for any reason.
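
The same kind of toy model illustrates the ordering problem. This is a sketch in plain Scala under the same assumption as before (the k-th element to leave the flow is paired with offset k); `UnorderedCommitSketch` and `committedTooEarly` are hypothetical names, not part of Lagom or Akka.

```scala
object UnorderedCommitSketch {
  // completionOrder lists the input offsets (0 until n) in the order their futures complete,
  // which is the order mapAsyncUnordered emits results.
  // Returns the offsets that get committed before their own element has finished.
  // Assumption: the k-th emitted result is paired with offset k.
  def committedTooEarly(completionOrder: Seq[Int]): Seq[Int] =
    completionOrder.indices.filter { k =>
      // Offset k is committed at position k, but element k only finishes later.
      completionOrder.indexOf(k) > k
    }
}
```

With `completionOrder = Seq(2, 0, 1)`, offsets 0 and 1 are reported: each is committed while its own element is still in flight, so a restart at that moment would skip it. With in-order completion, as mapAsync guarantees for emitted results, the list is empty.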

Instead you’ll need to make sure that you process each batch together and emit exactly one Done when the whole batch is complete, and that the order is preserved. You can use mapAsync to preserve the ordering, and you can nest a substream inside the mapAsync to iterate through the grouped elements. Something like this:

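val parallelism = 16

kafkaProxy.dataTopic.subscribe.atLeastOnce(
  Flow[Data]
    // Outer mapAsync: exactly one Done per incoming Kafka message, in order.
    .mapAsync(parallelism) { data =>
      // Inner stream: record every session contained in this message.
      Source(groupBySession(data))
        .mapAsync(parallelism) { sd =>
          entityRefFor(sd.entityId).ask(RecordSessionData(sd.sessionId, sd))
        }
        // Completes with Done once the whole batch is processed;
        // needs an implicit Materializer in scope.
        .runWith(Sink.ignore)
    }
)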

(You might want to adjust this, for example by using different parallelism values for the inner and outer mapAsync.)
You might not get the same level of throughput, but that’s a necessary sacrifice to maintain ordering.

Ivan,

You can also check Scaling Lagom Broker.

Hope this also helps.

Br,
Alan

@TimMoore, thank you, I was not aware of this fact:

Lagom’s Kafka subscriber assumes that you’re going to emit exactly one Done for each incoming element

@aklikic, thank you as well, I’ll definitely take a look at it :D