Backpressure between Lagom services using streaming ServiceCall not working - Lagom 1.5.4

Lagom: 1.5.4

Consider having a ServiceCall (example):

def stream: ServiceCall[NotUsed, Source[Int, NotUsed]] = ServiceCall { _ =>
  Future.successful(
    Source(1.to(1000)).wireTap(msg => log.info(s"sending $msg"))
  )
}

When another service (example) consumes this ServiceCall, e.g.:

val ticker = Source.tick(1.second, 100.millis, true)
helloWorldStreamService.stream.invoke().flatMap(_.zip(ticker).map {
  case (msg, _) =>
    log.info(s"received $msg")
    msg
}.runWith(Sink.seq))

You would expect the artificially slow consumer to slow down the producer, but looking at the logs this doesn’t seem to be the case:

sending 1
sending 2
sending 3
[...]
sending 1000

[1 second pause]

received 1
[100ms pause]
received 2
[100ms pause]
received 3
[...]

Am I missing any hidden buffers?

Example code: https://github.com/an-tex/lagom-backpressure

Run sbt runAll

and then execute curl 127.0.0.1:[port of hello-world-stream-client service]/api/test

to see the effect.

Note: I’ve already posted this on StackOverflow but didn’t get any replies, so I’m trying my luck here.

Cheers
Andreas

I’ve lifted the new user restrictions for you, so you can go back and edit your post to add more links if you’d like.


To answer your question, in this case the hidden buffers are probably the operating system level TCP buffers. Streaming service calls are handled via HTTP and the WebSocket protocol, so you’ll get back pressure when the sending service is unable to write any more data to the socket. You will probably need to use a much larger test before you’ll hit these limits.
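This effect can be illustrated without Lagom or Akka at all. Below is a minimal stdlib Scala sketch (the object name, method name, and capacity figure are made up for illustration) in which an ArrayBlockingQueue stands in for the OS-level socket buffer: the producer free-runs until the buffer fills, so a test smaller than the buffer never observes backpressure.

```scala
import java.util.concurrent.ArrayBlockingQueue

object BufferedStreamSketch {
  /** Simulate a producer free-running into a bounded buffer (standing in for
    * the OS-level TCP/socket buffer) before the consumer drains anything.
    * Returns how many elements the producer emits before it would block,
    * i.e. before backpressure would actually reach it. */
  def burstBeforeBlocking(total: Int, capacity: Int): Int = {
    val buffer = new ArrayBlockingQueue[Int](capacity)
    var sent = 0
    // offer() succeeds until the buffer is full, so the producer runs freely.
    while (sent < total && buffer.offer(sent)) sent += 1
    sent
  }
}
```

If the buffer can absorb more elements than the whole test (say 1000 small messages against a multi-thousand-element buffer), the producer finishes without ever blocking, which matches the logs above; only a much larger test fills the buffer and forces the producer to wait.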

Hi Tim,

thanks for your reply, and you’re right. I’ve increased the test size, and it seems there’s a 512 KB burst plus a general 128 KB system buffer (macOS).
Well, almost too obvious ;)
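As a rough sanity check (the per-message wire size below is an assumption, not a measured value), those figures suggest why 1000 small messages never trigger backpressure:

```scala
object BufferEstimate {
  val burstBytes = 512 * 1024        // observed initial burst (~512 KB)
  val socketBufferBytes = 128 * 1024 // observed OS-level buffer (~128 KB)
  val bytesPerMessage = 32           // assumed average wire size of one small message

  /** Rough number of messages the buffers can absorb before backpressure
    * propagates back to the producer. */
  def messagesAbsorbed: Int =
    (burstBytes + socketBufferBytes) / bytesPerMessage
}
```

With these assumed numbers the buffers absorb on the order of 20,000 small messages, so a 1000-element stream is sent in full before any backpressure can reach the producer.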

I’ve updated the GitHub repo nevertheless.