Hey, we have a Flink app that we want to scale to more than 128 parallelism. The default max parallelism is 128, so going above that means updating maxParallelism. Then I realised that changing maxParallelism is a backward-incompatible change, which means we won't be able to restore from previous state if we do so, unless we use the State Processor API to read the state, update the max parallelism, write a new savepoint, and restore from that.
So we started looking at the State Processor API, but I found very little information online on how it works, and I'm hoping to get some help from anyone who's experienced with it. Our Flink app has a FlinkKafkaConsumer source and 3 StreamingFileSinks; both are built-in Flink operators, and both have state. From my understanding, if I want to modify the max parallelism, I'll need to:
1. Read the existing state of each operator from the existing savepoint.
2. Create a BootstrapTransformation for each operator (1 source, 3 sinks).
3. Update the max parallelism and write a new savepoint with the transformations
4. Restore from the new savepoint.
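To make that concrete, here is a rough sketch of what I imagine steps 1-3 would look like with the (DataSet-based) State Processor API. The uid, the savepoint paths, and the KafkaOffsetBootstrapper function are placeholders I made up for illustration; "topic-partition-offset-states" is the descriptor name I found in FlinkKafkaConsumerBase, so please correct me if I'm misreading it:

```java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// 1. Load the existing savepoint (path and backend are placeholders).
ExistingSavepoint existing = Savepoint.load(
    env, "s3://bucket/savepoints/savepoint-xxx", new MemoryStateBackend());

// 2. Read the Kafka source's union list state of partition offsets.
//    "kafka-source-uid" is a placeholder for our actual operator uid.
DataSet<Tuple2<KafkaTopicPartition, Long>> offsets =
    existing.readUnionState(
        "kafka-source-uid",
        "topic-partition-offset-states",
        TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {}));

// Build a transformation that writes the offsets back out.
// KafkaOffsetBootstrapper is our own (hypothetical) StateBootstrapFunction.
BootstrapTransformation<Tuple2<KafkaTopicPartition, Long>> sourceTransform =
    OperatorTransformation.bootstrapWith(offsets)
        .transform(new KafkaOffsetBootstrapper());

// 3. Create a new savepoint with the higher max parallelism and attach
//    the transformations (the three sinks would be added the same way).
Savepoint.create(new MemoryStateBackend(), 256)
    .withOperator("kafka-source-uid", sourceTransform)
    // .withOperator("s3-sink-1-uid", sink1Transform) ... etc.
    .write("s3://bucket/savepoints/savepoint-rescaled");

env.execute("rescale savepoint");
```

Then step 4 would just be starting the job with `-s s3://bucket/savepoints/savepoint-rescaled`.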
Not sure if the above steps are correct; please correct me if things don't work this way. I was trying to figure out how to read the existing states from the FlinkKafkaConsumer source and the StreamingFileSink, and I have a few questions:
1. For the Kafka consumer source, we found in the Flink source code that it initialises its state as a UnionListState, and the type seems to be Tuple2<KafkaTopicPartition, Long>, but I did not see a ProcessFunction that the Kafka consumer implements to define how elements are processed (i.e. how the state is updated). If we want to write a transformation for this state, should I read it as a UnionListState and write our own StateBootstrapFunction to write it back? Or is there a better way to do it?
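In case it helps clarify what I mean by writing our own StateBootstrapFunction, this is the sort of thing I had in mind for the offsets. The descriptor name is the one I found in FlinkKafkaConsumerBase, but I'm not sure the serializer this descriptor produces matches the one the consumer registers internally, so treat this as a sketch rather than something I know works:

```java
// Hypothetical bootstrapper that writes the Kafka offsets back out as a
// union list state, under the same descriptor name the consumer appears
// to use ("topic-partition-offset-states" in FlinkKafkaConsumerBase).
public class KafkaOffsetBootstrapper
        extends StateBootstrapFunction<Tuple2<KafkaTopicPartition, Long>> {

    private transient ListState<Tuple2<KafkaTopicPartition, Long>> offsets;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<KafkaTopicPartition, Long>> desc =
            new ListStateDescriptor<>(
                "topic-partition-offset-states",
                TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {}));
        // Registered as union state, matching how the consumer restores it.
        offsets = context.getOperatorStateStore().getUnionListState(desc);
    }

    @Override
    public void processElement(Tuple2<KafkaTopicPartition, Long> offset, Context ctx)
            throws Exception {
        offsets.add(offset);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Nothing to do here; the state is written via the ListState above.
    }
}
```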
2. For the S3 sink, it's using the StreamingFileSink, which seems to have two states: bucketState (byte[]) and maxPartCounterState (Long). Because we have 3 different sinks and we were using them with a keyBy, should we treat these as keyed state instead of operator state?
If we should treat this as keyed state, would it even work, given that I don't see any KeyedProcessFunction defined in the StreamingFileSink either?
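From what I can tell in Flink's Buckets class, the sink state looks like operator state ("bucket-states" as a ListState of byte[], and "max-part-counter" as a union list state), even when the sink sits downstream of a keyBy. So my current attempt reads it as operator state like this; the uid and savepoint path are placeholders, and the state names are my guesses from the source, so please correct me if that's wrong:

```java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
ExistingSavepoint existing = Savepoint.load(
    env, "s3://bucket/savepoints/savepoint-xxx", new MemoryStateBackend());

// Read the sink's bucket state as a plain operator list state.
// "s3-sink-1-uid" is a placeholder for our actual sink uid.
DataSet<byte[]> bucketStates =
    existing.readListState(
        "s3-sink-1-uid",
        "bucket-states",
        PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);

// Read the part counter, which appears to be a union list state.
DataSet<Long> maxPartCounters =
    existing.readUnionState(
        "s3-sink-1-uid",
        "max-part-counter",
        BasicTypeInfo.LONG_TYPE_INFO);
```

and then I'd bootstrapWith(...) each of these per sink, the same way as for the source.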
Or is there a better way for us to modify the max parallelism of the existing checkpoints/savepoints without doing all of this?