Committing offsets for Kafka messages that are filtered out

Hi, I’m using committableSource and I need at-least-once processing guarantees. I need to filter out some messages from Kafka. How should I commit offsets for the messages that are filtered out?
My pipeline looks something like this:

      .throttle(25, 1.second)
      .groupedWithin(25, 5.seconds)
      .mapAsync(1) { batch =>