Kevin Lam
08/08/2024, 2:25 PM
topic metadata column in the context of serialization using the Flink SQL Kafka Connector.
I'm currently having difficulty understanding how to pass and process metadata columns in an EncodingFormat. How can I make a given metadata column (in this case topic) available in a format used by KafkaDynamicSink?
I noticed that currently the DynamicKafkaRecordSerializationSchema strips all but the requested physical columns and only passes projected rows to the serializers. Is there additional work required to support passing metadata from DynamicTableSink to the EncodingFormat? If not, how should the EncodingFormat specify what metadata columns to consume, and make them available in the row passed to serialize()?
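For reference on the contract being asked about: Flink's EncodingFormat can declare the metadata it accepts through listWritableMetadata() and is told which keys were requested through applyWritableMetadata(List<String>). The sketch below mocks that handshake in plain Java so it runs without Flink. TopicAwareFormat, applyToSink and the "value." key prefix are assumptions, loosely modelled on how the Kafka sink is understood to forward format-level metadata; check KafkaDynamicSink in your connector version before relying on it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical mock of the writable-metadata handshake; these are not Flink's classes.
public class MetadataHandshakeSketch {

    static final String VALUE_PREFIX = "value."; // assumed prefix for format-level metadata keys

    // Stand-in for an EncodingFormat that accepts a "topic" metadata entry.
    static class TopicAwareFormat {
        List<String> appliedKeys = new ArrayList<>();

        // Mirrors EncodingFormat#listWritableMetadata(), with type names as plain strings.
        Map<String, String> listWritableMetadata() {
            Map<String, String> metadata = new LinkedHashMap<>();
            metadata.put("topic", "STRING");
            return metadata;
        }

        // Mirrors EncodingFormat#applyWritableMetadata(List<String>): remember the requested keys.
        void applyWritableMetadata(List<String> keys) {
            appliedKeys = new ArrayList<>(keys);
        }
    }

    // Sink side: forward prefixed keys (prefix stripped) to the format, keep the rest for the connector.
    static List<String> applyToSink(TopicAwareFormat format, List<String> requested) {
        List<String> formatKeys = new ArrayList<>();
        List<String> connectorKeys = new ArrayList<>();
        for (String key : requested) {
            if (key.startsWith(VALUE_PREFIX)) {
                formatKeys.add(key.substring(VALUE_PREFIX.length()));
            } else {
                connectorKeys.add(key);
            }
        }
        format.applyWritableMetadata(formatKeys);
        return connectorKeys;
    }

    public static void main(String[] args) {
        TopicAwareFormat format = new TopicAwareFormat();
        List<String> connectorKeys = applyToSink(format, List.of("value.topic", "timestamp"));
        System.out.println("format keys: " + format.appliedKeys); // format keys: [topic]
        System.out.println("connector keys: " + connectorKeys); // connector keys: [timestamp]
    }
}
```

In this mock the format only ever sees keys it declared itself, so a format that wants the topic would expose its own key (here "value.topic"), separate from the connector's own topic column.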
Thanks in advance for your help!
D. Draco O'Brien
08/08/2024, 3:00 PM
D. Draco O'Brien
08/08/2024, 3:03 PM
D. Draco O'Brien
08/08/2024, 3:03 PM
D. Draco O'Brien
08/08/2024, 3:06 PM
D. Draco O'Brien
08/08/2024, 3:09 PM
D. Draco O'Brien
08/08/2024, 3:10 PM
D. Draco O'Brien
08/08/2024, 3:11 PM
Kevin Lam
08/08/2024, 5:00 PM
Kevin Lam
08/08/2024, 5:02 PM
"You will include the topic metadata alongside the data that gets passed to the actual serialization process. Within your custom serialization, you will need access to the CollectingTableSink or DataStream where you can manipulate the records before they get serialized. Maybe use a wrapper object that contains the original data row with the metadata."
Do you know if there are existing code examples for other sink connectors that pass metadata?
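The wrapper-object idea can be sketched without Flink as follows. RowWithMetadata, MetadataAwareSerializer, the CSV-style payload, and the assumption that the topic is the last column of the consumed row are all hypothetical; in a real job the List<Object> would be a RowData and the serializer a custom format.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical sketch of the wrapper-object idea: carry metadata next to the row.
public class RowWithMetadataSketch {

    // Wrapper holding the physical columns plus the topic metadata.
    static final class RowWithMetadata {
        final List<Object> physical;
        final String topic;

        RowWithMetadata(List<Object> physical, String topic) {
            this.physical = physical;
            this.topic = topic;
        }
    }

    // Serializer that receives the data together with its metadata.
    interface MetadataAwareSerializer {
        byte[] serialize(RowWithMetadata element);
    }

    // Example serializer: writes the topic, then the physical fields, comma-separated.
    static final MetadataAwareSerializer CSV_WITH_TOPIC = element -> {
        StringBuilder sb = new StringBuilder(element.topic);
        for (Object field : element.physical) {
            sb.append(',').append(field);
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    };

    // Assumes the topic metadata is the last column of the consumed row.
    static RowWithMetadata wrap(List<Object> consumedRow) {
        int last = consumedRow.size() - 1;
        return new RowWithMetadata(consumedRow.subList(0, last), (String) consumedRow.get(last));
    }

    public static void main(String[] args) {
        RowWithMetadata wrapped = wrap(List.<Object>of(42, "alice", "orders"));
        byte[] bytes = CSV_WITH_TOPIC.serialize(wrapped);
        System.out.println(new String(bytes, StandardCharsets.UTF_8)); // orders,42,alice
    }
}
```

Keeping the metadata in a separate field means the serializer never has to guess which trailing columns are metadata.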
D. Draco O'Brien
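08/08/2024, 5:15 PM
D. Draco O'Brien
08/08/2024, 5:16 PM
import javax.annotation.Nullable;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.table.data.RowData;
import org.apache.kafka.clients.producer.ProducerRecord;
public class MetadataAwareKafkaRecordSerializationSchema
        implements KafkaRecordSerializationSchema<RowData> { // DynamicKafkaRecordSerializationSchema is internal to the connector, so delegate instead
    private final SerializationSchema<RowData> valueSerialization; // format used for the record value
    private final int topicPos; // position of the topic metadata column in the row (assumed known)
    public MetadataAwareKafkaRecordSerializationSchema(SerializationSchema<RowData> valueSerialization, int topicPos) {
        this.valueSerialization = valueSerialization;
        this.topicPos = topicPos;
    }
    @Override
    public void open(SerializationSchema.InitializationContext ctx, KafkaSinkContext sinkCtx) throws Exception {
        valueSerialization.open(ctx); // let the format initialize itself before first use
    }
    @Override
    public ProducerRecord<byte[], byte[]> serialize(RowData rowData, KafkaSinkContext context, @Nullable Long timestamp) {
        // Extract topic from the metadata column carried in the row
        String topic = rowData.getString(topicPos).toString();
        // serialize the value; project away metadata columns first if your format expects only physical ones
        byte[] data = valueSerialization.serialize(rowData);
        return new ProducerRecord<>(topic, null, timestamp, null, data); // route the value to the extracted topic
    }
}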
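To make the topic-extraction step concrete without Flink on the classpath, here is a hypothetical sketch that assumes the consumed row holds the physical columns first, followed by metadata columns in the order of the requested metadata keys. Router, OutRecord and the fallback topic are illustrative names, not connector API; OutRecord stands in for Kafka's ProducerRecord.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, Flink-free sketch of locating the topic column and projecting physical columns.
public class TopicRoutingSketch {

    // Minimal stand-in for Kafka's ProducerRecord (topic + value only).
    static final class OutRecord {
        final String topic;
        final byte[] value;

        OutRecord(String topic, byte[] value) {
            this.topic = topic;
            this.value = value;
        }
    }

    static final class Router {
        private final int physicalCount; // number of physical columns at the front of the row
        private final int topicPos; // absolute position of the topic column, or -1 if not requested
        private final String defaultTopic; // used when no topic value is available
        private final Function<List<Object>, byte[]> valueSerializer;

        Router(int physicalCount, List<String> metadataKeys, String defaultTopic,
               Function<List<Object>, byte[]> valueSerializer) {
            this.physicalCount = physicalCount;
            int idx = metadataKeys.indexOf("topic");
            // Assumed layout: metadata columns follow the physical ones, in metadataKeys order.
            this.topicPos = idx < 0 ? -1 : physicalCount + idx;
            this.defaultTopic = defaultTopic;
            this.valueSerializer = valueSerializer;
        }

        OutRecord serialize(List<Object> consumedRow) {
            // Only the physical columns are handed to the value serializer.
            List<Object> physical = new ArrayList<>(consumedRow.subList(0, physicalCount));
            Object topic = topicPos < 0 ? null : consumedRow.get(topicPos);
            String target = topic == null ? defaultTopic : topic.toString();
            return new OutRecord(target, valueSerializer.apply(physical));
        }
    }

    public static void main(String[] args) {
        Router router = new Router(2, List.of("timestamp", "topic"), "fallback",
                row -> row.toString().getBytes(StandardCharsets.UTF_8));
        OutRecord rec = router.serialize(List.<Object>of(1, "a", 1723150000000L, "orders"));
        System.out.println(rec.topic + " " + new String(rec.value, StandardCharsets.UTF_8)); // orders [1, a]
    }
}
```

Falling back to a fixed topic when the column is absent or null is one design choice; failing the record loudly is another.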
D. Draco O'Brien
08/08/2024, 5:18 PM
D. Draco O'Brien
08/08/2024, 5:18 PM
Kevin Lam
08/08/2024, 6:56 PM
D. Draco O'Brien
08/08/2024, 7:07 PM