George Leonard
09/13/2024, 5:03 PM
-- Upsert-Kafka backed table holding flattened sales rows (one row per basket
-- item), with Confluent Avro used for both the Kafka key and value.
-- NOTE(review): Flink documents the OR REPLACE form for "... AS SELECT"
-- statements; confirm the target Flink version accepts it on a plain DDL.
CREATE OR REPLACE TABLE c_hive.db01.t_f_unnested_sales (
    `store_id`            STRING,
    `product`             STRING,
    `brand`               STRING,
    `subtotal`            DOUBLE,
    `category`            STRING,
    `saleDateTime_Ltz`    STRING,
    `saleTimestamp_Epoc`  STRING,
    -- Event-time column derived from the epoch string. The division by 1000
    -- treats the value as epoch milliseconds.
    -- NOTE(review): FROM_UNIXTIME works on whole seconds and the session time
    -- zone, so sub-second precision is not carried over - confirm acceptable.
    `saleTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`saleTimestamp_Epoc` AS BIGINT) / 1000)),
    -- Watermark equals the event time itself: no out-of-orderness allowance.
    WATERMARK FOR `saleTimestamp_WM` AS `saleTimestamp_WM`,
    -- upsert-kafka needs a primary key; it becomes the Kafka message key.
    -- NOTE(review): separate sales sharing store/product/brand upsert onto the
    -- same key - confirm that "latest wins" is the intended semantics.
    PRIMARY KEY (`store_id`, `product`, `brand`) NOT ENFORCED
) WITH (
     'connector'                                = 'upsert-kafka'
    ,'topic'                                    = 't_f_unnested_sales'
    ,'properties.bootstrap.servers'             = 'broker:29092'
    ,'properties.group.id'                      = 'mysqlcdcsourced'
    ,'key.format'                               = 'avro-confluent'
    ,'key.avro-confluent.schema-registry.url'   = 'http://schema-registry:9081'
    ,'value.format'                             = 'avro-confluent'
    ,'value.avro-confluent.schema-registry.url' = 'http://schema-registry:9081'
    -- Key columns are also written into the value payload.
    ,'value.fields-include'                     = 'ALL'
);
-- Flatten completed sales into one output row per element of `basketItems`.
-- The target columns are listed explicitly so the mapping does not rely on
-- column position (the target table also carries a computed column).
INSERT INTO c_hive.db01.t_f_unnested_sales (
    `store_id`,
    `product`,
    `brand`,
    `subtotal`,
    `category`,
    `saleDateTime_Ltz`,
    `saleTimestamp_Epoc`
)
SELECT
    `store`.`id`         AS `store_id`,
    bi.`name`            AS `product`,
    bi.`brand`           AS `brand`,
    bi.`subtotal`        AS `subtotal`,
    bi.`category`        AS `category`,
    `saleDateTime_Ltz`   AS `saleDateTime_Ltz`,
    `saleTimestamp_Epoc` AS `saleTimestamp_Epoc`
FROM c_hive.db01.t_f_avro_salescompleted
-- Expand the `basketItems` collection; each element is exposed as `bi`.
CROSS JOIN UNNEST(`basketItems`) AS bi;
-- Create the Paimon table from the unnested sales table via CREATE TABLE AS,
-- configured with 4 buckets and `store_id` as the bucket key.
CREATE TABLE c_paimon.dev.t_unnested_sales
WITH (
    'bucket'     = '4',
    'bucket-key' = 'store_id'
)
AS
SELECT
    src.`store_id`,
    src.`product`,
    src.`brand`,
    src.`subtotal`,
    src.`category`,
    src.`saleDateTime_Ltz`,
    src.`saleTimestamp_Epoc`
FROM c_hive.db01.t_f_unnested_sales AS src;
George Leonard
09/17/2024, 5:36 AM