ClusterReceptionist - missing value for Service Key resulting in deadletters

Using Akka 2.6.15, Akka Management 1.0.9 and Akka HTTP 10.1.11 to spin up a cluster with 4 nodes and 4 shards, using Kubernetes discovery. The configuration is given below:

akka {

  cluster {
    log-info-verbose = on

    shutdown-after-unsuccessful-join-seed-nodes = 60s
    downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"

    split-brain-resolver {
      active-strategy = keep-oldest
    }

    min-nr-of-members = 4

    sharding {
      remember-entities = on
      passivate-idle-entity-after = off
      distributed-data.durable.keys = []
      number-of-shards = 4

      least-shard-allocation-strategy {
        rebalance-absolute-limit = 4
      }
    }

  }

  management {
    cluster.bootstrap {
      contact-point-discovery {
        service-name = "the-app"
        discovery-method = kubernetes-api
        required-contact-point-nr = 1
      }
    }
  }

  discovery {
    method = kubernetes-api
    kubernetes-api {
      pod-namespace = "some-namespace"
      pod-label-selector = "app=the-app"
      pod-port-name = "management"
    }
  }

  coordinated-shutdown.exit-jvm = on
}

I can confirm that the cluster forms correctly by listening to Member events and logging MemberUp events - all 4 nodes are up and running.
The application starts 4 different shards, which end up running on only two nodes (2 shards per node). Note that each shard spins up tens of persistent child reactors, all with unique names and persistence ids. Those child actors register themselves with the receptionist.
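
The membership check is done with a small listener actor along these lines (a simplified sketch rather than the exact production code):

import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import akka.cluster.ClusterEvent;
import akka.cluster.typed.Cluster;
import akka.cluster.typed.Subscribe;

public class ClusterListener extends AbstractBehavior<ClusterEvent.MemberEvent> {

  public static Behavior<ClusterEvent.MemberEvent> create() {
    return Behaviors.setup(ClusterListener::new);
  }

  private ClusterListener(ActorContext<ClusterEvent.MemberEvent> context) {
    super(context);
    // Subscribe to membership events so MemberUp can be logged on every node
    Cluster.get(context.getSystem())
        .subscriptions()
        .tell(Subscribe.create(context.getSelf(), ClusterEvent.MemberEvent.class));
  }

  @Override
  public Receive<ClusterEvent.MemberEvent> createReceive() {
    return newReceiveBuilder()
        .onMessage(
            ClusterEvent.MemberUp.class,
            up -> {
              getContext().getLog().info("Member up: {}", up.member().address());
              return Behaviors.same();
            })
        .onMessage(ClusterEvent.MemberEvent.class, event -> Behaviors.same())
        .build();
  }
}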

In the logs, I see a lot of deadletters like the one shown below:

DeadLetter: [akka.cluster.ddata.Replicator$ModifyFailure] recipient: 'Actor[akka://the-app/deadLetters]' sender: 'Actor[akka://the-app/system/clusterReceptionist/replicator#-1681777846]' message: ModifyFailure [ReceptionistKey_0]: Update failed: missing value for ServiceKey[com.app.SomeMessage$Interface](the_unique_id_for_the_actor)

In addition to that, there are some errors like this:

Couldn't process DeltaPropagation from [UniqueAddress(akka://the-app@172.20.4.68:25520,-3105484348826649608)] due to java.lang.IllegalStateException: missing value for ServiceKey[com.app.SomeMessage$Interface](the_unique_id_for_the_actor)

To provide more context, I sometimes see the cluster-creation log twice (it doesn't happen every time I start the cluster):

Cluster Node [akka://the-app@172.20.4.68:25520] - Node [akka://the-app@172.20.4.68:25520] is JOINING itself (with roles [dc-default], version [0.0.0]) and forming new cluster


Cluster Node [akka://the-app@172.20.5.221:25520] - Node [akka://the-app@172.20.5.221:25520] is JOINING itself (with roles [dc-default], version [0.0.0]) and forming new cluster

I believe these errors are preventing the receptionist state from being replicated properly, which is why some actors cannot be found.

Additional info worth mentioning: SomeMessage.Interface is the base message type of a message adapter. Each actor registers a message adapter to receive common messages (like Tick messages) without defining them over and over again. The way each actor registers its own message adapter is shown below; Interface is the same class for all adapters.

ActorRef<Interface> adapter =
    reactor.getContext().messageAdapter(Interface.class, this::onCommonMessage);

ServiceKey<Interface> commonMessagesServiceKey =
    CommonMessages.serviceKeyForPath(unique_reactor_path);

reactor
    .getContext()
    .getSystem()
    .receptionist()
    .tell(Receptionist.register(commonMessagesServiceKey, adapter));

In addition to that, I tried taking down one pod, which led to that pod being restarted; all its actors were re-spawned and re-registered with the receptionist, however the other pods didn't receive the updates from the receptionist.
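
For reference, such updates can be observed on a node by subscribing to the service key; below is a minimal sketch (the class name, key handling and logging are illustrative, not the exact application code):

import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import akka.actor.typed.receptionist.Receptionist;
import akka.actor.typed.receptionist.ServiceKey;

// Logs every Listing received for one service key, so missing
// re-registrations on a node become visible in the logs.
public class ListingLogger<T> extends AbstractBehavior<Receptionist.Listing> {

  public static <T> Behavior<Receptionist.Listing> create(ServiceKey<T> key) {
    return Behaviors.setup(ctx -> new ListingLogger<>(ctx, key));
  }

  private final ServiceKey<T> key;

  private ListingLogger(ActorContext<Receptionist.Listing> context, ServiceKey<T> key) {
    super(context);
    this.key = key;
    // The receptionist pushes a new Listing to subscribers whenever the set of
    // registered actors for this key changes, as seen from the local node.
    context.getSystem().receptionist().tell(Receptionist.subscribe(key, context.getSelf()));
  }

  @Override
  public Receive<Receptionist.Listing> createReceive() {
    return newReceiveBuilder()
        .onMessage(
            Receptionist.Listing.class,
            listing -> {
              getContext().getLog().info("{} -> {}", key, listing.getServiceInstances(key));
              return Behaviors.same();
            })
        .build();
  }
}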

Thanks in advance

The distributed data warning with missing value for [key] is something around tombstoning. I'm not entirely sure how/when this would happen; it could perhaps be related to quickly registering and deregistering a high number of services with the receptionist.

As noted in the docs (Actor discovery • Akka Documentation), the receptionist is best suited for a relatively small number of service keys with low turnover - somewhat the opposite of sharding. I'd recommend using sharding itself to address the sharded entities by id rather than mixing it with the receptionist.
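
That is, rather than registering the sharded entities with the receptionist and looking them up there, address them by entity id through sharding directly; roughly like this, given an ActorSystem `system` (the entity type key and message type are placeholders for your own definitions):

// Placeholder entity type key and message type, standing in for your own definitions.
EntityTypeKey<MyEntity.Command> typeKey =
    EntityTypeKey.create(MyEntity.Command.class, "MyEntity");

ClusterSharding sharding = ClusterSharding.get(system);

// The EntityRef can be resolved on any node; messages are routed to whichever
// node currently hosts the entity, also after rebalancing or a node restart.
EntityRef<MyEntity.Command> entityRef = sharding.entityRefFor(typeKey, "some-entity-id");
entityRef.tell(new MyEntity.DoSomething());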

Two nodes saying they are joining themselves means you have two separate clusters rather than one, so for that case there is some form of problem with your cluster bootstrap.

I can confirm that deregistering doesn't happen at all, but registering happens pretty quickly - thousands of service keys registering concurrently from all the nodes. Please note that in a non-clustered environment, or a single-node cluster, the receptionist works fine. So I wonder if this could be a data-replication issue in the cluster receptionist.

The receptionist is not used with sharding at all; the sharded entities are running fine and are not registered with the receptionist.

I would like to provide more information after testing further. I disabled most of the actors throughout the cluster and left only 10 running. I can confirm that I still sometimes see the missing value for ServiceKey warning, and the receptionist doesn't have that reference. After some time the cluster receptionist state converges and the key becomes available on some of the nodes (not all), but I wonder if this could be an issue related to cluster bootstrap or to the distributed data configuration.
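
To check whether a key is available on a given node, that node's receptionist can be asked for it; a rough sketch, given an ActorSystem `system` (key name and timeout are just examples):

// Ask the local receptionist for the current registrations of one service key.
ServiceKey<SomeMessage.Interface> key =
    ServiceKey.create(SomeMessage.Interface.class, "the_unique_id_for_the_actor");

CompletionStage<Receptionist.Listing> listing =
    AskPattern.ask(
        system.receptionist(),
        (ActorRef<Receptionist.Listing> replyTo) -> Receptionist.find(key, replyTo),
        Duration.ofSeconds(3),
        system.scheduler());

listing.thenAccept(
    l -> System.out.println("Instances for " + key + ": " + l.getServiceInstances(key)));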

The receptionist is not used with sharding at all; the sharded entities are running fine and are not registered with the receptionist.

Good to hear that that trail in the original question was unrelated.

I disabled most of the actors throughout the cluster and left only 10 running.

This is very surprising. If you could boil it down to a minimal reproducer, with only your receptionist interactions triggering the error, that would be good.

Sure, I can post a reproducer here. I already shared the cluster configuration above.

public class Application {

  public static void main(String[] args) {
    createActorSystemOnNode();
  }

  private static void createActorSystemOnNode() {
    ActorSystem<TheServiceGuardian.Interface> system =
        ActorSystem.create(TheServiceGuardian.create(), "TheServiceCluster");

    AkkaManagement akkaManagement = AkkaManagement.get(system);
    akkaManagement.start();

    CoordinatedShutdown.get(system)
        .addTask(
            CoordinatedShutdown.PhaseBeforeServiceUnbind(),
            "Shutdown Akka Management",
            akkaManagement::stop);
  }
}

The guardian actor is shown below:

public class TheServiceGuardian extends AbstractBehavior<TheServiceGuardian.Interface> {

  public static Behavior<TheServiceGuardian.Interface> create() {
    return Behaviors.setup(TheServiceGuardian::new);
  }

  interface Interface extends CborSerializable {}

  static class EMessage implements Interface {}

  public TheServiceGuardian(ActorContext<Interface> context) {
    super(context);
    System.out.println("Creating Service Guardian");

    Cluster cluster = Cluster.get(context.getSystem());

    ShardActor.initSharding(context.getSystem());
    HttpRoutes routes = new HttpRoutes(context.getSystem());
    HttpServer.start(routes.testRoute(), context.getSystem());


    context.spawn(ClusterListener.create(), "cluster-listener");

    // Send first tick - will start all shard actors
    ClusterSharding sharding = ClusterSharding.get(getContext().getSystem());

    Arrays.asList("Shard1", "Shard2", "Shard3", "Shard4").forEach(x -> {
      EntityRef<ShardActor.Interface> interfaceEntityRef = sharding
          .entityRefFor(EntityTypeKey.create(ShardActor.Interface.class, "ShardActor"), x);
      interfaceEntityRef.tell(Tick.INSTANCE);
    });
  }

  @Override
  public Receive<Interface> createReceive() {
    return newReceiveBuilder()
        .onMessage(EMessage.class, this::handleMessage)
        .build();
  }
  
  // Serves as a heartbeat message just to start this shard during shard rebalancing
  private Behavior<Interface> handleMessage(Interface m) {
    return this;
  }

}

The main shard actor is shown below:

public class ShardActor extends AbstractBehavior<Interface> {

  public static final EntityTypeKey<Interface> TYPE_KEY =
      EntityTypeKey.create(ShardActor.Interface.class, "ShardActor");

  public static Behavior<ShardActor.Interface> create(String entityId) {
    return Behaviors.setup(
        ctx -> Behaviors.withTimers(timers -> new ShardActor(ctx, timers, entityId)));
  }

  public static void initSharding(ActorSystem<?> system) {
    ClusterSharding.get(system)
        .init(
            Entity.of(TYPE_KEY, entityContext -> ShardActor.create(entityContext.getEntityId()))
                .withRole("ShardRole"));
  }

  public interface Interface extends CborSerializable {}

  public static final class SomeMessage implements Interface {

    private final String name;

    public SomeMessage(@JsonProperty("name") String name) {
      this.name = name;
    }

    @Override
    public String toString() {
      return "SomeMessage{" + "name='" + name + '\'' + '}';
    }
  }

  public enum Tick implements Interface {
    INSTANCE
  }

  private final String name;

  private final List<ActorRef<TestActor.Interface>> actors = new ArrayList<>();

  public ShardActor(
      ActorContext<Interface> context, TimerScheduler<Interface> timers, String entityId) {
    super(context);
    this.name = context.getSelf().path().name();
    System.out.println("Creating shard actor! " + entityId);
    initActors(); // initializing child test actors
    timers.startTimerWithFixedDelay("Tick", Tick.INSTANCE, Duration.ofSeconds(5));
  }

  private void initActors() {
    IntStream.range(0, 5)
        .forEach(i -> {
          ActorRef<TestActor.Interface> spawn = getContext()
              .spawn(TestActor.create(this.name + "_" + "actor_" + i), this.name + "_" + "actor_" + i);
          actors.add(spawn);
        });
  }

  @Override
  public Receive<Interface> createReceive() {
    return newReceiveBuilder()
        .onMessage(SomeMessage.class, this::handleSomeMessage)
        .onMessage(Tick.class, this::handleTick)
        .onSignalEquals(PostStop.instance(), this::handlePostStop)
        .build();
  }

  private Behavior<Interface> handleTick(Tick m) {
    getContext().getLog().info("Ticking {}", name);
    return Behaviors.same();
  }

  private Behavior<Interface> handleSomeMessage(SomeMessage message) {
    System.out.println("Some message received " + message.toString());
    return Behaviors.same();
  }

  private Behavior<Interface> handlePostStop() {
    getContext()
        .getLog()
        .info("Stopping shard actor, losing all internal state for it {}", name);
    return Behaviors.same();
  }
}

And the child actors that get spun up:

public class TestActor extends AbstractBehavior<TestActor.Interface> {

  public static Behavior<TestActor.Interface> create(String id) {
    return Behaviors.setup(ctx -> new TestActor(ctx, id));
  }

  public interface Interface extends CborSerializable {}

  public static final class ActorMessage implements Interface {
    private String data;

    public ActorMessage() {}

    public ActorMessage(String data) {
      this.data = data;
    }

    public String getData() {
      return data;
    }
  }

  public static final class ActorMessageWithReplyTo implements Interface {

    private String data;

    private ActorRef<StatusReply<String>> replyTo;

    public ActorMessageWithReplyTo() {}

    public ActorMessageWithReplyTo(String data, ActorRef<StatusReply<String>> replyTo) {
      this.data = data;
      this.replyTo = replyTo;
    }

    public String getData() {
      return data;
    }

    public ActorRef<StatusReply<String>> getReplyTo() {
      return replyTo;
    }
  }

  public TestActor(ActorContext<Interface> context, String id) {
    super(context);
    System.out.println("Creating test actor with name: " + id);

    ServiceKey<Interface> interfaceServiceKey = ServiceKey.create(Interface.class, id);
    System.out.println("Service key: " + interfaceServiceKey);

   // Register with receptionist
    getContext()
        .getSystem()
        .receptionist()
        .tell(Receptionist.register(interfaceServiceKey, getContext().getSelf()));
  }

  @Override
  public Receive<Interface> createReceive() {
    return newReceiveBuilder()
        .onMessage(ActorMessage.class, this::handleActorMessage)
        .onMessage(ActorMessageWithReplyTo.class, this::handleActorMessageWithReply)
        .build();
  }

  private Behavior<Interface> handleActorMessage(ActorMessage message) {
    System.out.println(getContext().getSelf().path() + " received a message " + message.getData());
    return Behaviors.same();
  }

  private Behavior<Interface> handleActorMessageWithReply(ActorMessageWithReplyTo message) {
    System.out.println(getContext().getSelf().path() + " received a message " + message.getData());
    message.replyTo.tell(StatusReply.success("Wow"));
    return Behaviors.same();
  }
}

The missing value for ServiceKey issues can be seen for TestActor on some of the nodes. I can reproduce this easily by running the cluster in K8s and restarting the whole deployment after a few minutes.

One more thing worth mentioning is that I updated required-contact-point-nr = 4 in the configuration.

Different steps to reproduce:

  1. While the cluster is running, restart the K8 deployment.
  2. Kill one node and wait for another to be recreated by K8s.

The different error logs that can be seen are:
Couldn't process DeltaPropagation from [UniqueAddress(akka://the-service@172.20.14.200:25520,-4999680910791805366)] due to java.lang.IllegalStateException: missing value for ServiceKey

missing value for ServiceKey

I can still notice those missing service keys after fine-tuning the distributed data gossip and pruning intervals, though not as frequently. Let's take the following scenario:

  • The cluster is deployed, all actors have registered with the receptionist on all nodes, the receptionist state has converged and we are in a happy state: all service keys (SKs) are available.
    If we take down one node, we expect it to leave the cluster and the actors running on it to be stopped. Service keys in the receptionists of the other nodes that were pointing to actors on that node are now obsolete. Meanwhile, a replacement node comes up and its actors start registering with the receptionist, whose state needs to propagate to the other nodes.

I believe this is where a clash happens - obsolete SKs versus newly registered service keys - leading to those SKs being pruned and lost forever. If that's the case, I wonder how Akka handles these situations internally.

Even if that’s the case, shouldn’t the actual data of the receptionist (all non-obsolete SKs) get replicated properly to all nodes after some time (after the obsolete SKs are removed)?

@johanandren Some additional info that you might find useful. I am able to start a cluster of 4 nodes: A, B, C, and D. I made the following tweaks to the receptionist ddata settings - mainly increasing the gossip interval until all actors have successfully started up:

custom-ddata-for-cluster-receptionist = ${akka.cluster.distributed-data} {
  gossip-interval = 30 s
  notify-subscribers-interval = 10 s
  pruning-interval = 30 s
}

I kill node B, which leads to node E starting up. On the new node E, I can see the following logs:

Couldn't process DeltaPropagation from [UniqueAddress(akka://the-app@172.20.9.190:25520,4973440803380960879)] due to java.lang.IllegalStateException: missing value for ServiceKey[Some service key referring to an actor in node C]

and

missing value for ServiceKey[Some service key referring to an actor in node D]

This last log repeated a few times and then stopped. But I can confirm that the key doesn't become available - when I make an HTTP request on that node it can't find that actor, while the other nodes are fine.

I did more experiments that I'd like to share. In the TestActor I was registering a message adapter to handle common messages sent to that actor, and I was registering a service key for that adapter with the receptionist, using the same name as the TestActor:

ServiceKey<Interface> interfaceServiceKey = ServiceKey.create(Interface.class, id);
System.out.println("Service key: " + interfaceServiceKey);

ActorRef<MessageAdapter> adapter =
    getContext().messageAdapter(MessageAdapter.class, this::onCommonMessage);

ServiceKey<MessageAdapter> messageAdapterSK = ServiceKey.create(MessageAdapter.class, id);

// Register the actor with the receptionist
getContext()
    .getSystem()
    .receptionist()
    .tell(Receptionist.register(interfaceServiceKey, getContext().getSelf()));

// Register the message adapter with the receptionist (same key name as the actor)
getContext()
    .getSystem()
    .receptionist()
    .tell(Receptionist.register(messageAdapterSK, adapter));

It seems like the tombstoning is somehow related to this.

  1. Not registering the adapter with the receptionist works fine.
  2. Registering the adapter with the receptionist under a service key with a different name (e.g. the original actor's name plus a suffix) also works fine - see the snippet below.
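
For clarity, the working variant from point 2 looks roughly like this (the "-adapter" suffix is just an example):

// Same registration as before, but the adapter gets its own key name
// instead of reusing the actor's id.
ServiceKey<MessageAdapter> messageAdapterSK =
    ServiceKey.create(MessageAdapter.class, id + "-adapter");

getContext()
    .getSystem()
    .receptionist()
    .tell(Receptionist.register(messageAdapterSK, adapter));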

Thanks for the details. I haven't had a chance to look deeper into this yet, but there is some tricky logic around tombstones and the fact that an update wins over a deletion in ddata, so it could be related to that in some way, perhaps together with new incarnations of the same nodes.

Those tuned settings look very slow btw, given that the defaults are 2 s for gossip, 500 ms for notifying subscribers and 3 s for pruning of removed nodes. Was that only to try to reproduce the issue?

@johanandren Yes, those settings are just for testing and figuring out how to avoid the issue, but I think the root cause lies in the way tombstoning works, as you explained. It seems like no matter how much I fine-tune the settings, there is a possibility (e.g. during a rolling deploy) that some actor reference will be deleted simply because of how cluster replication works, and then basically the whole application breaks.