RestartFlow supported for FlowWithContext

Hi,

I have couple of flow with context. When some exeception happens i need to restart the flow . So is FlowWithContext supported along with RestartFlow?

FLOW:

val messageProcessFlow: FlowWithContext[CtxIn, CtxIn, messageOut, CtxIn, NotUsed] =
FlowWithContext[CtxIn, CtxIn]
.map(readResult =>
Try(readNotification) match {
case Success(lines) => logger.debug(s"Incoming Message as Notification lines") (lines, readResult) case Failure(f) => throw new InvalidMessageException(readResult, f.getMessage) }) .map(notificationObj => { logger.debug(s"Notification tuplet for processing {notificationObj._1}")
processNotificationPayload(notificationObj._1, notificationObj._2)
})
.mapContext(myCtxIn => {
myCtxIn
})

val amqpSinkAsFlow: FlowWithContext[messageOut, CtxIn, WriteResult, CtxOut, Future[Done]] =
AmqpFlowWithContext.withConfirm(sinkSettings)
.map(abc => abc)
.mapContext(myCtxIn => {
myCtxIn.ack()
logger.debug(s"ACK message ${myCtxIn.message}")
myCtxIn.message.bytes
})

val amqpMessageGraph = amqpSourceContext.via(messageProcessFlow).via(amqpSinkAsFlow).toMat(sinkAs)(Keep.right)

processNotificationPayload(notificationObj._1, notificationObj._2) can throw multiple exception. For some of the exception i need to restart the flow.

How to achive this?

Thanks

Hi @iyappansg,

There is no specific variant of RestartFlow for use with FlowWithContext in current Akka.

You could drop down to the crude tuple representation of FlowWithContext with asFlow() and wrap that in the RestartFlow. Later, you can switch back with asFlowWithContext.

Cheers,
Enno.