Oscar Perez
05/26/2023, 1:58 PM
Martijn Visser
05/26/2023, 5:04 PM
Oscar Perez
05/26/2023, 5:53 PM
tableEnv.executeSql(
"""
CREATE TEMPORARY TABLE payments
(
userId STRING,
country STRING,
acceptedTime TIMESTAMP(3),
eventMetadata row(eventId STRING, eventTime TIMESTAMP(3)),
PRIMARY KEY (userId) NOT ENFORCED
) WITH (
'connector' = 'kafka',....
or alternatively, like this:
val paymentTable = tableEnv.fromDataStream(
payment,
Schema.newBuilder()
.column(....
assuming that the payment DataStream already comes from the very same Kafka topic as in #1; we use env.fromSource() for this. My question is whether there is an equivalent method to create a temporary table that emulates the upsert-kafka connector. Thanks!
Martijn Visser
05/26/2023, 5:55 PM
Oscar Perez
05/26/2023, 6:01 PM
Martijn Visser
05/26/2023, 6:46 PM