Akka Streams mapAsync doesn't return elements in order

(Danel Kotev) #1

I have a flow with a mapAsync stage. This stage should emit its elements in the same order they were pushed. I verified (by logging) that the events are pushed in the desired order. However, my mapAsync stage doesn't always push these elements in the desired order. I think the function I used causes this issue, but

My flow (the issue is at the second mapAsync):

    .mapAsync[EventOrError](EventVerificationParallelism) { eventFlowInfo: EventFlowInfo =>
      // toDo: add verification rules
      // ...
    }
    .mapAsync[EventOrError](5) { eventOrError =>
      runStage("event storage", eventOrError, persistToEventStorage)
    }

runStage method:

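private def runStage(stageName: String,
                     eventFlowInfoEither: EventOrError,
                     stage: EventFlowInfo => Future[EventOrError]): Future[EventOrError] = {
  eventFlowInfoEither match {
    case Right(eventFlowInfo: EventFlowInfo) =>
      logger.info(s"running stage for $eventFlowInfo")
      stage(eventFlowInfo) // assumed from the signature: run the stage on the successful value
    case error @ Left(_) =>
      logger.error(s"failed event will not be persisted to $stageName")
      Future.successful(error) // assumed from the signature: pass the error through unchanged
  }
}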

persistToEventStorage method:

private[flow] def persistToEventStorage(eventFlowInfo: EventFlowInfo): Future[EventOrError] = {
  eventFlowInfo.event match {
    case Some(event: Event) =>
      logger.info(s"persisting event of doc [${event.docId}] from sender [${eventFlowInfo.sender}] to event storage")
      // ...
        .map[EventOrError] { eventId =>
          logger.info(s"event for doc [${event.docId}] from sender [${eventFlowInfo.sender}] was created successfully with id [$eventId]")
          val updatedEvent: Event = event.copy(id = eventId)
          Right(eventFlowInfo.copy(event = Some(updatedEvent)))
        }
        .recover { case ex: Throwable =>
          logger.error(s"was not able to persist event for doc [${event.docId}] from sender [${eventFlowInfo.sender}]", ex)
          Left(EventStorageException(ex, eventFlowInfo))
        }
    case None =>
      // ...
  }
}

It's important to say that I verified that there is no issue with the create method; however, I do think that something is wrong in the persistToEventStorage method. I'm using the same ExecutionContext.

Scala version “2.12.7”

Akka version “2.5.17”
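
For reference, here is a minimal sketch for checking the ordering guarantee in isolation, assuming Akka 2.5.x and Scala 2.12 as listed above. The names MapAsyncOrderDemo and slowStage are hypothetical; slowStage only stands in for persistToEventStorage, and nothing here is taken from the real flow. Earlier elements are deliberately made to finish later, so the log lines printed inside the futures may appear in any order, while the elements collected downstream of mapAsync should still match the input order.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object MapAsyncOrderDemo {

  // Runs a tiny stream through mapAsync(5) and returns what the sink received.
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("map-async-order-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher // same ExecutionContext for the futures and the stream

    // Hypothetical stand-in for persistToEventStorage:
    // element 1 sleeps longest, element 5 shortest, so the futures
    // are likely to complete in reverse order.
    def slowStage(n: Int): Future[Int] = Future {
      Thread.sleep((6 - n) * 50L)
      println(s"completed inside future: $n") // order of these lines is not guaranteed
      n
    }

    try {
      val result: Future[Seq[Int]] =
        Source(1 to 5)
          .mapAsync(5)(slowStage)
          .runWith(Sink.seq[Int])

      Await.result(result, 10.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    val emitted = run()
    println(s"emitted downstream: $emitted")
    // mapAsync emits results in upstream order, whatever the completion order was
    assert(emitted == (1 to 5))
  }
}
```

If this sketch behaves as expected but the real flow still looks unordered, a log line placed in a .map directly after the second mapAsync could be compared with the log lines inside persistToEventStorage. That may show whether the elements themselves or only the log output are out of order.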