Retry stream in akka on failure of any stage in flow

I am using akka stream to process my data. In which i have 1 Source which consist of element UUID.

The flow is as follows :

  1. is fetching the Element from some third party HTTP service which return complete Element with its properties.

  2. Then i retrieve required data from that element and convert it to object that my application understands.

  3. Then i write the data from that object to DB.

Finally i update the DB with status of all the elements in the stream.

Now i want to add retry mechanism to this flow so that if any of the stage in flow fails it should retry the stage for some no of times say 3 and if after that it fails then only failure of stream should happen. For example, if there are some issues with third party service like HTTP 504 error then most of the time after retrying this element success. So is there any way in akka to achieve this.

Currently i am maintaining 1 list to store all the failed element ids like below.


List<UUID> failedId = new CopyOnWriteArrayList<>();
Source.from(elementIdToProcess).map(f -> {
            return f;
            .runForeach(features -> {
                features.parallelStream().forEach(feature -> {
                    try {
                    } catch (Exception e) {
                        throw new RuntimeException(e);
            }, materializer);

Hmm. I never used the java API so I try to help with scala knowledge. In scala we have types for this kind of behavior for ex. a Try can be a Success or a Failure, so instead of throwing an exception, I use Try to encapsulate a good and bad cases to one single type. I’m sure there is something similar in java8 too.
The mutable List modifying inside the stream is a really bad decision. For example when you get your first exception, there are unprocessed elements in the beginning of the stream. Those elements will never get a proper retry. You don’t want to fail the whole stream!

So my idea is:

  • When you expect an exception try to catch it and add it to a wrapper class (no flying exceptions)
  • Do your job if the non exceptional case happened, pipe down the thing without modification if the exception happened
  • at the end of the stream filter out the good cases, and a Sink.toList will return the bad cases (don’t use foreach)

Java minded nearly working scala demonstration code:

case class Wrapper[T](data: T, exception: Throwable = null)
.map(d => Wrapper(d))
.map(w => 
  if(w.exception == null) 
    catch {case ex => Wrapper(null ex)})
.filter(w => w.exception != null)

Thank you for your reply. Yes Java has exception handling functionality like you did with try here but this will be custom implementation just like the one i have provided in my implementation. I want to know whether akka has any functionality to retry stream for particular element or not. I tries recoverWithRetries but it will skip the actual stream as soon as the failure happens for stipulated no of times. So are there any other api provided by akka for this type of use case ?

Its not really a “custom implementation”. Its your use-case. The difference in my example and in yours that mine is using encapsulation, and Futures, and threads, and (non) shared states the right way :stuck_out_tongue: (dont consider this as a personal attack pls) There is a commonly used way to wrap our data to “holders” instead of wild exception throwing, and generally the exception pattern in the java world is not healthy.

I have no idea what are the other tools you use, but I’m not convinced that the stream need to handle the retries. For example if you have some async http requester somewhere you can probably tell the async wrapper to try the retry with exponential backoff. If you have a sync http requester, you can use for/while. If you really want to solve this problem within the streams you can write a Graph with a cycle in it, but thats always risky if you have no exact knowledge how the used stages do back pressure.

The error handling strategies in the streams are prepared for lethal errors. Like the database go offline, or we went out of memory. The “user errors” like we can’t insert that to the db bcs malformed or other reasons; are non lethal in some use-cases, so the rule here is to wrap the non-lethal things to wrappers, and only rise exceptions if they considered to lethal or if we can’t handle them.

BTW if you insert things into database maybe this post will be informative too:

This section in the docs may also be useful for background about the explicit error handling vs failures in streams:

I went a bigger round with your problem and found this:
Not sure if it have a java api or not, but it seems to be the thing you are looking for.

2 more posts in the topic with tips and tricks and reasons (both scala sry):