# troubleshooting
a
Hello! We're using `flink-kubernetes-operator`. We are occasionally encountering `org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has already been submitted` errors on our JobManagers. This completely blocks the JobManager from restarting correctly and launching new tasks. It seems to happen sometimes when a JobManager pod is rotated out by the Kubernetes cluster (via Karpenter). Sometimes we do see the `SIGTERM` in the pod logs when the pods are rotated out, but sometimes not (could some logs not get through?). We have tried setting up high availability (although we're not sure it would solve the issue), but the config does not seem to recognise the `kubernetes.jobmanager.replicas` value (we tried setting it to 2, but it is still interpreted as 1; a sketch of what we expected is below, after the configs). We have defined `high-availability.type: kubernetes` and `high-availability.storageDir`, and we can't explicitly set `kubernetes.cluster-id`: it seems to be handled by the deployment, and when we do set it the app does not start. Do you have any ideas what could be causing these instabilities? Thanks. Our Flink confs ⬇️
```yaml
flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
    kubernetes.operator.savepoint.history.max.age: "72h"
    kubernetes.operator.savepoint.history.max.count: "3"
    kubernetes.operator.jm-deployment-recovery.enabled: "true"
    kubernetes.operator.cluster.health-check.enabled: "true"
    kubernetes.operator.job.restart.failed: "true"
    kubernetes.operator.job.autoscaler.enabled: "true"
    kubernetes.operator.job.autoscaler.stabilization.interval: 1m
    kubernetes.operator.job.autoscaler.metrics.window: 5m
    kubernetes.operator.job.autoscaler.target.utilization: "0.6"
    kubernetes.operator.job.autoscaler.target.utilization.boundary: "0.2"
    kubernetes.operator.job.autoscaler.restart.time: 2m
    kubernetes.operator.job.autoscaler.catch-up.duration: 5m
    pipeline.max-parallelism: "6"
    kubernetes.jobmanager.replicas: "2"
    kubernetes.jobmanager.tolerations: key:dedicated,operator:Equal,value:low-churn,effect:NoSchedule
```

```yaml
flink-conf.yaml: |+
      taskmanager.memory.process.size: 2048m
      jobmanager.memory.process.size: 2048m
      taskmanager.memory.flink.size: 1024m
      jobmanager.memory.flink.size: 1024m
      taskmanager.memory.managed.fraction: 0.6
      kubernetes.jobmanager.cpu.limit-factor: 2
      state.backend.type: rocksdb
      state.backend.incremental: true
      state.backend.local-recovery: false
      execution.checkpointing.max-concurrent-checkpoints: 1
      high-availability.type: kubernetes
      fs.s3a.aws.credentials.provider: com.amazonaws.auth.DefaultAWSCredentialsProviderChain
      fs.allowed-fallback-filesystems: s3
      high-availability.storageDir: s3://my-bucket/recovery
      state.savepoints.dir: s3://my-bucket/savepoints
      state.checkpoints.dir: s3://my-bucket/checkpoints
      process.working-dir: /tmp/workdir
```
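For reference, the replicas setting can apparently also be expressed at the FlinkDeployment level instead of inside `flinkConfiguration`. Below is a minimal sketch, not our actual manifest (the name and `flinkVersion` are placeholders); as far as we understand, standby JobManagers via `kubernetes.jobmanager.replicas` only take effect once Kubernetes HA is enabled:

```yaml
# Minimal sketch of a FlinkDeployment with a standby JobManager; the name and
# flinkVersion are placeholders, and replicas > 1 assumes Kubernetes HA is enabled.
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-flink-job            # placeholder
spec:
  flinkVersion: v1_17           # placeholder, match your Flink image
  jobManager:
    replicas: 2                 # standby JobManager, meaningful only with HA
    resource:
      memory: "2048m"
      cpu: 1
  flinkConfiguration:
    high-availability.type: kubernetes
    high-availability.storageDir: s3://my-bucket/recovery
```

We haven't confirmed yet whether `spec.jobManager.replicas` behaves differently from the config key in our operator version.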
a
Hi, I have exactly the same issue as you, with the same setup (Flink Kubernetes operator, Karpenter, etc.). Sometimes I see the `SIGTERM` (signal 15), sometimes not. A quick word about Karpenter: it does not seem to be the source of the issue, since I was already getting `Job has already been submitted` before our migration to Karpenter. I also tried adding the `high-availability` configuration, but it did not solve the issue. However, this often happens when I release or roll back my Flink deployment in a way that adds or removes a vertex (a new operator like a flatMap, etc.). I don't know if that is correlated; I would need to test this hypothesis in my staging env. In order to restart the job correctly I have to do a hard restart (via ArgoCD), and in that case I lose fault tolerance and also the autoscaling configuration, which is pretty annoying. Please let me know if you make any progress 🙂
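The idea I want to test next is making redeploys go through a savepoint instead of a stateless hard restart, roughly like this in the job spec. It's only a sketch: the `jarURI` and the savepoint path are placeholders, and I haven't validated it in staging yet:

```yaml
# Sketch only: relies on the operator's upgradeMode / initialSavepointPath fields.
spec:
  job:
    jarURI: local:///opt/flink/usrlib/my-job.jar   # placeholder
    parallelism: 2
    upgradeMode: savepoint      # suspend with a savepoint and restore from it on spec changes
    # after a hard redeploy, an explicit starting point can be given:
    # initialSavepointPath: s3://my-bucket/savepoints/savepoint-xxxx   # placeholder path
```

If I read the docs correctly, `last-state` instead of `savepoint` would restore from the latest checkpoint via the HA metadata, which might also cover the node-rotation case.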
a
Hello! Did you have another pod-rotation tool before Karpenter? I mentioned it because, in our case, the issue often seems to occur whenever a JobManager pod is rotated out. (Karpenter is not necessarily the issue, but it looks like the Flink operator is not always able to handle restarts gracefully, despite us increasing the termination grace period.) I also asked the question, with some details, on the Flink mailing lists; I got a few responses, but nothing that has helped much so far. High availability might be part of the issue, since I couldn't reproduce the error without it yet. When you say you do a hard restart via ArgoCD, do you mean you resync in replace/force mode, or something else? Do you lose events / data when that happens? We're still quite stuck on this issue, but I will definitely update you if I find something interesting! 🙂
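In case it's useful, this is roughly how we raised the grace period, via the generic pod template that the operator merges into the JobManager/TaskManager pods. It's only a sketch; the 120s value is just what we happened to pick, and your CRD version may also offer the per-component `jobManager.podTemplate` / `taskManager.podTemplate` fields:

```yaml
# Sketch: a generic pod template the operator merges into JM/TM pods.
# terminationGracePeriodSeconds is the standard Kubernetes Pod spec field.
spec:
  podTemplate:
    spec:
      terminationGracePeriodSeconds: 120   # example value, not a recommendation
```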