ReadSideProcessor not inserting record


(hhumayun) #1

I’m having trouble updating the Read Side Cassandra table. When I send a request to my service using curl, everything goes through and I get the expected response from the service. However, I can see in Cassandra that, although the events are being stored, the Read Side tables aren’t being updated.

The method processUserAdded is responsible for binding values and I know that this method is executed because I can see the logger’s output in the console. I have also checked my create database method and associated data types many times and they seem to be fine.

Here is the implementation of my ReadSide. Any help or tips on how to resolve/diagnose the problem would be much appreciated.

Thanks


private[impl] class UserEventProcessor(session: CassandraSession, readSide: CassandraReadSide)(implicit ec: ExecutionContext)
  extends ReadSideProcessor[UserEvent] {



  private val writeUserPromise = Promise[PreparedStatement] // initialized in prepare
  private def writeUser: Future[PreparedStatement] = writeUserPromise.future

  private val updateUserStatus = Promise[PreparedStatement] // initialized in prepare

  private def updateStatus: Future[PreparedStatement] = updateUserStatus.future
  override def aggregateTags: Set[AggregateEventTag[UserEvent]] =
    UserEvent.Tag.allTags

   override def buildHandler = {

    readSide.builder[UserEvent](UserRepository.userEventOffset)
      .setGlobalPrepare(createTables)
      .setPrepare(_ => prepareWriteUser)
      .setPrepare(_ => prepareUpdateStatus) // this can be useful but leave for later
      .setEventHandler[InsuranceStatusUpdated](processStatusUpdated)
      .setEventHandler[UserCreated](processUserAdded)
      .build
  }

private def prepareWriteUser(): Future[Done] = {
    val f = session.prepare("INSERT INTO user_main " +
      "(id, addedon, email, fname, lname,  status) " +
      "VALUES (?, ?, ?, ?, ?, ?)")


    writeUserPromise.completeWith(f)
    f.map(_ => Done)
  }

def processStatusUpdated(eventElement: EventStreamElement[InsuranceStatusUpdated]): Future[List[BoundStatement]] = {
    updateStatus.map { ps =>
      val bindUpdateStatus = ps.bind()
      bindUpdateStatus.setString("status", eventElement.event.status.toString)
      bindUpdateStatus.setUUID("id", UUID.fromString(eventElement.entityId))
      List(bindUpdateStatus)
    }
}

private def processUserAdded(eventElement: EventStreamElement[UserCreated]): Future[List[BoundStatement]] = {
      
      val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC)

      LoggerFactory.getLogger("UserAdd").info("In process UserAdded")

      
      writeUser.map { ps =>
        val bindWriteUser = ps.bind()
        bindWriteUser.setUUID("id", eventElement.event.user.id)
        bindWriteUser.setString("email", eventElement.event.user.email)
        bindWriteUser.setString("addedon", formatter.format(eventElement.event.user.addedon))
        bindWriteUser.setString("fname", eventElement.event.user.fname)
        bindWriteUser.setString("lname", eventElement.event.user.lname)
        bindWriteUser.setString("status", eventElement.event.user.status.toString)
        List(bindWriteUser)
      }

    }

  private def createTables() = for {
    _ <- session.executeCreateTable("CREATE TABLE IF NOT EXISTS user_main ( " +
      "id UUID,  addedon text , email TEXT,fname TEXT, lname TEXT, status TEXT,  PRIMARY KEY (id))")
    _ <- session.executeCreateTable(
      "CREATE INDEX IF NOT EXISTS user_email_idx ON user_main (email)")
    // HH note: secondary indexes are local to each node, so a lookup may have to visit many nodes before finding a match; consider using a materialized view instead
  } yield Done
}


(Renato) #2

The most important part is not being shown here. Can you share the implementation of processStatusUpdated and processUserAdded?


(hhumayun) #3

The implementation of processUserAdded is shown if you scroll down just a bit. I’ll add the implementation of the update method in a bit. Thanks


(Tim Moore) #4

This is the problem:

      .setPrepare(_ => prepareWriteUser)
      .setPrepare(_ => prepareUpdateStatus) // this can be useful but leave for later

The second setPrepare overwrites the first one. So prepareWriteUser won’t be called, writeUserPromise will never be completed, and processUserAdded will return a Future that never completes. That is why the log line appears but no row is ever written.

If you want multiple prepare callbacks, you’ll need to compose them yourself. Something like this should work:

setPrepare(_ => prepareWriteUser.zip(prepareUpdateStatus).map(_ => Done))
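
To see the effect in isolation, here is a minimal sketch that does not use Lagom at all. ToyBuilder is a hypothetical stand-in that keeps only the most recent prepare callback (an assumption that models the "last call wins" behaviour described above, not Lagom's actual implementation), and the statement strings are placeholders. It compares registering two callbacks with composing them into one via zip:

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical builder that stores a single prepare callback.
// NOT Lagom's code: it only models "a later setPrepare replaces the earlier one".
final class ToyBuilder {
  private var prepare: () => Future[Unit] = () => Future.unit

  def setPrepare(callback: () => Future[Unit]): ToyBuilder = {
    prepare = callback // any previously registered callback is dropped
    this
  }

  def runPrepare(): Future[Unit] = prepare()
}

object ComposePrepareSketch {
  // Runs the registered prepare step and reports which promises were completed,
  // as (writeReady, updateReady).
  def prepared(compose: Boolean): (Boolean, Boolean) = {
    val writeStmt  = Promise[String]()
    val updateStmt = Promise[String]()

    // Placeholder "prepare" steps: each one completes its own promise.
    def prepareWrite(): Future[Unit]  = Future { writeStmt.success("INSERT ..."); () }
    def prepareUpdate(): Future[Unit] = Future { updateStmt.success("UPDATE ..."); () }

    val builder = new ToyBuilder
    if (compose)
      // One callback that runs both steps and finishes when both are done.
      builder.setPrepare(() => prepareWrite().zip(prepareUpdate()).map(_ => ()))
    else
      // Two registrations: only the second callback survives.
      builder.setPrepare(() => prepareWrite()).setPrepare(() => prepareUpdate())

    Await.result(builder.runPrepare(), 5.seconds)
    (writeStmt.isCompleted, updateStmt.isCompleted)
  }
}
```

With compose = false the write promise is never completed, so anything mapped over its future would wait forever; with compose = true both promises are completed before the prepare step finishes.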

(Tim Moore) #5

There is an open issue discussing improving this, including logging a warning on overwrites: https://github.com/lagom/lagom/issues/1166

Nobody has implemented this yet, but it would make a great pull request if you’re interested in contributing to Lagom!


(hhumayun) #6

That worked!! Thanks Tim! :grinning:

Also, thanks for the heads-up on the PR. That would be very interesting. I’ll take a look.