Why isn't my typed actor being restarted by its guardian?

Adapted from this StackOverflow question:

I’m experimenting with Akka typed. I have a dummy actor which emulates a flaky worker:

import akka.actor.typed.Behavior
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.scaladsl.Behaviors

import scala.util.Random

object DummyActor {
  def behavior[T](serviceKey: ServiceKey[T]): Behavior[Any] = Behaviors.setup { ctx =>

    ctx.system.receptionist ! Receptionist.Register(serviceKey, ctx.self)

    ctx.log.info("Woohoo, I'm alive!")

    Thread.sleep(1000)

    if (Random.nextInt(10) > 5)
      throw new IllegalStateException("Something bad happened!")

    Behaviors.empty
  }
}

and its guardian with a router:

import akka.actor.typed.{Behavior, SupervisorStrategy}
import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.Behaviors

object MyCluster {

  val serviceKey: ServiceKey[String] = ServiceKey[String]("cluster")

  val strategy = SupervisorStrategy.restart

  val behavior: Behavior[String] = Behaviors.setup { ctx =>


    (1 to 5).foreach { i =>
      ctx.log.info(s"Spawning actor #$i")
      ctx.spawn(
        Behaviors.supervise(DummyActor.behavior(serviceKey))
                 .onFailure[Throwable](strategy),
        s"actor-$i"
      )
    }

    val router = ctx.spawn(RandomRouter.clusterRouter(serviceKey), "router")

    Behaviors.empty
  }
}

My router listens to receptionist events:

import java.util.concurrent.ThreadLocalRandom

import akka.actor.Address
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.ClusterEvent.{ReachabilityEvent, ReachableMember, UnreachableMember}
import akka.cluster.typed.{Cluster, Subscribe}

object RandomRouter {
  private final case class WrappedReachabilityEvent(event: ReachabilityEvent)

  // Routes to services registered under serviceKey. Also subscribes to
  // cluster reachability events and avoids routees that are unreachable.
  def clusterRouter[T](serviceKey: ServiceKey[T]): Behavior[T] =
    Behaviors.setup[Any] { ctx ⇒
      ctx.system.receptionist ! Receptionist.Subscribe(serviceKey, ctx.self)

      val cluster = Cluster(ctx.system)
      // typically you have to map such external messages into this
      // actor's protocol with a message adapter
      val reachabilityAdapter: ActorRef[ReachabilityEvent] = ctx.messageAdapter(WrappedReachabilityEvent.apply)

      cluster.subscriptions ! Subscribe(reachabilityAdapter, classOf[ReachabilityEvent])

      def routingBehavior(routees: Vector[ActorRef[T]], unreachable: Set[Address]): Behavior[Any] =
        Behaviors.receive { (ctx, msg) ⇒
          msg match {
            case serviceKey.Listing(services) ⇒
              if (services.isEmpty) {
                ctx.log.info("Found no services")
              } else {
                ctx.log.info(s"Found services: ${services.map(_.path).mkString(", ")}")
              }
              routingBehavior(services.toVector, unreachable)
            case WrappedReachabilityEvent(event) => event match {
              case UnreachableMember(m) =>
                ctx.log.warning(s"Member ${m.uniqueAddress.address} has become unreachable")
                routingBehavior(routees, unreachable + m.address)
              case ReachableMember(m) =>
                ctx.log.info(s"Member ${m.uniqueAddress.address} has become reachable again")
                routingBehavior(routees, unreachable - m.address)
            }

            case other: T @unchecked ⇒
              // Filter first, so that an empty set of reachable routees is
              // treated as unhandled instead of failing in nextInt(0)
              val reachableRoutes =
                if (unreachable.isEmpty) routees
                else routees.filterNot { r => unreachable(r.path.address) }

              if (reachableRoutes.isEmpty)
                Behaviors.unhandled
              else {
                val i = ThreadLocalRandom.current.nextInt(reachableRoutes.size)
                reachableRoutes(i) ! other
                Behaviors.same
              }
          }
        }

      routingBehavior(Vector.empty, Set.empty)
    }.narrow[T]
}

But I find that when I start my cluster, ultimately I’ll find some actors have died (expectedly), but not restarted, leaving me with logs like this:

[INFO] [06/01/2018 18:11:14.242] [cluster-system-akka.actor.default-dispatcher-4] [akka://cluster-system/user/router] Found services: akka://cluster-system/user/actor-4, akka://cluster-system/user/actor-3, akka://cluster-system/user/actor-1, akka://cluster-system/user/actor-5

Why isn’t MyCluster#strategy restarting the failed actors?

The default supervision strategy for typed actors (stop) differs from the one for untyped actors (restart), so you have to add restart supervision explicitly. See the Fault Tolerance section in the docs.

Sorry, I didn’t read your example carefully enough. I see now that you already do that, so it should restart them. I’ll get back to you when I’m at a computer and have looked closer at this.

restart and resume do nothing when an exception is thrown on actor start, as acting on it could too easily lead to infinite restart loops. If you want a supervised setup, you need to use the backoff strategy. (Edit: this is documented in the Scaladoc of the individual strategies.)
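For illustration, a minimal sketch of what supervising with backoff could look like. The object name BackoffSupervision, the helper withBackoff, and the durations are assumptions made up for this example, not taken from the thread; SupervisorStrategy.restartWithBackoff is the backoff strategy in akka.actor.typed.

```scala
import scala.concurrent.duration._

import akka.actor.typed.{Behavior, SupervisorStrategy}
import akka.actor.typed.scaladsl.Behaviors

// Hypothetical helper object, named for this example only
object BackoffSupervision {

  // Restart with an increasing delay between attempts.
  // The concrete durations and random factor are arbitrary example values.
  val strategy: SupervisorStrategy =
    SupervisorStrategy.restartWithBackoff(
      minBackoff = 1.second,
      maxBackoff = 30.seconds,
      randomFactor = 0.2
    )

  // Wraps any behavior so that failures are handled by the backoff strategy
  def withBackoff[T](behavior: Behavior[T]): Behavior[T] =
    Behaviors.supervise(behavior).onFailure[Throwable](strategy)
}
```

In the question's guardian this would presumably be used as ctx.spawn(BackoffSupervision.withBackoff(DummyActor.behavior(serviceKey)), s"actor-$i") in place of the plain restart strategy.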

Also, note that you should never call Thread.sleep anywhere in an actor.
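One non-blocking way to get a delay is to schedule a message to the actor itself with a timer. The sketch below is only an illustration: DelayedWorker, Command, Wake and WakeTimerKey are hypothetical names, and the one-second delay simply mirrors the sleep in the question.

```scala
import scala.concurrent.duration._

import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors

// Hypothetical actor, named for this example only
object DelayedWorker {
  sealed trait Command
  private case object Wake extends Command
  private case object WakeTimerKey

  val behavior: Behavior[Command] =
    Behaviors.withTimers[Command] { timers =>
      // Ask for a Wake message in one second; no thread is blocked meanwhile
      timers.startSingleTimer(WakeTimerKey, Wake, 1.second)

      Behaviors.receive[Command] { (ctx, msg) =>
        msg match {
          case Wake =>
            ctx.log.info("One second has passed without blocking a thread")
            Behaviors.same
        }
      }
    }
}
```

The actor stays free to process other messages while it waits, which is the main reason to prefer this over sleeping.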

Cheers, @johanandren. I actually suspected it was because of setup… Presumably the restart strategy would have worked in any of the other Behaviors?

As for the Thread.sleep, this is just a dummy – I’d never do it in a real system. :) I just didn’t want my console to be flooded. I’ll switch the strategy when I have a moment to test it out.

Yes. If setup completes and returns a behavior, and that behavior later throws when it receives a message, restart works as expected. The restart is skipped only when the exception is thrown while running the setup logic itself.
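To make that distinction concrete, here is a small sketch of the case where plain restart applies: setup finishes, and the failure happens later while handling a message. FlakyOnMessage and the "fail" trigger message are made up for this example.

```scala
import akka.actor.typed.{Behavior, SupervisorStrategy}
import akka.actor.typed.scaladsl.Behaviors

// Hypothetical actor, named for this example only
object FlakyOnMessage {

  val behavior: Behavior[String] =
    Behaviors.supervise(
      Behaviors.setup[String] { setupCtx =>
        // Setup itself does not throw, so the actor starts successfully
        setupCtx.log.info("Setup completed")

        Behaviors.receive[String] { (ctx, msg) =>
          if (msg == "fail")
            // Thrown while processing a message: handled by the restart strategy
            throw new IllegalStateException("Something bad happened!")
          else {
            ctx.log.info(s"Processed: $msg")
            Behaviors.same
          }
        }
      }
    ).onFailure[IllegalStateException](SupervisorStrategy.restart)
}
```

Moving the throw up into the setup block would reproduce the situation from the question, where the failure happens during start.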

If restart and resume acted on that failure, they would immediately invoke the setup logic again, which is unlikely to succeed: either the setup logic is flawed, or some resource it needs is not available. For the resource case, a backoff may make sense.

Yes, this is basically what I had suspected. It makes perfect sense, too. I will try to submit a PR to add this to the docs explicitly if this makes sense. Thanks again, Johan! And great work with Akka Typed!