Is it possible to cancel outstanding futures in mapAsync on stream failure?


(Igor Baltiyskiy) #1

Is it possible at all in Akka Streams to cancel the futures produced in mapAsync when the stream fails? This seems desirable, because some computations are cancellable, and only this graph stage knows which futures are still unresolved. Keeping track of them on the side is quite inconvenient.

Here’s the use case:

  • executing many requests through Cassandra driver,
  • with a common deadline.

Session.executeAsync produces a ResultSetFuture, which has a cancel() method. Although it doesn't cancel the request in Cassandra, it cancels any retry if the request fails, and it eagerly releases the resources the driver allocated to track the request.

I see that Alpakka Cassandra connector sidesteps this issue and never calls cancel(). Is it possible at all with Akka Streams?
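To illustrate what "keeping track on the side" involves, here is a minimal sketch using the JDK's CompletableFuture instead of Scala futures or the driver's ResultSetFuture. InFlightRegistry, track() and cancelAll() are hypothetical names, not part of Akka Streams or the Cassandra driver. The assumption is that every future handed to mapAsync first passes through track(), and that whatever observes the stream's failure then calls cancelAll().

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: remembers every in-flight future so that a failure
// handler elsewhere can cancel whatever is still outstanding.
final class InFlightRegistry<T> {
    private final Set<CompletableFuture<T>> inFlight = ConcurrentHashMap.newKeySet();

    // Registers the future and forgets it as soon as it completes
    // (normally, exceptionally, or by cancellation).
    CompletableFuture<T> track(CompletableFuture<T> future) {
        inFlight.add(future);
        future.whenComplete((value, error) -> inFlight.remove(future));
        return future;
    }

    // Cancels every future that has not completed yet and returns how many
    // were actually cancelled. Iterating a ConcurrentHashMap key set while
    // the completion callbacks remove entries is safe.
    int cancelAll() {
        int cancelled = 0;
        for (CompletableFuture<T> f : inFlight) {
            if (f.cancel(true)) {
                cancelled++;
            }
        }
        return cancelled;
    }

    int size() {
        return inFlight.size();
    }
}
```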


(Enno) #2

No. Scala futures do not support cancellation, so it can't be done from mapAsync.

Where in Alpakka Cassandra do you see the need to cancel the futures?

Cheers,
Enno.


(Igor Baltiyskiy) #3

Yes, I know they don't support cancellation. I was wondering whether I could register some kind of "on failure" handler that gets called for each element still being processed when the stream fails.

As for Alpakka Cassandra: suppose I'm using CassandraFlow combined with a timeout operator. I'd expect that when the stream fails due to the timeout, CassandraFlow would take care of calling cancel().

Compare it with the traditional blocking usage from the driver documentation (https://docs.datastax.com/en/drivers/java/2.0/com/datastax/driver/core/ResultSetFuture.html#cancel-boolean-):

  ResultSetFuture future = session.executeAsync(...some query...);
  try {
      ResultSet result = future.get(1, TimeUnit.SECONDS);
      ... process result ...
  } catch (TimeoutException e) {
      future.cancel(true); // Ensure any resource used by this query driver
                           // side is released immediately
      ... handle timeout ...
  }
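For comparison, the same "cancel on timeout" idea can be sketched without blocking, using the JDK's CompletableFuture and a ScheduledExecutorService: a timer cancels the future if it is still pending when the deadline passes, and completing the future first disarms the timer. Deadlines.cancelAfter is a hypothetical helper, not a Cassandra driver or Akka API, and for CompletableFuture the mayInterruptIfRunning flag of cancel() has no effect.

```java
import java.util.concurrent.*;

// Hypothetical helper (not a driver or Akka API).
final class Deadlines {
    private Deadlines() {}

    // Arms a timer that cancels `future` if it is still pending after `timeout`.
    // If the future completes first, the timer is disarmed so it never fires.
    static <T> CompletableFuture<T> cancelAfter(CompletableFuture<T> future,
                                                long timeout, TimeUnit unit,
                                                ScheduledExecutorService scheduler) {
        // CompletableFuture ignores the mayInterruptIfRunning flag passed to cancel().
        ScheduledFuture<?> timer = scheduler.schedule(() -> { future.cancel(true); }, timeout, unit);
        future.whenComplete((value, error) -> timer.cancel(false));
        return future;
    }
}
```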

I'm afraid this kind of thing (cancellation on timeout) can't be expressed with Akka Streams at all.


(Igor Baltiyskiy) #4

Should I raise an issue in Alpakka / Akka Streams?


(Enno) #5

This specific case of the Alpakka Cassandra source not calling cancel() can easily be solved in the CassandraSourceStage.
The Alpakka team at Lightbend plans to work on Cassandra in the coming weeks, as we want to consolidate it with the tools built for Akka Persistence Cassandra. So you can expect major changes and improvements to Alpakka Cassandra.

But please go ahead and file a bug report in https://github.com/akka/alpakka/issues so we can make sure this gets fixed when we work on it.

Cheers,
Enno.


(Igor Baltiyskiy) #6

Thanks. However, I see it as a problem that you can't do things like calling cancel() when you don't own the stage implementation. Am I correct that this is a limitation of the Akka Streams architecture?


(Enno) #7

I might be misunderstanding you, but Scala Futures, and their use in Akka Streams, deliberately do not provide a way to cancel them.

This future in the Cassandra API has a different design.

The Cassandra API usage in this stage might need to be improved, but I don't think library users should be able to reach into a stage's inner workings.


(Igor Baltiyskiy) #8

My concern is that Akka Streams requires the user to modify the stage itself and doesn't allow doing this from the outside. Here is what it could offer instead:

  • a handler that is called when the stage fails, similar to what a supervision strategy provides for a stage, but with finer granularity;

  • a variant of mapAsync based on cancellable futures, which would cancel the outstanding futures when the stage fails.
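The second option can be approximated outside Akka Streams with plain CompletableFuture. The sketch below assumes a hypothetical helper, CancelOnFailure.mapAll (not an Akka API), that starts one future per input, keeps the results in input order as mapAsync does, and, as soon as one future fails, fails the combined result and cancels every future still outstanding. Unlike mapAsync it has no parallelism limit and no backpressure; it only illustrates the cancel-on-failure behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical helper (not part of Akka Streams).
final class CancelOnFailure {
    private CancelOnFailure() {}

    // Starts f for every input and collects the results in input order.
    // The first failure fails the combined result and cancels every
    // sibling future that has not completed yet.
    static <A, B> CompletableFuture<List<B>> mapAll(List<A> inputs,
                                                    Function<A, CompletableFuture<B>> f) {
        List<CompletableFuture<B>> started = new ArrayList<>();
        for (A input : inputs) {
            started.add(f.apply(input));
        }

        CompletableFuture<List<B>> result = new CompletableFuture<>();
        for (CompletableFuture<B> fut : started) {
            fut.whenComplete((value, error) -> {
                // Only the first failure wins; it acts as the "on failure" hook.
                if (error != null && result.completeExceptionally(error)) {
                    for (CompletableFuture<B> other : started) {
                        other.cancel(true); // no-op for already completed futures
                    }
                }
            });
        }

        // If every future succeeds, gather the values in input order.
        CompletableFuture.allOf(started.toArray(new CompletableFuture<?>[0]))
            .thenRun(() -> {
                List<B> values = new ArrayList<>();
                for (CompletableFuture<B> fut : started) {
                    values.add(fut.join());
                }
                result.complete(values);
            });
        return result;
    }
}
```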