Performance degrading with Persistent Actors using Cassandra

I am writing an application for proof-of-concept purposes (not a PoC for Akka Persistence per se) using persistent actors with Cassandra.
I run this on my laptop (8 GB of memory, 4 CPUs).

I use a parent actor that spawns children which are persistent actors. Each persistent actor models a simple aggregate root containing a couple of attributes and a Map of value type objects. The persistent actor is called CustomerActor and the parent actor EntryActor.
The application reads data using Slick, creates one EntryActor and sends messages to it. The messages are command-style, e.g. CreateCustomer and CreateAccount.
What I basically do is load data into memory using persistent actors and then execute some functionality on them.
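
For context, the aggregate state each CustomerActor holds is roughly of the following shape. This is only a sketch with assumed type and field names (the real attribute and value types are not spelled out in this post):

// Sketch only, assumed names: the state held by each CustomerActor.
// CustomerAttributes mirrors the command type used in the code further down;
// Account stands in for the value type objects kept in the Map.
final case class CustomerAttributes(country: String, customerType: String)
final case class Account(accountNumber: String)

final case class CustomerState(
  attributes: Option[CustomerAttributes] = None, // "a couple of attributes"
  accounts: Map[String, Account] = Map.empty     // "a Map of value type objects"
)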

My problem is that when I create more than a small number of CustomerActors, the system becomes slow and produces WARN log messages that I don't know how to interpret (and I have done some searching before posting here).

The log messages from Cassandra:

cassandra_1                 | WARN  [Native-Transport-Requests-2] 2019-05-29 19:24:25,392 BatchStatement.java:301 - Batch for [attachments_journal.tag_scanning] is of size 10.547KiB, exceeding specified threshold of 5.000KiB by 5.547KiB.

and

cassandra_1                 | WARN  [Native-Transport-Requests-11] 2019-05-29 19:24:23,918 NoSpamLogger.java:94 - Unlogged batch covering 500 partitions detected against table [attachments_journal.tag_scanning]. You should use a logged batch for atomicity, or asynchronous writes for performance.

and from Akka:

19:23:08.235UTC WARN  a.p.cassandra.journal.TagWriter akka://poc-attachments/system/cassandra-journal/tagWrites/customer - Buffer for tagged events is getting too large (202), is Cassandra responsive? Are writes failing? If events are buffered for longer than the eventual-consistency-delay they won't be picked up by live queries. The oldest event in the buffer is offset: 30758d95-8247-11e9-9f0c-0b5fa4cbc546 (2019-05-29 19:23:07:881)

My application.conf:

akka.persistence.journal.plugin = "cassandra-journal"
akka.persistence.snapshot-store.plugin = "cassandra-snapshot-store"
cassandra-journal.contact-points = ["127.0.0.1"]
cassandra-journal.keyspace = attachments_journal
cassandra-snapshot-store.keyspace = attachments__snapshot
cassandra-journal.events-by-tag.max-message-batch-size = 50

akka.actor.default-mailbox.stash-capacity=100

op-rabbit {
  topic-exchange-name = "poc.attachments"
  channel-dispatcher = "op-rabbit.default-channel-dispatcher"
  default-channel-dispatcher {
    # Dispatcher is the name of the event-based dispatcher
    type = Dispatcher

    # What kind of ExecutionService to use
    executor = "fork-join-executor"

    # Configuration for the fork join pool
    fork-join-executor {
      # Min number of threads to cap factor-based parallelism number to
      parallelism-min = 2

      # Parallelism (threads) ... ceil(available processors * factor)
      parallelism-factor = 2.0

      # Max number of threads to cap factor-based parallelism number to
      parallelism-max = 10
    }
    # Throughput defines the maximum number of messages to be
    # processed per actor before the thread jumps to the next actor.
    # Set to 1 for as fair as possible.
    throughput = 100
  }

  connection {
    hosts = ["localhost"]
    username = "mds"
    password = "***"
    connection-timeout = 1s
    port = 5672
  }
}

akka {
  loglevel = "DEBUG"
  //stdout-loglevel = "INFO"
  loggers = ["akka.event.slf4j.Slf4jLogger"]

  }
}


poc.attachments {
  default_rabbit_concurrency =  1
}

Some code in the persistent actor:

val receiveCommand: Receive = {
    case attr : CustomerAttributes => {
      log.debug("CustomerActor at {} Received customerattributes command: {}",self.path.name, attr)
      persist(Tagged(attr, Set("customer"))) {
        attr =>
          updateAttributes(attr.payload.asInstanceOf[CustomerAttributes])
      }
    }

    case cce : CustomerCreated => {
      if( cce.id % 100 == 0){
        log.info("I {} got CustomerCreated message: {}", self.path.name, cce)
      }
      persistAsync(Tagged(cce, Set("customer"))) {
        taggedcce => setInitialState(taggedcce.payload.asInstanceOf[CustomerCreated])
      }
    }
    case acc : AccountCreated => {
      log.debug("CustomerActor got accountcreated message: {}", acc)
      if(acc.customerId % 100 == 0) {
        log.info("Added account {} to customer {}",acc.accountNumber, acc.customerId )
      }
      persistAsync(Tagged(acc, Set("customer"))) {
        taggedacc => addAccount(taggedacc.payload.asInstanceOf[AccountCreated])
      }
    }
}
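
The recovery side is not included in the snippet above; inside the same CustomerActor it presumably looks something like this sketch (assuming, as akka-persistence-cassandra does, that replay delivers the unwrapped events rather than the Tagged wrapper):

// Sketch, not from the original post: the matching recovery handler.
// RecoveryCompleted is akka.persistence.RecoveryCompleted; the persistenceId
// derivation is an assumption.
override def persistenceId: String = self.path.name

val receiveRecover: Receive = {
  case attr: CustomerAttributes => updateAttributes(attr)
  case cce: CustomerCreated     => setInitialState(cce)
  case acc: AccountCreated      => addAccount(acc)
  case RecoveryCompleted        => log.debug("Recovery completed for {}", persistenceId)
}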

From the application running the DB query and sending messages to the EntryActor:

val joinQuery1 = for {
  a <- accounts
  c <- customers if c.customerType === "F" && c.id < highestCustomerId + 1 && c.country === a.country && c.id === a.customerId
} yield (c, a)

println(s"The join query is : ${joinQuery1.result.statements}")

val accountsAction = joinQuery1.result
val publisher = db.stream(accountsAction)
val fut1 = publisher.foreach { it =>
  entryActor ! accountCreatedForVAccountC(it._2)
}
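
Note that foreach on the Slick DatabasePublisher returns a Future that completes when all rows have been emitted, i.e. when all the tells have been sent, not when the persistent actors have processed them. A small sketch of waiting on it, assuming the surrounding code needs to block before moving on:

import scala.concurrent.Await
import scala.concurrent.duration._

// Sketch (assumption about the surrounding code): wait until the stream has
// emitted every row. Completion only means the messages were sent to the
// EntryActor, not that the CustomerActors have persisted them.
Await.result(fut1, 10.minutes)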

From the EntryActor creating CustomerActor (the persistent actor):

override def receive: Receive =  {
    case c: CustomerCreated => {
      createCustomer(c)
    }
  }

  def createCustomer(c: CustomerCreated): Unit = {
    context.child(CustomerActor.name(c.country, c.id.toString)) match {
      case Some(value) => {
        log.warning("**************************** Create customer called for already existing customer: %s".format( c))
      }
      case None => {
        val cc = context.actorOf(Props(classOf[CustomerActor], c.country, c.id),  CustomerActor.name(c.country, c.id.toString))
        customers(CustomerActor.name(c.country, c.id.toString)) = cc
        cc ! c
        context.watch(cc)
      }
    }
  }
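
Since the children are watched, the EntryActor presumably also handles Terminated somewhere, i.e. the receive above extended with an extra case; a minimal sketch of that (an assumption, not part of the original code):

// Sketch (assumption): remove a stopped CustomerActor from the registry.
// Terminated is akka.actor.Terminated; customers is the mutable Map used above.
override def receive: Receive = {
  case c: CustomerCreated =>
    createCustomer(c)
  case Terminated(ref) =>
    customers.find { case (_, child) => child == ref }
      .foreach { case (name, _) => customers -= name }
}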

I realize the code is probably not that important, but I figured I'd throw in a bit too much material rather than a bit too little.
I'm running Cassandra using the Docker image cassandra:3.11.

cassandra_1 | WARN [Native-Transport-Requests-11] 2019-05-29 19:24:23,918 NoSpamLogger.java:94 - Unlogged batch covering 500 partitions detected against table [attachments_journal.tag_scanning]. You should use a logged batch for atomicity, or asynchronous writes for performance.

This is unlikely to be the cause of the issue, but there is an open PR to fix it in the cassandra plugin.

19:23:08.235UTC WARN a.p.cassandra.journal.TagWriter akka://poc-attachments/system/cassandra-journal/tagWrites/customer - Buffer for tagged events is getting too large (202), is Cassandra responsive? Are writes failing? If events are buffered for longer than the eventual-consistency-delay they won’t be picked up by live queries. The oldest event in the buffer is offset: 30758d95-8247-11e9-9f0c-0b5fa4cbc546 (2019-05-29 19:23:07:881)

There is only ever one outstanding write per tag for the tag_views table. 202 isn't that big if you have a large number of tagged events being persisted, but it does imply that writes to Cassandra are taking quite some time. You can log the query latency with https://github.com/akka/akka-persistence-cassandra/blob/master/core/src/main/resources/reference.conf#L95 to see if that really is a problem.

Both of these warnings could be red herrings. What measurement are you taking that makes you consider the system slow? I'd work back from that rather than from these warnings.
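
For example, a crude way to get such a number (a sketch, assuming the EntryActor is the convenient place to count):

// Rough sketch (assumption): inside the EntryActor, count forwarded commands
// and log the observed overall throughput every 10,000 messages.
private var forwarded = 0L
private val startedAt = System.nanoTime()

private def recordForwarded(): Unit = {
  forwarded += 1
  if (forwarded % 10000 == 0) {
    val elapsedSec = (System.nanoTime() - startedAt) / 1e9
    log.info("Forwarded {} commands, ~{} msg/s overall", forwarded, (forwarded / elapsedSec).toLong)
  }
}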

Thanks. What I observe is that performance degrades and eventually the system essentially stops working. I will have to go back and gather more detailed info. (My knowledge of Cassandra is quite limited too.)
I'll check out the logging that you pointed out, thanks for that.

Hi again, I got a couple of entries like the following:

DEBUG [ScheduledTasks:1] 2019-06-07 14:12:23,237 MonitoringTask.java:173 - 1 operations were slow in the last 5003 msecs:
<SELECT * FROM attachments_journal.messages WHERE
 persistence_id, partition_nr = SE_42614, 0 AND  ORDER BY (sequence_nr DESC, timestamp DESC, timebucket DESC) LIMIT 1>, time 4689 msec - slow timeout 500 msec
DEBUG [CompactionExecutor:3] 2019-06-07 14:12:29,243 CompactionTask.java:255 - Compacted (3774e760-892e-11e9-bb0c-c584f6efb469) 4 sstables to [/var/lib/cassandra/data/attachments_journal/messages-a53fdb70892d11e9bb0cc584f6efb469/md-5-big,] to level=0.  17.098MiB to 17.401MiB (~101% of original) in 28,896ms.  Read Throughput = 605.878KiB/s, Write Throughput = 616.636KiB/s, Row Throughput = ~7,662/s.  83,483 total partitions merged to 48,479.  Partition merge counts were {1:13558, 2:34838, 3:83, } 

but they don’t account for the slowness.

I also get the following

WARN  [Native-Transport-Requests-4] 2019-06-07 14:12:51,111 NoSpamLogger.java:94 - Unlogged batch covering 500 partitions detected against table [attachments_journal.tag_scanning]. You should use a logged batch for atomicity, or asynchronous writes for performance.

after a little while, and the reported buffer size just keeps increasing:

14:43:38.405UTC WARN  a.p.cassandra.journal.TagWriter akka://poc-attachments/system/cassandra-journal/tagWrites/customer - Buffer for tagged events is getting too large (233206), is Cassandra responsive? Are writes failing? If events are buffered for longer than the eventual-consistency-delay they won't be picked up by live queries. The oldest event in the buffer is offset: 704a3de0-8930-11e9-be0e-1f4290fa8758 (2019-06-07 14:27:54:686)

At the end of the execution I get

WARN  [CompactionExecutor:12] 2019-06-07 14:53:55,047 BigTableWriter.java:211 - Writing large partition attachments_journal/tag_views:customer:1559916000000 (225.920MiB) to sstable /var/lib/cassandra/data/attachments_journal/tag_views-a5c86c60892d11e9bb0cc584f6efb469/md-15-big-Data.db
WARN  [CompactionExecutor:12] 2019-06-07 14:54:20,954 BigTableWriter.java:211 - Writing large partition attachments_journal/tag_views:customer:1559916000000 (242.812MiB) to sstable /var/lib/cassandra/data/attachments_journal/tag_views-a5c86c60892d11e9bb0cc584f6efb469/md-20-big-Data.db
WARN  [CompactionExecutor:11] 2019-06-07 14:54:24,691 BigTableWriter.java:211 - Writing large partition attachments_journal/tag_views:customer:1559916000000 (818.242MiB) to sstable /var/lib/cassandra/data/attachments_journal/tag_views-a5c86c60892d11e9bb0cc584f6efb469/md-21-big-Data.db

I ran the code with both the JDBC journal (using Postgres) and the Cassandra journal for comparison. With the JDBC journal the execution runs at a more or less constant speed, but with the Cassandra journal the throughput degrades significantly after a while, by at least a factor of 10.

I might be jumping to conclusions, but since the WARN messages and the performance degradation both appear after a while, I suspect the WARN messages convey something of interest.