Event Sourcing, audit trail for free, really?


(Dominique Schoenenberger) #1

I’m trying to implement a Lagom project that manages customers. It has a web interface, and I want a page that shows the details of one customer and, notably, a table of all actions performed on that customer. In Lagom terms, that is all events for one entity.

When you read about the advantages of Event Sourcing, one of them is “It provides an audit trail for free”. OK, good, but the implementation is not trivial, at least for me!

Maybe there is a simple way to do it, or an example somewhere?

My current solution (which is not really working) uses a streamed service call (returning a Source) on a topic (Kafka behind), which is nothing more than a web socket I can use from the JavaScript side.
It works, but I have 2 problems:

  • The web socket is closed after 75 seconds. (I’m not sure, but this may be normal, as you can read here: https://github.com/lagom/lagom/issues/1284, although the error code (1011) looks odd.) To work around that, I re-create the web socket whenever it is closed.
  • I get all events only at startup, and after that only the newly created ones.

There is another Lagom feature that looked promising at first: eventStream on PersistentEntityRegistry, as described here: https://www.lagomframework.com/documentation/1.4.x/scala/ReadSide.html#Raw-Stream-of-Events. However, it looks like it shouldn’t be used in a service call (too bad).

Any help would be appreciated.
If you are interested, the code is here: https://github.com/domschoen/customerLagom


(Michael Liarakos) #2

Another approach is to create a read side for the events, similar to your CustomerRepository. Instead of using the events to create/modify a customer table, simply store the events by customer. You can then query that read side in a service call and get a list of events for a customer (and not a stream). Be aware that read sides are eventually consistent, so there will be a delay between a change in the customer entity and the corresponding event being processed by the read side.
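
The idea of storing events by customer can be modelled in plain Scala as a per-customer event log. This is only a sketch under assumptions: `AuditLog`, its in-memory `Map`, and the event fields are hypothetical stand-ins for a real read-side table and the read-side processor that would write to it.

```scala
// Hypothetical, simplified events; the real ones live in the customer service.
sealed trait CustomerEvent { def customerId: String }
final case class CustomerCreated(customerId: String, name: String) extends CustomerEvent
final case class CustomerRenamed(customerId: String, newName: String) extends CustomerEvent

// In-memory stand-in for a read-side table keyed by customer id.
final case class AuditLog(byCustomer: Map[String, Vector[CustomerEvent]] = Map.empty) {
  // What a read-side event handler would do: append the event to that customer's rows.
  def record(event: CustomerEvent): AuditLog = {
    val existing = byCustomer.getOrElse(event.customerId, Vector.empty)
    copy(byCustomer = byCustomer.updated(event.customerId, existing :+ event))
  }

  // What the service call would query: a plain list of events, not a stream.
  def eventsFor(customerId: String): Vector[CustomerEvent] =
    byCustomer.getOrElse(customerId, Vector.empty)
}

object AuditLogExample {
  // Feed a few events through the handler, in the order they were persisted.
  val log: AuditLog = List[CustomerEvent](
    CustomerCreated("c1", "Acme"),
    CustomerCreated("c2", "Globex"),
    CustomerRenamed("c1", "Acme Corp")
  ).foldLeft(AuditLog())((acc, event) => acc.record(event))
}
```

Because the log is keyed by customer, a query for one customer never has to scan the events of the others.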


(Renato) #3

Hi @domschoen,

It’s not clear what you are trying to achieve. There are many ways of consuming events in Lagom and you choose the technique according to the use case you want to implement.

It’s also important to understand the principles behind Event Sourcing, the difference between Event Sourcing and Event-Driven architecture, and how one leverages the other.

Can you explain what you are trying to achieve?

Cheers,

Renato


(Dominique Schoenenberger) #4

For the moment, I have 2 events:

  • CustomerCreated
  • CustomerRenamed

I want to be able to open a web page displaying information about one customer. The page will contain:

  • customer information (name, country, etc.). This is the current state of the customer.
  • a table with all changes made to that customer. (This table is nothing more than the list of all events for that customer, the same events as in the event sourcing.) It’s an audit trail. I want to use a websocket for this table so I can see any new change to that customer live. That’s why I’m talking about a stream.

I have created a service call for the websocket:

def getLiveCustomerEvents: ServiceCall[NotUsed, Source[CustomerEvent, NotUsed]]

In the implementation, I subscribe to the customer event topic:

  override def getLiveCustomerEvents: ServiceCall[NotUsed, Source[api.CustomerEvent, NotUsed]] = ServiceCall { _ =>
    // Subscribe to the customer event topic (Kafka behind); it carries events of all customers.
    val source = customerService.customerEventsTopic.subscribe.atMostOnceSource
    // Discard the materialized value so the type matches Source[api.CustomerEvent, NotUsed].
    val newSource: Source[api.CustomerEvent, NotUsed] = Source.fromGraph(source)
      .mapMaterializedValue(_ => NotUsed)
    Future.successful(newSource)
  }

When I start the project (runAll), I can see all events listed in the audit trail table of the web page. If I rename the customer, I can see the CustomerRenamed event added to the table.
Up to this point it works perfectly! But if I open another browser page, the audit trail table is empty and never updated.
It’s as if only one web socket gets the stream.


(Renato) #5

This is happening because the first websocket consumes the events from Kafka and commits the offset on the topic, indicating that it has already consumed them and does not need to receive them again.

When you open the page again, using a second websocket, Kafka won’t redeliver the events because they were already consumed.

That approach won’t work for what you are trying to achieve. Not only for the reasons I just mentioned, but also because you probably want the audit trail for a single customer, while that topic will probably deliver events from all customers.
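
To illustrate both points, here is a toy model in plain Scala. It does not use Kafka: `Topic`, `poll`, and the group name `customer-service` are hypothetical and only mimic how one committed offset is shared by every subscriber in the same consumer group. It assumes that both websockets subscribe under the same group and ignores partitions entirely.

```scala
import scala.collection.mutable

final case class Event(customerId: String, description: String)

// Toy topic: one log plus one committed offset per consumer group.
final class Topic(log: Vector[Event]) {
  private val committed = mutable.Map.empty[String, Int].withDefaultValue(0)

  // Return everything after the group's committed offset, then commit.
  def poll(groupId: String): Vector[Event] = {
    val fresh = log.drop(committed(groupId))
    committed(groupId) = log.size
    fresh
  }
}

object OffsetExample {
  val topic = new Topic(Vector(
    Event("A", "CustomerCreated"),
    Event("B", "CustomerCreated"),
    Event("A", "CustomerRenamed")
  ))

  // Both websockets go through the same service, hence (by assumption) the same group.
  val firstWebsocket: Vector[Event] = topic.poll("customer-service")
  val secondWebsocket: Vector[Event] = topic.poll("customer-service")

  // Even the first websocket sees events of every customer unless it filters.
  val onlyCustomerA: Vector[Event] = firstWebsocket.filter(_.customerId == "A")
}
```

In this model the first websocket receives all three events, the second receives none, and the audit trail for customer A is only obtained by filtering.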

If you are consuming from the same service, you can instead create an akka-persistence-query and select all events for a given customer. This is not Lagom functionality. You will need to go one level down and use akka-persistence-query directly.

On each new websocket, you need to fetch the events for customer A using akka-persistence-query. It will return a Source that Lagom will bind to the websocket.

Note that each open websocket will be an active database connection to your DB. You should evaluate how this will impact your system and how many of those you can keep open.

You could later optimize it by keeping an in-memory cache of past events that you fill on the first request and reuse when other websockets are opened. The cache can be discarded when nobody is consuming from it. Just an idea, I have never tried such a thing. :)
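
A rough sketch of that cache idea in plain Scala, under assumptions: `EventCache`, `subscribe`, `unsubscribe`, and the `loadHistory` function are hypothetical names, and `loadHistory` stands in for whatever query fetches the past events of one customer. Appending live events to the cache is left out.

```scala
// Hypothetical cache of past events, shared by all open websockets of one customer.
final class EventCache[E](loadHistory: String => Vector[E]) {
  // customerId -> (cached events, number of open websockets)
  private var entries = Map.empty[String, (Vector[E], Int)]
  // Counts how often the history was actually loaded.
  var loads = 0

  // Called when a websocket opens: load on the first request, then reuse.
  def subscribe(customerId: String): Vector[E] = synchronized {
    val (events, count) = entries.get(customerId) match {
      case Some(entry) => entry
      case None =>
        loads += 1
        (loadHistory(customerId), 0)
    }
    entries = entries.updated(customerId, (events, count + 1))
    events
  }

  // Called when a websocket closes: discard the cache once nobody is left.
  def unsubscribe(customerId: String): Unit = synchronized {
    entries.get(customerId).foreach { case (events, count) =>
      entries =
        if (count <= 1) entries - customerId
        else entries.updated(customerId, (events, count - 1))
    }
  }

  def isCached(customerId: String): Boolean = synchronized(entries.contains(customerId))
}

object EventCacheExample {
  val cache = new EventCache[String](id => Vector(s"$id created", s"$id renamed"))
  val first: Vector[String] = cache.subscribe("A")
  val second: Vector[String] = cache.subscribe("A")
  val loadsAfterTwoSubscribers: Int = cache.loads
  cache.unsubscribe("A")
  cache.unsubscribe("A")
  val cachedAfterAllClosed: Boolean = cache.isCached("A")
}
```

The second subscriber reuses the events loaded for the first one, so only one query reaches the database while the page is open in several browsers.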

Cheers,

Renato