Akka Streams mapAsync doesn't return elements in order

I have a flow with a mapAsync stage. This stage should emit its elements in the same order they were pushed to it. I verified (using log printing) that events are pushed in the desired order. However, my mapAsync stage doesn’t always push these elements in the desired order. I think the function I used caused this issue, but

My flow (the issue is at the second mapAsync):

Flow[EventFlowInfo]
  .mapAsync[EventOrError](EventVerificationParallelism) { eventFlowInfo: EventFlowInfo =>
    // TODO: add verification rules
    Future(Right(eventFlowInfo))
  }
  .mapAsync[EventOrError](5) { eventOrError =>
    runStage("event storage", eventOrError, persistToEventStorage)
  }

runStage method:

private def runStage(stageName: String,
                       eventFlowInfoEither: EventOrError,
                       stage: (EventFlowInfo) => Future[EventOrError]): Future[EventOrError] = {
    eventFlowInfoEither match {
      case Right(eventFlowInfo: EventFlowInfo) =>
        logger.info(s"running stage for $eventFlowInfo")
        stage(eventFlowInfo)
      case error@Left(_) =>
        logger.error(s"failed event will not be persisted to $stageName")
        Future(error)
    }
  }

persistToEventStorage method:

private[flow] def persistToEventStorage(eventFlowInfo: EventFlowInfo): Future[EventOrError] = {
    eventFlowInfo.event match {
      case Some(event: Event) =>
        logger.info(s"persisting event of doc [${event.docId}] from sender [${eventFlowInfo.sender}] to event storage")
        create(Event.toEventRow(event))
          .map[EventOrError]{ eventId =>
              logger.info(s"event for doc [${event.docId}] from sender [${eventFlowInfo.sender}] was created successfully with id [$eventId]")
              val updatedEvent: Event = event.copy(id = eventId)
              Right(eventFlowInfo.copy(event = Some(updatedEvent)))
          }
          .recover { case ex: Throwable =>
            logger.error(s"was not able to persist event for doc [${event.docId}] from sender [${eventFlowInfo.sender}]", ex)
            Left(EventStorageException(ex, eventFlowInfo))
          }
      case None =>
        Future.successful(Right(eventFlowInfo))
    }
  }

It’s important to say that I verified that there is no issue with the create method; however, I do think that something is wrong in the persistToEventStorage method. I’m using the same ExecutionContext.

Scala version “2.12.7”

Akka version “2.5.17”

I don't know what the actual problem is, but I know how mapAsync works.
With a parallelism of 5, mapAsync works like this:

  • create a buffer with 5 slots
  • point the "current slot" at the first slot
  • pull 5 elements
  • start evaluating them
  • wait for the current slot's computation
  • when the current slot's element has finished, push it out
  • pull a new element into the current slot and start evaluating it
  • advance the current-slot “pointer” by (add 1)%5
  • wait for the current slot's computation
  • (repeat until there are no more elements or one computation fails)
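
The steps above can be sketched with nothing but the Scala standard library. This is only an illustration of the idea, not Akka's implementation: the name mapAsyncOrdered is hypothetical, and it blocks with Await where the real stage reacts to completion callbacks. It uses a queue instead of a ring of slots, which gives the same "wait for the oldest computation first" behaviour.

```scala
import scala.collection.mutable
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SlotSketch {
  // Hypothetical helper (NOT an Akka API): keeps at most `parallelism`
  // futures in flight and emits their results strictly in input order.
  def mapAsyncOrdered[A, B](input: Iterator[A], parallelism: Int)(f: A => Future[B]): Vector[B] = {
    val slots = mutable.Queue.empty[Future[B]]
    val out = Vector.newBuilder[B]

    // Fill the slots: start up to `parallelism` computations at once.
    while (slots.size < parallelism && input.hasNext) slots.enqueue(f(input.next()))

    while (slots.nonEmpty) {
      // Wait for the OLDEST computation only, even if newer ones are already done.
      out += Await.result(slots.dequeue(), 10.seconds)
      // A slot is free again: pull the next element and start evaluating it.
      if (input.hasNext) slots.enqueue(f(input.next()))
    }
    out.result()
  }

  // Earlier elements sleep longer, so later futures tend to finish first.
  def demo(): Vector[Int] =
    mapAsyncOrdered((1 to 20).iterator, parallelism = 5) { i =>
      Future { Thread.sleep((20 - i).toLong); i }
    }
}
```

Calling SlotSketch.demo() returns the numbers 1 to 20 in input order, although the futures for later elements usually complete before the earlier ones.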

So if you do something like Source(1 to 10000).mapAsync(5)(i => Future { log.info(s"$i"); i }).runWith(Sink.seq), the logs can appear in a different order than the elements in the sink, and that is totally normal (because slot 5 can be the fastest computation while we are still waiting for slots 1-4). If you need the futures to run sequentially, use a parallelism of 1.
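
Here is a small sketch that makes this visible. It assumes akka-stream 2.5.x on the classpath. The object name MapAsyncOrderDemo, the sleep durations and the in-memory `completed` buffer (standing in for the log lines) are all made up for the illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.mutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MapAsyncOrderDemo {
  // Returns (order in which the futures finished, order of elements in the sink).
  def run(parallelism: Int): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("map-async-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher // ExecutionContext for the futures

    val completed = mutable.ArrayBuffer.empty[Int]
    try {
      val stream = Source(1 to 20)
        .mapAsync(parallelism) { i =>
          Future {
            // Elements 1, 6, 11, 16 are made artificially slow.
            Thread.sleep(if (i % 5 == 1) 50L else 5L)
            // Stands in for a log line written inside the future.
            completed.synchronized { completed += i }
            i
          }
        }
        .runWith(Sink.seq)

      val emitted = Await.result(stream, 30.seconds)
      (completed.synchronized(completed.toList), emitted)
    } finally {
      system.terminate()
    }
  }
}
```

With run(5) the second sequence (what the sink received) is always 1 to 20, while the first one (the "log" order) will typically start with 2, 3, 4, 5 before 1. With run(1) both sequences are 1 to 20, because only one future is in flight at a time.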

Does that help?