Akka streams exception handling

Hi,

What is the right way to handle exceptions in Akka Streams? Closing the stream and retrying is not an option, because the failure is caused by invalid properties of the stream element: the exception would just occur again on retry. Instead, I would like to send the failing element to a separate sink to make the issue visible to end-users, and then continue with the next element so the stream keeps running.

My concern with what is described in https://doc.akka.io/docs/akka/current/stream/stream-error.html is that I do not get hold of the faulty element when the exception occurs. This means that in most cases I cannot output element-specific information (e.g. the element ID) to the end-user.

Thanks.

Hi,

Errors due to "invalid properties of the stream element" may be handled by treating them as data: wrap each invalid element instead of throwing, pass it downstream, and divert it to an appropriate sink.

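val flow: Flow[Int, Either[Valid[Int], Invalid[Int]], NotUsed] = Flow[Int]
  // Validate each element; note that in this snippet Left holds valid elements and Right invalid ones
  .map { x =>
    if (x % 2 == 0) Left(Valid(x))
    else Right(Invalid(x, Some(new Exception("Is odd"))))
  }
  // Apply business logic to valid elements only; invalid ones pass through unchanged
  .map {
    case left @ Left(_) => businessLogicOn(left)
    case right @ Right(_) => right
  }
  ...
  // Divert invalid elements to the error sink
  .divertTo(errorSink.contramap(_.right.get), _.isRight)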
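
For reference, here is a minimal self-contained sketch of the same idea, assuming Akka 2.6 (where an implicit ActorSystem provides the materializer). Order, Failed, parse, errorSink and validQuantities are hypothetical names made up for illustration, and the error sink just prints. Unlike the snippet above, Left carries the failure here, following the usual Either convention.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical element type; the id is what end-users need to see on failure.
final case class Order(id: Int, quantity: String)
// Hypothetical failure record: keeps the original element next to the cause.
final case class Failed(order: Order, cause: Throwable)

object DivertSketch {
  // The step that may throw is wrapped in Try, so a bad element becomes
  // a Left carrying the element itself instead of failing the stream.
  val parse: Flow[Order, Either[Failed, Int], NotUsed] =
    Flow[Order].map { order =>
      Try(order.quantity.toInt).toEither.left.map(e => Failed(order, e))
    }

  // Stand-in error sink: prints the id of each rejected element.
  val errorSink: Sink[Either[Failed, Int], NotUsed] =
    Flow[Either[Failed, Int]]
      .collect { case Left(f) => f }
      .to(Sink.foreach[Failed](f => println(s"order ${f.order.id} rejected: ${f.cause}")))

  // Runs the stream and returns the quantities that parsed successfully.
  def validQuantities(orders: List[Order])(implicit system: ActorSystem): Seq[Int] = {
    val result = Source(orders)
      .via(parse)
      .divertTo(errorSink, _.isLeft)      // failures leave the main stream here
      .collect { case Right(qty) => qty } // only valid values continue
      .runWith(Sink.seq)
    Await.result(result, 5.seconds)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("divert-sketch")
    try println(validQuantities(List(Order(1, "3"), Order(2, "abc"), Order(3, "7"))))
    finally system.terminate()
  }
}
```

Because the failure record holds the original element, the error sink can report the element ID, and the stream keeps running for the remaining elements. In a real application the printing sink would be replaced by whatever makes the problem visible to end-users.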

This concept is explained (among others) in this inspiring talk by Colin Breck:

A complete sample is here:

Hope that helps
Paul
