Merge 2 persistent event streams into 1 public event stream


(Michael Mangeng) #1

Hi!

I created a Lagom service with 2 persistent entities. The service is called AuthzService and manages the PersistentEntities PRoleEntity and PGroupEntity. I now get 2 persistent event streams, and I thought it might be a nice idea to expose only 1 public AuthzEvent stream via Kafka, with the role and group streams merged into it.

My question is how I can merge these 2 streams. The current code (handling only PRoleEvent) is:

public Topic<AuthzEvent> roleEvents() {
    return TopicProducer.taggedStreamWithOffset(PRoleEvent.TAG.allTags(),
        (tag, offset) -> registry.eventStream(tag, offset)
            .mapAsync(1, eventAndOffset ->
                convertEvent(eventAndOffset.first())
                    .thenApply(event -> Pair.create(event, eventAndOffset.second()))));
}

…but obviously this only handles events of the PRoleEvent stream…

Greetings,
Michael


(Alan Klikic) #2

@mm
I do not believe this is possible with the current API.
TopicProducer is the only API for obtaining a Topic<Message>. With taggedStreamWithOffset, the AggregateEventTag<Event> is used to query the offset for publishing, and it enforces a single explicit event type.
In your case you would need to provide two event types.
registry.eventStream(tag, offset) could be used to create one Akka stream per event type, which could then be combined with an Akka Streams merge, but getting the offset is a problem.
Maybe I'm wrong, but that is how it looks to me.
If you need to merge two Kafka topics into one, you could use Kafka Streams: Kafka Streams join
It is not Lagom specific and requires additional dependencies.
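
To illustrate why the offset is the sticking point when merging, here is a minimal plain-Java sketch. It does not use Lagom or Akka at all: the class MergedOffsetSketch, its Event type, and the simple per-stream numeric offsets are all hypothetical and only assumed for the illustration. It models two streams that each number their events independently and shows what happens when a resumed publisher holds one offset versus one offset per stream.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: none of these names come from Lagom or Akka.
final class MergedOffsetSketch {

    /** An event together with the offset it has in its own source stream. */
    static final class Event {
        final String source;
        final long offset;

        Event(String source, long offset) {
            this.source = source;
            this.offset = offset;
        }
    }

    /** Builds a stream whose events carry offsets 1..count (assumed per-stream numbering). */
    static List<Event> stream(String source, int count) {
        List<Event> events = new ArrayList<>();
        for (long offset = 1; offset <= count; offset++) {
            events.add(new Event(source, offset));
        }
        return events;
    }

    /** Replays one stream, keeping only the events after the given offset. */
    static List<Event> resume(List<Event> stream, long afterOffset) {
        List<Event> remaining = new ArrayList<>();
        for (Event event : stream) {
            if (event.offset > afterOffset) {
                remaining.add(event);
            }
        }
        return remaining;
    }

    /** Replays both streams, each from its own stored offset, and concatenates the results. */
    static List<Event> resumeBoth(List<Event> roles, List<Event> groups,
                                  long roleOffset, long groupOffset) {
        List<Event> remaining = new ArrayList<>(resume(roles, roleOffset));
        remaining.addAll(resume(groups, groupOffset));
        return remaining;
    }
}
```

Suppose both streams hold 3 events and the publisher has processed role events up to offset 3 but group events only up to offset 1. Resuming with a single shared offset, resumeBoth(roles, groups, 3, 3), returns nothing and silently drops group events 2 and 3, while resuming with one offset per stream, resumeBoth(roles, groups, 3, 1), returns exactly those two events. A merged topic would therefore have to track two offsets, which in this sketch a single offset per published message cannot express.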


(Michael Mangeng) #3

Thank you @aklikic