Schema evolution and "ser_manifest" in messages table

Lagom: 1.4.15
Scala: 2.12.8
Persistence: Cassandra

Following the schema evolution documentation, entities in one of the application's two keyspaces were migrated successfully. Fetching an entity from the other keyspace throws the error below:

2021-01-11 13:27:59,776 [error] com.lightbend.lagom.internal.scaladsl.persistence.PersistentEntityActor [73] - Persistence failure when replaying events for persistenceId [SomeEntity|SomeEntityId]. Last known sequence number [0]
java.io.InvalidClassException: com.some.package.impl.entity.SomeEntity; local class incompatible: stream classdesc serialVersionUID = 7628054269364365264, local class serialVersionUID = -799129383219407853

at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1939)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1805)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2096)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1624)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at akka.serialization.JavaSerializer.$anonfun$fromBinary$1(Serializer.scala:336)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:336)
at akka.serialization.Serialization.$anonfun$deserializeByteArray$1(Serialization.scala:235)
at akka.serialization.Serialization.withTransportInformation(Serialization.scala:172)
at akka.serialization.Serialization.deserializeByteArray(Serialization.scala:230)
at akka.serialization.Serialization.$anonfun$deserialize$5(Serialization.scala:220)
at scala.util.Try$.apply(Try.scala:213)
at akka.serialization.Serialization.deserialize(Serialization.scala:212)
at akka.persistence.cassandra.journal.CassandraJournal$EventDeserializer.deserializeEvent(CassandraJournal.scala:531)
at akka.persistence.cassandra.query.EventsByPersistenceIdStage$$anon$1.extractEvent(EventsByPersistenceIdStage.scala:420)
at akka.persistence.cassandra.query.EventsByPersistenceIdStage$$anon$1.tryPushOne(EventsByPersistenceIdStage.scala:364)
at akka.persistence.cassandra.query.EventsByPersistenceIdStage$$anon$1.$anonfun$newResultSetCb$1(EventsByPersistenceIdStage.scala:173)
at akka.persistence.cassandra.query.EventsByPersistenceIdStage$$anon$1.$anonfun$newResultSetCb$1$adapted(EventsByPersistenceIdStage.scala:160)
at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:452)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:476)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:574)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:742)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:757)
at akka.actor.Actor.aroundReceive(Actor.scala:539)
at akka.actor.Actor.aroundReceive$(Actor.scala:537)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:664)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:610)
at akka.actor.ActorCell.invoke(ActorCell.scala:579)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
at akka.dispatch.Mailbox.run(Mailbox.scala:229)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

While investigating why the process never even enters the migration code, I found that the “ser_manifest” column in the messages table is empty for the events of SomeEntity. In the other messages table, where schema evolution ran successfully, it holds the fully qualified class name of the event stored in that row. Deserialization therefore seems to require the exact binary form the class had when the event was written. The class has of course changed since then, which is why the migration is being written in the first place. I would appreciate help with the following questions:

  • Why is the “ser_manifest” column empty in one of the application's messages tables, while in the other it holds the class name of the event?
  • Given that the old messages have no value in the “ser_manifest” column, what is the correct way to evolve the schema?

Which schema migration approach are you taking? There are several. The failure occurs while the bytes are being deserialized, which happens before things like event adapters run, if that is what you are using.

For Java serialization there are not many options, other than keeping a class that is compatible.

I am using the approach mentioned here, which requires implementing the migrations method of com.lightbend.lagom.scaladsl.playjson.JsonSerializerRegistry in the application’s SerializationRegistry class (which extends that JsonSerializerRegistry).

This event was serialized with Java serialization, as you can see in the stack trace. Doesn’t the migration approach you refer to require json serialization?
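
For reference, the migrations hook in question looks roughly like the sketch below. It is modelled on the Lagom docs; ItemAdded and its discount field are hypothetical and not from this application. The migration is looked up by the class name in the manifest, and it is only invoked by the Play JSON serializer, so payloads stored by a different serializer never reach it.

```scala
import com.lightbend.lagom.scaladsl.playjson.{JsonMigration, JsonSerializer, JsonSerializerRegistry}
import play.api.libs.json.{Format, JsNumber, JsObject, Json}
import scala.collection.immutable

// Hypothetical event used only for illustration.
final case class ItemAdded(itemId: String, quantity: Int, discount: Double)

object ItemAdded {
  implicit val format: Format[ItemAdded] = Json.format[ItemAdded]
}

object SerializationRegistry extends JsonSerializerRegistry {

  // Classes listed here are stored as JSON by the Play JSON serializer.
  override def serializers: immutable.Seq[JsonSerializer[_]] =
    immutable.Seq(JsonSerializer[ItemAdded])

  // Current version is 2; JSON written before that lacks the "discount" field.
  private val itemAddedMigration = new JsonMigration(2) {
    override def transform(fromVersion: Int, json: JsObject): JsObject =
      if (fromVersion < 2) json + ("discount" -> JsNumber(0.0))
      else json
  }

  // Keyed by the fully qualified class name, i.e. the name stored in the manifest.
  override def migrations: Map[String, JsonMigration] =
    Map(classOf[ItemAdded].getName -> itemAddedMigration)
}
```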

@patriknw That’s the point. I think that is happening because the “ser_manifest” column is empty. As far as I know, it should carry the fully qualified class name of the payload.

Look at the ser_id column to see which serializer it was stored with.

ser_id seems to be an integer column, and it carries the value 1 in the messages table that I’m having issues with.

@patriknw any further suggestions on this?

Serializer id 1 is the akka.serialization.JavaSerializer and that is not using the manifest, so ser_manifest column is expected to be empty. When using JavaSerializer you must keep the class compatible. That’s one of the reasons why we recommend so strongly against using Java serialization.
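
To illustrate why no manifest is needed here: a Java-serialized payload carries the class name and the serialVersionUID inside the bytes themselves, and that embedded UID is what gets compared with the local class on read. A minimal sketch using only java.io; SomeEvent and the helper names are made up for illustration and are not from the application in question.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream, ObjectStreamClass}
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

object JavaSerializationDemo {

  // Hypothetical event type; case classes are Serializable by default.
  final case class SomeEvent(id: String, amount: Int)

  // Plain Java serialization of a value into a byte array.
  def toBytes(value: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    buffer.toByteArray
  }

  // serialVersionUID the JVM uses for the class as it is compiled right now.
  def localUid(clazz: Class[_]): Long =
    ObjectStreamClass.lookup(clazz).getSerialVersionUID

  // True if the fully qualified class name appears inside the payload.
  def embedsClassName(bytes: Array[Byte], clazz: Class[_]): Boolean =
    new String(bytes, StandardCharsets.ISO_8859_1).contains(clazz.getName)

  // True if the 8-byte big-endian serialVersionUID appears inside the payload.
  def embedsUid(bytes: Array[Byte], clazz: Class[_]): Boolean = {
    val uidBytes = ByteBuffer.allocate(8).putLong(localUid(clazz)).array()
    bytes.toSeq.indexOfSlice(uidBytes.toSeq) >= 0
  }
}
```

If the class shape changes and no serialVersionUID was pinned, the locally computed value generally no longer matches the one embedded in old payloads, which is the InvalidClassException reported in the trace above.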

You have changed the class and now it can’t deserialize the old bytes to the new class format:

Your only option is to undo the change of SomeEntity, and hopefully you haven’t stored anything with the new class format because then you would not be able to read that later.

Then you can create a new class and use another type of serialization for it. Lagom has support for Jackson.

Thanks @patriknw

Hi
I tried the rename-field schema evolution steps described in the Lagom documentation, but I get a deserialization exception when reading old data for the case class field that I renamed.
I tried both the imperative migration code and the transformer-based migration, but I still get the exception. Can you suggest why the migration code is not kicking in during deserialization?

akkaTimestamp=03:36:24.379UTC] Persistence failure when replaying events for persistenceId [DocumentEntity|3333334]. Last known sequence number [0]
com.lightbend.lagom.scaladsl.playjson.JsonSerializationFailed: Failed to de-serialize bytes with manifest [com.abc.document.impl.DocumentCommentsSaved]
errors:
/comments(0)/type: JsonValidationError(List(error.path.missing),WrappedArray())
at com.lightbend.lagom.scaladsl.playjson.PlayJsonSerializer.fromBinary(PlayJsonSerializer.scala:158)
at akka.serialization.Serialization.$anonfun$deserializeByteArray$1(Serialization.scala:218)
at akka.serialization.Serialization.withTransportInformation(Serialization.scala:157)
at akka.serialization.Serialization.deserializeByteArray(Serialization.scala:216)
at akka.serialization.Serialization.$anonfun$deserialize$5(Serialization.scala:206)
at scala.util.Try$.apply(Try.scala:213)
at akka.serialization.Serialization.deserialize(Serialization.scala:198)
at akka.persistence.cassandra.journal.CassandraJournal$EventDeserializer.deserializedEvent$1(CassandraJournal.scala:815)
at akka.persistence.cassandra.journal.CassandraJournal$EventDeserializer.deserializeEvent(CassandraJournal.scala:823)
at akka.persistence.cassandra.query.EventsByPersistenceIdStage$Extractors$.akka$persistence$cassandra$query$EventsByPersistenceIdStage$Extractors$$extractPersistentRepr(EventsByPersistenceIdStage.scala:168)
at akka.persistence.cassandra.query.EventsByPersistenceIdStage$Extractors$$anon$2.extract(EventsByPersistenceIdStage.scala:134)
at akka.persistence.cassandra.query.scaladsl.CassandraReadJournal.$anonfun$eventsByPersistenceId$5(CassandraReadJournal.scala:640)
at akka.stream.impl.fusing.MapAsyncUnordered$$anon$31.onPush(Ops.scala:1418)
at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:541)
at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:495)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:390)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:625)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:502)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:600)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:775)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:762)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:791)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:691)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
at akka.actor.ActorCell.invoke(ActorCell.scala:547)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
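
The error path /comments(0)/type points inside an element of the comments array, so a rename applied only to top-level fields of the event JSON would not touch it. Whether that is the cause here cannot be told without the migration code, so treat the following as a sketch under assumptions: the old field name commentType is hypothetical, and migrateEvent is a made-up helper meant to be called from the transform method of a migration registered for com.abc.document.impl.DocumentCommentsSaved.

```scala
import play.api.libs.json._

object CommentFieldRename {

  // Assumption: the field was called "commentType" before the rename.
  val OldField = "commentType"
  // The field name the current Reads expects, taken from the error path.
  val NewField = "type"

  // Renames the field in a single comment object; anything else is returned unchanged.
  def renameInComment(comment: JsValue): JsValue = comment match {
    case obj: JsObject =>
      (obj \ OldField).toOption match {
        case Some(value) if !obj.keys.contains(NewField) =>
          (obj - OldField) + (NewField -> value)
        case _ => obj
      }
    case other => other
  }

  // Applies the rename to every element of the event's "comments" array.
  def migrateEvent(json: JsObject): JsObject =
    (json \ "comments").toOption match {
      case Some(JsArray(items)) =>
        json + ("comments" -> JsArray(items.map(renameInComment)))
      case _ => json
    }
}
```

As far as I know, the migration also has to declare a current version higher than the version the old events were stored with, otherwise transform is not applied to them.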