Transaction handling with Alpakka MongoDB

It would be great to have the Alpakka MongoDB documentation updated with an example of transaction handling. I'm pretty new to Akka Streams and have come up with this so far:

// Wraps the sources built by `withSession` in a MongoDB transaction:
// commit when they complete, abort and re-fail when they fail.
private <T> Source<T, NotUsed> inTransaction(Function1<ClientSession, Source<T, NotUsed>> withSession) {
    return MongoSource.create(client.startSession()).flatMapConcat(session -> {
        session.startTransaction();
        return withSession.apply(session)
            // On completion, run the commit; it emits no elements of type T.
            .concat(MongoSource.create(session.commitTransaction()).flatMapConcat(__ -> Source.<T>empty()))
            // On failure, run the abort first, then fail with the original error.
            .recoverWithRetries(1, new PFBuilder<Throwable, Source<T, NotUsed>>().match(Throwable.class, e ->
                MongoSource.create(session.abortTransaction()).flatMapConcat(__ -> Source.<T>empty()).concat(Source.failed(e))
            ).build());
    });
}

And usage like:

inTransaction(session -> {
	final Source<Success, NotUsed> src1 = MongoSource.create(collection1.insertOne(session, ...));
	final Source<UpdateResult, NotUsed> src2 = MongoSource.create(collection2.updateMany(session, ...));
	return src1.zip(src2);
});

But since I do not have much experience with Akka Streams, I doubt that this solution is the most efficient.
Any help is much appreciated.
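
For reference, the commit-or-abort ordering that the inTransaction helper above relies on can be exercised without Akka Streams or MongoDB. The following is only a minimal sketch under stated assumptions: FakeSession is a hypothetical stand-in for ClientSession that merely records calls, and CompletionStage takes the place of Source, so it illustrates the ordering (start, then commit on success, or abort and re-fail on error) and not the Alpakka API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

final class TransactionSketch {

    // Hypothetical stand-in for ClientSession: it only records which calls were made.
    static final class FakeSession {
        final List<String> log = new ArrayList<>();

        void startTransaction() {
            log.add("start");
        }

        CompletionStage<Void> commitTransaction() {
            log.add("commit");
            return CompletableFuture.completedFuture(null);
        }

        CompletionStage<Void> abortTransaction() {
            log.add("abort");
            return CompletableFuture.completedFuture(null);
        }
    }

    // Runs `work` inside a transaction: commit on success, abort and re-fail on error.
    static <T> CompletionStage<T> inTransaction(
            FakeSession session, Function<FakeSession, CompletionStage<T>> work) {
        session.startTransaction();
        return work.apply(session)
            // Success path: commit first, then pass the original value downstream.
            .thenCompose(value -> session.commitTransaction().thenApply(ignored -> value))
            // Failure path (from the work or the commit): abort, then fail with the same error.
            .<CompletionStage<T>>handle((value, error) -> {
                if (error == null) {
                    return CompletableFuture.completedFuture(value);
                }
                CompletableFuture<T> failed = new CompletableFuture<>();
                session.abortTransaction()
                    .whenComplete((ignored, abortError) -> failed.completeExceptionally(error));
                return failed;
            })
            // Flatten the nested stage produced by handle.
            .thenCompose(stage -> stage);
    }

    public static void main(String[] args) {
        FakeSession ok = new FakeSession();
        inTransaction(ok, s -> CompletableFuture.completedFuture("inserted"))
            .toCompletableFuture().join();
        System.out.println(ok.log); // prints [start, commit]

        FakeSession bad = new FakeSession();
        CompletableFuture<String> boom = new CompletableFuture<>();
        boom.completeExceptionally(new IllegalStateException("write failed"));
        inTransaction(bad, s -> boom)
            .toCompletableFuture()
            .handle((value, error) -> error) // swallow the failure so join() does not throw
            .join();
        System.out.println(bad.log); // prints [start, abort]
    }
}
```

As in the stream version, a failing commit also ends up on the abort path, and the error seen downstream is the original failure, not the outcome of the abort.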