Jaehyeon Kim
02/23/2023, 3:52 AM
{
"seller_id": "LNK",
"quantity": 5,
"sale_ts": 1677124093560,
"name": "Toothpaste",
"product_price": 4.99
}
The issue is that it fails to create the aggregated output in the sink tables when the window is based on the event time (evttime) - it just gets stuck forever. On the other hand, it works when it is based on the processing time (proctime), and I'm quite confused.
Can you please check the attached script and let me know how to make it work?
Mingfeng Tan
02/23/2023, 6:29 AM
Caused by: org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
I have tried to increase the Kafka request timeout for the consumer, but it does not have any effect, and the issue is still there.
The Flink jobs are running in standalone mode and are managed by the Apache Flink Operator 1.3.1. During an upgrade, the stop-with-savepoint operation hits the following error, but manually triggered savepoints are totally OK.
There was a previous discussion mentioning that this can be resolved by changing the Kafka consumer/producer to KafkaSource and KafkaSink. However, we have > 50 Flink jobs, so we need time to change all of them.
Does anybody know if there is a workaround for this issue?
Thanks!
2023-02-22 22:25:42
org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
at org.apache.flink.runtime.messages.checkpoint.SerializedCheckpointException.unwrap(SerializedCheckpointException.java:51)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1036)
at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: org.apache.flink.runtime.checkpoint.CheckpointException: Task name with subtask : Source: Event Source -> Extract Value From Kafka Message -> Add Datamart Metadata -> Filter Datamart Orgs And Stores (1/1)#2 Failure reason: Task has failed.
at org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1375)
at org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1318)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:344)
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source)
... 3 more
Caused by: org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException: org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
at org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945)
at org.apache.flink.streaming.api.operators.StreamSource.stop(StreamSource.java:128)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.stopOperatorForStopWithSavepoint(SourceStreamTask.java:306)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.lambda$triggerStopWithSavepointAsync$1(SourceStreamTask.java:286)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.base/java.lang.Thread.run(Unknown Source)
Matt Fysh
02/23/2023, 11:55 AM
Sumit Nekar
02/23/2023, 12:16 PM
Giannis Polyzos
02/23/2023, 4:16 PM
SELECT *
FROM orders
JOIN products
ON orders.productId = products.id
What I realised is that it's best practice(?) to use a temporal join for this, which means changing my query to something like this:
SELECT *
FROM orders
JOIN products FOR SYSTEM_TIME AS OF orders.orderTime_ltz
ON orders.productId = products.id
What I'm trying to understand is:
1. How is it different, and what does SYSTEM_TIME AS OF actually enforce?
2. Why does the first query work and emit results, but when I specify SYSTEM_TIME AS OF I get no output?
Emmanuel Leroy
02/23/2023, 5:35 PM
Christos Hadjinikolis
02/23/2023, 9:29 PM
Prithvi Dammalapati
02/23/2023, 10:46 PM
The following settings in flink-conf.yaml did not produce the expected results:
s3a.attempts.maximum: "25"
s3a.threads.max: "512"
s3a.threads.keepalivetime: "240"
s3a.connection.maximum: "2048"
s3a.multipart.size: "100M"
s3a.fast.upload.active.blocks: "8"
s3a.multipart.purge: "true"
Any recommendations to expedite the checkpointing to S3? TIA
Zhiyu Tian
02/24/2023, 12:43 AM
Amir Hossein Sharifzadeh
02/24/2023, 12:59 AM
ding bei
02/24/2023, 4:14 AM
1. My app is running on Kubernetes, managed by the Flink Kubernetes operator. I use the env var ENABLE_BUILT_IN_PLUGINS to link the hadoop-s3 plugin to where it is supposed to be, and I exec'd into the pod and confirmed that flink-s3-fs-hadoop-1.15.2.jar is in the plugins/hadoop-s3 folder.
2. My app reads proto records from Kafka, so I wrote a deserializer that calls a Hadoop-related class function, though not directly - through a very deep call chain.
3. However, it reports NoClassDefFoundError: org/apache/hadoop/conf/Configuration, and the error traces back to the deserializer.
4. Then I added the Hadoop dependency to my application POM and packaged a fat jar and, guess what, it worked.
So can somebody please explain this to me? I have read the Flink docs about plugins and classloaders many times and still have no clue.
Aditya
02/24/2023, 5:00 AM
Rashmin Patel
02/24/2023, 6:24 AM
I am observing records with ts less than the current watermark.
I am using a partition-aware watermark strategy (max bounded out-of-orderness) with env.fromSource[Option[T]](kafkaSource, watermarkStrategy, id).
In what scenarios can this behaviour happen?
chunilal kukreja
02/24/2023, 8:21 AM
Dimitris Kalouris
02/24/2023, 2:12 PM
b STRING>) WITH ( 'connector' = 'filesystem', 'path' = 'my_file.json', 'format'='json');
select * from my_table;
gives:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input: expected close marker for Object (start marker at [Source: (byte[])"{"; line: 1, column: 1])
at [Source: UNKNOWN; line: 1, column: 2]
Sergio Sainz
02/25/2023, 2:18 AM
Krish Narukulla
02/25/2023, 7:04 PM
Sumit Nekar
02/26/2023, 7:06 AM
Viacheslav Chernyshev
02/27/2023, 10:29 AM
I have introduced KeyedCoProcessFunctions into the pipeline. As far as I can see, these operators are currently not chainable. Consequently, they are forced to go through a "network" serialisation barrier, despite having exactly the same parallelism. I find it a bit unexpected and wonder if there is a way to avoid it. Example job graph:
Source A -> keyBy(A) --+-> KeyedCoProcess -> Sink
                      /
