[Flink<>Kafka data loss] Hi all! We have encountered random data loss three times so far. The first time we didn't know the reason, but the last two times we realized it was related to the Kafka cluster being overloaded and having low bandwidth. We were producing messages at a much higher rate than Kafka could process them, although we weren't the cause of the Kafka overload. The biggest puzzle is that the job is configured for at-least-once delivery, yet messages are somehow getting lost: checkpointing works fine and the consumer group offsets advance. Here are more details of the issue, which I was able to reproduce locally. The job is quite simple: it reads events from one Kafka topic, applies a flat map function, and sinks the messages to another Kafka topic. Parallelism is set to 1 and the checkpoint interval to 2 seconds. For the producer I set acks = all, delivery.timeout.ms = 5 seconds and request.timeout.ms = 4 seconds, just to simulate a situation where Kafka doesn't have enough time/resources to process the produce request. The Kafka output topic has 1 partition, replication factor 3 and min.insync.replicas 2 (a rough sketch of the setup is pasted after the log excerpt below). At the beginning the job is doing fine, but then an exception is thrown:
org.apache.flink.runtime.taskmanager.Task [] - Source: kafka-source -> Flat Map -> Sink: Writer -> Sink: Committer (1/1)#0 switched from RUNNING to FAILED with failure cause: org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka
...
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 9 record(s) for test-topic-7:5001 ms has passed since batch creation
...
org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job Flink ETL Base (8814299a8dd39fb5c4d6396c742ef6d8) switched from state RUNNING to RESTARTING.
...
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
...
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job Flink ETL Base (8814299a8dd39fb5c4d6396c742ef6d8) switched from state RESTARTING to RUNNING.
...
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: kafka-source -> Flat Map -> Sink: Writer -> Sink: Committer (1/1) switched from INITIALIZING to RUNNING.
...
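For reference, this is roughly how the repro job is wired up. It's a minimal sketch from memory, assuming the current KafkaSource/KafkaSink connector API; the bootstrap servers, topic names, group id and the flat map body are placeholders, not our real values:

```java
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.ProducerConfig;

public class FlinkEtlBase {
    public static void main(String[] args) throws Exception {
        // Producer settings used for the local repro: short timeouts to provoke the TimeoutException.
        Properties producerProps = new Properties();
        producerProps.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        producerProps.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "5000"); // 5 s
        producerProps.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "4000");  // 4 s

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(2_000); // 2 s checkpoint interval

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")      // placeholder
                .setTopics("input-topic")                    // placeholder
                .setGroupId("flink-etl-base")                // placeholder
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("localhost:9092")       // placeholder
                .setKafkaProducerConfig(producerProps)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("test-topic")              // placeholder
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) // at-least-once sink
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
           // trivial flat map stand-in for our real transformation
           .flatMap((String value, Collector<String> out) -> out.collect(value))
           .returns(Types.STRING)
           .sinkTo(sink);

        env.execute("Flink ETL Base");
    }
}
```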
The job continues to run smoothly for a short period of time, but then hits the same exceptions as before, and this pattern keeps repeating. We observe the same set of exceptions when running the job on a cluster with delivery.timeout.ms set to 120 seconds and request.timeout.ms set to 30 seconds, which are the default values; there the checkpoint interval is set to 30 seconds. Can someone please share advice on what to do in this situation? I believe this is just a misconfiguration, but I have no idea why it is happening. The job is configured for at-least-once delivery, so I am curious why checkpoints succeed and offsets are committed to Kafka even when delivery exceptions occur. Thanks in advance πŸ™‚
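In case it matters, the only differences on the cluster are the producer timeouts and the checkpoint interval; everything else is as in the sketch above (again just an approximation from memory):

```java
// Cluster run: Kafka client default timeouts instead of the shortened repro values.
Properties producerProps = new Properties();
producerProps.setProperty(ProducerConfig.ACKS_CONFIG, "all");
producerProps.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000"); // 120 s (default)
producerProps.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");   // 30 s (default)

// Checkpoint every 30 s; the sink still uses DeliveryGuarantee.AT_LEAST_ONCE.
env.enableCheckpointing(30_000);
```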