Isn't maintaining events both in Cassandra and Kafka redundant?

I have been experimenting with Lagom in a small project: https://github.com/codingkapoor/lagom-scala-slick.

Q1. I have noticed that Lagom, after persisting events in Cassandra, writes the same events to Kafka too. I understand that using Kafka decouples services, but isn’t maintaining events in both Cassandra and Kafka redundant?

From the README of the above mentioned project I have been working on:

Verify Kafka

$ /opt/kafka_2.12-1.0.0/bin/kafka-topics.sh --zookeeper localhost:2181 --list
__consumer_offsets
employee

$ /opt/kafka_2.12-1.0.0/bin/kafka-console-consumer.sh --topic employee --bootstrap-server localhost:9092 --from-beginning
{"gender":"M","name":"Shivam","pfn":"PFKN110","id":"128","type":"EmployeeAdded","doj":"2017-01-16"}

Verify Cassandra

$ /opt/apache-cassandra-3.11.4/bin/cqlsh localhost 4000
cqlsh> USE lagom_scala_slick;
cqlsh:simplelms> select * from messages ;

 persistence_id                                 | partition_nr | sequence_nr | timestamp                            | timebucket | used | event                                                                                                                                                                                                                                                                                                                                      | event_manifest | message | meta | meta_ser_id | meta_ser_manifest | ser_id  | ser_manifest                                              | tag1                                                      | tag2 | tag3 | writer_uuid
------------------------------------------------+--------------+-------------+--------------------------------------+------------+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------+---------+------+-------------+-------------------+---------+-----------------------------------------------------------+-----------------------------------------------------------+------+------+--------------------------------------
                  EmployeePersistenceEntity|128 |            0 |           1 | 9757cab0-cb0c-11e9-91c9-d1a5d92dc8d6 |   20190830 | True |                                                                                                                                                                                 0x7b226964223a22313238222c226e616d65223a2253686976616d222c2267656e646572223a224d222c22646f6a223a22323031372d30312d3136222c2270666e223a2250464b4e313130227d |                |    null | null |        null |              null | 1000004 | com.codingkapoor.employee.persistence.write.EmployeeAdded | com.codingkapoor.employee.persistence.write.EmployeeEvent | null | null | 5ecbca45-9b79-46c5-a858-5c97d9ed1da2
    /sharding/kafkaProducer-employeeCoordinator |            0 |           1 | 947e2530-cb09-11e9-91c9-d1a5d92dc8d6 |   20190830 | True |                                                                                                                           0x0a65616b6b612e7463703a2f2f656d706c6f7965652d696d706c2d6170706c69636174696f6e403132372e302e302e313a33383833352f73797374656d2f7368617264696e672f6b61666b6150726f64756365722d656d706c6f79656523333832323635343635 |                |    null | null |        null |              null |      13 |                                                        AB |                                                      null | null | null | c39f782f-5a96-4628-9568-060d01ad6a93
    /sharding/kafkaProducer-employeeCoordinator |            0 |           2 | 94850300-cb09-11e9-91c9-d1a5d92dc8d6 |   20190830 | True |                                                                                                     0x0a0973696e676c65746f6e1265616b6b612e7463703a2f2f656d706c6f7965652d696d706c2d6170706c69636174696f6e403132372e302e302e313a33383833352f73797374656d2f7368617264696e672f6b61666b6150726f64756365722d656d706c6f79656523333832323635343635 |                |    null | null |        null |              null |      13 |                                                        AF |                                                      null | null | null | c39f782f-5a96-4628-9568-060d01ad6a93
    /sharding/EmployeeEventProcessorCoordinator |            0 |           1 | 947e2531-cb09-11e9-91c9-d1a5d92dc8d6 |   20190830 | True |                                                                                                                       0x0a67616b6b612e7463703a2f2f656d706c6f7965652d696d706c2d6170706c69636174696f6e403132372e302e302e313a33383833352f73797374656d2f7368617264696e672f456d706c6f7965654576656e7450726f636573736f72232d31323535303739333037 |                |    null | null |        null |              null |      13 |                                                        AB |                                                      null | null | null | b12e8f2c-485d-4c51-a7b3-7da9d66f3949
    /sharding/EmployeeEventProcessorCoordinator |            0 |           2 | 94848dd0-cb09-11e9-91c9-d1a5d92dc8d6 |   20190830 | True | 0x0a39636f6d2e636f64696e676b61706f6f722e656d706c6f7965652e70657273697374656e63652e77726974652e456d706c6f7965654576656e741267616b6b612e7463703a2f2f656d706c6f7965652d696d706c2d6170706c69636174696f6e403132372e302e302e313a33383833352f73797374656d2f7368617264696e672f456d706c6f7965654576656e7450726f636573736f72232d31323535303739333037 |                |    null | null |        null |              null |      13 |                                                        AF |                                                      null | null | null | b12e8f2c-485d-4c51-a7b3-7da9d66f3949
 /sharding/EmployeePersistenceEntityCoordinator |            0 |           1 | 947e2532-cb09-11e9-91c9-d1a5d92dc8d6 |   20190830 | True |                                                                                                                     0x0a68616b6b612e7463703a2f2f656d706c6f7965652d696d706c2d6170706c69636174696f6e403132372e302e302e313a33383833352f73797374656d2f7368617264696e672f456d706c6f79656550657273697374656e6365456e74697479232d3231303737303339 |                |    null | null |        null |              null |      13 |                                                        AB |                                                      null | null | null | d4fbac44-e315-4f76-8fe3-7184f874cb87
 /sharding/EmployeePersistenceEntityCoordinator |            0 |           2 | 975050a0-cb0c-11e9-91c9-d1a5d92dc8d6 |   20190830 | True |                                                                                                             0x0a0239351268616b6b612e7463703a2f2f656d706c6f7965652d696d706c2d6170706c69636174696f6e403132372e302e302e313a33383833352f73797374656d2f7368617264696e672f456d706c6f79656550657273697374656e6365456e74697479232d3231303737303339 |                |    null | null |        null |              null |      13 |                                                        AF |                                                      null | null | null | d4fbac44-e315-4f76-8fe3-7184f874cb87

Q2. How exactly does this messages column family in Cassandra differ from the Kafka topic in terms of the content they hold?

Q3. Is it necessary to use Kafka just to implement a read-side processor? Can’t we implement the same using only Kafka or only Cassandra?

Q4. Also, is the EmployeeEvent definition under lagom-scala-slick/employee-api/src/main/scala/com/codingkapoor/employee/api/ precisely the Kafka event definition, while the one under lagom-scala-slick/employee-impl/src/main/scala/com/codingkapoor/employee/persistence/write/ is the PersistentEntity event definition? Correct me if I am wrong.

Please explain.

Hi @codingkapoor,

It seems that you wrote that sample yourself, yet you’re asking why the sample is like that. That’s a little confusing. I will try to answer your questions one by one, but keep in mind that things are like that because you chose so. ;-)

Q1. I have noticed that Lagom, after persisting events in Cassandra, writes the same events to Kafka too. I understand that using Kafka decouples services, but isn’t maintaining events in both Cassandra and Kafka redundant?

Yes, if you copy the same event, it’s redundant. The goal is not to expose the events of your model as is, but to extract the bits you want the consumers of your service to see. If you are exposing everything, then indeed there is duplication.

On the other hand, you can answer that question by asking another question. Do you want to distribute your Cassandra credentials to every other application that needs to read your events? Events published in Kafka are not for your own consumption, but for other services’ consumption.

Moreover, Kafka is optimised for having many consumers pulling events. It’s a message broker and, as such, designed to propagate messages to hundreds of consumers.
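
To make the "extract the bits you want consumers to see" point concrete, here is a minimal sketch in plain Python (not Lagom's Scala API). The two class names and the choice of which fields stay private are assumptions for illustration only; the field names and values are taken from the EmployeeAdded payload shown in the question.

```python
from dataclasses import dataclass, asdict
import json


@dataclass(frozen=True)
class EmployeeAddedInternal:
    """Journal event (hypothetical shape): the full domain record."""
    id: str
    name: str
    gender: str
    doj: str
    pfn: str


@dataclass(frozen=True)
class EmployeeAddedPublic:
    """Topic event (hypothetical shape): only what consumers should see."""
    id: str
    name: str
    doj: str


def to_public(event: EmployeeAddedInternal) -> EmployeeAddedPublic:
    # Explicit mapping: internal fields can change without breaking consumers.
    return EmployeeAddedPublic(id=event.id, name=event.name, doj=event.doj)


internal = EmployeeAddedInternal(
    id="128", name="Shivam", gender="M", doj="2017-01-16", pfn="PFKN110"
)
# This JSON string is what would be published to the topic.
public_payload = json.dumps(asdict(to_public(internal)), sort_keys=True)
print(public_payload)
```

With a mapping like this in place, the journal and the topic stop being copies of each other: the journal keeps the full internal event, and the topic carries only the published subset.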

Q2. How exactly does this messages column family in Cassandra differ from the Kafka topic in terms of the content they hold?

As said, they are the same if you publish the same events. Or maybe you are asking: why do we need Cassandra and Kafka? (see answer above)

Q3. Is it necessary to use Kafka just to implement a read-side processor? Can’t we implement the same using only Kafka or only Cassandra?

No, and that’s certainly not what your app is doing. Read-side processors consume events from your event journal (in your case Cassandra). Not from Kafka.
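
As a rough illustration of that data flow, here is a sketch in plain Python with no broker anywhere in the picture. It is not Lagom's read-side API; `JournalEntry`, `ReadSideProcessor`, and the in-memory list standing in for the Cassandra journal are all hypothetical names chosen for the example.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class JournalEntry:
    sequence_nr: int
    event_type: str
    payload: dict


class ReadSideProcessor:
    """Builds a query view by reading the journal past its stored offset."""

    def __init__(self):
        self.offset = 0      # sequence_nr of the last entry handled
        self.employees = {}  # the read-side view, keyed by employee id

    def catch_up(self, journal):
        for entry in journal:
            if entry.sequence_nr <= self.offset:
                continue  # already processed on an earlier run
            if entry.event_type == "EmployeeAdded":
                self.employees[entry.payload["id"]] = entry.payload["name"]
            self.offset = entry.sequence_nr


# In-memory stand-in for the event journal (assumption for this sketch).
journal = [JournalEntry(1, "EmployeeAdded", {"id": "128", "name": "Shivam"})]

processor = ReadSideProcessor()
processor.catch_up(journal)
processor.catch_up(journal)  # a second pass is harmless: the offset skips entry 1
```

The only inputs are the journal and the processor's own offset, which is why the read side keeps working when Kafka is switched off.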

Q4. Also, is the EmployeeEvent definition under lagom-scala-slick/employee-api/src/main/scala/com/codingkapoor/employee/api/ precisely the Kafka event definition, while the one under lagom-scala-slick/employee-impl/src/main/scala/com/codingkapoor/employee/persistence/write/ is the PersistentEntity event definition? Correct me if I am wrong.

Correct, that’s how you wrote it. ;-)
The events at your API level are what you’re exposing in the ServiceDescriptor. The events from the entity are what you use to model your domain, and they go to the event journal.

Hello @octonato,

Many thanks for your response. It clears up so many doubts, and apologies if my questions seem lame for now. I assure you they are going to get better with time.

I tried disabling Kafka and my read-side processor worked just fine. I had assumed it was dependent on that Kafka topic. Thanks for the clarification.

I read the following in this discussion: Events with Persistence using Cassandra

One of the techniques to guarantee message delivery is persisting these events before putting them on a message bus. If failures happen, then they can be retried later.
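
The technique in that quote can be sketched roughly as follows, in plain Python. This is only an illustration of the idea and makes no claim about how Lagom does it internally; `FlakyBroker` and `publish_pending` are hypothetical names, and a list stands in for the durable journal.

```python
class FlakyBroker:
    """Hypothetical broker that rejects the first `failures` publish attempts."""

    def __init__(self, failures):
        self.failures = failures
        self.delivered = []

    def publish(self, event):
        if self.failures > 0:
            self.failures -= 1
            raise ConnectionError("broker unavailable")
        self.delivered.append(event)


def publish_pending(journal, broker, published_offset):
    """Publish journal entries from published_offset on; return the new offset."""
    for index in range(published_offset, len(journal)):
        try:
            broker.publish(journal[index])
        except ConnectionError:
            return index  # stop here; this entry is retried on the next call
    return len(journal)


journal = []                         # stand-in for the durable event journal
journal.append("EmployeeAdded:128")  # 1. persist the event first

broker = FlakyBroker(failures=1)
offset = publish_pending(journal, broker, 0)                   # 2. first attempt fails
offset_after_retry = publish_pending(journal, broker, offset)  # 3. retry succeeds
```

Because the event is already in the journal, a failed publish loses nothing: the publisher resumes from its last offset. The flip side is that a crash after the broker accepts an event but before the offset is saved leads to a repeat delivery, so consumers should tolerate duplicates.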

Q1. I would like to know whether this is true, and where I can read more about it.

Q2. Would it be considered good or bad practice to implement the journal in Kafka itself? Does Lagom support implementing the journal in Kafka out of the box? Why did it choose Cassandra and not Kafka for the journal?

Let me preface my comment here to note that I am looking at this from the context of real systems in a business organization.

I really like @octonato’s earlier reply. A key point for me is that one ought to draw a clear distinction between one’s internal data schemas and the external schemas one offers up, accepting that consumers (often in other parts of the organization) may couple their systems to them.

This is because it’s typically more difficult to make changes to those external schemas that consumers couple to, due to the logistics of getting those consumers to make changes how and when you need them to (because of organizational issues). When making changes becomes difficult, trade-offs are made and system architectures can begin to get messy - the beginning of a decay into the infamous “big ball of mud”. So we ought to aim to keep system components as decoupled as possible, seeking to maintain a high degree of flexibility at least within our own sphere of influence. If you need to change your internals, you should be able to.

That said, event sourcing from Kafka seems like a feasible option and I’m sure many do that. However, as Renato pointed out, one should take care about what one includes in the external data schema / API, versus the internal data model of your service. This point is dear to my heart :)

If I were to use Kafka for both purposes, to maintain a decoupled architecture, I would at least segregate the Kafka topics, having one as my service’s “private parts” or “crown jewels”, and another topic to be used as the externally facing API. Whatever you use to store your internal private data, be it Kafka or Cassandra or an RDBMS, don’t share the credentials or details with others. If others rightfully need data that your service has, it should always be provided through an explicit external facing interface, not a backdoor to those “crown jewels”.

All that said, I’m really interested to learn more about why Lagom uses Cassandra. I’ve been reading up a little about it myself and get a picture of it as a scalable datastore with good performance characteristics, but haven’t been following the NoSQL space closely over the years to deeply understand. Is that basically it?

I did see that Lightbend and DataStax announced some collaboration together some years back, perhaps also related.

@codingkapoor, @fxgai,
You can check this topic for a similar discussion about using Kafka as journal storage.