Akka Persistence CassandraReadJournal: Unable to get events by tags

I am unable to get events by tag. I am using Akka 2.5.23, akka-persistence-cassandra 0.98, and Cassandra 3.6.

This is part of my config file (I removed persistence snapshot config for brevity):

    cassandra {
      host = "localhost"
      host = ${?CASSANDRA_HOST}
      port = 9042
      port = ${?CASSANDRA_PORT}
      number-of-retries = 500
    }
    
    cassandra-journal {
      contact-points = [${cassandra.host}]
      port = ${cassandra.port}
      tags = {
        account = 1
        accountcreated = 2
      }
    
      event-adapters{
        proto = "MyProtobufAdapter"
      }
    
      event-adapter-bindings{
        "com.google.protobuf.Message" = proto
      }
    }
    
    akka {
      actor {
        serializers {
          proto = "akka.remote.serialization.ProtobufSerializer"
        }
    
        serialization-bindings {
          "com.google.protobuf.Message" = proto
        }
      }
    
      persistence {
        journal.plugin = "cassandra-journal"
      }
    }

The events are successfully persisted in Cassandra:

    cqlsh:akka> select persistence_id, tags from messages;
    
     persistence_id                       | tags
    --------------------------------------+-------------------------------
     b34f6158-b693-44af-a01d-be4f9e0463d9 | {'account', 'accountcreated'}

I can successfully read all persistenceIds:

    import akka.persistence.query.{EventEnvelope, PersistenceQuery, Sequence}
    import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
    import akka.stream.scaladsl.Sink
    
    // Assumes an ActorSystem `system` and an implicit Materializer are in scope
    val journal: CassandraReadJournal = PersistenceQuery(system)
        .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
    
    journal.persistenceIds.runWith(Sink.foreach(println(_)))

But when I try to get a live stream of events by tag, the stream is empty, even though the cqlsh output above shows that I have tagged messages.

    journal.eventsByTag("account", Sequence(0L)).runWith(Sink.foreach(envelope => println(envelope.persistenceId)))

I am unable to find the problem, please help.

If you check the materialized result of the eventsByTag stream, you will see that it has failed, because the Cassandra plugin doesn’t support Sequence offsets. Use NoOffset instead.
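
One way to see that failure is to inspect the Future[Done] that Sink.foreach materializes. The following is a minimal sketch; the StreamResult object and its method names are made up for illustration and are not part of Akka or the Cassandra plugin:

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

import akka.Done

// Hypothetical helper (not an Akka API): reports how a stream run ended.
object StreamResult {
  // Summarize the outcome of a completed stream.
  def describe(result: Try[Done]): String = result match {
    case Success(_)  => "stream completed"
    case Failure(ex) => s"stream failed: ${ex.getMessage}"
  }

  // Print the outcome once the materialized Future completes.
  def logOutcome(done: Future[Done])(implicit ec: ExecutionContext): Unit =
    done.onComplete(result => println(describe(result)))
}
```

Passing the value returned by runWith(Sink.foreach(...)) to StreamResult.logOutcome, with an ExecutionContext such as system.dispatcher in implicit scope, should print the exception instead of leaving the stream silently empty.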


Thank you so much for your reply. It worked with NoOffset, and I got this log output:

[INFO] [07/23/2019 07:35:51.032] [accounts-system-akka.actor.default-dispatcher-7] [EventsByTagStage(akka://accounts-system)] [c619a08b-b3f2-4c5d-a8f4-9c77216b69a3]: EventsByTag query [account] starting with EC delay 5000ms: fromOffset [a4698000-8f19-11e5-8080-808080808080 (2015-11-20 00:00:00:000)] toOffset [None]

But if I cannot specify an offset, how can I create a resumable projection?
I am reading the journal from an actor in charge of populating a read database (Elasticsearch) for CQRS purposes. Am I forced to read all events every time with Cassandra?

I found the issue.

There are 3 types of Offset:

  • akka.persistence.query.Offset.sequence(value: Long),
  • akka.persistence.query.Offset.timeBasedUUID(value: UUID)
  • and finally NoOffset if no offset should be used.

But the Cassandra plugin does not support the first type (Sequence). So I have to use a time-based UUID as the offset:

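    import akka.persistence.query.Offset
    import com.datastax.driver.core.utils.UUIDs

    // Smallest time-based UUID for timestamp 0, wrapped as an Offset for eventsByTag
    val offset = Offset.timeBasedUUID(UUIDs.startOf(0L))
    journal.eventsByTag("account", offset)
       .runWith(Sink.foreach(envelope => println(envelope.persistenceId)))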

This works.
Please let me know if there is a better way.
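
For the resumable projection, the idea would be to save the offset of each processed envelope and pass it back to eventsByTag on restart. This is only a rough sketch: the ResumableProjection object, its in-memory store, and the commented-out indexing call are placeholders I made up, and a real projection would keep the offset in a durable store.

```scala
import java.util.UUID
import java.util.concurrent.atomic.AtomicReference

import akka.persistence.query.{EventEnvelope, NoOffset, Offset, TimeBasedUUID}

// Hypothetical sketch, not part of Akka or the Cassandra plugin.
object ResumableProjection {
  // In-memory stand-in for a durable offset store (assumption).
  private val last = new AtomicReference[Option[UUID]](None)
  def loadOffset(): Option[UUID] = last.get()
  def saveOffset(uuid: UUID): Unit = last.set(Some(uuid))

  // Turn the saved UUID (if any) back into an Offset for eventsByTag.
  def resumeFrom(saved: Option[UUID]): Offset =
    saved.map(Offset.timeBasedUUID).getOrElse(NoOffset)

  // Extract the UUID from a TimeBasedUUID offset; None for other offset types.
  def uuidOf(offset: Offset): Option[UUID] = offset match {
    case TimeBasedUUID(uuid) => Some(uuid)
    case _                   => None
  }

  // Update the read side first, then record how far we got.
  def handle(envelope: EventEnvelope): Unit = {
    // indexInElasticsearch(envelope.event)  // placeholder, not a real API
    uuidOf(envelope.offset).foreach(saveOffset)
  }
}
```

On startup the query would then be journal.eventsByTag("account", ResumableProjection.resumeFrom(ResumableProjection.loadOffset())) with Sink.foreach(ResumableProjection.handle). Since a crash between indexing and saving the offset can replay an event, the handler should probably be idempotent, and it is worth checking in the plugin documentation whether the given offset is treated as exclusive.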

Thanks.

Thank you, this is what I needed.