Most efficient way to process data with akka-streams

This question is about the akka-streams project.

To begin with, I would like to describe the problem I'm trying to solve: you have a lot of numbers, and for each number you have to fetch (the process takes some time) a piece of additional information, approximately 1 MB in size, and then process it. We may also assume that the heap size is 512 MB. How can you process all the numbers in the most efficient way?

To model this situation, here is an example:

import java.util.concurrent.ThreadLocalRandom

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.{Random, Try}

object Hello extends App {

  implicit val as = ActorSystem()
  implicit val am = ActorMaterializer()

  import as.dispatcher

  Source(1 to 1000000)
    .mapAsync(100) { element =>
      // Simulate fetching ~1 MB of additional information, taking 3 to 5 seconds.
      val promise = Promise[Array[Byte]]()
      as.scheduler.scheduleOnce((ThreadLocalRandom.current().nextInt(2000) + 3000).millis) {
        promise.complete(Try {
          val arr = new Array[Byte](1024 * 1024)
          Random.nextBytes(arr)
          arr
        })
      }
      promise.future
    }
    .runWith(Sink.foreach { (each: Array[Byte]) =>
      // "Processing": print the length of a string containing all bytes of the element.
      val hello = each.mkString(",").size
      println(hello)
    })
}

In this case, the list of numbers consists of the numbers from 1 to 1000000, and receiving the additional information takes from 3 to 5 seconds. After receiving it, we do the processing by printing the length of a string containing all the bytes of that information.

The main problem I face in this example is limiting the number of units of additional information held in memory at the same time. How can I control it?

If I place .buffer(100, OverflowStrategy.backpressure) directly after the Source declaration, can I guarantee that at any point in time there will be only 100 units of additional information in memory?
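To be concrete, this is the placement I mean (a sketch, assuming the same imports and ActorSystem as in my example above, plus akka.stream.OverflowStrategy; fetchAdditionalInfo stands for the scheduler-based lookup from my example):

Source(1 to 1000000)
  .buffer(100, OverflowStrategy.backpressure) // directly after the Source, before mapAsync
  .mapAsync(100)(fetchAdditionalInfo)
  .runWith(Sink.foreach(each => println(each.length)))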

How can you process all the numbers in the most efficient way?

This is relative :D What does efficiency mean to you in that case?

I think your example should be fine with the given parameters. The 100 after mapAsync means that up to 100 Futures will be created and awaited in parallel (though only 4-8-16 etc. of them will actually be computed in parallel, depending on the execution context and/or actor system config; those 100 can be completed or uncompleted depending on compute time and downstream demand). So in theory no more than 100 MB of generated data can be in your memory.
From the mapAsync (you could also use mapAsyncUnordered) to the println, in theory only one element flows at a time.
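For example (a sketch reusing the setup from your example; fetchAdditionalInfo stands for your scheduler-based lookup), mapAsyncUnordered emits elements as soon as their Futures complete instead of preserving upstream order, which can help throughput when completion times vary:

Source(1 to 1000000)
  .mapAsyncUnordered(100)(fetchAdditionalInfo) // still at most 100 in flight, but emitted in completion order
  .runWith(Sink.foreach(each => println(each.length)))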

I advise you to experiment with smaller examples like:

Source.repeat(5).map { x => println(x); x }.mapAsync(10)(Future(_)).delay(30.seconds).runWith(Sink.ignore)

Move the println to other places and see how the element flow speed changes.

The buffer(100) will not do anything special because of the mapAsync(100), but if you move the buffer after the mapAsync it could do interesting things. (In the worst case you will have 100 completed elements in the async stage and 100 waiting elements in the buffer, which is 200 elements.)
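Something like this is what I mean (a sketch reusing the setup from your example; fetchAdditionalInfo again stands for your scheduler-based lookup):

Source(1 to 1000000)
  .mapAsync(100)(fetchAdditionalInfo)         // up to ~100 elements held by the async stage
  .buffer(100, OverflowStrategy.backpressure) // plus up to 100 more waiting here
  .runWith(Sink.foreach(each => println(each.length)))
// Worst case: roughly 100 + 100 = 200 elements (~200 MB at 1 MB each) in memory at once.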

If your example runs out of memory, that could be a GC/mkString problem too.

What does efficiency mean to you in that case?

I mean reaching maximum throughput.

If your example runs out of memory, that could be a GC/mkString problem too.

Yes, I run out of memory with a 512 MB heap, but I don't know why it is not working with the buffer; I assumed that heap usage should not exceed 200 MB. I'm using mkString in order to model a processing stage.

Your problem is clearly the mkString!
With mkString: [screenshot]

(wtf, I can only include 1 img/post…)

Without mkString: [screenshot not included]

mkString is just a way to model processing

Your problem is clearly the mkString!

It is not a problem; in the real code there will be another very costly operation instead.

Okay, let me rephrase this. It is not the stream that leaks your memory; the mkString does. Without the mkString the GC can keep up with the load: the stream creates the elements and the GC releases them. When you make a string from a byte array it will be huge! (There will be a moment when you need to hold about 4-8x the element's original size in memory.) So without mkString you create 1 MB of data and release it. With mkString you create about 8-10 MB of garbage per element. So the GC cannot keep up properly, I think.
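A quick way to see the blow-up (a standalone sketch; exact numbers depend on the JVM and the random bytes, but the order of magnitude should hold): each byte becomes one to four digit characters plus a comma, and JVM strings store characters as 16-bit values, so a 1 MB array expands to several MB of character data before you even count the intermediate builder arrays.

import scala.util.Random

object MkStringSize extends App {
  val arr = new Array[Byte](1024 * 1024) // 1 MB of random bytes
  Random.nextBytes(arr)
  val s = arr.mkString(",")              // e.g. "-37,112,5,..."
  println(s.length)                      // a few million characters, i.e. several MB of char data
}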

You need to profile your app if you want to work in harsh environments!

Live memory snapshot at a random point in the running app: [screenshot]

Sampling every 10th object, that statistically means about 140 MB of byte arrays, 300 MB of strings and 786 MB of char arrays! So from the 1200 MB of used heap, roughly 1100 MB is used because of the mkString and the println!

So forget the strings or use a bigger heap size. It's not an Akka thing; it's a JVM string characteristic.

As I said, the problem is the mkString. If you replace it with something whose memory use is more controlled, it will be fine.

(I tried with .map(a => a ++ a ++ a ++ a).runWith(Sink.foreach(a => println(a.size))).)

Thank you, that helps. Now I've figured it out.