ACK the CommittableReadResult when both source and destination are RabbitMQ queues

I need to implement a RabbitMQ + Alpakka combination.

I have two queues in RabbitMQ (sourceQ and destinationQ). I consume from sourceQ, transform the message, and publish the transformed message to destinationQ.
In this flow, after the message has been successfully published to destinationQ, I have to acknowledge the original message so that it is removed from sourceQ.

Below is the flow:

private val amqpSource = AmqpSource.committableSource(
  NamedQueueSourceSettings(connectionProvider, "sourceQ")
    .withDeclaration(queueDeclaration)
    .withAckRequired(false),
  bufferSize = 10
)

private val amqpFlow = AmqpFlow.withConfirm(
  AmqpWriteSettings(connectionProvider)
    .withRoutingKey(destinationQ)
    .withDeclaration(destinationQ)
    .withBufferSize(10)
    .withConfirmationTimeout(200.millis)
)

final val result = amqpSource
  .map(flow1) // custom message-processing logic
  .map(flow2) // custom message-processing logic
  .via(amqpFlow)
  .runWith(Sink.seq)

I need to ACK the message after the amqpFlow. Could you please guide me on how to get hold of the original message for the ACK once it has passed through amqpFlow?

Thanks for the question, @iyappansg.

Here’s a short sketch:

Hi Johannes,

Thanks for the guidance. I am still not able to make it work, since I am new to stream programming. Can you please help me complete the flow below?

val src: SourceWithContext[CommittableReadResult, CommittableReadResult, NotUsed] = SourceWithContext(amqpSource)

def processData(committableReadResult: CommittableReadResult): String = {
  committableReadResult.message.bytes.utf8String + "New Data"
}

def processDataAsync(str: String): WriteMessage = {
  WriteMessage(bytes = ByteString(str.toString),
    immediate = false,
    mandatory = false)
}

src
  .map(crr => processData(crr))
  .map(newstr => processDataAsync(newstr))
  // What comes here? Hopefully the amqpFlow, but it is not compiling.
  // Then how do I get the ReadResult back and ACK it?
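
One way the flow above might be completed is to carry the original CommittableReadResult along as stream context, so that it is still available after the publish step. The sketch below assumes Alpakka AMQP 2.0 or later (for AmqpFlowWithContext) and Akka 2.6 (for asSourceWithContext and the materializer provided implicitly by the ActorSystem). The object name, the local connection provider, the default QueueDeclaration values, and the choice to nack unconfirmed messages are illustrative assumptions rather than anything taken from the posts above. It also sets withAckRequired(true), on the assumption that an explicit acknowledgement is what is wanted here.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.alpakka.amqp.{
  AmqpLocalConnectionProvider, AmqpWriteSettings, NamedQueueSourceSettings,
  QueueDeclaration, WriteMessage
}
import akka.stream.alpakka.amqp.scaladsl.{AmqpFlowWithContext, AmqpSource, CommittableReadResult}
import akka.stream.scaladsl.Sink
import akka.util.ByteString

import scala.concurrent.Future
import scala.concurrent.duration._

// Hypothetical wrapper object; needs a RabbitMQ broker reachable on localhost.
object AckAfterPublish extends App {
  implicit val system: ActorSystem = ActorSystem("ack-after-publish")

  // Assumption: broker on localhost with default credentials.
  val connectionProvider = AmqpLocalConnectionProvider
  val sourceQueue = "sourceQ"
  val destinationQueue = "destinationQ"

  val amqpSource = AmqpSource.committableSource(
    NamedQueueSourceSettings(connectionProvider, sourceQueue)
      .withDeclaration(QueueDeclaration(sourceQueue)) // assumption: default queue options
      .withAckRequired(true),                         // messages are acked explicitly below
    bufferSize = 10
  )

  // Context-aware publish step: emits the WriteResult together with the
  // CommittableReadResult that entered the stream as context.
  val amqpFlow = AmqpFlowWithContext.withConfirm[CommittableReadResult](
    AmqpWriteSettings(connectionProvider)
      .withRoutingKey(destinationQueue)
      .withDeclaration(QueueDeclaration(destinationQueue))
      .withBufferSize(10)
      .withConfirmationTimeout(200.millis)
  )

  def processData(crr: CommittableReadResult): String =
    crr.message.bytes.utf8String + "New Data"

  def toWriteMessage(str: String): WriteMessage =
    WriteMessage(ByteString(str))

  val done: Future[Done] = amqpSource
    .asSourceWithContext(crr => crr) // keep the original message as context
    .map(processData)                // transforms the element, context is untouched
    .map(toWriteMessage)
    .via(amqpFlow)                   // publish to destinationQ and wait for the confirm
    .asSource                        // back to a plain Source of (WriteResult, context)
    .mapAsync(1) { case (writeResult, crr) =>
      // Ack only when the broker confirmed the publish; otherwise nack
      // (with default arguments the message is requeued).
      if (writeResult.confirmed) crr.ack() else crr.nack()
    }
    .runWith(Sink.ignore)
}
```

The map calls on a SourceWithContext only touch the element, which is why the CommittableReadResult comes out unchanged next to the WriteResult. Using mapAsync(1) keeps acknowledgements in stream order; a higher parallelism may be fine if ordering of acks does not matter.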