George Leonard
07/22/2024, 5:19 PM
Daniel Riveros
07/22/2024, 7:15 PM
Nicholas Leong
07/23/2024, 1:23 AM
Madhankumar R
07/23/2024, 3:18 AM
Monika Bednarz
07/23/2024, 8:16 AM
Cannot have more than one execute() or executeAsync() call in a single environment.
(described in a closed issue here)
Whenever I remove the env.execute(), only the Table API part is submitted. Messages are output into the Kafka stream, but there is no sign of execution of the Python process function in the logs 🤯
Could you please advise on how to execute all parts of the code? 🙏
George Leonard
07/23/2024, 9:31 AM
CREATE CATALOG c_iceberg_hive WITH (
'type' = 'iceberg',
'catalog-type'='hive',
'warehouse' = 's3a://warehouse',
'hive-conf-dir' = '/opt/sql-client/conf');
CREATE DATABASE `c_iceberg_hive`.`db01`;
Create table
CREATE TABLE t_k_avro_salescompleted (
INVNUMBER STRING,
SALEDATETIME_LTZ STRING,
SALETIMESTAMP_EPOC STRING,
TERMINALPOINT STRING,
NETT DOUBLE,
VAT DOUBLE,
TOTAL DOUBLE,
STORE row<ID STRING, NAME STRING>,
CLERK row<ID STRING, NAME STRING, SURNAME STRING>,
BASKETITEMS array<row<ID STRING, NAME STRING, BRAND STRING, CATEGORY STRING, PRICE DOUBLE, QUANTITY INT>>,
FINTRANSACTIONID STRING,
PAYDATETIME_LTZ STRING,
PAYTIMESTAMP_EPOC STRING,
PAID DOUBLE,
SALESTIMESTAMP_WM AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(SALETIMESTAMP_EPOC AS BIGINT) / 1000)),
WATERMARK FOR SALESTIMESTAMP_WM AS SALESTIMESTAMP_WM
) WITH (
'connector' = 'kafka',
'topic' = 'avro_salescompleted',
'properties.bootstrap.servers' = 'broker:29092',
'scan.startup.mode' = 'earliest-offset',
'properties.group.id' = 'testGroup',
'value.format' = 'avro-confluent',
'value.avro-confluent.schema-registry.url' = 'http://schema-registry:9081',
'value.fields-include' = 'ALL'
);
error
[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Creating table with computed columns is not supported yet.
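A minimal sketch of the usual workaround, assuming the catalog rather than the DDL is the problem: the Iceberg catalog cannot persist Flink-specific schema features such as computed columns and watermarks, so the Kafka table is declared in Flink's built-in in-memory catalog instead, leaving c_iceberg_hive for Iceberg tables only. Names and options mirror the DDL above; the column list is abbreviated, so copy the full list from the statement above.
USE CATALOG default_catalog;
USE default_database;
CREATE TABLE t_k_avro_salescompleted (
INVNUMBER STRING,
SALETIMESTAMP_EPOC STRING,
-- ... remaining physical columns exactly as in the DDL above ...
SALESTIMESTAMP_WM AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(SALETIMESTAMP_EPOC AS BIGINT) / 1000)),
WATERMARK FOR SALESTIMESTAMP_WM AS SALESTIMESTAMP_WM
) WITH (
'connector' = 'kafka',
'topic' = 'avro_salescompleted',
'properties.bootstrap.servers' = 'broker:29092',
'scan.startup.mode' = 'earliest-offset',
'properties.group.id' = 'testGroup',
'value.format' = 'avro-confluent',
'value.avro-confluent.schema-registry.url' = 'http://schema-registry:9081',
'value.fields-include' = 'ALL'
);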
Vadada Venkata Madhusudhana Rao
07/23/2024, 10:21 AM
Shai Somekh
07/23/2024, 11:41 AM
Thomas Steinholz
07/23/2024, 1:03 PM
FlinkDeployment using the flink-kubernetes-operator? https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.8/docs/custom-resource/reference/
George Leonard
07/23/2024, 3:05 PM
CREATE CATALOG c_iceberg_hive WITH (
'type' = 'iceberg',
'catalog-type' = 'hive',
'warehouse' = 's3a://warehouse',
'hive-conf-dir' = './conf');
CREATE DATABASE `c_iceberg_hive`.`db01`;
and then
CREATE TABLE `c_iceberg_hive`.`db01`.t_f_unnested_sales (
`store_id` STRING,
`product` STRING,
`brand` STRING,
`saleValue` DOUBLE,
`category` STRING,
`saleDateTime_Ltz` STRING,
`saleTimestamp_Epoc` STRING,
`saleTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`saleTimestamp_Epoc` AS BIGINT) / 1000)),
WATERMARK FOR `saleTimestamp_WM` AS `saleTimestamp_WM`
) WITH (
'connector' = 'kafka',
'topic' = 'unnested_sales',
'properties.bootstrap.servers' = 'broker:29092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = 'http://schema-registry:9081',
'value.fields-include' = 'ALL'
);
then it results in
[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Creating table with computed columns is not supported yet.
if I change catalog:
CREATE CATALOG c_hive WITH (
'type' = 'hive',
'hive-conf-dir' = '/opt/flink/conf');
CREATE DATABASE `c_hive`.`db01`;
and then
CREATE TABLE `c_hive`.`db01`.t_f_unnested_sales (
`store_id` STRING,
`product` STRING,
`brand` STRING,
`saleValue` DOUBLE,
`category` STRING,
`saleDateTime_Ltz` STRING,
`saleTimestamp_Epoc` STRING,
`saleTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`saleTimestamp_Epoc` AS BIGINT) / 1000)),
WATERMARK FOR `saleTimestamp_WM` AS `saleTimestamp_WM`
) WITH (
'connector' = 'kafka',
'topic' = 'unnested_sales',
'properties.bootstrap.servers' = 'broker:29092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = 'http://schema-registry:9081',
'value.fields-include' = 'ALL'
);
this works...
But when I now try to use this table in an insert:
insert into `c_hive`.`db01`.t_f_unnested_sales
SELECT
`store`.`id` as `store_id`,
bi.`name` AS `product`,
bi.`brand` AS `brand`,
bi.`price` * bi.`quantity` AS `saleValue`,
bi.`category` AS `category`,
`saleDateTime_Ltz` as saleDateTime_Ltz,
`saleTimestamp_Epoc` as saleTimestamp_Epoc
FROM `c_hive`.`db01`.t_f_avro_salescompleted_x -- assuming avro_salescompleted_x is a table function
CROSS JOIN UNNEST(`basketItems`) AS bi;
And now I try to push data to Iceberg on MinIO:
CREATE TABLE t_i_unnested_sales WITH (
'connector' = 'iceberg',
'catalog-type' = 'hive',
'catalog-name' = 'c_hive',
'warehouse' = 's3a://warehouse',
'hive-conf-dir' = './conf/')
AS SELECT * FROM t_f_unnested_sales;
now results in:
[ERROR] Could not execute SQL statement. Reason:
org.apache.iceberg.exceptions.NoSuchIcebergTableException: Not an iceberg table: c_hive.db01.t_i_unnested_sales (type=null)
Seems to be originating from the catalog type...
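One hedged way around this, assuming the c_iceberg_hive catalog created earlier in the thread is usable: the 'iceberg' table connector expects to find an existing Iceberg table in the Hive metastore, which a table created inside the plain c_hive catalog is not (hence type=null). Creating the sink through an Iceberg catalog makes it natively Iceberg, with no 'connector' option needed, and the CTAS can read across catalogs:
CREATE TABLE `c_iceberg_hive`.`db01`.`t_i_unnested_sales`
AS SELECT * FROM `c_hive`.`db01`.`t_f_unnested_sales`;
George Leonard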
07/24/2024, 9:56 AM
Flink SQL> CREATE CATALOG c_iceberg_jdbc WITH (
> 'type' = 'iceberg',
> 'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
> 'warehouse' = 's3://warehouse',
> 's3.endpoint' = 'http://minio:9000',
> 's3.path-style-access' = 'true',
> 'catalog-impl' = 'org.apache.iceberg.jdbc.JdbcCatalog',
> 'uri' = 'jdbc:postgresql://postgres:5432/?user=dba&password=rules');
[INFO] Execute statement succeed.
Flink SQL> USE `c_iceberg_jdbc`.`db01`;
[INFO] Execute statement succeed.
Flink SQL>
> CREATE TABLE t_foo (c1 varchar, c2 int);
[INFO] Execute statement succeed.
Flink SQL> INSERT INTO t_foo VALUES ('a',42);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: b0d9bd91b766f4ebb772a4004053a3ea
Flink SQL> SET 'execution.runtime-mode' = 'batch';
>
[INFO] Execute statement succeed.
Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
>
[INFO] Execute statement succeed.
Flink SQL> SELECT * FROM t_foo;
>
+----+----+
| c1 | c2 |
+----+----+
| a | 42 |
+----+----+
1 row in set
Flink SQL> CREATE or replace TABLE t_k_avro_salesbaskets_x (
> `invoiceNumber` STRING,
> `saleDateTime_Ltz` STRING,
> `saleTimestamp_Epoc` STRING,
> `terminalPoint` STRING,
> `nett` DOUBLE,
> `vat` DOUBLE,
> `total` DOUBLE,
> `store` row<`id` STRING, `name` STRING>,
> `clerk` row<`id` STRING, `name` STRING, `surname` STRING>,
> `basketItems` array<row<`id` STRING, `name` STRING, `brand` STRING, `category` STRING, `price` DOUBLE, `quantity` INT>>,
> `saleTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`saleTimestamp_Epoc` AS BIGINT) / 1000)),
> WATERMARK FOR `saleTimestamp_WM` AS `saleTimestamp_WM`
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'avro_salesbaskets',
> 'properties.bootstrap.servers' = 'broker:29092',
> 'properties.group.id' = 'testGroup',
> 'scan.startup.mode' = 'earliest-offset',
> 'value.format' = 'avro-confluent',
> 'value.avro-confluent.schema-registry.url' = 'http://schema-registry:9081',
> 'value.fields-include' = 'ALL'
> );
ERROR:
[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Creating table with computed columns is not supported yet.
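The same limitation as before appears to apply, just surfaced through the JDBC catalog this time: Iceberg catalogs cannot persist computed columns or watermarks. A minimal sketch of the usual split, reusing the DDL above with the column list abbreviated: declare the Kafka table in the built-in catalog and keep c_iceberg_jdbc for Iceberg tables only.
CREATE TABLE `default_catalog`.`default_database`.`t_k_avro_salesbaskets_x` (
`invoiceNumber` STRING,
`saleTimestamp_Epoc` STRING,
-- ... remaining physical columns exactly as in the DDL above ...
`saleTimestamp_WM` AS TO_TIMESTAMP(FROM_UNIXTIME(CAST(`saleTimestamp_Epoc` AS BIGINT) / 1000)),
WATERMARK FOR `saleTimestamp_WM` AS `saleTimestamp_WM`
) WITH (
'connector' = 'kafka',
'topic' = 'avro_salesbaskets',
'properties.bootstrap.servers' = 'broker:29092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'avro-confluent',
'value.avro-confluent.schema-registry.url' = 'http://schema-registry:9081',
'value.fields-include' = 'ALL'
);
-- Iceberg tables are then created and written in the JDBC catalog, e.g. (hypothetical target):
-- INSERT INTO `c_iceberg_jdbc`.`db01`.`some_iceberg_table`
-- SELECT ... FROM `default_catalog`.`default_database`.`t_k_avro_salesbaskets_x`;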
George Leonard
07/24/2024, 11:55 AM
George Leonard
07/24/2024, 11:56 AM
George Leonard
07/24/2024, 11:56 AM
Anis Shaikh
07/24/2024, 5:45 PM
1. Starting the Job:
◦ I save the jobId of the Flink job.
2. Stopping the Job:
◦ Using the saved jobId, I execute the stop API provided by Flink, which stops the job and creates a savepoint.
◦ The stop API returns a request-id (triggerId).
◦ Using this request-id, I call /jobs/{flinkJobId}/savepoints/{triggerId} to retrieve the savepoint path.
◦ I store this savepoint path for future use.
3. Resuming the Job:
◦ To resume the job, I execute the Flink run API /jars/{jarId}/run with the necessary program arguments, including the --savepoint path stored earlier.
Despite following this approach, the job is restarting from the beginning and reprocessing all records instead of resuming from where it left off.
I am seeking guidance on the following:
• Is my current approach correct?
• If not, where might I be going wrong?
• Any recommendations or best practices to ensure the job resumes processing from the last savepoint.
I appreciate any help or insights you can provide.
Vishva Mahadevan
07/25/2024, 4:08 AM
George Leonard
07/25/2024, 11:58 AM
Flink SQL>
> CREATE CATALOG c_iceberg_hive WITH (
> 'type' = 'iceberg',
> 'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
> 'warehouse' = 's3://warehouse',
> 's3.endpoint' = 'http://minio:9000',
> 's3.path-style-access' = 'true',
> 'catalog-type' = 'hive'
> );
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodError: 'void com.google.common.base.Preconditions.checkArgument(boolean, java.lang.String, java.lang.Object)'
Thinking it will take someone who's done this before about 5 minutes to see my gap/miss... please.
Vishva Mahadevan
07/25/2024, 1:50 PM
2024-07-25 19:12:05
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247)
at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240)
at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:738)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:715)
at org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener.notifyTaskFailure(UpdateSchedulerNgOnInternalFailuresListener.java:51)
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.notifySchedulerNgAboutInternalTaskFailure(DefaultExecutionGraph.java:1622)
at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1137)
at org.apache.flink.runtime.executiongraph.Execution.processFail(Execution.java:1077)
at org.apache.flink.runtime.executiongraph.Execution.fail(Execution.java:778)
at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.signalPayloadRelease(SingleLogicalSlot.java:195)
at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.release(SingleLogicalSlot.java:182)
at org.apache.flink.runtime.scheduler.SharedSlot.lambda$release$4(SharedSlot.java:270)
at java.base/java.util.concurrent.CompletableFuture.uniAcceptNow(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.uniAcceptStage(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.thenAccept(Unknown Source)
at org.apache.flink.runtime.scheduler.SharedSlot.release(SharedSlot.java:270)
at org.apache.flink.runtime.jobmaster.slotpool.AllocatedSlot.releasePayload(AllocatedSlot.java:152)
at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releasePayload(DefaultDeclarativeSlotPool.java:483)
at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.freeAndReleaseSlots(DefaultDeclarativeSlotPool.java:475)
at org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool.releaseSlots(DefaultDeclarativeSlotPool.java:446)
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.internalReleaseTaskManager(DeclarativeSlotPoolService.java:275)
at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolService.releaseTaskManager(DeclarativeSlotPoolService.java:231)
at org.apache.flink.runtime.jobmaster.JobMaster.disconnectTaskManager(JobMaster.java:533)
at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.handleTaskManagerConnectionLoss(JobMaster.java:1440)
at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1435)
at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:155)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id stream-join-batch-layer-taskmanager-1-21 timed out.
... 31 more
Edgar Ferney Ruiz Anzola
07/25/2024, 1:52 PM
2024-07-25 07:29:54,586 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job 'pot' (bfadda0ae94f23951224bb498106d4cf).
2024-07-25 07:29:54,604 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart back off time strategy FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, backoffTimeMS=1000) for pot (bfadda0ae94f23951224bb498106d4cf).
2024-07-25 07:29:54,785 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Recovering checkpoints from KubernetesStateHandleStore{configMapName='pot-flink-deployment-bfadda0ae94f23951224bb498106d4cf-config-map'}.
2024-07-25 07:29:54,793 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Found 0 checkpoints in KubernetesStateHandleStore{configMapName='pot-flink-deployment-bfadda0ae94f23951224bb498106d4cf-config-map'}.
2024-07-25 07:29:54,793 INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils [] - Trying to fetch 0 checkpoints from storage.
2024-07-25 07:29:54,892 INFO org.apache.flink.kubernetes.kubeclient.resources.KubernetesConfigMapSharedInformer [] - Starting to watch for pot/
And the fun part is that sometimes it works: a few times it finds the checkpoint and restores correctly.
If I check the ConfigMap pot-flink-deployment-bfadda0ae94f23951224bb498106d4cf-config-map, the checkpoint information is correct there, and the folder is correctly stored in the GCP bucket.
I hope someone can guide me to solve this problem.
George Leonard
07/25/2024, 3:17 PM
Jeeno Lentin
07/25/2024, 3:41 PM
We are upgrading from 1.15 to 1.18 in our AWS Kinesis (AWS Managed Apache Flink) environment. While Flink 1.18 is supposed to support Java 17, we are encountering issues when deploying Java 17 applications. The applications still work with Java 11 but fail with Java 17 with this error:
Caused by: java.lang.UnsupportedClassVersionError: <class_name> has been compiled by a more recent version of the Java Runtime (class file version 61.0), this version of the Java Runtime only recognizes class file versions up to 55.0
It appears that AWS is supplying JRE 11, and we don't see any options in the AWS console to choose the Java runtime.
Has anyone else experienced similar issues with AWS Managed Apache Flink and Java 17? Any insights or workarounds would be greatly appreciated. We are also in contact with AWS support but are looking for any additional guidance from the community.
Thank you!
Sergey Anokhovskiy
07/25/2024, 3:49 PM
Scott Robertson
07/25/2024, 9:08 PM
Why would all watermarks be <NULL>?
I have this kafka source:
CREATE TABLE kafka_source (
id INT,
name STRING,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'test',
'scan.startup.mode' = 'earliest-offset',
'topic' ='source',
'value.format' = 'json'
);
The WATERMARK seems to enable a TUMBLE window:
SELECT * FROM TABLE(
TUMBLE(TABLE kafka_source, DESCRIPTOR(ts), INTERVAL '5' SECONDS));
I can insert fine:
INSERT INTO kafka_source
VALUES (1, 'matt', CURRENT_TIMESTAMP);
But the watermarks are all null.
i.e.
SELECT CURRENT_WATERMARK(ts) from kafka_source;
Opens a window, with a Table, and all the values are <NULL>.
Why would all watermarks be <NULL>?
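A hedged explanation: CURRENT_WATERMARK(ts) returns NULL for any row processed before the first watermark has been generated, and with the default periodic generation interval (pipeline.auto-watermark-interval, 200 ms by default) a freshly inserted single row is usually read before any watermark exists. If the topic has several partitions, empty partitions also hold the overall watermark back unless an idle timeout such as table.exec.source.idle-timeout is configured. A minimal sketch against the kafka_source table above (the extra rows are made up):
INSERT INTO kafka_source
VALUES (2, 'anna', CURRENT_TIMESTAMP), (3, 'ben', CURRENT_TIMESTAMP);
SELECT id, ts, CURRENT_WATERMARK(ts) AS wm FROM kafka_source;
-- rows read after the first periodic watermark should now show a non-NULL wm
Rama Raghava Reddy Arvabhumi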
07/26/2024, 12:36 AM
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
Is it possible to provide a Hive metastore connection object instead of modifying hive-site.xml?
Nicholas Leong
07/26/2024, 3:34 AM
Ihor
07/26/2024, 9:08 AM
MySqlSplitReader) until a manual deployment delete (I'm using k8s and the flink k8s operator).
Any idea how to make detection of the connection issue and the "restart" automatic?
Irene Rodriguez
07/26/2024, 10:04 AM
Vishva Mahadevan
07/26/2024, 12:26 PM
Scott Robertson
07/26/2024, 12:27 PM
Louis Cameron Booth
07/26/2024, 2:25 PM
4.3.0-1.19. The examples in the public repo in the test folder appear to be for 1.18, and when I try to write the same code locally using 1.19 I get errors when attempting to sink to my DataStream: Cannot resolve method 'sinkTo(KinesisStreamsSink<String>)', with suggestions to cast arguments to get it working with the sinkTo methods.