Cluster Sharding with Persistent Entities

Hello,

I have a two-node cluster using persistent actors with sharding. The nodes are exposed to the outside world through Akka HTTP routes. If a request for an entity reaches the node that hosts its shard, I get the data as expected. However, if the request reaches the other node, I get an ask timeout. What is the best way to ensure I can access entity state regardless of which node serves the HTTP request? The snippet below shows how I initialize the entity reference.

    val sharding = ClusterSharding(context.system)

    sharding.init(Entity(typeKey = ShoppingCartActor.TypeKey) { entityContext =>
      ShoppingCartActor(entityContext.entityId, PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId))
    })
    val shoppingCartActor = sharding.entityRefFor(ShoppingCartActor.TypeKey, "Cart")
    val routes = new ShopingCartRoute(shoppingCartActor)(context.system).routes

    startHttpServer(routes, context.system)

Kind regards

I am still finding my way around the API. From any node, I can send a message to the shard region actor instead of the entity reference to target the right entity instance. However, I am only able to do that using a tell (!); I am not sure how to do the same using an ask (?).

  val sharding = ClusterSharding(system)
  val shardRegion = sharding.init(Entity(ShoppingCartActor.TypeKey)(createBehavior = entityContext => ShoppingCartActor()))

  def addItem(id: String, quantity: Int) : Future[CartState] = {
    val entityRef = sharding.entityRefFor(ShoppingCartActor.TypeKey, "Cart")
    entityRef ? (ShoppingCartActor.AddItem(id, quantity, _))
  }

  def removeItem(id: String) : Future[CartState]  = {
    val entityRef = sharding.entityRefFor(ShoppingCartActor.TypeKey, "Cart")
    entityRef ? (ShoppingCartActor.RemoveItem(id, _))
  }

  def viewCart() : Future[CartState] = {
    val entityRef = sharding.entityRefFor(ShoppingCartActor.TypeKey, "Cart")
    entityRef ? (ShoppingCartActor.ViewCart(_))
  }

  def ping() = {
    shardRegion ! typed.ShardingEnvelope("Cart", ShoppingCartActor.Ping)
  }

I would like the first three methods (addItem, removeItem and viewCart) to work exactly the same way as ping. Any pointers will be highly appreciated.

Thank you

Kind regards

You should not get a timeout in that case. Akka forwards the message to the node where the entity lives.
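As a rough sketch of what that looks like with the ask pattern, the snippet below asks the same entity in two ways: through an `EntityRef`, and through the shard region by wrapping the command in a `ShardingEnvelope`. The `CartProtocol` object, the `CartClient` class, the `CartState` fields and the 5 second timeout are assumptions made for illustration; they stand in for the real `ShoppingCartActor` protocol, which is not shown in full here. Both calls should behave the same from any node, provided the command and the reply can be serialized.

```scala
import scala.concurrent.Future
import scala.concurrent.duration._

import akka.actor.typed.{ActorRef, ActorSystem}
import akka.actor.typed.scaladsl.AskPattern._
import akka.cluster.sharding.typed.ShardingEnvelope
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, EntityTypeKey}
import akka.util.Timeout

// Hypothetical protocol standing in for ShoppingCartActor's messages.
object CartProtocol {
  sealed trait CartCommand
  final case class AddItem(id: String, quantity: Int, replyTo: ActorRef[CartState])
      extends CartCommand

  // Assumed shape of the reply; the real CartState may differ.
  final case class CartState(items: Map[String, Int])

  val TypeKey: EntityTypeKey[CartCommand] = EntityTypeKey[CartCommand]("ShoppingCart")
}

// Hypothetical helper; shardRegion is the ActorRef returned by sharding.init(...)
// when no custom message extractor is configured.
class CartClient(
    sharding: ClusterSharding,
    shardRegion: ActorRef[ShardingEnvelope[CartProtocol.CartCommand]]
)(implicit system: ActorSystem[_]) {
  import CartProtocol._

  // Assumed timeout; both asks below need one in implicit scope.
  implicit val timeout: Timeout = 5.seconds

  // Ask through the EntityRef: sharding routes the command to the owning node.
  def addItemViaEntityRef(id: String, quantity: Int): Future[CartState] =
    sharding
      .entityRefFor(TypeKey, "Cart")
      .ask[CartState](replyTo => AddItem(id, quantity, replyTo))

  // Ask through the shard region: same routing, but the entity id travels
  // in the envelope. The Scheduler comes from the implicit ActorSystem.
  def addItemViaRegion(id: String, quantity: Int): Future[CartState] =
    shardRegion.ask[CartState] { replyTo =>
      ShardingEnvelope[CartCommand]("Cart", AddItem(id, quantity, replyTo))
    }
}
```

If either call only times out when the request lands on a node that does not host the entity, the routing itself is probably fine and the thing to check is whether the message and its reply survive the trip between nodes.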

Maybe you have issues at another level. Are you sure your actor is responding to the message?

Can you post here the command handler for one of the messages you are sending?

Cheers,

Renato

Thank you Renato.

I am getting a timeout if I use the entity reference and the call reaches the node that is not hosting the entity. I thought that was expected behavior and that the way to address it was to route the message through the shard region. I can get the ping call to behave in a location-transparent way using the shard region. However, that uses tell. I want addItem, removeItem and viewCart to work the way ping works, but using the ask pattern. Possibly I am missing something quite fundamental.

Kind regards

I have slightly changed the code, as shown below. The tellShard call works as expected. But the askShard method only works if the external request hits the node hosting the shard; on all other nodes it times out.

  val sharding = ClusterSharding(system)
  
  val messageExtractor =
    new HashCodeNoEnvelopeMessageExtractor[ShoppingCartActor.CartCommand](numberOfShards = 30) {
      override def entityId(message: ShoppingCartActor.CartCommand): String = "Cart"
    }
  
  val shardRegion: ActorRef[ShoppingCartActor.CartCommand] = sharding.init(
      Entity(ShoppingCartActor.TypeKey) { context =>
        ShoppingCartActor()
      }.withMessageExtractor(messageExtractor)
  )


  def tellShard() = {
    shardRegion ! ShoppingCartActor.Ping
  }

  def askShard(id: String, quantity: Int): Future[CartState] = {
    shardRegion ? (ShoppingCartActor.AddItem(id, quantity, _))
  }

I have figured out the issue. It was caused by serialization failing on the reply. I added configuration to allow Java serialization and it works fine now. Thank you.

Good, but avoid Java serialization. It is not recommended: it’s slow and unsafe.

Prefer Jackson serialization and pick the jackson-cbor variant.
https://doc.akka.io/docs/akka/current/serialization-jackson.html#dependency
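
A minimal sketch of how that binding could look, assuming the akka-serialization-jackson dependency from the page above is on the classpath. The marker trait name `CborSerializable`, the `com.example` package in the config key, and the fields of `CartState` are assumptions for illustration. The idea is to mark every message that crosses nodes, including replies, with one trait and bind that trait to the `jackson-cbor` serializer.

```scala
// Hypothetical marker trait; name and package are assumptions.
trait CborSerializable

// Assumed shape of the reply that previously failed to serialize.
// Commands sent to the entity would extend the same trait.
final case class CartState(items: Map[String, Int]) extends CborSerializable

object SerializationSketch {
  // HOCON intended for application.conf. The key must be the fully
  // qualified name of the marker trait; "com.example" is assumed here.
  val bindingConfig: String =
    """akka.actor.serialization-bindings {
      |  "com.example.CborSerializable" = jackson-cbor
      |}""".stripMargin
}
```

With a binding like this in place, the setting that allows Java serialization can be removed again.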

Yes, thank you. I have changed it now. Many thanks for your help.