# troubleshooting
z
I have an app that uses a Kafka sink. The app was running fine initially, but I just found the error logs below from the task manager. I don’t understand why it suddenly started to complain about version compatibility.
```
2023-06-07 16:34:45,026 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: data enrichment counters source -> Flat Map -> Process -> (Sink: Writer -> Sink: Committer, Sink: Writer -> Sink: Committer) (1/1)#75129 (8a8c64ebda8a014a3032ac201948250e_cbc357ccb763df2852fee8c4fc7d55f2_0_75129) switched from INITIALIZING to FAILED with failure cause: java.lang.IllegalStateException: Failed to commit KafkaCommittable{producerId=238443, epoch=2, transactionalId=team-data-de-kong-counters-0-13868}
	at org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl.signalFailedWithUnknownReason(CommitRequestImpl.java:77)
	at org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:119)
	at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:126)
	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmit(CommitterOperator.java:176)
	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmitCheckpoints(CommitterOperator.java:160)
	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.initializeState(CommitterOperator.java:121)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:283)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.RuntimeException: Incompatible KafkaProducer version
	at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:266)
	at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:253)
	at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.resumeTransaction(FlinkKafkaInternalProducer.java:292)
	at org.apache.flink.connector.kafka.sink.KafkaCommitter.getRecoveryProducer(KafkaCommitter.java:143)
	at org.apache.flink.connector.kafka.sink.KafkaCommitter.lambda$commit$0(KafkaCommitter.java:72)
	at java.base/java.util.Optional.orElseGet(Unknown Source)
	at org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:72)
	... 16 more
Caused by: java.lang.NoSuchFieldException: topicPartitionBookkeeper
	at java.base/java.lang.Class.getDeclaredField(Unknown Source)
	at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:262)
	... 22 more
```
Running Flink 1.16, Scala 2.12, Java 11.
Has anyone seen similar issues?
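Reading the trace bottom-up: the failure happens while the committer restores state (`CommitterOperator.initializeState` → `KafkaCommitter.getRecoveryProducer` → `FlinkKafkaInternalProducer.resumeTransaction`), where the connector looks up a private field of the Kafka producer internals by reflection and gets `NoSuchFieldException: topicPartitionBookkeeper`. That would also be one plausible reason the job ran fine at first: this path appears to be taken only when a pending transaction is recovered after a restart. The sketch below is a hypothetical standalone check (not part of Flink or Kafka) for whether the kafka-clients jar on the classpath still declares that field. The owning class name `org.apache.kafka.clients.producer.internals.TransactionManager` is an assumption, since the trace does not show it.

```scala
// Hypothetical standalone check, not part of Flink or Kafka.
object BookkeeperFieldCheck {
  // Assumed owner class of the field; the field name itself comes from the stack trace.
  val TransactionManagerClass = "org.apache.kafka.clients.producer.internals.TransactionManager"
  val FieldName = "topicPartitionBookkeeper"

  /** Some(true/false) if the class is on the classpath, None if it cannot be found. */
  def hasField(className: String, fieldName: String): Option[Boolean] =
    try {
      // initialize = false: only inspect the class, do not run its static initializers
      val cls = Class.forName(className, false, getClass.getClassLoader)
      Some(cls.getDeclaredFields.exists(_.getName == fieldName))
    } catch {
      case _: ClassNotFoundException => None
    }

  def main(args: Array[String]): Unit =
    hasField(TransactionManagerClass, FieldName) match {
      case Some(true)  => println(s"$FieldName found: the connector's reflective lookup should work")
      case Some(false) => println(s"$FieldName missing: expect 'Incompatible KafkaProducer version'")
      case None        => println(s"$TransactionManagerClass not on the classpath")
    }
}
```

Run against the job's fat jar, a `missing` result would point at the kafka-clients version rather than at the brokers.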
m
I would check if your Kafka brokers got upgraded
z
That shouldn’t be the case. I have two apps running, both using the same version of Flink, and I am only seeing the error from one of them.
m
Are they both running on the same Flink cluster?
z
Yes.
m
I meant Flink, not Kafka 😅
z
Not the same Flink. I am running in application mode, so each app has its own Flink cluster.
m
Are you including a different version of Kafka Clients on the classpath?
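One way to answer this from inside the job, rather than from the build file, is to ask the JVM which kafka-clients it actually loaded. The sketch below is a hypothetical helper (the object and method names are made up). It uses reflection so it compiles without kafka-clients on the build path; `AppInfoParser.getVersion()` is the static method kafka-clients uses to report its own version. Because Flink loads job code through a separate user-code classloader, the assumption here is that you call it from the job itself (for example from `main` before `execute()`, or from an operator's `open`) rather than from a separate program.

```scala
// Hypothetical diagnostic helper, not a Flink or Kafka API.
object KafkaClientsProbe {
  // The loader that loaded this object; inside a Flink job this is the user-code loader.
  private val loader = getClass.getClassLoader

  /** Jar or directory a class was loaded from; None if absent or loaded by the bootstrap loader. */
  def locationOf(className: String): Option[String] =
    try {
      val cls = Class.forName(className, false, loader)
      Option(cls.getProtectionDomain.getCodeSource)
        .flatMap(cs => Option(cs.getLocation))
        .map(_.toString)
    } catch {
      case _: ClassNotFoundException => None
    }

  /** Version string kafka-clients reports about itself, or None if it is not loadable. */
  def kafkaClientsVersion(): Option[String] =
    try {
      val parser = Class.forName("org.apache.kafka.common.utils.AppInfoParser", true, loader)
      Option(parser.getMethod("getVersion").invoke(null)).map(_.toString)
    } catch {
      case _: ReflectiveOperationException => None
    }

  def main(args: Array[String]): Unit = {
    println(s"kafka-clients version: ${kafkaClientsVersion().getOrElse("not found")}")
    val producerJar = locationOf("org.apache.kafka.clients.producer.KafkaProducer")
    println(s"KafkaProducer loaded from: ${producerJar.getOrElse("not found")}")
  }
}
```

If the printed version differs from what flink-connector-kafka expects, or `KafkaProducer` comes from an unexpected jar, that answers the question above.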
z
shouldn’t be the case
m
Then I’m out of options
Haven’t seen this before
z
```scala
val flinkVersion = "1.16.0"

libraryDependencies ++= Seq(
  "org.apache.flink" % "flink-core" % flinkVersion % "provided",
  "org.apache.flink" % "flink-clients" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" % "flink-connector-kafka" % flinkVersion,
  "org.apache.kafka" % "kafka-clients" % "3.4.0"
)
```
The Flink version is 1.16.0.
m
If I’m not mistaken, that Flink version doesn’t support kafka-clients 3.4.0; it expects 3.2.3 or something close to that.
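A minimal sbt sketch of acting on this, assuming the rest of the build stays as posted: stop declaring kafka-clients explicitly, so the job jar carries whatever version flink-connector-kafka 1.16.0 itself depends on. The commented-out `dependencyOverrides` line is only for the case where another dependency drags in a newer kafka-clients; the `3.2.3` in it is taken from the reply above and is unverified, so check it against the connector's POM before relying on it.

```scala
// build.sbt (sketch)
val flinkVersion = "1.16.0"

libraryDependencies ++= Seq(
  "org.apache.flink" % "flink-core" % flinkVersion % "provided",
  "org.apache.flink" % "flink-clients" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  // Bundled in the job jar; brings in the kafka-clients version it was built against.
  "org.apache.flink" % "flink-connector-kafka" % flinkVersion
  // The explicit "org.apache.kafka" % "kafka-clients" % "3.4.0" entry is removed.
)

// Only if something else forces a newer kafka-clients (version unverified, see above):
// dependencyOverrides += "org.apache.kafka" % "kafka-clients" % "3.2.3"
```

Dropping the explicit entry is the smaller change; pinning with an override keeps working even if another library later pulls in a different kafka-clients.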
z
Ah, OK. Interesting, since it was working fine initially.
Thanks for pointing it out. I’ll switch to the lower version of the Kafka client.