I’m using Lagom 1.6.
In one microservice I subscribe to a topic of another microservice like this:
customerService.customerEventsTopic().subscribe
// And subscribe to it with at least once processing semantics.
.withMetadata
.withGroupId("ISTWriter")
.atLeastOnce(
Flow[Message[CustomerEvent]].map {
msg => {
...
Everything works fine except if I restart the application. After the restart I receive again recent messages I have already received before the restart.
It’s like those message where not committed.
I tried to set those properties related to committing messages:
Properties.setProp("lagom.broker.kafka.client.consumer.batching-size","1")
Properties.setProp("lagom.broker.kafka.client.consumer.offset-buffer","1")
Properties.setProp("lagom.broker.kafka.client.consumer.batching-interval","100 millis")
Properties.setProp("lagom.broker.kafka.client.consumer.enable.auto.commit","true")
Properties.setProp("lagom.broker.kafka.client.consumer.auto.commit.interval.ms","100")
but it doesn’t change anything.
What is the problem ?