Akka persistence for stateful actors - jobmanager -> jobs

TL;DR:
I have a service that runs fine right now but does not have persistence. I’m a noob to Akka and I’m trying to find a good tutorial/example to learn how persistence works. The ones I found in the docs don’t really help. If someone can point me to a sample that can run on its own (not just in a unit test) and doesn’t require installing a horde of other technologies, that would be great. Even better would be one that includes a mgr/children setup where the mgr holds ActorRefs to the children (similar to the chatroom example without persistence here: Introduction to Actors • Akka Documentation ). I’m working in Scala, but Java docs/examples would still help a lot if there aren’t good Scala ones available.
TIA


Long Version:
The current version of my service runs fine, but if the server goes down all current state is lost. I need to add persistence. My service basically has a job manager actor that gets requests with a jobId, and if a request is of type START it spawns a new child job actor that runs a long-running process. Any other requests and messages are routed to the child job actor for that jobId. I do this similar to the chat room example ( Introduction to Actors • Akka Documentation ), where the manager holds a Map of jobId → ActorRef as its state. The job actors also have their own state object that would need to be persisted. First off, I’m really confused about what my “pre-reqs” are besides Akka. Do I need akka-cluster and cluster sharding? akka-projections? akka-platform? Any pointers on where I should start would be great.

Some things I’m confused about:
Normally the job manager spawns the jobs, and the jobs themselves send messages to other systems during their process and react to the responses (which are routed back to the correct job via jobId). So, in trying to understand switching to the Command->Event pattern instead of just a Command, it seems like replaying events to the restarted job manager would end up spawning fresh children while the old children would also get restarted, and the manager would have no references to them. Plus, would the replayed events cause the child jobs to resend all their messages to the other systems? Does this mean I need to use Durable State instead of Event Sourcing?

While trying to find the right journal/snapshot plugin to use for the storage, I keep seeing many of them marked “deprecated”. Is everything moving to the JDBC plugin?

How does persistence work with timers? I use timers for a number of things (e.g. timeouts waiting for responses from outside systems and a scheduled stop time for a job).

Thanks!
-Jason

Unless you really need a distributed system for your workload, all you need is Akka Persistence and a journal plugin.

Cluster together with sharding solves the problem of having at most one actor instance (writer) for a given persistence id in a cluster of multiple actor systems; it also handles routing messages to the right actor based on that id.

Akka Persistence Query and Akka Projections allow a separate component to consume a stream of changes and create a projection, for example in a different kind of database, or on a different database server, to decouple writes from querying needs in a system. For a simple system you may not need to use projections at all.

Durable state stores/updates its entire state, while event sourcing stores more fine-grained events: facts about things that have happened, which can be replayed but also consumed by other components as a stream. Which one is right depends very much on what is needed for a specific use case. I’d recommend doing a small proof of concept with each for your problem if you are unsure.
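To make the difference concrete, here is a minimal sketch in plain Java. It is not Akka code: `JobState`, `EventJournal`, `DurableStore` and the event names are all made up for illustration, and the "storage" is just in-memory fields. It only shows that both approaches can recover the same state, while the journal additionally keeps the history of how it got there.

```java
import java.util.ArrayList;
import java.util.List;

// Library-free model; none of these types are Akka APIs.
final class JobState {
    final String status;      // "NEW", "RUNNING" or "DONE"
    final int responsesSeen;  // how many external responses were handled

    JobState(String status, int responsesSeen) {
        this.status = status;
        this.responsesSeen = responsesSeen;
    }

    // Event handler: a pure function from (state, event) to the next state.
    JobState apply(String event) {
        switch (event) {
            case "Started":          return new JobState("RUNNING", responsesSeen);
            case "ResponseReceived": return new JobState(status, responsesSeen + 1);
            case "Finished":         return new JobState("DONE", responsesSeen);
            default: throw new IllegalArgumentException("unknown event " + event);
        }
    }
}

// Event sourcing: append every event, rebuild state by replaying them in order.
final class EventJournal {
    private final List<String> events = new ArrayList<>();

    void append(String event) { events.add(event); }

    JobState recover() {
        JobState state = new JobState("NEW", 0);
        for (String e : events) state = state.apply(e);
        return state;
    }

    int size() { return events.size(); }
}

// Durable state: keep only the latest full state, overwritten on each update.
final class DurableStore {
    private JobState latest = new JobState("NEW", 0);

    void save(JobState state) { latest = state; }

    JobState recover() { return latest; }
}

final class JournalVsStoreDemo {
    public static void main(String[] args) {
        EventJournal journal = new EventJournal();
        DurableStore store = new DurableStore();
        JobState live = new JobState("NEW", 0);
        for (String e : new String[] {"Started", "ResponseReceived"}) {
            live = live.apply(e);
            journal.append(e);  // event sourcing persists the event
            store.save(live);   // durable state persists the whole state
        }
        System.out.println(journal.recover().status + " / " + store.recover().status);
    }
}
```

Both `recover()` calls end up with the same state here; the trade-off is whether you also want the individual events available afterwards.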

If a persistent actor uses timers, those are not automatically part of its state, so it will be up to you to somehow store in its state which timers were started and to set up new timers when the persistent actor starts.

Unless you have special DB needs, and provided you like relational databases, the Akka Persistence JDBC plugin is a good choice to start with.

I’m afraid I don’t know of any good self-contained samples that showcase a single actor system application with persistence. Most of our samples focus on more complex distributed applications (for example the samples in GitHub - akka/akka-samples: Akka Sample Projects and Implementing Microservices with Akka :: Akka Platform Guide).
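As a rough illustration of that idea, here is a small plain-Java sketch. It is not Akka code: `PersistedTimers` and `TimerArmer` are hypothetical names, and `TimerArmer` just stands in for whatever scheduling facility the actor really uses. The assumption is that the state stores an absolute deadline per timer, so that after a restart each timer can be re-armed with whatever time is still left.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the actor's real timer facility (not an Akka interface).
interface TimerArmer {
    void arm(String key, long delayMillis);
}

// Library-free model of "timers as part of the persisted state".
final class PersistedTimers {
    // Timer key -> absolute deadline in epoch millis; this map is what gets persisted.
    private final Map<String, Long> deadlines = new LinkedHashMap<>();

    // Called when a timer is first started; stores the deadline, not the delay.
    void record(String key, long nowMillis, long delayMillis) {
        deadlines.put(key, nowMillis + delayMillis);
    }

    // Called when a timer fires or is cancelled, so it is not re-armed later.
    void clear(String key) {
        deadlines.remove(key);
    }

    // Called once after the state has been restored: re-arm every stored timer
    // with the remaining time; timers that are already overdue get a delay of 0.
    void rearm(long nowMillis, TimerArmer armer) {
        for (Map.Entry<String, Long> e : deadlines.entrySet()) {
            long remaining = Math.max(0L, e.getValue() - nowMillis);
            armer.arm(e.getKey(), remaining);
        }
    }
}

final class PersistedTimersDemo {
    public static void main(String[] args) {
        PersistedTimers timers = new PersistedTimers();
        timers.record("job-stop", 1_000L, 60_000L);
        timers.record("response-timeout", 1_000L, 5_000L);
        // Pretend the service was down for a while and restarts at t = 10_000.
        timers.rearm(10_000L, (key, delay) -> System.out.println(key + " in " + delay + " ms"));
    }
}
```

Storing deadlines rather than delays is a design choice in this sketch: it means a job’s stop time does not drift later every time the service restarts.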

I hope this helps!

thanks for responding!

I’m still confused about the ActorRefs. If I have a persistent actor that is a manager, and persistent actors spawned from the manager whose ActorRefs are part of the manager’s state, will all those ActorRefs still work when the manager and children reinflate after a crash? And does this only work with durable state?

From what I read about event sourcing persistence, here’s what I was imagining, but my picture is probably all wrong.

  1. Service starts, creates akka System and creates fresh persistent manager
  2. Service receives messages from outside to start job A with parameters and sends command C1 to manager
  3. Manager validates command C1 and sends an event(M1) to itself
  4. Manager gets event(M1) and creates job A with jobid and parameters and stores actorRef for job A in map(jobid → actorref) in its state
  5. jobactor A starts up sends event(A1) to itself to get started
  6. Jobactor A gets event(A1), sets timer for job stop time from parameters, and sends message (Request1) with jobid to outside system and sets timer for message timeout
  7. outside system sends message (Response1) to service tagged with jobid
  8. service sends command C2 to manager with message(Response1)
  9. manager validates command C2 with jobid in message (Response1) corresponds to a jobid in its map and sends itself an event (M2).
  10. manager gets event(M2) and gets actorref for jobid from its state and sends command C3 to jobactor A with message (Response1)
  11. jobactor A gets command C3, validates the message (Response1) and sends itself an event(A2)
  12. jobactor A gets event (A2) and continues processing sending more messages to outside system(s) getting responses repeating steps 7-12

Server goes down

Server starts back up and restarts Service

  1. Service starts, creates akka System and akka realizes the last run did not exit cleanly and recreates persistent actors: manager and jobactor A

  2. akka replays event M1 to manager

  3. akka replays event A1 to jobactor A

  4. (same as step 4) Manager gets event(M1) and creates a new instance of job A, we’ll call it job AA, with jobid and parameters and stores actorRef for job AA in map(jobid → actorref) in its state - Now there are two job A

  5. (same as step 6) Jobactor A gets event(A1), sets timer for job stop time from parameters, and sends message (Request 2) with jobid to outside system and sets timer for message timeout - Now outside system is going to repeat work it already did

  6. Jobactor AA starts up sends event(AA1) to itself to get started

  7. akka replays event M2 to manager

  8. (same as step 10) manager gets event(M2) and gets actorref for jobid from its state and sends command C4 to jobactor AA with message (Response1) ** Now the new copy of A is gonna get old response from original run **

  9. Jobactor AA gets event(AA1), sets timer for job stop time from parameters, and sends message (Request 3) with jobid to outside system and sets timer for message timeout - Now outside system is going to repeat the same work a 3rd time

  10. outside system sends message (Response2) to service tagged with jobid

  11. jobactor AA gets command C4, validates the message (Response1) and logs an error because it got a response to a request it never sent. ** we’re very off track now **

  12. manager validates jobid in message (Response2) corresponds to a jobid in its map and sends itself an event (M3).

  13. manager gets event(M3) and gets actorref for jobid from its state and sends command C5 to jobactor AA with message (Response2) ** Now the new copy of A is gonna get a response to a duplicate request made by the jobactor A **

  14. jobactor AA gets command C5, validates the message (Response2) and logs an error cause it got a response to a request it never sent.

  15. outside system sends message (Response3) to service tagged with jobid

  16. manager validates jobid in message (Response3) corresponds to a jobid in its map and sends itself an event (M4).

  17. manager gets event(M4) and gets actorref for jobid from its state and sends command C6 to jobactor AA with message (Response3)

  18. jobactor AA gets command C6, validates the message (Response3) and sends itself an event(AA2)

  19. jobactor A receives timeout for Request1, logs an error and starts its error processing, which will result in more messages sent to the outside system whose responses will get routed to jobactor AA, which will produce errors

  20. jobactor AA gets event (AA2) and continues processing sending more messages to outside system(s) getting responses


Here’s what I wanted before reading about event sourcing

  1. Service starts, creates akka System and creates fresh persistent manager
  2. Service receives messages from outside to start job A with parameters and sends command C1 to manager
  3. Manager validates command C1 and sends an event(M1) to itself
  4. Manager gets event(M1) and creates job A with jobid and parameters and stores actorRef for job A in map(jobid → actorref) in its state
  5. jobactor A starts up sends event(A1) to itself to get started
  6. Jobactor A gets event(A1), sets timer for job stop time from parameters, and sends message (Request1) with jobid to outside system and sets timer for message timeout
  7. outside system sends message (Response1) to service tagged with jobid
  8. service sends command C2 to manager with message(Response1)
  9. manager validates command C2 with jobid in message (Response1) corresponds to a jobid in its map and sends itself an event (M2).
  10. manager gets event(M2) and gets actorref for jobid from its state and sends command C3 to jobactor A with message (Response1)
  11. jobactor A gets command C3, validates the message (Response1) and sends itself an event(A2)
  12. jobactor A gets event (A2) and continues processing sending more messages to outside system(s) getting responses repeating steps 7-12

Server goes down

Server starts back up and restarts Service

  1. Service starts, creates akka System and akka realizes the last run did not exit cleanly and recreates instances of persistent actors: manager and jobactor A.
  2. manager gets handed its old state with its jobid->actorref map, and somehow the old actorref that was stored with the manager gets /magically/ pointed to the new instance of jobactor A. Even with durable state, I’m not sure how things get hooked together again; the only answer I found was to use clustering with sharding even though I don’t need a distributed cluster
  3. jobactor A gets handed its old state and continues where it left off.

FWIW, I think I’d recommend taking Lightbend’s reactive architecture course at academy.lightbend.com. Especially the event sourcing section. It’s free. It isn’t hands on, but it will start to let you get your head around some of the concepts. (The expert Akka course probably would be even better, but I don’t think that’s available or free.)

Frankly, if you have some budget, I’d probably recommend engaging with Lightbend with a subscription and some consulting. Because getting the architecture right is important, and I’m not sure you are going to get the detail you need asking Q&A in a forum.

Because if I were designing this, I’d probably start with a clean slate. I’d first make sure I had the right entities and events. And there are other things that really need to be considered, like how actors are restarted. And maybe externalizing the timers, depending on your requirements.

Event sourcing is an insanely good metaphor for job management, because it so well encapsulates the state machines and state transitions involved. But I don’t think you are going to be able to get the answers you need from just Q&A with the community.

Oh. And two more things:

One, definitely look at how side effects are handled in event sourcing. In a situation like this you typically handle communication with the external systems as side effects. Specifically to avoid the duplication you suggest in steps like 17.

If you look at the recovery section of the docs you will see that side effects are not replayed as part of recovery. That section of the docs also briefly mentions the RecoveryCompleted message. This is where you would typically rebuild your timers (presuming you don’t externalize them).
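To illustrate the point about side effects and recovery, here is a minimal plain-Java model. It is not Akka code and does not use Akka’s API: `ReplayJob`, the string events, the `outbox` list and the `onRecoveryCompleted` hook are all invented for this sketch. The assumption is simply that events are replayed through the event handler only, while the call to the outside system happens in the command handler after the event is stored.

```java
import java.util.ArrayList;
import java.util.List;

// Library-free model of the idea; these types are illustrative, not Akka APIs.
final class ReplayJob {
    private final List<String> journal;  // stands in for the event journal
    private final List<String> outbox;   // stands in for calls to outside systems
    private int requestsSent = 0;        // state derived only from events
    private boolean recovered = false;

    ReplayJob(List<String> journal, List<String> outbox) {
        this.journal = journal;
        this.outbox = outbox;
        // Recovery: replay stored events through the event handler only.
        for (String event : journal) applyEvent(event);
        onRecoveryCompleted();
    }

    // Event handler: updates state, never talks to the outside world.
    private void applyEvent(String event) {
        if (event.startsWith("RequestSent:")) requestsSent++;
    }

    // Runs once after replay; the place to rebuild timers and similar.
    private void onRecoveryCompleted() {
        recovered = true;
    }

    // Command handler: store the event, update state, then run the side effect.
    void sendRequest(String requestId) {
        String event = "RequestSent:" + requestId;
        journal.add(event);
        applyEvent(event);
        outbox.add(requestId);  // side effect, runs for new commands only
    }

    int requestsSent() { return requestsSent; }

    boolean recovered() { return recovered; }
}

final class ReplayJobDemo {
    public static void main(String[] args) {
        List<String> journal = new ArrayList<>();
        ReplayJob first = new ReplayJob(journal, new ArrayList<>());
        first.sendRequest("r1");

        // Simulated restart: same journal, fresh outbox.
        List<String> outboxAfterRestart = new ArrayList<>();
        ReplayJob second = new ReplayJob(journal, outboxAfterRestart);
        System.out.println(second.requestsSent() + " known, resent: " + outboxAfterRestart.size());
    }
}
```

After the simulated restart the job still knows it has a request outstanding, but nothing is sent to the outside system a second time.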

Two, I should have put a disclaimer on my message. I don’t currently work for Lightbend. So don’t think of my recommendation of Lightbend training and consulting as a sales pitch. I just have been involved in the design of a lot of Akka systems, and when looking through your long post I just thought to myself, “this is the kind of thing that, as a consultant, I’d usually spend several days on, doing requirements analysis, event storming, and design; there’s no way I can condense everything down into a paragraph or two.”

I mean my OP was asking for where I can go learn stuff cause I wasn’t expecting to feasibly get an answer here. :)

Hi @johanandren , I’m making a demo with a persistent actor.
It already persists to my database: I simply call the ActorWithBehavior.apply(id) function, then look in the DB and find it there.
My question is, how do I get it back? If I call ActorWithBehavior.apply(id) again I get an error saying that it already exists. Am I forced to use akka.cluster.sharding to retrieve it?