Backpressure with streamed responses appears not to work in Lagom 1.3.10

I think there is a bug in Lagom's WebSocket support.

I have this test service call to test the behaviour of a fast producer with a slow consumer:

    @Override
    public ServiceCall<NotUsed, Source<String, NotUsed>> debugEcho0(Optional<String> message) {
        String s = message.orElse("Hello world!");
        return request -> CompletableFuture.completedFuture(
            Source.range(0, 100000000)
                .map(i -> {
                    String msg = s + i;
                    if (i % 10000 == 10) {
                        log.warn("Sending {}", msg);
                    }
                    return msg;
                }));
    }
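
For context, a streaming call like this is exposed through the Lagom service descriptor roughly as follows; the service name and path in this sketch are assumptions, not copied from the real project:

    // Sketch of the descriptor entry (service name and path are assumptions).
    // The Optional<String> argument maps to the "message" query parameter.
    @Override
    default Descriptor descriptor() {
        return named("hello").withCalls(
            pathCall("/hello/debug/echo0?message", this::debugEcho0)
        );
    }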

I use an rxjava2-chirper-client (with the gson deserialization removed) in a debugger to connect to the above endpoint, with a breakpoint on the printout of the first message received. The breakpoint stops all threads in the client.

I can observe the above WARN logs continue to be printed indefinitely while the client is connected (and blocked), and I can observe the memory usage of the server growing without bound.

To me it seems Lagom (or Netty) is buffering unsent messages without bound instead of signalling backpressure to the Source.

Is the above expected behavior or a bug?

Can you specify what version you’re using? (Or better, offer a full test project to replicate the error?) I’ve had some interactions with the websocket interfaces, at least in 1.4, and they’ve all blocked appropriately.

This is Lagom 1.3.10. I’ll try to create a reproducer.

We’re staying a bit behind until we get the release out the door; we don’t want another moving piece right now.

server:

client:

Hmm, I can confirm that on my Linux system that sample does seem to buffer indefinitely. I killed it when it had used up 2.4 gigabytes of RAM.

Looking at your code I also can’t identify any reason why it would fail to backpressure appropriately, so I’m forced to agree that there’s apparently a problem further down. That’s outside my expertise but hopefully someone with more knowledge of that area will benefit from the samples too.

I upgraded the sample to Lagom 1.4, and backpressure works correctly there.

Any chance of getting this fixed for 1.3?


That might be because, IIRC, akka-http became the default in 1.4. If you force it back onto Netty, is it still fixed?

TL;DR: it works with Netty on 1.4, too.

Good idea! I forced it to Netty, as per the migration guide, and did an sbt clean runAll. I got loads of eviction warnings, which I ignored, and the Netty jars were loaded:

    [info] 	[SUCCESSFUL ] com.typesafe.netty#netty-reactive-streams-http;2.0.0!netty-reactive-streams-http.jar (482ms)
    [info] 	[SUCCESSFUL ] com.typesafe.play#play-netty-server_2.11;2.6.12!play-netty-server_2.11.jar (635ms)
    [info] 	[SUCCESSFUL ] io.netty#netty-transport-native-unix-common;4.1.22.Final!netty-transport-native-unix-common.jar (648ms)
    [info] downloading https://nexus.digiverse.si/repository/nps-maven/io/netty/netty-transport-native-epoll/4.1.22.Final/netty-transport-native-epoll-4.1.22.Final-linux-x86_64.jar ...
    [info] 	[SUCCESSFUL ] io.netty#netty-transport-native-epoll;4.1.22.Final!netty-transport-native-epoll.jar (530ms)

And the server is keeping reasonably ahead of the consumer.
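
For reference, the build change the migration guide describes looks roughly like this; the project name is a placeholder and this is a sketch, not my exact build:

    // build.sbt — force the Netty backend on Lagom 1.4 (sketch)
    lazy val helloImpl = (project in file("hello-impl"))
      .enablePlugins(LagomJava)
      .disablePlugins(PlayAkkaHttpServer) // drop the 1.4 default (akka-http)
      .enablePlugins(PlayNettyServer)     // use Netty instead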

I’m afraid I’m out of useful contributions here, but I can offer one last suggestion that might help you draw more attention.

If it will let you, you might want to rename the topic title to mention the version you’re targeting and that you’re hoping to get a backport, just to make it easier for the experts to identify.

I do think, though, that outside of major security issues the team is mostly focused on 1.4+, but of course I don’t speak for them, so there’s always hope.

@bozzzzo,

I think the problem is in the client you are using. Have you tried using the Lagom client instead?

I modified the hello-stream module in order to run a quick test. Here is what I did:

HelloStream.java

    public interface HelloStream extends Service {

      ServiceCall<Source<String, NotUsed>, Source<String, NotUsed>> stream();

      ServiceCall<NotUsed, String> startEcho();

      @Override
      default Descriptor descriptor() {
        return named("hellostream")
                .withCalls(pathCall("/hellostream/echo", this::startEcho));
      }
    }

HelloStreamImpl.java

    @Override
    public ServiceCall<NotUsed, String> startEcho() {
        return notUsed -> {
            return helloService
                    .debugEcho(Optional.of("lagom-client-renato-was-here-"))
                    .invoke()
                    .thenCompose(src -> {
                        return src.map(str -> {
                            System.out.println(str);
                            Thread.sleep(900); // simulate a slow consumer
                            return str;
                        }).runWith(Sink.last(), materializer);
                    }).toCompletableFuture();
        };
    }

Please note that behind the scenes this call is translated to a WebSocket connection between the two services. The hello-stream service is calling the hello service via WebSocket.

If you try it, you will see that the client (the Lagom client) controls the flow of data fetched from the server. The server won’t keep sending data: some data will be sent to the client, and the server will wait until the client requests more.
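
As an aside, the Thread.sleep in the map stage above is just a quick way to slow the consumer down; a non-blocking equivalent would be Akka Streams’ throttle stage. A sketch of that fragment, assuming the Akka 2.4/2.5 javadsl that ships with Lagom (imports: scala.concurrent.duration.Duration, java.util.concurrent.TimeUnit, akka.stream.ThrottleMode):

    // Shape the stream to roughly one element per 900 ms without blocking a thread.
    src.throttle(1, Duration.create(900, TimeUnit.MILLISECONDS), 1, ThrottleMode.shaping())
       .map(str -> {
           System.out.println(str);
           return str;
       })
       .runWith(Sink.last(), materializer);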

Have you checked whether AutoManagedWebSocket supports back-pressure?

@octonato thanks for looking into this.

The issue for me is that I need this for an external API, which means I cannot prescribe which client to use. Using the above (semi-arbitrarily chosen) client, I observe memory growth in the (Lagom) server producing the data while the client is blocked in the debugger (all threads blocked). Is this a valid approximation of a slow consumer?

The fact that it works for you with a Lagom client makes me wonder: does the client need to somehow participate in backpressure in Lagom 1.3? (As I noted, it works flawlessly, exactly as you describe, if I upgrade the server to Lagom 1.4.)

@bozzzzo, I looked into it further, and the problem is actually in the ServiceGateway in 1.3.10, which is based on Netty. The server is not buffering, but the ServiceGateway is not propagating the back-pressure; the ServiceGateway is actually the one buffering. And that’s a bug.

The test that I ran using the Lagom client worked because I was using it from inside the hello-stream project, and it binds the client using the internal URL (localhost:52685). So my test was flawed.

You can test it using your Rx client. Try to call the service on port 52685 instead of 9000. You will bypass the gateway and get back-pressure as expected.

In 1.3.7 (or higher), you can switch to an akka-http-based ServiceGateway by adding the following to your build:

    lagomServiceGatewayImpl in ThisBuild := "akka-http"

(more info: https://www.lagomframework.com/blog/lagom-1-3-7.html)

If you enable the akka-http gateway like explained above, it will also work when calling on port 9000.

However, I really recommend moving to Lagom 1.4.2. There are some important bug fixes in Lagom, Play and Akka HTTP that you get if you switch to 1.4.2. Because akka-http in Lagom 1.3.x was experimental, we won’t be back-porting those fixes into the 1.3.x branch.

@octonato Thanks for getting to the bottom of this. I was indeed using the gateway, mostly out of laziness, to avoid having to hunt for the port in the log output… From your comment it seems the port numbers are less random than they appear?

I’ll re-test with direct connection to the service.

As for the upgrade to 1.4, it turned out to be not completely smooth sailing for us, so we put it on hold for a while. As soon as we catch our breath I’ll happily start a new topic with the upgrade issues 🙂

You are welcome.

It’s correct to use the gateway; it’s not a lazy thing. ;-)
In prod you will probably have some sort of gateway as well.

Service port assignment in dev mode is a hash function of the service name, so the port stays stable as long as the name stays stable.

You can set up the port range to be used for that assignment.

You can also override that logic and hardcode the port to be used for each service.
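
A sketch of both options in build.sbt (the values and project name are just examples; see the docs below):

    // Narrow the range the hash-based assignment draws from
    lagomServicesPortRange in ThisBuild := PortRange(40000, 45000)

    // Or pin a specific service to a fixed port
    lazy val helloImpl = (project in file("hello-impl"))
      .enablePlugins(LagomJava)
      .settings(lagomServicePort := 11000)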

All details at: https://www.lagomframework.com/documentation/1.4.x/java/ServicePort.html#How-are-ports-assigned-to-services

Cheers,