How to get materialized value of a Flow you don't control?

If I’m breaking any rules by posting this, I’ll take it down. Sorry, I’m just very curious to know the answer for this.

I asked a question like this recently on StackOverflow but wanted to know if I can get some direction from the akka experts. Here is the question which has the total context:

TL;DR - The akka http websocket API wants a Flow which it can handle. I’m trying to use something like the below to give it a flow:

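private lazy val connHub: Flow[Message, Message, UniqueKillSwitch] = {
    // Run the hub eagerly; this needs an implicit Materializer (or ActorSystem) in scope
    val (sink, source) = MergeHub.source[Message].toMat(BroadcastHub.sink[Message])(Keep.both).run()
    // Wrap the hub in a bidirectional kill switch, keeping the switch as the materialized value
    Flow.fromSinkAndSourceCoupled(sink, source).joinMat(KillSwitches.singleBidi[Message, Message])(Keep.right)
}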

This gives me a Flow that materializes a KillSwitch. However, I’ve realized that I never get to materialize the Flow myself, since the websocket directive provided by Akka HTTP does that. Given that, how can I get hold of the KillSwitch?

The only idea I have is to feed the Flow into a Sink that I can call preMaterialize() on, and then manipulate that to hand me the KillSwitch, but I’m not sure that is the correct way to do this.

Can someone with more experience and knowledge help explain the correct way to achieve what I want?

I also wanted to say that the reason I’m doing it this way is that this is how the Akka docs describe it (see my StackOverflow post for the docs I reference).

I’ve had to do something like this a few times, but a lot depends on how you expect the pattern to be used, particularly with regard to re-use, as most Flows are intended to be materializable more than once.

For a non-reusable Flow, the simplest approach is probably to create an external Promise for the KillSwitch, and use mapMaterializedValue to capture it during materialization.
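A minimal sketch of the Promise approach, assuming Akka 2.6+ (where an implicit ActorSystem supplies the materializer) and akka-http on the classpath for the Message type. The names PromiseKillSwitch and connHub() are made up for illustration; the hub wiring is taken from the question.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.Message
import akka.stream.{KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, MergeHub}
import scala.concurrent.{Future, Promise}

// Hypothetical holder object, for illustration only
object PromiseKillSwitch {
  // Assumption: Akka 2.6+, so the implicit system also provides a Materializer
  implicit val system: ActorSystem = ActorSystem("ws")

  // Returns the Flow to hand to the websocket directive, plus a Future that
  // completes with the KillSwitch once the directive materializes the Flow.
  def connHub(): (Flow[Message, Message, NotUsed], Future[UniqueKillSwitch]) = {
    val switchPromise = Promise[UniqueKillSwitch]()
    val (sink, source) =
      MergeHub.source[Message].toMat(BroadcastHub.sink[Message])(Keep.both).run()

    val flow = Flow.fromSinkAndSourceCoupled(sink, source)
      .joinMat(KillSwitches.singleBidi[Message, Message])(Keep.right)
      .mapMaterializedValue { ks =>
        // trySuccess: a second materialization will not overwrite the first switch
        switchPromise.trySuccess(ks)
        NotUsed
      }

    (flow, switchPromise.future)
  }
}
```

The Future stays incomplete until something actually materializes the Flow, so callers should map over it rather than block on it. Because a Promise can only be completed once, this only tracks the first materialization.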

Alternatively if it was being re-used but never at the same time, an AtomicReference could be used instead of the Promise.
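As a rough sketch of the AtomicReference variant: each new materialization overwrites the stored switch, so only the most recent stream can be stopped. LatestKillSwitch, wrap and shutdownCurrent are hypothetical names, and the inner Flow is left generic so the sketch only depends on akka-stream.

```scala
import akka.NotUsed
import akka.stream.{KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Flow, Keep}
import java.util.concurrent.atomic.AtomicReference

// Hypothetical helper, for illustration only
object LatestKillSwitch {
  // Holds the switch of the most recent materialization, if any
  private val latest = new AtomicReference[Option[UniqueKillSwitch]](None)

  // Wraps any Flow so that each materialization records its KillSwitch
  def wrap[T](inner: Flow[T, T, NotUsed]): Flow[T, T, NotUsed] =
    inner
      .joinMat(KillSwitches.singleBidi[T, T])(Keep.right)
      .mapMaterializedValue { ks =>
        latest.set(Some(ks))
        NotUsed
      }

  // Shuts down the most recent stream; returns false if there was none
  def shutdownCurrent(): Boolean = {
    val current = latest.getAndSet(None)
    current.foreach(_.shutdown())
    current.isDefined
  }
}
```

This is only safe under the stated assumption that materializations never overlap; with overlapping streams the earlier switch is silently lost.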

When I faced multiple simultaneous materializations, I created a custom graph stage and acquired its ActorRef, sending that to an actor I’d created to track the state of the various streams. The stage’s actor reference could receive requests, and even support deathwatch to allow cleaning up the state of dead streams, avoiding infinite growth of my registry. This approach doesn’t strictly need a custom stage: the previously mentioned mapMaterializedValue could be used to send the KillSwitch to an actor, and a watchTermination stage could be used to send the cleanup command.
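The variant without a custom stage might look roughly like this. It is a sketch using classic actors, not the code I actually ran: the message protocol (Register, Unregister, ShutdownAll, Count), the Registry actor and the tracked helper are all invented for illustration.

```scala
import akka.NotUsed
import akka.actor.{Actor, ActorRef}
import akka.stream.{KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Flow, Keep}
import java.util.UUID
import scala.concurrent.ExecutionContext

// Hypothetical registry protocol and actor, for illustration only
object SwitchRegistry {
  final case class Register(id: UUID, switch: UniqueKillSwitch)
  final case class Unregister(id: UUID)
  case object ShutdownAll
  case object Count

  class Registry extends Actor {
    private var switches = Map.empty[UUID, UniqueKillSwitch]

    def receive: Receive = {
      case Register(id, ks) => switches += (id -> ks)
      case Unregister(id)   => switches -= id
      case ShutdownAll      => switches.values.foreach(_.shutdown())
      case Count            => sender() ! switches.size
    }
  }

  // Every materialization registers its own switch under a fresh id and
  // unregisters it again when the stream terminates.
  def tracked[T](inner: Flow[T, T, NotUsed], registry: ActorRef)(
      implicit ec: ExecutionContext): Flow[T, T, NotUsed] =
    inner
      .watch(registry) // fail the stream if the registry actor dies
      .joinMat(KillSwitches.singleBidi[T, T])(Keep.right)
      .watchTermination()(Keep.both)
      .mapMaterializedValue { case (ks, done) =>
        val id = UUID.randomUUID()
        registry ! Register(id, ks)
        done.onComplete(_ => registry ! Unregister(id))
        NotUsed
      }
}
```

The cleanup on termination is what keeps the registry from growing without bound as streams come and go.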

The approach you take should ultimately be informed by the situation you’re in, and the complexity you’re willing to accept. I had additional uses for the custom stage mentioned above, so the ActorRef approach was suitable, at the time. When using an external actor as a collection point it may be advisable to add a watch stage, to kill the stream(s) should the external actor die.


Thank you so much for this amazing reply!

Actually, I have been doing quite a bit of research for an answer to this question (more than necessary). On the Akka gitter channel, someone also suggested mapMaterializedValue() which I did not know existed. Using that seems to be a good way to do it in my use case where I want to persist the kill switch to a ConcurrentHashMap keyed to a user.

The concerns you mention about multiple materializations also became apparent to me when I started implementing a proof of concept. What I’m doing now is returning a Flow[Message, Message, NotUsed] to the handleWebSocketMessages() directive, while internally using mapMaterializedValue() to add the kill switch mapping as a side effect.
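For anyone following along, this is roughly the shape of what I have so far. It is a simplified sketch rather than my real code: UserSwitches, flowFor, disconnect and the String user id are placeholder names, and the inner Flow is passed in instead of showing the hub setup.

```scala
import akka.NotUsed
import akka.http.scaladsl.model.ws.Message
import akka.stream.{KillSwitches, UniqueKillSwitch}
import akka.stream.scaladsl.{Flow, Keep}
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.ExecutionContext

// Placeholder names throughout; assumes one live connection per user id
object UserSwitches {
  private val switches = new ConcurrentHashMap[String, UniqueKillSwitch]()

  // The directive only sees NotUsed; the switch is stored as a side effect
  def flowFor(userId: String, inner: Flow[Message, Message, NotUsed])(
      implicit ec: ExecutionContext): Flow[Message, Message, NotUsed] =
    inner
      .joinMat(KillSwitches.singleBidi[Message, Message])(Keep.right)
      .watchTermination()(Keep.both)
      .mapMaterializedValue { case (ks, done) =>
        switches.put(userId, ks)
        // Remove the entry on termination, but only if it is still this switch
        done.onComplete(_ => switches.remove(userId, ks))
        NotUsed
      }

  // Shuts down the user's stream; returns false if no switch was registered
  def disconnect(userId: String): Boolean = {
    val ks = Option(switches.remove(userId))
    ks.foreach(_.shutdown())
    ks.isDefined
  }
}
```

The two-argument remove means a late cleanup from an old connection cannot evict the switch of a newer connection for the same user.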

Since, as you say, this is a non-reusable Flow, I am fine with your first suggestion. I don’t think I’ll need atomic references nor ActorRefs (I am trying to do this without bringing in actors). Thanks a lot.

I’ll be updating the SO post with a definitive answer once I finish my proof of concept.