# troubleshooting

    Vedanth Baliga

    11/23/2025, 9:42 AM
    Hello all, I'm trying to set up Flink and Redpanda on Kubernetes but get a connection refused error when running SELECT statements. Creating tables and views is fine, so I think the jars are correct. Here's my k8s YAML. The producer is working well and I'm able to see messages on the topic; the problem is when I create a Flink table in the SQL client and try to select from it. The same setup works fine on Docker, where I have no problems. Any help is appreciated!
    Copy code
    # ---------- Namespace ----------
    apiVersion: v1
    kind: Namespace
    metadata:
      name: telemetry
    ---
    # ---------- Redpanda-1 ----------
    apiVersion: v1
    kind: Service
    metadata:
      name: redpanda-1
      namespace: telemetry
    spec:
      selector:
        app: redpanda-1
      ports:
        - name: kafka
          port: 29092
          targetPort: 29092
    ---
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: redpanda-1
      namespace: telemetry
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: redpanda-1
      template:
        metadata:
          labels:
            app: redpanda-1
        spec:
          containers:
            - name: redpanda
              image: docker.redpanda.com/redpandadata/redpanda:v23.1.8
              args:
                - redpanda
                - start
                - --smp
                - "1"
                - --reserve-memory
                - 0M
                - --overprovisioned
                - --node-id
                - "1"
                - --kafka-addr
                - PLAINTEXT://0.0.0.0:29092
                - --advertise-kafka-addr
                - PLAINTEXT://redpanda-1:29092
                - --rpc-addr
                - 0.0.0.0:33145
                - --advertise-rpc-addr
                - redpanda-1:33145
                - --pandaproxy-addr
                - PLAINTEXT://0.0.0.0:28082
                - --advertise-pandaproxy-addr
                - PLAINTEXT://redpanda-1:28082
              ports:
                - containerPort: 29092
                - containerPort: 28082
                - containerPort: 33145
              volumeMounts:
                - name: data
                  mountPath: /var/lib/redpanda
          volumes:
            - name: data
              emptyDir: {}
    ---
    # ---------- Redpanda-2 ----------
    apiVersion: v1
    kind: Service
    metadata:
      name: redpanda-2
      namespace: telemetry
    spec:
      selector:
        app: redpanda-2
      ports:
        - name: kafka
          port: 29093
          targetPort: 29093
    ---
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: redpanda-2
      namespace: telemetry
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: redpanda-2
      template:
        metadata:
          labels:
            app: redpanda-2
        spec:
          containers:
            - name: redpanda
              image: docker.redpanda.com/redpandadata/redpanda:v23.1.8
              args:
                - redpanda
                - start
                - --smp
                - "1"
                - --reserve-memory
                - 0M
                - --overprovisioned
                - --node-id
                - "2"
                - --seeds
                - redpanda-1:33145
                - --kafka-addr
                - PLAINTEXT://0.0.0.0:29093
                - --advertise-kafka-addr
                - PLAINTEXT://redpanda-2:29093
                - --rpc-addr
                - 0.0.0.0:33146
                - --advertise-rpc-addr
                - redpanda-2:33146
                - --pandaproxy-addr
                - PLAINTEXT://0.0.0.0:28083
                - --advertise-pandaproxy-addr
                - PLAINTEXT://redpanda-2:28083
              ports:
                - containerPort: 29093
                - containerPort: 28083
                - containerPort: 33146
              volumeMounts:
                - name: data
                  mountPath: /var/lib/redpanda
          volumes:
            - name: data
              emptyDir: {}
    ---
    # ---------- Redpanda Console ----------
    apiVersion: v1
    kind: Service
    metadata:
      name: redpanda-console
      namespace: telemetry
    spec:
      selector:
        app: redpanda-console
      ports:
        - name: http
          port: 8080
          targetPort: 8080
    ---
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: redpanda-console
      namespace: telemetry
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: redpanda-console
      template:
        metadata:
          labels:
            app: redpanda-console
        spec:
          containers:
            - name: redpanda-console
              image: docker.redpanda.com/redpandadata/console:v2.2.4
              command: ["/bin/sh", "-c"]
              args:
                - echo "$CONSOLE_CONFIG_FILE" > /tmp/config.yml; /app/console
              env:
                - name: CONFIG_FILEPATH
                  value: /tmp/config.yml
                - name: CONSOLE_CONFIG_FILE
                  value: |
                    kafka:
                      brokers: ["redpanda-1:29092", "redpanda-2:29093"]
                      schemaRegistry:
                        enabled: false
                    redpanda:
                      adminApi:
                        enabled: false
                    connect:
                      enabled: false
              ports:
                - containerPort: 8080
    ---
    # ---------- Flink: JobManager ----------
    apiVersion: v1
    kind: Service
    metadata:
      name: jobmanager
      namespace: telemetry
    spec:
      selector:
        app: jobmanager
      ports:
        - name: rpc
          port: 6123
          targetPort: 6123
        - name: ui
          port: 8081
          targetPort: 8081
    ---
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: jobmanager
      namespace: telemetry
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: jobmanager
      template:
        metadata:
          labels:
            app: jobmanager
        spec:
          containers:
            - name: jobmanager
              image: flink-sql-k8s:1.19
              args: ["jobmanager"]
              env:
                - name: FLINK_PROPERTIES
                  value: |
                    jobmanager.rpc.address: jobmanager
                    jobmanager.bind-host: 0.0.0.0
                    jobmanager.rpc.port: 6123
                    rest.address: jobmanager
                    rest.bind-address: 0.0.0.0
                    rest.port: 8081
              ports:
                - containerPort: 6123
                - containerPort: 8081
    ---
    # ---------- Flink: TaskManager ----------
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: taskmanager
      namespace: telemetry
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: taskmanager
      template:
        metadata:
          labels:
            app: taskmanager
        spec:
          containers:
            - name: taskmanager
              image: flink-sql-k8s:1.19
              args: ["taskmanager"]
              env:
                - name: FLINK_PROPERTIES
                  value: |
                    jobmanager.rpc.address: jobmanager
                    taskmanager.numberOfTaskSlots: 20
                    taskmanager.bind-host: 0.0.0.0
              ports:
                - containerPort: 6121
                - containerPort: 6122
    ---
    # ---------- Flink: SQL Client ----------
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: sql-client
      namespace: telemetry
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: sql-client
      template:
        metadata:
          labels:
            app: sql-client
        spec:
          containers:
            - name: sql-client
              image: flink-sql-k8s:1.19
              command: ["bash", "-c", "sleep infinity"]
              env:
                - name: FLINK_PROPERTIES
                  value: |
                    jobmanager.rpc.address: jobmanager
                    rest.address: jobmanager
    ---
    # ---------- ClickHouse ----------
    apiVersion: v1
    kind: Service
    metadata:
      name: clickhouse
      namespace: telemetry
    spec:
      selector:
        app: clickhouse
      ports:
        - name: http
          port: 8123
          targetPort: 8123
        - name: native
          port: 9000
          targetPort: 9000
    ---
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: clickhouse
      namespace: telemetry
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: clickhouse
      template:
        metadata:
          labels:
            app: clickhouse
        spec:
          containers:
            - name: clickhouse
              image: clickhouse/clickhouse-server:23.8
              ports:
                - containerPort: 8123
                - containerPort: 9000
              volumeMounts:
                - name: data
                  mountPath: /var/lib/clickhouse
                - name: logs
                  mountPath: /var/log/clickhouse-server
          volumes:
            - name: data
              emptyDir: {}
            - name: logs
              emptyDir: {}
    ---
    # ---------- Telemetry Producer (Python) ----------
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: telemetry-producer
      namespace: telemetry
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: telemetry-producer
      template:
        metadata:
          labels:
            app: telemetry-producer
        spec:
          containers:
            - name: telemetry-producer
              image: telemetry-producer:latest
              imagePullPolicy: IfNotPresent
              env:
                - name: KAFKA_BOOTSTRAP_SERVERS
                  value: "redpanda-1:29092"
                - name: KAFKA_TOPIC
                  value: "fleet.prod.telemetry.raw"
    My Dockerfile
    Copy code
    # base image
    FROM flink:1.19-scala_2.12-java11
    
    USER root
    
    RUN wget -P /opt/flink/lib/ \
        https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.2.0-1.19/flink-sql-connector-kafka-3.2.0-1.19.jar && \
        wget -P /opt/flink/lib/ \
        https://repo1.maven.org/maven2/org/apache/flink/flink-json/1.19.0/flink-json-1.19.0.jar
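    One hedged suggestion, since the Kafka table DDL itself isn't shown: the SELECT runs on the taskmanager pod, so 'properties.bootstrap.servers' in the table definition has to be reachable from that pod. The advertised listener redpanda-1:29092 only resolves inside the telemetry namespace, and a localhost/127.0.0.1 address carried over from the Docker setup would give exactly this connection-refused-on-SELECT behaviour. A minimal connectivity-test table (topic name taken from the producer env above, 'raw' format just to prove the plumbing) could look like:
    CREATE TABLE telemetry_raw_probe (
      payload STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'fleet.prod.telemetry.raw',
      -- in-cluster Service DNS; plain redpanda-1:29092 also works from inside the telemetry namespace
      'properties.bootstrap.servers' = 'redpanda-1.telemetry.svc.cluster.local:29092',
      'properties.group.id' = 'flink-sql-probe',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'raw'
    );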

    Bhargav Vekariya

    11/24/2025, 6:11 PM
    Hello all, I have a setup of flink (1.20.1), flink-cdc (3.5.0), starrocks (4.0.0), and mysql (8.0). I'm using the pipeline YAML below for data streaming. The problem is with one table (add/delete cart events): at times it has more rows in StarRocks than in MySQL. I have checked and there is no duplication of data in StarRocks, but deletes are not being applied. What could be the problem, and how can I resolve it? I have tried dropping the table and restarting the pipeline; it works fine for a while, but when traffic is high the StarRocks row count again grows larger than MySQL, and even in non-working hours with almost no traffic it still does not get back to normal.
    Copy code
    source:
      type: mysql
      hostname: 10.3.4.168
      name: ms_mysql_prod_sales_us_ca
      port: 3306
      username: flink_replication_admin
      password: Uq^ZXXXXXXX
      tables: |
        ms.sales_flat_quote_item,
        ms_canada.sales_flat_quote_item
      server-id: 8000-8500
      server-time-zone: UTC
      scan.newly-added-table.enabled: true
      schema-change.enabled: true
    
    sink:
      type: starrocks
      name: StarRocks Sink
      jdbc-url: jdbc:mysql://10.231.XX.XX:9030
      load-url: 10.231.XX.XX:8030
      username: root
      password: dpBjIXXXXXX
      sink.buffer-flush.interval-ms: 2000
      sink.buffer-flush.max-bytes: 67108864
      table.create.properties.replication_num: 1
      sink.at-least-once.use-transaction-stream-load: false
      sink.io.thread-count: 6
      sink.label-prefix: flink_sync_custom
    
    route:
      - source-table: ms.\.*
        sink-table: ms.mage_us_<>
        replace-symbol: <>
        description: route all tables in ms to ms
      - source-table: ms_canada.\.*
        sink-table: ms.mage_ca_<>
        replace-symbol: <>
        description: route all tables in ms_canada to ms
    pipeline:
      parallelism: 4
      name: Sync MySQL Sales US/CA DWH Tables to StarRocks
      execution.checkpointing.interval: 30000
      execution.checkpointing.max-concurrent-checkpoints: 1
      execution.checkpointing.mode: EXACTLY_ONCE
      execution.checkpointing.timeout: 300000
      execution.checkpointing.min-pause: 5000
      state.backend.changelog.enabled: true
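    A hedged note rather than a confirmed diagnosis: the StarRocks side can only apply the delete events coming from MySQL when the target table uses the Primary Key model; on a Duplicate Key table the -D records are effectively dropped, which would match "more rows than MySQL but no duplicates". It may be worth checking how the auto-created tables were defined, e.g. in StarRocks:
    -- table name assumes the route rules above
    SHOW CREATE TABLE ms.mage_us_sales_flat_quote_item;
    -- expect PRIMARY KEY(...) in the output; a DUPLICATE KEY table would explain deletes not being applied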

    Ashish Marottickal Gopi

    11/24/2025, 8:06 PM
    Hello all, a question on the Broadcast State pattern. Scenario: I have high-throughput inventory sales data and a reference dataset with the max stock of each product (only around 2000 records, but it receives updates and deletes). I want to alert when stock sales cross this threshold. My first solution was the Broadcast State pattern: I broadcast the threshold data to the sales stream in a KeyedBroadcastProcessFunction. The overall parallelism of the job is >1. The threshold events arrive in Kafka as change events (create, update, delete), so I have to reconcile the state according to the change type. I do this reconciliation in the
    processBroadcastElement
    function. Question: Is broadcasting the state the right approach? I'm specifically confused on the following points: 1. Will the broadcast state be consistent across the parallel instances of the non-broadcast stream? 2. Since the broadcast stream carries CDC-like data, can I rely on my state updates being consistent across the parallel tasks of the non-broadcast stream? 3. Should I change the parallelism of the broadcast stream to 1? 4. Or would it be fine to just use a keyed CoProcessFunction join for this?
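    For reference, a minimal sketch of the pattern described above, with tuples standing in for the real sales/threshold classes (names and the CDC field layout are assumptions). Every parallel instance receives every broadcast element, so the replicated state copies stay consistent as long as processBroadcastElement applies the changes deterministically; there is no synchronization between the two streams, though, so a sale can briefly race an in-flight threshold update:
    import org.apache.flink.api.common.state.BroadcastState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.BroadcastStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    public class ThresholdAlertSketch {
        // Broadcast state: productId -> max stock threshold.
        static final MapStateDescriptor<String, Long> THRESHOLDS =
                new MapStateDescriptor<>("thresholds", Types.STRING, Types.LONG);

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Stand-ins for the real Kafka sources: (productId, quantity) and (productId, changeType, maxStock).
            DataStream<Tuple2<String, Long>> sales = env
                    .fromElements(Tuple2.of("sku-1", 120L), Tuple2.of("sku-2", 5L))
                    .returns(Types.TUPLE(Types.STRING, Types.LONG));
            DataStream<Tuple3<String, String, Long>> thresholdCdc = env
                    .fromElements(Tuple3.of("sku-1", "create", 100L))
                    .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.LONG));

            BroadcastStream<Tuple3<String, String, Long>> broadcast = thresholdCdc.broadcast(THRESHOLDS);

            sales.keyBy(sale -> sale.f0)
                 .connect(broadcast)
                 .process(new KeyedBroadcastProcessFunction<String, Tuple2<String, Long>, Tuple3<String, String, Long>, String>() {

                     @Override
                     public void processElement(Tuple2<String, Long> sale, ReadOnlyContext ctx,
                                                Collector<String> out) throws Exception {
                         // Read-only view of this subtask's copy of the broadcast state.
                         Long max = ctx.getBroadcastState(THRESHOLDS).get(sale.f0);
                         if (max != null && sale.f1 > max) {
                             out.collect("ALERT " + sale.f0 + ": " + sale.f1 + " > " + max);
                         }
                     }

                     @Override
                     public void processBroadcastElement(Tuple3<String, String, Long> change, Context ctx,
                                                         Collector<String> out) throws Exception {
                         // Every parallel instance sees every change and must apply it the same way,
                         // so the per-subtask state copies stay identical.
                         BroadcastState<String, Long> state = ctx.getBroadcastState(THRESHOLDS);
                         if ("delete".equals(change.f1)) {
                             state.remove(change.f0);
                         } else {
                             state.put(change.f0, change.f2);
                         }
                     }
                 })
                 .print();

            env.execute("threshold-alert-sketch");
        }
    }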

    Eddy Agossou

    11/25/2025, 11:51 AM
    Hi all! I'm constantly getting an 's3:// scheme not supported' error when trying to deploy a FlinkSessionJob. I'm currently using a session deployment: one JobManager + TaskManager created with a FlinkDeployment, all managed by the kubernetes-operator. P.S.: Adding the s3-fs-presto plugin to the /opt/flink/plugins folder (in the FlinkDeployment.yml podTemplate) did not help. Has anyone faced this before? Any guidance will be appreciated. I've been stuck on it for 24 hours now.
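    A hedged pointer based on the symptom: Flink only picks up filesystem plugins from a per-plugin subdirectory (e.g. /opt/flink/plugins/s3-fs-presto/<jar>), not from jars dropped directly into /opt/flink/plugins, and with the official images the easiest way to get that layout is the ENABLE_BUILT_IN_PLUGINS env var in the pod template (the jar name/version below is an assumption, match it to the image's Flink version):
    # FlinkDeployment spec.podTemplate sketch
    podTemplate:
      spec:
        containers:
          - name: flink-main-container
            env:
              - name: ENABLE_BUILT_IN_PLUGINS
                value: flink-s3-fs-presto-1.19.1.jar
    If the error appears before the job even reaches the session cluster, it may instead be the component that fetches the FlinkSessionJob jarURI (the operator pod) that is missing S3 support, so the operator logs are worth checking too.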

    Ben Mali

    11/27/2025, 8:05 AM
    Hi all! I'm new to Flink, and I'm trying to understand how Flink autoscaling works on Kubernetes. I'm using the AWS Kinesis connector, so I want a fixed number of task manager pods. I've deployed Flink using Flink operator version 1.13.0 with Flink 1.20. Deploying using FlinkDeployment and defining spec.jobManager.replicas works as expected, but when I try to specify spec.taskManager.replicas it seems to be ignored. Does anyone know how I can manually configure the number of task manager pods? Should I configure it using HPA?
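    In case it helps (hedged, based on how the operator's deployment modes work): spec.taskManager.replicas is only honoured in standalone mode; in the default native mode the number of TaskManager pods is derived from the job parallelism divided by taskmanager.numberOfTaskSlots, so a fixed pod count is usually pinned like this:
    # Sketch: 4 TM pods = parallelism 8 / 2 slots per TM (numbers are examples)
    spec:
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: "2"
      job:
        parallelism: 8
    # or switch the deployment to standalone mode, where replicas is respected:
    #   mode: standalone
    #   taskManager:
    #     replicas: 4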

    Philipp

    11/27/2025, 10:12 AM
    Is it possible to use the Table API for protobuf data in Apache Kafka? Background of the question: the data in Kafka is not raw protobuf but is prefixed with some metadata bytes (mostly the schema ID used by the schema registry).
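    A hedged sketch of an answer: the built-in 'protobuf' format expects the raw message bytes, so the registry framing (magic byte + 4-byte schema id, plus protobuf message indexes) has to be stripped before decoding, either in a custom format/DeserializationSchema or upstream. The stripping itself is small, e.g.:
    import java.nio.ByteBuffer;

    /** Strips a Confluent-style wire-format header (magic byte + 4-byte schema id) from a Kafka value. */
    public final class RegistryFraming {
        private RegistryFraming() {}

        public static byte[] stripHeader(byte[] value) {
            ByteBuffer buf = ByteBuffer.wrap(value);
            byte magic = buf.get();
            if (magic != 0) {
                throw new IllegalArgumentException("Unexpected magic byte: " + magic);
            }
            int schemaId = buf.getInt(); // 4-byte schema id; could be used to look the schema up
            // Note: Confluent's protobuf framing additionally encodes a varint list of
            // message indexes after the schema id; handle that if your schemas need it.
            byte[] payload = new byte[buf.remaining()];
            buf.get(payload);
            return payload;
        }
    }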

    Hiếu trịnh

    11/28/2025, 3:32 AM
    I'm new to Flink and I'm having a problem inserting data into Oracle via JDBC. I read data from Kafka (Confluent) with upsert-kafka tables and insert into Oracle the result of joining the 2 streaming tables. I can't get append-only into the Oracle table, only upsert. This is my error: java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record. Here is my code:
    CREATE TABLE accounts (
      KEY STRING,
      NAME STRING,
      .... y-MM-dd HHmmss.SSS'),
      PRIMARY KEY (RECID) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'account',
      'properties.bootstrap.servers' = '${confluent.cluster}',
      'key.format' = 'raw',
      'value.format' = 'avro-confluent',
      'value.avro-confluent.url' = '${confluent.schema.registry}'
    );

    CREATE TABLE transaction (
      ID STRING,
      BALANCE INT,
      .... y-MM-dd HHmmss.SSS'),
      PRIMARY KEY (RECID) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'transaction',
      'properties.bootstrap.servers' = '${confluent.cluster}',
      'key.format' = 'raw',
      'value.format' = 'avro-confluent',
      'value.avro-confluent.url' = '${confluent.schema.registry}'
    );

    CREATE TABLE oracle_sink (
      ...
    ) WITH (
      'connector' = 'jdbc',
      'url' = '${oracle.host}',
      'table-name' = 'RESULT_TBL',
      'username' = '${oracle.ogg.user}',
      'password' = '${oracle.ogg.password}',
      'driver' = 'oracle.jdbc.driver.OracleDriver'
    );

    INSERT INTO oracle_sink SELECT ... FROM account acc INNER JOIN transaction tran ....
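    The error message points at the likely fix (hedged, since the full sink DDL is elided above): a join of two upsert-kafka sources produces an updating stream, and the JDBC sink only accepts update/delete records when the sink table declares a primary key matching the key of the Oracle table, e.g.:
    CREATE TABLE oracle_sink (
      RECID STRING,   -- column names here are assumptions; use the actual key of RESULT_TBL
      ...
      PRIMARY KEY (RECID) NOT ENFORCED
    ) WITH (
      'connector' = 'jdbc',
      'url' = '${oracle.host}',
      'table-name' = 'RESULT_TBL',
      'username' = '${oracle.ogg.user}',
      'password' = '${oracle.ogg.password}',
      'driver' = 'oracle.jdbc.driver.OracleDriver'
    );
    With the primary key declared the JDBC connector writes in upsert mode; a true append-only sink would require turning the join result into an insert-only stream first.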

    H.P. Grahsl

    12/01/2025, 10:41 AM
    ❓ I was wondering how Flink handles calls to one and the same UDF that's applied multiple times in a single Flink SQL / Table API query running on a task manager... e.g. if I do `SELECT MY_UDF(col1), MY_UDF(col2) FROM some_table`: • will Flink create two instances of MY_UDF, one for each usage? • will Flink create only a single MY_UDF instance and reuse it across the two usages? • is any of the above guaranteed to be consistently applied on the same / across different Flink versions? • can any such behavior be configured / forced to some degree? THX for any in-depth insights on this. 🙏 Addendum: Let's assume a single TM with parallelism=1 and a deterministic scalar function.
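    Not an authoritative answer on the planner's reuse rules, but the instance count is easy to observe empirically: open() is called once per function instance per parallel task, so logging there shows exactly how many MY_UDF instances a given query and Flink version actually create. A sketch:
    import java.util.concurrent.atomic.AtomicInteger;
    import org.apache.flink.table.functions.FunctionContext;
    import org.apache.flink.table.functions.ScalarFunction;

    public class MyUdf extends ScalarFunction {
        // JVM-wide counter, just to see how many instances get opened inside one task manager.
        private static final AtomicInteger OPENED = new AtomicInteger();

        @Override
        public void open(FunctionContext context) throws Exception {
            System.out.println("MY_UDF instance opened; count so far in this JVM: " + OPENED.incrementAndGet());
        }

        public String eval(String input) {
            return input == null ? null : input.toUpperCase();
        }
    }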

    Jon Slusher

    12/02/2025, 6:52 PM
    x-posting just in case this is an error more common than just in PyFlink: https://apache-flink.slack.com/archives/C065944F9M2/p1764698399081389

    Aleksei Perminov

    12/03/2025, 3:21 PM
    Hi guys. Maybe someone has faced this issue and knows how to fix it. Recently we updated to Flink 2.0 and started facing the following issue:
    Copy code
    2025-11-22 10:53:52.155 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.r.r.a.ActiveResourceManager - Requesting new worker with resource spec WorkerResourceSpec {cpuCores=12.0, taskHeapSize=4.912gb (5274756322 bytes), taskOffHeapSize=0 bytes, networkMemSize=755.200mb (791884607 bytes), managedMemSize=8.850gb (9502615519 bytes), numSlots=24}, current pending count: 2. 
    2025-11-22 10:53:52.156 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.r.e.ExternalResourceUtils - Enabled external resources: [] 
    2025-11-22 10:53:52.157 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.configuration.Configuration - Config uses fallback configuration key 'kubernetes.service-account' instead of key 'kubernetes.taskmanager.service-account' 
    2025-11-22 10:53:52.160 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.k.KubernetesResourceManagerDriver - Creating new TaskManager pod with name taskmanager-3-2 and resource <16384,12.0>. 
    2025-11-22 10:53:52.260 [flink-pekko.actor.default-dispatcher-4] WARN o.a.p.r.t.netty.NettyTransport - Remote connection to [/10.0.0.70:35310] failed with null 
    2025-11-22 10:53:52.272 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.k.KubernetesResourceManagerDriver - Pod taskmanager-3-2 is created. 
    2025-11-22 10:53:52.281 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.k.KubernetesResourceManagerDriver - Received new TaskManager pod: taskmanager-3-2 
    2025-11-22 10:53:52.282 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.r.r.a.ActiveResourceManager - Requested worker taskmanager-3-2 with resource spec WorkerResourceSpec {cpuCores=12.0, taskHeapSize=4.912gb (5274756322 bytes), taskOffHeapSize=0 bytes, networkMemSize=755.200mb (791884607 bytes), managedMemSize=8.850gb (9502615519 bytes), numSlots=24}. 
    2025-11-22 10:53:52.399 [flink-pekko.actor.default-dispatcher-4] WARN o.a.p.r.ReliableDeliverySupervisor - Association with remote system [pekko.tcp://flink@10.0.0.180:6122] has failed, address is now gated for [50] ms. Reason: [Disassociated]
    2025-11-22 10:54:07.260 [flink-pekko.actor.default-dispatcher-19] WARN o.a.p.r.t.netty.NettyTransport - Remote connection to [/10.0.0.70:34164] failed with null 
    2025-11-22 10:54:15.283 [flink-pekko.actor.default-dispatcher-15] INFO o.a.f.r.r.a.ActiveResourceManager - Registering TaskManager with ResourceID taskmanager-3-2 (pekko.tcp://flink@10.0.0.181:6122/user/rpc/taskmanager_0) at ResourceManager
    2025-11-22 10:54:15.291 [flink-pekko.actor.default-dispatcher-19] INFO o.a.f.r.r.s.FineGrainedSlotManager - Registering task executor taskmanager-3-2 under 12848c284ec438712332f1fd3b76a28a at the slot manager. 
    2025-11-22 10:54:15.292 [flink-pekko.actor.default-dispatcher-19] INFO o.a.f.r.r.a.ActiveResourceManager - Worker taskmanager-3-2 is registered. 
    2025-11-22 10:54:15.300 [flink-pekko.actor.default-dispatcher-19] INFO o.a.f.r.r.a.ActiveResourceManager - Closing TaskExecutor connection taskmanager-3-2 because: Failed to send initial slot report to ResourceManager. java.lang.IllegalStateException: Cannot decrease, no worker of spec WorkerResourceSpec {cpuCores=12.0, taskHeapSize=4.912gb (5274756322 bytes), taskOffHeapSize=0 bytes, networkMemSize=755.200mb (791884607 bytes), managedMemSize=8.850gb (9502615519 bytes), numSlots=25}.
    Basically, in the FlinkDeployment taskmanager.numberOfTaskSlots is set to 24, but for some reason the slot report shows 25 slots in the active Task Manager, which leads to the exception above. It doesn't matter whether I set numberOfTaskSlots to 24 or 32, the issue is the same: +1 slot in the slot report and the exception. This goes on for about 5 minutes and then the job suddenly starts working fine. We use Flink K8s Operator 1.12.1 and Flink 2.0.0; the issue rarely appears on 1.20.1 as well. Thanks in advance.

    morti

    12/03/2025, 3:50 PM
    Hi friends. After migrating to Flink 2.1, I cannot submit a job with program arguments from the web UI. I think this is related to https://issues.apache.org/jira/browse/FLINK-36275: since the
    programArgs
    query parameter was removed, no program arguments (e.g. --batchSize 5000) are sent to the job. I'm currently using the flink-2.1.1-java21 image. There is also a related Stack Overflow question: https://stackoverflow.com/questions/79702619/how-do-i-pass-program-arguments-from-apache-flink-2-0-0-web-gui-to-my-job-proper Submitting via the REST API mentioned there works, but as I said, program arguments are not sent to the job through the web UI. Any help or tip would be appreciated!
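    For anyone hitting the same thing, a hedged workaround sketch until the web UI regains the field: the REST endpoint behind it still accepts arguments in the JSON body, so the job can be submitted with programArgsList (the entry class and jar id below are placeholders):
    # <jar-id> comes from POST /jars/upload
    curl -X POST http://<jobmanager>:8081/jars/<jar-id>/run \
      -H 'Content-Type: application/json' \
      -d '{"entryClass": "com.example.MyJob", "programArgsList": ["--batchSize", "5000"]}'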

    Jon Slusher

    12/05/2025, 9:18 PM
    👋 I'm trying to troubleshoot a new error I'm getting when testing my PyFlink job using Flink (1.20.3) and the operator in Kubernetes. The job seems to be timing out to my brokers, but as far as I can tell it has the same broker configuration that my Debezium connector has 🧵

    Rion Williams

    12/06/2025, 3:59 AM
    Hey all — is there any way to confirm subscriptions to the mailing lists? I haven't received many/any in quite a long time (user will occasionally come through, but never dev or community). I’ve resubmitted the various $list-subscribe@flink.apache.org to all of the lists but never received the expected confirmation response. Any ideas?

    George Leonard

    12/07/2025, 10:42 AM
    hi hi all. Trying to put a stream together: ShadowTraffic as the data generator for the 2 products into 2 Postgres tables, 'accountholders' and 'transactions'. Postgres into Flink using CDC -> this is all working; I can select from either table and get results via flink-sql. From my Postgres catalog based tables into Apache Paimon based tables on MinIO/S3 is where I'm having some "sluggishness" atm. I'm expecting buckets to be created a lot faster and a lot more data to end up in Paimon/S3... attached is the log file. For now I'm just trying to confirm this all works before I start with the intended PyFlink "processing" that will consume from the Postgres source and output to the Paimon target. Below are my 2 source table creates and the CTAS that pushes data into Paimon.
    Copy code
    # CDC Sources
    CREATE OR REPLACE TABLE postgres_catalog.demog.accountholders (
         _id                BIGINT                  NOT NULL
        ,nationalid         VARCHAR(16)             NOT NULL
        ,firstname          VARCHAR(100)
        ,lastname           VARCHAR(100)
        ,dob                VARCHAR(10) 
        ,gender             VARCHAR(10)
        ,children           INT
        ,address            STRING
        ,accounts           STRING
        ,emailaddress       VARCHAR(100)
        ,mobilephonenumber  VARCHAR(20)
        ,created_at         TIMESTAMP_LTZ(3)
        ,WATERMARK          FOR created_at AS created_at - INTERVAL '15' SECOND
        ,PRIMARY KEY (_id) NOT ENFORCED
    ) WITH (
         'connector'                           = 'postgres-cdc'
        ,'hostname'                            = 'postgrescdc'
        ,'port'                                = '5432'
        ,'username'                            = 'dbadmin'
        ,'password'                            = 'dbpassword'
        ,'database-name'                       = 'demog'
        ,'schema-name'                         = 'public'
        ,'table-name'                          = 'accountholders'
        ,'slot.name'                           = 'accountholders0'
        -- experimental feature: incremental snapshot (default off)
        ,'scan.incremental.snapshot.enabled'   = 'true'               -- experimental feature: incremental snapshot (default off)
        ,'scan.startup.mode'                   = 'initial'            -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position
        ,'decoding.plugin.name'                = 'pgoutput'
    );
    
    
    CREATE OR REPLACE TABLE postgres_catalog.demog.transactions (
         _id                            BIGINT              NOT NULL
        ,eventid                        VARCHAR(36)         NOT NULL
        ,transactionid                  VARCHAR(36)         NOT NULL
        ,eventtime                      VARCHAR(30)
        ,direction                      VARCHAR(8)
        ,eventtype                      VARCHAR(10)
        ,creationdate                   VARCHAR(20)
        ,accountholdernationalid        VARCHAR(16)
        ,accountholderaccount           STRING
        ,counterpartynationalid         VARCHAR(16)
        ,counterpartyaccount            STRING
        ,tenantid                       VARCHAR(8)
        ,fromid                         VARCHAR(8)
        ,accountagentid                 VARCHAR(8)
        ,fromfibranchid                 VARCHAR(6)
        ,accountnumber                  VARCHAR(16)
        ,toid                           VARCHAR(8)
        ,accountidcode                  VARCHAR(5)
        ,counterpartyagentid            VARCHAR(8)
        ,tofibranchid                   VARCHAR(6)
        ,counterpartynumber             VARCHAR(16)
        ,counterpartyidcode             VARCHAR(5)
        ,amount                         STRING
        ,msgtype                        VARCHAR(6)
        ,settlementclearingsystemcode   VARCHAR(5)
        ,paymentclearingsystemreference VARCHAR(12)
        ,requestexecutiondate           VARCHAR(10)
        ,settlementdate                 VARCHAR(10)
        ,destinationcountry             VARCHAR(30)
        ,localinstrument                VARCHAR(2)
        ,msgstatus                      VARCHAR(12)
        ,paymentmethod                  VARCHAR(4)
        ,settlementmethod               VARCHAR(4)
        ,transactiontype                VARCHAR(2)
        ,verificationresult             VARCHAR(4)
        ,numberoftransactions           INT
        ,schemaversion                  INT
        ,usercode                       VARCHAR(4)
        ,created_at                     TIMESTAMP_LTZ(3)
        ,WATERMARK                      FOR created_at AS created_at - INTERVAL '15' SECOND
        ,PRIMARY KEY (_id) NOT ENFORCED
    ) WITH (
         'connector'                           = 'postgres-cdc'
        ,'hostname'                            = 'postgrescdc'
        ,'port'                                = '5432'
        ,'username'                            = 'dbadmin'
        ,'password'                            = 'dbpassword'
        ,'database-name'                       = 'demog'
        ,'schema-name'                         = 'public'
        ,'table-name'                          = 'transactions'
        ,'slot.name'                           = 'transactions0'
        -- experimental feature: incremental snapshot (default off)
        ,'scan.incremental.snapshot.enabled'   = 'true'               -- experimental feature: incremental snapshot (default off)
        ,'scan.startup.mode'                   = 'initial'            -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position
        ,'decoding.plugin.name'                = 'pgoutput'
    );
    and the CREATE TABLE AS inserts.
    Copy code
    SET 'execution.checkpointing.interval'   = '10s';
    SET 'pipeline.name' = 'Persist into Paimon: accountholders table';
    
    CREATE OR REPLACE TABLE c_paimon.finflow.accountholders WITH (
         'file.format'                       = 'parquet'
        ,'compaction.min.file-num'           = '2'
        ,'compaction.early-max.file-num'     = '50'
        ,'snapshot.time-retained'            = '1h'
        ,'snapshot.num-retained.min'         = '5'
        ,'snapshot.num-retained.max'         = '20'
        ,'table.exec.sink.upsert-materialize'= 'NONE'
    ) AS 
    SELECT * FROM postgres_catalog.demog.accountholders;
    
    SET 'pipeline.name' = 'Persist into Paimon: transactions table';
    
    CREATE OR REPLACE TABLE c_paimon.finflow.transactions WITH (
         'file.format'                       = 'parquet'
        ,'compaction.min.file-num'           = '2'
        ,'compaction.early-max.file-num'     = '50'
        ,'snapshot.time-retained'            = '1h'
        ,'snapshot.num-retained.min'         = '5'
        ,'snapshot.num-retained.max'         = '20'
        ,'table.exec.sink.upsert-materialize'= 'NONE'
    ) AS 
    SELECT * FROM postgres_catalog.demog.transactions;
    jobmanager_log-2.txt
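    One hedged thing to keep in mind when judging the sluggishness: with a streaming CTAS like the above, Paimon only commits a snapshot (and therefore only makes new files/buckets visible) on each successful checkpoint, so visible progress is gated by the 10s checkpoint interval and by how long checkpoints actually take in the attached log. The Paimon system tables make that easy to watch from the same SQL client, e.g.:
    SELECT snapshot_id, commit_kind, commit_time FROM c_paimon.finflow.`accountholders$snapshots`;
    SELECT * FROM c_paimon.finflow.`accountholders$files`;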

    Manish Jain

    12/09/2025, 9:43 AM
    Hi All, We are using Flink 1.20, and the related kafka-connector.
    Copy code
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>3.4.0-1.20</version>
    </dependency>
    This version of the Kafka client has 2 vulnerabilities, which are only fixed in kafka-clients v3.9.1. We are wondering how to handle this. Is it recommended to exclude kafka-clients from the connector and then override it with the new version? We are afraid that it might not be as easy as it sounds. 🙂 Is running the Flink test cases sufficient to verify that the new version is Flink compatible, or are there other things we'd need to keep in mind while doing this upgrade? PS: In the longer run we might upgrade to Flink 2.0, but right now that is not in our plans.
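    A hedged sketch of the override (no compatibility guarantee; this is exactly the part your integration tests need to cover): exclude the transitive kafka-clients and pin the patched version explicitly, then run the connector's Kafka tests / your own end-to-end jobs against it.
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>3.4.0-1.20</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <!-- pinned separately to the patched version mentioned above -->
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.9.1</version>
    </dependency>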

    Jon Slusher

    12/09/2025, 4:36 PM
    👋 Surfacing this issue in case it got lost in the shuffle. I'm still getting a certificate error on my Flink Kafka source trying to connect to my Kafka cluster using SASL_SSL. • Do I need to combine the certificate I exported from the cluster with another set of certificates? • Does anyone know how to set
    javax.net.debug=ssl
    in the connector so I can see what certificate it's trying to use?
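    On the second bullet, a hedged sketch: javax.net.debug is a JVM flag rather than a Kafka client property, so it goes into the Flink configuration of the process doing the connecting, e.g.:
    # flink-conf / flinkConfiguration sketch
    env.java.opts.taskmanager: -Djavax.net.debug=ssl:handshake
    env.java.opts.jobmanager: -Djavax.net.debug=ssl:handshake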

    Urs Schoenenberger

    12/10/2025, 2:24 PM
    Hi folks, we are still facing FLINK-38582 (GlobalCommitter stops committing, in this case in DeltaSink, after rescaling). Parts of this seems to have been fixed in FLINK-37747, but the issue persists, albeit rarely and not reproducible. I'm trying to further pinpoint the issue, so I wanted to ask: Are there any known general issues when using the (v1) GlobalCommitter with unaligned checkpoints? Either in the general way GlobalCommitter works, or any common implementation issues that the DeltaSink may have when implementing its GlobalCommitter?

    Almog Golbar

    12/10/2025, 4:10 PM
    Hi guys, We enabled Flink autoscaler memory tuning but aren't seeing recommendations, even with clear over-provisioning. Versions: • Flink: v1_17 (1.17.1) • Operator: 1.13.0 Configuration:
    Copy code
    job.autoscaler.enabled: "true"
    job.autoscaler.memory.tuning.enabled: "true"  #also tried false for dry run
    job.autoscaler.metrics.window: "5m"
    job.autoscaler.stabilization.interval: "30s"
    job.autoscaler.scaling.enabled: "false"
    What we've done: 1. Removed explicit memory configs (taskmanager.memory.process.size, taskmanager.memory.managed.size, taskmanager.memory.network.max) 2. Set taskManager.resource.memory: 12g 3. Verified metrics collection (6+ data points in ConfigMap autoscaler-<deployment-name>) 4. Confirmed autoscaler is active (ScalingExecutor running, metrics being collected) Current situation: • Memory: 12GB allocated, ~24% heap usage (~1.36GB used) • Metrics: Being collected and stored in ConfigMap • No recommendations: Not appearing in operator logs, Kubernetes events, or FlinkDeployment status/annotations What we've checked: • Operator logs show autoscaler activity but no memory tuning recommendations • No errors or warnings related to memory tuning • Metrics window is filling (5+ minutes of data collected) • Job is processing data (not idle) Has anyone seen memory tuning recommendations with this operator version? Are there additional requirements or known issues? Any guidance on where recommendations should appear or how to troubleshoot?

    Jon Slusher

    12/10/2025, 7:15 PM
    👋 I'm still trying to troubleshoot a connection from a Table API
    kafka
    source connector (using PyFlink) to our Kafka cluster using
    SASL_SSL
    and
    SCRAM-SHA-512
    . • Can anyone at least confirm that this should be possible? • I have some DEBUG logs to share that, to me, definitely point to the taskManager being the culprit. I can see in the logs that the jobManager is connecting successfully to my Kafka brokers, because it can see the topic and partitions I want to source. A taskManager is spawned shortly afterwards and that's what seems to fail. • I'm using Flink 1.20.3 and the Flink operator in Kubernetes to manage the job. • I'll post the configuration and a link to a condensed and redacted Gist of the DEBUG logs I captured for my connector in a thread.
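    On the first bullet: it should be possible; the Table API kafka connector forwards any 'properties.*' keys straight to the Kafka client. A hedged DDL sketch (topic, credentials and truststore path are placeholders):
    CREATE TABLE kafka_source (
      payload STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'my-topic',
      'properties.bootstrap.servers' = 'broker-1:9093',
      'properties.security.protocol' = 'SASL_SSL',
      'properties.sasl.mechanism' = 'SCRAM-SHA-512',
      'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="my-user" password="my-password";',
      -- only needed when the broker cert is not signed by a CA already in the JVM truststore
      'properties.ssl.truststore.location' = '/opt/flink/certs/truststore.jks',
      'properties.ssl.truststore.password' = 'changeit',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'raw'
    );
    Whatever truststore the DDL references has to exist on the taskmanager pods as well, not just the jobmanager, which would also fit a "jobmanager connects, taskmanager fails" pattern.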

    Taranpreet Kaur

    12/11/2025, 2:57 AM
    Hi team! I have a use case where I want to run multiple separate Flink job instances that share Kafka topic partitions via Kafka's native consumer group balancing, rather than having each Flink job consume all partitions. Current behavior: Flink's KafkaSource automatically assigns all topic partitions to a single job, managing offsets internally for exactly-once semantics. Desired behavior: Multiple Flink job instances with the same Kafka consumer group ID, where Kafka's consumer group coordinator distributes partitions across instances (e.g., Instance A gets partition 0, Instance B gets partition 1). Questions: 1. Is there a way to configure KafkaSource to use Kafka's consumer group partition assignment instead of Flink's internal assignment? 2. How would this impact parallelism within each Flink job? If I use Kafka's subscribe mechanism, would I need to set parallelism=1 to avoid conflicts between Flink's subtasks and Kafka's partition assignment? 3. How would checkpointing work in this scenario? Would each Flink instance only checkpoint offsets for its assigned partitions? What happens during partition rebalancing when a failed instance's partitions get reassigned to surviving instances? 4. Are there any existing patterns or configurations for this use case? Context: We don't require exactly-once semantics and prefer the operational flexibility of having independent Flink instances that can scale based on Kafka's partition distribution. We're particularly concerned about checkpoint recovery behavior when Kafka rebalances partitions across instances. Any guidance or alternative approaches would be appreciated!
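    Partly hedged, but on point 1: KafkaSource does its own partition discovery and assignment and does not take part in Kafka's consumer-group rebalancing (group.id is essentially only used for committing offsets), so independent jobs sharing a group id will not split partitions that way. If a static split across independent jobs is acceptable, each instance can be pointed at an explicit partition set, e.g.:
    import java.util.Set;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.kafka.common.TopicPartition;

    // Sketch: this instance takes partitions 0 and 1; a second job instance would list 2 and 3.
    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("broker-1:9092")
            .setGroupId("telemetry-instance-a")
            .setPartitions(Set.of(
                    new TopicPartition("my-topic", 0),
                    new TopicPartition("my-topic", 1)))
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();
    Checkpointed offsets then stay scoped to the partitions each job owns, but there is no automatic takeover when an instance dies; that part has to be handled by redeploying with a different partition list.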

    George Leonard

    12/11/2025, 5:57 PM
    hi hi all... bit stuck, trying to push data into Iceberg tables. Need someone to see where I went wrong / what I missed. The below is the code that created the bucket; I can see the bucket in MinIO. CDC source table:
    Copy code
    # CDC Sources
    CREATE OR REPLACE TABLE c_cdcsource.demog.accountholders (
         _id                BIGINT                  NOT NULL
        ,nationalid         VARCHAR(16)             NOT NULL
        ,firstname          VARCHAR(100)
        ,lastname           VARCHAR(100)
        ,dob                VARCHAR(10) 
        ,gender             VARCHAR(10)
        ,children           INT
        ,emailaddress       VARCHAR(100)
        ,mobilephonenumber  VARCHAR(20)
        ,accounts           STRING
        ,created_at         TIMESTAMP_LTZ(3)
        ,WATERMARK          FOR created_at AS created_at - INTERVAL '15' SECOND
        ,PRIMARY KEY (_id) NOT ENFORCED
    ) WITH (
         'connector'                           = 'postgres-cdc'
        ,'hostname'                            = 'postgrescdc'
        ,'port'                                = '5432'
        ,'username'                            = 'dbadmin'
        ,'password'                            = 'dbpassword'
        ,'database-name'                       = 'demog'
        ,'schema-name'                         = 'public'
        ,'table-name'                          = 'accountholders'
        ,'slot.name'                           = 'accountholders0'
        -- experimental feature: incremental snapshot (default off)
        ,'scan.incremental.snapshot.enabled'   = 'true'               -- experimental feature: incremental snapshot (default off)
        ,'scan.startup.mode'                   = 'initial'            -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position
        ,'decoding.plugin.name'                = 'pgoutput'
    );
    flink catalog create
    Copy code
    CREATE CATALOG c_iceberg WITH (
       'type'='iceberg'
      ,'catalog-type'='rest'
      ,'uri'='http://polaris:8181/api/catalog'
      ,'warehouse'='icebergcat'
      ,'oauth2-server-uri'='http://polaris:8181/api/catalog/v1/oauth/tokens'
      ,'credential'='root:s3cr3t'
      ,'scope'='PRINCIPAL_ROLE:ALL'
      ,'s3.endpoint'='http://minio:900'
      ,'s3.access-key-id'='mnadmin'
      ,'s3.secret-access-key'='mnpassword'
      ,'s3.path-style-access'='true'
    );
    
    
    USE CATALOG c_iceberg;
    -- Create using below
    CREATE DATABASE IF NOT EXISTS c_iceberg.finflow;
    contents from the .env file driving the compose.yaml:
    ROOT_CLIENT_ID=root
    ROOT_CLIENT_SECRET=s3cr3t
    CATALOG_NAME=icebergcat
    POLARIS_REALM=findept
    MINIO_ROOT_USER=mnadmin
    MINIO_ROOT_PASSWORD=mnpassword
    MINIO_ALIAS=minio
    MINIO_ENDPOINT=http://minio:9000
    MINIO_BUCKET=warehouse
    AWS_ACCESS_KEY_ID=mnadmin
    AWS_SECRET_ACCESS_KEY=mnpassword
    AWS_REGION=za-south-1
    AWS_DEFAULT_REGION=za-south-1
    polaris-bootstrap service from docker-compose
    Copy code
    polaris-bootstrap:
        image: apache/polaris-admin-tool:latest
        hostname: polaris-bootstrap
        container_name: polaris-bootstrap
        depends_on:
          postgrescat:
            condition: service_healthy    
        environment:
          - POLARIS_PERSISTENCE_TYPE=relational-jdbc
          - QUARKUS_DATASOURCE_JDBC_URL=jdbc:postgresql://${POSTGRES_CAT_HOST}:${POSTGRES_CAT_PORT}/${POLARIS_REALM}
          - QUARKUS_DATASOURCE_USERNAME=${POSTGRES_CAT_USER}
          - QUARKUS_DATASOURCE_PASSWORD=${POSTGRES_CAT_PASSWORD}
          - POLARIS_REALM=${POLARIS_REALM}
          - ROOT_CLIENT_ID=${ROOT_CLIENT_ID}
          - ROOT_CLIENT_SECRET=${ROOT_CLIENT_SECRET}
        command:
          - "bootstrap"
          - "--realm=${POLARIS_REALM}"
          - "--credential=${POLARIS_REALM},${ROOT_CLIENT_ID},${ROOT_CLIENT_SECRET}"
    polaris-setup service from docker-compose.
    Copy code
    polaris-setup:
        image: alpine/curl
        hostname: polaris-setup
        container_name: polaris-setup
        depends_on:
          polaris:
            condition: service_healthy
        environment:
          - CLIENT_ID=${ROOT_CLIENT_ID}
          - CLIENT_SECRET=${ROOT_CLIENT_SECRET}
          - CATALOG_NAME=${CATALOG_NAME}
          - POLARIS_REALM=${POLARIS_REALM}
          - MINIO_BUCKET=${MINIO_BUCKET}
          - MINIO_ENDPOINT=${MINIO_ENDPOINT}
        volumes:
          - ./conf/polaris/:/polaris
        entrypoint: "/bin/sh"
        command:
          - "-c"
          - >-
            chmod +x /polaris/create-catalog.sh;
            chmod +x /polaris/obtain-token.sh;
            source /polaris/obtain-token.sh $$POLARIS_REALM;
            echo Creating catalog: $$CATALOG_NAME in realm: $$POLARIS_REALM;
            export STORAGE_CONFIG_INFO='{"storageType":"S3",
              "endpoint":"http://localhost:9000",
              "endpointInternal":"'$$MINIO_ENDPOINT'",
              "pathStyleAccess":true}';
            export STORAGE_LOCATION="s3://$$MINIO_BUCKET/iceberg";
            /polaris/create-catalog.sh $$POLARIS_REALM $$CATALOG_NAME $$TOKEN;
            echo Extra grants...;
            curl -H "Authorization: Bearer $$TOKEN" -H 'Content-Type: application/json' \
              -X PUT \
              http://polaris:8181/api/management/v1/catalogs/${CATALOG_NAME}/catalog-roles/catalog_admin/grants \
              -d '{"type":"catalog", "privilege":"CATALOG_MANAGE_CONTENT"}';
            echo Polaris Setup Complete.;
    output from the fetch-catalogs API call, verifying the catalog and full path
    Copy code
    curl -X GET http://localhost:8181/api/management/v1/catalogs \
    >   -H "Authorization: Bearer ${TOKEN}" | jq
    {
      "catalogs": [
        {
          "type": "INTERNAL",
          "name": "icebergcat",
          "properties": {
            "default-base-location": "s3://warehouse/iceberg"
          },
          "createTimestamp": 1765474601004,
          "lastUpdateTimestamp": 1765474601004,
          "entityVersion": 1,
          "storageConfigInfo": {
            "endpoint": "http://localhost:9000",
            "endpointInternal": "http://minio:9000",
            "pathStyleAccess": true,
            "storageType": "S3",
            "allowedLocations": [
              "s3://warehouse/iceberg"
            ]
          }
        }
      ]
    }
    ERROR
    Copy code
    > CREATE OR REPLACE TABLE c_iceberg.finflow.accountholders WITH (
    >      'file.format'                       = 'parquet'
    >     ,'compaction.min.file-num'           = '2'
    >     ,'compaction.early-max.file-num'     = '50'
    >     ,'snapshot.time-retained'            = '1h'
    >     ,'snapshot.num-retained.min'         = '5'
    >     ,'snapshot.num-retained.max'         = '20'
    >     ,'table.exec.sink.upsert-materialize'= 'NONE'
    > ) AS 
    > SELECT * FROM c_cdcsource.demog.accountholders;
    [ERROR] Could not execute SQL statement. Reason:
    java.lang.AbstractMethodError: Receiver class org.apache.iceberg.flink.FlinkCreateTableOptions$$Lambda$1164/0x0000000801826a00 does not define or inherit an implementation of the resolved method 'abstract void generate(org.apache.iceberg.shaded.com.fasterxml.jackson.core.JsonGenerator)' of interface org.apache.iceberg.util.JsonUtil$ToJson.
    docker images
    Copy code
    arm64v8/flink:1.20.2-scala_2.12-java17                      79a54781f5d0        789MB             0B        
    georgelza/apacheflink-base-1.20.2-scala_2.12-java17:1.0.2   dd8585afe66c       6.84GB             0B    U
    Flink container build / Dockerfile, since searching for the error points at a version incompatibility...
    Copy code
    FROM arm64v8/flink:1.20.2-scala_2.12-java17
    SHELL ["/bin/bash", "-c"]
    
    ENV FLINK_HOME=/opt/flink
    ENV HIVE_HOME=$FLINK_HOME/conf/
    WORKDIR $FLINK_HOME
    
    
    RUN echo "--> Setup Directory Structure" && \
        mkdir -p /opt/flink/conf/ && \
        mkdir -p /opt/flink/checkpoints && \
        mkdir -p /opt/flink/rocksdb 
    
    RUN echo "--> Install JARs: Flink's S3 plugin" && \
        mkdir -p ./plugins/s3-fs-hadoop && \
        mv ./opt/flink-s3-fs-hadoop-1.20.2.jar ./plugins/s3-fs-hadoop/
    
    RUN echo "--> Install Flink JARs: => Generic"
    COPY stage/bundle-2.31.9.jar                            /opt/flink/lib/bundle-2.31.9.jar  
    COPY stage/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar    /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
    
    COPY stage/postgresql-42.7.6.jar                        /opt/flink/lib/postgresql-42.7.6.jar
    # CDC Support https://nightlies.apache.org/flink/flink-cdc-docs-release-3.4/docs/connectors/flink-sources/overview/#supported-flink-versions
    COPY stage/flink-sql-connector-postgres-cdc-3.5.0.jar   /opt/flink/lib/flink-sql-connector-postgres-cdc-3.5.0.jar
    COPY stage/flink-sql-json-1.20.2.jar                    /opt/flink/lib/flink-sql-json-1.20.2.jar     
    COPY stage/flink-sql-parquet-1.20.2.jar                 /opt/flink/lib/flink-sql-parquet-1.20.2.jar
    COPY stage/flink-json-1.20.2.jar                        /opt/flink/lib/flink-json-1.20.2.jar
    COPY stage/flink-connector-jdbc-3.3.0-1.20.jar          /opt/flink/lib/flink-connector-jdbc-3.3.0-1.20.jar
    
    RUN echo "--> Install Flink JARs: => Open Table Formats"
    COPY stage/iceberg-flink-1.20-1.9.1.jar                 /opt/flink/lib/iceberg-flink-1.20-1.9.1.jar
    COPY stage/iceberg-flink-runtime-1.20-1.9.1.jar         /opt/flink/lib/iceberg-flink-runtime-1.20-1.9.1.jar
    COPY stage/iceberg-aws-bundle-1.9.1.jar                 /opt/flink/iceberg-aws-bundle-1.9.1.jar
    
        
    RUN echo "--> Set Ownerships of /opt/flink" && \
        chown -R flink:flink $FLINK_HOME 
    
    USER flink:flink
    
    CMD ./bin/start-cluster.sh && sleep infinity
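    A hedged observation on the Dockerfile rather than a definite fix: iceberg-flink-1.20-1.9.1.jar and iceberg-flink-runtime-1.20-1.9.1.jar overlap (the runtime jar is the shaded bundle, which is where the org.apache.iceberg.shaded Jackson signature in the error comes from), and having both on lib/ is a classic way to end up with exactly this kind of AbstractMethodError; note also that the aws bundle is copied to /opt/flink/ instead of /opt/flink/lib/. A reduced sketch of the Iceberg section:
    RUN echo "--> Install Flink JARs: => Open Table Formats"
    # keep only the shaded runtime bundle, and put the aws bundle under lib/ too
    COPY stage/iceberg-flink-runtime-1.20-1.9.1.jar  /opt/flink/lib/iceberg-flink-runtime-1.20-1.9.1.jar
    COPY stage/iceberg-aws-bundle-1.9.1.jar          /opt/flink/lib/iceberg-aws-bundle-1.9.1.jar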

    George Leonard

    12/11/2025, 6:33 PM
    interesting errors in the docker compose logs wrt namespace
    err.txt

    George Leonard

    12/12/2025, 9:55 AM
    hi hi all. Need help pls. Downgraded to 1.20.1, Iceberg 1.9.1, hoping my problem was a version incompatibility, as all searches pointed at that... below is the Dockerfile used to build the Flink container being used. I've now added the Jackson jar, as previous errors complained about it being missing.
    Copy code
    FROM arm64v8/flink:1.20.1-scala_2.12-java17
    SHELL ["/bin/bash", "-c"]
    ENV FLINK_HOME=/opt/flink
    ENV HIVE_HOME=$FLINK_HOME/conf/
    WORKDIR $FLINK_HOME
    
    
    RUN echo "--> Setup Directory Structure" && \
        mkdir -p /opt/flink/conf/ && \
        mkdir -p /opt/flink/checkpoints && \
        mkdir -p /opt/flink/rocksdb 
    
    RUN echo "--> Install JARs: Flink's S3 plugin" && \
        mkdir -p ./plugins/s3-fs-hadoop && \
        mv ./opt/flink-s3-fs-hadoop-1.20.1.jar ./plugins/s3-fs-hadoop/
    
    RUN echo "--> Install JARs: => Generic"
    COPY stage/bundle-2.31.9.jar                            /opt/flink/lib/bundle-2.31.9.jar  
    COPY stage/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar    /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
    COPY stage/jackson-core-2.20.1.jar                      /opt/flink/lib/jackson-core-2.20.1.jar
    COPY stage/postgresql-42.7.6.jar                        /opt/flink/lib/postgresql-42.7.6.jar
    # CDC Support https://nightlies.apache.org/flink/flink-cdc-docs-release-3.4/docs/connectors/flink-sources/overview/#supported-flink-versions
    COPY stage/flink-sql-connector-postgres-cdc-3.5.0.jar   /opt/flink/lib/flink-sql-connector-postgres-cdc-3.5.0.jar
    
    # Flink version specific
    RUN echo "--> Install JARs: => Flink 1.20.1"
    COPY stage/flink-sql-json-1.20.1.jar                    /opt/flink/lib/flink-sql-json-1.20.1.jar     
    COPY stage/flink-sql-parquet-1.20.1.jar                 /opt/flink/lib/flink-sql-parquet-1.20.1.jar
    COPY stage/flink-json-1.20.1.jar                        /opt/flink/lib/flink-json-1.20.1.jar
    COPY stage/flink-connector-jdbc-3.3.0-1.20.jar          /opt/flink/lib/flink-connector-jdbc-3.3.0-1.20.jar
    
    RUN echo "--> Install JARs: => Iceberg 1.9.1"
    COPY stage/iceberg-flink-1.20-1.9.1.jar                 /opt/flink/lib/iceberg-flink-1.20-1.9.1.jar
    COPY stage/iceberg-flink-runtime-1.20-1.9.1.jar         /opt/flink/lib/iceberg-flink-runtime-1.20-1.9.1.jar
    COPY stage/iceberg-aws-bundle-1.9.1.jar                 /opt/flink/iceberg-aws-bundle-1.9.1.jar
    
    RUN echo "--> Set Ownerships of /opt/flink" && \
        chown -R flink:flink $FLINK_HOME 
    
    USER flink:flink
    
    CMD ./bin/start-cluster.sh && sleep infinity
    below are the Polaris bits from my docker-compose file, included since this creates the Polaris catalog and uses the previously created MinIO warehouse/iceberg bucket
    Copy code
    polaris:
        image: apache/polaris:latest
        hostname: polaris
        container_name: polaris
        depends_on:
          postgrescat:
            condition: service_healthy
          minio:
            condition: service_healthy
          polaris-bootstrap:
            condition: service_completed_successfully
    
        ports:
          # API port
          - "8181:8181"
          # Management port (metrics and health checks)
          - "8182:8182"
          # Optional, allows attaching a debugger to the Polaris JVM
          - "5005:5005"
        environment:
    
          POLARIS_PERSISTENCE_TYPE: relational-jdbc
          POLARIS_PERSISTENCE_RELATIONAL_JDBC_MAX_RETRIES: 5
          POLARIS_PERSISTENCE_RELATIONAL_JDBC_INITIAL_DELAY_IN_MS: 100
          POLARIS_PERSISTENCE_RELATIONAL_JDBC_MAX_DURATION_IN_MS: 5000
          QUARKUS_DATASOURCE_JDBC_URL: jdbc:postgresql://${POSTGRES_CAT_HOST}:${POSTGRES_CAT_PORT}/${POLARIS_REALM}
          QUARKUS_DATASOURCE_USERNAME: ${POSTGRES_CAT_USER}
          QUARKUS_DATASOURCE_PASSWORD: ${POSTGRES_CAT_PASSWORD}
          POLARIS_REALM_CONTEXT_REALMS: ${POLARIS_REALM}
          QUARKUS_OTEL_SDK_DISABLED: true
          polaris.features."ALLOW_INSECURE_STORAGE_TYPES": "true"
          polaris.features."SUPPORTED_CATALOG_STORAGE_TYPES": "[\"FILE\",\"S3\"]"
          polaris.readiness.ignore-severe-issues: "true"
          
          # S3 endpoint for MinIO
          AWS_REGION: ${AWS_REGION}
          AWS_ACCESS_KEY_ID: ${MINIO_ROOT_USER}
          AWS_SECRET_ACCESS_KEY: ${MINIO_ROOT_PASSWORD}
          POLARIS_BOOTSTRAP_CREDENTIALS: ${POLARIS_REALM},${ROOT_CLIENT_ID},${ROOT_CLIENT_SECRET}
          polaris.realm-context.realms: ${POLARIS_REALM}
          quarkus.otel.sdk.disabled: "true"
    
        volumes:
          - ./data/polaris:/var/lib/polaris
          - ./conf/polaris:/polaris 
        healthcheck:
          test: ["CMD", "curl", "http://localhost:8182/q/health"]
          interval: 5s
          timeout: 10s
          retries: 5
          start_period: 30s
        restart: unless-stopped
    
      polaris-bootstrap:
        image: apache/polaris-admin-tool:latest
        hostname: polaris-bootstrap
        container_name: polaris-bootstrap
        depends_on:
          postgrescat:
            condition: service_healthy    
        environment:
          - POLARIS_PERSISTENCE_TYPE=relational-jdbc
          - QUARKUS_DATASOURCE_JDBC_URL=jdbc:postgresql://${POSTGRES_CAT_HOST}:${POSTGRES_CAT_PORT}/${POLARIS_REALM}
          - QUARKUS_DATASOURCE_USERNAME=${POSTGRES_CAT_USER}
          - QUARKUS_DATASOURCE_PASSWORD=${POSTGRES_CAT_PASSWORD}
          - POLARIS_REALM=${POLARIS_REALM}
          - ROOT_CLIENT_ID=${ROOT_CLIENT_ID}
          - ROOT_CLIENT_SECRET=${ROOT_CLIENT_SECRET}
        command:
          - "bootstrap"
          - "--realm=${POLARIS_REALM}"
          - "--credential=${POLARIS_REALM},${ROOT_CLIENT_ID},${ROOT_CLIENT_SECRET}"
    
      polaris-setup:
        image: alpine/curl
        hostname: polaris-setup
        container_name: polaris-setup
        depends_on:
          polaris:
            condition: service_healthy
        environment:
          - CLIENT_ID=${ROOT_CLIENT_ID}
          - CLIENT_SECRET=${ROOT_CLIENT_SECRET}
          - CATALOG_NAME=${CATALOG_NAME}
          - POLARIS_REALM=${POLARIS_REALM}
          - MINIO_BUCKET=${MINIO_BUCKET}
          - MINIO_ENDPOINT=${MINIO_ENDPOINT}
        volumes:
          - ./conf/polaris/:/polaris
        entrypoint: "/bin/sh"
        command:
          - "-c"
          - >-
            chmod +x /polaris/create-catalog.sh;
            chmod +x /polaris/obtain-token.sh;
            source /polaris/obtain-token.sh $$POLARIS_REALM;
            echo Creating catalog: $$CATALOG_NAME in realm: $$POLARIS_REALM;
            export STORAGE_CONFIG_INFO='{"storageType":"S3",
              "endpoint":"http://localhost:9000",
              "endpointInternal":"'$$MINIO_ENDPOINT'",
              "pathStyleAccess":true}';
            export STORAGE_LOCATION="s3://$$MINIO_BUCKET/iceberg";
            /polaris/create-catalog.sh $$POLARIS_REALM $$CATALOG_NAME $$TOKEN;
            echo Extra grants...;
            curl -H "Authorization: Bearer $$TOKEN" -H 'Content-Type: application/json' \
              -X PUT \
              http://polaris:8181/api/management/v1/catalogs/${CATALOG_NAME}/catalog-roles/catalog_admin/grants \
              -d '{"type":"catalog", "privilege":"CATALOG_MANAGE_CONTENT"}';
            echo Polaris Setup Complete.;
    c_iceberg catalog create
    Copy code
    CREATE CATALOG c_iceberg WITH (
       'type'='iceberg'
      ,'catalog-type'='rest'
      ,'uri'='http://polaris:8181/api/catalog'
      ,'warehouse'='icebergcat'
      ,'oauth2-server-uri'='http://polaris:8181/api/catalog/v1/oauth/tokens'
      ,'credential'='root:s3cr3t'
      ,'scope'='PRINCIPAL_ROLE:ALL'
      ,'s3.endpoint'='http://minio:900'
      ,'s3.access-key-id'='mnadmin'
      ,'s3.secret-access-key'='mnpassword'
      ,'s3.path-style-access'='true'
    );
    
    
    USE CATALOG c_iceberg;
    -- Create using below
    CREATE DATABASE IF NOT EXISTS c_iceberg.finflow;
    flink sql being executed
    Copy code
    CREATE OR REPLACE TABLE c_cdcsource.demog.accountholders (
         _id                BIGINT                  NOT NULL
        ,nationalid         VARCHAR(16)             NOT NULL
        ,firstname          VARCHAR(100)
        ,lastname           VARCHAR(100)
        ,dob                VARCHAR(10) 
        ,gender             VARCHAR(10)
        ,children           INT
        ,emailaddress       VARCHAR(100)
        ,mobilephonenumber  VARCHAR(20)
        ,accounts           STRING
        ,created_at         TIMESTAMP_LTZ(3)
        ,WATERMARK          FOR created_at AS created_at - INTERVAL '15' SECOND
        ,PRIMARY KEY (_id) NOT ENFORCED
    ) WITH (
         'connector'                           = 'postgres-cdc'
        ,'hostname'                            = 'postgrescdc'
        ,'port'                                = '5432'
        ,'username'                            = 'dbadmin'
        ,'password'                            = 'dbpassword'
        ,'database-name'                       = 'demog'
        ,'schema-name'                         = 'public'
        ,'table-name'                          = 'accountholders'
        ,'slot.name'                           = 'accountholders0'
        ,'scan.incremental.snapshot.enabled'   = 'true'               -- experimental feature: incremental snapshot (default off)
        ,'scan.startup.mode'                   = 'initial'            -- <https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position>
        ,'decoding.plugin.name'                = 'pgoutput'
    );
    
    SET 'execution.checkpointing.interval'   = '60s';
    SET 'table.exec.sink.upsert-materialize' = 'NONE';
    SET 'pipeline.name' = 'Persist into Iceberg: accountholders table';
    CREATE OR REPLACE TABLE c_iceberg.finflow.accountholders WITH (
         'file.format'                       = 'parquet'
        ,'compaction.min.file-num'           = '2'
        ,'compaction.early-max.file-num'     = '50'
        ,'snapshot.time-retained'            = '1h'
        ,'snapshot.num-retained.min'         = '5'
        ,'snapshot.num-retained.max'         = '20'
        ,'table.exec.sink.upsert-materialize'= 'NONE'
    ) AS 
    SELECT * FROM c_cdcsource.demog.accountholders;
    Running the above resulted in the error below
    Copy code
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.iceberg.exceptions.NoSuchNamespaceException: Namespace does not exist: finflow
    After hitting the namespace error above, I created the namespace directly via the REST API:
    Copy code
    curl -X POST <http://localhost:8181/api/catalog/v1/icebergcat/namespaces> \
        -H "Authorization: Bearer ${TOKEN}" \
        -H 'Content-Type: application/json' \
        -d '{"namespace": ["finflow"], "properties": {"description": "Iceberg catalog database"}}' | jq
    I then reran my CTAS from above; it fails with the error below
    Copy code
    [ERROR] Could not execute SQL statement. Reason:
    java.lang.AbstractMethodError: Receiver class org.apache.iceberg.flink.FlinkCreateTableOptions$$Lambda$1155/0x0000000601838000 does not define or inherit an implementation of the resolved method 'abstract void generate(org.apache.iceberg.shaded.com.fasterxml.jackson.core.JsonGenerator)' of interface org.apache.iceberg.util.JsonUtil$ToJson.
    
    Flink SQL>
  • g

    George Leonard

    12/12/2025, 10:04 AM
    NOTE I can see my output table in MinIO; there is a metadata.json created. I can also select from my source table.
    Copy code
    Flink SQL> select * from c_cdcsource.demog.accountholders;
    [INFO] Result retrieval cancelled.

    Flink SQL> drop table c_iceberg.finflow.accountholders;
    [ERROR] Could not execute SQL statement. Reason:
    java.lang.AbstractMethodError: Receiver class org.apache.iceberg.flink.FlinkCreateTableOptions$$Lambda$1155/0x0000000601838000 does not define or inherit an implementation of the resolved method 'abstract void generate(org.apache.iceberg.shaded.com.fasterxml.jackson.core.JsonGenerator)' of interface org.apache.iceberg.util.JsonUtil$ToJson.

    Flink SQL> use catalog c_iceberg;
    [INFO] Execute statement succeeded.

    Flink SQL> use finflow;
    [INFO] Execute statement succeeded.

    Flink SQL> show tables;
    +----------------+
    |     table name |
    +----------------+
    | accountholders |
    +----------------+
    1 row in set

    Flink SQL> drop table accountholders;
    [ERROR] Could not execute SQL statement. Reason:
    java.lang.AbstractMethodError: Receiver class org.apache.iceberg.flink.FlinkCreateTableOptions$$Lambda$1155/0x0000000601838000 does not define or inherit an implementation of the resolved method 'abstract void generate(org.apache.iceberg.shaded.com.fasterxml.jackson.core.JsonGenerator)' of interface org.apache.iceberg.util.JsonUtil$ToJson.
  • g

    George Leonard

    12/12/2025, 10:08 AM
    Figured I would pre-create the table and then do an INSERT INTO <> SELECT * FROM <>... but, well, I can't even drop the table.
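    For reference, a minimal sketch of that pre-create-and-insert approach, reusing the column list from the CDC source table above. The table properties are assumptions for illustration (Iceberg format-version 2 with upsert writes for a changelog source), not the configuration actually in use, and this sketch does not address the AbstractMethodError itself.
    Copy code
    CREATE TABLE IF NOT EXISTS c_iceberg.finflow.accountholders (
         _id                BIGINT                  NOT NULL
        ,nationalid         VARCHAR(16)             NOT NULL
        ,firstname          VARCHAR(100)
        ,lastname           VARCHAR(100)
        ,dob                VARCHAR(10)
        ,gender             VARCHAR(10)
        ,children           INT
        ,emailaddress       VARCHAR(100)
        ,mobilephonenumber  VARCHAR(20)
        ,accounts           STRING
        ,created_at         TIMESTAMP_LTZ(3)
        ,PRIMARY KEY (_id) NOT ENFORCED
    ) WITH (
         'format-version'       = '2'     -- assumption: v2 tables are needed for row-level upserts
        ,'write.upsert.enabled' = 'true'  -- assumption: the CDC source emits updates/deletes
    );

    INSERT INTO c_iceberg.finflow.accountholders
    SELECT * FROM c_cdcsource.demog.accountholders;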
  • f

    Francisco Morillo

    12/12/2025, 10:26 AM
    Hi everyone! When using Flink SQL, many operations such as GROUP BYs and joins turn the stream into an upsert stream, and several connectors, Kinesis being one example, do not accept upsert streams. Is there a way to prevent the stream from becoming an upsert stream so it remains append-only? We have a workaround that applies 1-second windowing to turn the result back into an append-only stream, but it adds unnecessary latency.
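    For context, a minimal sketch of the windowing workaround described above, using a hypothetical orders table with a user_id column and an order_time event-time attribute (all names are placeholders). A windowed aggregation emits one final row per window, so its output stays append-only instead of becoming an upsert stream.
    Copy code
    -- Hypothetical example: per-key count scoped to a 1-second tumbling window.
    -- The window keeps the result append-only, at the cost of up to ~1 second of latency.
    SELECT
         user_id
        ,window_start
        ,window_end
        ,COUNT(*) AS order_cnt
    FROM TABLE(
        TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '1' SECOND))
    GROUP BY user_id, window_start, window_end;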
  • a

    Anooj S

    12/12/2025, 1:31 PM
    Hello everyone, We are running a Flink session cluster in standalone mode on Kubernetes, and we deploy multiple TaskManager deployments under the same session cluster, each with different CPU/memory configurations. This allows us to support both lightweight and CPU-heavy pipelines in the same Flink session. Example setup:
    • TM type A → low-spec (1 CPU, 4GB)
    • TM type B → high-spec (4 CPU, 16GB)
    • TM type C → mid-spec (2 CPU, 8GB)
    All TaskManagers register with the same JobManager. Our problem: when we submit a job, we cannot control which TaskManager type the job will use. Flink may place CPU-heavy tasks on low-spec TMs because we cannot specify:
    • TaskManager affinity
    • Slot affinity
    • TM selection rules
    • Resource-class filtering (beyond resource profiles)
    We previously used Native Kubernetes mode via the Flink K8s Operator, but it doesn't allow different TM resource specs within the same session cluster. Switching to standalone mode gave us the flexibility to run multiple TM deployments, but we still cannot control which TM the JobManager assigns tasks to. We found a similar question from 2021: https://stackoverflow.com/questions/68531175/task-manager-affinity-in-flink Any guidance would be appreciated. Thanks!
  • g

    George Leonard

    12/12/2025, 2:48 PM
    RE my ERROR above: I implemented a JDBC catalog and created Paimon-based tables on the same MinIO stack, and that's working... so Flink itself is fine. The problem is somewhere in the Polaris integration.
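    For reference, a hedged sketch of what a JDBC-metastore Paimon catalog pointing at the same MinIO stack could look like; the catalog name, JDBC URI, bucket path and credentials below are placeholders, not the configuration actually used.
    Copy code
    CREATE CATALOG c_paimon WITH (
         'type'                 = 'paimon'
        ,'metastore'            = 'jdbc'
        ,'uri'                  = 'jdbc:postgresql://postgrescdc:5432/paimon_catalog'  -- placeholder JDBC URI
        ,'jdbc.user'            = 'dbadmin'                                            -- placeholder credentials
        ,'jdbc.password'        = 'dbpassword'
        ,'warehouse'            = 's3://warehouse/paimon'                              -- placeholder bucket/path
        ,'s3.endpoint'          = 'http://minio:9000'
        ,'s3.access-key'        = 'mnadmin'
        ,'s3.secret-key'        = 'mnpassword'
        ,'s3.path.style.access' = 'true'   -- assumption: path-style access for MinIO
    );

    USE CATALOG c_paimon;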
  • m

    Marco Villalobos

    12/12/2025, 7:05 PM
    Hi everybody. I want to consider using Apache Flink CDC with MySQL as a source and Paimon as a sink. I wanted to deploy this on the Kubernetes Operator, but I noticed that it requires the MiniCluster, and I prefer not to use the MiniCluster in production. How do I change that? My next option is Amazon Managed Service for Apache Flink, but I don't know whether people have successfully integrated that with Apache Flink CDC. Is there a way to change Apache Flink CDC so it doesn't use the MiniCluster on the Kubernetes Operator? Does Apache Flink CDC work with Amazon Managed Service for Apache Flink? My alternative is to integrate Debezium directly with Apache Kafka and Spark Structured Streaming with Paimon. I'd prefer to use Flink.
  • h

    Hristo Yordanov

    12/12/2025, 9:37 PM
    Hi all, Is it possible to enable watermarks when using a Flink CDC pipeline (YAML file), with CDC 3.5.0 and Flink 1.20? I want to see watermark metrics in Prometheus and watermarks in the Flink UI when using the pipeline approach.