Asynchronous command validation on EventSourcedBehavior

In Akka Persistence, when receiving a command, how can we execute an asynchronous function to validate the command before deciding whether to reject the command or to persist an event?

EventSourcedBehavior accepts:

commandHandler: (State, Command) => ReplyEffect[Event, State]

I would like to do something like:

def validateCommand(command: Command): Future[Boolean]

// Desired shape (does not compile: the handler would return a Future, not a ReplyEffect)
commandHandler: (State, Command) =>
  validateCommand(command).map(valid =>
    if (valid)
      Effect.persist(MyEvent()).thenReply(replyTo)(_ => Accepted)
    else
      Effect.reply(replyTo)(Rejected("reason"))
  )

  1. Use context.pipeToSelf to map the future validation result to a new validated command
  2. Return Effect.none
  3. Add logic to handle the validated command in your command handler.
  4. Profit
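
The steps above could be sketched roughly as follows. This is only a minimal sketch under stated assumptions: the names DoSomething, Validated, ValidationFailed and the shape of State are hypothetical, and validateCommand is a stand-in for whatever asynchronous check is really needed.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior}

import scala.concurrent.Future
import scala.util.{Failure, Success}

object ValidatedEntity {
  sealed trait Reply
  case object Accepted extends Reply
  final case class Rejected(reason: String) extends Reply

  sealed trait Command
  // Public command sent by clients (hypothetical name).
  final case class DoSomething(payload: String, replyTo: ActorRef[Reply]) extends Command
  // Internal commands carrying the outcome of the asynchronous validation.
  private final case class Validated(original: DoSomething, valid: Boolean) extends Command
  private final case class ValidationFailed(original: DoSomething, cause: Throwable) extends Command

  sealed trait Event
  final case class MyEvent(payload: String) extends Event

  final case class State(payloads: List[String])

  // Assumption: placeholder validation; replace with the real asynchronous call.
  def validateCommand(command: DoSomething): Future[Boolean] =
    Future.successful(command.payload.nonEmpty)

  def eventHandler(state: State, event: Event): State = event match {
    case MyEvent(payload) => state.copy(payloads = payload :: state.payloads)
  }

  def apply(persistenceId: PersistenceId): Behavior[Command] =
    Behaviors.setup[Command] { context =>
      EventSourcedBehavior[Command, Event, State](
        persistenceId = persistenceId,
        emptyState = State(Nil),
        commandHandler = (_, command) =>
          command match {
            case cmd: DoSomething =>
              // Step 1: map the Future result to a new internal command.
              context.pipeToSelf(validateCommand(cmd)) {
                case Success(valid) => Validated(cmd, valid)
                case Failure(cause) => ValidationFailed(cmd, cause)
              }
              // Step 2: nothing to persist yet.
              Effect.none
            // Step 3: handle the validated command.
            case Validated(cmd, true) =>
              Effect.persist(MyEvent(cmd.payload)).thenReply(cmd.replyTo)(_ => Accepted)
            case Validated(cmd, false) =>
              Effect.reply(cmd.replyTo)(Rejected("validation failed"))
            case ValidationFailed(cmd, cause) =>
              Effect.reply(cmd.replyTo)(Rejected(cause.getMessage))
          },
        eventHandler = eventHandler
      )
    }
}
```

Note that other commands can be processed while the Future is running, so the state seen when Validated arrives may differ from the state at the time of the original command. Any state-dependent checks should be repeated in the Validated branch.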

Thanks for your support.
What is the difference between returning:

  • Effect.none
  • Effect.noReply

Effect.noReply is specifically for the EventSourcedBehaviorWithEnforcedReplies, see docs: https://doc.akka.io/docs/akka/current/typed/persistence.html#replies
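
To illustrate the difference, here is a small sketch of a command handler written for EventSourcedBehavior.withEnforcedReplies. The Ping and Tick commands and the State type are hypothetical names used only for this example.

```scala
import akka.actor.typed.ActorRef
import akka.persistence.typed.scaladsl.{Effect, ReplyEffect}

object ReplyEffects {
  sealed trait Command
  final case class Ping(replyTo: ActorRef[String]) extends Command
  case object Tick extends Command

  sealed trait Event
  final case class State(count: Int)

  // With enforced replies the handler must return a ReplyEffect.
  def commandHandler(state: State, command: Command): ReplyEffect[Event, State] =
    command match {
      case Ping(replyTo) =>
        Effect.reply(replyTo)("pong")
      case Tick =>
        // Nothing to persist and nobody to reply to.
        // A bare Effect.none would not type-check here, because it is an
        // EffectBuilder rather than a ReplyEffect; Effect.none.thenNoReply()
        // is another way to write the same thing.
        Effect.noReply
    }
}
```

With a plain EventSourcedBehavior (no enforced replies) the handler only needs to return an Effect, so Effect.none is enough there.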

Hello Johan,
I am looking for a similar thing. How do you get the context in a commandHandler?

Answer to myself: well, it’s documented, see https://doc.akka.io/docs/akka/current/typed/persistence.html#accessing-the-actorcontext

Anyway, thanks for these messages, they got me on the right track.

You can get the context from Behaviors.setup:

 Behaviors.setup[Command]{ context => 
    EventSourcedBehavior.withEnforcedReplies[Command, Event, State](
         persistenceId = PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId),
         emptyState = State(),
         commandHandler = (ste, cmd) => ste.applyCommand(cmd, context),
         eventHandler = (ste, evt) => ste.applyEvent(evt)
    )
}

Thanks brabo-hi,

I wrapped that around the commandHandler like this:

Behaviors.setup[Command] { context =>
  EventSourcedBehavior
    .withEnforcedReplies(
      persistenceId,
      MyState("new", 0),
      // pass the ActorContext on to the actual command handler
      commandHandler = { (state, command) =>
        commandHandler(context, state, command)
      },
      eventHandler = eventHandler
    )
}

But getting the context used for pipeToSelf in the behavior is still quite a puzzle.

   val futureResult = callSomefunctionThatGivesAFuture(....)
   context.pipeToSelf(futureResult) {
      case Success(answer: String) => WrappedUpdateResult(UpdateSuccess(answer), replyTo)
      case Failure(e) =>  WrappedUpdateResult(UpdateFailure("something", e.getMessage), replyTo)
    }

The above code is based on the Interaction Patterns documentation but gives me compilation errors:

found   : scala.util.Try[String] => Any
[error]  required: scala.util.Try[String] => _$2

Has anyone seen this kind of behavior before, and does someone maybe have a hint on how to figure out what _$2 should be?
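
One thing that may be worth checking (this is a guess, since the full code is not shown): a name like _$2 usually stands for a wildcard type, which would happen if the context parameter is declared as ActorContext[_] instead of ActorContext[Command]. The function passed to pipeToSelf has to return the actor's own message type, so the wrapper message must also extend Command. A sketch under those assumptions, with hypothetical Update, UpdateResult and MyState definitions and a placeholder Future:

```scala
import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl.ActorContext
import akka.persistence.typed.scaladsl.{Effect, ReplyEffect}

import scala.concurrent.Future
import scala.util.{Failure, Success}

object PipeToSelfTyping {
  sealed trait UpdateResult
  final case class UpdateSuccess(answer: String) extends UpdateResult
  final case class UpdateFailure(what: String, reason: String) extends UpdateResult

  sealed trait Command
  final case class Update(value: String, replyTo: ActorRef[UpdateResult]) extends Command
  // The wrapper must itself be a Command so it can be piped back to the actor.
  final case class WrappedUpdateResult(result: UpdateResult, replyTo: ActorRef[UpdateResult])
      extends Command

  sealed trait Event
  final case class MyState(value: String)

  // Assumption: placeholder for the real asynchronous call.
  def callSomefunctionThatGivesAFuture(value: String): Future[String] =
    Future.successful(value.toUpperCase)

  // The context is typed with the concrete message type, not a wildcard.
  def commandHandler(
      context: ActorContext[Command],
      state: MyState,
      command: Command): ReplyEffect[Event, MyState] =
    command match {
      case Update(value, replyTo) =>
        context.pipeToSelf(callSomefunctionThatGivesAFuture(value)) {
          case Success(answer) => WrappedUpdateResult(UpdateSuccess(answer), replyTo)
          case Failure(e)      => WrappedUpdateResult(UpdateFailure("something", e.getMessage), replyTo)
        }
        // The reply is sent later, when WrappedUpdateResult arrives.
        Effect.noReply
      case WrappedUpdateResult(result, replyTo) =>
        Effect.reply(replyTo)(result)
    }
}
```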

I've created a separate question for this: