Hello, we are creating 2 persistent entities in a single Lagom service, and defining 2 Topics for them.
With 1 Topic, everything works. If I add the 2. topic to the service descriptor’s withTopic call, it causes the errors below (getting them indefinitely):
java.lang.IllegalArgumentException: Unable to create read journal plugin instance for path [cassandra-query-journal], class [akka.persistence.cassandra.query.CassandraReadJournalProvider]!
at akka.persistence.PersistencePlugin$$anonfun$akka$persistence$PersistencePlugin$$createPlugin$4.applyOrElse(PersistencePlugin.scala:96)
at akka.persistence.PersistencePlugin$$anonfun$akka$persistence$PersistencePlugin$$createPlugin$4.applyOrElse(PersistencePlugin.scala:90)
at scala.util.Failure.recoverWith(Try.scala:242)
at akka.persistence.PersistencePlugin.akka$persistence$PersistencePlugin$$createPlugin(PersistencePlugin.scala:90)
at akka.persistence.PersistencePlugin$$anon$1.createExtension(PersistencePlugin.scala:59)
at akka.persistence.PersistencePlugin$$anon$1.createExtension(PersistencePlugin.scala:57)
at akka.actor.ActorSystemImpl.registerExtension(ActorSystem.scala:1141)
at akka.actor.ExtensionId.apply(Extension.scala:77)
at akka.actor.ExtensionId.apply$(Extension.scala:76)
at akka.persistence.PersistencePlugin$$anon$1.apply(PersistencePlugin.scala:57)
at akka.persistence.PersistencePlugin.pluginFor(PersistencePlugin.scala:55)
at akka.persistence.query.PersistenceQuery.readJournalFor(PersistenceQuery.scala:53)
at akka.persistence.query.PersistenceQuery.readJournalFor(PersistenceQuery.scala:60)
at com.lightbend.lagom.internal.javadsl.persistence.AbstractPersistentEntityRegistry.$anonfun$eventsByTagQuery$1(AbstractPersistentEntityRegistry.scala:47)
at com.lightbend.lagom.internal.javadsl.persistence.AbstractPersistentEntityRegistry$$Lambda$2407.00000000640A9D80.apply(Unknown Source)
at scala.Option.map(Option.scala:243)
at com.lightbend.lagom.internal.javadsl.persistence.AbstractPersistentEntityRegistry.eventsByTagQuery$lzycompute(AbstractPersistentEntityRegistry.scala:47)
at com.lightbend.lagom.internal.javadsl.persistence.AbstractPersistentEntityRegistry.eventsByTagQuery(AbstractPersistentEntityRegistry.scala:46)
at com.lightbend.lagom.internal.javadsl.persistence.AbstractPersistentEntityRegistry.eventStream(AbstractPersistentEntityRegistry.scala:146)
at hu.organicsoft.payment.impl.paymentrequest.PaymentEventTopicProducer.lambda$paymentEvents$0(PaymentEventTopicProducer.java:31)
at hu.organicsoft.payment.impl.paymentrequest.PaymentEventTopicProducer$$Lambda$1831.00000000E3F04A00.apply(Unknown Source)
at com.lightbend.lagom.internal.javadsl.broker.kafka.JavadslRegisterTopicProducers.$anonfun$new$4(JavadslRegisterTopicProducers.scala:72)
at com.lightbend.lagom.internal.javadsl.broker.kafka.JavadslRegisterTopicProducers$$Lambda$1832.00000000E3F04F70.apply(Unknown Source)
at com.lightbend.lagom.internal.broker.kafka.TopicProducerActor$$anonfun$receive$1.$anonfun$applyOrElse$2(TopicProducerActor.scala:121)
at com.lightbend.lagom.internal.broker.kafka.TopicProducerActor$$anonfun$receive$1$$Lambda$2360.000000007802CF40.apply(Unknown Source)
at akka.stream.impl.fusing.Map$$anon$1.onPush(Ops.scala:54)
at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:541)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:423)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:624)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:501)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:599)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:768)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:783)
at akka.actor.Actor.aroundReceive(Actor.scala:532)
at akka.actor.Actor.aroundReceive$(Actor.scala:530)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:690)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:573)
at akka.actor.ActorCell.invoke(ActorCell.scala:543)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:269)
at akka.dispatch.Mailbox.run(Mailbox.scala:230)
at akka.dispatch.Mailbox.exec(Mailbox.scala:242)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.util.concurrent.TimeoutException: Future timed out after [20 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.tryAwait0(Promise.scala:212)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:225)
at scala.concurrent.Await$.$anonfun$result$1(package.scala:200)
at scala.concurrent.Await$$$Lambda$269.00000000E1BB5520.apply(Unknown Source)
at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:173)
at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3313)
at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:171)
at scala.concurrent.Await$.result(package.scala:124)
at akka.stream.SystemMaterializer.createAdditionalLegacySystemMaterializer(SystemMaterializer.scala:80)
at akka.stream.ActorMaterializer$.apply(ActorMaterializer.scala:76)
at akka.stream.ActorMaterializer$.apply(ActorMaterializer.scala:51)
at akka.persistence.cassandra.query.scaladsl.CassandraReadJournal.<init>(CassandraReadJournal.scala:118)
at akka.persistence.cassandra.query.CassandraReadJournalProvider.liftedTree1$1(CassandraReadJournalProvider.scala:16)
at akka.persistence.cassandra.query.CassandraReadJournalProvider.<init>(CassandraReadJournalProvider.scala:15)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at akka.actor.ReflectiveDynamicAccess.$anonfun$createInstanceFor$1(ReflectiveDynamicAccess.scala:40)
at akka.actor.ReflectiveDynamicAccess$$Lambda$172.00000000E06AE590.apply(Unknown Source)
at scala.util.Try$.apply(Try.scala:212)
at akka.actor.ReflectiveDynamicAccess.createInstanceFor(ReflectiveDynamicAccess.scala:35)
at akka.persistence.PersistencePlugin.akka$persistence$PersistencePlugin$$instantiate$1(PersistencePlugin.scala:79)
at akka.persistence.PersistencePlugin$$anonfun$akka$persistence$PersistencePlugin$$createPlugin$1.applyOrElse(PersistencePlugin.scala:86)
at akka.persistence.PersistencePlugin$$anonfun$akka$persistence$PersistencePlugin$$createPlugin$1.applyOrElse(PersistencePlugin.scala:84)
at scala.util.Failure.recoverWith(Try.scala:242)
at akka.persistence.PersistencePlugin.akka$persistence$PersistencePlugin$$createPlugin(PersistencePlugin.scala:84)
... 41 more
Also, the memory consumption and processor usage gets very high (triple of the normal usage with one topic).
It is also interesting the timeout says 20 secs, but these errors are happening within milliseconds of eachother.
We are using Lagom 1.6.0-M7 (javadsl) with scala version 2.13.