Jasmin Redzepovic
09/24/2023, 1:54 PM
AT_LEAST_ONCE delivery guarantee
3. On a checkpoint, KafkaSource will write its state somewhere (but the checkpoint is not yet complete; it must wait for the state from KafkaSink)
4. On a checkpoint, KafkaSink (the producer) will wait for all outstanding records in the Kafka buffers to be acknowledged (similar to a flush)
5. If networking to Kafka is extremely slow and delivery.timeout.ms is set to 2 minutes, the checkpoint will block for up to 2 minutes and then continue because of the Kafka timeout
6. Kafka delivery will time out and some records will expire, but the Kafka buffers will be considered acknowledged
7. The job manager will take the states from both KafkaSource and KafkaSink, write them into the checkpoint, and the consumer offsets will be committed to Kafka afterwards
8. This will result in data loss
Flink will assume that we are fine with the data loss, since we didn't increase the delivery timeout (the maximum time we are willing to wait for messages to be delivered to Kafka), and it will continue with processing and checkpointing.
I was hoping that the checkpoint would fail in case of a delivery timeout, but it looks like it wouldn't. (But I'm not sure about anything now.)
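For illustration, here is a minimal sketch against the plain Kafka producer client (the broker address and topic name are placeholders, not taken from the job above). It shows the client-side semantics behind point 6: flush() blocks until all buffered records have completed, but a record that exceeds delivery.timeout.ms completes with a TimeoutException that surfaces only in the send callback (or the returned Future), not as an exception from flush() itself, so the buffer can look acknowledged unless that error is checked somewhere.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ExpiringRecordSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder broker address; the topic name below is also a placeholder.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Short timeouts, so records buffered while the broker is slow expire quickly.
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "5000");
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "4000");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("some-topic", "some-value"), (metadata, exception) -> {
                if (exception != null) {
                    // A record that sat longer than delivery.timeout.ms completes here with
                    // a TimeoutException ("Expiring N record(s) ...").
                    System.err.println("Delivery failed: " + exception);
                }
            });

            // flush() waits until every buffered record has completed, successfully or not,
            // but it does NOT throw for expired records; their failure is only visible via
            // the callback above (or the Future returned by send()).
            producer.flush();
        }
    }
}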
Martijn Visser
09/24/2023, 3:39 PM
Jasmin Redzepovic
09/24/2023, 9:35 PM
Jasmin Redzepovic
09/25/2023, 10:02 PM
I configured the delivery.timeout.ms and request.timeout.ms properties, setting them to 5000ms and 4000ms respectively. (acks is set to -1 by default, which is equal to all, I guess.)
KafkaSink is configured with an AT_LEAST_ONCE delivery guarantee. The job parallelism is set to 1 and the checkpointing interval is set to 2000ms.
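For reference, a minimal sketch of that setup, assuming the KafkaSink builder API of the Flink Kafka connector; the bootstrap servers, topic name, and the inline source are placeholders (the real job reads from a KafkaSource):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AtLeastOnceSinkJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);          // job parallelism 1
        env.enableCheckpointing(2000);  // checkpoint every 2000 ms

        // Producer timeouts from the test: deliberately short, so a slow broker expires records.
        Properties producerProps = new Properties();
        producerProps.setProperty("delivery.timeout.ms", "5000");
        producerProps.setProperty("request.timeout.ms", "4000");
        // acks is left at its default of -1 (all).

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")  // placeholder
                .setKafkaProducerConfig(producerProps)
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("output-topic")  // placeholder
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                .build();

        // Placeholder source; the real job reads from a KafkaSource instead.
        env.fromElements("a", "b", "c").sinkTo(sink);

        env.execute("at-least-once-kafka-sink-test");
    }
}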
I started a Flink Job and monitored its logs. Additionally, I was consuming the __consumer_offsets
topic in parallel to track when offsets are committed for my consumer group.
The problematic part occurs during checkpoint 5. Its duration was 5009ms, which exceeds the delivery timeout for Kafka (5000ms). Although it was marked as completed, I believe that the output buffer of KafkaSink was not fully acknowledged by Kafka. As a result, Flink proceeded to trigger checkpoint 6 but immediately encountered a Kafka TimeoutException: Expiring N records. I suspect that this exception originated from checkpoint 5 and that checkpoint 5 should not have been considered successful. The job then failed but recovered from checkpoint 5. Some time after checkpoint 7, consumer offsets were committed to Kafka, and this process repeated once more at checkpoint 9.
Since the offsets of checkpoint 5 were committed to Kafka, but the output buffer was only partially delivered, there has been data loss. I confirmed this when sinking the topic to the database.
Sorry for reaching out to you directly, but I noticed that you are quite active here and influential in the Flink community. I hope that you can help me get in touch with the right people regarding this matter.
(P.S. attached job logs and Kafka consumer offsets, they are not too long)
Martijn Visser
09/25/2023, 10:18 PM
"Additionally, I was consuming the __consumer_offsets topic in parallel to track when offsets are committed for my consumer group."
Flink doesn't rely on consumer offsets for its fault tolerance...
Jasmin Redzepovic
09/25/2023, 10:21 PM
Martijn Visser
09/25/2023, 10:35 PM
"Although it was marked as completed, I believe that the output buffer of KafkaSink was not fully acknowledged by Kafka."
If the output buffer isn't acknowledged by Kafka, that shouldn't succeed the checkpoint. But I do think the duration of 5009ms (which I think is the duration of the entire checkpoint) is directly correlated with the delivery timeout for Kafka (5000 ms), because in that 5009ms there's more activity happening than just delivering towards Kafka. I don't think that the KafkaClient reported an error, so the checkpoint is marked as successful.
Martijn Visser
09/25/2023, 10:37 PM
Jasmin Redzepovic
09/25/2023, 10:54 PM
21:46:10,761 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 6
...
21:46:11,739 WARN org.apache.flink.runtime.taskmanager.Task
...
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 9 record(s) for dataops.test.jasmin.ticket-base-sb-pl-online.v1-0:5001 ms has passed
Jasmin Redzepovic
09/25/2023, 11:09 PM
Jasmin Redzepovic
09/26/2023, 12:08 AM
Martijn Visser
09/26/2023, 12:08 AM
"the Kafka is the same"
No, because that uses a different version of the Kafka client.
Jasmin Redzepovic
09/26/2023, 12:09 AM
Martijn Visser
09/26/2023, 12:09 AM
Martijn Visser
09/26/2023, 12:09 AM
Jasmin Redzepovic
09/26/2023, 12:09 AM
Jasmin Redzepovic
10/12/2023, 10:33 AM
Martijn Visser
10/12/2023, 11:43 AM
Jasmin Redzepovic
10/13/2023, 11:11 AM
Martijn Visser
10/13/2023, 12:11 PM
Tzu-Li (Gordon) Tai
10/17/2023, 7:23 PM
"I am now pretty sure that there is a bug in the Flink 1.16.1 implementation"
Was this ever tested against 1.16.2 or 1.17.1? I'm asking because, at first glance, the reported issue with checkpoint 5 exceeding delivery.timeout.ms (meaning the Kafka buffer was likely not fully flushed) but actually succeeding should already be addressed via https://issues.apache.org/jira/browse/FLINK-31305. That fix is merged in 1.16.2 and 1.17.1.
Tzu-Li (Gordon) Tai
10/17/2023, 7:23 PM
Tzu-Li (Gordon) Tai
10/17/2023, 7:23 PM
Jasmin Redzepovic
10/18/2023, 11:19 AM
Martijn Visser
10/18/2023, 1:39 PM
Jasmin Redzepovic
10/18/2023, 1:41 PM
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.1</version>
</dependency>
(didn't specify 3.0.0- prefix though)
Martijn Visser
10/18/2023, 1:41 PM