# troubleshooting
h
Hi, team! I am trying to create a Flink table with connector = 'kafka'. Now I'm facing this error:
Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/clients/consumer/ConsumerRecord
        at java.base/java.lang.Class.getDeclaredMethods0(Native Method)
        at java.base/java.lang.Class.privateGetDeclaredMethods(Class.java:3166)
        at java.base/java.lang.Class.getDeclaredMethod(Class.java:2473)
        at java.base/java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1452)
        at java.base/java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:381)
        at java.base/java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:355)
        at java.base/java.security.AccessController.doPrivileged(Native Method)
        at java.base/java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:355)
        at java.base/java.io.ObjectStreamClass$Caches$1.computeValue(ObjectStreamClass.java:98)
        at java.base/java.io.ObjectStreamClass$Caches$1.computeValue(ObjectStreamClass.java:95)
        at java.base/java.io.ClassCache$1.computeValue(ClassCache.java:73)
        at java.base/java.io.ClassCache$1.computeValue(ClassCache.java:70)
        at java.base/java.lang.ClassValue.getFromHashMap(ClassValue.java:228)
        at java.base/java.lang.ClassValue.getFromBackup(ClassValue.java:210)
        at java.base/java.lang.ClassValue.get(ClassValue.java:116)
        at java.base/java.io.ClassCache.get(ClassCache.java:84)
        at java.base/java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:336)
        at java.base/java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:542)
        at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2020)
        at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1870)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2201)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
        at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
        at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
        at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
        at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
        at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
        at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:534)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:522)
        at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:476)
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:383)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:166)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:60)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:688)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.ConsumerRecord
        at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
        at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
        ... 45 more
And yes, I have put kafka-clients.jar into flink/lib/.
m
Are you using the flink-sql-connector-kafka JAR?
And which format are you using?
h
format = 'protobuf'
m
And are you using the SQL connector Kafka artifact?
h
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>${flink.version}</version>
</dependency>
m
Are you using Table API or SQL?
h
String creationSql = "CREATE TABLE metric_event_test (\n" +
        " metric_name STRING\n" +
        ") WITH (\n" +
        " 'connector' = 'kafka',\n" +
        " 'topic' = 'metric',\n" +
        " 'properties.bootstrap.servers' = 'localhost:9092',\n" +
        " 'properties.group.id' = 'testGroup',\n" +
        " 'format' = 'protobuf',\n" +
        " 'scan.startup.mode' = 'earliest-offset',\n" +
        " 'protobuf.message-class-name' = 'Metric',\n" +
        " 'protobuf.ignore-parse-errors' = 'true'" +
        ")";
tableEnv.executeSql(creationSql);
And yes.
import org.apache.flink.table.api.Table;
Probably the Table API?
m
Yeah that's Table API
h
Any idea how to fix it? Thanks~
And previously I set classloader.resolve-order: parent-first; I'm not sure if it matters.
m
You shouldn't change those settings just to resolve this problem
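For reference, Flink's default in flink-conf.yaml is child-first, which is generally what you want for loading user code:

classloader.resolve-order: child-first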
The thing is that flink-connector-kafka doesn't contain any Kafka dependencies, so you'll need to add all of them yourself.
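Something like this in your pom.xml should cover the missing class (the version shown is illustrative; align it with the Kafka Clients version your Flink release was built against):

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!-- illustrative version: match what your Flink release was built against -->
    <version>3.2.3</version>
</dependency>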
h
I have added kafka-clients to lib/, but no luck, the error is still the same.
m
Did you restart your cluster after doing that?
Or are you running this from your IDE?
h
Yes, I restarted the cluster after adding the jar.
As for "Or are you running this from your IDE?": yes.
m
Have you set your IDE to include dependencies that are in "Provided" scope?
I wasn't expecting you to run it from your IDE if you have your own cluster running too.
h
Actually I haven't set any of the dependencies to "provided" scope. Here are the dependency settings in pom.xml:
<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.4</version>
        </dependency>

        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-config</artifactId>
            <version>1.12.481</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>3.3.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>1.2.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.3.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.2.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-core</artifactId>
            <version>1.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-aws</artifactId>
            <version>1.1.0</version>
            <exclusions>
                <exclusion>
                    <groupId>software.amazon.awssdk</groupId>
                    <artifactId>s3</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-1.14</artifactId>
            <version>1.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-runtime-1.14</artifactId>
            <version>1.1.0</version>
        </dependency>

        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>s3</artifactId>
            <version>2.17.131</version>
        </dependency>

        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-common</artifactId>
            <version>1.2.1</version>
        </dependency>
</dependencies>
m
That won't work.
Have a look at some of the recipes at https://github.com/immerok/recipes - there are a variety of dependencies that should be set to provided. Without a scope, everything defaults to compile scope and your artifact will balloon.
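As a minimal sketch, taking flink-streaming-java from your pom, the Flink dependencies that the cluster distribution already ships should look like this (the same applies to the other core Flink artifacts):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <!-- provided: the Flink distribution on the cluster already contains this -->
    <scope>provided</scope>
</dependency>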
Flink is currently using Kafka Clients 3.2.3, not 3.4.0. It could be that the classes have moved in a different Kafka Clients version, especially since it's a different minor version.
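You can verify which Kafka Clients version actually ends up on your application classpath with, e.g.:

mvn dependency:tree -Dincludes=org.apache.kafka:kafka-clients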
h
Thanks a lot. I changed the version to 3.2.3. Now the weird thing is that sometimes the jar works well without any exception, and sometimes the ClassNotFoundException still occurs.
Hi @Martijn Visser, I tried to run the ReadProtobuf.java recipe in my local IDE, but faced an error:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: get com.immerok.cookbook.Transaction descriptors error!
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
        at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: java.lang.IllegalArgumentException: get com.immerok.cookbook.Transaction descriptors error!
        at org.apache.flink.formats.protobuf.util.PbFormatUtils.getDescriptor(PbFormatUtils.java:126)
        at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.<init>(PbRowDataDeserializationSchema.java:58)
        at org.apache.flink.formats.protobuf.PbDecodingFormat.createRuntimeDecoder(PbDecodingFormat.java:45)
        at org.apache.flink.formats.protobuf.PbDecodingFormat.createRuntimeDecoder(PbDecodingFormat.java:32)
        at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(KafkaDynamicSource.java:568)
        at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.getScanRuntimeProvider(KafkaDynamicSource.java:244)
        at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:466)
        at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:161)
        at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:125)
        at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:118)
        at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3743)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2666)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2233)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2147)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2092)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:700)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:686)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3589)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:599)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:216)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:192)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1580)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1285)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:397)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:282)
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:738)
        at com.immerok.cookbook.ReadProtobuf.defineWorkflow(ReadProtobuf.java:57)
        at com.immerok.cookbook.ReadProtobuf.runJob(ReadProtobuf.java:44)
        at com.immerok.cookbook.ReadProtobuf.main(ReadProtobuf.java:20)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        ... 9 more
Caused by: java.lang.ClassCastException: class com.google.protobuf.Descriptors$Descriptor cannot be cast to class com.google.protobuf.Descriptors$Descriptor (com.google.protobuf.Descriptors$Descriptor is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @b7c4869; com.google.protobuf.Descriptors$Descriptor is in unnamed module of loader 'app')
        at org.apache.flink.formats.protobuf.util.PbFormatUtils.getDescriptor(PbFormatUtils.java:123)
        ... 43 more
How can I resolve it?
m
Looks like you have two different versions of Protobuf on the classpath.
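A sketch of one way to fix that, assuming the second copy is pulled in transitively (hadoop-common from your pom is just an illustrative suspect; run mvn dependency:tree -Dincludes=com.google.protobuf to find the real one):

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.2.3</version>
    <exclusions>
        <!-- assumption: this pulls in its own protobuf-java; excluding it leaves a single copy on the classpath -->
        <exclusion>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
        </exclusion>
    </exclusions>
</dependency>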
h
I got the same error when I submitted the job to the cluster in application mode.
m
When Flink or its dependencies use Protobuf, the deployment mode doesn't matter.