Error kernel pattern and scheduler

Hi All,

I have implemented the error kernel pattern in a sample I was trying to create.

The idea was to create a conversation between 3 actors but one does not respond immediately, you need to send multiple message before receive an answer. The implementation should be resilient even if the actors runs in a distributed environment.

So I’ve create a child actor which retries using a scheduler and, when the answer finally arrives, the child notify is parent and stops.

Even if this solution works well I have few doubts.

First into the child actor I’ve implemented a scheduler that executes a function every 50 milliseconds, I have few doubts about the execution context. I mean if the code inside the method sendMessage can modify its own actor?

When an actor schedule an execution, what happens, waits that the actor ends its own work?

  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.language.postfixOps

  var cancellableSchedule : Option[Cancellable] = None

  var counter = 0
  var maxCounter = 10

  def receive = LoggingReceive {
    case r:MessageB2C_Ack => {
      log.info("ActorChildB - Received MessageB2C_Ack from " + sender())
      parentActor ! r
      context.stop(self)
    }
    case r:SendMessage => {
      log.info("ActorChildB - Received SendMessage from " + sender())
        sendScheduledMessage
      }
   }

  private def sendScheduledMessage(): Unit = {
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.language.postfixOps
    cancellableSchedule =  Option(context.system.scheduler.schedule(0 milliseconds, 50 millisecond){
      sendMessage()
    })
  }

   private def sendMessage(): Unit = {
      log.info("ActorChildB - sendMessage " + msg.getClass.getName + " to " + dest)
      if (counter < maxCounter) {
        dest ! msg
        counter = counter + 1
      } else  {
        throw new MyRetryTimeoutException("Fine")
      }
   }

  override def postStop(): Unit = {
    cancellableSchedule.foreach(c => c.cancel())
    super.postStop()
  }

bump :slight_smile:

bump :slight_smile:

I mean if the code inside the method sendMessage can modify its own actor?

That logic is not good, because it touches the mutable actor state from a thread outside the actor.
The scheduler runs scheduled jobs on the dispatcher you have given it (so the Scala global one in this case).

I’d recommed using the Timers trait to schedule a periodic message to the actor itself instead Classic Actors • Akka Documentation and then do the counting and resending when it receives that message.

Note that you can also use the timeout of ask , combining it with pipeTo to get a message when a request times out, potentially a little bit cleaner than scheduling a timeout. Docs for that here: Classic Actors • Akka Documentation

There is also a built in retry pattern that could be useful together with ask: Futures patterns • Akka Documentation

1 Like

Thanks for answering, the timer is indeed what I was looking for. On the other side I had no lucky trying ask pattern and retry.
If I understood well, I was also discouraged to use them by @TimMoore in this conversation:

I have updated the source I was trying to implement using the Timers scheduled message

Not sure if there are cases when timers.cancel(id) is not executed, I mean the postStop() method.

  timers.startPeriodicTimer(id, TimerRetry(), 100.millis)
  dest ! msg

  log.info(s"ActorChildB - Started because of ActorB")

  var counter = 0
  var maxCounter = 10

  override def postStop(): Unit = {
    timers.cancel(id)
    super.postStop()
  }

  def receive = LoggingReceive {
    case r:MessageB2C_Ack => {
      log.info("ActorChildB - Received MessageB2C_Ack from " + sender())
      context.parent ! r
      context.stop(self)
    }
    case r:TimerRetry => {
      log.info("ActorChildB - Received TimerRetry from " + sender())
      if (counter < maxCounter) {
        dest ! msg
        counter = counter + 1
      } else  {
        throw new MyRetryTimeoutException("End")
      }
    }
  }