Tiago Pereira
05/06/2025, 9:38 PM
Philipp
05/08/2025, 12:40 PM
Tiago Pereira
05/08/2025, 1:21 PM
Tiago Pereira
05/08/2025, 1:21 PM
Tiago Pereira
05/08/2025, 1:22 PM
Tiago Pereira
05/08/2025, 3:23 PM
Robert Murray
05/11/2025, 9:38 PM
Robert Murray
05/11/2025, 9:44 PM
2025-05-11 21:24:01,426 INFO org.apache.flink.client.python.PythonDriver [] - py4j.protocol.Py4JJavaError: An error occurred while calling o3.execute.
: java.net.MalformedURLException: no protocol: ['file:/opt/flink/opt/flink-python-2.0.0.jar', 'file:/opt/flink/opt/flink-python-2.0.0.jar']
2025-05-11 21:24:01,426 INFO org.apache.flink.client.python.PythonDriver [] - : java.net.MalformedURLException: no protocol: ['file:/opt/flink/opt/flink-python-2.0.0.jar', 'file:/opt/flink/opt/flink-python-2.0.0.jar']
It appears related to the fix I mentioned above. 2.0.1 is still a SNAPSHOT; I'm looking for a solution I can apply to an officially released version of Flink. Thanks!
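[Editor's note: for later readers, a MalformedURLException whose "URL" prints as a Python list usually means pipeline.jars was handed a stringified list instead of a semicolon-separated string of URLs. A minimal sketch of the expected format (path illustrative); this may not work around the 2.0.0-specific issue referenced above:]

from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment

conf = Configuration()
# pipeline.jars expects a semicolon-separated string of URLs,
# not a Python list rendered via str().
conf.set_string("pipeline.jars", "file:///opt/flink/opt/flink-python-2.0.0.jar")
env = StreamExecutionEnvironment.get_execution_environment(conf)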
Maxim Matveyev
05/13/2025, 7:56 PM
George Leonard
05/17/2025, 3:29 PM
shivank chaturvedi
05/30/2025, 6:25 AM
Naveen Kumar
05/30/2025, 10:40 AM
Naveen Kumar
05/30/2025, 10:46 AM
מייקי בר יעקב
06/01/2025, 10:25 PM
Monika Bednarz
06/02/2025, 8:17 AM
Wouter Zorgdrager
06/02/2025, 4:56 PM
Caused by: pemja.core.PythonException: <class 'ModuleNotFoundError'>: No module named 'pemja'
at <string>.<module>(<string>:1)
at pemja.core.PythonInterpreter.exec(Native Method)
at pemja.core.PythonInterpreter.exec(PythonInterpreter.java:124)
at pemja.core.PythonInterpreter.initialize(PythonInterpreter.java:151)
at pemja.core.PythonInterpreter.<init>(PythonInterpreter.java:45)
at org.apache.flink.streaming.api.operators.python.embedded.AbstractEmbeddedPythonFunctionOperator.open(AbstractEmbeddedPythonFunctionOperator.java:72)
at org.apache.flink.streaming.api.operators.python.embedded.AbstractEmbeddedDataStreamPythonFunctionOperator.open(AbstractEmbeddedDataStreamPythonFunctionOperator.java:88)
at org.apache.flink.streaming.api.operators.python.embedded.AbstractOneInputEmbeddedPythonFunctionOperator.open(AbstractOneInputEmbeddedPythonFunctionOperator.java:68)
at org.apache.flink.streaming.api.operators.python.embedded.EmbeddedPythonProcessOperator.open(EmbeddedPythonProcessOperator.java:67)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:963)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
at java.base/java.lang.Thread.run(Thread.java:840)
I'm sure pemja is installed in my virtual env, and I've tried a bunch of configs to point explicitly to my venv, but no luck. I'm using Python 3.12. Any thoughts?
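[Editor's note: a hedged sketch of the two knobs that usually matter for this error (paths illustrative). pemja is only imported by the embedded "thread" execution mode, so pointing the cluster-side workers at the venv interpreter, or falling back to "process" mode, are the usual things to try:]

from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment

conf = Configuration()
# "thread" mode embeds the interpreter via pemja inside the TaskManager JVM;
# "process" mode runs Python workers out of process and never imports pemja.
conf.set_string("python.execution-mode", "process")
env = StreamExecutionEnvironment.get_execution_environment(conf)
# Make sure the workers use the venv where pemja is installed (path illustrative).
env.set_python_executable("/opt/venv/bin/python")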
Tiago Pereira
06/04/2025, 7:44 PM
Tiago Pereira
06/04/2025, 7:44 PM
Tiago Pereira
06/04/2025, 7:44 PM
Fabricio Lemos
06/17/2025, 11:26 PM
FlinkKinesisConsumer(stream_name, SimpleStringSchema(), source_properties)
I'm getting "Could not found the Java class 'org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper'". I believe this happens because the jar flink-connector-aws-kinesis-streams.5.0.0-1.20 does not have the class KinesisDeserializationSchemaWrapper. I'm importing the jar correctly, since the Kinesis sink is working. I'm following the documentation here.
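[Editor's note: if it helps later readers, the legacy FlinkKinesisConsumer and its KinesisDeserializationSchemaWrapper are believed to ship in the older flink-connector-kinesis artifact, while flink-connector-aws-kinesis-streams only carries the new KinesisStreamsSource. A minimal sketch of loading the legacy jar instead (jar name and path illustrative):]

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# The legacy consumer classes are packaged in flink-connector-kinesis,
# not flink-connector-aws-kinesis-streams (version/path illustrative).
env.add_jars("file:///opt/flink/lib/flink-connector-kinesis-5.0.0-1.20.jar")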
06/18/2025, 4:14 PMAWS MSK -> CDC connectors(EC2 on EKS) -> Flink Framework(expected) - Iceberg tables(s3)
Currently, in my project we are planning to implement the Flink framework for creating/managing Iceberg tables. We will be using EMR on EKS, so I wanted to understand which stable version of Flink we should go with, since this will later be deployed in production with more than 100 tables synced in real time. I am planning to use PyFlink, since I am only comfortable with Python. Also, I have seen that I need to use the Flink SQL API to interact with Iceberg tables (S3), so I wanted some suggestions here.
Also, for compaction of metadata and data files, what would be the ideal frequency: every 6 hours or every 24 hours?
My experience with PyFlink has mostly been with the DataStream API, and I haven't worked extensively with Flink itself, so I need your help.
Kafka: 3.6.0
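[Editor's note: on the Flink SQL point above, Iceberg is typically reached from PyFlink through the Table API by registering a catalog in SQL. A minimal sketch; the catalog name, warehouse path, and the assumption that an iceberg-flink-runtime jar is on the classpath are all illustrative:]

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# Register an Iceberg catalog; the options assume a Hadoop-style catalog
# with an S3 warehouse (all values illustrative).
t_env.execute_sql("""
    CREATE CATALOG iceberg_cat WITH (
        'type' = 'iceberg',
        'catalog-type' = 'hadoop',
        'warehouse' = 's3://my-bucket/warehouse'
    )
""")
t_env.execute_sql("USE CATALOG iceberg_cat")
t_env.execute_sql("CREATE DATABASE IF NOT EXISTS db")
t_env.execute_sql(
    "CREATE TABLE IF NOT EXISTS db.events (id STRING, ts TIMESTAMP(3))"
)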
Siddhesh Kalgaonkar
06/18/2025, 5:45 PM
Siddhesh Kalgaonkar
06/19/2025, 8:38 PM
Niharika Sakuru (Niha)
07/01/2025, 2:50 PM
I was looking at PythonEnvUtils.java and noticed that it logs the entire environment variable map when launching PyFlink jobs.
This seems like it could be a security issue, especially in Kubernetes setups using the Flink Kubernetes Operator, since secrets are commonly mounted as env vars in pods. That means things like AWS_SECRET_ACCESS_KEY, DB_PASSWORD, etc. could end up in plaintext in JobManager or TaskManager logs.
📌 Here's an example of what’s being logged:
Starting Python process with environment variables: AWS_SECRET_ACCESS_KEY=..., DB_PASSWORD=...
Has anyone else run into this?
Curious if there's already been discussion or a fix proposed upstream.
Would love thoughts from others who are deploying PyFlink in production or using secrets in K8s environments.
I've created https://issues.apache.org/jira/browse/FLINK-38035 with all the details.
Davide Reich
07/10/2025, 10:03 AM
Enabling required built-in plugins
Linking flink-s3-fs-presto-1.20.0.jar to plugin directory
Plugin flink-s3-fs-presto-1.20.0.jar does not exist. Exiting.
Interestingly, some TaskManagers start up successfully, while others fail with this error.
Do you have any idea what could cause this inconsistent behavior?
Thanks in advance!
pr3c0g
07/10/2025, 6:45 PM
"...INFO, and so only messages with logging level INFO or above will appear in the log files of the TaskManagers."
Is there really no way of writing them to another file?
Many thanks in advance!
Fabricio Lemos
07/17/2025, 12:25 AM
I'm using flink-connector-jdbc.3.3.0-1.20, but a long time ago it removed JdbcOutputFormat.createRowJdbcStatementBuilder, which is needed by PyFlink.
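[Editor's note: for context, a sketch of the PyFlink call path that, per the message above, depends on that method: JdbcSink.sink() builds its statement builder on the Java side, so a connector jar without createRowJdbcStatementBuilder fails at this point. Connection details are illustrative:]

from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.jdbc import (
    JdbcSink,
    JdbcConnectionOptions,
    JdbcExecutionOptions,
)

env = StreamExecutionEnvironment.get_execution_environment()
row_type = Types.ROW([Types.STRING(), Types.INT()])
ds = env.from_collection([("alice", 1)], type_info=row_type)
# JdbcSink.sink() is the PyFlink entry point that reaches for
# JdbcOutputFormat.createRowJdbcStatementBuilder on the Java side.
ds.add_sink(
    JdbcSink.sink(
        "INSERT INTO t (name, cnt) VALUES (?, ?)",
        row_type,
        JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .with_url("jdbc:postgresql://localhost:5432/db")  # URL illustrative
        .with_driver_name("org.postgresql.Driver")
        .build(),
        JdbcExecutionOptions.builder().with_batch_size(100).build(),
    )
)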
Fabricio Lemos
07/22/2025, 4:58 AM
I think I've found a bug with ListState in batch mode.
The issue does not occur with Java, or when using PyFlink with ValueState, or when not running in batch mode.
In the repro below, I register an event-time timer and store values in ListState. The timer fires as expected, but the state appears empty for some keys, only when batch mode is enabled.
I've tested with v2.0.0 and v1.20.2.
Output with BATCH mode:
[on_timer] key=user1 -> collected txns = []
[on_timer] key=user2 -> collected txns = []
Output without BATCH mode:
[on_timer] key=user1 -> collected txns = ['tx1']
[on_timer] key=user2 -> collected txns = ['tx2']
from pyflink.common import Types, Row
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ListStateDescriptor


class ReproFunction(KeyedProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        self.txns = runtime_context.get_list_state(
            ListStateDescriptor("txns", Types.STRING())
        )

    def process_element(self, transaction: Row, ctx: "KeyedProcessFunction.Context"):
        self.txns.add(transaction.id)
        print(f"[process_element] user_id={transaction.user_id}, tx_id={transaction.id}")
        ctx.timer_service().register_event_time_timer(0)

    def on_timer(self, timestamp: int, ctx: "KeyedProcessFunction.OnTimerContext"):
        collected = list(self.txns.get())
        print(f"[on_timer] key={ctx.get_current_key()} -> collected txns = {collected}")
        return []


def main():
    env = StreamExecutionEnvironment.get_execution_environment()
    # env.set_runtime_mode(RuntimeExecutionMode.BATCH)  # comment this line for things to work
    env.set_parallelism(1)

    transactions = [
        Row(id="tx1", user_id="user1"),
        Row(id="tx2", user_id="user2"),
    ]

    (
        env.from_collection(
            transactions,
            type_info=Types.ROW_NAMED(["id", "user_id"], [Types.STRING(), Types.STRING()])
        )
        .key_by(lambda tx: tx.user_id)
        .process(ReproFunction())
    )

    env.execute("PyFlink KeyedProcessFunction State Bug Repro")


if __name__ == "__main__":
    main()
Noufal Rijal
07/30/2025, 5:25 AM
Krishnakumar K
08/01/2025, 7:26 AM
I wanted to ask about the AsyncDataStream operator in PyFlink. From what I've researched, this operator is currently only supported in Java, not in Python. We have a use case of making successive API calls to an external service, and having them async would greatly boost the performance of our pipeline.
Thanks!
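[Editor's note: since there is indeed no Python AsyncDataStream yet, a commonly suggested stopgap is to overlap blocking calls on a thread pool inside an ordinary function. A minimal sketch, where call_external_service and the upstream batching of records into lists are both assumptions:]

from concurrent.futures import ThreadPoolExecutor
from pyflink.datastream.functions import MapFunction, RuntimeContext


def call_external_service(record):
    # Hypothetical blocking call; swap in the real client.
    return record


class ThreadPoolEnricher(MapFunction):
    """Not true async I/O: it only overlaps blocking calls within one batch,
    so each batch stalls for its slowest call rather than the sum of all."""

    def open(self, runtime_context: RuntimeContext):
        self.pool = ThreadPoolExecutor(max_workers=8)

    def map(self, batch):
        # `batch` is assumed to be a list of records assembled upstream
        # (e.g. by a window or a custom batching step).
        futures = [self.pool.submit(call_external_service, rec) for rec in batch]
        return [f.result() for f in futures]

    def close(self):
        self.pool.shutdown()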