Hi,
I have couple of flow with context. When some exeception happens i need to restart the flow . So is FlowWithContext supported along with RestartFlow?
FLOW:
val messageProcessFlow: FlowWithContext[CtxIn, CtxIn, messageOut, CtxIn, NotUsed] =
FlowWithContext[CtxIn, CtxIn]
.map(readResult =>
Try(readNotification) match {
case Success(lines) => logger.debug(s"Incoming Message as Notification lines")
(lines, readResult)
case Failure(f) => throw new InvalidMessageException(readResult, f.getMessage)
})
.map(notificationObj => {
logger.debug(s"Notification tuplet for processing {notificationObj._1}")
processNotificationPayload(notificationObj._1, notificationObj._2)
})
.mapContext(myCtxIn => {
myCtxIn
})
val amqpSinkAsFlow: FlowWithContext[messageOut, CtxIn, WriteResult, CtxOut, Future[Done]] =
AmqpFlowWithContext.withConfirm(sinkSettings)
.map(abc => abc)
.mapContext(myCtxIn => {
myCtxIn.ack()
logger.debug(s"ACK message ${myCtxIn.message}")
myCtxIn.message.bytes
})
val amqpMessageGraph = amqpSourceContext.via(messageProcessFlow).via(amqpSinkAsFlow).toMat(sinkAs)(Keep.right)
processNotificationPayload(notificationObj._1, notificationObj._2) can throw multiple exception. For some of the exception i need to restart the flow.
How to achive this?
Thanks