Why do these dead letters happen?

  1. Here is the entity Project.
import java.time.LocalDateTime

import akka.Done
import akka.actor.typed.{ActorSystem, Behavior}
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityTypeKey}
import akka.pattern.StatusReply
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior, ReplyEffect}

// The Project trait (with onCommand/onEvent), Command, Event and the other
// domain types are defined elsewhere and omitted here.
object Project {
  val EntityKey: EntityTypeKey[Command] = EntityTypeKey[Command](name = "Project")

  def init(system: ActorSystem[_]): Unit = {
    ClusterSharding(system).init(Entity(EntityKey) { context => Project(context.entityId) })
  }

  def apply(id: String): Behavior[Command] = {
    val starter = Starter(id)
    EventSourcedBehavior
      .withEnforcedReplies[Command, Event, Project](
        persistenceId = PersistenceId(entityTypeHint = EntityKey.name, entityId = id),
        emptyState = starter,
        commandHandler = (state, cmd) => state.onCommand(cmd),
        eventHandler = (state, event) => state.onEvent(event)
      )
      // Tags each event with the entity id and the event's class name.
      .withTagger(event => Set(id, event.getClass.toString))
  }

  final case class Starter(id: String, name: String = "", details: Historic[Details] = null) extends Project {
    override def onCommand(cmd: Command): ReplyEffect[Event, Project] = {
      cmd match {
        case OpenProject(name, staff, replyTo) =>
          Effect.persist(ProjectOpened(id, name, staff, LocalDateTime.now()))
            .thenReply(replyTo)(_ => StatusReply.Success(Done))
      }
    }

    override def onEvent(event: Event): Project = {
      event match {
        case ProjectOpened(id, name, staff, _) => OpenedProject(...)
      }
    }
  }
}
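A side note on the tagger: tagging with event.getClass.toString produces tags like "class priv.abbey.domain.Project$ProjectOpened", which reappear, URL-encoded, as the tagWrites actor names in the dead-letter log below. If plainer tag names are wanted, the tagger line could instead read something like this (a sketch, assuming the same event hierarchy):

// Sketch: tag with the entity id plus the simple class name,
// yielding e.g. "ProjectOpened" instead of "class priv.abbey.domain.Project$ProjectOpened".
.withTagger(event => Set(id, event.getClass.getSimpleName))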
  2. I test it in a single-node cluster; the node joins itself to form the cluster.
import akka.Done
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.typed.{Cluster, Join}
import akka.util.Timeout

import scala.concurrent.Future
import scala.concurrent.duration._

ActorSystem(Behaviors.setup[String] { ctx =>
  implicit val system: ActorSystem[Nothing] = ctx.system
  implicit val timeout: Timeout = 5.seconds // askWithStatus needs an implicit Timeout

  val cluster = Cluster(system)
  cluster.manager ! Join(cluster.selfMember.address)
  Project.init(system)

  val project = ClusterSharding(system).entityRefFor(Project.EntityKey, "2023-001")

  val reply: Future[Done] = project.askWithStatus(Project.OpenProject(...))

  Console.println(reply.isCompleted)

  Behaviors.empty
}, "eProject")
  3. Here is the application.conf.
include "persistence.conf"

akka {
  loglevel = DEBUG
  actor {
    provider = "cluster"
    serialization-bindings {
      "priv.abbey.domain.CborSerializable" = jackson-cbor
    }
  }

  remote.artery {
    canonical.port = 2551
    canonical.hostname = "127.0.0.1"
  }

  cluster {
    seed-nodes = [
      "akka://eProject@127.0.0.1:2551"
    ]

    downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
  }
}
  4. Here is the log.
// preparing
[INFO] [akka.event.slf4j.Slf4jLogger] [] [] [eProject-akka.actor.default-dispatcher-3] - Slf4jLogger started
[DEBUG] [akka.event.EventStream] [] [] [eProject-akka.actor.default-dispatcher-3] - logger log1-Slf4jLogger started
[DEBUG] [akka.event.EventStream] [] [] [eProject-akka.actor.default-dispatcher-3] - Default Loggers started
[DEBUG] [akka.remote.artery.ArteryTransport] [] [] [eProject-akka.actor.default-dispatcher-3] - Using flight recorder akka.remote.artery.jfr.JFRRemotingFlightRecorder@40e10ff8
[INFO] [akka.remote.artery.ArteryTransport] [akka://eProject@127.0.0.1:2551] [] [eProject-akka.actor.default-dispatcher-3] - Remoting started with transport [Artery tcp]; listening on address [akka://eProject@127.0.0.1:2551] with UID [-3264337724963125487]
[INFO] [akka.cluster.Cluster] [akka://eProject@127.0.0.1:2551] [] [eProject-akka.actor.default-dispatcher-3] - Cluster Node [akka://eProject@127.0.0.1:2551] - Starting up, Akka version [2.7.0] ...
[INFO] [akka.cluster.Cluster] [akka://eProject@127.0.0.1:2551] [] [eProject-akka.actor.default-dispatcher-3] - Cluster Node [akka://eProject@127.0.0.1:2551] - Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [akka.cluster.Cluster] [akka://eProject@127.0.0.1:2551] [] [eProject-akka.actor.default-dispatcher-3] - Cluster Node [akka://eProject@127.0.0.1:2551] - Started up successfully
[INFO] [akka.cluster.sbr.SplitBrainResolver] [akka://eProject@127.0.0.1:2551] [] [eProject-akka.actor.default-dispatcher-3] - SBR started. Config: strategy [KeepMajority], stable-after [20 seconds], down-all-when-unstable [15 seconds], selfUniqueAddress [akka://eProject@127.0.0.1:2551#-3264337724963125487], selfDc [default].
// cluster is running.
[INFO] [akka.cluster.Cluster] [akka://eProject@127.0.0.1:2551] [akkaMemberChanged] [eProject-akka.actor.default-dispatcher-5] - Cluster Node [akka://eProject@127.0.0.1:2551] - Node [akka://eProject@127.0.0.1:2551] is JOINING itself (with roles [dc-default], version [0.0.0]) and forming new cluster
[INFO] [akka.cluster.Cluster] [akka://eProject@127.0.0.1:2551] [] [eProject-akka.actor.default-dispatcher-5] - Cluster Node [akka://eProject@127.0.0.1:2551] - is the new leader among reachable nodes (more leaders may exist)
[INFO] [akka.cluster.Cluster] [akka://eProject@127.0.0.1:2551] [akkaMemberChanged] [eProject-akka.actor.default-dispatcher-5] - Cluster Node [akka://eProject@127.0.0.1:2551] - Leader is moving node [akka://eProject@127.0.0.1:2551] to [Up]
[INFO] [akka.cluster.sbr.SplitBrainResolver] [akka://eProject@127.0.0.1:2551] [] [eProject-akka.actor.default-dispatcher-3] - This node is now the leader responsible for taking SBR decisions among the reachable nodes (more leaders may exist).
[INFO] [akka.cluster.Cluster] [akka://eProject@127.0.0.1:2551] [] [eProject-akka.actor.default-dispatcher-11] - Cluster Node [akka://eProject@127.0.0.1:2551] - Trying to join [akka://eProject@127.0.0.1:2551] when already part of a cluster, ignoring
[INFO] [akka.cluster.sharding.typed.scaladsl.ClusterSharding] [akka://eProject@127.0.0.1:2551] [] [eProject-akka.actor.default-dispatcher-5] - Starting Shard Region [Project]...
[INFO] [akka.cluster.sharding.ShardRegion] [akka://eProject@127.0.0.1:2551] [] [eProject-akka.actor.default-dispatcher-3] - Project: Automatic entity passivation: idle entities after [2.000 min], checked every [1.000 min]
[INFO] [akka.cluster.singleton.ClusterSingletonManager] [akka://eProject@127.0.0.1:2551] [akkaClusterSingletonStarted] [eProject-akka.actor.default-dispatcher-3] - Singleton manager starting singleton actor [akka://eProject/system/sharding/ProjectCoordinator/singleton]
[INFO] [akka.cluster.singleton.ClusterSingletonManager] [akka://eProject@127.0.0.1:2551] [] [eProject-akka.actor.default-dispatcher-3] - ClusterSingletonManager state change [Start -> Oldest]
[INFO] [akka.cluster.sharding.DDataShardCoordinator] [akka://eProject@127.0.0.1:2551] [] [eProject-akka.actor.default-dispatcher-12] - Project: ShardCoordinator was moved to the active state with [0] shards
// Cassandra is connected.
[INFO] [akka.persistence.Persistence] [akka://eProject@127.0.0.1:2551] [] [eProject-akka.actor.default-dispatcher-3] - Auto-starting journal plugin `akka.persistence.cassandra.journal`
[INFO] [com.datastax.oss.driver.internal.core.DefaultMavenCoordinates] [] [] [eProject-akka.persistence.cassandra.default-dispatcher-14] - DataStax Java driver for Apache Cassandra(R) (com.datastax.oss:java-driver-core) version 4.14.1
[INFO] [com.datastax.oss.driver.internal.core.time.Clock] [] [] [s0-admin-0] - Using native clock for microsecond precision
[INFO] [com.datastax.oss.driver.api.core.uuid.Uuids] [] [] [eProject-akka.persistence.cassandra.default-dispatcher-17] - PID obtained through native call to getpid(): 11276
// dead letters happen.
[INFO] [akka.actor.RepointableActorRef] [akka://eProject@127.0.0.1:2551] [akkaDeadLetter] [eProject-akka.actor.default-dispatcher-12] - Message [java.lang.String] from Actor[akka://eProject/system/akka.persistence.cassandra.journal/tagWrites/2023-001#-274243647] to Actor[akka://eProject/system/distributedPubSubMediator#1280263410] was not delivered. [1] dead letters encountered. If this is not an expected behavior then Actor[akka://eProject/system/distributedPubSubMediator#1280263410] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akka.actor.RepointableActorRef] [akka://eProject@127.0.0.1:2551] [akkaDeadLetter] [eProject-akka.actor.default-dispatcher-12] - Message [java.lang.String] from Actor[akka://eProject/system/akka.persistence.cassandra.journal/tagWrites/class+priv.abbey.domain.Project%24ProjectOpened#-760895356] to Actor[akka://eProject/system/distributedPubSubMediator#1280263410] was not delivered. [2] dead letters encountered. If this is not an expected behavior then Actor[akka://eProject/system/distributedPubSubMediator#1280263410] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef] [akka://eProject@127.0.0.1:2551] [akkaDeadLetter] [eProject-akka.actor.default-dispatcher-11] - Message [akka.pattern.StatusReply] to Actor[akka://eProject/deadLetters] was not delivered. [3] dead letters encountered. If this is not an expected behavior then Actor[akka://eProject/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef] [akka://eProject@127.0.0.1:2551] [akkaDeadLetter] [eProject-akka.actor.default-dispatcher-19] - Message [akka.pattern.StatusReply] to Actor[akka://eProject/deadLetters] was not delivered. [4] dead letters encountered. If this is not an expected behavior then Actor[akka://eProject/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akka.actor.RepointableActorRef] [akka://eProject@127.0.0.1:2551] [akkaDeadLetter] [eProject-akka.actor.default-dispatcher-19] - Message [java.lang.String] from Actor[akka://eProject/system/akka.persistence.cassandra.journal/tagWrites/class+priv.abbey.domain.Project%24ProjectClosed#-1300131164] to Actor[akka://eProject/system/distributedPubSubMediator#1280263410] was not delivered. [5] dead letters encountered. If this is not an expected behavior then Actor[akka://eProject/system/distributedPubSubMediator#1280263410] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [akka.actor.RepointableActorRef] [akka://eProject@127.0.0.1:2551] [akkaDeadLetter] [eProject-akka.actor.default-dispatcher-19] - Message [java.lang.String] from Actor[akka://eProject/system/akka.persistence.cassandra.journal/tagWrites/2023-001#-274243647] to Actor[akka://eProject/system/distributedPubSubMediator#1280263410] was not delivered. [6] dead letters encountered. If this is not an expected behavior then Actor[akka://eProject/system/distributedPubSubMediator#1280263410] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

I have run the test code above more than three times. Each time the cluster is running, Cassandra is connected, and the keyspace and tables are created automatically. But no command appears to be delivered; they all become dead letters.

On the other hand, I have tested the entity Project with ScalaTestWithActorTestKit. All the command handlers, event handlers, and states work well.

Feature("Test fixtures of project") {
  Scenario("Open a project.") {
    When("officer sends command Open.")
    val jack   = AssignedContact(TransientId, "Jack", "Department Settlement", Mobile("18900020003"))
    val result = project.runCommand[StatusReply[Done]](replyTo => OpenProject("Construction", jack, replyTo))
    
    Then("a new project is opened, the staff is assigned.")
    result.reply should ===(StatusReply.Success(Done))
    result.state.isInstanceOf[OpenedProject] shouldBe true
    result.state.name shouldBe "Construction"
    result.state.details should ===(Confirmed(current = Details(staff = jack)))
  }
}
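For completeness, the project fixture that runCommand is called on is not shown in the post; a typical setup with the akka-persistence-testkit module would look roughly like this (a sketch; the type paths are assumptions):

// Sketch of the assumed fixture: an EventSourcedBehaviorTestKit driving the entity.
// Assumes Command and Event live in the Project companion object, and that the spec
// extends ScalaTestWithActorTestKit(EventSourcedBehaviorTestKit.config).
import akka.persistence.testkit.scaladsl.EventSourcedBehaviorTestKit

private val project =
  EventSourcedBehaviorTestKit[Project.Command, Project.Event, Project](
    system,
    Project("2023-001"))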

I run the node at 127.0.0.1:2551. Is the node's configuration wrong?

Thanks.

Not sure why the messages are dropped, but they are related to an optimisation that shortens the latency between writing an event and reading it back. It is disabled by default, enabled through akka.persistence.cassandra.events-by-tag.pubsub-notification = on, and is not critical for any functionality, so you can disable it in your config if you are not specifically after low-latency projections.
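Turning it off is a one-line change in the persistence config (a sketch; adjust to wherever the setting lives in your config):

// Disables the events-by-tag pub-sub notification optimisation.
akka.persistence.cassandra.events-by-tag.pubsub-notification = off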

THANK YOU VERY MUCH!

Here is my persistence.conf:

akka {
  persistence {
    journal.plugin = "akka.persistence.cassandra.journal"
    journal.auto-start-journals = ["akka.persistence.cassandra.journal"]
    snapshot-store.plugin = "akka.persistence.cassandra.snapshot"

    cassandra {
      events-by-tag {
        bucket-size = "Day"
        eventual-consistency-delay = 2s
        flush-interval = 50ms
        pubsub-notification = on
        first-time-bucket = "20230101T00:00"
      }

      query {
        refresh-interval = 2s
      }

      journal {
        keyspace = "eproject"
        keyspace-autocreate = true
        tables-autocreate = true
      }

      snapshot {
        keyspace = "eproject"
        keyspace-autocreate = true
        tables-autocreate = true
      }
    }
  }

  projection {
    cassandra.offset-store.keyspace = "eproject"
    cassandra.session-config-path = "akka.persistence.cassandra"
  }
}

// reference: https://doc.akka.io/docs/akka-persistence-cassandra/current/configuration.html#contact-points-configuration
datastax-java-driver {
  advanced.reconnect-on-init = true
  basic.contact-points = ["192.168.1.200:9042"]
  basic.load-balancing-policy.local-datacenter = "datacenter1"
}

I tried disabling pubsub-notification, but the messages are still dropped.

I checked Cassandra: the events are persisted.