# troubleshooting
a
Hi everyone, I am currently working on implementing APIs to stop and resume Flink jobs using savepoints, as per the official Flink documentation. Below is an outline of my approach:
1. Job initialization: on starting the job, I save the `jobId` of the Flink job.
2. Stopping the job: using the saved `jobId`, I execute the stop API provided by Flink, which stops the job and creates a savepoint. The stop API returns a `request-id` (`triggerId`). Using this `request-id`, I call `/jobs/{flinkJobId}/savepoints/{triggerId}` to retrieve the savepoint path, and I store this savepoint path for future use.
3. Resuming the job: to resume the job, I execute the Flink run API `/jars/{jarId}/run` with the necessary program arguments, including the `--savepoint` path stored earlier.
Despite following this approach, the job restarts from the beginning and reprocesses all records instead of resuming from where it left off. I am seeking guidance on the following:
• Is my current approach correct?
• If not, where might I be going wrong?
• Any recommendations or best practices to ensure the job resumes processing from the last savepoint.
I appreciate any help or insights you can provide.
d
Seems correct based on the standard procedure for stopping and resuming Flink jobs using savepoints. Regarding the job restarting from the beginning instead of resuming from the savepoint:
1. Savepoint vs. checkpoint: ensure that you are indeed creating a savepoint and not confusing it with checkpoints. Savepoints are for long-term storage and intended for job upgrades or migrations, whereas checkpoints are for failure recovery and may not allow job modifications between stopping and restarting.
2. Correct usage of `--savepointPath`: when submitting the job to resume, check that you are passing the savepoint path correctly. Make sure there are no typos and the path is complete and accessible.
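One thing worth checking on this point: in Flink's REST API, `programArgs` are only handed to the job's `main()` method; Flink itself does not interpret a `--savepoint` program argument. The restore point for `POST /jars/{jarId}/run` is requested via the top-level `savepointPath` field of the JSON body (with an optional `allowNonRestoredState` flag). A minimal sketch of building such a body (the entry class and paths below are taken from this thread and are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class RunRequestExample {
    // Builds the JSON body for POST /jars/{jarId}/run. Restoring from a
    // savepoint is requested via the top-level "savepointPath" field, not
    // via programArgs (programArgs are only passed to the job's main()).
    static Map<String, Object> buildRunRequest(String entryClass,
                                               String savepointPath,
                                               int parallelism) {
        Map<String, Object> body = new HashMap<>();
        body.put("entryClass", entryClass);
        body.put("parallelism", parallelism);
        body.put("savepointPath", savepointPath);   // restore point for the new job
        body.put("allowNonRestoredState", false);   // fail fast if state cannot be mapped
        return body;
    }

    public static void main(String[] args) {
        Map<String, Object> body = buildRunRequest(
            "com.flink.FlinkApplication",           // entry class from this thread
            "file:/flink-basic/savepoints/savepoint-d9f250-2766bc73c165",
            10);
        System.out.println(body.get("savepointPath"));
    }
}
```

This body can then be POSTed with `restTemplate.postForObject(...)` exactly as in the snippets below.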
a
Hey Draco, first of all thanks for your reply. When I hit the stop Flink job API, I get a `request-id`. With that `request-id` I retrieve the `savepointPath` from the `/jobs/{jobId}/savepoints/{request-id}` API, and I use this `savepointPath` in the program arguments to resume the same Flink job. Code snippets:
STOP API:
```java
String stopJobEndpoint = "http://10.0.0.19:32530/jobs/" + flinkJobId + "/stop";
restTemplate.setRequestFactory(new HttpComponentsClientHttpRequestFactory());
Map<String, Object> response = restTemplate.postForObject(stopJobEndpoint, requestEntity, Map.class);
log.info("Stop job response: {}", response);
return (String) response.get("request-id");
```
SAVEPOINT PATH :
```java
String savepointStatusEndpoint =
    "http://10.0.0.19:32530/jobs/" + flinkJobId + "/savepoints/" + triggerId;
while (savepointPath == null) {
  Map<String, Object> response = restTemplate.getForObject(savepointStatusEndpoint, Map.class);
  log.info("Savepoint status response: {}", response);
  // Status id comes from the "status" object of the response
  String statusId = (String) ((Map<String, Object>) response.get("status")).get("id");
  if ("COMPLETED".equals(statusId)) {
    Map<String, Object> operation = (Map<String, Object>) response.get("operation");
    savepointPath = (String) operation.get("location");
    break;
  } else if ("IN_PROGRESS".equals(statusId)) {
    log.info("Savepoint creation is in progress...");
    TimeUnit.SECONDS.sleep(2); // check again after 2 seconds
  }
}
return savepointPath;
```
RESUME JOB :
```java
public void resumeFlinkJob(String flinkJobId, String savepointPath) {
  String programArgs = String.format("--json {} --savepoint %s --parallelism %d", savepointPath, 10);
  String resumeJobEndpoint = "http://10.0.0.19:32530/jars/" + jarId + "/run";
  log.info("-----resume job endpoint-----: {}", resumeJobEndpoint);
  HttpHeaders headers = new HttpHeaders();
  headers.setContentType(MediaType.APPLICATION_JSON);

  Map<String, Object> requestBody = new HashMap<>();
  requestBody.put("entryClass", "com.flink.FlinkApplication");
  requestBody.put("programArgs", programArgs);
  requestBody.put("parallelism", 10);

  HttpEntity<Map<String, Object>> requestEntity = new HttpEntity<>(requestBody, headers);
  restTemplate.setRequestFactory(new HttpComponentsClientHttpRequestFactory());
  String resumeJobResponse = restTemplate.postForObject(resumeJobEndpoint, requestEntity, String.class);
  log.info("Flink job resumed with response: {}", resumeJobResponse);
}
```
d
ok, did you check that this path exists and has the expected value: /jobs/{jobId}/savepoints/{request-id}
a
On your 2nd point, whether I am correctly using the savepoint path or not: I have set a savepoint path in the deployment file, and the savepoints are getting stored in that path only. But the path I get when hitting the `/savepoints/{request-id}` API is slightly different.
Deployment file:
state.savepoints.dir: file:///flink-basic/savepoints
state.checkpoints.dir: file:///flink-basic/checkpoints
In code I am getting the path as:
2024-07-24T16:33:14.376+05:30 INFO 37796 --- [nio-8082-exec-2] c.a.m.i.s.service.impl.JobServiceImpl : ----------Savepoint path---------: file:/flink-basic/savepoints/savepoint-d9f250-2766bc73c165
Is this an issue of the job not resuming from the savepoint?
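For what it's worth, `file:///flink-basic/savepoints` (empty authority) and `file:/flink-basic/savepoints` (no authority) are two spellings of the same `file` URI path, so the prefix difference alone should not be what prevents the restore. A quick check with the JDK's URI parser, using the paths from this thread:

```java
import java.net.URI;

public class UriCheck {
    public static void main(String[] args) {
        // Configured savepoint dir vs. the path returned by the savepoint status API
        URI configured = URI.create("file:///flink-basic/savepoints");
        URI returned = URI.create("file:/flink-basic/savepoints/savepoint-d9f250-2766bc73c165");

        System.out.println(configured.getPath()); // /flink-basic/savepoints
        System.out.println(returned.getPath());   // /flink-basic/savepoints/savepoint-d9f250-2766bc73c165

        // The returned savepoint lives under the configured directory:
        System.out.println(returned.getPath().startsWith(configured.getPath())); // true
    }
}
```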
d
The argument passing looks OK, but just make sure that
```
--json {}
```
does not conflict with or override any parameters related to the savepoint operation.
3. Check that the job configuration matches when resuming from a savepoint. This includes the entry class, parallelism, and any other params passed to the job.
a
> ok, did you check that this path exists and has the expected value: /jobs/{jobId}/savepoints/{request-id}
Yes, the expected values are there.
d
4. Check that `state.backend.savepoint.compatibility-mode` is correctly set for your version of Flink, if you have done an upgrade, for example.
a
Whatever entry class, parallelism, and arguments were used to start the job before stopping, I am using exactly the same; just `--savepointPath` was added for resuming the job again.
d
5. Check for state compatibility. If you have changed the job's code in a way that affects its state (e.g., the type of a state variable), Flink may not be able to restore the state properly. Make sure the code you're running when resuming is compatible with the state in the savepoint.
6. Check the `triggerId`. The path seems OK, but be sure that the trigger id you're using to fetch the savepoint path corresponds exactly to a successful savepoint operation and not a failed or partial one.
Other than these things, I think you want to enable full logging/tracing and look carefully for error messages.
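On the logging suggestion: Flink ships with Log4j 2 by default, so one way to surface restore-related messages is to raise the log level for the checkpoint coordinator's package in the cluster's Log4j properties file. A sketch (the package name is Flink's real `org.apache.flink.runtime.checkpoint` package; the `checkpointing` logger key is an arbitrary label):

```properties
# Verbose logging for savepoint/checkpoint restore (Log4j 2 properties format)
logger.checkpointing.name = org.apache.flink.runtime.checkpoint
logger.checkpointing.level = DEBUG
```

With this in place, the JobManager log should show whether a restore from the savepoint was attempted at all when the job is resubmitted.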
a
I need to check this parameter: `state.backend.savepoint.compatibility-mode`. I am not sure what exactly it is.
d
It's more relevant if you did an upgrade of Flink versions or something like that. Not sure, but you might toggle it and see if it makes a difference (depending on your env); be careful in production, of course.
Publish as much as you can about the messages when you start the job, and be sure to enable detailed logging/tracing.
a
Okay, the Flink version is the same; we are using 1.17. I will look into state compatibility. Thanks again.
d
Yep let us know what you find
a
Hey valo, have you used `upgradeMode` as `savepoint` while resuming from the `savepointPath`?
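For context on this last question: `upgradeMode` is a concept from the Flink Kubernetes Operator rather than the plain REST API. With `upgradeMode: savepoint`, the operator itself takes a savepoint when suspending the job and restores from it on redeploy, so no manual savepoint-path plumbing is needed. A sketch of the relevant `FlinkDeployment` fragment (the jar path is a hypothetical placeholder; entry class, parallelism, and savepoint dir are taken from this thread):

```yaml
# FlinkDeployment fragment for the Flink Kubernetes Operator (illustrative)
spec:
  job:
    jarURI: local:///opt/flink/usrlib/flink-application.jar  # hypothetical path
    entryClass: com.flink.FlinkApplication
    parallelism: 10
    upgradeMode: savepoint   # operator stops with a savepoint and restores from it
  flinkConfiguration:
    state.savepoints.dir: file:///flink-basic/savepoints
```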