Question about stateful multiplexed protocols on top of akka-streams

I’m trying to implement a web socket GraphQL subscription service on top of akka-streams/akka-http, which should have a simple protocol: upon connection, clients send one or more subscribe(query) messages and get a stream of results in response, with the ability to add and remove subscriptions on the fly.

  1. a stateful protocol requires either an actor or a GraphStage[FlowShape[Message, Message]] to manage the state machine, with GraphStage solution being probably cleaner and safer
  2. this dynamic fan-in configuration suggests that I should try using MergeHub for merging all subscriptions into the socket output

My initial idea was pretty simple: code a GraphStage[FlowShape[Message, Message]] with an internal MergeHub, expose its outlet as stage’s and on incoming subscribe(query) messages just do dataSource.subscribe(query).runWith(mergehub). However, this turned out to be a non-starter for a very obvious reason: there’s no Sink to materialize the MergeHub with.
The only potentially working option I came up with is something like

    val protocolActor: ActorRef = ???
    upgradeToWebSocket.handleMessages(
      Flow.fromSinkAndSourceCoupled(Sink.actorRef(protocolActor, "complete"), MergeHub.source.mapMaterializedValue(protocolActor ! _))
    )

, but this is clearly a messy solution: passing socket handle as a message, actor’s unbounded message box, reduced type safety, having to manually manage protocolActor lifecycle etc etc. Is there any better way?

1 Like

I had a similar project and this resource was helpful:

The actor in that example can be expanded to implement the client side GraphQL subscription protocol.