George Leonard
07/22/2024, 4:50 PMCREATE TABLE t_f_avro_salescompleted_x (
`invoiceNumber` STRING,
`saleDateTime_Ltz` STRING,
`saleTimestamp_Epoc` STRING,
`terminalPoint` STRING,
`nett` DOUBLE,
`vat` DOUBLE,
`total` DOUBLE,
`store` row<`id` STRING, `name` STRING>,
`clerk` row<`id` STRING, `name` STRING, `surname` STRING>,
`basketItems` array<row<`id` STRING, `name` STRING, `brand` STRING, `category` STRING, `price` DOUBLE, `quantity` INT>>,
`payDateTime_Ltz` STRING,
`payTimestamp_Epoc` STRING,
`paid` DOUBLE,
`finTransactionId` STRING,
`payTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`payTimestamp_Epoc` AS BIGINT) / 1000)),
`saleTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`saleTimestamp_Epoc` AS BIGINT) / 1000)),
WATERMARK FOR `saleTimestamp_WM` AS `saleTimestamp_WM`
) WITH (
'connector' = 'kafka',
'topic' = 'avro_salescompleted_x',
'properties.bootstrap.servers' = 'broker:29092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'avro-confluent',
'value.avro-confluent.schema-registry.url' = '<http://schema-registry:9081>',
'value.fields-include' = 'ALL'
);
-- the fields in the select is case sensitive, needs to match theprevious create tables which match the definitions in the struct/avro sections.
Insert into t_f_avro_salescompleted_x
select
b.invoiceNumber,
b.saleDateTime_Ltz,
b.saleTimestamp_Epoc,
b.terminalPoint,
b.nett,
b.vat,
b.total,
b.store,
b.clerk,
b.basketItems,
a.payDateTime_Ltz,
a.payTimestamp_Epoc,
a.paid,
a.finTransactionId
FROM
t_k_avro_salespayments_x a,
t_k_avro_salesbaskets_x b
WHERE a.invoiceNumber = b.invoiceNumber
AND a.payTimestamp_WM > b.saleTimestamp_WM
AND b.saleTimestamp_WM > (b.saleTimestamp_WM - INTERVAL '1' HOUR);
t_k_sales* are 2 flink tables created representing the actual kafka topics.
t_f_salescompleted as can be seen build by joining the 2 tables.
You can also see that t_f_salescompleted contains basketItems which is a array of items.
CREATE TABLE t_f_unnested_sales (
`store_id` STRING,
`product` STRING,
`brand` STRING,
`saleValue` DOUBLE,
`category` STRING,
`saleDateTime_Ltz` STRING,
`saleTimestamp_Epoc` STRING,
`saleTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`saleTimestamp_Epoc` AS BIGINT) / 1000)),
WATERMARK FOR `saleTimestamp_WM` AS `saleTimestamp_WM`
) WITH (
'connector' = 'kafka',
'topic' = 'unnested_sales',
'properties.bootstrap.servers' = 'broker:29092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = '<http://schema-registry:9081>',
'value.fields-include' = 'ALL'
);
insert into t_f_unnested_sales
SELECT
`store`.`id` as `store_id`,
bi.`name` AS `product`,
bi.`brand` AS `brand`,
bi.`price` * bi.`quantity` AS `saleValue`,
bi.`category` AS `category`,
`saleDateTime_Ltz` as saleDateTime_Ltz,
`saleTimestamp_Epoc` as saleTimestamp_Epoc
FROM t_f_avro_salescompleted_x -- assuming avro_salescompleted_x is a table function
CROSS JOIN UNNEST(`basketItems`) AS bi;
Here I wanted to create a output to my Iceberg stack of the t_f_unnested_sales records.
-- Add sink to Iceberg
CREATE TABLE t_i_unnested_sales WITH (
'connector' = 'iceberg',
'catalog-type' = 'hive',
'catalog-name' = 'dev',
'warehouse' = '<s3a://warehouse>',
'hive-conf-dir' = '/opt/sql-client/conf')
AS SELECT * FROM t_f_unnested_sales;
This executes, but only a metadata object gets created... and lots of errors in the jobmanager logs.
Flink SQL> select * from t_i_unnested_sales;
[ERROR] Could not execute SQL statement. Reason:
java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
validated type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" store_id, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" product, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" brand, DOUBLE saleValue, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" category, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" saleDateTime_Ltz, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" saleTimestamp_Epoc, TIMESTAMP(3) *ROWTIME* saleTimestamp_WM) NOT NULL
converted type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" store_id, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" product, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" brand, DOUBLE saleValue, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" category, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" saleDateTime_Ltz, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" saleTimestamp_Epoc, TIMESTAMP(3) saleTimestamp_WM) NOT NULL
rel:
LogicalProject(store_id=[$0], product=[$1], brand=[$2], saleValue=[$3], category=[$4], saleDateTime_Ltz=[$5], saleTimestamp_Epoc=[$6], saleTimestamp_WM=[$7])
LogicalTableScan(table=[[default_catalog, default_database, t_i_unnested_sales]])