# troubleshooting
e
Hey, we have a Flink app that we want to scale to a parallelism above 128. The default max parallelism is 128, so to go above that we need to update maxParallelism. Then I realised that changing maxParallelism is a backward-incompatible change, which means we won't be able to restore from previous state if we do so, unless we use the State Processor API to read the state, update the max parallelism, write a new savepoint, and restore from that. So we started to look at the State Processor API, but I found very little information online on how it works. I'm hoping to get some help from anyone who's experienced with it.

Our Flink app has a FlinkKafkaConsumer source and 3 StreamingFileSinks. These are Flink built-in operators, and both kinds have state. From my understanding, if I want to modify the max parallelism, I'll need to:

1. Read the existing state of both operators from the existing savepoint.
2. Create a StateBootstrapTransformation for each operator (1 source, 3 sinks).
3. Update the max parallelism and write a new savepoint with the transformations.
4. Restore from the new savepoint.

Not sure if the above steps are correct, please correct me if things don't work this way.

I was trying to figure out how to read the existing state from the Kafka consumer source and the StreamingFileSink, and have a few questions:

1. For the Kafka consumer source, we found in the Flink source code that it initialises its state as UnionListState, and the type seems to be Tuple2<KafkaTopicPartition, Long>. But I did not see a ProcessFunction that the Kafka consumer implements to define how an element is processed (how the state is updated). If we want to write a transformation for this state, should I read the state as UnionListState and write our own StateBootstrapFunction to write it? Or is there a better way to do it?
2. For the S3 sink, it's using the StreamingFileSink, which seems to have two states: bucketState (byte[]) and maxPartCounterState (Long). Because we have 3 different sinks and we were using them with keyBy, should we treat these as keyed state instead of operator state? If we should treat them as keyed state, would it work, given that I don't see any KeyedProcessFunction defined in the StreamingFileSink either?

Or is there a better way for us to modify the max parallelism of the existing checkpoints/savepoints without doing all this?
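For context on why max parallelism gets baked into a savepoint, here is a minimal sketch of how keyed state is sharded into key groups. It is a simplified model, not Flink code: `mix` is a stand-in hash invented for this example (Flink uses its own murmur hash), and the class and method names are hypothetical. The two formulas mirror, as far as I understand it, what Flink's key group assignment does.

```java
/** Simplified model of how keyed state is sharded into key groups. */
public class KeyGroupSketch {

    // Stand-in hash mixer (an assumption for this sketch, not Flink's actual murmur hash).
    static int mix(int h) {
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return h & 0x7fffffff; // clear the sign bit so the modulo below is non-negative
    }

    // A key's group depends on maxParallelism, so it is fixed when the state is first written.
    static int keyGroupFor(Object key, int maxParallelism) {
        return mix(key.hashCode()) % maxParallelism;
    }

    // Key groups are split into contiguous ranges across the running subtasks.
    static int subtaskFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        Object[] keys = {4, "user-42", "orders"};
        for (Object key : keys) {
            // The same key can land in a different group once maxParallelism changes.
            System.out.printf("key=%s group@128=%d group@256=%d%n",
                    key, keyGroupFor(key, 128), keyGroupFor(key, 256));
        }
    }
}
```

Since there are only maxParallelism key groups, parallelism cannot usefully exceed it, and state written under one value cannot simply be reinterpreted under another because keys would map to different groups. This only models keyed state; the operator state of the Kafka source and the file sinks is redistributed differently.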
d
For the source, since the offsets are committed back to Kafka as part of taking a checkpoint/savepoint, I think you can manage this switchover by relying on the Kafka broker as the source of truth during the restart after the upgrade.

I believe the following will work, but you'll want to test it: Do a stop-with-savepoint to create the savepoint you'll use for the restart. Change the uid on the source operator so that its state can't be restored, and restart from the savepoint while allowing non-restored state. Meanwhile, configure the Kafka consumer so that it starts from the committed offsets.

There may be a similar way to hack the sinks, so that you don't really need their state, but I'm not sure.
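The idea behind the uid change can be sketched as a toy model. This is not Flink code: the savepoint is modelled as a plain map from operator uid to per-partition offsets, and `resolveStartOffsets`, the uid strings, and the offsets are all hypothetical. In a real job the corresponding pieces would be `.uid(...)` on the source, `setStartFromGroupOffsets()` on the FlinkKafkaConsumer, and the `--allowNonRestoredState` option of `flink run`.

```java
import java.util.Map;

/** Toy model: state in a savepoint is looked up by operator uid. */
public class UidRestoreSketch {

    // If the savepoint holds state for this uid, the restored offsets win;
    // otherwise the source falls back to the offsets committed to Kafka.
    static Map<Integer, Long> resolveStartOffsets(
            Map<String, Map<Integer, Long>> savepoint,
            String sourceUid,
            Map<Integer, Long> committedGroupOffsets) {
        Map<Integer, Long> restored = savepoint.get(sourceUid);
        return restored != null ? restored : committedGroupOffsets;
    }

    public static void main(String[] args) {
        // Hypothetical savepoint contents: partition -> offset, stored under the old uid.
        Map<String, Map<Integer, Long>> savepoint =
                Map.of("kafka-source", Map.of(0, 100L, 1, 200L));
        // Hypothetical offsets committed to the broker for the consumer group.
        Map<Integer, Long> committed = Map.of(0, 100L, 1, 200L);

        // Old uid: offsets come from the savepoint.
        System.out.println(resolveStartOffsets(savepoint, "kafka-source", committed));
        // New uid: nothing matches, so the committed offsets are used.
        System.out.println(resolveStartOffsets(savepoint, "kafka-source-v2", committed));
    }
}
```

Because the source state is deliberately dropped, the max parallelism recorded for it no longer matters; the trade-off is that the restart position is only as accurate as the last offsets committed to Kafka.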
e
Hey @David Anderson, thanks for the reply. As a workaround, we were thinking of starting another Flink app in the same namespace without state (with higher parallelism) that just writes to different folders in S3, so not the real sink that we are using. We would then have app 1 (the one with the lower max parallelism that is currently in production and writing to the real sink) and app 2 (with the higher max parallelism, writing to a different folder), with both apps running at the same time.

We will introduce a feature flag in the code, so that when the flag is on (with a timestamp), we switch the two apps at exactly the specified timestamp: app 2 starts writing to the real sink and app 1 stops writing to it. We've done a similar migration before and it worked for us. Do you think this would work?

As for the State Processor API, out of curiosity, do you think it's not that straightforward to implement the logic of reading the existing state from the Kafka consumer source and the StreamingFileSink and overriding the max parallelism? Is that the reason you suggested a workaround instead?
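A minimal sketch of the switch logic we have in mind, assuming the decision is made per record from a record timestamp (whether that is event time or processing time is left open here). The class, enum, and method names are hypothetical.

```java
/** Sketch of a timestamp-based cutover between two apps writing to one sink. */
public class CutoverSketch {

    enum App { OLD_APP, NEW_APP }

    // Before the cutover (or while the flag is off) only the old app writes to the real sink;
    // from the cutover timestamp onward only the new app does.
    static boolean writesToRealSink(App app, boolean flagOn, long cutoverTs, long recordTs) {
        boolean afterCutover = flagOn && recordTs >= cutoverTs;
        return app == App.NEW_APP ? afterCutover : !afterCutover;
    }

    public static void main(String[] args) {
        long cutoverTs = 1_000L; // hypothetical cutover timestamp
        for (long ts : new long[] {999L, 1_000L, 1_001L}) {
            System.out.printf("ts=%d old=%b new=%b%n", ts,
                    writesToRealSink(App.OLD_APP, true, cutoverTs, ts),
                    writesToRealSink(App.NEW_APP, true, cutoverTs, ts));
        }
    }
}
```

The property we care about is that for any record timestamp exactly one of the two apps writes to the real sink, so the switch produces neither gaps nor duplicates, provided both apps see the same flag value and the same timestamp.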
d
Yes, I think the workaround is easier.