Akka Stream (async) feedback loop/cycle without message loss

Moving the discussion here from the GitHub issue: https://github.com/akka/akka/issues/27908

tl;dr
When creating feedback loops/cycles in Akka Streams, we need to set up the merge junction with eagerComplete = true so the stream can complete when the source completes. As soon as you introduce async flows in the feedback branch of the loop, we lose messages when the source completes, because the merge cancels eagerly.
One use case is a stream that takes a finite list as its source and calls HTTP APIs that return the actual result entities. The HTTP calls can fail (500, 429, etc.), and the responses need different retry handling:

  • 429: parse the retry-after header value, delay for that value and pipe back into the actual HTTP request processing flow
  • 500: incremental retry delay
The retried requests themselves can fail too, so a message that has passed through the loop once can enter it again.
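The two retry rules above can be sketched in plain Scala, independent of the stream wiring. This is only an illustration: `RetryDelays`, `delayFor`, `baseDelay` and `maxDelay` are hypothetical names, the delay values are assumptions, and only the delta-seconds form of the Retry-After header is handled.

```scala
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical helper for illustration; not part of Akka or Akka HTTP.
object RetryDelays {

  // Assumed values: starting delay and upper bound for the incremental 500 retry.
  val baseDelay: FiniteDuration = 1.second
  val maxDelay: FiniteDuration = 30.seconds

  /** Delay before the next attempt, or None if the response needs no retry.
    *
    * @param status     HTTP status code of the response
    * @param retryAfter raw Retry-After header value, if present
    * @param attempt    number of attempts already made (1 = first try)
    */
  def delayFor(status: Int, retryAfter: Option[String], attempt: Int): Option[FiniteDuration] =
    status match {
      case 429 =>
        // Use the header when it is a non-negative number of seconds,
        // otherwise fall back to the base delay.
        val secs = retryAfter.flatMap(v => Try(v.trim.toLong).toOption).filter(_ >= 0)
        Some(secs.map(_.seconds).getOrElse(baseDelay))
      case 500 =>
        // Incremental (linear) delay, capped at maxDelay.
        Some((baseDelay * attempt.toLong).min(maxDelay))
      case _ =>
        None
    }
}
```

In the loop, an element with `Some(delay)` would go to the feedback branch and an element with `None` would go to the sink.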

To overcome this we can build a custom graph stage that propagates upstream completion to the feedback branch outlet, which completes the merge inlets, and propagates completion to the main outlet as well. But if a record is still cycling in the loop to handle an expected error case, this cannot be solved by completing outlets alone.

Below is a simple/naive implementation of a merge with an async feedback branch:

package akka.merge

import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream._
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition, RunnableGraph, Sink, Source}
import akka.stream.testkit.scaladsl.StreamTestKit
import org.scalatest.{AsyncFlatSpec, BeforeAndAfterAll}
import org.slf4j.LoggerFactory

import scala.concurrent.{ExecutionContext, Future}

class AsyncFeedbackLoopTest extends AsyncFlatSpec with BeforeAndAfterAll {

  private val logger = LoggerFactory.getLogger(this.getClass)

  implicit private val system: ActorSystem = ActorSystem()
  implicit private val log: LoggingAdapter = Logging(system.eventStream, this.getClass)
  implicit private val ec: ExecutionContext = system.dispatcher

  implicit private val materializer: ActorMaterializer = ActorMaterializer()

  override def afterAll(): Unit = {
    materializer.shutdown()
    system.terminate()
  }


  it should "test feedback loop doesn't drop events when terminated with eagerComplete = false" in {
    val sink = Sink.seq[(Int, Int)]

    val graph =
      RunnableGraph
        .fromGraph(GraphDSL.create(sink) { implicit b =>
          sink =>
            import GraphDSL.Implicits._

            val source = Source((0 until 5).toList).map(x => x -> x).named("source").log("source")
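            // eagerComplete = false: merge completes only once all inlets have completed,
            // and the feedback inlet is part of the cycle, so it never does
            val merge = b.add(Merge[(Int, Int)](2, eagerComplete = false))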
            val partition = b.add(Partition[(Int, Int)](2, x => if (x._2 % 2 == 0) 0 else 1))

            val add = b.add {
              Flow[(Int, Int)]
                .map(x => (x._1, x._2 + 1))
                .log("add")
            }

            // @formatter:off
            source ~> merge ~>        partition
                                      partition.out(0).log("sink") ~> sink
                      merge <~ add <~ partition.out(1)
            // @formatter:on

            ClosedShape
        })

    StreamTestKit.assertAllStagesStopped {
      graph
        .run()
        .map { result =>
          logger.info("got result: {}", result)
          assert(result.size == 5)
        } recover {
        case t =>
          fail(t)
      }
    }

  }

  it should "test async feedback loop doesn't drop events when terminated" in {
    val sink = Sink.seq[(Int, Int)]

    RunnableGraph
      .fromGraph(GraphDSL.create(sink) { implicit b =>
        sink =>
          import GraphDSL.Implicits._

          val source = Source((0 until 5).toList).map(x => x -> x).named("source").log("source")
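          // eagerComplete = true: merge completes as soon as the source completes,
          // even if the async feedback branch still holds elements
          val merge = b.add(Merge[(Int, Int)](2, eagerComplete = true))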
          val partition = b.add(Partition[(Int, Int)](2, x => if (x._2 % 2 == 0) 0 else 1))

          val add = b.add {
            Flow[(Int, Int)]
              .map(x => (x._1, x._2 + 1))
              .log("async-add.before")
              .mapAsync(1) { x =>
                Future {
                  Thread.sleep((math.random * 1000).toLong)
                  x
                }
              }
              .log("async-add.after")
          }

          // @formatter:off
          source ~> merge ~>        partition
                                    partition.out(0).log("sink") ~> sink
                    merge <~ add <~ partition.out(1)
          // @formatter:on

          ClosedShape
      })
      .run()
      .map { result =>
        logger.info("got result: {}", result)
        assert(result.size == 5)
      } recover {
      case t =>
        fail(t)
    }
  }

  it should "test feedback loop doesn't drop events when terminated" in {
    val sink = Sink.seq[(Int, Int)]

    RunnableGraph
      .fromGraph(GraphDSL.create(sink) { implicit b =>
        sink =>
          import GraphDSL.Implicits._

          val source = Source((0 until 5).toList).map(x => x -> x).named("source").log("source")
          val merge = b.add(Merge[(Int, Int)](2, eagerComplete = true))
          val partition = b.add(Partition[(Int, Int)](2, x => if (x._2 % 2 == 0) 0 else 1))

          val add = b.add {
            Flow[(Int, Int)]
              .map(x => (x._1, x._2 + 1))
              .log("add")
          }

          // @formatter:off
          source ~> merge ~>        partition
                                    partition.out(0).log("sink") ~> sink
                    merge <~ add <~ partition.out(1)
          // @formatter:on

          ClosedShape
      })
      .run()
      .map { result =>
        logger.info("got result: {}", result)
        assert(result.size == 5)
      } recover {
      case t =>
        fail(t)
    }
  }
}

The problem is that the merge stage can’t determine whether there are still messages in flight in the flows connected to any of its inputs (at least I’m not aware of any API for this), so it can’t determine whether the loop is actually complete.
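To make the missing state concrete, here is a minimal sketch in plain Scala of the bookkeeping a stage would need in order to complete safely. `LoopState` and its methods are hypothetical and not an Akka API. The sketch only models the completion condition, not backpressure or the stage wiring.

```scala
// Hypothetical model of the completion condition; not an Akka API.
final case class LoopState(sourceDone: Boolean = false, inFlight: Int = 0) {

  // The source emitted an element into the loop.
  def entered: LoopState = copy(inFlight = inFlight + 1)

  // An element went through the feedback branch again: it is still in flight.
  def recycled: LoopState = this

  // An element left the loop towards the sink.
  def left: LoopState = {
    require(inFlight > 0, "no element in flight")
    copy(inFlight = inFlight - 1)
  }

  // The source signalled completion.
  def sourceCompleted: LoopState = copy(sourceDone = true)

  // Completing is safe only when the source is done AND nothing is cycling.
  def canComplete: Boolean = sourceDone && inFlight == 0
}

object LoopStateExample {
  def main(args: Array[String]): Unit = {
    // Two elements entered, one is being retried, then the source completes.
    val s = LoopState().entered.entered.recycled.sourceCompleted
    println(s.canComplete)           // false: two elements are still in flight
    println(s.left.left.canComplete) // true: both elements reached the sink
  }
}
```

A merge with eagerComplete = true effectively acts on `sourceDone` alone, which is why elements still counted in `inFlight` can be dropped.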

A possible workaround that comes to mind is to implement the error handling via an actor FSM, but I’d like to stay in the stream world, as it’s dead simple to construct graphs, etc.
With a non-terminating stream this edge case is not that relevant, but now I’m asking myself how people out there handle HTTP failures in Akka Streams, possibly without knowing that they can lose messages.