Akka-Persistence with Queue


(Marc Schlegel) #1

Hello everyone

I am learning Akka by implementing a small integration layer for two microservices. The sole purpose for now is dispatching of events and handling errors.

The basics are running fine so I’ve added some additional behaviour which is able to determine if a external service is unavailable. In this case the behaviour changes to a simple ping, which continually checks the availability until the service is back. During the ping, no domain-events are to that service are dispatched.

Now, I would like to add the capability that no domain-event gets lost. Reading about Akka-Persistence seemed to be exactly what I needed: I can stash messages while waiting for the unavailable service, and unstash them when coming back to life.
Unfortunately the default storage for akka-persistence (as far as I understood) is an event-store with the purpose of replaying events to recreate state. In my case however I am more interested in an actor which can stash/unstash messages using a persistent queue.

Is there any configuration in Akka-Persistence or do I have to roll my own implementation? I found this blog from 2014 which seems to be what I am looking for, but there is a lot of custom code


(Marc Schlegel) #2

I’ve finally settled with Command-Sourcing using Akka Persistence. The actor already receives events, so no additional validation or mapping is necessary.

All I needed was an additional SuccesfullDeliveryCommand(cmd: Command) which removed the contained command from the internal queue of commands, which are waiting to be acknowledged.


(Renato) #3

@marc, I’m not sure if I fully get what you described, but there are a few things that I would like to clarify.

Commands vs. Events

Usually, you don’t send an event to a domain model. You send commands. A command can be originated from an event though. An event happens on your system or externally, you get somehow notified and you react sending a command to your model.

The main distinction is that a command can be rejected if not complying with the model invariant and an event represents a fact, something that happened and that should be taken into account.

Command Sourcing

Command sourcing is a strange beast and it’s usually implemented as a poor-man backpressure. If you are listening to events and sending commands to your model and then persisting the commands, basically what you are doing is transferring the list of events to a list of commands. You are transferring data from one side to the other and creating a persisted buffer.

It probably means that all commands are valid and there is no model invariant to be checked. They arrive, they get persisted, they are applied.

A better way of doing this is actually to be able to backpressure the stream from where you are taking the original events.

Note that the journal will be replayed on actor re-instantiation and your commands will be reapplied. Are you sure this is what you are looking for?

You may think that snapshotting will help you here, but it’s not reliable for the use case you are trying to implement. The reason is that snapshotting is an optimization to avoid replaying the events from scratch, not a mechanism to prevent commands to be replayed.

When you save a snapshot, it happens asynchronously and it can happen that your command is persisted, but the snapshot not due to a system crash. When your actor comes back, that last command will be replayed and will cause side-effects on your external system.

Stashing

I understood that you are moving away from in-memory buffer and opted for a persistent buffer, but want to add it here just for the record.

It’s not a good idea to stash your commands while waiting for another service to become available. Stash is an in-memory data structure and you can get runtime errors if you stash too much.

From the akka docs:

One important thing to be aware of is that the StashBuffer is a buffer and stashed messages will be kept in memory until they are unstashed (or the actor is stopped and garbage collected). It’s recommended to avoid stashing too many messages to avoid too much memory usage and even risking OutOfMemoryError if many actors are stashing many messages. Therefore the StashBuffer is bounded and the capacity of how many messages it can hold must be specified when it’s created.
If you try to stash more messages than the capacity a StashOverflowException will be thrown. You can use StashBuffer.isFull before stashing a message to avoid that and take other actions, such as dropping the message.