# troubleshooting
g
ok... stuck. if I:
```
CREATE CATALOG c_iceberg_hive WITH (
        'type'          = 'iceberg',
        'catalog-type'  = 'hive',
        'warehouse'     = 's3a://warehouse',
        'hive-conf-dir' = './conf');

CREATE DATABASE `c_iceberg_hive`.`db01`;
```
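If it helps to sanity-check that the catalog and database registered, the usual Flink SQL catalog commands can confirm it before any tables go in:
```
-- quick check that the new catalog and database are visible to the session
SHOW CATALOGS;
USE CATALOG c_iceberg_hive;
SHOW DATABASES;
```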
and then
```
CREATE TABLE `c_iceberg_hive`.`db01`.t_f_unnested_sales (
    `store_id` STRING,
    `product` STRING,
    `brand` STRING,
    `saleValue` DOUBLE,
    `category` STRING,
    `saleDateTime_Ltz` STRING,
    `saleTimestamp_Epoc` STRING,
    `saleTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`saleTimestamp_Epoc` AS BIGINT) / 1000)),
    WATERMARK FOR `saleTimestamp_WM` AS `saleTimestamp_WM`
) WITH (
    'connector' = 'kafka',
    'topic' = 'unnested_sales',
    'properties.bootstrap.servers' = 'broker:29092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://schema-registry:9081',
    'value.fields-include' = 'ALL'
);
```
then it results in
```
[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Creating table with computed columns is not supported yet.
```
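One thing that might sidestep this (untested sketch; `t_k_unnested_sales` is just a made-up name, everything else mirrors the DDL above): a TEMPORARY table is kept in the session rather than persisted into the catalog, so the computed column and watermark never reach the Iceberg catalog at all:
```
-- sketch: keep the watermarked Kafka source as a TEMPORARY table so the
-- Iceberg catalog never has to store a computed column
CREATE TEMPORARY TABLE `c_iceberg_hive`.`db01`.t_k_unnested_sales (
    `store_id` STRING,
    `product` STRING,
    `brand` STRING,
    `saleValue` DOUBLE,
    `category` STRING,
    `saleDateTime_Ltz` STRING,
    `saleTimestamp_Epoc` STRING,
    `saleTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`saleTimestamp_Epoc` AS BIGINT) / 1000)),
    WATERMARK FOR `saleTimestamp_WM` AS `saleTimestamp_WM`
) WITH (
    'connector' = 'kafka',
    'topic' = 'unnested_sales',
    'properties.bootstrap.servers' = 'broker:29092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://schema-registry:9081',
    'value.fields-include' = 'ALL'
);
```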
if I change catalog:
```
CREATE CATALOG c_hive WITH (
        'type' = 'hive',
        'hive-conf-dir' = '/opt/flink/conf');

CREATE DATABASE `c_hive`.`db01`;
```
and then
```
CREATE TABLE `c_hive`.`db01`.t_f_unnested_sales (
    `store_id` STRING,
    `product` STRING,
    `brand` STRING,
    `saleValue` DOUBLE,
    `category` STRING,
    `saleDateTime_Ltz` STRING,
    `saleTimestamp_Epoc` STRING,
    `saleTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`saleTimestamp_Epoc` AS BIGINT) / 1000)),
    WATERMARK FOR `saleTimestamp_WM` AS `saleTimestamp_WM`
) WITH (
    'connector' = 'kafka',
    'topic' = 'unnested_sales',
    'properties.bootstrap.servers' = 'broker:29092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://schema-registry:9081',
    'value.fields-include' = 'ALL'
);
```
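Worth a quick check that the computed column and watermark actually landed in the Hive catalog; Flink SQL can echo back what was stored:
```
-- inspect what the Hive catalog stored for the table
SHOW CREATE TABLE `c_hive`.`db01`.t_f_unnested_sales;
DESCRIBE `c_hive`.`db01`.t_f_unnested_sales;
```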
this works... but when I now try to use this in:
```
CREATE TABLE `c_hive`.`db01`.t_f_unnested_sales (
    `store_id` STRING,
    `product` STRING,
    `brand` STRING,
    `saleValue` DOUBLE,
    `category` STRING,
    `saleDateTime_Ltz` STRING,
    `saleTimestamp_Epoc` STRING,
    `saleTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`saleTimestamp_Epoc` AS BIGINT) / 1000)),
    WATERMARK FOR `saleTimestamp_WM` AS `saleTimestamp_WM`
) WITH (
    'connector' = 'kafka',
    'topic' = 'unnested_sales',
    'properties.bootstrap.servers' = 'broker:29092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.url' = 'http://schema-registry:9081',
    'value.fields-include' = 'ALL'
);

insert into `c_hive`.`db01`.t_f_unnested_sales
SELECT
      `store`.`id` as `store_id`,
      bi.`name` AS `product`,
      bi.`brand` AS `brand`,
      bi.`price` * bi.`quantity` AS `saleValue`,
      bi.`category` AS `category`,
      `saleDateTime_Ltz` as saleDateTime_Ltz,
      `saleTimestamp_Epoc` as saleTimestamp_Epoc
    FROM `c_hive`.`db01`.t_f_avro_salescompleted_x  -- assuming avro_salescompleted_x is a table function
    CROSS JOIN UNNEST(`basketItems`) AS bi;
```
And now try to push data to Iceberg on MinIO:
```
CREATE TABLE t_i_unnested_sales WITH (
     'connector'     = 'iceberg',
     'catalog-type'  = 'hive',
     'catalog-name'  = 'c_hive',
     'warehouse'     = 's3a://warehouse',
     'hive-conf-dir' = './conf/')
   AS SELECT * FROM t_f_unnested_sales;
```
now results in:
```
[ERROR] Could not execute SQL statement. Reason:
org.apache.iceberg.exceptions.NoSuchIcebergTableException: Not an iceberg table: c_hive.db01.t_i_unnested_sales (type=null)
```
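For what it's worth, that exception usually means the Iceberg connector found a metastore entry for `c_hive.db01.t_i_unnested_sales` that isn't flagged as an Iceberg table (hence `type=null`), which is what tends to happen when the table is defined inside the plain Hive catalog with `'connector' = 'iceberg'` options rather than through an Iceberg catalog. A rough, untested sketch of the alternative, reusing the `c_iceberg_hive` catalog from the top of the thread (the DROP is only there to clear any half-created entry):
```
-- clear any non-Iceberg entry the failed CTAS may have left behind in HMS
DROP TABLE IF EXISTS `c_hive`.`db01`.t_i_unnested_sales;

-- create the sink through the Iceberg catalog instead; physical columns only,
-- since that catalog rejects computed columns and watermarks
CREATE TABLE `c_iceberg_hive`.`db01`.t_i_unnested_sales (
    `store_id` STRING,
    `product` STRING,
    `brand` STRING,
    `saleValue` DOUBLE,
    `category` STRING,
    `saleDateTime_Ltz` STRING,
    `saleTimestamp_Epoc` STRING
);
```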
seems to be originating out of the catalog type...
r
Which catalog are you creating this in? Unlike your other statements, you've not qualified it:
```
CREATE TABLE t_i_unnested_sales WITH (
```
and does this succeed?
```
insert into `c_hive`.`db01`.t_f_unnested_sales
```
g
creating it qualified or not resulted in the same. I did a `USE c_hive.db01;` before executing.
Yes, `insert into c_hive.db01.t_f_unnested_sales` succeeds.
I can do it all with full qualification or with none (after issuing `USE <catalog>.<database>;`); same result:
```
Flink SQL> insert into `c_hive`.`db01`.t_f_unnested_sales
> SELECT
>       `store`.`id` as `store_id`,
>       bi.`name` AS `product`,
>       bi.`brand` AS `brand`,
>       bi.`price` * bi.`quantity` AS `saleValue`,
>       bi.`category` AS `category`,
>       `saleDateTime_Ltz` as saleDateTime_Ltz,
>       `saleTimestamp_Epoc` as saleTimestamp_Epoc
>     FROM `c_hive`.`db01`.t_f_avro_salescompleted_x  -- assuming avro_salescompleted_x is a table function
>     CROSS JOIN UNNEST(`basketItems`) AS bi;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 7c0e494acae75a062ec6dc00b6ab627a


Flink SQL> select * from `c_hive`.`db01`.t_f_unnested_sales;
```
Records are returned.
```
Flink SQL> SET 'execution.checkpointing.interval' = '60sec';
[INFO] Execute statement succeed.

Flink SQL> SET 'pipeline.operator-chaining.enabled' = 'false';
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE `c_hive`.`db01`.t_i_unnested_sales WITH (
>     'connector'     = 'iceberg',
>     'catalog-type'  = 'hive',
>     'catalog-name'  = 'c_hive',
>     'warehouse'     = 's3a://warehouse',
>     'hive-conf-dir' = './conf')
>   AS SELECT * FROM t_f_unnested_sales;
[ERROR] Could not execute SQL statement. Reason:
org.apache.iceberg.exceptions.NoSuchIcebergTableException: Not an iceberg table: c_hive.db01.t_i_unnested_sales (type=null)
```
Nice part... with:
```
CREATE CATALOG c_hive WITH (
        'type' = 'hive',
        'hive-conf-dir' = './conf');
```
defined at the moment, as I created tables in Flink I can see "records, or is it objects?" created in MinIO. If I click on them, there is nothing however. I exited Flink SQL and re-entered; I just needed to rerun the above and execute a `use <cat>.<db>`, and I could then select from the tables which are purely consuming from Kafka. I could also re-execute my insert statements, which re-created the Flink jobs.
This is with type "hive" though. Guessing I need to switch to type = "iceberg"; problem is, then it seems to complain about the derived fields, which are not supported.
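If it helps, the two catalogs can sit side by side in one session: the computed column and watermark stay on the Kafka source in `c_hive`, the Iceberg table (created via `c_iceberg_hive` with physical columns only, as sketched earlier) is just the INSERT target, and only physical columns get selected across. Rough sketch, reusing the names from above:
```
-- source (Kafka, with watermark) lives in the Hive catalog; the sink lives in
-- the Iceberg catalog; the INSERT selects physical columns only
INSERT INTO `c_iceberg_hive`.`db01`.t_i_unnested_sales
SELECT
    `store_id`,
    `product`,
    `brand`,
    `saleValue`,
    `category`,
    `saleDateTime_Ltz`,
    `saleTimestamp_Epoc`
FROM `c_hive`.`db01`.t_f_unnested_sales;
```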
... could it be caused by HMS? The version referred to in the Dockerfile is amd64-only, and my laptop is an M1 (arm64) based MBP.