Sylvia Lin
10/04/2022, 9:37 PM
Damon Rolfs
10/05/2022, 3:32 AM
Sevvy Yusuf
10/05/2022, 12:42 PM
Jaume
10/05/2022, 1:20 PM
upsert-kafka
connector) for joining and sinking the result into another Kafka topic. In some cases it creates the proper result, but in others it produces an unexpected tombstone message (unexpected because the joined relationships do exist).
🤔 Am I missing something? Does anyone know why this behavior is happening? Can anyone shed some light here?
🧵 I'll put more info in thread ⤵️
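One thing worth checking (an assumption, not a confirmed diagnosis): with an upsert-kafka sink, every retraction row (UPDATE_BEFORE/DELETE) produced by the join is written as a tombstone for its key, and a regular streaming join retracts and re-emits results whenever either input changes, so tombstones can show up even for keys whose relationship exists. A minimal sink sketch for reference; topic, columns, and broker address are made up:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertKafkaSinkSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Any -U/-D row written into this table becomes a Kafka record with a
        // null value (a tombstone) for the primary-key columns.
        tEnv.executeSql(
                "CREATE TABLE joined_sink (" +
                "  id STRING," +
                "  left_val STRING," +
                "  right_val STRING," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'upsert-kafka'," +
                "  'topic' = 'joined_topic'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'key.format' = 'json'," +
                "  'value.format' = 'json'" +
                ")");
    }
}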
Lydian Lee
10/05/2022, 10:39 PM
WebIdentityTokenCredentialsProvider
for s3 presto? I've tried providing
presto.s3.credentials-provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider
but it failed with:
Error creating an instance of com.amazonaws.auth.WebIdentityTokenCredentialsProvider for URI s3p
Thanks!
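A possible angle (an assumption suggested by the error mentioning the URI, not verified): the Presto S3 filesystem instantiates a custom credentials provider reflectively through a constructor taking the filesystem URI and a Hadoop Configuration, which WebIdentityTokenCredentialsProvider does not have. A rough sketch of a delegating wrapper; the class name is made up, and depending on how the flink-s3-fs-presto plugin shades Hadoop you may need to compile against its shaded Configuration class:
import java.net.URI;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.WebIdentityTokenCredentialsProvider;
import org.apache.hadoop.conf.Configuration;

// Hypothetical wrapper exposing the (URI, Configuration) constructor that the
// Presto S3 filesystem is assumed to expect, delegating to the web-identity provider.
public class WebIdentityTokenCredentialsProviderWrapper implements AWSCredentialsProvider {

    private final AWSCredentialsProvider delegate = WebIdentityTokenCredentialsProvider.create();

    public WebIdentityTokenCredentialsProviderWrapper(URI uri, Configuration conf) {
        // uri and conf are unused; only the constructor signature matters here.
    }

    @Override
    public AWSCredentials getCredentials() {
        return delegate.getCredentials();
    }

    @Override
    public void refresh() {
        delegate.refresh();
    }
}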
Aqib Mehmood
10/06/2022, 6:02 AM
DataStream<Row> processor = resultStream.map(new MapFunction<Row, Row>() {
    @Override
    public Row map(Row row) throws NullPointerException {
        try {
            String newPrice = row.getField("price").toString();
            String productId = row.getField("productId").toString();
            int i = Integer.parseInt(productId);
            Table pastPrice = tEnv.executeSql("SELECT price FROM (\n" +
                    "  SELECT data.price FROM orders WHERE data.productId = " + i + " ORDER BY data.updatedAt DESC LIMIT 1)");
        } catch (NullPointerException err) {
            System.out.println("Empty row");
        }
        return row;
    }
});
We are getting this error
Caused by: org.apache.flink.api.common.InvalidProgramException: The implementation of the AbstractStreamTableEnvironmentImpl is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:164) ~[flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132) ~[flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69) ~[flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2194) ~[flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:201) ~[flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:575) ~[flink-dist-1.15.0.jar:1.15.0]
at pricingtriggers.DataStreamJob.main(DataStreamJob.java:129) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist-1.15.0.jar:1.15.0]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:291) ~[flink-dist-1.15.0.jar:1.15.0]
What am I doing wrong here? TIA
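For what it's worth, the non-serializable object named in the stack trace is the table environment captured by the MapFunction. A minimal sketch of one alternative, keeping the per-product lookup in SQL on the planner side instead of calling executeSql() per record (table and column names assumed from the snippet above):
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class EnrichWithoutCapturingTableEnv {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Assumes an "orders" table has already been registered via DDL.
        // The latest price per product is computed with SQL up front, so no
        // TableEnvironment reference ends up inside a user function.
        Table latestPrices = tEnv.sqlQuery(
                "SELECT productId, price FROM (" +
                "  SELECT data.productId AS productId, data.price AS price, " +
                "         ROW_NUMBER() OVER (PARTITION BY data.productId ORDER BY data.updatedAt DESC) AS rn " +
                "  FROM orders" +
                ") WHERE rn = 1");

        DataStream<Row> latestPriceStream = tEnv.toChangelogStream(latestPrices);
        latestPriceStream.print();
        env.execute("enrich-without-capturing-table-env");
    }
}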
Sumit Nekar
10/06/2022, 6:45 AM
Suppressed: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Failed to send data to Kafka: Request parameters do not satisfy the configured policy.
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392) ~[?:?]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:965) ~[?:?]
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:864) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:843) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:756) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:662) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at java.lang.Thread.run(Unknown Source) ~[?:?]
Suppressed: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Failed to send data to Kafka: Request parameters do not satisfy the configured policy.
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392) ~[?:?]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:965) ~[?:?]
at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:864) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runAndSuppressThrowable(StreamTask.java:843) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:756) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:662) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Request parameters do not satisfy the configured policy.
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392) ~[?:?]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1095) ~[?:?]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:925) ~[?:?]
... 10 more
Caused by: org.apache.kafka.common.errors.PolicyViolationException: Request parameters do not satisfy the configured policy.
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Request parameters do not satisfy the configured policy.
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392) ~[?:?]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1095) ~[?:?]
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.close(FlinkKafkaProducer.java:925) ~[?:?]
... 10 more
Caused by: org.apache.kafka.common.errors.PolicyViolationException: Request parameters do not satisfy the configured policy.
Caused by: org.apache.kafka.common.errors.PolicyViolationException: Request parameters do not satisfy the configured policy.
It's not logging which policy is violated.
Appreciate your help debugging this issue.
clen.moras
10/06/2022, 10:17 AM
2022-10-06 10:10:10,849 i.j.o.p.e.s.c.ControllerResourceEventSource [INFO ] Stopping informer 'flinksessionjobcontroller' Controller -> io.fabric8.kubernetes.client.informers.impl.DefaultSharedIndexInformer@297ca8b5
2022-10-06 10:10:10,849 i.j.o.p.e.s.c.ControllerResourceEventSource [INFO ] Stopping informer 'flinkdeploymentcontroller' Controller -> io.fabric8.kubernetes.client.informers.impl.DefaultSharedIndexInformer@3c0036b
Exception in thread "main" io.fabric8.kubernetes.client.KubernetesClientException: Operation: [list] for kind: [FlinkDeployment] with name: [null] in namespace: [o11y-aiops] failed.
at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:130)
at io.fabric8.kubernetes.client.dsl.base.BaseOperation.listRequestHelper(BaseOperation.java:140)
at io.fabric8.kubernetes.client.dsl.base.BaseOperation.list(BaseOperation.java:415)
at io.fabric8.kubernetes.client.dsl.base.BaseOperation.list(BaseOperation.java:83)
at io.fabric8.kubernetes.client.informers.cache.Reflector.listSyncAndWatch(Reflector.java:81)
at io.fabric8.kubernetes.client.informers.impl.DefaultSharedIndexInformer.run(DefaultSharedIndexInformer.java:146)
at io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource.createAndRunInformerFor(ControllerResourceEventSource.java:101)
at io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource.lambda$start$0(ControllerResourceEventSource.java:84)
at java.base/java.lang.Iterable.forEach(Unknown Source)
at io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource.start(ControllerResourceEventSource.java:83)
at io.javaoperatorsdk.operator.processing.event.EventSourceManager.start(EventSourceManager.java:72)
at io.javaoperatorsdk.operator.processing.Controller.start(Controller.java:196)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(Unknown Source)
at java.base/java.util.HashMap$ValueSpliterator.forEachRemaining(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
at java.base/java.util.stream.ForEachOps$ForEachTask.compute(Unknown Source)
at java.base/java.util.concurrent.CountedCompleter.exec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.doInvoke(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.invoke(Unknown Source)
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(Unknown Source)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
at java.base/java.util.stream.ReferencePipeline.forEach(Unknown Source)
at java.base/java.util.stream.ReferencePipeline$Head.forEach(Unknown Source)
at io.javaoperatorsdk.operator.Operator$ControllerManager.start(Operator.java:170)
at io.javaoperatorsdk.operator.Operator.start(Operator.java:83)
at org.apache.flink.kubernetes.operator.FlinkOperator.run(FlinkOperator.java:169)
at org.apache.flink.kubernetes.operator.FlinkOperator.main(FlinkOperator.java:179)
Caused by: java.net.SocketTimeoutException: connect timed out
appreciate any help with debugging.
Emily Morgan
10/06/2022, 1:48 PM
initializeState
and snapshotState
overrides for the source and then manually managing the state inside the custom flat map operator. Updating the state is working correctly because I can see that the size of the metadata file changes after I create a savepoint, but is there an example of how to know the size of the state before the metadata file is written?
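A rough sketch of one way to estimate this before the snapshot is written (it assumes the operator buffers its elements as a List<String>; adapt the serializer to the actual state type). It measures only the serialized payload, not backend or savepoint-metadata overhead:
import java.io.IOException;
import java.util.List;

import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;

public final class StateSizeEstimator {

    private StateSizeEstimator() {}

    // Serialize the buffered values with the same serializer the state uses,
    // e.g. from inside snapshotState(), to get a byte-size estimate.
    public static long estimateSerializedSize(List<String> bufferedElements) throws IOException {
        DataOutputSerializer out = new DataOutputSerializer(4096);
        for (String element : bufferedElements) {
            StringSerializer.INSTANCE.serialize(element, out);
        }
        return out.length();
    }
}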
Beny Chernyak
10/06/2022, 1:54 PM
org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials provided by SimpleAWSCredentialsProvider EnvironmentVariableCredentialsProvider InstanceProfileCredentialsProvider : com.amazonaws.SdkClientException: Failed to connect to service endpoint:
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:187)
On the configuration side nothing changed: I use the HADOOP_CONF_DIR env variable pointing to a directory with the same core-site.xml I've used before:
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
<name>fs.s3a.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<property>
<name>fs.s3.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<property>
<name>fs.s3n.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<property>
<name>fs.s3a.access.key</name>
<value>xxxxx</value>
</property>
<property>
<name>fs.s3a.secret.key</name>
<value>xxxxx</value>
</property>
<property>
<name>fs.s3a.aws.credentials.provider</name>
<value>org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider</value>
</property>
<property>
<name>fs.s3a.buffer.dir</name>
<value>${hadoop.tmp.dir}/s3a</value>
</property>
</configuration>
I succeed only when I explicitly put AWS_ACCESS_KEY_ID and AWS_SECRET_KEY as env variables, and I'd like to use the values coming from core-site.xml.
What I've tried so far:
• Changing org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider to
com.amazonaws.auth.profile.ProfileCredentialsProvider in core-site.xml
• Adding FLINK_HOME and FLINK_CONF env variables
• Adding flink-conf.yaml with
fs.s3a.access.key: xxxxxx
fs.s3a.secret.key: xxxxxx
Nothing helps so far. What am I doing wrong and/or what am I missing here?
Maher Turifi
10/06/2022, 3:19 PM
DROP TABLE IF EXISTS versioned_rates;
CREATE TABLE versioned_rates (
rate_currency STRING NOT NULL,
rate DECIMAL(38, 10),
rate_proctime as PROCTIME(),
currency_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
WATERMARK FOR currency_time AS currency_time ,
PRIMARY KEY(rate_currency) NOT ENFORCED -- primary key (currency)
) WITH (
'connector' = 'upsert-kafka',
...
'key.format' = 'raw',
'value.format' = 'json'
);
CREATE TABLE orders (
order_id STRING,
order_currency STRING NOT NULL,
amount INT,
order_proctime as PROCTIME()
) WITH (
'connector' = 'kafka',
...
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
my join statement is :
SELECT
o.order_id,
o.order_proctime,
o.amount * r.rate AS amount,
r.rate rate,
r.rate_currency
FROM orders AS o JOIN versioned_rates_v FOR SYSTEM_TIME AS OF o.order_proctime r
on o.order_currency = r.rate_currency;
However, I'm receiving the following error:
TableException: Processing-time temporal join is not supported yet.
...
According to StreamExecTemporalJoin.java:
"the semantic of this implementation is problematic, because the join processing for left stream doesn't wait for the complete snapshot of temporal table, this may mislead users in production environment. See FLINK-19830 for more details."
so I tried implementing this using the temporal table function:
SCALA>val versioned_rates= stenv.from("versioned_rates_v")
SCALA>val rates = versioned_rates.createTemporalTableFunction($"rate_proctime", $"rate_currency")
SELECT
o.order_id,
o.amount * r.rate AS amount,
r.rate rate,
r.rate_currency
FROM orders o, LATERAL TABLE (Rates(o.order_proctime)) r
WHERE o.order_currency = r.rate_currency;
However, the issue now is different:
UnsupportedOperationException
I also tried this in scala :
val orders = stenv.from("orders")
val result = orders
.joinLateral(call("Rates", $("order_proctime")), $("order_currency").isEqual($("rate_currency")))
stenv.dropTemporaryView("join_results")
stenv.createTemporaryView("join_results", result)
SQL> select * from join_results
Is there anything I'm doing wrong? Could anyone guide me on this? Thanks
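A minimal sketch of the variant that is supported, an event-time temporal join, assuming an event-time attribute (here called order_time, with a watermark) can be added to orders; versioned_rates already has the required primary key and watermark:
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EventTimeTemporalJoinSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Assumes "orders" also declares an event-time column + watermark, e.g.:
        //   order_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
        //   WATERMARK FOR order_time AS order_time
        // Unlike the processing-time variant, this form of temporal join is supported.
        tEnv.executeSql(
                "SELECT o.order_id, o.amount * r.rate AS amount, r.rate, r.rate_currency " +
                "FROM orders AS o " +
                "JOIN versioned_rates FOR SYSTEM_TIME AS OF o.order_time AS r " +
                "ON o.order_currency = r.rate_currency").print();
    }
}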
Prasaanth Neelakandan
10/06/2022, 10:00 PM
Lydian Lee
10/07/2022, 7:14 AM
2022-10-05 17:05:32,027 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Impulse -> [3]Reading message from kafka/KafkaIO.Read/KafkaIO.Read.ReadFromKafkaViaUnbounded/Read(KafkaUnboundedSource)/{ParDo(Out
2022-10-05 17:19:34,272 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Triggering cancel-with-savepoint for job 1a68f74acf0ccf403693e2f228fa62a6.
2022-10-05 17:19:34,285 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 1 (type=SAVEPOINT) @ 1664990374275 for job 1a68f74acf0ccf403693e2f228fa62a6.
2022-10-05 17:19:34,287 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Making directory: <s3a://test-bucket/flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89>
2022-10-05 17:19:34,287 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - op_mkdirs += 1 -> 3
2022-10-05 17:19:34,288 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - op_get_file_status += 1 -> 6
2022-10-05 17:19:34,288 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Getting path status for <s3a://test-bucket/flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89> (flink/expansi
2022-10-05 17:19:34,288 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - S3GetFileStatus <s3a://test-bucket/flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89>
2022-10-05 17:19:34,288 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - object_metadata_requests += 1 -> 6
2022-10-05 17:19:34,381 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - object_list_requests += 1 -> 6
2022-10-05 17:19:34,411 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Not Found: <s3a://affirm-stage-chrono/flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89>
2022-10-05 17:19:34,411 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - op_get_file_status += 1 -> 7
2022-10-05 17:19:34,411 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Getting path status for <s3a://test-bucket/flink/expansion-service-example/savepoints> (flink/expansion-service-example/savepoints)
2022-10-05 17:19:34,411 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - S3GetFileStatus <s3a://test-bucket/flink/expansion-service-example/savepoints>
2022-10-05 17:19:34,411 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - object_metadata_requests += 1 -> 7
2022-10-05 17:19:34,431 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - object_list_requests += 1 -> 7
2022-10-05 17:19:34,461 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Found path as directory (with /)
2022-10-05 17:19:34,461 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Prefix count = 0; object count=1
2022-10-05 17:19:34,461 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Summary: flink/expansion-service-example/savepoints/ 0
2022-10-05 17:19:34,461 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - PUT 0 bytes to flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89/
2022-10-05 17:19:34,461 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - PUT start 0 bytes
2022-10-05 17:19:34,461 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - object_put_requests += 1 -> 3
2022-10-05 17:19:34,527 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - PUT completed success=true; 0 bytes
2022-10-05 17:19:34,527 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - object_put_requests_completed += 1 -> 3
2022-10-05 17:19:34,527 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Finished write to flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89/, len 0
2022-10-05 17:19:34,527 TRACE org.apache.hadoop.fs.s3a.S3AFileSystem [] - To delete unnecessary fake directory flink/expansion-service-example/savepoints/ for <s3a://affirm-stage-chrono/flink/expansion-service-exa>
2022-10-05 17:19:34,527 TRACE org.apache.hadoop.fs.s3a.S3AFileSystem [] - To delete unnecessary fake directory flink/expansion-service-example/ for <s3a://affirm-stage-chrono/flink/expansion-service-example>
2022-10-05 17:19:34,527 TRACE org.apache.hadoop.fs.s3a.S3AFileSystem [] - To delete unnecessary fake directory flink/ for <s3a://affirm-stage-chrono/flink>
2022-10-05 17:19:34,527 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - object_delete_requests += 1 -> 3
2022-10-05 17:19:34,617 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress [] - PUT flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89/: 0 bytes
2022-10-05 17:24:34,285 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint 1 of job 1a68f74acf0ccf403693e2f228fa62a6 expired before completing.
2022-10-05 17:24:34,287 WARN org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 1 for job 1a68f74acf0ccf403693e2f228fa62a6. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2000) [flink-dist_2.12-1.14.5.jar:1.14.5]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_292]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_292]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_292]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_292]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_292]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_292]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]
2022-10-05 17:24:34,288 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - op_delete += 1 -> 1
2022-10-05 17:24:34,288 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - op_get_file_status += 1 -> 8
2022-10-05 17:24:34,288 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Getting path status for <s3a://test-bucket/flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89> (flink/expansi
2022-10-05 17:24:34,288 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - S3GetFileStatus <s3a://test-bucket/flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89>
2022-10-05 17:24:34,288 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - object_metadata_requests += 1 -> 8
2022-10-05 17:24:34,360 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - object_list_requests += 1 -> 8
2022-10-05 17:24:34,394 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Found path as directory (with /)
2022-10-05 17:24:34,395 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Prefix count = 0; object count=1
2022-10-05 17:24:34,395 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Summary: flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89/ 0
2022-10-05 17:24:34,395 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Delete path <s3a://test-bucket/flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89> - recursive true
2022-10-05 17:24:34,395 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - delete: Path is a directory: <s3a://test-bucket/flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89>
2022-10-05 17:24:34,395 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Deleting fake empty directory flink/expansion-service-example/savepoints/savepoint-1a68f7-bb0ae207bd89/
2022-10-05 17:24:34,395 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - object_delete_requests += 1 -> 4
2022-10-05 17:24:34,427 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - S3GetFileStatus <s3a://test-bucket/flink/expansion-service-example/savepoints>
2022-10-05 17:24:34,427 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - object_metadata_requests += 1 -> 9
2022-10-05 17:24:34,444 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - object_list_requests += 1 -> 9
2022-10-05 17:24:34,482 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Not Found: <s3a://test-bucket/flink/expansion-service-example/savepoints>
2022-10-05 17:24:34,482 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Creating new fake directory at <s3a://test-bucket/flink/expansion-service-example/savepoints>
2022-10-05 17:24:34,482 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - PUT 0 bytes to flink/expansion-service-example/savepoints/
2022-10-05 17:24:34,482 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - PUT start 0 bytes
2022-10-05 17:24:34,482 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - object_put_requests += 1 -> 4
2022-10-05 17:24:34,548 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - PUT completed success=true; 0 bytes
2022-10-05 17:24:34,548 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - object_put_requests_completed += 1 -> 4
2022-10-05 17:24:34,548 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem [] - Finished write to flink/expansion-service-example/savepoints/, len 0
2022-10-05 17:24:34,548 TRACE org.apache.hadoop.fs.s3a.S3AFileSystem [] - To delete unnecessary fake directory flink/expansion-service-example/ for <s3a://affirm-stage-chrono/flink/expansion-service-example>
2022-10-05 17:24:34,548 TRACE org.apache.hadoop.fs.s3a.S3AFileSystem [] - To delete unnecessary fake directory flink/ for <s3a://affirm-stage-chrono/flink>
2022-10-05 17:24:34,548 DEBUG org.apache.hadoop.fs.s3a.S3AStorageStatistics [] - object_delete_requests += 1 -> 5
2022-10-05 17:24:34,597 DEBUG org.apache.hadoop.fs.s3a.S3AFileSystem.Progress [] - PUT flink/expansion-service-example/savepoints/: 0 bytes
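The log above shows checkpoint 1 (type=SAVEPOINT) expiring after the configured timeout. In case the savepoint legitimately needs longer (large state, slow S3 uploads), a minimal sketch of raising the timeout; the values are illustrative only:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointTimeoutSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 60s, but allow each checkpoint/savepoint up to 30 minutes
        // before it is declared expired.
        env.enableCheckpointing(60_000);
        env.getCheckpointConfig().setCheckpointTimeout(30 * 60 * 1000L);

        env.fromElements(1, 2, 3).print();
        env.execute("checkpoint-timeout-sketch");
    }
}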
Emile Alberts
10/07/2022, 10:46 AM
DataDogHttpReporter
for metrics: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/metric_reporters/#datadog.
How can I read the API key from environment variables or a Kubernetes secret?
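A minimal sketch of one approach (the env-var name and the local-execution setup are assumptions; the reporter keys are the ones from the linked docs): read the key from an environment variable, which on Kubernetes can be populated from a Secret via the pod spec, and put it into the Flink configuration instead of hard-coding it. On a real cluster the JobManager/TaskManager processes read these keys from flink-conf.yaml, so the deployment tooling would need to render them there:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DatadogReporterSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical env var DD_API_KEY, e.g. injected from a Kubernetes Secret.
        Configuration conf = new Configuration();
        conf.setString("metrics.reporter.dghttp.factory.class",
                "org.apache.flink.metrics.datadog.DatadogHttpReporterFactory");
        conf.setString("metrics.reporter.dghttp.apikey", System.getenv("DD_API_KEY"));

        // Only effective for local / MiniCluster execution.
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.fromElements(1, 2, 3).print();
        env.execute("datadog-reporter-sketch");
    }
}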
Tiansu Yu
10/07/2022, 1:48 PM
Abhinav sharma
10/07/2022, 2:16 PM
Hygor Knust
10/07/2022, 4:54 PM
CREATE TABLE source_table (
…columns
)
WITH (
'connector' = 'kafka',
'topic' = 'source_topic',
'properties.bootstrap.servers' = ${kafka_broker},
'properties.sasl.jaas.config' = ${kafka_sasl_config},
'key.avro-confluent.basic-auth.user-info' = ${sr_auth},
'value.avro-confluent.basic-auth.user-info' = ${sr_auth}
)
This way we will also not have sensitive information hard-coded.
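A minimal sketch of one way to do this from Java, substituting the placeholders at submission time (the environment-variable names are made up; a ParameterTool or a secret store would work the same way):
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ParameterizedDdlSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical environment variables holding the sensitive values.
        String broker = System.getenv("KAFKA_BROKER");
        String saslConfig = System.getenv("KAFKA_SASL_CONFIG");
        String srAuth = System.getenv("SR_AUTH");

        tEnv.executeSql(String.format(
                "CREATE TABLE source_table (\n" +
                "  id STRING\n" +                       // ...columns
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'source_topic',\n" +
                "  'properties.bootstrap.servers' = '%s',\n" +
                "  'properties.sasl.jaas.config' = '%s',\n" +
                "  'key.avro-confluent.basic-auth.user-info' = '%s',\n" +
                "  'value.avro-confluent.basic-auth.user-info' = '%s'\n" +
                ")",
                broker, saslConfig, srAuth, srAuth));
    }
}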
Carlos Castro
10/07/2022, 6:57 PM
Mingliang Liu
10/07/2022, 11:40 PM
Sumit Nekar
10/08/2022, 4:15 AM
All our jobs are long-running jobs. I am thinking of having a session cluster but deploying only one job to it, to get the benefits of both modes.
How is your experience with a production setup?
Ravisangar Vijayan
10/08/2022, 4:38 AM
Ravisangar Vijayan
10/08/2022, 9:32 PM
Lee Senlen
10/10/2022, 1:40 AM
Bhupendra Yadav
10/10/2022, 10:00 AM
Tiansu Yu
10/10/2022, 10:49 AM
the same configuration keys as the Presto file system
in this page is broken. Also, in general, how do I pass the Presto JDBC connection parameters (URL, port number, user, credentials, and finally the fully qualified table name string) to the Presto connector? Or does one have to save the parameters in a configuration file (and load it somehow) and replace the s3 address with the s3p prefix?
2. I do not understand what is meant by "It is the only S3 file system with support for the FileSystem."
Angelo Kastroulis
10/10/2022, 2:19 PM
CREATE TABLE MyTable (
id INT,
stopTime TIMESTAMP(0),
startTime TIMESTAMP(0),
fk INT) WITH (
<set a number of BUCKETs AND/OR BUCKETKEY by fk HERE>,
<FILESYSTEM SOURCE CONNECTOR HERE>
)
I'm getting an error that the filesystem connector doesn't understand the options "bucket" or "bucket-key", which leads me to believe that you can't use them with a connector, but there is no documentation on the expected usage other than mentioning buckets in passing in the docs.
How would buckets normally be used?
Tiansu Yu
10/10/2022, 3:59 PM
List field [optional binary attribute_key_id (STRING)] in List [attribute_values] has to be required.
Somehow flink-parquet requires the field embedded in a list to be of type required, even though the entire group in the Parquet schema is optional. Details in 🧵
Ravisangar Vijayan
10/10/2022, 4:01 PM
org.apache.flink.runtime.execution.CancelTaskException: Buffer pool has already been destroyed.
?
Tawfik Yasser
10/10/2022, 6:11 PM
Kwangin Jung
10/11/2022, 1:20 AM
public class MyTriggerFunction extends Trigger<String, TimeWindow> {
    public MyTriggerFunction() {
        this.aggregatingStateDescriptor = new AggregatingStateDescriptor<>(
                "my-trigger",
                new MyAggregatorFunction(),
                new TupleSerializer<>(
                        (Class<Tuple2<Integer, Integer>>) (Class) Tuple2.class,
                        new TypeSerializer[] {new IntSerializer(), new IntSerializer()}));
    }

    public TriggerResult onElement(...) {
        if (...) {
            return TriggerResult.FIRE_AND_PURGE;
        }
    }
}

public class MyAggregatorFunction implements AggregateFunction<String, Tuple2<Integer, Integer>, Boolean> {
    // ...
}
From what I've tested, when TriggerResult.FIRE_AND_PURGE
is called, it emits the result of the current window, but the data in this.aggregatingStateDescriptor
still remains.
Does somebody know when this data will be cleared? I've seen the following in the documentation:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/windows/#fire-and-purge
Purging will simply remove the contents of the window and will leave any potential meta-information about the window and any trigger state intact.
but I don't think this data should remain forever, because millions of windows can be created during processing, and keeping all of this data around looks bad for memory.
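A minimal sketch (constructor and types taken from the snippet above): the trigger's aggregating state is not removed by PURGE itself; it is usually cleared explicitly, either right before returning FIRE_AND_PURGE or in Trigger#clear(), which the window operator calls when the window itself is removed:
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public abstract class MyTriggerFunctionSketch extends Trigger<String, TimeWindow> {

    private final AggregatingStateDescriptor<String, Tuple2<Integer, Integer>, Boolean>
            aggregatingStateDescriptor;

    protected MyTriggerFunctionSketch(
            AggregatingStateDescriptor<String, Tuple2<Integer, Integer>, Boolean> descriptor) {
        this.aggregatingStateDescriptor = descriptor;
    }

    @Override
    public TriggerResult onElement(String element, long timestamp, TimeWindow window, TriggerContext ctx)
            throws Exception {
        // The firing condition is elided; when it holds, drop the trigger's own
        // state together with the window contents before purging.
        ctx.getPartitionedState(aggregatingStateDescriptor).clear();
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        // Called when the window is removed; clean up any remaining trigger state.
        ctx.getPartitionedState(aggregatingStateDescriptor).clear();
    }
}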