How to handle exception during creation?

Hi all

I am trying to test an actor that uses Akka Typed Persistence. In the scenario under test, the database is offline, and the actor should notify another actor that the database is not available.

While the child actor that persists the data to the database is being created, the following error is thrown:

akka.actor.ActorInitializationException: akka://StoreTestOffline/user/OfflineStoreActor/StoreChilid: exception during creation
        at akka.actor.ActorInitializationException$.apply(Actor.scala:202)
        at akka.actor.ActorCell.create(ActorCell.scala:696)
        at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:547)
        at akka.actor.ActorCell.systemInvoke(ActorCell.scala:569)
        at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:293)
        at akka.dispatch.Mailbox.run(Mailbox.scala:228)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalArgumentException: Default journal plugin is not configured, see 'reference.conf'
        at akka.persistence.Persistence$.verifyPluginConfigIsDefined(Persistence.scala:193)
        at akka.persistence.typed.internal.EventSourcedSettings$.defaultJournalPluginId$1(EventSourcedSettings.scala:57)
        at akka.persistence.typed.internal.EventSourcedSettings$.journalConfigFor(EventSourcedSettings.scala:61)
        at akka.persistence.typed.internal.EventSourcedSettings$.apply(EventSourcedSettings.scala:39)
        at akka.persistence.typed.internal.EventSourcedSettings$.apply(EventSourcedSettings.scala:22)
        at akka.persistence.typed.internal.EventSourcedBehaviorImpl.apply(EventSourcedBehaviorImpl.scala:83)
        at akka.actor.typed.Behavior$.start(Behavior.scala:331)
        at akka.actor.typed.internal.InterceptorImpl$anon$1.start(InterceptorImpl.scala:45)
        at akka.actor.typed.internal.AbstractSupervisor.aroundStart(Supervision.scala:72)
        at akka.actor.typed.internal.InterceptorImpl.preStart(InterceptorImpl.scala:68)
        at akka.actor.typed.internal.InterceptorImpl$.$anonfun$apply$1(InterceptorImpl.scala:25)
        at akka.actor.typed.Behavior$DeferredBehavior$anon$1.apply(Behavior.scala:264)
        at akka.actor.typed.Behavior$.start(Behavior.scala:331)
        at akka.actor.typed.internal.adapter.ActorAdapter.preStart(ActorAdapter.scala:238)
        at akka.actor.Actor.aroundPreStart(Actor.scala:550)
        at akka.actor.Actor.aroundPreStart$(Actor.scala:550)
        at akka.actor.typed.internal.adapter.ActorAdapter.aroundPreStart(ActorAdapter.scala:51)
        at akka.actor.ActorCell.create(ActorCell.scala:676)
        ... 9 more

[ERROR] [08/30/2019 12:28:15.391] [StoreTestOffline-akka.actor.default-dispatcher-6] [akka://StoreTestOffline/user] death pact with Actor[akka://StoreTestOffline/user/OfflineStoreActor/StoreChilid#-1253371311] was triggered
akka.actor.typed.DeathPactException: death pact with Actor[akka://StoreTestOffline/user/OfflineStoreActor/StoreChilid#-1253371311] was triggered
        at akka.actor.typed.Behavior$.interpretSignal(Behavior.scala:402)
        at akka.actor.typed.internal.adapter.ActorAdapter.handleSignal(ActorAdapter.scala:128)
        at akka.actor.typed.internal.adapter.ActorAdapter.aroundReceive(ActorAdapter.scala:87)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:612)
        at akka.actor.dungeon.DeathWatch.$anonfun$receivedTerminated$1(DeathWatch.scala:67)
        at akka.actor.dungeon.DeathWatch.$anonfun$receivedTerminated$1$adapted(DeathWatch.scala:65)
        at scala.Option.foreach(Option.scala:438)
        at akka.actor.dungeon.DeathWatch.receivedTerminated(DeathWatch.scala:65)
        at akka.actor.dungeon.DeathWatch.receivedTerminated$(DeathWatch.scala:64)
        at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:447)
        at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:597)
        at akka.actor.ActorCell.invoke(ActorCell.scala:580)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
        at akka.dispatch.Mailbox.run(Mailbox.scala:229)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

This is how I spawn the child:

object MessageSupervisorSpec {

  def create(communicator: Option[ActorRef[RpcCmd]], logger: Option[ActorRef[LogCmd]]): Behavior[MessageCmd] =

    Behaviors.setup { context =>

        context.log.info("=============> Start MessageSupervisorSpec <=============")

        val fault = Behaviors
            .supervise(Persistence.create(communicator, logger))
            .onFailure[ActorInitializationException](SupervisorStrategy.stop)

        val store = context.spawn(fault, "StoreChilid")

        context.watch(store)

        def loop(): Behavior[MessageCmd] =
            Behaviors.receiveMessage {
                case SaveMessage(v) =>
                    println(v)
                    store ! SaveMessage(v)
                    Behaviors.same
            }


        loop()

    }   
}

The code of the child actor:

object Persistence {


  val storeName = "connector-store"

  /*
   * Persist all incoming messages from KAFKA or SAP
   */

  private val commandHandler: Option[ActorRef[RpcCmd]] => (MessageState, MessageCmd) => Effect[MessageEvent, MessageState]
  = communicator => { (_, command) =>
    command match {
      case SaveMessage(data) =>
        Effect
          .persist(MessageSaved(data))
          .thenRun(state => communicator.foreach(actor => actor ! SendMessage(state.value)))
    }
  }

  private val eventHandler: (MessageState, MessageEvent) => MessageState
  = { (_, event) =>
    event match {
      case MessageSaved(data) => MessageState(data)
    }
  }

  def create(communicator: Option[ActorRef[RpcCmd]], logger: Option[ActorRef[LogCmd]]): Behavior[MessageCmd] =

    Behaviors.setup { context =>

      context.log.info("=============> Start PersistenceMessageActor <=============")

      EventSourcedBehavior[MessageCmd, MessageEvent, MessageState](
        persistenceId = PersistenceId(storeName),
        emptyState = MessageState(),
        commandHandler = commandHandler(communicator),
        eventHandler = eventHandler)
        .onPersistFailure(SupervisorStrategy.restartWithBackoff(minBackoff = 10.seconds, maxBackoff = 60.seconds, randomFactor = 0.1))
        .receiveSignal {
          case (_, _) =>
            println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
            context.log.error("PersistenceMessageActor has been terminated. Please check the persistence.")
            logger.foreach(actor => actor ! SaveLog(Log(Error, "Message store actor was stopped!")))
        }

    }

}

I expected to receive a signal that I could handle accordingly, but I did not receive one.

My question is: how can I handle the case where the actor cannot be started, so that I receive the signal?

Thanks

You are getting the signal, but the watching actor does not handle it. That causes an akka.actor.typed.DeathPactException, which in turn makes the watching actor crash and stop as well.

You need a signal handler that receives the Terminated signal and runs whatever logic you want when the watched actor terminates. The second sample in this part of the docs shows an example of that: https://doc.akka.io/docs/akka/current/typed/actors.html#trying-it-out
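
For illustration, here is a rough sketch of what such a handler could look like in the parent. It is not taken from your code base: Notification, StoreUnavailable and the notifier parameter are made-up names standing in for whatever actor you want to inform, and MessageCmd / SaveMessage are simplified stand-ins for your protocol. The only important part is the receiveSignal block, which matches on Terminated so that the signal is handled instead of being turned into a DeathPactException.

```scala
import akka.actor.typed.{ActorRef, Behavior, Terminated}
import akka.actor.typed.scaladsl.Behaviors

object MessageSupervisor {

  // Simplified stand-ins for the protocol in the question (assumption).
  sealed trait MessageCmd
  final case class SaveMessage(value: String) extends MessageCmd

  // Hypothetical notification protocol, not part of the original code.
  sealed trait Notification
  final case class StoreUnavailable(reason: String) extends Notification

  def create(
      storeBehavior: Behavior[MessageCmd],
      notifier: ActorRef[Notification]): Behavior[MessageCmd] =
    Behaviors.setup { context =>
      val store = context.spawn(storeBehavior, "StoreChild")

      // Watching the child means this actor gets a Terminated signal
      // when the child stops, including when it fails during creation.
      context.watch(store)

      Behaviors
        .receiveMessage[MessageCmd] {
          case msg: SaveMessage =>
            store ! msg
            Behaviors.same
        }
        .receiveSignal {
          // Handling Terminated here prevents the DeathPactException.
          case (ctx, Terminated(ref)) =>
            ctx.log.error("Store child {} has terminated", ref.path.name)
            notifier ! StoreUnavailable(s"${ref.path.name} terminated")
            // Keep running; returning Behaviors.stopped or spawning a new
            // child would be other reasonable choices.
            Behaviors.same
        }
    }
}
```

Note that after the child has terminated, anything sent to store ends up in dead letters, so in a real setup you would probably either stop the parent or respawn the child in that handler.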