Running into AbruptIOTerminationException with idleTimeout usage



I’m trying to do the following:

Source.repeat(ByteString("Hello World!"))
.filter(e => myFilter(e))
.idleTimeout(5 seconds)

Since my source runs indefinitely, I put the idleTimeout after the filter. I want the stream to stop and complete if no new elements pass the filter within a certain amount of time. The idleTimeout throws a TimeoutException as expected, but before that there’s an AbruptIOTerminationException.

This results in a corrupted file: the gzip file is not properly closed. Am I using the correct built-in stage
for this, or is there another stage I need to be aware of?


(Gergő Törcsvári) #2

You can try recover:


Thanks for the hint, @tg44. I managed to do it with:

Source.repeat(ByteString("Hello World!"))
.filter(e => myFilter(e))
.idleTimeout(5 seconds)
.recover {
      case NonFatal(ex) => {
        println(s"======> Flow recovered : $ex")
        ByteString.empty // assumed final element; the handler's original result is not shown
      }
}
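For reference, here is a minimal, self-contained sketch of the same idea, assuming Akka 2.6 (where an implicit ActorSystem provides the materializer). It uses recoverWithRetries to switch to an empty source when the idle timeout fires, so downstream stages see a normal completion instead of a failure. The object name, the three-element test source, and the 500 ms timeout are made up for illustration. In a real pipeline the recovery would presumably need to sit before the gzip and file stages so that they receive the completion signal.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import java.util.concurrent.TimeoutException
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of the original posts.
object RecoverIdleTimeout {
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("recover-idle-timeout")
    try {
      // Three elements followed by silence: Source.maybe emits nothing
      // unless its materialized promise is completed.
      val source = Source(List("a", "b", "c"))
        .map(ByteString(_))
        .concat(Source.maybe[ByteString])

      val done = source
        .idleTimeout(500.millis)
        // Replace the timeout failure with an empty source, i.e. a normal completion.
        .recoverWithRetries(1, { case _: TimeoutException => Source.empty[ByteString] })
        .runWith(Sink.seq)

      Await.result(done, 5.seconds).map(_.utf8String)
    } finally {
      system.terminate()
    }
  }
}
```

Calling RecoverIdleTimeout.run() should return the elements that arrived before the stream went idle. Matching only on TimeoutException, rather than on every NonFatal error, keeps genuine failures visible.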

(Efe Kahraman) #4


I also needed similar functionality. However, I was looking for a way to do it without failing the pipeline, so I created the following GraphStage:


import ...

final class Session[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
  override def initialAttributes = DefaultAttributes.idle

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) with InHandler with OutHandler {
      private var nextDeadline: Long = System.nanoTime + timeout.toNanos

      setHandlers(in, out, this)

      override def onPush(): Unit = {
        // Every element pushes the deadline further into the future.
        nextDeadline = System.nanoTime + timeout.toNanos
        push(out, grab(in))
      }

      override def onPull(): Unit = pull(in)

      final override protected def onTimer(key: Any): Unit =
        if (nextDeadline - System.nanoTime < 0)
          completeStage() // complete instead of failing once the deadline has passed

      override def preStart(): Unit = schedulePeriodically(GraphStageLogicTimer, timeoutCheckInterval(timeout))
    }

  override def toString = "IdleTimeout"
}


It’s almost the same as the built-in idleTimeout stage, except that after the given duration it calls completeStage rather than failStage. This solved my problem, but I’m not sure whether it is overkill. Wouldn’t it be better to have such functionality in the library?
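The stage above relies on Akka-internal helpers (SimpleLinearGraphStage, DefaultAttributes, timeoutCheckInterval). As a rough sketch, the same behaviour can be written against the public GraphStage API only. The class name CompleteOnIdle and the timer key are hypothetical, and instead of a periodic check this version re-arms a single scheduleOnce timer for the time remaining until the deadline:

```scala
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler, TimerGraphStageLogic}
import scala.concurrent.duration._

// Hypothetical stage: completes (rather than fails) the stream once no
// element has passed through for `timeout`.
final class CompleteOnIdle[T](timeout: FiniteDuration) extends GraphStage[FlowShape[T, T]] {
  val in: Inlet[T] = Inlet("CompleteOnIdle.in")
  val out: Outlet[T] = Outlet("CompleteOnIdle.out")
  override val shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) with InHandler with OutHandler {
      private var deadline: Long = System.nanoTime + timeout.toNanos

      setHandlers(in, out, this)

      // Arm the first idle check when the stage starts.
      override def preStart(): Unit = scheduleOnce("idle-check", timeout)

      override def onPush(): Unit = {
        // Each element moves the deadline forward.
        deadline = System.nanoTime + timeout.toNanos
        push(out, grab(in))
      }

      override def onPull(): Unit = pull(in)

      override protected def onTimer(timerKey: Any): Unit = {
        val remaining = deadline - System.nanoTime
        if (remaining <= 0) completeStage()           // idle for too long: finish normally
        else scheduleOnce(timerKey, remaining.nanos)  // elements arrived: check again later
      }
    }
}
```

It would be used like any other flow, for example source.via(new CompleteOnIdle[ByteString](5.seconds)). Because the stage completes instead of failing, downstream stages get a regular completion signal and no recover step is needed.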