Why does TransactionalSource need isolation.level to be read_committed?

Hi there,

I was wondering why the TransactionalSource is forcing isolation.level to ‘read_committed’:

  /**
   * We set the isolation.level config to read_committed to make sure that any consumed messages are from
   * committed transactions. Note that the consuming partitions may be produced by multiple producers, and these
   * producers may either use transactional messaging or not at all. So the fetching partitions may have both
   * transactional and non-transactional messages, and by setting isolation.level config to read_committed consumers
   * will still consume non-transactional messages.
   */
  private val txConsumerSettings = consumerSettings.withProperty(
    ConsumerConfig.ISOLATION_LEVEL_CONFIG,
    IsolationLevel.READ_COMMITTED.toString.toLowerCase(Locale.ENGLISH)
  )

I understand why it is safe and a good practice to use transactions in all the steps of a streaming flow,
but I don’t understand why the isolation.level is forcibly set to ‘read_committed’ instead of just as a default value.
Why shouldn’t I be able to use transactions when I’m consuming non-transactional messages?


I have to admit that the comment is unclear, especially the following part

by setting isolation.level config to read_committed consumers will still consume non-transactional messages

Hi @laymain ,

This consumer property is enforced so that the transactional workload never consumes messages produced by an upstream transaction that was later aborted. For example, if an upstream transactional workload has produced a message but not yet committed it, and this workload consumes that message anyway, there is no guarantee that the upstream transaction will eventually commit rather than abort. In other words, read_committed is required to preserve Kafka’s exactly-once semantics guarantees across a pipeline of transactional workloads connected in series.

This is consistent with how Kafka Streams behaves when its processing.guarantee config enables EoS (exactly-once semantics).

Additionally, consumers are configured with isolation.level="read_committed" and producers are configured with retries=Integer.MAX_VALUE , enable.idempotence=true , and max.in.flight.requests.per.connection=1 per default.
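
As a rough illustration, the settings quoted above could be written out as plain client properties. This is only a sketch: the object and method names (EosClientDefaults, consumerProps, producerProps) are placeholders made up for this example, and the values are simply the ones listed in the quote, not a statement about what any particular Kafka version sets today.

```scala
import java.util.Properties

// Hypothetical helper for illustration; not part of Kafka or Alpakka Kafka.
object EosClientDefaults {

  // Consumer side: only read records from committed transactions
  // (plus non-transactional records).
  def consumerProps(bootstrapServers: String, groupId: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("group.id", groupId)
    props.setProperty("isolation.level", "read_committed")
    props
  }

  // Producer side: the values quoted above for the exactly-once case.
  def producerProps(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("retries", Int.MaxValue.toString)
    props.setProperty("enable.idempotence", "true")
    props.setProperty("max.in.flight.requests.per.connection", "1")
    props
  }
}
```

The only setting TransactionalSource forces on the consumer side is isolation.level; the producer values are shown purely to mirror the quote.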

The Kafka Streams EoS KIP design document provides a more complete explanation.

Stream thread’s embedded consumer should set the isolation.level config to read_committed to make sure that any consumed messages are from committed transactions. Note that the consuming partitions may be produced by multiple producers, and these producers may either use transactional messaging (for example, if they are Streams’ embedded producers) or not at all. So the fetching partitions may have both transactional and non-transactional messages, and by setting isolation.level config to read_committed consumers will still consume non-transactional messages.

I have to admit that the comment is unclear, especially the following part

by setting isolation.level config to read_committed consumers will still consume non-transactional messages

In other words, messages produced by producers that are not part of a transaction can be consumed when read_committed is used because those messages are immutable and there’s no risk of them being “rolled back” as part of an aborted transaction.
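
To make this concrete, here is a toy model of what a read_committed consumer gets to see. It is not the Kafka client implementation, and all the names in it (ReadCommittedModel, Record, TxState, lastStableOffset, readCommitted) are invented for the example. It assumes the behaviour described in the KafkaConsumer documentation: the consumer reads only up to the last stable offset (the offset of the first record that belongs to a still-open transaction) and filters out records from aborted transactions.

```scala
// Toy model for illustration only; none of these types exist in Kafka.
object ReadCommittedModel {
  sealed trait TxState
  case object Open extends TxState
  case object Committed extends TxState
  case object Aborted extends TxState

  // txId = None models a record written by a non-transactional producer.
  final case class Record(offset: Long, value: String, txId: Option[String])

  // Everything below this offset is "stable": no open transaction starts before it.
  // A transaction id missing from the map is treated as still open.
  def lastStableOffset(log: Seq[Record], tx: Map[String, TxState]): Long = {
    val openOffsets = log.collect {
      case Record(offset, _, Some(id)) if tx.getOrElse(id, Open) == Open => offset
    }
    if (openOffsets.isEmpty) log.lastOption.map(_.offset + 1).getOrElse(0L)
    else openOffsets.min
  }

  // read_committed: stop at the last stable offset and drop aborted records.
  // Non-transactional records (txId = None) always pass the filter.
  def readCommitted(log: Seq[Record], tx: Map[String, TxState]): Seq[Record] = {
    val lso = lastStableOffset(log, tx)
    log.filter { r =>
      r.offset < lso && r.txId.forall(id => tx.get(id).contains(Committed))
    }
  }

  // read_uncommitted: every record is visible, whatever its transaction state.
  def readUncommitted(log: Seq[Record]): Seq[Record] = log
}
```

For a log holding a (non-transactional), b (committed transaction), c (aborted transaction), d (non-transactional) and e (open transaction) at offsets 0 to 4, readCommitted returns a, b and d. Note that in this model a non-transactional record written after the start of a still-open transaction stays hidden until that transaction commits or aborts, because it sits beyond the last stable offset.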

Hope that helps!

More details about read_committed from the Exactly-once Semantics KIP design doc: https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit#bookmark=id.nogouvgyxtup

Thanks for your answer.

That’s what I was missing! So we can say that messages in a topic have three possible “statuses”:

  • uncommitted
  • committed
  • non-transactional

I think I have an issue with the producers that feed my input topic: they’re not using transactions,
but I can’t consume the messages they produce using a TransactionalSource (there is no problem when
using a CommittableSource).

Is there any way to see the “status” of messages in a topic?