Hi!
I have tried to implement some WebSocket rate-limiting functionality, where a fast upstream WebSocket is throttled by an Akka server to a steady outgoing message rate. I tried the following code:
```scala
val clientFlow = Http()
  .webSocketClientFlow(wsRequest)
  .buffer(1, OverflowStrategy.dropBuffer)
  .throttle(1, 1.second)

upgrade.handleMessages(clientFlow, protocols.headOption)
```
With that code I experienced massive hiccups, up to the point where almost no messages reached the downstream socket. After spending many hours of debugging (but learning a lot about Akka Streams), I think I have found the problem: since .buffer
simply discards elements when one of the "dropping" overflow strategies is used, non-strict (i.e. stream-based) messages are never drained, which eventually clogs the upstream WebSocket. In similar situations in the future, tcpdump will definitely be my debugging tool of choice. It showed very clearly that the TCP receive window shrank more and more and was never able to recover.
I have solved the problem by replacing the buffer operator with a conflate that discards old messages in a safe way:
```scala
val bufferedFlow = clientFlow.conflate { (acc, el) =>
  // Drain the element we are about to drop: for streamed (non-strict)
  // messages the data must still be consumed from the connection.
  acc match {
    case TextMessage.Strict(_)         => ()
    case BinaryMessage.Strict(_)       => ()
    case m @ TextMessage.Streamed(_)   => m.textStream.runWith(Sink.ignore)
    case m @ BinaryMessage.Streamed(_) => m.dataStream.runWith(Sink.ignore)
  }
  el
}
```
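For what it's worth, the draining could also be factored into a small reusable stage. This is only a sketch of what I have in mind; the names `drain` and `dropOldDraining` are my own, not part of any Akka API, and it assumes Akka 2.6+, where an implicit `ActorSystem` provides the `Materializer` needed by `runWith`:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.{ BinaryMessage, Message, TextMessage }
import akka.stream.scaladsl.{ Flow, Sink }

// Hypothetical helper: consume a possibly-streamed message so that its
// bytes are read off the connection even though the element is dropped.
def drain(m: Message)(implicit system: ActorSystem): Unit = m match {
  case TextMessage.Streamed(text)   => text.runWith(Sink.ignore)   // consume the text stream
  case BinaryMessage.Streamed(data) => data.runWith(Sink.ignore)   // consume the byte stream
  case _                            => () // strict messages already hold their data in memory
}

// "Keep the newest element, but never discard a streamed message
// without draining it" as a standalone Flow.
def dropOldDraining(implicit system: ActorSystem): Flow[Message, Message, NotUsed] =
  Flow[Message].conflate { (old, fresh) =>
    drain(old)
    fresh
  }
```

The rate-limited client flow would then become `clientFlow.via(dropOldDraining).throttle(1, 1.second)`, which keeps the draining concern in one place.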
Is my analysis correct? And if it is, is there a better way to solve the problem? The code above definitely doesn't look very elegant. If this is in fact a problem, maybe the documentation of either Akka Streams buffers or WebSocket could give a hint. Or is this behaviour obvious to the "common" Akka developer?