AKKA Stream mapAsync doesn't return elements by order


(Danel Kotev) #1

I have a flow with mapAsync stage. This stage should emits its elements in the same order they were pushed. I verified (using log printing) that events are pushed by the desired order. However, my mapAsync stage doesn’t always pushed this elements in the desired order. I think the the function I used caused this issue, but

MyFlow (the issue is at the second mapAsync ):

Flow[EventFlowInfo]
      .mapAsync[EventOrError](EventVerificationParallelism) { eventFlowInfo: EventFlowInfo =>
      // toDo: add verification rules
      Future(Right(eventFlowInfo))
    }
      .mapAsync[EventOrError](5) { eventOrError =>
      runStage("event storage", eventOrError, persistToEventStorage)
    }

runStage method:

private def runStage(stageName: String,
                       eventFlowInfoEither: EventOrError,
                       stage: (EventFlowInfo) => Future[EventOrError]): Future[EventOrError] = {
    eventFlowInfoEither match {
      case Right(eventFlowInfo: EventFlowInfo) =>
        logger.info(s"running stage for $eventFlowInfo")
        stage(eventFlowInfo)
      case error@Left(_) =>
        logger.error(s"failed event will not be persisted to $stageName")
        Future(error)
    }
  }

persistToEventStorage method:

private[flow] def persistToEventStorage(eventFlowInfo: EventFlowInfo): Future[EventOrError] = {
    eventFlowInfo.event match {
      case Some(event: Event) =>
        logger.info(s"persisting event of doc [${event.docId}] from sender [${eventFlowInfo.sender}] to event storage")
        create(Event.toEventRow(event))
          .map[EventOrError]{ eventId =>
              logger.info(s"event for doc [${event.docId}] from sender [${eventFlowInfo.sender}] was created successfully with id [$eventId]")
              val updatedEvent: Event = event.copy(id = eventId)
              Right(eventFlowInfo.copy(event = Some(updatedEvent)))
          }
          .recover { case ex: Throwable =>
            logger.error(s"was not able to persist event for doc [${event.docId}] from sender [${eventFlowInfo.sender}]", ex)
            Left(EventStorageException(ex, eventFlowInfo))
          }
      case None =>
        Future.successful(Right(eventFlowInfo))
    }
  }

It’s important to say that I verified that there is no issue with the method create , however i do think that something is wrong in persistToEventStorage method. I’m using the same ExecutionContext .

Scala version “2.12.7”

Akka version “2.5.17”