Cassandra write-side Kafka producer

I am implementing a POC with a Kafka producer that grabs around 10 sensor values from the external world every 5 seconds. The values are forwarded to a consumer, which has a lot of work to do with them. I am happy that Kafka provides retention in case of consumer failure.

Using Lagom, I understand that write-side persistence is mandatory, so I am using Cassandra and it works fine. But unfortunately the Cassandra “journal” seems to grow drastically, and I can’t imagine the size of my Cassandra database after many years in production. Is there a way to bypass Cassandra persistence, or to optimize the data storage on the write side?

It seems that you are not using event sourcing, am I right? If so, do you really need to store your data for a long time in Cassandra?

I suppose I am using event sourcing, since I can see the incoming values in Cassandra, one after the other. At the same time, the data is transmitted from the producer to the consumer via Kafka, and all seems to be fine.

  1. Declaration of service:
return Service.named(getServiceName())
                .withTopics(
                        Service.topic(getTopicName(), this::dataValueTopic)
                                // Partition by time-series code so all values of one
                                // series stay in order on one Kafka partition.
                                .withProperty(KafkaProperties.partitionKeyStrategy(), AbstractDataValue::getTimeSeriesCode)
                )
                .withAutoAcl(true);
  2. Implementation of the service:
    public CompletionStage<Done> publishDataValue(AbstractDataValue dataValue) {
        // Route the value to the persistent entity for its time series; the
        // entity persists it as an event, which in turn feeds the topic below.
        PersistentEntityRef<DataValueCommand> ref =
                persistentEntityRegistry.refFor(DataValueEntity.class, dataValue.getTimeSeriesCode());
        return ref.ask(new DataValueCommand.PublishDataValueMessage(dataValue));
    }

    public Topic<AbstractDataValue> dataValueTopic() {
        // The topic is fed from the journal: Lagom reads the persisted event
        // stream from the last stored offset and publishes each event to Kafka.
        return TopicProducer
                .singleStreamWithOffset(
                        offset -> persistentEntityRegistry.eventStream(DataValueEventTag.INSTANCE, offset)
                                .map(this::convertEvent));
    }

    private Pair<AbstractDataValue, Offset> convertEvent(Pair<DataValueEvent, Offset> pair) {
        return new Pair<>(pair.first().getDataValue(), pair.second());
    }

The publishDataValue method is called by an internal Akka actor whose job is to grab the external sensor data.
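
That actor is essentially a timer that polls the sensors and calls publishDataValue. A minimal sketch of it, assuming Akka 2.6 classic actors, a hypothetical SensorReader as the sensor source, and the service implementation above:

    import akka.actor.AbstractActorWithTimers;
    import java.time.Duration;

    // Hypothetical polling actor: every 5 seconds it reads the sensors and
    // forwards each value through publishDataValue (all names illustrative).
    public class SensorPollingActor extends AbstractActorWithTimers {

        private static final String TICK = "tick";

        private final DataValueServiceImpl service; // the service shown above
        private final SensorReader reader;          // hypothetical sensor source

        public SensorPollingActor(DataValueServiceImpl service, SensorReader reader) {
            this.service = service;
            this.reader = reader;
            // Fire TICK every 5 seconds, matching the polling rate described above.
            getTimers().startTimerAtFixedRate("poll", TICK, Duration.ofSeconds(5));
        }

        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .matchEquals(TICK, msg ->
                            reader.readAll().forEach(service::publishDataValue))
                    .build();
        }
    }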

  3. Persistent entity:
public class DataValueEntity extends PersistentEntity<DataValueCommand, DataValueEvent, DataValueState> {

    @Override
    public Behavior initialBehavior(Optional<DataValueState> snapshotState) {
        BehaviorBuilder b = newBehaviorBuilder(
                snapshotState.orElse(DataValueState.EMPTY));

        b.setCommandHandler(DataValueCommand.PublishDataValueMessage.class,
                (cmd, ctx) ->
                ctx.thenPersist(new DataValueEvent.DataValueMessagePublished(cmd.getDataValue()),
                        // Then once the event is successfully persisted, we respond with done.
                        evt -> ctx.reply(Done.getInstance())));

        b.setEventHandler(DataValueEvent.DataValueMessagePublished.class,
                // The state itself is not used for anything here; the entity
                // exists mainly so the event is journaled and can be published.
                evt -> new DataValueState());

        return b.build();
    }
}
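
For reference, the DataValueEventTag.INSTANCE used by the topic producer relies on the events being tagged. Those supporting classes are not shown here, but a minimal sketch of them could look like this:

    import com.lightbend.lagom.javadsl.persistence.AggregateEvent;
    import com.lightbend.lagom.javadsl.persistence.AggregateEventTag;
    import com.lightbend.lagom.javadsl.persistence.AggregateEventTagger;

    // One shared tag lets persistentEntityRegistry.eventStream(...) read all
    // DataValueEvents as a single stream.
    public class DataValueEventTag {
        public static final AggregateEventTag<DataValueEvent> INSTANCE =
                AggregateEventTag.of(DataValueEvent.class);
    }

    public interface DataValueEvent extends AggregateEvent<DataValueEvent> {
        @Override
        default AggregateEventTagger<DataValueEvent> aggregateTag() {
            return DataValueEventTag.INSTANCE;
        }
    }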

What I understood is that just after thenPersist, the Lagom framework sends the event to Kafka.

Is this not the right explanation and the right way to do it?
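
On the consumer side, the topic would then be consumed through the Lagom service client with at-least-once delivery, roughly like this sketch (assuming a DataValueService client interface exposing dataValueTopic()):

    import akka.Done;
    import akka.stream.javadsl.Flow;

    // Subscribe with at-least-once delivery: Lagom stores this consumer's
    // offset, so after a failure processing resumes from the last committed
    // message rather than from the beginning of the topic.
    dataValueService.dataValueTopic()
            .subscribe()
            .atLeastOnce(Flow.fromFunction(dataValue -> {
                // ... the consumer's "lot of stuff to do" goes here ...
                return Done.getInstance();
            }));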

Event sourcing is for distributed entities in large systems. If you just want your consumer to process the data as fast as it can in order to “do a lot of things”, then so far it sounds like you do not need an event-sourcing approach; it is more a stream-processing pipeline than an event-sourced system. You can use Kafka and Cassandra for both, but the main difference is how long the data is stored. In event sourcing you usually need to store the changes to a given entity, persisting every incoming event that modifies it. In a plain pipeline, on the other hand, you can simply process your data and then act on it. You can also apply event sourcing after the “processing” pipeline.
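
As a rough illustration of that plain-pipeline alternative, here is a sketch that publishes the values straight to Kafka with the vanilla client and no write-side journal at all (the broker address, topic name, and string encoding are all assumptions):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
    props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
        // Nothing is journaled on the producer side; Kafka's own retention is
        // what protects you against consumer failures.
        producer.send(new ProducerRecord<>("data-values",  // assumed topic name
                dataValue.getTimeSeriesCode(),             // same partition key as before
                dataValue.toString()));                    // assumed value encoding
    }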

Obviously I don’t know the business logic of your system, but this is what comes to mind.

Lagom uses Cassandra for two main reasons:

  • Offset storage - to keep track of the Kafka offsets
  • Entity state - to handle event sourcing

Maybe you do not need event sourcing to do what you want =) and then the storage problem will not exist.