How to re-create entities in Cluster Sharding when a node crashes

Hi,

I am using Cluster Sharding in Akka Cluster with two nodes.

Below are the ClusterShardingExtension settings:

akka.cluster.sharding {
  remember-entities = on
  journal-plugin-id = "akka-persistence-redis.journal"
  snapshot-plugin-id = "akka-persistence-redis.snapshot"
  state-store-mode = "persistence"
}

  1. I start the two node cluster (N1 and N2).
  2. I start a couple of entities.
  3. I kill the node N1.

Below are my queries

  1. When I stop node N1, the entities are not re-created on node N2.
  2. The entities are also not re-created when I restart the node.

Both times, I get the warning below:

[WARN] [07/26/2019 14:54:23.711] [ReactorCluster-akka.actor.default-dispatcher-21] [akka.tcp://ReactorCluster@127.0.0.1:4002/system/sharding/shardRegion] shardRegion: Shard must not be empty, dropping message [akka.cluster.sharding.ShardRegion$StartEntity]
[WARN] [07/26/2019 14:54:25.711] [ReactorCluster-akka.actor.default-dispatcher-20] [akka.tcp://ReactorCluster@127.0.0.1:4002/system/sharding/shardRegion] shardRegion: Shard must not be empty, dropping message [akka.cluster.sharding.ShardRegion$StartEntity]
[WARN] [07/26/2019 14:54:27.711] [ReactorCluster-akka.actor.default-dispatcher-4] [akka.tcp://ReactorCluster@127.0.0.1:4002/system/sharding/shardRegion] shardRegion: Shard must not be empty, dropping message [akka.cluster.sharding.ShardRegion$StartEntity]
[WARN] [07/26/2019 14:54:29.712] [ReactorCluster-akka.actor.default-dispatcher-4] [akka.tcp://ReactorCluster@127.0.0.1:4002/system/sharding/shardRegion] shardRegion: Shard must not be empty, dropping message [akka.cluster.sharding.ShardRegion$StartEntity]

Sharded actors are only created and re-created on demand, i.e. when you send a message to the actor. Could that be the issue?

Thanks @ignatius

Let me make this clearer. I have provided a link for reference: https://github.com/akka/akka/issues/22868

The following link says "Rebalancing of Shards works correctly (the entities are started on the new node) in a multi node cluster."

In my case I am trying to do the same.

  1. I started two nodes N1 and N2 as per the ClusterSharding settings above.
  2. I started a couple of entities by sending messages to the shard actor.

In the linked issue he says he is able to rebalance the shards, but for me it is not working.

What am I supposed to do to start the live entities on a new node when one of the nodes crashes?
Please guide me on this.

As I said, sharded actors are only re-created on demand, i.e. when you send a message to the actor. After one node is down, try sending a message to the actor you think is missing, and see if the corresponding actor is created on the other node and receives the message.

Which Akka version? The latest 2.5.23, I suppose. Since you are trying to use remember-entities, those entities should be started on N2 if N1 is removed from the cluster.

How do you stop N1? Kill -9? Then you need a downing provider so that the node is removed from cluster membership. Have you verified that it has been removed?
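For a quick local experiment, downing can be enabled with plain configuration. This is a sketch only: automatic downing is unsafe for production, where a proper downing provider (e.g. Split Brain Resolver) should be configured instead.

```
akka.cluster {
  # Local testing only: automatically mark unreachable nodes as down
  # after the timeout, so they are removed from cluster membership
  # and their shards can be rebalanced.
  auto-down-unreachable-after = 10s
}
```

Without some form of downing, a killed node stays "unreachable" indefinitely and its shards are never handed over to the surviving node.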

Maybe start with ddata instead of the redis plugin?
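Switching to ddata mode is essentially a one-line config change (a sketch, assuming the replicator defaults):

```
akka.cluster.sharding {
  remember-entities = on
  # Keep coordinator/sharding state in Distributed Data instead of a
  # persistence journal, which takes the redis plugin out of the failure path.
  state-store-mode = "ddata"
}
```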

I am using Akka version 2.5.22.

Below are the cluster sharding settings, with remember-entities set to on for both nodes (N1 and N2):

akka.cluster.sharding {
  remember-entities = on
  journal-plugin-id = "akka-persistence-redis.journal"
  snapshot-plugin-id = "akka-persistence-redis.snapshot"
  state-store-mode = "persistence"
}

I am using Ctrl+C to stop node N1. Since I am subscribed to cluster events, I get a MemberExited event, which in turn calls the remove-member function. So I am assuming that part is working fine.

package com.zoho.reactor.actor;

import akka.actor.AbstractActor;
import akka.actor.ActorSelection;
import akka.actor.Address;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.Member;
import com.zoho.reactor.entity.Execution;
import java.util.*;

public class SupervisorActor extends AbstractActor {

    // role -> addresses of cluster members carrying that role
    private static final Map<String, Set<Address>> nodes = new HashMap<>();

    private final Cluster cluster = Cluster.get(getContext().system());

    public static Map<String, Set<Address>> getNodes() {
        return nodes;
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(Execution.class, event -> {
                    ActorSelection shardActor = getShardActor(getNodes().get("default"));
                    shardActor.tell(event, getSelf());
                })
                .match(ClusterEvent.CurrentClusterState.class, state -> {
                    for (Member member : state.getMembers()) {
                        addMember(member);
                    }
                })
                .match(ClusterEvent.MemberUp.class, memberUp -> addMember(memberUp.member()))
                // Note: MemberExited is only emitted for a graceful leave; a crashed
                // node shows up as UnreachableMember and, once downed, MemberRemoved.
                .match(ClusterEvent.MemberExited.class, memberExited -> removeMember(memberExited.member()))
                .build();
    }

    @Override
    public void preStart() {
        cluster.subscribe(self(), ClusterEvent.MemberEvent.class);
    }

    @Override
    public void postStop() {
        cluster.unsubscribe(self());
    }

    private void addMember(Member member) {
        System.out.println("add member called");
        if (member.roles().isEmpty()) {
            return; // roles().head() would throw on a member without roles
        }
        String role = member.roles().head();
        nodes.computeIfAbsent(role, r -> new HashSet<>()).add(member.address());
        System.out.println("nodes - " + nodes);
    }

    private void removeMember(Member member) {
        System.out.println("remove member called");
        if (member.roles().isEmpty()) {
            return;
        }
        Set<Address> addresses = nodes.get(member.roles().head());
        if (addresses != null) {
            addresses.remove(member.address());
        }
    }

    private ActorSelection getShardActor(Set<Address> addresses) {
        if (addresses != null) {
            Iterator<Address> itr = addresses.iterator();
            if (itr.hasNext()) {
                return getContext().actorSelection(itr.next() + "/system/sharding/shardRegion");
            }
        }
        throw new NoSuchElementException("no known nodes for role");
    }
}

I am not using "ddata", but it is still creating the files below on my machine:
1. ddata-ReactorCluster-defaultReplicator-4001
2. ddata-ReactorCluster-defaultReplicator-4002

Also, please find the warnings below:

[WARN] [07/30/2019 17:52:45.971] [ReactorCluster-akka.actor.default-dispatcher-25] [akka.tcp://ReactorCluster@127.0.0.1:4002/system/sharding/shardRegion] shardRegion: Shard must not be empty, dropping message [akka.cluster.sharding.ShardRegion$StartEntity]
[WARN] [07/30/2019 17:52:45.971] [ReactorCluster-akka.actor.default-dispatcher-25] [akka.tcp://ReactorCluster@127.0.0.1:4002/system/sharding/shardRegion] shardRegion: Shard must not be empty, dropping message [akka.cluster.sharding.ShardRegion$StartEntity]
[WARN] [07/30/2019 17:52:47.974] [ReactorCluster-akka.actor.default-dispatcher-2] [akka.tcp://ReactorCluster@127.0.0.1:4002/system/sharding/shardRegion] shardRegion: Shard must not be empty, dropping message [akka.cluster.sharding.ShardRegion$StartEntity]
[WARN] [07/30/2019 17:52:47.974] [ReactorCluster-akka.actor.default-dispatcher-2] [akka.tcp://ReactorCluster@127.0.0.1:4002/system/sharding/shardRegion] shardRegion: Shard must not be empty, dropping message [akka.cluster.sharding.ShardRegion$StartEntity]

I think we would need full logs to be able to understand. Preferably at Debug log level. Or an example to reproduce the problem.

Do you handle the StartEntity message in the shardId extractor function?
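For reference, with remember-entities enabled the shard region re-sends `ShardRegion.StartEntity(entityId)` messages after a restart or rebalance, and the shard-id extractor has to map them to a shard id; if it does not, the region logs exactly the "Shard must not be empty" warning and drops them. Below is a minimal, self-contained sketch of that routing logic. The `StartEntity` and `Envelope` classes here are local stand-ins for the real Akka class and a typical user-defined message envelope, so the exact shapes are assumptions; in real code this would live in a `ShardRegion.MessageExtractor`.

```java
// Stand-in for akka.cluster.sharding.ShardRegion.StartEntity (hypothetical shape).
final class StartEntity {
    final String entityId;
    StartEntity(String entityId) { this.entityId = entityId; }
}

// Stand-in for a typical user-defined message envelope (hypothetical shape).
final class Envelope {
    final String entityId;
    final Object payload;
    Envelope(String entityId, Object payload) { this.entityId = entityId; this.payload = payload; }
}

final class ShardIdExtractor {
    static final int NUMBER_OF_SHARDS = 10;

    // Both normal envelopes and StartEntity must resolve to the SAME shard id
    // for a given entityId, otherwise remembered entities cannot be restarted.
    static String shardId(Object message) {
        if (message instanceof StartEntity) {
            return shardFor(((StartEntity) message).entityId);
        }
        if (message instanceof Envelope) {
            return shardFor(((Envelope) message).entityId);
        }
        throw new IllegalArgumentException("Unexpected message: " + message);
    }

    // Consistent hashing of entityId into the fixed shard space.
    private static String shardFor(String entityId) {
        return String.valueOf(Math.abs(entityId.hashCode() % NUMBER_OF_SHARDS));
    }
}
```

The key property is that the `StartEntity` branch reuses the same `shardFor` function as normal traffic, so a remembered entity is restarted on the shard it previously lived on.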

Sorry for the delayed reply,

The first execution went fine. When I start the cluster a second time, I get the logs below.

Here, I am not using ddata, but it still shows "Lease Acquired. Getting state from DData".

Please find the complete log below:

[INFO] [08/03/2019 18:16:01.954] [ReactorCluster-akka.actor.default-dispatcher-18] [akka.tcp://ReactorCluster@172.24.116.211:2551/system/sharding/shardRegionCoordinator/singleton/coordinator] ShardCoordinator was moved to the active state State(Map(),Map(),Set(),Set(-1, -9, 0, WA_6),true)
[INFO] [08/03/2019 18:16:03.625] [ReactorCluster-akka.actor.default-dispatcher-2] [akka.tcp://ReactorCluster@172.24.116.211:2551/system/sharding/shardRegion/-1] Lease Acquired. Getting state from DData
[INFO] [08/03/2019 18:16:03.625] [ReactorCluster-akka.actor.default-dispatcher-4] [akka.tcp://ReactorCluster@172.24.116.211:2551/system/sharding/shardRegion/0] Lease Acquired. Getting state from DData
[INFO] [08/03/2019 18:16:03.625] [ReactorCluster-akka.actor.default-dispatcher-17] [akka.tcp://ReactorCluster@172.24.116.211:2551/system/sharding/shardRegion/WA_6] Lease Acquired. Getting state from DData
[INFO] [08/03/2019 18:16:03.625] [ReactorCluster-akka.actor.default-dispatcher-18] [akka.tcp://ReactorCluster@172.24.116.211:2551/system/sharding/shardRegion/-9] Lease Acquired. Getting state from DData
[WARN] [08/03/2019 18:16:03.633] [ReactorCluster-akka.actor.default-dispatcher-17] [akka.tcp://ReactorCluster@172.24.116.211:2551/system/sharding/shardRegion] shardRegion: Shard must not be empty, dropping message [akka.cluster.sharding.ShardRegion$StartEntity]
[WARN] [08/03/2019 18:16:03.634] [ReactorCluster-akka.actor.default-dispatcher-17] 
[akka.tcp://ReactorCluster@172.24.116.211:2551/system/sharding/shardRegion] shardRegion: Shard must not be empty, dropping message [akka.cluster.sharding.ShardRegion$StartEntity]
[INFO] [08/03/2019 18:16:03.634] [ReactorCluster-akka.actor.default-dispatcher-24] [akka://ReactorCluster/deadLetters] Message [akka.cluster.sharding.ShardRegion$StartEntity] from Actor[akka://ReactorCluster/system/sharding/shardRegion#-258038368] to Actor[akka://ReactorCluster/deadLetters] was not delivered. [2] dead letters encountered. If this is not an expected behavior, then [Actor[akka://ReactorCluster/deadLetters]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[WARN] [08/03/2019 18:16:03.634] [ReactorCluster-akka.actor.default-dispatcher-17] [akka.tcp://ReactorCluster@172.24.116.211:2551/system/sharding/shardRegion] shardRegion: Shard must not be empty, dropping message [akka.cluster.sharding.ShardRegion$StartEntity]
[INFO] [08/03/2019 18:16:03.634] [ReactorCluster-akka.actor.default-dispatcher-24] [akka://ReactorCluster/deadLetters] Message [akka.cluster.sharding.ShardRegion$StartEntity] from Actor[akka://ReactorCluster/system/sharding/shardRegion#-258038368] to Actor[akka://ReactorCluster/deadLetters] was not delivered. [3] dead letters encountered. If this is not an expected behavior, then [Actor[akka://ReactorCluster/deadLetters]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[WARN] [08/03/2019 18:16:03.634] [ReactorCluster-akka.actor.default-dispatcher-17] [akka.tcp://ReactorCluster@172.24.116.211:2551/system/sharding/shardRegion] shardRegion: Shard must not be empty, dropping message [akka.cluster.sharding.ShardRegion$StartEntity]
[INFO] [08/03/2019 18:16:03.634] [ReactorCluster-akka.actor.default-dispatcher-24] [akka://ReactorCluster/deadLetters] Message [akka.cluster.sharding.ShardRegion$StartEntity] from Actor[akka://ReactorCluster/system/sharding/shardRegion#-258038368] to Actor[akka://ReactorCluster/deadLetters] was not delivered. [4] dead letters encountered. If this is not an expected behavior, then [Actor[akka://ReactorCluster/deadLetters]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [08/03/2019 18:16:03.634] [ReactorCluster-akka.actor.default-dispatcher-24] [akka://ReactorCluster/deadLetters] Message [akka.cluster.sharding.ShardRegion$StartEntity] from Actor[akka://ReactorCluster/system/sharding/shardRegion#-258038368] to Actor[akka://ReactorCluster/deadLetters] was not delivered. [5] dead letters encountered. If this is not an expected behavior, then [Actor[akka://ReactorCluster/deadLetters]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[WARN] [08/03/2019 18:16:05.647] [ReactorCluster-akka.actor.default-dispatcher-25] [akka.tcp://ReactorCluster@172.24.116.211:2551/system/sharding/shardRegion] shardRegion: Shard must not be empty, dropping message [akka.cluster.sharding.ShardRegion$StartEntity]
[WARN] [08/03/2019 18:16:05.647] [ReactorCluster-akka.actor.default-dispatcher-25] [akka.tcp://ReactorCluster@172.24.116.211:2551/system/sharding/shardRegion] shardRegion: Shard must not be empty, dropping message [akka.cluster.sharding.ShardRegion$StartEntity]
[INFO] [08/03/2019 18:16:05.647] [ReactorCluster-akka.actor.default-dispatcher-23] [akka://ReactorCluster/deadLetters] Message [akka.cluster.sharding.ShardRegion$StartEntity] from Actor[akka://ReactorCluster/system/sharding/shardRegion#-258038368] to Actor[akka://ReactorCluster/deadLetters] was not delivered. [6] dead letters encountered. If this is not an expected behavior, then [Actor[akka://ReactorCluster/deadLetters]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[WARN] [08/03/2019 18:16:05.647] [ReactorCluster-akka.actor.default-dispatcher-25] [akka.tcp://ReactorCluster@172.24.116.211:2551/system/sharding/shardRegion] shardRegion: Shard must not be empty, dropping message [akka.cluster.sharding.ShardRegion$StartEntity]
[INFO] [08/03/2019 18:16:05.647] [ReactorCluster-akka.actor.default-dispatcher-23] [akka://ReactorCluster/deadLetters] Message [akka.cluster.sharding.ShardRegion$StartEntity] from Actor[akka://ReactorCluster/system/sharding/shardRegion#-258038368] to Actor[akka://ReactorCluster/deadLetters] was not delivered. [7] dead letters encountered. If this is not an expected behavior, then [Actor[akka://ReactorCluster/deadLetters]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[WARN] [08/03/2019 18:16:05.647] [ReactorCluster-akka.actor.default-dispatcher-25] [akka.tcp://ReactorCluster@172.24.116.211:2551/system/sharding/shardRegion] shardRegion: Shard must not be empty, dropping message [akka.cluster.sharding.ShardRegion$StartEntity]
[INFO] [08/03/2019 18:16:05.647] [ReactorCluster-akka.actor.default-dispatcher-23] [akka://ReactorCluster/deadLetters] Message [akka.cluster.sharding.ShardRegion$StartEntity] from Actor[akka://ReactorCluster/system/sharding/shardRegion#-258038368] to Actor[akka://ReactorCluster/deadLetters] was not delivered. [8] dead letters encountered. If this is not an expected behavior, then [Actor[akka://ReactorCluster/deadLetters]] may have terminated unexpectedly, This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Below is the source code link
https://github.com/anilkumble/AkkaClusterExample

MyClusterSystem is the main class where the execution begins.

My use case is simply to re-create the entity actors on another node when one of the actor systems goes down.