# troubleshooting
j
Hi all, it's me again! Based on the previous post, I've been diving deep into the docs, reading almost every blog post I can find, and even got my hands on a Kafka book. I've come up with a scenario that I think has happened. Can someone please confirm (or disprove πŸ˜„) the following steps?
1. The KafkaSource operator reads data at very high speed and sends it to the KafkaSink operator.
2. Let's assume that checkpointing under optimal conditions lasts 50ms and is triggered every 30s, and the job is configured with the AT_LEAST_ONCE delivery guarantee.
3. On a checkpoint, KafkaSource writes its state somewhere (but the checkpoint is not yet complete; it must wait for the state from KafkaSink).
4. On a checkpoint, KafkaSink (the producer) waits for all outstanding records in the Kafka buffers to be acknowledged (similar to a flush).
5. If networking to Kafka is very slow and delivery.timeout.ms is set to 2 minutes, the checkpoint will block for up to 2 minutes and then continue because of the Kafka timeout.
6. The Kafka delivery will time out and some records will expire, but the Kafka buffers will be considered acknowledged.
7. The job manager takes the states from both KafkaSource and KafkaSink, writes them into the checkpoint, and consumer offsets are committed to Kafka afterwards.
8. This results in data loss.
Flink will assume that we are fine with the data loss since we didn't increase the delivery timeout (the maximum time we are willing to wait for messages to be delivered to Kafka), and it will continue with processing and checkpointing. I was hoping that the checkpoint would fail in case of a delivery timeout, but it looks like it wouldn't. (But I'm not sure about anything now. πŸ˜…)
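For reference, this is roughly how I picture such a job being wired up (just a sketch to make the steps above concrete; the broker address, topic names, group id, and timeout values are placeholders, not real config):
```java
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ScenarioSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Step 2: checkpoint every 30s with an at-least-once job.
        env.enableCheckpointing(30_000L, CheckpointingMode.AT_LEAST_ONCE);

        // Step 1: KafkaSource reading from the input topic.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")              // placeholder
                .setTopics("input-topic")                        // placeholder
                .setGroupId("my-consumer-group")                 // placeholder
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Step 5: producer-side timeout that bounds how long the Kafka client
        // keeps retrying a record before expiring it.
        Properties producerProps = new Properties();
        producerProps.setProperty("delivery.timeout.ms", "120000"); // 2 minutes

        // Steps 4-6 happen inside this sink when a checkpoint flushes its buffers.
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")              // placeholder
                .setKafkaProducerConfig(producerProps)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("output-topic")                // placeholder
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "KafkaSource")
           .sinkTo(sink);
        env.execute("at-least-once scenario sketch");
    }
}
```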
m
Synchronization point failures like flushing will fail the checkpoint
πŸ‘ 1
j
Thanks for the answer Martijn πŸ™Œ Then it looks like Flink is not the reason for the data loss. Do you maybe know if it is possible that Kafka somehow drops messages when networking is slow (e.g. when brokers are rebalancing a large data set)? If yes, I will try to investigate it further. (There is data loss somewhere, but I have no clue what it could be 🀷)
Hi @Martijn Visser! After spending some more time debugging, I am now pretty sure that there is a bug in the Flink 1.16.1 implementation. This is what I did: I created a sink topic with 8 partitions, a replication factor of 3, and min.insync.replicas of 2. The consumer properties are set to their default values. For the producer, I changed the delivery.timeout.ms and request.timeout.ms properties, setting them to 5000ms and 4000ms respectively. (acks is set to -1 by default, which I believe is equivalent to all.) KafkaSink is configured with an AT_LEAST_ONCE delivery guarantee. The job parallelism is set to 1 and the checkpointing interval is set to 2000ms.
I started the Flink job and monitored its logs. Additionally, I was consuming the __consumer_offsets topic in parallel to track when offsets are committed for my consumer group. The problematic part occurs during checkpoint 5. Its duration was 5009ms, which exceeds the delivery timeout for Kafka (5000ms). Although it was marked as completed, I believe that the output buffer of KafkaSink was not fully acknowledged by Kafka. As a result, Flink proceeded to trigger checkpoint 6 but immediately encountered a Kafka TimeoutException: Expiring N records. I suspect that this exception originated from checkpoint 5 and that checkpoint 5 should not have been considered successful. The job then failed but recovered from checkpoint 5. Some time after checkpoint 7, consumer offsets were committed to Kafka, and this process repeated once more at checkpoint 9. Since the offsets of checkpoint 5 were committed to Kafka, but the output buffer was only partially delivered, there has been data loss. I confirmed this when sinking the topic to the database.
Sorry for reaching out to you directly, but I noticed that you are quite active here and influential in the Flink community. I hope that you can help me get in touch with the right people regarding this matter. πŸ™ (p.s. attached job logs and Kafka consumer offsets, they are not too long)
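To make the test setup concrete, here is a rough sketch of the producer overrides and checkpointing settings described above (just an illustration, not my actual job code; the class name is a placeholder):
```java
import java.util.Properties;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TestSetupSketch {

    // Producer overrides from the test run described above.
    static Properties testProducerConfig() {
        Properties props = new Properties();
        props.setProperty("delivery.timeout.ms", "5000");
        props.setProperty("request.timeout.ms", "4000");
        // acks is left at the client default (-1, i.e. "all").
        return props;
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Checkpoint every 2s; with delivery.timeout.ms at 5000ms, a single slow
        // flush during a checkpoint can exceed the producer timeout, which is the
        // suspicious situation around checkpoint 5.
        env.enableCheckpointing(2_000L, CheckpointingMode.AT_LEAST_ONCE);

        // testProducerConfig() would then be passed to the KafkaSink builder via
        // setKafkaProducerConfig(...), with DeliveryGuarantee.AT_LEAST_ONCE.
    }
}
```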
m
Additionally, I was consuming the __consumer_offsets topic in parallel to track when offsets are committed for my consumer group.
Flink doesn't rely on committed consumer offsets for its fault tolerance... πŸ™‚
j
Yes, but checkpoint 5 completed successfully, and I'm pretty sure it shouldn't have
m
Although it was marked as completed, I believe that the output buffer of KafkaSink was not fully acknowledged by Kafka.
If the output buffer isn't acknowledged by Kafka, that shouldn't succeed the checkpoint. But I do think that the duration of 5009ms (which I think is the duration of the entire checkpoint) is directly correlated with the delivery timeout for Kafka (5000ms), because in that 5009ms there's more activity happening than just delivering towards Kafka. I don't think the KafkaClient reported an error, so the checkpoint is marked as successful
From what I can find on a quick search, people indicate this as a misconfiguration on the Kafka broker side, not on the Flink side. If Flink doesn't get a signal that something is wrong, then it can't fail
j
I just don't understand how checkpoint 6 can fail 1 second after being triggered because of an exception from Kafka saying 5000ms has passed 🀷
21:46:10,761 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 6
...
21:46:11,739 WARN  org.apache.flink.runtime.taskmanager.Task
...
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 9 record(s) for dataops.test.jasmin.ticket-base-sb-pl-online.v1-0:5001 ms has passed
Forgot to mention that the source topic I used for this test had only ~4000 records, and I was running the job locally on an M2 Mac
I did one more test: I ran everything with Flink version 1.15.4. There were 0 issues, 0 exceptions, the max checkpoint duration was ~100ms, and all the data was there: 0 missing records. I repeated it a few times and got the same results. So Kafka is the same, the configurations are the same, and the only difference is the Flink version. Running with version 1.16.1 again caused Kafka timeout exceptions, long checkpoints, and missing records. Do you know if something related to checkpointing and the Kafka connector might have changed since 1.15.4?
m
Kafka is the same,
No, because that uses a different version of the Kafka Client πŸ˜…
j
ahh πŸ˜…
m
I do find it interesting though and I want to get it double checked
But I'll have to sync with some people offline
j
I appreciate your help, thanks a lot
Hi @Martijn Visser, any news on this?
m
No, I've had other priorities to look at unfortunately
πŸ‘ 1
j
Should I maybe put this on Jira, as Slack has limited message retention?
m
That would be a good idea
t
hey @Jasmin Redzepovic thanks for looking into this and filing the ticket.
I am now pretty sure that there is a bug in the Flink 1.16.1 implementation
Was this ever tested against 1.16.2 or 1.17.1? I'm asking because, at first glance, the reported issue of checkpoint 5 exceeding delivery.timeout.ms (meaning the Kafka buffer was likely not fully flushed) yet still succeeding should already be addressed by https://issues.apache.org/jira/browse/FLINK-31305. That fix is merged in 1.16.2 and 1.17.1.
If you have also reproduced this / can reproduce this in those versions, then we should definitely look a bit deeper.
j
Hi @Tzu-Li (Gordon) Tai, thank you for the quick response. These are the Flink versions I've tested the issue against:
β€’ 1.15.4: worked today without data loss 🟒
β€’ 1.16.1: experienced data loss today πŸ”΄
β€’ 1.16.2: couldn't reproduce it today (it worked without data loss), but I'm pretty sure I did reproduce it ~3 weeks ago 🟑
β€’ 1.17.1: experienced data loss today πŸ”΄
If you need any additional input or help from my side, I will gladly help πŸ™‚
edit: p.s. we could maybe transfer this conversation completely to Jira πŸ˜…
m
So with Flink 1.17.1, you didn't test the Kafka 3.0.0-1.17 version?
j
No, I've tested this one:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>1.17.1</version>
</dependency>
(didn't specify 3.0.0- prefix though)
m
Hmm then I'll defer back to @Tzu-Li (Gordon) Tai for his opinion on this
πŸ‘ 1