Akka Streams Resuming Materializer does not log exceptions


(Dolly Gyanchandani) #1

Hello all,

When I provide a resuming materializer to the stream, the exceptions that occurred in the stream do not get logged. Is there any way to log these exceptions?

import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Materializer, Supervision}

import scala.concurrent.duration.DurationInt

object Demo extends App {

  private implicit val actorSystem: ActorSystem = ActorSystem()

  lazy val settings: ActorMaterializerSettings =
    ActorMaterializerSettings(actorSystem).withSupervisionStrategy(Supervision.getResumingDecider)

  implicit lazy val resumingMat: Materializer = ActorMaterializer(settings)

  var counter = 0
  def eventGenerator(): Int = {
    counter += 1
    if (counter > 5) throw new RuntimeException("Could not create an event")
    else 1
  }

  Source.tick(0.millis, 10.millis, ()).map(_ ā‡’ eventGenerator()).runForeach(println)
}

(Dolly Gyanchandani) #2

Hi Konrad,

Thanks for your response. As you suggested, I added the log stage as shown below:

Source
    .tick(0.millis, 10.millis, ())
    .map(_ ā‡’ eventGenerator())
    .log("Error in event generation")
    .withAttributes(
      Attributes.logLevels(
        onElement = Logging.DebugLevel,
        onFinish = Logging.DebugLevel,
        onFailure = Logging.DebugLevel
      )
    )
    .runForeach(println)

After adding the log stage, elements that are successfully processed are getting logged, however, it does not log the exceptions. It seems the exceptions are not propagated to the downstream stages due to the resuming materializer. Is there something Iā€™m missing?