Is there any way to cancel a stream immediately?

In our application we have an issue with KillSwitch: even after shutdown() is called, we can see mapping operations (which may make blocking calls to a DB) continue to execute for a long time.
I was able to reproduce the same behavior in a small code snippet. I would expect to see 10-20 outputs, but I see all 100. We can’t change the implementation of the functions called in the map operators.
Is there any way to cancel the stream faster?

    IntStream intStream = IntStream.range(0, 100);
    UniqueKillSwitch run = Source.fromIterator(intStream::iterator)
        .map(item -> {
          System.out.println(item);
          Thread.sleep(100);
          return item;
        })
        .viaMat(KillSwitches.single(), Keep.right())
        .toMat(Sink.foreach(item -> {
        }), Keep.left())
        .run(_actorSystem.getMaterializer());

    Thread.sleep(1000);
    run.shutdown();

Hi @dparamoshkin,

Try again, replacing the Thread.sleep(100) with throttle:

        IntStream intStream = IntStream.range(0, 100);
        UniqueKillSwitch run = Source.fromIterator(intStream::iterator)
                .throttle(10, Duration.of(1, ChronoUnit.SECONDS))
                .map(item -> {
                    System.out.println(item);
                    return item;
                })
                .viaMat(KillSwitches.single(), Keep.right())
                .toMat(Sink.foreach(item -> {
                }), Keep.left())
                .run(_actorSystem);

        Thread.sleep(1000);
        run.shutdown();

;-)

The reason is that all operations are fused into one actor running the stream, and that actor is blocked on your Thread.sleep (or blocking operation). The kill switch depends on being able to interact with the stream, but cannot because it is blocked.

You can move the map operation to a separate dispatcher for blocking operations, as in the snippet below. Note that an async stream island has an input buffer for performance reasons, so a few elements may still be processed after shutdown(). The buffer size can be tweaked with the inputBuffer attribute. However, making the stream more deterministic by shrinking the buffer can hurt throughput, and other operators often introduce buffers of their own that you may not control, so it is usually better to just live with that.

.via(Flow.fromFunction((Integer item) -> {
  System.out.println(item);
  Thread.sleep(100);
  return item;
}).async("akka.stream.blocking-io-dispatcher")
.withAttributes(Attributes.inputBuffer(1, 1)))

@ignasi35 I can’t change what is inside the map. It could be a long DB call, a REST call, or anything else that is long-running.

@johanandren Thanks for your answer. Is it effectively the same as using mapAsync(1, ...)?

Yes, what is observable will be pretty much the same.

It’s important to run that async task/CompletableFuture on a threadpool specific for blocking tasks, just like using the “blocking-io-dispatcher” for the blocking section of the stream, so that it does not starve other parts of the system.
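
As a minimal sketch of that advice, the helper below wraps a blocking call in a CompletableFuture that runs on its own thread pool. It uses only the JDK. The names BlockingCalls, BLOCKING_POOL, blockingCall and blockingCallAsync are hypothetical, the pool size of 4 is an arbitrary assumption, and blockingCall is only a stand-in for the real DB or REST call that cannot be changed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class BlockingCalls {

    // Hypothetical pool reserved for blocking work (size 4 is an arbitrary choice).
    // Daemon threads are used so the pool does not keep the JVM alive on its own.
    static final ExecutorService BLOCKING_POOL =
        Executors.newFixedThreadPool(4, runnable -> {
            Thread thread = new Thread(runnable, "blocking-pool");
            thread.setDaemon(true);
            return thread;
        });

    // Stand-in for the unchangeable blocking function (DB call, REST call, ...).
    static int blockingCall(int item) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return item;
    }

    // Runs the blocking call on BLOCKING_POOL instead of the caller's thread,
    // so the caller (for example the actor running the stream) is never blocked.
    static CompletionStage<Integer> blockingCallAsync(int item) {
        return CompletableFuture.supplyAsync(() -> blockingCall(item), BLOCKING_POOL);
    }
}
```

With a helper like this, the map stage in the original snippet could presumably be replaced by .mapAsync(1, BlockingCalls::blockingCallAsync). The stream's own thread then only waits for the future, so the kill switch should be able to react while at most one blocking call is in flight. The call already in flight is not interrupted; it still runs to completion on the pool.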