I think there is a bug in Lagom's WebSocket support.
I have this test service call to exercise the behaviour of a fast producer with a slow consumer:
import akka.NotUsed;
import akka.stream.javadsl.Source;
import com.lightbend.lagom.javadsl.api.ServiceCall;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

@Override
public ServiceCall<NotUsed, Source<String, NotUsed>> debugEcho0(Optional<String> message) {
    String s = message.orElse("Hello world!");
    return request -> CompletableFuture.completedFuture(
            Source.range(0, 100000000)
                    .map(i -> {
                        String msg = s + i;
                        // log roughly once per 10000 elements so progress is visible
                        if (i % 10000 == 10) {
                            log.warn("Sending {}", msg);
                        }
                        return msg;
                    }));
}
I use the rxjava2-chirper-client (with the Gson deserialization removed) in a debugger to connect to the above endpoint and put a breakpoint on the printout of the first message received. The breakpoint stops all threads in the client.
I can observe the above WARN logs continue to be printed indefinitely while the client is connected (and blocked), and I can watch the memory usage of the server grow without limit.
To me it seems Lagom (or Netty) is buffering unsent messages without bounds instead of signalling backpressure to the Source.
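As an illustration of the suspected failure mode (a plain-JDK sketch, not Lagom's actual internals): with an unbounded queue between producer and consumer, nothing ever pushes back on the producer, so a stalled consumer just lets the buffer grow.

```java
import java.util.concurrent.LinkedBlockingQueue;

public class UnboundedBufferSketch {
    public static void main(String[] args) throws InterruptedException {
        // An unbounded queue models the suspected behaviour: offer() always
        // succeeds immediately, so a stalled consumer lets the buffer grow
        // without limit instead of slowing the producer down.
        LinkedBlockingQueue<String> buffer = new LinkedBlockingQueue<>();

        Thread producer = new Thread(() -> {
            for (int i = 0; i < 1_000_000; i++) {
                buffer.offer("msg" + i); // never blocks, nothing pushes back
            }
        });
        producer.start();
        producer.join();

        // The consumer never ran, yet the producer completed every send:
        System.out.println("buffered = " + buffer.size());
    }
}
```

This mirrors what the debugger experiment observes: the server-side WARN logs (the producer) keep running while the blocked client (the consumer) takes nothing.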
Can you specify what version you’re using? (Or better, offer a full test project to replicate the error?) I’ve had some interactions with the websocket interfaces, at least in 1.4, and they’ve all blocked appropriately.
Hmm, I can confirm that on my Linux system that sample does seem to buffer indefinitely; I killed it when it had used up 2.4 gigabytes of RAM.
Looking at your code I also can’t identify any reason why it would fail to backpressure appropriately, so I’m forced to agree that there’s apparently a problem further down. That’s outside my expertise but hopefully someone with more knowledge of that area will benefit from the samples too.
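For contrast, here is a minimal sketch (again plain JDK, no Lagom involved) of what correctly propagated back-pressure looks like: with a bounded buffer the producer parks as soon as the stalled consumer stops draining, so memory stays capped instead of growing to gigabytes.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedBufferSketch {
    public static void main(String[] args) throws InterruptedException {
        // A bounded queue models a back-pressured channel: once 16 elements
        // are queued, put() parks the producer until the consumer drains.
        BlockingQueue<String> channel = new ArrayBlockingQueue<>(16);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; ; i++) {
                    channel.put("msg" + i); // blocks when the buffer is full
                }
            } catch (InterruptedException e) {
                // producer stopped by the main thread
            }
        });
        producer.start();

        // Simulate a consumer blocked in a debugger: it never takes anything.
        Thread.sleep(200);
        System.out.println("buffered = " + channel.size()); // capped at 16
        producer.interrupt();
        producer.join();
    }
}
```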
Good idea! I forced it to Netty, as per the migration guide, and ran sbt clean runAll.
I got loads of eviction warnings, which I ignored, and the Netty jars were loaded.
I’m afraid I’m out of useful contributions here, but I can offer one last suggestion that might help you draw more attention.
If it will let you, you might want to rename the topic title to include the version you're targeting and that you're hoping to get a backport, just to make it easier for the experts to identify.
I do think, though, that outside of major security issues the team is mostly focused on 1.4+, but of course I don't speak for them, so there's always hope.
Please note that behind the scenes this call is translated to a websocket connection between the two services. The hello-stream service is calling the hello service via websocket.
If you try it, you will see that the client (the Lagom client) controls the flow of data fetched from the server. The server won't keep sending data: some data will be sent to the client, and the server will wait until the client requests more.
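That pull-based contract can be sketched with the JDK's own Reactive Streams interfaces (java.util.concurrent.Flow). This is only an illustration of the demand mechanism the Lagom client relies on, not Lagom's implementation: the publisher emits exactly as many elements as the subscriber requests, and no more.

```java
import java.util.concurrent.Flow;

public class DemandSketch {
    public static void main(String[] args) {
        // A toy publisher that emits only as many elements as were requested.
        Flow.Publisher<Integer> publisher = subscriber ->
                subscriber.onSubscribe(new Flow.Subscription() {
                    private int next = 0;
                    @Override public void request(long n) {
                        for (long i = 0; i < n; i++) {
                            subscriber.onNext(next++); // bounded by demand
                        }
                    }
                    @Override public void cancel() {}
                });

        publisher.subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                s.request(3); // the consumer asks for exactly three elements
            }
            @Override public void onNext(Integer item) {
                System.out.println("received " + item);
            }
            @Override public void onError(Throwable t) {}
            @Override public void onComplete() {}
        });
    }
}
```

A consumer that never calls request() again (like one frozen in a debugger) simply receives nothing further, and a well-behaved producer stops emitting.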
Have you checked whether AutoManagedWebSocket supports back-pressure?
The issue for me is that I need this for an external API, which means I cannot prescribe which client to use. Using the above (semi-arbitrarily chosen) client, I observe memory growth in the (Lagom) server producing the data while the client is blocked in the debugger (all threads blocked). Is this a valid approximation of a slow consumer?
The fact that it works for you with a Lagom client makes me wonder: does the client need to somehow participate in backpressure in Lagom 1.3? (As I noted, it works flawlessly, as you describe, if I upgrade the server to Lagom 1.4.)
@bozzzzo, I looked into it further, and the problem is actually in the ServiceGateway in 1.3.10, which is based on Netty. The server is not buffering; the ServiceGateway is, because it does not propagate the back-pressure. And that's a bug.
The test I ran using the Lagom client worked out because I was using it from inside the hello-stream project, which binds the client to the internal URL (localhost:52685). So my test was flawed.
You can test it using your Rx client: call the service on port 52685 instead of 9000. You will bypass the gateway and get back-pressure as expected.
In 1.3.7 (or higher), you can switch to an akka-http-based ServiceGateway by adding the following to your build:
lagomServiceGatewayImpl in ThisBuild := "akka-http"
If you enable the akka-http gateway as explained above, it will also work when calling on port 9000.
However, I really recommend moving to Lagom 1.4.2. There are important bug fixes in Lagom, Play, and Akka HTTP that you get by switching to 1.4.2. Because akka-http support in Lagom 1.3.x was experimental, we won't be back-porting those fixes to the 1.3.x branch.
@octonato Thanks for getting to the bottom of this. I was indeed using the gateway, mostly out of laziness to avoid hunting for the port in the log output… From your comment it seems the port numbers are less random than they appear?
I’ll re-test with direct connection to the service.
As for the upgrade to 1.4, it turned out not to be completely smooth sailing for us, so we have put it on hold for a while. As soon as we catch our breath, I'll happily start a new topic with the upgrade issues.