# troubleshooting
Hi all... I eventually got my Apache Flink stack built and integrated with Iceberg. I got Robin's example flowing into the back-end MinIO-based S3/Iceberg store, with Hive... it took some work 😉. As mentioned, his t_i_orders table is being populated, with the metadata and data directories created and files written. I've got a Flink table called t_f_unnested_sales (see below); that's an unnested version, in Flink, of my avro_salescompleted_x table. I've created a t_i_unnested_sales as per below: it creates the metadata, but nothing happens on the data side... please help.
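For context, the Iceberg tables live in a Hive-backed catalog registered in the SQL client along these lines (a minimal sketch; the metastore URI, warehouse path, and conf dir are assumptions based on my compose setup):

```sql
-- Sketch: register the Hive-backed Iceberg catalog that the t_i_* tables use.
-- The thrift URI is an assumption based on my docker-compose service names.
CREATE CATALOG dev WITH (
    'type'          = 'iceberg',
    'catalog-type'  = 'hive',
    'uri'           = 'thrift://hive-metastore:9083',
    'warehouse'     = 's3a://warehouse',
    'hive-conf-dir' = '/opt/sql-client/conf'
);
```

With that in place, here are the Flink tables and jobs: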
```sql
CREATE TABLE t_f_avro_salescompleted_x (
    `invoiceNumber` STRING,
    `saleDateTime_Ltz` STRING,
    `saleTimestamp_Epoc` STRING,
    `terminalPoint` STRING,
    `nett` DOUBLE,
    `vat` DOUBLE,
    `total` DOUBLE,
    `store` row<`id` STRING, `name` STRING>,
    `clerk` row<`id` STRING, `name` STRING, `surname` STRING>,
    `basketItems` array<row<`id` STRING, `name` STRING, `brand` STRING, `category` STRING, `price` DOUBLE, `quantity` INT>>,     
    `payDateTime_Ltz` STRING,
    `payTimestamp_Epoc` STRING,
    `paid` DOUBLE,
    `finTransactionId` STRING,
    `payTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`payTimestamp_Epoc` AS BIGINT) / 1000)),
    `saleTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`saleTimestamp_Epoc` AS BIGINT) / 1000)),
    WATERMARK FOR `saleTimestamp_WM` AS `saleTimestamp_WM`
) WITH (
    'connector' = 'kafka',
    'topic' = 'avro_salescompleted_x',
    'properties.bootstrap.servers' = 'broker:29092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.schema-registry.url' = 'http://schema-registry:9081',
    'value.fields-include' = 'ALL'
);

-- The fields in the SELECT are case-sensitive and need to match the previous
-- CREATE TABLE definitions, which in turn match the struct/Avro schema.
INSERT INTO t_f_avro_salescompleted_x
SELECT
        b.invoiceNumber,
        b.saleDateTime_Ltz,
        b.saleTimestamp_Epoc,
        b.terminalPoint,
        b.nett,
        b.vat,
        b.total,
        b.store,
        b.clerk,    
        b.basketItems,        
        a.payDateTime_Ltz,
        a.payTimestamp_Epoc,
        a.paid,
        a.finTransactionId
    FROM 
        t_k_avro_salespayments_x a,
        t_k_avro_salesbaskets_x b
    WHERE a.invoiceNumber = b.invoiceNumber
    AND a.payTimestamp_WM > b.saleTimestamp_WM 
    AND a.payTimestamp_WM < (b.saleTimestamp_WM + INTERVAL '1' HOUR);
```
The t_k_avro_sales* tables are two Flink tables representing the actual Kafka topics. As you can see, t_f_avro_salescompleted_x is built by joining those two tables, and it contains basketItems, which is an array of items.
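For completeness, the payments source table looks roughly like this (a sketch reconstructed from the columns the INSERT above reads from it; the topic name and exact DDL in my stack may differ):

```sql
-- Sketch of the Kafka-backed payments table; the WITH options mirror the
-- other tables in this stack, and the topic name is an assumption.
CREATE TABLE t_k_avro_salespayments_x (
    `invoiceNumber` STRING,
    `payDateTime_Ltz` STRING,
    `payTimestamp_Epoc` STRING,
    `paid` DOUBLE,
    `finTransactionId` STRING,
    `payTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`payTimestamp_Epoc` AS BIGINT) / 1000)),
    WATERMARK FOR `payTimestamp_WM` AS `payTimestamp_WM`
) WITH (
    'connector' = 'kafka',
    'topic' = 'avro_salespayments_x',
    'properties.bootstrap.servers' = 'broker:29092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.schema-registry.url' = 'http://schema-registry:9081',
    'value.fields-include' = 'ALL'
);
```

Next, the unnested table: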
```sql
CREATE TABLE t_f_unnested_sales (
    `store_id` STRING,
    `product` STRING,
    `brand` STRING,
    `saleValue` DOUBLE,
    `category` STRING,
    `saleDateTime_Ltz` STRING,
    `saleTimestamp_Epoc` STRING,
    `saleTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`saleTimestamp_Epoc` AS BIGINT) / 1000)),
    WATERMARK FOR `saleTimestamp_WM` AS `saleTimestamp_WM`
) WITH (
    'connector' = 'kafka',
    'topic' = 'unnested_sales',
    'properties.bootstrap.servers' = 'broker:29092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://schema-registry:9081',
    'value.fields-include' = 'ALL'
);

INSERT INTO t_f_unnested_sales
SELECT
      `store`.`id` as `store_id`,
      bi.`name` AS `product`,
      bi.`brand` AS `brand`,
      bi.`price` * bi.`quantity` AS `saleValue`,
      bi.`category` AS `category`,
      `saleDateTime_Ltz` as saleDateTime_Ltz,
      `saleTimestamp_Epoc` as saleTimestamp_Epoc
    FROM t_f_avro_salescompleted_x
    CROSS JOIN UNNEST(`basketItems`) AS bi;
```
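(Side note for anyone reproducing this: the CROSS JOIN UNNEST fan-out can be sanity-checked interactively before wiring up the INSERT, e.g.:)

```sql
-- Quick check: each sale should fan out into one row per basket item.
SELECT
    `store`.`id` AS `store_id`,
    bi.`name`,
    bi.`quantity`
FROM t_f_avro_salescompleted_x
CROSS JOIN UNNEST(`basketItems`) AS bi
LIMIT 10;
```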
Here I wanted to create an output of the t_f_unnested_sales records to my Iceberg stack.
```sql
-- Add sink to Iceberg
CREATE TABLE t_i_unnested_sales WITH (
    'connector'     = 'iceberg',
    'catalog-type'  = 'hive',
    'catalog-name'  = 'dev',
    'warehouse'     = 's3a://warehouse',
    'hive-conf-dir' = '/opt/sql-client/conf'
) AS SELECT * FROM t_f_unnested_sales;
```
This executes, but only a metadata object gets created... and there are lots of errors in the jobmanager logs. Selecting from the new table fails as well:
```
Flink SQL> select * from t_i_unnested_sales;
[ERROR] Could not execute SQL statement. Reason:
java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
validated type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" store_id, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" product, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" brand, DOUBLE saleValue, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" category, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" saleDateTime_Ltz, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" saleTimestamp_Epoc, TIMESTAMP(3) *ROWTIME* saleTimestamp_WM) NOT NULL
converted type:
RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" store_id, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" product, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" brand, DOUBLE saleValue, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" category, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" saleDateTime_Ltz, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" saleTimestamp_Epoc, TIMESTAMP(3) saleTimestamp_WM) NOT NULL
rel:
LogicalProject(store_id=[$0], product=[$1], brand=[$2], saleValue=[$3], category=[$4], saleDateTime_Ltz=[$5], saleTimestamp_Epoc=[$6], saleTimestamp_WM=[$7])
  LogicalTableScan(table=[[default_catalog, default_database, t_i_unnested_sales]])
```
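Comparing the validated and converted types above, the only difference is the *ROWTIME* attribute on saleTimestamp_WM, so my guess is that the time attribute from the watermark is what breaks the CTAS. I'm wondering whether I should define the sink explicitly and CAST the watermark column instead, something like this (untested sketch, reusing my connector options from above):

```sql
-- Sketch: explicit sink DDL instead of CTAS, with the rowtime attribute
-- cast to a plain TIMESTAMP(3) in the INSERT.
CREATE TABLE t_i_unnested_sales (
    `store_id` STRING,
    `product` STRING,
    `brand` STRING,
    `saleValue` DOUBLE,
    `category` STRING,
    `saleDateTime_Ltz` STRING,
    `saleTimestamp_Epoc` STRING,
    `saleTimestamp_WM` TIMESTAMP(3)
) WITH (
    'connector'     = 'iceberg',
    'catalog-type'  = 'hive',
    'catalog-name'  = 'dev',
    'warehouse'     = 's3a://warehouse',
    'hive-conf-dir' = '/opt/sql-client/conf'
);

INSERT INTO t_i_unnested_sales
SELECT
    `store_id`,
    `product`,
    `brand`,
    `saleValue`,
    `category`,
    `saleDateTime_Ltz`,
    `saleTimestamp_Epoc`,
    CAST(`saleTimestamp_WM` AS TIMESTAMP(3)) AS `saleTimestamp_WM`
FROM t_f_unnested_sales;
```

Is that the right direction, or is the missing data a separate problem?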