Error loop related to RestartWithBackoffSource - Restarting graph due to failure

Hi everyone, I'm getting warnings while processing an event on the read side. The details of the warning are:

2018-12-10 13:12:52,485 [WARN] a.s.s.RestartWithBackoffSource - Restarting graph due to failure. stack_trace:
java.util.concurrent.ExecutionException: com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large
	at com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:503)
	at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:462)
	at akka.persistence.cassandra.package$ListenableFutureConverter$$anon$2.$anonfun$run$2(package.scala:25)
	at scala.util.Try$.apply(Try.scala:209)
	at akka.persistence.cassandra.package$ListenableFutureConverter$$anon$2.run(package.scala:25)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large
	at com.datastax.driver.core.Responses$Error.asException(Responses.java:148)
	at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:179)
	at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:179)
	at com.datastax.driver.core.RequestHandler.access$2400(RequestHandler.java:49)
	at com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:799)
	at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:633)
	at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1075)
	at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:998)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:647)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:582)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:499)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:461)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	... 1 more

I inspected the log files and saw that globalPrepare retries creating the tables, and after that my XYZCreated event is retried again.

The Lagom documentation says:

If the globalPrepare fails, Lagom will retry, backing off exponentially on subsequent failures, until it succeeds.

Our buildHandler is something like this:

override def buildHandler(): ReadSideProcessor.ReadSideHandler[XYZEvents] = {
  readSide.builder[XYZEvents]("XYZEventsOffset")
    .setGlobalPrepare(() => createTables())
    .setPrepare(_ => prepareStatements())
    .setEventHandler[XYZCreated](e => processXYZCreated(e.event))
    .setEventHandler[ExampleEvent](e => processExampleEvent(e.event))
    .build
}
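
For reference, createTables() and prepareStatements() look roughly like this (simplified; the table and statement names are placeholders, session is the injected Lagom scaladsl CassandraSession, and an implicit ExecutionContext is in scope):

import akka.Done
import com.datastax.driver.core.PreparedStatement
import scala.concurrent.Future

// Placeholder prepared statement, filled in by prepareStatements()
private var insertXYZ: PreparedStatement = _

private def createTables(): Future[Done] =
  session.executeCreateTable(
    "CREATE TABLE IF NOT EXISTS xyz_summary (id text PRIMARY KEY, name text)")

private def prepareStatements(): Future[Done] =
  session.prepare("INSERT INTO xyz_summary (id, name) VALUES (?, ?)").map { ps =>
    insertXYZ = ps
    Done
  }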

I think our globalPrepare method creates the tables successfully; the failure is the com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large. Any ideas?

Batch too large indicates that the list of statements your event handler returned exceeds Cassandra's batch size limit (batch_size_fail_threshold_in_kb in cassandra.yaml, 50 KB by default).
You will need to split your batch into several smaller batches.
You can do this by using the CassandraSession directly in your event handler and invoking executeWriteBatch once per small batch. Chain all the executeWriteBatch calls together, and then chain that with the event handler's response, which should be an empty list so that Lagom has nothing left to execute itself.
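
A minimal sketch of that approach, assuming Lagom's scaladsl CassandraSession is available as session, an implicit ExecutionContext is in scope, and a hypothetical chunk size of 100 statements per batch:

import akka.Done
import com.datastax.driver.core.{BatchStatement, BoundStatement}
import com.lightbend.lagom.scaladsl.persistence.cassandra.CassandraSession

import scala.collection.immutable
import scala.concurrent.{ExecutionContext, Future}

// Executes the statements in chunks, one small batch at a time, chained
// sequentially. The final empty list is what the event handler returns,
// so Lagom has nothing left to execute as one oversized batch.
private def executeInChunks(
    session: CassandraSession,
    statements: immutable.Seq[BoundStatement],
    chunkSize: Int = 100
)(implicit ec: ExecutionContext): Future[immutable.Seq[BoundStatement]] =
  statements
    .grouped(chunkSize)
    .foldLeft(Future.successful(Done: Done)) { (acc, chunk) =>
      acc.flatMap { _ =>
        val batch = new BatchStatement(BatchStatement.Type.UNLOGGED)
        chunk.foreach(batch.add)
        session.executeWriteBatch(batch)
      }
    }
    .map(_ => immutable.Seq.empty[BoundStatement])

// The event handler then performs the writes itself and responds with an
// empty list:
private def processXYZCreated(event: XYZCreated): Future[immutable.Seq[BoundStatement]] = {
  // Hypothetical: bind one statement per row this event produces, using
  // the prepared statement from your prepare step.
  val statements = immutable.Seq(insertXYZ.bind(event.id, event.name))
  executeInChunks(session, statements)
}

Note that splitting into UNLOGGED batches gives up the atomicity of a single logged batch: if a chunk fails midway, earlier chunks are already applied and the processor will retry the event from the stored offset, so your handlers should stay idempotent (which Lagom read-side handlers should be anyway).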

Hope this helps.