Akka Timers in Cluster Sharding

I am trying to run a timer inside an actor under Akka Cluster Sharding, but when the node that hosts the scheduler timer goes down, the timer is not carried over to the new shard. The actor itself is recreated on the new shard, but its timer is not restarted there.

public class ManagerActor extends AbstractBehavior<ManagerActor.Command>{

private static final Object StartTimerKey=new Object();

TimerScheduler<ManagerActor.Command> timer;

public static Logger logger = LoggerFactory.getLogger(ManagerActor.class);

private ManagerActor(ActorContext<Command> context) {
	super(context);
}

public static Behavior<ManagerActor.Command> create(){
	return  Behaviors.setup(ctx->{return new ManagerActor(ctx);});
}

public interface Command extends Serializable{
	
}

public static class StartSchedular implements ManagerActor.Command{

	private static final long serialVersionUID = 1L;
	
}
public static class CheckAndStartSession implements ManagerActor.Command{

	private static final long serialVersionUID = 1L;
	
}


@Override
public Receive<Command> createReceive() {
	return newReceiveBuilder()
			
			.onMessage(StartSchedular.class, command -> {

				logger.info(getContext().getSelf().path() + " :: Request Received to Start StartSchedularActor "+" | application port = "+applicationContext.getEnvironment().getProperty("server.port"));

				return Behaviors.withTimers(timer -> {

					timer.startTimerWithFixedDelay(StartTimerKey, new CheckAndStartSession(),
							Duration.ofSeconds(0), Duration.ofSeconds(1));
					this.timer = timer;

					return this;
				});
			})
			.onMessage(CheckAndStartSession.class, command->{
				
				logger.info(getContext().getSelf().path() + " :: Request Received to CheckAndStartSession "+" | application port = "+applicationContext.getEnvironment().getProperty("server.port"));

				return this;
			})
			
			.build();
}

}

public class ParentActor extends AbstractBehavior<ParentActor.Command>{

public static Logger logger = LoggerFactory.getLogger(ParentActor.class);


private ParentActor(ActorContext<Command> context) {
	super(context);

}



public static Behavior<ParentActor.Command> create(){	
	return  Behaviors.setup(context->{
		return new ParentActor(context);
	});
	
}

public interface Command extends Serializable{
	
}

public static class StartManagerActor implements ParentActor.Command{
	
}

@Override
public Receive<Command> createReceive() {
	
	return newReceiveBuilder()
			.onMessage(StartManagerActor.class, command->{
				
				logger.info(getContext().getSelf().path()+" :: Request Received to Start StartManagerActor");
				ActorRef<ManagerActor.Command> managerActor = getContext().spawn(ManagerActor.create(), "SESSION_ACTOR");

				shardRegion.tell(new ShardingEnvelope<>("SESSION_ACTOR", new ManagerActor.StartSchedular()));
				
				return this;
			})
			.build();
}

}

The timers available in Akka are either the per-actor-instance ones you have used, or the local ones available through system.scheduler. Actor timers are cancelled once the actor stops, and tasks on the system scheduler do not run once the actor system has stopped.

This means that scheduled messages in sharding do not survive a rebalance of entities across the nodes on their own (a rebalance happens when nodes stop, when nodes join, or simply to even out the live shards across the available nodes). For that, the timer needs to be persistent and has to run in a way that lets it reach the entity even after the entity has moved.

There is no such tool out of the box in Akka, so you will have to build something like that yourself.

One way would be to keep the timers per entity but use persistent actors (EventSourcedBehavior or DurableStateBehavior) and always persist the intent to start a timer before starting it, so that when the actor restarts it can look at its recovered state and schedule those timers again.
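To make the per-entity approach a bit more concrete, here is a minimal, library-free sketch of the idea in plain Java. It deliberately does not use the Akka persistence API: Journal, TimerEntity and the event names are hypothetical stand-ins, with an in-memory list playing the role of the durable journal and a ScheduledExecutorService playing the role of the actor timers. In a real entity, the replay step would roughly correspond to checking the recovered state when an EventSourcedBehavior receives its RecoveryCompleted signal.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Library-free model of "persist the intent, then re-arm the timer on recovery".
// Journal, TimerEntity and the event names are hypothetical, not Akka APIs.
class PersistentTimerSketch {

    enum Event { TIMER_STARTED, TIMER_STOPPED }

    /** Stands in for a durable event journal that survives a node failure. */
    static final class Journal {
        private final List<Event> events = new ArrayList<>();
        synchronized void persist(Event e) { events.add(e); }
        synchronized List<Event> replay() { return new ArrayList<>(events); }
    }

    /** Stands in for one incarnation of a sharded entity on one node. */
    static final class TimerEntity {
        private final Journal journal;
        private final ScheduledExecutorService scheduler;
        private final Runnable tick;
        private ScheduledFuture<?> timer; // null while no timer is armed

        TimerEntity(Journal journal, ScheduledExecutorService scheduler, Runnable tick) {
            this.journal = journal;
            this.scheduler = scheduler;
            this.tick = tick;
            // Recovery: the last journaled event decides whether a timer should be running.
            boolean shouldRun = false;
            for (Event e : journal.replay()) {
                shouldRun = (e == Event.TIMER_STARTED);
            }
            if (shouldRun) arm();
        }

        void startTimer() {
            if (timer != null) return;
            journal.persist(Event.TIMER_STARTED); // persist the intent before arming
            arm();
        }

        void stopTimer() {
            if (timer == null) return;
            journal.persist(Event.TIMER_STOPPED);
            disarm();
        }

        /** Simulates the node going down: the timer is lost and nothing is journaled. */
        void crash() { disarm(); }

        boolean isTimerRunning() { return timer != null; }

        private void arm() {
            timer = scheduler.scheduleWithFixedDelay(tick, 0, 1, TimeUnit.SECONDS);
        }

        private void disarm() {
            if (timer != null) { timer.cancel(false); timer = null; }
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        Journal journal = new Journal();

        TimerEntity first = new TimerEntity(journal, scheduler, () -> System.out.println("tick"));
        first.startTimer();
        first.crash(); // the "node" hosting the entity goes away

        // A new incarnation on another "node" replays the same journal and re-arms itself.
        TimerEntity second = new TimerEntity(journal, scheduler, () -> System.out.println("tick"));
        System.out.println("timer re-armed after recovery: " + second.isTimerRunning());

        second.stopTimer();
        scheduler.shutdownNow();
    }
}
```

Note that this only covers re-arming the timer once the entity is running again. The entity also has to be started on the new node, which sharding does when the next message for that entity arrives, or automatically if the remember-entities feature is enabled.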

Another way would be a central persistent scheduler, something like a cluster-wide crontab. I did an experiment a couple of years ago that may be interesting to look at for inspiration: johanandren/akron on GitHub, a crontab implementation for Akka.