Cassandra read-side AND a JDBC read-side event processor… possible in Lagom 1.4?

scala

(Tom Bishop) #1

Hi,

Was just wondering… is it possible to mix both Cassandra and JDBC read-side processors together in the same Lagom application using the built-in persistence components?

If so, how would storing offsets work? Would offsets need to be stored in both databases for exactly-once processing? Would it be feasible/desirable to manually implement the offsetStore value that is defined in both ReadSideJdbcPersistenceComponents and CassandraPersistenceComponents?

Many thanks,
Tom


(Tim Moore) #2

I think it should be possible with some effort. You should use the corresponding offset store for each database. The built-in persistence components already handle binding the correct offset store to each type of read-side builder.

You’ll probably need to explicitly override the definition of the offsetStore value to resolve the ambiguity of inheriting both the JDBC and Cassandra definitions. This value is used by the Kafka topic producer to store its own offsets, and it doesn’t really matter which one you choose.


(Tom Bishop) #3

OK, thanks, Tim, I’ll let you know how I get on.


(Tom Bishop) #4

I made a new trait:

trait CustomReadSideJdbcPersistenceComponents extends ReadSideJdbcPersistenceComponents {
  lazy val myOffsetStore = offsetStore
}

and then tried mixing it into the application class:

abstract class SurveyApplication(context: LagomApplicationContext)
  extends LagomApplication(context)
    with CustomReadSideJdbcPersistenceComponents
    with ReadSideCassandraPersistenceComponents
    with WriteSideCassandraPersistenceComponents
    with AhcWSComponents
...

and then tried to reassign the offsetStore:

override lazy val offsetStore = myOffsetStore

This compiles but throws a StackOverflowError (with a very long stack trace) at runtime:

Exception in thread "ForkJoinPool-1-worker-2" java.lang.StackOverflowError
	at com.explori.survey.impl.SurveyApplication.offsetStore(SurveyApplicationLoader.scala:53)
	at com.explori.survey.impl.CustomReadSideJdbcPersistenceComponents.myOffsetStore(SurveyApplicationLoader.scala:40)
	at com.explori.survey.impl.CustomReadSideJdbcPersistenceComponents.myOffsetStore$(SurveyApplicationLoader.scala:40)
	at com.explori.survey.impl.SurveyApplication.myOffsetStore$lzycompute(SurveyApplicationLoader.scala:43)
	at com.explori.survey.impl.SurveyApplication.myOffsetStore(SurveyApplicationLoader.scala:43)
	at com.explori.survey.impl.SurveyApplication.offsetStore$lzycompute(SurveyApplicationLoader.scala:53)
	at com.explori.survey.impl.SurveyApplication.offsetStore(SurveyApplicationLoader.scala:53)
	at com.explori.survey.impl.CustomReadSideJdbcPersistenceComponents.myOffsetStore(SurveyApplicationLoader.scala:40)
	at com.explori.survey.impl.CustomReadSideJdbcPersistenceComponents.myOffsetStore$(SurveyApplicationLoader.scala:40)
	at com.explori.survey.impl.SurveyApplication.myOffsetStore$lzycompute(SurveyApplicationLoader.scala:43)
...

If I use a non-lazy val in my CustomReadSideJdbcPersistenceComponents trait instead, I get a NullPointerException:

java.lang.NullPointerException: null
	at com.lightbend.lagom.internal.broker.kafka.Producer$TaggedOffsetProducerActor$$anonfun$receive$1.applyOrElse(Producer.scala:87)
	at akka.actor.Actor.aroundReceive(Actor.scala:517)
	at akka.actor.Actor.aroundReceive$(Actor.scala:515)
	at com.lightbend.lagom.internal.broker.kafka.Producer$TaggedOffsetProducerActor.aroundReceive(Producer.scala:68)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:590)
	at akka.actor.ActorCell.invoke(ActorCell.scala:559)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

My knowledge of lazy vals is not great, but I’m guessing there must be a process behind the scenes that repeatedly checks whether the value is defined, and that is what is causing the stack overflow…

Am I doing something stupid here?


(Tim Moore) #5

A lazy val compiles down to a method that computes the value on the first access. The problem is that offsetStore and myOffsetStore are defined in terms of each other, so each method invokes the other recursively until the stack blows up.
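In miniature, the cycle looks like this (the String-typed names here are simplified stand-ins, not Lagom’s actual types):

```scala
trait PersistenceComponents {
  // stands in for Lagom's built-in offsetStore definition
  lazy val offsetStore: String = "jdbc-offset-store"
}

trait CustomComponents extends PersistenceComponents {
  // virtual call: this resolves to the MOST-DERIVED offsetStore,
  // not the one defined in PersistenceComponents above
  lazy val myOffsetStore: String = offsetStore
}

object App extends CustomComponents {
  // ...which is this one, completing the cycle:
  // offsetStore -> myOffsetStore -> offsetStore -> ...
  override lazy val offsetStore: String = myOffsetStore
}

// Accessing App.offsetStore recurses until the stack blows up,
// just like the trace above.
```

Because neither lazy val ever finishes initializing, each access re-enters the other’s initializer, which is exactly the offsetStore/myOffsetStore ping-pong visible in your stack trace.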

If you’re interested in the low-level details, this article explains it in depth: https://blog.codecentric.de/en/2016/02/lazy-vals-scala-look-hood/

It turns out that resolving this is a little tricky, because Scala doesn’t allow super references in the definition of a val (lazy or otherwise).

I tried a few things and couldn’t find a clean way to do this without changing the definition of the component traits in Lagom. Here is an “unclean” way to work around this :smile::

package com.lightbend.lagom.scaladsl.persistence.jdbc {
  trait CustomReadSideJdbcPersistenceComponents extends ReadSideJdbcPersistenceComponents {
    def myOffsetStore = slickOffsetStore
  }
}

Putting the trait into a lagom package is necessary because slickOffsetStore is declared private[lagom]. Obviously this trick breaks encapsulation and might break in a future version of Lagom, but it should get you past this obstacle.
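The reason a plain def breaks the cycle: myOffsetStore now refers directly to the concrete slickOffsetStore member rather than to the overridable offsetStore, so the overridden offsetStore in the application never calls back into itself. A simplified sketch of the shape (stand-in types, not the real Lagom API):

```scala
trait PersistenceComponents {
  // stands in for Lagom's private[lagom] slickOffsetStore
  def slickOffsetStore: String = "slick-offset-store"
  lazy val offsetStore: String = slickOffsetStore
}

trait CustomComponents extends PersistenceComponents {
  // a def that points at the stable member, bypassing the
  // overridable offsetStore entirely
  def myOffsetStore: String = slickOffsetStore
}

object App extends CustomComponents {
  // no cycle: myOffsetStore never touches offsetStore
  override lazy val offsetStore: String = myOffsetStore
}
```

With the real types the usage is the same as before: mix CustomReadSideJdbcPersistenceComponents into the application and keep the `override lazy val offsetStore = myOffsetStore` line.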

If you could, please raise a feature request on GitHub to support this in a clean and documented way.


(Tom Bishop) #6

Excellent response, Tim, and, yes, that does make sense regarding the lazy vals. I’ll put in a feature request for this as I think it would be useful. Thanks, Tom


(Tim Moore) #7

For reference, that feature request is here https://github.com/lagom/lagom/issues/1346