George Leonard
07/23/2024, 3:05 PM
CREATE CATALOG c_iceberg_hive WITH (
'type' = 'iceberg',
'catalog-type' = 'hive',
'warehouse' = 's3a://warehouse',
'hive-conf-dir' = './conf');
CREATE DATABASE `c_iceberg_hive`.`db01`;
and then
CREATE TABLE `c_iceberg_hive`.`db01`.t_f_unnested_sales (
`store_id` STRING,
`product` STRING,
`brand` STRING,
`saleValue` DOUBLE,
`category` STRING,
`saleDateTime_Ltz` STRING,
`saleTimestamp_Epoc` STRING,
`saleTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`saleTimestamp_Epoc` AS BIGINT) / 1000)),
WATERMARK FOR `saleTimestamp_WM` AS `saleTimestamp_WM`
) WITH (
'connector' = 'kafka',
'topic' = 'unnested_sales',
'properties.bootstrap.servers' = 'broker:29092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = 'http://schema-registry:9081',
'value.fields-include' = 'ALL'
);
then it results in
[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Creating table with computed columns is not supported yet.
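As the exception suggests, the Flink Iceberg catalog stores Iceberg tables with physical columns only: computed columns and watermarks are rejected, and the Kafka source definition is better kept in a catalog that stores generic Flink tables (which is what the next attempt does). A minimal sketch, not taken from the thread, of a table c_iceberg_hive would accept, reusing the column names above and the t_i_unnested_sales name that comes up later; the epoch-to-timestamp conversion then has to happen in whatever query writes into it:
-- sketch only: physical columns copied from the Kafka table above, no computed column or watermark
CREATE TABLE `c_iceberg_hive`.`db01`.t_i_unnested_sales (
`store_id` STRING,
`product` STRING,
`brand` STRING,
`saleValue` DOUBLE,
`category` STRING,
`saleDateTime_Ltz` STRING,
`saleTimestamp_Epoc` STRING
);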
if I change catalog:
CREATE CATALOG c_hive WITH (
'type' = 'hive',
'hive-conf-dir' = '/opt/flink/conf');
CREATE DATABASE `c_hive`.`db01`;
and then
CREATE TABLE `c_hive`.`db01`.t_f_unnested_sales (
`store_id` STRING,
`product` STRING,
`brand` STRING,
`saleValue` DOUBLE,
`category` STRING,
`saleDateTime_Ltz` STRING,
`saleTimestamp_Epoc` STRING,
`saleTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`saleTimestamp_Epoc` AS BIGINT) / 1000)),
WATERMARK FOR `saleTimestamp_WM` AS `saleTimestamp_WM`
) WITH (
'connector' = 'kafka',
'topic' = 'unnested_sales',
'properties.bootstrap.servers' = 'broker:29092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = 'http://schema-registry:9081',
'value.fields-include' = 'ALL'
);
this works...
but when I now try and use this in:
CREATE TABLE `c_hive`.`db01`.t_f_unnested_sales (
`store_id` STRING,
`product` STRING,
`brand` STRING,
`saleValue` DOUBLE,
`category` STRING,
`saleDateTime_Ltz` STRING,
`saleTimestamp_Epoc` STRING,
`saleTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`saleTimestamp_Epoc` AS BIGINT) / 1000)),
WATERMARK FOR `saleTimestamp_WM` AS `saleTimestamp_WM`
) WITH (
'connector' = 'kafka',
'topic' = 'unnested_sales',
'properties.bootstrap.servers' = 'broker:29092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = 'http://schema-registry:9081',
'value.fields-include' = 'ALL'
);
insert into `c_hive`.`db01`.t_f_unnested_sales
SELECT
`store`.`id` as `store_id`,
bi.`name` AS `product`,
bi.`brand` AS `brand`,
bi.`price` * bi.`quantity` AS `saleValue`,
bi.`category` AS `category`,
`saleDateTime_Ltz` as saleDateTime_Ltz,
`saleTimestamp_Epoc` as saleTimestamp_Epoc
FROM `c_hive`.`db01`.t_f_avro_salescompleted_x -- assuming avro_salescompleted_x is a table function
CROSS JOIN UNNEST(`basketItems`) AS bi;
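Side note on the CROSS JOIN UNNEST above: basketItems is unnested as an array column, not a table function, so the source table needs it declared as an ARRAY of ROWs. A hypothetical shape for t_f_avro_salescompleted_x, with field names inferred from the SELECT list and types guessed (the real DDL isn't shown in the thread):
-- hypothetical schema, inferred from the SELECT above; connector options and types may differ
CREATE TABLE `c_hive`.`db01`.t_f_avro_salescompleted_x (
`store` ROW<`id` STRING>,
`basketItems` ARRAY<ROW<`name` STRING, `brand` STRING, `price` DOUBLE, `quantity` INT, `category` STRING>>,
`saleDateTime_Ltz` STRING,
`saleTimestamp_Epoc` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'avro_salescompleted_x',
'properties.bootstrap.servers' = 'broker:29092',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = 'http://schema-registry:9081'
);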
And now try and push data to Iceberg on MinIO.
CREATE TABLE t_i_unnested_sales WITH (
'connector' = 'iceberg',
'catalog-type' = 'hive',
'catalog-name' = 'c_hive',
'warehouse' = 's3a://warehouse',
'hive-conf-dir' = './conf/')
AS SELECT * FROM t_f_unnested_sales;
now results in:
[ERROR] Could not execute SQL statement. Reason:
org.apache.iceberg.exceptions.NoSuchIcebergTableException: Not an iceberg table: c_hive.db01.t_i_unnested_sales (type=null)
seems to be originating out of the catalog type...
rmoff
07/23/2024, 3:09 PM
CREATE TABLE t_i_unnested_sales WITH (
rmoff
07/23/2024, 3:10 PM
insert into `c_hive`.`db01`.t_f_unnested_sales
George Leonard
07/23/2024, 3:11 PM
db01;` before execute
George Leonard
07/23/2024, 3:11 PM
c_hive.db01.t_f_unnested_sales
George Leonard
07/23/2024, 3:13 PM
George Leonard
07/23/2024, 3:18 PM
Flink SQL> insert into `c_hive`.`db01`.t_f_unnested_sales
> SELECT
> `store`.`id` as `store_id`,
> bi.`name` AS `product`,
> bi.`brand` AS `brand`,
> bi.`price` * bi.`quantity` AS `saleValue`,
> bi.`category` AS `category`,
> `saleDateTime_Ltz` as saleDateTime_Ltz,
> `saleTimestamp_Epoc` as saleTimestamp_Epoc
> FROM `c_hive`.`db01`.t_f_avro_salescompleted_x -- assuming avro_salescompleted_x is a table function
> CROSS JOIN UNNEST(`basketItems`) AS bi;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 7c0e494acae75a062ec6dc00b6ab627a
Flink SQL> select * from `c_hive`.`db01`.t_f_unnested_sales;
Records returned.
Flink SQL> SET 'execution.checkpointing.interval' = '60sec';
[INFO] Execute statement succeed.
Flink SQL> SET 'pipeline.operator-chaining.enabled' = 'false';
[INFO] Execute statement succeed.
Flink SQL> CREATE TABLE `c_hive`.`db01`.t_i_unnested_sales WITH (
> 'connector' = 'iceberg',
> 'catalog-type' = 'hive',
> 'catalog-name' = 'c_hive',
> 'warehouse' = 's3a://warehouse',
> 'hive-conf-dir' = './conf')
> AS SELECT * FROM t_f_unnested_sales;
[ERROR] Could not execute SQL statement. Reason:
org.apache.iceberg.exceptions.NoSuchIcebergTableException: Not an iceberg table: c_hive.db01.t_i_unnested_sales (type=null)
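One pattern that typically avoids this "Not an iceberg table" clash (a sketch, assuming the c_iceberg_hive catalog from the start of the thread is still registered and points at the same Hive metastore and MinIO warehouse): create the sink through the Iceberg catalog itself, as sketched after the first error above, rather than attaching 'connector' = 'iceberg' to a table living in the Hive catalog, and then insert across catalogs. The column list is explicit so the computed saleTimestamp_WM column is not written into Iceberg:
INSERT INTO `c_iceberg_hive`.`db01`.t_i_unnested_sales
SELECT `store_id`, `product`, `brand`, `saleValue`, `category`, `saleDateTime_Ltz`, `saleTimestamp_Epoc`
FROM `c_hive`.`db01`.t_f_unnested_sales;
The Iceberg sink only commits data files when a checkpoint completes, so the 'execution.checkpointing.interval' set above is what makes the objects in MinIO fill up with readable data.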
George Leonard
07/23/2024, 4:45 PM
CREATE CATALOG c_hive WITH (
'type' = 'hive',
'hive-conf-dir' = './conf');
defined at the moment. As I created tables in Flink I can see "records, or is it objects," created in MinIO. If I click on them, there is nothing however.
I exited Flink SQL and re-entered; I just needed to rerun the above and execute a USE <cat>.<db>.
I could then select from the tables, which are purely consuming from Kafka.
I could also re-execute my insert statements, which re-created the Flink jobs.
George Leonard
07/23/2024, 4:47 PM
George Leonard
07/23/2024, 6:37 PM