# random
h
Hi all, I have recently begun using the exactly-once delivery guarantee feature with the Kafka sink. I've observed that the sink generates a new `transactional.id` for every checkpoint, and I'm finding it challenging to comprehend its purpose. Why isn't there a single transaction (per task) for writing new data and multiple transactions for finalizing old transactions (to support multiple concurrent checkpoints)?

My concern arises from the fact that I have a producer writing to 100 topics, each with an average of 20 partitions. In this setup, a single transaction record in Kafka's Transaction Coordinator amounts to approximately 1 MB in size. With 32 tasks and a checkpoint occurring every 2 minutes, this leads to nearly 1 GB of required RAM per hour on a broker. Kafka typically retains transactions for 7 days by default, implying a need for approximately 160 GB just for the transaction mapping. After understanding how transactions work internally in Kafka and how the Kafka sink approaches implementing the exactly-once delivery feature, I still cannot figure out the reasoning behind using so many transactions.
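For context, this is roughly the setup I mean; a minimal sketch assuming the current `KafkaSink` builder API from flink-connector-kafka (the bootstrap servers, topic, timeout, and id prefix are placeholders, and in reality I write to ~100 topics rather than one):

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class ExactlyOnceSinkExample {

    // Exactly-once KafkaSink: the sink derives a fresh transactional.id per subtask
    // and checkpoint from the prefix below, which is the behaviour I'm asking about.
    static KafkaSink<String> buildSink() {
        return KafkaSink.<String>builder()
                .setBootstrapServers("broker-1:9092")              // placeholder
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("my-topic")              // placeholder; in reality ~100 topics
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("my-app")                // placeholder prefix
                // Producer-side timeout; must not exceed the broker's transaction.max.timeout.ms.
                .setProperty("transaction.timeout.ms", "300000")   // 5 minutes, matching my setup
                .build();
    }
}
```
As far as I can tell, the generated ids are derived from that prefix plus the subtask and checkpoint ids, so every checkpoint leaves a new id behind.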
m
> Kafka typically retains transactions for 7 days by default
That's not the case; the default is 15 minutes: https://docs.confluent.io/platform/current/installation/configuration/broker-configs.html#transaction-max-timeout-ms
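If you want to double-check what your brokers are actually running with, you can read the live values back; a rough sketch with the Kafka `Admin` client (the bootstrap servers and the broker id "0" are placeholders):

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class ShowTransactionConfigs {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092"); // placeholder
        try (Admin admin = AdminClient.create(props)) {
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0"); // placeholder broker id
            Map<ConfigResource, Config> configs =
                    admin.describeConfigs(Collections.singleton(broker)).all().get();
            Config config = configs.get(broker);
            // Maximum transaction timeout a producer may request (default 900000 ms = 15 min).
            System.out.println("transaction.max.timeout.ms = "
                    + config.get("transaction.max.timeout.ms").value());
            // How long the coordinator keeps the transactional.id -> metadata mapping
            // after the last use (default 604800000 ms = 7 days).
            System.out.println("transactional.id.expiration.ms = "
                    + config.get("transactional.id.expiration.ms").value());
        }
    }
}
```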
I would recommend reading https://cwiki.apache.org/confluence/display/KAFKA/KIP-939%3A+Support+Participation+in+2PC and https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710 to understand how this currently works in Kafka and in Flink; these two are proposals to improve both technologies in the future.
h
I have the transaction timeout set to 5 minutes, and the transaction records on the brokers are kept even after the timeout. The expiration on the brokers is controlled by `transactional.id.expiration.ms`. When I decreased `transactional.id.expiration.ms` to a single day, the brokers almost instantly released all the memory.
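To put numbers on it, here is the back-of-the-envelope calculation I was doing, as a small Java sketch; the ~1 MB per transaction entry is my own estimate from the 100 topics × ~20 partitions above:

```java
public class CoordinatorStateEstimate {
    public static void main(String[] args) {
        // Assumptions taken from my earlier message; adjust to your own setup.
        double mbPerTransactionEntry = 1.0;        // ~1 MB per entry (100 topics x ~20 partitions)
        int parallelism = 32;                      // sink tasks
        double checkpointsPerHour = 60.0 / 2.0;    // one checkpoint every 2 minutes

        double mbPerHour = mbPerTransactionEntry * parallelism * checkpointsPerHour; // ~960 MB/h

        // Entries accumulate until transactional.id.expiration.ms elapses.
        double gbAtSevenDays = mbPerHour * 7 * 24 / 1024;   // default expiration (7 days)
        double gbAtOneDay = mbPerHour * 24 / 1024;           // expiration lowered to 1 day

        System.out.printf("growth rate:          ~%.0f MB/hour%n", mbPerHour);
        System.out.printf("retained over 7 days: ~%.0f GB%n", gbAtSevenDays);
        System.out.printf("retained over 1 day:  ~%.0f GB%n", gbAtOneDay);
    }
}
```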
m
I think https://flink.apache.org/2018/02/28/an-overview-of-end-to-end-exactly-once-processing-in-apache-flink-with-apache-kafka-too/ is still the best overview of how e2e exactly-once processing works in Flink, even though there's now a new Sink API in Flink that the Kafka connector uses.
h
I understand how two-phase commits work, but I'm specifically curious about their implementation in the Kafka sink. I'd like to know the underlying rationale behind using multiple `transactional.id`s, so that when I eventually work on implementing an app using Kafka transactions myself in the future, I'll have a better understanding of potential issues that might arise.
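For concreteness, this is the kind of plain transactional producer I have in mind for my own app; a minimal sketch with the standard `KafkaProducer` transactions API (bootstrap servers, topic, and the `transactional.id` are placeholders). What I'm trying to understand is why the sink can't simply keep reusing one id like this per task:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleTransactionalProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-1:9092");      // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // One transactional.id allows only one open transaction at a time;
        // initTransactions() fences any older producer that used the same id.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-app-txn-0");        // placeholder

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            try {
                producer.send(new ProducerRecord<>("my-topic", "key", "value"));  // placeholder topic
                producer.commitTransaction();
            } catch (Exception e) {
                producer.abortTransaction();
                throw e;
            }
        }
    }
}
```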