I am using
akka-stream-alpakka-cassandra to read from one table and then do some transformations and save in another table. I modeled the reading stage through a
CassandraSource and the saving stage through a
Implementing the happy path is straightforward. Just the unhappy path poses some difficulties to me.
E.g. it can happen that during the save something goes wrong on Cassandra and the stream ends up failing (I didn’t define any
SupervisionStrategy, yet). What I would like to do is to be in full control of the error that occurred per emitted element, therefore the implementation of
createWithPassThrough is not particularly useful in this case because it is based on
mapAsync and if the
Future returned by
session .executeAsync(statementBinder(t, statement))
fails, the stream fails and stops. Putting a
recover at the end of it doesn’t solve the problem because the stream completes successfully and stops consuming from upstream.
Wouldn’t it give more control to the developer, to guarantee that the
mapAsync is always successful, and emit instead an
Either[Throwable, T]?, or even better a pair
(T, Either[Throwable, Done]) (we don’t want to expose the ResultSet), so one knows the input for the statement being bound, together with the outcome of the (write) query?
If there are other strategies to handle this, can someone give some suggestions?