BusyPoolException in Lagom with AWS Cassandra

Hey, everyone.

I’m running a fairly simple service (Lagom 1.6.0, Scala 2.13.1) with AWS Cassandra as the write side and AWS Aurora Serverless as the read side (since AWS Cassandra doesn’t currently support logged batches). The service is deployed to an AWS Kubernetes cluster and configured to use the Kubernetes API for service discovery.

Everything works perfectly for a while… and then I start getting the Cassandra driver’s BusyPoolException errors:

2020-03-24T16:26:40.187Z [warn] akka.stream.scaladsl.RestartWithBackoffSource [akkaAddress=akka://application@192.168.26.21:25520, sourceThread=application-akka.actor.default-dispatcher-6, akkaSource=RestartWithBackoffSource(akka://application), sourceActorSystem=application, akkaTimestamp=16:26:40.186UTC] - Restarting graph due to failure. stack_trace: 
java.util.concurrent.ExecutionException: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: cassandra.us-east-1.amazonaws.com/3.83.168.151:9142 (com.datastax.driver.core.exceptions.BusyPoolException: [cassandra.us-east-1.amazonaws.com/3.83.168.151:9142] Pool is busy (no available connection and the queue has reached its max size 0)))
	at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:552)
	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:513)
	at akka.persistence.cassandra.package$ListenableFutureConverter$$anon$2.$anonfun$run$2(package.scala:50)
	at scala.util.Try$.apply(Try.scala:210)
	at akka.persistence.cassandra.package$ListenableFutureConverter$$anon$2.run(package.scala:50)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:47)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:47)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: cassandra.us-east-1.amazonaws.com/3.83.168.151:9142 (com.datastax.driver.core.exceptions.BusyPoolException: [cassandra.us-east-1.amazonaws.com/3.83.168.151:9142] Pool is busy (no available connection and the queue has reached its max size 0)))
	at com.datastax.driver.core.RequestHandler.reportNoMoreHosts(RequestHandler.java:283)
	at com.datastax.driver.core.RequestHandler.access$1200(RequestHandler.java:61)
	at com.datastax.driver.core.RequestHandler$SpeculativeExecution.findNextHostAndQuery(RequestHandler.java:375)
	at com.datastax.driver.core.RequestHandler$SpeculativeExecution$1.onFailure(RequestHandler.java:444)
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1015)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1137)
	at com.google.common.util.concurrent.AbstractFuture.addListener(AbstractFuture.java:707)
	at com.google.common.util.concurrent.AbstractFuture$TrustedFuture.addListener(AbstractFuture.java:112)
	at com.google.common.util.concurrent.Futures.addCallback(Futures.java:996)
	at com.datastax.driver.core.GuavaCompatibility.addCallback(GuavaCompatibility.java:112)
	at com.datastax.driver.core.GuavaCompatibility.addCallback(GuavaCompatibility.java:100)
	at com.datastax.driver.core.RequestHandler$SpeculativeExecution.query(RequestHandler.java:400)
	at com.datastax.driver.core.RequestHandler$SpeculativeExecution.findNextHostAndQuery(RequestHandler.java:359)
	at com.datastax.driver.core.RequestHandler.startNewExecution(RequestHandler.java:139)
	at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:121)
	at com.datastax.driver.core.SessionManager.execute(SessionManager.java:696)
	at com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:142)
	at akka.persistence.cassandra.query.EventsByTagStage$TagStageSession.selectEventsForBucket(EventsByTagStage.scala:87)
	at akka.persistence.cassandra.query.EventsByTagStage$$anon$1.query(EventsByTagStage.scala:318)
	at akka.persistence.cassandra.query.EventsByTagStage$$anon$1.preStart(EventsByTagStage.scala:270)
	at akka.stream.impl.fusing.GraphInterpreter.init(GraphInterpreter.scala:306)
	at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:593)
	at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:701)
	at akka.stream.impl.fusing.ActorGraphInterpreter.finishShellRegistration(ActorGraphInterpreter.scala:744)
	at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:759)
	at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:784)
	at akka.actor.Actor.aroundReceive(Actor.scala:533)
	at akka.actor.Actor.aroundReceive$(Actor.scala:531)
	at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:690)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:573)
	at akka.actor.ActorCell.invoke(ActorCell.scala:543)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:269)
	at akka.dispatch.Mailbox.run(Mailbox.scala:230)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:242)
	... 4 more

Here’s some pertinent configuration:

akka.persistence {
  journal.plugin = cassandra-journal
  snapshot-store.plugin = cassandra-snapshot-store
}

akka {
  discovery.kubernetes-api {
    pod-label-selector = "run=%s"
  }

  management.cluster.bootstrap.contact-point-discovery {
    discovery-method = kubernetes-api
    required-contact-point-nr = 2
    service-name = "redacted"
  }
}

cassandra.default {
  # AWS Cassandra doesn't support QUORUM yet, so LOCAL_QUORUM is used instead.
  read-consistency = "LOCAL_QUORUM"
  write-consistency = "LOCAL_QUORUM"
  replication-factor = 3

  ssl.truststore {
    path = "cassandra_truststore.jks"
    password = "amazon"
  }

  authentication {
    username = "redacted"
    password = "redacted"
  }
}

cassandra-journal {
  read-consistency = ${cassandra.default.read-consistency}
  write-consistency = ${cassandra.default.write-consistency}
  replication-factor = ${cassandra.default.replication-factor}

  ssl.truststore {
    path = ${cassandra.default.ssl.truststore.path}
    password = ${cassandra.default.ssl.truststore.password}
  }

  authentication {
    username = ${cassandra.default.authentication.username}
    password = ${cassandra.default.authentication.password}
  }
}

cassandra-query-journal {
  read-consistency = ${cassandra.default.read-consistency}
}

cassandra-snapshot-store {
  read-consistency = "ONE"
  write-consistency = "ONE"
  replication-factor = ${cassandra.default.replication-factor}

  ssl.truststore {
    path = ${cassandra.default.ssl.truststore.path}
    password = ${cassandra.default.ssl.truststore.password}
  }

  authentication {
    username = ${cassandra.default.authentication.username}
    password = ${cassandra.default.authentication.password}
  }
}

cassandra.default {
  session-provider = akka.persistence.cassandra.ConfigSessionProvider
  contact-points = ["cassandra.us-east-1.amazonaws.com"]
  port = 9142
}

cassandra-journal {
  session-provider = ${cassandra.default.session-provider}
  contact-points = ${cassandra.default.contact-points}
  port = ${cassandra.default.port}
}

cassandra-query-journal {
  session-provider = ${cassandra.default.session-provider}
  contact-points = ${cassandra.default.contact-points}
  port = ${cassandra.default.port}
}

cassandra-snapshot-store {
  session-provider = ${cassandra.default.session-provider}
  contact-points = ${cassandra.default.contact-points}
  port = ${cassandra.default.port}
}

Here’s another kind of exception that can also appear. It looks like Lagom can’t reconnect to AWS Cassandra for some reason. Any ideas?

akka.persistence.typed.internal.JournalFailureException: Exception during recovery. Last known sequence number [0]. PersistenceId [HeroAggregate|1072225283], due to: Exception during recovery. Last known sequence number [0]. PersistenceId [HeroAggregate|1072225283], due to: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: cassandra.us-east-1.amazonaws.com/3.83.168.151:9142 (com.datastax.driver.core.exceptions.TransportException: [cassandra.us-east-1.amazonaws.com/3.83.168.151:9142] Connection has been closed))
	at akka.persistence.typed.internal.ReplayingEvents.onRecoveryFailure(ReplayingEvents.scala:221)
	at akka.persistence.typed.internal.ReplayingEvents.onJournalResponse(ReplayingEvents.scala:153)
	at akka.persistence.typed.internal.ReplayingEvents.onMessage(ReplayingEvents.scala:90)
	at akka.persistence.typed.internal.ReplayingEvents.onMessage(ReplayingEvents.scala:65)
	at akka.actor.typed.scaladsl.AbstractBehavior.receive(AbstractBehavior.scala:82)
	at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
	at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
	at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:55)
	at akka.persistence.typed.internal.EventSourcedBehaviorImpl$$anon$1.aroundReceive(EventSourcedBehaviorImpl.scala:157)
	at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:83)
	at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
	at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
	at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:55)
	at akka.actor.typed.internal.SimpleSupervisor.aroundReceive(Supervision.scala:123)
	at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:83)
	at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
	at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
	at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:55)
	at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:85)
	at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
	at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
	at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:55)
	at akka.actor.typed.internal.SimpleSupervisor.aroundReceive(Supervision.scala:123)
	at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:83)
	at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
	at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
	at akka.actor.typed.internal.adapter.ActorAdapter.handleMessage(ActorAdapter.scala:126)
	at akka.actor.typed.internal.adapter.ActorAdapter.aroundReceive(ActorAdapter.scala:106)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:573)
	at akka.actor.ActorCell.invoke(ActorCell.scala:543)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:269)
	at akka.dispatch.Mailbox.run(Mailbox.scala:230)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:242)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: akka.persistence.typed.internal.JournalFailureException: Exception during recovery. Last known sequence number [0]. PersistenceId [HeroAggregate|1072225283], due to: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: cassandra.us-east-1.amazonaws.com/3.83.168.151:9142 (com.datastax.driver.core.exceptions.TransportException: [cassandra.us-east-1.amazonaws.com/3.83.168.151:9142] Connection has been closed))
	at akka.persistence.typed.internal.ReplayingEvents.onRecoveryFailure(ReplayingEvents.scala:221)
	at akka.persistence.typed.internal.ReplayingEvents.onJournalResponse(ReplayingEvents.scala:143)
	... 35 common frames omitted
Caused by: java.util.concurrent.ExecutionException: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: cassandra.us-east-1.amazonaws.com/3.83.168.151:9142 (com.datastax.driver.core.exceptions.TransportException: [cassandra.us-east-1.amazonaws.com/3.83.168.151:9142] Connection has been closed))
	at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:552)
	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:513)
	at akka.persistence.cassandra.package$ListenableFutureConverter$$anon$2.$anonfun$run$2(package.scala:50)
	at scala.util.Try$.apply(Try.scala:210)
	at akka.persistence.cassandra.package$ListenableFutureConverter$$anon$2.run(package.scala:50)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:47)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: cassandra.us-east-1.amazonaws.com/3.83.168.151:9142 (com.datastax.driver.core.exceptions.TransportException: [cassandra.us-east-1.amazonaws.com/3.83.168.151:9142] Connection has been closed))
	at com.datastax.driver.core.RequestHandler.reportNoMoreHosts(RequestHandler.java:283)
	at com.datastax.driver.core.RequestHandler.access$1200(RequestHandler.java:61)
	at com.datastax.driver.core.RequestHandler$SpeculativeExecution.findNextHostAndQuery(RequestHandler.java:375)
	at com.datastax.driver.core.RequestHandler$SpeculativeExecution.retry(RequestHandler.java:557)
	at com.datastax.driver.core.RequestHandler$SpeculativeExecution.processRetryDecision(RequestHandler.java:539)
	at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onException(RequestHandler.java:941)
	at com.datastax.driver.core.Connection$Dispatcher.errorOutAllHandler(Connection.java:1340)
	at com.datastax.driver.core.Connection$ConnectionCloseFuture.force(Connection.java:1427)
	at com.datastax.driver.core.Connection$ConnectionCloseFuture.force(Connection.java:1408)
	at com.datastax.driver.core.Connection.defunct(Connection.java:585)
	at com.datastax.driver.core.Connection$ChannelCloseListener.operationComplete(Connection.java:1357)
	at com.datastax.driver.core.Connection$ChannelCloseListener.operationComplete(Connection.java:1347)
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
	at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
	at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
	at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:604)
	at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
	at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
	at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1159)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:761)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:737)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:943)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:898)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1384)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:749)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:741)
	at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:727)
	at io.netty.handler.ssl.SslHandler.forceFlush(SslHandler.java:2033)
	at io.netty.handler.ssl.SslHandler.flushIfNeeded(SslHandler.java:1307)
	at io.netty.handler.ssl.SslHandler.channelReadComplete0(SslHandler.java:1289)
	at io.netty.handler.ssl.SslHandler.channelReadComplete(SslHandler.java:1282)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelReadComplete(AbstractChannelHandlerContext.java:405)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelReadComplete(AbstractChannelHandlerContext.java:392)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelReadComplete(AbstractChannelHandlerContext.java:385)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelReadComplete(DefaultChannelPipeline.java:1427)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelReadComplete(AbstractChannelHandlerContext.java:405)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelReadComplete(AbstractChannelHandlerContext.java:392)
	at io.netty.channel.DefaultChannelPipeline.fireChannelReadComplete(DefaultChannelPipeline.java:937)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:168)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
	at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	... 1 common frames omitted

OK, so apparently I can’t treat “cassandra.us-east-1.amazonaws.com” as a static contact point. I’m surprised no one here pointed out something so simple, though.

Essentially, I need to subclass ConfigSessionProvider and override its lookupContactPoints method.
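A minimal sketch of what such an override could return, assuming the akka-persistence-cassandra 0.x API used by Lagom 1.6, where ConfigSessionProvider exposes lookupContactPoints(clusterId)(implicit ec) returning Future[immutable.Seq[InetSocketAddress]]. The helper object and the UnresolvedConfigSessionProvider class name are hypothetical, the "host:port" parsing is an assumption about how contact points are written, and whether the DataStax 3.x driver handles unresolved contact points well is not verified here. The subclass is only shown as a comment so the helper compiles on its own.

```scala
import java.net.InetSocketAddress
import scala.collection.immutable

object UnresolvedContactPoints {

  /** Turns entries such as "cassandra.us-east-1.amazonaws.com" or "host:9142"
    * into *unresolved* socket addresses: no DNS lookup happens here, so the
    * hostname is kept instead of being pinned to one IP address.
    * Note: IPv6 literals are not handled by this simple ":" split.
    */
  def build(contactPoints: immutable.Seq[String], defaultPort: Int): immutable.Seq[InetSocketAddress] =
    contactPoints.map { entry =>
      entry.trim.split(":") match {
        case Array(host, port) => InetSocketAddress.createUnresolved(host, port.toInt)
        case Array(host)       => InetSocketAddress.createUnresolved(host, defaultPort)
        case _                 => throw new IllegalArgumentException(s"Invalid contact point: $entry")
      }
    }

  // Hypothetical wiring, not compiled here (assumes the akka-persistence-cassandra 0.x API):
  //
  // class UnresolvedConfigSessionProvider(system: ActorSystem, config: Config)
  //     extends ConfigSessionProvider(system, config) {
  //   override def lookupContactPoints(clusterId: String)(
  //       implicit ec: ExecutionContext): Future[immutable.Seq[InetSocketAddress]] =
  //     Future.successful(
  //       UnresolvedContactPoints.build(
  //         config.getStringList("contact-points").asScala.toList,
  //         config.getInt("port")))
  // }
}
```

For example, UnresolvedContactPoints.build(List("cassandra.us-east-1.amazonaws.com"), 9142) yields a single address whose isUnresolved is true and whose port is 9142. The session-provider setting in the configuration would then point at the subclass instead of akka.persistence.cassandra.ConfigSessionProvider.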

So, Akka Persistence Cassandra’s ConfigSessionProvider apparently resolves the contact points to IP addresses, and those addresses go stale after a while because of the way AWS Cassandra works. I have written a different ConfigSessionProvider that uses InetSocketAddress.createUnresolved instead, but the same problem shows up. It’s as if the Cassandra session created at startup is never replaced.
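To make the difference concrete, here is a small standalone sketch using only java.net (the ResolutionTiming object and its method names are hypothetical). It contrasts eager resolution, where the IP is fixed when the address object is created, with createUnresolved, which defers the lookup to whoever connects. Re-resolving only helps if something builds a fresh address later; existing objects keep whatever they resolved, which may be why swapping the provider alone doesn't change the behaviour if the session itself is never rebuilt.

```scala
import java.net.InetSocketAddress

object ResolutionTiming {

  /** `new InetSocketAddress(host, port)` does the DNS lookup immediately;
    * the resulting IP is stored in the object and never looked up again.
    */
  def eager(host: String, port: Int): InetSocketAddress =
    new InetSocketAddress(host, port)

  /** `createUnresolved` stores only the hostname; no lookup happens here. */
  def deferred(host: String, port: Int): InetSocketAddress =
    InetSocketAddress.createUnresolved(host, port)

  /** Builds a new address from the stored hostname, which triggers a fresh
    * lookup (still subject to the JVM's DNS cache, networkaddress.cache.ttl).
    * It does nothing for objects that already hold the old address.
    */
  def refreshed(addr: InetSocketAddress): InetSocketAddress =
    new InetSocketAddress(addr.getHostString, addr.getPort)
}
```

Here deferred("cassandra.us-east-1.amazonaws.com", 9142).isUnresolved is true, while eager("127.0.0.1", 9142) is already resolved to 127.0.0.1 without any network access.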

Any suggestions?