Hải Nguyễn
08/12/2024, 8:03 AM
SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id
WHERE ABS(TIMESTAMPDIFF(SECOND, orderTimestamp, productTimestamp)) <= 3600
With this query, each record is joined with all previous and future records on the right side that have the same product id.
Suppose Orders and Product are two Kafka Avro streams. How can I make it so that every time a new record arrives in either table, only the latest records (by productId) are joined? I cannot use time-to-live, because the two corresponding Orders and Product records can arrive up to 1 hour apart, while two order records with the same productId can arrive very close to each other.
Specific example:
Orders
orderId  productId  orderTimestamp
1        101        2024-08-06 10:00:00
2        101        2024-08-06 10:00:05
Product
id   productName  productTimestamp
101  Product A    2024-08-06 09:55:00
Expected result:
orderId  productId  orderTime            productName  productTime
1        101        2024-08-06 10:00:00  Product A    2024-08-06 09:55:00
2        101        2024-08-06 10:00:05  Product A    2024-08-06 09:55:00
But the actual result is:
orderId  productId  orderTime            productName  productTime
1        101        2024-08-06 10:00:00  Product A    2024-08-06 09:55:00
2        101        2024-08-06 10:00:05  Product A    2024-08-06 09:55:00
1        101        2024-08-06 10:00:00  Product A    2024-08-06 09:55:00
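One way to see why a regular join can emit more rows than expected: it pairs each order with every Product record that has the same id and passes the time predicate, so each additional Product record for a key adds one more output row per order. The following is a minimal plain-Java sketch of that behavior, not Flink code. The class `RegularJoinSketch`, its helper types, and the second Product A record at 09:58:00 are hypothetical, and timestamps are written as seconds since midnight.

```java
import java.util.ArrayList;
import java.util.List;

final class RegularJoinSketch {
    // Minimal value types mirroring the columns of the example tables.
    static final class Order {
        final long orderId, productId, ts;
        Order(long orderId, long productId, long ts) {
            this.orderId = orderId; this.productId = productId; this.ts = ts;
        }
    }

    static final class Product {
        final long id, ts;
        final String name;
        Product(long id, String name, long ts) {
            this.id = id; this.name = name; this.ts = ts;
        }
    }

    // Pairs every order with every product row that has the same id
    // and whose timestamp is within maxDiffSeconds of the order.
    static List<String> regularJoin(List<Order> orders, List<Product> products, long maxDiffSeconds) {
        List<String> rows = new ArrayList<>();
        for (Order o : orders) {
            for (Product p : products) {
                if (o.productId == p.id && Math.abs(o.ts - p.ts) <= maxDiffSeconds) {
                    rows.add(o.orderId + " | " + p.name + " | " + p.ts);
                }
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(new Order(1, 101, 36000), new Order(2, 101, 36005));
        List<Product> products = List.of(
            new Product(101, "Product A", 35700),   // 09:55:00, from the example
            new Product(101, "Product A", 35880));  // 09:58:00, hypothetical second record
        // Two orders x two product records = four output rows.
        regularJoin(orders, products, 3600).forEach(System.out::println);
    }
}
```

With only the 09:55:00 record the sketch produces the two expected rows; the extra rows appear as soon as a second record for the same id falls inside the one-hour window.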
D. Draco O'Brien
08/12/2024, 9:40 AM
D. Draco O'Brien
08/12/2024, 9:45 AM
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Deprecated since Flink 1.12, where event time became the default.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Orders> orders = ...; // Kafka source for Orders
DataStream<Product> products = ...; // Kafka source for Product
// assignTimestampsAndWatermarks returns a new stream, so keep the result.
// The extracted timestamps must be epoch milliseconds.
DataStream<Orders> ordersWithTs = orders.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Orders>() {
    @Override
    public long extractAscendingTimestamp(Orders element) {
        return element.orderTimestamp;
    }
});
DataStream<Product> productsWithTs = products.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Product>() {
    @Override
    public long extractAscendingTimestamp(Product element) {
        return element.productTimestamp;
    }
});
D. Draco O'Brien
08/12/2024, 9:45 AM
D. Draco O'Brien
08/12/2024, 9:48 AM
// createTemporaryView(name, DataStream, fields...) and registerFunction(name, TableFunction)
// are defined on StreamTableEnvironment; env is the StreamExecutionEnvironment created earlier.
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// keep idle state for one hour
tEnv.getConfig().setIdleStateRetention(Duration.ofHours(1));
DataStream<Product> productStream = ...; // assume this is your product stream
// register the DataStream as a Table
tEnv.createTemporaryView("Products", productStream, $("id"), $("productName"), $("productTimestamp"));
// create a Table Function representing the latest product information
TableFunction<Product> latestProduct = new LatestProductTableFunction();
// register the Table Function
tEnv.registerFunction("latestProduct", latestProduct);
D. Draco O'Brien
08/12/2024, 9:50 AM
D. Draco O'Brien
08/12/2024, 9:53 AM
D. Draco O'Brien
08/12/2024, 9:54 AM
// convert Orders stream to Table
DataStream<Orders> ordersStream = ...; // Your Orders stream
tEnv.createTemporaryView("Orders", ordersStream, $("orderId"), $("productId"), $("orderTimestamp"));
// Perform the temporal join.
// LATEST_BY_OFFSET is a ksqlDB aggregate, not a Flink SQL function; in Flink SQL a
// table function is joined with LATERAL TABLE. This assumes latestProduct takes the
// order timestamp and returns the product rows valid at that time.
Table result = tEnv.sqlQuery(
    "SELECT Orders.orderId, Orders.productId, Orders.orderTimestamp, latestProduct.productName, latestProduct.productTimestamp " +
    "FROM Orders AS Orders, LATERAL TABLE(latestProduct(Orders.orderTimestamp)) AS latestProduct " +
    "WHERE Orders.productId = latestProduct.id AND ABS(TIMESTAMPDIFF(SECOND, Orders.orderTimestamp, latestProduct.productTimestamp)) <= 3600"
);
// convert the result back to a DataStream if necessary
DataStream<ResultType> resultStream = tEnv.toAppendStream(result, ResultType.class);
D. Draco O'Brien
08/12/2024, 10:12 AM
(ABS(TIMESTAMPDIFF(SECOND, Orders.orderTimestamp, latestProduct.productTimestamp)) <= 3600).
D. Draco O'Brien
08/12/2024, 10:15 AM
D. Draco O'Brien
08/12/2024, 10:32 AM
Hải Nguyễn
08/13/2024, 6:45 AM
D. Draco O'Brien
08/13/2024, 7:00 AM
SET 'table.exec.time-characteristic' = 'eventtime';
D. Draco O'Brien
08/13/2024, 7:04 AM
Hải Nguyễn
08/13/2024, 7:05 AM
SELECT [column_list]
FROM (
  SELECT [column_list],
    ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
      ORDER BY time_attr DESC) AS rownum
  FROM table_name)
WHERE rownum = 1
What do you think?
D. Draco O'Brien
08/13/2024, 7:08 AM
D. Draco O'Brien
08/13/2024, 7:12 AM
Hải Nguyễn
08/13/2024, 7:14 AM
D. Draco O'Brien
08/13/2024, 7:15 AM
D. Draco O'Brien
08/13/2024, 7:20 AM
D. Draco O'Brien
08/13/2024, 7:23 AM
CREATE TABLE Orders (
  orderId BIGINT,
  productId BIGINT,
  orderTimestamp TIMESTAMP(3),
  WATERMARK FOR orderTimestamp AS orderTimestamp - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  -- other Kafka connector properties
);
CREATE TABLE Products (
  id BIGINT,
  productName STRING,
  productTimestamp TIMESTAMP(3),
  WATERMARK FOR productTimestamp AS productTimestamp - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  -- other Kafka connector properties
);
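The `WATERMARK FOR ... AS ... - INTERVAL '5' SECOND` clauses declare that rows may arrive up to five seconds out of order. A rough plain-Java sketch of that bookkeeping follows; it is not Flink's implementation, and `WatermarkSketch` and its method names are hypothetical. The watermark trails the largest event time seen so far by the allowed delay and never moves backwards, and a row whose event time is at or below the current watermark is treated here as late.

```java
final class WatermarkSketch {
    private final long maxDelayMillis;
    private long watermark = Long.MIN_VALUE;

    WatermarkSketch(long maxDelayMillis) {
        this.maxDelayMillis = maxDelayMillis;
    }

    // Feeds one row's event time and returns the watermark after that row.
    long observe(long eventTimeMillis) {
        // The watermark only advances; an out-of-order row cannot pull it back.
        watermark = Math.max(watermark, eventTimeMillis - maxDelayMillis);
        return watermark;
    }

    // A row at or below the current watermark arrived later than the declared bound.
    boolean isLate(long eventTimeMillis) {
        return eventTimeMillis <= watermark;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch(5_000);
        System.out.println(wm.observe(10_000)); // 5000
        System.out.println(wm.observe(12_000)); // 7000
        System.out.println(wm.observe(11_000)); // 7000, out-of-order row does not lower it
        System.out.println(wm.isLate(6_000));   // true
    }
}
```

A larger interval tolerates more disorder at the cost of delaying any result that waits for the watermark.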
D. Draco O'Brien
08/13/2024, 7:44 AM
Hải Nguyễn
08/13/2024, 7:45 AM
D. Draco O'Brien
08/13/2024, 7:45 AM
D. Draco O'Brien
08/13/2024, 7:46 AM
D. Draco O'Brien
08/13/2024, 7:48 AM
WITH
  JoinedData AS (
    SELECT
      Orders.orderId,
      Orders.productId,
      Orders.orderTimestamp,
      Products.productName,
      Products.productTimestamp
    FROM
      Orders
    LEFT JOIN
      Products FOR SYSTEM_TIME AS OF Orders.orderTimestamp
    ON
      Orders.productId = Products.id
  )
SELECT *
FROM JoinedData
WHERE ABS(TIMESTAMPDIFF(SECOND, orderTimestamp, productTimestamp)) <= 3600;
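The intent of `FOR SYSTEM_TIME AS OF Orders.orderTimestamp` is that each order sees only the product version that was current at the order's timestamp, instead of every version for that id. The following is a minimal plain-Java sketch of that lookup plus the one-hour filter, not Flink code. The class `AsOfJoinSketch`, its methods, and the "Product A v2" record are hypothetical, and timestamps are written as seconds since midnight.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

final class AsOfJoinSketch {
    // productId -> (productTimestamp -> productName), sorted by timestamp.
    private final Map<Long, TreeMap<Long, String>> versions = new HashMap<>();

    void addProduct(long id, String name, long ts) {
        versions.computeIfAbsent(id, k -> new TreeMap<>()).put(ts, name);
    }

    // Returns "name@ts" for the version current at orderTs, or null when there is
    // no version at or before orderTs, or the version is older than maxDiffSeconds.
    String lookup(long productId, long orderTs, long maxDiffSeconds) {
        TreeMap<Long, String> byTime = versions.get(productId);
        if (byTime == null) {
            return null;
        }
        // floorEntry gives the entry with the greatest timestamp <= orderTs.
        Map.Entry<Long, String> latest = byTime.floorEntry(orderTs);
        if (latest == null || orderTs - latest.getKey() > maxDiffSeconds) {
            return null;
        }
        return latest.getValue() + "@" + latest.getKey();
    }

    public static void main(String[] args) {
        AsOfJoinSketch join = new AsOfJoinSketch();
        join.addProduct(101, "Product A", 35700);      // 09:55:00, from the example
        System.out.println(join.lookup(101, 36000, 3600)); // Product A@35700
        System.out.println(join.lookup(101, 36005, 3600)); // Product A@35700
        join.addProduct(101, "Product A v2", 35880);   // hypothetical newer version
        System.out.println(join.lookup(101, 36000, 3600)); // Product A v2@35880
    }
}
```

Each order yields at most one row no matter how many product versions exist. In Flink, as far as the documentation describes it, an event-time temporal join needs the right-hand table to be versioned (a primary key plus an event-time attribute), and the Products definition shown earlier declares no primary key, so it would likely need an upsert source or a deduplication view first.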
D. Draco O'Brien
08/13/2024, 7:53 AM
D. Draco O'Brien
08/13/2024, 7:54 AM
D. Draco O'Brien
08/13/2024, 7:58 AM
D. Draco O'Brien
08/13/2024, 8:11 AM
D. Draco O'Brien
08/13/2024, 8:12 AM