Prevent loss of first message on RestartSource.onFailuresWithBackoff with Transactional.source connected to Transactional.flow

I have a Transactional.source connected to a Transactional.flow, wrapped in RestartSource.onFailuresWithBackoff. I expect the stream to restart itself when my business logic fails. This works as expected when there is at least one committed message: the failed message is consumed again and reprocessed. But when the very first message ever written to the source topic fails in my business logic, the stream restarts once and the message is “lost”.
What am I missing here or is this expected behavior?
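
One thing I have not ruled out is the consumer's auto.offset.reset setting. When the group has no committed offset yet, a restarted consumer falls back to that setting, and Kafka's default is "latest", which would skip a message that was already in the topic. Below is a minimal sketch of the consumer settings I would test with. The actor system name, bootstrap server, group id, and String deserializers are placeholders I made up for illustration, not my real configuration.

```scala
import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

// Placeholder actor system name (assumption for this sketch).
implicit val system: ActorSystem = ActorSystem("example")

val consumerSettings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092") // placeholder broker address
    .withGroupId("group1")                  // placeholder consumer group
    // With no committed offset for the group, start from the beginning of the
    // partition instead of Kafka's default "latest".
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
```

If the first message is still skipped with "earliest", then the offset reset policy is not the cause and something else in the restart path must be involved.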

I have the following code from the documentation here:

val stream = RestartSource.onFailuresWithBackoff(
  minBackoff = 1.seconds,
  maxBackoff = 30.seconds,
  randomFactor = 0.2
) { () =>
  Transactional
    .source(consumerSettings, Subscriptions.topics(sourceTopic))
    .map { msg =>
      ProducerMessage.single(new ProducerRecord(sinkTopic, msg.record.key, msg.record.value), msg.partitionOffset)
    }
    .via(Transactional.flow(producerSettings, transactionalId))