# troubleshooting
g
hehehe i'm going to run out of hair.... got Robin's data-to-Iceberg setup on S3/MinIO working, with PostgreSQL as the catalog. trying to create a table:
Flink SQL> CREATE CATALOG c_iceberg_jdbc WITH (
>    'type' = 'iceberg',
>    'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
>    'warehouse' = 's3://warehouse',
>    's3.endpoint' = 'http://minio:9000',
>    's3.path-style-access' = 'true',
>    'catalog-impl' = 'org.apache.iceberg.jdbc.JdbcCatalog',
>    'uri' = 'jdbc:postgresql://postgres:5432/?user=dba&password=rules');
[INFO] Execute statement succeed.

Flink SQL> USE `c_iceberg_jdbc`.`db01`;
[INFO] Execute statement succeed.

Flink SQL>
> CREATE TABLE t_foo (c1 varchar, c2 int);
[INFO] Execute statement succeed.



Flink SQL> INSERT INTO t_foo VALUES ('a',42);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: b0d9bd91b766f4ebb772a4004053a3ea

Flink SQL> SET 'execution.runtime-mode' = 'batch';
>
[INFO] Execute statement succeed.

Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
>
[INFO] Execute statement succeed.

Flink SQL> SELECT * FROM t_foo;
>
+----+----+
| c1 | c2 |
+----+----+
|  a | 42 |
+----+----+
1 row in set

Flink SQL> CREATE or replace TABLE t_k_avro_salesbaskets_x (
>     `invoiceNumber` STRING,
>     `saleDateTime_Ltz` STRING,
>     `saleTimestamp_Epoc` STRING,
>     `terminalPoint` STRING,
>     `nett` DOUBLE,
>     `vat` DOUBLE,
>     `total` DOUBLE,
>     `store` row<`id` STRING, `name` STRING>,
>     `clerk` row<`id` STRING, `name` STRING, `surname` STRING>,
>     `basketItems` array<row<`id` STRING, `name` STRING, `brand` STRING, `category` STRING, `price` DOUBLE, `quantity` INT>>,
>     `saleTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`saleTimestamp_Epoc` AS BIGINT) / 1000)),
>     WATERMARK FOR `saleTimestamp_WM` AS `saleTimestamp_WM`
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 'avro_salesbaskets',
>     'properties.bootstrap.servers' = 'broker:29092',
>     'properties.group.id' = 'testGroup',
>     'scan.startup.mode' = 'earliest-offset',
>     'value.format' = 'avro-confluent',
>     'value.avro-confluent.schema-registry.url' = 'http://schema-registry:9081',
>     'value.fields-include' = 'ALL'
> );
ERROR:
[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Creating table with computed columns is not supported yet.
d
The error message indicates that computed columns are not supported in the CREATE statement, at least for the version of the Iceberg connector you are using. Unless it's under dev or you can find another version, it may be necessary to create the table without the computed columns and perform those computations in the streaming pipeline instead.
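For example, something along these lines might go through (a sketch only: `t_i_salesbaskets` is a made-up name, db01 is the database you already created, and the Kafka-facing table with the computed column and watermark would stay in a catalog that supports it, e.g. the default in-memory one):

```sql
-- Sketch: an Iceberg-catalog table with no computed column / watermark.
-- saleTimestamp_WM is a plain TIMESTAMP(3) column; its value gets computed
-- in the INSERT that feeds this table rather than in the DDL.
CREATE TABLE c_iceberg_jdbc.db01.t_i_salesbaskets (
    `invoiceNumber`      STRING,
    `saleDateTime_Ltz`   STRING,
    `saleTimestamp_Epoc` STRING,
    `terminalPoint`      STRING,
    `nett`               DOUBLE,
    `vat`                DOUBLE,
    `total`              DOUBLE,
    `store`              ROW<`id` STRING, `name` STRING>,
    `clerk`              ROW<`id` STRING, `name` STRING, `surname` STRING>,
    `basketItems`        ARRAY<ROW<`id` STRING, `name` STRING, `brand` STRING,
                               `category` STRING, `price` DOUBLE, `quantity` INT>>,
    `saleTimestamp_WM`   TIMESTAMP(3)
);
```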
g
the computed columns are pretty much just the watermarks. with the default catalog / default database this all works; it only rears its head when trying to create the iceberg-backed stack, with hive as the catalog. if I create the catalog with the following then it works, up to that unnested table
CREATE CATALOG c_hive WITH (
      'type' = 'hive',
      'hive-conf-dir' = './conf/');
it then says that's not an iceberg table... pretty sure a good part of this is me getting confused by the different layers...
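for my own sanity, the layering is catalog > database > table, and any table can be addressed fully qualified no matter which catalog is current, e.g. (just a sketch using the names from above):

```sql
-- The layers: catalog > database > table.
SHOW CATALOGS;          -- e.g. default_catalog, c_iceberg_jdbc, c_hive
SHOW CURRENT CATALOG;
SHOW DATABASES;

-- Fully qualified references work across catalogs:
SELECT * FROM c_iceberg_jdbc.db01.t_foo;
```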
d
this issue seems to suggest that computed columns are not supported
g
this seems to be an issue with the catalog... having a problem there... pretty strange, as the catalog is the entire backing bit needed by a prod environment. this has now been seen with hive on s3 and with the postgresql/jdbc-backed catalog.
when the catalog is simply created as
CREATE CATALOG c_hive WITH (
      'type' = 'hive',
      'hive-conf-dir' = './conf/');
then the creates work... up to trying to create that t_i_unnested_sales table... then it complains that it's not iceberg. up to that point all the tables are stored somewhere, catalog and the table defs etc, and the data actually shows in minio.
d
Ok I saw this issue also
g
need to go see if this is resolved with the new 1.6
d
ok
g
this seems to be such a fundamental problem.. Flink and its windows are dependent on watermarks.
have to see if/how i can do them differently... maybe move the watermark calc to the insert statement,
but ye, flabbergasted.
d
“computed columns and watermark specs are not supported in the FlinkCatalog.java code.”
g
sh$t.
computed columns i can prob move to the insert statement, make the destination table "dumb"
but windows / group bys need watermarks
d
yes, that's the only option right now I think: create the table without computed columns and do the transforms upstream
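i.e. the transform moves into the INSERT, roughly like this (sketch: it reuses the `t_i_salesbaskets` table from above and assumes the Kafka source `t_k_avro_salesbaskets_x`, with its watermark, lives in the default catalog as in the setup that already worked):

```sql
-- Sketch: populate the "dumb" Iceberg table, computing the former
-- computed column in the SELECT instead of in the table definition.
INSERT INTO c_iceberg_jdbc.db01.t_i_salesbaskets
SELECT
    `invoiceNumber`,
    `saleDateTime_Ltz`,
    `saleTimestamp_Epoc`,
    `terminalPoint`,
    `nett`,
    `vat`,
    `total`,
    `store`,
    `clerk`,
    `basketItems`,
    TO_TIMESTAMP(FROM_UNIXTIME(CAST(`saleTimestamp_Epoc` AS BIGINT) / 1000)) AS `saleTimestamp_WM`
FROM default_catalog.default_database.t_k_avro_salesbaskets_x;
```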
g
need to figure out how/where the limitation comes in... whether it is specific to a catalog type, as I got it working with
CREATE CATALOG c_hive WITH (
      'type' = 'hive',
      'hive-conf-dir' = './conf/');
this catalog, and got data into minio in parquet format... well, up to creating that unnested iceberg table...
r
Are there any new insights on that? I am just figuring out what's the best (easiest) solution for a catalog if Hive is out of the question.
g
i used for my research a standalone metastore which i modified to have a PostgreSQL back end, see dev-hms-postgresql in the following link: i'm busy, for the experience, building a 2-node Hive environment, hiveserver2 and a separate metastore with a postgresql back end. this was the hive catalog for both the Apache Iceberg scenario and the Apache Paimon solution https://medium.com/@georgelza/an-exercise-in-discovery-streaming-data-in-the-analytical-world-part-1-e7c17d61b9d2
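for comparison, an Iceberg catalog backed by that Hive metastore can be declared along these lines (a sketch only; the `metastore` and `minio` hostnames/ports are placeholders for whatever the compose stack uses):

```sql
-- Sketch: Iceberg catalog with a (standalone) Hive metastore as the catalog
-- store and MinIO/S3 as the warehouse. Hostnames/ports are placeholders.
CREATE CATALOG c_iceberg_hive WITH (
    'type'                 = 'iceberg',
    'catalog-type'         = 'hive',
    'uri'                  = 'thrift://metastore:9083',
    'warehouse'            = 's3://warehouse',
    'io-impl'              = 'org.apache.iceberg.aws.s3.S3FileIO',
    's3.endpoint'          = 'http://minio:9000',
    's3.path-style-access' = 'true'
);
```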
I am using Hive for Paimon. working, had to work around some bits. still figuring some of this out: what can be copied how and where, where to use watermarks, and how and how not.
gets even more interesting when you want to do input via CDC. not about the catalog even then, but about what can be copied how as a stream and what not. i.e.: Flink SQL> Insert into c_hive.db01.t_f_avro_salescompleted
> select
>         b.invoiceNumber,
>         b.saleDateTime_Ltz,
>         b.saleTimestamp_Epoc,
>         b.terminalPoint,
>         b.nett,
>         b.vat,
>         b.total,
>         b.store,
>         b.clerk,
>         b.basketItems,
>         a.payDateTime_Ltz,
>         a.payTimestamp_Epoc,
>         a.paid,
>         a.finTransactionId
>     FROM
>         c_hive.db01.t_f_msqlcdc_salespayments a,
>     --    c_hive.db01.t_f_pgcdc_salespayments a,
>         c_hive.db01.t_k_avro_salesbaskets b
>     WHERE a.invoiceNumber = b.invoiceNumber
>     AND a.created_at > b.saleTimestamp_WM
>     AND b.saleTimestamp_WM > (b.saleTimestamp_WM - INTERVAL '1' HOUR);
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Table sink 'c_hive.db01.t_f_avro_salescompleted' doesn't support consuming update and delete changes which is produced by node Join(joinType=[InnerJoin], where=[((invoiceNumber = invoiceNumber0) AND (created_at > saleTimestamp_WM))], select=[invoiceNumber, payDateTime_Ltz, payTimestamp_Epoc, paid, finTransactionId, created_at, invoiceNumber0, saleDateTime_Ltz, saleTimestamp_Epoc, terminalPoint, nett, vat, total, store, clerk, basketItems, saleTimestamp_WM], leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[NoUniqueKey])
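one route i still want to try (a sketch only: it assumes the sink is created through the Iceberg catalog as a format-version 2 table, and the payment-side column types are my guess) is to make the sink an upsert table with a primary key, so it can accept the update/delete changes the CDC join produces:

```sql
-- Sketch: an Iceberg v2 upsert sink with a primary key, so the changelog
-- (updates/deletes) produced by the CDC join can be written to it.
-- The payDateTime_Ltz / payTimestamp_Epoc / paid / finTransactionId types
-- are assumptions.
CREATE TABLE c_iceberg_jdbc.db01.t_f_avro_salescompleted (
    `invoiceNumber`      STRING,
    `saleDateTime_Ltz`   STRING,
    `saleTimestamp_Epoc` STRING,
    `terminalPoint`      STRING,
    `nett`               DOUBLE,
    `vat`                DOUBLE,
    `total`              DOUBLE,
    `store`              ROW<`id` STRING, `name` STRING>,
    `clerk`              ROW<`id` STRING, `name` STRING, `surname` STRING>,
    `basketItems`        ARRAY<ROW<`id` STRING, `name` STRING, `brand` STRING,
                               `category` STRING, `price` DOUBLE, `quantity` INT>>,
    `payDateTime_Ltz`    STRING,
    `payTimestamp_Epoc`  STRING,
    `paid`               DOUBLE,
    `finTransactionId`   STRING,
    PRIMARY KEY (`invoiceNumber`) NOT ENFORCED
) WITH (
    'format-version'       = '2',
    'write.upsert.enabled' = 'true'
);
```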
r
Great blogs from you! Thanks for your insights, really very useful.
g
appreciated. busy with another series.