Many-to-one Persistent Entity Subscriptions

Is there a pattern for having persistent entities in Service B subscribe to events concerning multiple persistent entities in Service A?

Let me explain with the online-auction example. The ItemServiceSubscriber (in the bidding service) subscribes to the ItemService topic and translates item events to auction commands. This is possible because there’s a one-to-one relationship between items and auctions: the auction persistent entity id is the same as the item id. Because the item id is reused, the ItemServiceSubscriber knows which auction persistent entity to contact.

My question is whether there’s a pattern for a many-to-one relationship. In the online-auction case that would mean events from multiple items being translated to commands for a single auction persistent entity. Reworking the subscriber this way fails because an item event no longer identifies its auction: the item id is not the auction id, so the subscriber doesn’t know which auction persistent entity to contact.

I’ve considered a few options, which I’ll explain using the online-auction example. Forgive any places where the example gets a little tortured; I figured a concrete example would be more helpful than an abstract one.

The first option is to change the bounded context in the bidding service. I can switch from using multiple auction persistent entities to a single auctions persistent entity whose state is a Map[Id, AuctionState]. This auctions persistent entity would have a static, known id. In this case the subscriber can simply translate item events to auction commands that are all sent to the auctions entity. Since the auctions entity contains all the auctions, it can route the commands. This solution feels like it violates some Lagom principles because it results in one large persistent entity that isn’t scalable.

The second option is to subscribe each auction persistent entity to the item topic. The implementation I’m thinking of is an AuctionServiceSubscriber in the auction service that handles AuctionStarted events by starting a new item topic subscription that handles all item events specifically for that auction:

biddingService.bidEvents.subscribe.atLeastOnce(Flow[AuctionEvent].mapAsync(1) {
  case AuctionStarted(auction) =>
    // The auction object doesn't have an id field, but assume it does for convenience
    val auctionId = auction.id
    // Start an item topic subscription that always targets this auction
    // Use the group id to differentiate the subscriptions
    itemService.itemEvents.subscribe.withGroupId(s"bidding-$auctionId").atLeastOnce(Flow[ItemEvent].mapAsync(1) {
      case event: SomeItemEvent => entityRef(auctionId).ask(SomeAuctionCommand(event))
      case _ => Future.successful(Done)
    })
    Future.successful(Done)
  case _ => Future.successful(Done)
})

private def entityRef(auctionId: UUID) = persistentEntityRegistry.refFor[AuctionEntity](auctionId.toString)

This setup would need to be wrapped in another subscriber or read side that tracks the active auctions in order to resume the subscriptions if/when the service restarts. I feel like this solution might be abusing the subscriptions in some way, but I’m not sure.

I’m interested in any feedback on these options or other options I haven’t considered.

You shouldn’t think about it as a persistent entity subscribing to a topic, but as an entire service subscribing to a topic and then dispatching each message as needed.

Perform the subscribe in something that runs on startup (this could be your service implementation’s constructor, a custom class invoked from your service implementation’s constructor, or a custom class or function directly invoked in your application loader).

The subscriber should handle all messages in the topic and invoke the persistent entity command using a mapAsync stage in the flow. To know which entity ID to dispatch to, you’ll need to either carry that information in the message itself, or maintain some kind of mapping table on the subscriber side.
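
As a rough illustration of that dispatch step, here is a minimal sketch that uses only the Scala standard library. Everything in it is hypothetical and not part of Lagom or the online-auction code: the event types ItemAssigned and ItemPriceChanged, AuctionCommand, the SendCommand function type and the ItemEventDispatcher class. SendCommand stands in for something like persistentEntityRegistry.refFor[AuctionEntity](auctionId).ask(...), and handle is the function you would pass to the mapAsync(1) stage. The sketch shows both ways of finding the entity id: one event carries the auction id itself, the other is resolved through a mapping table. The table is an in-memory map purely for illustration; a real service would presumably need to persist it so it survives restarts.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ManyToOneDispatch {
  // Hypothetical stand-ins for messages on the item topic.
  sealed trait ItemEvent { def itemId: String }
  // This event carries the id of the auction it belongs to.
  final case class ItemAssigned(itemId: String, auctionId: String) extends ItemEvent
  // This event does not; the auction id has to be looked up.
  final case class ItemPriceChanged(itemId: String, price: Int) extends ItemEvent

  // Hypothetical command sent to the auction entity.
  final case class AuctionCommand(itemId: String, description: String)

  // Stand-in for asking a persistent entity: (entity id, command) => reply.
  type SendCommand = (String, AuctionCommand) => Future[Unit]

  final class ItemEventDispatcher(send: SendCommand) {
    // Mapping table: item id -> auction id (in memory for illustration only).
    private val auctionByItem = TrieMap.empty[String, String]

    // One call per topic message; this would be the body of the mapAsync stage.
    def handle(event: ItemEvent): Future[Unit] = event match {
      case ItemAssigned(itemId, auctionId) =>
        // The message itself says where to go; remember it for later events.
        auctionByItem.put(itemId, auctionId)
        send(auctionId, AuctionCommand(itemId, "assigned"))
      case ItemPriceChanged(itemId, price) =>
        auctionByItem.get(itemId) match {
          case Some(auctionId) => send(auctionId, AuctionCommand(itemId, s"price=$price"))
          case None            => Future.successful(()) // unknown item: skip it
        }
    }
  }

  // Small demonstration: feeds four events through the dispatcher one at a
  // time and returns the commands each auction id received, in order.
  def demo(): Map[String, Vector[AuctionCommand]] = {
    var delivered = Vector.empty[(String, AuctionCommand)]
    val send: SendCommand = (auctionId, cmd) =>
      Future.successful { delivered = delivered :+ (auctionId -> cmd) }

    val dispatcher = new ItemEventDispatcher(send)
    val events = List(
      ItemAssigned("item-a", "auction-1"),
      ItemAssigned("item-b", "auction-1"),
      ItemPriceChanged("item-a", 10),
      ItemPriceChanged("item-c", 5) // never assigned, so no command is sent
    )
    // Processing one event at a time mirrors mapAsync(1).
    events.foreach(e => Await.result(dispatcher.handle(e), 1.second))
    delivered.groupBy(_._1).map { case (id, cmds) => id -> cmds.map(_._2) }
  }
}
```

Calling ManyToOneDispatch.demo() routes the events for item-a and item-b to the single entity id auction-1, which is the many-to-one case from the question. The subscription itself stays a single, service-level one; only the lookup inside handle decides which entity receives each command.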