Akka Projection Slick High Eventual Consistency Delay

Hello,

I have a question about the delay between the moment a persistent entity (typed actor) persists an event and the moment the source provider and sharded daemon process hand that event to the projection. I have a simple service that takes a Create command and emits a Created event. About 6 to 10 seconds later, the projection emits its logs for persisting the event to the read side (in this case PostgreSQL).

My source provider uses CassandraReadJournal because I am using WriteSideCassandraPersistenceComponents for entity persistence. My projection is a SlickProjection using exactly-once semantics.

  def sourceProvider(tag: String, system: ActorSystem[_]): SourceProvider[Offset, EventEnvelope[Post.Event]] =
    EventSourcedProvider
      .eventsByTag[Post.Event](
        system = system,
        readJournalPluginId = CassandraReadJournal.Identifier,
        tag = tag
      )

  def projection(tag: String): ExactlyOnceProjection[Offset, EventEnvelope[Event]] =
    SlickProjection
      .exactlyOnce[Offset, EventEnvelope[Post.Event], PostgresProfile](
        projectionId = ProjectionId("Posts", tag),
        sourceProvider(tag, actorSystem.toTyped),
        dbConfig,
        handler = () => new PostProjectionProcessor(postRepository)
      )


  ShardedDaemonProcess(actorSystem.toTyped)
    .init[ProjectionBehavior.Command](
      name = "posts",
      numberOfInstances = Post.Event.tags.size,
      behaviorFactory = (i: Int) => ProjectionBehavior(projection(Event.tags(i))),
      stopMessage = ProjectionBehavior.Stop
    )

The logs show:

[2021-06-29 16:36:24,487] [INFO] [post] [] [posts-impl-application-akka.actor.default-dispatcher-31] - Entity received Create command. MDC: {persistencePhase=replay-evt, akkaAddress=akka://posts-impl-application@127.0.0.1:46031, akkaSource=akka://posts-impl-application/system/sharding/Post/946/a4b21293-4e43-47e4-b34b-336b118d0f8c, sourceActorSystem=posts-impl-application, persistenceId=Post|a4b21293-4e43-47e4-b34b-336b118d0f8c}

[2021-06-29 16:36:24,502] [INFO] [post] [] [posts-impl-application-akka.actor.default-dispatcher-31] - Entity applying Event. MDC: {persistencePhase=running-cmd, akkaAddress=akka://posts-impl-application@127.0.0.1:46031, persistenceId=Post|a4b21293-4e43-47e4-b34b-336b118d0f8c, akkaSource=akka://posts-impl-application/system/sharding/Post/946/a4b21293-4e43-47e4-b34b-336b118d0f8c, sourceActorSystem=posts-impl-application}

[2021-06-29 16:36:35,407] [INFO] [org.ygoty.posts.impl.readside.PostProjectionProcessor] [] [posts-impl-application-akka.actor.default-dispatcher-27] - Created(Posted(a4b21293-4e43-47e4-b34b-336b118d0f8c,5c6ae850-977f-43e2-aac3-f8ae52ede4bf,testing the body,None,2021-06-29T16:36:23.888025Z,None)) MDC: {}

[2021-06-29 16:36:35,408] [INFO] [org.ygoty.posts.impl.readside.PostProjectionProcessor] [] [posts-impl-application-akka.actor.default-dispatcher-27] - Creating post -> Posted(a4b21293-4e43-47e4-b34b-336b118d0f8c,5c6ae850-977f-43e2-aac3-f8ae52ede4bf,testing the body,None,2021-06-29T16:36:23.888025Z,None) MDC: {}

The application.conf file is:

akka {
    loglevel="INFO"
    actor {
        serialization-bindings {
            "org.ygoty.posts.impl.entities.Post$CborSerializer" = jackson-cbor
        }
        enable-additional-serialization-bindings = on
    }
    cluster {
        sharding {
            state-store-mode = ddata
        }
    }
    extensions = [akka.persistence.Persistence]
    # use Cassandra to store both snapshots and the events of the persistent actors
    persistence {
      journal {
        plugin = cassandra-journal
      }

      snapshot-store {
        plugin = cassandra-snapshot-store
      }
    }
    projection.slick {
      profile = "slick.jdbc.PostgresProfile$"
      # add here your Slick db settings
      db {
        url = "jdbc:postgresql://localhost:5432/post"
        driver = "org.postgresql.Driver"
        # connectionPool = disabled
        keepAliveConnection = on
        user = "postgres"
        password = "postgres"
      }
      offset-store {
        # set this to your database schema if applicable, empty by default
        schema = ""
        # the database table name for the offset store
        table = "akka_projection_offset_store"

        # the database table name for the projection management data
        management-table = "akka_projection_management"

        # Use lowercase table and column names.
        # This is mostly useful for H2 and Postgres databases. MySQL and SQL Server are case insensitive.
        # Oracle schema is case sensitive and is defined with uppercase, this property is therefore ignored when using Oracle
        use-lowercase-schema = true
      }
    }
}

post-service.cassandra.keyspace = "postservice"
cassandra-journal.keyspace = ${post-service.cassandra.keyspace}
cassandra-snapshot-store.keyspace = ${post-service.cassandra.keyspace}
cassandra-query-journal.eventual-consistency-delay = 200ms
cassandra-query-journal.delayed-event-timeout = 30s
cassandra-query-journal.eventual-consistency-delay = 1s

play {
    application {
        loader = org.ygoty.posts.impl.PostApplicationLoader
    }
    server.pidfile.path=/dev/null
}

I have been trying to find documentation that clarifies how to configure the read side eventual consistency delay. I also could not find the reference.conf values under projections/akka.persistence.

How can I improve the response time of the projection?

Kindly,
Robert

Hi @Reklund3,

The delay is defined by the Cassandra plugin. More precisely, the delay happens when querying the journal through eventsByTag.

Check the reference file in the plugin.

You posted some settings here, but they don’t match the current version. I don’t remember how it was in the previous version. Make sure you look at the file for the right version of the plugin. And preferably, upgrade to the latest version if possible.

Also check the first-time-bucket setting, as it might have an impact when starting the query for the first time.
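
As a rough sketch of what to look for: in the 1.x line of the plugin, the settings that influence eventsByTag latency live under akka.persistence.cassandra rather than cassandra-query-journal. The key names below are the ones I would expect to find in that version's reference.conf, and the values are illustrative assumptions only, not recommendations. They will not be picked up if the service still uses the older cassandra-journal plugin id, so verify them against the reference.conf of the version actually on your classpath.

```hocon
# Sketch only: assumes akka-persistence-cassandra 1.x key names.
# All values are illustrative; tune them for your own workload.
akka.persistence.cassandra {
  events-by-tag {
    # How far behind "now" the query stays so that late writes are not missed.
    # Lower values reduce projection latency but raise the risk of delayed events.
    eventual-consistency-delay = 200ms

    # How long tagged events may be buffered before being written to the tag view.
    flush-interval = 50ms

    # Notify running eventsByTag queries of new events instead of waiting for the next poll.
    pubsub-notification = on

    # Earliest time bucket scanned when a query starts without a stored offset.
    # Hypothetical value: set it close to the date of your first event.
    first-time-bucket = "20210601T00:00"
  }
  query {
    # How often a live query polls Cassandra for new events.
    refresh-interval = 1s
  }
}
```

The latency you observe is roughly bounded by the sum of the flush interval, the polling interval, and the eventual consistency delay, so a multi-second gap usually means at least one of these is still at its default.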

Cheers,

Renato

Cool, I believe I had some settings for that category. I had noticed, with the log level set to debug, that the events-by-tag query was quite verbose.

I will give this a shot and take a deeper dive.

Thanks,
Robert