# troubleshooting
t
Hello Team, I'm trying to consume some data from Kafka with Flink 1.17. Topics with a Confluent Avro schema work fine, but with an embedded schema I'm getting an error. It seems Flink has problems deserializing those Avro messages. Any idea how to troubleshoot this further?
tEnv.executeSql("CREATE TABLE table (\n" +
                "  `field` STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'dat.topic',\n" +
                "  'properties.bootstrap.servers' = '" + brokers + "',\n" +
                "  'properties.group.id' = 'testGroup3',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'avro',\n" +
                "  'properties.security.protocol' = 'SSL',\n" +
                "  'properties.ssl.key.password' = '" + keyPassword + "',\n" +
                "  'properties.ssl.keystore.location' = '" + keyStoreLocation + "',\n" +
                "  'properties.ssl.keystore.password' = '" + keyPassword + "',\n" +
                "  'properties.ssl.truststore.location' = '" + trustStoreLocation + "',\n" +
                "  'properties.ssl.truststore.password' = '" + keyPassword + "'\n" +
                ")");

        Table table2 = tEnv.from("table");
        table2.limit(10).execute().print();


Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = dat.topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 169392, serialized key size = 24, serialized value size = 27836, headers = RecordHeaders(headers = [RecordHeader(key = RaType, value = [76]), RecordHeader(key = kafkaSerializationType, value = [65, 86, 82, 79])], isReadOnly = false), key = [C@56dcb40d, value = [C@04f4fc48).
    at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:57)
    at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
    ... 14 more
Caused by: java.io.IOException: Failed to deserialize Avro record.
f
Hi Tomasz! Can you provide the full stacktrace? This might help clarify: https://stackoverflow.com/questions/66065158/failed-to-deserialize-avro-record-apache-flink-sql-cli
👍 1
t
But they use Confluent Avro there; in my case I have no problems deserializing topics with that schema. My problem is with non-Confluent embedded schemas.
14:21:35,539 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: table[1] (1/1)#0 (8bfd86cf5460a82bbe3b8fbd8284f9c1_bc764cd8ddf7a0cff126f51c16239658_0_0) switched from RUNNING to FAILED with failure cause:
java.io.IOException: Failed to deserialize consumer record due to
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56) ~[flink-connector-kafka-1.17.0.jar:1.17.0]
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33) ~[flink-connector-kafka-1.17.0.jar:1.17.0]
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:144) ~[flink-connector-base-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417) ~[flink-streaming-java-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-streaming-java-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-streaming-java-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) ~[flink-streaming-java-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-streaming-java-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-streaming-java-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-streaming-java-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-runtime-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) [flink-runtime-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) [flink-runtime-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-runtime-1.17.0.jar:1.17.0]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = dat.topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1693927525534, serialized key size = 24, serialized value size = 27836, headers = RecordHeaders(headers = [RecordHeader(key = RaType, value = [76]), RecordHeader(key = kafkaSerializationType, value = [65, 86, 82, 79])], isReadOnly = false), key = [R@4f29fa5, value = [R@10d99c0e).
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:57) ~[flink-connector-kafka-1.17.0.jar:1.17.0]
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ~[flink-connector-kafka-1.17.0.jar:1.17.0]
... 14 more
Caused by: java.io.IOException: Failed to deserialize Avro record.
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:106) ~[flink-avro-1.17.0.jar:1.17.0]
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46) ~[flink-avro-1.17.0.jar:1.17.0]
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) ~[flink-core-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113) ~[flink-connector-kafka-1.17.0.jar:1.17.0]
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54) ~[flink-connector-kafka-1.17.0.jar:1.17.0]
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ~[flink-connector-kafka-1.17.0.jar:1.17.0]
... 14 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: Index -40 out of bounds for length 2
at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460) ~[avro-1.11.1.jar:1.11.1]
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283) ~[avro-1.11.1.jar:1.11.1]
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188) ~[avro-1.11.1.jar:1.11.1]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[avro-1.11.1.jar:1.11.1]
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260) ~[avro-1.11.1.jar:1.11.1]
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248) ~[avro-1.11.1.jar:1.11.1]
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180) ~[avro-1.11.1.jar:1.11.1]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[avro-1.11.1.jar:1.11.1]
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154) ~[avro-1.11.1.jar:1.11.1]
at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:142) ~[flink-avro-1.17.0.jar:1.17.0]
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:103) ~[flink-avro-1.17.0.jar:1.17.0]
at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:46) ~[flink-avro-1.17.0.jar:1.17.0]
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) ~[flink-core-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113) ~[flink-connector-kafka-1.17.0.jar:1.17.0]
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54) ~[flink-connector-kafka-1.17.0.jar:1.17.0]
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ~[flink-connector-kafka-1.17.0.jar:1.17.0]
... 14 more
14:21:35,545 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for Source: table[1] (1/1)#0 (8bfd86cf5460a82bbe3b8fbd8284f9c1_bc764cd8ddf7a0cff126f51c16239658_0_0).
14:21:35,550 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FAILED to JobManager for task Source: table[1] (1/1)#0 8bfd86cf5460a82bbe3b8fbd8284f9c1_bc764cd8ddf7a0cff126f51c16239658_0_0.
14:21:35,559 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: table[1] (1/1) (8bfd86cf5460a82bbe3b8fbd8284f9c1_bc764cd8ddf7a0cff126f51c16239658_0_0) switched from RUNNING to FAILED on 0122673e-2432-48ff-b43e-fe3cddba30f2 @ localhost (dataPort=-1).
java.io.IOException: Failed to deserialize consumer record due to
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56) ~[flink-connector-kafka-1.17.0.jar:1.17.0]
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33) ~[flink-connector-kafka-1.17.0.jar:1.17.0]
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:144) ~[flink-connector-base-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417) ~[flink-streaming-java-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-streaming-java-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-streaming-java-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) ~[flink-streaming-java-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-streaming-java-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-streaming-java-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-streaming-java-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-runtime-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) ~[flink-runtime-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) ~[flink-runtime-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) ~[flink-runtime-1.17.0.jar:1.17.0]
at java.lang.Thread.run(Thread.java:829) ~[?:?]
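An `ArrayIndexOutOfBoundsException` from `ResolvingDecoder.readIndex` usually means the decoder is interpreting bytes that are not plain Avro binary for the declared schema. A quick way to see what is actually on the wire is to look at the first bytes of the value: an Avro Object Container File (embedded schema) starts with the magic bytes `Obj\x01`, while Confluent's wire format starts with a `0x00` magic byte followed by a 4-byte schema id. A minimal, dependency-free sketch (class and method names are made up for illustration):

```java
import java.util.Arrays;

public class AvroFormatProbe {
    // Avro Object Container File magic bytes: 'O', 'b', 'j', 1
    static final byte[] OCF_MAGIC = {'O', 'b', 'j', 1};

    /** Guess the wire format of a Kafka value payload from its first bytes. */
    static String guessFormat(byte[] value) {
        if (value.length >= 4 && Arrays.equals(Arrays.copyOf(value, 4), OCF_MAGIC)) {
            return "avro-container-file (embedded schema)";
        }
        if (value.length >= 5 && value[0] == 0) {
            return "confluent-wire-format (magic 0x00 + 4-byte schema id)";
        }
        return "plain avro binary or unknown";
    }

    public static void main(String[] args) {
        // Example payloads: an OCF header and a Confluent-framed record
        System.out.println(guessFormat(new byte[]{'O', 'b', 'j', 1, 2, 3}));
        System.out.println(guessFormat(new byte[]{0, 0, 0, 0, 7, 42}));
    }
}
```

If the payload turns out to be a container file, Flink's built-in `avro` format won't decode it, since that format expects schemaless Avro binary matching the table's declared schema.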
m
You’re specifying the wrong format
It shouldn’t be
avro
but
avro-confluent
IIRC
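For reference, the registry-backed variant Martijn mentions would look roughly like this in the WITH clause (a sketch; the registry URL is a placeholder, and it only works if the topic was written with Confluent's wire format):

```sql
'format' = 'avro-confluent',
'avro-confluent.url' = 'http://schema-registry:8081'
```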
t
@Martijn Visser I did follow this documentation https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/avro/ which seems to list the properties for a non-Confluent Avro schema. This specific topic I'm consuming has the schema embedded in the message and was not serialized with Confluent Avro. I can deserialize "Confluent Avro topics" with no problem, but that one is problematic 🙂
m
I misread, I thought you wanted to read Confluent AVRO 😅
In most cases like this, the Avro message in Kafka actually isn't what the table definition says it is
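One way to verify that is to read the value as raw bytes and inspect it, e.g. with a probe table (a sketch, reusing the same connection properties as the table above):

```sql
-- sketch: reads each Kafka value as a single BYTES column for inspection
CREATE TABLE raw_probe (
  `value` BYTES
) WITH (
  'connector' = 'kafka',
  'topic' = 'dat.topic',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'raw'
  -- plus the same bootstrap/SSL properties as above
);
```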
t
yeah I thought so, thanks Martijn 🙂