Graph with cycle stops producing elements after some time

I’m trying to implement a per-element locking mechanism that spans several stages of a graph. The graph contains a cycle that acts as a retry mechanism when an element fails to acquire its lock. However, when I run the graph, it stops producing elements after some time. I’m using Akka 2.5.31 with Scala 2.11.12.
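To make the intended lock semantics concrete, here is a minimal sketch that uses only the standard library and no streams. The names `ElementLock`, `tryAcquire` and `release` are hypothetical and exist only for illustration; the idea is simply that `putIfAbsent` acquires the lock for a value and `remove(key, value)` releases it.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical helper for illustration: a lock table keyed by element value.
object ElementLock {
  private val locks = TrieMap.empty[Int, Boolean]

  // putIfAbsent returns None only when the key was absent,
  // so `isEmpty` is true exactly when this caller acquired the lock.
  def tryAcquire(key: Int): Boolean = locks.putIfAbsent(key, true).isEmpty

  // remove(key, value) removes the entry only if it is currently mapped to `value`
  // and returns true when an entry was actually removed.
  def release(key: Int): Boolean = locks.remove(key, true)
}

object ElementLockDemo extends App {
  println(ElementLock.tryAcquire(1)) // true: first attempt acquires the lock
  println(ElementLock.tryAcquire(1)) // false: lock is held, element would be retried
  println(ElementLock.release(1))    // true: lock released
  println(ElementLock.tryAcquire(1)) // true: lock can be acquired again
}
```

In the stream, an element for which the acquire step returns `false` is the one that should go around the cycle and try again later.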

The code below can be used to reproduce the issue:

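  import akka.actor.ActorSystem
  import akka.stream.{ActorMaterializer, FlowShape}
  import akka.stream.scaladsl.{Flow, GraphDSL, MergePreferred, Partition, Sink, Source}
  import akka.stream.scaladsl.GraphDSL.Implicits._
  import scala.collection.concurrent
  import scala.collection.concurrent.TrieMap
  import scala.concurrent.Future
  import scala.concurrent.duration._

  object CycleRepro extends App {
    implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.global

    implicit val system = ActorSystem("test")
    implicit val materializer = ActorMaterializer()

    // Lock table: a key is present while an element with that value holds the lock.
    val map: concurrent.Map[Int, Boolean] = TrieMap.empty[Int, Boolean]

    // 400 identical elements, so they all compete for the same lock.
    val source = Source.fromIterator(() => List.fill(400)(1).iterator)

    // Pairs each element with `true` if it acquired the lock, `false` otherwise.
    val mutexFlow = Flow[Int].map { value =>
      (value, map.putIfAbsent(value, true).isEmpty)
    }

    val flow = Flow
      .fromGraph(GraphDSL.create() { implicit b =>
        val merge = b.add(MergePreferred[Int](1))
        // Boolean has no `fold` in the standard library, so use an explicit if/else:
        // lock acquired -> port 0 (continue), otherwise -> port 1 (retry).
        val partition = b.add(Partition[(Int, Boolean)](2, t => if (t._2) 0 else 1))
        val delayFlow = b.add(Flow[(Int, Boolean)].map(_._1).delay(1000.milliseconds))
        val unwrapFlow = b.add(Flow[(Int, Boolean)].map(_._1))
        partition.out(0) ~> unwrapFlow
        // Feedback edge: elements that failed to lock are delayed and merged back in.
        partition.out(1) ~> delayFlow ~> merge.preferred
        merge ~> mutexFlow ~> partition.in
        FlowShape(merge.in(0), unwrapFlow.out)
      })

    val graph = source
      .via(flow)
      .mapAsync(10) { value =>
        Future {
          Thread.sleep(100) // simulate work done while holding the lock
          map.remove(value, true) // release the lock
          println(value.toString)
        }
      }
      .to(Sink.ignore)
    graph.run()
  }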

This graph stops printing 1s after a while. I’m aware of the deadlock section in the documentation, which talks about using buffers and a preferred merge to fix deadlock and liveness issues, but the scenario described there is slightly different from this one. Can someone help me figure out where I’m going wrong?
