1. Hi all, I have a Flink pipeline which consumes...
# troubleshooting
p
1. Hi all, I have a Flink pipeline that consumes data from Kafka and sinks data to Kafka, with checkpoints enabled. Details of the pipeline: Flink version 1.16; checkpoint storage is an external Azure file share. ISSUE: After we upgrade the certificate used to connect to Kafka, the Flink TaskManager and JobManager pods restart but still refer to the old Kafka certificate, which causes an SSL error when connecting to Kafka. As a workaround we edit the deployment every time, but we would like to know whether there is a proper solution to this problem.
s
You can store the certificate data in a variable, pass it to your script at runtime, create the certificate from it, and destroy it once done. That's what we do; I'm not sure if it is the correct approach. Or maybe you can try changing the names of the certificate files on the Azure file share.
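A minimal sketch of that approach in Python, assuming the certificate arrives base64-encoded in an environment variable. The variable name `KAFKA_CLIENT_CERT_B64` and the helper `materialized_cert` are hypothetical, chosen only for illustration; adapt them to however your deployment injects the secret.

```python
import base64
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def materialized_cert(env_var="KAFKA_CLIENT_CERT_B64"):
    """Write the certificate held in env_var to a temp file and delete it afterwards.

    env_var is a hypothetical name; the value is assumed to be base64-encoded.
    """
    data = base64.b64decode(os.environ[env_var])
    # mkstemp creates the file readable and writable only by the current user.
    fd, path = tempfile.mkstemp(suffix=".pem")
    try:
        with os.fdopen(fd, "wb") as fh:
            fh.write(data)
        yield path  # hand the path to whatever needs the certificate file
    finally:
        # Destroy the certificate file once the caller is done with it.
        if os.path.exists(path):
            os.remove(path)
```

Usage would look like `with materialized_cert() as cert_path: ...`, passing `cert_path` to the client configuration inside the block. Because the file is recreated from the variable on every start, a restarted process reads whatever value the variable holds at that time.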
p
Thanks, Gaurav, for the inputs. But we are using Azure Key Vault to store the Kafka certs as secrets, and these secrets are then referenced in our deployment. So does that mean these secrets are somehow also stored in the Azure file share?
We use SSL on both the brokers and the Kafka clients for authentication and authorization, and we rotate the certificates at a fixed interval. The Kafka producers and consumers cannot pick up the rotated certs when checkpoints are enabled. This interrupts stream processing (e.g. the Flink connector does not handle the SSL exception). Even after bouncing both the JobManager and TaskManager pods, we still see SSL exceptions. We need a proper solution to handle this.
s
You need to check the values in the secrets. If the values are stored as secrets, the way I use AWS Secrets Manager/Parameter Store, then just fetch the respective certificate secrets and follow the process I described in my first comment. That should help.
s
DM'd you regarding Azure.
p
But Gaurav, please clarify the fetch part. My pod refers to a secret, but somehow, due to the checkpoint and state store, it picks up the old value of the secret; even if I bounce the pods, it still refers to the old secret value. I do not know whether we can manually change the state-related data that is stored.