Camilo Marin
07/29/2024, 4:08 PM
KafkaSource<InputData> kafkaSource = KafkaSource.<InputData>builder()
        .setBootstrapServers(kafkaServers)
        .setTopics(topic)
        .setGroupId(consumerGroup)
        .setProperties(kafkaExtraProperties)
        .setStartingOffsets(OffsetsInitializer.latest())
        .setValueOnlyDeserializer(new JsonDeserializationSchema<>(InputData.class))
        .build();
and it works perfectly, but with the value-only deserializer, as the name says, I only get the value (the JSON). To get the other record fields I have to use:
KafkaSource<InputData> kafkaSource = KafkaSource.<InputData>builder()
        .setBootstrapServers(kafkaServers)
        .setTopics(topic)
        .setGroupId(consumerGroup)
        .setProperties(kafkaExtraProperties)
        .setStartingOffsets(OffsetsInitializer.latest())
        .setDeserializer(new InputDataDeserializationSchema())
        .build();
Note that I changed .setValueOnlyDeserializer to .setDeserializer, which needs an implementation of KafkaRecordDeserializationSchema, so I made a class for it:
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class InputDataDeserializationSchema implements KafkaRecordDeserializationSchema<InputData> {

    private static final Logger LOG = LoggerFactory.getLogger(InputDataDeserializationSchema.class);
    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<InputData> out) throws IOException {
        InputData data = objectMapper.readValue(record.value(), InputData.class);
        data.setOffSet(record.offset());
        data.setPartition(record.partition());
        data.setTimestamp(String.valueOf(record.timestamp()));
        LOG.warn("Deserialized data: {}", data.getTimestamp());
        out.collect(data);
    }

    @Override
    public TypeInformation<InputData> getProducedType() {
        return TypeInformation.of(InputData.class);
    }
}
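The InputData entity isn't shown here; it's assumed to be a plain Jackson-deserializable POJO roughly like the sketch below, with setters for the record metadata the schema fills in (field names and the payload field are illustrative only):

// Hypothetical sketch of the InputData entity assumed above.
public class InputData {
    private String payload;   // placeholder for whatever fields the JSON value carries
    private long offSet;
    private int partition;
    private String timestamp;

    public InputData() {}     // no-arg constructor so Jackson can instantiate it

    public String getPayload() { return payload; }
    public void setPayload(String payload) { this.payload = payload; }

    public long getOffSet() { return offSet; }
    public void setOffSet(long offSet) { this.offSet = offSet; }

    public int getPartition() { return partition; }
    public void setPartition(int partition) { this.partition = partition; }

    public String getTimestamp() { return timestamp; }
    public void setTimestamp(String timestamp) { this.timestamp = timestamp; }
}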
This follows the documentation. My problem is that when I run the job I'm getting this error:
java.lang.AbstractMethodError: Receiver class flink.Entities.InputDataDeserializationSchema does not define or inherit an implementation of the resolved method 'abstract void deserialize(org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord, org.apache.flink.util.Collector)' of interface org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema.
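My reading of the error (an assumption, not something from the docs) is that the ConsumerRecord in the resolved method is a class shaded inside the connector jar, while my class compiles against plain kafka-clients, roughly:

// What my class is compiled against (plain kafka-clients on my compile classpath):
import org.apache.kafka.clients.consumer.ConsumerRecord;

// What the interface method resolved at runtime expects, going by the class name
// printed in the AbstractMethodError (kafka-clients shaded inside the connector jar):
//     org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord
// With different parameter types, my deserialize(...) doesn't count as an override,
// so the abstract method stays unimplemented at runtime.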
I'm stuck on this, because I am overriding deserialize exactly as the documentation (and the error message) describe.
If anyone knows how I can solve this, I would appreciate it a lot.