Why is my source producing more than the buffer size?

scala
streams

(Elijah Rippeth) #1

Adapter from this StackOverflow question:

I’m new to Akka streams. I’m trying to create a simple distributed worker example to perform an arbitrary job. I have created a random job generator:

import java.util.UUID

import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}

final case class Job(id: UUID)

class RandomJobSource extends GraphStage[SourceShape[Job]] {
  final val out: Outlet[Job] = Outlet.create("RandomJobSource.out")

  final val shape = SourceShape.of(out)

  override type Shape = SourceShape[Job]

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    setHandler(out, new OutHandler {
      override def onPull(): Unit = {
        push(out, Job(id = UUID.randomUUID()))
      }
    })

  }
}

I can create an infinite stream of Jobs with:

val jobs = Source.fromGraph(new RandomJobSource)

Now to work off the stream, I create a balancer and worker:

  def balancer[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, NotUsed] = {
    import GraphDSL.Implicits._

    Flow.fromGraph(GraphDSL.create() { implicit b ⇒
      val balancer = b.add(Balance[In](workerCount, waitForAllDownstreams = true))
      val merge = b.add(Merge[Out](workerCount))

      for (_ ← 1 to workerCount) {
        // for each worker, add an edge from the balancer to the worker, then wire
        // it to the merge element
        balancer ~> worker.async ~> merge
      }

      FlowShape(balancer.in, merge.out)
    })
  }

  def worker[In, Out](f: In => Out): Flow[In, Out, NotUsed] = Flow.fromFunction(f)

  private def work(job: Job): Unit = {
    println(s"Doing job ${job.id}...")
    Thread.sleep(5000 + Random.nextInt(1000))
  }

Finally, I distribute the work. To put it all together:

import akka.stream._
import akka.stream.scaladsl._
import akka.{Done, NotUsed}
import akka.actor.ActorSystem

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random

object ExampleMain extends App {

  def balancer[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, NotUsed] = {
    import GraphDSL.Implicits._

    Flow.fromGraph(GraphDSL.create() { implicit b ⇒
      val balancer = b.add(Balance[In](workerCount, waitForAllDownstreams = true))
      val merge = b.add(Merge[Out](workerCount))

      for (_ ← 1 to workerCount) {
        // for each worker, add an edge from the balancer to the worker, then wire
        // it to the merge element
        balancer ~> worker.async ~> merge
      }

      FlowShape(balancer.in, merge.out)
    })
  }

  def worker[In, Out](f: In => Out): Flow[In, Out, NotUsed] = Flow.fromFunction(f)

  private def work(job: Job): Unit = {
    println(s"Doing job ${job.id}...")
    Thread.sleep(5000 + Random.nextInt(1000))
  }

  implicit val system: ActorSystem = ActorSystem("QuickStart")
  implicit val mat: Materializer = ActorMaterializer()
  implicit val ec: ExecutionContext = system.dispatcher

  val jobs = Source.fromGraph(new RandomJobSource)

  val res: Future[Done] = jobs
    .withAttributes(Attributes.inputBuffer(initial = 1, max = 1))
    // Work serially to avoid under-working priority jobs
    .buffer(1, OverflowStrategy.backpressure)
    .wireTap(job => println(s"Next job: ${job.id}"))   
    // Put work in a balancer with 5 workers.
    .via(balancer(worker(work), 5))
    .runWith(Sink.ignore)

  res.onComplete(_ => system.terminate())

}

I expect that because there’s a buffer size of 1, whenever a worker finishes its work, a new message will be pulled off the source (and as a consequence, I’ll see the "Next job: ${job.id}" line).

What I see instead if that my first 5 workers work and some relatively large number of "Next job: ${job.id}" lines are printed to the console:

