JOB ALBERTH FLORES MAMANI
07/30/2024, 2:47 PMpackage spendreport;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import spendreport.Deserializer.KafkaAvroDeserializationSchema;
import spendreport.sinks.MessageSink;
/**
* Skeleton code for the datastream walkthrough
*/
public class ConsumerJob {
Logger logs = LoggerFactory.getLogger(ConsumerJob.class);
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "Silver");
properties.setProperty("schema.registry.url", "<http://schema-registry:8081>");
properties.setProperty("specific.avro.reader", "true");
properties.setProperty("key.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
properties.setProperty("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
Map<String, Object> propertiesMap = new HashMap<>();
for (String key : properties.stringPropertyNames()) {
propertiesMap.put(key, properties.getProperty(key));
}
KafkaSource<GenericRecord> source = KafkaSource.<GenericRecord>builder()
.setProperties(properties)
.setTopics("PRODUCTS_1")
// .setPartitions(partitionSet)
.setStartingOffsets(OffsetsInitializer.earliest())
.setDeserializer(KafkaRecordDeserializationSchema.of(new KafkaAvroDeserializationSchema(propertiesMap)))
.build();
DataStream<GenericRecord> messages = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
messages
.addSink(new MessageSink())
.name("messages-alerts");
env.execute("Fraud Detection");
}
}
package spendreport.Deserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.flink.util.Collector;
import java.io.IOException;
import java.util.Map;
public class KafkaAvroDeserializationSchema implements KafkaDeserializationSchema<GenericRecord> {
Logger logs = LoggerFactory.getLogger(KafkaAvroDeserializationSchema.class);
private final transient Map<String, ?> properties;
private transient KafkaAvroDeserializer deserializer;
public KafkaAvroDeserializationSchema(Map<String, ?> properties) {
this.properties = properties;
initDeserializer();
}
private void initDeserializer() {
this.deserializer = new KafkaAvroDeserializer();
deserializer.configure(properties, false);
}
@Override
public boolean isEndOfStream(GenericRecord nextElement) {
return false; // Ajusta esto según tus necesidades
}
@Override
public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<GenericRecord> out) throws IOException {
if (deserializer == null) {
initDeserializer();
}
try {
GenericRecord value = (GenericRecord) deserializer.deserialize(record.topic(), record.value());
out.collect(value);
} catch (Exception e) {
throw new IOException("Error deserializing record", e);
}
}
@Override
public TypeInformation<GenericRecord> getProducedType() {
return TypeInformation.of(GenericRecord.class);
}
@Override
public GenericRecord deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
if (deserializer == null) {
initDeserializer();
}
try {
logs.info("Mensage:");
logs.info(record.toString());
logs.info(record.value().toString());
logs.info(record.key().toString());
return (GenericRecord) deserializer.deserialize(record.topic(), record.value());
} catch (Exception e) {
throw new Exception("Error deserializing record", e);
}
}
}