# troubleshooting
h
Hi, according to the Flink documentation, with this Regular Join example:

```sql
SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id
WHERE ABS(TIMESTAMPDIFF(SECOND, orderTimestamp, productTimestamp)) <= 3600
```

each record is joined with all previous and future records on the right side whose product id matches. Suppose Orders and Product are two Kafka Avro streams. How can I make it so that every time a new record arrives in each table, only the latest record (by productId) is joined? I cannot use time-to-live, because the corresponding Orders and Product records can arrive up to 1 hour apart, while two order records with the same productId can arrive very close to each other. Specific example:
Orders
```
orderId  productId  orderTimestamp
1        101        2024-08-06 10:00:00
2        101        2024-08-06 10:00:05
```
Product
```
id   productName  productTimestamp
101  Product A    2024-08-06 09:55:00
```
Expected result:
```
orderId  productId  orderTime            productName  productTime
1        101        2024-08-06 10:00:00  Product A    2024-08-06 09:55:00
2        101        2024-08-06 10:00:05  Product A    2024-08-06 09:55:00
```
but the result is:
```
orderId  productId  orderTime            productName  productTime
1        101        2024-08-06 10:00:00  Product A    2024-08-06 09:55:00
2        101        2024-08-06 10:00:05  Product A    2024-08-06 09:55:00
1        101        2024-08-06 10:00:00  Product A    2024-08-06 09:55:00
```
d
You can achieve this using Flink’s temporal join capabilities.
1. First, make sure watermarks are properly defined for both the Products and Orders streams.
```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Orders> orders = ...;    // Kafka source for Orders
DataStream<Product> products = ...; // Kafka source for Product

// assignTimestampsAndWatermarks returns a new stream, so keep the result;
// forMonotonousTimestamps replaces the deprecated AscendingTimestampExtractor
orders = orders.assignTimestampsAndWatermarks(
    WatermarkStrategy.<Orders>forMonotonousTimestamps()
        .withTimestampAssigner((order, ts) -> order.orderTimestamp));

products = products.assignTimestampsAndWatermarks(
    WatermarkStrategy.<Product>forMonotonousTimestamps()
        .withTimestampAssigner((product, ts) -> product.productTimestamp));
```
This is crucial for using Flink’s temporal joins.
2. Convert the product stream into a Temporal Table Function
```java
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.getConfig().setIdleStateRetention(Duration.ofHours(1));

DataStream<Product> productStream = ...; // assume this is your product stream

// register the DataStream as a Table with an event-time attribute
tEnv.createTemporaryView("Products", productStream,
    $("id"), $("productName"), $("productTimestamp").rowtime());

// create a temporal table function that tracks the latest product per id
TemporalTableFunction latestProduct = tEnv
    .from("Products")
    .createTemporalTableFunction($("productTimestamp"), $("id"));

// register the function so it can be used from SQL
tEnv.createTemporarySystemFunction("latestProduct", latestProduct);
```
The latestProduct temporal table function retrieves the latest product for a given id as of the supplied event time.
3. Perform the Temporal Join
You can now run a temporal join between the Orders table and the latestProduct function.
```java
// convert the Orders stream to a Table with an event-time attribute
DataStream<Orders> ordersStream = ...; // your Orders stream
tEnv.createTemporaryView("Orders", ordersStream,
    $("orderId"), $("productId"), $("orderTimestamp").rowtime());

// temporal join: for each order, look up the product version
// that was current at the order's event time
Table result = tEnv.sqlQuery(
    "SELECT Orders.orderId, Orders.productId, Orders.orderTimestamp, " +
    "       productName, productTimestamp " +
    "FROM Orders, LATERAL TABLE (latestProduct(Orders.orderTimestamp)) " +
    "WHERE Orders.productId = id " +
    "AND ABS(TIMESTAMPDIFF(SECOND, Orders.orderTimestamp, productTimestamp)) <= 3600");

// convert the result back to a DataStream if necessary
DataStream<ResultType> resultStream = tEnv.toDataStream(result, ResultType.class);
```
The LATERAL TABLE (latestProduct(Orders.orderTimestamp)) clause looks up, for each order, the latest matching record in Products as of the order's event time, and the ABS(TIMESTAMPDIFF(SECOND, ...)) <= 3600 predicate additionally restricts matches to products at most an hour apart.
Hope this example helps, and you can try implementing Flink’s temporal joins for your specific needs.
One more thing, instead of directly using a TableFunction, you could create a derived table that always holds the latest Product per productId, using time-windowed aggregation or a custom operator.
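As a sketch, that derived table could also be built with the ROW_NUMBER deduplication pattern (an alternative to windowed aggregation); the view name here is just illustrative:

```sql
-- Illustrative: a view that always holds the latest product row per id
CREATE TEMPORARY VIEW LatestProducts AS
SELECT id, productName, productTimestamp
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY id ORDER BY productTimestamp DESC) AS rownum
    FROM Products
)
WHERE rownum = 1;
```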
h
@D. Draco O'Brien I am using Flink SQL, can it do what you suggested?
d
I think it should work. Make sure your environment uses event-time semantics; since Flink 1.12 event time is the default, so the WATERMARK definitions on your tables are enough to enable it.
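One related setting worth knowing about with Kafka sources: if some partitions go idle, watermarks can stall and the join stops emitting. The source idle-timeout option mitigates that, for example:

```sql
-- Consider source partitions idle after 30 s so watermarks keep advancing
SET 'table.exec.source.idle-timeout' = '30 s';
```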
Instead of registering a Table Function from Java like in the previous example, you can use Flink SQL's LATERAL TABLE clause to do the same join against the latest product based on event time.
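Roughly like this, still assuming the latestProduct temporal table function has been registered (which, as far as I know, has to happen via the Table API):

```sql
-- Sketch: temporal table function join; latestProduct is assumed registered
SELECT Orders.orderId, Orders.productId, Orders.orderTimestamp,
       productName, productTimestamp
FROM Orders,
     LATERAL TABLE (latestProduct(Orders.orderTimestamp))
WHERE Orders.productId = id;
```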
h
@D. Draco O'Brien I don't know how to do it your way in Flink SQL. I'm trying with Versioned Tables and a deduplication query:

```sql
SELECT [column_list]
FROM (
    SELECT [column_list],
           ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
               ORDER BY time_attr DESC) AS rownum
    FROM table_name)
WHERE rownum = 1
```
What do you think?
d
Yes, that looks like it could work as well.
You probably want to test its performance and see whether it can be optimized enough compared to temporal joins.
h
Can you help me with a temporal join example in Flink SQL? I am not very familiar with Java code.
d
Your approach using versioned tables and deduplication via window functions is definitely a valid approach. ROW_NUMBER() with the OVER clause allows you to rank records within a partition, which in this case is productId. You can then filter to keep only the latest record based on the order or product timestamp.
If you want to use both Flink SQL and temporal joins, that could actually be the best approach. You would first need to make sure the Orders and Products tables are defined with watermarks.
That's going to look something like this:
```sql
CREATE TABLE Orders (
    orderId BIGINT,
    productId BIGINT,
    orderTimestamp TIMESTAMP(3),
    WATERMARK FOR orderTimestamp AS orderTimestamp - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    -- other Kafka connector properties
);

CREATE TABLE Products (
    id BIGINT,
    productName STRING,
    productTimestamp TIMESTAMP(3),
    WATERMARK FOR productTimestamp AS productTimestamp - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    -- other Kafka connector properties
);
```
Once you have defined the tables, I think you would want to use FOR SYSTEM_TIME AS OF to join orders with products.
h
I have declared the tables with watermarks. How do I convert a stream to a Temporal Table Function in Flink SQL?
d
I think this is achieved with FOR SYSTEM_TIME AS OF, and it does operate on the event time.
It would look something like
```sql
WITH JoinedData AS (
    SELECT
        Orders.orderId,
        Orders.productId,
        Orders.orderTimestamp,
        Products.productName,
        Products.productTimestamp
    FROM Orders
    LEFT JOIN Products FOR SYSTEM_TIME AS OF Orders.orderTimestamp
        ON Orders.productId = Products.id
)
SELECT *
FROM JoinedData
WHERE ABS(TIMESTAMPDIFF(SECOND, orderTimestamp, productTimestamp)) <= 3600;
```
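One caveat: for FOR SYSTEM_TIME AS OF, the right side has to be a versioned table, i.e. it needs a primary key in addition to the watermark. With append-only Kafka topics you can get one either from your deduplication query (registered as a view) or by declaring the table with upsert-kafka, roughly like this sketch (topic and format properties are placeholders):

```sql
-- Sketch: Products as a versioned table via upsert-kafka
CREATE TABLE Products (
    id BIGINT,
    productName STRING,
    productTimestamp TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED,
    WATERMARK FOR productTimestamp AS productTimestamp - INTERVAL '5' SECOND
) WITH (
    'connector' = 'upsert-kafka'
    -- key/value format and topic properties
);
```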
but I think for continuous updates you may need to use the DataStream API or Table API.
I think the benefits of and reasons to use the Table API are clarified in this fairly recent article, which covers both temporal joins and interval joins: https://medium.com/@hivemind_tech/understanding-apache-flink-a-journey-from-core-concepts-to-materialised-views-b8129070acf4