Backpressure with Actors

Hi there, I am having trouble wrapping my head around the various actorRefWithBackpressure options.

Let’s say I have the following:

  1. A fast source
  2. An actor that consumes and processes that source but is somewhat slow.

Is the following the correct way to hook these up? And if so, where am I supposed to be emitting and/or handling the Ack message? Is this somehow wired up by this construction:

fastSource.runWith(ActorRef.actorRefWithBackpressure(slowActor???, Ack, …))

I have read all the online docs, but they seem to be a few steps short of what I need to really grok what is going on, so any more extensive examples would be much appreciated.

Thanks.

Hi @sosborn-trumid,

Can I ask for the use case. In Akka Streams Source converts the data into streams, flow transforms that data and allows you to materialize it and Sink is the consumer of that data. If you want to lets say send data from a source to an Actor. You can use Tell (!) pattern to achieve it as Sink is consuming it.

In Sink you can apply foreach and send each data unit as a message.
[Sink.foreach[Int]( x => Actor ! newNumber(x))
If you want to ensure that the data is sent to the Sink only when its available - check out Conflate in Flow that acts like fold operator but emits the result only when downstream is ready.

Thanks, yes. I was going down the conflate route, but I still could not see how to trigger conflating because it needs some sort of backpressure signal to kick in, and where is that coming from, since the consumer is an Actor that just has a mailbox accepting all messages.

My use case is a little more complicated. It is Source → Snapshotting Actor → Processing Actor, where the processing actor is the potentially slow bit.

But at this point, I’m just trying to understand the mechanics of applying backpressure from an actor to a source and finding the documentation somewhat confusing. ActorSink.actorRefWithBackpressure seems like the right bridge, but I’m still unclear how it all goes together and whether or not I need to be sending Ack messages to someone.

If I have SlowActor[M] where M is my trait MyMessage, it seems like I need to make the protocol messages (Init, etc.) implement MyMessage. Is that correct?

I see try this sink example.

import akka.stream.typed.scaladsl.ActorSink

val sink = ActorSink.actorRefWithBackpressure(
ref = myActor,
onInitMessage = (actorRef: ActorRef[String]) => “Stream started”,
ackMessage = “Ack”,
onCompleteMessage = “Stream completed”,
onFailureMessage = (ex: Throwable) => ex.getMessage
)

source.runWith(sink)

Replace ackMessage with the acknowledge message .

Hope this helps!

@sosborn-trumid Yes, you’re on the right track. The ActorSink.actorRefWithBackpressure method is designed to handle situations like this, where you have a fast producer (source) and a slower consumer (actor in this case).

Here is a demo actor

// Define the messages
case class StreamElement(value: Int)
case object Ack

// Define the actor
class SlowActor extends Actor {
def receive: Receive = {
case StreamElement(value) =>
println(s"Processing $value")
Thread.sleep(1000) // Simulate slow processing
sender() ! Ack // Send acknowledgement back to the stream
}
}

For an ActorSink.actorRefWithBackpressure, the type parameters are T, M, and A. In this case, M will be MyMessage, T is whatever type your stream is feeding into the sink, and A is the ack-message, which signals the sink to stop backpressuring (viz. that the actor the sink is sending to is ready to handle a new message).

So whatever message is constructed as the init-message, the complete-message, and the failure-message (as well as whatever message is constructed in the messageBuilder) would have to be a MyMessage.

The ack-message does not have be a MyMessage (A can be literally any type): the only effective requirement (not captured in the type system, alas) is that whatever message is constructed in messageBuilder has to have a field of type A. A reasonable convention to follow in the absence of any further requirement is that this field will be named replyTo and have the type akka.Done.

Thank you both - this has been super helpful. I think I’ve got a handle on it now. Cheers!

Cheers! Happy coding!

Couple more small follow-up items.

  1. Am I supposed to send an Ack in response to the Init message, or just in response to the forwarded Messages?
  2. I have noticed, at least in my test scenerios, that I can sometimes receive the first Message before the Init message. Is that supposed to be able to happen?

The stream will not start passing actual elements until it gets an ack for the init message.

If you get messages before that it would be a bug, if you can make a minimal reproducer we’d be interested in seeing that (you can see the test coverage in the Akka test suite for that specific case here).

Thank you. Turns out the the stray message I was seeing was coming from somewhere else, so all is right with the world, and it behaves as you say.