How to retire aggregates and archive events?

I am looking at Akka as I would like to use event sourcing for some particular microservices in my system (we are migrating an older space-based architecture to an event-based microservice architecture). My challenge is that I have high volumes (always increasing, ~1.5 bn entities a day) of fairly short-lived (1-10 days) entities which receive only a few commands and thus events, and I need to process those 1.5 bn inside of maybe 4 hours (actually spread through the day, but with significant spikes).

I have been looking at how to accomplish this on a few event-sourcing-oriented platforms - trying to find a good-fit platform that I can adopt - Akka, Axon, EventStore, KStreams.

To enable adequate performance on reasonable resources, my belief is that I need to be able to ‘archive’ the events for ‘retired’ entities off to another longer-term, read-only store (most likely as ‘history’ documents in a Doc DB). I would expect I could do that by building that store off the original events, and then deleting the entity and its events from Akka at some appropriate point.

So my major challenge looking at Akka is this archiving question. This seems to me (at least from my background area of finance) to be a fairly typical requirement, so I am surprised to see that there is no direct support for it in any of these platforms so far (hopefully I am wrong). If you receive a large number of new aggregates to process every day, you want good performance (and especially reasonable resource requirements), and the aggregates have a clear limit to their active life in the system, then you want to leverage that to keep your active event store lean and offload data that is old but still interesting for audit etc (i.e. read-only) to a separate store.

A typical deployment of our system will be (on premise) ~1.5 bn new aggregates per day, to be processed in around 1-10 days, and then retained for 10 years. I don’t see how that would be viable without such an approach.

Hi Jason,

Events are persisted to a database and are read when you instantiate an entity for a given id. At that point, only the events for that entity will be read. In your description, you mention that each entity will have only a few events, so that should not impact your performance. Your entity stays in memory for a while and is then removed from memory if no commands are sent to it for a given period (defaults to 120 seconds).

You can minimize the amount of data you read on each instantiation by saving snapshots of your state, but it doesn’t seem that this is an issue in your case.
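
Roughly, a sketch (untested, with made-up names) of such an entity in Akka Persistence Typed, including snapshotting, looks like this:

```scala
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior, RetentionCriteria }

object Trade {
  sealed trait Command
  final case class RecordFill(qty: Long) extends Command

  sealed trait Event
  final case class FillRecorded(qty: Long) extends Event

  final case class State(filled: Long)

  def apply(entityId: String): EventSourcedBehavior[Command, Event, State] =
    EventSourcedBehavior[Command, Event, State](
      persistenceId = PersistenceId("Trade", entityId),
      emptyState = State(0L),
      commandHandler = (_, cmd) =>
        cmd match {
          case RecordFill(qty) => Effect.persist(FillRecorded(qty))
        },
      eventHandler = (state, evt) =>
        evt match {
          case FillRecorded(qty) => state.copy(filled = state.filled + qty)
        })
      // snapshot every 100 events so replay reads the latest snapshot plus at most 99 events
      .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))
}
```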

That said, the size of your journal is not an issue for Akka itself. You may want to move data out of your journal once the affected entities have reached their end-of-life, but that’s not a concern for a library like Akka, Axon, etc. That’s a database backup concern, and that’s why this is not built into the libraries you mentioned.

Thanks Renato for the rapid response - it sounds positive, although I need to dig into it further.

I agree that the size of the journal is not an issue for Akka on the actor side, but it does seem to me to be a concern for Akka Persistence / Event Sourcing, as it relates quite directly to how well the model can perform and/or the resources required to do so.

For my use case I will have roughly 15 bn entities which are involved in some kind of activity, and then roughly 3.75 tn that exist only for query-based analytics/reporting/views - so managing the data is key, and that clearly needs to align with the lifecycle of the most important data, which is that 15 bn. Backups or replication are not an issue - but I am looking to purge the active side.

It sounds from what you say that I can manage that within the datastore. TBH I was hoping that I would not need to build too much there, as it does seem a typical need to me, but I’m happy to look into it - would you be able to point me to the documentation around that so I can investigate? I definitely want to be using Akka Persistence Event Sourcing as intended - I don’t want to end up going radically off-piste.

I think you should start with the main Akka Persistence documentation. Everything about how to encode an entity is described there, as well as its lifecycle, passivation and how to take snapshots.

On how to perform backups and move your data, I fear you will have to investigate it yourself. It also depends on which persistence plugin you choose.

You will need to decide which plugin to use, get familiar with the journal table, and devise a strategy for purging/backing up data. Given the amount of data you have, I imagine you are looking at something like Cassandra. If that’s the case, make sure you have in-house Cassandra expertise.

Pay extra attention to how you will serialize your events in the journal. Events are serialized as a blob, so you will need to choose a serialization format that you can still read back when you move data to some other backing store.
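
For example, with the Jackson-based serializers the binding is just configuration (com.example.CborSerializable here is an assumed marker trait for your events, not something provided by Akka):

```
akka.actor {
  serialization-bindings {
    # assumed marker trait implemented by all persisted events
    "com.example.CborSerializable" = jackson-cbor
  }
}
```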

Thanks again - still busy reading…

I am not worrying about backups or moving the data (hopefully correctly): I want to retain a history of every entity for 10 years, so I need a service (or several, for different entities) which provides the required ability to search and otherwise work with that data - so I can just have that as a consumer of my original published events, building its own model. I plan to use Alpakka & Kafka, as I generally use Kafka as my inter-service event bus.

So then all I need is the purging aspect - to purge/delete from the persistence plugin’s backend event storage all events for each entity that I deem no longer required for active work. That purging needs to be done fairly continuously, but using some business logic (it might be the age of the entity but also checking some status of the entity) that requires access to the entity. It seems then that the best approach is to drive it from some scheduled logic against a read model over the entities, which would send messages to the entity actors to indicate they are about to be purged (thus allowing some strongly consistent validation), and have that finally allow them to mark themselves as removable. From what I can see I can use the PersistenceId within that read model to allow the actual storage delete query to operate.
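
Roughly what I have in mind for the entity side (a sketch only - the command, reply and event names here are my own, not an Akka API):

```scala
import akka.actor.typed.ActorRef
import akka.persistence.typed.scaladsl.Effect

object PurgeProtocol {
  sealed trait Command
  final case class RequestPurge(replyTo: ActorRef[PurgeReply]) extends Command

  sealed trait PurgeReply
  case object PurgeAccepted extends PurgeReply
  case object StillActive extends PurgeReply

  sealed trait Event
  case object MarkedForPurge extends Event

  final case class State(isComplete: Boolean)

  // strongly consistent validation inside the entity's own command handler
  def onRequestPurge(state: State, cmd: RequestPurge): Effect[Event, State] =
    if (state.isComplete)
      Effect.persist(MarkedForPurge).thenReply(cmd.replyTo)(_ => PurgeAccepted)
    else
      Effect.reply(cmd.replyTo)(StillActive)
}
```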

You might also be interested in Akka Projections.

The current Akka API to delete events is based on the number of events (sequence numbers), not on state. We have an issue to make it state-based as well, but no concrete plans to implement that for now.
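
To illustrate what that looks like today, deletion is wired up through the retention criteria on the behavior (a sketch):

```scala
import akka.persistence.typed.scaladsl.RetentionCriteria

object Retention {
  // keep the last 2 snapshots (one taken every 100 events) and delete the events
  // older than those snapshots - i.e. deletion keyed on event sequence numbers
  val criteria =
    RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2)
      .withDeleteEventsOnSnapshot

  // applied as: someEventSourcedBehavior.withRetention(criteria)
}
```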

But if you have a read side, you can detect when an entity has become obsolete and then start some process to purge it. That will need to be something you implement yourself though.

Thanks. Projections could be of interest there - however at first reading the mechanism for sharding/partitioning of projections seems to be a bit less flexible than, and unrelated to, the partitioning for command/event processing - is that really the case (sorry, I should maybe ask via another question)?

Yes, that’s exactly the case, and the main reason is that the read side has different requirements regarding throughput.

If we had the same partitioning on both sides, we would have one read-side projection running per entity, and we would need to keep them all in memory waiting for events from the entities.

That’s why one projection instance is responsible for consuming from more than one entity. That way we can have a much smaller number of them constantly running, each listening to events from different entities.

If you compare it with Kafka, you also don’t have one consumer per unique ID in the topic. It’s the same strategy. One consumer picks events from different entities.
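
As a sketch of that (made-up handler, reusing the Trade event type from the earlier sketch, and assuming the Cassandra plugin and Akka Projections), you run one projection instance per tag, and each instance consumes events from many entities:

```scala
import scala.concurrent.Future

import akka.Done
import akka.actor.typed.ActorSystem
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.projection.ProjectionId
import akka.projection.cassandra.scaladsl.CassandraProjection
import akka.projection.eventsourced.EventEnvelope
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
import akka.projection.scaladsl.Handler

// made-up handler that updates the read model / history store
class TradeHandler extends Handler[EventEnvelope[Trade.Event]] {
  override def process(envelope: EventEnvelope[Trade.Event]): Future[Done] =
    Future.successful(Done) // write to your read model here
}

object TradeProjections {
  // one projection instance per tag; each one sees events from many entities
  def projectionFor(system: ActorSystem[_], tag: String) =
    CassandraProjection.atLeastOnce(
      projectionId = ProjectionId("trades", tag),
      sourceProvider = EventSourcedProvider.eventsByTag[Trade.Event](
        system,
        readJournalPluginId = CassandraReadJournal.Identifier,
        tag = tag),
      handler = () => new TradeHandler)
}
```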

Note that for Cassandra, there’s already tooling to support purging events, see Cleanup. The concern there is that you’d need to ensure that while you’re doing the purge there is no actor associated with that persistence ID (and that all the queries/projections you care about have offsets after the events, if you’re deleting from tag views).
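
A rough sketch of using that Cleanup tool for one retired persistence id (the id would come from your read model):

```scala
import akka.actor.typed.ActorSystem
import akka.persistence.cassandra.cleanup.Cleanup

object Purger {
  // deletes all events and snapshots for this persistence id;
  // ensure no entity actor for the id is running while this executes
  def purge(system: ActorSystem[_], persistenceId: String) =
    new Cleanup(system).deleteAll(persistenceId, neverUsePersistenceIdAgain = true)
}
```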

Thanks both. Sorry, my question was vague - I understand that the actual scale-out is independent, but the mechanism for scaling seems much more static as it is based on tags, which was my concern.

Yeah, that’s true. This has been a concern for us as well. We know that is not flexible.

Have a look at the akka-persistence-r2dbc plugin and its events-by-slice query. It has a new way of partitioning the journal that allows for more flexibility on the read side.
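
A rough sketch of the query side (the entity type name is assumed, reusing the Trade example from earlier):

```scala
import akka.actor.typed.ActorSystem
import akka.persistence.query.{ NoOffset, PersistenceQuery }
import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal

object SliceQueries {
  def firstRange(system: ActorSystem[_]) = {
    val queries = PersistenceQuery(system)
      .readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier)

    // the total number of slices is fixed, but how they are grouped into ranges
    // (and therefore how many projection instances you run) can change over time
    val ranges = queries.sliceRanges(numberOfRanges = 4)

    queries.eventsBySlices[Trade.Event]("Trade", ranges.head.min, ranges.head.max, NoOffset)
  }
}
```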


Slices can also be implemented as a domain/application-level concept in Cassandra. I used the term “domain shard” (this predates the slice implementation). The rough approach was to have a number of domain shards configured, with a comment in the conf along the lines of

Should be about 1 per X potential users… this should never decrease

Then, on the first event that’s taggable, a shard is computed (e.g. abs(hash of id) modulo number of domain shards) and saved as part of the event. The tagger sees the event and tags it with the shard appended. The state updates such that once the shard is set, it ignores attempts to update it, but otherwise it uses the first shard from an event as the domain shard in the state. Subsequent events are persisted ensuring that the shard from state is used.

This allows for increasing the shard count, as the new shards could only be selected for new entities.
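
In code the idea is roughly this (all names here are made up, not from any plugin):

```scala
object DomainSharding {
  // "Should be about 1 per X potential users... this should never decrease"
  val configuredDomainShards = 4

  // computed once, on the first taggable event for an entity
  def computeDomainShard(entityId: String): Int =
    math.abs(entityId.hashCode % configuredDomainShards)

  // the state keeps the first shard it sees and ignores later attempts to change it
  final case class State(domainShard: Option[Int]) {
    def withShard(shard: Int): State =
      if (domainShard.isDefined) this else copy(domainShard = Some(shard))
  }

  // the tagger appends the shard carried by the event to the base tag
  def tagFor(baseTag: String, eventShard: Int): String =
    s"$baseTag-$eventShard"
}
```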

So if configured with 4 domain shards until the first 40k entities are there, we’d expect 10k entities per shard. At that point we might reconfigure to 8 domain shards, so after 40k more entities we’d expect 4 shards with 15k and 4 with 5k. Then 16 domain shards for the next 40k entities, giving 4 with 17.5k, 4 with 7.5k, and 8 with 2.5k. There’s no strict need for agreement among the instances on the domain shard count: an instance configured with N shards which loads an entity in shard M (even if M >= N) will respect that, and if every instance takes some responsibility for keeping sharded projections running, then the running projections will be controlled by the instance with the highest configured number of shards.

Note that something like this could be enforced in the Cassandra plugin with minor schema changes, but I think there’s value in leaving it to the application. For instance, I think different tags at scale might well want different shard sizes, which moves this completely outside what the framework can meaningfully promise.