Akka-cluster-sharding HandOffStopper issue

Hi,

I have a question about akka-cluster-sharding.

We have an application that uses Akka Cluster Sharding. It reads a message from a queue and sends it to the interested consumers, which may be placed in different shards; once all consumers have persisted the message, it is acked on the queue. From time to time we get a lot of nacks caused by timeouts to different shards, and the only way to recover is to restart the whole application. Since some consumers keep working well and the timeouts are concentrated on particular shards, we think the sharding mechanism could be the cause.

I’ve noticed that the HandOffStopper log "HandOffStopMessage[akka.actor.PoisonPill$] is not handled by some of the entities ..." appears very frequently, at constant time intervals, for the same shards. I decided to add some extra logging around this processing and built a SNAPSHOT version of the Akka library; you can check the changes here: https://github.com/sebarys/akka/pull/1/files (just additional logging).

What I’ve noticed is that HandOffStopper can be created with Set() (an empty set) of entities to terminate. Example logs:

$ gunzip -c * | ggrep "Dispatcher/11" | ggrep "HandOffStopper" | jq -s -c 'sort_by(.time)[] | .kubernetes.host + " AA " + .time + " BB " + .log.logger + " CC " + .log.actor + " DD " + .log.message' | head -n 50
"ip-XXX.ec2.internal AA 2019-09-04T08:2:08.277+00:00 BB akka.cluster.sharding.PersistentShard CC akka.tcp://ProfileAkkaClusterSystem@XXX:2551/system/sharding/Dispatcher/11 DD Starting HandOffStopper for shard 11 to terminate following entities: Set(Actor[akka://ProfileAkkaClusterSystem/system/sharding/Dispatcher/11/XXX%2Fa0eb829f-9f66-3f29-a6df-9934e9a5bed8%2FXXX#-1195859775])"
"ip-XXX.ec2.internal AA 2019-09-04T08:2:08.307+00:00 BB akka.cluster.sharding.ShardRegion$HandOffStopper CC akka.tcp://ProfileAkkaClusterSystem@XXX:2551/system/sharding/Dispatcher/11/$b DD HandOffStopper of the `11` shard received Terminated message for the ActorRef Actor[akka://ProfileAkkaClusterSystem/system/sharding/Dispatcher/11/XXX%2Fa0eb829f-9f66-3f29-a6df-9934e9a5bed8%2FXXX#-1195859775], the remaining entities: Set()"
"ip-XXX.ec2.internal AA 2019-09-04T08:2:08.307+00:00 BB akka.cluster.sharding.ShardRegion$HandOffStopper CC akka.tcp://ProfileAkkaClusterSystem@XXX:2551/system/sharding/Dispatcher/11/$b DD HandOffStopper remaining entities are empty, sending ShardStopped message for shard 11"
"ip-XXX.ec2.internal AA 2019-09-04T08:2:54.496+00:00 BB akka.cluster.sharding.PersistentShard CC akka.tcp://ProfileAkkaClusterSystem@XXX:2551/system/sharding/Dispatcher/11 DD Starting HandOffStopper for shard 11 to terminate following entities: Set()"
"ip-XXX.ec2.internal AA 2019-09-04T08:22:49.515+00:00 BB akka.cluster.sharding.ShardRegion$HandOffStopper CC akka.tcp://ProfileAkkaClusterSystem@XXX:2551/system/sharding/Dispatcher/11/$a DD HandOffStopMessage[akka.actor.PoisonPill$] is not handled by some of the entities of the `11` shard after 55000 milliseconds, stopping the remaining entities: Set()"
"ip-XXX.ec2.internal AA 2019-09-04T08:23:44.535+00:00 BB akka.cluster.sharding.ShardRegion$HandOffStopper CC akka.tcp://ProfileAkkaClusterSystem@XXX:2551/system/sharding/Dispatcher/11/$a DD HandOffStopMessage[akka.actor.PoisonPill$] is not handled by some of the entities of the `11` shard after 55000 milliseconds, stopping the remaining entities: Set()"
"ip-XXX.ec2.internal AA 2019-09-04T08:24:39.555+00:00 BB akka.cluster.sharding.ShardRegion$HandOffStopper CC akka.tcp://ProfileAkkaClusterSystem@XXX:2551/system/sharding/Dispatcher/11/$a DD HandOffStopMessage[akka.actor.PoisonPill$] is not handled by some of the entities of the `11` shard after 55000 milliseconds, stopping the remaining entities: Set()"
"ip-XXX.ec2.internal AA 2019-09-04T08:25:34.575+00:00 BB akka.cluster.sharding.ShardRegion$HandOffStopper CC akka.tcp://ProfileAkkaClusterSystem@XXX:2551/system/sharding/Dispatcher/11/$a DD HandOffStopMessage[akka.actor.PoisonPill$] is not handled by some of the entities of the `11` shard after 55000 milliseconds, stopping the remaining entities: Set()"
"ip-XXX.ec2.internal AA 2019-09-04T08:26:29.595+00:00 BB akka.cluster.sharding.ShardRegion$HandOffStopper CC akka.tcp://ProfileAkkaClusterSystem@XXX:2551/system/sharding/Dispatcher/11/$a DD HandOffStopMessage[akka.actor.PoisonPill$] is not handled by some of the entities of the `11` shard after 55000 milliseconds, stopping the remaining entities: Set()"
"ip-XXX.ec2.internal AA 2019-09-04T08:27:24.615+00:00 BB akka.cluster.sharding.ShardRegion$HandOffStopper CC akka.tcp://ProfileAkkaClusterSystem@XXX:2551/system/sharding/Dispatcher/11/$a DD HandOffStopMessage[akka.actor.PoisonPill$] is not handled by some of the entities of the `11` shard after 55000 milliseconds, stopping the remaining entities: Set()"
"ip-XXX.ec2.internal AA 2019-09-04T08:28:19.635+00:00 BB akka.cluster.sharding.ShardRegion$HandOffStopper CC akka.tcp://ProfileAkkaClusterSystem@XXX:2551/system/sharding/Dispatcher/11/$a DD HandOffStopMessage[akka.actor.PoisonPill$] is not handled by some of the entities of the `11` shard after 55000 milliseconds, stopping the remaining entities: Set()"
"ip-XXX.ec2.internal AA 2019-09-04T08:29:14.655+00:00 BB akka.cluster.sharding.ShardRegion$HandOffStopper CC akka.tcp://ProfileAkkaClusterSystem@XXX:2551/system/sharding/Dispatcher/11/$a DD HandOffStopMessage[akka.actor.PoisonPill$] is not handled by some of the entities of the `11` shard after 55000 milliseconds, stopping the remaining entities: Set()"

If I understand correctly, this blocks sending the ShardStopped(shardId) message back and so prevents the rebalancing process from finishing.
Could anyone verify my assumption?

HandOffStopper is created when the condition if (state.entities.nonEmpty) is fulfilled, but idByRef.keySet from akka.cluster.sharding.Shard is then passed as the Set of ActorRefs that should be terminated. I’m not sure what the relation between these two sets is, but since a HandOffStopper with an empty Set will wait forever for the termination of the provided ActorRefs, this does not look like the desired behaviour. Couldn’t we create this actor only when idByRef.keySet.nonEmpty?
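
To make the concern concrete, here is a minimal, dependency-free Scala model of the behaviour I think I am seeing. It is only a sketch, not the Akka source: the names HandOffModel, Stopper, onTerminated, onReceiveTimeout and handOff are hypothetical, ActorRefs are modelled as plain strings, and it assumes the stopper replies with ShardStopped only from its Terminated handler.

```scala
// Simplified, dependency-free model of the hand-off bookkeeping described above.
// It is NOT the Akka source; ActorRefs are modelled as plain strings.
object HandOffModel {

  /** Models a stopper that reports completion only from its Terminated handler. */
  final class Stopper(entities: Set[String]) {
    private var remaining: Set[String] = entities
    private var stopped: Boolean = false

    /** True once the (modelled) ShardStopped reply has been sent. */
    def shardStopped: Boolean = stopped

    /** An entity terminated: drop it and complete when nothing is left. */
    def onTerminated(ref: String): Unit = {
      remaining -= ref
      if (remaining.isEmpty) stopped = true
    }

    /** Hand-off timeout: returns the refs that would be force-stopped.
      * Note that it never marks the shard as stopped by itself. */
    def onReceiveTimeout(): Set[String] = remaining
  }

  /** Guard proposed in this post: start a stopper only if there is something
    * to watch. None means "reply ShardStopped straight away". */
  def handOff(refs: Set[String]): Option[Stopper] =
    if (refs.nonEmpty) Some(new Stopper(refs)) else None
}
```

In this model, new HandOffModel.Stopper(Set.empty) never reaches shardStopped == true, however many times onReceiveTimeout() fires, because no Terminated event can ever arrive. That would match the repeated "stopping the remaining entities: Set()" lines above. With the guard in handOff, the empty case skips the stopper entirely.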

For reference, issue: https://github.com/akka/akka/issues/27647