.buffer operator in WebSocket flows

Hi!

I tried to implement WebSocket rate limiting, where an Akka server throttles a fast upstream WebSocket to a steady outgoing message rate. This is the code I started with:

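import akka.http.scaladsl.Http
import akka.stream.OverflowStrategy
import scala.concurrent.duration._
// Keep at most one pending message, dropping it when a newer one arrives; emit one message per second.
val clientFlow = Http()
  .webSocketClientFlow(wsRequest)
  .buffer(1, OverflowStrategy.dropBuffer)
  .throttle(1, 1.second)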

upgrade.handleMessages(clientFlow, protocols.headOption)

With that code I experienced massive hiccups, up to the point where almost no messages reached the downstream socket. After many hours of debugging (and learning a lot about Akka Streams), I think I have found the problem. Because .buffer simply discards elements when one of the “dropping” overflow strategies is used, non-strict (i.e. stream-based) messages are never drained, which eventually clogs the upstream WebSocket. In similar situations in the future, tcpdump will definitely be my debugging tool of choice: it showed very clearly that the TCP receive window shrank more and more and never recovered.

I have solved the problem by replacing the buffer operator with a conflate that discards old messages in a safe way.

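val bufferedFlow = clientFlow.conflate { (acc, el) =>
  // Drain the message being dropped so that a streamed payload cannot stall the connection.
  acc match {
    case TextMessage.Strict(_)         => () // already in memory, nothing to drain
    case BinaryMessage.Strict(_)       => ()
    case m @ TextMessage.Streamed(_)   => m.textStream.runWith(Sink.ignore)
    case m @ BinaryMessage.Streamed(_) => m.dataStream.runWith(Sink.ignore)
  }
  el // keep only the newest message
}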

Is my analysis correct? And if so, is there a better way to solve the problem? The code above definitely doesn’t look very elegant. If this is in fact a problem, maybe the documentation of either the Akka Streams buffer operator or WebSocket support could give a hint. Or is this behaviour obvious to the “common” Akka developer?


Hi Michael,

Unfortunately, the documentation page on the implications of streaming does not mention WebSockets: https://doc.akka.io/docs/akka-http/current/implications-of-streaming-http-entity.html

Also, the HttpResponse class has discardEntityBytes, but nothing equivalent is available on WebSocket messages. I think it should be added there as well, at least for symmetry.
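
Until something like that exists, a small helper could play the same role. The following is only a minimal sketch, not an existing akka-http API: the names WsDiscard, discardMessageBytes and keepLatest are made up for illustration, and it assumes a Materializer is available in implicit scope.

```scala
import akka.{Done, NotUsed}
import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink}

import scala.concurrent.Future

object WsDiscard {
  // Hypothetical helper (not part of akka-http): consumes the payload of a
  // message so that a streamed message cannot stall the connection.
  def discardMessageBytes(message: Message)(implicit mat: Materializer): Future[Done] =
    message match {
      case _: TextMessage.Strict   => Future.successful(Done) // nothing to drain
      case _: BinaryMessage.Strict => Future.successful(Done)
      case tm: TextMessage         => tm.textStream.runWith(Sink.ignore)
      case bm: BinaryMessage       => bm.dataStream.runWith(Sink.ignore)
    }

  // Hypothetical flow: keeps only the newest message while downstream is busy
  // and drains every message it drops along the way.
  def keepLatest(implicit mat: Materializer): Flow[Message, Message, NotUsed] =
    Flow[Message].conflate { (stale, latest) =>
      discardMessageBytes(stale) // fire and forget, as in the conflate above
      latest
    }
}
```

With that in place, the flow from the question could probably be written as clientFlow.via(WsDiscard.keepLatest).throttle(1, 1.second). Returning a Future[Done] rather than Unit lets a caller wait for the drain to finish if it needs to.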

Would you mind opening a ticket in the akka-http issue tracker for this?