Committing offsets for Kafka messages that are filtered

Hi, I’m using Consumer.committableSource and I need at-least-once processing guarantees. I need to filter out some messages from Kafka, so I was wondering: how do I commit offsets for the messages that are filtered out?
My pipeline looks something like this:

    source
      .throttle(25, 1.second)
      .filter(predicate)
      .groupedWithin(25, 5.seconds)
      .mapAsync(1) { batch =>
        processAsync(batch)
      }
      .toMat(Committer.sink(CommitterSettings(actorSystem)))(DrainingControl.apply)
      .run()

Hi @aleksandarskrbic,

As you guessed, filtering makes it impossible to commit the offsets of those messages, because they never reach the Committer.sink.

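Depending on the ratio of filtered to non-filtered messages, that might not be a problem. Kafka stores one committed position per partition, so committing the offset of a later non-filtered message also covers the filtered messages before it in the same partition.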
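The offset semantics behind this can be illustrated with a toy model in plain Scala. It does not use Alpakka Kafka: `Record`, `CommitModel` and `commit` are hypothetical names. It assumes the usual Kafka behaviour that a partition's committed offset is the position of the next record to read, so committing a later record implicitly covers earlier, filtered ones.

```scala
// Toy model, NOT Alpakka Kafka's API: hypothetical record type.
final case class Record(partition: Int, offset: Long, value: String)

object CommitModel {
  // Committing a processed record moves its partition's position to
  // offset + 1, which implicitly covers every earlier offset there.
  def commit(positions: Map[Int, Long], r: Record): Map[Int, Long] = {
    val next = r.offset + 1
    val current = positions.getOrElse(r.partition, 0L)
    positions.updated(r.partition, math.max(current, next))
  }

  def main(args: Array[String]): Unit = {
    // Offsets 0..5 in partition 0; only offsets 2 and 5 are "keep".
    val records = (0L until 6L).map(o => Record(0, o, if (o % 3 == 2L) "keep" else "skip"))
    // Only the kept records reach the committer, like after .filter(predicate).
    val kept = records.filter(_.value == "keep")
    val positions = kept.foldLeft(Map.empty[Int, Long])(commit)
    println(positions) // Map(0 -> 6): filtered offsets 0, 1, 3, 4 are covered
  }
}
```

The flip side: if the last messages in a partition are filtered (say offset 5 were "skip"), the committed position stays at 3 until a later non-filtered message arrives. With at-least-once processing that is usually harmless, since after a restart those messages are simply read and filtered again.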

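If in your scenario it is important to commit offsets even for messages that don’t require processing, you need to let their Committable reach the Committer.sink. You can do that by moving the filtering into your processAsync, so it skips the work for those messages but still passes their offsets on.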
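A rough sketch of that change, written as a plain-Scala model rather than against Alpakka Kafka so that it runs on its own. `Msg`, `FilterInsideBatch`, `handleBatch`, and the bodies of `predicate` and `processAsync` are hypothetical stand-ins, and the model assumes a single partition, so a batch's offset is simply its highest one. The filter moves from before `groupedWithin` into the batch step: only matching messages are processed, but the result still carries the offset of the entire batch.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for a CommittableMessage; `offset` plays the role
// of its committableOffset (single partition assumed).
final case class Msg(offset: Long, value: String)

object FilterInsideBatch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical versions of `predicate` and `processAsync` from the question.
  def predicate(m: Msg): Boolean = m.value.startsWith("keep")
  def processAsync(batch: Seq[Msg]): Future[Int] = Future(batch.size)

  // Process only the kept messages, but return the offset of the WHOLE
  // batch so filtered messages get committed too. Assumes a non-empty
  // batch (Akka Streams' groupedWithin does not emit empty groups).
  def handleBatch(batch: Seq[Msg]): Future[Long] =
    processAsync(batch.filter(predicate)).map(_ => batch.map(_.offset).max)

  def main(args: Array[String]): Unit = {
    val batch = Seq(Msg(10, "keep-a"), Msg(11, "drop"), Msg(12, "drop"))
    println(Await.result(handleBatch(batch), 1.second)) // 12
  }
}
```

In the real stream, the `.filter(predicate)` before `groupedWithin` would go away, and instead of a `Long` the `mapAsync` stage would return a `CommittableOffsetBatch` covering every message in the group, for example `batch.foldLeft(CommittableOffsetBatch.empty)((acc, msg) => acc.updated(msg.committableOffset))`, which `Committer.sink` accepts as a `Committable`. Treat that line as a starting point to check against the Alpakka Kafka version you use.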

Cheers,
Enno.
