# troubleshooting
  • j

    Jon Slusher

    12/05/2025, 9:18 PM
    👋 I'm trying to troubleshoot a new error I'm getting when testing my PyFlink job using Flink (1.20.3) and the operator in Kubernetes. The job seems to be timing out to my brokers, but as far as I can tell it has the same broker configuration that my Debezium connector has 🧵
    j
    • 2
    • 17
  • r

    Rion Williams

    12/06/2025, 3:59 AM
    Hey all — is there any way to confirm subscriptions to the mailing lists? I haven't received many/any in quite a long time (user will occasionally come through, but never dev or community). I’ve resubmitted the various $list-subscribe@flink.apache.org to all of the lists but never received the expected confirmation response. Any ideas?
    d
    m
    • 3
    • 8
  • g

    George Leonard

    12/07/2025, 10:42 AM
    hi hi all. trying to put a stream together: Shadowtraffic as the data generator for the 2 products into 2 Postgres tables, 'accountholders' and 'transactions'. Postgres into Flink using CDC -> this is all working, I can select from either table and get results via flink-sql. From my Postgres catalog based tables into Apache Paimon based tables on MinIO/S3: this is where I'm having some "sluggishness" atm. I'm expecting buckets to be created a lot faster and a lot more data to end up in Paimon/S3... attached is the log file. For now I'm just trying to confirm this all works before I start with the intended PyFlink "processing" that will consume from the Postgres source and output to the Paimon target. Below are my 2 source table creates and the CTAS that pushes data into Paimon.
    Copy code
    -- CDC Sources
    CREATE OR REPLACE TABLE postgres_catalog.demog.accountholders (
         _id                BIGINT                  NOT NULL
        ,nationalid         VARCHAR(16)             NOT NULL
        ,firstname          VARCHAR(100)
        ,lastname           VARCHAR(100)
        ,dob                VARCHAR(10) 
        ,gender             VARCHAR(10)
        ,children           INT
        ,address            STRING
        ,accounts           STRING
        ,emailaddress       VARCHAR(100)
        ,mobilephonenumber  VARCHAR(20)
        ,created_at         TIMESTAMP_LTZ(3)
        ,WATERMARK          FOR created_at AS created_at - INTERVAL '15' SECOND
        ,PRIMARY KEY (_id) NOT ENFORCED
    ) WITH (
         'connector'                           = 'postgres-cdc'
        ,'hostname'                            = 'postgrescdc'
        ,'port'                                = '5432'
        ,'username'                            = 'dbadmin'
        ,'password'                            = 'dbpassword'
        ,'database-name'                       = 'demog'
        ,'schema-name'                         = 'public'
        ,'table-name'                          = 'accountholders'
        ,'slot.name'                           = 'accountholders0'
        ,'scan.incremental.snapshot.enabled'   = 'true'               -- experimental feature: incremental snapshot (default off)
        ,'scan.startup.mode'                   = 'initial'            -- <https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position>
        ,'decoding.plugin.name'                = 'pgoutput'
    );
    
    
    CREATE OR REPLACE TABLE postgres_catalog.demog.transactions (
         _id                            BIGINT              NOT NULL
        ,eventid                        VARCHAR(36)         NOT NULL
        ,transactionid                  VARCHAR(36)         NOT NULL
        ,eventtime                      VARCHAR(30)
        ,direction                      VARCHAR(8)
        ,eventtype                      VARCHAR(10)
        ,creationdate                   VARCHAR(20)
        ,accountholdernationalid        VARCHAR(16)
        ,accountholderaccount           STRING
        ,counterpartynationalid         VARCHAR(16)
        ,counterpartyaccount            STRING
        ,tenantid                       VARCHAR(8)
        ,fromid                         VARCHAR(8)
        ,accountagentid                 VARCHAR(8)
        ,fromfibranchid                 VARCHAR(6)
        ,accountnumber                  VARCHAR(16)
        ,toid                           VARCHAR(8)
        ,accountidcode                  VARCHAR(5)
        ,counterpartyagentid            VARCHAR(8)
        ,tofibranchid                   VARCHAR(6)
        ,counterpartynumber             VARCHAR(16)
        ,counterpartyidcode             VARCHAR(5)
        ,amount                         STRING
        ,msgtype                        VARCHAR(6)
        ,settlementclearingsystemcode   VARCHAR(5)
        ,paymentclearingsystemreference VARCHAR(12)
        ,requestexecutiondate           VARCHAR(10)
        ,settlementdate                 VARCHAR(10)
        ,destinationcountry             VARCHAR(30)
        ,localinstrument                VARCHAR(2)
        ,msgstatus                      VARCHAR(12)
        ,paymentmethod                  VARCHAR(4)
        ,settlementmethod               VARCHAR(4)
        ,transactiontype                VARCHAR(2)
        ,verificationresult             VARCHAR(4)
        ,numberoftransactions           INT
        ,schemaversion                  INT
        ,usercode                       VARCHAR(4)
        ,created_at                     TIMESTAMP_LTZ(3)
        ,WATERMARK                      FOR created_at AS created_at - INTERVAL '15' SECOND
        ,PRIMARY KEY (_id) NOT ENFORCED
    ) WITH (
         'connector'                           = 'postgres-cdc'
        ,'hostname'                            = 'postgrescdc'
        ,'port'                                = '5432'
        ,'username'                            = 'dbadmin'
        ,'password'                            = 'dbpassword'
        ,'database-name'                       = 'demog'
        ,'schema-name'                         = 'public'
        ,'table-name'                          = 'transactions'
        ,'slot.name'                           = 'transactions0'
        ,'scan.incremental.snapshot.enabled'   = 'true'               -- experimental feature: incremental snapshot (default off)
        ,'scan.startup.mode'                   = 'initial'            -- <https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position>
        ,'decoding.plugin.name'                = 'pgoutput'
    );
    and the create table as inserts.
    Copy code
    SET 'execution.checkpointing.interval'   = '10s';
    SET 'pipeline.name' = 'Persist into Paimon: accountholders table';
    
    CREATE OR REPLACE TABLE c_paimon.finflow.accountholders WITH (
         'file.format'                       = 'parquet'
        ,'compaction.min.file-num'           = '2'
        ,'compaction.early-max.file-num'     = '50'
        ,'snapshot.time-retained'            = '1h'
        ,'snapshot.num-retained.min'         = '5'
        ,'snapshot.num-retained.max'         = '20'
        ,'table.exec.sink.upsert-materialize'= 'NONE'
    ) AS 
    SELECT * FROM postgres_catalog.demog.accountholders;
    
    SET 'pipeline.name' = 'Persist into Paimon: transactions table';
    
    CREATE OR REPLACE TABLE c_paimon.finflow.transactions WITH (
         'file.format'                       = 'parquet'
        ,'compaction.min.file-num'           = '2'
        ,'compaction.early-max.file-num'     = '50'
        ,'snapshot.time-retained'            = '1h'
        ,'snapshot.num-retained.min'         = '5'
        ,'snapshot.num-retained.max'         = '20'
        ,'table.exec.sink.upsert-materialize'= 'NONE'
    ) AS 
    SELECT * FROM postgres_catalog.demog.transactions;
    jobmanager_log-2.txt
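    One thing that may explain part of the perceived sluggishness (an assumption, not a diagnosis): the Paimon sink only commits new files at each Flink checkpoint, so with the 10s interval above data becomes visible in MinIO/S3 at most once per checkpoint, and the bucket count of a table is governed by Paimon's 'bucket' table option rather than by data volume. A sketch of making both explicit (values are illustrative, not recommendations):
    Copy code
    SET 'execution.checkpointing.interval' = '10s';   -- commit cadence of the Paimon sink

    CREATE OR REPLACE TABLE c_paimon.finflow.accountholders WITH (
         'file.format' = 'parquet'
        ,'bucket'      = '4'    -- illustrative fixed bucket count; omit to keep Paimon's default bucketing
    ) AS
    SELECT * FROM postgres_catalog.demog.accountholders;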
  • m

    Manish Jain

    12/09/2025, 9:43 AM
    Hi All, We are using Flink 1.20, and the related kafka-connector.
    Copy code
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>3.4.0-1.20</version>
    </dependency>
    This version of the Kafka client has 2 vulnerabilities, which are only fixed in v3.9.1 of kafka-clients. We are wondering how to handle this. Is it recommended to exclude the kafka-client from the connector and then override it with the new version of kafka-client? We are afraid that it might not be as easy as it sounds. 🙂 Is running the test cases for Flink sufficient to verify that the new version is Flink-compatible, or are there other things we'd need to keep in mind while doing this upgrade? PS: In the longer run, we might upgrade to Flink 2.0, but right now that is not in our plans.
    t
    • 2
    • 6
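    A minimal sketch of the exclude-and-override approach mentioned above, assuming kafka-clients 3.9.1 stays binary-compatible with flink-connector-kafka 3.4.0-1.20 (worth verifying with the connector's own tests and an end-to-end job before adopting):
    Copy code
    <!-- Sketch: drop the transitive kafka-clients and pin the patched version. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>3.4.0-1.20</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.9.1</version>
    </dependency>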
  • j

    Jon Slusher

    12/09/2025, 4:36 PM
    👋 Surfacing this issue in case it got lost in the shuffle. I'm still getting a certificate error on my Flink Kafka source trying to connect to my Kafka cluster using SASL_SSL. • Do I need to combine the certificate I exported from the cluster with another set of certificates? • Does anyone know how to set
    javax.net.debug=ssl
    in the connector so I can see what certificate it's trying to use?
    • 1
    • 1
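    A sketch of one way to get that SSL debug output, assuming the job is described by the operator's FlinkDeployment resource (spec.flinkConfiguration entries are passed straight into the Flink config):
    Copy code
    # Sketch only: adds -Djavax.net.debug to the TaskManager JVMs via standard Flink options.
    spec:
      flinkConfiguration:
        env.java.opts.taskmanager: "-Djavax.net.debug=ssl,handshake"
        # or for every Flink process:
        # env.java.opts.all: "-Djavax.net.debug=ssl,handshake"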
  • u

    Urs Schoenenberger

    12/10/2025, 2:24 PM
    Hi folks, we are still facing FLINK-38582 (GlobalCommitter stops committing, in this case in DeltaSink, after rescaling). Parts of this seems to have been fixed in FLINK-37747, but the issue persists, albeit rarely and not reproducible. I'm trying to further pinpoint the issue, so I wanted to ask: Are there any known general issues when using the (v1) GlobalCommitter with unaligned checkpoints? Either in the general way GlobalCommitter works, or any common implementation issues that the DeltaSink may have when implementing its GlobalCommitter?
  • a

    Almog Golbar

    12/10/2025, 4:10 PM
    Hi guys, We enabled Flink autoscaler memory tuning but aren't seeing recommendations, even with clear over-provisioning. Versions: • Flink: v1_17 (1.17.1) • Operator: 1.13.0 Configuration:
    Copy code
    job.autoscaler.enabled: "true"
    job.autoscaler.memory.tuning.enabled: "true"  #also tried false for dry run
    job.autoscaler.metrics.window: "5m"
    job.autoscaler.stabilization.interval: "30s"
    job.autoscaler.scaling.enabled: "false"
    What we've done: 1. Removed explicit memory configs (taskmanager.memory.process.size, taskmanager.memory.managed.size, taskmanager.memory.network.max) 2. Set taskManager.resource.memory: 12g 3. Verified metrics collection (6+ data points in ConfigMap autoscaler-<deployment-name>) 4. Confirmed autoscaler is active (ScalingExecutor running, metrics being collected) Current situation: • Memory: 12GB allocated, ~24% heap usage (~1.36GB used) • Metrics: Being collected and stored in ConfigMap • No recommendations: Not appearing in operator logs, Kubernetes events, or FlinkDeployment status/annotations What we've checked: • Operator logs show autoscaler activity but no memory tuning recommendations • No errors or warnings related to memory tuning • Metrics window is filling (5+ minutes of data collected) • Job is processing data (not idle) Has anyone seen memory tuning recommendations with this operator version? Are there additional requirements or known issues? Any guidance on where recommendations should appear or how to troubleshoot?
  • j

    Jon Slusher

    12/10/2025, 7:15 PM
    👋 I'm still trying to troubleshoot a connection from a Table API
    kafka
    source connector (using PyFlink) to our Kafka cluster using
    SASL_SSL
    and
    SCRAM-SHA-512
    . • Can anyone at least confirm that this should be possible? • I have some DEBUG logs to share that to me definitely point to the taskManager being the culprit. I see in the logs that the jobManager is definitely connecting successfully to my Kafka brokers, because it can see the topic and partitions I want to source. A taskManager is spawned shortly afterwards and that's what seems to fail. • I'm using Flink 1.20.3 and the Flink operator in Kubernetes to manage the job • I'll post the configuration and a link to a condensed and redacted Gist of the DEBUG logs I captured for my connector in a thread
    h
    j
    • 3
    • 9
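    For reference, SASL_SSL with SCRAM-SHA-512 should work from the SQL/Table API Kafka connector; a hedged sketch of the WITH options (topic, brokers, credentials, truststore path and schema are placeholders), keeping in mind that the truststore file has to exist on every taskManager pod, not just the jobManager:
    Copy code
    CREATE TABLE kafka_source (
        id  STRING,
        ts  TIMESTAMP(3)
    ) WITH (
        'connector'                          = 'kafka',
        'topic'                              = 'my-topic',
        'properties.bootstrap.servers'       = 'broker-1:9093',
        'properties.security.protocol'       = 'SASL_SSL',
        'properties.sasl.mechanism'          = 'SCRAM-SHA-512',
        'properties.sasl.jaas.config'        = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="pass";',
        'properties.ssl.truststore.location' = '/opt/flink/certs/truststore.jks',
        'properties.ssl.truststore.password' = 'changeit',
        'scan.startup.mode'                  = 'latest-offset',
        'format'                             = 'json'
    );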
  • t

    Taranpreet Kaur

    12/11/2025, 2:57 AM
    Hi team! I have a use case where I want to run multiple separate Flink job instances that share Kafka topic partitions via Kafka's native consumer group balancing, rather than having each Flink job consume all partitions. Current behavior: Flink's KafkaSource automatically assigns all topic partitions to a single job, managing offsets internally for exactly-once semantics. Desired behavior: Multiple Flink job instances with the same Kafka consumer group ID, where Kafka's consumer group coordinator distributes partitions across instances (e.g., Instance A gets partition 0, Instance B gets partition 1). Questions: 1. Is there a way to configure KafkaSource to use Kafka's consumer group partition assignment instead of Flink's internal assignment? 2. How would this impact parallelism within each Flink job? If I use Kafka's subscribe mechanism, would I need to set parallelism=1 to avoid conflicts between Flink's subtasks and Kafka's partition assignment? 3. How would checkpointing work in this scenario? Would each Flink instance only checkpoint offsets for its assigned partitions? What happens during partition rebalancing when a failed instance's partitions get reassigned to surviving instances? 4. Are there any existing patterns or configurations for this use case? Context: We don't require exactly-once semantics and prefer the operational flexibility of having independent Flink instances that can scale based on Kafka's partition distribution. We're particularly concerned about checkpoint recovery behavior when Kafka rebalances partitions across instances. Any guidance or alternative approaches would be appreciated!
    j
    • 2
    • 1
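    For context, the DataStream/Table KafkaSource assigns partitions from its own split enumerator; group.id is only used for offset commits and metrics, so two jobs sharing a group id will each read every partition rather than splitting them. A minimal sketch of the standard setup (broker/topic names are placeholders):
    Copy code
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;

    public class KafkaSourceSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Flink's enumerator hands *all* partitions of the topic to this job and spreads
            // them across its own subtasks; it does not join Kafka's consumer-group rebalancing.
            KafkaSource<String> source = KafkaSource.<String>builder()
                    .setBootstrapServers("broker-1:9092")                        // placeholder
                    .setTopics("my-topic")                                       // placeholder
                    .setGroupId("shared-group")                                  // offset commit / metrics only
                    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();

            env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka").print();
            env.execute("kafka-source-sketch");
        }
    }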
  • g

    George Leonard

    12/11/2025, 5:57 PM
    hi hi all... bit stuck, trying to push data into Iceberg tables. Need someone to see where I went wrong / what I missed... below is the code that created the bucket, I can see the bucket in MinIO: CDC Source table
    Copy code
    -- CDC Sources
    CREATE OR REPLACE TABLE c_cdcsource.demog.accountholders (
         _id                BIGINT                  NOT NULL
        ,nationalid         VARCHAR(16)             NOT NULL
        ,firstname          VARCHAR(100)
        ,lastname           VARCHAR(100)
        ,dob                VARCHAR(10) 
        ,gender             VARCHAR(10)
        ,children           INT
        ,emailaddress       VARCHAR(100)
        ,mobilephonenumber  VARCHAR(20)
        ,accounts           STRING
        ,created_at         TIMESTAMP_LTZ(3)
        ,WATERMARK          FOR created_at AS created_at - INTERVAL '15' SECOND
        ,PRIMARY KEY (_id) NOT ENFORCED
    ) WITH (
         'connector'                           = 'postgres-cdc'
        ,'hostname'                            = 'postgrescdc'
        ,'port'                                = '5432'
        ,'username'                            = 'dbadmin'
        ,'password'                            = 'dbpassword'
        ,'database-name'                       = 'demog'
        ,'schema-name'                         = 'public'
        ,'table-name'                          = 'accountholders'
        ,'slot.name'                           = 'accountholders0'
        ,'scan.incremental.snapshot.enabled'   = 'true'               -- experimental feature: incremental snapshot (default off)
        ,'scan.startup.mode'                   = 'initial'            -- <https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position>
        ,'decoding.plugin.name'                = 'pgoutput'
    );
    flink catalog create
    Copy code
    CREATE CATALOG c_iceberg WITH (
       'type'='iceberg'
      ,'catalog-type'='rest'
      ,'uri'='<http://polaris:8181/api/catalog>'
      ,'warehouse'='icebergcat'
      ,'oauth2-server-uri'='<http://polaris:8181/api/catalog/v1/oauth/tokens>'
      ,'credential'='root:s3cr3t'
      ,'scope'='PRINCIPAL_ROLE:ALL'
      ,'s3.endpoint'='<http://minio:900>'
      ,'s3.access-key-id'='mnadmin'
      ,'s3.secret-access-key'='mnpassword'
      ,'s3.path-style-access'='true'
    );
    
    
    USE CATALOG c_iceberg;
    -- Create using below
    CREATE DATABASE IF NOT EXISTS c_iceberg.finflow;
    contents from .env file driving the compose.yaml
    Copy code
    ROOT_CLIENT_ID=root
    ROOT_CLIENT_SECRET=s3cr3t
    CATALOG_NAME=icebergcat
    POLARIS_REALM=findept
    MINIO_ROOT_USER=mnadmin
    MINIO_ROOT_PASSWORD=mnpassword
    MINIO_ALIAS=minio
    MINIO_ENDPOINT=http://minio:9000
    MINIO_BUCKET=warehouse
    AWS_ACCESS_KEY_ID=mnadmin
    AWS_SECRET_ACCESS_KEY=mnpassword
    AWS_REGION=za-south-1
    AWS_DEFAULT_REGION=za-south-1
    polaris-bootstrap service from docker-compose
    Copy code
    polaris-bootstrap:
        image: apache/polaris-admin-tool:latest
        hostname: polaris-bootstrap
        container_name: polaris-bootstrap
        depends_on:
          postgrescat:
            condition: service_healthy    
        environment:
          - POLARIS_PERSISTENCE_TYPE=relational-jdbc
          - QUARKUS_DATASOURCE_JDBC_URL=jdbc:postgresql://${POSTGRES_CAT_HOST}:${POSTGRES_CAT_PORT}/${POLARIS_REALM}
          - QUARKUS_DATASOURCE_USERNAME=${POSTGRES_CAT_USER}
          - QUARKUS_DATASOURCE_PASSWORD=${POSTGRES_CAT_PASSWORD}
          - POLARIS_REALM=${POLARIS_REALM}
          - ROOT_CLIENT_ID=${ROOT_CLIENT_ID}
          - ROOT_CLIENT_SECRET=${ROOT_CLIENT_SECRET}
        command:
          - "bootstrap"
          - "--realm=${POLARIS_REALM}"
          - "--credential=${POLARIS_REALM},${ROOT_CLIENT_ID},${ROOT_CLIENT_SECRET}"
    polaris-setup service from docker-compose.
    Copy code
    polaris-setup:
        image: alpine/curl
        hostname: polaris-setup
        container_name: polaris-setup
        depends_on:
          polaris:
            condition: service_healthy
        environment:
          - CLIENT_ID=${ROOT_CLIENT_ID}
          - CLIENT_SECRET=${ROOT_CLIENT_SECRET}
          - CATALOG_NAME=${CATALOG_NAME}
          - POLARIS_REALM=${POLARIS_REALM}
          - MINIO_BUCKET=${MINIO_BUCKET}
          - MINIO_ENDPOINT=${MINIO_ENDPOINT}
        volumes:
          - ./conf/polaris/:/polaris
        entrypoint: "/bin/sh"
        command:
          - "-c"
          - >-
            chmod +x /polaris/create-catalog.sh;
            chmod +x /polaris/obtain-token.sh;
            source /polaris/obtain-token.sh $$POLARIS_REALM;
            echo Creating catalog: $$CATALOG_NAME in realm: $$POLARIS_REALM;
            export STORAGE_CONFIG_INFO='{"storageType":"S3",
              "endpoint":"<http://localhost:9000>",
              "endpointInternal":"'$$MINIO_ENDPOINT'",
              "pathStyleAccess":true}';
            export STORAGE_LOCATION="s3://$$MINIO_BUCKET/iceberg";
            /polaris/create-catalog.sh $$POLARIS_REALM $$CATALOG_NAME $$TOKEN;
            echo Extra grants...;
            curl -H "Authorization: Bearer $$TOKEN" -H 'Content-Type: application/json' \
              -X PUT \
              <http://polaris:8181/api/management/v1/catalogs/${CATALOG_NAME}/catalog-roles/catalog_admin/grants> \
              -d '{"type":"catalog", "privilege":"CATALOG_MANAGE_CONTENT"}';
            echo Polaris Setup Complete.;
    output from fetch catalogs api call, verifying the catalog and full path
    Copy code
    curl -X GET <http://localhost:8181/api/management/v1/catalogs> \
    >   -H "Authorization: Bearer ${TOKEN}" | jq
      % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                     Dload  Upload   Total   Spent    Left  Speed
    100   388  100   388    0     0  13756      0 --:--:-- --:--:-- --:--:-- 13857
    {
      "catalogs": [
        {
          "type": "INTERNAL",
          "name": "icebergcat",
          "properties": {
            "default-base-location": "<s3://warehouse/iceberg>"
          },
          "createTimestamp": 1765474601004,
          "lastUpdateTimestamp": 1765474601004,
          "entityVersion": 1,
          "storageConfigInfo": {
            "endpoint": "<http://localhost:9000>",
            "endpointInternal": "<http://minio:9000>",
            "pathStyleAccess": true,
            "storageType": "S3",
            "allowedLocations": [
              "<s3://warehouse/iceberg>"
            ]
          }
        }
      ]
    }
    ERROR
    Copy code
    > CREATE OR REPLACE TABLE c_iceberg.finflow.accountholders WITH (
    >      'file.format'                       = 'parquet'
    >     ,'compaction.min.file-num'           = '2'
    >     ,'compaction.early-max.file-num'     = '50'
    >     ,'snapshot.time-retained'            = '1h'
    >     ,'snapshot.num-retained.min'         = '5'
    >     ,'snapshot.num-retained.max'         = '20'
    >     ,'table.exec.sink.upsert-materialize'= 'NONE'
    > ) AS 
    > SELECT * FROM c_cdcsource.demog.accountholders;
    [ERROR] Could not execute SQL statement. Reason:
    java.lang.AbstractMethodError: Receiver class org.apache.iceberg.flink.FlinkCreateTableOptions$$Lambda$1164/0x0000000801826a00 does not define or inherit an implementation of the resolved method 'abstract void generate(org.apache.iceberg.shaded.com.fasterxml.jackson.core.JsonGenerator)' of interface org.apache.iceberg.util.JsonUtil$ToJson.
    docker images
    Copy code
    arm64v8/flink:1.20.2-scala_2.12-java17                      79a54781f5d0        789MB             0B        
    georgelza/apacheflink-base-1.20.2-scala_2.12-java17:1.0.2   dd8585afe66c       6.84GB             0B    U
    Flink container build / Dockerfile, as searching for the error points at a version incompatibility...
    Copy code
    FROM arm64v8/flink:1.20.2-scala_2.12-java17
    SHELL ["/bin/bash", "-c"]
    
    ENV FLINK_HOME=/opt/flink
    ENV HIVE_HOME=$FLINK_HOME/conf/
    WORKDIR $FLINK_HOME
    
    
    RUN echo "--> Setup Directory Structure" && \
        mkdir -p /opt/flink/conf/ && \
        mkdir -p /opt/flink/checkpoints && \
        mkdir -p /opt/flink/rocksdb 
    
    RUN echo "--> Install JARs: Flink's S3 plugin" && \
        mkdir -p ./plugins/s3-fs-hadoop && \
        mv ./opt/flink-s3-fs-hadoop-1.20.2.jar ./plugins/s3-fs-hadoop/
    
    RUN echo "--> Install Flink JARs: => Generic"
    COPY stage/bundle-2.31.9.jar                            /opt/flink/lib/bundle-2.31.9.jar  
    COPY stage/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar    /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
    
    COPY stage/postgresql-42.7.6.jar                        /opt/flink/lib/postgresql-42.7.6.jar
    # CDC Support <https://nightlies.apache.org/flink/flink-cdc-docs-release-3.4/docs/connectors/flink-sources/overview/#supported-flink-versions>
    COPY stage/flink-sql-connector-postgres-cdc-3.5.0.jar   /opt/flink/lib/flink-sql-connector-postgres-cdc-3.5.0.jar
    COPY stage/flink-sql-json-1.20.2.jar                    /opt/flink/lib/flink-sql-json-1.20.2.jar     
    COPY stage/flink-sql-parquet-1.20.2.jar                 /opt/flink/lib/flink-sql-parquet-1.20.2.jar
    COPY stage/flink-json-1.20.2.jar                        /opt/flink/lib/flink-json-1.20.2.jar
    COPY stage/flink-connector-jdbc-3.3.0-1.20.jar          /opt/flink/lib/flink-connector-jdbc-3.3.0-1.20.jar
    
    RUN echo "--> Install Flink JARs: => Open Table Formats"
    COPY stage/iceberg-flink-1.20-1.9.1.jar                 /opt/flink/lib/iceberg-flink-1.20-1.9.1.jar
    COPY stage/iceberg-flink-runtime-1.20-1.9.1.jar         /opt/flink/lib/iceberg-flink-runtime-1.20-1.9.1.jar
    COPY stage/iceberg-aws-bundle-1.9.1.jar                 /opt/flink/iceberg-aws-bundle-1.9.1.jar
    
        
    RUN echo "--> Set Ownerships of /opt/flink" && \
        chown -R flink:flink $FLINK_HOME 
    
    USER flink:flink
    
    CMD ./bin/start-cluster.sh && sleep infinity
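    A guess worth checking, not a confirmed diagnosis: iceberg-flink-runtime-1.20-1.9.1.jar is the shaded uber jar and already contains the iceberg-flink classes (with a shaded Jackson), so also putting the thin iceberg-flink-1.20-1.9.1.jar in /opt/flink/lib mixes two copies of the same classes, which is the kind of setup that produces AbstractMethodError against org.apache.iceberg.shaded.com.fasterxml.jackson. The AWS bundle is also copied to /opt/flink rather than /opt/flink/lib above. A sketch of the Iceberg section with only the runtime bundle:
    Copy code
    RUN echo "--> Install Flink JARs: => Open Table Formats"
    # Sketch: runtime bundle only (it bundles iceberg-flink plus shaded deps),
    # and the AWS bundle placed under lib/ so it is actually on the classpath.
    COPY stage/iceberg-flink-runtime-1.20-1.9.1.jar /opt/flink/lib/iceberg-flink-runtime-1.20-1.9.1.jar
    COPY stage/iceberg-aws-bundle-1.9.1.jar         /opt/flink/lib/iceberg-aws-bundle-1.9.1.jar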
  • g

    George Leonard

    12/11/2025, 6:33 PM
    interesting errors in the docker compose logs wrt namespace
    err.txt
  • g

    George Leonard

    12/12/2025, 9:55 AM
    hi hi all. Need help pls. Downgraded to 1.20.1, Iceberg 1.9.1, hoping my problem was a version incompatibility, as all searches pointed at that... below is the Dockerfile used to build the Flink container being used. I've now added the Jackson jar, as previous errors pointed at / complained about that being missing.
    Copy code
    FROM arm64v8/flink:1.20.1-scala_2.12-java17
    SHELL ["/bin/bash", "-c"]
    ENV FLINK_HOME=/opt/flink
    ENV HIVE_HOME=$FLINK_HOME/conf/
    WORKDIR $FLINK_HOME
    
    
    RUN echo "--> Setup Directory Structure" && \
        mkdir -p /opt/flink/conf/ && \
        mkdir -p /opt/flink/checkpoints && \
        mkdir -p /opt/flink/rocksdb 
    
    RUN echo "--> Install JARs: Flink's S3 plugin" && \
        mkdir -p ./plugins/s3-fs-hadoop && \
        mv ./opt/flink-s3-fs-hadoop-1.20.1.jar ./plugins/s3-fs-hadoop/
    
    RUN echo "--> Install JARs: => Generic"
    COPY stage/bundle-2.31.9.jar                            /opt/flink/lib/bundle-2.31.9.jar  
    COPY stage/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar    /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
    COPY stage/jackson-core-2.20.1.jar                      /opt/flink/lib/jackson-core-2.20.1.jar
    COPY stage/postgresql-42.7.6.jar                        /opt/flink/lib/postgresql-42.7.6.jar
    # CDC Support <https://nightlies.apache.org/flink/flink-cdc-docs-release-3.4/docs/connectors/flink-sources/overview/#supported-flink-versions>
    COPY stage/flink-sql-connector-postgres-cdc-3.5.0.jar   /opt/flink/lib/flink-sql-connector-postgres-cdc-3.5.0.jar
    
    # Flink version specific
    RUN echo "--> Install JARs: => Flink 1.20.1"
    COPY stage/flink-sql-json-1.20.1.jar                    /opt/flink/lib/flink-sql-json-1.20.1.jar     
    COPY stage/flink-sql-parquet-1.20.1.jar                 /opt/flink/lib/flink-sql-parquet-1.20.1.jar
    COPY stage/flink-json-1.20.1.jar                        /opt/flink/lib/flink-json-1.20.1.jar
    COPY stage/flink-connector-jdbc-3.3.0-1.20.jar          /opt/flink/lib/flink-connector-jdbc-3.3.0-1.20.jar
    
    RUN echo "--> Install JARs: => Iceberg 1.9.1"
    COPY stage/iceberg-flink-1.20-1.9.1.jar                 /opt/flink/lib/iceberg-flink-1.20-1.9.1.jar
    COPY stage/iceberg-flink-runtime-1.20-1.9.1.jar         /opt/flink/lib/iceberg-flink-runtime-1.20-1.9.1.jar
    COPY stage/iceberg-aws-bundle-1.9.1.jar                 /opt/flink/iceberg-aws-bundle-1.9.1.jar
    
    RUN echo "--> Set Ownerships of /opt/flink" && \
        chown -R flink:flink $FLINK_HOME 
    
    USER flink:flink
    
    CMD ./bin/start-cluster.sh && sleep infinity
    below are the Polaris bits from my docker-compose file, included as this creates the Polaris catalog and uses the previously created MinIO warehouse/iceberg bucket
    Copy code
    polaris:
        image: apache/polaris:latest
        hostname: polaris
        container_name: polaris
        depends_on:
          postgrescat:
            condition: service_healthy
          minio:
            condition: service_healthy
          polaris-bootstrap:
            condition: service_completed_successfully
    
        ports:
          # API port
          - "8181:8181"
          # Management port (metrics and health checks)
          - "8182:8182"
          # Optional, allows attaching a debugger to the Polaris JVM
          - "5005:5005"
        environment:
    
          POLARIS_PERSISTENCE_TYPE: relational-jdbc
          POLARIS_PERSISTENCE_RELATIONAL_JDBC_MAX_RETRIES: 5
          POLARIS_PERSISTENCE_RELATIONAL_JDBC_INITIAL_DELAY_IN_MS: 100
          POLARIS_PERSISTENCE_RELATIONAL_JDBC_MAX_DURATION_IN_MS: 5000
          QUARKUS_DATASOURCE_JDBC_URL: jdbc:postgresql://${POSTGRES_CAT_HOST}:${POSTGRES_CAT_PORT}/${POLARIS_REALM}
          QUARKUS_DATASOURCE_USERNAME: ${POSTGRES_CAT_USER}
          QUARKUS_DATASOURCE_PASSWORD: ${POSTGRES_CAT_PASSWORD}
          POLARIS_REALM_CONTEXT_REALMS: ${POLARIS_REALM}
          QUARKUS_OTEL_SDK_DISABLED: true
          polaris.features."ALLOW_INSECURE_STORAGE_TYPES": "true"
          polaris.features."SUPPORTED_CATALOG_STORAGE_TYPES": "[\"FILE\",\"S3\"]"
          polaris.readiness.ignore-severe-issues: "true"
          
          # S3 endpoint for MinIO
          AWS_REGION: ${AWS_REGION}
          AWS_ACCESS_KEY_ID: ${MINIO_ROOT_USER}
          AWS_SECRET_ACCESS_KEY: ${MINIO_ROOT_PASSWORD}
          POLARIS_BOOTSTRAP_CREDENTIALS: ${POLARIS_REALM},${ROOT_CLIENT_ID},${ROOT_CLIENT_SECRET}
          polaris.realm-context.realms: ${POLARIS_REALM}
          quarkus.otel.sdk.disabled: "true"
    
        volumes:
          - ./data/polaris:/var/lib/polaris
          - ./conf/polaris:/polaris 
        healthcheck:
          test: ["CMD", "curl", "<http://localhost:8182/q/health>"]
          interval: 5s
          timeout: 10s
          retries: 5
          start_period: 30s
        restart: unless-stopped
    
      polaris-bootstrap:
        image: apache/polaris-admin-tool:latest
        hostname: polaris-bootstrap
        container_name: polaris-bootstrap
        depends_on:
          postgrescat:
            condition: service_healthy    
        environment:
          - POLARIS_PERSISTENCE_TYPE=relational-jdbc
          - QUARKUS_DATASOURCE_JDBC_URL=jdbc:postgresql://${POSTGRES_CAT_HOST}:${POSTGRES_CAT_PORT}/${POLARIS_REALM}
          - QUARKUS_DATASOURCE_USERNAME=${POSTGRES_CAT_USER}
          - QUARKUS_DATASOURCE_PASSWORD=${POSTGRES_CAT_PASSWORD}
          - POLARIS_REALM=${POLARIS_REALM}
          - ROOT_CLIENT_ID=${ROOT_CLIENT_ID}
          - ROOT_CLIENT_SECRET=${ROOT_CLIENT_SECRET}
        command:
          - "bootstrap"
          - "--realm=${POLARIS_REALM}"
          - "--credential=${POLARIS_REALM},${ROOT_CLIENT_ID},${ROOT_CLIENT_SECRET}"
    
      polaris-setup:
        image: alpine/curl
        hostname: polaris-setup
        container_name: polaris-setup
        depends_on:
          polaris:
            condition: service_healthy
        environment:
          - CLIENT_ID=${ROOT_CLIENT_ID}
          - CLIENT_SECRET=${ROOT_CLIENT_SECRET}
          - CATALOG_NAME=${CATALOG_NAME}
          - POLARIS_REALM=${POLARIS_REALM}
          - MINIO_BUCKET=${MINIO_BUCKET}
          - MINIO_ENDPOINT=${MINIO_ENDPOINT}
        volumes:
          - ./conf/polaris/:/polaris
        entrypoint: "/bin/sh"
        command:
          - "-c"
          - >-
            chmod +x /polaris/create-catalog.sh;
            chmod +x /polaris/obtain-token.sh;
            source /polaris/obtain-token.sh $$POLARIS_REALM;
            echo Creating catalog: $$CATALOG_NAME in realm: $$POLARIS_REALM;
            export STORAGE_CONFIG_INFO='{"storageType":"S3",
              "endpoint":"<http://localhost:9000>",
              "endpointInternal":"'$$MINIO_ENDPOINT'",
              "pathStyleAccess":true}';
            export STORAGE_LOCATION="s3://$$MINIO_BUCKET/iceberg";
            /polaris/create-catalog.sh $$POLARIS_REALM $$CATALOG_NAME $$TOKEN;
            echo Extra grants...;
            curl -H "Authorization: Bearer $$TOKEN" -H 'Content-Type: application/json' \
              -X PUT \
              <http://polaris:8181/api/management/v1/catalogs/${CATALOG_NAME}/catalog-roles/catalog_admin/grants> \
              -d '{"type":"catalog", "privilege":"CATALOG_MANAGE_CONTENT"}';
            echo Polaris Setup Complete.;
    c_iceberg catalog create
    Copy code
    CREATE CATALOG c_iceberg WITH (
       'type'='iceberg'
      ,'catalog-type'='rest'
      ,'uri'='<http://polaris:8181/api/catalog>'
      ,'warehouse'='icebergcat'
      ,'oauth2-server-uri'='<http://polaris:8181/api/catalog/v1/oauth/tokens>'
      ,'credential'='root:s3cr3t'
      ,'scope'='PRINCIPAL_ROLE:ALL'
      ,'s3.endpoint'='<http://minio:900>'
      ,'s3.access-key-id'='mnadmin'
      ,'s3.secret-access-key'='mnpassword'
      ,'s3.path-style-access'='true'
    );
    
    
    USE CATALOG c_iceberg;
    -- Create using below
    CREATE DATABASE IF NOT EXISTS c_iceberg.finflow;
    flink sql being executed
    Copy code
    CREATE OR REPLACE TABLE c_cdcsource.demog.accountholders (
         _id                BIGINT                  NOT NULL
        ,nationalid         VARCHAR(16)             NOT NULL
        ,firstname          VARCHAR(100)
        ,lastname           VARCHAR(100)
        ,dob                VARCHAR(10) 
        ,gender             VARCHAR(10)
        ,children           INT
        ,emailaddress       VARCHAR(100)
        ,mobilephonenumber  VARCHAR(20)
        ,accounts           STRING
        ,created_at         TIMESTAMP_LTZ(3)
        ,WATERMARK          FOR created_at AS created_at - INTERVAL '15' SECOND
        ,PRIMARY KEY (_id) NOT ENFORCED
    ) WITH (
         'connector'                           = 'postgres-cdc'
        ,'hostname'                            = 'postgrescdc'
        ,'port'                                = '5432'
        ,'username'                            = 'dbadmin'
        ,'password'                            = 'dbpassword'
        ,'database-name'                       = 'demog'
        ,'schema-name'                         = 'public'
        ,'table-name'                          = 'accountholders'
        ,'slot.name'                           = 'accountholders0'
        ,'scan.incremental.snapshot.enabled'   = 'true'               -- experimental feature: incremental snapshot (default off)
        ,'scan.startup.mode'                   = 'initial'            -- <https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position>
        ,'decoding.plugin.name'                = 'pgoutput'
    );
    
    SET 'execution.checkpointing.interval'   = '60s';
    SET 'table.exec.sink.upsert-materialize' = 'NONE';
    SET 'pipeline.name' = 'Persist into Iceberg: accountholders table';
    CREATE OR REPLACE TABLE c_iceberg.finflow.accountholders WITH (
         'file.format'                       = 'parquet'
        ,'compaction.min.file-num'           = '2'
        ,'compaction.early-max.file-num'     = '50'
        ,'snapshot.time-retained'            = '1h'
        ,'snapshot.num-retained.min'         = '5'
        ,'snapshot.num-retained.max'         = '20'
        ,'table.exec.sink.upsert-materialize'= 'NONE'
    ) AS 
    SELECT * FROM c_cdcsource.demog.accountholders;
    The above resulted in the below
    Copy code
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.iceberg.exceptions.NoSuchNamespaceException: Namespace does not exist: finflow
    After I got the above namespace problem I executed the following
    Copy code
    curl -X POST <http://localhost:8181/api/catalog/v1/icebergcat/namespaces> \
        -H "Authorization: Bearer ${TOKEN}" \
        -H 'Content-Type: application/json' \
        -d '{"namespace": ["finflow"], "properties": {"description": "Iceberg catalog database"}}' | jq
    I then reran my above CTAS; the errors are below
    Copy code
    [ERROR] Could not execute SQL statement. Reason:
    java.lang.AbstractMethodError: Receiver class org.apache.iceberg.flink.FlinkCreateTableOptions$$Lambda$1155/0x0000000601838000 does not define or inherit an implementation of the resolved method 'abstract void generate(org.apache.iceberg.shaded.com.fasterxml.jackson.core.JsonGenerator)' of interface org.apache.iceberg.util.JsonUtil$ToJson.
    
    Flink SQL>
  • g

    George Leonard

    12/12/2025, 10:04 AM
    NOTE I can see my output table in MinIO, there is a metadata.json created. I can also select from my source table.
    Copy code
    Flink SQL> select * from c_cdcsource.demog.accountholders;
    [INFO] Result retrieval cancelled.

    Flink SQL> drop table c_iceberg.finflow.accountholders;
    [ERROR] Could not execute SQL statement. Reason:
    java.lang.AbstractMethodError: Receiver class org.apache.iceberg.flink.FlinkCreateTableOptions$$Lambda$1155/0x0000000601838000 does not define or inherit an implementation of the resolved method 'abstract void generate(org.apache.iceberg.shaded.com.fasterxml.jackson.core.JsonGenerator)' of interface org.apache.iceberg.util.JsonUtil$ToJson.

    Flink SQL> use catalog c_iceberg;
    [INFO] Execute statement succeeded.

    Flink SQL> use finflow;
    [INFO] Execute statement succeeded.

    Flink SQL> show tables;
    +----------------+
    |     table name |
    +----------------+
    | accountholders |
    +----------------+
    1 row in set

    Flink SQL> drop table accountholders;
    [ERROR] Could not execute SQL statement. Reason:
    java.lang.AbstractMethodError: Receiver class org.apache.iceberg.flink.FlinkCreateTableOptions$$Lambda$1155/0x0000000601838000 does not define or inherit an implementation of the resolved method 'abstract void generate(org.apache.iceberg.shaded.com.fasterxml.jackson.core.JsonGenerator)' of interface org.apache.iceberg.util.JsonUtil$ToJson.
  • g

    George Leonard

    12/12/2025, 10:08 AM
    figured I will pre-create the table and then do an insert into <> select * from <>... but well, I can't even drop the table;
  • f

    Francisco Morillo

    12/12/2025, 10:26 AM
    Hi everyone! While using Flink SQL, many aggregations such as GROUP BYs and joins turn the stream into an upsert stream, and there are many connectors that do not allow upsert streams, one example being Kinesis. Is there a way to stop the stream from becoming an upsert stream and simply keep it append-only? We have made a workaround by doing 1-second windowing to turn it into append-only, but that adds unnecessary latency.
    d
    • 2
    • 2
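    Not a built-in switch as far as I know, but one workaround sketch that avoids the extra windowing latency: take the changelog into the DataStream API, keep only the insert/update-after rows, and feed the resulting append-only stream to a DataStream sink (table and field names below are illustrative):
    Copy code
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    import org.apache.flink.types.RowKind;

    public class AppendOnlySketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // Illustrative source so the sketch is self-contained.
            tEnv.executeSql(
                "CREATE TABLE src (account_id STRING) " +
                "WITH ('connector' = 'datagen', 'rows-per-second' = '5')");

            // A GROUP BY produces an updating (upsert/retract) result.
            Table upsert = tEnv.sqlQuery("SELECT account_id, COUNT(*) AS cnt FROM src GROUP BY account_id");

            // The changelog stream carries RowKind markers (+I, -U, +U, -D).
            DataStream<Row> changelog = tEnv.toChangelogStream(upsert);

            // Keep only inserts and updated values: effectively an append-only stream of the
            // latest emitted rows, which a non-upsert sink (e.g. Kinesis) can accept.
            DataStream<Row> appendOnly = changelog.filter(row ->
                    row.getKind() == RowKind.INSERT || row.getKind() == RowKind.UPDATE_AFTER);

            appendOnly.print();
            env.execute("append-only-sketch");
        }
    }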
  • a

    Anooj S

    12/12/2025, 1:31 PM
    Hello everyone, We are running a Flink session cluster in standalone mode on Kubernetes, and we deploy multiple TaskManager deployments under the same session cluster, each with different CPU/memory configurations. This allows us to support both lightweight and CPU-heavy pipelines in the same Flink session. Example setup: • TM type A → low-spec (1 CPU, 4GB) • TM type B → high-spec (4 CPU, 16GB) • TM type C → mid-spec (2 CPU, 8GB) All TaskManagers register with the same JobManager. Our problem: When we submit a job, we cannot control which TaskManager type the job will use. Flink may place CPU-heavy tasks on low-spec TMs because we cannot specify: • TaskManager affinity • Slot affinity • TM selection rules • Resource-class filtering (beyond resource profiles) We previously used Native Kubernetes mode via the Flink K8s Operator, but it doesn’t allow different TM resource specs within the same session cluster. Switching to standalone mode gave us flexibility to run multiple TM deployments, but we still cannot control which TM the JobManager assigns tasks to. We found a similar question from 2021: https://stackoverflow.com/questions/68531175/task-manager-affinity-in-flink Any guidance would be appreciated. Thanks!
    a
    • 2
    • 1
  • g

    George Leonard

    12/12/2025, 2:48 PM
    RE my ERROR above: I implemented a JDBC catalog and created Paimon-based tables on the same MinIO stack and that's working... so Flink itself is working. This is somewhere in the Polaris integration.
    a
    • 2
    • 3
  • m

    Marco Villalobos

    12/12/2025, 7:05 PM
    Hi everybody. I want to consider using Apache Flink CDC with MySQL as a source and Paimon as a sink. I wanted to deploy this on the Kubernetes Operator, but I noticed that it requires the MiniCluster. I prefer not to use MiniCluster for production. How do I change that? My next option is Amazon Managed Service for Apache Flink, but I don't know if people have successfully integrated that with Apache Flink CDC. Is there a way to change Apache Flink CDC to NOT use MiniCluster in the Kubernetes Operator? Does Apache Flink CDC work with Amazon Managed Service for Apache Flink? My alternative is to integrate Debezium directly with Apache Kafka and Spark Structured Streaming with Paimon. I'd prefer to use Flink.
    a
    • 2
    • 11
  • h

    Hristo Yordanov

    12/12/2025, 9:37 PM
    Hi all, is it possible to enable watermarks when using a Flink CDC pipeline (YAML file), version 3.5.0 and Flink 1.20? I want to see watermark metrics in Prometheus and watermarks in the Flink UI when using the pipeline approach.
  • b

    Brad Murry

    12/15/2025, 3:45 PM
    Howdy! I'm looking to traverse state using the state processor API, however my flink jobs are often a mix of datastream and Table/SQL APIs. I could potentially manually register state descriptors for a job on the datastream portions, follow best practices such as consistent UUIDs on operators, etc.. but for Table/SQL, since that is all generated via the planner, I'm not sure if there is a path forward. Does a methodology exist for interrogating a job graph and building a mapping of state descriptors, etc...?
    a
    a
    d
    • 4
    • 11
  • r

    Royston

    12/16/2025, 12:36 PM
    Hi, was wondering if there is a tool or a combination of tools which anybody has used to act as a coordinator for jobs submitted in session mode. Looking for mainly one thing: 1. Scale up and scale down taskmanager pods based on some kind of queue of jobs. 2. Additionally, management of jobs in case of failure of the job manager (afaik the operator automatically does this part but wanted to confirm). Thanks
    j
    • 2
    • 5
  • t

    Tiago Pereira

    12/16/2025, 1:48 PM
    Hi @Royston
  • t

    Tiago Pereira

    12/16/2025, 1:48 PM
    I'm not a member of Flink development, but what I know is that if you are using Kubernetes to deploy your cluster
    r
    • 2
    • 1
  • t

    Tiago Pereira

    12/16/2025, 1:49 PM
    changing the parallelism and number of instances you can achieve that, by having multiple task managers
  • t

    Tiago Pereira

    12/16/2025, 1:50 PM
    image.png
  • t

    Tiago Pereira

    12/16/2025, 1:50 PM
    image.png
  • t

    Tiago Pereira

    12/16/2025, 1:52 PM
    if I understood your question... but one thing that I'm not understanding: what are your queued jobs?
    r
    • 2
    • 8
  • j

    Jon Slusher

    12/18/2025, 5:47 PM
    👋 I'm trying to make jobs in my FlinkDeployment stateful using the information here in the documentation. I'm running Flink using the operator in Kubernetes (EKS) and my first idea was to use an EBS-backed PVC. It seems that because the jobManager and taskManager run on different pods I would have to configure the PVC as
    ReadWriteMany
    , which is not supported for EBS volumes. Can anyone point me in the right direction? I see that EFS might be an option, but I'm curious what the recommended options are for configuring savepoint/checkpoint and ha volumes for Flink jobManagers in EKS. I'll put my current configuration in a thread
    • 1
    • 3
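    For comparison, a common pattern on EKS is to avoid shared volumes entirely and point checkpoints, savepoints and HA metadata at S3 through the flink-s3-fs plugin; a hedged sketch of the relevant FlinkDeployment settings (bucket name and paths are placeholders, and the pods need S3 access, e.g. via IRSA):
    Copy code
    spec:
      flinkConfiguration:
        state.checkpoints.dir: s3://my-flink-bucket/checkpoints      # placeholder bucket/paths
        state.savepoints.dir: s3://my-flink-bucket/savepoints
        high-availability.type: kubernetes
        high-availability.storageDir: s3://my-flink-bucket/ha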
  • b

    Bruno Cancelinha

    12/18/2025, 6:28 PM
    Hello guys 👋. I’m a software developer and we are developing an alert system using Flink SQL. We want our users to be able to define alerts for specific metrics. In this particular case, we are developing an alert system for the number of users logged in on a particular account. We have two Postgres databases. One defines the alerts (
    alert_definitions
    ) and the other one represents all users’ status (
    user_status
    ). Because these are continuously updating tables, I used
    postgres-cdc
    connector like so:
    Copy code
    CREATE TABLE user_status_binlog (
      id STRING NOT NULL,
      user_id STRING,
      account_id STRING,
      status STRING,
      deleted BOOLEAN,
      inactive BOOLEAN,
      queue_name STRING,
      team_id STRING,
      updated_at TIMESTAMP(3),
      PRIMARY KEY (id) NOT ENFORCED,
      WATERMARK FOR updated_at AS updated_at
    ) WITH (
     'connector' = 'postgres-cdc',
     'hostname' = 'postgresql',
     'port' = '5432',
     'username' = 'admin',
     'password' = 'admin',
     'schema-name' = 'public',
     'database-name' = 'user-status',
     'table-name' = 'live_user_status',
     'slot.name' = 'flink',
     'debezium.plugin.name' = 'pgoutput',
     'scan.startup.mode' = 'initial',
     'changelog-mode' = 'upsert'
    );
    
    
    CREATE TABLE alert_definitions(
      alert_id STRING NOT NULL,
      metric_id STRING,
      account_id STRING,
      filters STRING,
      min_value NUMERIC,
      max_value NUMERIC,
      filter_status AS JSON_QUERY(filters, '$.status' RETURNING ARRAY<STRING>),
      filter_ring_groups AS JSON_QUERY(filters, '$.ring_groups' RETURNING ARRAY<STRING>),
      filter_team_ids AS JSON_QUERY(filters, '$.team_ids' RETURNING ARRAY<STRING>),
      created_at TIMESTAMP(6),
      PRIMARY KEY (alert_id) NOT ENFORCED
    ) WITH (
     'connector' = 'postgres-cdc',
     'hostname' = 'postgresql',
     'port' = '5432',
     'username' = 'admin',
     'password' = 'admin',
     'schema-name' = 'public',
     'database-name' = 'live_sentinel_db',
     'table-name' = 'alert_definition',
     'slot.name' = 'alert_definition',
     'debezium.plugin.name' = 'pgoutput',
     'scan.startup.mode' = 'initial',
     'changelog-mode' = 'upsert'
    );
    Every time a new user changes their status or a new alert is created, I want the query to run and send the results to a Kafka topic. Because my query produces an updating table, I created a Kafka sink with connector
    upsert-kafka
    (although the behaviour I’m looking for is more akin to an append-only table). My query, which counts the number of users logged in, is as follows:
    Copy code
    CREATE TEMPORARY VIEW alert_user_counts AS
    SELECT
        alert_id,
        account_id,
        active_user_count,
        min_value,
        max_value
    FROM 
        (
            SELECT alert_id, account_id, min_value, max_value, filter_ring_groups, filter_status, filter_team_ids 
            FROM alert_definitions 
            WHERE account_id IS NOT NULL
            AND metric_id = 'count-users-logged-in'
        ) alerts,
        LATERAL (
            SELECT
                COUNT(DISTINCT user_id) AS active_user_count
            FROM user_status_binlog AS us
            WHERE
                us.account_id = alerts.account_id
                AND us.status NOT IN ('offline', 'hidden')
                AND us.deleted = FALSE
                AND us.inactive = FALSE
                AND (COALESCE(CARDINALITY(alerts.filter_status), 0) = 0 OR ARRAY_CONTAINS(alerts.filter_status, us.status))
        );
    
    INSERT INTO notification_sink
    SELECT 
        counts.account_id,
        counts.alert_id,
        counts.active_user_count as `value`,
        CASE 
            WHEN (counts.min_value IS NULL OR counts.active_user_count >= counts.min_value) 
             AND (counts.max_value IS NULL OR counts.active_user_count <= counts.max_value) 
            THEN 'VIOLATION'
            ELSE 'NOMINAL'
        END AS `type`,
        CURRENT_TIMESTAMP as event_timestamp
    FROM alert_user_counts counts
    Although the idea behind this job seems pretty simple, I keep finding issues with my implementation. I needed to use a LATERAL JOIN because it was the only way not to get multiple values for the same alert. Currently it’s working fine except when the
    alert_definition
    table is updated. If a new alert is created, then a lot of messages are sent to kafka. For example: say the account has 3 users logged in. If a new alert is created for that account, we get 3 messages on the kafka topic, like so:
    Copy code
    {"account_id":"account-id","alert_id":"10","value":1,"type":"VIOLATION","event_timestamp":"2025-12-18 18:08:30.991"}
    {"account_id":"account-id","alert_id":"10","value":2,"type":"VIOLATION","event_timestamp":"2025-12-18 18:08:30.992"}
    {"account_id":"account-id","alert_id":"10","value":3,"type":"VIOLATION","event_timestamp":"2025-12-18 18:08:30.993"}
    I don’t really understand this ‘counting’ behaviour. It’s as if the user_status table were an append-only table and this new alert is joining with multiple past versions. Can someone please explain to me why this behaviour is happening? P.S. You might realize that this implementation doesn’t really work when no users are logged in (in which case it should return 0, but Flink SQL doesn’t return any rows). I’m aware of that, but right now that’s not where I’m focusing.
  • u

    Utkarsh Vashishtha

    12/18/2025, 9:02 PM
    Hi, would appreciate some help on this: with an upgrade to Flink 2.0 and Iceberg 1.10, we noticed that the Iceberg files committer stream and the writer stream were chained together in the same operator (vertex node). This was not the case with versions <= Flink 1.20, where the committer (with max parallelism 1) and writer streams were different chains (occupied different Flink vertices). Is this expected? If it is, then functionality (like autoscaling) that depends on per-vertex metric consumption breaks. If there is a mistake in my understanding, please correct it.
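    If it helps while investigating, a coarse workaround sketch (not Iceberg-specific guidance): operator chaining can be disabled so the writer and committer end up in separate vertices again, each with its own per-vertex metrics, at the cost of extra record serialization between them:
    Copy code
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class DisableChainingSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Every operator gets its own job-graph vertex (and per-vertex metrics).
            // Roughly equivalent to setting pipeline.operator-chaining.enabled: false in the config.
            env.disableOperatorChaining();
        }
    }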