Anis Shaikh
07/24/2024, 5:45 PM
1. Starting the Job:
◦ After submitting the job, I save the jobId of the Flink job.
2. Stopping the Job:
◦ Using the saved jobId, I execute the stop API provided by Flink, which stops the job and creates a savepoint.
◦ The stop API returns a request-id (triggerId).
◦ Using this request-id, I call /jobs/{flinkJobId}/savepoints/{triggerId} to retrieve the savepoint path.
◦ I store this savepoint path for future use (a simplified sketch of this stop-and-poll flow follows the list below).
3. Resuming the Job:
◦ To resume the job, I execute the Flink run API /jars/{jarId}/run with the necessary program arguments, including the --savepoint path stored earlier.
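(For concreteness, a simplified, self-contained sketch of the stop-and-poll flow using Spring's RestTemplate; the endpoint paths and response fields are those of the Flink REST API, the host/port matches the snippets later in this thread, and the class and method names are illustrative only.)

import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.springframework.web.client.RestTemplate;

public class StopWithSavepointSketch {

    private static final String BASE_URL = "http://10.0.0.19:32530"; // JobManager REST address
    private final RestTemplate restTemplate = new RestTemplate();

    // POST /jobs/{jobId}/stop triggers stop-with-savepoint and returns a request-id (the triggerId).
    public String stopWithSavepoint(String flinkJobId) {
        Map<String, Object> body = Map.of("drain", false); // "drain" and "targetDirectory" are optional fields
        Map<String, Object> response = restTemplate.postForObject(
                BASE_URL + "/jobs/" + flinkJobId + "/stop", body, Map.class);
        return (String) response.get("request-id");
    }

    // GET /jobs/{jobId}/savepoints/{triggerId} reports IN_PROGRESS until the savepoint is COMPLETED.
    public String waitForSavepointPath(String flinkJobId, String triggerId) throws InterruptedException {
        while (true) {
            Map<String, Object> response = restTemplate.getForObject(
                    BASE_URL + "/jobs/" + flinkJobId + "/savepoints/" + triggerId, Map.class);
            String statusId = (String) ((Map<String, Object>) response.get("status")).get("id");
            if ("COMPLETED".equals(statusId)) {
                Map<String, Object> operation = (Map<String, Object>) response.get("operation");
                return (String) operation.get("location"); // the savepoint path to resume from
            }
            TimeUnit.SECONDS.sleep(2);
        }
    }
}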
Despite following this approach, the job is restarting from the beginning and reprocessing all records instead of resuming from where it left off.
I am seeking guidance on the following:
• Is my current approach correct?
• If not, where might I be going wrong?
• Any recommendations or best practices to ensure the job resumes processing from the last savepoint.
I appreciate any help or insights you can provide.
Anis Shaikh
07/25/2024, 6:06 AM
STOP JOB :
String stopJobEndpoint = "http://10.0.0.19:32530/jobs/" + flinkJobId + "/stop";
restTemplate.setRequestFactory(new HttpComponentsClientHttpRequestFactory());
Map<String, Object> response = restTemplate.postForObject(stopJobEndpoint, requestEntity, Map.class);
log.info("Stop job response: {}", response);
return (String) response.get("request-id");
SAVEPOINT PATH :
String savepointStatusEndpoint =
        "http://10.0.0.19:32530/jobs/" + flinkJobId + "/savepoints/" + triggerId;
String savepointPath = null;
while (savepointPath == null) {
    Map<String, Object> response = restTemplate.getForObject(savepointStatusEndpoint, Map.class);
    log.info("Savepoint status response: {}", response);
    // The trigger-status response nests the state under "status" -> "id"
    String statusId = (String) ((Map<String, Object>) response.get("status")).get("id");
    if ("COMPLETED".equals(statusId)) {
        Map<String, Object> operation = (Map<String, Object>) response.get("operation");
        savepointPath = (String) operation.get("location");
    } else if ("IN_PROGRESS".equals(statusId)) {
        log.info("Savepoint creation is in progress...");
        TimeUnit.SECONDS.sleep(2); // Check again after 2 seconds
    }
}
return savepointPath;
RESUME JOB :
public void resumeFlinkJob(String flinkJobId, String savepointPath) {
    String programArgs = String.format("--json {} --savepoint %s --parallelism %d", savepointPath, 10);
    String resumeJobEndpoint = "http://10.0.0.19:32530/jars/" + jarId + "/run";
    log.info("-----resume job endpoint-----: {}", resumeJobEndpoint);
    HttpHeaders headers = new HttpHeaders();
    headers.setContentType(MediaType.APPLICATION_JSON);
    Map<String, Object> requestBody = new HashMap<>();
    requestBody.put("entryClass", "com.flink.FlinkApplication");
    requestBody.put("programArgs", programArgs);
    requestBody.put("parallelism", 10);
    HttpEntity<Map<String, Object>> requestEntity = new HttpEntity<>(requestBody, headers);
    restTemplate.setRequestFactory(new HttpComponentsClientHttpRequestFactory());
    String resumeJobResponse = restTemplate.postForObject(resumeJobEndpoint, requestEntity, String.class);
    log.info("Flink job resumed with response: {}", resumeJobResponse);
}
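(For comparison, and not taken from the snippet above: the /jars/{jarId}/run request body in the Flink REST API also accepts a top-level savepointPath field, along with allowNonRestoredState, so the savepoint location can be handed to Flink directly rather than only appearing inside programArgs. A minimal sketch along those lines, reusing the entry class, parallelism, and endpoint shown above:)

import java.util.HashMap;
import java.util.Map;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.web.client.RestTemplate;

public class ResumeWithSavepointFieldSketch {

    private final RestTemplate restTemplate = new RestTemplate();

    // Sketch: same /jars/{jarId}/run endpoint, but the savepoint is passed through the
    // documented "savepointPath" field of the jar-run request body instead of a program argument.
    public String resumeFromSavepoint(String jarId, String savepointPath) {
        String runEndpoint = "http://10.0.0.19:32530/jars/" + jarId + "/run";

        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);

        Map<String, Object> requestBody = new HashMap<>();
        requestBody.put("entryClass", "com.flink.FlinkApplication");
        requestBody.put("programArgs", "--json {}");        // application arguments only
        requestBody.put("parallelism", 10);
        requestBody.put("savepointPath", savepointPath);    // restore state from this savepoint
        requestBody.put("allowNonRestoredState", false);    // optional field of the same request body

        return restTemplate.postForObject(runEndpoint, new HttpEntity<>(requestBody, headers), String.class);
    }
}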
D. Draco O'Brien
07/25/2024, 6:13 AM
--json {} does not conflict with any of the parameters that conflict with or override savepoint operations.
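(A related sketch, assuming the standard JarRunRequestBody fields: sending the arguments as programArgsList rather than a single programArgs string keeps each token, including the bare {}, from being split or re-quoted, and keeps the savepoint out of the program arguments entirely.)

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class ProgramArgsListSketch {

    // Builds a jar-run request body in which each program argument is a separate token,
    // so the empty JSON value "{}" reaches the job's main() exactly as written.
    public static Map<String, Object> buildRunRequestBody(String savepointPath) {
        Map<String, Object> body = new HashMap<>();
        body.put("entryClass", "com.flink.FlinkApplication");
        body.put("programArgsList", Arrays.asList("--json", "{}"));
        body.put("parallelism", 10);
        body.put("savepointPath", savepointPath); // savepoint restore handled by Flink, not by program arguments
        return body;
    }
}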