is that the case `pipeline.max-parallelism` can’t ...
# random
z
is that the case
pipeline.max-parallelism
can’t be updated? Getting the below error when updating the setting for my job.
Copy code
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint Checkpoint Metadata. Max parallelism mismatch between checkpoint/savepoint state and new program. Cannot map operator 77b992fbfa92ed1320e4d6cad773ae9f with max parallelism 128 to new program with max parallelism 256. This indicates that the program has been changed in a non-compatible way after the checkpoint/savepoint.
	at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
	at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint Checkpoint Metadata. Max parallelism mismatch between checkpoint/savepoint state and new program. Cannot map operator 77b992fbfa92ed1320e4d6cad773ae9f with max parallelism 128 to new program with max parallelism 256. This indicates that the program has been changed in a non-compatible way after the checkpoint/savepoint.
	at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:190) ~[flink-dist-1.17.1.jar:1.17.1]
m
From https://nightlies.apache.org/flink/flink-docs-master/docs/ops/production_ready/#set-an-explicit-max-parallelism
There is currently no way to change the maximum parallelism of an operator after a job has started without discarding that operators state.
🙏 1
d
FWIW, the State Processor API can be used to rework a savepoint so that it has a higher max parallelism -- but it's not a trivial exercise, and may not be possible in some cases.
z
Is there a documentation about how to do that?
d
There is some documentation for the State Processor API. If you look here -- https://nightlies.apache.org/flink/flink-docs-stable/docs/libs/state_processor_api/#writing-new-savepoints -- you'll see an example showing how to create a new savepoint with a specific max parallelism. https://docs.immerok.cloud/docs/how-to-guides/development/migrating-state-away-from-kryo/ is an example showing how to use this API to migrate state from Kryo to another serializer. This is more difficult than changing the maximum parallelism, but you may find it helpful to start with a complete, end-to-end example, rather than the code snippets in the docs.
🙏 1