How to use pipeToSelf in EventSourcedBehavior

Hello Everyone,

In my EventSourcedBehavior, I’d like to call some functions that return a Future in the command handler.

Based on the example in the documentation, I start using pipeToSelf to get the Future response back into the running actor (behavior)

Behaviors.setup[Command]{ context => 
   EventSourcedBehavior
     .withEnforcedReplies(
         persistenceId,
         MyState("new"), 0),
         commandHandler =  { (state, command) =>
            commandHandler(context, state, command)
          },
          eventHandler = eventHandler
      )

But getting the context used for pipeToSelf in the behavior is still quite a puzzle.

   val futureResult = callSomefunctionThatGivesAFuture(....)
   context.pipeToSelf(futureResult) {
      case Success(answer: String) => WrappedUpdateResult(UpdateSuccess(answer), replyTo)
      case Failure(e) =>  WrappedUpdateResult(UpdateFailure("something", e.getMessage), replyTo)
    }

but gives me compilation errors:

found   : scala.util.Try[String] => Any
[error]  required: scala.util.Try[String] => _$2

Looking at the sample, it expects to return the behavior as the event sourced functions return an Effect, so this may not be the right direction at all.
Another option is to have a combination of behavior and effects ?

So next to the fact that some hints on the error, some advice on the best approach would be really appreciated.

Kind regards,

Olger

First thing to check: WrappedUpdateResult a Command.

If that’s not it, I’d add explicit type parameters to .withEnforcedReplies and move inwards until it becomes obvious what the type mismatch is.

Thanks,
I’ll start adding some explicit types. One more question on the typing:
a command handler function returns a ReplyEffect[MyEvent, MyState]

The pipeToSelf wants to send a message expects a Behavior[MyCommand]

When you the pipeToSelf in the commandHandler for EventSourcedBehavior it results in a type mismatch, right ?
If so: should I split the message handling in a way that the Commands that trigger a future are returning Behavior and the pipeToSelf messages return a ReplyEffect ?

Maybe I am completely off here, Trying to understand the Typed structures after building with classic actors quite some software…

I think it may be a bit tricky combining such async validation/enrichment with trying to guarantee replies using .withEnforcedReplies, probably easier with the regular event sourced behavior.

I should add that we had a discussion about this kind of enrichment yesterday, me and @octonato and while I have no regrets or morals he has stronger feelings about doing it inside an event sourced actor being an anti pattern, and that an event sourced behavior should only deal with commands inside of its own consistency boundary.

If you still want to do it: Piping to self is a side effect, so you’d first fire off async operation, pipe future result mapped to command together with original replyTo to self, then return a Effect.noReply (as you’d do the actual reply once the enriched/validated command arrives).

Effect.noReply is what I would use for the first command, and the reply is sent later when handling the piped result of the async operation.

Thanks @johanandren,

Your hint to move inwards with the typing made it clear what was wrong. My ActorContext was typed as ActorContext[_] and that was certainly not a good idea. Passing it really typed compiles properly. Your ‘direction’ to use the Effect.noReply and do the reply on the passed message by the future is clear, happy to hear that it works as I initially expected.

What makes it an anti pattern to do a ‘side effect’ in the command handler ?
Love to hear the arguments against it @octonato ?

Having a call inside the command handling is an easy way to prevent it running again in a replay / projection and that comes in handy from a team perspective. (The patterns used with CQRS need some practice)

Another way for me would be to make a ‘Saga’ (as it is called in Axon) listening to an event, do the “side effect” and report back with a new command to the actor, That creates visible interaction patterns (in the event store) of the integration. Complex integrations (for instance ERP where you integrate on domain logic) end up with a event/command structure that is quite hard to reason about.

Now that the direction is set, compilation works, its time to get the Typed actor do what it should :slight_smile:

Thanks for your support (really appreciated),

Olger

Thanks Patrick,

That was my initial setup, due to a bit of laziness in passing the actor context (with _ ), I went down the drain in creating very complex ‘solutions’.

That made me think though: Is it logical to mix Effect and Behavior return types within an EventSourced Actor ? (and would that be possible ? )

Olger

You can start with plain Behavior and switch to EventSourcedBehavior, but then you can’t switch back to plain Behavior.

Hello, when using akka persistence, can we safely access actorContext in theRun for future processing? eg.

 Effect
              .none
              .thenRun((newState: State) => {
                context.pipeToSelf(futureProcessing) {
                  case Failure(exception) => InternalCommand.ResultProcess(message = exception.getMessage)
                  case Success(value) => InternalCommand.ResultProcess(message = value)
                }
              })

That is fine. thenRun is running in the actor message processing thread.

2 Likes