Kafka topic not getting created

I am trying to write a small microservice using lagom framework with read side implemented to support mysql.

The objective of this service is to expose apis to create, update, and read employees.

Upon execution, however, the project is not creating kafka topic and publishing messages to it. I tried debugging, reading docs and referring couple of other similar projects but no luck so far.

Since I am not really sure which part of the code really has an issue, I have created a fairly trimmed down version of the project in a separate branch named ‘kafka’. https://github.com/codingkapoor/simple-lms-platform/tree/kafka

Lagom documentations and similar projects are the only sources that’s available to find any help for such fairly newer tech. I really need help to debug and understand this issue. Let me know if this the right platform to ask for such a help.

Steps I take to create an employee and possibly see kafka topic created are as follows:

#1. sbt runAll

#2. curl -X POST \
  http://localhost:9000/api/employees \
  -H 'Content-Type: application/json' \
  -d '{
    "id": "128",
    "name": "Shivam",
    "gender": "M",
    "doj": "2017-01-16",
    "pfn": "PFKN110"
}'

#3. /opt/kafka_2.12-2.3.0/bin/kafka-topics.sh --list --zookeeper localhost:2181

#4. /opt/kafka_2.12-2.3.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic employee --from-beginning

runAll logs

[info] (Service started, press enter to stop and go back to the console...)
23:24:46.420 [warn] akka.cluster.sharding.ShardRegion [sourceThread=employee-impl-application-akka.actor.default-dispatcher-3, akkaTimestamp=17:54:46.418UTC, akkaSource=akka.tcp://employee-impl-application@127.0.0.1:42561/system/sharding/kafkaProducer-employee, sourceActorSystem=employee-impl-application] - kafkaProducer-employee: Trying to register to coordinator at [ActorSelection[Anchor(akka://employee-impl-application/), Path(/system/sharding/kafkaProducer-employeeCoordinator/singleton/coordinator)]], but no acknowledgement. Total [1] buffered messages. [Coordinator [Member(address = akka.tcp://employee-impl-application@127.0.0.1:42561, status = Up)] is reachable.]
23:24:46.527 [warn] akka.cluster.sharding.ShardRegion [sourceThread=employee-impl-application-akka.actor.default-dispatcher-17, akkaTimestamp=17:54:46.526UTC, akkaSource=akka.tcp://employee-impl-application@127.0.0.1:42561/system/sharding/EmployeeEventProcessor, sourceActorSystem=employee-impl-application] - EmployeeEventProcessor: Trying to register to coordinator at [ActorSelection[Anchor(akka://employee-impl-application/), Path(/system/sharding/EmployeeEventProcessorCoordinator/singleton/coordinator)]], but no acknowledgement. Total [1] buffered messages. [Coordinator [Member(address = akka.tcp://employee-impl-application@127.0.0.1:42561, status = Up)] is reachable.]

I have also asked this question on Stackoverflow. https://stackoverflow.com/questions/57639640/lagom-framework-kafka-topic-not-getting-created

Hi @codingkapoor,

it looks like your broker has topic auto creation disabled (see allow.auto.create.topics https://kafka.apache.org/documentation/#topicconfigs). I noticed on your snippets you use a local broker of Kafka 2.3.0 and I wonder if the default value of that setting was switched (or maybe your admin/installer did)?

Cheers,

Thanks for you response. I was able to figure out the issue and have updated the mentioned stackoverflow post with the answer.

  1. First issue was because of the order in which the traits ReadSideJdbcPersistenceComponents and WriteSideCassandraPersistenceComponents are extended to create EmployeeApplication . Due to a bug in Lagom the ordering in which you mix in those two traits is relevant and only if you mix in ReadSideJdbcPersistenceComponents before WriteSideCassandraPersistenceComponents you will be able to use this combination.Please see this README from lagom-samples.

  2. Also, I was not implementing polymorphic event stream properly as explained in the lagom documentation here.