George Leonard
07/24/2024, 9:56 AM
Flink SQL> CREATE CATALOG c_iceberg_jdbc WITH (
> 'type' = 'iceberg',
> 'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
> 'warehouse' = 's3://warehouse',
> 's3.endpoint' = 'http://minio:9000',
> 's3.path-style-access' = 'true',
> 'catalog-impl' = 'org.apache.iceberg.jdbc.JdbcCatalog',
> 'uri' = 'jdbc:postgresql://postgres:5432/?user=dba&password=rules');
[INFO] Execute statement succeed.
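Aside: the USE that follows assumes the db01 database already exists in the JDBC catalog; if it doesn't, a minimal sketch to create it first:

-- create the database in the Iceberg JDBC catalog before USE-ing it
CREATE DATABASE IF NOT EXISTS `c_iceberg_jdbc`.`db01`;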
Flink SQL> USE `c_iceberg_jdbc`.`db01`;
[INFO] Execute statement succeed.
Flink SQL>
> CREATE TABLE t_foo (c1 varchar, c2 int);
[INFO] Execute statement succeed.
Flink SQL> INSERT INTO t_foo VALUES ('a',42);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: b0d9bd91b766f4ebb772a4004053a3ea
Flink SQL> SET 'execution.runtime-mode' = 'batch';
>
[INFO] Execute statement succeed.
Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
>
[INFO] Execute statement succeed.
Flink SQL> SELECT * FROM t_foo;
>
+----+----+
| c1 | c2 |
+----+----+
| a | 42 |
+----+----+
1 row in set
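Aside, not from the thread: the Iceberg connector can also read the same table incrementally in streaming mode via SQL hints (the monitor-interval value here is an assumption):

SET 'execution.runtime-mode' = 'streaming';
-- tail the table: emit new snapshots as they are committed
SELECT * FROM t_foo /*+ OPTIONS('streaming'='true', 'monitor-interval'='30s') */;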
Flink SQL> CREATE or replace TABLE t_k_avro_salesbaskets_x (
> `invoiceNumber` STRING,
> `saleDateTime_Ltz` STRING,
> `saleTimestamp_Epoc` STRING,
> `terminalPoint` STRING,
> `nett` DOUBLE,
> `vat` DOUBLE,
> `total` DOUBLE,
> `store` row<`id` STRING, `name` STRING>,
> `clerk` row<`id` STRING, `name` STRING, `surname` STRING>,
> `basketItems` array<row<`id` STRING, `name` STRING, `brand` STRING, `category` STRING, `price` DOUBLE, `quantity` INT>>,
> `saleTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`saleTimestamp_Epoc` AS BIGINT) / 1000)),
> WATERMARK FOR `saleTimestamp_WM` AS `saleTimestamp_WM`
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'avro_salesbaskets',
> 'properties.bootstrap.servers' = 'broker:29092',
> 'properties.group.id' = 'testGroup',
> 'scan.startup.mode' = 'earliest-offset',
> 'value.format' = 'avro-confluent',
> 'value.avro-confluent.schema-registry.url' = 'http://schema-registry:9081',
> 'value.fields-include' = 'ALL'
> );
ERROR:
[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Creating table with computed columns is not supported yet.
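The UnsupportedOperationException is a known Iceberg-catalog limitation rather than a syntax problem: Iceberg table metadata has no representation for Flink computed columns or watermarks. A common workaround (a sketch, assuming the same Kafka and Schema Registry endpoints as above) is to declare the Kafka-facing table as a TEMPORARY table, which lives in the Flink session rather than in the catalog, and reserve the Iceberg catalog for plain physical tables:

-- session-scoped; never persisted to the Iceberg catalog, so computed
-- columns and the watermark are allowed here
CREATE TEMPORARY TABLE t_k_avro_salesbaskets_x (
  `invoiceNumber` STRING,
  `saleDateTime_Ltz` STRING,
  `saleTimestamp_Epoc` STRING,
  `terminalPoint` STRING,
  `nett` DOUBLE,
  `vat` DOUBLE,
  `total` DOUBLE,
  `store` ROW<`id` STRING, `name` STRING>,
  `clerk` ROW<`id` STRING, `name` STRING, `surname` STRING>,
  `basketItems` ARRAY<ROW<`id` STRING, `name` STRING, `brand` STRING, `category` STRING, `price` DOUBLE, `quantity` INT>>,
  `saleTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`saleTimestamp_Epoc` AS BIGINT) / 1000)),
  WATERMARK FOR `saleTimestamp_WM` AS `saleTimestamp_WM`
) WITH (
  'connector' = 'kafka',
  'topic' = 'avro_salesbaskets',
  'properties.bootstrap.servers' = 'broker:29092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.schema-registry.url' = 'http://schema-registry:9081',
  'value.fields-include' = 'ALL'
);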
George Leonard
07/24/2024, 12:13 PM
CREATE CATALOG c_hive WITH (
'type' = 'hive',
'hive-conf-dir' = './conf/');
George Leonard
07/24/2024, 12:20 PM
CREATE CATALOG c_hive WITH (
'type' = 'hive',
'hive-conf-dir' = './conf/');
then the creates work... up to trying to create that t_i_unnested_sales table... then it complains that it's not Iceberg.
up to that point all the tables are stored somewhere, the catalog and the table defs etc., and data actually shows in MinIO.
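The "not an Iceberg table" complaint follows from the catalog switch: inside a plain Hive catalog, CREATE TABLE produces a Hive table unless the table is explicitly bound to the Iceberg connector. A hedged sketch of the per-table option route (the column list is illustrative, and the option set, mirroring the JDBC catalog definition above, is an assumption):

-- bind this one table to Iceberg even though the current catalog is Hive
CREATE TABLE t_i_unnested_sales (
  `invoiceNumber` STRING,
  `name` STRING,
  `price` DOUBLE,
  `quantity` INT
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = 'c_iceberg_jdbc',
  'catalog-impl' = 'org.apache.iceberg.jdbc.JdbcCatalog',
  'uri' = 'jdbc:postgresql://postgres:5432/?user=dba&password=rules',
  'warehouse' = 's3://warehouse',
  'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
  's3.endpoint' = 'http://minio:9000',
  's3.path-style-access' = 'true'
);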
07/24/2024, 12:20 PMD. Draco O'Brien
07/24/2024, 12:20 PMGeorge Leonard
07/24/2024, 12:21 PMD. Draco O'Brien
07/24/2024, 12:21 PMGeorge Leonard
07/24/2024, 12:21 PMGeorge Leonard
07/24/2024, 12:22 PMGeorge Leonard
07/24/2024, 12:22 PMD. Draco O'Brien
07/24/2024, 12:23 PMGeorge Leonard
07/24/2024, 12:23 PMGeorge Leonard
07/24/2024, 12:24 PMGeorge Leonard
07/24/2024, 12:24 PMD. Draco O'Brien
07/24/2024, 12:24 PMGeorge Leonard
07/24/2024, 12:29 PMCREATE CATALOG c_hive WITH (
'type' = 'hive',
'hive-conf-dir' = './conf/');
used this catalog, and got data in MinIO in Parquet format...
well, up to creating that unnested Iceberg table...
George Leonard
09/07/2024, 5:46 AM
> select
> b.invoiceNumber,
> b.saleDateTime_Ltz,
> b.saleTimestamp_Epoc,
> b.terminalPoint,
> b.nett,
> b.vat,
> b.total,
> b.store,
> b.clerk,
> b.basketItems,
> a.payDateTime_Ltz,
> a.payTimestamp_Epoc,
> a.paid,
> a.finTransactionId
> FROM
> c_hive.db01.t_f_msqlcdc_salespayments a,
> -- c_hive.db01.t_f_pgcdc_salespayments a,
> c_hive.db01.t_k_avro_salesbaskets b
> WHERE a.invoiceNumber = b.invoiceNumber
> AND a.created_at > b.saleTimestamp_WM
> AND b.saleTimestamp_WM > (b.saleTimestamp_WM - INTERVAL '1' HOUR);
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Table sink 'c_hive.db01.t_f_avro_salescompleted' doesn't support consuming update and delete changes which is produced by node Join(joinType=[InnerJoin], where=[((invoiceNumber = invoiceNumber0) AND (created_at > saleTimestamp_WM))], select=[invoiceNumber, payDateTime_Ltz, payTimestamp_Epoc, paid, finTransactionId, created_at, invoiceNumber0, saleDateTime_Ltz, saleTimestamp_Epoc, terminalPoint, nett, vat, total, store, clerk, basketItems, saleTimestamp_WM], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[NoUniqueKey])
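The TableException is about changelog semantics: a regular join over two unbounded inputs produces updates, which the append-only sink named in the error (t_f_avro_salescompleted, so the statement was evidently an INSERT ... SELECT) cannot consume. A hedged sketch of one common fix, not from the thread: bound the two time attributes on both sides so the planner runs an interval join, whose output is insert-only. This assumes a.created_at is a watermarked time attribute and the payments stream is effectively append-only:

INSERT INTO c_hive.db01.t_f_avro_salescompleted
SELECT
  b.invoiceNumber,
  b.saleDateTime_Ltz,
  b.saleTimestamp_Epoc,
  b.terminalPoint,
  b.nett,
  b.vat,
  b.total,
  b.store,
  b.clerk,
  b.basketItems,
  a.payDateTime_Ltz,
  a.payTimestamp_Epoc,
  a.paid,
  a.finTransactionId
FROM c_hive.db01.t_f_msqlcdc_salespayments a,
     c_hive.db01.t_k_avro_salesbaskets b
WHERE a.invoiceNumber = b.invoiceNumber
  -- lower AND upper bound between the two time attributes
  -- => interval join => append-only output
  AND a.created_at BETWEEN b.saleTimestamp_WM
                       AND b.saleTimestamp_WM + INTERVAL '1' HOUR;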