Which kinds of signals can be managed in Akka?
This topic is based on a conversation between an anonymous user of Akka and our “Ask AI about Akka” bot in the documentation.
In Akka, actors can manage different types of signals. These signals are system events that notify actors of certain changes or actions. For instance, the ChildFailed signal is received by a parent actor when a child actor is terminated due to a failure. This signal contains the cause of the failure. The ChildFailed signal extends the Terminated signal, which is received when a watched actor is stopped. If your use case does not need to distinguish between stopping and failing, you can handle both cases with the Terminated signal [source].
In addition, actors can also handle PreRestart and PostStop signals. The PreRestart signal is received before an actor is restarted, while the PostStop signal is received when an actor is stopped. These signals are typically used to clean up resources before an actor is restarted or stopped.
Here is an example of how to handle these signals in code:
return Behaviors.receive(String.class)
    .onSignal(
        PreRestart.class,
        signal -> {
          // Release the resource before the actor is restarted.
          resource.close();
          return Behaviors.same();
        })
    .onSignal(
        PostStop.class,
        signal -> {
          // Release the resource when the actor is stopped.
          resource.close();
          return Behaviors.same();
        })
    .build();
In this example, the resource.close() method is called to clean up resources when the PreRestart or PostStop signal is received.
I am trying to handle the ChildFailed signal, but it arrives only when the child terminates, not when it fails a call.
In Akka, the ChildFailed signal is indeed received by a parent actor only when a child actor is terminated due to a failure. The signal contains the cause of the failure. It is not sent when a child actor fails a specific call or operation but keeps running.
If you want to handle failures of specific calls or operations, you might need to implement this in your actor’s behavior. For example, you could have the child actor send a message to the parent actor when it fails a call, and then handle this message in the parent actor.
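One way to picture the message-based approach described above is the following plain-Java sketch. It does not use Akka: the parent's mailbox is modeled as a simple queue, and every name in it (CallFailureReporting, riskyCall, CallSucceeded, CallFailed) is hypothetical and chosen only for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Plain-Java model (no Akka dependency). All names are hypothetical.
final class CallFailureReporting {

    // Messages the child sends to its parent.
    interface Report {}

    static final class CallSucceeded implements Report {
        final String result;
        CallSucceeded(String result) { this.result = result; }
    }

    static final class CallFailed implements Report {
        final Exception cause;
        CallFailed(Exception cause) { this.cause = cause; }
    }

    // Stands in for the call the child makes; it fails on blank input.
    static String riskyCall(String input) {
        if (input.trim().isEmpty()) {
            throw new IllegalArgumentException("blank input");
        }
        return "handled " + input;
    }

    // The child catches the failure and reports it instead of crashing,
    // so the parent hears about each failed call, not only about termination.
    static void childHandle(String input, Queue<Report> parentMailbox) {
        try {
            parentMailbox.add(new CallSucceeded(riskyCall(input)));
        } catch (IllegalArgumentException e) {
            parentMailbox.add(new CallFailed(e));
        }
    }

    public static void main(String[] args) {
        Queue<Report> parentMailbox = new ArrayDeque<>();
        childHandle("ping", parentMailbox);
        childHandle(" ", parentMailbox);
        for (Report report : parentMailbox) {
            if (report instanceof CallFailed) {
                System.out.println("call failed: " + ((CallFailed) report).cause.getMessage());
            } else {
                System.out.println("call succeeded: " + ((CallSucceeded) report).result);
            }
        }
    }
}
```

In a real actor, the queue would be replaced by sending the report to the parent's ActorRef, and the parent would handle CallFailed like any other message in its protocol.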
If you want to be notified when a child actor is terminated, you need to watch the child actor using the context.watch(childActorRef) method. If the child actor was stopped because of a failure, the ChildFailed signal will be received by the parent actor.
Here is an example of how to watch a child actor:
object MiddleManagement {
  def apply(): Behavior[Command] =
    Behaviors.setup[Command] { context =>
      val child = context.spawn(Worker(), "child")
      context.watch(child)
      Behaviors.receiveMessage { message =>
        child ! message
        Behaviors.same
      }
    }
}
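In this example, the parent actor (MiddleManagement) watches the child actor (Worker). If the Worker actor is stopped because of a failure, the MiddleManagement actor will receive the ChildFailed signal. Because this behavior does not handle the signal itself, the parent will in turn fail with a DeathPactException; add a signal handler if the parent should react differently.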
Is it possible to apply a resume logic but after some failure apply a restart logic?
Yes, it is possible to apply different supervision strategies based on the type of exception that is thrown. In Akka Classic, you can define a supervisor strategy that decides what to do when a child actor fails. The supervisor can decide to resume, restart, stop, or escalate the failure based on the type of exception.
Here is an example of a classic supervisor strategy that resumes the actor when an ArithmeticException is thrown and restarts the actor when a NullPointerException is thrown:
Scala:
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._

// Defined inside a classic Actor.
override val supervisorStrategy =
  OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
    case _: ArithmeticException      => Resume
    case _: NullPointerException     => Restart
    case _: IllegalArgumentException => Stop
    case _: Exception                => Escalate
  }
Java:
import akka.actor.OneForOneStrategy;
import akka.actor.SupervisorStrategy;
import akka.japi.pf.DeciderBuilder;
import java.time.Duration;

// Defined inside a classic AbstractActor.
private static SupervisorStrategy strategy =
    new OneForOneStrategy(
        10,
        Duration.ofMinutes(1),
        DeciderBuilder.match(ArithmeticException.class, e -> SupervisorStrategy.resume())
            .match(NullPointerException.class, e -> SupervisorStrategy.restart())
            .match(IllegalArgumentException.class, e -> SupervisorStrategy.stop())
            .matchAny(o -> SupervisorStrategy.escalate())
            .build());

@Override
public SupervisorStrategy supervisorStrategy() {
  return strategy;
}
In this example, if an ArithmeticException is thrown, the supervisor will resume the actor, allowing it to continue processing the next message. If a NullPointerException is thrown, the supervisor will restart the actor, clearing out its internal state before it processes the next message.
Remember that the decision to resume, restart, stop, or escalate should be based on the nature of the work being performed and the type of failure.
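The decision logic of the strategy above can be pictured with a small plain-Java sketch. It is only a model and does not use Akka: DeciderModel and its Directive enum are hypothetical names, and the point it illustrates is that the cases are tried in order, so the catch-all has to come last.

```java
// Plain-Java model of a decider (no Akka dependency). All names are hypothetical.
final class DeciderModel {

    enum Directive { RESUME, RESTART, STOP, ESCALATE }

    // Cases are checked top to bottom and the first match wins,
    // so the catch-all has to be the last case.
    static Directive decide(Throwable failure) {
        if (failure instanceof ArithmeticException) return Directive.RESUME;
        if (failure instanceof NullPointerException) return Directive.RESTART;
        if (failure instanceof IllegalArgumentException) return Directive.STOP;
        return Directive.ESCALATE;
    }

    public static void main(String[] args) {
        System.out.println(decide(new ArithmeticException("/ by zero")));
        System.out.println(decide(new NumberFormatException("not a number")));
        System.out.println(decide(new IllegalStateException("unexpected")));
    }
}
```

Matching is by type, so subclasses follow their parent: a NumberFormatException is an IllegalArgumentException and therefore maps to STOP in this model.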
Good solution, but using Akka Typed I receive this error: “Bad return type in lambda expression: SupervisorStrategy cannot be converted to Directive”.
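In Akka Typed, supervision works differently from Akka Classic. There is no decider that returns a Directive. Instead, you wrap a behavior with Behaviors.supervise and pass a strategy from akka.actor.typed.SupervisorStrategy to onFailure. The error most likely means that the typed SupervisorStrategy is imported while the classic DeciderBuilder still expects a classic akka.actor.SupervisorStrategy.Directive. The strategies available in Akka Typed are SupervisorStrategy.restart(), SupervisorStrategy.resume(), SupervisorStrategy.stop(), and SupervisorStrategy.restartWithBackoff(minBackoff, maxBackoff, randomFactor).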
Here is an example of how to define a supervision strategy in Akka Typed:
import akka.actor.typed.Behavior;
import akka.actor.typed.SupervisorStrategy;
import akka.actor.typed.javadsl.Behaviors;

Behavior<Command> behavior =
    Behaviors.supervise(CommandHandler.create())
        .onFailure(Exception.class, SupervisorStrategy.restart());
In this example, if an Exception is thrown, the actor will be restarted. You can replace Exception.class with the specific type of exception you want to handle, and SupervisorStrategy.restart() with the strategy you want to apply [source].
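If you want to apply different strategies based on the type of exception, calls to Behaviors.supervise can be nested, with one onFailure per exception type. For example, Behaviors.supervise(Behaviors.supervise(behavior).onFailure(NullPointerException.class, SupervisorStrategy.restart())).onFailure(ArithmeticException.class, SupervisorStrategy.resume()) restarts the actor on a NullPointerException and resumes it on an ArithmeticException.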
How to manage the error during unstash?
In Akka, when you unstash messages, they are processed one by one. If an error occurs while processing a message, the actor’s supervision strategy will be used to handle the failure. This means that if an error occurs during unstashing, it will be handled in the same way as if it occurred during normal message processing.
However, one specific issue can occur while unstashing. If a message triggers an error that causes the actor to restart, the remaining messages in the stash may be lost, because the actor’s state (including the stash) is cleared when it restarts.
To limit how much is replayed at once, you can use the StashBuffer.unstash method with the numberOfMessages parameter. This lets you unstash a limited number of messages at a time and then send a message to context.self before continuing to unstash more. The Akka documentation recommends this mainly to avoid occupying the thread for too long when many messages are unstashed in one go. Other new messages may arrive in between, and those must be stashed to keep the original order of messages. This makes the logic more complicated, so it is better to keep the number of stashed messages low [source].
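The ordering concern described above can be illustrated with a plain-Java sketch. It is a simplified model and does not use Akka: the mailbox and the stash are ordinary queues, the self-message is a marker string, and all names (BatchedUnstashModel, CONTINUE, unstashBatch) are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Plain-Java model of batched unstashing (no Akka dependency). All names are hypothetical.
final class BatchedUnstashModel {
    static final String CONTINUE = "continue-unstashing"; // marker the actor sends to itself

    final Deque<String> mailbox = new ArrayDeque<>();
    final Deque<String> stash = new ArrayDeque<>();
    final List<String> processed = new ArrayList<>();
    final int batchSize;

    BatchedUnstashModel(int batchSize) { this.batchSize = batchSize; }

    // Process up to batchSize stashed messages, then schedule the next batch
    // through the mailbox so that other work can run in between.
    void unstashBatch() {
        for (int i = 0; i < batchSize && !stash.isEmpty(); i++) {
            processed.add(stash.poll());
        }
        if (!stash.isEmpty()) {
            mailbox.add(CONTINUE);
        }
    }

    // Handle one mailbox message.
    void receive(String message) {
        if (message.equals(CONTINUE)) {
            unstashBatch();
        } else if (!stash.isEmpty()) {
            // Older messages are still waiting, so queue behind them to keep the order.
            stash.add(message);
        } else {
            processed.add(message);
        }
    }

    // Drain the mailbox until nothing is left.
    void run() {
        while (!mailbox.isEmpty()) {
            receive(mailbox.poll());
        }
    }

    public static void main(String[] args) {
        BatchedUnstashModel actor = new BatchedUnstashModel(2);
        actor.stash.addAll(Arrays.asList("a", "b", "c"));
        actor.mailbox.add("d"); // a new message that arrives between two batches
        actor.unstashBatch();
        actor.run();
        System.out.println(actor.processed);
    }
}
```

In this model the new message "d" is stashed behind "c" rather than handled immediately, which is what keeps the original order intact. It also shows why the extra bookkeeping grows with the size of the stash.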
Here is an example of a behavior that stashes messages in a StashBuffer and replays them with unstashAll:
return Behaviors.withStash(100, stash -> {
  return Behaviors.setup(context -> {
    return Behaviors.receive(Command.class)
        .onMessage(InitialState.class, message -> {
          return stash.unstashAll(active(message.value));
        })
        .onMessage(DBError.class, message -> {
          throw message.cause;
        })
        .onMessage(Command.class, message -> {
          stash.stash(message);
          return Behaviors.same();
        })
        .build();
  });
});
In this example, commands are stashed until the InitialState message arrives, and unstashAll then replays them. If a DBError is received, the actor throws the exception; if the supervision strategy restarts it, the messages still in the stash will be lost. You could also unstash a limited number of messages at a time, sending a message to context.self before unstashing more, so that less work is replayed in a single step.
I have two messages in the stash and my supervisor strategy is resume. If I call unstashAll and the first stashed message fails, what happens to the second one?
When you call unstashAll(), the stashed messages are processed one by one in the order they were stashed. If an error occurs while processing a message, the actor’s supervision strategy is used to handle the failure.
In your case, if the first stashed message causes a failure and your supervision strategy is resume, the actor will continue processing with the next message. This means that the second stashed message will still be processed.
However, it’s important to note that the failed message will be lost and won’t be retried. The resume strategy simply means “ignore the failure and continue processing the next message”. If you want to handle the failure in a different way (for example, by retrying the failed message), you would need to use a different supervision strategy or handle the failure in your actor’s behavior [source, source].
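The outcome described in this answer can be sketched in plain Java. This is a simplified model under the assumptions stated in the answer, not Akka code: the stash is an ordinary queue, resume is modeled as catching the failure and moving on, and the names (ResumeUnstashModel, handle, unstashAllWithResume) are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Plain-Java model of unstashAll under a resume strategy (no Akka dependency).
// All names are hypothetical.
final class ResumeUnstashModel {

    // Hypothetical handler: fails on the message "bad", records everything else.
    static void handle(String message, List<String> processed) {
        if (message.equals("bad")) {
            throw new IllegalStateException("cannot handle " + message);
        }
        processed.add(message);
    }

    // Replays the stash in order. A failure drops only the message that caused it,
    // which is how the answer above describes the resume strategy.
    static List<String> unstashAllWithResume(Deque<String> stash, List<String> dropped) {
        List<String> processed = new ArrayList<>();
        while (!stash.isEmpty()) {
            String message = stash.poll();
            try {
                handle(message, processed);
            } catch (RuntimeException e) {
                dropped.add(message); // the failed message is not retried
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        Deque<String> stash = new ArrayDeque<>(Arrays.asList("bad", "good"));
        List<String> dropped = new ArrayList<>();
        List<String> processed = unstashAllWithResume(stash, dropped);
        System.out.println("processed=" + processed + " dropped=" + dropped);
    }
}
```

With the two stashed messages "bad" and "good", the model processes only "good" and records "bad" as dropped. Retrying would require extra logic, for example putting the failed message back into the stash from the catch block.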