Executing a flow on each element in a list as part of a batch operation

I’m developing a akka-streams(the latest release from v2.6.x) and kafka based system where I’ve got to perform a very slow(network) call for each event passed into the system.

I’ve designed the system so that I group these messages(groupedWithin), then execute a number of them in parallel using this homegrown operator:

import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}

/**
 * Given a list, I apply the passed flow to each element as part of a batch, then collect the results into a list which is forwarded on.  Elements in the output list can be out of order.  
 * If any incoming element of a list fails, it fails the whole batch
 */
object ListBatch {

  def apply[V, OUT](each: Flow[V, OUT, NotUsed]): Flow[List[V], List[OUT], NotUsed] =
    Flow.apply[List[V]]
      .flatMapConcat(list => {

          Source.apply(list)
            .flatMapMerge(list.size, v =>
              Source.single(v).via(each)
            )
            .fold(List[OUT]())((l,t)=> l.appended(t))

      })

Is this optimal? Would doing this work like this have the same, better or worse runtime performance than an implementation based on mapAsync or mapAsyncUnordered?

thanks