I have a Transactional.source connected to a Transactional.flow, wrapped with RestartSource.onFailuresWithBackoff. I expect the stream to restart itself when my business logic fails. This works as expected when at least one message has already been committed: the failed message is consumed again and reprocessed. But when the very first message (the first message ever in the source topic) fails in my business logic, the stream restarts once but the message is “lost”.
What am I missing here or is this expected behavior?
I have the following code from the documentation here: https://doc.akka.io/docs/alpakka-kafka/current/transactions.html
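import scala.concurrent.duration._
import akka.kafka.{ProducerMessage, Subscriptions}
import akka.kafka.scaladsl.Transactional
import akka.stream.scaladsl.{RestartSource, Sink}
import org.apache.kafka.clients.producer.ProducerRecord

val stream = RestartSource.onFailuresWithBackoff(minBackoff = 1.seconds, maxBackoff = 30.seconds, randomFactor = 0.2) { () =>
  Transactional
    .source(consumerSettings, Subscriptions.topics(sourceTopic))
    .via(businessFlow()) // a failure here is what triggers the restart
    .map { msg =>
      ProducerMessage.single(new ProducerRecord(sinkTopic, msg.record.key, msg.record.value), msg.partitionOffset)
    }
    .via(Transactional.flow(producerSettings, transactionalId))
}

stream.runWith(Sink.ignore)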
Thanks!