# troubleshooting
c
Hello guys, I'm trying to deserialize data from a Kafka topic into a specific class. At the beginning I was using the Kafka consumer like this:
KafkaSource<InputData> kafkaSource = KafkaSource.<InputData>builder()
                .setBootstrapServers(kafkaServers)
                .setTopics(topic)
                .setGroupId(consumerGroup)
                .setProperties(kafkaExtraProperties)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new JsonDeserializationSchema<>(InputData.class))
                .build();
and it works perfectly, but with the valueOnlyDeserializer, as the name says, I only get the value (the JSON). To get the other fields (offset, partition, timestamp) I have to use:
KafkaSource<InputData> kafkaSource = KafkaSource.<InputData>builder()
                .setBootstrapServers(kafkaServers)
                .setTopics(topic)
                .setGroupId(consumerGroup)
                .setProperties(kafkaExtraProperties)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setDeserializer(new InputDataDeserializationSchema())
                .build();
Note that I changed .setValueOnlyDeserializer to .setDeserializer. This needs an implementation of KafkaRecordDeserializationSchema, so I made a class for it:
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class InputDataDeserializationSchema implements KafkaRecordDeserializationSchema<InputData> {
    private static final Logger LOG = LoggerFactory.getLogger(InputDataDeserializationSchema.class);
    private static final ObjectMapper objectMapper = new ObjectMapper();
    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<InputData> out) throws IOException {
        InputData data = objectMapper.readValue(record.value(), InputData.class);
        data.setOffSet(record.offset());
        data.setPartition(record.partition());
        data.setTimestamp(String.valueOf(record.timestamp()));
        LOG.warn("Deserialized data: {}", data.getTimestamp());
        out.collect(data);
    }

    @Override
    public TypeInformation<InputData> getProducedType() {
        return TypeInformation.of(InputData.class);
    }

}
This follows the documentation. My problem is that when I run the job I get this error:
java.lang.AbstractMethodError: Receiver class flink.Entities.InputDataDeserializationSchema does not define or inherit an implementation of the resolved method 'abstract void deserialize(org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord, org.apache.flink.util.Collector)' of interface org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema.
I'm stuck on this, because I am overriding deserialize just like the documentation and the error message say. If anyone knows how I can solve this, I would appreciate it a lot.
d
This could be due to a mismatch between the Flink version used in your project dependencies and the version of Flink in your execution environment. That's one of the more common causes of this type of error.
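One rough way to check for such a mismatch, offered as a sketch and not a definitive diagnosis: the error names org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord, while the class above imports org.apache.kafka.clients.consumer.ConsumerRecord. That suggests the interface loaded at runtime was built against a relocated Kafka client. The helper below uses only standard Java reflection to print which jar a class is loaded from and which parameter types its deserialize method expects. The name ClasspathProbe and its methods are hypothetical, made up for this illustration; the class names it probes are taken from the import and the error message above.

```java
import java.lang.reflect.Method;
import java.net.URL;
import java.security.CodeSource;
import java.security.ProtectionDomain;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical diagnostic helper (not part of Flink): reports where classes come from.
public class ClasspathProbe {

    /** Returns the jar or directory a class was loaded from, or a placeholder if unknown. */
    static String locationOf(Class<?> type) {
        ProtectionDomain domain = type.getProtectionDomain();
        CodeSource source = domain == null ? null : domain.getCodeSource();
        URL location = source == null ? null : source.getLocation();
        return location == null ? "(bootstrap or unknown)" : location.toString();
    }

    /** Lists the parameter type names of every public method with the given name. */
    static List<String> parameterTypesOf(Class<?> type, String methodName) {
        List<String> signatures = new ArrayList<>();
        for (Method method : type.getMethods()) {
            if (method.getName().equals(methodName)) {
                signatures.add(Arrays.stream(method.getParameterTypes())
                        .map(Class::getName)
                        .collect(Collectors.joining(", ")));
            }
        }
        return signatures;
    }

    public static void main(String[] args) {
        // Defaults come from the import and the error message in this thread.
        String[] names = args.length > 0 ? args : new String[] {
            "org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema",
            "org.apache.kafka.clients.consumer.ConsumerRecord",
            "org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord"
        };
        for (String name : names) {
            try {
                Class<?> type = Class.forName(name);
                System.out.println(name);
                System.out.println("  loaded from: " + locationOf(type));
                System.out.println("  deserialize(" + parameterTypesOf(type, "deserialize") + ")");
            } catch (ClassNotFoundException | LinkageError e) {
                System.out.println(name + " is not loadable from this classpath: " + e);
            }
        }
    }
}
```

This is only meaningful if it runs with the same jars as the failing job, for example by calling these two methods from the job's own main method. If the interface turns out to come from a jar other than the connector you compiled against, or its deserialize method lists the shaded ConsumerRecord, the two connector artifacts probably need to be aligned.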
c
Yes, you're right. I was writing the data to an Iceberg table when I had the issue, so as a test I changed the sink to a CSV table and it works. So yeah, it seems to be a dependency issue. Thank you man!