# Troubleshooting
m
Hi, I just want to run the PyFlink example from the official Apache Flink website: https://nightlies.apache.org/flink/flink-docs-master/api/python/examples/datastream/connectors.html#kafka-with-csv-format When I start this example, producing data to the Kafka topic succeeds, but an error occurs when consuming. This is my error:
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
...
Caused by: java.lang.RuntimeException: Failed to create stage bundle factory! INFO:root:Initializing Python harness
This is my question on the StackOverflow website: https://stackoverflow.com/questions/75976664/reading-from-kafka-with-pyflink-not-working
d
@mohammadreza khedri Could you share the code of the job which consumes the Kafka data?
m
@Dian Fu Here is everything I do: first, I start a Kafka cluster; second, I produce some data to the topic, as you can see in the source code; third, I consume data from that topic, and I get the errors.
d
From the exception message, there should be Python UDFs in the job. However, there are actually no Python UDFs in the link you shared, so I'm a little confused about where the exception comes from.
m
@Dian Fu Do you know any other resources for testing producing to Kafka with pyflink? I just want to learn and run it properly.
d
It supports writing to Kafka in either a Table API program (see https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/python/table/intro_to_table_api/#emit-results-to-one-sink-table and https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/kafka/ for more details) or a DataStream API program (which you are currently working on).
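For reference, a minimal sketch of what the Table API route looks like, based on the Kafka connector documentation linked above. The table name, schema, topic, and broker address here are made-up placeholders, not values from this thread:

```python
# Hedged sketch of a Kafka sink table DDL, following the Flink Kafka
# connector docs linked above. Table name, schema, topic, and broker
# address are illustrative placeholders; adjust them to your setup.
KAFKA_SINK_DDL = """
CREATE TABLE kafka_sink (
    id BIGINT,
    data STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'sink_topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'csv'
)
"""

# In a real Table API job you would then run something like:
#   table_env.execute_sql(KAFKA_SINK_DDL)
#   source_table.execute_insert("kafka_sink").wait()
```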
@mohammadreza khedri It would be great if you could share the code which you are testing with. It may be helpful to identify the problem.
m
@Dian Fu I'm really thankful. My code is exactly what you saw in the link. I just wanted to test and learn it, and after that implement my own case.
d
@mohammadreza khedri Could you share the whole log file? See https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/debugging/#accessing-logs on how to find the log file.
m
@Dian Fu The logs are very long. Should I share just the causes here?
d
@mohammadreza khedri The exception itself, and also a few lines of log (e.g. 100 lines or so) before it.
m
for worker id 4-1
2023-04-11 12:01:24,935 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Custom Source -> Map -> Sink: Print to Std. Out (12/12)#0 (29ed3a6db3eb632dddb259a915296fa9_cbc357ccb763df2852fee8c4fc7d55f2_11_0) switched from INITIALIZING to FAILED with failure cause:
java.lang.RuntimeException: Failed to create stage bundle factory! INFO:root:Initializing Python harness: D:\charisma\projects\apache-flink\pyflink\flink-env\lib\site-packages\pyflink\fn_execution\beam\beam_boot.py --id=9-1 --provision_endpoint=localhost:65496
INFO:root:Starting up Python harness in loopback mode.

	at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:639) ~[flink-python-1.17.0.jar:1.17.0]
	at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:274) ~[flink-python-1.17.0.jar:1.17.0]
	at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:57) ~[flink-python-1.17.0.jar:1.17.0]
	at org.apache.flink.streaming.api.operators.python.process.AbstractExternalDataStreamPythonFunctionOperator.open(AbstractExternalDataStreamPythonFunctionOperator.java:85) ~[flink-python-1.17.0.jar:1.17.0]
	at org.apache.flink.streaming.api.operators.python.process.AbstractExternalOneInputPythonFunctionOperator.open(AbstractExternalOneInputPythonFunctionOperator.java:117) ~[flink-python-1.17.0.jar:1.17.0]
	at org.apache.flink.streaming.api.operators.python.process.ExternalPythonProcessOperator.open(ExternalPythonProcessOperator.java:64) ~[flink-python-1.17.0.jar:1.17.0]
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107) ~[flink-dist-1.17.0.jar:1.17.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734) ~[flink-dist-1.17.0.jar:1.17.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) ~[flink-dist-1.17.0.jar:1.17.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709) ~[flink-dist-1.17.0.jar:1.17.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675) ~[flink-dist-1.17.0.jar:1.17.0]
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-dist-1.17.0.jar:1.17.0]
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921) [flink-dist-1.17.0.jar:1.17.0]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) [flink-dist-1.17.0.jar:1.17.0]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-dist-1.17.0.jar:1.17.0]
	at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 0
	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050) ~[flink-python-1.17.0.jar:1.17.0]
	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952) ~[flink-python-1.17.0.jar:1.17.0]
	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[flink-python-1.17.0.jar:1.17.0]
	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958) ~[flink-python-1.17.0.jar:1.17.0]
	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964) ~[flink-python-1.17.0.jar:1.17.0]
	at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451) ~[flink-python-1.17.0.jar:1.17.0]
	at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436) ~[flink-python-1.17.0.jar:1.17.0]
	at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303) ~[flink-python-1.17.0.jar:1.17.0]
	at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:637) ~[flink-python-1.17.0.jar:1.17.0]
	... 15 more
Caused by: java.lang.IllegalStateException: Process died with exit code 0
	at org.apache.beam.runners.fnexecution.environment.ProcessManager$RunningProcess.isAliveOrThrow(ProcessManager.java:75) ~[flink-python-1.17.0.jar:1.17.0]
	at org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory.createEnvironment(ProcessEnvironmentFactory.java:110) ~[flink-python-1.17.0.jar:1.17.0]
	at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:252) ~[flink-python-1.17.0.jar:1.17.0]
	at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:231) ~[flink-python-1.17.0.jar:1.17.0]
	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528) ~[flink-python-1.17.0.jar:1.17.0]
	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277) ~[flink-python-1.17.0.jar:1.17.0]
	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154) ~[flink-python-1.17.0.jar:1.17.0]
	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044) ~[flink-python-1.17.0.jar:1.17.0]
	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952) ~[flink-python-1.17.0.jar:1.17.0]
	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[flink-python-1.17.0.jar:1.17.0]
	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958) ~[flink-python-1.17.0.jar:1.17.0]
	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964) ~[flink-python-1.17.0.jar:1.17.0]
	at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451) ~[flink-python-1.17.0.jar:1.17.0]
	at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436) ~[flink-python-1.17.0.jar:1.17.0]
	at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303) ~[flink-python-1.17.0.jar:1.17.0]
	at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:637) ~[flink-python-1.17.0.jar:1.17.0]
	... 15 more
2023-04-11 12:01:24,935 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Custom Source -> Map -> Sink: Print to Std. Out (9/12)#0 (29ed3a6db3eb632dddb259a915296fa9_cbc357ccb763df2852fee8c4fc7d55f2_8_0) switched from INITIALIZING to FAILED with failure cause:
java.lang.RuntimeException: Failed to create stage bundle factory! (same stack trace as above)
@Dian Fu Slack doesn't allow me to share more than a certain number of lines.
d
@mohammadreza khedri Is there any suspicious log before the above exception?
m
@Dian Fu In my view, no.
@Dian Fu I think I found the problem. Here is your answer on StackOverflow: https://stackoverflow.com/questions/72028346/pyflink-with-kafka-java-lang-runtimeexception-failed-to-create-stage-bundle-fac My error is exactly what the questioner mentioned.
But it doesn't work yet, even after adding os.environ["_python_worker_execution_mode"] = "process" to the Python code.
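For anyone following along, the workaround from that StackOverflow answer amounts to setting an internal, undocumented environment variable before building the execution environment; as reported here, it does not always help:

```python
import os

# Workaround from the StackOverflow answer linked above: force PyFlink's
# Python workers to run in "process" mode instead of the default loopback
# mode. "_python_worker_execution_mode" is an internal knob and may change
# between Flink versions; set it before creating the
# StreamExecutionEnvironment.
os.environ["_python_worker_execution_mode"] = "process"
```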
d
@mohammadreza khedri The exception message contains no useful information. I will find a Windows machine and see if I can reproduce this issue.
m
@Dian Fu Thank you for trying. I created a new Docker image from flink-1.17.0 and added all the pyflink requirements and modules to build a new pyflink-1.17.0 image. I created a docker-compose file with these services:
broker
zookeeper
jobmanager
taskmanager
So everything now runs on Linux. I'm using all the examples currently on the official website and running them with the flink run CLI in the jobmanager container, but I still have some issues. When I run an example, it's submitted successfully and I can see it in the Flink web UI, but all of them run into trouble. I only changed the path of my jar file and the broker address to broker:9092. I'm sure the networking between all these containers is fine.
The error message for the above setup:
Failed to send data to Kafka: Failed to send data to Kafka: Expiring 7 record(s) for test_csv_topic-0:120000 ms has passed since batch creation
@Dian Fu At least I now know I've ruled out the Windows-specific issues. The above error is something else.
d
@mohammadreza khedri It seems like a network issue. Have you figured out the root cause?
m
@Dian Fu Oh God, I finally figured out the problem. I had an environment variable in my broker service in the docker-compose file like the one below:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT_HOST://localhost:9092
I found an example in my search that said you should use PLAINTEXT://broker:9092 for the broker, so I changed the above line to the following:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092
And it worked.
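For readers hitting the same "Expiring N record(s)" error, the fix above boils down to advertising an address that clients inside the compose network can actually reach. A hypothetical docker-compose fragment illustrating the idea; the service name, image tag, ports, and variable names depend on the Kafka image you use and are not taken from this thread:

```yaml
# Illustrative fragment only: the broker advertises its compose service
# name ("broker") to clients on the internal network, and "localhost" on a
# separate listener for clients on the host machine.
services:
  broker:
    image: confluentinc/cp-kafka:7.3.0
    environment:
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
```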
@Dian Fu Thanks a bunch for your help.
👍 1
d
Does the example read_from_kafka also work?
m
@Dian Fu But as far as I understand these days, Flink has some problems with the Windows environment.
d
@mohammadreza khedri I've seen quite a few users running PyFlink on Windows. I guess there is a bug which makes it not work under certain conditions. I will try to reproduce it so I can dig into this problem.
m
Writing works fine, but reading has a small issue. I think it's not very important and I can fix it after playing with Flink for a while. For example, this error:
Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unexpected character (',' (code 44)): Expected space separating root-level values
 at [Source: UNKNOWN; line: 1, column: 3]
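That Jackson error is consistent with a JSON deserializer being applied to CSV-formatted records. A quick self-contained illustration of the mismatch, using plain Python json as a stand-in for the Jackson parser; the sample row is made up:

```python
import json

# A CSV row like "1,hi" is not valid JSON: the parser reads the leading
# "1" as a complete JSON value and then chokes on the unexpected ','.
# This mirrors the Jackson "Unexpected character (',')" error above when
# a JSON deserialization schema is pointed at a CSV topic.
sample_csv_row = "1,hi"
try:
    json.loads(sample_csv_row)
    parse_failed = False
except json.JSONDecodeError:
    parse_failed = True

print(parse_failed)  # → True
```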
🎉 1
@Dian Fu Yes. Are you an apache flink committer?
d
Linux is indeed the most well-tested platform, as the CI tests currently run on Linux.
> Are you an apache flink committer?
Yes. 😁
m
Very nice. Can you give me some references for learning this tool thoroughly and becoming a committer in the future? I really love this tool and I want to help it grow.
@Dian Fu Resources, references, or any advice. Thanks in advance.
d
@mohammadreza khedri You can start by:
• Reading the documentation to get a better understanding of PyFlink: https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/python/overview/
• Taking a look at the PyFlink code: https://github.com/apache/flink/tree/master/flink-python
After that, you could start with some minor issues. There are many open issues: https://issues.apache.org/jira/browse/FLINK-31172?jql=project%20%3D%20FLINK%20AND%20s[…]3D%20Open%20AND%20component%20%3D%20%22API%20%2F%20Python%22
I guess you could start with the following issue, Support PubSub Connector: https://issues.apache.org/jira/browse/FLINK-28022
Maybe you could also help dig into the problem you encountered in this thread, since you can reproduce it~
m
@Dian Fu So we just take a task and work on it? Isn't there a plan or roadmap for this?
d
@mohammadreza khedri Most functionalities are already supported in PyFlink. Besides continuing to align with the Java API on the remaining functionality, the most important thing in my mind for the next steps is to improve usability. This may include many minor things which are nevertheless very important for users.
Usability:
• Improve the API documentation: currently many APIs lack examples (e.g. https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/data_stream.py#L273) showing how they can be used. I think we need to go through all the APIs to make sure they are carefully documented.
• Improve the exceptions: sometimes the exception messages are not very useful (the root cause may be hidden for some reason), just like what you encountered in this thread. This is very harmful for beginners and we should improve it.
• Add more examples (that's https://github.com/apache/flink/tree/master/flink-python/pyflink/examples): currently the examples are very limited, which also adds a burden for beginners.
• Improve the test utilities to make it easier for users to write unit tests (https://github.com/dianfu/pyflink-faq/tree/main/testing)
Function-wise:
• Support AsyncIO in the Python DataStream API (https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/operators/asyncio/)
• https://issues.apache.org/jira/browse/FLINK-31532
• https://issues.apache.org/jira/browse/FLINK-31172
💡 1
m
@Dian Fu Thank you, Dian. I have been reading about apache flink for two weeks now, and I want to move forward since you have described my roadmap clearly. First, it's better that I start by reading the documentation.
d
@mohammadreza khedri Welcome~ After reading through the documentation, you could start contributing with some minor tasks as mentioned above. Feel free to ping me for code review or anything else I can help with~
🙏 1
m
@Dian Fu Please tell me everything that would help me become a good contributor.
@Dian Fu I mean, when would you be able to review my pull requests? Bye for now, Dian. Sorry for bothering you these days.
> I mean, when would you be able to review my pull requests?
Sure~ :)
🙏 1