BusyPoolException in Lagom with AWS Cassandra

Hey, everyone.

I’m running a fairly simple service (Lagom 1.6.0, Scala 2.13.1) with AWS Cassandra as the write side and AWS Aurora Serverless as the read side (since AWS Cassandra doesn’t currently support logged batches). The service is deployed to an AWS Kubernetes cluster and configured to use the Kubernetes API for service discovery.
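
For context, the build wiring implied by this setup looks roughly like the following. This is only a sketch: the module name is invented, versions are elided, and the real project’s dependency list may differ.

// build.sbt (sketch)
lazy val `hero-impl` = (project in file("hero-impl"))
  .enablePlugins(LagomScala)
  .settings(
    libraryDependencies ++= Seq(
      lagomScaladslPersistenceCassandra, // write side: journal + snapshots in AWS Cassandra
      lagomScaladslPersistenceJdbc       // read side: Aurora Serverless over JDBC
      // plus akka-management-cluster-bootstrap and akka-discovery-kubernetes-api for the
      // Kubernetes-API-based discovery configured below, and the JDBC driver matching
      // the Aurora engine
    )
  )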

Everything works perfectly for a while… and then I start getting BusyPoolException errors from the Cassandra driver:

2020-03-24T16:26:40.187Z [warn] akka.stream.scaladsl.RestartWithBackoffSource [akkaAddress=akka://application@192.168.26.21:25520, sourceThread=application-akka.actor.default-dispatcher-6, akkaSource=RestartWithBackoffSource(akka://application), sourceActorSystem=application, akkaTimestamp=16:26:40.186UTC] - Restarting graph due to failure. stack_trace: 
java.util.concurrent.ExecutionException: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: cassandra.us-east-1.amazonaws.com/3.83.168.151:9142 (com.datastax.driver.core.exceptions.BusyPoolException: [cassandra.us-east-1.amazonaws.com/3.83.168.151:9142] Pool is busy (no available connection and the queue has reached its max size 0)))
	at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:552)
	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:513)
	at akka.persistence.cassandra.package$ListenableFutureConverter$$anon$2.$anonfun$run$2(package.scala:50)
	at scala.util.Try$.apply(Try.scala:210)
	at akka.persistence.cassandra.package$ListenableFutureConverter$$anon$2.run(package.scala:50)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:47)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:47)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: cassandra.us-east-1.amazonaws.com/3.83.168.151:9142 (com.datastax.driver.core.exceptions.BusyPoolException: [cassandra.us-east-1.amazonaws.com/3.83.168.151:9142] Pool is busy (no available connection and the queue has reached its max size 0)))
	at com.datastax.driver.core.RequestHandler.reportNoMoreHosts(RequestHandler.java:283)
	at com.datastax.driver.core.RequestHandler.access$1200(RequestHandler.java:61)
	at com.datastax.driver.core.RequestHandler$SpeculativeExecution.findNextHostAndQuery(RequestHandler.java:375)
	at com.datastax.driver.core.RequestHandler$SpeculativeExecution$1.onFailure(RequestHandler.java:444)
	at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1015)
	at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
	at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1137)
	at com.google.common.util.concurrent.AbstractFuture.addListener(AbstractFuture.java:707)
	at com.google.common.util.concurrent.AbstractFuture$TrustedFuture.addListener(AbstractFuture.java:112)
	at com.google.common.util.concurrent.Futures.addCallback(Futures.java:996)
	at com.datastax.driver.core.GuavaCompatibility.addCallback(GuavaCompatibility.java:112)
	at com.datastax.driver.core.GuavaCompatibility.addCallback(GuavaCompatibility.java:100)
	at com.datastax.driver.core.RequestHandler$SpeculativeExecution.query(RequestHandler.java:400)
	at com.datastax.driver.core.RequestHandler$SpeculativeExecution.findNextHostAndQuery(RequestHandler.java:359)
	at com.datastax.driver.core.RequestHandler.startNewExecution(RequestHandler.java:139)
	at com.datastax.driver.core.RequestHandler.sendRequest(RequestHandler.java:121)
	at com.datastax.driver.core.SessionManager.execute(SessionManager.java:696)
	at com.datastax.driver.core.SessionManager.executeAsync(SessionManager.java:142)
	at akka.persistence.cassandra.query.EventsByTagStage$TagStageSession.selectEventsForBucket(EventsByTagStage.scala:87)
	at akka.persistence.cassandra.query.EventsByTagStage$$anon$1.query(EventsByTagStage.scala:318)
	at akka.persistence.cassandra.query.EventsByTagStage$$anon$1.preStart(EventsByTagStage.scala:270)
	at akka.stream.impl.fusing.GraphInterpreter.init(GraphInterpreter.scala:306)
	at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:593)
	at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:701)
	at akka.stream.impl.fusing.ActorGraphInterpreter.finishShellRegistration(ActorGraphInterpreter.scala:744)
	at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:759)
	at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:784)
	at akka.actor.Actor.aroundReceive(Actor.scala:533)
	at akka.actor.Actor.aroundReceive$(Actor.scala:531)
	at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:690)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:573)
	at akka.actor.ActorCell.invoke(ActorCell.scala:543)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:269)
	at akka.dispatch.Mailbox.run(Mailbox.scala:230)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:242)
	... 4 more

Here’s some pertinent configuration:

akka.persistence {
  journal.plugin = cassandra-journal
  snapshot-store.plugin = cassandra-snapshot-store
}

akka {
  discovery.kubernetes-api {
    pod-label-selector = "run=%s"
  }

  management.cluster.bootstrap.contact-point-discovery {
    discovery-method = kubernetes-api
    required-contact-point-nr = 2
    service-name = "redacted"
  }
}

cassandra.default {
  # AWS Cassandra doesn't support QUORUM yet.
  read-consistency = "LOCAL_QUORUM"
  write-consistency = "LOCAL_QUORUM"
  replication-factor = 3

  ssl.truststore {
    path = "cassandra_truststore.jks"
    password = "amazon"
  }

  authentication {
    username = "redacted"
    password = "redacted"
  }
}

cassandra-journal {
  read-consistency = ${cassandra.default.read-consistency}
  write-consistency = ${cassandra.default.write-consistency}
  replication-factor = ${cassandra.default.replication-factor}

  ssl.truststore {
    path = ${cassandra.default.ssl.truststore.path}
    password = ${cassandra.default.ssl.truststore.password}
  }

  authentication {
    username = ${cassandra.default.authentication.username}
    password = ${cassandra.default.authentication.password}
  }
}

cassandra-query-journal {
  read-consistency = ${cassandra.default.read-consistency}
}

cassandra-snapshot-store {
  read-consistency = "ONE"
  write-consistency = "ONE"
  replication-factor = ${cassandra.default.replication-factor}

  ssl.truststore {
    path = ${cassandra.default.ssl.truststore.path}
    password = ${cassandra.default.ssl.truststore.password}
  }

  authentication {
    username = ${cassandra.default.authentication.username}
    password = ${cassandra.default.authentication.password}
  }
}

cassandra.default {
  session-provider = akka.persistence.cassandra.ConfigSessionProvider
  contact-points = ["cassandra.us-east-1.amazonaws.com"]
  port = 9142
}

cassandra-journal {
  session-provider = ${cassandra.default.session-provider}
  contact-points = ${cassandra.default.contact-points}
  port = ${cassandra.default.port}
}

cassandra-query-journal {
  session-provider = ${cassandra.default.session-provider}
  contact-points = ${cassandra.default.contact-points}
  port = ${cassandra.default.port}
}

cassandra-snapshot-store {
  session-provider = ${cassandra.default.session-provider}
  contact-points = ${cassandra.default.contact-points}
  port = ${cassandra.default.port}
}

Here’s another kind of exception that can also appear. It looks like Lagom can’t reconnect to AWS Cassandra for some reason. Any ideas?

akka.persistence.typed.internal.JournalFailureException: Exception during recovery. Last known sequence number [0]. PersistenceId [HeroAggregate|1072225283], due to: Exception during recovery. Last known sequence number [0]. PersistenceId [HeroAggregate|1072225283], due to: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: cassandra.us-east-1.amazonaws.com/3.83.168.151:9142 (com.datastax.driver.core.exceptions.TransportException: [cassandra.us-east-1.amazonaws.com/3.83.168.151:9142] Connection has been closed))
	at akka.persistence.typed.internal.ReplayingEvents.onRecoveryFailure(ReplayingEvents.scala:221)
	at akka.persistence.typed.internal.ReplayingEvents.onJournalResponse(ReplayingEvents.scala:153)
	at akka.persistence.typed.internal.ReplayingEvents.onMessage(ReplayingEvents.scala:90)
	at akka.persistence.typed.internal.ReplayingEvents.onMessage(ReplayingEvents.scala:65)
	at akka.actor.typed.scaladsl.AbstractBehavior.receive(AbstractBehavior.scala:82)
	at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
	at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
	at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:55)
	at akka.persistence.typed.internal.EventSourcedBehaviorImpl$$anon$1.aroundReceive(EventSourcedBehaviorImpl.scala:157)
	at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:83)
	at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
	at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
	at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:55)
	at akka.actor.typed.internal.SimpleSupervisor.aroundReceive(Supervision.scala:123)
	at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:83)
	at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
	at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
	at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:55)
	at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:85)
	at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
	at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
	at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:55)
	at akka.actor.typed.internal.SimpleSupervisor.aroundReceive(Supervision.scala:123)
	at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:83)
	at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
	at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
	at akka.actor.typed.internal.adapter.ActorAdapter.handleMessage(ActorAdapter.scala:126)
	at akka.actor.typed.internal.adapter.ActorAdapter.aroundReceive(ActorAdapter.scala:106)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:573)
	at akka.actor.ActorCell.invoke(ActorCell.scala:543)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:269)
	at akka.dispatch.Mailbox.run(Mailbox.scala:230)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:242)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: akka.persistence.typed.internal.JournalFailureException: Exception during recovery. Last known sequence number [0]. PersistenceId [HeroAggregate|1072225283], due to: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: cassandra.us-east-1.amazonaws.com/3.83.168.151:9142 (com.datastax.driver.core.exceptions.TransportException: [cassandra.us-east-1.amazonaws.com/3.83.168.151:9142] Connection has been closed))
	at akka.persistence.typed.internal.ReplayingEvents.onRecoveryFailure(ReplayingEvents.scala:221)
	at akka.persistence.typed.internal.ReplayingEvents.onJournalResponse(ReplayingEvents.scala:143)
	... 35 common frames omitted
Caused by: java.util.concurrent.ExecutionException: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: cassandra.us-east-1.amazonaws.com/3.83.168.151:9142 (com.datastax.driver.core.exceptions.TransportException: [cassandra.us-east-1.amazonaws.com/3.83.168.151:9142] Connection has been closed))
	at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:552)
	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:513)
	at akka.persistence.cassandra.package$ListenableFutureConverter$$anon$2.$anonfun$run$2(package.scala:50)
	at scala.util.Try$.apply(Try.scala:210)
	at akka.persistence.cassandra.package$ListenableFutureConverter$$anon$2.run(package.scala:50)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:47)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: cassandra.us-east-1.amazonaws.com/3.83.168.151:9142 (com.datastax.driver.core.exceptions.TransportException: [cassandra.us-east-1.amazonaws.com/3.83.168.151:9142] Connection has been closed))
	at com.datastax.driver.core.RequestHandler.reportNoMoreHosts(RequestHandler.java:283)
	at com.datastax.driver.core.RequestHandler.access$1200(RequestHandler.java:61)
	at com.datastax.driver.core.RequestHandler$SpeculativeExecution.findNextHostAndQuery(RequestHandler.java:375)
	at com.datastax.driver.core.RequestHandler$SpeculativeExecution.retry(RequestHandler.java:557)
	at com.datastax.driver.core.RequestHandler$SpeculativeExecution.processRetryDecision(RequestHandler.java:539)
	at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onException(RequestHandler.java:941)
	at com.datastax.driver.core.Connection$Dispatcher.errorOutAllHandler(Connection.java:1340)
	at com.datastax.driver.core.Connection$ConnectionCloseFuture.force(Connection.java:1427)
	at com.datastax.driver.core.Connection$ConnectionCloseFuture.force(Connection.java:1408)
	at com.datastax.driver.core.Connection.defunct(Connection.java:585)
	at com.datastax.driver.core.Connection$ChannelCloseListener.operationComplete(Connection.java:1357)
	at com.datastax.driver.core.Connection$ChannelCloseListener.operationComplete(Connection.java:1347)
	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:577)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:570)
	at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:549)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:490)
	at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:615)
	at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:604)
	at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
	at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
	at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1159)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:761)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:737)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:943)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:898)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1384)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:749)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:741)
	at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:727)
	at io.netty.handler.ssl.SslHandler.forceFlush(SslHandler.java:2033)
	at io.netty.handler.ssl.SslHandler.flushIfNeeded(SslHandler.java:1307)
	at io.netty.handler.ssl.SslHandler.channelReadComplete0(SslHandler.java:1289)
	at io.netty.handler.ssl.SslHandler.channelReadComplete(SslHandler.java:1282)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelReadComplete(AbstractChannelHandlerContext.java:405)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelReadComplete(AbstractChannelHandlerContext.java:392)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelReadComplete(AbstractChannelHandlerContext.java:385)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelReadComplete(DefaultChannelPipeline.java:1427)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelReadComplete(AbstractChannelHandlerContext.java:405)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelReadComplete(AbstractChannelHandlerContext.java:392)
	at io.netty.channel.DefaultChannelPipeline.fireChannelReadComplete(DefaultChannelPipeline.java:937)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:168)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
	at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	... 1 common frames omitted

Ok, so apparently I can’t treat “cassandra.us-east-1.amazonaws.com” as a static contact point. I wonder why no one here pointed out something so simple, though.

Essentially, I need to subclass ConfigSessionProvider and override its lookupContactPoints method.

So, Akka’s Cassandra ConfigSessionProvider apparently resolves the contact points to IP addresses up front, and those addresses go stale after a while because of the way AWS Cassandra works. I’ve written a custom session provider that uses InetSocketAddress.createUnresolved instead, but the same problem shows up. It’s as if the Cassandra session created at startup is never replaced.
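
For reference, here’s roughly what that custom provider looks like. This is only a sketch, assuming the akka-persistence-cassandra 0.x ConfigSessionProvider API that Lagom 1.6 uses; the class name is made up, and as noted above it did not make the problem go away.

import java.net.InetSocketAddress

import scala.collection.immutable
import scala.concurrent.{ ExecutionContext, Future }
import scala.jdk.CollectionConverters._

import akka.actor.ActorSystem
import akka.persistence.cassandra.ConfigSessionProvider
import com.typesafe.config.Config

// Hypothetical class name; register it via the `session-provider` setting shown above.
class UnresolvedContactPointsSessionProvider(system: ActorSystem, config: Config)
    extends ConfigSessionProvider(system, config) {

  override def lookupContactPoints(clusterId: String)(
      implicit ec: ExecutionContext): Future[immutable.Seq[InetSocketAddress]] = {
    val port          = config.getInt("port")
    val contactPoints = config.getStringList("contact-points").asScala.toList

    // createUnresolved leaves DNS resolution to the driver, so a changed IP behind
    // cassandra.us-east-1.amazonaws.com can be picked up again on reconnect.
    Future.successful(contactPoints.map(InetSocketAddress.createUnresolved(_, port)))
  }
}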

Any suggestions?

Ok, so it turned out neither of the above is the real problem. The real problem is that Lagom sets

connection-pool.pool-timeout-millis = 0

and the DataStax Java driver has this check:

    if (timeout == 0 || maxQueueSize == 0) {
      return Futures.immediateFailedFuture(new BusyPoolException(host.getSocketAddress(), 0));
    }

where timeout is set to

poolingOptions.getPoolTimeoutMillis()

So the solution is to set pool-timeout-millis to a non-zero value. The DataStax driver’s own default appears to be 5000 ms.
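
In Lagom/HOCON terms that means something like this in application.conf (a sketch, assuming the setting is picked up from the connection-pool section of each plugin that opens a session, as with the other settings above):

# Non-zero timeout so the driver waits for a free connection instead of failing
# immediately with BusyPoolException. 5000 ms matches the DataStax driver default.
cassandra-journal.connection-pool.pool-timeout-millis = 5000
cassandra-snapshot-store.connection-pool.pool-timeout-millis = 5000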


Thank you so much for the time and effort that you put into investigating this issue! You don’t know how much time and pain you have saved me!

When I ran into this problem, I received this puzzling error message:

Pool is busy (no available connection and the queue has reached its max size 0)

The max queue size of zero in the error message was extremely misleading and sent me down blind alleys for hours! But then I found your answer, and the code snippet you found in the DataStax Java driver explained everything. Thanks so much again!