Are Akka persistence side effects always handled on the same entity?

Hey all, has anyone seen this kind of weird issue with Akka Persistence?
I’m sending a command to an entity (via ask), and the command handler for that command persists two events and then runs a side effect with thenRun.

This is fine most of the time, but intermittently the side effect is executed by a completely different entity from the one that persisted the events.

Here is some pseudocode and logs of what I’m seeing:

val ref = clusterSharding.entityRefFor(ENTITY_KEY, id1)
ref.ask(Command())
...
onCommand(Command::class.java) { command ->
  Effect().persist(events).thenRun {
      log.info("Command received: $command on entityId: $id")
  }
}

---- 
In the event log, the events have been persisted for id1:

{ persistenceID: id1,
  event: event1 },
{ persistenceID: id1,
  event: event2 }

In the logs, the side effect runs on id2:

Command received: Command(sender: .../temp/id1) on entityId: id2

Any ideas? Are side effects not guaranteed to execute on the same entity that handled the command and persisted the events?

thenRun will be called on the same instance that persisted the events.

Can you show more real code of how you log this?

Sure, I’ll just have to make some of the names generic and cut out some of the fields, but here’s the Behavior:

class Behavior private constructor(
    private val id: String,
    private val tag: String,
    private val customCommandHandlerBuilder: CustomCommandHandlerBuilder,
) :
    EventSourcedBehavior<Command, Event, State>(
        PersistenceId.of(ENTITY_KEY.name(), id),
        SupervisorStrategy.restartWithBackoff(Duration.ofMillis(200), Duration.ofSeconds(5), 0.1)
    ) {
...
    override fun commandHandler(): CommandHandler<Command, Event, State> {
        return customCommandHandlerBuilder
            .withCommandHandlerBuilderBuilder { newCommandHandlerBuilder() }
            .withEffect { Effect() }
            .withId(id)
            .build()
    }
...
}

It’s initialized via a Spring Component:

    @Component
    class Initialize(...) : AkkaCluster.InitializeBehavior {
        override fun initBehavior(actorSystem: ActorSystem<*>) {
            val customCommandHandlerBuilder = CustomCommandHandlerBuilder()
                .withClock(Clock.systemUTC())
                .withClusterSharding(ClusterSharding.get(actorSystem))
                ...
            ClusterSharding.get(actorSystem).init(
                Entity.of(ENTITY_KEY) {
                    create(it.entityId, customCommandHandlerBuilder)
                })
        }

        fun create(
            entityId: String,
            customCommandHandlerBuilder: CustomCommandHandlerBuilder,
        ): Behavior<Command> {
            val index = abs(entityId.hashCode().rem(TAGS.size))
            val tag = TAGS[index]
            return Behavior(entityId, tag, customCommandHandlerBuilder)
        }
    }

As far as I can tell, initBehavior here is only executed once at startup, but let me know if you want to see more details about how it’s called.
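The distinction that matters here can be modeled without Akka. In this minimal sketch (all names are hypothetical; `FakeSharding` stands in for ClusterSharding.init plus the Entity.of factory, and `SharedHelper` for CustomCommandHandlerBuilder), registration runs once, but the factory lambda runs for every entity that is started, so any object created outside the lambda is the same instance in every entity.

```kotlin
// Hypothetical, Akka-free stand-ins for the Entity.of factory and ClusterSharding.init.
class SharedHelper // plays the role of CustomCommandHandlerBuilder

class FakeEntity(val entityId: String, val helper: SharedHelper)

class FakeSharding {
    private var factory: ((String) -> FakeEntity)? = null

    // Called once at startup, like ClusterSharding.init in initBehavior.
    fun init(factory: (String) -> FakeEntity) {
        this.factory = factory
    }

    // Called each time an entity that is not yet running has to be started.
    fun startEntity(entityId: String): FakeEntity {
        val create = checkNotNull(factory) { "init must be called first" }
        return create(entityId)
    }
}

fun startTwoEntities(): Pair<FakeEntity, FakeEntity> {
    val sharding = FakeSharding()
    val helper = SharedHelper() // built once, outside the factory lambda
    sharding.init { entityId -> FakeEntity(entityId, helper) }
    return sharding.startEntity("id1") to sharding.startEntity("id2")
}
```

The two entities returned by `startTwoEntities()` have different IDs but the very same `helper` object (`e1.helper === e2.helper`).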

The CustomCommandHandlerBuilder where the thenRun is called:

class CustomCommandHandlerBuilder {

    private lateinit var id: String
    private lateinit var effect: () -> EffectFactories<Event, State>
    private lateinit var clusterSharding: ClusterSharding
    private lateinit var commandHandlerBuilder: () -> CommandHandlerBuilder<Command, Event, State>

    fun withCommandHandlerBuilderBuilder(commandHandlerBuilder: () -> CommandHandlerBuilder<Command, Event, State>): CustomCommandHandlerBuilder {
        this.commandHandlerBuilder = commandHandlerBuilder
        return this
    }

    fun withClusterSharding(clusterSharding: ClusterSharding): CustomCommandHandlerBuilder {
        this.clusterSharding = clusterSharding
        return this
    }

    fun withEffect(effectFactory: () -> EffectFactories<Event, State>): CustomCommandHandlerBuilder {
        this.effect = effectFactory
        return this
    }

    fun withId(id: String): CustomCommandHandlerBuilder {
        this.id = id
        return this
    }

    fun build(): CommandHandler<Command, Event, State> {
        return notInitializedCommandHandlerBuilder()
            .orElse(anotherCommandHandler())
            ...
            .build()
    }

    private fun notInitializedCommandHandlerBuilder(): CommandHandlerBuilderByState<Command, Event, NotInitialized, State> =
        commandHandlerBuilder()
            .forStateType(NotInitialized::class.java)
            .onCommand(Command1::class.java, ::onCommand)

    private fun onCommand(cmd: Command1): ReplyEffect<Event, State> {
        val events = mutableListOf<Event>(
            Event.Event1(createdDate = createdDate, ...),
            Event.Event2(attr = "value", ...),
        )

        return effect().persist(events)
            .thenRun {
                logger.info("Command received by $id: $cmd. Events: $events")
                dao.insert(
                    History(
                        id = id,
                        ...
                    )
                )
            }
            .thenReply(cmd.sender) {
                Command1.Succeeded.INSTANCE
            }
    }

Where do you get that logger from? Is it from the actor context, or a logger you create yourself? If you create it yourself, could there be a mistake in how it is created, so that it is accidentally shared between actors?

The logger is one we create ourselves:

class CustomCommandHandlerBuilder {
    ...
    private val logger = LoggerFactory.getLogger(CommandHandlerBuilder::class.java)
}

But it shouldn’t matter, since it’s the entity ID being logged that is inconsistent between the events and the side effect. That entity ID is set in the Behavior class via:

    override fun commandHandler(): CommandHandler<Command, Event, State> {
        return customCommandHandlerBuilder
            .withCommandHandlerBuilderBuilder { newCommandHandlerBuilder() }
            .withEffect { Effect() }
            .withId(id)
            .build()
    }

which should run independently on each entity?

Because ClusterSharding.init is only called once per entity type per node, the same CustomCommandHandlerBuilder instance is used by every entity of that type on the node. Those entities therefore share its var id: String, and the id any of them sees is the one from the most recent call to withId (modulo Java memory model visibility subtleties about which write to CustomCommandHandlerBuilder::id a given thread observes).
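A minimal, Akka-free sketch of this sharing, assuming a simplified stand-in for CustomCommandHandlerBuilder (the `HandlerBuilder` class and `sharedBuilderScenario` function are hypothetical): the handler returned by `build()` reads the builder's `id` field when it runs, so once a second entity calls `withId`, the first entity's handler also reports the second ID. Everything runs on one thread, so the memory-model caveat does not come into play here.

```kotlin
// Hypothetical, Akka-free stand-in for CustomCommandHandlerBuilder.
class HandlerBuilder {
    private lateinit var id: String

    fun withId(id: String): HandlerBuilder {
        this.id = id
        return this
    }

    // The handler does not copy `id`: it reads the builder's field every time it
    // runs, just as onCommand reads `id` in the original builder.
    fun build(): (String) -> String = { cmd -> "Command received by $id: $cmd" }
}

fun sharedBuilderScenario(): List<String> {
    val shared = HandlerBuilder()                    // one instance per node, shared by all entities
    val handlerForId1 = shared.withId("id1").build() // entity id1 starts
    val handlerForId2 = shared.withId("id2").build() // entity id2 starts and overwrites id
    return listOf(handlerForId1("cmd1"), handlerForId2("cmd2"))
}
```

Both handlers report `id2`, because they close over the same builder rather than over a per-entity value.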

So what’s happening is the following (this is one of many possible orderings; e.g., id2 could also be created before the command is sent):

  • send a command to id1, which persists event1 and event2
  • start entity id2, which sets the shared CustomCommandHandlerBuilder::id to id2
  • when event1 and event2 have been persisted, the thenRun block is executed by id1; it builds the log string from the method-local variables cmd and events, but id is CustomCommandHandlerBuilder::id, which is now id2
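This ordering also explains why the problem is intermittent. The hypothetical sketch below (again without Akka; `DeferredBuilder` and `runOrdering` are illustrative names) models the thenRun block as a closure that only runs after "persistence" completes: if no other entity starts in the meantime, the log line shows id1; if one does, it shows id2.

```kotlin
// Hypothetical model of persist(...).thenRun { ... }: the side effect is returned as a
// closure and only runs once "persistence" has completed.
class DeferredBuilder {
    private lateinit var id: String

    fun withId(id: String): DeferredBuilder {
        this.id = id
        return this
    }

    // `cmd` is captured as a local value; `id` is read from the shared field when the closure runs.
    fun onCommand(cmd: String): () -> String = { "Command received by $id: $cmd" }
}

fun runOrdering(startId2BeforePersistCompletes: Boolean): String {
    val shared = DeferredBuilder()
    shared.withId("id1")                                 // entity id1 starts
    val pendingSideEffect = shared.onCommand("Command1") // id1 persists event1 and event2
    if (startId2BeforePersistCompletes) {
        shared.withId("id2")                             // entity id2 starts in the meantime
    }
    return pendingSideEffect()                           // persistence completes; thenRun block runs
}
```

`runOrdering(false)` yields the correct `id1`, while `runOrdering(true)` yields `id2`, matching the "Command received ... on entityId: id2" log line from the question.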

Moving the CustomCommandHandlerBuilder().withClock(...).withClusterSharding(...)... construction into create ensures that every entity gets its own CustomCommandHandlerBuilder, so entities no longer share (and potentially stomp on) CustomCommandHandlerBuilder::id, CustomCommandHandlerBuilder::effect, and CustomCommandHandlerBuilder::commandHandlerBuilder. (CustomCommandHandlerBuilder::clusterSharding is also being shared, but that’s almost certainly not a problem beyond a lot of pointless assignments.)
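A sketch of that fix under the same simplified, Akka-free model (hypothetical names; `createEntityHandler` plays the role of `create`): constructing a fresh builder per entity means each handler reads its own `id`.

```kotlin
// Hypothetical, Akka-free stand-in for CustomCommandHandlerBuilder (same shape as before).
class EntityHandlerBuilder {
    private lateinit var id: String

    fun withId(id: String): EntityHandlerBuilder {
        this.id = id
        return this
    }

    fun build(): (String) -> String = { cmd -> "Command received by $id: $cmd" }
}

// Plays the role of create(entityId, ...): a fresh builder for every entity.
fun createEntityHandler(entityId: String): (String) -> String =
    EntityHandlerBuilder().withId(entityId).build()

fun perEntityScenario(): List<String> {
    val handlerForId1 = createEntityHandler("id1")
    val handlerForId2 = createEntityHandler("id2") // has its own builder, so id1's handler is unaffected
    return listOf(handlerForId1("cmd1"), handlerForId2("cmd2"))
}
```

In the real code this corresponds to building the CustomCommandHandlerBuilder inside create (called from the Entity.of lambda) instead of once in initBehavior.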


Ah good catch, thank you!