James Watkins
06/08/2023, 10:12 AM
I'm joining a `transactions` stream with a `users` stream, both Kafka sources, using the `avro-confluent` `value.format` so that I can deserialise the Avro messages using a schema registry. It looks like this format is only available in the Table API, so I have defined my source connectors using that.
I'm trying to join the streams using an Event-Time Temporal Table Join. I added a primary key to the `users` table but I get the following error message:
```
The Kafka table 'default_catalog.default_database.users' with 'avro-confluent' format doesn't support defining PRIMARY KEY constraint on the table, because it can't guarantee the semantic of primary key.
```
Is it possible to use an Event-Time Temporal Table Join with the `avro-confluent` data format?
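(Not from the thread, for context: the plain `kafka` connector rejects a PRIMARY KEY with `avro-confluent`, as in the error above, but the `upsert-kafka` connector requires one, so a versioned table for an event-time temporal join is often declared with it. A minimal sketch under assumptions the thread doesn't confirm: the column names are taken from the DDL posted later in the thread, the `user` topic is keyed by `id`, and the key is serialised with the `raw` format:)
```
CREATE TABLE users_versioned (
    id INT,
    first_name VARCHAR,
    last_name VARCHAR,
    email VARCHAR,
    phone VARCHAR,
    `kafka_ts` TIMESTAMP(3) METADATA FROM 'timestamp',
    -- event-time attribute the temporal join will use
    WATERMARK FOR kafka_ts AS kafka_ts - INTERVAL '5' SECOND,
    -- allowed (in fact required) by upsert-kafka
    PRIMARY KEY (id) NOT ENFORCED
)
WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'user',
    'properties.bootstrap.servers' = 'localhost:9092',
    'key.format' = 'raw',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = '<schema-registry-url>'
);
```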
James Watkins
06/12/2023, 3:01 PM
James Watkins
06/14/2023, 6:32 PM
I'm trying a `TemporalTableFunction` instead to see if it gives me a bit more flexibility, but I'm getting the following error:
```
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. Cannot call table function here: 'versioned_users'
```
Not even sure if this is going to help, but I'm running out of ideas on this one otherwise. Does anyone know if Event-Time Temporal Joins are possible using the `avro-confluent` format? I'm having an issue with it not supporting primary keys.
Copying the program code in the next message so it doesn't fill up the main page:
James Watkins
06/14/2023, 6:32 PM
```
val transactionSourceQuery = """
    CREATE TABLE flink_transaction (
        transaction_id INT,
        transaction_amount FLOAT,
        --transaction_country -- there is no enum data type in Flink??
        merchant_name VARCHAR,
        user_id INTEGER,
        created_at VARCHAR,
        `kafka_ts` TIMESTAMP(3) METADATA FROM 'timestamp'
    )
    WITH (
        'connector' = 'kafka',
        'topic' = 'transaction',
        'properties.bootstrap.servers' = 'localhost:9092',
        'value.format' = 'avro-confluent',
        'value.avro-confluent.url' = '<schema-registry-url>',
        'value.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
        'value.avro-confluent.basic-auth.user-info' = '<credentials>',
        'scan.startup.mode' = 'earliest-offset'
    );
"""

val userSourceQuery = """
    CREATE TABLE flink_user (
        id INT,
        first_name VARCHAR,
        last_name VARCHAR,
        email VARCHAR,
        phone VARCHAR,
        `kafka_ts` TIMESTAMP(3) METADATA FROM 'timestamp'
    )
    WITH (
        'connector' = 'kafka',
        'topic' = 'user',
        'properties.bootstrap.servers' = 'localhost:9092',
        'value.format' = 'avro-confluent',
        'value.avro-confluent.url' = '<schema-registry-url>',
        'value.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
        'value.avro-confluent.basic-auth.user-info' = '<credentials>',
        'scan.startup.mode' = 'earliest-offset'
    );
"""

// register the Flink tables
tableEnv.executeSql(transactionSourceQuery)
tableEnv.executeSql(userSourceQuery)

// define a TemporalTableFunction
val users: TemporalTableFunction = tableEnv
    .from("flink_user")
    .createTemporalTableFunction(`$`("kafka_ts"), `$`("id")) // "versioning" field, and key

// register the function
tableEnv.createTemporarySystemFunction("versioned_users", users)

tableEnv.executeSql("""
    SELECT flink_transaction.*
    FROM
        flink_transaction,
        LATERAL TABLE (versioned_users(kafka_ts))
    WHERE
        flink_transaction.transaction_id = versioned_users.id
""").print()
```
David Anderson
06/14/2023, 6:58 PM
Matthias Broecheler
06/14/2023, 7:18 PM
James Watkins
06/14/2023, 7:47 PM
I added a watermark to the `kafka_ts` field but I still get the same error.
James Watkins
06/14/2023, 7:54 PM
I also tried `PRIMARY KEY (id) NOT ENFORCED` but I get the same error:
```
The Kafka table 'default_catalog.default_database.flink_user' with 'avro-confluent' format doesn't support defining PRIMARY KEY constraint on the table, because it can't guarantee the semantic of primary key.
```
Wouldn't deduping the versioned table first defeat the object of the temporal join? My understanding is that this type of join is used to retrieve the event that most closely matches the event time of the record in the left table, which I'm not going to be able to do easily with a window function.
Matthias Broecheler
06/14/2023, 8:42 PM
Matthias Broecheler
06/14/2023, 8:43 PM
James Watkins
06/14/2023, 9:06 PM
Wouldn't I still need a primary key on `deduped_users` somehow, though? The temporal join requires a primary key to be defined on the versioned table, and I also need to reference it in the join, which might look something like this:
```
tableEnv.executeSql("""
    INSERT INTO enriched_transactions
    SELECT
        ft.transaction_id
        , ft.transaction_amount
        , ft.merchant_name
        , ft.user_id
        , ft.created_at
        , fu.first_name
        , fu.last_name
        , fu.email
        , fu.phone
    FROM transactions AS ft
    LEFT JOIN deduped_users FOR SYSTEM_TIME AS OF ft.kafka_ts AS fu
        ON ft.user_id = fu.id
""")
```
(note: I'm joining a Flink table called `deduped_users` here)
Matthias Broecheler
06/14/2023, 9:11 PM
James Watkins
06/14/2023, 9:18 PM
Matthias Broecheler
06/14/2023, 9:20 PM
James Watkins
06/14/2023, 9:26 PM
```
tableEnv.executeSql("""
    CREATE TABLE deduped_users (
        id INT,
        first_name VARCHAR,
        last_name VARCHAR,
        email VARCHAR,
        phone VARCHAR,
        `kafka_ts` TIMESTAMP(3)
    );
""")

tableEnv.sqlQuery("""
    INSERT INTO deduped_users
    SELECT *
    FROM (
        SELECT *,
            ROW_NUMBER() OVER (PARTITION BY id ORDER BY kafka_ts ASC) AS row_num
        FROM flink_user
    )
    WHERE row_num = 1
""")
```
but I got this error:
```
Table options do not contain an option key 'connector' for discovering a connector. Therefore, Flink assumes a managed table.
```
The thing is, I don't particularly want to set it up as a connector, since all I really need is a temporary/intermediate Flink table.
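(Not from the thread: the pattern in the Flink docs for exactly this situation is a deduplication query wrapped in a view. A view needs no connector, and the planner treats this particular shape, ROW_NUMBER over the event-time column with `row_num = 1`, as a "versioned view": the primary key (the PARTITION BY column) and the event time are inferred, and the join then picks the user version that was current at each transaction's event time rather than one static row per user. A minimal sketch, assuming `flink_user` declares a watermark on `kafka_ts`:)
```
// no connector needed: a temporary view over flink_user;
// ORDER BY kafka_ts DESC with row_num = 1 is the documented
// deduplication shape that yields a versioned view
tableEnv.executeSql("""
    CREATE TEMPORARY VIEW deduped_users AS
    SELECT id, first_name, last_name, email, phone, kafka_ts
    FROM (
        SELECT *,
            ROW_NUMBER() OVER (PARTITION BY id ORDER BY kafka_ts DESC) AS row_num
        FROM flink_user
    )
    WHERE row_num = 1
""")
```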
06/14/2023, 9:29 PM
James Watkins
06/14/2023, 9:57 PM
Deduplicating would leave me with one record per `user_id` (either the first or last), but what if the user has changed details 4 times, for example? Every transaction in the transactions stream will join to the same record for that user. I thought one of the reasons we would use a temporal join is so that we could have, say, 4 transactions for that user, and the relevant user record will be joined to each transaction depending on the event time. What do you think?
Matthias Broecheler
06/14/2023, 10:31 PM
James Watkins
06/14/2023, 10:35 PM
James Watkins
06/20/2023, 8:04 PM
I have defined watermarks on both the table on the probe side (`transactions`) and the table on the build side (`users`). I have not defined a delay interval on those watermarks (as the example I'm following doesn't show one):
```
`kafka_ts` TIMESTAMP(3) METADATA FROM 'timestamp',
WATERMARK FOR kafka_ts AS kafka_ts
```
This is the Event-Time Temporal Join:
```
SELECT *
FROM transactions AS ft
LEFT JOIN deduped_users FOR SYSTEM_TIME AS OF ft.kafka_ts AS fu
    ON ft.user_id = fu.id
```
This join is returning no results.
I think for this to work, there should be records in the left table with a `kafka_ts` timestamp that is higher than the corresponding `kafka_ts` timestamp in the right table. Is that correct? I'm wondering if the watermark is not progressing or something. I also tried adding a 5-second delay on the watermarks, but that didn't help either.
Would really appreciate some help to try and understand this!
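(Not from the thread: the reasoning above matches how the join works; an event-time temporal join only emits a row once the watermark on both inputs has passed the probe row's timestamp, so a stalled watermark shows up as exactly this "no results" symptom. With Kafka sources, one common cause is an idle partition holding the watermark back, which the source idle-timeout option works around. A sketch; the timeout value is arbitrary:)
```
// let watermarks advance even when some Kafka partitions receive no data
tableEnv.config.configuration.setString("table.exec.source.idle-timeout", "5 s")
```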