I’m developing an Akka Streams (latest 2.6.x release) and Kafka based system in which I have to make a very slow (network) call for each event that enters the system.
I’ve designed the system so that I group these messages (with groupedWithin) and then execute a number of them in parallel using this homegrown operator:
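```scala
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}

/**
 * Given a list, I apply the passed flow to each element as part of a batch, then collect the results into a list which is forwarded on. Elements in the output list can be out of order.
 * If any incoming element of a list fails, it fails the whole batch.
 */
object ListBatch {
  def apply[V, OUT](each: Flow[V, OUT, NotUsed]): Flow[List[V], List[OUT], NotUsed] =
    Flow.apply[List[V]]
      // One batch at a time: the next list is pulled only after this one has been folded.
      .flatMapConcat { list =>
        Source(list)
          // Run every element of the batch through `each` concurrently.
          // math.max guards against a breadth of 0 if an empty list ever arrives.
          .flatMapMerge(math.max(list.size, 1), v => Source.single(v).via(each))
          // Prepending is O(1) on List; the output order is unspecified anyway.
          .fold(List.empty[OUT])((acc, out) => out :: acc)
      }
}
```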
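For comparison, here is a minimal sketch of the same batch semantics built on mapAsync instead of nested sub-streams. It assumes the slow call can be expressed as a function returning a Future; `FutureListBatch`, `slowCall` and the 50 ms delay are hypothetical stand-ins, not part of the system above. Like ListBatch, all elements of a batch run concurrently, one failure fails the batch, and only one batch is in flight at a time; unlike ListBatch, Future.traverse keeps the results in input order.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object FutureListBatch {
  // Hypothetical Future-based counterpart of ListBatch: Future.traverse starts a call for
  // every element of the list and fails if any of them fails.
  // mapAsync(1) keeps a single batch in flight, like flatMapConcat does.
  def apply[V, OUT](call: V => Future[OUT])(implicit ec: ExecutionContext): Flow[List[V], List[OUT], NotUsed] =
    Flow[List[V]].mapAsync(1)(list => Future.traverse(list)(call))
}

object FutureListBatchExample {
  // Runs ten integers through groupedWithin + FutureListBatch and returns the batches.
  def run(): Seq[List[Int]] = {
    implicit val system: ActorSystem = ActorSystem("list-batch")
    import system.dispatcher

    // Simulated slow network call (assumption): doubles its input after 50 ms.
    def slowCall(i: Int): Future[Int] =
      akka.pattern.after(50.millis, system.scheduler)(Future.successful(i * 2))

    try {
      val batches = Source(1 to 10)
        .groupedWithin(4, 100.millis) // emits immutable.Seq[Int]
        .map(_.toList)
        .via(FutureListBatch(slowCall))
        .runWith(Sink.seq)
      Await.result(batches, 10.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run().flatten.sorted)
}
```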
Is this optimal? Would this approach have the same, better, or worse runtime performance than an implementation based on mapAsync or mapAsyncUnordered?
Thanks
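A minimal sketch of the mapAsyncUnordered variant, assuming the batching is not otherwise needed and the slow call again returns a Future. `UnorderedCalls`, `slowCall`, the parallelism of 8 and the 50 ms delay are hypothetical illustration values. Here up to `parallelism` calls are in flight at any moment across the whole stream, and each result is emitted as soon as its call completes.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object UnorderedCalls {
  // Hypothetical unbatched alternative: keeps up to `parallelism` calls running and
  // emits results in completion order rather than input order.
  def apply[V, OUT](parallelism: Int)(call: V => Future[OUT]): Flow[V, OUT, NotUsed] =
    Flow[V].mapAsyncUnordered(parallelism)(call)

  // Runs twenty integers through the flow and returns the (unordered) results.
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("unordered-calls")
    import system.dispatcher

    // Simulated slow network call (assumption): doubles its input after 50 ms.
    def slowCall(i: Int): Future[Int] =
      akka.pattern.after(50.millis, system.scheduler)(Future.successful(i * 2))

    try Await.result(Source(1 to 20).via(UnorderedCalls(8)(slowCall)).runWith(Sink.seq), 10.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run().sorted)
}
```

The structural difference from ListBatch: flatMapConcat only starts the next batch once every element of the current one has finished, so each batch waits on its slowest call, and every element materializes its own sub-stream. mapAsyncUnordered starts a new call as soon as any slot frees up and materializes nothing per element. How much that matters depends on how variable the call latency is, so measuring both under a realistic load is the safest way to decide.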