stream.mapAsync(1) vs single threaded execution context with stream.mapAsync(16) to maintain ordering?


(Pritam Kadam) #1

Our use case is publishing events to Redis, with the requirement that events published by a single publisher arrive in order.

To achieve this, we have code like this for a single publisher:

source
.mapAsync(1)(publish)
.to(Sink.ignore)
.run()

But when I ran a benchmark with this code on a single machine (8 GB RAM, 8 vCPUs, macOS) with 40 publishers and 40 subscribers, I got a throughput of only 25k/sec.

After a little investigation, I thought of creating a single-threaded execution context per publisher to maintain ordering, increased the mapAsync parallelism to 8, and ran the same benchmark. In this case I am getting around 45k/sec throughput, and the code looks like this:

implicit val synchronousEc = ExecutionContext.fromExecutor((task: Runnable) => task.run())
source
.mapAsync(8)(publish)
.to(Sink.ignore)
.run()

So the question is: which one is the recommended way? What are the pros and cons of the two approaches, and is there any other way I can guarantee ordering?


(Pritam Kadam) #2

Any suggestions on this please?


(Johan Andrén) #3

The ordering guarantee of mapAsync only covers the order in which results are emitted downstream, so mapAsync(n) with n > 1 can lead to unexpected ordering of whatever happens inside publish (in your example, up to 8 invocations will be in flight concurrently).
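
To make that concrete, here is a minimal sketch, assuming Akka 2.6+ (where an implicit ActorSystem provides the materializer). The publish function is a hypothetical stand-in that is not taken from the original post: it completes later elements sooner so that the side effects visibly overtake each other, while the results still come out of mapAsync in input order.

```scala
import akka.actor.ActorSystem
import akka.pattern.after
import akka.stream.scaladsl.{Sink, Source}
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MapAsyncOrderingSketch {
  // Returns (order in which the side effects ran, order in which results were emitted).
  def run(): (Vector[Int], Vector[Int]) = {
    implicit val system: ActorSystem = ActorSystem("ordering-sketch")
    import system.dispatcher

    // Records the order in which the simulated "publish" side effects happen.
    val effects = new AtomicReference(Vector.empty[Int])

    // Hypothetical publish: element 1 takes 400 ms, element 8 takes 50 ms,
    // so later elements perform their side effect before earlier ones.
    def publish(i: Int): Future[Int] =
      after((50 * (9 - i)).millis, system.scheduler) {
        effects.updateAndGet(_ :+ i)
        Future.successful(i)
      }

    // mapAsync(8) keeps up to 8 publish futures in flight at once,
    // but still emits their results downstream in input order.
    val emitted = Source(1 to 8).mapAsync(8)(publish).runWith(Sink.seq)

    try {
      val out = Await.result(emitted, 5.seconds).toVector
      (effects.get, out)
    } finally system.terminate()
  }
}
```

Calling MapAsyncOrderingSketch.run() returns both orders so they can be compared: the emitted sequence matches the input order, while the recorded side effects generally do not. With a real Redis client the reordering depends on how and when the client writes to the connection, so this only illustrates the risk.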

One thing to try could be batching, for example using groupedWithin and pushing a batch of elements to the database per mapAsync operation (this could help if crossing the async boundary is in fact what dominates).
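
A rough sketch of the batching idea, again assuming Akka 2.6+. The publishBatch function, the batch size of 100 and the 10 ms window are all placeholders chosen for illustration, not values from the original post; a real implementation would send the whole batch to Redis in one round trip (for example via pipelining, if the client supports it).

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import java.util.concurrent.atomic.AtomicReference
import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object BatchedPublishSketch {
  // Returns the elements in the order they were "published".
  def run(): Vector[Int] = {
    implicit val system: ActorSystem = ActorSystem("batched-publish-sketch")
    import system.dispatcher

    // Stand-in for Redis: records elements in the order they were published.
    val published = new AtomicReference(Vector.empty[Int])

    // Hypothetical batch publish: one async call per batch instead of per element.
    def publishBatch(batch: immutable.Seq[Int]): Future[Unit] =
      Future {
        published.updateAndGet(_ ++ batch)
        ()
      }

    val done = Source(1 to 1000)
      // Emit a batch once 100 elements have arrived or 10 ms have passed.
      .groupedWithin(100, 10.millis)
      // Parallelism 1: the next batch starts only after the previous one completes,
      // so ordering holds across batches as well as within each batch.
      .mapAsync(1)(publishBatch)
      .runWith(Sink.ignore)

    try {
      Await.result(done, 10.seconds)
      published.get
    } finally system.terminate()
  }
}
```

The trade-off is a small added latency (at most the groupedWithin window) in exchange for far fewer asynchronous round trips, while keeping mapAsync(1) so the publish order stays strict.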