Source B -> keyBy(B) +
If I have either Source A -> keyBy(A) -> KeyedProcess -> Sink or Source B -> keyBy(B) -> KeyedProcess -> Sink independently, then everything after keyBy is chained as expected and the information is passed without extra serialisation.
kingsathurthi
02/27/2023, 11:08 AM
chunilal kukreja
02/27/2023, 12:40 PM
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:206)
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:169)
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:122)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2104)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2083)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1375)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1157)
at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
And this has become a big problem: once it starts occurring, the job restarts and hardly ever gets back into a stable condition.
Nikhil Kalyankar
02/27/2023, 2:07 PM
FileSink as well.
Vibhavari Bellutagi
02/27/2023, 2:36 PM
刘路
02/27/2023, 3:01 PM
刘路
02/27/2023, 3:14 PM
Tommy May
02/27/2023, 8:10 PM
The keyBy is resulting in hundreds of MB/sec of network shuffling, so we could see a significant performance bump if we could better preserve the upstream partitioning in Flink.
Aravind
02/27/2023, 8:19 PM
Nathanael England
02/27/2023, 8:38 PM
BIG_INT?
Eric Xiao
02/27/2023, 8:40 PM
{"instant":{"epochSecond":1677246902,"nanoOfSecond":184000000},"thread":"OkHttp <https://10.28.56.1/>...","level":"WARN","loggerName":"io.fabric8.kubernetes.client.dsl.internal.WatcherWebSocketListener","message":"Exec Failure javax.net.ssl.SSLException Connection reset","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","contextMap":{},"threadId":255067,"threadPriority":5}
{"instant":{"epochSecond":1677253016,"nanoOfSecond":460000000},"thread":"pool-278132-thread-1","level":"ERROR","loggerName":"io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector","message":"Exception occurred while acquiring lock 'ConfigMapLock: ...-cluster-config-map (..)'",
We are running with Flink version 1.15.1 and not with the Flink K8s operator. We are not sure how to investigate this issue or what the root cause could be.
Krish Narukulla
02/27/2023, 10:29 PM
Would the jars under opt/flink be added to the Flink classpath?
root@examplecode-6d4d45688f-x8k6n:/opt/flink# ls opt/
flink-azure-fs-hadoop-1.16.1.jar flink-gs-fs-hadoop-1.16.1.jar flink-s3-fs-hadoop-1.16.1.jar flink-sql-client-1.16.1.jar python
flink-cep-scala_2.12-1.16.1.jar flink-oss-fs-hadoop-1.16.1.jar flink-s3-fs-presto-1.16.1.jar flink-sql-gateway-1.16.1.jar
flink-gelly-1.16.1.jar flink-python-1.16.1.jar flink-shaded-netty-tcnative-dynamic-2.0.44.Final-15.0.jar flink-state-processor-api-1.16.1.jar
flink-gelly-scala_2.12-1.16.1.jar flink-queryable-state-runtime-1.16.1.jar flink-shaded-zookeeper-3.6.3.jar flink-table-planner_2.12-1.16.1.jar