Problems using MultiJVM Test + Persistence Plugin Proxy

I’m trying to use the Persistence Plugin Proxy with the MultiJVM test and I’m near to :exploding_head:.

What I understand from the documentation and from the Spec that I found is that:

  1. I must initialize the proxy in one of the nodes. Node0 in my case.
  2. I must point the rest of the nodes to the previous node.

This is my configuration for testing (I called it application-default-multijvm.conf):

akka {
  log-dead-letters = 10000
  log-dead-letters-during-shutdown = on
  loglevel = "DEBUG"

  persistence {
    journal {
      plugin = "akka.persistence.journal.proxy"
      proxy.target-journal-plugin = "akka.persistence.journal.inmem"
    }
    snapshot-store {
      plugin = "akka.persistence.snapshot-store.proxy"
      proxy.target-snapshot-store-plugin = "akka.persistence.snapshot-store.local"
      local.dir = "target/snapshots/default"
    }
  }
}

And this is the node’s config in my test:

object GetNearestNodeSessionSpecConfig extends MultiNodeConfig {

  val node0 = role("node0")
  val node1 = role("node1")
  val node2 = role("node2")

  nodeConfig(node0)(
    ConfigFactory.parseString("""
      |akka.remote.artery.canonical.port = 2551
      |akka.extensions = ["akka.persistence.journal.PersistencePluginProxyExtension"]
      |akka.persistence {
      |  journal.proxy.start-target-journal = on
      |  snapshot-store.proxy.start-target-snapshot-store = on
      |}
    """.stripMargin)
  )

  nodeConfig(node1)(
    ConfigFactory.parseString(s"""
      |akka.remote.artery.canonical.port = 2552
      |akka.extensions = ["akka.persistence.Persistence"]
      |akka.persistence.journal.auto-start-journals = [""]
      |akka.persistence.journal.proxy.target-journal-address = "akka://GetNearestNodeSessionSpec@localhost:2551"
      |akka.persistence.snapshot-store.proxy.target-snapshot-store-address = "akka://GetNearestNodeSessionSpec@localhost:2551"
    """.stripMargin)
  )

  nodeConfig(node2)(
    ConfigFactory.parseString(s"""
      |akka.remote.artery.canonical.port = 2553
      |akka.extensions = ["akka.persistence.Persistence"]
      |akka.persistence.journal.auto-start-journals = [""]
      |akka.persistence.journal.proxy.target-journal-address = "akka://GetNearestNodeSessionSpec@localhost:2551"
      |akka.persistence.snapshot-store.proxy.target-snapshot-store-address = "akka://GetNearestNodeSessionSpec@localhost:2551"
    """.stripMargin)
  )

  commonConfig(
    ConfigFactory
      .parseString(s"""
      akka.cluster.seed-nodes = [ "akka://GetNearestNodeSessionSpec@localhost:2551" ]
      akka.persistence.snapshot-store.local.dir = "target/snapshots/GetNearestNodeSessionSpec"
    """)
      .withFallback(ConfigFactory.load("application-default-multijvm.conf"))
  )

}

And I get a Target snapshot-store not initialized. Use..... error in all nodes except in the one with the proxy:

