Entity existence on two nodes after rebalance

An Akka Cluster Sharding entity ends up running on two nodes when a node comes back up after a rebalance. My setup: Akka Persistence event sourcing, remember-entities enabled, passivation via a timeout scheduled from the actor context, and the akka-persistence-jdbc plugin with Postgres.
I would really appreciate some insight into how I can resolve this.

The error logged on the original node looks like this:

akka.persistence.typed.internal.JournalFailureException: Failed to persist event type [com.evryword.akkacluster.AkkaClusterApplication$DeviceActor$NewRun] with sequence number [1162] for persistenceId [DeviceActor|31a62ae0-21ef-4f5d-a4bd-42ffd28aaaa3]
        at akka.persistence.typed.internal.Running$PersistingEvents.onJournalResponse(Running.scala:775) ~[akka-persistence-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.persistence.typed.internal.Running$PersistingEvents.onMessage(Running.scala:667) ~[akka-persistence-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.persistence.typed.internal.Running$PersistingEvents.onMessage(Running.scala:652) ~[akka-persistence-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.scaladsl.AbstractBehavior.receive(AbstractBehavior.scala:84) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.Behavior$.interpret(Behavior.scala:282) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:238) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:57) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.persistence.typed.internal.EventSourcedBehaviorImpl$$anon$1.aroundReceive(EventSourcedBehaviorImpl.scala:220) ~[akka-persistence-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:85) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.Behavior$.interpret(Behavior.scala:282) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:238) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:57) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.internal.SimpleSupervisor.aroundReceive(Supervision.scala:132) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:85) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.Behavior$.interpret(Behavior.scala:282) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:238) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:57) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.internal.SimpleSupervisor.aroundReceive(Supervision.scala:132) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:85) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.Behavior$.interpret(Behavior.scala:282) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:238) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:57) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.internal.SimpleSupervisor.aroundReceive(Supervision.scala:132) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:85) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.Behavior$.interpret(Behavior.scala:282) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:238) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:57) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:87) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.Behavior$.interpret(Behavior.scala:282) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:238) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:57) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.internal.SimpleSupervisor.aroundReceive(Supervision.scala:132) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:85) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.Behavior$.interpret(Behavior.scala:282) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:238) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.internal.adapter.ActorAdapter.handleMessage(ActorAdapter.scala:133) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.typed.internal.adapter.ActorAdapter.aroundReceive(ActorAdapter.scala:109) ~[akka-actor-typed_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579) ~[akka-actor_2.13-2.8.5.jar!/:2.8.5]
        at akka.actor.ActorCell.invoke(ActorCell.scala:547) ~[akka-actor_2.13-2.8.5.jar!/:2.8.5]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) ~[akka-actor_2.13-2.8.5.jar!/:2.8.5]
        at akka.dispatch.Mailbox.run(Mailbox.scala:231) ~[akka-actor_2.13-2.8.5.jar!/:2.8.5]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ~[akka-actor_2.13-2.8.5.jar!/:2.8.5]
        at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373) ~[na:na]
        at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182) ~[na:na]
        at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655) ~[na:na]
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622) ~[na:na]
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165) ~[na:na]
Caused by: java.sql.BatchUpdateException: Batch entry 0 insert into "event_journal" ("deleted","persistence_id","sequence_number","writer","write_timestamp","adapter_manifest","event_payload","event_ser_id","event_ser_manifest","meta_payload","meta_ser_id","meta_ser_manifest")  values (('FALSE'),('DeviceActor|31a62ae0-21ef-4f5d-a4bd-42ffd28aaaa3'),('1162'::int8),('de3ecb9b-cebc-438a-8cf0-410b6ed141db'),('1709644898875'::int8),(''),?,('33'::int4),('com.evryword.akkacluster.AkkaClusterApplication$DeviceActor$NewRun'),(NULL),(NULL),(NULL)) was aborted: ERROR: duplicate key value violates unique constraint "event_journal_pkey"
  Detail: Key (persistence_id, sequence_number)=(DeviceActor|31a62ae0-21ef-4f5d-a4bd-42ffd28aaaa3, 1162) already exists.  Call getNextException to see other errors in the batch.
        at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:165) ~[postgresql-42.7.2.jar!/:42.7.2]
        at org.postgresql.core.ResultHandlerDelegate.handleError(ResultHandlerDelegate.java:52) ~[postgresql-42.7.2.jar!/:42.7.2]
        at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2413) ~[postgresql-42.7.2.jar!/:42.7.2]
        at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:579) ~[postgresql-42.7.2.jar!/:42.7.2]
        at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:912) ~[postgresql-42.7.2.jar!/:42.7.2]
        at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:936) ~[postgresql-42.7.2.jar!/:42.7.2]
        at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1733) ~[postgresql-42.7.2.jar!/:42.7.2]
        at com.zaxxer.hikari.pool.ProxyStatement.executeBatch(ProxyStatement.java:127) ~[HikariCP-5.0.1.jar!/:na]
        at com.zaxxer.hikari.pool.HikariProxyPreparedStatement.executeBatch(HikariProxyPreparedStatement.java) ~[HikariCP-5.0.1.jar!/:na]
        at slick.jdbc.JdbcActionComponent$InsertActionComposerImpl$MultiInsertAction.$anonfun$run$18(JdbcActionComponent.scala:542) ~[slick_2.13-3.4.1.jar!/:3.4.1]
        at slick.jdbc.JdbcBackend$SessionDef.withPreparedStatement(JdbcBackend.scala:427) ~[slick_2.13-3.4.1.jar!/:3.4.1]
        at slick.jdbc.JdbcBackend$SessionDef.withPreparedStatement$(JdbcBackend.scala:422) ~[slick_2.13-3.4.1.jar!/:3.4.1]
        at slick.jdbc.JdbcBackend$BaseSession.withPreparedStatement(JdbcBackend.scala:491) ~[slick_2.13-3.4.1.jar!/:3.4.1]
        at slick.jdbc.JdbcActionComponent$InsertActionComposerImpl.preparedInsert(JdbcActionComponent.scala:511) ~[slick_2.13-3.4.1.jar!/:3.4.1]
        at slick.jdbc.JdbcActionComponent$InsertActionComposerImpl$MultiInsertAction.run(JdbcActionComponent.scala:536) ~[slick_2.13-3.4.1.jar!/:3.4.1]
        at slick.jdbc.JdbcActionComponent$SimpleJdbcProfileAction.run(JdbcActionComponent.scala:28) ~[slick_2.13-3.4.1.jar!/:3.4.1]
        at slick.jdbc.JdbcActionComponent$SimpleJdbcProfileAction.run(JdbcActionComponent.scala:25) ~[slick_2.13-3.4.1.jar!/:3.4.1]
        at slick.dbio.SynchronousDatabaseAction$FusedAndThenAction.$anonfun$run$4(DBIOAction.scala:533) ~[slick_2.13-3.4.1.jar!/:3.4.1]
        at slick.dbio.SynchronousDatabaseAction$FusedAndThenAction.$anonfun$run$4$adapted(DBIOAction.scala:533) ~[slick_2.13-3.4.1.jar!/:3.4.1]
        at scala.collection.immutable.Vector.foreach(Vector.scala:2124) ~[scala-library-2.13.12.jar!/:na]
        at slick.dbio.SynchronousDatabaseAction$FusedAndThenAction.run(DBIOAction.scala:533) ~[slick_2.13-3.4.1.jar!/:3.4.1]
        at slick.dbio.SynchronousDatabaseAction$$anon$11.run(DBIOAction.scala:570) ~[slick_2.13-3.4.1.jar!/:3.4.1]
        at slick.basic.BasicBackend$DatabaseDef$$anon$3.liftedTree1$1(BasicBackend.scala:276) ~[slick_2.13-3.4.1.jar!/:3.4.1]
        at slick.basic.BasicBackend$DatabaseDef$$anon$3.run(BasicBackend.scala:276) ~[slick_2.13-3.4.1.jar!/:3.4.1]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[na:na]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:840) ~[na:na]
Caused by: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "event_journal_pkey"
  Detail: Key (persistence_id, sequence_number)=(DeviceActor|31a62ae0-21ef-4f5d-a4bd-42ffd28aaaa3, 1162) already exists.
        at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2725) ~[postgresql-42.7.2.jar!/:42.7.2]
        at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2412) ~[postgresql-42.7.2.jar!/:42.7.2]
        ... 24 common frames omitted

There are two general possibilities here:

  • A cluster split-brain occurred after a network partition. In particular, the original node may have experienced a pause (most likely a long garbage-collection pause) and was still persisting events after the rest of the cluster had rebalanced its shards away. The akka.cluster.down-removal-margin, which by default takes its value from akka.cluster.split-brain-resolver.stable-after, needs to be longer than the longest expected garbage-collection pause.

  • While new nodes were starting up, they formed an independent cluster. A possible cause is discovery not returning the pre-existing nodes during bootstrap (with akka.management.cluster.bootstrap.new-cluster-enabled=on set). One practical mitigation is to require a minimum number of nodes to be discovered before attempting to form a cluster, via akka.management.cluster.bootstrap.contact-point-discovery.required-contact-point-nr (e.g. set it to the number of replicas in your Kubernetes deployment). Alternatively, set akka.cluster.min-nr-of-members (perhaps to half the replica count plus one) so that Cluster Sharding does not start until the node has joined a cluster of at least that size.
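
To make both suggestions concrete, here is a sketch of the relevant settings in application.conf. The replica count of 5 (and therefore the quorum of 3) is a hypothetical example, not taken from your deployment; adjust the values to your own cluster size and observed GC behavior:

```hocon
akka {
  cluster {
    downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
    split-brain-resolver {
      active-strategy = keep-majority
      # down-removal-margin defaults to this value when SBR is enabled;
      # make it longer than the longest GC pause you expect to see
      stable-after = 30s
    }
    # hypothetical: with 5 replicas, wait for a quorum of 3 before
    # cluster sharding and other cluster features start on this node
    min-nr-of-members = 3
  }
  management.cluster.bootstrap.contact-point-discovery {
    # hypothetical: require all 5 replicas to be discovered before
    # bootstrap will consider forming a new cluster
    required-contact-point-nr = 5
  }
}
```

With required-contact-point-nr matching the replica count, a freshly started node that only discovers itself will keep probing rather than bootstrap a second, independent cluster.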