Ordered Processing of Messages consumed from Kafka

kafka
(Sigurd) #1

Hey guys,

I have a question about committing messages in Kafka.
Kafka uses an offset, represented by an ascending number, for a consumer group.
So the commits have to be done in the correct order, right?
If I have three messages a, b, c and, in this case, a batch size of 1, then I need to commit a, then b, and finish with c.

Is it allowed to use an unordered operator such as mapAsyncUnordered?

In this example, the third line is the important part (it processes the messages of one partition):

committablePartitionedSource(..)
.mapAsyncUnordered(bufferSize, pair -> pair.second()
    .mapAsyncUnordered(buffer, business())
    .runWith(committerSink(), mat))
.toMat(Sink.ignore(), Keep.both())
.mapMaterializedValue(pair -> Consumer.createDrainingControl(pair))
.run(mat)

Is this approach correct, or does it break Kafka's commit process?

Best
Sigurd

(Johan Andrén) #2

If you want to keep the ordering of the events, you cannot use mapAsyncUnordered; you must instead use mapAsync, which keeps the original ordering.
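
To make the difference concrete, here is a minimal plain-Java sketch of the two emission orders. It does not use Akka Streams; it only models the behaviour with CompletableFuture, and the class and method names (EmissionOrderDemo, unordered, ordered) are made up for the illustration. The futures stand in for business() calls on a, b and c that happen to finish in the order c, a, b.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class EmissionOrderDemo {

    // Models mapAsyncUnordered: a result is emitted as soon as its future completes.
    static List<String> unordered() {
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<String> b = new CompletableFuture<>();
        CompletableFuture<String> c = new CompletableFuture<>();
        List<String> emitted = new ArrayList<>();
        for (CompletableFuture<String> f : List.of(a, b, c)) {
            f.thenAccept(emitted::add); // runs when f completes
        }
        // Assumed completion order of the business logic: c, then a, then b.
        c.complete("c");
        a.complete("a");
        b.complete("b");
        return emitted;
    }

    // Models mapAsync: results are emitted in upstream order,
    // even though the futures complete in a different order.
    static List<String> ordered() {
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<String> b = new CompletableFuture<>();
        CompletableFuture<String> c = new CompletableFuture<>();
        c.complete("c");
        a.complete("a");
        b.complete("b");
        List<String> emitted = new ArrayList<>();
        for (CompletableFuture<String> f : List.of(a, b, c)) {
            emitted.add(f.join()); // wait for each element in input order
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println("unordered: " + unordered()); // [c, a, b]
        System.out.println("ordered:   " + ordered());   // [a, b, c]
    }
}
```

Whatever sits downstream (here, the committer) sees the elements in the emitted order, so only the second variant hands offsets to the committer in ascending order.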

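If elements are re-ordered before their offsets are committed to Kafka, you could end up committing a higher offset first. If the stream then fails before an element with a lower offset has been processed, that element is not redelivered after a restart, so you lose data.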
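
The data-loss scenario can be checked with a small simulation. This is a simplified model, not Kafka client code: it assumes that after a restart the consumer resumes at the last committed offset plus one, and the names (CommitOrderDemo, redelivered, lost) are hypothetical. Offsets 0, 1, 2 stand for the messages a, b, c from the question.

```java
import java.util.ArrayList;
import java.util.List;

public class CommitOrderDemo {

    // Offsets delivered again after a restart.
    // Simplified model: consumption resumes at lastCommitted + 1.
    static List<Long> redelivered(long lastCommitted, long logEnd) {
        List<Long> result = new ArrayList<>();
        for (long o = lastCommitted + 1; o <= logEnd; o++) {
            result.add(o);
        }
        return result;
    }

    // Offsets that were neither processed before the crash nor redelivered after it.
    static List<Long> lost(List<Long> processedBeforeCrash, long lastCommitted, long logEnd) {
        List<Long> again = redelivered(lastCommitted, logEnd);
        List<Long> result = new ArrayList<>();
        for (long o = 0; o <= logEnd; o++) {
            if (!processedBeforeCrash.contains(o) && !again.contains(o)) {
                result.add(o);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        long logEnd = 2; // offsets 0, 1, 2 = messages a, b, c

        // Unordered: c (offset 2) finishes first and is committed, then the stream crashes.
        System.out.println("unordered, lost: " + lost(List.of(2L), 2, logEnd)); // [0, 1]

        // Ordered: a (offset 0) is emitted and committed first, then the stream crashes.
        System.out.println("ordered, lost:   " + lost(List.of(0L), 0, logEnd)); // []
    }
}
```

In the unordered case the committed offset 2 tells Kafka that everything up to c is done, so a and b are never seen again. In the ordered case b and c are simply delivered again after the restart.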