Kafka consumer <plain-Partitioned-Manual-Offset-Source>

Hello,

I want to store the kafka offset externally and this plainPartitionedManualOffsetSource seems to be a good fit for my use case. I have a working code without any commit using plainSource that uses slick to push the data to Postgres. I am using Slick.flow for that.

I need to change the code so that
a) i should be able to be able to load the offset during source creation. Partition and groupId are provided from the config. IE, i should be able to provide offset, partition and groupId apart from topic.
b) store the offset after inserting the data to postgres

For this Slick.flowWithPassThrough seems to be fine.

I do not find any example to how to create the consumer with plainPartitionedManualOffsetSource in java. I am unable to follow scala.

Can someone direct me to proper example or provide documentation for both plainPartitionedManualOffsetSource and Slick.flowWithPassThrough ?

Thanks and Regards

I have no real idea about the kafka part, but from the documentation it seems plainPartitionedManualOffsetSource is a Source[(TopicPartition, Source[ConsumerRecord, NotUsed]), NotUsed] which could be strange bcs this source generates other sources. flatMapMerge or mapAsync could be a good resolver, but probably this is not what you are looking for.

About the slick part;
https://doc.akka.io/docs/alpakka/current/slick.html#flow-with-pass-through
The first function is a (A, Connection) => SQLQuery like function, basically what SQL should you run for every element.
The second function is an (A, B) => C where A is the input element (same as the first function), and B is what the SQLOutput was.
The whole Flow will emit Cs.

For a more concrete example, if A is a Commitable message, and the first function is a simple insert, the second function will get a (Commitable, Int) where the int is the inserted row number, and you can simply return with the Commitable message, and commit it in a mapAsync. If A is a product and the first function is some kind of select for the historical prices, your second function will get a (Product, List[Prices]) and could create a png image with the products price history, or return other aggregated data.