Unable to publish/create topic using PubSubRegistry

Hi,
I started using Lagom recently. Based on this link, my understanding is that a message should be published on the constructed topic. I am referring to this part of the sample code in particular:

      final PubSubRef<Temperature> topic = pubSub.refFor(TopicId.of(Temperature.class, id));
      topic.publish(temperature);

I couldn’t build a Temperature DTO to POST from a REST client, so I created my own DTO, KafkaEvent, which is modeled on HelloEvent.

I tried to use the code from here

However, I did not see the topic created after performing the POST operation. I added print statements, and they do appear in the console:

   System.out.println("Received id:" + id);
   final PubSubRef<KafkaEvent> topic = pubSub.refFor(TopicId.of(KafkaEvent.class, id));
   topic.publish(temperature);
   System.out.println("Sent to:" + topic.toString());

I am not seeing any errors in the Kafka server log or in my project.

Is there a step I am missing, or is my understanding of how PubSubRegistry is used wrong?

Please do let me know if further details are required.
Thanks in advance
Naveena

PubSub isn’t about Kafka topics. It is for intra-service communication. See details here. To work with a message broker (Kafka), see the next chapter.
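To make the distinction concrete, here is a toy in-process publish/subscribe model in plain Java. It is only an illustration of the idea, not Lagom's PubSubRegistry implementation; the class name InProcessPubSub and its methods are made up for this sketch. The point it shows is that publishing only reaches handlers that are currently subscribed in the same application, and nothing is created on an external broker.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy model only (hypothetical class): NOT how Lagom implements PubSubRegistry.
class InProcessPubSub {
    // Subscribers live in memory, keyed by topic id. No broker is involved.
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    void subscribe(String topicId, Consumer<String> handler) {
        subscribers.computeIfAbsent(topicId, k -> new ArrayList<>()).add(handler);
    }

    // Delivers the message to current subscribers and returns how many received it.
    // With no subscribers the message is simply dropped.
    int publish(String topicId, String message) {
        List<Consumer<String>> handlers =
            subscribers.getOrDefault(topicId, Collections.emptyList());
        for (Consumer<String> handler : handlers) {
            handler.accept(message);
        }
        return handlers.size();
    }

    public static void main(String[] args) {
        InProcessPubSub pubSub = new InProcessPubSub();
        // Nobody is subscribed yet, so this publish reaches no one.
        int before = pubSub.publish("KafkaEvent-42", "first");
        pubSub.subscribe("KafkaEvent-42", m -> System.out.println("got " + m));
        int after = pubSub.publish("KafkaEvent-42", "second");
        System.out.println(before + " then " + after);
    }
}
```

In this model, a publish with no subscriber succeeds silently, which matches the symptom in the question: the print statements run, no error is logged, and no Kafka topic appears.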

Regards,
Sergey

Hello Sergey,

Thank you for your response. I will go through the link you pointed to and get back to you if I have any questions.

Naveena

After some tinkering, I found that a Kafka message is published after a POST call to this service:

  public ServiceCall<GreetingMessage, Done> useGreeting(String id) {
    return request -> {
      // Look up the hello world entity for the given ID.
      PersistentEntityRef<HelloCommand> ref = persistentEntityRegistry.refFor(HelloEntity.class, id);
      // Tell the entity to use the greeting message specified.
      return ref.ask(new UseGreetingMessage(request.message));
    };
  }

Basically, what I am trying to achieve is:
– Subscribe to Topic A
– Upon receiving a message on Topic A, persist the Kafka offset and execute some business logic
– Publish the result to Topic B
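The three steps above can be sketched in plain Java with in-memory stand-ins. This is a minimal sketch under stated assumptions, not Lagom or Kafka client code: the class OffsetTrackingPipeline, the process method, and the uppercase "business logic" are all hypothetical, the list stands in for Topic B, and the field stands in for an offset stored in a database.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

// Hypothetical sketch: in-memory stand-ins for Topic B and the offset store.
class OffsetTrackingPipeline {
    private long lastProcessedOffset = -1;                 // stand-in for a persisted offset
    private final List<String> topicB = new ArrayList<>(); // stand-in for Topic B

    // Placeholder business logic (assumption for this example only).
    static String process(String message) {
        return message.toUpperCase(Locale.ROOT);
    }

    // Handles one message from Topic A.
    // Returns false when the offset was already processed (a redelivery).
    boolean handle(long offset, String message) {
        if (offset <= lastProcessedOffset) {
            return false; // already handled, skip to keep the effect idempotent
        }
        topicB.add(process(message)); // publish the result to Topic B
        lastProcessedOffset = offset; // record the offset after the work is done
        return true;
    }

    long lastProcessedOffset() {
        return lastProcessedOffset;
    }

    List<String> published() {
        return Collections.unmodifiableList(topicB);
    }

    public static void main(String[] args) {
        OffsetTrackingPipeline pipeline = new OffsetTrackingPipeline();
        pipeline.handle(0, "hello");
        pipeline.handle(0, "hello"); // redelivered message is skipped
        pipeline.handle(1, "world");
        System.out.println(pipeline.published() + " offset=" + pipeline.lastProcessedOffset());
    }
}
```

The sketch records the offset only after the result has been published, so a failure in between leads to a redelivery rather than a lost message. The offset check is what makes that redelivery harmless.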

I am able to implement the above logic without Lagom, using a plain Kafka client. I am trying to achieve the same in Lagom.

Even in Lagom I am able to achieve the above logic, but I am unable to persist the message/offset from Topic A.

My code that does this is below:

public class CustomSubscriber {
	
	@Inject
	  public CustomSubscriber(HelloService helloService, StreamRepository repository,PersistentEntityRegistry persistentEntityRegistry) {
		  //persistentEntityRegistry.register(CustomEntity.class);
		  persistentEntityRegistry.register(HelloEntity.class);
	    // Create a subscriber
	    helloService
	    .helloEvents()
	    .subscribe()
	      // And subscribe to it with at least once processing semantics.
	    .atLeastOnce(
	        // Create a flow that emits a Done for each message it processes
	        Flow.<HelloEvent>create().mapAsync(1, event -> {

	          if (event instanceof HelloEvent.GreetingMessageChanged) {
	            HelloEvent.GreetingMessageChanged messageChanged = (HelloEvent.GreetingMessageChanged) event;
	            
	            System.out.println("CustomSubscriber.Received kafka message");
	            
	            //Save incoming message in database
	            //PersistentEntityRef<HelloCommand> refHello = persistentEntityRegistry.refFor(HelloEntity.class, messageChanged.getName());
	            // Tell the entity to use the greeting message specified.
	            //refHello.ask(new UseGreetingMessage(messageChanged.message));
	            
                    //Do something

	            //Send the new data
	            PersistentEntityRef<CustomCommand> ref = persistentEntityRegistry.refFor(CustomEntity.class, messageChanged.getName());
	            ref.ask(new CustomCommand.UseGreetingMessage(messageChanged.getMessage()));
	            
	            System.out.println("CustomSubscriber.Published kafka message");
	            
                    // Update the message - This stores only the message
	            repository.updateMessage(messageChanged.getName(), messageChanged.getMessage());
	            
	            
	            return CompletableFuture.completedFuture(Done.getInstance());

	          } else {
	            // Ignore all other events
	            return CompletableFuture.completedFuture(Done.getInstance());
	          }
	        })
	      );

	  }
}

If I uncomment the “Save incoming message in database” portion, there is an infinite loop of messages exchanged between Topic A and Topic B. In the code above, the outgoing message is persisted.

Any help is highly appreciated.
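One plausible cause of such a loop, assuming (as in the Lagom hello sample) that the helloEvents topic is fed from HelloEntity's persisted events and that UseGreetingMessage makes HelloEntity emit a new GreetingMessageChanged event: the subscriber writes back to the very entity whose events it is consuming, so every handled message produces another one. The plain Java simulation below is only a sketch of that feedback; FeedbackLoopDemo, its queues, and the cap parameter are hypothetical and stand in for the real topics.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical simulation of the suspected feedback loop, not Lagom code.
class FeedbackLoopDemo {
    // Returns how many messages the Topic A subscriber handled,
    // stopping at `cap` so a looping configuration still terminates here.
    static int run(boolean writeBackToSourceEntity, int cap) {
        Deque<String> topicA = new ArrayDeque<>(); // fed by the source entity's events
        Deque<String> topicB = new ArrayDeque<>(); // fed by the other entity's events
        topicA.add("greeting");                    // initial event caused by the POST
        int handled = 0;
        while (!topicA.isEmpty() && handled < cap) {
            String event = topicA.poll();
            handled++;
            if (writeBackToSourceEntity) {
                // Command to the entity behind Topic A: it emits a new event,
                // which lands on Topic A again and is consumed by this same loop.
                topicA.add(event);
            }
            // Command to the other entity: its event only goes to Topic B.
            topicB.add(event);
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println("write back to source entity: " + run(true, 5));
        System.out.println("write to other entity only:  " + run(false, 5));
    }
}
```

With the write-back enabled the subscriber keeps running until the cap is reached; without it, the single initial message is handled once. If this is what is happening, storing the incoming message somewhere that does not emit events onto Topic A (for example a read-side table or a separate entity) would break the cycle.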
Thanks in advance
Naveena

Maybe you need to use Akka Streams and Alpakka?
Or you can look at our library as an example of simple message publication.

Regards,
Sergey

Hi,

Also check this for reference.

Br,
Alan
