# troubleshooting
k
👋 Hi there, I'm looking to add some functionality to a custom Format that reads the `topic` metadata column in the context of serialization using the Flink SQL Kafka Connector. I'm currently having difficulty understanding how to pass and process metadata columns in an EncodingFormat. How can I make a given metadata column (in this case `topic`) available in a format used by KafkaDynamicSink? I noticed that currently the DynamicKafkaRecordSerializationSchema strips all but the requested physical columns and only passes projected rows to the serializers. Is there additional work required to support passing metadata from DynamicTableSink to the EncodingFormat? If not, how should the EncodingFormat specify what metadata columns to consume and make them available in the row passed to serialize()? Thanks in advance for your help!
d
What version of Flink are you using?
To incorporate metadata like topic from a DynamicTableSink into an EncodingFormat, especially when using Flink's SQL Kafka Connector, you would need to customize the serialization process slightly since, as you've noticed, the standard implementation focuses primarily on physical data columns.
Steps may vary depending on the version of Flink, so as with all questions it's best to indicate your version.
You'll need to create a custom version of KafkaDynamicSink, or rather its serialization component DynamicKafkaRecordSerializationSchema. You will include the topic metadata alongside the data that gets passed to the actual serialization process.
Within your custom serialization you will need access to the CollectingTableSink or DataStream where you can manipulate the records before they get serialized, perhaps using a wrapper object that contains the original data row along with the metadata.
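If you go the wrapper route, it could be as simple as something like this (RowWithMetadata is a hypothetical helper, not an existing Flink class):
```java
import java.io.Serializable;

import org.apache.flink.table.data.RowData;

// hypothetical wrapper pairing the physical row with the metadata the format needs
public class RowWithMetadata implements Serializable {
    private final RowData physicalRow;
    private final String topic;

    public RowWithMetadata(RowData physicalRow, String topic) {
        this.physicalRow = physicalRow;
        this.topic = topic;
    }

    public RowData getPhysicalRow() {
        return physicalRow;
    }

    public String getTopic() {
        return topic;
    }
}
```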
For the encoding you might also need to create a custom DeserializationSchema.
So it's a bit of work, and I'd suggest you also implement unit tests to make sure it all works as expected for the range of data you may receive.
k
👋 Thanks Draco! We're using Flink 1.18 at the moment with version 3.0.1 of the Kafka Connector. Hmm, I think I follow what you're saying, will take a deeper look. Since it requires a change to Flink code, I'd also be interested in understanding whether this is something the community would want to take on as a FLIP or a Flink Jira issue.
> You will include the topic metadata alongside the data that gets passed to the actual serialization process.
>
> Within your custom serialization you will need access to the CollectingTableSink or DataStream where you can manipulate the records before they get serialized, perhaps using a wrapper object that contains the original data row along with the metadata.
Do you know if there are existing code examples for other sink connectors that pass metadata?
d
You can open an issue on the Flink repo to see whether this is something that can be resolved that way, or whether there's interest within the community in addressing it as a FLIP (Flink Improvement Proposal). I think you might also find other workarounds this way.
Not sure about examples in other connectors for this. At a high level you would need to create something like the following:
```java
import javax.annotation.Nullable;

import org.apache.flink.table.data.RowData;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MetadataAwareKafkaRecordSerializationSchema
        extends DynamicKafkaRecordSerializationSchema {

    // constructor to pass the necessary configuration to the parent class

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            RowData rowData, KafkaSinkContext context, @Nullable Long timestamp) {
        // extract the topic from metadata (assuming you have a way to access it)
        String topic = ...; // implement logic to fetch the topic from rowData or context

        // serialize your data (use the superclass method or customize based on your needs)
        byte[] data = super.serialize(rowData, context, timestamp).value();

        // create a ProducerRecord that includes the topic metadata
        return new ProducerRecord<>(topic, null, timestamp, null, data);
    }
}
```
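One possible way to fill in the `String topic = ...;` part, assuming the sink requested `topic` as writable metadata so that the planner appended it after the physical columns of the consumed row (the helper class and its parameters are illustrative):
```java
import org.apache.flink.table.data.RowData;

public final class TopicMetadataReader {

    // Assuming the consumed row layout is [physical columns..., topic metadata],
    // the topic can be read from the trailing position. The physical arity would
    // normally be derived from the consumed data type passed to the sink.
    public static String readTopic(RowData consumedRow, int physicalArity) {
        int topicPos = physicalArity; // first (and here the only) metadata column
        return consumedRow.isNullAt(topicPos)
                ? null
                : consumedRow.getString(topicPos).toString();
    }
}
```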
Then you would probably create a custom DynamicTableSink that sets up the connection to Kafka and configures the serialization logic. Finally you would register the DynamicTableSink with the Table API or SQL so it can be referenced in your SQL statements.
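For the registration step, the usual route is a DynamicTableSinkFactory discovered via SPI (a META-INF/services/org.apache.flink.table.factories.Factory entry). A rough sketch, where the factory identifier and the placeholder body are illustrative:
```java
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSinkFactory;

public class MetadataAwareKafkaSinkFactory implements DynamicTableSinkFactory {

    @Override
    public String factoryIdentifier() {
        // value used in the DDL: WITH ('connector' = 'metadata-aware-kafka')
        return "metadata-aware-kafka";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Set.of();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Set.of();
    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        // build and return the custom sink here, e.g. from context.getCatalogTable()
        throw new UnsupportedOperationException("placeholder sketch");
    }
}
```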
Those are the basic steps.
k
I didn't see an existing issue, so I created https://issues.apache.org/jira/browse/FLINK-36017
d
ok great.