# troubleshooting
Hey guys, is anybody using the Flink Kubernetes Operator in session mode with multiple jobs defined by CRDs (PyFlink)? I think there is some kind of race condition causing errors when two FlinkDeployment instances are applied to the same session at the same time. One of the FlinkDeployments gets submitted successfully, but the other one always gets a DuplicateJobSubmissionException, although it eventually succeeds a few seconds later, after the first retry. I am running Flink 1.19 for both the operator and the Flink runtime.
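For anyone trying to reproduce this, a minimal sketch of the "applied at the same time" scenario might look like the following. It assumes `kubectl` is on the PATH and pointed at the right cluster, and that the hypothetical files `job-a.yaml` and `job-b.yaml` each hold one job resource targeting the same session; the helper names are illustrative only. Note that `kubectl apply` only creates the resources; the actual job submission happens later, when the operator reconciles them.

```python
import subprocess
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence


def kubectl_apply(manifest_path: str) -> str:
    """Run `kubectl apply -f <manifest_path>` and return kubectl's stdout."""
    result = subprocess.run(
        ["kubectl", "apply", "-f", manifest_path],
        check=True,  # raise CalledProcessError on a non-zero exit code
        capture_output=True,
        text=True,
    )
    return result.stdout


def apply_concurrently(
    manifest_paths: Sequence[str],
    apply_fn: Callable[[str], str] = kubectl_apply,
) -> List[str]:
    """Apply every manifest at (roughly) the same time, one thread per file.

    Results are returned in the same order as manifest_paths.
    """
    # max(1, ...) keeps ThreadPoolExecutor valid for an empty input list
    with ThreadPoolExecutor(max_workers=max(1, len(manifest_paths))) as pool:
        return list(pool.map(apply_fn, manifest_paths))


# Usage (hypothetical file names, both targeting the same session cluster):
# print("".join(apply_concurrently(["job-a.yaml", "job-b.yaml"])))
```

The `apply_fn` parameter exists only so the logic can be exercised without a cluster.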
This is the log file from the JobManager
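The Dockerfile is trivial:
```dockerfile
FROM flink:1.19
# Install Python plus a few debugging tools
RUN apt-get update -y && \
    apt-get install -y python3.11 python3-pip net-tools vim git && \
    rm -rf /var/lib/apt/lists/*
# Make the job script importable by the Python process
ENV PYTHONPATH="/opt/flink/usrlib/"
# Expose the interpreter as `python`; /usr/bin/python3 is the distribution's
# default python3, which is not necessarily the python3.11 installed above
RUN ln -s /usr/bin/python3 /usr/bin/python
COPY --chown=flink:flink test.py /opt/flink/usrlib/
```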
The PyFlink job is very trivial too:
```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

# Create the streaming environment and a Table API environment on top of it
env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(stream_execution_environment=env)

# Bounded source: a sequence of ids from 1 to 10,000,000 with random strings
table_env.execute_sql("""
    CREATE TABLE datagen (
        id INT,
        data STRING
    ) WITH (
        'connector' = 'datagen',
        'fields.id.kind' = 'sequence',
        'fields.id.start' = '1',
        'fields.id.end' = '10000000'
    )
""")

# Sink that prints every row to the TaskManager's standard output
table_env.execute_sql("""
    CREATE TABLE print (
        id INT,
        data STRING
    ) WITH (
        'connector' = 'print'
    )
""")

# Submit the INSERT as a single job
stmt_set = table_env.create_statement_set()
stmt_set.add_insert_sql("INSERT INTO print SELECT * FROM datagen")
stmt_set.execute()
```
I tried to protect my Python Flink environment with a FileLock, but then I get errors such as `pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath`, or other unrelated errors. I think I will just avoid session mode for now.
The fun part is that even when I increase SLEEP_BETWEEN_JOB_CREATE to 60, I still get the same issue, but if I instantiate the CRDs one by one, a few seconds apart, everything works without any error.
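The manual workaround (creating the resources one at a time, a few seconds apart) could be scripted roughly as below. This is a sketch under assumptions: `kubectl` is available, the manifest file names are hypothetical, and the 5-second default is an arbitrary stand-in for "a few seconds". The delay only spaces out the `kubectl apply` calls; since the operator reconciles asynchronously, it is a heuristic and does not guarantee the previous job has finished submitting.

```python
import subprocess
import time
from typing import Callable, List, Sequence


def kubectl_apply(manifest_path: str) -> str:
    """Run `kubectl apply -f <manifest_path>` and return kubectl's stdout."""
    result = subprocess.run(
        ["kubectl", "apply", "-f", manifest_path],
        check=True,  # raise CalledProcessError on a non-zero exit code
        capture_output=True,
        text=True,
    )
    return result.stdout


def apply_sequentially(
    manifest_paths: Sequence[str],
    delay_seconds: float = 5.0,  # hypothetical default for "a few seconds"
    apply_fn: Callable[[str], str] = kubectl_apply,
    sleep_fn: Callable[[float], None] = time.sleep,
) -> List[str]:
    """Apply manifests one at a time, waiting delay_seconds between applies."""
    outputs: List[str] = []
    for index, path in enumerate(manifest_paths):
        if index > 0:
            # Space out resource creation; this does not wait for the
            # operator to actually finish submitting the previous job.
            sleep_fn(delay_seconds)
        outputs.append(apply_fn(path))
    return outputs


# Usage (hypothetical file names):
# apply_sequentially(["job-a.yaml", "job-b.yaml"], delay_seconds=5)
```

Injecting `apply_fn` and `sleep_fn` keeps the ordering logic testable without a cluster or real waiting.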