Since my source is running indefinitely, I put the idleTimeout after the filter. I want my stream to stop and complete my stream if there’s no new elements after the filter within a certain amount of time. The idleTimeout is running a `TimeoutException as expected but before that there’s a AbruptIOTerminationException
This will result in a corrupted file and not properly closed gzip file. Am I using the correct built-in stage
for this or is there another stage I need to be aware of ?
I also needed a similar functionality. However, I was thinking some another way without failing the pipeline, so created following GraphStage:
package akka.stream.impl.fusing
import ...
final class Session[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
override def initialAttributes = DefaultAttributes.idle
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with InHandler with OutHandler {
private var nextDeadline: Long = System.nanoTime + timeout.toNanos
setHandlers(in, out, this)
override def onPush(): Unit = {
nextDeadline = System.nanoTime + timeout.toNanos
push(out, grab(in))
}
override def onPull(): Unit = pull(in)
final override protected def onTimer(key: Any): Unit =
if (nextDeadline - System.nanoTime < 0)
completeStage()
override def preStart(): Unit = schedulePeriodically(GraphStageLogicTimer, timeoutCheckInterval(timeout))
}
override def toString = "IdleTimeout"
}
It’s almost same as the akka.stream.impl.Timers.Idle, except after the given duration it calls completeStage rather than failStage. This solved my problem, but I’m a bit unsure this is an overkill or not. Wouldn’t it be better to have such functionality in the library?