Using multiple sources with Transactional.sink

Hello,

I’ve noticed that it’s impossible to use Transactional.sink with data coming from different sources (different topics from the same Kafka cluster), because a check in TransactionalProducerStage enforces that "Transaction batch must contain messages from a single source".

I wonder why this check exists, because Kafka definitely supports transactions spanning multiple topics. I’ve written a simple multi-topic transactional sink as a proof of concept (not really a sink, it’s really a .groupedWithin().foreach()), and it seems to work fine, producing correct results.

What’s the catch here? What’s the reason for this restriction? Is there a way to have a Transactional.sink() that takes messages from different topics/partitions in the same cluster?

Hi @Vasily. A Transactional.source can use a topic partition subscription that includes any number of topics and partitions. The restriction you highlight in the TransactionalProducerStage is meant to prevent users from combining multiple Akka Streams Transactional.sources, by checking that the same committed marker is used for every message. Using the CommittedMarker for this constraint is a bit misleading; it’s basically checking that the same upstream KafkaConsumerActor is used for all messages.

You can only have one Transactional.source and one Transactional.sink (or flow) in your graph.
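For illustration, here is a minimal sketch of the supported shape: one Transactional.source subscribed to several topics, feeding one Transactional.sink. The topic names, group id, transactional id, bootstrap address, and String serializers are placeholders, not taken from this thread, and it needs a running broker to do anything.

```scala
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
import akka.kafka.scaladsl.Transactional
import akka.stream.ActorMaterializer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

object SingleSourceMultiTopic extends App {
  implicit val system: ActorSystem = ActorSystem("example")
  // Needed on Akka 2.5; on Akka 2.6 the implicit ActorSystem is enough.
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Placeholder connection settings.
  val consumerSettings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("example-group")

  val producerSettings =
    ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

  // One source, several topics: every message comes from the same consumer,
  // so the single-source check in the producer stage is satisfied.
  Transactional
    .source(consumerSettings, Subscriptions.topics("topic-a", "topic-b"))
    .map { msg =>
      ProducerMessage.single(
        new ProducerRecord[String, String]("output-topic", msg.record.key, msg.record.value),
        msg.partitionOffset
      )
    }
    .runWith(Transactional.sink(producerSettings, "example-transactional-id"))
}
```

With this shape, the order in which messages from the different topics arrive is decided by the consumer, not by the stream.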

Hi @seglo,

Thank you for your answer.

I’d like to understand why this limitation exists. Is it a fundamental Kafka limitation, or is it because of the way the Alpakka TransactionalProducerStage is written, so that it should be possible to write a similar stage without this limitation?

What I am ultimately trying to do is to perform a sorted merge based on the timestamp from different topics containing different types of messages coming from different systems. It is impossible to guarantee ordering between those systems, but I know that for each partition the messages are in order, so it’s possible to perform this merge. This merge of course can’t handle all possible cases, but it can fix ordering within a certain configurable period.

Because of this I can’t use a single Transactional.source with multiple topic/partition subscriptions, as I can “advance” in one partition, while some other partition stays at the current message, and I think that a simple Transactional.source can’t do that.
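To make the "advance in one partition" point concrete, here is a minimal, dependency-free sketch of a timestamp-based merge over inputs that are each already sorted. The Event type and the mergeSorted name are hypothetical, and the sketch works on finite iterators, so it leaves out the max-delay handling that an unbounded stream needs.

```scala
import scala.collection.mutable

object SortedMerge {
  // Hypothetical event type: only the timestamp matters for the merge.
  final case class Event(source: String, timestampMillis: Long)

  // k-way merge of inputs that are each already sorted by timestamp.
  def mergeSorted(inputs: Seq[Iterator[Event]]): Iterator[Event] = {
    // PriorityQueue is a max-heap, so reverse the ordering to pop the oldest head first.
    val oldestFirst = Ordering.by[(Event, Int), Long](_._1.timestampMillis).reverse
    val heads = mutable.PriorityQueue.empty[(Event, Int)](oldestFirst)

    // Seed the heap with the first element of every non-empty input.
    inputs.zipWithIndex.foreach { case (input, index) =>
      if (input.hasNext) heads.enqueue((input.next(), index))
    }

    new Iterator[Event] {
      def hasNext: Boolean = heads.nonEmpty
      def next(): Event = {
        val (event, index) = heads.dequeue()
        // Only the input that produced the emitted element advances;
        // every other input stays at its current head.
        if (inputs(index).hasNext) heads.enqueue((inputs(index).next(), index))
        event
      }
    }
  }
}
```

Each input is pulled only when its own head has been emitted, which is the per-input demand that a single source over all partitions does not expose.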

I think I have a solution for that, and it seems to work correctly, but I feel like I am missing something: it doesn’t seem very complex, yet Alpakka doesn’t allow you to do this out of the box.

Here’s what my PoC code does:

case class TransactionalMessageWithTimestamp(
  transactionalMessage: ConsumerMessage.TransactionalMessage[ByteString, ByteString],
  timestamp: ZonedDateTime
)

// Each of these sources is created with a call to Transactional.source()
val sources: Seq[Source[TransactionalMessageWithTimestamp, Consumer.Control]] = //....

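val producer = new KafkaProducer[ByteString, ByteString](producerProperties.asJava)
// initTransactions() must be called once before the first beginTransaction()
producer.initTransactions()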

val doneFuture =
        // basically Source.combine
        CombinedSource(sources: _*)(MergeSortedWithMaxDelay.strategy(maxDelay))
          .map { msg =>
            val partition = computePartition(/*......*/)
            ProducerMessage.single(
              new ProducerRecord(
                outputTopic,
                partition,
                msg.transactionalMessage.record.key(),
                msg.transactionalMessage.record.value()
              ),
              msg.transactionalMessage.partitionOffset
            )
          }
          .groupedWithin(50, 1.second) // prepare a batch of messages for one transaction
          .map { batch =>
            producer.beginTransaction()
            val offsets = mutable.Map.empty[TopicPartition, Long]
            
            // basically go through every message in a batch
            batch.foreach { message =>
              val (records, passThrough) =
                message match {
                  case ProducerMessage.Message(record, pt) =>
                    (immutable.Seq(record), pt)
                  case ProducerMessage.MultiMessage(recs, pt) =>
                    (recs, pt)
                  case ProducerMessage.PassThroughMessage(pt) =>
                    (immutable.Seq.empty, pt)
                }

              // send every message to the producer
              records.foreach(producer.send)
              val topicPartition = passThrough.key.topicPartition
              val offset = passThrough.offset
              // keep track of the highest encountered offset for each source partition in a batch
              offsets(topicPartition) = math.max(offsets.getOrElse(topicPartition, offset), offset)
            }
            producer.sendOffsetsToTransaction(
              offsets.mapValues(offset => new OffsetAndMetadata(offset + 1)).asJava,
              groupId
            )
            producer.commitTransaction()
          }
          .runWith(Sink.ignore)

The code is crude: it buffers messages in memory for the whole duration of the transaction and blocks the flow during the whole transaction, but it seems to work correctly. It seems it wouldn’t be that hard to rewrite it as a proper sink that keeps track of the highest encountered offset for each partition since the beginning of the last transaction.
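The offset bookkeeping such a sink would need can be sketched separately from Kafka itself. This is a minimal, dependency-free sketch: OffsetTracker is a hypothetical name, and the local TopicPartition case class is only a stand-in for org.apache.kafka.common.TopicPartition.

```scala
import scala.collection.mutable

object OffsetTracking {
  // Stand-in for org.apache.kafka.common.TopicPartition, to keep the sketch dependency-free.
  final case class TopicPartition(topic: String, partition: Int)

  // Tracks the highest offset seen per source partition since the last reset.
  final class OffsetTracker {
    private val highest = mutable.Map.empty[TopicPartition, Long]

    // Record one consumed message; out-of-order calls never lower the stored offset.
    def record(tp: TopicPartition, offset: Long): Unit =
      highest.update(tp, math.max(highest.getOrElse(tp, offset), offset))

    // Offsets to send to the transaction: the next offset to read, i.e. highest seen + 1.
    def offsetsToCommit: Map[TopicPartition, Long] =
      highest.iterator.map { case (tp, offset) => tp -> (offset + 1) }.toMap

    // Call after a successful commit so the next transaction starts clean.
    def reset(): Unit = highest.clear()
  }
}
```

A stage built around this would call record for each message as it is sent, pass offsetsToCommit to the producer when the transaction is closed, and reset only after the commit succeeds.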

Do you think that would work?

Is there a simpler way to achieve this?

It’s not a limitation of Kafka. It’s possible to have multiple Kafka consumers provide messages to a single producer which creates and commits a single transaction, as long as messages are all from the same Kafka cluster. The initial implementation doesn’t support this, because it’s been a challenge to guarantee EoS semantics across various use cases. The limitation is imposed by Alpakka Kafka.
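As a rough sketch of what a single producer committing one transaction for messages from several consumers looks like at the Kafka client level, including failure handling, consider the following. The commitBatch helper and its parameters are hypothetical; the error handling follows the pattern from the KafkaProducer documentation, and sendOffsetsToTransaction is used with the same (offsets, groupId) signature as in the PoC above.

```scala
import java.util.{Map => JMap}
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.errors.{
  AuthorizationException,
  OutOfOrderSequenceException,
  ProducerFencedException
}

object TransactionalBatch {
  // Hypothetical helper: writes one batch and its source offsets atomically.
  // Assumes producer.initTransactions() has already been called once.
  def commitBatch[K, V](
      producer: KafkaProducer[K, V],
      records: Seq[ProducerRecord[K, V]],
      offsets: JMap[TopicPartition, OffsetAndMetadata], // may span many source topics
      groupId: String
  ): Unit =
    try {
      producer.beginTransaction()
      records.foreach(record => producer.send(record))
      producer.sendOffsetsToTransaction(offsets, groupId)
      producer.commitTransaction()
    } catch {
      case e @ (_: ProducerFencedException | _: OutOfOrderSequenceException |
          _: AuthorizationException) =>
        // Unrecoverable for this producer instance: close it and fail.
        producer.close()
        throw e
      case e: KafkaException =>
        // Recoverable: abort so that none of the batch becomes visible.
        producer.abortTransaction()
        throw e
    }
}
```

Note what the sketch leaves out: after an abort, the consumers have already moved past the aborted messages and would have to be rewound to the last committed offsets, and rebalances have to be coordinated with in-flight transactions. Those are the kinds of cases that make the guarantee hard to provide in general.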

If I understand correctly, you want to take a batch of messages across several topics and partitions and merge sort them all using a timestamp field. However, in this quote from your post above, you mention:

I can “advance” in one partition, while some other partition stays at the current message

I don’t understand why this matters if you were doing a merge across partitions anyway.

It sounds like you might benefit from a partitioned transactional source (a transactional source per partition). We have laid the groundwork for this, but it’s not yet available, and won’t be available in 2.0.0.