Akka Persistence Cassandra - unrecoverable failures, tag_writes buffer grows unboundedly

Context: the database we’re using only handles insertion batch sizes of 100. We are getting into scenarios where our tag_writes buffer appears to be growing at a crazy rate and we’re unable to recover unless we restart our services.

We’ve set akka.persistence.cassandra.events-by-tag.max-message-batch-size to 100. Our flush interval is 50ms.
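For reference, the relevant part of our config looks roughly like this (a sketch, assuming these are the correct setting paths for APC 1.0.x):

```hocon
akka.persistence.cassandra {
  events-by-tag {
    # hard limit imposed by our database: at most 100 inserts per batch
    max-message-batch-size = 100
    # how long the TagWriter buffers tagged events before flushing a batch
    flush-interval = 50ms
  }
}
```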
We are using akka-persistence-cassandra version 1.0.1. Would upgrading help? I see this PR that seems to touch some of this code: https://github.com/akka/akka-persistence-cassandra/pull/841/files

I think what’s happening is the following, though I could be completely wrong.

  1. We get rate limited by our DB occasionally and get some OverloadedExceptions.
  2. After enough of these exceptions, we start getting Writing tags has failed. This means that any eventsByTag query will be out of date. The write will be retried. Reason com.datastax.oss.driver.api.core.servererrors.WriteFailureException: Cassandra failure during write query at consistency QUORUM (1 responses were required but only 0 replica responded, 1 failed)
  3. And then not long after, we start getting Buffer for tagged events is getting too large (401) and it keeps building up (to over 10k in some instances!)
    • In the debug logs up to that, I see lots of Sequence nr > than write progress. Sending to TagWriter
    • Since the buffer keeps increasing (and never seems to decrease) we’re unable to continue writing tags to our DB. We have to manually restart things over and over again to start fresh. Is there some sort of back pressure for when the buffer gets too large?
  4. After this we keep getting timeouts, preventing our actors from making forward progress
    • We are also getting a boatload of errors around recovery timeouts. Do we need to bump up the event-recovery-timeout config value? Supervisor RestartSupervisor saw failure: Exception during recovery. Last known sequence number [0]. PersistenceId [Aggregate|12345678-1234-5678-1234c630376bbcc0], due to: Replay timed out, didn't get event within [30000 milliseconds], highest sequence number seen [0]

Any help here is highly appreciated. I can provide our configuration for different params if needed.


Both errors are due to database overload. One is the write to the tag_views, the other is the recovery of actors. To fix these you’ll need to increase your Cassandra cluster capacity.

The PR you reference could definitely help with the failure behaviour, but you’re still going to see failures as the database will still be overloaded. With that PR you’ll see actor failures far sooner, which means you could choose to apply backpressure to the source of requests rather than having a background write fail.

NOTE We are actually using CosmosDB with the DataStax Cassandra driver, and 100 events (or 2 MB, whichever is hit first) is a hard constraint in CosmosDB.

While we are getting rate limited (overloaded) occasionally, when the throttling stops on the Cassandra side, we appear to enter an endless cycle of failures on our side.

My initial hunch here was based on the following:

  • Pre-condition: our Cassandra cluster only accepts batches of <=100 events for writes.
  • because of the rate limiting, the buffer gets to a size greater than 100, but never seems to decrease
  • because this buffer is >100, the only way around this is to restart the service to “clear” the buffer
    • Even if it grows unboundedly, I would expect it to send batches of 100 events (because that’s what we specified in the config). But once it’s over 100, from what I observe, it never decreases. Therefore we are never able to recover, because our DB doesn’t accept batches of more than 100 events.
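To make that expectation concrete, here is a toy sketch (plain Java, not APC internals; `MAX_BATCH` and `drain` are hypothetical names standing in for max-message-batch-size and the flush step) of how we’d expect a buffer of any size to be drained in chunks of at most 100:

```java
import java.util.ArrayList;
import java.util.List;

public class BatchDrainSketch {
    // stands in for max-message-batch-size
    static final int MAX_BATCH = 100;

    /** Split a buffer of pending tag writes into batches of at most MAX_BATCH. */
    static List<List<Integer>> drain(List<Integer> buffer) {
        List<List<Integer>> batches = new ArrayList<>();
        for (int i = 0; i < buffer.size(); i += MAX_BATCH) {
            batches.add(new ArrayList<>(
                buffer.subList(i, Math.min(i + MAX_BATCH, buffer.size()))));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> buffer = new ArrayList<>();
        for (int i = 0; i < 401; i++) buffer.add(i); // 401 is the buffer size from our logs
        List<List<Integer>> batches = drain(buffer);
        // 401 events split into batches of 100, 100, 100, 100, 1
        System.out.println(batches.size()); // prints 5
        for (List<Integer> b : batches) {
            if (b.size() > MAX_BATCH) throw new AssertionError("batch too large");
        }
    }
}
```

Even a 10k buffer drained this way would only ever send batches the database accepts, which is why we expected the writer to catch up once the throttling stopped.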

Let me know if I’m misinterpreting anything here

There were cases in older versions of APC where the batch size wasn’t observed. This should be fixed in 1.0.4. Regardless of the reported buffer size, only up to the batch size should be written in one go.

Hi @chbatey, our team just upgraded to akka-persistence-cassandra v1.0.4 from v1.0.1, and we’re seeing about ~10x the normal latency for a persist call in our EventSourcedBehavior class. On v1.0.1, we’re getting around 10ms, and on v1.0.3/1.0.4 we’re getting 100ms. v1.0.2 seems ok though.

Flush-interval is set to 0ms. Did v1.0.4 add buffering that is making the persist call take longer? If it did, I’d still expect it to emit immediately, since max-message-batch-size is 1.

I just want to understand this issue some more. Is the change here primarily to reduce network I/O (at the potential cost of latency?)

Here’s how we’re tracking it:

    long before = System.nanoTime();
    return Effect()
        .persist(Stream.of(event1, event2).collect(Collectors.toList()))
        .thenRun(() -> {
          emitLatency(System.nanoTime() - before);
          sendOverMessageBus(event1, event2);
        })
        .thenReply(replyTo, nil -> theReply());

Here’s the diff FWIW (mostly for myself to keep track of :)): Comparing v1.0.3...master · akka/akka-persistence-cassandra · GitHub

Are you using tags? If so the likely reason for greater latency in v1.0.4 is that the write to the tag_views table is now synchronous i.e. the persist isn’t completed until the event will be visible in both recovery and events by tag queries.

The reason for the change was that when the write to tag_views was asynchronous if the database was becoming overloaded on the tag_view right there was no feedback to your persistent actors whereas now the persists slow down which can backpressure into the rest of your system.

We are! However, low latency for the raw messages is important to us: we send those in the thenRun function. Sending before the persist won’t work for failure scenarios. Any recommendations for getting around this?

We did consider making the synchronous part optional but decided against it, as the synchronous behaviour is the most correct.
We could resurrect that idea. If you think it’s worthwhile, create an issue for it.

In the meantime, rather than setting the batch size to 1, set the flush-interval to 0 (the new default in the latest version). Setting the batch size to 1 means that any queued writes will be written one at a time, whereas what I think you want is for writes to be done ASAP and, if multiple are queued while waiting for the previous write to finish, for them to be written in a single batch, as that is more efficient and will reduce latency.
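If I’m reading that advice right, it translates into something like the following config (a sketch, assuming the same setting paths discussed earlier in the thread, and a batch limit the database accepts):

```hocon
akka.persistence.cassandra.events-by-tag {
  # flush as soon as possible; writes queued behind an in-flight
  # write are still combined into a single batch
  flush-interval = 0ms
  # upper bound per batch; a CosmosDB backend rejects batches > 100
  max-message-batch-size = 100
}
```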