Alpakka - AMQP: how to transport CommitableReadResult through the stream (also in case of error handling)

Hello everyone

I am having trouble how transport the CommitableReadResult through the stream, so I can ack and nack the message accordingly. In my case I want to write RabbitMQ events to Elasticsearch.

I’ve managed to lay out the happy path, including ack

private val source = RabbitMq.prepareAmqpSource // little helper which creates a Source with CommitableReadResult
private val sink = Elasticsearch.prepareElasticsearchSink // also a little helper

val convertStage = Flow[CommittableReadResult]
		.map(readResult => RabbitMq.convertFromReadResult(readResult))
		...some more mapping steps
                .map(x=> WriteMessage.createUpsertMessage(x.hashCode(), x)) // this is whats expected of the Elasticsearch-Sink

val rabbitToElasticStage = Flow[CommittableReadResult].alsoTo(convertStage.to(sink))

val done: Future[Done] = source
	.via(rabbitToElasticStage)
	.mapAsync(1) {
		cm => cm.ack()
	}
	.runWith(Sink.ignore)

The alsoTo Method was the only way I could find to keep a reference to the CommitableReadResult, so I can ack it. Is this even the correct way to do something like this?

What I currently cant figure out is the error-handling which can occur when writting to the sink. Using recover only gives me the Throwable, but I just need the CommitableReadResult to call nack. I could not figure out how a supervision-strategy would help me here.

Turns out that my understanding of Akka Streams was not sufficient. I thought that I always need a sink to write some data, but after having a closer look at the Alpakka-Elasticsearch-API I realized that I can model the writes also as a flow which produces a WriteResult which can be used to decide for ack or nack. The last missing piece was then how to transport a value from the source through my stream.

I found that a new type was added recently called FlowWithContext, which is exactly what I needed. But since not all methods are supported I just moved back to always return a tuple in my steps.

Furthermore Alpakka-Elasticsearch allows to create a flow with a passthrough-value, which would be my CommitableReadResult from RabbitMQ.

val source = AmqpSource.committableSource(settings, 50)
val writeFlow = ElasticsearchFlow.createWithPassThrough[ElasticsearchData, CommitableReadResult]("streampoc1", typeName = "_doc", writeSettings)

val convertStage = Flow[CommittableReadResult]
	.map(readResult => (RabbitMq.convertFromReadResult(readResult), readResult))
	....more steps always returning Tuple with readResult
	.map {
		case (eUmsatz: ElasticsearchUmsatz, cm: CommittableReadResult) => WriteMessage.createUpsertMessage(eUmsatz.getKontoId, eUmsatz).withPassThrough(cm)
	}
	
val rabbitToElasticStage = Flow[CommittableReadResult].via(convertStage).via(writeFlow).map { writeResult =>
		if (writeResult.success) {
			writeResult.message.passThrough.ack()
		} else {
			writeResult.message.passThrough.nack(requeue = true)
		}
	}

source.via(rabbitToElasticStage).runWith(Sink.ignore)
1 Like

Hi Marc,

You are right, Sinks are not as important as most people intuitively assume.

Thank you for describing what you learned. We intend to improve the documentation around withContext and when it should be used.

Cheers,
Enno.