Propagating a failure upstream in Akka Streams to effectively restart the Source after backoff: not possible?

Let's say I have a simple Graph: Source → Flow → Sink
The Source is an AzureServiceBusSource.

The Flow takes each message and uses the provided information to make an HTTP request to an API. When that request is successful, I complete the message at the Service Bus, effectively removing it from the Queue. When it fails with an expected error value, it is forwarded to the Dead Letter Message Queue, also removing it from the Queue.

Now the problem is: how do I deal with transient failures in an effective way, e.g. when the API is not available?

Inside the Flow, individual HTTP requests to this service are retried 5 times. Currently, when the retry attempts are exhausted, the element is discarded, and the next element is tried. This is not ideal, because the target service could still be unavailable, and I don’t just want to discard elements because of eventual consistency.
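For context, those per-request retries might look roughly like the following sketch (`callApi` and `Message` are placeholders for my real request logic and element type; `akka.pattern.retry` needs an implicit `Scheduler` and `ExecutionContext` in scope):

```scala
import scala.concurrent.duration._
import akka.pattern.retry
import akka.stream.scaladsl.Flow

// Sketch: retry each HTTP call up to 5 times with a fixed delay before
// giving up on the element. `Message` and `callApi` are placeholders.
val requestFlow =
  Flow[Message].mapAsync(parallelism = 1) { msg =>
    retry(() => callApi(msg), attempts = 5, delay = 1.second)
  }
```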

When the request has failed 5 times, I can conclude that the external service is unreachable. At this point, I immediately want to stop any new elements from being pulled from my Source, and I also don't want to lose the element that caused the error.
Ideally I want to have a backoff period, after which the Flow resumes with the last element before the failure.

What are my options?

It would be unnecessary to restart the whole stream, because downstream stages not depicted here could very well still work; why disrupt them?
A RestartFlow seems to be lossy, so the original message will be lost. I cannot afford that.
Another idea was to propagate the failure upstream to the Source, so it could be replaced with another instance of the source, effectively restarting it, but there seems to be no way to propagate Exceptions upstream.

What should I do now? Do I need to use a KillSwitch, a custom Graph Stage, or an Actor to do what I need to do?

Any hints would be appreciated.

One pattern which might be useful is to have an actor run the flow (e.g., feeding it with a Source.queue; the actor receives messages from the Azure Service Bus) and restart the flow on stream failure. To make this restart less lossy, the actor feeds the flow with a pair of (element, context), where the context is some object that uniquely identifies the element (possibly the element itself). The flow logic propagates the context until, at its end, it sends an acknowledgement message containing the context back to the actor; ideally the flow does at most one of dropping or reordering elements (e.g., FlowWithContext-like semantics) to simplify the actor's job. The actor then remembers the contexts and elements it has fed to the flow which haven't been acknowledged, and refeeds them when it restarts the flow (potentially delaying and buffering with a backoff, etc.).
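A rough sketch of such a supervisor with classic actors follows; `Element`, `Acked`, and `businessFlowWithContext` are hypothetical names, and buffering, backoff, and offer-result handling are omitted:

```scala
import akka.actor.Actor
import akka.stream.{Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source, SourceQueueWithComplete}

case class Acked(ctx: Long)
case object FlowStopped

// Hypothetical supervisor: feeds the flow via a Source.queue, tracks
// unacknowledged (context -> element) pairs, and refeeds them on restart.
class FlowSupervisor(implicit mat: Materializer) extends Actor {
  import context.dispatcher

  private var unacked = Map.empty[Long, Element] // Element is a placeholder type
  private var nextCtx = 0L
  private var queue: SourceQueueWithComplete[(Element, Long)] = startFlow()

  private def startFlow(): SourceQueueWithComplete[(Element, Long)] = {
    val (q, done) = Source
      .queue[(Element, Long)](bufferSize = 100, OverflowStrategy.backpressure)
      .via(businessFlowWithContext) // must carry the Long context to its end
      .toMat(Sink.foreach { case (_, ctx) => self ! Acked(ctx) })(Keep.both)
      .run()
    done.onComplete(_ => self ! FlowStopped) // restart on failure or completion
    q
  }

  def receive = {
    case elem: Element =>
      nextCtx += 1
      unacked += nextCtx -> elem
      queue.offer((elem, nextCtx))
    case Acked(ctx) =>
      unacked -= ctx
    case FlowStopped =>
      queue = startFlow()
      unacked.foreach { case (ctx, elem) => queue.offer((elem, ctx)) } // refeed
  }
}
```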

That said,

Another idea was to propagate the failure upstream to the Source, so it could be replaced with another instance of the source, effectively restarting it, but there seems to be no way to propagate Exceptions upstream.

Failure of a downstream stage does in fact always propagate upstream as a failure. Since the Azure Service Bus source can presumably fail (I admit I'm not familiar with its semantics; perhaps some sort of nack is required?), it may be useful to wrap that source and the flow, up to the point where it can fail, in a RestartSource, which presumably/hopefully results in the ASB source re-receiving the messages.

Thank you for your input.

I was already considering using Streams in combination with an Actor, but still wanted to check if it is possible with Streams only.

Regarding your second point, I found that confusing because I could not verify that behavior in a simple example.
Maybe I am missing something.

When I throw an Exception in a Flow, and have a .recover on my Source, there is no Exception caught on that Source.
In that case, what does it mean for a Failure to be propagated upstream?

Regarding RestartFlow, the problem is that I cannot afford to lose any message. As I understand it, the in-flight element is lost when a failure occurs and the flow is restarted.
The best approach would be to just restart the Service Bus Source, since the message that was not dealt with will simply be emitted again.

Yes, I checked again: RestartFlow is not an option because in-transit messages would be lost. I thought about wrapping the Azure Service Bus Source in a RestartSource as you suggested, but I have not found a good way to inform the Source from downstream that a failure has occurred and it needs to restart.

Edit:

One way would be to use a KillSwitch, but I have not found a way to propagate a KillSwitch materialized value out of a Source that is wrapped inside a RestartSource …
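For what it's worth, a SharedKillSwitch might sidestep the materialized-value problem, since it is created outside the stream rather than materialized by it, so the same instance can be applied inside the restart factory and survives restarts (a sketch; `restartSettings` and `AzureServiceBus.source(...)` as in the earlier snippet):

```scala
import akka.stream.KillSwitches
import akka.stream.scaladsl.RestartSource

// A SharedKillSwitch is not a materialized value, so the same instance can be
// threaded into every restarted incarnation and triggered from anywhere.
val killSwitch = KillSwitches.shared("asb-switch")

val restartingSource = RestartSource.onFailuresWithBackoff(restartSettings) { () =>
  AzureServiceBus.source(...).via(killSwitch.flow)
}

// Later, from whatever detects the fatal condition:
// killSwitch.abort(new RuntimeException("external service unreachable"))
```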

recover only responds to upstream failures, because it emits an element in response to the upstream failure. In the case of a downstream failure, emission is impossible.
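A minimal example of that asymmetry (plain Akka Streams, runnable with an implicit ActorSystem in scope):

```scala
import akka.stream.scaladsl.{Sink, Source}

// recover sits downstream of the failing map, so it sees the failure
// and can emit a fallback element before completing the stream.
Source(1 to 3)
  .map(n => if (n == 2) throw new RuntimeException("boom") else n)
  .recover { case _: RuntimeException => -1 }
  .runWith(Sink.seq) // should complete with Seq(1, -1)

// Attached upstream of the failing stage, recover never sees the failure:
Source(1 to 3)
  .recover { case _: RuntimeException => -1 } // no effect on downstream failures
  .map(n => if (n == 2) throw new RuntimeException("boom") else n)
  .runWith(Sink.seq) // the materialized Future fails with the RuntimeException
```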

What happens when a downstream stage fails is that it closes its inlets and signals upstream that the closure is due to a failure (as opposed to essentially saying "no more elements, please"). The upstream stage then propagates the failure and will itself fail. If implementing with the GraphStage APIs, this signal can be captured via the onDownstreamFinish handler. Typically stages don't override the default behavior, but it's possible to do other things; in general, a stage which overrides the default behavior should think very carefully about that and document it. RestartFlow in fact does just this to restart the flow.
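As a sketch, capturing that signal in a custom GraphStage might look like this (Akka 2.6 API, where onDownstreamFinish receives the cause; the stage itself is a plain pass-through):

```scala
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

// Pass-through stage that observes why its downstream closed before
// propagating the cancellation or failure upstream as usual.
final class ObserveDownstreamFinish[T] extends GraphStage[FlowShape[T, T]] {
  val in: Inlet[T] = Inlet("ObserveDownstreamFinish.in")
  val out: Outlet[T] = Outlet("ObserveDownstreamFinish.out")
  override val shape: FlowShape[T, T] = FlowShape(in, out)

  override def createLogic(attrs: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      override def onPush(): Unit = push(out, grab(in))
      override def onPull(): Unit = pull(in)
      override def onDownstreamFinish(cause: Throwable): Unit = {
        // `cause` distinguishes a plain cancellation from a downstream failure;
        // a stage like RestartFlow restarts here instead of cancelling upstream.
        super.onDownstreamFinish(cause) // default: cancel upstream with `cause`
      }
      setHandlers(in, out, this)
    }
}
```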

Since the Service Bus source will re-emit the unacked in-flight messages if it's restarted after a failure, the typical approach in these cases is to wrap both the source and the flow in a RestartSource (remember that source.via(flow) is a Source):

```scala
RestartSource.onFailuresWithBackoff(restartSettings) { () =>
  AzureServiceBus.source(...)
    .via(businessFlow)
    .via(AzureServiceBus.ackFlow)
}
```

If the materialized value of the source is important (e.g., it's similar to the Control in Alpakka Kafka), there's a standard hack, described in the Alpakka Kafka docs, to track the materialized value of the most recent restart: Error handling • Alpakka Kafka Documentation
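Adapted to this case, the hack amounts to stashing the latest materialized value in an AtomicReference from inside the restart factory (`SourceControl` is a placeholder for whatever the ASB source actually materializes):

```scala
import java.util.concurrent.atomic.AtomicReference
import akka.stream.scaladsl.RestartSource

// Placeholder for the ASB source's materialized value type.
val latestControl = new AtomicReference[Option[SourceControl]](None)

val restartingSource = RestartSource.onFailuresWithBackoff(restartSettings) { () =>
  AzureServiceBus.source(...)
    .mapMaterializedValue { control =>
      latestControl.set(Some(control)) // updated on every restart
      control
    }
    .via(businessFlow)
    .via(AzureServiceBus.ackFlow)
}
```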

This is perfect. Of course just wrapping the whole flow in a RestartSource. It all became clear now, thank you very much!