Error handling in streams


(Adam) #1

Hey everyone,

When handling errors in Akka Streams, is there a common pattern for bubbling errors up to a “host” Actor?

Materializing some Future[Done] from somewhere in the graph and watching it is somewhat misleading, since errors upstream (or “sidestream”) to it are invisible.

Materializing all the values often leads to 10+ futures that you have to mess around with.

Since the materializer’s decider sees the exception, is it a good idea to message the host Actor with the result from there? It seems somewhat unnatural.
Is there another way to handle these sorts of issues?

Side note: whenever possible, encoding errors in types (Either/Try) and handling them at the application level is of course great. I’m referring to cases where the errors are out of my control, e.g. they come from library users or external components.

Thanks


(Johan Andrén) #2

I think there is a terminology issue here: errors are propagated downstream - towards the Sinks, but upstream is cancelled, so upstream will not see the error that made the stream fail. If you have a sink that materializes into a Future[Done] that should be fine to send to the actor itself and handle it in whichever way you see fit. If there is no such sink you could inject a .watchTermination stage just before the sink, that way you will at least see the error for all other cases than when it comes from the Sink itself.
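A minimal sketch of the watchTermination idea, assuming Akka 2.5-style APIs (ActorMaterializer). The object name, the failing map stage, and runStream are hypothetical and only there for illustration. The failure is placed upstream of the watch point on purpose: a failure thrown inside the Sink itself would not show up in this future.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Future

object WatchTerminationSketch {
  val argEx = new IllegalArgumentException("bla")

  // Hypothetical stream whose map stage fails on element 3.
  def runStream()(implicit mat: Materializer): Future[Done] =
    Source(1 to 5)
      .map(n => if (n == 3) throw argEx else n)
      .watchTermination()(Keep.right) // keep the termination future as the materialized value
      .to(Sink.ignore)                // `to` keeps the left side, i.e. that future
      .run()

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    implicit val mat: Materializer = ActorMaterializer()
    import system.dispatcher

    // The future fails with the exception thrown upstream of the watch point.
    runStream().onComplete { result =>
      println(s"stream terminated with: $result")
      system.terminate()
    }
  }
}
```

The resulting Future[Done] can then be handed to the actor in whatever way suits the application.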

Colin Breck has written a nice three-part article series about encapsulating a stream inside an actor that may provide some insights: http://blog.colinbreck.com/integrating-akka-streams-and-akka-actors-part-ii/


(Adam) #3

Hey, thanks for the reply.

Yeah, by “invisible errors” I meant that components not directly downstream of the error can only see a cancellation, and in some cases (branching away from upstream) a completion, which is quite confusing.

I use something very similar to watchTermination (a stage combining a kill switch and a termination watch), but when you need graphs that branch out to multiple Sinks, you have no option but to materialize all the Future[Done]s and sequence them (or whatever).
Unfortunately it makes the GraphDSL code extremely verbose and error-prone.
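
For reference, the "sequence them" step can be sketched with the Scala standard library alone. The helper name allDone is hypothetical, and plain strings stand in for the Done values a real graph would materialize. Note that Future.sequence surfaces only one failure, so if several branches fail the others are not reported.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SequenceSketch {
  // Hypothetical helper: succeeds only if every given future succeeds.
  def allDone[A](futures: Future[A]*)(implicit ec: ExecutionContext): Future[Unit] =
    Future.sequence(futures.toList).map(_ => ())

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global

    // Stand-ins for the materialized values of three sinks.
    val d1 = Future.successful("done")
    val d2 = Future.successful("done")
    val d3 = Future.failed[String](new IllegalArgumentException("bla"))

    val combined = allDone(d1, d2, d3)
    Await.ready(combined, 3.seconds)
    println(combined.value) // a failed Try, because d3 failed
  }
}
```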

I’ve read the article before and it’s great, however it doesn’t give any insight into the multiple branching scenario.
What do you think about the “messaging from decider” thing? Since we’re usually logging there, it already has side effects - but it feels too strange.


(Adam) #4

Just to illustrate what I mean:

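import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Broadcast, GraphDSL, RunnableGraph, Sink, Source}
import akka.testkit.TestKit
import org.scalatest.{Matchers, WordSpecLike}
import org.scalatest.concurrent.ScalaFutures

class BranchingTest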
    extends TestKit(ActorSystem())
    with WordSpecLike
    with Matchers
    with ScalaFutures {

  implicit val ec = system.dispatcher
  implicit val mat = ActorMaterializer()

  val argEx = new IllegalArgumentException("bla")

  "graph1" should {
    "example" in {
      // sink1 and sink2 just consume; sink3 fails when it receives element 3.
      val sink1 = Sink.ignore
      val sink2 = Sink.ignore
      val sink3 = Sink.foreach[Int] {
        case 3 => throw argEx
        case n => println(n)
      }

      import GraphDSL.Implicits._
      val g = GraphDSL.create(sink1, sink2, sink3)(Tuple3.apply) {
        implicit b => (s1, s2, s3) =>
          val src = b add Source(1 to 5)
          val broadcast = b add Broadcast[Int](3)

          src ~> broadcast
          broadcast ~> s1
          broadcast ~> s2
          broadcast ~> s3

          ClosedShape
      }

      // One materialized Future[Done] per sink; only d3 carries the failure.
      val (d1, d2, d3) = RunnableGraph.fromGraph(g).run()
      whenReady(d1)(_ shouldBe Done)
      whenReady(d2)(_ shouldBe Done)
      whenReady(d3.failed)(_ shouldBe argEx)
    }
  }
}

This passes with:

1
2

Process finished with exit code 0

(sink1 and sink2 both complete with Success(Done), even though sink3 failed.)

With 3+ sinks (common case for ETLs here) and other materialized values (killswitch, internal actor refs, etc) you often find yourself dealing with “too many” materialized values.


(Johan Andrén) #5

Ah, I see, didn’t read more complex graphs into the original question, but you are right, that complicates things.

I’d still recommend preferring watchTermination over doing tricks with the decider. You could side-effect that future to the enclosing actor through something like mapMaterializedValue(future => future.pipeTo(self)), or something along those lines. That avoids the problem of having to get the many materialized values out of the stream when actually running it.
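
A rough sketch of that idea, again assuming Akka 2.5-style APIs. TerminationReporting, reportTo and StreamHost are hypothetical names, not part of Akka. Each place in the graph where the stage is inserted sends its own message to the actor: Done on completion, or Status.Failure(ex) when a failure arrives from upstream of that point.

```scala
import akka.{Done, NotUsed}
import akka.actor.{Actor, ActorLogging, ActorRef, Status}
import akka.pattern.pipe
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.ExecutionContext

object TerminationReporting {
  // Hypothetical helper: passes elements through unchanged and pipes the
  // termination signal seen at this point of the graph to `target`.
  def reportTo[T](target: ActorRef)(implicit ec: ExecutionContext): Flow[T, T, NotUsed] =
    Flow[T]
      .watchTermination()(Keep.right)
      .mapMaterializedValue { done =>
        done.pipeTo(target) // side effect runs once per materialization
        NotUsed             // nothing left for the caller to collect
      }
}

class StreamHost extends Actor with ActorLogging {
  import context.dispatcher // ExecutionContext needed by pipeTo
  private implicit val mat: ActorMaterializer = ActorMaterializer()

  override def preStart(): Unit = {
    Source(1 to 5)
      .map(n => if (n == 3) throw new IllegalArgumentException("bla") else n)
      .via(TerminationReporting.reportTo[Int](self))
      .to(Sink.ignore)
      .run()
    ()
  }

  def receive: Receive = {
    case Done               => log.info("a watched branch completed")
    case Status.Failure(ex) => log.error(ex, "a watched branch failed")
  }
}
```

With several branches, inserting the stage on each one gives the actor one message per branch, without any of the futures appearing in the graph's materialized value.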


(Adam) #6

Thanks, I’ll try to work around it.
In my experience, complex graphs make people forget what gets passed into the GraphDSL block and what doesn’t, so they end up missing critical materialized values.

We tried ‘spying’ on the Graph from somewhere in the center, but I guess there’s no way to make that happen cleanly.