✅ Persistence Query (Cassandra) – triggering a read with a fake write


(Yawar Amin) #1

Hi, I have a project that is supposed to publish all events written into a Cassandra table (events). The service that’s supposed to write the events isn’t up and running yet, so I’m testing my publisher by writing fake events into the database and trying to get Akka Persistence Query (Cassandra) to pick them up and publish them.

The journal read looks like this:

    val readJournal: CassandraReadJournal with EventsByTagQuery = PersistenceQuery(system)
      .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)

    readJournal
      .eventsByTag(settings.publisher.eventTag, startFromOffset)
      .map { envelope: EventEnvelope =>
        publisher ! PublishEvent(envelope)
      }
      .runWith(Sink.ignore)
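One thing worth checking in a setup like this is the starting offset: eventsByTag resumes from the offset you pass it, so rows written under older time buckets can be skipped if the stream starts from "now". As a sketch (startFromOffset is the value used in the snippet above; replaying everything uses the standard NoOffset from akka.persistence.query):

```scala
import akka.persistence.query.{ NoOffset, Offset }

// NoOffset makes eventsByTag start from the earliest time bucket the
// plugin is configured to scan, instead of from the current time.
val startFromOffset: Offset = NoOffset
```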

To my understanding, Akka Persistence Query polls my keyspace.tag_views table and should publish whatever records it finds in there. But, when I manually insert rows into tag_views, they are not showing up in the readJournal.eventsByTag stream, and are not getting published.

Can I get some advice on what conditions I need to fulfill to ‘trick’ the persistence query journal into finding and picking up my event? For reference, the tag_views table looks like this:

cassandra@cqlsh> describe table keyspace.tag_views;

CREATE TABLE keyspace.tag_views (
    tag_name text,
    timebucket bigint,
    timestamp timeuuid,
    persistence_id text,
    tag_pid_sequence_nr bigint,
    event blob,
    event_manifest text,
    meta blob,
    meta_ser_id int,
    meta_ser_manifest text,
    sequence_nr bigint,
    ser_id int,
    ser_manifest text,
    writer_uuid text,
    PRIMARY KEY ((tag_name, timebucket), timestamp, persistence_id, tag_pid_sequence_nr)
) WITH CLUSTERING ORDER BY (timestamp ASC, persistence_id ASC, tag_pid_sequence_nr ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'bucket_high': '1.5', 'bucket_low': '0.5', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'enabled': 'true', 'max_threshold': '32', 'min_sstable_size': '50', 'min_threshold': '4', 'tombstone_compaction_interval': '86400', 'tombstone_threshold': '0.2', 'unchecked_tombstone_compaction': 'false'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';

So far, I’ve just inserted a couple of records into this table, making sure the tag_name field is set to the same value as settings.publisher.eventTag in the code above.


(Patrik Nordwall) #2

I think it will be far easier to create a PersistentActor that persists the tagged events than to insert them into the table yourself. You can wrap the events with akka.persistence.journal.Tagged to assign the tags.
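A minimal sketch of that approach, assuming classic (untyped) Akka Persistence — FakeEvent, FakeEventWriter, and the tag value are placeholders, not names from the original project (note that the Tagged wrapper lives in akka.persistence.journal):

```scala
import akka.actor.{ ActorSystem, Props }
import akka.persistence.PersistentActor
import akka.persistence.journal.Tagged

// Hypothetical stand-in for the real domain event type.
final case class FakeEvent(payload: String)

class FakeEventWriter(tag: String) extends PersistentActor {
  override def persistenceId: String = "fake-event-writer"

  override def receiveCommand: Receive = {
    case payload: String =>
      // Wrapping the event in Tagged is what makes the Cassandra journal
      // also write it into tag_views under `tag`, so eventsByTag can see it.
      persist(Tagged(FakeEvent(payload), Set(tag))) { _ => () }
  }

  // Nothing to rebuild for this throwaway writer; recovery delivers the
  // unwrapped FakeEvent instances, which we simply ignore.
  override def receiveRecover: Receive = { case _ => }
}

// Usage sketch: the tag must match settings.publisher.eventTag on the
// query side for the eventsByTag stream to pick these events up.
// val system = ActorSystem("fake-writer")
// val writer = system.actorOf(Props(new FakeEventWriter("my-event-tag")))
// writer ! "hello"
```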


(Yawar Amin) #3

Thanks, I ended up doing that. It turned out exactly as you said – it didn’t work until I explicitly tagged the events.