Nicolas P
06/07/2023, 8:53 AM
updateTime field to only have the latest value (this way we could handle late messages properly).
As far as I understood, the Kafka connector to MySQL handles neither conditional upserts nor stored procedures.
It's also not clear to me whether the Flink Kafka upsert table sink handles conditional upserts. Would you know if it does? Would you have any other ideas on how to proceed?

Nicolas P
06/07/2023, 9:04 AM
updateTime
but this solution wouldn't be robust in the case of a blue-green deployment where two Flink clusters are updating the table.
sap1ens
06/07/2023, 3:39 PM

sap1ens
06/07/2023, 3:41 PM

sap1ens
06/07/2023, 3:43 PM

Nicolas P
06/08/2023, 6:59 AM

Nicolas P
06/08/2023, 8:41 AM
• Create MyDataStreamTable, using for example fromDataStream
• Create an upsert-kafka table
CREATE TABLE pageviews_per_region (
region STRING,
view_count BIGINT,
last_update TIMESTAMP,
PRIMARY KEY (region) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'pageviews_per_region',
'properties.bootstrap.servers' = '...',
'key.format' = 'csv',
'value.format' = 'avro'
);
• Do the insert:
INSERT INTO pageviews_per_region
SELECT
region,
view_count,
last_update
FROM MyDataStreamTable
WHERE MyDataStreamTable.last_update > pageviews_per_region.last_update
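
(As far as I know, a Flink INSERT ... SELECT cannot reference the sink table in its WHERE clause, since the sink never appears in the FROM. A common workaround is Flink SQL's deduplication pattern: a ROW_NUMBER() subquery keeps only the latest row per key, and the upsert-kafka sink then emits one upsert per primary key. A sketch, reusing the table names above:

INSERT INTO pageviews_per_region
SELECT region, view_count, last_update
FROM (
  SELECT
    region,
    view_count,
    last_update,
    -- keep only the most recent row per region
    ROW_NUMBER() OVER (
      PARTITION BY region
      ORDER BY last_update DESC
    ) AS row_num
  FROM MyDataStreamTable
)
WHERE row_num = 1;

Note that ordering by a regular column like last_update, rather than a time attribute, makes this a Top-N query that emits retractions/updates, which the upsert-kafka sink can absorb. This does not fully solve the blue-green concern either, since each cluster deduplicates only its own stream.)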
Nicolas P
06/08/2023, 8:41 AM
WHERE MyDataStreamTable.last_update > pageviews_per_region.last_update
is supported or not

sap1ens
06/08/2023, 3:53 PM