HI All, I have used flink-kuberentes-operator with...
# troubleshooting
n
HI All, I have used flink-kuberentes-operator with upgreadeMode: savepoint, but whenever deployment is restarted, everytime it creates a new jobId and try to restore using that newly create jobId savepoint path, which was not yet created. Resulting to get FileNotFoundException. Anyway how can I ask Flink not to create this new jobId, instead use static one to restore from savepoint path. Below are the logs:
Copy code
2023-09-13 10:59:00,320 INFO  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job 3ca752ff5e242a8ff553d49e2579a146 is submitted.
2023-09-13 10:59:00,320 INFO  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=3ca752ff5e242a8ff553d49e2579a146.
2023-09-13 10:59:02,115 INFO  org.apache.flink.kubernetes.KubernetesResourceManagerDriver  [] - Recovered 0 pods from previous attempts, current attempt id is 1.
2023-09-13 10:59:02,115 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Recovered 0 workers from previous attempt.
2023-09-13 10:59:02,118 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Received JobGraph submission 'marketing-campaign-external-flink-awsqa' (3ca752ff5e242a8ff553d49e2579a146).
2023-09-13 10:59:02,118 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Submitting job 'marketing-campaign-external-flink-awsqa' (3ca752ff5e242a8ff553d49e2579a146).
2023-09-13 10:59:02,127 INFO  org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunner [] - JobMasterServiceLeadershipRunner for job 3ca752ff5e242a8ff553d49e2579a146 was granted leadership with leader id 00000000-0000-0000-0000-000000000000. Creating new JobMasterServiceProcess.
2023-09-13 10:59:02,134 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService             [] - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at <akka://flink/user/rpc/jobmanager_2> .
2023-09-13 10:59:02,140 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Initializing job 'marketing-campaign-external-flink-awsqa' (3ca752ff5e242a8ff553d49e2579a146).
2023-09-13 10:59:02,222 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using restart back off time strategy FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, backoffTimeMS=1000) for marketing-campaign-external-flink-awsqa (3ca752ff5e242a8ff553d49e2579a146).
2023-09-13 10:59:02,245 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Created execution graph 9226eef4fce52095502263706ce44307 for job 3ca752ff5e242a8ff553d49e2579a146.
2023-09-13 10:59:02,253 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Running initialization on master for job marketing-campaign-external-flink-awsqa (3ca752ff5e242a8ff553d49e2579a146).
2023-09-13 10:59:02,254 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Successfully ran initialization on master in 0 ms.
2023-09-13 10:59:02,433 INFO  org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 1 new pipelined regions in 0 ms, total 1 pipelined regions currently.
2023-09-13 10:59:02,436 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using job/cluster config to configure application-defined state backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=[/data/flink/state], enableIncrementalCheckpointing=UNDEFINED, numberOfTransferThreads=-1, writeBatchSize=-1}
2023-09-13 10:59:02,438 INFO  org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using predefined options: DEFAULT.
2023-09-13 10:59:02,439 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using application-defined state backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=[/data/flink/state], enableIncrementalCheckpointing=FALSE, numberOfTransferThreads=4, writeBatchSize=2097152}
2023-09-13 10:59:02,439 INFO  org.apache.flink.runtime.state.StateBackendLoader            [] - State backend loader loads the state backend as EmbeddedRocksDBStateBackend
2023-09-13 10:59:02,440 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using job/cluster config to configure application-defined checkpoint storage: org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage@192e05a9
2023-09-13 10:59:02,920 WARN  org.apache.hadoop.metrics2.impl.MetricsConfig                [] - Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
2023-09-13 10:59:02,947 INFO  org.apache.hadoop.metrics2.impl.MetricsSystemImpl            [] - Scheduled Metric snapshot period at 10 second(s).
2023-09-13 10:59:02,947 INFO  org.apache.hadoop.metrics2.impl.MetricsSystemImpl            [] - s3a-file-system metrics system started
2023-09-13 10:59:03,028 WARN  org.apache.hadoop.util.NativeCodeLoader                      [] - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2023-09-13 10:59:06,151 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No checkpoint found during restore.
2023-09-13 10:59:06,152 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Starting job 3ca752ff5e242a8ff553d49e2579a146 from savepoint <s3://mkt-offline-store-qa/flink-savepointing/marketing-campaign-external-flink/savepoint-3ca752-d7b49e220c9e> ()
2023-09-13 10:59:06,221 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher     [] - Job 3ca752ff5e242a8ff553d49e2579a146 reached terminal state FAILED.
org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
	at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
	at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: java.io.FileNotFoundException: Cannot find checkpoint or savepoint file/directory '<s3://mkt-offline-store-qa/flink-savepointing/marketing-campaign-external-flink/savepoint-3ca752-d7b49e220c9e>' on file system 's3'.
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
	... 4 more
