Hi,
I started using Lagom recently. Based on this link, my understanding is that a message should be published on the constructed topic. This is the part of the sample code I am referring to:
final PubSubRef<Temperature> topic = pubSub.refFor(TopicId.of(Temperature.class, id));
topic.publish(temperature);
I couldn’t build a Temperature DTO to POST from a REST client, so I created my own DTO, which is very similar to HelloEvent. In my case it is KafkaEvent.
After some tinkering, I found that a Kafka message is published after a POST service call:
public ServiceCall<GreetingMessage, Done> useGreeting(String id) {
    return request -> {
        // Look up the hello world entity for the given ID.
        PersistentEntityRef<HelloCommand> ref = persistentEntityRegistry.refFor(HelloEntity.class, id);
        // Tell the entity to use the greeting message specified.
        return ref.ask(new UseGreetingMessage(request.message));
    };
}
Basically, what I am trying to achieve is:
– Subscribe to Topic A
– Upon receiving a message on Topic A, persist the Kafka offset and execute some business logic
– Publish the result to Topic B
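To make the three steps concrete, here is a minimal, library-free sketch of the flow. It is only a model under stated assumptions: `OffsetTrackingPipeline`, `onMessage`, and the in-memory fields are hypothetical names standing in for a real offset table and a real Topic B, and none of them come from Lagom or the Kafka client. The offset is recorded after the result is published, which is one possible ordering for at-least-once handling where a redelivered message is skipped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical in-memory model of: consume from Topic A, run business logic,
// publish to Topic B, and remember the last handled offset.
class OffsetTrackingPipeline {
    // Stand-in for a durable offset store (for example, a database row).
    private long lastProcessedOffset = -1L;
    // Stand-in for Topic B.
    private final List<String> topicB = new ArrayList<>();
    // Hypothetical business logic applied to each Topic A payload.
    private final Function<String, String> businessLogic;

    OffsetTrackingPipeline(Function<String, String> businessLogic) {
        this.businessLogic = businessLogic;
    }

    // Handles one Topic A message. Returns true if it was processed,
    // false if its offset was already recorded (a redelivery).
    boolean onMessage(long offset, String payload) {
        if (offset <= lastProcessedOffset) {
            return false; // already handled, skip to avoid a duplicate publish
        }
        String result = businessLogic.apply(payload);
        topicB.add(result);           // "publish" the result to Topic B
        lastProcessedOffset = offset; // "persist" the offset last
        return true;
    }

    long lastProcessedOffset() {
        return lastProcessedOffset;
    }

    List<String> publishedToTopicB() {
        return new ArrayList<>(topicB);
    }
}
```

In a real service the publish and the offset write are not atomic, so a crash between them can still produce a duplicate on Topic B. The sketch only shows the bookkeeping, not a transactional guarantee.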
I can implement this flow without Lagom, using the Kafka client directly, and I am trying to achieve the same in Lagom.
Even in Lagom I am able to achieve this flow, but I am unable to persist the message/offset from Topic A.
My code that does this is below:
public class CustomSubscriber {
    @Inject
    public CustomSubscriber(HelloService helloService, StreamRepository repository, PersistentEntityRegistry persistentEntityRegistry) {
        //persistentEntityRegistry.register(CustomEntity.class);
        persistentEntityRegistry.register(HelloEntity.class);
        // Create a subscriber
        helloService
            .helloEvents()
            .subscribe()
            // And subscribe to it with at least once processing semantics.
            .atLeastOnce(
                // Create a flow that emits a Done for each message it processes
                Flow.<HelloEvent>create().mapAsync(1, event -> {
                    if (event instanceof HelloEvent.GreetingMessageChanged) {
                        HelloEvent.GreetingMessageChanged messageChanged = (HelloEvent.GreetingMessageChanged) event;
                        System.out.println("CustomSubscriber.Received kafka message");
                        //Save incoming message in database
                        //PersistentEntityRef<HelloCommand> refHello = persistentEntityRegistry.refFor(HelloEntity.class, messageChanged.getName());
                        // Tell the entity to use the greeting message specified.
                        //refHello.ask(new UseGreetingMessage(messageChanged.message));
                        //Do something
                        //Send the new data
                        PersistentEntityRef<CustomCommand> ref = persistentEntityRegistry.refFor(CustomEntity.class, messageChanged.getName());
                        ref.ask(new CustomCommand.UseGreetingMessage(messageChanged.getMessage()));
                        System.out.println("CustomSubscriber.Published kafka message");
                        // Update the message - This stores only the message
                        repository.updateMessage(messageChanged.getName(), messageChanged.getMessage());
                        return CompletableFuture.completedFuture(Done.getInstance());
                    } else {
                        // Ignore all other events
                        return CompletableFuture.completedFuture(Done.getInstance());
                    }
                })
            );
    }
}
If I uncomment the “Save incoming message in database” portion, the result is an infinite loop of messages exchanged between Topic A and Topic B. In the code above, the outgoing message is persisted.
Any help is highly appreciated.
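One way to reason about the loop, offered as a guess rather than a confirmed diagnosis: if the topic returned by helloEvents() is fed by HelloEntity's own events, then sending UseGreetingMessage to HelloEntity from inside the subscriber would emit a new GreetingMessageChanged onto the very topic being consumed. The library-free model below illustrates that feedback and one possible guard (not emitting an event when the message is unchanged). `FeedbackLoopModel`, `skipUnchanged`, and `runSubscriber` are hypothetical names, not Lagom APIs.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: an entity whose commands emit events onto a topic,
// and a subscriber that answers every event with the same command.
class FeedbackLoopModel {
    // Stand-in for the topic; each element is {entityId, message}.
    private final Deque<String[]> topic = new ArrayDeque<>();
    // Stand-in for the entity's persisted state, keyed by entity ID.
    private final Map<String, String> entityState = new HashMap<>();
    // When true, a command that does not change the state emits no event.
    private final boolean skipUnchanged;

    FeedbackLoopModel(boolean skipUnchanged) {
        this.skipUnchanged = skipUnchanged;
    }

    // Models the "use greeting" command: update state and emit an event.
    void useGreeting(String id, String message) {
        if (skipUnchanged && message.equals(entityState.get(id))) {
            return; // nothing changed, so no new event is emitted
        }
        entityState.put(id, message);
        topic.addLast(new String[] {id, message});
    }

    // Models the subscriber: for each event, send the command back to the
    // entity. Stops after maxEvents so an unguarded loop still terminates here.
    int runSubscriber(int maxEvents) {
        int handled = 0;
        while (!topic.isEmpty() && handled < maxEvents) {
            String[] event = topic.removeFirst();
            handled++;
            useGreeting(event[0], event[1]);
        }
        return handled;
    }
}
```

Without the guard, every handled event enqueues another one, so the subscriber only stops at the cap. With the guard, the echo of the first event is dropped and the queue drains. Whether this matches the actual cause depends on how helloEvents() is wired in the service implementation.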
Thanks in advance
Naveena