# troubleshooting
  • g

    George Leonard

    12/12/2025, 2:48 PM
    RE my ERROR above: I implemented a JDBC catalog and created Paimon-based tables on the same MinIO stack and that's working... so Flink itself is working. The problem is somewhere in the Polaris integration.
    a
    • 2
    • 3
  • m

    Marco Villalobos

    12/12/2025, 7:05 PM
    Hi everybody. I want to consider using Apache Flink CDC with MySQL as a source and Paimon as a sink. I wanted to deploy this on the Kubernetes Operator, but I noticed that it requires the MiniCluster. I prefer not to use the MiniCluster for production. How do I change that? My next option is Amazon Managed Service for Apache Flink, but I don't know if people have successfully integrated that with Apache Flink CDC. Is there a way to change Apache Flink CDC to NOT use the MiniCluster in the Kubernetes Operator? Does Apache Flink CDC work with Amazon Managed Service for Apache Flink? My alternative is to integrate Debezium directly, with Apache Kafka and Spark Structured Streaming with Paimon. I'd prefer to use Flink.
    a
    • 2
    • 11
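    For reference, a Flink CDC 3.x MySQL-to-Paimon pipeline is defined in a single YAML file along these lines (hostnames, credentials, the table pattern and the warehouse path are placeholders) and submitted with bin/flink-cdc.sh using the Flink distribution that FLINK_HOME points at:
    source:
      type: mysql
      hostname: mysql                    # placeholder
      port: 3306
      username: flinkcdc                 # placeholder
      password: "****"
      tables: app_db.\.*                 # tables to capture
      server-id: 5400-5404
    sink:
      type: paimon
      catalog.properties.metastore: filesystem
      catalog.properties.warehouse: s3://warehouse/paimon   # placeholder
    pipeline:
      name: MySQL to Paimon
      parallelism: 2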
  • h

    Hristo Yordanov

    12/12/2025, 9:37 PM
    Hi all, is it possible to enable watermarks when using a Flink CDC pipeline (YAML file)? Flink CDC 3.5.0 and Flink 1.20. I want to see watermark metrics in Prometheus and watermarks in the Flink UI when using the pipeline approach.
  • b

    Brad Murry

    12/15/2025, 3:45 PM
    Howdy! I'm looking to traverse state using the State Processor API; however, my Flink jobs are often a mix of DataStream and Table/SQL APIs. I could potentially manually register state descriptors for the DataStream portions of a job, follow best practices such as consistent UIDs on operators, etc., but for Table/SQL, since that is all generated by the planner, I'm not sure if there is a path forward. Does a methodology exist for interrogating a job graph and building a mapping of state descriptors, etc.?
    a
    a
    d
    • 4
    • 11
  • r

    Royston

    12/16/2025, 12:36 PM
    Hi, was wondering if there is a tool or a combination of tools which anybody has used to act as a coordinator for jobs submitted in session mode. Looking mainly for two things: 1. scale up and scale down taskmanager pods based on some kind of queue of jobs; 2. additionally, management of jobs in case of failure of the job manager. AFAIK the operator automatically does this part, but wanted to confirm. Thanks
    j
    • 2
    • 5
  • t

    Tiago Pereira

    12/16/2025, 1:48 PM
    Hi @Royston
  • t

    Tiago Pereira

    12/16/2025, 1:48 PM
    I'm not a member of Flink development, but what I know is that if you are using Kubernetes to deploy your cluster,
    r
    • 2
    • 1
  • t

    Tiago Pereira

    12/16/2025, 1:49 PM
    changing the parallelism and the number of instances you can achieve that, having multiple task managers
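    A minimal FlinkDeployment sketch of the knobs Tiago is describing, in application mode (image, names and sizes are placeholders); the number of TaskManager pods falls out of the job parallelism divided by the slots per TaskManager:
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: my-job                                  # placeholder
    spec:
      image: flink:1.20
      flinkVersion: v1_20
      serviceAccount: flink
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: "2"
      jobManager:
        resource: { memory: "1024m", cpu: 1 }
      taskManager:
        resource: { memory: "2048m", cpu: 1 }
      job:
        jarURI: local:///opt/flink/usrlib/my-job.jar   # placeholder
        parallelism: 4                                 # 4 slots needed -> 2 TaskManager pods at 2 slots each
        upgradeMode: savepoint
    In standalone mode the TaskManager count can instead be pinned explicitly with spec.taskManager.replicas.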
  • t

    Tiago Pereira

    12/16/2025, 1:50 PM
    image.png
  • t

    Tiago Pereira

    12/16/2025, 1:50 PM
    image.png
  • t

    Tiago Pereira

    12/16/2025, 1:52 PM
    if I understood your question... but one thing that I'm not understanding: what is your queue of jobs?
    r
    • 2
    • 8
  • j

    Jon Slusher

    12/18/2025, 5:47 PM
    👋 I'm trying to make jobs in my FlinkDeployment stateful using the information here in the documentation. I'm running Flink using the operator in Kubernetes (EKS) and my first idea was to use an EBS-backed PVC. It seems that because the jobManager and taskManager run on different pods I would have to configure the PVC as ReadWriteMany, which is not supported for EBS volumes. Can anyone point me in the right direction? I see that EFS might be an option, but I'm curious what the recommended options are for configuring savepoint/checkpoint and HA volumes for Flink jobManagers in EKS. I'll put my current configuration in a thread
    • 1
    • 6
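    Not necessarily the official recommendation, but a common setup on EKS is to keep checkpoints, savepoints and HA metadata in S3 and avoid shared volumes altogether; in the FlinkDeployment's flinkConfiguration that looks roughly like this (bucket name is a placeholder):
    flinkConfiguration:
      state.checkpoints.dir: s3://my-flink-bucket/checkpoints     # placeholder bucket
      state.savepoints.dir: s3://my-flink-bucket/savepoints
      execution.checkpointing.interval: "60s"
      high-availability.type: kubernetes                          # 'high-availability: kubernetes' on older versions
      high-availability.storageDir: s3://my-flink-bucket/ha
    This assumes one of the S3 filesystem plugins (flink-s3-fs-hadoop or flink-s3-fs-presto) is enabled in the image.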
  • b

    Bruno Cancelinha

    12/18/2025, 6:28 PM
    Hello guys 👋. I'm a software developer and we are developing an alert system using Flink SQL. We want our users to be able to define alerts for specific metrics. In this particular case, we are developing an alert system for the number of users logged in on a particular account. We have two Postgres databases. One defines the alerts (alert_definitions) and the other one represents all users' status (user_status). Because these are continuously updating tables, I used the postgres-cdc connector like so:
    CREATE TABLE user_status_binlog (
      id STRING NOT NULL,
      user_id STRING,
      account_id STRING,
      status STRING,
      deleted BOOLEAN,
      inactive BOOLEAN,
      queue_name STRING,
      team_id STRING,
      updated_at TIMESTAMP(3),
      PRIMARY KEY (id) NOT ENFORCED,
      WATERMARK FOR updated_at AS updated_at
    ) WITH (
     'connector' = 'postgres-cdc',
     'hostname' = 'postgresql',
     'port' = '5432',
     'username' = 'admin',
     'password' = 'admin',
     'schema-name' = 'public',
     'database-name' = 'user-status',
     'table-name' = 'live_user_status',
     'slot.name' = 'flink',
     'debezium.plugin.name' = 'pgoutput',
     'scan.startup.mode' = 'initial',
     'changelog-mode' = 'upsert'
    );
    
    
    CREATE TABLE alert_definitions(
      alert_id STRING NOT NULL,
      metric_id STRING,
      account_id STRING,
      filters STRING,
      min_value NUMERIC,
      max_value NUMERIC,
      filter_status AS JSON_QUERY(filters, '$.status' RETURNING ARRAY<STRING>),
      filter_ring_groups AS JSON_QUERY(filters, '$.ring_groups' RETURNING ARRAY<STRING>),
      filter_team_ids AS JSON_QUERY(filters, '$.team_ids' RETURNING ARRAY<STRING>),
      created_at TIMESTAMP(6),
      PRIMARY KEY (alert_id) NOT ENFORCED
    ) WITH (
     'connector' = 'postgres-cdc',
     'hostname' = 'postgresql',
     'port' = '5432',
     'username' = 'admin',
     'password' = 'admin',
     'schema-name' = 'public',
     'database-name' = 'live_sentinel_db',
     'table-name' = 'alert_definition',
     'slot.name' = 'alert_definition',
     'debezium.plugin.name' = 'pgoutput',
     'scan.startup.mode' = 'initial',
     'changelog-mode' = 'upsert'
    );
    Every time a new user changes their status or a new alert is created, I want the query to run and send the results to a Kafka topic. Because my query is an updating table, I created a Kafka sink with the upsert-kafka connector (although the behaviour I'm looking for is more akin to an append-only table). My query, which counts the number of users logged in, is as follows:
    CREATE TEMPORARY VIEW alert_user_counts AS
    SELECT
        alert_id,
        account_id,
        active_user_count,
        min_value,
        max_value
    FROM 
        (
            SELECT alert_id, account_id, min_value, max_value, filter_ring_groups, filter_status, filter_team_ids 
            FROM alert_definitions 
            WHERE account_id IS NOT NULL
            AND metric_id = 'count-users-logged-in'
        ) alerts,
        LATERAL (
            SELECT
                COUNT(DISTINCT user_id) AS active_user_count
            FROM user_status_binlog AS us
            WHERE
                us.account_id = alerts.account_id
                AND us.status NOT IN ('offline', 'hidden')
                AND us.deleted = FALSE
                AND us.inactive = FALSE
                AND (COALESCE(CARDINALITY(alerts.filter_status), 0) = 0 OR ARRAY_CONTAINS(alerts.filter_status, us.status))
        );
    
    INSERT INTO notification_sink
    SELECT 
        counts.account_id,
        counts.alert_id,
        counts.active_user_count as `value`,
        CASE 
            WHEN (counts.min_value IS NULL OR counts.active_user_count >= counts.min_value) 
             AND (counts.max_value IS NULL OR counts.active_user_count <= counts.max_value) 
            THEN 'VIOLATION'
            ELSE 'NOMINAL'
        END AS `type`,
        CURRENT_TIMESTAMP as event_timestamp
    FROM alert_user_counts counts
    Although the idea behind this job seems pretty simple, I keep finding issues with my implementation. I needed to use a LATERAL join because it was the only way not to get multiple values for the same alert. Currently it's working fine except when the alert_definition table is updated. If a new alert is created, then a lot of messages are sent to Kafka. For example: say the account has 3 users logged in. If a new alert is created for that account, we get 3 messages on the Kafka topic, like so:
    {"account_id":"account-id","alert_id":"10","value":1,"type":"VIOLATION","event_timestamp":"2025-12-18 18:08:30.991"}
    {"account_id":"account-id","alert_id":"10","value":2,"type":"VIOLATION","event_timestamp":"2025-12-18 18:08:30.992"}
    {"account_id":"account-id","alert_id":"10","value":3,"type":"VIOLATION","event_timestamp":"2025-12-18 18:08:30.993"}
    I don't really understand this 'counting' behaviour. It's as if the user_status table was an append-only table and this new alert is joining with multiple past versions. Can someone please explain to me why this behaviour is happening? P.S. You might realize that this implementation doesn't really work when no users are logged in (in which case it should return 0, but Flink SQL doesn't return any rows). I'm aware of that, but it's not where I'm focusing right now.
  • u

    Utkarsh Vashishtha

    12/18/2025, 9:02 PM
    Hi, would appreciate some help on this: With an upgrade to using Flink 2.0 and iceberg 1.10, we noticed that the Iceberg Files committer stream and the writer stream were chained together in the same operator (vertex node). This was not the case with versions <= Flink 1.20 where the committer (with max parallelism 1) and writer streams were different chains (occupied different flink vertices). Is this expected? If it is, then functionality (like autoscaling) that was dependent on per-vertex metric consumption breaks. If there is a mistake in my understanding, please correct it.
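    If the per-vertex separation matters (e.g. for autoscaling), one blunt workaround, offered as a sketch rather than as confirmation that the new chaining is intended, is to disable operator chaining for that job:
    flinkConfiguration:
      pipeline.operator-chaining.enabled: "false"   # every operator gets its own vertex, at some serialization cost
                                                    # (older Flink versions use the key 'pipeline.operator-chaining')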
  • s

    syepes

    12/19/2025, 1:13 PM
    Hello all, I have a question regarding Kafka consumer offsets. I'm currently developing a stateless processing job that reads data from a Kafka topic using the Flink DataStream Kafka source connector (Python), but I'm not using checkpoints within the code and I have not enabled them. I have also configured the following consumer parameters: (enable.auto.commit: true, commit.offsets.on.checkpoint: false, set_starting_offsets(KafkaOffsetsInitializer.latest())). What I'm unclear about is whether the offsets are actually stored and reloaded from the Kafka broker after a Flink job restarts. With this configuration, can I guarantee that already consumed events from the topic will not be re-consumed after the restart? Besides using checkpoints, what other methods or approaches can we use to ensure that jobs do not re-consume the same records after restarts? Now on the monitoring side: from what I understand, consumer lag monitoring is not possible using the classic broker lag metrics; instead, we need to use the Flink-exposed metrics (flink_taskmanager_job_task_operator_pendingRecords + flink_taskmanager_job_task_operator_KafkaSourceReader_KafkaConsumer_records_lag_max). Is this actually the correct way to monitor consumption lag when using the Flink consumer?
    j
    • 2
    • 5
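    One thing worth noting: KafkaOffsetsInitializer.latest() always starts from the latest offset on a fresh start, regardless of what the consumer group has committed; resuming from broker-committed offsets is what the committed-offsets initializer is for (or, on the Table/SQL side, 'group-offsets'). A hedged SQL sketch of that setup, with topic, servers and group id as placeholders:
    CREATE TABLE events (
      `payload` STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'my-topic',                           -- placeholder
      'properties.bootstrap.servers' = 'kafka:9092',  -- placeholder
      'properties.group.id' = 'my-flink-consumer',    -- placeholder
      'scan.startup.mode' = 'group-offsets',          -- resume from offsets committed to the broker
      'properties.auto.offset.reset' = 'latest',      -- fallback when the group has no committed offset yet
      'format' = 'raw'
    );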
  • d

    Dominik

    12/23/2025, 8:52 AM
    Hello to all of you, I have a question regarding the Hive dependencies for Flink 2.0.1. I want to use Hive as a metastore to store Kafka topics as Flink tables to use with Flink SQL. The thing is, the dependencies for the Hive metastore don't seem to exist: the download link in the documentation leads to a 404: https://nightlies.apache.org/flink/flink-docs-release-2.0/docs/connectors/table/hive/overview/#dependencies and the Maven repo doesn't seem to have dependencies for Flink 2.0 at all: https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.3_2.12/ Can anyone help me out here? Does Flink 2.0 not support the Hive metastore anymore? If so, what are the alternatives? I wish you all a merry Christmas!
  • u

    徐启文

    12/24/2025, 2:08 AM
    Hello everyone, I have a question about the startup mode for the MySQL CDC source. When a checkpoint/savepoint parameter is passed I want the job to resume reading binlog data from the checkpointed position, and when no checkpoint parameter is specified I want it to do a full snapshot + binlog synchronization. How should I configure the value of the scan.startup.mode parameter?
    source:
      type: mysql
      scan.startup.mode: earliest-offset                    # Start from earliest offset
      scan.startup.mode: latest-offset                      # Start from latest offset
      scan.startup.mode: specific-offset                    # Start from specific offset
      scan.startup.mode: timestamp                          # Start from timestamp
      scan.startup.mode: snapshot                          # Read snapshot only
      scan.startup.specific-offset.file: 'mysql-bin.000003' # Binlog filename under specific offset startup mode
      scan.startup.specific-offset.pos: 4                   # Binlog position under specific offset mode
      scan.startup.specific-offset.gtid-set: 24DA167-...    # GTID set under specific offset startup mode
      scan.startup.timestamp-millis: 1667232000000          # Timestamp under timestamp startup mode
      # ...
    val savepointPath = Option(
      params.get("s",
        params.get("fromSavepoint", null))
    )
    val isRestoring = savepointPath.isDefined
    if (isRestoring) {
      println(s"Restoring from: ${savepointPath.get}")
      println("Using scan.startup.mode: latest-offset")
    } else {
      println("Starting fresh job")
      println("Using scan.startup.mode: initial")
    }
    val scanMode = if (isRestoring) "latest-offset" else "initial"
    Do I really need to switch the parameter based on a check like this, and what is the correct way to write it? Looking forward to your reply.
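    A hedged sketch of the usual pattern, assuming the documented MySQL CDC behaviour that a job restored from a checkpoint/savepoint resumes from the binlog offsets stored in its state and only consults scan.startup.mode on a fresh start:
    source:
      type: mysql
      # 'initial' = full snapshot + binlog on a fresh start; when the job is restored from a
      # checkpoint/savepoint, the offsets in state take precedence, so no code-level switching is needed
      scan.startup.mode: initial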
  • g

    George Leonard

    12/24/2025, 6:50 AM
    Hmmm, battling to keep my PyFlink flow stable, or is that the Flink cluster and the output to Paimon on S3? https://github.com/georgelza/PyFlink_Embedder.git I slowed the data push into Postgres right down, to 3 records a second. The embedding is like 0.15 seconds, so faster than the inbound flow. I've upped the various memory values for Flink in the config file. To make more memory available I stopped the 2nd taskmanager, as Flink CDC is single-threaded anyhow, consuming via the WAL. Would appreciate it if someone can help getting the environment stable and the tps up...
    The configuration file for Flink is <project root>/devlab/conf/config.yaml, the PyFlink job is in <project root>/devlab/pyflink, and the INSERT INTO ... SELECT FROM statement that kicks the Flink job off is in <project root>/devlab/creFlinkFlows/scripts/4.1.creInsertsAhSingle.sql. The above uses the generate_ah_embedding UDF defined in devlab/pyflink/udfs. The idea is, once this single-threaded version works, to see if I can get 4.1.creInsertsAhMulti.sql working as well, with multiple workers.
    Just further on this: at the moment I'm battling to embed and store accountholder profiles at, hmm, 10/s, and I want to be able to embed/store transactions in parallel at 100/s (for the lab this can be reduced to 5 spread out across workers to prove the configuration). As it stands I'm calculating the vector in about 0.1 sec/txn, so I should realistically be able to do say 7/sec per worker. I can definitely run 10 workers on my laptop, each with 1.5GB dedicated; I have 32GB to play with and 10 cores, which should put that 50/sec well in the doable zone. I understand I need a supporting configuration at the Flink jobmanager/taskmanager level, but I'm also thinking some per-job tweaking is needed, as not all jobs will (or even should) inherit all settings from a global config.
    • 1
    • 2
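    A hedged sketch of the kind of config.yaml values involved; the numbers are illustrative guesses for a 32 GB / 10-core laptop, not tuned recommendations:
    taskmanager.numberOfTaskSlots: 2
    taskmanager.memory.process.size: 3g         # total process memory per taskmanager
    taskmanager.memory.managed.fraction: 0.2    # shrink managed memory to leave room for the Python workers
    parallelism.default: 4                      # picked up by jobs that don't set their own parallelism
    python.fn-execution.bundle.size: 1000       # records per bundle handed to the Python UDF workers
    Job-specific values can also be set on the TableEnvironment configuration inside the PyFlink script instead of being inherited from the global config.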
  • a

    Anatoliy Samsonov

    12/24/2025, 7:46 AM
    Hi everyone! I have a Flink Table/SQL job that reads from table A and writes to table B. Table A is continuously updated by another job, but my job finishes (FINISHED). How can I read table A in streaming (unbounded) mode, so the job keeps running and picks up new data instead of finishing? Context: • Iceberg connector • Flink Table / SQL API. What is the correct way to enable streaming reads?
  • g

    George Leonard

    12/24/2025, 9:27 AM
    In your session.
    SET 'execution.runtime-mode'            = 'streaming';
    SET 'execution.planner.type'            = 'streaming';
  • g

    George Leonard

    12/24/2025, 9:28 AM
    Here is a PostgreSQL-backed CDC table create that I use.
    CREATE OR REPLACE TABLE accountholders (
         _id                BIGINT                  NOT NULL
        ,nationalid         VARCHAR(16)             NOT NULL
        ,firstname          VARCHAR(100)
        ,lastname           VARCHAR(100)
        ,dob                VARCHAR(10) 
        ,gender             VARCHAR(10)
        ,children           INT
        ,address            STRING
        ,accounts           STRING
        ,emailaddress       VARCHAR(100)
        ,mobilephonenumber  VARCHAR(20)
        ,created_at         TIMESTAMP_LTZ(3)
        ,WATERMARK          FOR created_at AS created_at - INTERVAL '15' SECOND
        ,PRIMARY KEY (_id) NOT ENFORCED
    ) WITH (
         'connector'                           = 'postgres-cdc'
        ,'hostname'                            = 'postgrescdc'
        ,'port'                                = '5432'
        ,'username'                            = 'dbadmin'
        ,'password'                            = 'dbpassword'
        ,'database-name'                       = 'demog'
        ,'schema-name'                         = 'public'
        ,'table-name'                          = 'accountholders'
        ,'slot.name'                           = 'accountholders_pyflink'           -- Can't include capital letters
        ,'scan.incremental.snapshot.enabled'   = 'true'               
        ,'scan.startup.mode'                   = 'initial'            
        ,'decoding.plugin.name'                = 'pgoutput'
        ,'scan.incremental.snapshot.chunk.size' = '4096'    -- Explicitly set chunk size
        ,'scan.snapshot.fetch.size'             = '512'     -- Add fetch size
        ,'connect.timeout'                      = '30s'     -- Add connection timeout
    );
  • b

    Ben Amiel

    12/24/2025, 9:37 AM
    Hi guys, I'm deploying Flink 1.19.2 on k8s with the operator and I'm receiving errors from Pekko saying “remote system has been silent for too long. (More than 2 days)”. What does it mean and why does it occur?
  • a

    Anatoliy Samsonov

    12/24/2025, 2:11 PM
    SET 'execution.runtime-mode' = 'streaming'; SET 'execution.planner.type' = 'streaming'; it is not working, it still ends with status 'finished'
  • g

    George Leonard

    12/24/2025, 2:14 PM
    Hmmm, are you executing everything via sql-client? What's your data source? This has strangely always worked for me.
  • a

    Anatoliy Samsonov

    12/24/2025, 2:28 PM
    what's your data source? - an Iceberg table
  • a

    Anatoliy Samsonov

    12/24/2025, 2:29 PM
    hmmm, you executing all via sql-client? - Yeah
  • s

    syepes

    12/28/2025, 11:52 PM
    Flink Kubernetes Operator question: I am currently running PyFlink jobs using the FlinkDeployment CRD, and everything is working as expected. Now I want to migrate some of these jobs to be deployed with FlinkSessionJob so that they can run under the same JobManager. The main challenge is that each job requires its own set of environment variables, but I have not been able to figure out how to configure "podTemplate.podTemplate.containers.env" when using the FlinkSessionJob CRD. Does anyone know if it is currently supported to use the podTemplate, specifically setting environment variables, when deploying a FlinkSessionJob?
    • 1
    • 1
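    As far as I can tell the FlinkSessionJob spec has no podTemplate of its own, since the pods belong to the session cluster; a workaround sketch (names are placeholders) is to set the env vars on the session cluster's FlinkDeployment, which then applies to every job submitted to it:
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: session-cluster              # referenced by FlinkSessionJob via spec.deploymentName
    spec:
      image: my-pyflink-image:latest     # placeholder
      flinkVersion: v1_20
      serviceAccount: flink
      podTemplate:
        spec:
          containers:
            - name: flink-main-container
              env:
                - name: MY_JOB_SETTING   # placeholder; shared by all jobs in this session cluster
                  value: "some-value"
      jobManager:
        resource: { memory: "1024m", cpu: 1 }
      taskManager:
        resource: { memory: "2048m", cpu: 1 }
    Per-job differences would typically have to go through job arguments instead, since the pods are shared.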
  • u

    徐科

    12/29/2025, 6:19 AM
    I would like to ask a question: Is it recommended to use flink-operator to deploy Flink on a Kubernetes 1.18 cluster? I tested it recently and found that the manifests generated by the operator are not accepted by the API server, with many fields showing as null.
  • u

    徐科

    12/29/2025, 11:18 AM
    May I ask if there is a minimum version requirement for Kubernetes when deploying Flink on it?
    s
    • 2
    • 4
  • b

    Ben Amiel

    12/30/2025, 5:50 PM
    Hey guys, may I ask how schema evolution works and how to implement it using custom serializers?
    d
    • 2
    • 2