Alpakka-Cassandra Error handling


(Pierangelo Cecchetto) #1

I am using akka-stream-alpakka-cassandra to read from one table and then do some transformations and save in another table. I modeled the reading stage through a CassandraSource and the saving stage through a CassandraFlow.createWithPassThrough.

Implementing the happy path is straightforward. Just the unhappy path poses some difficulties to me.

E.g. it can happen that during the save something goes wrong on Cassandra and the stream ends up failing (I didn’t define any SupervisionStrategy, yet). What I would like to do is to be in full control of the error that occurred per emitted element, therefore the implementation of createWithPassThrough is not particularly useful in this case because it is based on mapAsync and if the Future returned by

session
          .executeAsync(statementBinder(t, statement))

fails, the stream fails and stops. Putting a recover at the end of it doesn’t solve the problem because the stream completes successfully and stops consuming from upstream.

Wouldn’t it give more control to the developer, to guarantee that the Future in mapAsync is always successful, and emit instead an Either[Throwable, T]?, or even better a pair (T, Either[Throwable, Done]) (we don’t want to expose the ResultSet), so one knows the input for the statement being bound, together with the outcome of the (write) query?

If there are other strategies to handle this, can someone give some suggestions?