mohammadreza khedri
04/10/2023, 11:43 AM
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
.
.
.
Caused by: java.lang.RuntimeException: Failed to create stage bundle factory! INFO:root:Initializing Python harness
This is my question on the StackOverflow website:
https://stackoverflow.com/questions/75976664/reading-from-kafka-with-pyflink-not-working
Dian Fu
04/10/2023, 11:50 AM
mohammadreza khedri
04/10/2023, 12:50 PM
Dian Fu
04/10/2023, 12:53 PM
mohammadreza khedri
04/10/2023, 1:03 PM
pyflink. I just want to learn and run it properly.
Dian Fu
04/10/2023, 1:08 PM
Dian Fu
04/10/2023, 1:09 PM
mohammadreza khedri
04/10/2023, 1:11 PM
Dian Fu
04/11/2023, 8:15 AM
mohammadreza khedri
04/11/2023, 8:37 AM
Dian Fu
04/11/2023, 8:42 AM
mohammadreza khedri
04/11/2023, 9:53 AM
for worker id 4-1
2023-04-11 12:01:24,935 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Map -> Sink: Print to Std. Out (12/12)#0 (29ed3a6db3eb632dddb259a915296fa9_cbc357ccb763df2852fee8c4fc7d55f2_11_0) switched from INITIALIZING to FAILED with failure cause:
java.lang.RuntimeException: Failed to create stage bundle factory! INFO:root:Initializing Python harness: D:\charisma\projects\apache-flink\pyflink\flink-env\lib\site-packages\pyflink\fn_execution\beam\beam_boot.py --id=9-1 --provision_endpoint=localhost:65496
INFO:root:Starting up Python harness in loopback mode.
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:639) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:274) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:57) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.api.operators.python.process.AbstractExternalDataStreamPythonFunctionOperator.open(AbstractExternalDataStreamPythonFunctionOperator.java:85) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.api.operators.python.process.AbstractExternalOneInputPythonFunctionOperator.open(AbstractExternalOneInputPythonFunctionOperator.java:117) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.api.operators.python.process.ExternalPythonProcessOperator.open(ExternalPythonProcessOperator.java:64) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921) [flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) [flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-dist-1.17.0.jar:1.17.0]
at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 0
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:637) ~[flink-python-1.17.0.jar:1.17.0]
... 15 more
Caused by: java.lang.IllegalStateException: Process died with exit code 0
at org.apache.beam.runners.fnexecution.environment.ProcessManager$RunningProcess.isAliveOrThrow(ProcessManager.java:75) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory.createEnvironment(ProcessEnvironmentFactory.java:110) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:252) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:231) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:637) ~[flink-python-1.17.0.jar:1.17.0]
... 15 more
2023-04-11 12:01:24,935 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Custom Source -> Map -> Sink: Print to Std. Out (9/12)#0 (29ed3a6db3eb632dddb259a915296fa9_cbc357ccb763df2852fee8c4fc7d55f2_8_0) switched from INITIALIZING to FAILED with failure cause:
java.lang.RuntimeException: Failed to create stage bundle factory! INFO:root:Initializing Python harness: D:\charisma\projects\apache-flink\pyflink\flink-env\lib\site-packages\pyflink\fn_execution\beam\beam_boot.py --id=9-1 --provision_endpoint=localhost:65496
INFO:root:Starting up Python harness in loopback mode.
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:639) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:274) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.api.operators.python.process.AbstractExternalPythonFunctionOperator.open(AbstractExternalPythonFunctionOperator.java:57) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.api.operators.python.process.AbstractExternalDataStreamPythonFunctionOperator.open(AbstractExternalDataStreamPythonFunctionOperator.java:85) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.api.operators.python.process.AbstractExternalOneInputPythonFunctionOperator.open(AbstractExternalOneInputPythonFunctionOperator.java:117) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.api.operators.python.process.ExternalPythonProcessOperator.open(ExternalPythonProcessOperator.java:64) ~[flink-python-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921) [flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) [flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-dist-1.17.0.jar:1.17.0]
at java.lang.Thread.run(Thread.java:829) [?:?]
mohammadreza khedri
04/11/2023, 9:54 AM
Dian Fu
04/11/2023, 12:15 PM
mohammadreza khedri
04/16/2023, 7:35 AM
mohammadreza khedri
04/16/2023, 8:15 AM
mohammadreza khedri
04/16/2023, 8:27 AM
os.environ["_python_worker_execution_mode"] = "process" to the python code yet
Dian Fu
04/17/2023, 8:05 AM
mohammadreza khedri
04/17/2023, 9:59 AM
flink-1.17.0 and add all pyflink requirements and modules for the new pyflink-1.17.0 image.
I created a docker-compose file with these services:
broker
zookeeper
jobmanager
taskmanager
So everything now runs on Linux.
I'm using the examples currently available on the official Flink website and running them with the flink run CLI in the jobmanager container, but I still have some issues.
When I run an example, it is submitted successfully and I can see it in the Flink web UI, but every one of them runs into trouble.
mohammadreza khedri
04/17/2023, 10:00 AM
mohammadreza khedri
04/17/2023, 10:02 AM
jar file and broker address to broker:9092
I'm sure about the networking of all these containers.
mohammadreza khedri
04/17/2023, 10:08 AM
Failed to send data to Kafka: Failed to send data to Kafka: Expiring 7 record(s) for test_csv_topic-0:120000 ms has passed since batch creation
mohammadreza khedri
04/17/2023, 10:10 AM
Dian Fu
04/17/2023, 11:39 AM
mohammadreza khedri
04/17/2023, 12:38 PM
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT_HOST://localhost:9092
I found an example in my search that said to advertise PLAINTEXT://broker:9092 on the broker, so I changed the line above to the following:
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092
And it worked.
mohammadreza khedri
04/17/2023, 12:40 PM
Dian Fu
04/17/2023, 12:41 PM
read_from_kafka also work?
mohammadreza khedri
04/17/2023, 12:42 PM
Dian Fu
04/17/2023, 12:47 PM
mohammadreza khedri
04/17/2023, 12:49 PM
Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unexpected character (',' (code 44)): Expected space separating root-level values
at [Source: UNKNOWN; line: 1, column: 3]
mohammadreza khedri
04/17/2023, 12:51 PM
Are you apache flink committer?
Dian Fu
04/17/2023, 12:51 PM
Dian Fu
04/17/2023, 12:52 PM
"Are you apache flink committer?"
Yes. 😁
mohammadreza khedri
04/17/2023, 12:55 PM
mohammadreza khedri
04/17/2023, 12:57 PM
Dian Fu
04/17/2023, 1:13 PM
mohammadreza khedri
04/17/2023, 1:16 PM
Dian Fu
04/18/2023, 1:52 AM
Dian Fu
04/18/2023, 2:36 AM
mohammadreza khedri
04/18/2023, 6:40 AM
apache flink from 2 weeks ago. And I want to move forward as you have described my roadmap clearly.
First, it's better that I start by reading the documentation.
Dian Fu
04/18/2023, 6:46 AM
mohammadreza khedri
04/18/2023, 6:55 AM
mohammadreza khedri
04/18/2023, 6:58 AM
Dian Fu
04/18/2023, 7:02 AM
Dian Fu
04/18/2023, 7:03 AM
"I mean when you suppose to review my pull requests."
Sure~ :)
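
For reference, the listener change reported at 04/17/2023, 12:38 PM can be checked with a small script. A Kafka client connects to whatever address the broker advertises, so an advertised `localhost` is probably not reachable from the jobmanager and taskmanager containers, which would fit the "Expiring 7 record(s)" timeout seen earlier. The sketch below is only an illustration: the helper names `parse_advertised_listeners` and `loopback_listeners` are hypothetical and belong to neither Kafka nor Flink, and it assumes the value is a comma-separated list of `NAME://host:port` entries.

```python
# Hypothetical helpers (not part of Kafka or Flink) for inspecting a
# KAFKA_ADVERTISED_LISTENERS value before putting it in docker-compose.

LOOPBACK_HOSTS = {"localhost", "127.0.0.1", "::1"}


def parse_advertised_listeners(value):
    """Parse 'NAME://host:port,NAME2://host2:port2' into {NAME: (host, port)}."""
    listeners = {}
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue
        name, sep, address = entry.partition("://")
        if not sep:
            raise ValueError(f"missing '://' in listener entry: {entry!r}")
        host, sep, port = address.rpartition(":")
        if not sep or not port.isdigit():
            raise ValueError(f"missing or invalid port in listener entry: {entry!r}")
        # Strip the brackets of an IPv6 literal such as [::1].
        listeners[name] = (host.strip("[]"), int(port))
    return listeners


def loopback_listeners(value):
    """Return the names of listeners that advertise a loopback address."""
    parsed = parse_advertised_listeners(value)
    return [name for name, (host, _port) in parsed.items() if host in LOOPBACK_HOSTS]


# The original setting advertises localhost; the changed one advertises the service name.
print(loopback_listeners("PLAINTEXT_HOST://localhost:9092"))  # ['PLAINTEXT_HOST']
print(loopback_listeners("PLAINTEXT://broker:9092"))          # []
```

A non-empty result only flags listeners that other containers are unlikely to reach. Whether `broker` resolves still depends on the docker-compose network and service names.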