I have implemented the error kernel pattern in a sample I was trying to create.
The idea was to create a conversation between 3 actors but one does not respond immediately, you need to send multiple message before receive an answer. The implementation should be resilient even if the actors runs in a distributed environment.
So I’ve create a child actor which retries using a scheduler and, when the answer finally arrives, the child notify is parent and stops.
Even if this solution works well I have few doubts.
First into the child actor I’ve implemented a scheduler that executes a function every 50 milliseconds, I have few doubts about the execution context. I mean if the code inside the method sendMessage can modify its own actor?
When an actor schedule an execution, what happens, waits that the actor ends its own work?
import scala.concurrent.ExecutionContext.Implicits.global
import scala.language.postfixOps
var cancellableSchedule : Option[Cancellable] = None
var counter = 0
var maxCounter = 10
def receive = LoggingReceive {
case r:MessageB2C_Ack => {
log.info("ActorChildB - Received MessageB2C_Ack from " + sender())
parentActor ! r
context.stop(self)
}
case r:SendMessage => {
log.info("ActorChildB - Received SendMessage from " + sender())
sendScheduledMessage
}
}
private def sendScheduledMessage(): Unit = {
import scala.concurrent.ExecutionContext.Implicits.global
import scala.language.postfixOps
cancellableSchedule = Option(context.system.scheduler.schedule(0 milliseconds, 50 millisecond){
sendMessage()
})
}
private def sendMessage(): Unit = {
log.info("ActorChildB - sendMessage " + msg.getClass.getName + " to " + dest)
if (counter < maxCounter) {
dest ! msg
counter = counter + 1
} else {
throw new MyRetryTimeoutException("Fine")
}
}
override def postStop(): Unit = {
cancellableSchedule.foreach(c => c.cancel())
super.postStop()
}
I mean if the code inside the method sendMessage can modify its own actor?
That logic is not good, because it touches the mutable actor state from a thread outside the actor.
The scheduler runs scheduled jobs on the dispatcher you have given it (so the Scala global one in this case).
I’d recommed using the Timers trait to schedule a periodic message to the actor itself instead Classic Actors • Akka Documentation and then do the counting and resending when it receives that message.
Note that you can also use the timeout of ask , combining it with pipeTo to get a message when a request times out, potentially a little bit cleaner than scheduling a timeout. Docs for that here: Classic Actors • Akka Documentation
Thanks for answering, the timer is indeed what I was looking for. On the other side I had no lucky trying ask pattern and retry.
If I understood well, I was also discouraged to use them by @TimMoore in this conversation:
I have updated the source I was trying to implement using the Timers scheduled message
Not sure if there are cases when timers.cancel(id) is not executed, I mean the postStop() method.
timers.startPeriodicTimer(id, TimerRetry(), 100.millis)
dest ! msg
log.info(s"ActorChildB - Started because of ActorB")
var counter = 0
var maxCounter = 10
override def postStop(): Unit = {
timers.cancel(id)
super.postStop()
}
def receive = LoggingReceive {
case r:MessageB2C_Ack => {
log.info("ActorChildB - Received MessageB2C_Ack from " + sender())
context.parent ! r
context.stop(self)
}
case r:TimerRetry => {
log.info("ActorChildB - Received TimerRetry from " + sender())
if (counter < maxCounter) {
dest ! msg
counter = counter + 1
} else {
throw new MyRetryTimeoutException("End")
}
}
}