Solved: Akka HTTP Websocket session context and Alpakka RabbitMQ

alpakka

(Rowland Watkins) #1

Hi folks,

I can successfully forward messages from an Akka HTTP WebSocket to RabbitMQ and back. However, I run into problems as soon as more than one client is connected. Here is the flow I have:

WS client --> Akka HTTP Websocket Source -> Flow[Message] (map to ByteString) -> RabbitMQ sink (queue1) --> RabbitMQ source --> Flow[IncomingMessage] --> mapAsync(serverActor) --> RabbitMQ sink (queue2) --> Flow[ByteString] (map to TextMessage) --> WS client

Rather long-winded, but I basically have a stretched-out WebSocket flow with two separate RabbitMQ queues. My question is: how can I correlate messages so that a reply goes back to the same WS client that sent the request? The return leg via RabbitMQ back to the WebSocket doesn’t always know which client it is connected to, so either client might receive the message.

Any tips would be most appreciated!

Cheers,

Rowland


(Rowland Watkins) #2

Here is a diagram showing how all the sources and sinks fit together as reference:


(Rowland Watkins) #3

Hi all,

So I’ve simplified the Flow implementation as follows:

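import akka.NotUsed
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.{Graph, SinkShape, SourceShape}
import akka.stream.alpakka.amqp.IncomingMessage
import akka.stream.scaladsl.{Flow, GraphDSL}
import akka.util.ByteString

// RabbitMQFlows (not shown) provides the AMQP sink and source.

// Sink that publishes each ByteString to RabbitMQ.
val rabbitMqSink: Graph[SinkShape[ByteString], NotUsed] = GraphDSL.create() { implicit builder =>
  val sink = builder.add(RabbitMQFlows.amqpSink)
  SinkShape(sink.in)
}

// Source that consumes reply messages from RabbitMQ.
def wsResponseSource: Graph[SourceShape[IncomingMessage], NotUsed] = GraphDSL.create() { implicit builder =>
  val source = builder.add(RabbitMQFlows.source)
  SourceShape(source.out)
}

// Flow whose input goes to the RabbitMQ sink and whose output comes from the
// RabbitMQ source. The two halves are not connected to each other inside the stream.
val rabbitMqFlow: Flow[ByteString, IncomingMessage, NotUsed] = {
  val in = Flow[ByteString].to(rabbitMqSink)
  val out = wsResponseSource

  Flow.fromSinkAndSource(in, out)
}

// WebSocket handler: strict text frames in, RabbitMQ replies out as text frames.
val wsViaFlow: Flow[Message, Message, NotUsed] =
  Flow[Message]
    .collect {
      // Only strict text messages are forwarded; anything else is dropped.
      case TextMessage.Strict(msg) => ByteString(msg)
    }
    .via(rabbitMqFlow)
    .map { msg =>
      TextMessage.Strict(msg.bytes.utf8String)
    }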

During testing with more than one client, the first reply is returned to the last client to connect, and subsequent replies then alternate between clients in a round-robin fashion, which doesn’t seem right. It seems that routing the flow through RabbitMQ loses the association with the WS connection, so the return leg doesn’t know which client to send the message to.
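One likely explanation for the alternating replies: every WebSocket connection materializes its own consumer on the same reply queue, and RabbitMQ hands the messages of a single queue to its consumers in turn. The sketch below is a plain-Scala model of that dispatch, not Akka or Alpakka code; `SharedQueueDemo`, `Reply`, `dispatchRoundRobin` and `misdelivered` are hypothetical names made up for illustration.

```scala
object SharedQueueDemo {
  // A reply sitting on the shared queue: the client it was meant for, and its body.
  final case class Reply(intendedFor: String, body: String)

  // Model of a broker serving ONE queue to several consumers:
  // each message is delivered to exactly one consumer, chosen in turn.
  def dispatchRoundRobin(consumers: Vector[String], queue: Vector[Reply]): Vector[(String, Reply)] = {
    require(consumers.nonEmpty, "at least one consumer is needed")
    queue.zipWithIndex.map { case (reply, i) =>
      consumers(i % consumers.size) -> reply
    }
  }

  // Deliveries where the receiving consumer is not the intended client.
  def misdelivered(deliveries: Vector[(String, Reply)]): Vector[(String, Reply)] =
    deliveries.filter { case (receiver, reply) => receiver != reply.intendedFor }
}
```

With two consumers and the replies (for A, for A, for B) on the shared queue, the model delivers them to A, B, A, so two of the three replies reach the wrong client. Nothing in the message ties it to a connection, which matches the behaviour described above.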

Is there a way to list connected clients server side? All tips appreciated!

Cheers.

Rowland


(Rowland Watkins) #4

Hi folks,

OK, so I figured this out, or at least I have a usable workaround: use the Alpakka AMQP RPC flow (AmqpRpcFlow). It creates a temporary reply queue per client, which takes care of correlation. The server side needs an AmqpSink.replyTo, and you copy the message properties from the original IncomingMessage to the OutgoingMessage so that the reply is routed back to the right queue. And all is well.
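For anyone finding this later, here is a minimal sketch of why the reply-to pattern fixes the correlation. It is a toy model in plain Scala, not the Alpakka API: `ReplyToDemo`, `Broker`, `Props`, `Msg`, `request` and `serve` are hypothetical stand-ins, and the queue names are made up. The only idea taken from the workaround above is that each client has its own reply queue and the server copies the request's properties onto the response.

```scala
import java.util.UUID
import scala.collection.mutable

object ReplyToDemo {
  // Hypothetical stand-ins for AMQP message properties and messages.
  final case class Props(replyTo: String, correlationId: String)
  final case class Msg(body: String, props: Props)

  // Toy broker: named queues holding messages in arrival order.
  final class Broker {
    private val queues = mutable.Map.empty[String, Vector[Msg]]

    def publish(queue: String, msg: Msg): Unit =
      queues.update(queue, queues.getOrElse(queue, Vector.empty) :+ msg)

    // Removes and returns everything currently on the queue.
    def drain(queue: String): Vector[Msg] =
      queues.remove(queue).getOrElse(Vector.empty)
  }

  // Client side: publish a request that names this client's private reply queue.
  def request(broker: Broker, clientId: String, body: String): Unit = {
    val props = Props(replyTo = s"reply-$clientId", correlationId = UUID.randomUUID().toString)
    broker.publish("requests", Msg(body, props))
  }

  // Server side: answer each request on the queue named in its properties,
  // copying the request's properties onto the response.
  def serve(broker: Broker): Unit =
    broker.drain("requests").foreach { req =>
      broker.publish(req.props.replyTo, Msg(s"echo:${req.body}", req.props))
    }
}
```

If client "a" sends "hello" and client "b" sends "world", then after `serve` the queue "reply-a" holds only "echo:hello" and "reply-b" holds only "echo:world". Since each client consumes only from its own queue, there are no competing consumers left on the return leg.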

Cheers,

Rowland