Lagom multiple TopicProducer in one service

Hi Guys,

I have a Lagom service that has two topics, like this:

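import com.lightbend.lagom.scaladsl.api.{Descriptor, Service}
import com.lightbend.lagom.scaladsl.api.broker.Topic

// `type` is a reserved word in Scala, so the field name needs backticks.
// An implicit Format[Envelope] is assumed to be in scope for the topic serializer.
case class Envelope(`type`: String, payload: String)

trait MyService extends Service {
  def topic1: Topic[Envelope]
  def topic2: Topic[Envelope]

  override final def descriptor: Descriptor = {
    import Service._
    named("scm")
      .withTopics(
        topic("topic", topic1), // both definitions use the same topic id "topic"
        topic("topic", topic2)
      )
  }
}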

I also have two persistent entities here, each with its own AggregateEventTag. I want to listen to the event streams of the two different PersistentEntities and publish messages from both of them to Kafka (to the same topic).
Is it possible in Lagom?

Hi,

Yes, this is possible.
You just need to reference the appropriate AggregateEventTag in each topic method implementation.

Br,
Alan

Hi Alan,

Thank you for your response!

I do use the appropriate AggregateEventTag in each topic method implementation, but I get an error that does not allow me to use the same topic name:
akka.actor.InvalidActorNameException: actor name [cluster-distribution-kafkaProducer-topic] is not unique!

Hi,

Sorry, I misread your question.
Publishing to the same topic from two different topic definitions is not possible: the name of the topic producer (an internal Lagom component) is derived from the topic name, so two definitions with the same name collide and you get this error.
Br,
Alan

One thing you could do is use a single topic definition and merge the two event streams in its method implementation.
I have not tried it personally.

Better check this thread: "Merge 2 persistent event streams into 1 public event stream"
(I forgot about that :))

I tried to do it like this:
Source.combine(source1, source2)(Concat(_))

but unfortunately it does not work.

The problem with using Concat is that it waits for the first source to complete before sending elements from the second one. However, the event streams don’t end: they continually poll for new events, so the first source won’t complete, and you won’t get elements from the second source.

Try using source1.merge(source2) instead:
https://doc.akka.io/docs/akka/current/stream/operators/Source-or-Flow/merge.html
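
To make the difference concrete, here is a minimal standalone sketch using plain Akka Streams rather than Lagom. The `MergeVsConcat` object, the `events` and `labelsSeen` helpers, and the "entity1"/"entity2" labels are made up for illustration; the ticking sources only stand in for the never-ending persistent entity event streams. It assumes Akka 2.6 or later, where an implicit ActorSystem is enough to run a stream (older versions need an explicit ActorMaterializer).

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object MergeVsConcat {
  // Hypothetical stand-in for one entity's event stream: emits `label` forever
  // and never completes, like a polling event stream.
  def events(label: String): Source[String, Any] =
    Source.tick(0.millis, 50.millis, label)

  // Take the first `n` elements of the stream and return the distinct labels seen.
  def labelsSeen(stream: Source[String, Any], n: Int)(implicit system: ActorSystem): Set[String] =
    Await.result(stream.take(n).runWith(Sink.seq[String]), 5.seconds).toSet

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("merge-vs-concat")
    try {
      // concat: the second source is only consumed after the first completes,
      // which never happens here, so only "entity1" shows up.
      println(labelsSeen(events("entity1").concat(events("entity2")), 10))

      // merge: elements are passed on from whichever source has one available,
      // so both labels show up.
      println(labelsSeen(events("entity1").merge(events("entity2")), 10))
    } finally system.terminate()
  }
}
```

In a Lagom topic implementation the same `merge` call would be applied to the two event stream sources instead of the ticking ones. Since the topic producer tracks a single offset for the merged stream, it is probably worth checking how the stream resumes after a restart.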

Thank you Tim, it works.