Work pulling: unexpected RequestNext messages

I’m using the work pulling mechanism to implement a dynamic, distributed pool of workers. I’ve read the documentation and the relevant sample code, but while testing some failure modes I’ve run into unexpected behavior.

I’ve reduced my context to the smallest possible one:

I’ve modified my worker so that it always fails by throwing a RuntimeException. Since it’s configured with the restart supervision strategy, the worker actor will always restart when it fails.

From reading the documentation, this is what I expect:

  1. System starts
  2. The producer receives a RequestNext message for the only active worker.
  3. I send a job to the producer, so that it’s sent to the idle worker.
  4. The worker receives the job but fails and doesn’t send the confirm message to the ConsumerController.
  5. The WorkPullingProducerController has tracked that job as unconfirmed, and the infrastructure informs it that the worker it sent the job to has been restarted. Since it has no other workers to send the message to, it re-sends the unconfirmed message to the restarted worker.

And this is precisely what happens. However, I’d also have expected that during this process the producer wouldn’t receive any further RequestNext messages, since there is only one worker and it’s already working on something. Instead, I see this behavior:

  1. Producer: receives a RequestNext
  2. Producer: uses RequestNext.sendNextTo to send the job to the worker.
  3. Producer: almost immediately receives another RequestNext
  4. Consumer: fails, then restarts
  5. Producer: receives another RequestNext
  6. Consumer: receives the same job, fails and restarts again
  7. Producer: receives another RequestNext

The producer receives more and more RequestNext messages even though there is only one worker and the producer only ever uses the sendNextTo ActorRef of the first one. Why is that?

For the time being I’ve modified my code so that the worker never fails, always replies with the ConsumerController.Confirmed message and tracks the successful or failed execution elsewhere. However, I’d really like to understand how the work pulling pattern behaves in the above scenario.
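For reference, here is a minimal sketch of that kind of workaround, not my actual code. The names RenditionWorker, Job, runSafely and the work function are hypothetical placeholders; only ConsumerController.Start, ConsumerController.Delivery and ConsumerController.Confirmed come from Akka. The idea is that any exception is captured and logged, so the delivery is always confirmed and the actor never restarts:

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.delivery.ConsumerController
import akka.actor.typed.scaladsl.Behaviors
import scala.util.{Failure, Success, Try}

object RenditionWorker {
  // Hypothetical job type, standing in for the real payload.
  final case class Job(id: String)

  sealed trait Command
  final case class WrappedDelivery(delivery: ConsumerController.Delivery[Job]) extends Command

  // Runs the job and captures non-fatal exceptions instead of letting them escape the actor.
  def runSafely(job: Job, work: Job => Unit): Try[Unit] = Try(work(job))

  def apply(
      consumerController: ActorRef[ConsumerController.Start[Job]],
      work: Job => Unit): Behavior[Command] =
    Behaviors.setup { context =>
      val deliveryAdapter =
        context.messageAdapter[ConsumerController.Delivery[Job]](WrappedDelivery(_))
      // Tell the ConsumerController where deliveries should go.
      consumerController ! ConsumerController.Start(deliveryAdapter)

      Behaviors.receiveMessage { case WrappedDelivery(delivery) =>
        runSafely(delivery.message, work) match {
          case Success(_) => context.log.info(s"Job ${delivery.message.id} completed")
          case Failure(e) => context.log.error(s"Job ${delivery.message.id} failed", e)
        }
        // Confirm in both cases, so the job is never redelivered.
        delivery.confirmTo ! ConsumerController.Confirmed
        Behaviors.same
      }
    }
}
```

The obvious trade-off is that a failed job counts as delivered, so any retry has to be handled outside the reliable delivery mechanism.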

Edit: I keep track of job statuses (pending, in progress and completed) in an external database, and the status is updated every time a job is submitted. I’ve noticed that I often see more in-progress jobs than available workers (for example, 40 workers and 63 in-progress jobs). I think this might be related to the above behavior.

Even with workers that never fail, I can still reproduce the above situation. Given:

  • One node
  • One producer, backed by a durable queue
  • Two workers
  • One (long-running) job

When I:

  1. Start the node
  2. Submit the job
  3. Stop the node while the submitted job is still in progress.

I get the following warning:

[WARN] [2020-12-02 14:21:15,217] | [saas-akka.actor.default-dispatcher-5] | a.a.CoordinatedShutdown: Coordinated shutdown phase [actor-system-terminate] timed out after 10000 milliseconds

So I think it’s reasonable to assume that the worker couldn’t send the ConsumerController.Confirmed message. Then, when I start the node again, the WorkPullingProducerController restarts and reloads the unconfirmed message. This is what I expected:

  • Both workers start and register themselves with the WorkPullingProducerController.
  • The WorkPullingProducerController receives two requests from them. Since there is an unconfirmed message to re-send, it sends that message to one of the workers and sends a RequestNext to the producer for the other one. From what I understand, this shouldn’t change even if I had 100 workers: only one RequestNext at a time is sent to the producer, and the controller waits for the producer’s reply before sending the next one.

However, after activating the LogMessagesInterceptor, I see the following message exchange as soon as I start the node again:

[DEBUG] [2020-12-02 14:21:00,866] | [saas-akka.actor.default-dispatcher-21] | a.a.t.i.LogMessagesInterceptor: actor [akka://saas/system/singletonManagerjobs-manager/jobs-manager] received message: WrappedRequestNext(RequestNext(Actor[akka://saas/system/singletonManagerjobs-manager/jobs-manager/producer-controller/$$a-adapter#1182625933],Actor[akka://saas/system/singletonManagerjobs-manager/jobs-manager/producer-controller#580539556]))
[DEBUG] [2020-12-02 14:21:00,955] | [saas-akka.actor.default-dispatcher-19] | a.a.t.i.LogMessagesInterceptor: actor [akka://saas/user/rendition-worker-2] received message: WrappedDelivery(Delivery(GenerateRendition(ConversionJob("test"))),Actor[akka://saas/user/rendition-worker-2/consumer-controller#216793132],jobs-manager-dd7a608d-c606-4e6f-8244-d2efb0d7ca98,1))
[DEBUG] [2020-12-02 14:21:01,241] | [saas-akka.actor.default-dispatcher-21] | a.a.t.i.LogMessagesInterceptor: actor [akka://saas/system/singletonManagerjobs-manager/jobs-manager] received message: WrappedRequestNext(RequestNext(Actor[akka://saas/system/singletonManagerjobs-manager/jobs-manager/producer-controller/$$a-adapter#1182625933],Actor[akka://saas/system/singletonManagerjobs-manager/jobs-manager/producer-controller#580539556]))

Result: the producer receives a first RequestNext, then another one before the first has been used to submit a job. So it fails, restarts and again gets two RequestNext messages one after the other:

[DEBUG] [2020-12-02 14:21:01,272] | [saas-akka.actor.default-dispatcher-21] | a.a.t.i.LogMessagesInterceptor: actor [akka://saas/system/singletonManagerjobs-manager/jobs-manager] received message: WrappedRequestNext(RequestNext(Actor[akka://saas/system/singletonManagerjobs-manager/jobs-manager/producer-controller/$$a-adapter#1783057446],Actor[akka://saas/system/singletonManagerjobs-manager/jobs-manager/producer-controller#1183803959]))
[DEBUG] [2020-12-02 14:21:01,302] | [saas-akka.actor.default-dispatcher-21] | a.a.t.i.LogMessagesInterceptor: actor [akka://saas/system/singletonManagerjobs-manager/jobs-manager] received message: WrappedRequestNext(RequestNext(Actor[akka://saas/system/singletonManagerjobs-manager/jobs-manager/producer-controller/$$a-adapter#1783057446],Actor[akka://saas/system/singletonManagerjobs-manager/jobs-manager/producer-controller#1183803959]))

Both the code in the documentation and the more extensive sample code assume that this should never happen. So my questions are:

  • Why is this happening?
  • How should I manage those cases?

In the documentation of WorkPullingProducerController.RequestNext[A] I read:

The WorkPullingProducerController sends RequestNext to the producer when it is allowed to send one message via the sendNextTo or askNextTo. Note that only one message is allowed, and then it must wait for next RequestNext before sending one more message.

So it seems to me that there is a bug somewhere, either in my application or in Akka.
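To make the contract concrete, this is roughly the shape of a producer written against it. It is a simplified sketch under my own assumptions rather than my real code: JobsProducer, Job and Submit are hypothetical names, and the buffer is unbounded for brevity. It sends exactly one message per RequestNext and treats a second RequestNext arriving before the first was used as an error, which is where the producer failure above comes from:

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.delivery.WorkPullingProducerController
import akka.actor.typed.scaladsl.Behaviors

object JobsProducer {
  // Hypothetical job type, standing in for the real payload.
  final case class Job(id: String)

  sealed trait Command
  final case class Submit(job: Job) extends Command
  final case class WrappedRequestNext(next: WorkPullingProducerController.RequestNext[Job])
      extends Command

  def apply(
      producerController: ActorRef[WorkPullingProducerController.Start[Job]]): Behavior[Command] =
    Behaviors.setup { context =>
      val requestNextAdapter =
        context.messageAdapter[WorkPullingProducerController.RequestNext[Job]](WrappedRequestNext(_))
      // Register this actor as the producer for the controller.
      producerController ! WorkPullingProducerController.Start(requestNextAdapter)
      waitingForDemand(Vector.empty)
    }

  // No demand yet: buffer submitted jobs until a RequestNext arrives.
  private def waitingForDemand(pending: Vector[Job]): Behavior[Command] =
    Behaviors.receiveMessage {
      case Submit(job) =>
        waitingForDemand(pending :+ job)
      case WrappedRequestNext(next) =>
        pending match {
          case job +: rest =>
            // Exactly one message per RequestNext, then wait for the next one.
            next.sendNextTo ! job
            waitingForDemand(rest)
          case _ =>
            ready(next)
        }
    }

  // Demand available: the next submitted job can be sent right away.
  private def ready(next: WorkPullingProducerController.RequestNext[Job]): Behavior[Command] =
    Behaviors.receiveMessage {
      case Submit(job) =>
        next.sendNextTo ! job
        waitingForDemand(Vector.empty)
      case WrappedRequestNext(_) =>
        // A second RequestNext before the first one was used: per the quoted
        // documentation this should not happen, so the sketch fails here.
        throw new IllegalStateException("Unexpected RequestNext")
    }
}
```

With a producer like this, the duplicate RequestNext seen in the logs lands in the ready state and triggers the exception, which matches the fail-and-restart cycle described above.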

Thanks for reporting. We can continue in the issue you created since this seems to be a bug.