Bridging write to read side: Persistence Query -- Polling


#1

Akka Persistence: CQRS

Bridging the gap between read and write models

Hi all!I have a question regarding the handling of CRQS with Akka Persistence.So let me first rephrase what I think I understood so far, so that maybe wrong assumption from my side do not lead to misconceptions.

So I am sorry for the long post, hopefully it is helpful to understand the background of my question though.
If you want to jump right to my question, jump to the conclusion Conclusion - A Misconception? part.

CQRS and Akka Persistence

Doing CQRS, we decouple the write model from the read model.As we usually want to read what was written previously (for some definition of ‘previously’ - see eventual consistency), so we need to bridge between the read and the write model (at least if not both run on the same database in the same data structure).

Akka Persistence takes care of the write model. Akka Persistence Query can be used to query the model, e.g. to build a bridge to the write model.

Akka Persistence Query uses Read Journals to query the data from the write models.There are multiple read journals for various databases. All expose a somewhat key / value storage interface (a journal partitioned into persistence IDs).

Read Journals - Status Quo

The documentation says the following about the Read Models:

Most journals will have to revert to polling in order to achieve this [watch for additional incoming events], which can typically be configured with a refresh-interval configuration property.

The only plugin I found which somewhat does something related to push was the Mongo DB. Though if I take a closer look, with some Issues (#155) or room for improvement (#163).

Except for this read journal, I found no other read journal which did not use polling for fetching new events.

Basically (ignoring tags) there are 2 major sources for read journals:

  • PersistenceIdsQuery defines a Source of persistence IDs

  • EventsByPersistenceIdQuery defines a Source of events ( EventEnvelope to be precise)

In order to subscribe to all events, I need to iterate over all persistence IDs with the first and subscribe to all events for that persistence ID.

Use Cases

So for a polling read journal, this means that I register a database polling action for every persistence ID. Which is acceptable, if there are only a few persistence IDs with a lot of events.

It is however a horrible idea, if there are a lot of persistence IDs with only a few events each. A lot of polling actions, which mostly fetch no new data at all but still keep the database busy.

Alternatively I can choose to lower the refresh-interval , which eases the load on the database, but potentially increases the time between when an update reached the write model and is visible in the read model.

Conclusion - A Misconception?

Given the current status quo mostly polling is used to subscribe to new events.Akka Persistence Query is therefor a bad fit for a lot of persistence IDs with few events each.

Is this a misconception? Where did I branched of with my reasoning from the right path, to get here then?

Possible Workarounds

  • Implement your own journal which uses DB push
    (which limits the journal to databases supporting this feature)

    • You still need to handle failures on the duplication to the write side in order not to miss updates…
  • Use the Mongo DB Journal and use server pushes for subscription to new events
    (there is even an allEvents function, which let’s you subscribe to all events at once)

  • Don’t use Akka Persistence at all and run your own Event Sourcing (dropping CQRS all along and write the read models atomically together with your original write side)


(Olger Warnier) #2

Hi Tilofid,

We use EventsByTag and CurrentEventsByTag to get a stream of events for the relevant persistence ids

Its a polling based system and in our experience its fast enough. Due to the CQRS design it’s eventually consistent anyway.

The way we use it, is to tag the persistent actor based on a specific type that’s needed for aggregation. We did not try multiple tags, that might lead to issues as the way tagging is implemented is not that flexible.

Hope this helps.

Olger


#3

Hi Olger,

thanks for your answer!
If I understand your answer correctly I failed to specify my problem precisely enough.

So let me retry, this time I am using pseudo code. We basically want to fetch all events and do process them in different read models. For this we basically do (stripping down error handling, restarting sources etc. pp.) the following:

readJournal.persistenceIds().runForeach { persistenceId =>
  fetchLatestKnownEventNumberForThatPersistenceId(persistenceId) { latestKnownSequenceNo =>
    readJournal.eventsByPersistenceId(persistenceId, latestKnownSequenceNo + 1, Long.MaxValue)
      .via(readModelUpdateFlow)
  }
}

which is, as far as I can see, the only option to fetch all updates for all persistence IDs.
A far better solution would be something like allEvents which is only available to the Mongo DB Journal (as far as I know)

My problem now is, that each call to readJournal.eventsByPersistenceId registers a polling service, which polls the database. If it would just register a subscription (push based) it would be okay.

In a scenario where I have a lot of persistenceIds (say e.g. 1,000,000) and only a few of them receive updates (say 1,000 per day) a lot of polls run through the system which will never fetch updates at all.


So if I take your answer and match it against my problem:

You proposed EventsByTag as a solution.
I think it could be used to simulate allEvents. I could simply tag all events with a common tag and use this. I did not consider it previously, because I though it’s use case is more of a filtering of events. I would use it for the exact opposite motivation. If I read the ScalaDoc correctly, I would need to have a common Offset for all events with a specific tag.
That is something I can only provide per persistenceId, not for a group or all persistence Ids. So I fear that doesn’t help me much, does it?


(Patrik Nordwall) #4

As Olger suggested tagging is often used for this. Tagging all events with the same tag would be fine, but you can also use eg. 10 different tags. Hash the persistenceId to on of these 10. Then you can have 10 separate read side processors that will consume all events, each one consuming a subset (possibly located on different nodes in a cluster).

The Cassandra plugin is using a time based (timeuuid) offset for eventsByTag.


(Olger Warnier) #5

Hi Tilofid,

Its more that the the combination of calls you do to eventsByPersistenceId is not required when you tag the persistenceIds that need to be in a specific read model with a single tag.
Thereafter you can doe eventsByTag on that tag.

I’ve had a similar issue a long while ago and discussed that with the google group at that time and found that tagging would be a reasonable solution for creating read models that are based on multiple persistence Ids. For the complete conversation have a look at Akka Persistence on the Query Side: The Conclusion

If you like to see how we apply this pattern, have a look at DDD and CQRS supporting framework something we build in order to have our learnings with this pattern together, there is a sample app part of the repo.

Hope this helps together with the remarks of Patrick. Still this may not fit your use case, there are other ways to build a CQRS structure.


#6

Hi patriknw and Olger,

thanks for the input. More things to read and reason about, great! :wink:

My concern about tagging was that by tagging, you partition your events into tag groups (potentially even multiple partitions, if one follows patriknw idea and hashes the persistenceIds).

This leads to the point where the read model needs an Offset to continue syncing from the write side.
Which potentially leads to problems, if there is no such offset.
As patriknw pointed out, Cassandra uses timeuuid for this, which can be used over multiple persistenceIds. A sequence number inside the persistenceID can’t.

I am using the JDBC journal and it seems there is an ordering column just for this purpose, which uses a database sequence. I wasn’t aware of that previously. That can be used to track progression over multiple persistenceIds. Case closed: :tada: this solves my initial problem.


However for this to work, I now have a common dependency I need to pass through for every event: Fetching a new number from the sequence generator.

So a lot of events from some persistenceIds can now impact the write performance on other (otherwise) unrelated persistenceIds? Which was the reason I turned to CQRS in the first place. :thinking:

I need to rethink a few things.