Next job: 4f33258a-cd0f-4d54-b782-4ae97c67f125
Next job: 363a2b95-7cfa-4e8a-b683-17d2fccf48c5
Next job: 82a8ca64-1d23-474e-afc4-9b6d69a7f842
Doing job 363a2b95-7cfa-4e8a-b683-17d2fccf48c5...
Next job: d01faedf-65c3-4b88-83c2-fd0ce98607fe
Next job: 67f24856-a843-4df5-a87c-61f46b1f7141
Doing job 82a8ca64-1d23-474e-afc4-9b6d69a7f842...
Doing job 4f33258a-cd0f-4d54-b782-4ae97c67f125...
Doing job d01faedf-65c3-4b88-83c2-fd0ce98607fe...
Next job: 28b62312-a084-428f-ab61-d2426a242e52
Doing job 67f24856-a843-4df5-a87c-61f46b1f7141...
Next job: 80f9cb48-7447-494d-a1d9-e091049e7822
Next job: 38d91ef0-cc90-4bb8-a1f2-4aedfc5f655f
Next job: 546566b7-f7b6-4ed3-84c9-ddc8eb19c65a
Next job: 1a3f8f0c-be5a-4f14-a054-9f7fcd8a356a
Next job: 8f425329-2ac6-4dd0-a7b8-e77345ba125f
Next job: d1d23777-6bf8-4c91-a61d-7bc60f97c725
Next job: 628ddabe-25a1-47b0-9989-f963d6394f64
Next job: 6291e210-5228-4175-9b18-76a35d82dff0
Next job: 8c6344ad-b6f5-43a0-b709-4ddd2d080982
Next job: dd917c8d-ff12-44e2-81a7-98f66f55170c
Next job: 65449eb7-ea9d-4e29-b11f-d37190bb0001
Next job: 5649871d-a4bd-4edc-8c62-167f1da41786
Next job: af3d20cb-2e48-4668-bf42-3fabbd4cffad
Next job: 0f5c9ec2-3b69-4e91-8d87-89b0c1580d8a
Next job: 43f12408-a02c-4ac6-8564-a2ecc96155ac
Next job: 20951e30-1801-4d78-9683-dc0f45dc61e6
Next job: a95c94b9-a9ca-49b6-8ff4-eef81dcb13cd
Next job: 76107909-6f2f-4035-b34f-6c0b10a79d76
Next job: d4551999-f7fb-4bf6-9da9-3e18b8dfc4da
Next job: 2df2ceee-953f-44c8-8169-91152c0a672a
Next job: 7425e65a-89fb-45d1-9514-54b2af3c9791
Next job: f986a2a0-a82c-4fbc-af2c-f5439856bba2
Next job: ac165054-8718-4eb8-9161-40738c8b67b0
Next job: 27ea10d9-34eb-4989-9d5d-c67118eaf41e
Next job: e587fa05-2bc3-4901-8ed0-fe1efa912d3e
Next job: 77ee6cb0-77e1-4f2b-8d81-d57907755832
Next job: 9224f6eb-c81d-4bd5-b7f1-5bb09cc4e97e
...

Surely, these jobs are worked off in batches. Once they are, this pattern repeats. Why aren’t the messages being pulled off as the work is being finished, though?


(Elijah Rippeth) #2

By default, when workers are surrounded by an async boundary, they implicitly gain a buffer with a max size of 16. Thus, the five workers will buffer 5*16=80 total jobs. In my case, this is undesireable, so I simply add an attribute to the worker for an input buffer size of 1:

  def balancer[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, NotUsed] = {
    import GraphDSL.Implicits._

    Flow.fromGraph(GraphDSL.create() { implicit b ⇒
      val balancer = b.add(Balance[In](workerCount, waitForAllDownstreams = true))
      val merge = b.add(Merge[Out](workerCount))

      for (_ ← 1 to workerCount) {
        // for each worker, add an edge from the balancer to the worker, then wire
        // it to the merge element
        balancer ~> worker.async.addAttributes(Attributes.inputBuffer(1, 1)) ~> merge
      }

      FlowShape(balancer.in, merge.out)
    })
  }

Unfortunately, these workers can buffer no fewer than one message, so my console output yields the slightly incorrect:

Next job: 38c6424c-a943-48bb-af9c-e6b488973781
Doing job 38c6424c-a943-48bb-af9c-e6b488973781...
Next job: ff5b9e0f-40c3-4e6b-ab60-89d349b5ba0f
Doing job ff5b9e0f-40c3-4e6b-ab60-89d349b5ba0f...
Next job: 0d260b3b-61b4-41f4-a67d-bac3190e257d
Doing job 0d260b3b-61b4-41f4-a67d-bac3190e257d...
Next job: 80a743a1-9c6c-4486-b836-c102a11f5503
Doing job 80a743a1-9c6c-4486-b836-c102a11f5503...
Next job: a10bb61b-865f-4252-96a6-3da9413c77fb
Doing job a10bb61b-865f-4252-96a6-3da9413c77fb...
Next job: 3a7f8aeb-a6bf-4caa-ae9b-619fb49442ca
Next job: 209a1b78-f66a-4a54-91f1-93aaadb10522
Next job: a56e02e5-5e7c-4505-ae44-8a9bb68d5fc6
Next job: e218fd63-6ac2-45f5-9837-ce7bcc3c1930