Kafka consumer fails to commit received messages

I’m using Lagom 1.6.

In one microservice I subscribe to a topic of another microservice like this:

  customerService.customerEventsTopic().subscribe
    // And subscribe to it with at least once processing semantics.
    .withMetadata
    .withGroupId("ISTWriter")
    .atLeastOnce(
      Flow[Message[CustomerEvent]].map {
        msg => {
...
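For completeness, the full subscription looks roughly like this; `handleEvent` is just a placeholder for my real processing logic:

  import akka.Done
  import akka.stream.scaladsl.Flow
  import com.lightbend.lagom.scaladsl.api.broker.Message

  customerService.customerEventsTopic().subscribe
    .withMetadata
    .withGroupId("ISTWriter")
    .atLeastOnce(
      // The flow must emit a Done for every message; Lagom only
      // commits the offset once Done has been emitted downstream.
      Flow[Message[CustomerEvent]].map { msg =>
        handleEvent(msg.payload) // placeholder for the actual processing
        Done
      }
    )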

Everything works fine except when I restart the application. After the restart I again receive recent messages that I had already received before the restart.

It's as if those messages were never committed.

I tried setting these properties related to offset committing:

  Properties.setProp("lagom.broker.kafka.client.consumer.batching-size","1")
  Properties.setProp("lagom.broker.kafka.client.consumer.offset-buffer","1")
  Properties.setProp("lagom.broker.kafka.client.consumer.batching-interval","100 millis")
  Properties.setProp("lagom.broker.kafka.client.consumer.enable.auto.commit","true")
  Properties.setProp("lagom.broker.kafka.client.consumer.auto.commit.interval.ms","100")

but it doesn't change anything. What is the problem?
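For reference, the batching-size, offset-buffer and batching-interval keys come from Lagom's reference configuration and would normally be set in application.conf rather than as system properties. A sketch of that (the last two properties above are raw Kafka client settings, and I am not sure the Lagom subscriber reads them under this prefix at all):

  lagom.broker.kafka.client.consumer {
    # how many offsets are grouped into a single commit
    batching-size = 1
    # how many uncommitted offsets may be buffered in memory
    offset-buffer = 1
    # maximum delay before a pending batch is committed
    batching-interval = 100 millis
  }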

Hi,

Check this topic.

Hope this helps.

Br,
Alan

@domschoen

There are a few things that could be happening:

1. First, check that you are not duplicating events; if an event is somehow published more than once, this could happen.

2. Second, you are using .atLeastOnce semantics, which means the offset is only committed after the message has been processed; if your service goes down before committing the offset, this will happen. You can check the message offsets in the embedded Cassandra in development mode to see if this is what is happening.
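Also, even when everything is working, at-least-once delivery means duplicates after a restart are expected, so the safest fix is to make the processing idempotent. Here is a minimal sketch of one way to do that, de-duplicating on the Kafka partition/offset that .withMetadata exposes; the ProcessedOffsets store and handleEvent are hypothetical:

  import akka.{Done, NotUsed}
  import akka.stream.scaladsl.Flow
  import com.lightbend.lagom.scaladsl.api.broker.Message
  import com.lightbend.lagom.scaladsl.broker.kafka.KafkaMetadataKeys

  // Hypothetical store remembering which (partition, offset) pairs were handled,
  // e.g. a table in the same database the handler writes to.
  trait ProcessedOffsets {
    def contains(partition: Int, offset: Long): Boolean
    def add(partition: Int, offset: Long): Unit
  }

  // Hypothetical stand-in for the real event processing.
  def handleEvent(event: CustomerEvent): Unit = ???

  def idempotentFlow(store: ProcessedOffsets): Flow[Message[CustomerEvent], Done, NotUsed] =
    Flow[Message[CustomerEvent]].map { msg =>
      // .withMetadata makes the Kafka coordinates of each message available.
      val partition = msg.get(KafkaMetadataKeys.Partition).getOrElse(-1)
      val offset    = msg.get(KafkaMetadataKeys.Offset).getOrElse(-1L)
      if (!store.contains(partition, offset)) {
        handleEvent(msg.payload)
        store.add(partition, offset)
      }
      Done // emitting Done is what lets Lagom commit the offset
    }

This way a replayed message after a restart is simply skipped instead of being processed twice.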