Configure offset when consuming a Kafka topic

I am using Alpakka Kafka in Scala to consume a Kafka topic. Here’s my code:

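    import akka.kafka.{ConsumerSettings, Subscriptions}
    import akka.kafka.scaladsl.Consumer
    import akka.stream.scaladsl.Sink
    import org.apache.kafka.clients.CommonClientConfigs
    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.serialization.StringDeserializer

    // Assumes an implicit ActorSystem named actorSystem and a kafkaConfig defined elsewhere
    val kafkaConsumerSettings: ConsumerSettings[String, String] =
      ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers(kafkaConfig.server)
        .withGroupId(kafkaConfig.group)
        .withProperties(
          ConsumerConfig.MAX_POLL_RECORDS_CONFIG       -> "100",
          ConsumerConfig.AUTO_OFFSET_RESET_CONFIG      -> "earliest",
          CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> "SSL"
        )

    Consumer
      .plainSource(kafkaConsumerSettings, Subscriptions.topics(kafkaConfig.topic))
      .runWith(Sink.foreach(println))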

However, the consumer only starts polling from the first uncommitted message in the topic. I would like it to always start from offset 0, regardless of which messages have been committed. With the Alpakka consumer, how do I specify the offset manually?

Hi @vasily802,

You have several different options here in Alpakka Kafka, but basically you must seek to offset 0 (or seek to earliest) explicitly. However, the configuration you have in your example:

    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",

… should work if you’re not committing offsets back to Kafka. This consumer property says that if no committed offset is found in Kafka for the specified group.id and subscribed partition, the consumer starts from the earliest available offset. If you never commit offsets, I would expect this code to begin from the earliest offset every time you run it. The earliest offset may be 0 or something else, depending on the broker retention settings for that topic.
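
As a minimal sketch of that first option, the settings below keep auto.offset.reset at earliest and additionally set enable.auto.commit to false explicitly, and the stream uses plainSource, which does not commit offsets by itself. The bootstrap server, group id, and topic are hypothetical placeholders. Keep in mind that if the group already has committed offsets from earlier runs, those still take precedence over auto.offset.reset until they are removed or expire.

```scala
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

object EarliestWithoutCommits {
  // Hypothetical placeholder values; substitute your own configuration
  val bootstrapServers = "localhost:9092"
  val groupId = "my-group"
  val topic = "my-topic"

  def settings(system: ActorSystem): ConsumerSettings[String, String] =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers(bootstrapServers)
      .withGroupId(groupId)
      .withProperties(
        // Only consulted when the group has no committed offset for a partition
        ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
        // Prevent the Kafka client from committing offsets in the background
        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false"
      )

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("earliest-consumer")
    // plainSource never commits offsets itself, so nothing is stored for this group
    Consumer
      .plainSource(settings(system), Subscriptions.topics(topic))
      .runWith(Sink.foreach(record => println(record.value)))
  }
}
```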

Another way to start from the earliest offset is to implement your own PartitionAssignmentHandler and, in the onAssign method, use the RestrictedConsumer instance to call seekToBeginning.
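
A rough sketch of such a handler, assuming Alpakka Kafka 2.x (where the scaladsl PartitionAssignmentHandler trait has onRevoke, onAssign, onLost and onStop) and Scala 2.13 for scala.jdk.CollectionConverters. The class name SeekToBeginningHandler and the connection values are hypothetical. The handler is attached to the subscription with withPartitionAssignmentHandler.

```scala
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, RestrictedConsumer, Subscriptions}
import akka.kafka.scaladsl.{Consumer, PartitionAssignmentHandler}
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import scala.jdk.CollectionConverters._

// Hypothetical handler: rewinds every newly assigned partition to its beginning,
// ignoring any offset previously committed for the group
class SeekToBeginningHandler extends PartitionAssignmentHandler {
  override def onRevoke(revokedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = ()

  override def onAssign(assignedTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit =
    if (assignedTps.nonEmpty) consumer.seekToBeginning(assignedTps.asJava)

  override def onLost(lostTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = ()

  override def onStop(currentTps: Set[TopicPartition], consumer: RestrictedConsumer): Unit = ()
}

object SeekToBeginningExample {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("seek-to-beginning")

    val settings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092") // placeholder
      .withGroupId("my-group")                // placeholder

    val subscription = Subscriptions
      .topics("my-topic")                     // placeholder
      .withPartitionAssignmentHandler(new SeekToBeginningHandler)

    Consumer
      .plainSource(settings, subscription)
      .runWith(Sink.foreach(r => println(s"${r.partition}/${r.offset}: ${r.value}")))
  }
}
```

Because onAssign runs on every assignment, a rebalance will also rewind any partitions this consumer receives again, which is probably what you want if the goal is to always reread the whole topic.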

A third way is to manage offsets yourself and use Consumer.plainPartitionedManualOffsetSource to seek to a specific offset manually. See the Alpakka Kafka “Offset storage external to Kafka” docs for examples. You could pair this option with the MetadataClient to get the beginning offsets for your topic subscription and then seek to them in the getOffsetsOnAssign function passed to plainPartitionedManualOffsetSource.
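
A sketch of that pairing, assuming Alpakka Kafka 2.x's MetadataClient.create(settings, timeout) and getBeginningOffsets. offsetsFromZero is a hypothetical alternative that literally asks for offset 0; if offset 0 has already been deleted by retention, the Kafka consumer treats it as out of range and falls back to auto.offset.reset. maxPartitions and the connection values are placeholders.

```scala
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.{Consumer, MetadataClient}
import akka.stream.scaladsl.Sink
import akka.util.Timeout
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

object ManualOffsetExample {
  type OffsetsOnAssign = Set[TopicPartition] => Future[Map[TopicPartition, Long]]

  // Hypothetical: request offset 0 for every assigned partition
  val offsetsFromZero: OffsetsOnAssign =
    tps => Future.successful(tps.map(tp => tp -> 0L).toMap)

  // Assumption: the topic has at most this many partitions
  val maxPartitions = 100

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("manual-offsets")
    implicit val ec: ExecutionContext = system.dispatcher

    val settings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092") // placeholder
      .withGroupId("my-group")                // placeholder

    // Looks up the actual log-start offsets, which may be > 0 after retention
    val metadataClient = MetadataClient.create(settings, Timeout(5.seconds))
    val beginningOffsets: OffsetsOnAssign =
      tps => metadataClient.getBeginningOffsets(tps)

    Consumer
      .plainPartitionedManualOffsetSource(settings, Subscriptions.topics("my-topic"), beginningOffsets)
      // Each element is (partition, source of that partition's records); merge them
      .flatMapMerge(maxPartitions, _._2)
      .runWith(Sink.foreach(r => println(s"${r.partition}/${r.offset}: ${r.value}")))
  }
}
```

Passing offsetsFromZero instead of beginningOffsets skips the metadata lookup at the cost of relying on the out-of-range fallback.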
