Jasmin Redzepovic
09/21/2023, 11:41 AM
The Kafka producer for the job's sink is configured with acks = all, delivery.timeout.ms = 5 seconds, and request.timeout.ms = 4 seconds, just to simulate a situation where Kafka doesn't have enough time/resources to process the produce request. The Kafka output topic has 1 partition, replication factor 3, and min.insync.replicas = 2.
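For context, the producer overrides are wired into the Flink KafkaSink roughly like this (a minimal sketch; the broker address, topic name, and serializer are placeholders, not our exact job code):
```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.kafka.clients.producer.ProducerConfig;

public class SinkConfigSketch {
    public static KafkaSink<String> buildSink() {
        // Producer overrides used to provoke the timeout:
        // acks=all, delivery.timeout.ms=5s, request.timeout.ms=4s.
        Properties producerProps = new Properties();
        producerProps.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        producerProps.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "5000");
        producerProps.setProperty(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "4000");

        return KafkaSink.<String>builder()
                .setBootstrapServers("broker-1:9092")              // placeholder
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("test-topic")                    // 1 partition, RF 3, min.insync.replicas=2
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setKafkaProducerConfig(producerProps)
                .build();
    }
}
```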
At the beginning, the job is doing fine, but then an exception is thrown:
org.apache.flink.runtime.taskmanager.Task [] - Source: kafka-source -> Flat Map -> Sink: Writer -> Sink: Committer (1/1)#0 switched from RUNNING to FAILED with failure cause: org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka
...
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 9 record(s) for test-topic-7:5001 ms has passed since batch creation
...
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job Flink ETL Base (8814299a8dd39fb5c4d6396c742ef6d8) switched from state RUNNING to RESTARTING.
...
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
...
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job Flink ETL Base (8814299a8dd39fb5c4d6396c742ef6d8) switched from state RESTARTING to RUNNING.
...
org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: kafka-source -> Flat Map -> Sink: Writer -> Sink: Committer (1/1) switched from INITIALIZING to RUNNING.
...
The job continues to run smoothly for a short period of time, but then encounters the same exceptions as mentioned earlier. This pattern repeats itself.
We observe the same set of exceptions when running the job on a cluster with delivery.timeout.ms set to 120 seconds and request.timeout.ms set to 30 seconds, which are the default values. The checkpoint interval is set to 30 seconds.
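The checkpointing setup is roughly the following (again a sketch; only the interval and mode are taken from our config):
```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 30-second checkpoint interval with at-least-once checkpointing semantics.
        env.enableCheckpointing(30_000L, CheckpointingMode.AT_LEAST_ONCE);
        // ... kafka-source -> flatMap -> KafkaSink wiring goes here ...
    }
}
```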
Can someone please share advice on what to do in this situation? I believe this is just a misconfiguration, but I have no idea why it is happening. The job is configured for at-least-once delivery, so I am curious why checkpoints succeed and offsets are committed to Kafka even when these delivery exceptions occur.
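For reference, the source side looks roughly like this (group id and topic are placeholders); as far as I understand, the KafkaSource commits its offsets back to Kafka when a checkpoint completes:
```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker-1:9092")   // placeholder
                .setTopics("input-topic")               // placeholder
                .setGroupId("flink-etl-base")           // placeholder
                .setStartingOffsets(OffsetsInitializer.committedOffsets())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // Offsets are committed back to Kafka on checkpoint completion by default.
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");
    }
}
```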
Thanks in advance 🙏