Bug or by-design behavior? Stopping a stream via KillSwitch + map combination

Given the Akka docs, I would expect the stream to stop after the 7th/8th element. Why does it not stop? It continues all the way to the last element (the 20th).

What I want to achieve is that, on system termination, the stream stops requesting new elements, and the system waits to terminate until all elements already in the stream are fully processed (i.e. have reached the sink).

object StreamKillSwitch extends App {

  implicit val system = ActorSystem(Behaviors.ignore, "sks")
  implicit val ec: ExecutionContext = system.executionContext

  val (killStream, done) =
    Source(1 to 20)
      .viaMat(KillSwitches.single)(Keep.right)
      .map { i =>
        system.log.info(s"Start task $i")
        Thread.sleep(100)
        system.log.info(s"End task $i")
        i
      }
      .toMat(Sink.foreach(println))(Keep.both)
      .run()

  CoordinatedShutdown(system)
    .addTask(CoordinatedShutdown.PhaseServiceUnbind, "stop-receiving") { () =>
      Future(killStream.shutdown()).map(_ => Done)
    }

  CoordinatedShutdown(system)
    .addTask(CoordinatedShutdown.PhaseServiceRequestsDone, "wait-processing-complete") { () =>
      done
    }

  Thread.sleep(720)

  system.terminate()
  Await.ready(system.whenTerminated, 5.seconds)
}

Also asked on Stack Overflow: https://stackoverflow.com/questions/65062099/gracefully-stopping-an-akka-stream/

Hi @Adri,

there is a race condition between the thread execution and the termination. Try your code with 2000 elements instead of 20 and you'll see that the termination does happen.

Using Thread.sleep doesn't help either, because it is a blocking API that holds on to the thread invoking it. You could try the following instead:


object StreamKillSwitch extends App {

  implicit val system = ActorSystem(Behaviors.ignore, "sks")
  implicit val ec: ExecutionContext = system.executionContext
  private val cs: CoordinatedShutdown = CoordinatedShutdown(system)

  val latch = new CountDownLatch(8)

  val (killStream, done) =
    Source(1 to 2000)
      .viaMat(KillSwitches.single)(Keep.right)
      .throttle(1, 1.second)
      .map(i => {
        system.log.info(s"Start task $i")
        latch.countDown()
        system.log.info(s"End task $i")
        i
      })
      .toMat(Sink.foreach(println))(Keep.both)
      .run()

  cs.addTask(CoordinatedShutdown.PhaseServiceUnbind, "stop-receiving") { () =>
    Future(killStream.shutdown()).map(_ => Done)
  }

  cs.addTask(
    CoordinatedShutdown.PhaseServiceRequestsDone,
    "wait-processing-complete"
  ) { () =>
    done
  }

  latch.await()
  private val completed = cs.run(CoordinatedShutdown.JvmExitReason, None)
  Await.ready(completed, 5.seconds)
}

Note how I replaced the sleep inside the stream with a throttle, and the sleep(720) outside the stream with a countdown latch. This implementation may still let more than 8 elements through the stream, since there is still a race between buffered messages and the completion triggered by the kill switch. On my machine I get something like:


[2020-11-30 11:06:51,961] [INFO] [akka.event.slf4j.Slf4jLogger] [sks-akka.actor.default-dispatcher-3] [] - Slf4jLogger started
1
[2020-11-30 11:06:52,042] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-3] [] - Start task 1
[2020-11-30 11:06:52,042] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-3] [] - End task 1
2
[2020-11-30 11:06:53,062] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - Start task 2
[2020-11-30 11:06:53,062] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - End task 2
3
[2020-11-30 11:06:54,062] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - Start task 3
[2020-11-30 11:06:54,062] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - End task 3
4
[2020-11-30 11:06:55,062] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - Start task 4
[2020-11-30 11:06:55,062] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - End task 4
5
[2020-11-30 11:06:56,062] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - Start task 5
[2020-11-30 11:06:56,062] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - End task 5
6
[2020-11-30 11:06:57,060] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - Start task 6
[2020-11-30 11:06:57,060] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - End task 6
7
[2020-11-30 11:06:58,061] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - Start task 7
[2020-11-30 11:06:58,061] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - End task 7
8
[2020-11-30 11:06:59,062] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - Start task 8
[2020-11-30 11:06:59,062] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - End task 8
9
[2020-11-30 11:07:00,063] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - Start task 9
[2020-11-30 11:07:00,063] [INFO] [akka.actor.typed.ActorSystem] [sks-akka.actor.default-dispatcher-7] [] - End task 9
[success] Total time: 9 s, completed Nov 30, 2020 11:07:00 AM

That's one element too many (task 9 still went through) because of that race.
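If that extra in-flight element matters to you, one thing worth trying (my own sketch, not something from the thread, and it narrows the race rather than eliminating it) is shrinking the operators' internal input buffers with `Attributes.inputBuffer`, so fewer elements can sit buffered downstream of the kill switch when `shutdown()` fires:

```scala
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.{Attributes, KillSwitches}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.duration._

object TightBufferKillSwitch extends App {
  implicit val system = ActorSystem(Behaviors.ignore, "sks-tight")

  // Same pipeline as above, but with the internal input buffer reduced to a
  // single element. With less buffering between the kill switch and the sink,
  // fewer elements are already in flight when shutdown() is invoked.
  val (killStream, done) =
    Source(1 to 2000)
      .viaMat(KillSwitches.single)(Keep.right)
      .throttle(1, 1.second)
      .toMat(Sink.foreach(println))(Keep.both)
      .withAttributes(Attributes.inputBuffer(initial = 1, max = 1))
      .run()
}
```

Whether this helps in practice depends on how the stages are fused into islands, so treat it as something to experiment with rather than a guarantee.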

Cheers,