# troubleshooting
a
hi guys, I'm currently working with the flink-state-processor API. I have some older checkpoints that were created with the default maxParallelism setting. We changed our app and now need to run it with maxParallelism = 1. Because of this, our old checkpoints are not compatible with the new version, and we are getting:
Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint Checkpoint Metadata. Max parallelism mismatch between checkpoint/savepoint state and new program. Cannot map operator ab78513f3423aaef9b9d527c6a07d3a5 with max parallelism 128 to new program with max parallelism 1. This indicates that the program has been changed in a non-compatible way after the checkpoint/savepoint.
That's why I need to use the state-processor API, to try to set maxParallelism to 1 this way:
StateBootstrapTransformation transformation = OperatorTransformation
    .bootstrapWith(bootstrapWith)
    .setMaxParallelism(1)
    .transform(transform);
This fails because the OperatorSubtaskStateReducer constructor has a precondition:
Preconditions.checkState(maxParallelism > 1);
Can you please help me understand why this precheck is here, and advise how I could migrate our maxParallelism = 128 checkpoints to maxParallelism = 1?
m
I'm not sure I understand why you need to change your maxParallelism, if you want to lower it. Why wouldn't you just run your application with a parallelism of 1?
a
hi Martijn, thank you for responding. I do run my application with maxParallelism = 1, but my old checkpoints were created when maxParallelism was still the default (128). When I start the application with maxParallelism = 1 from checkpoints that have maxParallelism = 128, I get the exception above. So I thought I could migrate my checkpoints with the state-processor API, but when I try to set the checkpoint's maxParallelism to 1, it fails.
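For context on why the restore rejects the mismatch: Flink partitions keyed state into key groups, and the number of key groups equals maxParallelism, so state written with 128 key groups cannot be mapped onto a program that declares only 1. The sketch below is a simplified illustration under stated assumptions, not Flink's own code: the class and method names are hypothetical, the operator's state is assumed to be keyed, and the plain `hashCode` modulo stands in for the murmur hash Flink applies to the key's hash code. It shows that keeping maxParallelism = 128 while running with parallelism = 1 places every key group on subtask 0.

```java
class KeyGroupSketch {

    // Simplified stand-in for key-group assignment (assumption: Flink hashes
    // key.hashCode() with a murmur hash before taking the modulo).
    public static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Maps a key group to the parallel subtask that owns it.
    public static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // value recorded in the old checkpoints
        int parallelism = 1;      // desired runtime parallelism

        // Count how many of the 128 key groups end up on subtask 0.
        int onSubtaskZero = 0;
        for (int keyGroup = 0; keyGroup < maxParallelism; keyGroup++) {
            if (operatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup) == 0) {
                onSubtaskZero++;
            }
        }
        System.out.println(onSubtaskZero + " of " + maxParallelism + " key groups are on subtask 0");
    }
}
```

Under these assumptions, lowering only the parallelism leaves the key-group layout of the checkpoint untouched, which is why a restore can succeed where changing maxParallelism does not.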
m
What is that operator doing? Because rescaling should be possible for all checkpoints and savepoints, per https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpoints_vs_savepoints/
a
It stores the incoming events in a ListState until another required event arrives, then it sends all the events stored in the ListState down the line with ctx.collect(...) etc.
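The buffering behaviour described above can be sketched in plain Java. This is only an illustration of the pattern, not the actual operator: `BufferUntilTrigger`, `onEvent`, and the `isTrigger` flag are hypothetical names, an in-memory `ArrayList` stands in for Flink's `ListState`, and it is an assumption that the triggering event itself is not forwarded.

```java
import java.util.ArrayList;
import java.util.List;

class BufferUntilTrigger {

    // Stand-in for ListState<String>; in a real job this would be Flink-managed state.
    private final List<String> buffer = new ArrayList<>();

    // Returns the events to emit downstream for this input.
    // Ordinary events are buffered and nothing is emitted; a trigger event
    // flushes everything buffered so far and clears the buffer.
    public List<String> onEvent(String event, boolean isTrigger) {
        if (!isTrigger) {
            buffer.add(event);
            return new ArrayList<>();
        }
        List<String> flushed = new ArrayList<>(buffer);
        buffer.clear();
        return flushed;
    }

    public static void main(String[] args) {
        BufferUntilTrigger op = new BufferUntilTrigger();
        op.onEvent("e1", false);
        op.onEvent("e2", false);
        // The trigger releases the buffered events in arrival order.
        System.out.println(op.onEvent("needed-event", true));
    }
}
```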
m
What's the error that you're getting when trying to run the same program with a different parallelism?
a
Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint Checkpoint Metadata. Max parallelism mismatch between checkpoint/savepoint state and new program. Cannot map operator ab78513f3423aaef9b9d527c6a07d3a5 with max parallelism 128 to new program with max parallelism 1. This indicates that the program has been changed in a non-compatible way after the checkpoint/savepoint.
The error comes from here (Flink 1.16.1):
org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint
Here's the whole trace:
Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint Checkpoint Metadata. Max parallelism mismatch between checkpoint/savepoint state and new program. Cannot map operator ab78513f3423aaef9b9d527c6a07d3a5 with max parallelism 128 to new program with max parallelism 1. This indicates that the program has been changed in a non-compatible way after the checkpoint/savepoint.
at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:190)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1798)
at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.tryRestoreExecutionGraphFromSavepoint(DefaultExecutionGraphFactory.java:214)
at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:189)
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:361)
at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:206)
at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134)
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152)
at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119)
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:369)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:346)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
... 4 more
m
But this looks like the change you've made in your program introduces a max parallelism for your operator?
Of 1?
It feels like you're trying to do something weird in your changed operator, which causes you to go down the changing maxParallelism route. But you should just fix the issue in your changed operator and then run your entire application with a different parallelism, instead of changing the max parallelism of your entire app
a
Yes, that change does include setMaxParallelism(1). I will try changing it to use only setParallelism(1) and see what happens. Thank you so far, Martijn!
hi @Martijn Visser Thank you for your help so far, I think it was a step forward. I removed all the maxParallelism(1) settings and changed them to parallelism(1), and the app was able to start. However, I got a different exception, and the error message included a ticket number: Caused by: java.lang.UnsupportedOperationException: Currently it is not supported to update the CommittableSummary for a checkpoint coming from the same subtask. Please check the status of FLINK-25920 https://issues.apache.org/jira/projects/FLINK/issues/FLINK-25920 Do I see correctly that there is no solution for this yet? Do you guys have any workaround? FYI: @nick christidis
m
Did you stop your job with drain active?
a
No, we do not stop our job through the Flink REST API, as we are using the Flink operator.
FYI: I have now tried starting my app with the non-migrated checkpoint, so I skipped the state-processor API even though our state definition has changed. With this incompatible checkpoint, the app was able to start properly without this error.
But I still need help with the modified checkpoint, since our state structure has changed and we don't want to lose any data. Any idea about the error above?