Multiple Topics in one Lagom service - Timeouts, service cannot start

Hello, we are creating 2 persistent entities in a single Lagom service, and defining 2 Topics for them.
With 1 Topic, everything works. If I add the 2. topic to the service descriptor’s withTopic call, it causes the errors below (getting them indefinitely):

java.lang.IllegalArgumentException: Unable to create read journal plugin instance for path [cassandra-query-journal], class [akka.persistence.cassandra.query.CassandraReadJournalProvider]!
	at akka.persistence.PersistencePlugin$$anonfun$akka$persistence$PersistencePlugin$$createPlugin$4.applyOrElse(PersistencePlugin.scala:96)
	at akka.persistence.PersistencePlugin$$anonfun$akka$persistence$PersistencePlugin$$createPlugin$4.applyOrElse(PersistencePlugin.scala:90)
	at scala.util.Failure.recoverWith(Try.scala:242)
	at akka.persistence.PersistencePlugin.akka$persistence$PersistencePlugin$$createPlugin(PersistencePlugin.scala:90)
	at akka.persistence.PersistencePlugin$$anon$1.createExtension(PersistencePlugin.scala:59)
	at akka.persistence.PersistencePlugin$$anon$1.createExtension(PersistencePlugin.scala:57)
	at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:1141)
	at akka.actor.ExtensionId.apply(Extension.scala:77)
	at akka.actor.ExtensionId.apply$(Extension.scala:76)
	at akka.persistence.PersistencePlugin$$anon$1.apply(PersistencePlugin.scala:57)
	at akka.persistence.PersistencePlugin.pluginFor(PersistencePlugin.scala:55)
	at akka.persistence.query.PersistenceQuery.readJournalFor(PersistenceQuery.scala:53)
	at akka.persistence.query.PersistenceQuery.readJournalFor(PersistenceQuery.scala:60)
	at com.lightbend.lagom.internal.javadsl.persistence.AbstractPersistentEntityRegistry.$anonfun$eventsByTagQuery$1(AbstractPersistentEntityRegistry.scala:47)
	at com.lightbend.lagom.internal.javadsl.persistence.AbstractPersistentEntityRegistry$$Lambda$2407.00000000640A9D80.apply(Unknown Source)
	at scala.Option.map(Option.scala:243)
	at com.lightbend.lagom.internal.javadsl.persistence.AbstractPersistentEntityRegistry.eventsByTagQuery$lzycompute(AbstractPersistentEntityRegistry.scala:47)
	at com.lightbend.lagom.internal.javadsl.persistence.AbstractPersistentEntityRegistry.eventsByTagQuery(AbstractPersistentEntityRegistry.scala:46)
	at com.lightbend.lagom.internal.javadsl.persistence.AbstractPersistentEntityRegistry.eventStream(AbstractPersistentEntityRegistry.scala:146)
	at hu.organicsoft.payment.impl.paymentrequest.PaymentEventTopicProducer.lambda$paymentEvents$0(PaymentEventTopicProducer.java:31)
	at hu.organicsoft.payment.impl.paymentrequest.PaymentEventTopicProducer$$Lambda$1831.00000000E3F04A00.apply(Unknown Source)
	at com.lightbend.lagom.internal.javadsl.broker.kafka.JavadslRegisterTopicProducers.$anonfun$new$4(JavadslRegisterTopicProducers.scala:72)
	at com.lightbend.lagom.internal.javadsl.broker.kafka.JavadslRegisterTopicProducers$$Lambda$1832.00000000E3F04F70.apply(Unknown Source)
	at com.lightbend.lagom.internal.broker.kafka.TopicProducerActor$$anonfun$receive$1.$anonfun$applyOrElse$2(TopicProducerActor.scala:121)
	at com.lightbend.lagom.internal.broker.kafka.TopicProducerActor$$anonfun$receive$1$$Lambda$2360.000000007802CF40.apply(Unknown Source)
	at akka.stream.impl.fusing.Map$$anon$1.onPush(Ops.scala:54)
	at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:541)
	at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:423)
	at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:624)
	at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:501)
	at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:599)
	at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:768)
	at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:783)
	at akka.actor.Actor.aroundReceive(Actor.scala:532)
	at akka.actor.Actor.aroundReceive$(Actor.scala:530)
	at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:690)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:573)
	at akka.actor.ActorCell.invoke(ActorCell.scala:543)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:269)
	at akka.dispatch.Mailbox.run(Mailbox.scala:230)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:242)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.util.concurrent.TimeoutException: Future timed out after [20 seconds]
	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait0(Promise.scala:212)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:225)
	at scala.concurrent.Await$.$anonfun$result$1(package.scala:200)
	at scala.concurrent.Await$$$Lambda$269.00000000E1BB5520.apply(Unknown Source)
	at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:173)
	at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3313)
	at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:171)
	at scala.concurrent.Await$.result(package.scala:124)
	at akka.stream.SystemMaterializer.createAdditionalLegacySystemMaterializer(SystemMaterializer.scala:80)
	at akka.stream.ActorMaterializer$.apply(ActorMaterializer.scala:76)
	at akka.stream.ActorMaterializer$.apply(ActorMaterializer.scala:51)
	at akka.persistence.cassandra.query.scaladsl.CassandraReadJournal.<init>(CassandraReadJournal.scala:118)
	at akka.persistence.cassandra.query.CassandraReadJournalProvider.liftedTree1$1(CassandraReadJournalProvider.scala:16)
	at akka.persistence.cassandra.query.CassandraReadJournalProvider.<init>(CassandraReadJournalProvider.scala:15)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at akka.actor.ReflectiveDynamicAccess.$anonfun$createInstanceFor$1(ReflectiveDynamicAccess.scala:40)
	at akka.actor.ReflectiveDynamicAccess$$Lambda$172.00000000E06AE590.apply(Unknown Source)
	at scala.util.Try$.apply(Try.scala:212)
	at akka.actor.ReflectiveDynamicAccess.createInstanceFor(ReflectiveDynamicAccess.scala:35)
	at akka.persistence.PersistencePlugin.akka$persistence$PersistencePlugin$$instantiate$1(PersistencePlugin.scala:79)
	at akka.persistence.PersistencePlugin$$anonfun$akka$persistence$PersistencePlugin$$createPlugin$1.applyOrElse(PersistencePlugin.scala:86)
	at akka.persistence.PersistencePlugin$$anonfun$akka$persistence$PersistencePlugin$$createPlugin$1.applyOrElse(PersistencePlugin.scala:84)
	at scala.util.Failure.recoverWith(Try.scala:242)
	at akka.persistence.PersistencePlugin.akka$persistence$PersistencePlugin$$createPlugin(PersistencePlugin.scala:84)
	... 41 more

Also, the memory consumption and processor usage gets very high (triple of the normal usage with one topic).

It is also interesting the timeout says 20 secs, but these errors are happening within milliseconds of eachother.

We are using Lagom 1.6.0-M7 (javadsl) with scala version 2.13.

Ahhh it might be caused by https://github.com/akka/akka/pull/28046… I’ll try a version upgrade

1 Like

version upgrade solved it

1 Like

Just for the record: lagom version is bumped from 1.6.0-M7 to 1.6.0.

Another service produced this failure… within the same project… :slight_smile: Thanks David