I am experimenting with a tiny cluster with 1-2 Producer seed-nodes:
"akka://WorkStealSystem@127.0.0.1:25251",
"akka://WorkStealSystem@127.0.0.1:25252"
I have another Consumer actor that subscribes to a Receptionist, selects one of these Producers, and they exchange messages repeatedly. At one point I kill the only working seed (25251). This is detected by the Consumer and results in the following message:
17:50:54.939 [WorkStealSystem-work-dispatcher-16] INFO concurrency.Utils$ - concurrency.Consumer$ @processJobs: Selecting producer.
17:50:54.942 [WorkStealSystem-work-dispatcher-16] ERROR concurrency.Utils$ - Supervisor RestartSupervisor saw failure: bound must be positive
java.lang.IllegalArgumentException: bound must be positive
at java.util.Random.nextInt(Random.java:388) ~[?:?]
at scala.util.Random.nextInt(Random.scala:96) ~[scala-library-2.13.3.jar:?]
at concurrency.Consumer$.selectRandomly(Consumer.scala:66) ~[classes/:?]
at concurrency.Consumer$.processJobs$$anonfun$1(Consumer.scala:102) ~[classes/:?]
at akka.actor.typed.internal.BehaviorImpl$DeferredBehavior$$anon$1.apply(BehaviorImpl.scala:119) ~[akka-actor-typed_2.13-2.6.9.jar:2.6.9]
at akka.actor.typed.Behavior$.start(Behavior.scala:168) ~[akka-actor-typed_2.13-2.6.9.jar:2.6.9]
at akka.actor.typed.Behavior$.interpret(Behavior.scala:275) ~[akka-actor-typed_2.13-2.6.9.jar:2.6.9]
at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230) ~[akka-actor-typed_2.13-2.6.9.jar:2.6.9]
at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:57) ~[akka-actor-typed_2.13-2.6.9.jar:2.6.9]
at akka.actor.typed.internal.RestartSupervisor.aroundReceive(Supervision.scala:261) ~[akka-actor-typed_2.13-2.6.9.jar:2.6.9]
at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:85) ~[akka-actor-typed_2.13-2.6.9.jar:2.6.9]
at akka.actor.typed.Behavior$.interpret(Behavior.scala:274) ~[akka-actor-typed_2.13-2.6.9.jar:2.6.9]
at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230) ~[akka-actor-typed_2.13-2.6.9.jar:2.6.9]
at akka.actor.typed.internal.adapter.ActorAdapter.handleMessage(ActorAdapter.scala:129) ~[akka-actor-typed_2.13-2.6.9.jar:2.6.9]
at akka.actor.typed.internal.adapter.ActorAdapter.$anonfun$adaptAndHandle$2(ActorAdapter.scala:178) ~[akka-actor-typed_2.13-2.6.9.jar:2.6.9]
at akka.actor.typed.internal.adapter.ActorAdapter.$anonfun$adaptAndHandle$2$adapted(ActorAdapter.scala:178) ~[akka-actor-typed_2.13-2.6.9.jar:2.6.9]
at akka.actor.typed.internal.adapter.ActorAdapter.withSafelyAdapted(ActorAdapter.scala:189) ~[akka-actor-typed_2.13-2.6.9.jar:2.6.9]
at akka.actor.typed.internal.adapter.ActorAdapter.handle$1(ActorAdapter.scala:178) ~[akka-actor-typed_2.13-2.6.9.jar:2.6.9]
at akka.actor.typed.internal.adapter.ActorAdapter.adaptAndHandle(ActorAdapter.scala:183) ~[akka-actor-typed_2.13-2.6.9.jar:2.6.9]
at akka.actor.typed.internal.adapter.ActorAdapter.$anonfun$aroundReceive$2(ActorAdapter.scala:97) ~[akka-actor-typed_2.13-2.6.9.jar:2.6.9]
at akka.actor.typed.internal.adapter.ActorAdapter.$anonfun$aroundReceive$2$adapted(ActorAdapter.scala:95) ~[akka-actor-typed_2.13-2.6.9.jar:2.6.9]
at akka.actor.typed.internal.adapter.ActorAdapter.withSafelyAdapted(ActorAdapter.scala:189) [akka-actor-typed_2.13-2.6.9.jar:2.6.9]
at akka.actor.typed.internal.adapter.ActorAdapter.aroundReceive(ActorAdapter.scala:95) [akka-actor-typed_2.13-2.6.9.jar:2.6.9]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577) [akka-actor_2.13-2.6.9.jar:2.6.9]
at akka.actor.ActorCell.invoke(ActorCell.scala:547) [akka-actor_2.13-2.6.9.jar:2.6.9]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [akka-actor_2.13-2.6.9.jar:2.6.9]
at akka.dispatch.Mailbox.run(Mailbox.scala:231) [akka-actor_2.13-2.6.9.jar:2.6.9]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.lang.Thread.run(Thread.java:834) [?:?]
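As far as I can tell from the trace, the exception comes from Random.nextInt being called with a bound of 0, which would happen if selectRandomly is handed an empty set of producers once the last one has gone. Below is a minimal sketch of a guarded selection, not my actual Consumer code: the object name SafeSelect, the signature, and the Option return type are assumptions made for illustration.

```scala
import scala.util.Random

// Hypothetical helper (not the real Consumer.selectRandomly):
// returns None instead of throwing when there is nothing to choose from.
object SafeSelect {
  def selectRandomly[A](candidates: Iterable[A], rng: Random = Random): Option[A] = {
    // Copy to an indexed collection so a Set (e.g. a listing of producers) can be indexed.
    val indexed = candidates.toIndexedSeq
    if (indexed.isEmpty) None                      // nextInt(0) would throw "bound must be positive"
    else Some(indexed(rng.nextInt(indexed.size)))  // bound is >= 1 here
  }
}
```

With something like this, the caller could match on None and keep waiting for the next listing update rather than failing and being restarted by the supervisor. It would only avoid the exception; it does not by itself explain why no further updates arrive.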
The Consumer then restarts, and after initialization I get the error:
17:50:54.956 [WorkStealSystem-akka.actor.default-dispatcher-6] INFO akka.remote.artery.Association - Association to [akka://WorkStealSystem@127.0.0.1:25251] having UID [4584493602840708540] has been stopped. All messages to this UID will be delivered to dead letters. Reason: ActorSystem terminated
17:
The actor then gets a time-out as expected and I get this output:
17:50:56.948 [WorkStealSystem-work-dispatcher-16] INFO concurrency.Utils$ - concurrency.Consumer$: @Init Unexpected consumer message = |ResponseFailure(java.util.concurrent.TimeoutException: Ask timed out on [Actor[akka://WorkStealSystem@127.0.0.1:25251/user/producer#-83987191]] after [3000 ms]. Message of type [concurrency.Consumer$Available]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.)|
17:51:04.838 [WorkStealSystem-akka.actor.default-dispatcher-6] WARN akka.remote.artery.Association - Outbound control stream to [akka://WorkStealSystem@127.0.0.1:25252] failed. Restarting it. akka.remote.artery.OutboundHandshake$HandshakeTimeoutException: Handshake with [akka://WorkStealSystem@127.0.0.1:25252] did not complete within 20000 ms
At this point the Consumer just sits there and does nothing. If I launch a new Consumer, it will find the Producers and work correctly.
So my question is: how can I revive the Consumer? Note that I am still using the same behaviour but get no new updates on the cluster members. I assume that this is because all messages are sent to the dead-letter mailbox.
TIA