ReadSideProcessor Cassandra fails with AskTimeoutException


#1

Hi, I have started a Lagom system in prod. Everything works fine. I created about 100 items, but the Cassandra event processor stored only 10. So I created a new event processor with a different offset and a different table. I expected all created items to be processed again and the table to fill up. But I get an exception!

One very unclear exception:

[error] akka.actor.OneForOneStrategy [sourceThread=application-akka.actor.default-dispatcher-15, akkaTimestamp=14:13:23.316UTC, akkaSource=akka://application/system/sharding/MainStorageItemEventProcessor/de.****.storageManagement.impl.storageItem.event.ItemEvent1/de.****.storageManagement.impl.storageItem.event.ItemEvent1, sourceActorSystem=application] - Ask timed out on [Actor[akka://application/user/readSideGlobalPrepare-MainStorageItemEventProcessor-singletonProxy#591393528]] after [40000 ms]. Message of type [com.lightbend.lagom.internal.persistence.cluster.ClusterStartupTaskActor$Execute$]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://application/user/readSideGlobalPrepare-MainStorageItemEventProcessor-singletonProxy#591393528]] after [40000 ms]. Message of type [com.lightbend.lagom.internal.persistence.cluster.ClusterStartupTaskActor$Execute$]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
	at akka.pattern.PromiseActorRef$.$anonfun$defaultOnTimeout$1(AskSupport.scala:635)
	at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:650)
	at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:205)
	at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:866)
	at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
	at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
	at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:864)
	at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:328)
	at akka.actor.LightArrayRevolverScheduler$$anon$3.executeBucket$1(LightArrayRevolverScheduler.scala:279)
	at akka.actor.LightArrayRevolverScheduler$$anon$3.nextTick(LightArrayRevolverScheduler.scala:283)
	at akka.actor.LightArrayRevolverScheduler$$anon$3.run(LightArrayRevolverScheduler.scala:235)
	at java.lang.Thread.run(Thread.java:748)

I do not understand this error. I used the online-auction example as a template for this.

readSide.register[ItemEvent](new MainStorageItemEventProcessor(cassandraSession,cassandraReadSide))

This is called in the serviceImpl!

Here is my read-side processor:

class MainStorageItemEventProcessor(session: CassandraSession,
                                    readSide: CassandraReadSide)(implicit ec: ExecutionContext) extends ReadSideProcessor[ItemEvent] {

  val logger = Logger(this.getClass)

  private val insertItemPromise = Promise[PreparedStatement]
  private def insertItem: Future[PreparedStatement] = insertItemPromise.future

  private val itemEmbeddedPromise = Promise[PreparedStatement]
  private def itemEmbedded = itemEmbeddedPromise.future

  private val externalReferenceSetPromise = Promise[PreparedStatement]
  private def externalReferenceSet = externalReferenceSetPromise.future

  private val itemCommittedPromise = Promise[PreparedStatement]
  private def itemCommitted = itemCommittedPromise.future

  private val newTransferTargetSetPromise = Promise[PreparedStatement]
  private def newTransferTargetSet = newTransferTargetSetPromise.future

  private val disposabilityStatusChangedPromise = Promise[PreparedStatement]
  private def disposabilityStatusChanged = disposabilityStatusChangedPromise.future

  private val transitStatusChangedPromise = Promise[PreparedStatement]
  private def transitStatusChanged = transitStatusChangedPromise.future

  private val storageStatusChangedPromise = Promise[PreparedStatement]
  private def storageStatusChanged = storageStatusChangedPromise.future

  private def processInsertItem(event: ItemCreated, entityId: String) = insertItem.map{ps ⇒
    val bind = ps.bind()
    bind.setString(MainStorageItemEventProcessor.entityId,entityId)
    bind.setString(MainStorageItemEventProcessor.externalReference,event.externalReference.orNull)
    bind.setString(MainStorageItemEventProcessor.manufactor,event.manufactor.orNull)
    bind.setString(MainStorageItemEventProcessor.incomingOrderId,event.orderId.orNull)
    bind.setString(MainStorageItemEventProcessor.currentPosition,event.currentPosition)
    bind.setInt(MainStorageItemEventProcessor.storageYear, LocalDateTime.ofInstant(event.storageDate,ZoneOffset.UTC).getYear )
    bind.setInt(MainStorageItemEventProcessor.storageMonthOfYear, LocalDateTime.ofInstant(event.storageDate,ZoneOffset.UTC).getMonthValue )
    bind.setToNull(MainStorageItemEventProcessor.storageStatus)
    bind.setToNull(MainStorageItemEventProcessor.transitStatus)
    bind.setToNull(MainStorageItemEventProcessor.disposabilityStatus)
    bind.setToNull(MainStorageItemEventProcessor.holder)
    bind.setToNull(MainStorageItemEventProcessor.weight)
    bind.setToNull(MainStorageItemEventProcessor.length)
    bind.setToNull(MainStorageItemEventProcessor.height)
    bind.setToNull(MainStorageItemEventProcessor.width)
    bind.setToNull(MainStorageItemEventProcessor.newTargetKind)
    bind.setToNull(MainStorageItemEventProcessor.newTarget)
    List(bind)
  }

  private def processStorageStatusChanged(x: StorageStatusChanged, entityId: String) = storageStatusChanged.map{ps ⇒
    val bind = ps.bind()
    bind.setString(MainStorageItemEventProcessor.storageStatus, x.newStatus.toString)
    bind.setString(MainStorageItemEventProcessor.entityId, entityId)
    List(bind)
  }

  private def processDisposabilityStatusChanged(x: DisposabilityStatusChanged, entityId: String) = disposabilityStatusChanged.map{ps ⇒
    val bind = ps.bind()
    bind.setString(MainStorageItemEventProcessor.disposabilityStatus, x.newStatus.toString)
    bind.setString(MainStorageItemEventProcessor.entityId, entityId)
    List(bind)
  }

  private def processTransitStatusChanged(x: TransitStatusChanged, entityId: String) = transitStatusChanged.map{ps ⇒
    val bind = ps.bind()
    bind.setString(MainStorageItemEventProcessor.transitStatus, x.newStatus.toString)
    bind.setString(MainStorageItemEventProcessor.entityId, entityId)
    List(bind)
  }

  private def processItemCommitted(x: ItemCommitted, entityId: String) = itemCommitted.map{ps ⇒
    val bind = ps.bind()
    bind.setString(MainStorageItemEventProcessor.holder, x.receivingOperator.id)
    bind.setString(MainStorageItemEventProcessor.entityId, entityId)
    List(bind)
  }

  private def processItemEmbedded(x: ItemEmbedded, entityId: String) = itemEmbedded.map{ps ⇒
    val bind = ps.bind()
    bind.setToNull(MainStorageItemEventProcessor.holder)
    bind.setToNull(MainStorageItemEventProcessor.newTarget)
    bind.setToNull(MainStorageItemEventProcessor.newTargetKind)
    bind.setString(MainStorageItemEventProcessor.currentPosition,x.storagePosition)
    bind.setString(MainStorageItemEventProcessor.entityId,entityId)
    List(bind)
  }

  private def processNewTransferTargetSet(x: NewTransferTargetSet, entityId: String) = newTransferTargetSet.map{ps ⇒
    val bind = ps.bind()
    bind.setString(MainStorageItemEventProcessor.newTarget,x.target.operator.map(_.id).getOrElse(x.target.operator.get.id))
    bind.setString(MainStorageItemEventProcessor.newTargetKind,x.targetKind.toString)
    bind.setString(MainStorageItemEventProcessor.entityId,entityId)
    List(bind)
  }

  private def processExternalReferenceSet(x: ExternalReferenceSet, entityId: String) = externalReferenceSet.map{ps ⇒
    val bind = ps.bind()
    bind.setString(MainStorageItemEventProcessor.externalReference,x.externalReference)
    bind.setString(MainStorageItemEventProcessor.entityId, entityId)
    List(bind)
  }

  private def prepareStatements(): Future[Done] = {
    val insertItemFuture = session.prepare(
      s"""
         |INSERT INTO ${MainStorageItemEventProcessor.TABLE_NAME}
         |(${MainStorageItemEventProcessor.entityId},${MainStorageItemEventProcessor.externalReference},${MainStorageItemEventProcessor.manufactor},${MainStorageItemEventProcessor.incomingOrderId},${MainStorageItemEventProcessor.currentPosition},${MainStorageItemEventProcessor.storageYear},${MainStorageItemEventProcessor.storageMonthOfYear},${MainStorageItemEventProcessor.storageStatus},${MainStorageItemEventProcessor.transitStatus},${MainStorageItemEventProcessor.disposabilityStatus},${MainStorageItemEventProcessor.holder},${MainStorageItemEventProcessor.weight},${MainStorageItemEventProcessor.length},${MainStorageItemEventProcessor.height},${MainStorageItemEventProcessor.width},${MainStorageItemEventProcessor.newTarget},${MainStorageItemEventProcessor.newTargetKind})
         |VALUES
         |(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
       """.stripMargin
    )
    insertItemPromise.completeWith(insertItemFuture)

    val itemEmbeddedFuture = session.prepare(
      s"""
         |UPDATE ${MainStorageItemEventProcessor.TABLE_NAME} SET
         |${MainStorageItemEventProcessor.holder} = ?,
         |${MainStorageItemEventProcessor.currentPosition} = ?,
         |${MainStorageItemEventProcessor.newTarget} = ?,
         |${MainStorageItemEventProcessor.newTargetKind} = ?
         |WHERE
         |${MainStorageItemEventProcessor.entityId} = ?
       """.stripMargin
    )
    itemEmbeddedPromise.completeWith(itemEmbeddedFuture)

    val externalReferenceSetFuture = session.prepare(
      s"UPDATE ${MainStorageItemEventProcessor.TABLE_NAME} SET ${MainStorageItemEventProcessor.externalReference} = ? WHERE ${MainStorageItemEventProcessor.entityId} = ?"
    )
    externalReferenceSetPromise.completeWith(externalReferenceSetFuture)

    val itemCommittedFuture = session.prepare(
      s"UPDATE ${MainStorageItemEventProcessor.TABLE_NAME} SET ${MainStorageItemEventProcessor.holder} = ? WHERE ${MainStorageItemEventProcessor.entityId} = ?"
    )
    itemCommittedPromise.completeWith(itemCommittedFuture)

    val newTransferTargetSetFuture = session.prepare(
      s"UPDATE ${MainStorageItemEventProcessor.TABLE_NAME} SET ${MainStorageItemEventProcessor.newTarget} = ?, ${MainStorageItemEventProcessor.newTargetKind} = ? WHERE ${MainStorageItemEventProcessor.entityId} = ?"
    )
    newTransferTargetSetPromise.completeWith(newTransferTargetSetFuture)

    val disposabilityStatusChangedFuture = session.prepare(
      s"UPDATE ${MainStorageItemEventProcessor.TABLE_NAME} SET ${MainStorageItemEventProcessor.disposabilityStatus} = ? WHERE ${MainStorageItemEventProcessor.entityId} = ?"
    )
    disposabilityStatusChangedPromise.completeWith(disposabilityStatusChangedFuture)

    val transitStatusChangedFuture = session.prepare(
      s"UPDATE ${MainStorageItemEventProcessor.TABLE_NAME} SET ${MainStorageItemEventProcessor.transitStatus} = ? WHERE ${MainStorageItemEventProcessor.entityId} = ?"
    )
    transitStatusChangedPromise.completeWith(transitStatusChangedFuture)

    val storageStatusChangedFuture = session.prepare(
      s"UPDATE ${MainStorageItemEventProcessor.TABLE_NAME} SET ${MainStorageItemEventProcessor.storageStatus} = ? WHERE ${MainStorageItemEventProcessor.entityId} = ?"
    )
    storageStatusChangedPromise.completeWith(storageStatusChangedFuture)

    (for{
      _ ← insertItemFuture
      _ ← itemEmbeddedFuture
      _ ← externalReferenceSetFuture
      _ ← itemCommittedFuture
      _ ← newTransferTargetSetFuture
      _ ← disposabilityStatusChangedFuture
      _ ← transitStatusChangedFuture
      _ ← storageStatusChangedFuture
    } yield Done).recover{
      case t ⇒ throw new RuntimeException(s"HILFE: $t",t)
    }

  }

  private def createTable(): Future[Done] = {
    for{
      _ ← session.executeCreateTable(
        s"""
           |CREATE TABLE IF NOT EXISTS ${MainStorageItemEventProcessor.TABLE_NAME} (
           |${MainStorageItemEventProcessor.entityId} TEXT PRIMARY KEY,
           |${MainStorageItemEventProcessor.manufactor} TEXT,
           |${MainStorageItemEventProcessor.externalReference} TEXT,
           |${MainStorageItemEventProcessor.incomingOrderId} TEXT,
           |${MainStorageItemEventProcessor.storageStatus} TEXT,
           |${MainStorageItemEventProcessor.transitStatus} TEXT,
           |${MainStorageItemEventProcessor.disposabilityStatus} TEXT,
           |${MainStorageItemEventProcessor.currentPosition} TEXT,
           |${MainStorageItemEventProcessor.holder} TEXT,
           |${MainStorageItemEventProcessor.weight} INT,
           |${MainStorageItemEventProcessor.length} INT,
           |${MainStorageItemEventProcessor.height} INT,
           |${MainStorageItemEventProcessor.width} INT,
           |${MainStorageItemEventProcessor.newTarget} TEXT,
           |${MainStorageItemEventProcessor.newTargetKind} TEXT,
           |${MainStorageItemEventProcessor.storageYear} INT,
           |${MainStorageItemEventProcessor.storageMonthOfYear} INT
           |)
         """.stripMargin)
      _ ← {
        session.executeCreateTable(s"CREATE INDEX IF NOT EXISTS ON ${MainStorageItemEventProcessor.TABLE_NAME} (${MainStorageItemEventProcessor.manufactor})")
        session.executeCreateTable(s"CREATE INDEX IF NOT EXISTS ON ${MainStorageItemEventProcessor.TABLE_NAME} (${MainStorageItemEventProcessor.externalReference})")
        session.executeCreateTable(s"CREATE INDEX IF NOT EXISTS ON ${MainStorageItemEventProcessor.TABLE_NAME} (${MainStorageItemEventProcessor.incomingOrderId})")
        session.executeCreateTable(s"CREATE INDEX IF NOT EXISTS ON ${MainStorageItemEventProcessor.TABLE_NAME} (${MainStorageItemEventProcessor.storageStatus})")
        session.executeCreateTable(s"CREATE INDEX IF NOT EXISTS ON ${MainStorageItemEventProcessor.TABLE_NAME} (${MainStorageItemEventProcessor.transitStatus})")
        session.executeCreateTable(s"CREATE INDEX IF NOT EXISTS ON ${MainStorageItemEventProcessor.TABLE_NAME} (${MainStorageItemEventProcessor.disposabilityStatus})")
        session.executeCreateTable(s"CREATE INDEX IF NOT EXISTS ON ${MainStorageItemEventProcessor.TABLE_NAME} (${MainStorageItemEventProcessor.currentPosition})")
        session.executeCreateTable(s"CREATE INDEX IF NOT EXISTS ON ${MainStorageItemEventProcessor.TABLE_NAME} (${MainStorageItemEventProcessor.holder})")
        session.executeCreateTable(s"CREATE INDEX IF NOT EXISTS ON ${MainStorageItemEventProcessor.TABLE_NAME} (${MainStorageItemEventProcessor.weight})")
        session.executeCreateTable(s"CREATE INDEX IF NOT EXISTS ON ${MainStorageItemEventProcessor.TABLE_NAME} (${MainStorageItemEventProcessor.length})")
        session.executeCreateTable(s"CREATE INDEX IF NOT EXISTS ON ${MainStorageItemEventProcessor.TABLE_NAME} (${MainStorageItemEventProcessor.height})")
        session.executeCreateTable(s"CREATE INDEX IF NOT EXISTS ON ${MainStorageItemEventProcessor.TABLE_NAME} (${MainStorageItemEventProcessor.width})")
        session.executeCreateTable(s"CREATE INDEX IF NOT EXISTS ON ${MainStorageItemEventProcessor.TABLE_NAME} (${MainStorageItemEventProcessor.newTarget})")
        session.executeCreateTable(s"CREATE INDEX IF NOT EXISTS ON ${MainStorageItemEventProcessor.TABLE_NAME} (${MainStorageItemEventProcessor.newTargetKind})")
        session.executeCreateTable(s"CREATE INDEX IF NOT EXISTS ON ${MainStorageItemEventProcessor.TABLE_NAME} (${MainStorageItemEventProcessor.storageYear})")
        session.executeCreateTable(s"CREATE INDEX IF NOT EXISTS ON ${MainStorageItemEventProcessor.TABLE_NAME} (${MainStorageItemEventProcessor.storageMonthOfYear})")
      }
    } yield Done
  }

  override def buildHandler(): ReadSideProcessor.ReadSideHandler[ItemEvent] = {
    readSide.builder[ItemEvent](MainStorageItemEventProcessor.OFFSET_TABLE)
      .setGlobalPrepare(() ⇒ createTable())
      .setPrepare(_ ⇒ prepareStatements())
      .setEventHandler[ItemCreated](a ⇒ processInsertItem(a.event,a.entityId))
      .setEventHandler[StorageStatusChanged](a ⇒ processStorageStatusChanged(a.event,a.entityId))
      .setEventHandler[DisposabilityStatusChanged](a ⇒ processDisposabilityStatusChanged(a.event,a.entityId) )
      .setEventHandler[TransitStatusChanged](a ⇒ processTransitStatusChanged(a.event,a.entityId))
      .setEventHandler[ItemCommitted](a ⇒ processItemCommitted(a.event,a.entityId))
      .setEventHandler[ItemEmbedded](a ⇒ processItemEmbedded(a.event,a.entityId) )
      .setEventHandler[NewTransferTargetSet](a ⇒ processNewTransferTargetSet(a.event,a.entityId))
      .setEventHandler[ExternalReferenceSet](a ⇒ processExternalReferenceSet(a.event,a.entityId))
      .build()
  }

  override def aggregateTags: Set[AggregateEventTag[ItemEvent]] = ItemEvent.Tag.allTags
}

Is there someone who can help me?


(Sergey Morgunov) #2

I'm not sure, but maybe this problem is related to issue 1715.


#3

Sounds like it. Is there a workaround?


(Sergey Morgunov) #4

We do not use the Cassandra read side, but maybe @kotdv can suggest something :wink:


(Alan Klikic) #5

Hi,

readSideGlobalPrepare is a cluster singleton actor that is used for performing the setGlobalPrepare action. If you get an ask timeout, I would say that createTable takes longer than the ask timeout.
If you are starting your service for the first time, the Cassandra tables need to be created and replicated throughout the Cassandra cluster, so this can be expected behaviour.
Are you starting your service for the first time?
If yes, then the solution is to remove the table creation from createTable and perform it directly/manually on Cassandra.

Br,
Alan
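
One more thing that may be worth checking in the createTable posted above: a block of several Future-returning calls evaluates to its last expression only, so the for-comprehension waits for just the last CREATE INDEX while the others run unobserved and in parallel. Below is a minimal sketch of chaining such statements so that each runs after the previous one and any failure is reported. FakeSession, the local Done type and the table name item_table are hypothetical stand-ins so the sketch runs without Lagom or Cassandra; only the foldLeft/flatMap chaining is the point. It will not by itself make the schema changes faster.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SequentialDdlSketch {
  // Local stand-in for akka.Done, so the sketch needs no Akka dependency.
  sealed trait Done
  case object Done extends Done

  // Hypothetical stand-in for the Cassandra session: it only records the statement.
  final class FakeSession {
    private val buffer = scala.collection.mutable.ArrayBuffer.empty[String]
    def executed: List[String] = buffer.synchronized(buffer.toList)
    def executeCreateTable(stmt: String)(implicit ec: ExecutionContext): Future[Done] =
      Future { buffer.synchronized(buffer += stmt); Done }
  }

  // Runs the statements one after another. The returned Future completes only
  // after the last statement has finished and fails if any statement fails.
  def executeInOrder(session: FakeSession, statements: List[String])(
      implicit ec: ExecutionContext): Future[Done] =
    statements.foldLeft(Future.successful[Done](Done)) { (previous, stmt) =>
      previous.flatMap(_ => session.executeCreateTable(stmt))
    }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val session = new FakeSession
    // Hypothetical column subset; the real processor indexes many more columns.
    val columns = List("manufactor", "externalReference", "holder")
    val statements = columns.map(c => s"CREATE INDEX IF NOT EXISTS ON item_table ($c)")
    Await.result(executeInOrder(session, statements), 5.seconds)
    session.executed.foreach(println)
  }
}
```

In the real processor the same fold could be applied to the list of CREATE INDEX statements, with the result used as the second step of the for-comprehension.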


#6

Hi Alan,
Yes, this was the first time. I changed the table to a completely new name. After I noticed the timeout, I set it to 40s. The tables are created on Cassandra. I should add that we are using only one Cassandra instance at the moment.


(Alan Klikic) #7

Do you still have this problem after the tables have been created and you have restarted your service?

Br,
Alan


#8

Yes this works!


#9

Alan, I also have a problem with a topic. Is that the same problem?

storageService.createdItemTopic.subscribe.atLeastOnce(
    Flow[StorageItemTopic].map(r ⇒ r.itemId).collect{
      case x: String ⇒ println(x)
        x
    }.mapAsync(1){t ⇒
      println(t + " FUT")
      Future.successful(Done)
    }
  ).recover{
    case r ⇒ println(r)
      System.exit(-1)
  }

And my error is:

[WARN] [03/01/2019 10:50:29.878] [gui-akka.kafka.default-dispatcher-7] [akka://gui/system/kafka-consumer-1] Consumer interrupted with WakeupException after timeout. Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 10000 milliseconds

(Alan Klikic) #10

Hi,

This is a well-known issue with different causes and is unrelated to this topic. Alpakka Kafka is the underlying implementation of the Lagom Broker API for Kafka.
Check this:
Lagom Online Auction issue 4
Alpakka Kafka Issue 235
Alpakka Kafka Issue 302

Br,
Alan


#11

Hi,
Thanks. I will see what I can do. But this happens in prod mode, not in dev mode. Kafka seems to be okay. I will start another topic or add my information to one of the existing ones.
Thanks for your help.

André