# troubleshooting
l
Hi team, can I get some help with this? I am upgrading our Flink version from 1.16 to 1.17.1, and we are facing this issue:
```
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: Customer Product Summary Selected' (operator 8a00af3a47acf8f1b77c838e57cd0df6).
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:600) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:237) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:374) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:387) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42) ~[flink-dist-1.17.1.jar:1.17.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to list subscribed topic partitions due to
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.checkPartitionChanges(KafkaSourceEnumerator.java:234) ~[?:?]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:83) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) ~[flink-dist-1.17.1.jar:1.17.1]
    ... 6 more
Caused by: java.lang.NoSuchMethodError: 'org.apache.kafka.common.KafkaFuture org.apache.kafka.clients.admin.DescribeTopicsResult.allTopicNames()'
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:44) ~[?:?]
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52) ~[?:?]
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSubscribedTopicPartitions(KafkaSourceEnumerator.java:219) ~[?:?]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80) ~[flink-dist-1.17.1.jar:1.17.1]
    ... 6 more
2023-09-15 19:20:50,667 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - 91 tasks will be restarted to recover from a global failure.
2023-09-15 19:20:50,667 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Trying to recover from a global failure.
org.apache.flink.util.FlinkException: Global failure triggered by OperatorCoordinator for 'Source: Customer Arrived Source' (operator 08087f84aed7e9badd413a1d7f5039b4).
    at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder$LazyInitializedCoordinatorContext.failJob(OperatorCoordinatorHolder.java:600) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator$QuiesceableContext.failJob(RecreateOnResetOperatorCoordinator.java:237) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.failJob(SourceCoordinatorContext.java:374) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.runtime.source.coordinator.SourceCoordinatorContext.handleUncaughtExceptionFromAsyncCall(SourceCoordinatorContext.java:387) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:42) ~[flink-dist-1.17.1.jar:1.17.1]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to list subscribed topic partitions due to
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.checkPartitionChanges(KafkaSourceEnumerator.java:234) ~[?:?]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$null$1(ExecutorNotifier.java:83) ~[flink-dist-1.17.1.jar:1.17.1]
    at org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40) ~[flink-dist-1.17.1.jar:1.17.1]
    ... 6 more
Caused by: java.lang.NoSuchMethodError: 'org.apache.kafka.common.KafkaFuture org.apache.kafka.clients.admin.DescribeTopicsResult.allTopicNames()'
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriberUtils.getTopicMetadata(KafkaSubscriberUtils.java:44) ~[?:?]
    at org.apache.flink.connector.kafka.source.enumerator.subscriber.TopicListSubscriber.getSubscribedTopicPartitions(TopicListSubscriber.java:52) ~[?:?]
    at org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getSubscribedTopicPartitions(KafkaSourceEnumerator.java:219) ~[?:?]
    at org.apache.flink.runtime.source.coordinator.ExecutorNotifier.lambda$notifyReadyAsync$2(ExecutorNotifier.java:80) ~[flink-dist-1.17.1.jar:1.17.1]
    ... 6 more
```
f
Caused by: java.lang.NoSuchMethodError: 'org.apache.kafka.common.KafkaFuture'
Looks like you have a dependency mismatch: Flink expects a specific version of the Kafka client API, but an older version was provided at runtime. Try cleaning up the cache and the compilation target folder, then rebuild the project. If that doesn't help, check for any explicit Kafka API version dependency, or analyze the dependency tree and exclude the unwanted version.
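One way to act on this advice is to check, at runtime, which jar actually supplies the Kafka admin classes. The sketch below uses plain JDK reflection; the class `KafkaClientsCheck` and its helpers are hypothetical, not part of Flink or Kafka. It prints where `DescribeTopicsResult` was loaded from and whether it has the `allTopicNames()` method named in the `NoSuchMethodError`. Run it with the same classpath as the failing job (for example, the job jar plus the jars in Flink's `lib/` directory). If the method is reported missing, an older `kafka-clients` jar is likely shadowing the one the connector was built against.

```java
import java.net.URL;
import java.security.CodeSource;

// Hypothetical diagnostic helper: reports which jar a class was loaded from
// and whether it exposes a given public no-arg method.
public class KafkaClientsCheck {

    // True if className can be loaded and has a public no-arg method
    // called methodName (declared or inherited); false otherwise.
    static boolean hasMethod(String className, String methodName) {
        try {
            Class.forName(className).getMethod(methodName);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            return false;
        }
    }

    // Location (jar or directory) the class was loaded from, or a
    // placeholder when it is not available (e.g. JDK bootstrap classes).
    static String locationOf(String className) {
        try {
            CodeSource src = Class.forName(className).getProtectionDomain().getCodeSource();
            URL url = (src == null) ? null : src.getLocation();
            return (url == null) ? "<unknown>" : url.toString();
        } catch (ClassNotFoundException e) {
            return "<not on classpath>";
        }
    }

    public static void main(String[] args) {
        // The class and method from the NoSuchMethodError in the stack trace.
        String cls = "org.apache.kafka.clients.admin.DescribeTopicsResult";
        System.out.println(cls + " loaded from: " + locationOf(cls));
        System.out.println("allTopicNames() present: " + hasMethod(cls, "allTopicNames"));
    }
}
```

The printed location points directly at the jar to exclude or upgrade, which is often quicker than reading a full dependency tree.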
l
Thank you @Flaviu Cicio! It's confusing, though. The only Kafka dependency we use is `org.apache.flink:flink-connector-kafka`, and its version comes from `flinkVersion`, which is set to 1.17.1. The only explanation I can think of is that we may be referencing the Flink operator with version 1.17.0. Could that be the cause?
f
It's possible; all Flink dependencies should be on the same version. That said, I'm skeptical that the Kafka dependency changed between 1.17.0 and 1.17.1.
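The rule that all Flink dependencies should share one version can be checked mechanically. The sketch below is a hypothetical helper, not part of Maven, Gradle, or Flink: it takes coordinates in plain `group:artifact:version` form (for example, transcribed from your build's dependency report) and lists every artifact of a given group whose version differs from the expected one. The dependency list in `main` is made-up example data.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: flags artifacts of one group whose version differs
// from the expected version. Input entries must be "group:artifact:version".
public class FlinkVersionCheck {

    static List<String> mismatches(List<String> coordinates, String group, String expectedVersion) {
        List<String> out = new ArrayList<>();
        for (String coord : coordinates) {
            String[] parts = coord.trim().split(":");
            // Skip malformed entries and artifacts from other groups.
            if (parts.length != 3 || !parts[0].equals(group)) {
                continue;
            }
            if (!parts[2].equals(expectedVersion)) {
                out.add(parts[1] + " -> " + parts[2]);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up example input; replace with your project's real dependency list.
        List<String> deps = List.of(
            "org.apache.flink:flink-streaming-java:1.17.1",
            "org.apache.flink:flink-connector-kafka:1.17.1",
            "org.apache.flink:flink-clients:1.17.0",
            "com.example:my-lib:2.0.0");
        System.out.println(mismatches(deps, "org.apache.flink", "1.17.1"));
    }
}
```

With the example input it prints `[flink-clients -> 1.17.0]`. Note that this only catches mismatches among declared Flink artifacts; a stale `kafka-clients` jar pulled in transitively or placed in Flink's `lib/` directory still needs a dependency-tree or classpath check.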