# troubleshooting
s
Hi All, I have a quick question. I am deploying Flink apps using the k8s operator, and the app code uses `table-api` through Flink SQL. When I changed the `instance-type` and `parallelism` for the Flink app, it failed to restore itself from the savepoint it took earlier, with this error:
```
java.lang.RuntimeException: Error while getting state
	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:109)
	at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:232)
	at org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasUniqueKey.<init>(JoinRecordStateViews.java:132)
	at org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasUniqueKey.<init>(JoinRecordStateViews.java:112)
	at org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews.create(JoinRecordStateViews.java:61)
	at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.open(StreamingJoinOperator.java:97)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@b5c7d7d5) must not be incompatible with the old state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@2f21ec15).
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:755)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:667)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:883)
	at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:870)
	at org.apache.flink.runtime.state.KeyedStateFactory.createOrUpdateInternalState(KeyedStateFactory.java:47)
	at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:70)
	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:362)
	at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:413)
	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
	at org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:106)
	... 15 more
```
I did not make any changes to the code itself. Do you know why this might be happening?
```
Flink version: 1.18.1
k8s operator: 1.8.0
```
d
Given that you made no code changes and are still encountering this issue after adjusting `instance-type` and `parallelism` through the Kubernetes operator, let's focus on the indirect factors that could influence serializer compatibility:

1. Even without explicit code modifications, your project's transitive dependencies may have been updated, which can affect serialization. Check whether any libraries related to data types or serialization, including Flink's own dependencies, changed versions.
2. Review whether there were accidental changes to Flink's state-handling or serialization settings in your deployment YAML files or Helm charts. Small configuration differences between the environment where the savepoint was generated and the current one could cause what you're seeing.
3. The Flink Kubernetes Operator may apply its own default configuration, or it may have been upgraded to a version with different defaults. Check that the operator version you're using doesn't introduce new default settings that could affect state compatibility.
4. Although it isn't a code change, altering parallelism changes how keyed state is partitioned and distributed across tasks. If the state includes maps whose keys aren't uniformly distributed, changing parallelism sends keys to different tasks, which could potentially expose serialization inconsistencies that were not triggered before.
5. Double-check that the combination of Flink 1.18.1 and Kubernetes Operator 1.8.0 is supported and has no known compatibility issues regarding state handling or serialization.
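For point 1, one low-tech check is to diff the resolved dependency list of the build that produced the savepoint against the current build. The sketch below assumes each snapshot is plain text with one `groupId:artifactId:version` entry per line; that format is a hypothetical simplification, so adapt `parse_deps` to whatever your build tool actually prints. The artifact names in the demo are made up.

```python
"""Compare two dependency snapshots to spot transitive version drift.

Assumption: each snapshot holds one "groupId:artifactId:version" entry per
line (a hypothetical simplified format, not a real build-tool output).
"""
from typing import Dict, List


def parse_deps(text: str) -> Dict[str, str]:
    """Map "groupId:artifactId" to its version, skipping blanks and # comments."""
    deps: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        parts = line.split(":")
        if len(parts) != 3:
            raise ValueError(f"expected groupId:artifactId:version, got {line!r}")
        group, artifact, version = parts
        deps[f"{group}:{artifact}"] = version
    return deps


def diff_deps(old: Dict[str, str], new: Dict[str, str]) -> Dict[str, List]:
    """Report artifacts that were added, removed, or changed version."""
    shared = set(old) & set(new)
    return {
        "added": sorted(set(new) - set(old)),
        "removed": sorted(set(old) - set(new)),
        "changed": sorted(
            (name, old[name], new[name]) for name in shared if old[name] != new[name]
        ),
    }


if __name__ == "__main__":
    # Hypothetical snapshots: savepoint-era build vs. current build.
    before = """
    org.apache.flink:flink-table-api-java:1.18.1
    com.example:my-udfs:1.0.0
    """
    after = """
    org.apache.flink:flink-table-api-java:1.18.1
    com.example:my-udfs:1.1.0
    org.example:extra-lib:2.0
    """
    print(diff_deps(parse_deps(before), parse_deps(after)))
```

Anything listed under `changed` that touches data types or serializers is a candidate to pin back to its old version for a restore test.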
s
Thanks @D. Draco O'Brien for the comprehensive list; this gives me a good set of pointers for investigation.
d
Sure. Additionally, one useful step is a rollback test: as a diagnostic, roll back to the previous instance type and parallelism configuration and see whether the issue persists. This can help isolate whether those changes are responsible.
You should also inspect the Flink Kubernetes Operator logs for clues about automatic configuration adjustments or issues during deployment. Even without explicit code changes, environment and configuration changes can still affect how Flink applications behave, especially with stateful processing.
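When the operator or JobManager logs are long, a quick keyword filter can surface the restore-related lines first. This is a minimal sketch that assumes plain-text logs with one entry per line; the keyword list is an illustrative guess, not a catalogue of actual Flink log messages, and the sample lines in the demo are made up.

```python
"""Filter log lines that may explain a failed savepoint restore.

Assumptions: plain-text logs, one entry per line; DEFAULT_KEYWORDS is an
illustrative starting point, not a list of real Flink log messages.
"""
from typing import Iterable, List, Tuple

DEFAULT_KEYWORDS = ("StateMigrationException", "savepoint", "serializer", "parallelism")


def find_suspicious_lines(
    lines: Iterable[str], keywords: Iterable[str] = DEFAULT_KEYWORDS
) -> List[Tuple[int, str]]:
    """Return (1-based line number, line) for each line containing a keyword.

    Matching is case-insensitive, so "Savepoint" and "savepoint" both match.
    """
    lowered = [k.lower() for k in keywords]
    hits: List[Tuple[int, str]] = []
    for number, line in enumerate(lines, start=1):
        text = line.lower()
        if any(k in text for k in lowered):
            hits.append((number, line.rstrip("\n")))
    return hits


if __name__ == "__main__":
    # Made-up log excerpt; in practice pass an open file handle instead.
    sample = [
        "INFO  Deploying job with updated resources\n",
        "INFO  Restoring job from savepoint sp-1\n",
        "ERROR StateMigrationException raised while opening operator\n",
    ]
    for number, line in find_suspicious_lines(sample):
        print(f"{number}: {line}")
```

Because Python file objects iterate line by line, you can pass `open(path, encoding="utf-8")` directly and extend the keyword list as you learn which messages matter.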
s
That makes sense!
d
Yep, let us know what you find and if you have any additional questions
s
Absolutely! I think it's possible that some other config for the Flink app, besides `instance-type` and `parallelism`, changed. I am trying to zero in on that. Appreciate the help!
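To zero in on a changed config, you can diff the effective Flink configuration from the deployment that took the savepoint against the current one. One key worth checking early: Flink's state TTL documentation notes that restoring state written without TTL using a TTL-enabled descriptor (or vice versa) fails with `StateMigrationException`, and the trace above does pass through `TtlStateFactory`, so a change to `table.exec.state.ttl` is a plausible suspect. The sketch below assumes flink-conf.yaml style input (one `key: value` pair per line, or pairs copied out of a FlinkDeployment's `flinkConfiguration` map); the sample values are made up.

```python
"""Diff two flat Flink configurations to find settings that changed.

Assumptions: one "key: value" pair per line; everything after "#" is treated
as a comment, so values that legitimately contain "#" would be truncated.
"""
from typing import Dict, List, Optional, Tuple


def parse_flat_conf(text: str) -> Dict[str, str]:
    """Parse "key: value" lines into a dict, ignoring blanks and comments."""
    conf: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()  # drop comment and surrounding space
        if not line:
            continue
        key, sep, value = line.partition(": ")
        if not sep:
            raise ValueError(f"expected 'key: value', got {line!r}")
        conf[key.strip()] = value.strip()
    return conf


def diff_conf(
    old: Dict[str, str], new: Dict[str, str]
) -> List[Tuple[str, Optional[str], Optional[str]]]:
    """Return (key, old_value, new_value) for every key whose value differs.

    None means the key is absent on that side.
    """
    keys = sorted(set(old) | set(new))
    return [(k, old.get(k), new.get(k)) for k in keys if old.get(k) != new.get(k)]


if __name__ == "__main__":
    # Made-up configs: where the savepoint was taken vs. the current deployment.
    savepoint_env = """
    parallelism.default: 2
    state.backend.type: rocksdb
    table.exec.state.ttl: 1 h
    """
    current_env = """
    parallelism.default: 4
    state.backend.type: rocksdb
    """
    old, new = parse_flat_conf(savepoint_env), parse_flat_conf(current_env)
    for key, before, after in diff_conf(old, new):
        print(f"{key}: {before!r} -> {after!r}")
```

Differences you expected, such as parallelism, can be set aside; any unexpected key in the output, especially TTL or state-backend settings, is a candidate to revert before retrying the restore.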
s
I am also facing this issue. To narrow it down, I stopped using the Kubernetes operator and started the application without it, but I am still getting the error. Will post if I find something.