Schema Evolutions

Hello, we are using Lagom 1.4.15 with Scala 2.12.8 and Cassandra. We’ve made a few changes to our domain model classes, which does not affect Cassandra table structure, but the JSON which is being stored for Events will change. I’ve followed the documentation for Schema Evolution, but my doubts are -

  • When does this schema evolution run?
  • Do we have to run it manually, or is it all automatic (during runtime)?
  • Using this technique, will the new and old events be able to coexist?

Thanks.

Hi @mmwaikar,

message migration happens any time a message is deserialized. That may be:

  1. when a node with the new codebase receives a command or response produced by an old node in the cluster
  2. when reading an event or a snapshot from the database

Case 1. happens during a very short period of time when doing a rolling update. The most common case, then, is case 2..

Events and snapshots are stored immutably on the database so, as the code evolves, new code needs to be able to read content produced by old code. Every data stored in the database includes a manifest metadata field detailing the type and revision (aka evolution). So, if you try to read a BankAccount of revision 4 but the database provides a BankAccount of revision 2, the internals will locate the necessary migrations and upcast the data read to a revision 4 applying your conversions.
Then, your running Persistent Entity will get a BankAccount of revision 4 and not know the actual data on the DB is a different format. Any new event emitted using that version of your code will have the manifest metadata indicating it’s BankAccount version 4. To answer your question: all this conversion happens automatically.

Finally, this whole process will only work as long as you keep a collection of migrations capable of upcasting any existing event to the most recent modelling.

Cheers,

Hi
i tried rename field schema evolution steps mentioned in lagom documentation but i get deserailaze exception while reading old data for the field which i changed to new field of case class
i tried both imperative migration code and transformer based migration but still get exception can u suggest why migration code is not kicking off while deserialization
I saw Akka documentation that there is option to add eventAdapter which intercepts events and then we can transform them do we have such option with Lagom ?

akkaTimestamp=03:36:24.379UTC] Persistence failure when replaying events for persistenceId [DocumentEntity|3333334]. Last known sequence number [0]
com.lightbend.lagom.scaladsl.playjson.JsonSerializationFailed: Failed to de-serialize bytes with manifest [com.abc.document.impl.DocumentCommentsSaved]
errors:
/comments(0)/type: JsonValidationError(List(error.path.missing),WrappedArray())
at com.lightbend.lagom.scaladsl.playjson.PlayJsonSerializer.fromBinary(PlayJsonSerializer.scala:158)
at akka.serialization.Serialization.$anonfun$deserializeByteArray$1(Serialization.scala:218)
at akka.serialization.Serialization.withTransportInformation(Serialization.scala:157)
at akka.serialization.Serialization.deserializeByteArray(Serialization.scala:216)
at akka.serialization.Serialization.$anonfun$deserialize$5(Serialization.scala:206)
at scala.util.Try$.apply(Try.scala:213)
at akka.serialization.Serialization.deserialize(Serialization.scala:198)
at akka.persistence.cassandra.journal.CassandraJournal$EventDeserializer.deserializedEvent$1(CassandraJournal.scala:815)
at akka.persistence.cassandra.journal.CassandraJournal$EventDeserializer.deserializeEvent(CassandraJournal.scala:823)
at akka.persistence.cassandra.query.EventsByPersistenceIdStage$Extractors$.akka$persistence$cassandra$query$EventsByPersistenceIdStage$Extractors$$extractPersistentRepr(EventsByPersistenceIdStage.scala:168)
at akka.persistence.cassandra.query.EventsByPersistenceIdStage$Extractors$$anon$2.extract(EventsByPersistenceIdStage.scala:134)
at akka.persistence.cassandra.query.scaladsl.CassandraReadJournal.$anonfun$eventsByPersistenceId$5(CassandraReadJournal.scala:640)
at akka.stream.impl.fusing.MapAsyncUnordered$$anon$31.onPush(Ops.scala:1418)
at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:541)
at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:495)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:390)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:625)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:502)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:600)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:775)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:762)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:791)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:691)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
at akka.actor.ActorCell.invoke(ActorCell.scala:547)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)