Aly Ayman
08/08/2024, 9:51 AM
D. Draco O'Brien
08/08/2024, 10:00 AM
```
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-avro</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-parquet</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
```
D. Draco O'Brien
08/08/2024, 10:00 AM
D. Draco O'Brien
08/08/2024, 10:01 AM
D. Draco O'Brien
08/08/2024, 10:04 AM
/home/aly/FlinkConsumerDS/
Aly Ayman
08/08/2024, 10:05 AM
D. Draco O'Brien
08/08/2024, 10:05 AM
Aly Ayman
08/08/2024, 10:05 AM
Aly Ayman
08/08/2024, 10:06 AM
D. Draco O'Brien
08/08/2024, 10:06 AM
Aly Ayman
08/08/2024, 10:07 AM
D. Draco O'Brien
08/08/2024, 10:07 AM
Aly Ayman
08/08/2024, 10:09 AM
D. Draco O'Brien
08/08/2024, 10:11 AM
# your code here
Aly Ayman
08/08/2024, 10:12 AM
```
package org.orangeFlinkDS;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Define the schema for Avro records with only a value field
        // (note: no Avro serializer is registered anywhere in this class,
        //  so GenericRecord will fall back to Kryo at runtime)
        String schemaString = "{ \"type\": \"record\", \"name\": \"KafkaRecord\", \"fields\": [ "
                + "{ \"name\": \"value\", \"type\": \"string\" } ] }";
        Schema schema = new Schema.Parser().parse(schemaString);

        // Set up the properties for Kafka
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test_aly");

        // Create a new Flink Kafka consumer reading strings from topic "test2"
        FlinkKafkaConsumer<String> myConsumer =
                new FlinkKafkaConsumer<>("test2", new SimpleStringSchema(), properties);

        // Add the consumer to the data stream
        DataStream<String> stream = env.addSource(myConsumer).setParallelism(2);

        // Convert stream to GenericRecord using the Avro schema
        DataStream<GenericRecord> avroStream = stream.map(value -> {
            GenericRecord record = new GenericData.Record(schema);
            record.put("value", value);
            return record;
        });

        // Enable checkpointing every 10 seconds
        env.enableCheckpointing(10000);

        // Configure file sinks
        OutputFileConfig fileConfig = OutputFileConfig
                .builder()
                .withPartPrefix("prefix")
                .withPartSuffix(".parquet")
                .build();
        StreamingFileSink<GenericRecord> parquetSink = StreamingFileSink
                .forBulkFormat(new Path("/home/aly/FlinkConsumerDS/"),
                        ParquetAvroWriters.forGenericRecord(schema))
                .withOutputFileConfig(fileConfig)
                .build();

        // Add sinks to the streams
        avroStream.addSink(parquetSink).setParallelism(1);

        // Execute the Flink job
        env.execute("Flink Kafka Consumer Example");
    }
}
```
D. Draco O'Brien
08/08/2024, 10:19 AM
Aly Ayman
08/08/2024, 10:20 AM
Aly Ayman
08/08/2024, 10:20 AM
Aly Ayman
08/08/2024, 10:20 AM
2024-08-08 130337
```
com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:69)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:193)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:179)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.UnsupportedOperationException
at java.base/java.util.Collections$UnmodifiableCollection.add(Collections.java:1060)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
... 29 more
```
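The trace shows Kryo failing while deep-copying the Avro Schema that each GenericRecord carries: Schema$Field keeps its reserved names in an unmodifiable collection, and Kryo's CollectionSerializer tries to add() into it. One workaround is to register the Schema-aware Kryo serializer that ships with flink-avro; a minimal sketch, assuming Flink 1.12 with flink-avro on the classpath:

```
import org.apache.avro.Schema;
import org.apache.flink.formats.avro.utils.AvroKryoSerializerUtils;

// teach Kryo how to handle org.apache.avro.Schema instead of reflectively
// rebuilding its unmodifiable internal collections
env.getConfig().addDefaultKryoSerializer(
        Schema.class, AvroKryoSerializerUtils.AvroSchemaSerializer.class);
```

The cleaner fix, sketched further down, is to give the stream explicit Avro type information so GenericRecord never goes through Kryo at all.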
Aly Ayman
08/08/2024, 10:21 AM
```
while (true) {
    // Generate random data
    String key = "key-" + random.nextInt(1000);
    String value = "value-" + random.nextInt(1000);

    // Create a ProducerRecord and send it to the topic
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
    producer.send(record);
}
```
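For context, a runnable version of that producer loop, with the setup the snippet assumes made explicit (the topic name, broker address, and one-second pacing are assumptions):

```
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Random;

public class RandomProducer {
    public static void main(String[] args) throws Exception {
        String topic = "test2"; // assumed: the topic the Flink job consumes
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("key.serializer", StringSerializer.class.getName());
        props.setProperty("value.serializer", StringSerializer.class.getName());

        Random random = new Random();
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            while (true) {
                String key = "key-" + random.nextInt(1000);
                String value = "value-" + random.nextInt(1000);
                producer.send(new ProducerRecord<>(topic, key, value));
                Thread.sleep(1000); // assumed pacing; drop to produce at full speed
            }
        }
    }
}
```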
Aly Ayman
08/08/2024, 10:22 AM
```
<dependencies>
    <!-- all Flink artifacts must share the same Flink version and Scala suffix -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-parquet_2.12</artifactId>
        <version>1.12.7</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.11.3</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-avro -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-avro</artifactId>
        <version>1.12.7</version>
    </dependency>
    <!-- Apache Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.7</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.12</artifactId>
        <version>1.12.7</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.7</version>
    </dependency>
    <!-- Add any additional dependencies here -->
</dependencies>
```
D. Draco O'Brien
08/08/2024, 10:23 AM
Aly Ayman
08/08/2024, 10:23 AM
D. Draco O'Brien
08/08/2024, 10:25 AM
D. Draco O'Brien
08/08/2024, 10:26 AM
Aly Ayman
08/08/2024, 10:27 AM
Aly Ayman
08/08/2024, 10:27 AM
D. Draco O'Brien
08/08/2024, 10:28 AM
Aly Ayman
08/08/2024, 10:29 AM
D. Draco O'Brien
08/08/2024, 10:30 AM
```
// UnregisteredTypeSerializer is a placeholder, not a class Flink ships;
// substitute a Kryo Serializer implementation of your own
env.getConfig().addDefaultKryoSerializer(Schema.class, UnregisteredTypeSerializer.class);
env.getConfig().addDefaultKryoSerializer(GenericRecord.class, UnregisteredTypeSerializer.class);
```
D. Draco O'Brien
08/08/2024, 10:34 AM
```
// register Avro-specific serializers if you haven't already
// (SpecificRecordSerializer and AvroGenericRecordSerializer are placeholders
//  for serializer classes you would provide; Flink does not ship them)
env.getConfig().registerTypeWithKryoSerializer(Schema.class, SpecificRecordSerializer.class);
env.getConfig().registerTypeWithKryoSerializer(GenericRecord.class, AvroGenericRecordSerializer.class);
```
D. Draco O'Brien
08/08/2024, 10:36 AM
D. Draco O'Brien
08/08/2024, 10:37 AM
```
// exclude Schema and GenericRecord from Kryo's automatic type registration
env.getConfig().disableAutoTypeRegistration();
env.getConfig().addDefaultKryoSerializer(Schema.class, UnregisteredTypeSerializer.class);        // placeholder class
env.getConfig().addDefaultKryoSerializer(GenericRecord.class, UnregisteredTypeSerializer.class); // placeholder class
```
D. Draco O'Brien
08/08/2024, 10:37 AM
D. Draco O'Brien
08/08/2024, 10:41 AM
Aly Ayman
08/08/2024, 10:42 AM
Aly Ayman
08/08/2024, 10:42 AM
D. Draco O'Brien
08/08/2024, 10:46 AM
D. Draco O'Brien
08/08/2024, 10:46 AM
D. Draco O'Brien
08/08/2024, 10:47 AM
```
// fail fast: with generic types disabled, any type that would fall back to
// Kryo is rejected when the job graph is built
env.getConfig().disableGenericTypes();
```
D. Draco O'Brien
08/08/2024, 10:48 AM
Aly Ayman
08/08/2024, 10:50 AM
Aly Ayman
08/08/2024, 10:51 AM
```
DataStream<GenericRecord> avroStream = stream.map(value -> {
    GenericRecord record = new GenericData.Record(schema);
    record.put("value", value);
    return record;
});
```
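Because the lambda's return type is erased, Flink sees GenericRecord here as a generic type and routes it through Kryo, which is exactly where the earlier exception originates. A minimal sketch of the usual cure, assuming flink-avro is on the classpath: declare Avro type information on the map output with returns().

```
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;

DataStream<GenericRecord> avroStream = stream
        .map(value -> {
            GenericRecord record = new GenericData.Record(schema);
            record.put("value", value);
            return record;
        })
        // use Flink's built-in Avro serializer for GenericRecord instead of
        // Kryo; the schema travels with the type information
        .returns(new GenericRecordAvroTypeInfo(schema));
```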
D. Draco O'Brien
08/08/2024, 10:55 AM
Aly Ayman
08/08/2024, 10:58 AM
D. Draco O'Brien
08/08/2024, 10:58 AM
D. Draco O'Brien
08/08/2024, 10:59 AM
D. Draco O'Brien
08/08/2024, 11:00 AM
Aly Ayman
08/08/2024, 11:01 AM
```
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Generic types have been disabled in the ExecutionConfig and type org.apache.avro.generic.GenericRecord is treated as a generic type.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
Caused by: java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type org.apache.avro.generic.GenericRecord is treated as a generic type.
at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
at org.apache.flink.streaming.api.graph.StreamGraph.createSerializer(StreamGraph.java:836)
at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:320)
at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:305)
at org.apache.flink.streaming.runtime.translators.AbstractOneInputTransformationTranslator.translateInternal(AbstractOneInputTransformationTranslator.java:63)
```
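This failure is the expected effect of disableGenericTypes(): Flink now refuses to build the job graph instead of silently falling back to Kryo. The cure is not to re-enable generic types but to pair the check with explicit Avro type information, as sketched earlier:

```
// disableGenericTypes() turns the missing type info into a hard error,
// and the explicit Avro type info then satisfies the check
env.getConfig().disableGenericTypes();
DataStream<GenericRecord> avroStream = stream
        .map(value -> {
            GenericRecord record = new GenericData.Record(schema);
            record.put("value", value);
            return record;
        })
        .returns(new GenericRecordAvroTypeInfo(schema));
```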
D. Draco O'Brien
08/08/2024, 11:02 AM
D. Draco O'Brien
08/08/2024, 11:02 AM
Aly Ayman
08/08/2024, 11:03 AM
```
public class Main {
    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableGenericTypes();

        // Define the schema for Avro records with only a value field
        String schemaString = "{ \"type\": \"record\", \"name\": \"KafkaRecord\", \"fields\": [ "
                + "{ \"name\": \"value\", \"type\": \"string\" } ] }";
        Schema schema = new Schema.Parser().parse(schemaString);

        // Set up the properties for Kafka
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test_aly");

        // Create a new Flink Kafka consumer
        FlinkKafkaConsumer<String> myConsumer =
                new FlinkKafkaConsumer<>("test2", new SimpleStringSchema(), properties);

        // Add the consumer to the data stream
        DataStream<String> stream = env.addSource(myConsumer).setParallelism(2);

        // Convert stream to GenericRecord using the Avro schema
        // (still no explicit type info on the map output; this is what
        //  trips the generic-types check above)
        DataStream<GenericRecord> avroStream = stream.map(value -> {
            GenericRecord record = new GenericData.Record(schema);
            record.put("value", value);
            return record;
        });

        // Enable checkpointing every 10 seconds
        env.enableCheckpointing(10000);

        // Configure file sinks
        OutputFileConfig fileConfig = OutputFileConfig
                .builder()
                .withPartPrefix("prefix")
                .withPartSuffix(".parquet")
                .build();
        StreamingFileSink<GenericRecord> parquetSink = StreamingFileSink
                .forBulkFormat(new Path("/home/aly/FlinkConsumerDS/"),
                        ParquetAvroWriters.forGenericRecord(schema))
                .withOutputFileConfig(fileConfig)
                .build();

        // Add sinks to the streams
        avroStream.addSink(parquetSink).setParallelism(1);

        // Execute the Flink job
        env.execute("Flink Kafka Consumer Example");
    }
}
```
D. Draco O'Brien
08/08/2024, 11:05 AM
D. Draco O'Brien
08/08/2024, 11:05 AM
D. Draco O'Brien
08/08/2024, 11:06 AM
```
Configuration config = new Configuration();
config.setInteger("taskmanager.numberOfTaskSlots", 1);
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.createLocalEnvironment(1, config);

// GenericRecordAvroSerializer is not a Flink class; it is a placeholder for a
// Kryo serializer you implement yourself (a sketch follows below). Register a
// configured instance, not its class, so the schema passed in is not lost.
streamEnv.getConfig().registerTypeWithKryoSerializer(GenericRecord.class, new GenericRecordAvroSerializer(yourSchema));
```
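Since that serializer class does not exist in Flink or Avro, here is a minimal sketch of what a hand-rolled GenericRecordAvroSerializer could look like, using Avro binary encoding inside Kryo (the class, its name, and the parse-per-call simplification are all assumptions):

```
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

import java.io.IOException;
import java.io.Serializable;

public class GenericRecordAvroSerializer extends Serializer<GenericRecord> implements Serializable {

    // keep the schema as its JSON form; org.apache.avro.Schema is heavyweight state
    private final String schemaString;

    public GenericRecordAvroSerializer(Schema schema) {
        this.schemaString = schema.toString();
    }

    @Override
    public void write(Kryo kryo, Output output, GenericRecord record) {
        try {
            // re-parse per call for brevity; cache in a transient field in real code
            Schema schema = new Schema.Parser().parse(schemaString);
            GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
            BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder(output, null);
            writer.write(record, encoder);
            encoder.flush();
        } catch (IOException e) {
            throw new RuntimeException("Avro encoding failed", e);
        }
    }

    @Override
    public GenericRecord read(Kryo kryo, Input input, Class<GenericRecord> type) {
        try {
            Schema schema = new Schema.Parser().parse(schemaString);
            GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
            BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(input, null);
            return reader.read(null, decoder);
        } catch (IOException e) {
            throw new RuntimeException("Avro decoding failed", e);
        }
    }
}
```

That said, for this job the simpler route is the GenericRecordAvroTypeInfo shown earlier, which needs no custom Kryo code at all.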
D. Draco O'Brien
08/08/2024, 11:07 AM
Aly Ayman
08/08/2024, 11:07 AM
D. Draco O'Brien
08/08/2024, 11:09 AM
D. Draco O'Brien
08/08/2024, 11:11 AM
D. Draco O'Brien
08/08/2024, 11:11 AM
Aly Ayman
08/08/2024, 11:12 AM
D. Draco O'Brien
08/08/2024, 11:14 AM
Aly Ayman
08/08/2024, 11:15 AM
D. Draco O'Brien
08/08/2024, 11:15 AM