# troubleshooting
  • b

    Ben Mali

    11/27/2025, 8:05 AM
    Hi all! I'm new to Flink and I'm trying to understand how Flink autoscaling works on Kubernetes. I'm using the AWS Kinesis connector, so I want a fixed number of task manager pods. I've deployed Flink using the Flink operator version 1.13.0 with Flink 1.20. Deploying with `FlinkDeployment` and defining `spec.jobManager.replicas` works as expected, but when I try to specify `spec.taskManager.replicas` it seems to be ignored. Does anyone know how I can manually configure the number of task manager pods? Should I configure it using HPA?
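    A minimal FlinkDeployment sketch for comparison (hedged: the field names follow the operator CRD, but the key assumption to verify against the operator docs is that `taskManager.replicas` is only honored with `mode: standalone`, while the default native mode derives the TaskManager count from job parallelism and `taskmanager.numberOfTaskSlots`):
    ```yaml
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: kinesis-job                               # hypothetical name
    spec:
      image: flink:1.20                               # hypothetical image
      flinkVersion: v1_20
      mode: standalone                                # assumption: needed for a fixed replica count
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: "2"
      jobManager:
        replicas: 1
        resource: { memory: "2048m", cpu: 1 }
      taskManager:
        replicas: 3                                   # fixed number of TaskManager pods
        resource: { memory: "2048m", cpu: 1 }
      job:
        jarURI: local:///opt/flink/usrlib/job.jar     # hypothetical jar
        parallelism: 6
    ```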
  • p

    Philipp

    11/27/2025, 10:12 AM
    Is it possible to use the Table API for protobuf data in Apache Kafka? Background to this question: the data in Kafka is not raw protobuf but is prefixed with some metadata bytes (mostly the schema ID used).
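    For context, a sketch of the plain `protobuf` Table format (an assumption rather than an answer to the prefix question: this format expects raw protobuf payloads, so a Confluent-style schema-id prefix would still have to be handled separately):
    ```sql
    -- Hypothetical topic, columns and generated message class; only the format options matter here.
    CREATE TABLE events (
      id   BIGINT,
      name STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'events',
      'properties.bootstrap.servers' = 'broker:9092',
      'format' = 'protobuf',
      'protobuf.message-class-name' = 'com.example.Event'
    );
    ```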
  • h

    Hiếu trịnh

    11/28/2025, 3:32 AM
    I'm new to Flink and I'm having a problem writing data to an Oracle JDBC sink. I read data from Confluent Kafka with upsert-kafka and insert into Oracle from the result of joining two streaming tables. I can't write append-only to the Oracle table, only upsert. This is my error: java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record. Here is my code:
    CREATE TABLE accounts (
      KEY STRING,
      NAME STRING,
      .... y-MM-dd HHmmss.SSS'),
      PRIMARY KEY (RECID) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'account',
      'properties.bootstrap.servers' = '${confluent.cluster}',
      'key.format' = 'raw',
      'value.format' = 'avro-confluent',
      'value.avro-confluent.url' = '${confluent.schema.registry}'
    );

    CREATE TABLE transaction (
      ID STRING,
      BALANCE INT,
      .... y-MM-dd HHmmss.SSS'),
      PRIMARY KEY (RECID) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'transaction',
      'properties.bootstrap.servers' = '${confluent.cluster}',
      'key.format' = 'raw',
      'value.format' = 'avro-confluent',
      'value.avro-confluent.url' = '${confluent.schema.registry}'
    );

    CREATE TABLE oracle_sink (
      ...
    ) WITH (
      'connector' = 'jdbc',
      'url' = '${oracle.host}',
      'table-name' = 'RESULT_TBL',
      'username' = '${oracle.ogg.user}',
      'password' = '${oracle.ogg.password}',
      'driver' = 'oracle.jdbc.driver.OracleDriver'
    );

    INSERT INTO oracle_sink
    SELECT … FROM account acc INNER JOIN transaction tran ....
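    A minimal sketch of the usual fix for that error (hedged: column names here are hypothetical placeholders, not the real RESULT_TBL schema): declare a primary key on the JDBC sink so the update/delete records produced by the join can be written as upserts.
    ```sql
    CREATE TABLE oracle_sink (
      RECID   STRING,
      BALANCE INT,
      PRIMARY KEY (RECID) NOT ENFORCED   -- key the JDBC connector uses to turn the changelog into upserts
    ) WITH (
      'connector'  = 'jdbc',
      'url'        = '${oracle.host}',
      'table-name' = 'RESULT_TBL',
      'username'   = '${oracle.ogg.user}',
      'password'   = '${oracle.ogg.password}',
      'driver'     = 'oracle.jdbc.driver.OracleDriver'
    );
    ```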
  • h

    H.P. Grahsl

    12/01/2025, 10:41 AM
    ❓ I was wondering how Flink handles calls to one and the same UDF that's applied multiple times in a single Flink SQL / Table API query running on a task manager... e.g. if I do `SELECT MY_UDF(col1), MY_UDF(col2) FROM some_table`: • will Flink create two instances of MY_UDF, one for each usage? • will Flink create only a single MY_UDF instance and reuse it across the two usages? • is any of the above guaranteed to be applied consistently within the same / across different Flink versions? • can any such behavior be configured / forced to some degree? THX for any in-depth insights on this. 🙏 Addendum: Let's assume a single TM with parallelism=1 and a deterministic scalar function.
  • j

    Jon Slusher

    12/02/2025, 6:52 PM
    x-posting just in case this is an error more common than just in PyFlink: https://apache-flink.slack.com/archives/C065944F9M2/p1764698399081389
  • a

    Aleksei Perminov

    12/03/2025, 3:21 PM
    Hi guys. Maybe someone has faced this issue and knows how to fix it. We recently updated to Flink 2.0 and started facing the following issue:
    Copy code
    2025-11-22 10:53:52.155 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.r.r.a.ActiveResourceManager - Requesting new worker with resource spec WorkerResourceSpec {cpuCores=12.0, taskHeapSize=4.912gb (5274756322 bytes), taskOffHeapSize=0 bytes, networkMemSize=755.200mb (791884607 bytes), managedMemSize=8.850gb (9502615519 bytes), numSlots=24}, current pending count: 2. 
    2025-11-22 10:53:52.156 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.r.e.ExternalResourceUtils - Enabled external resources: [] 
    2025-11-22 10:53:52.157 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.configuration.Configuration - Config uses fallback configuration key 'kubernetes.service-account' instead of key 'kubernetes.taskmanager.service-account' 
    2025-11-22 10:53:52.160 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.k.KubernetesResourceManagerDriver - Creating new TaskManager pod with name taskmanager-3-2 and resource <16384,12.0>. 
    2025-11-22 10:53:52.260 [flink-pekko.actor.default-dispatcher-4] WARN o.a.p.r.t.netty.NettyTransport - Remote connection to [/10.0.0.70:35310] failed with null 
    2025-11-22 10:53:52.272 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.k.KubernetesResourceManagerDriver - Pod taskmanager-3-2 is created. 
    2025-11-22 10:53:52.281 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.k.KubernetesResourceManagerDriver - Received new TaskManager pod: taskmanager-3-2 
    2025-11-22 10:53:52.282 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.r.r.a.ActiveResourceManager - Requested worker taskmanager-3-2 with resource spec WorkerResourceSpec {cpuCores=12.0, taskHeapSize=4.912gb (5274756322 bytes), taskOffHeapSize=0 bytes, networkMemSize=755.200mb (791884607 bytes), managedMemSize=8.850gb (9502615519 bytes), numSlots=24}. 
    2025-11-22 10:53:52.399 [flink-pekko.actor.default-dispatcher-4] WARN o.a.p.r.ReliableDeliverySupervisor - Association with remote system [<pekko.tcp://flink@10.0.0.180:6122>] has failed, address is now gated for [50] ms. Reason: [Disassociated] 
    2025-11-22 10:54:07.260 [flink-pekko.actor.default-dispatcher-19] WARN o.a.p.r.t.netty.NettyTransport - Remote connection to [/10.0.0.70:34164] failed with null 
    2025-11-22 10:54:15.283 [flink-pekko.actor.default-dispatcher-15] INFO o.a.f.r.r.a.ActiveResourceManager - Registering TaskManager with ResourceID taskmanager-3-2 (<pekko.tcp://flink@10.0.0.181:6122/user/rpc/taskmanager_0>) at ResourceManager 
    2025-11-22 10:54:15.291 [flink-pekko.actor.default-dispatcher-19] INFO o.a.f.r.r.s.FineGrainedSlotManager - Registering task executor taskmanager-3-2 under 12848c284ec438712332f1fd3b76a28a at the slot manager. 
    2025-11-22 10:54:15.292 [flink-pekko.actor.default-dispatcher-19] INFO o.a.f.r.r.a.ActiveResourceManager - Worker taskmanager-3-2 is registered. 
    2025-11-22 10:54:15.300 [flink-pekko.actor.default-dispatcher-19] INFO o.a.f.r.r.a.ActiveResourceManager - Closing TaskExecutor connection taskmanager-3-2 because: Failed to send initial slot report to ResourceManager. java.lang.IllegalStateException: Cannot decrease, no worker of spec WorkerResourceSpec {cpuCores=12.0, taskHeapSize=4.912gb (5274756322 bytes), taskOffHeapSize=0 bytes, networkMemSize=755.200mb (791884607 bytes), managedMemSize=8.850gb (9502615519 bytes), numSlots=25}.
    Basically, in the FlinkDeployment taskmanager.numberOfTaskSlots is set to 24, but for some reason the slot report shows 25 slots in the active TaskManager, which leads to the exception above. It doesn't matter whether I set numberOfTaskSlots to 24 or 32, the issue is the same: one extra slot in the slot report and the exception. This goes on for about 5 minutes and then suddenly the job starts working fine. We use Flink Kubernetes Operator 1.12.1 and Flink 2.0.0; the issue rarely appears on 1.20.1 as well. Thanks in advance.
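    For context, a sketch of where that setting lives in a FlinkDeployment (the slot count and TaskManager resources are taken from the log above; everything else is hypothetical):
    ```yaml
    spec:
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: "24"   # the value the slot report is expected to match
      taskManager:
        resource:
          cpu: 12
          memory: "16384m"
    ```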
  • m

    morti

    12/03/2025, 3:50 PM
    Hi friends. After migrating to Flink 2.1, I cannot submit the job with program arguments. I think the bug is related to this issue: https://issues.apache.org/jira/browse/FLINK-36275. Since the `programArgs` query parameter was deleted, no program arguments (e.g. --batchSize 5000) are sent to the job. I'm currently using the flink-2.1.1-java21 image. There is also a related Stack Overflow question: https://stackoverflow.com/questions/79702619/how-do-i-pass-program-arguments-from-apache-flink-2-0-0-web-gui-to-my-job-proper It works with the REST API mentioned in the Stack Overflow question, but as I said, program arguments are not sent to the job from the web UI. Any help or tip would be appreciated!
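    A sketch of the REST-API workaround mentioned above (the host, jar id and entry class are placeholders; the exact request-body fields should be checked against the REST API docs for your Flink version):
    ```sh
    curl -X POST "http://<jobmanager>:8081/jars/<jar-id>/run" \
      -H "Content-Type: application/json" \
      -d '{"entryClass": "com.example.MyJob", "programArgsList": ["--batchSize", "5000"]}'
    ```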
  • j

    Jon Slusher

    12/05/2025, 9:18 PM
    👋 I'm trying to troubleshoot a new error I'm getting when testing my PyFlink job using Flink (1.20.3) and the operator in Kubernetes. The job seems to be timing out to my brokers, but as far as I can tell it has the same broker configuration that my Debezium connector has 🧵
  • r

    Rion Williams

    12/06/2025, 3:59 AM
    Hey all — is there any way to confirm subscriptions to the mailing lists? I haven't received many/any in quite a long time (user will occasionally come through, but never dev or community). I’ve resubmitted the various $list-subscribe@flink.apache.org to all of the lists but never received the expected confirmation response. Any ideas?
  • g

    George Leonard

    12/07/2025, 10:42 AM
    hi hi all. Trying to put a stream together: Shadowtraffic as the data generator for the 2 products into 2 Postgres tables, 'accountholders' and 'transactions', and Postgres into Flink using CDC. This is all working; I can select from either table and get results via flink-sql. The step from my Postgres catalog based tables into Apache Paimon based tables on MinIO/S3 is where I'm having some "sluggishness" atm. I'm expecting buckets to be created a lot faster and a lot more data to end up in Paimon/S3... attached is the log file. For now I'm just trying to confirm this all works before I start with the intended PyFlink "processing" that will consume from the Postgres source and output to the Paimon target. Below are my 2 source table creates and the CTAS that pushes data into Paimon.
    Copy code
    # CDC Sources
    CREATE OR REPLACE TABLE postgres_catalog.demog.accountholders (
         _id                BIGINT                  NOT NULL
        ,nationalid         VARCHAR(16)             NOT NULL
        ,firstname          VARCHAR(100)
        ,lastname           VARCHAR(100)
        ,dob                VARCHAR(10) 
        ,gender             VARCHAR(10)
        ,children           INT
        ,address            STRING
        ,accounts           STRING
        ,emailaddress       VARCHAR(100)
        ,mobilephonenumber  VARCHAR(20)
        ,created_at         TIMESTAMP_LTZ(3)
        ,WATERMARK          FOR created_at AS created_at - INTERVAL '15' SECOND
        ,PRIMARY KEY (_id) NOT ENFORCED
    ) WITH (
         'connector'                           = 'postgres-cdc'
        ,'hostname'                            = 'postgrescdc'
        ,'port'                                = '5432'
        ,'username'                            = 'dbadmin'
        ,'password'                            = 'dbpassword'
        ,'database-name'                       = 'demog'
        ,'schema-name'                         = 'public'
        ,'table-name'                          = 'accountholders'
        ,'slot.name'                           = 'accountholders0'
        -- experimental feature: incremental snapshot (default off)
        ,'scan.incremental.snapshot.enabled'   = 'true'               -- experimental feature: incremental snapshot (default off)
    ,'scan.startup.mode'                   = 'initial'            -- <https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position>
        ,'decoding.plugin.name'                = 'pgoutput'
    );
    
    
    CREATE OR REPLACE TABLE postgres_catalog.demog.transactions (
         _id                            BIGINT              NOT NULL
        ,eventid                        VARCHAR(36)         NOT NULL
        ,transactionid                  VARCHAR(36)         NOT NULL
        ,eventtime                      VARCHAR(30)
        ,direction                      VARCHAR(8)
        ,eventtype                      VARCHAR(10)
        ,creationdate                   VARCHAR(20)
        ,accountholdernationalid        VARCHAR(16)
        ,accountholderaccount           STRING
        ,counterpartynationalid         VARCHAR(16)
        ,counterpartyaccount            STRING
        ,tenantid                       VARCHAR(8)
        ,fromid                         VARCHAR(8)
        ,accountagentid                 VARCHAR(8)
        ,fromfibranchid                 VARCHAR(6)
        ,accountnumber                  VARCHAR(16)
        ,toid                           VARCHAR(8)
        ,accountidcode                  VARCHAR(5)
        ,counterpartyagentid            VARCHAR(8)
        ,tofibranchid                   VARCHAR(6)
        ,counterpartynumber             VARCHAR(16)
        ,counterpartyidcode             VARCHAR(5)
        ,amount                         STRING
        ,msgtype                        VARCHAR(6)
        ,settlementclearingsystemcode   VARCHAR(5)
        ,paymentclearingsystemreference VARCHAR(12)
        ,requestexecutiondate           VARCHAR(10)
        ,settlementdate                 VARCHAR(10)
        ,destinationcountry             VARCHAR(30)
        ,localinstrument                VARCHAR(2)
        ,msgstatus                      VARCHAR(12)
        ,paymentmethod                  VARCHAR(4)
        ,settlementmethod               VARCHAR(4)
        ,transactiontype                VARCHAR(2)
        ,verificationresult             VARCHAR(4)
        ,numberoftransactions           INT
        ,schemaversion                  INT
        ,usercode                       VARCHAR(4)
        ,created_at                     TIMESTAMP_LTZ(3)
        ,WATERMARK                      FOR created_at AS created_at - INTERVAL '15' SECOND
        ,PRIMARY KEY (_id) NOT ENFORCED
    ) WITH (
         'connector'                           = 'postgres-cdc'
        ,'hostname'                            = 'postgrescdc'
        ,'port'                                = '5432'
        ,'username'                            = 'dbadmin'
        ,'password'                            = 'dbpassword'
        ,'database-name'                       = 'demog'
        ,'schema-name'                         = 'public'
        ,'table-name'                          = 'transactions'
        ,'slot.name'                           = 'transactions0'
        -- experimental feature: incremental snapshot (default off)
        ,'scan.incremental.snapshot.enabled'   = 'true'               -- experimental feature: incremental snapshot (default off)
    ,'scan.startup.mode'                   = 'initial'            -- <https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position>
        ,'decoding.plugin.name'                = 'pgoutput'
    );
    and the CREATE TABLE AS statements that insert the data:
    Copy code
    SET 'execution.checkpointing.interval'   = '10s';
    SET 'pipeline.name' = 'Persist into Paimon: accountholders table';
    
    CREATE OR REPLACE TABLE c_paimon.finflow.accountholders WITH (
         'file.format'                       = 'parquet'
        ,'compaction.min.file-num'           = '2'
        ,'compaction.early-max.file-num'     = '50'
        ,'snapshot.time-retained'            = '1h'
        ,'snapshot.num-retained.min'         = '5'
        ,'snapshot.num-retained.max'         = '20'
        ,'table.exec.sink.upsert-materialize'= 'NONE'
    ) AS 
    SELECT * FROM postgres_catalog.demog.accountholders;
    
    SET 'pipeline.name' = 'Persist into Paimon: transactions table';
    
    CREATE OR REPLACE TABLE c_paimon.finflow.transactions WITH (
         'file.format'                       = 'parquet'
        ,'compaction.min.file-num'           = '2'
        ,'compaction.early-max.file-num'     = '50'
        ,'snapshot.time-retained'            = '1h'
        ,'snapshot.num-retained.min'         = '5'
        ,'snapshot.num-retained.max'         = '20'
        ,'table.exec.sink.upsert-materialize'= 'NONE'
    ) AS 
    SELECT * FROM postgres_catalog.demog.transactions;
    jobmanager_log-2.txt
  • m

    Manish Jain

    12/09/2025, 9:43 AM
    Hi All, We are using Flink 1.20, and the related kafka-connector.
    Copy code
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>3.4.0-1.20</version>
    </dependency>
    This version of the Kafka client has 2 vulnerabilities, which are only fixed in v3.9.1 of kafka-clients. We are wondering how to handle this. Is it recommended to exclude the kafka-clients dependency from the connector and then override it with the new version? We are afraid that it might not be as easy as it sounds. 🙂 Is running the test cases for Flink sufficient to verify that the new version is Flink compatible, or are there other things we should keep in mind while doing this upgrade? PS: In the longer run we might upgrade to Flink 2.0, but right now that is not in our plans.
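    A sketch of the exclude-and-pin approach being described (an assumption, not a tested setup: the connector is only guaranteed against the kafka-clients version it ships with, so this would need to be validated with your own integration tests):
    ```xml
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>3.4.0-1.20</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- pin the patched client explicitly -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.9.1</version>
    </dependency>
    ```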
  • j

    Jon Slusher

    12/09/2025, 4:36 PM
    👋 Surfacing this issue in case it got lost in the shuffle. I'm still getting a certificate error on my Flink Kafka source trying to connect to my Kafka cluster using SASL_SSL. • Do I need to combine the certificate I exported from the cluster with another set of certificates? • Does anyone know how to set `javax.net.debug=ssl` in the connector so I can see what certificate it's trying to use?
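    A sketch of one way to do that (assumption: passing the JVM flag through `env.java.opts.*` in the FlinkDeployment's flinkConfiguration is what surfaces the JSSE debug output in the TaskManager logs, where the Kafka client runs):
    ```yaml
    spec:
      flinkConfiguration:
        env.java.opts.taskmanager: "-Djavax.net.debug=ssl,handshake"
        env.java.opts.jobmanager: "-Djavax.net.debug=ssl,handshake"
    ```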
  • u

    Urs Schoenenberger

    12/10/2025, 2:24 PM
    Hi folks, we are still facing FLINK-38582 (GlobalCommitter stops committing, in this case in DeltaSink, after rescaling). Parts of this seem to have been fixed in FLINK-37747, but the issue persists, albeit rarely and not reproducibly. I'm trying to further pinpoint the issue, so I wanted to ask: Are there any known general issues when using the (v1) GlobalCommitter with unaligned checkpoints? Either in the general way GlobalCommitter works, or any common implementation issues that the DeltaSink may have when implementing its GlobalCommitter?
  • a

    Almog Golbar

    12/10/2025, 4:10 PM
    Hi guys, We enabled Flink autoscaler memory tuning but aren't seeing recommendations, even with clear over-provisioning. Versions: • Flink: v1_17 (1.17.1) • Operator: 1.13.0 Configuration:
    Copy code
    job.autoscaler.enabled: "true"
    job.autoscaler.memory.tuning.enabled: "true"  #also tried false for dry run
    job.autoscaler.metrics.window: "5m"
    job.autoscaler.stabilization.interval: "30s"
    job.autoscaler.scaling.enabled: "false"
    What we've done: 1. Removed explicit memory configs (taskmanager.memory.process.size, taskmanager.memory.managed.size, taskmanager.memory.network.max) 2. Set taskManager.resource.memory: 12g 3. Verified metrics collection (6+ data points in ConfigMap autoscaler-<deployment-name>) 4. Confirmed autoscaler is active (ScalingExecutor running, metrics being collected) Current situation: • Memory: 12GB allocated, ~24% heap usage (~1.36GB used) • Metrics: Being collected and stored in ConfigMap • No recommendations: Not appearing in operator logs, Kubernetes events, or FlinkDeployment status/annotations What we've checked: • Operator logs show autoscaler activity but no memory tuning recommendations • No errors or warnings related to memory tuning • Metrics window is filling (5+ minutes of data collected) • Job is processing data (not idle) Has anyone seen memory tuning recommendations with this operator version? Are there additional requirements or known issues? Any guidance on where recommendations should appear or how to troubleshoot?
  • j

    Jon Slusher

    12/10/2025, 7:15 PM
    👋 I'm still trying to troubleshoot a connection from a Table API `kafka` source connector (using PyFlink) to our Kafka cluster using `SASL_SSL` and `SCRAM-SHA-512`. • Can anyone at least confirm that this should be possible? • I have some DEBUG logs to share that to me definitely point to the TaskManager being the culprit. I can see in the logs that the JobManager is connecting successfully to my Kafka brokers, because it can see the topic and partitions I want to source. A TaskManager is spawned shortly afterwards, and that's what seems to fail. • I'm using Flink 1.20.3 and the Flink operator in Kubernetes to manage the job. • I'll post the configuration and a link to a condensed and redacted Gist of the DEBUG logs I captured for my connector in a thread.
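    For comparison, a minimal sketch of a Table API Kafka source using SASL_SSL + SCRAM-SHA-512 (topic, brokers, credentials and truststore path are hypothetical placeholders):
    ```sql
    CREATE TABLE kafka_source (
      id      STRING,
      payload STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'my-topic',
      'properties.bootstrap.servers' = 'broker-1:9093',
      'properties.security.protocol' = 'SASL_SSL',
      'properties.sasl.mechanism' = 'SCRAM-SHA-512',
      'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="secret";',
      'properties.ssl.truststore.location' = '/path/to/truststore.jks',
      'properties.ssl.truststore.password' = 'changeit',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    ```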
  • t

    Taranpreet Kaur

    12/11/2025, 2:57 AM
    Hi team! I have a use case where I want to run multiple separate Flink job instances that share Kafka topic partitions via Kafka's native consumer group balancing, rather than having each Flink job consume all partitions. Current behavior: Flink's KafkaSource automatically assigns all topic partitions to a single job, managing offsets internally for exactly-once semantics. Desired behavior: Multiple Flink job instances with the same Kafka consumer group ID, where Kafka's consumer group coordinator distributes partitions across instances (e.g., Instance A gets partition 0, Instance B gets partition 1). Questions: 1. Is there a way to configure KafkaSource to use Kafka's consumer group partition assignment instead of Flink's internal assignment? 2. How would this impact parallelism within each Flink job? If I use Kafka's subscribe mechanism, would I need to set parallelism=1 to avoid conflicts between Flink's subtasks and Kafka's partition assignment? 3. How would checkpointing work in this scenario? Would each Flink instance only checkpoint offsets for its assigned partitions? What happens during partition rebalancing when a failed instance's partitions get reassigned to surviving instances? 4. Are there any existing patterns or configurations for this use case? Context: We don't require exactly-once semantics and prefer the operational flexibility of having independent Flink instances that can scale based on Kafka's partition distribution. We're particularly concerned about checkpoint recovery behavior when Kafka rebalances partitions across instances. Any guidance or alternative approaches would be appreciated!
  • g

    George Leonard

    12/11/2025, 5:57 PM
    hi hi all... bit stuck trying to push data into Iceberg tables. Need someone to see where I went wrong / what I missed. Below is the code that created the bucket; I can see the bucket in MinIO. CDC source table:
    Copy code
    # CDC Sources
    CREATE OR REPLACE TABLE c_cdcsource.demog.accountholders (
         _id                BIGINT                  NOT NULL
        ,nationalid         VARCHAR(16)             NOT NULL
        ,firstname          VARCHAR(100)
        ,lastname           VARCHAR(100)
        ,dob                VARCHAR(10) 
        ,gender             VARCHAR(10)
        ,children           INT
        ,emailaddress       VARCHAR(100)
        ,mobilephonenumber  VARCHAR(20)
        ,accounts           STRING
        ,created_at         TIMESTAMP_LTZ(3)
        ,WATERMARK          FOR created_at AS created_at - INTERVAL '15' SECOND
        ,PRIMARY KEY (_id) NOT ENFORCED
    ) WITH (
         'connector'                           = 'postgres-cdc'
        ,'hostname'                            = 'postgrescdc'
        ,'port'                                = '5432'
        ,'username'                            = 'dbadmin'
        ,'password'                            = 'dbpassword'
        ,'database-name'                       = 'demog'
        ,'schema-name'                         = 'public'
        ,'table-name'                          = 'accountholders'
        ,'slot.name'                           = 'accountholders0'
        -- experimental feature: incremental snapshot (default off)
        ,'scan.incremental.snapshot.enabled'   = 'true'               -- experimental feature: incremental snapshot (default off)
    ,'scan.startup.mode'                   = 'initial'            -- <https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position>
        ,'decoding.plugin.name'                = 'pgoutput'
    );
    flink catalog create
    Copy code
    CREATE CATALOG c_iceberg WITH (
       'type'='iceberg'
      ,'catalog-type'='rest'
      ,'uri'='<http://polaris:8181/api/catalog>'
      ,'warehouse'='icebergcat'
      ,'oauth2-server-uri'='<http://polaris:8181/api/catalog/v1/oauth/tokens>'
      ,'credential'='root:s3cr3t'
      ,'scope'='PRINCIPAL_ROLE:ALL'
      ,'s3.endpoint'='<http://minio:900>'
      ,'s3.access-key-id'='mnadmin'
      ,'s3.secret-access-key'='mnpassword'
      ,'s3.path-style-access'='true'
    );
    
    
    USE CATALOG c_iceberg;
    -- Create using below
    CREATE DATABASE IF NOT EXISTS c_iceberg.finflow;
    contents from the .env file driving the compose.yaml:
    ROOT_CLIENT_ID=root
    ROOT_CLIENT_SECRET=s3cr3t
    CATALOG_NAME=icebergcat
    POLARIS_REALM=findept
    MINIO_ROOT_USER=mnadmin
    MINIO_ROOT_PASSWORD=mnpassword
    MINIO_ALIAS=minio
    MINIO_ENDPOINT=http://minio:9000
    MINIO_BUCKET=warehouse
    AWS_ACCESS_KEY_ID=mnadmin
    AWS_SECRET_ACCESS_KEY=mnpassword
    AWS_REGION=za-south-1
    AWS_DEFAULT_REGION=za-south-1
    polaris-bootstrap service from docker-compose:
    Copy code
    polaris-bootstrap:
        image: apache/polaris-admin-tool:latest
        hostname: polaris-bootstrap
        container_name: polaris-bootstrap
        depends_on:
          postgrescat:
            condition: service_healthy    
        environment:
          - POLARIS_PERSISTENCE_TYPE=relational-jdbc
          - QUARKUS_DATASOURCE_JDBC_URL=jdbc:postgresql://${POSTGRES_CAT_HOST}:${POSTGRES_CAT_PORT}/${POLARIS_REALM}
          - QUARKUS_DATASOURCE_USERNAME=${POSTGRES_CAT_USER}
          - QUARKUS_DATASOURCE_PASSWORD=${POSTGRES_CAT_PASSWORD}
          - POLARIS_REALM=${POLARIS_REALM}
          - ROOT_CLIENT_ID=${ROOT_CLIENT_ID}
          - ROOT_CLIENT_SECRET=${ROOT_CLIENT_SECRET}
        command:
          - "bootstrap"
          - "--realm=${POLARIS_REALM}"
          - "--credential=${POLARIS_REALM},${ROOT_CLIENT_ID},${ROOT_CLIENT_SECRET}"
    polaris-setup service from docker-compose.
    Copy code
    polaris-setup:
        image: alpine/curl
        hostname: polaris-setup
        container_name: polaris-setup
        depends_on:
          polaris:
            condition: service_healthy
        environment:
          - CLIENT_ID=${ROOT_CLIENT_ID}
          - CLIENT_SECRET=${ROOT_CLIENT_SECRET}
          - CATALOG_NAME=${CATALOG_NAME}
          - POLARIS_REALM=${POLARIS_REALM}
          - MINIO_BUCKET=${MINIO_BUCKET}
          - MINIO_ENDPOINT=${MINIO_ENDPOINT}
        volumes:
          - ./conf/polaris/:/polaris
        entrypoint: "/bin/sh"
        command:
          - "-c"
          - >-
            chmod +x /polaris/create-catalog.sh;
            chmod +x /polaris/obtain-token.sh;
            source /polaris/obtain-token.sh $$POLARIS_REALM;
            echo Creating catalog: $$CATALOG_NAME in realm: $$POLARIS_REALM;
            export STORAGE_CONFIG_INFO='{"storageType":"S3",
              "endpoint":"<http://localhost:9000>",
              "endpointInternal":"'$$MINIO_ENDPOINT'",
              "pathStyleAccess":true}';
            export STORAGE_LOCATION="s3://$$MINIO_BUCKET/iceberg";
            /polaris/create-catalog.sh $$POLARIS_REALM $$CATALOG_NAME $$TOKEN;
            echo Extra grants...;
            curl -H "Authorization: Bearer $$TOKEN" -H 'Content-Type: application/json' \
              -X PUT \
              <http://polaris:8181/api/management/v1/catalogs/${CATALOG_NAME}/catalog-roles/catalog_admin/grants> \
              -d '{"type":"catalog", "privilege":"CATALOG_MANAGE_CONTENT"}';
            echo Polaris Setup Complete.;
    output from fetch catalogs api call, verifying the catalog and full path
    Copy code
    curl -X GET <http://localhost:8181/api/management/v1/catalogs> \
    >   -H "Authorization: Bearer ${TOKEN}" | jq
      % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                     Dload  Upload   Total   Spent    Left  Speed
    100   388  100   388    0     0  13756      0 --:--:-- --:--:-- --:--:-- 13857
    {
      "catalogs": [
        {
          "type": "INTERNAL",
          "name": "icebergcat",
          "properties": {
            "default-base-location": "<s3://warehouse/iceberg>"
          },
          "createTimestamp": 1765474601004,
          "lastUpdateTimestamp": 1765474601004,
          "entityVersion": 1,
          "storageConfigInfo": {
            "endpoint": "<http://localhost:9000>",
            "endpointInternal": "<http://minio:9000>",
            "pathStyleAccess": true,
            "storageType": "S3",
            "allowedLocations": [
              "<s3://warehouse/iceberg>"
            ]
          }
        }
      ]
    }
    ERROR
    Copy code
    > CREATE OR REPLACE TABLE c_iceberg.finflow.accountholders WITH (
    >      'file.format'                       = 'parquet'
    >     ,'compaction.min.file-num'           = '2'
    >     ,'compaction.early-max.file-num'     = '50'
    >     ,'snapshot.time-retained'            = '1h'
    >     ,'snapshot.num-retained.min'         = '5'
    >     ,'snapshot.num-retained.max'         = '20'
    >     ,'table.exec.sink.upsert-materialize'= 'NONE'
    > ) AS 
    > SELECT * FROM c_cdcsource.demog.accountholders;
    [ERROR] Could not execute SQL statement. Reason:
    java.lang.AbstractMethodError: Receiver class org.apache.iceberg.flink.FlinkCreateTableOptions$$Lambda$1164/0x0000000801826a00 does not define or inherit an implementation of the resolved method 'abstract void generate(org.apache.iceberg.shaded.com.fasterxml.jackson.core.JsonGenerator)' of interface org.apache.iceberg.util.JsonUtil$ToJson.
    docker images
    Copy code
    arm64v8/flink:1.20.2-scala_2.12-java17                      79a54781f5d0        789MB             0B        
    georgelza/apacheflink-base-1.20.2-scala_2.12-java17:1.0.2   dd8585afe66c       6.84GB             0B    U
    Flink container build / Dockerfile, as ... searches on the error point at a version incompatibility...
    Copy code
    FROM arm64v8/flink:1.20.2-scala_2.12-java17
    SHELL ["/bin/bash", "-c"]
    
    ENV FLINK_HOME=/opt/flink
    ENV HIVE_HOME=$FLINK_HOME/conf/
    WORKDIR $FLINK_HOME
    
    
    RUN echo "--> Setup Directory Structure" && \
        mkdir -p /opt/flink/conf/ && \
        mkdir -p /opt/flink/checkpoints && \
        mkdir -p /opt/flink/rocksdb 
    
    RUN echo "--> Install JARs: Flink's S3 plugin" && \
        mkdir -p ./plugins/s3-fs-hadoop && \
        mv ./opt/flink-s3-fs-hadoop-1.20.2.jar ./plugins/s3-fs-hadoop/
    
    RUN echo "--> Install Flink JARs: => Generic"
    COPY stage/bundle-2.31.9.jar                            /opt/flink/lib/bundle-2.31.9.jar  
    COPY stage/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar    /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
    
    COPY stage/postgresql-42.7.6.jar                        /opt/flink/lib/postgresql-42.7.6.jar
    # CDC Support <https://nightlies.apache.org/flink/flink-cdc-docs-release-3.4/docs/connectors/flink-sources/overview/#supported-flink-versions>
    COPY stage/flink-sql-connector-postgres-cdc-3.5.0.jar   /opt/flink/lib/flink-sql-connector-postgres-cdc-3.5.0.jar
    COPY stage/flink-sql-json-1.20.2.jar                    /opt/flink/lib/flink-sql-json-1.20.2.jar     
    COPY stage/flink-sql-parquet-1.20.2.jar                 /opt/flink/lib/flink-sql-parquet-1.20.2.jar
    COPY stage/flink-json-1.20.2.jar                        /opt/flink/lib/flink-json-1.20.2.jar
    COPY stage/flink-connector-jdbc-3.3.0-1.20.jar          /opt/flink/lib/flink-connector-jdbc-3.3.0-1.20.jar
    
    RUN echo "--> Install Flink JARs: => Open Table Formats"
    COPY stage/iceberg-flink-1.20-1.9.1.jar                 /opt/flink/lib/iceberg-flink-1.20-1.9.1.jar
    COPY stage/iceberg-flink-runtime-1.20-1.9.1.jar         /opt/flink/lib/iceberg-flink-runtime-1.20-1.9.1.jar
    COPY stage/iceberg-aws-bundle-1.9.1.jar                 /opt/flink/iceberg-aws-bundle-1.9.1.jar
    
        
    RUN echo "--> Set Ownerships of /opt/flink" && \
        chown -R flink:flink $FLINK_HOME 
    
    USER flink:flink
    
    CMD ./bin/start-cluster.sh && sleep infinity
  • g

    George Leonard

    12/11/2025, 6:33 PM
    interesting errors in the docker compose logs wrt namespace
    err.txt
  • g

    George Leonard

    12/12/2025, 9:55 AM
    hi hi all. Need help pls. Downgraded to 1.20.1 / Iceberg 1.9.1, hoping my problem was a version incompatibility, as all searches pointed at that... below is the Dockerfile used to build the Flink container being used. I've now added the Jackson jar, as previous errors pointed at / complained about that being missing.
    Copy code
    FROM arm64v8/flink:1.20.1-scala_2.12-java17
    SHELL ["/bin/bash", "-c"]
    ENV FLINK_HOME=/opt/flink
    ENV HIVE_HOME=$FLINK_HOME/conf/
    WORKDIR $FLINK_HOME
    
    
    RUN echo "--> Setup Directory Structure" && \
        mkdir -p /opt/flink/conf/ && \
        mkdir -p /opt/flink/checkpoints && \
        mkdir -p /opt/flink/rocksdb 
    
    RUN echo "--> Install JARs: Flink's S3 plugin" && \
        mkdir -p ./plugins/s3-fs-hadoop && \
        mv ./opt/flink-s3-fs-hadoop-1.20.1.jar ./plugins/s3-fs-hadoop/
    
    RUN echo "--> Install JARs: => Generic"
    COPY stage/bundle-2.31.9.jar                            /opt/flink/lib/bundle-2.31.9.jar  
    COPY stage/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar    /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
    COPY stage/jackson-core-2.20.1.jar                      /opt/flink/lib/jackson-core-2.20.1.jar
    COPY stage/postgresql-42.7.6.jar                        /opt/flink/lib/postgresql-42.7.6.jar
    # CDC Support <https://nightlies.apache.org/flink/flink-cdc-docs-release-3.4/docs/connectors/flink-sources/overview/#supported-flink-versions>
    COPY stage/flink-sql-connector-postgres-cdc-3.5.0.jar   /opt/flink/lib/flink-sql-connector-postgres-cdc-3.5.0.jar
    
    # Flink version specific
    RUN echo "--> Install JARs: => Flink 1.20.1"
    COPY stage/flink-sql-json-1.20.1.jar                    /opt/flink/lib/flink-sql-json-1.20.1.jar     
    COPY stage/flink-sql-parquet-1.20.1.jar                 /opt/flink/lib/flink-sql-parquet-1.20.1.jar
    COPY stage/flink-json-1.20.1.jar                        /opt/flink/lib/flink-json-1.20.1.jar
    COPY stage/flink-connector-jdbc-3.3.0-1.20.jar          /opt/flink/lib/flink-connector-jdbc-3.3.0-1.20.jar
    
    RUN echo "--> Install JARs: => Iceberg 1.9.1"
    COPY stage/iceberg-flink-1.20-1.9.1.jar                 /opt/flink/lib/iceberg-flink-1.20-1.9.1.jar
    COPY stage/iceberg-flink-runtime-1.20-1.9.1.jar         /opt/flink/lib/iceberg-flink-runtime-1.20-1.9.1.jar
    COPY stage/iceberg-aws-bundle-1.9.1.jar                 /opt/flink/iceberg-aws-bundle-1.9.1.jar
    
    RUN echo "--> Set Ownerships of /opt/flink" && \
        chown -R flink:flink $FLINK_HOME 
    
    USER flink:flink
    
    CMD ./bin/start-cluster.sh && sleep infinity
    below are the Polaris bits from my docker-compose file, included as this creates the Polaris catalog and uses the previously created MinIO warehouse/iceberg bucket
    Copy code
    polaris:
        image: apache/polaris:latest
        hostname: polaris
        container_name: polaris
        depends_on:
          postgrescat:
            condition: service_healthy
          minio:
            condition: service_healthy
          polaris-bootstrap:
            condition: service_completed_successfully
    
        ports:
          # API port
          - "8181:8181"
          # Management port (metrics and health checks)
          - "8182:8182"
          # Optional, allows attaching a debugger to the Polaris JVM
          - "5005:5005"
        environment:
    
          POLARIS_PERSISTENCE_TYPE: relational-jdbc
          POLARIS_PERSISTENCE_RELATIONAL_JDBC_MAX_RETRIES: 5
          POLARIS_PERSISTENCE_RELATIONAL_JDBC_INITIAL_DELAY_IN_MS: 100
          POLARIS_PERSISTENCE_RELATIONAL_JDBC_MAX_DURATION_IN_MS: 5000
          QUARKUS_DATASOURCE_JDBC_URL: jdbc:postgresql://${POSTGRES_CAT_HOST}:${POSTGRES_CAT_PORT}/${POLARIS_REALM}
          QUARKUS_DATASOURCE_USERNAME: ${POSTGRES_CAT_USER}
          QUARKUS_DATASOURCE_PASSWORD: ${POSTGRES_CAT_PASSWORD}
          POLARIS_REALM_CONTEXT_REALMS: ${POLARIS_REALM}
          QUARKUS_OTEL_SDK_DISABLED: true
          polaris.features."ALLOW_INSECURE_STORAGE_TYPES": "true"
          polaris.features."SUPPORTED_CATALOG_STORAGE_TYPES": "[\"FILE\",\"S3\"]"
          polaris.readiness.ignore-severe-issues: "true"
          
          # S3 endpoint for MinIO
          AWS_REGION: ${AWS_REGION}
          AWS_ACCESS_KEY_ID: ${MINIO_ROOT_USER}
          AWS_SECRET_ACCESS_KEY: ${MINIO_ROOT_PASSWORD}
          POLARIS_BOOTSTRAP_CREDENTIALS: ${POLARIS_REALM},${ROOT_CLIENT_ID},${ROOT_CLIENT_SECRET}
          polaris.realm-context.realms: ${POLARIS_REALM}
          quarkus.otel.sdk.disabled: "true"
    
        volumes:
          - ./data/polaris:/var/lib/polaris
          - ./conf/polaris:/polaris 
        healthcheck:
          test: ["CMD", "curl", "<http://localhost:8182/q/health>"]
          interval: 5s
          timeout: 10s
          retries: 5
          start_period: 30s
        restart: unless-stopped
    
      polaris-bootstrap:
        image: apache/polaris-admin-tool:latest
        hostname: polaris-bootstrap
        container_name: polaris-bootstrap
        depends_on:
          postgrescat:
            condition: service_healthy    
        environment:
          - POLARIS_PERSISTENCE_TYPE=relational-jdbc
          - QUARKUS_DATASOURCE_JDBC_URL=jdbc:postgresql://${POSTGRES_CAT_HOST}:${POSTGRES_CAT_PORT}/${POLARIS_REALM}
          - QUARKUS_DATASOURCE_USERNAME=${POSTGRES_CAT_USER}
          - QUARKUS_DATASOURCE_PASSWORD=${POSTGRES_CAT_PASSWORD}
          - POLARIS_REALM=${POLARIS_REALM}
          - ROOT_CLIENT_ID=${ROOT_CLIENT_ID}
          - ROOT_CLIENT_SECRET=${ROOT_CLIENT_SECRET}
        command:
          - "bootstrap"
          - "--realm=${POLARIS_REALM}"
          - "--credential=${POLARIS_REALM},${ROOT_CLIENT_ID},${ROOT_CLIENT_SECRET}"
    
      polaris-setup:
        image: alpine/curl
        hostname: polaris-setup
        container_name: polaris-setup
        depends_on:
          polaris:
            condition: service_healthy
        environment:
          - CLIENT_ID=${ROOT_CLIENT_ID}
          - CLIENT_SECRET=${ROOT_CLIENT_SECRET}
          - CATALOG_NAME=${CATALOG_NAME}
          - POLARIS_REALM=${POLARIS_REALM}
          - MINIO_BUCKET=${MINIO_BUCKET}
          - MINIO_ENDPOINT=${MINIO_ENDPOINT}
        volumes:
          - ./conf/polaris/:/polaris
        entrypoint: "/bin/sh"
        command:
          - "-c"
          - >-
            chmod +x /polaris/create-catalog.sh;
            chmod +x /polaris/obtain-token.sh;
            source /polaris/obtain-token.sh $$POLARIS_REALM;
            echo Creating catalog: $$CATALOG_NAME in realm: $$POLARIS_REALM;
            export STORAGE_CONFIG_INFO='{"storageType":"S3",
              "endpoint":"<http://localhost:9000>",
              "endpointInternal":"'$$MINIO_ENDPOINT'",
              "pathStyleAccess":true}';
            export STORAGE_LOCATION="s3://$$MINIO_BUCKET/iceberg";
            /polaris/create-catalog.sh $$POLARIS_REALM $$CATALOG_NAME $$TOKEN;
            echo Extra grants...;
            curl -H "Authorization: Bearer $$TOKEN" -H 'Content-Type: application/json' \
              -X PUT \
              <http://polaris:8181/api/management/v1/catalogs/${CATALOG_NAME}/catalog-roles/catalog_admin/grants> \
              -d '{"type":"catalog", "privilege":"CATALOG_MANAGE_CONTENT"}';
            echo Polaris Setup Complete.;
    c_iceberg catalog create
    Copy code
    CREATE CATALOG c_iceberg WITH (
       'type'='iceberg'
      ,'catalog-type'='rest'
      ,'uri'='<http://polaris:8181/api/catalog>'
      ,'warehouse'='icebergcat'
      ,'oauth2-server-uri'='<http://polaris:8181/api/catalog/v1/oauth/tokens>'
      ,'credential'='root:s3cr3t'
      ,'scope'='PRINCIPAL_ROLE:ALL'
      ,'s3.endpoint'='<http://minio:900>'
      ,'s3.access-key-id'='mnadmin'
      ,'s3.secret-access-key'='mnpassword'
      ,'s3.path-style-access'='true'
    );
    
    
    USE CATALOG c_iceberg;
    -- Create using below
    CREATE DATABASE IF NOT EXISTS c_iceberg.finflow;
    flink sql being executed
    Copy code
    CREATE OR REPLACE TABLE c_cdcsource.demog.accountholders (
         _id                BIGINT                  NOT NULL
        ,nationalid         VARCHAR(16)             NOT NULL
        ,firstname          VARCHAR(100)
        ,lastname           VARCHAR(100)
        ,dob                VARCHAR(10) 
        ,gender             VARCHAR(10)
        ,children           INT
        ,emailaddress       VARCHAR(100)
        ,mobilephonenumber  VARCHAR(20)
        ,accounts           STRING
        ,created_at         TIMESTAMP_LTZ(3)
        ,WATERMARK          FOR created_at AS created_at - INTERVAL '15' SECOND
        ,PRIMARY KEY (_id) NOT ENFORCED
    ) WITH (
         'connector'                           = 'postgres-cdc'
        ,'hostname'                            = 'postgrescdc'
        ,'port'                                = '5432'
        ,'username'                            = 'dbadmin'
        ,'password'                            = 'dbpassword'
        ,'database-name'                       = 'demog'
        ,'schema-name'                         = 'public'
        ,'table-name'                          = 'accountholders'
        ,'slot.name'                           = 'accountholders0'
        -- experimental feature: incremental snapshot (default off)
        ,'scan.incremental.snapshot.enabled'   = 'true'               -- experimental feature: incremental snapshot (default off)
    ,'scan.startup.mode'                   = 'initial'            -- <https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position>
        ,'decoding.plugin.name'                = 'pgoutput'
    );
    
    SET 'execution.checkpointing.interval'   = '60s';
    SET 'table.exec.sink.upsert-materialize' = 'NONE';
    SET 'pipeline.name' = 'Persist into Iceberg: accountholders table';
    CREATE OR REPLACE TABLE c_iceberg.finflow.accountholders WITH (
         'file.format'                       = 'parquet'
        ,'compaction.min.file-num'           = '2'
        ,'compaction.early-max.file-num'     = '50'
        ,'snapshot.time-retained'            = '1h'
        ,'snapshot.num-retained.min'         = '5'
        ,'snapshot.num-retained.max'         = '20'
        ,'table.exec.sink.upsert-materialize'= 'NONE'
    ) AS 
    SELECT * FROM c_cdcsource.demog.accountholders;
    The above resulted in the below
    Copy code
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.iceberg.exceptions.NoSuchNamespaceException: Namespace does not exist: finflow
    After I got the above namespace problem I executed the following
    Copy code
    curl -X POST <http://localhost:8181/api/catalog/v1/icebergcat/namespaces> \
        -H "Authorization: Bearer ${TOKEN}" \
        -H 'Content-Type: application/json' \
        -d '{"namespace": ["finflow"], "properties": {"description": "Iceberg catalog database"}}' | jq
    I then reran my CTAS from above; the errors are below:
    Copy code
    [ERROR] Could not execute SQL statement. Reason:
    java.lang.AbstractMethodError: Receiver class org.apache.iceberg.flink.FlinkCreateTableOptions$$Lambda$1155/0x0000000601838000 does not define or inherit an implementation of the resolved method 'abstract void generate(org.apache.iceberg.shaded.com.fasterxml.jackson.core.JsonGenerator)' of interface org.apache.iceberg.util.JsonUtil$ToJson.
    
    Flink SQL>
  • g

    George Leonard

    12/12/2025, 10:04 AM
    NOTE: I can see my output table in MinIO; there is a metadata.json created. I can also select from my source table.
    Flink SQL> select * from c_cdcsource.demog.accountholders;
    [INFO] Result retrieval cancelled.
    Flink SQL> drop table c_iceberg.finflow.accountholders;
    [ERROR] Could not execute SQL statement. Reason:
    java.lang.AbstractMethodError: Receiver class org.apache.iceberg.flink.FlinkCreateTableOptions$$Lambda$1155/0x0000000601838000 does not define or inherit an implementation of the resolved method 'abstract void generate(org.apache.iceberg.shaded.com.fasterxml.jackson.core.JsonGenerator)' of interface org.apache.iceberg.util.JsonUtil$ToJson.
    Flink SQL> use catalog c_iceberg;
    [INFO] Execute statement succeeded.
    Flink SQL> use finflow;
    [INFO] Execute statement succeeded.
    Flink SQL> show tables;
    +----------------+
    |     table name |
    +----------------+
    | accountholders |
    +----------------+
    1 row in set
    Flink SQL> drop table accountholders;
    [ERROR] Could not execute SQL statement. Reason:
    java.lang.AbstractMethodError: Receiver class org.apache.iceberg.flink.FlinkCreateTableOptions$$Lambda$1155/0x0000000601838000 does not define or inherit an implementation of the resolved method 'abstract void generate(org.apache.iceberg.shaded.com.fasterxml.jackson.core.JsonGenerator)' of interface org.apache.iceberg.util.JsonUtil$ToJson.
  • g

    George Leonard

    12/12/2025, 10:08 AM
    Figured I'd pre-create the table and then do an INSERT INTO <> SELECT * FROM <>... but well, I can't even drop the table.
  • f

    Francisco Morillo

    12/12/2025, 10:26 AM
    Hi everyone! When using Flink SQL, many operations such as GROUP BYs and joins turn the stream into an upsert stream, and many connectors do not accept an upsert stream; one example is Kinesis. Is there a way to stop the stream becoming an upsert stream and simply keep it append-only? We have made a workaround by doing 1-second windowing to turn it into append-only, but that adds unnecessary latency.
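    A sketch of the windowing workaround described above (table and column names are hypothetical, and `event_time` is assumed to be a time attribute with a watermark): a tumbling window emits one final row per key and window, so the result stays append-only instead of becoming an upsert stream.
    ```sql
    SELECT
      window_start,
      window_end,
      user_id,
      COUNT(*) AS events
    FROM TABLE(
      TUMBLE(TABLE clicks, DESCRIPTOR(event_time), INTERVAL '1' SECOND))
    GROUP BY window_start, window_end, user_id;
    ```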
  • a

    Anooj S

    12/12/2025, 1:31 PM
    Hello everyone, We are running a Flink session cluster in standalone mode on Kubernetes, and we deploy multiple TaskManager deployments under the same session cluster, each with different CPU/memory configurations. This allows us to support both lightweight and CPU-heavy pipelines in the same Flink session. Example setup: • TM type A → low-spec (1 CPU, 4GB) • TM type B → high-spec (4 CPU, 16GB) • TM type C → mid-spec (2 CPU, 8GB) All TaskManagers register with the same JobManager. Our problem: When we submit a job, we cannot control which TaskManager type the job will use. Flink may place CPU-heavy tasks on low-spec TMs because we cannot specify: • TaskManager affinity • Slot affinity • TM selection rules • Resource-class filtering (beyond resource profiles) We previously used Native Kubernetes mode via the Flink K8s Operator, but it doesn’t allow different TM resource specs within the same session cluster. Switching to standalone mode gave us flexibility to run multiple TM deployments, but we still cannot control which TM the JobManager assigns tasks to. We found a similar question from 2021:https://stackoverflow.com/questions/68531175/task-manager-affinity-in-flink Any guidance would be appreciated. Thanks!
  • g

    George Leonard

    12/12/2025, 2:48 PM
    RE my ERROR above: I implemented a JDBC catalog and created Paimon based tables on the same MinIO stack, and that's working... so Flink itself is working; the issue is somewhere in the Polaris integration.
  • m

    Marco Villalobos

    12/12/2025, 7:05 PM
    Hi everybody. I want to consider using Apache Flink CDC with MySQL as a source and Paimon as a sink. I wanted to deploy this on the Kubernetes Operator, but I noticed that it requires the MiniCluster, and I prefer not to use the MiniCluster in production. How do I change that? My next option is Amazon Managed Service for Apache Flink, but I don't know if people have successfully integrated that with Apache Flink CDC. Is there a way to change Apache Flink CDC to NOT use the MiniCluster in the Kubernetes Operator? Does Apache Flink CDC work with Amazon Managed Service for Apache Flink? My alternative is to integrate Debezium directly with Apache Kafka and Spark Structured Streaming with Paimon, but I'd prefer to use Flink.
  • h

    Hristo Yordanov

    12/12/2025, 9:37 PM
    Hi all, is it possible to enable watermarks when using a Flink CDC pipeline (YAML file), version 3.5.0 with Flink 1.20? I want to see watermark metrics in Prometheus and watermarks in the Flink UI when using the pipeline approach.
  • b

    Brad Murry

    12/15/2025, 3:45 PM
    Howdy! I'm looking to traverse state using the state processor API, however my flink jobs are often a mix of datastream and Table/SQL APIs. I could potentially manually register state descriptors for a job on the datastream portions, follow best practices such as consistent UUIDs on operators, etc.. but for Table/SQL, since that is all generated via the planner, I'm not sure if there is a path forward. Does a methodology exist for interrogating a job graph and building a mapping of state descriptors, etc...?
  • r

    Royston

    12/16/2025, 12:36 PM
    Hi, I was wondering if there is a tool or a combination of tools which anybody has used to act as a coordinator for jobs submitted in session mode. Looking mainly for two things: 1. Scale up and scale down TaskManager pods based on some kind of queue of jobs. 2. Additionally, management of jobs in case of failure of the JobManager; AFAIK the operator automatically does this part, but I wanted to confirm. Thanks
  • t

    Tiago Pereira

    12/16/2025, 1:48 PM
    HI @Royston
  • t

    Tiago Pereira

    12/16/2025, 1:48 PM
    I'm not a member of Flink development, but what I know is that if you are using Kubernetes to deploy your cluster