# troubleshooting
Hi, I am trying to run a simple wordcount example using Flink + Beam on Kubernetes with the Flink K8s Operator. I am getting this error and don't know what it means:
org.apache.flink.client.deployment.application.ApplicationExecutionException: The application contains no execute() calls.
Could someone give me a hint?
```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: beamapp
spec:
  image: beamapp:latest
  imagePullPolicy: Always
  flinkVersion: v1_15
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:///opt/flink/opt/flink-python_2.12-1.16.1.jar # Note, this jarURI is actually a placeholder
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/main.py"]
    parallelism: 1
    upgradeMode: stateless
```
main.py
```python
import re

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Paths inside the container (the Dockerfile copies the build context to /opt/flink/usrlib/).
input_file = '/opt/flink/usrlib/kinglear.txt'
output_path = '/opt/flink/usrlib/output.txt'

# Beam pipeline options: execute through Beam's FlinkRunner.
pipeline_options = PipelineOptions(
    runner='FlinkRunner',
    project='my-project-id',
    job_name='unique-job-name',
)


def main():
    with beam.Pipeline(options=pipeline_options) as p:
        (p
         | 'Read lines' >> beam.io.ReadFromText(input_file)
         # Split each line into words (letters and apostrophes).
         | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
         # Count occurrences of each distinct word -> (word, count).
         | 'Combine per element' >> beam.combiners.Count.PerElement()
         | 'Convert to string' >> beam.MapTuple(lambda word, count: '%s: %s' % (word, count))
         | 'Write output to file' >> beam.io.WriteToText(output_path))


if __name__ == '__main__':
    main()
```
Dockerfile
```dockerfile
FROM flink:1.15

# Install Python 3.7 from source: Debian 11 ships Python 3.9, and PyFlink
# currently supports only Python 3.6, 3.7 and 3.8 officially.

RUN apt-get update -y && \
    apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev && \
    wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \
    tar -xvf Python-3.7.9.tgz && \
    cd Python-3.7.9 && \
    ./configure --without-tests --enable-shared && \
    make -j6 && \
    make install && \
    ldconfig /usr/local/lib && \
    cd .. && rm -f Python-3.7.9.tgz && rm -rf Python-3.7.9 && \
    ln -s /usr/local/bin/python3 /usr/local/bin/python && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt requirements.txt
RUN pip3 install -r requirements.txt

# Add the Python script
USER flink
RUN mkdir /opt/flink/usrlib
RUN mkdir /opt/flink/usrlib/src
COPY . /opt/flink/usrlib/
```
requirements.txt
```text
apache-flink==1.15
apache_beam==2.27.0
```
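The word counting that `main.py` asks Beam to perform can be checked locally without Flink or Beam. Below is a minimal plain-Python sketch that mirrors the transform chain (same regex, one count per distinct word, `'%s: %s'` formatting). `count_words` is a hypothetical helper name, and the sorting exists only to make the output deterministic, since Beam makes no ordering guarantee for `WriteToText`.

```python
import re
from collections import Counter

# Same token pattern as the 'ExtractWords' step in main.py.
WORD_RE = re.compile(r"[A-Za-z']+")


def count_words(lines):
    """Mirror ReadFromText -> FlatMap(findall) -> Count.PerElement -> MapTuple.

    Returns 'word: count' strings sorted by word (hypothetical helper; the
    sort is only for reproducibility, Beam itself does not order output).
    """
    counts = Counter()
    for line in lines:
        counts.update(WORD_RE.findall(line))
    return ['%s: %s' % (word, count) for word, count in sorted(counts.items())]


if __name__ == '__main__':
    # Matching is case-sensitive, so "Nothing" and "nothing" are counted separately.
    for row in count_words(["Nothing will come of nothing.", "Speak again."]):
        print(row)
```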
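The content inside `main.py` isn't a PyFlink program, it's a Beam program. `PythonDriver` expects the script to build a job with PyFlink and submit it by calling `execute()`; a Beam pipeline on `FlinkRunner` never makes that call, so the application ends without having submitted a job.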
Hey @piby 180 - did you end up getting this working? We're also trying to get Python Beam <> Flink on K8s running.
@Stephen Chu No, we gave up on Beam and went with Flink directly. Portability isn't very important for us, and we didn't want to manage an additional wrapper on top of Flink.