# troubleshooting
James Watkins:
Hello, I’m trying to enrich a `transactions` stream with a `users` stream, both Kafka sources, using the `avro-confluent` value format so that I can deserialise the Avro messages using a schema registry. It looks like this format is only available in the Table API, so I have defined my source connectors using that. I’m trying to join the streams using an Event-Time Temporal Table Join. I added a primary key to the `users` table, but I get the following error message:
```
The Kafka table 'default_catalog.default_database.users' with 'avro-confluent' format doesn't support defining PRIMARY KEY constraint on the table, because it can't guarantee the semantic of primary key.
```
Is it possible to use an Event-Time Temporal Table Join with the ‘avro-confluent’ data format?
Is anyone able to help with this?
I have tried to register a `TemporalTableFunction` instead to see if it gives me a bit more flexibility, but I’m getting the following error:
```
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. Cannot call table function here: 'versioned_users'
```
Not even sure if this is going to help, but I’m running out of ideas on this one otherwise. Does anyone know if Event-Time Temporal Joins are possible using the `avro-confluent` format? I’m having an issue with it not supporting primary keys. Copying the program code in the next message so it doesn’t fill up the main page:
```scala
val transactionSourceQuery = """
        CREATE TABLE flink_transaction (
            transaction_id INT,
            transaction_amount FLOAT,
            --transaction_country -- there is no enum data type in Flink??
            merchant_name VARCHAR,
            user_id INTEGER,
            created_at VARCHAR,
            `kafka_ts` TIMESTAMP(3) METADATA FROM 'timestamp'
        )
        WITH (
        'connector'= 'kafka',
        'topic' = 'transaction',
        'properties.bootstrap.servers' = 'localhost:9092',
        'value.format' = 'avro-confluent',
        'value.avro-confluent.url' = '<schema-registry-url>',
        'value.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
        'value.avro-confluent.basic-auth.user-info' = '<credentials>',
        'scan.startup.mode' = 'earliest-offset'
        );
        """

    val userSourceQuery = """
        CREATE TABLE flink_user (
            id INT,
            first_name VARCHAR,
            last_name VARCHAR,
            email VARCHAR,
            phone VARCHAR,
            `kafka_ts` TIMESTAMP(3) METADATA FROM 'timestamp'
        )
        WITH (
        'connector' = 'kafka',
        'topic' = 'user',
        'properties.bootstrap.servers' = 'localhost:9092',
        'value.format' = 'avro-confluent',
        'value.avro-confluent.url' = '<schema-registry-url>',
        'value.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
        'value.avro-confluent.basic-auth.user-info' = '<credentials>',
        'scan.startup.mode' = 'earliest-offset'
        );
        """

    // register Flink table
    tableEnv.executeSql(transactionSourceQuery)

    // register Flink table
    tableEnv.executeSql(userSourceQuery)

    // Define a TemporalTableFunction
    val users: TemporalTableFunction = tableEnv
        .from("flink_user")
        .createTemporalTableFunction(`$`("kafka_ts"), `$`("id"))  // "versioning" field, and key

    // register function
    tableEnv.createTemporarySystemFunction("versioned_users", users)

    tableEnv.executeSql("""
        SELECT flink_transaction.*
        FROM
          flink_transaction,
          LATERAL TABLE (versioned_users(kafka_ts))
        WHERE
          flink_transaction.user_id = versioned_users.id
    """).print()
```

David Anderson:
The kafka_ts field needs to have watermarks defined on it.
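For example, something like this in both CREATE TABLE statements (just a sketch; the 5-second bound is an arbitrary illustrative value):
```sql
-- sketch: make kafka_ts an event-time attribute by declaring a watermark on it
`kafka_ts` TIMESTAMP(3) METADATA FROM 'timestamp',
WATERMARK FOR kafka_ts AS kafka_ts - INTERVAL '5' SECOND
```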
Matthias Broecheler:
@James Watkins Regarding primary key: have you tried declaring it as "not enforced" (see https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/). If that doesn't work, try deduplication by primary column before you do the temporal join: https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/deduplication/ (not an expert - just putting out some ideas that might help)
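For the first option, the constraint in the users DDL would look roughly like this (sketch only):
```sql
-- sketch: a primary key constraint that Flink does not validate or enforce
PRIMARY KEY (id) NOT ENFORCED
```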
James Watkins:
Hi @David Anderson, I have also tried it with a watermark on the `kafka_ts` field, but I still get the same error.
Hi @Matthias Broecheler, I’ve also tried with `PRIMARY KEY (id) NOT ENFORCED` but I get the same error:
```
The Kafka table 'default_catalog.default_database.flink_user' with 'avro-confluent' format doesn't support defining PRIMARY KEY constraint on the table, because it can't guarantee the semantic of primary key.
```
Wouldn’t deduping the versioned table first defeat the object of the temporal join? My understanding is that this type of join is used to retrieve the record that most closely matches the event time of the record in the left table, which I’m not going to be able to do easily with a window function.
Matthias Broecheler:
Hey James, if I understand your use case correctly, you may be able to achieve this by NOT putting a primary key on the users table when you import it (and thus importing the whole change stream) and then deduping the stream on user id afterwards, which has the same effect. Then you can do a temporal join between transactions and the deduped users, and it will join with the version of the user at the time of the transaction.
When you do the dedup, make sure you do it on the rowtime of the user change stream to preserve the rowtime for the temporal join.
James Watkins:
When you say import it, are you referring to creating the Flink table? I should note I’m doing most of this with just SQL at the moment. I guess I would somehow have to create another Flink table from `deduped_users`, though, because the temporal join requires a primary key to be defined in the versioned table, and I also need it to reference in the join, which might look something like this:
```scala
tableEnv.executeSql("""
        INSERT INTO enriched_transactions
        SELECT
            ft.transaction_id
            , ft.transaction_amount
            , ft.merchant_name
            , ft.user_id
            , ft.created_at
            , fu.first_name
            , fu.last_name
            , fu.email
            , fu.phone
        FROM transactions as ft
        LEFT JOIN deduped_users FOR SYSTEM_TIME AS OF ft.kafka_ts as fu
            ON ft.user_id = fu.id
        """
    )
```

_Note: I’m joining a Flink table called `deduped_users` here._
Matthias Broecheler:
The PARTITION BY columns become the primary key of the `deduped_users` table/view. Check out the example from the Flink docs (https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/deduplication/): `order_id` is the primary key of the resulting view when you apply the dedup query.
James Watkins:
I see, that’s neat. And then would I need to create a new table from that so that I can reference it in my final temporal join?
Matthias Broecheler:
Up to you. You can inline it or make it a separate statement so it's easier to read.
James Watkins:
I tried creating a separate statement like this:
```scala
tableEnv.executeSql("""
    CREATE TABLE deduped_users (
        id INT,
        first_name VARCHAR,
        last_name VARCHAR,
        email VARCHAR,
        phone VARCHAR,
        `kafka_ts` TIMESTAMP(3)
    );
    """)

    tableEnv.executeSql("""
        INSERT INTO deduped_users
        SELECT *
        FROM (
          SELECT *, 
            ROW_NUMBER() OVER (PARTITION BY id ORDER BY kafka_ts ASC) AS row_num
          FROM flink_user)
        WHERE row_num = 1
    """)
```
but I got this error:
```
Table options do not contain an option key 'connector' for discovering a connector. Therefore, Flink assumes a managed table.
```
The thing is, I don’t particularly want to set it up with a connector, as it’s just a temporary/intermediate Flink table that I need.
Matthias Broecheler:
Yes, tables are only for import and export. You want to create a view with the CREATE VIEW syntax.
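Something along these lines, for example (just a sketch, reusing the columns from your `flink_user` table; the PARTITION BY column becomes the key of the deduplicated result, and ordering on `kafka_ts` keeps the event-time attribute for the temporal join):
```sql
-- sketch: a view needs no connector; dedup per id, ordered on the rowtime.
-- ORDER BY kafka_ts DESC keeps the latest row per id (ASC would keep the earliest)
CREATE VIEW deduped_users AS
SELECT id, first_name, last_name, email, phone, kafka_ts
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY id ORDER BY kafka_ts DESC) AS row_num
  FROM flink_user
)
WHERE row_num = 1;
```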
James Watkins:
Thanks for sticking with me @Matthias Broecheler 🙂 I’m not getting errors now, at least. It’s not returning any results, but that could be something related to the timestamps the messages were produced to Kafka with, or maybe the watermarks or something. I will have to have a deeper dig into that. The thing I’m uncertain about with this method, though, is that we’re deduping the user table, so we’ll only get 1 record per `user_id` (either the first or last), but what if the user has changed details 4 times, for example? Every transaction in the transactions stream will join to the same record for that user. I thought one of the reasons we would use a temporal join is so that we could have, say, 4 transactions for that user, and the relevant user record would be joined to each transaction depending on the event time. What do you think?
Matthias Broecheler:
Yay, great progress. This should be doing what you want. When you do the join with the `SYSTEM_TIME AS OF`, the Flink query engine will kinda undo the dedup and keep all the versions up to the last watermark so that it can join the stream with the "correct" (in time) version of the user. At least, that's how I understand it. That's why it's important to get the watermarks and timestamps right when you do a temporal join - otherwise you get no or incorrect results.
James Watkins:
That’s great, thanks so much for your help!
Hi @David Anderson, would you mind clarifying how the watermarks work in this case, i.e. when using an event-time temporal join? I have defined the same watermark using the Kafka metadata timestamp on the table on the probe side (`transactions`) and the table on the build side (`users`). I have not defined a delay interval on those watermarks (as it does not show one in this example):
```sql
`kafka_ts` TIMESTAMP(3) METADATA FROM 'timestamp',
WATERMARK FOR kafka_ts AS kafka_ts
```
This is the Event-Time Temporal Join:
```sql
SELECT *
FROM transactions as ft
LEFT JOIN deduped_users FOR SYSTEM_TIME AS OF ft.kafka_ts as fu
    ON ft.user_id = fu.id
```
This join is returning no results. I think for this to work, there should be records in the left table with a `kafka_ts` timestamp that is higher than the corresponding `kafka_ts` timestamp in the right table. Is that correct? I’m wondering if the watermark is not progressing or something. I also tried adding a 5 second delay on the watermarks but that also didn’t help. Would really appreciate some help to try and understand this!