Akka Persistence Cassandra Plugin - initialisation of multiple Cassandra Cluster instances

Hi,
We are using akka-persistence with Cluster Sharding. We want to store our actor state and the sharding state separately.
The reason is that in our PROD environment we will have 2 Akka clusters, one running in each of our DCs, configured so that the sharding state is local to each DC while the actor state is kept separately,
allowing the Akka clusters to run in a hot/standby configuration.

So the Akka node has persistence plugins for

  • actor-journal
  • actor-snapshot
  • sharding-journal
  • sharding-snapshot

Configuring Akka using the akka.persistence.cassandra.ConfigSessionProvider results in 4 instances of com.datastax.driver.core.Cluster. The DataStax docs state that multiple Cluster instances should be avoided, since a single instance can serve multiple sessions.
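For context, the four plugin instances are defined along these lines (a sketch only: the section names and keyspaces are ours, not plugin defaults; each section inherits the plugin's defaults, so each one ends up creating its own Cluster through the configured session provider):

```hocon
# Illustrative sketch of the configuration shape; section names and
# keyspaces are ours, not plugin defaults.
actor-journal = ${cassandra-journal} {
  keyspace = "actor_journal"
}
actor-snapshot = ${cassandra-snapshot-store} {
  keyspace = "actor_snapshot"
}
sharding-journal = ${cassandra-journal} {
  keyspace = "sharding_journal"
}
sharding-snapshot = ${cassandra-snapshot-store} {
  keyspace = "sharding_snapshot"
}
```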

We tried this by implementing our own SessionProvider, as below:

import scala.concurrent.{ ExecutionContext, Future, Promise }
import com.datastax.driver.core.Session
import com.google.common.util.concurrent.{ FutureCallback, Futures, ListenableFuture, MoreExecutors }
import akka.persistence.cassandra.SessionProvider

class CassandraSessionProvider extends SessionProvider {
  // Adapt the driver's Guava ListenableFuture to a Scala Future via a Promise
  implicit class ListenableFutureToFuture[T](lf: ListenableFuture[T]) {
    def asScala: Future[T] = {
      val p = Promise[T]()
      Futures.addCallback(lf, new FutureCallback[T] {
        def onFailure(t: Throwable): Unit = p failure t
        def onSuccess(result: T): Unit    = p success result
      }, MoreExecutors.directExecutor())
      p.future
    }
  }

  // Reuse the single shared Cluster and open a new Session from it
  override def connect()(implicit ec: ExecutionContext): Future[Session] = {
    Boot.getCassandraCluster().connectAsync().asScala
  }
}
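The Guava bridge above is the standard Promise-based adaptation of a callback-style future. For comparison, the same shape with the JDK's CompletableFuture (stdlib only, purely for illustration - not part of the plugin or the driver API):

```scala
import java.util.concurrent.CompletableFuture
import scala.concurrent.{ Await, Future, Promise }
import scala.concurrent.duration._

object Bridge {
  // Bridge a JDK CompletableFuture to a Scala Future via a Promise,
  // mirroring the FutureCallback-based bridge for ListenableFuture.
  def asScala[T](cf: CompletableFuture[T]): Future[T] = {
    val p = Promise[T]()
    cf.whenComplete { (result: T, err: Throwable) =>
      if (err != null) p.failure(err) else p.success(result)
    }
    p.future
  }

  def main(args: Array[String]): Unit = {
    val f = asScala(CompletableFuture.completedFuture(42))
    // Await only for demonstration; real code would map/flatMap the Future
    println(Await.result(f, 1.second))
  }
}
```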

But when starting our nodes we would see the following in the logs:

2018-04-13 09:07:24,790 [DmEventStateSystem-akka.actor.default-dispatcher-23] ERROR a.c.s.PersistentShardCoordinator - Failed to persist event type [akka.cluster.sharding.ShardCoordinator$Internal$ShardRegionRegistered] with sequence number [12] for persistenceId [/sharding/eventStateFsmAggregateCoordinator].
java.util.concurrent.ExecutionException: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)

By changing the provider to build a new Cluster instance per connection, we no longer see the above problem:

class CassandraSessionProvider extends SessionProvider {
  // ListenableFutureToFuture conversion unchanged from the version above

  // Build a fresh Cluster per connection instead of sharing one
  override def connect()(implicit ec: ExecutionContext): Future[Session] = {
    Boot.getCassandraClusterBuilder().build().connectAsync().asScala
  }
}

Like I said, the DataStax docs recommend having a single Cluster instance (per JVM/Cassandra cluster) and opening multiple Sessions from it, as we did in the first attempt. But I can see that in some places the plugin issues a close on the Cluster, which takes down anything that has Sessions hanging off it. I can guess that having each component in the plugin manage its own Cluster instance simplifies things, but it uses unnecessary resources and slows down initialisation.
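One way a plugin could share a single Cluster safely, while still letting each component call close, is reference counting: the underlying Cluster is only shut down when the last user releases it. A stdlib-only sketch of the idea (the names are illustrative, not the plugin's API):

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical sketch (not plugin API): share one underlying resource
// between several components and only shut it down when the last
// component releases it.
final class RefCounted[A](resource: A)(shutdown: A => Unit) {
  private val refs = new AtomicInteger(0)

  def acquire(): A = { refs.incrementAndGet(); resource }

  def release(): Unit =
    if (refs.decrementAndGet() == 0) shutdown(resource)
}

object RefCountedDemo {
  def main(args: Array[String]): Unit = {
    var closed = false
    val shared = new RefCounted("cluster")(_ => closed = true)
    shared.acquire(); shared.acquire() // two plugin components acquire
    shared.release()                   // one component closes: still alive
    println(closed)                    // prints false
    shared.release()                   // last component closes: shutdown runs
    println(closed)                    // prints true
  }
}
```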

Are there any plans to re-think the approach taken in the plugin regarding management of the Cluster instance?

Thanks
Paul

We have an issue to track the fact that the plugin creates multiple Cluster objects (write journal, read journal, snapshot); it wasn’t by design but is due to the way the configuration has evolved over time.

For 1.0 we plan to improve this and restructure the configuration for the plugin: https://github.com/akka/akka-persistence-cassandra/issues/242

Hi Christopher,

Thanks for that update. FYI I’m happy to contribute in any way needed.

Paul

Also note that it is currently somewhat cumbersome (but totally possible) to configure multiple Cassandra plugin instances. See issue https://github.com/akka/akka-persistence-cassandra/issues/81