# troubleshooting
n
Hello there, I'm trying to implement a conditional upsert into a MySQL table from Flink. Currently, Flink writes rows to a Kafka topic, and Kafka Connect then upserts those rows into a MySQL table. What we want to achieve is a conditional upsert based on a timestamp field, `updateTime`, so that only the latest value is kept (this way we could handle late messages properly). As far as I understand, the Kafka connector to MySQL handles neither conditional upserts nor stored procedures. It's also not clear to me whether the Flink upsert Kafka table sink handles conditional upserts. Would you know if it does? Would you have any other ideas on how to proceed?
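For reference, here is a minimal sketch of what "conditional upsert" means at the database level. It uses SQLite (3.24 or later) through Python's `sqlite3` module purely as a stand-in, and the `items` table and its columns are hypothetical. MySQL's `ON DUPLICATE KEY UPDATE` has no `WHERE` clause, so there the same condition is usually written per column with `IF(...)`; treat this as an illustration of the semantics, not as the MySQL statement.

```python
import sqlite3

# SQLite stands in for the real database; table and column names are hypothetical.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE items (id TEXT PRIMARY KEY, value INTEGER, updateTime INTEGER)"
)

# Insert the row, or overwrite the existing one only if the incoming
# updateTime is strictly newer than the stored one.
UPSERT = """
INSERT INTO items (id, value, updateTime) VALUES (?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
    value = excluded.value,
    updateTime = excluded.updateTime
WHERE excluded.updateTime > items.updateTime
"""

def conditional_upsert(row):
    conn.execute(UPSERT, row)

conditional_upsert(("a", 1, 100))  # first write: inserted
conditional_upsert(("a", 2, 200))  # newer timestamp: overwrites
conditional_upsert(("a", 3, 150))  # late message: ignored

result = conn.execute("SELECT id, value, updateTime FROM items").fetchall()
print(result)
```

Because the condition is evaluated by the database at write time, it holds no matter how many writers there are.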
Of course, we could handle this in Flink with state storing the last `updateTime`, but this solution wouldn't be robust in the case of a blue-green deployment, where 2 Flink clusters are updating the table.
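A rough sketch of why per-cluster state is not enough. The `Cluster` class below is a hypothetical model, not Flink code: each cluster filters late messages against its own state, then writes to the shared table with a plain, unconditional upsert.

```python
# Hypothetical model: each cluster keeps its own "last updateTime per key"
# state and writes to the shared table without any condition.
class Cluster:
    def __init__(self, table):
        self.last_seen = {}  # per-cluster state: key -> last updateTime
        self.table = table   # shared sink table: key -> (value, updateTime)

    def process(self, key, value, update_time):
        if update_time <= self.last_seen.get(key, float("-inf")):
            return False     # late message, dropped by this cluster
        self.last_seen[key] = update_time
        self.table[key] = (value, update_time)  # unconditional upsert
        return True

table = {}
blue, green = Cluster(table), Cluster(table)

blue.process("a", "new", 200)
green.process("a", "old", 150)  # green never saw t=200, so it overwrites

print(table["a"])
```

Each cluster is consistent on its own, but the table ends up holding the older row, which is why the check has to happen where the data is written.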
s
It doesn't do it out of the box, but I've implemented it for Postgres.
However, it's not enough. If you process changelog data, you'd also need to patch several executor files, e.g. this one: https://github.com/apache/flink-connector-jdbc/blob/main/flink-connector-jdbc/src/[…]jdbc/internal/executor/TableBufferReducedStatementExecutor.java The default implementation could lead to data loss, since the "reducer" won't take your condition into account.
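To illustrate the data-loss scenario, here is a simplified model of a buffer that keeps a single row per primary key until flush. This is not the connector's code. It assumes the buffer keeps whichever row arrived last for a key, and the function names are made up for the example.

```python
# Simplified model of a per-key buffer that is reduced before each flush.
def reduce_last_arrival(batch):
    buffer = {}
    for key, value, update_time in batch:
        buffer[key] = (value, update_time)  # later arrival always replaces
    return buffer

def reduce_latest_timestamp(batch):
    buffer = {}
    for key, value, update_time in batch:
        current = buffer.get(key)
        # Apply the same condition as the conditional upsert itself.
        if current is None or update_time > current[1]:
            buffer[key] = (value, update_time)
    return buffer

# The newer row (t=200) arrives first, the late one (t=150) second.
batch = [("a", "new", 200), ("a", "old", 150)]

naive = reduce_last_arrival(batch)
aware = reduce_latest_timestamp(batch)
print(naive["a"], aware["a"])
```

With the naive reduction the newer row is dropped before the statement ever reaches the database, so a conditional upsert downstream cannot bring it back.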
n
Hi! Thanks a lot for those pointers, I'm taking a look!
And would you know if I can natively do a conditional upsert using the upsert Kafka connector? If I understand it correctly, I would need to:
• Convert my datastream into a table in the Flink table environment, let's say `MyDataStreamTable`, using for example `fromDataStream`
• Create an upsert-kafka table
```sql
CREATE TABLE pageviews_per_region (
  region STRING,
  view_count BIGINT,
  last_update TIMESTAMP,
  PRIMARY KEY (region) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'pageviews_per_region',
  'properties.bootstrap.servers' = '...',
  'key.format' = 'csv',
  'value.format' = 'avro'
);
```
• Do the insert:
```sql
INSERT INTO pageviews_per_region
SELECT
  region,
  view_count,
  last_update
FROM MyDataStreamTable
WHERE MyDataStreamTable.last_update > pageviews_per_region.last_update
```
What I'm not sure about is whether the `WHERE MyDataStreamTable.last_update > pageviews_per_region.last_update` condition is supported or not.
s
Semantically it may work, but I’d double check if it performs any kind of buffering internally.