Complete stream without exception

I’ve found myself wanting to complete a stream without having to raise an exception. To do this I use recoverWith and an empty source. This is quite cumbersome in Java (with a timeout exception as the trigger):

PartialFunction<Throwable, Source<EndDeviceEvents.Event, NotUsed>> completer = 
    new PFBuilder()
            .match(TimeoutException.class, ex -> Source.empty()).build();
...
.recoverWithRetries(1, completer)

Has there been a consideration for a shortcut e.g.:

.completeOnError()

…or may be:

.recoverWithRetries(1, FlowOps.ignore())

I think .completeOnError[T] for Scala and .completeOnError(Class<T>) for Java would be nice.

Yes, I agree those would be a good addition. Had similar need some time ago

Naming could be mapErrorToComplete perhaps since we have a concept like mapError already

You also said that you wanted to complete without raising an exception, so would that be something like:

def completeWhen(predicate: T => Boolean)

It can already be done with a SharedKillSwitch.

Regarding the cumbersome recoverWithRetries I created issue https://github.com/akka/akka/issues/24888 (contribution welcome).

Thanks for the dialogue and raising the issue! I’ll see what I can do re a contribution.

FYI, such completeWhen is exactly takeWhile(not condition)… :slight_smile: I had proposed such complete when some time ago but we decided against it back then, though I would be ok with adding it — since people don’t discover it as they are looking for “complete”.

Right. Well, adding operators with different names doing the same thing will just be confusing.

We can also address this problem by documentation; when we move the docs to “directive style”, we can make a page for operator/completeWhen.html which would just say "it is the same as takeWhile(!...) and direct there. We could this way fix this. Though since the ! makes it slightly different we could consider making it a real one hm