JVM-2] [2020-03-15 12:41:27,240] [ERROR] [com.simplexportal.spatial.index.grid.tile.actor.TileIndexActor$] [GetNearestNodeSessionSpec-akka.actor.default-dispatcher-34] [akka://GetNearestNodeSessionSpec/system/sharding/TileEntity/5/2_2] - Supervisor RestartSupervisor saw failure: Exception during recovery from snapshot. PersistenceId [Tile_GetNearestNodeSessionTest|2_2]. Target snapshot-store not initialized. Use `PersistencePluginProxy.setTargetLocation` or set `target-snapshot-store-address`

Anyone can help me or point me in the right direction to find where the problem is?

Full Log error:

[JVM-3] [2020-03-15 12:41:27,520] [ERROR] [com.simplexportal.spatial.index.grid.tile.actor.TileIndexActor$] [GetNearestNodeSessionSpec-akka.actor.default-dispatcher-27] [akka://GetNearestNodeSessionSpec/system/sharding/TileEntity/7/3_3] - Supervisor RestartSupervisor saw failure: Exception during recovery from snapshot. PersistenceId [Tile_GetNearestNodeSessionTest|3_3]. Target snapshot-store not initialized. Use `PersistencePluginProxy.setTargetLocation` or set `target-snapshot-store-address`
[JVM-3] akka.persistence.typed.internal.JournalFailureException: Exception during recovery from snapshot. PersistenceId [Tile_GetNearestNodeSessionTest|3_3]. Target snapshot-store not initialized. Use `PersistencePluginProxy.setTargetLocation` or set `target-snapshot-store-address`
[JVM-3] 	at akka.persistence.typed.internal.ReplayingSnapshot.onRecoveryFailure(ReplayingSnapshot.scala:102)
[JVM-3] 	at akka.persistence.typed.internal.ReplayingSnapshot.onSnapshotterResponse(ReplayingSnapshot.scala:148)
[JVM-3] 	at akka.persistence.typed.internal.ReplayingSnapshot.$anonfun$createBehavior$1(ReplayingSnapshot.scala:59)
[JVM-3] 	at akka.actor.typed.internal.BehaviorImpl$ReceiveMessageBehavior.receive(BehaviorImpl.scala:152)
[JVM-3] 	at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
[JVM-3] 	at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
[JVM-3] 	at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:55)
[JVM-3] 	at akka.persistence.typed.internal.EventSourcedBehaviorImpl$$anon$1.aroundReceive(EventSourcedBehaviorImpl.scala:157)
[JVM-3] 	at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:83)
[JVM-3] 	at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
[JVM-3] 	at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
[JVM-3] 	at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:55)
[JVM-3] 	at akka.actor.typed.internal.RestartSupervisor.aroundReceive(Supervision.scala:262)
[JVM-3] 	at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:83)
[JVM-3] 	at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
[JVM-3] 	at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
[JVM-3] 	at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:55)
[JVM-3] 	at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:85)
[JVM-3] 	at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
[JVM-3] 	at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
[JVM-3] 	at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:55)
[JVM-3] 	at akka.actor.typed.internal.SimpleSupervisor.aroundReceive(Supervision.scala:123)
[JVM-3] 	at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:83)
[JVM-3] 	at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
[JVM-3] 	at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
[JVM-3] 	at akka.actor.typed.internal.adapter.ActorAdapter.handleMessage(ActorAdapter.scala:125)
[JVM-3] 	at akka.actor.typed.internal.adapter.ActorAdapter.aroundReceive(ActorAdapter.scala:105)
[JVM-3] 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:573)
[JVM-3] 	at akka.actor.ActorCell.invoke(ActorCell.scala:543)
[JVM-3] 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:269)
[JVM-3] 	at akka.dispatch.Mailbox.run(Mailbox.scala:230)
[JVM-3] 	at akka.dispatch.Mailbox.exec(Mailbox.scala:242)
[JVM-3] 	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
[JVM-3] 	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
[JVM-3] 	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
[JVM-3] 	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
[JVM-3] Caused by: java.util.concurrent.TimeoutException: Target snapshot-store not initialized. Use `PersistencePluginProxy.setTargetLocation` or set `target-snapshot-store-address`
[JVM-3] 	at akka.persistence.journal.PersistencePluginProxy.akka$persistence$journal$PersistencePluginProxy$$timeoutException(PersistencePluginProxy.scala:128)
[JVM-3] 	at akka.persistence.journal.PersistencePluginProxy$$anonfun$initTimedOut$1.applyOrElse(PersistencePluginProxy.scala:210)
[JVM-3] 	at akka.actor.Actor.aroundReceive(Actor.scala:534)
[JVM-3] 	at akka.actor.Actor.aroundReceive$(Actor.scala:532)
[JVM-3] 	at akka.persistence.journal.PersistencePluginProxy.aroundReceive(PersistencePluginProxy.scala:74)
[JVM-3] 	... 9 common frames omitted

Environment:
Akka 2.6.4
Java: openjdk version “1.8.0_242”
Ubuntu 19.10

Did you call PersistencePluginProxy.setTargetLocation
that the exception refers to?

Ah, you try to do it in config with target-snapshot-store-address, I missed that.

Perhaps dump the config in the beginning of your test and look that the actual config used is what you expect as that could perhaps explain it.

I can’t see that we are using a proxied journal or snapshot store in any of the internal multi-jvm tests of Akka, but here is a regular scalatest that exercises the proxy, for reference: https://github.com/akka/akka/blob/6bf20f4117a8c27f8bd412228424caafe76a89eb/akka-persistence-shared/src/test/scala/akka/persistence/journal/leveldb/PersistencePluginProxySpec.scala