Recommended approach for publishing/subscribing persistent entity commands between Lagom services/persistent entities via Kafka?

In the documentation for the message broker API, it states that the primary source of messages Lagom is designed to produce is persistent entity events. However, I need to produce persistent entity commands which are sent from one service to another via Kafka. My use case is that I have one service that represents model entities, which accept action commands and produce changed events. I have a separate service that represents agent entities, which subscribe to model changed events and, based on their view of the model and internal logic, can then act on the model via action commands that they generate. An agent consuming model changed events fits well with Lagom’s message broker approach, but I am having difficulty figuring out how to properly produce commands which can then be fed into the corresponding model entity. Keep in mind that these commands are really requests to change the model, and it is not guaranteed that the model will be updated accordingly - so they truly are not events.

I could wrap the agent-generated commands as events, which could then be consumed by a model entity and handled in its event handler. However, this would bypass the model entity’s regular command handling (including validation and responding with failure), so it is clearly the wrong approach to take. Alternatively, I could use entityRef and ask within my topic implementation in the model service to forward the wrapped commands, but then I am not sure how to properly handle the event stream, with possible performance/consistency implications, since I would not be using Lagom’s topic producer methods. Or perhaps trying to do this via the message broker API is the wrong approach altogether, but I have been thinking along this line because both services are Lagom services.

So what is the recommended approach for publishing/subscribing persistent entity commands between Lagom services/persistent entities via Kafka? Or is there another architectural approach for solving this using Lagom? Any guidance/examples would be greatly appreciated!

Hmm… or perhaps I have been mixing up concepts from the docs. Instead of focusing on the section on events and the topic producer, and looking instead at the section on subscribing to a topic, is it as simple as adding something along the lines of

private val subscription = myTopic.subscribe.atLeastOnce(
  Flow.fromFunction(msg => {
    entityRef(msg.entityId).ask(myCommand)
    Done
  })
)

to my ModelServiceImpl, where myTopic is defined in my ModelServiceApi using withTopics. But if so, where is it recommended to publish back to Kafka? I guess it would be better to do that within my ModelServiceImpl inside the callback to ask (for both success and failure responses), using another output topic defined in my ModelServiceApi - no need to push topic handling logic into the persistent entity. And if this is the correct approach, how do I handle backpressure, etc. - by adding more logic in my impl using more of the Akka Streams API?
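(To clarify what I mean by the output topic, here is a rough sketch of my ModelServiceApi. The topic id and message type are made up, and I am assuming an implicit MessageSerializer/JSON Format is in scope for the message type:)

import com.lightbend.lagom.scaladsl.api.{Descriptor, Service}
import com.lightbend.lagom.scaladsl.api.broker.Topic

trait ModelService extends Service {

  // Responses (success or failure) published back for agents to consume.
  def commandResponsesTopic: Topic[CommandResponseMessage]

  override final def descriptor: Descriptor = {
    import Service._
    named("model-service").withTopics(
      topic("command-responses", commandResponsesTopic)
    )
  }
}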

Sorry about the rambling… It’s just that the focus of the docs is on the event-based approach, and anything that steps outside of it is quite vague for a Lagom beginner. It would be great if anyone could give me some feedback on whether I’m on the right path or not… Thanks!

Hi @farsimple,

It’s not recommended to do any Kafka operations from inside your entity because they are not guaranteed to succeed. The afterPersist function should not be used for this kind of operation.

I’m not sure how much of the solution you have already figured out, but I guess you are not far off.

You usually don’t add commands to a topic; that’s not a recommended thing to do. Instead, if you need to react in ServiceB to events from ServiceA, you must subscribe to the events from ServiceA and decide which commands to send to ServiceB based on what you see in the events. So you need a function with the following signature: EventA => CommandB.
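For example, something like this purely illustrative mapping (the event and command names are hypothetical):

def toCommand(event: AgentEvent): ModelCommand = event match {
  case AgentRequestedChange(modelId, change) => ApplyChange(modelId, change)
  // one case per event type that should trigger a command on the model
}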

Then, the entity in ServiceB may simply reject a command; in that case, you should just keep going and not let the stream stop.

The implementation you posted here is not good, for two reasons:

  1. The call to ask(myCommand) may fail for reasons that are not related to command validation. For instance, it can fail because of a timeout. You are ignoring the result of ask and returning Done immediately.

  2. You may flood the mailbox of the entity that is receiving the commands. That implementation is picking events from a Kafka topic and sending them to the mailbox non-stop. The PersistentEntity on the other side of the mailbox will probably not be able to handle them at the same pace, resulting in a flooded mailbox.

You should only consume the next event when you have an acknowledgement that the previous one was processed. And when the Future returned by ask completes, you need to know whether it failed because the command was rejected or because of some other reason. If it’s the former, you need to recover the failed future to a Future[Done] and pick the next event. The latter needs to be retried, and therefore you need to let the stream fail so that it recovers and eventually succeeds in sending the failing command.
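Putting that together, here is a minimal sketch of what the consuming side could look like. This is not verbatim from the docs: AgentEvent, AgentService, ModelEntity, ModelCommand and toCommand are hypothetical names, ModelCommand is assumed to reply with Done, and I am assuming the entity rejects invalid commands by failing the ask with PersistentEntity.InvalidCommandException (e.g. via ctx.invalidCommand).

import akka.Done
import akka.stream.scaladsl.Flow
import com.lightbend.lagom.scaladsl.persistence.{PersistentEntity, PersistentEntityRegistry}
import scala.concurrent.ExecutionContext

class ModelServiceImpl(
    registry: PersistentEntityRegistry,
    agentService: AgentService
)(implicit ec: ExecutionContext) extends ModelService {

  agentService.agentEventsTopic.subscribe.atLeastOnce(
    // mapAsync(parallelism = 1) gives you backpressure: the next event is
    // only pulled once the previous ask has completed (the acknowledgement).
    Flow[AgentEvent].mapAsync(parallelism = 1) { event =>
      registry
        .refFor[ModelEntity](event.modelId)
        .ask(toCommand(event))
        .map(_ => Done)
        .recover {
          // Rejection is a normal business outcome: ack and keep going.
          case _: PersistentEntity.InvalidCommandException => Done
          // Any other failure (e.g. an ask timeout) is NOT recovered here,
          // so the stream fails, is restarted, and the command is retried.
        }
    }
  )

  // EventA => CommandB, purely illustrative.
  private def toCommand(event: AgentEvent): ModelCommand = ???
}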

I hope this clarifies.

Cheers,

Renato