It would be great to have the Alpakka MongoDB documentation updated with some example on transaction handling. I’m pretty new to Akka-Streams and crafted that so far:
private <T> Source<T, NotUsed> inTransaction(Function1<ClientSession, Source<T, NotUsed>> withSession) {
return MongoSource.create(client.startSession()).flatMapConcat(session -> {
session.startTransaction();
return withSession.apply(session)
.concat(MongoSource.create(session.commitTransaction()).flatMapConcat(__ -> Source.empty()))
.recoverWithRetries(1, new PFBuilder<Throwable, Source<T, NotUsed>>().match(Throwable.class, e ->
MongoSource.create(session.abortTransaction()).flatMapConcat(__ -> Source.<T>empty()).concat(Source.failed(e))
).build());
});
}
And usage like:
inTransaction(session -> {
final Source<Success, NotUsed> src1 = MongoSource.create(collection1.insertOne(session, ...));
final Source<UpdateResult, NotUsed> src2 = MongoSource.create(collection2.updateMany(session, ...));
return src1.zip(src2);
});
But as I do not have much experience with akka-streams - I doubt that that solution is the most efficient.
Any help is much appreciated