# pyflink
  • t

    Tiago Pereira

    05/06/2025, 9:38 PM
    I needed to create a ByteArraySerializer, which is native in Java Flink but not in PyFlink
  • p

    Philipp

    05/08/2025, 12:40 PM
    Alright, thanks for the confirmation, that's what I thought. We dropped our plans to use PyFlink at this point too; too many things just don't work out that are easily done in native Java. Rather than mixing the two worlds, we decided to stick with Java only for now.
  • t

    Tiago Pereira

    05/08/2025, 1:21 PM
    Yeah. On our project we are either going to move to Java or move away from Flink.
  • t

    Tiago Pereira

    05/08/2025, 1:21 PM
    That's still being decided.
  • t

    Tiago Pereira

    05/08/2025, 1:22 PM
    But the option with the most weight right now is Flink in Java.
  • t

    Tiago Pereira

    05/08/2025, 3:23 PM
    Is anyone from the Flink team listening to this chat?
    👋 3
  • r

    Robert Murray

    05/11/2025, 9:38 PM
    Hey, does anyone know if there is a workaround for https://issues.apache.org/jira/browse/FLINK-37505 that can be applied to Flink 2.0.0? I'm trying to migrate a PyFlink job/cluster from Flink 1.20 to 2.0. When I start up the job (application mode) I get the following error:
  • r

    Robert Murray

    05/11/2025, 9:44 PM
    2025-05-11 21:24:01,426 INFO  org.apache.flink.client.python.PythonDriver                  [] - py4j.protocol.Py4JJavaError: An error occurred while calling o3.execute.
    : java.net.MalformedURLException: no protocol: ['file:/opt/flink/opt/flink-python-2.0.0.jar', 'file:/opt/flink/opt/flink-python-2.0.0.jar']
    2025-05-11 21:24:01,426 INFO  org.apache.flink.client.python.PythonDriver                  [] - : java.net.MalformedURLException: no protocol: ['file:/opt/flink/opt/flink-python-2.0.0.jar', 'file:/opt/flink/opt/flink-python-2.0.0.jar']
    It appears related to the fix I mentioned above. 2.0.1 is still a SNAPSHOT; I'm looking for a solution I can apply to an officially released version of Flink. Thanks!
  • m

    Maxim Matveyev

    05/13/2025, 7:56 PM
    Hello! How can I properly deserialize messages from Kafka using Python Flink with Schema Registry integration?
    👀 1
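    A minimal, hedged sketch of one common approach: read the topic through the Table API Kafka connector with the avro-confluent format, which resolves schemas against a Confluent Schema Registry. The topic, broker, registry URL, and columns below are placeholders, and the Kafka connector plus avro-confluent-registry jars are assumed to be on the classpath.
    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

    # Placeholder schema and endpoints; adjust to the actual topic.
    t_env.execute_sql("""
        CREATE TABLE orders (
            order_id STRING,
            amount DOUBLE
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'orders',
            'properties.bootstrap.servers' = 'broker:9092',
            'properties.group.id' = 'pyflink-demo',
            'scan.startup.mode' = 'earliest-offset',
            'format' = 'avro-confluent',
            'avro-confluent.url' = 'http://schema-registry:8081'
        )
    """)

    # Deserialized rows can now be consumed as a table (or converted to a DataStream).
    t_env.execute_sql("SELECT * FROM orders").print()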
  • g

    George Leonard

    05/17/2025, 3:29 PM
    Has anyone gotten pyflink to push payloads onto Prometheus?
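    A hedged sketch: PyFlink code does not push to Prometheus itself; metrics registered on the runtime context are exported by whichever metrics reporter the cluster is configured with (for example Flink's Prometheus reporter). The counter name and pipeline below are placeholders.
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.functions import MapFunction, RuntimeContext


    class CountingMap(MapFunction):

        def open(self, runtime_context: RuntimeContext):
            # Custom counter; surfaced via the cluster's configured metrics reporter.
            self.records = runtime_context.get_metrics_group().counter("records_processed")

        def map(self, value):
            self.records.inc()
            return value


    env = StreamExecutionEnvironment.get_execution_environment()
    env.from_collection([1, 2, 3]).map(CountingMap()).print()
    env.execute("pyflink-metrics-sketch")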
  • s

    shivank chaturvedi

    05/30/2025, 6:25 AM
    Hey everyone, how do we assign names to operators in the PyFlink Table API? In the DataStream API it's straightforward, but I wanted to know if there is a method for the Table API as well.
  • n

    Naveen Kumar

    05/30/2025, 10:40 AM
    I am processing 3 tables from Kafka as streams in a PyFlink consumer.
    Table 1: hmi-primary (vin, event_time, soc)
    Table 2: bcm-secondary (vin, event_time, BCM_ChargingOnProgress)
    Table 3: bcm-primary (vin, event_time, BCM_RangeDisplay)
    I generate a payload { vin, soc, event_time } by processing the first two tables in a co-process function, and I want to feed that payload into another co-process function together with the third stream. How do I yield an object from a co-process function? That object keeps changing and I want to process it together with another stream. I have tried two approaches, ctx.output and collector.collect, but neither works. I don't know how to emit the object, and I don't know how to use side outputs and main outputs. I can share my main.py and the two co-process functions; main.py is below:
    env = setup_flink_environment()
    hmi_source = KafkaConfig.create_hmi_source()
    bcm_source = KafkaConfig.create_bcm_source()
    range_source = KafkaConfig.create_range_source()
    watermark_strategy = WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_millis(5000))
    hmi_stream = env.from_source(source=hmi_source, watermark_strategy=watermark_strategy, source_name="HMI Source") \
        .map(MessagePayload, output_type=Types.PICKLED_BYTE_ARRAY()) \
        .filter(lambda payload: 'EffectiveSOC' in payload.message_json.keys()) \
        .key_by(lambda payload: payload.vin)
    bcm_stream = env.from_source(source=bcm_source, watermark_strategy=watermark_strategy, source_name="BCM Source") \
        .map(MessagePayload, output_type=Types.PICKLED_BYTE_ARRAY()) \
        .filter(lambda payload: 'BCM_ChargingOnProgress' in payload.message_json.keys()) \
        .key_by(lambda payload: payload.vin)
    range_stream = env.from_source(source=range_source, watermark_strategy=watermark_strategy, source_name="Range Source") \
        .map(MessagePayload, output_type=Types.PICKLED_BYTE_ARRAY()) \
        .filter(lambda payload: 'BCM_RangeDisplay' in payload.message_json.keys()) \
        .key_by(lambda payload: payload.vin)
    enriched_stream = hmi_stream.connect(bcm_stream).process(VehicleStateProcessor())
    enriched_stream = enriched_stream.get_side_output(main_output)
    final_output = enriched_stream.connect(range_stream).process(RangeJoinProcessor())
    final_output.print()
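    A hedged sketch of how side outputs are expressed in the PyFlink DataStream API: a process function yields an (OutputTag, value) tuple for the side output and plain values for the main output, and the side stream is obtained with get_side_output. The class name mirrors the snippet above, but the bodies and tag are placeholders.
    from pyflink.common import Types
    from pyflink.datastream import OutputTag
    from pyflink.datastream.functions import KeyedCoProcessFunction

    # Side-output tag; in PyFlink it needs an explicit type info.
    enriched_tag = OutputTag("enriched", Types.STRING())


    class VehicleStateProcessor(KeyedCoProcessFunction):

        def process_element1(self, value, ctx):
            # Main output: just yield the value.
            yield "main-record"
            # Side output: yield a (tag, value) tuple.
            yield enriched_tag, "enriched-payload"

        def process_element2(self, value, ctx):
            yield "main-record"


    # Usage (placeholders):
    # enriched = hmi_stream.connect(bcm_stream).process(VehicleStateProcessor(), output_type=Types.STRING())
    # side = enriched.get_side_output(enriched_tag)  # only the side-output records
    # main = enriched                                # main-output records stay on the returned stream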
  • n

    Naveen Kumar

    05/30/2025, 10:46 AM
    image.png,image.png
  • u

    מייקי בר יעקב

    06/01/2025, 10:25 PM
    Does Flink 2.0 include built-in end-to-end traces?
  • m

    Monika Bednarz

    06/02/2025, 8:17 AM
    Cross-posting here in the hope of finding a fix. We're using PyFlink 🙂🙏
  • w

    Wouter Zorgdrager

    06/02/2025, 4:56 PM
    Hey! I'm trying to set up (Py)Flink for local development to address some unresolved Jira tickets which have been open for a while. Everything seems OK, but once I configure the thread execution mode rather than process mode, all examples fail with the following Java exception:
    Caused by: pemja.core.PythonException: <class 'ModuleNotFoundError'>: No module named 'pemja'
            at <string>.<module>(<string>:1)
            at pemja.core.PythonInterpreter.exec(Native Method)
            at pemja.core.PythonInterpreter.exec(PythonInterpreter.java:124)
            at pemja.core.PythonInterpreter.initialize(PythonInterpreter.java:151)
            at pemja.core.PythonInterpreter.<init>(PythonInterpreter.java:45)
            at org.apache.flink.streaming.api.operators.python.embedded.AbstractEmbeddedPythonFunctionOperator.open(AbstractEmbeddedPythonFunctionOperator.java:72)
            at org.apache.flink.streaming.api.operators.python.embedded.AbstractEmbeddedDataStreamPythonFunctionOperator.open(AbstractEmbeddedDataStreamPythonFunctionOperator.java:88)
            at org.apache.flink.streaming.api.operators.python.embedded.AbstractOneInputEmbeddedPythonFunctionOperator.open(AbstractOneInputEmbeddedPythonFunctionOperator.java:68)
            at org.apache.flink.streaming.api.operators.python.embedded.EmbeddedPythonProcessOperator.open(EmbeddedPythonProcessOperator.java:67)
            at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
            at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
            at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:963)
            at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
            at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
            at java.base/java.lang.Thread.run(Thread.java:840)
    I'm sure pemja is installed in my virtual env, and I've tried a bunch of configs to point explicitly to my venv, but no luck. I'm using Python 3.12. Any thoughts?
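    A hedged sketch of how the embedded (thread) execution mode and the interpreter are usually pointed at a virtual env; the paths are placeholders, and pemja must be importable by that interpreter, since the embedded mode relies on it.
    from pyflink.common import Configuration
    from pyflink.datastream import StreamExecutionEnvironment

    config = Configuration()
    config.set_string("python.execution-mode", "thread")
    # Interpreters used on the worker and client side; placeholder paths.
    config.set_string("python.executable", "/path/to/venv/bin/python")
    config.set_string("python.client.executable", "/path/to/venv/bin/python")

    env = StreamExecutionEnvironment.get_execution_environment(config)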
  • t

    Tiago Pereira

    06/04/2025, 7:44 PM
    3.11 is the newest version supported by PyFlink
  • t

    Tiago Pereira

    06/04/2025, 7:44 PM
    3.12 is not supported, I think
  • t

    Tiago Pereira

    06/04/2025, 7:44 PM
    or at least it's not officially supported
  • f

    Fabricio Lemos

    06/17/2025, 11:26 PM
    Does the Kinesis source connector work with PyFlink 1.20.1? When doing
    FlinkKinesisConsumer(stream_name, SimpleStringSchema(), source_properties)
    I'm getting
    Could not found the Java class 'org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper'
    I believe this happens because the jar flink-connector-aws-kinesis-streams-5.0.0-1.20 does not contain the class KinesisDeserializationSchemaWrapper. I'm importing the jar correctly, since the Kinesis sink is working. I'm following the documentation here.
  • s

    Siddhesh Kalgaonkar

    06/18/2025, 4:14 PM
    Hello #C065944F9M2 My flow is like this:
    AWS MSK -> CDC connectors (EC2 on EKS) -> Flink framework (expected) -> Iceberg tables (S3)
    Currently in my project we are planning to implement a Flink framework for creating/managing Iceberg tables. We will be using EMR on EKS, so I wanted to understand which stable version of Flink we should go with, since this will later be deployed in production with more than 100 tables synced in real time. I am planning to use PyFlink since I am only comfortable with Python. So, which version of Flink would be best? Also, I have seen that I need to use the Flink SQL API to interact with Iceberg tables (S3), so I wanted some suggestions there. Also, for compaction of metadata and data files, what would be the ideal frequency: every 6 hours or every 24 hours? My experience with PyFlink has mostly been with the DataStream API and I have not worked extensively with Flink, so I need your help. Kafka: 3.6.0
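    A hedged sketch of registering an Iceberg Glue catalog from PyFlink via the SQL API, using the catalog properties documented by Iceberg; the warehouse path is a placeholder, and the iceberg-flink-runtime and AWS bundle jars are assumed to be on the classpath.
    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    t_env.execute_sql("""
        CREATE CATALOG glue_catalog WITH (
            'type' = 'iceberg',
            'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',
            'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
            'warehouse' = 's3://my-bucket/warehouse'
        )
    """)
    t_env.execute_sql("USE CATALOG glue_catalog")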
  • s

    Siddhesh Kalgaonkar

    06/18/2025, 5:45 PM
    Also #C065944F9M2, as per https://iceberg.apache.org/docs/latest/flink/#iceberg-to-flink, column-level operations are not yet supported in Flink Iceberg? I want to use the Glue catalog with schema evolution, so how does that work? Am I missing something here?
  • s

    Siddhesh Kalgaonkar

    06/19/2025, 8:38 PM
    Hello #C065944F9M2 Since my use case involves UPSERT operations, I was checking whether I can create a temporary view on top of the CDC data, like we do in Spark, and I came across the create-table API, which has the list of columns defined in the syntax. But what if a new column is added to the CDC data? If it isn't included in the syntax, that column's data won't be captured, I guess. So how do I deal with this in PyFlink? I want to take the CDC data and the existing table data and run a merge command.
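    A hedged sketch of the usual substitute, since Flink SQL has no MERGE statement: an Iceberg v2 table with upsert enabled, so an INSERT INTO keyed on the primary key replaces existing rows. Catalog, table, and column names are placeholders, and cdc_source stands in for whatever table or view holds the CDC data. This does not by itself handle newly appearing columns.
    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    # Assumes an Iceberg catalog (e.g. the glue_catalog sketched earlier) is registered.
    t_env.execute_sql("""
        CREATE TABLE IF NOT EXISTS glue_catalog.db.customers (
            id BIGINT,
            name STRING,
            PRIMARY KEY (id) NOT ENFORCED
        ) WITH (
            'format-version' = '2',
            'write.upsert.enabled' = 'true'
        )
    """)
    # Rows from the CDC source that carry an existing id overwrite the stored row.
    t_env.execute_sql("INSERT INTO glue_catalog.db.customers SELECT id, name FROM cdc_source")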
  • n

    Niharika Sakuru (Niha)

    07/01/2025, 2:50 PM
    Hey #C065944F9M2, I've been looking into how Flink starts Python processes via PythonEnvUtils.java, and I noticed that it logs the entire environment variable map when launching PyFlink jobs. This seems like it could be a security issue, especially in Kubernetes setups using the Flink Kubernetes Operator, since secrets are commonly mounted as env vars in pods. That means things like AWS_SECRET_ACCESS_KEY, DB_PASSWORD, etc. could end up in plaintext in JobManager or TaskManager logs. 📌 Here's an example of what's being logged:
    Starting Python process with environment variables: AWS_SECRET_ACCESS_KEY=..., DB_PASSWORD=...
    Has anyone else run into this? Curious if there's already been discussion or a fix proposed upstream. Would love thoughts from others who are deploying PyFlink in production or using secrets in K8s environments. I've created https://issues.apache.org/jira/browse/FLINK-38035 with all details
  • d

    Davide Reich

    07/10/2025, 10:03 AM
    Hey #C065944F9M2 I'm running a Flink job on Kubernetes with 1 JobManager and multiple TaskManagers. I'm seeing repeated TaskManager crashes shortly after startup with the following logs:
    Enabling required built-in plugins  
    Linking flink-s3-fs-presto-1.20.0.jar to plugin directory  
    Plugin flink-s3-fs-presto-1.20.0.jar does not exist. Exiting.
    Interestingly, some TaskManagers start up successfully, while others fail with this error. Do you have any idea what could cause this inconsistent behavior? Thanks in advance!
  • p

    pr3c0g

    07/10/2025, 6:45 PM
    Hey all! I've been trying to separate my Python logs into a separate file, instead of having them in the same file as the TaskManager. I can do this with Java jobs by having separate appenders in the log4j.properties file, but I can't seem to separate the Python ones. The documentation states the following:
    > "Note: The default logging level at server side is INFO and so only messages with logging level INFO or above will appear in the log files of the TaskManagers."
    Is there really no way of writing them to another file? Many thanks in advance!
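    There doesn't appear to be a dedicated log4j route for the Python side, but one hedged workaround in process execution mode is to attach a standard-library logging handler inside the UDF itself, so its messages go to their own file. The path and logger name below are placeholders and must be writable by the Python worker process.
    import logging

    from pyflink.datastream.functions import MapFunction, RuntimeContext


    class LoggingMap(MapFunction):

        def open(self, runtime_context: RuntimeContext):
            # Dedicated file handler for this UDF's messages (placeholder path).
            handler = logging.FileHandler("/tmp/pyflink-udf.log")
            handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
            self.logger = logging.getLogger("my_udf")
            self.logger.setLevel(logging.INFO)
            self.logger.addHandler(handler)

        def map(self, value):
            self.logger.info("processing %s", value)
            return value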
  • f

    Fabricio Lemos

    07/17/2025, 12:25 AM
    Is this a known bug with PyFlink 1.20? The documentation says to use flink-connector-jdbc-3.3.0-1.20, but a long time ago it removed JdbcOutputFormat.createRowJdbcStatementBuilder, which is needed by PyFlink.
  • f

    Fabricio Lemos

    07/22/2025, 4:58 AM
    Hey folks 👋 I've run into what seems like an issue with ListState in batch mode. The issue does not occur with Java, or when using PyFlink with ValueState, or when not running in batch mode. In the repro below, I register an event timer and store values in ListState. The timer fires as expected, but the state appears empty for some keys, only when batch mode is enabled. I've tested with v2.0.0 and v1.20.2.
    Output with BATCH mode:
    [on_timer] key=user1 -> collected txns = []
    [on_timer] key=user2 -> collected txns = []
    Output without BATCH mode:
    [on_timer] key=user1 -> collected txns = ['tx1']
    [on_timer] key=user2 -> collected txns = ['tx2']
    from pyflink.common import Types, Row
    from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
    from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
    from pyflink.datastream.state import ListStateDescriptor
    
    
    class ReproFunction(KeyedProcessFunction):
    
        def open(self, runtime_context: RuntimeContext):
            self.txns = runtime_context.get_list_state(
                ListStateDescriptor("txns", Types.STRING())
            )
    
        def process_element(self, transaction: Row, ctx: "KeyedProcessFunction.Context"):
            self.txns.add(transaction.id)
            print(f"[process_element] user_id={transaction.user_id}, tx_id={transaction.id}")
            ctx.timer_service().register_event_time_timer(0)
    
        def on_timer(self, timestamp: int, ctx: "KeyedProcessFunction.OnTimerContext"):
            collected = list(self.txns.get())
            print(f"[on_timer] key={ctx.get_current_key()} -> collected txns = {collected}")
            return []
    
    
    def main():
        env = StreamExecutionEnvironment.get_execution_environment()
        env.set_runtime_mode(RuntimeExecutionMode.BATCH)  # comment this line out for things to work
        env.set_parallelism(1)
    
        transactions = [
            Row(id="tx1", user_id="user1"),
            Row(id="tx2", user_id="user2"),
        ]
    
        (
            env.from_collection(
                transactions,
                type_info=Types.ROW_NAMED(["id", "user_id"], [Types.STRING(), Types.STRING()])
            )
            .key_by(lambda tx: tx.user_id)
            .process(ReproFunction())
        )
    
        env.execute("PyFlink KeyedProcessFunction State Bug Repro")
    
    
    if __name__ == "__main__":
        main()
  • n

    Noufal Rijal

    07/30/2025, 5:25 AM
    Hi #C065944F9M2 I have been working with the PyFlink DataStream API to process and sink data to Kafka. Is there any way we can sink the data to Kafka partitions based on the key? I am using Flink 1.18.
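    A hedged sketch of one option: the Table API Kafka connector's 'key.format' / 'key.fields' options make the sink write a record key, and with the default partitioner Kafka then places records by key hash. Topic, brokers, and fields are placeholders; a DataStream can be converted to a table first if the pipeline starts in the DataStream API.
    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    t_env.execute_sql("""
        CREATE TABLE kafka_sink (
            user_id STRING,
            payload STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'output-topic',
            'properties.bootstrap.servers' = 'broker:9092',
            'key.format' = 'raw',
            'key.fields' = 'user_id',
            'value.format' = 'json'
        )
    """)
    # INSERT INTO kafka_sink SELECT ... then produces records keyed by user_id.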
  • k

    Krishnakumar K

    08/01/2025, 7:26 AM
    Hi #C065944F9M2. Is there any way to use the AsyncDataStream operator in PyFlink? From what I've researched, this operator is currently only supported in Java, not in Python. We have a use case that makes successive API calls to an external service, and having them async would greatly boost the performance of our pipeline. Thanks!
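    Since there is no AsyncDataStream wrapper in PyFlink, one hedged workaround sketch is to fan the blocking calls out over a thread pool inside a single operator; call_external_api is a placeholder for the real client, and ordering/checkpoint semantics are weaker than the Java async operator's.
    from concurrent.futures import ThreadPoolExecutor

    from pyflink.datastream.functions import FlatMapFunction, RuntimeContext


    def call_external_api(item):
        # Placeholder for the real HTTP/RPC call.
        return f"response-for-{item}"


    class ParallelApiCalls(FlatMapFunction):

        def open(self, runtime_context: RuntimeContext):
            self.pool = ThreadPoolExecutor(max_workers=8)

        def close(self):
            self.pool.shutdown()

        def flat_map(self, batch):
            # 'batch' is assumed to be a list of requests collected upstream
            # (e.g. by a window or a small batching map).
            futures = [self.pool.submit(call_external_api, item) for item in batch]
            for future in futures:
                yield future.result()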