Websocket actor interop (receive and publish)

After successfully implementing my websocket flow to interact with a typed actor through the ActorFlow.ask API, my websocket can forward incoming messages to the actor and get an answer straight away (a Source of messages, since in some cases the actor needs to ask the other end for more data).
I'd like to know whether it would be possible, or advisable, to also use this actor to publish to the websocket on its own initiative (for example when it receives a message from elsewhere), i.e. to have it behave like an ActorSource.
I had a look at ActorSource.actorRef, but it requires materializing the Source, so it doesn't seem directly applicable.
I also started looking into the PubSub API.
Here is my flow:

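import akka.NotUsed
import akka.actor.typed.ActorRef
import akka.http.scaladsl.model.ws.Message
import akka.stream.scaladsl.{Flow, Source}
import akka.stream.typed.scaladsl.ActorFlow

// ActorFlow.ask also needs an implicit akka.util.Timeout in scope
val flow: Flow[Message, Message, NotUsed] = Flow[Message]
    .mapAsync(1)(toRunnerMessage)
    .via(
      // Defaults to parallelism of 2 messages in flight: while one ask message is being worked on, the second one
      // may still be in the mailbox, so sending it a bit before the first ask has replied keeps throughput slightly healthier.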
      ActorFlow.ask[RunnerMessage, RunnerActorMessage, Source[RunnerMessage, NotUsed]](parallelism = 2)(systemActor)(makeMessage =
        (el, replyTo: ActorRef[Source[RunnerMessage, NotUsed]]) => RunnerActorMessage(el, replyTo)
      )
    )
    .flatMapConcat(identity)
    .map(commandToMessage)
    .via(reportErrorsFlow)

I wrote this blog article back in 2016, so it only uses the classic actor APIs, but it may still be useful: it shows how to combine websockets with Flow.fromSinkAndSource to interact with an actor without request-response messaging, so that the actor can send messages to the websocket whenever it wants.


Thanks for pointing that out.
I came up with something like this:

val flow: Flow[Message, Message, NotUsed] = Flow[Message]
    .mapAsync(1)(toRunnerMessage)
    .map { message =>
      val (replyTo, source) = ActorSource
        .actorRef[RunnerMessage](
          completionMatcher = { case RunnerCompleted => logger.info("websocket source RunnerStateActor completed") },
          failureMatcher = { case RunnerFailed(error) => new Exception(error) },
          bufferSize = 8,
          overflowStrategy = OverflowStrategy.fail
        )
        .preMaterialize()
      systemActor ! RunnerActorMessage(runnerMessage = message, replyTo = replyTo)
      source
    }

The main systemActor gets a reference (replyTo) to the source, so it can reply at some later point in time.
What do you think? Could this introduce a bottleneck somewhere?

This means that for each incoming message you materialize a new source to receive replies on (and you probably want flatMapConcat/merge rather than map, as written in the snippet). I can't say what's best for your specific use case: you may want to correlate incoming websocket messages with 1:N replies, and in that case you need to do it like this.

But in general it would be more efficient to create only one such outgoing stream when the websocket connects, send it to the actor in a message, and let the actor hold on to it for sending messages back.

Such an initial message can be sent from a Flow.fromMaterializer that then uses Flow.fromSinkAndSource as the actual flow, something like this:

 handleWebSocketMessages(Flow.fromMaterializer { (_, _) =>
    // Outgoing side: one ActorSource per connection; its materialized ActorRef is handed to the actor once
    val fromActor = ActorSource.actorRef[RunnerMessage](
      completionMatcher = { case RunnerCompleted => logger.info("websocket source RunnerStateActor completed") },
      failureMatcher = { case RunnerFailed(error) => new Exception(error) },
      bufferSize = 8,
      overflowStrategy = OverflowStrategy.fail
    ).mapMaterializedValue { sourceActorRef =>
      systemActor ! WsStreamStarted(sourceActorRef)
      sourceActorRef
    }.map(runnerMessage => ...to ws message...)

    // Incoming side: every websocket message is forwarded to the same actor
    val toActor = Flow[Message].map(wsMessage => ... to something the actor accepts ...)
      .to(ActorSink.actorRef(systemActor, WsStreamInComplete, _ => WsStreamInFailed))

    // The two sides are independent, so the actor can push at any time, not only in reply
    Flow.fromSinkAndSource(toActor, fromActor)
  })

This also avoids preMaterialize, which adds an additional buffer between stream operators.
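On the actor side, the single-stream approach comes down to remembering the outgoing reference when WsStreamStarted arrives and pushing to it whenever something happens elsewhere. The sketch below models that in plain Scala so it runs without Akka: the outgoing ActorRef is stood in for by a String => Unit function, WsStreamStarted mirrors the message in the snippet above but with that stand-in payload, and ExternalEvent and SystemActorModel are hypothetical names. A real Akka Typed behavior would keep the ActorRef[RunnerMessage] in its state in the same way.

```scala
// Plain-Scala model (no Akka) of an actor that is handed the outgoing stream once per connection.
// The String => Unit function stands in for the ActorRef materialized by ActorSource.actorRef.
sealed trait SystemCommand
// Mirrors WsStreamStarted from the thread, with a stand-in payload type
final case class WsStreamStarted(out: String => Unit) extends SystemCommand
// Hypothetical: "a message from elsewhere" that should be published to the websocket
final case class ExternalEvent(text: String) extends SystemCommand

final class SystemActorModel {
  // Like actor state: only touched while processing one message at a time
  private var out: Option[String => Unit] = None
  private var dropped = 0

  def receive(cmd: SystemCommand): Unit = cmd match {
    case WsStreamStarted(ref) =>
      out = Some(ref) // remember the outgoing stream for the rest of the connection
    case ExternalEvent(text) =>
      out match {
        case Some(send) => send(text)    // pushed without any incoming request
        case None       => dropped += 1  // not connected yet; a real actor might stash instead
      }
  }

  def droppedCount: Int = dropped
}
```

Dropping events that arrive before registration is only one option; stashing them until the stream is registered is another.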


Thanks, I will try this implementation.

As you mentioned, I sometimes need a 1:N relationship: the systemActor handling messages sometimes has to reply with a stream of messages (e.g. chunks of a file), something like
ActorSource.actorRef[Source[RunnerMessage, NotUsed]]
Would that be semantically possible?

Yeah, sure. You could keep the same single flow but make the actor stateful: it keeps track of each request for which it will send several responses, and never sends messages to the outgoing stream without such a request as a trigger. If you need to correlate responses with their request when you create the outgoing websocket Message instances, the responses can include the entire request or a correlation id.
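A minimal sketch of that bookkeeping, again modelled in plain Scala rather than as an Akka Typed behavior so it runs on its own: the handler records each request together with the number of responses it expects, tags every outgoing response with the request id as a correlation id, and ignores chunks for requests it is not tracking. Request, ChunkReady, Response and CorrelatingHandler are hypothetical names, and the send function stands in for the outgoing ActorRef.

```scala
// Plain-Scala model (no Akka) of a stateful handler correlating 1:N responses with a request id.
// All names here are hypothetical; `send` stands in for the outgoing ActorRef.
final case class Request(id: Long, expectedChunks: Int)
final case class ChunkReady(id: Long, data: String)
final case class Response(correlationId: Long, seqNr: Int, data: String, last: Boolean)

final class CorrelatingHandler(send: Response => Unit) {
  // request id -> (chunks already sent, chunks expected)
  private var pending = Map.empty[Long, (Int, Int)]

  def onRequest(req: Request): Unit =
    pending = pending.updated(req.id, (0, req.expectedChunks))

  // A chunk produced elsewhere is only forwarded if a matching request is pending,
  // so nothing is pushed to the websocket without a triggering request.
  def onChunk(chunk: ChunkReady): Unit =
    pending.get(chunk.id) match {
      case Some((sent, expected)) =>
        val seqNr = sent + 1
        val last  = seqNr == expected
        send(Response(chunk.id, seqNr, chunk.data, last))
        pending =
          if (last) pending - chunk.id // request fully answered: stop tracking it
          else pending.updated(chunk.id, (seqNr, expected))
      case None => () // unknown or already completed request: ignore
    }

  def inFlight: Set[Long] = pending.keySet
}
```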

You could also use a reply-to carrying a stream and flatMapConcat such responses into the response stream, as you suggest. That would be slightly more costly, since a stream is allocated and materialized for each request, but that may not matter, and it might be clearer and require less state tracking in the actor.
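The per-request variant relies on flatMapConcat emitting each inner stream completely before starting the next one. The plain-Scala analogue below uses Iterator.flatMap, which has the same sequential, non-interleaving ordering; it is not Akka Streams, so there is no asynchrony or backpressure involved. The counter makes the one-inner-stream-per-request allocation visible; respond and the chunk naming are hypothetical.

```scala
// Plain-Scala analogue (no Akka) of flatMapConcat ordering: each request yields its own
// finite sequence of responses, and the inner sequences are emitted one after another.
object FlatMapConcatModel {
  // Counts how many per-request inner "streams" have been created
  var innerStreamsCreated = 0

  // Hypothetical stand-in for the Source an actor would send back per request
  def respond(request: String, chunks: Int): Iterator[String] = {
    innerStreamsCreated += 1
    Iterator.range(1, chunks + 1).map(i => s"$request-chunk$i")
  }

  // Inner iterators are created lazily, only when the previous one is exhausted
  def responses(requests: List[(String, Int)]): Iterator[String] =
    requests.iterator.flatMap { case (req, n) => respond(req, n) }
}
```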


In the end I came up with this Flow, which uses fromMaterializer, instantiates my main state actor and gives it a reference to the Source's actor.
I'll handle the case of multiple file chunks directly inside another actor that will publish them sequentially and lazily.
I believe backpressure is still not handled by the state actor, but I'm not sure that is necessary at the moment.

val wsFlow: Flow[Message, Message, NotUsed] = Flow[Message]
    .mapAsync(1)(toRunnerMessage)
    .via(Flow.fromMaterializer { (mat, _) =>
      // Pre-materialize the outgoing source with the materializer that fromMaterializer provides
      val (replyTo, fromActor) = ActorSource
        .actorRef[RunnerMessage](
          completionMatcher = {
            case RunnerCompleted() => logger.info("websocket source RunnerStateActor completed")
          },
          failureMatcher = {
            case RunnerFailed(error) => new Exception(error)
          },
          bufferSize = 8,
          overflowStrategy = OverflowStrategy.fail
        )
        .preMaterialize()(mat)

      // Note: this starts a complete ActorSystem per websocket connection, which is heavyweight;
      // spawning the state actor from an existing actor would be lighter.
      val stateActor: ActorSystem[RunnerMessage] = ActorSystem(actors.RunnerStateActor(config, jobRunnerSrv, replyTo), "runnerStateActor")

      val toActor = Flow[RunnerMessage]
        .to(ActorSink.actorRef[RunnerMessage](stateActor, RunnerCompleted(), e => RunnerFailed(e.getMessage)))

      Flow.fromSinkAndSource(toActor, fromActor)
    })
    .map(runnerMessageToMessage)
    .via(reportErrorsFlow)
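On the backpressure point: with bufferSize = 8 and OverflowStrategy.fail, the stream fails if more than 8 elements pile up in the source's buffer before the websocket side has demanded them. Akka Typed also provides ActorSource.actorRefWithBackpressure, which acknowledges each element by sending a configurable ack message to a given actor, so a chunk-publishing actor could send one chunk per acknowledgement. The sketch below models that ack-driven protocol in plain Scala (no Akka); ChunkPublisher and Ack are hypothetical names, and the send function stands in for the source's ActorRef.

```scala
// Plain-Scala model (no Akka) of an ack-driven publisher: it sends one chunk, then waits for
// an Ack before sending the next, so at most one chunk is in flight and no buffer can overflow.
case object Ack

final class ChunkPublisher(chunks: Iterator[String], send: String => Unit) {
  private var awaitingAck = false

  private def pushNext(): Unit =
    if (chunks.hasNext) { send(chunks.next()); awaitingAck = true }
    else awaitingAck = false // all chunks sent and acknowledged

  // Sends the first chunk; ignored while a chunk is still unacknowledged
  def start(): Unit = if (!awaitingAck) pushNext()

  // Each Ack releases exactly one further chunk
  def onAck(ack: Ack.type): Unit = if (awaitingAck) pushNext()

  def inFlight: Boolean = awaitingAck
}
```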