Acessing persistent actor from kafka source

Hey guys,

I have a persistent actor. It uses Akka Durable State.

I can access that one based on following code

sharding.init(Entity.of(PersistentActor.ENTITY_TYPE_KEY,
        entityContext -> PersistentActor.create(entityContext.getEntityId(),
            PersistenceId.of(entityContext.getEntityTypeKey().name(), entityContext.getEntityId()))));
.....
Source.from(...)
 .mapAsync(buffer, message -> {
          EntityRef<Command> ref = sharding.entityRefFor(PersistentActor.ENTITY_TYPE_KEY,
              String.valueOf(message.getId()));
          return ref.ask(replyTo -> new Command(message, replyTo), askTimeout);
        })

That works very well.

If I replace the source with a Alpakka Consumer (Consumer
.committableSource(…)) then I get the following error:

java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Recipient shard region of [EntityRef(EntityTypeKey[scheduler.Command](PersistentActor), 2)] had already been terminated.
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
	at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:674)
	at java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:662)
	at java.base/java.util.concurrent.CompletableFuture.thenApplyAsync(CompletableFuture.java:2173)
	at scala.concurrent.java8.FuturesConvertersImpl$CF.thenApply(FutureConvertersImpl.scala:35)
	at scala.concurrent.java8.FuturesConvertersImpl$CF.thenApply(FutureConvertersImpl.scala:26)
	at Consumer.lambda$consume2$60ab0450$1(Consumer.java:77)
	at akka.stream.javadsl.Source.$anonfun$mapAsync$1(Source.scala:2288)
	at akka.stream.impl.fusing.MapAsync$$anon$30.onPush(Ops.scala:1307)
	at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:542)
	at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:423)
	at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:650)
	at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:521)
	at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:625)
	at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:800)
	at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:818)
	at akka.actor.Actor.aroundReceive(Actor.scala:537)
	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
	at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:716)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
	at akka.actor.ActorCell.invoke(ActorCell.scala:548)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)

The application.conf looks like following:

akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "DEBUG"
  stdout-loglevel = "OFF"
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"

  actor {
    provider = "cluster"
    serializers {
      jackson-json = "akka.serialization.jackson.JacksonJsonSerializer"
      jackson-cbor = "akka.serialization.jackson.JacksonCborSerializer"
      proto = "akka.remote.serialization.ProtobufSerializer"
    }
    serialization-bindings {
      "DurableStateMessage" = jackson-json
    }
  }

  remote {
    artery {
      enabled = on
      transport = tcp
      canonical.hostname = "127.0.0.1"
      canonical.port = 2551
    }
  }

  cluster {
    seed-nodes = [
      "akka://Test@127.0.0.1:2551"
    ]

    downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"

    split-brain-resolver {
      active-strategy = keep-majority
    }
  }

  management {
    http {
      hostname = "127.0.0.1"
      port = 8558
    }
  }
  persistence {

    state {
      plugin = "jdbc-durable-state-store"
      // Enable the line below to automatically start the snapshot-store when the actorsystem is started
      auto-start-snapshot-stores = ["jdbc-durable-state-store"]
    }

  }
}

Is that possible that the Source cannot resolve that shard? I didn’t use KafkaClusterSharding in that example, but maybe that is incompatible with the durable state store?

I removed now Kafka Cluster Sharding, it does not change the issue.
I now use Akka Cluster (2.6) + Akka Streams (2.6) + Alpakka Kafka (2)

Hi Sigurd,

The error is somewhat misleading, and is not about the “recipient shard region” but about the local shard region or proxy.

It should only be triggered when trying to call EntityRef.ask after the local shard region has shut down, and that in turn should only happen if sharding failed to start and the shard region actor therefore stopped before entityRefFor is called or if the cluster node is shutting down when entityRefFor is called (for that scenario to work gracefully with a stream sending elements into sharding you may have to include your stream in graceful termination and stop it in one of the early phases)

I’d recommend that you carefully look at your logs to see if there is some error/warning from sharding saying that it failed to start or that the node started coordinated shutdown for some reason.

Yeah, you are right. There was no running shard at that moment. Thanks!