# troubleshooting
a
I have a small problem: I want to read data from Kafka, which is a simple string value, and write it to the filesystem as Parquet. The conversion is the problem. Check the pictures:
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("test2", new SimpleStringSchema(), properties);
// Convert stream to GenericRecord using the Avro schema
DataStream<GenericRecord> avroStream = stream.map(value -> { GenericRecord record = new GenericData.Record(schema); record.put("value", value); return record; });
StreamingFileSink<GenericRecord> parquetSink = StreamingFileSink.forBulkFormat(new Path("/home/aly/FlinkConsumerDS/"), ParquetAvroWriters.forGenericRecord(schema)).withOutputFileConfig(fileConfig).build();
d
First off how are you specifying dependencies?
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-avro</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-parquet</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
have you specified compatible versions for each of the above?
btw, are you committed to Avro/Parquet, or is another format an option?
Ensure that you have the necessary permissions to write to the specified directory
/home/aly/FlinkConsumerDS/
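a quick way to sanity-check that from Java (just a throwaway sketch; the class name is made up, and it has to run on the machine where the TaskManager actually writes, not just your IDE):
```
import java.nio.file.Files;
import java.nio.file.Paths;

public class SinkDirCheck {
    public static void main(String[] args) {
        // Check that the sink directory exists and the current user can write to it.
        System.out.println("writable? " + Files.isWritable(Paths.get("/home/aly/FlinkConsumerDS/")));
    }
}
```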
a
Yes the version is 1.12
d
Fully initialize the StreamExecutionEnvironment if you have not done so.
a
Yes, the app has the permission; before writing Parquet I was writing text.
Writing text succeeded, but converting the string into Parquet fails.
d
Ok, so the schema needs to be properly defined and reflect the structure of the data you are working with.
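for a single string field, something like this should be equivalent to the JSON form (using Avro's SchemaBuilder; I'm only guessing at the record/field names here):
```
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

// Programmatic equivalent of:
// { "type": "record", "name": "KafkaRecord", "fields": [ { "name": "value", "type": "string" } ] }
Schema schema = SchemaBuilder.record("KafkaRecord")
        .fields()
        .requiredString("value")
        .endRecord();
```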
a
The schema is just one string field, and I defined it
d
check key/values. In your example, you’re putting a hard-coded key-value pair into the GenericRecord. You should replace “key” and “value” with the actual field name from your Avro schema and the actual value from the Kafka message, according to your schema.
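e.g. if the schema’s only field really is named "value" (my assumption), the mapping would look roughly like:
```
// Wrap each Kafka string in a GenericRecord whose field name matches the Avro schema.
DataStream<GenericRecord> avroStream = stream.map(value -> {
    GenericRecord record = new GenericData.Record(schema);
    record.put("value", value);   // "value" must be a field declared in the schema
    return record;
});
```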
a
IMG_1482.jpg
d
can you post that in a code block instead?
# your code here
a
```
package org.orangeFlinkDS;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Register Avro's GenericRecord with Avro Serializer
        // Define the schema for Avro records with only a value field
        String schemaString = "{ \"type\": \"record\", \"name\": \"KafkaRecord\", \"fields\": [ "
                + "{ \"name\": \"value\", \"type\": \"string\" } ] }";
        Schema schema = new Schema.Parser().parse(schemaString);

        // Set up the properties for Kafka
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test_aly");

        // Create a new Flink Kafka Consumer
        FlinkKafkaConsumer<String> myConsumer =
                new FlinkKafkaConsumer<>("test2", new SimpleStringSchema(), properties);

        // Add the consumer to the data stream
        DataStream<String> stream = env.addSource(myConsumer).setParallelism(2);

        // Convert stream to GenericRecord using the Avro schema
        DataStream<GenericRecord> avroStream = stream.map(value -> {
            GenericRecord record = new GenericData.Record(schema);
            record.put("value", value);
            return record;
        });

        // Enable checkpointing
        env.enableCheckpointing(10000); // Enable checkpointing every 10 seconds

        // Configure file sinks
        OutputFileConfig fileConfig = OutputFileConfig
                .builder()
                .withPartPrefix("prefix")
                .withPartSuffix(".parquet")
                .build();

        StreamingFileSink<GenericRecord> parquetSink = StreamingFileSink
                .forBulkFormat(new Path("/home/aly/FlinkConsumerDS/"),
                        ParquetAvroWriters.forGenericRecord(schema))
                .withOutputFileConfig(fileConfig)
                .build();

        // Add sinks to the streams
        avroStream.addSink(parquetSink).setParallelism(1);

        // Execute the Flink job
        env.execute("Flink Kafka Consumer Example");
    }
}
```
d
Can you share full stack trace as well?
a
Sure
2024-08-08 13:03:37
com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.UnsupportedOperationException
at java.base/java.util.Collections$UnmodifiableCollection.add(Collections.java:1060)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
... 29 more
And this is the producer code
```
while (true) {
    // Generate random data
    String key = "key-" + random.nextInt(1000);
    String value = "value-" + random.nextInt(1000);
    // Create a ProducerRecord
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
```
This is pom.xml
```
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-parquet_2.11</artifactId>
        <version>1.12.7</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.11.3</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-avro</artifactId>
        <version>1.12.0</version>
    </dependency>
    <!-- Apache Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.12.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>
    <!-- Add any additional dependencies here -->
</dependencies>
```
d
Ok, stack trace is interesting.
a
I am also using Flink 1.12
d
The stack trace indicates that Kryo is attempting to serialize an org.apache.avro.Schema$RecordSchema object. This fails with an UnsupportedOperationException because Kryo tries to modify the schema’s fieldMap (a collection of fields in the record schema), which is unmodifiable.
To resolve this, you should try to exclude the Schema and GenericRecord classes from Kryo serialization, because you’re already handling their serialization separately using Avro-specific mechanisms when writing to Parquet.
a
Ok I will try
Thank you so much
d
sure, I think you can do it via the configuration env
a
But could you explain the logic behind converting the string to Parquet (using map and so on)? This is the first time for me.
d
I think it’s done something like this
env.getConfig().addDefaultKryoSerializer(Schema.class, UnregisteredTypeSerializer.class);
env.getConfig().addDefaultKryoSerializer(GenericRecord.class, UnregisteredTypeSerializer.class);
also maybe register with the appropriate serializers so Flink knows how to handle these types
// register Avro-specific serializers if you haven't already
env.getConfig().registerTypeWithKryoSerializer(Schema.class, SpecificRecordSerializer.class);
env.getConfig().registerTypeWithKryoSerializer(GenericRecord.class, AvroGenericRecordSerializer.class);
Actually, for Flink 1.12 you might not need to do this registration …
I think this is enough
// exclude Schema and GenericRecord from Kryo serialization
env.getConfig().getAutoTypeRegistration().disableAutoRegistration();
env.getConfig().addDefaultKryoSerializer(Schema.class, UnregisteredTypeSerializer.class);
env.getConfig().addDefaultKryoSerializer(GenericRecord.class, UnregisteredTypeSerializer.class);
Something like that
This tells Flink not to attempt to automatically serialize Schema and GenericRecord instances with Kryo. Since you’re using ParquetAvroWriters to write GenericRecord instances to Parquet, and your Kafka source should deserialize strings into records correctly, managing these specific Avro types with Kryo isn’t necessary and can lead to issues like the one you are seeing.
a
I got you , but the IDE said Cannot resolve method 'getAutoTypeRegistration' in 'ExecutionConfig'
Cannot resolve symbol 'UnregisteredTypeSerializer'
d
yes, don’t do the registration since you’re on Flink 1.12
a few more things related to the code …
Try disabling generic types by calling
env.getConfig().disableGenericTypes();
This prevents Kryo from automatically trying to serialize types it encounters
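one caveat: with generic types disabled, Flink needs some other way to serialize GenericRecord. I haven’t tried this on 1.12 myself, but if your flink-avro has org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo, you should be able to give the map output explicit Avro type information so Flink uses its Avro serializer instead of Kryo, roughly like:
```
// Assumes: import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
DataStream<GenericRecord> avroStream = stream
        .map(value -> {
            GenericRecord record = new GenericData.Record(schema);
            record.put("value", value);
            return record;
        })
        // Explicit type info: records of this schema are handled by Flink's Avro
        // serializer instead of being treated as a Kryo "generic type".
        .returns(new GenericRecordAvroTypeInfo(schema));
```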
a
So now should I also change GenericRecord?
In this:
```
DataStream<GenericRecord> avroStream = stream.map(value -> {
    GenericRecord record = new GenericData.Record(schema);
    record.put("value", value);
    return record;
});
```
d
yes, that looks right. You might also check that all dependencies are compatible with Flink 1.12: your pom mixes 1.12.0 with 1.12.7, and _2.11 with _2.12 Scala suffixes.
a
If I convert it to the Table API, would it be easier?
d
You may not need to change this code
it looks ok; we are just adjusting how serializers get applied
It might be better to use the Table API, but I think the issue can be resolved by adjusting how and when Kryo is applied
a
I can't disable generic types without stopping the use of GenericRecord:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Generic types have been disabled in the ExecutionConfig and type org.apache.avro.generic.GenericRecord is treated as a generic type.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
Caused by: java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type org.apache.avro.generic.GenericRecord is treated as a generic type.
at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
at org.apache.flink.streaming.api.graph.StreamGraph.createSerializer(StreamGraph.java:836)
at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:320)
at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:305)
at org.apache.flink.streaming.runtime.translators.AbstractOneInputTransformationTranslator.translateInternal(AbstractOneInputTransformationTranslator.java:63)
d
did you try doing it without the exclusion code?
do you still get this error if you don’t do the exclusion?
a
Yes, this is the code
```
public class Main {
    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableGenericTypes();

        // Define the schema for Avro records with only a value field
        String schemaString = "{ \"type\": \"record\", \"name\": \"KafkaRecord\", \"fields\": [ "
                + "{ \"name\": \"value\", \"type\": \"string\" } ] }";
        Schema schema = new Schema.Parser().parse(schemaString);

        // Set up the properties for Kafka
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test_aly");

        // Create a new Flink Kafka Consumer
        FlinkKafkaConsumer<String> myConsumer =
                new FlinkKafkaConsumer<>("test2", new SimpleStringSchema(), properties);

        // Add the consumer to the data stream
        DataStream<String> stream = env.addSource(myConsumer).setParallelism(2);

        // Convert stream to GenericRecord using the Avro schema
        DataStream<GenericRecord> avroStream = stream.map(value -> {
            GenericRecord record = new GenericData.Record(schema);
            record.put("value", value);
            return record;
        });

        // Enable checkpointing
        env.enableCheckpointing(10000); // Enable checkpointing every 10 seconds

        // Configure file sinks
        OutputFileConfig fileConfig = OutputFileConfig
                .builder()
                .withPartPrefix("prefix")
                .withPartSuffix(".parquet")
                .build();

        StreamingFileSink<GenericRecord> parquetSink = StreamingFileSink
                .forBulkFormat(new Path("/home/aly/FlinkConsumerDS/"),
                        ParquetAvroWriters.forGenericRecord(schema))
                .withOutputFileConfig(fileConfig)
                .build();

        // Add sinks to the streams
        avroStream.addSink(parquetSink).setParallelism(1);

        // Execute the Flink job
        env.execute("Flink Kafka Consumer Example");
    }
}
```
d
Ok I think you are right that you cannot disable generics …
For Flink 1.12 (assuming you don’t want to upgrade) you could consider registering a custom serializer for this scenario
Configuration config = new Configuration();
config.setInteger("taskmanager.numberOfTaskSlots", 1);

final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(config);
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.createLocalEnvironment(config);
env.getConfig().registerTypeWithKryoSerializer(GenericRecord.class, new GenericRecordAvroSerializer(yourSchema).getClass());
streamEnv.getConfig().registerTypeWithKryoSerializer(GenericRecord.class, new GenericRecordAvroSerializer(yourSchema).getClass());
This is how it would be applied, but you would probably need to write a custom serializer.
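for reference, a very rough, untested sketch of what such a serializer could look like (the class name and the idea of delegating to Avro binary encoding are mine, not an existing Flink/Avro class). The point is that Kryo never walks the immutable Schema object graph because Avro does the actual encoding:
```
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;

// Hypothetical Kryo serializer that delegates GenericRecord (de)serialization to Avro binary encoding.
public class GenericRecordKryoSerializer extends Serializer<GenericRecord> implements Serializable {

    private final String schemaString;   // keep the schema as JSON so the serializer itself is Serializable
    private transient Schema schema;

    public GenericRecordKryoSerializer(Schema schema) {
        this.schemaString = schema.toString();
    }

    private Schema schema() {
        if (schema == null) {
            schema = new Schema.Parser().parse(schemaString);
        }
        return schema;
    }

    @Override
    public void write(Kryo kryo, Output output, GenericRecord record) {
        try {
            // Avro-encode the record into a byte array, then write length + bytes with Kryo.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(bytes, null);
            new GenericDatumWriter<GenericRecord>(schema()).write(record, encoder);
            encoder.flush();
            byte[] payload = bytes.toByteArray();
            output.writeInt(payload.length, true);
            output.writeBytes(payload);
        } catch (IOException e) {
            throw new RuntimeException("Could not Avro-encode record", e);
        }
    }

    @Override
    public GenericRecord read(Kryo kryo, Input input, Class<GenericRecord> type) {
        try {
            // Read length + bytes back and let Avro rebuild the GenericRecord.
            byte[] payload = input.readBytes(input.readInt(true));
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
            return new GenericDatumReader<GenericRecord>(schema()).read(null, decoder);
        } catch (IOException e) {
            throw new RuntimeException("Could not Avro-decode record", e);
        }
    }
}
```
and the registration would then be roughly:
```
// Register for the concrete record class; addDefaultKryoSerializer also covers subclasses.
env.getConfig().addDefaultKryoSerializer(GenericData.Record.class, new GenericRecordKryoSerializer(schema));
```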
a
Ok thanks , I will try
d
Yeah, if you want to stay with the DataStream API that would seem to be the way forward
But I am not saying it’s better than the Table API or SQL API, which would use Flink’s built-in support for Avro formats when reading/writing data. It would be handled more or less transparently in that case.
that’s a very viable approach as well.
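fwiw, an extremely rough, untested sketch of what the Table/SQL route could look like on 1.12 (assumes the kafka and filesystem SQL connectors plus flink-parquet are on the classpath; table names and options are just illustrative):
```
// Assumes: import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// Kafka source: each message value is read as a raw string.
tEnv.executeSql(
    "CREATE TABLE kafka_in (`value` STRING) WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'test2'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'properties.group.id' = 'test_aly'," +
    "  'format' = 'raw')");

// Filesystem sink writing Parquet part files (needs checkpointing enabled to commit files).
tEnv.executeSql(
    "CREATE TABLE parquet_out (`value` STRING) WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = '/home/aly/FlinkConsumerDS/'," +
    "  'format' = 'parquet')");

// Continuously copy from Kafka to Parquet; no manual GenericRecord mapping needed.
tEnv.executeSql("INSERT INTO parquet_out SELECT `value` FROM kafka_in");
```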
a
I made two POCs, one for the DataStream API and another for the Table API. I will try the Table API tomorrow to see if that solves it.
d
Keep in mind that the Table API approach would be implemented slightly differently on Flink 1.14 and above than on previous versions.
👍 1
a
Thanks very much
d
Good luck! and let us know your results!