Hangyu Wang
06/06/2023, 9:31 AMCaused by: java.lang.NoClassDefFoundError: org/apache/kafka/clients/consumer/ConsumerRecord
at java.base/java.lang.Class.getDeclaredMethods0(Native Method)
at java.base/java.lang.Class.privateGetDeclaredMethods(Class.java:3166)
at java.base/java.lang.Class.getDeclaredMethod(Class.java:2473)
at java.base/java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1452)
at java.base/java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:381)
at java.base/java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:355)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:355)
at java.base/java.io.ObjectStreamClass$Caches$1.computeValue(ObjectStreamClass.java:98)
at java.base/java.io.ObjectStreamClass$Caches$1.computeValue(ObjectStreamClass.java:95)
at java.base/java.io.ClassCache$1.computeValue(ClassCache.java:73)
at java.base/java.io.ClassCache$1.computeValue(ClassCache.java:70)
at java.base/java.lang.ClassValue.getFromHashMap(ClassValue.java:228)
at java.base/java.lang.ClassValue.getFromBackup(ClassValue.java:210)
at java.base/java.lang.ClassValue.get(ClassValue.java:116)
at java.base/java.io.ClassCache.get(ClassCache.java:84)
at java.base/java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:336)
at java.base/java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:542)
at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2020)
at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1870)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2201)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:534)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:522)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:476)
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:383)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:166)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:60)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:688)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.ConsumerRecord
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
... 45 more
And sure I have put the kafka-client.jar to the flink/lib/Martijn Visser
06/06/2023, 9:32 AMflink-sql-connector-kafka
JAR?Martijn Visser
06/06/2023, 9:32 AMHangyu Wang
06/06/2023, 9:33 AMMartijn Visser
06/06/2023, 9:34 AMHangyu Wang
06/06/2023, 9:34 AM<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
Martijn Visser
06/06/2023, 9:34 AMHangyu Wang
06/06/2023, 9:35 AMString creationSql = "CREATE TABLE metric_event_test (\n" +
" metric_name STRING\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'metric',\n" +
" 'properties.bootstrap.servers' = 'localhost:9092',\n" +
" 'properties.group.id' = 'testGroup',\n" +
" 'format' = 'protobuf',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'protobuf.message-class-name' = 'Metric',\n" +
" 'protobuf.ignore-parse-errors' = 'true'" +
")";
tableEnv.executeSql(creationSql);
Hangyu Wang
06/06/2023, 9:36 AMHangyu Wang
06/06/2023, 9:37 AMimport org.apache.flink.table.api.Table;
Probably the Table API?Martijn Visser
06/06/2023, 9:38 AMHangyu Wang
06/06/2023, 9:43 AMHangyu Wang
06/06/2023, 9:48 AMclassloader.resolve-order: parent-first
, I am not sure if it matters.Martijn Visser
06/06/2023, 9:48 AMMartijn Visser
06/06/2023, 9:49 AMflink-connector-kafka
doesn't contain any Kafka dependencies, so you'll need to add all of them yourselfHangyu Wang
06/06/2023, 9:51 AMMartijn Visser
06/06/2023, 9:51 AMMartijn Visser
06/06/2023, 9:51 AMHangyu Wang
06/06/2023, 9:52 AMHangyu Wang
06/06/2023, 9:52 AMOr are you running this from your IDE?Yes
Martijn Visser
06/06/2023, 9:53 AMMartijn Visser
06/06/2023, 9:54 AMHangyu Wang
06/06/2023, 9:56 AM<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.4</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-config</artifactId>
<version>1.12.481</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>3.3.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.3.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.2.3</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-aws</artifactId>
<version>1.1.0</version>
<exclusions>
<exclusion>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-1.14</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-runtime-1.14</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>2.17.131</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-common</artifactId>
<version>1.2.1</version>
</dependency>
Martijn Visser
06/06/2023, 9:57 AMMartijn Visser
06/06/2023, 10:06 AMMartijn Visser
06/06/2023, 10:07 AMHangyu Wang
06/06/2023, 10:17 AMHangyu Wang
06/07/2023, 2:47 AMReadProtobuf.java
in the recipe in my local IDE, but faced an error:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: get com.immerok.cookbook.Transaction descriptors error!
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
Caused by: java.lang.IllegalArgumentException: get com.immerok.cookbook.Transaction descriptors error!
at org.apache.flink.formats.protobuf.util.PbFormatUtils.getDescriptor(PbFormatUtils.java:126)
at org.apache.flink.formats.protobuf.deserialize.PbRowDataDeserializationSchema.<init>(PbRowDataDeserializationSchema.java:58)
at org.apache.flink.formats.protobuf.PbDecodingFormat.createRuntimeDecoder(PbDecodingFormat.java:45)
at org.apache.flink.formats.protobuf.PbDecodingFormat.createRuntimeDecoder(PbDecodingFormat.java:32)
at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(KafkaDynamicSource.java:568)
at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.getScanRuntimeProvider(KafkaDynamicSource.java:244)
at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:466)
at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:161)
at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:125)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:118)
at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3743)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2666)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2233)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2147)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2092)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:700)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:686)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3589)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:599)
at <http://org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org|org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org>$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:216)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:192)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1580)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1285)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:397)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:282)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:738)
at com.immerok.cookbook.ReadProtobuf.defineWorkflow(ReadProtobuf.java:57)
at com.immerok.cookbook.ReadProtobuf.runJob(ReadProtobuf.java:44)
at com.immerok.cookbook.ReadProtobuf.main(ReadProtobuf.java:20)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
... 9 more
Caused by: java.lang.ClassCastException: class com.google.protobuf.Descriptors$Descriptor cannot be cast to class com.google.protobuf.Descriptors$Descriptor (com.google.protobuf.Descriptors$Descriptor is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @b7c4869; com.google.protobuf.Descriptors$Descriptor is in unnamed module of loader 'app')
at org.apache.flink.formats.protobuf.util.PbFormatUtils.getDescriptor(PbFormatUtils.java:123)
... 43 more
How can I resolve it?Martijn Visser
06/07/2023, 4:40 AMHangyu Wang
06/07/2023, 6:02 AMMartijn Visser
06/07/2023, 6:18 AM