# troubleshooting
  • f

    FAS

    10/22/2025, 4:45 AM
    Hi Everyone, I'm encountering an issue with a cross-account Flink SQL setup and would appreciate some clarification on how assume-role is handled by the Iceberg catalog.
    Scenario
    1. Account #1: Runs a Flink (1.19.1) job on an EKS cluster.
    2. Account #2: Hosts Iceberg tables in an S3 bucket (s3://account-2-bucket-iceberg/dbstore1/) and manages metadata using the AWS Glue Catalog (awsAccount2Id).
    3. Permissions:
       ◦ The Flink EKS pod in Account #1 has a Service Account configured with OIDC.
       ◦ This Service Account assumes a cross-account role (arn:aws:iam::awsAccount2Id:role/cross-account-role) in Account #2.
    4. Verification:
       ◦ I have `exec`'d into the running Flink pod.
       ◦ From the pod, I can successfully use the AWS CLI to assume the cross-account role.
       ◦ After assuming the role, I can successfully list the Glue databases and tables in Account #2.
       ◦ This confirms the underlying EKS OIDC, IAM roles, and network access are all correctly configured.
    The Challenge
    In my Flink job, I first define the catalog for Account #2.
    1. Create Catalog (Success)
    This SQL statement executes successfully, and the Flink logs confirm it:
    2025-10-22 03:57:00,929 INFO ... - SQL statement executed successfully. sql=CREATE CATALOG `awsAccount2Id` ...
    The full statement:
    CREATE CATALOG `awsAccount2Id`
    WITH (
      'type' = 'iceberg',
      'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',
      'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
      'warehouse' = 's3://account-2-bucket-iceberg/dbstore1/',
      'client.assume-role.arn' = 'arn:aws:iam::awsAccount2Id:role/cross-account-role',
      'glue.catalog-id' = 'awsAccount2Id',
      'client.region' = 'us-east-1',
      'client.credentials-provider' = 'software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider'
    );
    2. Select from Catalog (Failure)
    Immediately after the catalog is created, my Flink job executes the following SELECT query:
    SELECT 
    ....
    
    FROM `awsAccount2Id`.`dbstore1`.table1
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='30s') */;
    This query fails with a validation error:
    2025-10-22 03:57:06,710 ERROR ... - Failed to execute SQL statement:
    SELECT ...
    FROM `awsAccount2Id`.`dbstore1`.table1 ...
    ;
    org.apache.flink.table.api.ValidationException: SQL validation failed. From line 11, column 6 to line 11, column 59: Object 'dbstore1' not found within 'awsAccount2Id'
    I also noticed that when Flink logs the list of available databases, it only shows databases from Account #1, not the cross-account ones from Account #2.
    My Question
    My expectation was that by defining client.assume-role.arn and glue.catalog-id in the CREATE CATALOG statement, any subsequent Flink SQL operation referencing the awsAccount2Id catalog (like my SELECT query) would automatically use those settings to assume the role and query the Glue catalog in Account #2. Why is Flink reporting that the database dbstore1 is "not found," even though the catalog was created successfully and configured to assume a role that can see that database? I can see tables from this database when I manually assume the role with the AWS CLI from that pod. It seems the SELECT query is not honoring the catalog's assume-role configuration and is somehow still querying the default Glue catalog in Account #1. Is this expected, or am I missing a configuration step for Flink to correctly use the assumed role for metadata discovery after the catalog is created?
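    For comparison, here is a minimal Java sketch of the same catalog definition, but routing all of Iceberg's AWS clients through its assume-role client factory instead of overriding the credentials provider. The 'client.factory' and 'client.assume-role.region' keys are assumptions based on the Iceberg AWS module and would need to be checked against the Iceberg version on the classpath; everything else mirrors the statement above, and this is a sketch rather than a confirmed fix.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class CrossAccountCatalogSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

            // Hedged sketch: let Iceberg's assume-role client factory build the Glue/S3/STS
            // clients against the cross-account role; the base credentials then come from the
            // pod's web identity (IRSA) by default. Property keys marked below are assumptions.
            tEnv.executeSql(
                    "CREATE CATALOG `awsAccount2Id` WITH (\n"
                            + "  'type' = 'iceberg',\n"
                            + "  'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',\n"
                            + "  'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',\n"
                            + "  'warehouse' = 's3://account-2-bucket-iceberg/dbstore1/',\n"
                            + "  'client.factory' = 'org.apache.iceberg.aws.AssumeRoleAwsClientFactory',\n"  // assumption
                            + "  'client.assume-role.arn' = 'arn:aws:iam::awsAccount2Id:role/cross-account-role',\n"
                            + "  'client.assume-role.region' = 'us-east-1',\n"                                // assumption
                            + "  'glue.catalog-id' = 'awsAccount2Id'\n"
                            + ")");

            // Listing databases right after creation makes it easy to see whether the metadata
            // calls actually reach Account #2's Glue catalog.
            tEnv.executeSql("USE CATALOG `awsAccount2Id`");
            tEnv.executeSql("SHOW DATABASES").print();
        }
    }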
  • j

    Jaya Ananthram

    10/22/2025, 9:10 AM
    Hello 👋 Question about Flink AsyncIO. In my DAG, I have two Flink AsyncIO operators (say Op1 & Op2), both using unordered wait, and by default Flink chains them together. In this case, will Flink guarantee that Op2 is invoked for an event (E1) only after Op1 completes/fails for it, or can Op1 and Op2 be triggered at the same time for event E1?
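    For concreteness, a minimal sketch of the topology being described; the class name, timeouts, and capacities are invented for illustration, and the question is whether Op2's asyncInvoke for E1 can run before Op1's future for E1 completes.

    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;

    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.async.AsyncFunction;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;

    public class ChainedAsyncSketch {

        /** Toy async stage; stands in for Op1/Op2. */
        static class Stage implements AsyncFunction<String, String> {
            private final String name;

            Stage(String name) {
                this.name = name;
            }

            @Override
            public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
                CompletableFuture
                        .supplyAsync(() -> input + "->" + name)
                        .thenAccept(r -> resultFuture.complete(Collections.singletonList(r)));
            }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<String> events = env.fromElements("E1", "E2", "E3");

            // Op1 and Op2, both unordered-wait, chained by default.
            DataStream<String> afterOp1 =
                    AsyncDataStream.unorderedWait(events, new Stage("Op1"), 5, TimeUnit.SECONDS, 100);
            DataStream<String> afterOp2 =
                    AsyncDataStream.unorderedWait(afterOp1, new Stage("Op2"), 5, TimeUnit.SECONDS, 100);

            afterOp2.print();
            env.execute("chained-async-sketch");
        }
    }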
  • u

    מייקי בר יעקב

    10/22/2025, 11:18 PM
    I am trying to deploy Flink 2.1 without the operator in application mode. My main class is org.example.Main, but I get this error: ClassNotFoundError for another class in the project.
  • e

    Elad

    10/23/2025, 8:43 AM
    Hello 😄 I want to configure the Flink cluster logging with a log4j2.xml configuration, but the documentation makes no mention of usage or support for this file, only the legacy log4j properties files. I even got an error when trying to configure a log4j2.xml file on the Flink deployment because it's not supported, even though Flink uses log4j2 under the hood. Is there any documentation I'm missing about configuring Flink with log4j2.xml? Or, if there is no current support for that, is there an expected date/release for it?
  • e

    Eric Huang

    10/23/2025, 1:36 PM
    Hello everyone! I am using Flink 1.16 with CEP, and I got this NullPointerException. The job just made a full restart from an inner exception, and around 10 minutes later, the NullPointerException appeared. Appreciate it very much if anyone can help.
    2025-10-23 14:05:32,327 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Match[7] -> Calc[8] -> SinkConversion[9] (21/96) (dcdc4daa8ced8ca9d2b8fc6c58e26129_310a79d541a763d57b050aae3bf30f0a_20_1) switched from INITIALIZING to RUNNING.
    2025-10-23 14:05:32,327 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Match[7] -> Calc[8] -> SinkConversion[9] (90/96) (dcdc4daa8ced8ca9d2b8fc6c58e26129_310a79d541a763d57b050aae3bf30f0a_89_1) switched from INITIALIZING to RUNNING.
    2025-10-23 14:05:32,327 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Match[7] -> Calc[8] -> SinkConversion[9] (59/96) (dcdc4daa8ced8ca9d2b8fc6c58e26129_310a79d541a763d57b050aae3bf30f0a_58_1) switched from INITIALIZING to RUNNING.
    2025-10-23 14:05:32,327 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Match[7] -> Calc[8] -> SinkConversion[9] (17/96) (dcdc4daa8ced8ca9d2b8fc6c58e26129_310a79d541a763d57b050aae3bf30f0a_16_1) switched from INITIALIZING to RUNNING.
    2025-10-23 14:05:32,400 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - DelayableMessageProcess -> Sink: xiaoxiang_reach_system (1/1) (dcdc4daa8ced8ca9d2b8fc6c58e26129_0a53a086337bb3f8a33ad689643a92fc_0_1) switched from INITIALIZING to RUNNING.
    2025-10-23 14:10:28,717 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 191 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1761199828716 for job ffffffffbd30e5570000000000000001.
    2025-10-23 14:10:30,023 INFO  org.apache.flink.runtime.state.SharedStateRegistryImpl       [] - state self-sustained:true, lastCompletedCheckpoint:191, earliestDependent:9223372036854775807, highestNotClaimedCheckpointID:-1
    2025-10-23 14:10:30,023 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - job ffffffffbd30e5570000000000000001 checkpoint 191 completed, job is state-sustained
    2025-10-23 14:10:30,207 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed checkpoint 191 for job ffffffffbd30e5570000000000000001 (280739389 bytes, checkpointDuration=1425 ms, finalizationTime=66 ms).
    2025-10-23 14:15:28,717 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 192 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1761200128716 for job ffffffffbd30e5570000000000000001.
    2025-10-23 14:15:29,030 INFO  org.apache.flink.runtime.state.SharedStateRegistryImpl       [] - state self-sustained:true, lastCompletedCheckpoint:192, earliestDependent:9223372036854775807, highestNotClaimedCheckpointID:-1
    2025-10-23 14:15:29,030 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - job ffffffffbd30e5570000000000000001 checkpoint 192 completed, job is state-sustained
    2025-10-23 14:15:29,096 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed checkpoint 192 for job ffffffffbd30e5570000000000000001 (317081932 bytes, checkpointDuration=335 ms, finalizationTime=45 ms).
    2025-10-23 14:16:37,533 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Match[7] -> Calc[8] -> SinkConversion[9] (12/96) (dcdc4daa8ced8ca9d2b8fc6c58e26129_310a79d541a763d57b050aae3bf30f0a_11_1) switched from RUNNING to FAILED on session-2123414-1761142270-taskmanager-1-10 @ hldy-data-k8s-flink-ssd-node03895.mt (dataPort=23347).
    java.lang.NullPointerException: null
    	at org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor.materializeMatch(SharedBufferAccessor.java:213) ~[flink-cep-1.16.1.jar:1.16.1]
    	at org.apache.flink.cep.nfa.NFA.processMatchesAccordingToSkipStrategy(NFA.java:474) ~[flink-cep-1.16.1.jar:1.16.1]
    	at org.apache.flink.cep.nfa.NFA.advanceTime(NFA.java:337) ~[flink-cep-1.16.1.jar:1.16.1]
    	at org.apache.flink.cep.operator.CepOperator.advanceTime(CepOperator.java:429) ~[flink-cep-1.16.1.jar:1.16.1]
    	at org.apache.flink.cep.operator.CepOperator.onEventTime(CepOperator.java:325) ~[flink-cep-1.16.1.jar:1.16.1]
    	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:180) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:599) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:239) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:200) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:136) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:552) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:843) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:792) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:969) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:948) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571) ~[flink-dist-1.16.1.jar:1.16.1]
    	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312]
    2025-10-23 14:16:37,534 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.TaskManagerRestartStrategy [] - Received failure event: TaskFailureEvent{taskManagerId=session-2123414-1761142270-taskmanager-1-10, timestamp=1761200197533, cause=NullPointerException: null}, excluded: false
    2025-10-23 14:16:37,534 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.TaskManagerRestartStrategy [] - Resetting restart strategy state due to stable running period
    2025-10-23 14:16:37,536 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.ContinuousRestartLimitation [] - Earliest failure timestamp: 1761199499926, max continuous restart duration: 28800000 ms
    2025-10-23 14:16:37,536 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - 423 tasks will be restarted to recover the failed task dcdc4daa8ced8ca9d2b8fc6c58e26129_310a79d541a763d57b050aae3bf30f0a_11_1.
    2025-10-23 14:16:37,536 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job rt_scene_51697_staging (ffffffffbd30e5570000000000000001) switched from state RUNNING to RESTARTING.
  • m

    Mrugesh Kadia

    10/24/2025, 9:48 AM
    Hi folks, I'm currently working on a POC using Flink for transforming Kafka events. Our setup involves consuming events from Kafka, applying transformations, and producing the results back to Kafka. Since we have multiple event types with varying structures, I'm evaluating which approach would be best: DataStream API, Table API, or SQL. At the moment, I'm exploring Flink SQL because it's easier for end developers and faster to integrate. For each event type, I've created a Kafka source table and defined a corresponding view to apply transformations before inserting the results into separate Kafka topics. For more complex transformations, we plan to implement custom UDFs within Flink SQL. Question: Given this setup, which approach would be most suitable (DataStream API, Table API, or SQL), especially when dealing with multiple event types and varying transformation complexity?
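    To make the described setup concrete, here is a minimal Java sketch of the pattern above: one Kafka source table per event type, a view for the transformation, and an INSERT back to Kafka. Topic names, fields, and the broker address are invented for illustration.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class KafkaSqlTransformSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

            // One source table per event type (topic, fields and broker address are made up).
            tEnv.executeSql(
                    "CREATE TABLE order_events (\n"
                            + "  order_id STRING,\n"
                            + "  amount DOUBLE,\n"
                            + "  event_time TIMESTAMP(3)\n"
                            + ") WITH (\n"
                            + "  'connector' = 'kafka',\n"
                            + "  'topic' = 'orders-in',\n"
                            + "  'properties.bootstrap.servers' = 'kafka:9092',\n"
                            + "  'format' = 'json',\n"
                            + "  'scan.startup.mode' = 'latest-offset'\n"
                            + ")");

            // Transformation expressed as a view; more complex logic can move into a UDF later.
            tEnv.executeSql(
                    "CREATE VIEW enriched_orders AS\n"
                            + "SELECT order_id, amount * 1.2 AS gross_amount, event_time FROM order_events");

            // Sink table plus a continuous INSERT back to Kafka.
            tEnv.executeSql(
                    "CREATE TABLE orders_out (\n"
                            + "  order_id STRING,\n"
                            + "  gross_amount DOUBLE,\n"
                            + "  event_time TIMESTAMP(3)\n"
                            + ") WITH (\n"
                            + "  'connector' = 'kafka',\n"
                            + "  'topic' = 'orders-out',\n"
                            + "  'properties.bootstrap.servers' = 'kafka:9092',\n"
                            + "  'format' = 'json'\n"
                            + ")");

            tEnv.executeSql("INSERT INTO orders_out SELECT * FROM enriched_orders");
        }
    }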
  • m

    Mohsen Rezaei

    10/27/2025, 11:12 PM
    Hey everyone! I was playing around with the ForSt (disaggregated) state backend and ran into an issue with the async (incremental) state and a simple GROUP BY clause. I filed an issue to show some details on what's going on, but I was curious whether anyone else has run into this, since it's a very basic test against Flink 2.1. Running the same scenario with sync state works fine, but that is not going to be ideal for more complex scenarios.
  • a

    Arman shakeri

    10/28/2025, 10:09 AM
    Hi guys, I'm working on a stateful pipeline and I have some issues. I want to read three streams from Kafka topics and join them to create a flattened table, but the Kafka topics are filled with CDC data, so all records may receive updates and I want to keep the last version. I tried an interval join, but I cannot keep events for a long time (huge state). Is there a better solution for this scenario? I was thinking about storing the last state of each record in an external cache DB like Redis. Could you please share your ideas?
  • t

    Tiago Pereira

    10/28/2025, 12:11 PM
    Hey guys, in Flink we don't have a callback when a map, filter, or other function throws an error. Is it possible to introduce that kind of feature in Flink and PyFlink in a future release?
  • m

    Manish Jain

    10/28/2025, 1:49 PM
    Hi Team, we are using the Flink K8s operator to deploy Flink in our Azure K8s environment. We have some secrets that we use in the Flink jobs, and we use an azure-keyvault-secrets-mounter to mount them as environment variables in the Flink Job Manager and Task Manager. In our setup, we rotate the secret every x days. Once the secret changes, the Azure secret mounter picks up the change and applies it to the K8s cluster. The challenge we are facing is that even though the secret has changed, the Job Manager and Task Manager pods do not restart. This works seamlessly for all other pods in our system. We are using stakater/reloader:v1.0.29 to reload pods when a config changes, but the annotations that work with other pods are not working with the Flink components. Is anyone using a similar setup and has run into such a problem? We don't want to create a custom solution for job restarts, and manual restarts are not optimal.
  • f

    Francisco Morillo

    10/28/2025, 7:29 PM
    Is there any way to migrate a flink 1.20 state that used collections to flink 2.1?
  • n

    Noufal Rijal

    10/29/2025, 5:59 AM
    Hi Team, I have recently been trying out the flink-k8s-operator in session mode for the autoscaler feature. Based on the testing we have done so far, we found that:
    1. We can't submit an entirely different Docker image to the existing cluster (which runs on a base image), because the FlinkSessionJob kind does not support any field or arg for passing an image.
    2. We are now checking the possibility of passing a zipped virtual env while submitting the job. For this we focused on the -pyarch argument and passed the blob path to the zipped env file. The problem we are facing is that Flink is not picking up the zipped env, and the package we supplied inside the new venv via the zip is reported as not found.
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: flink-session
      namespace: flink-test
    spec:
      image: pyflink-session-test:v1.0
      flinkVersion: v1_20
      imagePullPolicy: Always
      serviceAccount: flink
      mode: standalone
      
      flinkConfiguration:
        # All Flink runtime config keys go here
        fs.allowed-fallback-filesystems: "file"
        io.tmp.dirs: "/tmp"
        taskmanager.numberOfTaskSlots: "4"
        
        # ===== OPERATOR AUTOSCALER =====
        kubernetes.operator.job.autoscaler.enabled: "true"
        kubernetes.operator.job.autoscaler.target.utilization: "0.7"
        kubernetes.operator.job.autoscaler.target.utilization.boundary: "0.2"
        kubernetes.operator.job.autoscaler.stabilization.interval: "1m"
        kubernetes.operator.job.autoscaler.metrics.window: "5m"
        kubernetes.operator.job.autoscaler.scale-up.grace-period: "1m"
        kubernetes.operator.job.autoscaler.scale-down.grace-period: "5m"
        
      # # 💡 MOVED: jobManager must be a direct child of 'spec'
      jobManager:
        replicas: 1
        resource:
          memory: "2048m"
          cpu: 1
      taskManager:
        replicas: 2
        resource:
          # memory: "10240m"
          memory: "2048m"
          cpu: 2
    ---
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkSessionJob
    metadata:
      name: ppe-kafka-streaming
      namespace: flink-test
    spec:
      deploymentName: flink-session
      
      job:
    
        jarURI: /opt/flink/opt/flink-python-1.20.3.jar
        # entryClass: org.apache.flink.client.python.PythonDriver
        # args:
        #   # 1. Main PyFlink Script
        #   - "--python"  # Changed from -py
        #   - "/opt/flink/usrlib/streaming_test.py"
          
        #   # 2. Python Archives
        #   - "--pyArchives"  # Changed from -pyarch
        #   - "blob_path#flink_venv"
          
        #   # 3. Python Executable
        #   - "-pyexec"  # This is correct!
        #   - "flink_venv/bin/python" 
    
        args:
        - "-py"
        - "/opt/flink/usrlib/streaming_test.py"
        - "-pyarch"
        - "blob_path#flink_venv"
        - "-pyexec"
        - "flink_venv/bin/python"
        
        parallelism: 2
    Request for your help if you have faced and tackled a similar issue. #C065944F9M2 #C03G7LJTS2G #C03GV7L3G2C
  • s

    Saketh

    10/29/2025, 7:09 AM
    Hi all, I'm facing a serious state/memory scalability issue with my Flink enrichment pipeline.
    Context:
    Input: streaming audit events, 90 lakh (9 million) unique SIDs per day, ~30 stages per SID, data arrives out of order.
    Goal: enrich each event with certain metadata (bitset (32-char string), isMBU, isNRI, etc.) that typically arrives in a special "UOS" event for the same SID, but sometimes late or missing.
    Previous approach: we buffered all out-of-order events per SID in keyed state (using MapState/ListState). When UOS arrived or the "ENDOFFLOW" stage was seen, we would enrich and emit all buffered events for that SID. RocksDB is tuned for low memory pressure: reduced write buffer and block cache sizes (setWriteBufferSize, setBlockCacheSize, fewer write buffers), compression enabled (LZ4, and ZSTD at the bottom-most level), and incremental cleanup and compaction filters set to help with TTL-based state expiration.
    Problem: at our scale, keyed state explodes (potentially millions of active SIDs at peak, each with buffered events), leading to frequent heap space exhaustion, large checkpoint files, and task manager restarts. Within 5-10 minutes of submitting the job, the heap reaches its max limit and the job restarts. Please help me fix this. Thanks in advance.
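    For reference, a minimal Java sketch of the buffer-until-UOS pattern as described; the event fields, stage values, and the 24h TTL are assumptions for illustration, and this is the pattern whose state is exploding, not a proposed fix.

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class BufferUntilUosSketch {

        /** Invented event shape; only the fields needed for the sketch. */
        public static class AuditEvent {
            public String sid;
            public String stage;     // e.g. "UOS", "ENDOFFLOW", or a normal stage
            public String metadata;  // bitset / isMBU / isNRI payload carried by the UOS event
        }

        /** Buffers events per SID until UOS (or ENDOFFLOW) arrives, then enriches and flushes. */
        public static class BufferUntilUos extends KeyedProcessFunction<String, AuditEvent, AuditEvent> {

            private transient ListState<AuditEvent> buffered;

            @Override
            public void open(Configuration parameters) {
                ListStateDescriptor<AuditEvent> desc =
                        new ListStateDescriptor<>("buffered-events", AuditEvent.class);
                // TTL so SIDs that never see UOS/ENDOFFLOW are eventually dropped from state.
                desc.enableTimeToLive(
                        StateTtlConfig.newBuilder(Time.hours(24))
                                .cleanupInRocksdbCompactFilter(1000)
                                .build());
                buffered = getRuntimeContext().getListState(desc);
            }

            @Override
            public void processElement(AuditEvent event, Context ctx, Collector<AuditEvent> out)
                    throws Exception {
                boolean closesFlow = "UOS".equals(event.stage) || "ENDOFFLOW".equals(event.stage);
                if (closesFlow) {
                    for (AuditEvent e : buffered.get()) {
                        e.metadata = event.metadata; // enrich buffered events with the UOS metadata
                        out.collect(e);
                    }
                    buffered.clear();
                    out.collect(event);
                } else {
                    buffered.add(event);             // out-of-order event: park it until UOS arrives
                }
            }
        }
    }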
  • m

    Mohamed Galal

    10/29/2025, 7:19 AM
    Hi all, can someone please tell me how I can use the MongoDBSource CDC connector with Flink 2.1? I can't find any MongoDB release compatible with Flink 2.1.
  • a

    André Santos

    10/29/2025, 7:13 PM
    Hi folks! Found what looks like a bug in the k8s operator: when submitting session jobs in HA setups, the operator doesn't discover the leader JobManager and just connects to whatever endpoint it has configured. This means job submissions fail several times when they hit a non-leader JM pod. Anyone else run into this? The issue is in AbstractFlinkService.submitJobToSessionCluster() - it bypasses REST pod leader discovery entirely.
  • r

    Royston

    10/30/2025, 11:43 AM
    Hi, I need to publish some metrics in Flink, basically the number of records processed or errors in a flatMap function. I see an issue where the total metrics are not flushed before the job ends; is there a way to do this manually? Any recommendation for which metrics exporter I can use for this?
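    For the counting part, a minimal sketch of registering user-defined counters inside a flatMap (metric and class names are made up); how and when a reporter flushes them at job end is a separate question.

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Counter;
    import org.apache.flink.util.Collector;

    /** Counts processed and failed records via the operator metric group. */
    public class CountingFlatMap extends RichFlatMapFunction<String, String> {

        private transient Counter processed;
        private transient Counter errors;

        @Override
        public void open(Configuration parameters) {
            processed = getRuntimeContext().getMetricGroup().counter("recordsProcessed");
            errors = getRuntimeContext().getMetricGroup().counter("recordsFailed");
        }

        @Override
        public void flatMap(String value, Collector<String> out) {
            try {
                out.collect(value.toUpperCase());
                processed.inc();
            } catch (Exception e) {
                errors.inc();
            }
        }
    }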
  • i

    Iain Dixon

    10/30/2025, 1:23 PM
    Hello, my name is Iain. I'm a PhD student at Newcastle University in the UK researching stream benchmarking. Specifically, I'm interested in the effects of backpressure on stream benchmark results, and as such I have a question about metrics which suggest that an operator might become backpressured. According to this blog post (https://flink.apache.org/2019/07/23/flink-network-stack-vol.-2-monitoring-metrics-and-that-backpressure-thing/), inPoolUsage and outPoolUsage are good metrics with which to assess the presence of backpressure. To test things out I built a really simple setup, as seen in the picture below, where records are generated in the generator (via a loop to create the rate and a Thread.sleep at the end to fill out the rest of the second, based on the DS2 wordcount here: https://github.com/strymon-system/ds2/blob/master/flink-examples/src/main/java/ch/ethz/systems/strymon/ds2/flink/wordcount/sources/RateControlledSourceFunction.java), sent to a pipeline workload simulator (a single operator which counts the number of received records and runs a Thread.sleep at different frequencies in order to simulate pipeline workload), and finally sent to a sink where records are received but not saved or forwarded. I bound the parallelism of each operator to 1 (to create the minimal possible pipeline). The generator produces a constant workload of 1000 records per second, and the workload simulator produces a constant amount of work for every n records.

    [image: experimental_setup]
    Below I've included an experiment where I run my pipeline at high utilisation (Thread.sleep(1) every record, which should induce backpressure as the amount of work performed within the second exceeds a second, resulting in the buffers filling and backpressure being triggered) and at low utilisation (Thread.sleep(1) every other record, which should only create around 500ms of work, which the operator should be able to handle over a second). The high workload hits the maximum outPoolUsage on the source operator, which matches my understanding of how Flink handles backpressure, but the low workload exhibits a sawtooth pattern. Looking at the isBackpressured metric of the same source operator, the low workload never triggers backpressure, which makes sense, but my expectation was that outPoolUsage would trend up to some value (as roughly 500ms of "work" should be created every second) and remain relatively constant at that value, rather than dipping back down and jumping up as seen in the graph. I'm not sure what mechanism in Flink would be responsible for this behaviour if the workload is constant, and I was wondering if anyone working on Flink could explain what's occurring or point me in the right direction. I'm aware (from the linked blog post above) that the outPoolUsage metric is an aggregation of the floatingBuffersUsage and exclusiveBuffersUsage metrics, so the drop to 10% would be one of the exclusive buffers, but why would floating buffers come and go if the pipeline workload and arrival rates are constant?

    [image: buffer_question]

    To summarise, I'm wondering if anyone knows which mechanism is responsible for the sawtooth pattern in the outPoolUsage metric when the buffer isn't reaching 100% under a constant workload. My intuition is that it would increase to some threshold and stay (relatively) steady, but it jumps between 40% and 10%. Thank you very much for your time and assistance! For anyone who would like to get in touch, my email is i.g.dixon1@newcastle.ac.uk - Iain
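    For readers trying to reproduce this, a minimal Java sketch of the workload-simulator operator as described; the class name and record type are invented.

    import org.apache.flink.api.common.functions.MapFunction;

    /**
     * Sleeps 1 ms on every n-th record, so at 1000 rec/s n=1 approximates ~1000 ms of work
     * per second (the high-utilisation case) and n=2 approximates ~500 ms (the low case).
     */
    public class WorkloadSimulator implements MapFunction<String, String> {

        private final int sleepEveryN;
        private long seen;

        public WorkloadSimulator(int sleepEveryN) {
            this.sleepEveryN = sleepEveryN;
        }

        @Override
        public String map(String record) throws Exception {
            seen++;
            if (seen % sleepEveryN == 0) {
                Thread.sleep(1);
            }
            return record;
        }
    }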
  • n

    Noufal Rijal

    11/04/2025, 4:19 PM
    Hi Team, I was going through the external resources implementation in Flink. Our use case is that we have to do model inference over the data we receive, which requires a GPU. As per the docs, we tried implementing this by setting up the values for nvidia. What we observed is that when we set amount = 1, each task manager we spawn takes the entire GPU. This in turn causes a critical resource scaling issue: if we want 10 TMs, we need 10 GPUs, which is pretty costly. Is there a way to solve this, any kind of resource sharing mechanism, or do we need to go with a model server architecture? Note - we are using Nvidia T4 GPU nodes with CPU=4, memory=28GB. #C065944F9M2 #C03G7LJTS2G
  • s

    Sidharth Ramalingam

    11/05/2025, 6:12 AM
    We are using Flink's AsyncIO function with Futures to make external gRPC calls. Currently, we have set the async capacity to 1, and we are using a blocking stub to make those calls. For each event, we trigger 4 Futures (i.e., 4 gRPC calls per event). Does this mean that the Executors.newFixedThreadPool() needs at least 4 threads to avoid queue starvation? Also, if we increase the async capacity to 2, should we increase the thread pool size to 8 to keep up with the parallel calls?
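    For concreteness, a sketch of the shape being described: a capacity-limited asyncInvoke fanning out 4 blocking calls per event onto a shared fixed pool. Pool size, class names, and the fake blockingCall are illustrative only. With capacity C and 4 calls per in-flight event, up to C*4 calls can be outstanding at once; a smaller pool makes the extra calls wait in the executor's queue rather than fail.

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

    /** Four blocking gRPC-style calls per event, executed on a shared fixed pool. */
    public class FourCallAsyncFunction extends RichAsyncFunction<String, String> {

        private transient ExecutorService pool;

        @Override
        public void open(Configuration parameters) {
            // Sized for one in-flight event with 4 parallel calls (capacity 1 in the question).
            pool = Executors.newFixedThreadPool(4);
        }

        @Override
        public void asyncInvoke(String event, ResultFuture<String> resultFuture) {
            CompletableFuture<String> c1 = CompletableFuture.supplyAsync(() -> blockingCall(event, 1), pool);
            CompletableFuture<String> c2 = CompletableFuture.supplyAsync(() -> blockingCall(event, 2), pool);
            CompletableFuture<String> c3 = CompletableFuture.supplyAsync(() -> blockingCall(event, 3), pool);
            CompletableFuture<String> c4 = CompletableFuture.supplyAsync(() -> blockingCall(event, 4), pool);

            // Complete the async result once all four calls have returned.
            CompletableFuture.allOf(c1, c2, c3, c4)
                    .thenAccept(ignored ->
                            resultFuture.complete(
                                    Collections.singletonList(
                                            String.join("|", Arrays.asList(c1.join(), c2.join(), c3.join(), c4.join())))));
        }

        @Override
        public void close() {
            if (pool != null) {
                pool.shutdown();
            }
        }

        private String blockingCall(String event, int i) {
            return event + "-r" + i; // stand-in for the blocking gRPC stub call
        }
    }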
  • j

    Jashwanth S J

    11/06/2025, 6:50 AM
    Team, we've baked the JM and TM pod image to include the jar files required for FlinkSessionJob submission. We're submitting jobs using jarURI: "file:///opt/flink/integration_testing_ui_demo.jar". The operator is failing to find the jar inside the JM/TM pod and cannot bring up TM pods, even though the jar is present in the image; I can exec in and see the jar. Can someone help find the cause? Operator logs:
    2025-11-06 06:36:55,168 o.a.f.k.o.r.ReconciliationUtils [WARN ][ncm-sc/nc-int] Attempt count: 11, last attempt: false
    2025-11-06 06:36:55,259 o.a.f.r.u.c.m.ProcessMemoryUtils [INFO ][ncm-sc/fsc-flink-cluster] The derived from fraction jvm overhead memory (102.400mb (107374184 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
    2025-11-06 06:36:55,362 o.a.f.r.u.c.m.ProcessMemoryUtils [INFO ][ncm-sc/fsc-flink-cluster] The derived from fraction jvm overhead memory (102.400mb (107374184 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
    2025-11-06 06:36:55,459 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][ncm-sc/fsc-flink-cluster] Resource fully reconciled, nothing to do...
    2025-11-06 06:36:55,469 i.j.o.p.e.EventProcessor       [ERROR][ncm-sc/nc-int] Error during event processing ExecutionScope{ resource id: ResourceID{name='nc-int', namespace='ncm-sc'}, version: 48211671}
    org.apache.flink.kubernetes.operator.exception.ReconciliationException: java.io.FileNotFoundException: /opt/flink/integration_testing_ui_demo.jar (No such file or directory)
    	at org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController.reconcile(FlinkSessionJobController.java:130)
    	at org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController.reconcile(FlinkSessionJobController.java:58)
    	at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:153)
    	at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:111)
    Exec into the JM pod - the jar is there:
    ❯ kubectl exec -it -n ncm-sc fsc-flink-cluster-6c47f5964b-t5jhb -- bash
    Defaulted container "flink-main-container" out of: flink-main-container, setup-certs-and-plugins (init)
    root@fsc-flink-cluster-6c47f5964b-t5jhb:/opt/flink# ls
    '${sys:log.file}'   bin    examples                          lib       licenses   NOTICE   plugins        README.txt
     artifacts          conf   integration_testing_ui_demo.jar   LICENSE   log        opt      pod-template
    Describe the flinksessionjob:
    ❯ kubectl describe flinksessionjob nc-int -n ncm-sc | tail -n 20
    
    Status:
      Error:  {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.io.FileNotFoundException: /opt/flink/integration_testing_ui_demo.jar (No such file or directory)","additionalMetadata":{},"throwableList":[{"type":"java.io.FileNotFoundException","message":"/opt/flink/integration_testing_ui_demo.jar (No such file or directory)","additionalMetadata":{}}]}
      Job Status:
        Checkpoint Info:
          Last Periodic Checkpoint Timestamp:  0
        Job Id:                                f72f53c6355212276b75452aa2dc376e
        Savepoint Info:
          Last Periodic Savepoint Timestamp:  0
          Savepoint History:
      Lifecycle State:      UPGRADING
      Observed Generation:  2
      Reconciliation Status:
        Last Reconciled Spec:      {"spec":{"job":{"jarURI":"file:///opt/flink/integration_testing_ui_demo.jar","parallelism":1,"entryClass":"com.beam.screaltime.worker.ncm.process_functions.NcmEventsJob","args":["--jobType","cdc","--cloudProvider","nx","--buildId","dev1-6404","--operatorParallelism","{\"default\":1}"],"state":"suspended","savepointTriggerNonce":null,"initialSavepointPath":null,"checkpointTriggerNonce":null,"upgradeMode":"stateless","allowNonRestoredState":null,"savepointRedeployNonce":null,"autoscalerResetNonce":null},"restartNonce":1,"flinkConfiguration":null,"deploymentName":"fsc-flink-cluster"},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","firstDeployment":true}}
        Reconciliation Timestamp:  1762411304062
        State:                     UPGRADING
    Events:
      Type     Reason               Age                  From  Message
      ----     ------               ----                 ----  -------
      Warning  SessionJobException  7m5s (x19 over 23m)  Job   /opt/flink/integration_testing_ui_demo.jar (No such file or directory)
      Normal   Submit               7m5s (x19 over 23m)  Job   Starting deployment
  • w

    windwheel

    11/07/2025, 8:59 AM
    https://github.com/apache/flink/pull/27152 Is anyone interested in this PR? I'm the author of this PR, and I've completed the FLIP code since its creation and fixed as many e2e tests as possible. However, no one has asked any questions in the discussion thread so far. Are there any community members with free time who could offer some suggestions? I'd really like to push this PR to be merged into the main branch.
  • k

    Kamakshi

    11/07/2025, 10:59 PM
    Hi Team, I am trying to run BDD tests with Flink tables on Docker. Is there any example of this?
  • e

    Elad

    11/09/2025, 1:03 PM
    Hi there, trying to deploy Flink as a standalone cluster on k8s WITHOUT the operator (unfortunately I cannot install the required CRDs on my k8s cluster). I'm failing to deploy the cluster in high availability mode. I have read the documentation and added the needed configuration (running the job manager with the args ["standalone-job", "--host", "$POD_IP"], with the pod IP env variable set dynamically, of course; see deployment/resource-providers/standalone/kubernetes/#application-cluster-resource-definitions). When reading the generated high availability ConfigMap, it seems like the job managers registered themselves with their localhost address, 127.0.0.1, instead of using the pod IP as needed. That also matches the TaskManagers trying to connect to a localhost address. I'm wondering if the job managers actually use the --host property, and if not, what should I do to make them work? TLDR - trying to configure high availability on k8s WITHOUT the operator and failing; I would love to see examples of a working setup.
  • h

    Han You

    11/10/2025, 3:53 PM
    Hi team! I think there has been a change in k8s operator which breaks autoscaler standalone. In this hotfix commit, we stop all autoscaler config keys from being set on the jobmanager. However the standalone autoscaler reads those configs from jobmanager as shown in this line. The effect is that standalone autoscaler no longer works in operator version 1.13. Can someone more familiar with the operator and autoscaler confirm? I can also help make a PR to fix. Thank you!
  • v

    Vasu Dins

    11/11/2025, 6:53 AM
    Hi everyone, I'm trying to integrate Elasticsearch 7 with PyFlink (version 1.20.1) using the jar flink-connector-elasticsearch7-3.1.0-1.20.jar. When I run my job, I get the following error:
    Exception in thread "Thread-5" java.lang.NoClassDefFoundError: org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase
    py4j.protocol.Py4JError: org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder does not exist in the JVM
    Here's a minimal example of my code:
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter
    import json
    
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///C:/flink-1.20.1/lib/flink-connector-elasticsearch7-3.1.0-1.20.jar")
    
    data_stream = env.from_collection([
        {"id": "1", "name": "John", "age": 25},
        {"id": "2", "name": "Jane", "age": 30}
    ])
    
    json_stream = data_stream.map(lambda x: json.dumps(x))
    
    es7_sink = Elasticsearch7SinkBuilder() \
        .set_bulk_flush_max_actions(1) \
        .set_emitter(ElasticsearchEmitter.static_index('my-index', 'id')) \
        .set_hosts(['10.0.0.102:9200']) \
        .build()
    
    json_stream.sink_to(es7_sink)
    env.execute("Flink Elasticsearch 7 Job")
    I'm running it using:
    flink run -py sampletestelastic.py
    Has anyone faced this issue before? It seems like the ElasticsearchSinkBuilderBase class is missing from the jar or not visible to PyFlink from the JVM side. Do I need an extra dependency or a different jar for Flink 1.20.1? Any guidance or suggestions would be really appreciated.
  • v

    Vikas Patil

    11/11/2025, 9:02 PM
    Hello all, our pekko framesize keeps increasing, and this is causing job failures. We use incremental checkpointing and Flink version 1.18.1. Is there a relation between the number of SST files and the pekko payload? The number of SST files is well over 100k for this job.
  • u

    徐平

    11/12/2025, 5:16 AM
    Hi everyone, I resumed the task from a savepoint, but it failed suddenly with this problem. Can anyone give me some tips? Thank you very much.
    2025-11-12 05:02:48.618 [Source: dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] WARN org.apache.flink.runtime.taskmanager.Task - Source: dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546 (765f7cffb5ee6a037a1ae2a15d0f3ed5_cbc357ccb763df2852fee8c4fc7d55f2_0_546) switched from RUNNING to FAILED with failure cause:
    java.io.IOException: Failed to deserialize consumer record due to
    	at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
    	at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
    	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203)
    	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:443)
    	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
    	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
    	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    	at java.lang.Thread.run(Thread.java:748)
    Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = event_topic, partition = 1, leaderEpoch = 7, offset = 2399381243, CreateTime = 1762853256453, serialized key size = 37, serialized value size = 2917, headers = RecordHeaders(headers = [], isReadOnly = false), key = [B@54e6a925, value = [B@16f80c51).
    	at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:59)
    	at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
    	... 14 common frames omitted
    Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    	at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310)
    	at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
    	at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
    	at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
    	at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:115)
    	at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
    	... 15 common frames omitted
    Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    	at StreamExecCalc$150.processElement_0_0_rewriteGroup82(Unknown Source)
    	at StreamExecCalc$150.processElement(Unknown Source)
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
    	... 23 common frames omitted
    Caused by: java.lang.IllegalStateException: Received element after endOfInput: Record @ (undef) : +I(17653228541,33333888241751361,6931888464607053,deal_loginresult,true,8548827403,,false,fido,101.230.113.1577001894070038941:????????3,HUNDSUN,CTP,b7afd566c360b4104d44d46bc4cc972e,cf1b93e8c889f5acaa5956a01a1edf30,,,188201457,,,,null,null,null,null,null,null,null,null,null,??????,app,null,null,null,null,null,null,null)
    	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:215)
    	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:203)
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
    	... 28 common frames omitted
  • g

    GAURAV MIGLANI

    11/12/2025, 8:30 AM
    In Flink 1.19.0, is there any known issue with a simple pipeline (Kafka source -> SQL transformation -> Kafka sink) under the Kubernetes Flink operator? The at-least-once guarantee is failing: at the time of Kubernetes node consolidation, data is getting dropped in Flink. The pipeline uses processing time and a 1-to-1 sink only. Please help.
  • h

    Hristo Yordanov

    11/12/2025, 7:28 PM
    Hi there, I'm trying to explore a Flink CDC pipeline (source Postgres and sink Fluss). Hot storage to Fluss is working well. What I'm trying now is activating cold storage (MinIO, Paimon) using the tiering service, which runs in a separate container. All containers start well and CDC works as expected in hot storage with Fluss, but no cold storage is happening at all. I know that normally tiering should be activated, but in pipelines with a Fluss sink I don't see how to do it. My question is: does a pipeline with a Fluss sink support tiering to Paimon, or should I use another approach and not pipelines?
  • a

    Ananth bharadwaj

    11/13/2025, 2:44 AM
    Hi. I am trying to use the postgres-cdc connector where my source and destination are both Postgres. I have a fairly large table (4G) which is taking considerable time for the initial snapshot process. I would like to make use of the snapshot.select.statement.overrides feature to only snapshot from a timestamp onwards, for example: snapshot.select.statement.overrides.public.orders=SELECT * FROM public.orders WHERE order_date > '2020-11-06'. But even after that, the full snapshot gets triggered. I have set scan.incremental.snapshot.enabled to false, but the behaviour is the same. Is there a way to snapshot only with a timestamp- or id-based condition?