g
I don’t understand what the jobID generation logic has to do with your error. It simply states: `
Copy code
Cannot find checkpoint or savepoint file/directory '<s3://mkt-offline-store-qa/flink-savepointing/marketing-campaign-external-flink/savepoint-3ca752-d7b49e220c9e>' on file system 's3'.
this path has nothing to do with the jobid, it’s just the savepoint file path that should exist, but it doesn’t for some reason
n
Yeah correct Gyula, my bad. I did not compare the submitted job Id and savepoint path properly. Thanks for correcting. Still I wonder, from where the path is getting picked up, because same error I get every time when restart/redeployment happens.
g
during a restart/upgrade. The operator shuts down the application with stop-with-savepoint call
the resulting savepoint is recorded in the status (saved). It is then used to restore the job
could it be that for some reason the savepoint path you set can be written to but not read?
n
Might be that can be issue, but when i look back at s3, same is not present
Moreover, I can see it was written, but somehow getting deleted, so while reading not able to find it.
Copy code
2023-09-13 09:47:53,125 INFO  org.apache.flink.fs.s3.common.writer.S3Committer             [] - Committing flink-savepointing/marketing-campaign-external-flink/savepoint-3ca752-d7b49e220c9e/_metadata with MPU ID jtSeD5KVKTXhRVbIf1OhMDkCA8bnumK5trQL_1FB2dkDThDEYaiyBCSAEZVz1W36CuUgNuYlbZuT6urPdyQRKyiukShxFCtGGg5APgZzU5Zjx23937BqXT.ho39WtUzvHulLutyQk.xmBdETNU5z9e3Tt683l_NKrvJaMLWDzVg-
g
Flink itself would not delete it so something external may be doing it. This is generally a fairly simple / robust mechanism from Flink / Operator side
we haven’t seen similar issues, so I suspect something is going on in your env
n
Okay Thanks, let me figure out the possible reasons
HI When I looked more into it, tried redeploying flink multiple times after enough interval, But on each restart, still it's looking for same savepoint directory, which got deleted long back after very first restart only
Copy code
Caused by: java.io.FileNotFoundException: Cannot find checkpoint or savepoint file/directory '<s3://mkt-offline-store-qa/flink-savepointing/marketing-campaign-external-flink/savepoint-7d3aa2-40c908beb3c4>' on file system 's3'.
My Kuberentes-operator configs are:
Copy code
spec:
  restartNonce: 123
  flinkVersion: v1_16
  flinkConfiguration:
    kubernetes.operator.periodic.savepoint.interval: 10m
    kubernetes.operator.savepoint.history.max.count: "3"
job:
  jarURI: local:///opt/flink/usrlib/flink-job.jar
  parallelism: 10
  upgradeMode: savepoint
  savepointTriggerNonce: 123
a
hey Gyula, I tried the same approach to restore the data from savepoint when job got down and again restated, and I observed that when taskmanager went down and restarted at that time the restoration of data is happened from savepoint location but when jobmanager restarted at that time it was unable to pick data from savepoint location and unable to restore. Do you have suggestion on this? What we can do in this case?
g
please open a jira ticket with operator logs and Flink/Operator version info
👍 1