Clarifications about read-side persistence w.r.t. loadOffset

The doc on defining a read side processor uses a hypothetical MyDatabase trait to explain important guidelines:

trait MyDatabase {
  ...
  /**
   * Load the offset of the last event processed.
   */
  def loadOffset(tag: AggregateEventTag[BlogEvent]): Future[Offset]
  ...
}

My question is about the guidelines for the loadOffset method:

The loadOffset method reads the last Offset that was processed by this read side processor for the particular tag. Typically this will be stored in a table that has the tag name and the eventProcessorId as a compound primary key. Offsets come in two varieties, an akka.persistence.query.Sequence offset represented using a long, and an akka.persistence.query.TimeBasedUUID offset represented using a UUID. Your database will need to be able to persist both of these types. If there is no offset stored for a particular tag, such as when the processor runs for the very first time, then you can return akka.persistence.query.NoOffset.
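For concreteness, here is a rough sketch of what I imagine such a table and loadOffset implementation could look like with Slick and a relational database (the table and column names are my own guesses, not from the docs):

import java.util.UUID

import akka.persistence.query.{NoOffset, Offset, Sequence, TimeBasedUUID}
import slick.jdbc.PostgresProfile.api._

import scala.concurrent.{ExecutionContext, Future}

// Hypothetical table: read_side_offsets(event_processor_id, tag, sequence_offset, time_uuid_offset)
// with (event_processor_id, tag) as the compound primary key described in the docs.
class RelationalOffsetDao(db: Database, eventProcessorId: String)(implicit ec: ExecutionContext) {

  def loadOffset(tag: String): Future[Offset] =
    db.run(
      sql"""SELECT sequence_offset, time_uuid_offset
            FROM read_side_offsets
            WHERE event_processor_id = $eventProcessorId AND tag = $tag""".as[(Option[Long], Option[String])].headOption
    ).map {
      case Some((Some(seq), _))  => Sequence(seq)                        // relational journals use long sequence numbers
      case Some((_, Some(uuid))) => TimeBasedUUID(UUID.fromString(uuid)) // Cassandra journals use time-based UUIDs
      case _                     => NoOffset                             // nothing stored yet: very first run
    }
}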

Where does the eventProcessorId come from?

It would be very helpful to have a non-trivial example of this MyDatabase.
I found in the source a trivial implementation here:

object MyDatabase extends MyDatabase {
  def createTables(): Future[Done]                                  = Future.successful(Done)
  def loadOffset(tag: AggregateEventTag[BlogEvent]): Future[Offset] = Future.successful(NoOffset)
  def handleEvent(event: BlogEvent, offset: Offset): Future[Done]   = Future.successful(Done)
}

Unfortunately, this trivial implementation doesn’t shed any light on where the eventProcessorId comes from.

If we’re sharding events, then we will have multiple instances of the same read-side processor. It makes sense for each to store its own offset, but it is unclear where the eventProcessorId comes from in the sharded case.

  • Nicolas.

If you’re using CassandraPersistenceComponents you have an instance of an offset store available in your LagomApplication instance. It’s used by the Kafka event publishers if you use those. You can hook into it by getting a handle on the OffsetStore’s OffsetDao instance.

Here’s a typed abstract base class for making an EntityEventProcessor. All you need to do is extend it and override readSideName so that it’s distinct in the Cassandra offsets table. Implement the handleEvent method to process your event stream. Optionally, add specific error handling by overriding handleError.


import akka.persistence.query.{NoOffset, Offset}
import akka.stream.scaladsl.Flow
import akka.{Done, NotUsed}
import com.lightbend.lagom.scaladsl.persistence.ReadSideProcessor.ReadSideHandler
import com.lightbend.lagom.scaladsl.persistence.{AggregateEvent, AggregateEventTag, EventStreamElement, ReadSideProcessor}
import com.lightbend.lagom.spi.persistence.{OffsetDao, OffsetStore}
import org.slf4j.Logger

import scala.concurrent.{ExecutionContext, Future}

/** Base class for creating a persistent entity event journal processor.
 *
 * @param offsetStore OffsetStore
 * @param executionContext ExecutionContext - Scala execution context
 * @tparam T Type - The type of the PersistentEntity journal event.  Must extend [[AggregateEvent]]
 */
abstract class EntityEventProcessor[T <: AggregateEvent[T]](offsetStore: OffsetStore)(implicit executionContext: ExecutionContext) extends ReadSideProcessor[T] {

  val logger: Logger

  private var offsetDao: OffsetDao = _
  var tagLabel: String = _

  override def buildHandler(): ReadSideProcessor.ReadSideHandler[T] =
    new ReadSideHandler[T] {

      override def prepare(tag: AggregateEventTag[T]): Future[Offset] = offsetStore.prepare(readSideName, tag.tag).map { dao =>
        offsetDao = dao
        tagLabel = tag.tag

        logger.info(s"Starting entity event processor ${readSideName}[${tagLabel}] at offset ${offsetDao.loadedOffset}")

        offsetDao.loadedOffset
      }.recover {
        case _ =>
          logger.info(s"Starting entity event processor ${readSideName}[${tag.tag}] at NoOffset")
          NoOffset
      }

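      // Process events one at a time; after each event has been handled (or routed to
      // handleError on failure) the offset is saved so the stream can resume from there on restart.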
      override def handle(): Flow[EventStreamElement[T], Done, NotUsed] = Flow[EventStreamElement[T]].mapAsync(1) { eventElement =>

        handleEvent(eventElement.event, eventElement.offset).recover {
          case t: Throwable =>
            handleError(eventElement.event, eventElement.offset, t)
            Done
        }.andThen {
          case _ =>
            offsetDao.saveOffset(eventElement.offset)
        }
      }
    }

  /** Processes an event to produce read-side projection.
   *
   * Exceptions thrown from this method will be logged at ERROR level by default.
   * If particular error handling is required, override [[handleError()]].
   *
   * @param event            T - A PersistentEntity journal event
   * @param offset           Offset - The read-side processor offset
   * @param executionContext ExecutionContext - Scala execution context (implicit)
   * @return Future[Done]
   */
  def handleEvent(event: T, offset: Offset)(implicit executionContext: ExecutionContext): Future[Done]

  /** Handler for exceptions thrown from [[handleEvent()]].
   *
   * The default implementation logs at ERROR level.
   *
   * Override this if specific error handling is required.
   *
   * @param event  T - A PersistentEntity journal event
   * @param offset Offset - The read-side processor offset
   * @param throwable Throwable - The throwable produced by [[handleEvent()]]
   */
  def handleError(event: T, offset: Offset, throwable: Throwable): Unit = {
    logger.error(s"Exception handling event at offset ${offset}: ${event}", throwable)
  }

}
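For example, a concrete processor might look roughly like this (BlogEvent and its sharded tag are assumed to come from the Lagom docs example, and the registration via readSide.register is omitted):

import akka.Done
import akka.persistence.query.Offset
import com.lightbend.lagom.scaladsl.persistence.AggregateEventTag
import com.lightbend.lagom.spi.persistence.OffsetStore
import org.slf4j.{Logger, LoggerFactory}

import scala.concurrent.{ExecutionContext, Future}

class BlogEventProcessor(offsetStore: OffsetStore)(implicit ec: ExecutionContext)
    extends EntityEventProcessor[BlogEvent](offsetStore) {

  override val logger: Logger = LoggerFactory.getLogger(getClass)

  // Must be stable and unique per read side: it becomes part of the key in the Cassandra offsets table.
  override def readSideName: String = "blogEventProcessor"

  override def aggregateTags: Set[AggregateEventTag[BlogEvent]] = BlogEvent.Tag.allTags

  override def handleEvent(event: BlogEvent, offset: Offset)(implicit ec: ExecutionContext): Future[Done] =
    event match {
      // update the read-side tables for each event type here
      case _ => Future.successful(Done)
    }
}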

Hi,

Stepping back from implementation details, what a projection needs in order to restart where it last stopped is a projection name and a shard identifier. If the source is not sharded, then the shard identifier can simply be ignored.

@NicolasRouquette if you are not using the provided implementations (Cassandra and RDBMS), then the mechanism to store the last offset read and later retrieve it (when restarting a projection) is unspecified. I agree that the hypothetical trait falls short and the API it presents is incomplete. Take a look, for example, at the JDBC-specific API in the new akka-projection for readOffset: the argument is a ProjectionId, which represents the tuple ("projection name", "shard name").
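For example (a tiny sketch, assuming akka-projection-core is on the classpath):

import akka.projection.ProjectionId

// "projection name" + "shard key" uniquely identify the stored offset
val projectionId = ProjectionId("blog-projection", "tag-3")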

Cheers,

@PkPwc Thanks for the suggestion; however, in case we have multiple read-side persistence processors, it is unclear how to override readSideName to include a reproducible ID for a given processor.

@ignasi35 The new akka projection is interesting. It is unclear to me whether the use cases I described are in scope of the current capability; that is:

  1. Can the projectors be sharded?

In principle, we should be able to have N projection shards listening to the same event journal.
Each projection shard needs to have its own offset database – that could perhaps be configurable (e.g., https://doc.akka.io/docs/akka-projection/current/slick.html#configuration or https://doc.akka.io/docs/akka-projection/current/cassandra.html#configuration) as long as each of the N shards has its dedicated configuration. With the current Lagom 1.6 framework, it is unclear whether that is the case or not.

  2. Can a service have multiple read-side “readers” hooked to different read-side projection databases?

In my application, we have 3 services: A, B, C

  • A has ES+CQRS and there is a read-side persistence that updates a persistent database PA.
  • B has ES+CQRS and uses a read-side “reader” that queries PA; additionally, B has a read-side persistence that updates a persistent database PB.
  • C is 100% stateless but it uses 2 read-side “readers” to query PA and PB.

Currently, doing this in Lagom 1.6 is kind of tricky because the documentation and library were optimized for the case of a single read-side persistence “writer” + “reader” in the same service.

In my application, the PA “writer” is in the A service but there are 2 PA “readers”, one in B and the other in C.
In addition, C has 2 different read-side “readers” for PA and PB.

Getting the configurations to make this work is definitely very tricky.
I hope it will be easier w/ Akka projections to have the flexibility of composing projection readers/writers in a much more flexible way than we can currently do w/ Lagom 1.6.

I wouldn’t mind contributing these use cases but I’m not sure where to do that.

  • Nicolas.

Hi @NicolasRouquette,

let me try to answer to all the points (hope I don’t forget any detail):

Can the projectors be sharded?

Absolutely. That is a first-level requirement in both Lagom and akka-projections. Please refer to the docs and also take a look at Sharded Daemon Process.
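A minimal sketch of what that can look like with akka-projection and Sharded Daemon Process (createProjectionFor is a hypothetical factory that builds a Projection for one tag, and BlogEvent.Tag is assumed to be a sharded tag as in the Lagom docs):

import akka.actor.typed.ActorSystem
import akka.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
import akka.projection.ProjectionBehavior

def initProjections(system: ActorSystem[_]): Unit = {
  // one projection instance per tag, distributed across the cluster
  val tags = BlogEvent.Tag.allTags.toVector
  ShardedDaemonProcess(system).init(
    "blog-projection",
    tags.size,
    i => ProjectionBehavior(createProjectionFor(system, tags(i))) // createProjectionFor is hypothetical
  )
}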

Can a service have multiple read-side “readers” hooked to different read-side projection databases?

Hmm, this is a bit of a vague question, but my first answer would be no. The reason is not that it is technically impossible to achieve what you are asking, but that “services should not share the database”. I mean, in the case you describe:

In my application, we have 3 services: A, B, C

A has ES+CQRS and there is a read-side persistence that updates a persistent database PA.
B has ES+CQRS and uses a read-side “reader” that queries PA; additionally, B has a read-side persistence that updates a persistent database PB.
C is 100% stateless but it uses 2 read-side “readers” to query PA and PB.

If B is a separate service, then it should not read the journal of PA directly. That would introduce coupling between services A and B. Instead, service A could publish events into a broker (Kafka/Pulsar/…) with a well-known, stable API which B can then consume.
The case of C is identical: it should read neither the database of A nor the database of B.
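In Lagom terms that could be, for example, a topic published by service A from its own journal (a sketch: BlogEventMessage is a hypothetical public API type, convertToApi a hypothetical mapping to it, and blogEvents would be declared in A’s service descriptor):

import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.broker.TopicProducer

// inside service A's implementation
def blogEvents(): Topic[BlogEventMessage] =
  TopicProducer.taggedStreamWithOffset(BlogEvent.Tag.allTags.toList) { (tag, fromOffset) =>
    persistentEntityRegistry
      .eventStream(tag, fromOffset)
      .map(env => (convertToApi(env.event), env.offset)) // publish only the public API representation
  }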

Getting the configurations to make this work is definitely very tricky.

Hmmm, I never thought of that, but I’m not surprised. You are probably having issues setting that up, especially in service B, because you are trying to make service B read from the journal of service A while having service B write to its own journal. Lagom makes that difficult, if not impossible, in its API.
Note that akka-projection is less restrictive, so you could achieve that, but I still think it’s not a good idea.

In general, Lagom supported some scenarios where the database with the journal is not the same instance or technology used for the read-side. Akka projections is even more flexible and allows for any combination.

I wouldn’t mind contributing these use cases but I’m not sure where to do that.

I think any discussion around Real Life ™️ use cases is extremely valuable. Here is a good place, but the issue tracker in akka-projection is a great place too.

Cheers,

Thanks @ignasi35 for your detailed and informative answer as usual!

Regarding sharding projections: Thanks for the pointer in the doc. That’s what I was looking for.
I suspect a similar trick applies to Lagom as well, though it seems that this will require some experimentation.

Regarding a service with multiple read-side “readers”: If my description conveyed the idea of a service reading another service’s journal, then I apologize for the confusion.

I’m aware that the event & snapshot journals are private to each service.
However, when we need to communicate information from a service A to a service B,
we’re introducing a coupling of some sort. There are several ways to do this with different pros/cons including:

  1. A calls B’s APIs
  2. B calls A’s APIs
  3. A subscribes to messages published by B
  4. A reads a projected model created by B’s read-side projection
  5. combinations of the above

In my K8S application, we’re using option 4 with Lagom 1.6 and JDBC-based read-side persistence backed by AWS RDS databases.

The challenge here was that the documentation really doesn’t discuss the use case of a service with multiple readers for different projected models produced by other services’ read-side projections.

With JDBC read-side persistence, there is an additional twist in that the SlickProvider initialization (private in Lagom) uses the actor system’s config. I had to create an actor system for each separate reader and a corresponding reader-specific configuration.
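Roughly, for each external reader I do something like this (the config file and system names are just from my setup, to illustrate):

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// a dedicated actor system whose configuration points at the PA read-side database
val paReaderConfig = ConfigFactory.load("pa-reader").withFallback(ConfigFactory.load())
val paReaderSystem = ActorSystem("pa-reader", paReaderConfig)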

  • Nicolas.

I think there’s still something I’m not understanding. What does the following mean:

Does it mean:

  1. B projects data into some database, then A reads that database
  2. something else

If it means 1., then you still have a shared database. It’s not only the journal and the snapshot databases that should be considered private.
If it means 2., could you elaborate?

Cheers,