Commiting offsets for Kafka messages that are filtered

Hi, I’m using CommitableSource and I need at-least-once processing guarantees. I need to filter some messages from Kafka so I was wondering how to commit offsets for messages that are filtered out?
My pipeline looks something like this:

 source
      .throttle(25, 1.second)
      .filter(predicate)
      .groupedWithin(25, 5.seconds)
      .mapAsync(1) { batch =>
          processAsync(batch)
      }
      .toMat(Committer.sink(CommitterSettings(actorSystem)))(DrainingControl.apply)
      .run()