Hello, I can see in the documentation that I can e...
# troubleshooting
j
Hello, I can see in the documentation that I can extract the timestamp from kafka metadata when reading from kafka in sql like so
Copy code
`ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
But how do I set the timestamp in kafka when writing to kafka?
d
Are you using Flink SQL or Table API?
j
@D. Draco O'Brien Flink SQL
d
Ok, in that case you want to define a sink table
This will points to the topic you want to write to. In Flink SQL, you don’t directly set the timestamp for Kafka messages within table definition. Instead, ensure the timestamp column is part of the data you’re inserting.
So definition for sink table would be something like this if you’re using json format.
Copy code
CREATE TABLE kafka_sink (
    data STRING,
    processing_time TIMESTAMP(3),  -- or event_time depending on your use case
    -- Other fields...
) WITH (
    'connector' = 'kafka',
    'topic' = 'your-sink-topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'json',  
    'value.format' = 'json', 
    -- Additional properties as needed
);
👍 1
Then just insert the data into the sink table so that the timestamp column carries the timestamp you want to associate with the Kafka message.