Akka Stream: initial backpressure - throttle on source has no effect

Hi,

Context:
We use Alpakka Google PubSub gRPC to subscribe to messages from PubSub over gRPC. This is a streaming subscription which receives messages as a stream.

The problem:
When the service first starts up and connects to PubSub, there is no backpressure yet, so Alpakka/the Source pulls a huge number of messages from PubSub that we can't process in a timely manner; they sit in a buffer/queue for far too long. Because this is PubSub, messages that are buffered but not yet acknowledged get redelivered (that's how retries work on PubSub: you get a lease on each message you receive, and if you don't acknowledge it within the ack deadline, the message is retried). Since the service already has a queue full of messages to process, the redeliveries end up in the same state, the cycle continues, and our PubSub backlog never drains. Horizontally scaling the service only makes this worse: each replica fetches a huge batch, and redelivered messages bounce between replicas, amplifying the effect.

Backpressure does seem to kick in after that: a short while after startup, the number of messages requested from PubSub drops significantly and matches our expected throughput. But we have been looking for ways to avoid that initial spike in messages.

The spike is a rate of about 3000-4000 messages/s in the first 2-3 minutes, after which it drops to about 200/s. This is what it looks like when the service is restarted (or the connection to PubSub is lost and a new connection is made):
[screenshot: message rate after a restart]

We tried these things:

Using .throttle on the source, or even a ticking .zip on the source to slow down consumption: doesn’t work
Using .buffer(1, OverflowStrategy.backpressure) on the source: doesn’t work
Decreasing the values of these configurations (basically reducing the buffer capacity and output burst limit significantly):

     initial-input-buffer-size = 1
     max-input-buffer-size = 2
     output-burst-limit = 50

When I say “doesn’t work” I mean it does not reduce the initial spike, even though the effect on the rest of the pipeline is clearly visible in all these cases: with a throttle of 1 element per second, after the initial spike no more than one item per second is received from PubSub, but the spike at the beginning is left untouched! A simplified sketch of how we attach these operators is shown below.
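
Roughly like this (a simplified sketch, not our full pipeline: `process` is a placeholder for our real handler, and an implicit materializer is assumed to be in scope):

  import scala.concurrent.Future
  import scala.concurrent.duration._
  import akka.Done
  import akka.stream.OverflowStrategy
  import akka.stream.scaladsl.Sink
  import com.google.pubsub.v1.pubsub.ReceivedMessage

  // placeholder for our real message handler
  def process(msg: ReceivedMessage): Future[Done] = Future.successful(Done)

  // `source` is the GooglePubSub.subscribe source shown further down in this thread
  source
    .throttle(1, 1.second)                     // variant 1: throttle directly on the source
    .buffer(1, OverflowStrategy.backpressure)  // variant 2: tiny backpressuring buffer
    .mapAsync(parallelism = 1)(process)        // our (slow) processing
    .runWith(Sink.ignore)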

We would really appreciate it if someone could help us understand how we can apply a limit/throttle on the source, or otherwise overcome this issue.
Thank you!

Worth mentioning the versions we use:

  "com.lightbend.akka" %% "akka-stream-alpakka-hbase" % "1.1.0",
  "com.lightbend.akka" %% "akka-stream-alpakka-google-cloud-pub-sub-grpc" % "1.1.0",

Thanks for the question, @mdibaiee.

Why is there no backpressure yet? And where is the buffer/queue? Is that something in your code or part of the alpakka connector?

Hey @jrudolph, thanks for the response!

Why is there no backpressure yet?

The server has just started, so I suppose there is no data yet about how much downstream can handle, hence no backpressure? I'm not sure what the initial demand from downstream looks like. This is only a suspicion, not necessarily how Akka actually works under the hood, because I don't know its internals.

And where is the buffer/queue? Is that something in your code or part of the alpakka connector?

This is also something we suspect Akka is handling on its own. We have not explicitly defined any buffer / queue.

I turned on FINE logging to be sure Akka is indeed “receiving” those messages, and yes, the logs show a spike in ReceivedMessages from PubSub at the beginning, even though downstream consumes those events slowly and never catches up. After a few minutes the rate of ReceivedMessages drops to a low level, almost on par with downstream, but by then there is already a huge backlog waiting to be processed.

So if Akka is receiving all these messages but they are being processed very slowly downstream, I suspect Akka holds them in some kind of buffer/queue.
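
As an illustration, a probe like this hypothetical one (not in our production code; `source` is our PubSub source and `system` our ActorSystem) is how I would count elements as they come out of the source, to confirm the spike inside the stream rather than only in the PubSub metrics:

  import java.util.concurrent.atomic.AtomicLong
  import scala.concurrent.duration._

  // count elements as they leave the source
  val received = new AtomicLong(0L)
  val probedSource = source.wireTap(_ => received.incrementAndGet())

  // print the running total once per second (Akka 2.5 scheduler API)
  system.scheduler.schedule(1.second, 1.second) {
    println(s"received from PubSub so far: ${received.get()}")
  }(system.dispatcher)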

Hi, I'm working with @mdibaiee.

This is how we set up the source:

val source: Source[ReceivedMessage, Future[Cancellable]] = GooglePubSub.subscribe(request, 1.seconds)

And these are the settings:

 stream.materializer {
    # Initial size of buffers used in stream elements
    initial-input-buffer-size = 4
    # Maximum size of buffers used in stream elements
    max-input-buffer-size = 16
  }

Thanks for that information.

In general, akka-stream applies backpressure from start to end. But as you say, buffers can introduce unintended delays. Introducing extra buffers/queues goes somewhat against the intention of akka-stream / Alpakka, so if the behavior is caused by the Alpakka Google PubSub connector, I'd call it a bug. At this point it is hard to say whether it is caused by Alpakka or by your stream setup. It would be best if you could provide a standalone example that showcases what you are seeing.
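
Just to sketch what I mean, something roughly along these lines would already help (only a sketch: the subscription name, ack deadline and artificial processing delay are placeholders, and it assumes the connector's credentials/connection settings are configured as described in the Alpakka docs):

  import scala.concurrent.Future
  import scala.concurrent.duration._

  import akka.actor.ActorSystem
  import akka.stream.ActorMaterializer
  import akka.stream.scaladsl.Sink
  import akka.stream.alpakka.googlecloud.pubsub.grpc.scaladsl.GooglePubSub
  import com.google.pubsub.v1.pubsub.StreamingPullRequest

  object PubSubSpikeRepro extends App {
    implicit val system: ActorSystem = ActorSystem("repro")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    val request = StreamingPullRequest()
      .withSubscription("projects/my-project/subscriptions/my-subscription") // placeholder
      .withStreamAckDeadlineSeconds(10)

    GooglePubSub
      .subscribe(request, 1.second)
      .throttle(1, 1.second) // the throttle that does not dent the initial spike
      .mapAsync(1) { msg =>
        // simulate slow processing
        akka.pattern.after(500.millis, system.scheduler)(Future.successful(msg))
      }
      .runWith(Sink.foreach(m => println(m.message.map(_.messageId))))
  }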

Johannes

One tricky thing with Akka gRPC (if that’s the one you are using) is that the underlying Netty client has quite large buffers internally, see https://github.com/akka/akka-grpc/issues/501
I wonder if that could perhaps explain what you are seeing?


Great observation.

Google PubSub has even documented that fact here: https://cloud.google.com/pubsub/docs/pull#streamingpull_dealing_with_large_backlogs_of_small_messages

I wonder if there might be an alternative implementation that doesn't rely on transport-level buffers for backpressure. I created a ticket to discuss that here: https://github.com/akka/alpakka/issues/2141
