# troubleshooting
Hi guys, I'm starting with Flink. I need help consuming topics from Kafka where both the key and the value are in Avro format; I couldn't find any examples of that.
package spendreport;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import spendreport.Deserializer.KafkaAvroDeserializationSchema;
import spendreport.sinks.MessageSink;
/**
 * Skeleton code for the datastream walkthrough
 */
public class ConsumerJob {
    private static final Logger logs = LoggerFactory.getLogger(ConsumerJob.class);
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "Silver");
        properties.setProperty("schema.registry.url", "<http://schema-registry:8081>");
        properties.setProperty("specific.avro.reader", "true");
        properties.setProperty("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        properties.setProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");

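        // KafkaAvroDeserializer.configure(...) expects a Map, so copy the Properties over.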
        Map<String, Object> propertiesMap = new HashMap<>();
        for (String key : properties.stringPropertyNames()) {
            propertiesMap.put(key, properties.getProperty(key));
        }

        KafkaSource<GenericRecord> source = KafkaSource.<GenericRecord>builder()
            .setProperties(properties)
            .setTopics("PRODUCTS_1")
            // .setPartitions(partitionSet)
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setDeserializer(KafkaRecordDeserializationSchema.of(new KafkaAvroDeserializationSchema(propertiesMap)))
            .build();

        DataStream<GenericRecord> messages = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        messages
            .addSink(new MessageSink())
            .name("messages-alerts");

        env.execute("Fraud Detection");
    }
}
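
For reference: Flink also ships a ready-made Confluent Schema Registry deserializer in the flink-avro-confluent-registry module, so a custom deserialization schema may not be needed when the reader schema is known up front. A minimal sketch, assuming that dependency is on the classpath; the class name and the inline reader schema are made-up placeholders, so substitute the schema actually registered for the PRODUCTS_1 value:

package spendreport;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ConsumerJobWithRegistryFormat {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical reader schema -- replace with the real PRODUCTS_1 value schema.
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Product\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"string\"}]}");

        KafkaSource<GenericRecord> source = KafkaSource.<GenericRecord>builder()
            .setBootstrapServers("localhost:9092")
            .setTopics("PRODUCTS_1")
            .setGroupId("Silver")
            .setStartingOffsets(OffsetsInitializer.earliest())
            // Deserializes only the record value; the key is ignored here.
            .setValueOnlyDeserializer(
                ConfluentRegistryAvroDeserializationSchema.forGeneric(
                    schema, "http://schema-registry:8081"))
            .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source").print();
        env.execute("Registry format sketch");
    }
}

A side benefit of this route is that the produced type is Avro-aware, so records don't fall back to Flink's generic Kryo serialization.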

package spendreport.Deserializer;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.util.Map;

public class KafkaAvroDeserializationSchema implements KafkaDeserializationSchema<GenericRecord> {
    private static final Logger logs = LoggerFactory.getLogger(KafkaAvroDeserializationSchema.class);

    // Must NOT be transient: Flink serializes this schema and ships it to the task
    // managers, and a transient field would arrive there as null, making
    // initDeserializer() fail with a NullPointerException on the cluster.
    private final Map<String, ?> properties;

    // The Confluent deserializer itself is not serializable, so keep it transient
    // and create it lazily on the task manager.
    private transient KafkaAvroDeserializer deserializer;

    public KafkaAvroDeserializationSchema(Map<String, ?> properties) {
        this.properties = properties;
    }

    private void initDeserializer() {
        this.deserializer = new KafkaAvroDeserializer();
        deserializer.configure(properties, false); // false = configure as a value deserializer
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false; // Adjust this as needed
    }

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<GenericRecord> out) throws IOException {
        if (deserializer == null) {
            initDeserializer();
        }
        try {
            GenericRecord value = (GenericRecord) deserializer.deserialize(record.topic(), record.value());
            out.collect(value);
        } catch (Exception e) {
            throw new IOException("Error deserializing record", e);
        }
    }

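    // NOTE: TypeInformation.of(GenericRecord.class) falls back to Flink's generic
    // (Kryo) serializer, which commonly fails for Avro records at runtime. If that
    // happens, flink-avro's GenericRecordAvroTypeInfo is the documented alternative,
    // but it needs the Avro schema up front.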
    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeInformation.of(GenericRecord.class);
    }

    @Override
    public GenericRecord deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        if (deserializer == null) {
            initDeserializer();
        }
        try {
            logs.info("Mensage:");
            logs.info(record.toString());
            logs.info(record.value().toString());
            logs.info(record.key().toString());
            return (GenericRecord) deserializer.deserialize(record.topic(), record.value());
        } catch (Exception e) {
            throw new Exception("Error deserializing record", e);
        }
    }
}
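
Since the key is Avro as well: the schema above only decodes the value. Here's a sketch of a KafkaRecordDeserializationSchema that decodes both sides into a Tuple2; the class name is made up, and the second argument of configure() selects key vs. value mode:

package spendreport.Deserializer;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class KafkaAvroKeyValueDeserializationSchema
        implements KafkaRecordDeserializationSchema<Tuple2<GenericRecord, GenericRecord>> {

    private final HashMap<String, Object> properties;
    private transient KafkaAvroDeserializer keyDeserializer;
    private transient KafkaAvroDeserializer valueDeserializer;

    public KafkaAvroKeyValueDeserializationSchema(Map<String, ?> properties) {
        // Copy into a HashMap so the field is serializable when Flink ships
        // this schema to the task managers.
        this.properties = new HashMap<>(properties);
    }

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record,
                            Collector<Tuple2<GenericRecord, GenericRecord>> out) throws IOException {
        if (valueDeserializer == null) {
            keyDeserializer = new KafkaAvroDeserializer();
            keyDeserializer.configure(properties, true);    // true  = key mode
            valueDeserializer = new KafkaAvroDeserializer();
            valueDeserializer.configure(properties, false); // false = value mode
        }
        GenericRecord key = record.key() == null
                ? null
                : (GenericRecord) keyDeserializer.deserialize(record.topic(), record.key());
        GenericRecord value =
                (GenericRecord) valueDeserializer.deserialize(record.topic(), record.value());
        out.collect(Tuple2.of(key, value));
    }

    @Override
    public TypeInformation<Tuple2<GenericRecord, GenericRecord>> getProducedType() {
        // Tuple of two generic types; subject to the same Kryo caveat noted above.
        return new TupleTypeInfo<>(
                TypeInformation.of(GenericRecord.class),
                TypeInformation.of(GenericRecord.class));
    }
}

This one plugs straight into the builder with .setDeserializer(new KafkaAvroKeyValueDeserializationSchema(propertiesMap)); no KafkaRecordDeserializationSchema.of(...) wrapper is needed because it implements the new interface directly.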