Running into AbruptIOTerminationException with idleTimeout usage


#1

Hello,

I’m trying to do the following:

Source.repeat(ByteString("Hello World!"))
.filter(e => myFilter(e))
.idleTimeout(5 seconds)
.via(Compression.gzip)
.runWith(FileIO.toPath(file))

Since my source runs indefinitely, I put the idleTimeout after the filter. I want the stream to stop and complete if no new elements pass the filter within a certain amount of time. The idleTimeout fails with a TimeoutException as expected, but before that there’s an AbruptIOTerminationException.

This results in a corrupted gzip file that is not properly closed. Am I using the correct built-in stage
for this, or is there another stage I need to be aware of?

Regards,


(Gergő Törcsvári) #2

You can try recover: https://doc.akka.io/docs/akka/2.5/stream/stream-error.html#recover


#3

Thanks for the hint, @tg44. I managed to do it with:

Source.repeat(ByteString("Hello World!"))
  .filter(e => myFilter(e))
  .idleTimeout(5 seconds)
  .recover {
    // Requires scala.util.control.NonFatal; emits one last element, then completes the stream
    case NonFatal(ex) =>
      println(s"======> Flow recovered : $ex")
      ByteString("\n")
  }
  .via(Compression.gzip)
  .runWith(FileIO.toPath(file))
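
A possible variation on the recover approach, for readers who do not want the extra "\n" element in the output: recoverWithRetries can switch to an empty source when the timeout fires, so the stream completes without emitting anything further. The following is only a sketch under some assumptions: myFilter is a hypothetical stand-in that lets the first 1000 elements through and then drops everything, the object name, the timeout, and the file name are made up for illustration, and ActorMaterializer is used in the Akka 2.5 style.

```scala
import java.nio.file.{Path, Paths}
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicLong

import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, IOResult}
import akka.stream.scaladsl.{Compression, FileIO, Source}
import akka.util.ByteString

import scala.concurrent.Future
import scala.concurrent.duration._

object CompleteOnIdleExample {
  // Hypothetical stand-in for myFilter: passes the first 1000 elements, then nothing.
  private val seen = new AtomicLong(0)
  def myFilter(e: ByteString): Boolean = seen.incrementAndGet() <= 1000

  def run(file: Path)(implicit mat: ActorMaterializer): Future[IOResult] =
    Source.repeat(ByteString("Hello World!"))
      .filter(myFilter)
      .idleTimeout(1.second)
      // On the idle timeout, continue with an empty source: the stream then
      // completes normally instead of failing, and no extra element is emitted.
      .recoverWithRetries(1, { case _: TimeoutException => Source.empty[ByteString] })
      .via(Compression.gzip)
      .runWith(FileIO.toPath(file))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("complete-on-idle")
    implicit val mat: ActorMaterializer = ActorMaterializer() // Akka 2.5 style
    import system.dispatcher

    run(Paths.get("hello.gz")).onComplete { result =>
      println(result)
      system.terminate()
    }
  }
}
```

Matching only TimeoutException, rather than every NonFatal error, means other failures still fail the stream and the file sink, which may or may not be what you want.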

(Efe Kahraman) #4

Hey,

I also needed similar functionality. However, I was looking for a way to do it without failing the pipeline, so I created the following GraphStage:

package akka.stream.impl.fusing

import ...

final class Session[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
  override def initialAttributes = DefaultAttributes.idle

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) with InHandler with OutHandler {
      private var nextDeadline: Long = System.nanoTime + timeout.toNanos

      setHandlers(in, out, this)

      override def onPush(): Unit = {
        nextDeadline = System.nanoTime + timeout.toNanos
        push(out, grab(in))
      }

      override def onPull(): Unit = pull(in)

      final override protected def onTimer(key: Any): Unit =
        if (nextDeadline - System.nanoTime < 0)
          completeStage()

      override def preStart(): Unit = schedulePeriodically(GraphStageLogicTimer, timeoutCheckInterval(timeout))
    }

  override def toString = "Session"

}

It’s almost the same as akka.stream.impl.Timers.Idle, except that after the given duration it calls completeStage rather than failStage. This solved my problem, but I’m not sure whether it is overkill. Wouldn’t it be better to have such functionality in the library?
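
The same idea can probably be expressed without placing code in the akka.stream.impl.fusing package or relying on internal helpers such as SimpleLinearGraphStage and timeoutCheckInterval. The sketch below uses only the public GraphStage API. The class name CompleteOnIdle and the check interval (one eighth of the timeout, with a 1 ms floor) are assumptions made for illustration, not what Akka uses internally. It calls schedulePeriodically as in the Akka 2.5 API; Akka 2.6 deprecates that method in favour of scheduleWithFixedDelay.

```scala
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler, TimerGraphStageLogic}

import scala.concurrent.duration._

// Passes elements through unchanged and completes (rather than fails)
// the stream once no element has arrived for `timeout`.
final class CompleteOnIdle[T](timeout: FiniteDuration) extends GraphStage[FlowShape[T, T]] {
  val in: Inlet[T] = Inlet("CompleteOnIdle.in")
  val out: Outlet[T] = Outlet("CompleteOnIdle.out")
  override val shape: FlowShape[T, T] = FlowShape(in, out)

  // Assumed polling interval; chosen for illustration only.
  private val checkInterval: FiniteDuration = (timeout / 8L).max(1.millisecond)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) with InHandler with OutHandler {
      private var nextDeadline: Long = System.nanoTime + timeout.toNanos

      setHandlers(in, out, this)

      // Every element pushes the deadline further into the future.
      override def onPush(): Unit = {
        nextDeadline = System.nanoTime + timeout.toNanos
        push(out, grab(in))
      }

      override def onPull(): Unit = pull(in)

      // Complete the stage once the deadline has passed.
      override protected def onTimer(timerKey: Any): Unit =
        if (nextDeadline - System.nanoTime < 0) completeStage()

      override def preStart(): Unit =
        schedulePeriodically("CompleteOnIdleTimer", checkInterval)
    }
}
```

It would be used in place of idleTimeout, for example .via(new CompleteOnIdle[ByteString](5.seconds)) before the gzip flow, so that downstream stages see a normal completion and can flush and close the file.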