Tomasz Krol
09/19/2023, 11:58 AM
tEnv.executeSql("CREATE TABLE table (\n" +
" `field` STRING\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'dat.topic',\n" +
" 'properties.bootstrap.servers' = '" + brokers + "',\n" +
" 'properties.group.id' = 'testGroup3',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'format' = 'avro',\n" +
" 'properties.security.protocol' = 'SSL',\n" +
" 'properties.ssl.key.password' = '" + keyPassword+ "',\n" +
" 'properties.ssl.keystore.location' = \'" + keyStoreLocation + "\',\n" +
" 'properties.ssl.keystore.password' = '" + keyPassword+ "',\n" +
" 'properties.ssl.truststore.location' = \'" + trustStoreLocation + "\',\n" +
" 'properties.ssl.truststore.password' = '" + keyPassword+ "'\n" +
")");
Table table2 = tEnv.from("table");
table2.limit(10).execute().print();
Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = dat.topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 169392, serialized key size = 24, serialized value size = 27836, headers = RecordHeaders(headers = [RecordHeader(key = RaType, value = [76]), RecordHeader(key = kafkaSerializationType, value = [65, 86, 82, 79])], isReadOnly = false), key = [C@56dcb40d, value = [C@04f4fc48).
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:57)
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
... 14 more
Caused by: java.io.IOException: Failed to deserialize Avro record.
Flaviu Cicio
09/19/2023, 12:17 PM
Tomasz Krol
09/19/2023, 12:20 PM
Tomasz Krol
09/19/2023, 12:29 PM
14:21:35,539 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: table[1] (1/1)#0 (8bfd86cf5460a82bbe3b8fbd8284f9c1_bc764cd8ddf7a0cff126f51c16239658_0_0) switched from RUNNING to FAILED with failure cause:
java.io.IOException: Failed to deserialize consumer record due to
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56) ~[flink-connector-kafka-1.17.0.jar:1.17.0]
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33) ~[flink-connector-kafka-1.17.0.jar:1.17.0]
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:144) ~[flink-connector-base-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417) ~[flink-streaming-java-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-streaming-java-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-streaming-java-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) ~[flink-streaming-java-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-streaming-java-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-streaming-java-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-streaming-java-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-runtime-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) [flink-runtime-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) [flink-runtime-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-runtime-1.17.0.jar:1.17.0]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = dat.topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1693927525534, serialized key size = 24, serialized value size = 27836, headers = RecordHeaders(headers = [RecordHeader(key = RaType, value = [76]), RecordHeader(key = kafkaSerializationType, value = [65, 86, 82, 79])], isReadOnly = false), key = [R@4f29fa5, value = [R@10d99c0e).
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:57) ~[flink-connector-kafka-1.17.0.jar:1.17.0]
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ~[flink-connector-kafka-1.17.0.jar:1.17.0]
... 14 more
Caused by: java.io.IOException: Failed to deserialize Avro record.
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106) ~[flink-avro-1.17.0.jar:1.17.0]
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46) ~[flink-avro-1.17.0.jar:1.17.0]
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) ~[flink-core-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113) ~[flink-connector-kafka-1.17.0.jar:1.17.0]
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54) ~[flink-connector-kafka-1.17.0.jar:1.17.0]
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ~[flink-connector-kafka-1.17.0.jar:1.17.0]
... 14 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index -40 out of bounds for length 2
at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460) ~[avro-1.11.1.jar:1.11.1]
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283) ~[avro-1.11.1.jar:1.11.1]
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188) ~[avro-1.11.1.jar:1.11.1]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[avro-1.11.1.jar:1.11.1]
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260) ~[avro-1.11.1.jar:1.11.1]
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248) ~[avro-1.11.1.jar:1.11.1]
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180) ~[avro-1.11.1.jar:1.11.1]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[avro-1.11.1.jar:1.11.1]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154) ~[avro-1.11.1.jar:1.11.1]
at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:142) ~[flink-avro-1.17.0.jar:1.17.0]
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103) ~[flink-avro-1.17.0.jar:1.17.0]
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46) ~[flink-avro-1.17.0.jar:1.17.0]
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) ~[flink-core-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113) ~[flink-connector-kafka-1.17.0.jar:1.17.0]
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54) ~[flink-connector-kafka-1.17.0.jar:1.17.0]
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ~[flink-connector-kafka-1.17.0.jar:1.17.0]
... 14 more
14:21:35,545 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for Source: table[1] (1/1)#0 (8bfd86cf5460a82bbe3b8fbd8284f9c1_bc764cd8ddf7a0cff126f51c16239658_0_0).
14:21:35,550 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: table[1] (1/1)#0 8bfd86cf5460a82bbe3b8fbd8284f9c1_bc764cd8ddf7a0cff126f51c16239658_0_0.
14:21:35,559 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: table[1] (1/1) (8bfd86cf5460a82bbe3b8fbd8284f9c1_bc764cd8ddf7a0cff126f51c16239658_0_0) switched from RUNNING to FAILED on 0122673e-2432-48ff-b43e-fe3cddba30f2 @ localhost (dataPort=-1).
java.io.IOException: Failed to deserialize consumer record due to
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56) ~[flink-connector-kafka-1.17.0.jar:1.17.0]
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33) ~[flink-connector-kafka-1.17.0.jar:1.17.0]
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:144) ~[flink-connector-base-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417) ~[flink-streaming-java-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-streaming-java-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-streaming-java-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) ~[flink-streaming-java-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-streaming-java-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-streaming-java-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-streaming-java-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-runtime-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) ~[flink-runtime-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) ~[flink-runtime-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) ~[flink-runtime-1.17.0.jar:1.17.0]
at java.lang.Thread.run(Thread.java:829) ~[?:?]
Martijn Visser
09/19/2023, 7:22 PM
Martijn Visser
09/19/2023, 7:22 PM
`avro`
but `avro-confluent`
IIRC
Tomasz Krol
09/20/2023, 8:24 AM
Martijn Visser
09/20/2023, 8:29 AM
Martijn Visser
09/20/2023, 8:30 AM
Tomasz Krol
09/20/2023, 8:42 AM