Retry failed elements in RestartFlow

Example code:

    var explosionCounter = 0
    Source(1 to 100).via(
      RestartFlow.withBackoff(1.second, 2.seconds, 0.1)(() => {
        Flow[Int].map(theActualNumber => {
          explosionCounter += 1
          if (explosionCounter % 2 == 0) throw new RuntimeException("Bobomb")
          theActualNumber
        })
      })
    ).runForeach(println)

Every element that throws an exception is swallowed, so the printed result is 1, 3, 5, and so on. Is there a mechanism for retrying the elements that caused the exception?


One solution that comes to mind right now is wrapping every element in a Source.single and wrapping that in a restartable source:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._

import scala.concurrent.duration._

implicit val sys = ActorSystem()
implicit val mat = ActorMaterializer()

var explosionCounter = 0

Source(1 to 10).flatMapConcat(n =>
  RestartSource.onFailuresWithBackoff(1.second, 2.seconds, 0.1)(() => {
    Source.single(n).map(theActualNumber => {
      explosionCounter += 1
      if (explosionCounter % 2 == 0) throw new RuntimeException("Bobomb")
      theActualNumber
    })
  }
)).runForeach(println)

Good thinking! Thinking inside the box.

I tried to find a way to reuse the same RestartSource/Flow but couldn’t see any. Not that it matters though.

Edit: Actually, having to rematerialize the flow for every element is rather problematic. Say you are writing to a database: you would have to rematerialize that flow for every element. It’s possible to reuse the same flow, but then it will not be rematerialized if there is an error…

Here is the blog post by Colin Breck where I think I first saw this pattern: https://blog.colinbreck.com/rethinking-streaming-workloads-with-akka-streams-part-i/

And yes, if materializing the flow involves external resources, materializing it often can get pretty costly.
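
If the per-element rematerialization is the concern, one alternative (a different technique from RestartSource/RestartFlow, not a drop-in replacement) is to keep a single long-lived stage and retry the failing operation itself, for example with mapAsync and akka.pattern.retry. Below is a minimal sketch, assuming the work can be expressed as a Future. The `process` helper, the attempt count of 3, and the fixed 100 ms delay are illustrative choices, not anything from the thread, and this form of retry uses a fixed delay rather than exponential backoff.

```scala
import java.util.concurrent.atomic.AtomicInteger

import akka.actor.ActorSystem
import akka.pattern.retry
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

implicit val sys = ActorSystem()
implicit val mat = ActorMaterializer()
implicit val ec = sys.dispatcher        // runs the Futures below
implicit val scheduler = sys.scheduler  // used by retry to schedule the delay

// Atomic counter because the attempts run on dispatcher threads.
val explosionCounter = new AtomicInteger(0)

// Hypothetical stand-in for the real work: every second attempt fails.
// The exception is thrown inside the Future, so retry sees a failed Future.
def process(n: Int): Future[Int] = Future {
  if (explosionCounter.incrementAndGet() % 2 == 0) throw new RuntimeException("Bobomb")
  n
}

// One materialization for the whole stream; each element is attempted
// up to 3 times, waiting 100 ms between attempts.
val result: Seq[Int] = Await.result(
  Source(1 to 10)
    .mapAsync(1)(n => retry(() => process(n), 3, 100.millis))
    .runWith(Sink.seq),
  30.seconds
)

println(result)
sys.terminate()
```

With this failure pattern every failed attempt succeeds on its first retry, so all ten elements come through in order. Note that if an element exhausts its attempts, the Future fails and the stream fails with it, so this would still need a supervision strategy or an outer restart wrapper if the stream itself has to survive.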
