# troubleshooting
  • v

    Victor Babenko

    10/09/2025, 8:22 PM
    Does Flink support any kind of micro-batching in Datastream API? We’d like to reduce the number of times state is modified in a windowing operation (it updates state on every incoming message). Ideally we’d batch the incoming updates for each window (by count and time), and update the window in batches that way. However it doesn’t look like there is an easy way to do that without either updating state on every message anyway, or risking data loss. Has anyone tried implementing micro-batching in Datastream?
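    Something like the sketch below is what I have in mind (just a sketch; the class/field names and the 5 s interval are made up, and it assumes the 1.x-style open() signature): append updates to a ListState buffer and fold them into the expensive aggregate only when a timer fires, so the aggregate is written once per batch rather than once per record. The buffer itself is checkpointed state, so nothing should be lost on failure.
    Copy code
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Hypothetical sketch: updates are appended to ListState (cheap append, no read-modify-write)
    // and folded into the aggregate only when the flush timer fires.
    public class BatchingAggregator extends KeyedProcessFunction<String, Long, Long> {

        private static final long FLUSH_INTERVAL_MS = 5_000; // assumed batch interval

        private transient ListState<Long> pending;    // buffered, not-yet-applied updates
        private transient ValueState<Long> aggregate; // the "expensive" window-style state

        @Override
        public void open(Configuration parameters) {
            // 1.x-style open(); Flink 2.x uses open(OpenContext) instead
            pending = getRuntimeContext().getListState(
                    new ListStateDescriptor<>("pending", Long.class));
            aggregate = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("aggregate", Long.class));
        }

        @Override
        public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
            pending.add(value); // append only; the aggregate is not touched here
            // Note: registers one timer per record for brevity; deduplicating timers is omitted.
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + FLUSH_INTERVAL_MS);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
            long sum = aggregate.value() == null ? 0L : aggregate.value();
            Iterable<Long> buffered = pending.get();
            if (buffered != null) {
                for (Long v : buffered) {
                    sum += v; // apply the whole batch in one pass
                }
            }
            aggregate.update(sum);
            pending.clear();
            out.collect(sum);
        }
    }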
  • r

    Royston

    10/10/2025, 3:08 PM
    Does Flink support dynamic credentials for GCS/S3, basically per job? From the Flink code itself, it seems to register credentials at cluster creation. Is there an option to do it via some Flink config at runtime?
  • t

    Trystan

    10/10/2025, 5:19 PM
    Copy code
    Event  | Info    | SCALINGREPORT   | Scaling execution enabled, begin scaling vertices:
    { Vertex ID 5591cebca2a45b74833df01196d1a431 | Parallelism 74 -> 80 | Processing capacity 6671.45 -> 7131.00 | Target data rate 3889.81}
    { Vertex ID 162a29c968c94eac6fbdfc2f8ccc2080 | Parallelism 180 -> 128 | Processing capacity Infinity -> Infinity | Target data rate 50.67}
    { Vertex ID 009b60dbc7737a8a2e91e7b3c30d9949 | Parallelism 128 -> 169 | Processing capacity 70.68 -> 93.00 | Target data rate 50.67}
    These autoscaling decisions don't make a lot of sense to me. Why on earth is it scaling UP 009b? Here's the output log from right before:
    Copy code
    ue1d.FlinkDeployment.AutoScaler.jobVertexID.009b60dbc7737a8a2e91e7b3c30d9949.TRUE_PROCESSING_RATE.Average: 71.325
    ue1d.FlinkDeployment.AutoScaler.jobVertexID.009b60dbc7737a8a2e91e7b3c30d9949.SCALE_DOWN_RATE_THRESHOLD.Current: 176.0
    ue1d.FlinkDeployment.AutoScaler.jobVertexID.009b60dbc7737a8a2e91e7b3c30d9949.SCALE_UP_RATE_THRESHOLD.Current: 56.0
    is this a catchup buffer problem? maybe it thinks it would be unable to catch up within the desired window?
  • b

    Barak Ben-Nathan

    10/12/2025, 7:35 AM
    Hi guys, I am using Flink 1.18. Does anyone know if it's possible to use the State Processor API to generate both a keyed state and a broadcast state for the same operator?
  • i

    ilililil

    10/15/2025, 4:19 AM
    Hi. Are Flink's timer service & Collector thread-safe? Async operations & future callbacks were added when the State API was upgraded to v2. Is it thread-safe to call the timer service & Collector from such a callback? Example:
    Copy code
    final var stateFuture = state.asyncValue();
    stateFuture.thenAccept(value -> {
        ctx.timerService().registerEventTimeTimer(timestamp + someValue);
        out.collect(foo(value));
    });
  • e

    Elad

    10/15/2025, 6:18 AM
    Hi, I'm looking into deploying Flink natively on K8s, and I wanted to ask if anyone else does it - and if so, what are the best practices for handling such a deployment, in terms of connecting it to a CI/CD pipeline, integrating custom Flink cluster settings not via command-line arguments (if that's even possible - the documentation does not mention such a mechanism), etc. Thanks in advance!
  • s

    Sergio Morales

    10/15/2025, 12:01 PM
    Hi, I was wondering if someone has created OpenRewrite recipes for Flink updates before (https://docs.openrewrite.org/recipes), if so, can you share them? I'm looking for ways to simplify the upgrade to the most recent Flink version.
  • r

    Royston

    10/16/2025, 8:18 AM
    Can someone tell me if I can use KafkaSource in Flink 2.1.0? The docs say there is no Kafka connector yet for 2.1.0 (confused by this statement).
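    For context, what I'm trying to run is just standard KafkaSource usage, roughly the sketch below (broker/topic/group values are placeholders); the open question is which connector artifact, if any, is compatible with 2.1.0.
    Copy code
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class KafkaSourceExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Placeholder broker/topic/group values
            KafkaSource<String> source = KafkaSource.<String>builder()
                    .setBootstrapServers("broker:9092")
                    .setTopics("input-topic")
                    .setGroupId("example-group")
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();

            DataStream<String> stream =
                    env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
            stream.print();
            env.execute("KafkaSource example");
        }
    }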
  • g

    George Leonard

    10/16/2025, 11:46 AM
    Curious issue: I configured an HMS as catalog, then created the catalogs below in it via Flink, along with the associated databases inside the catalogs. Notice how the postgres_catalog has the 2 databases from the kafka_catalog listed. The commands are from a totally new environment where nothing exists yet. The catalog is backed by a PostgreSQL database. I understood each catalog is its own namespace... If the same is done using an in-memory catalog, this does not happen.
    Copy code
    Flink SQL> CREATE CATALOG kafka_catalog WITH (
    >   'type'          = 'hive',
    >   'hive-conf-dir' = './conf/'
    > );
    [INFO] Execute statement succeeded.
    
    Flink SQL> use catalog kafka_catalog;
    > 
    [INFO] Execute statement succeeded.
    
    Flink SQL> CREATE DATABASE IF NOT EXISTS kafka_catalog.inbound;  
    > 
    [INFO] Execute statement succeeded.
    
    Flink SQL> CREATE DATABASE IF NOT EXISTS kafka_catalog.outbound;  
    > 
    [INFO] Execute statement succeeded.
    
    Flink SQL> CREATE CATALOG postgres_catalog WITH (
    >   'type'          = 'hive',
    >   'hive-conf-dir' = './conf/'
    > );
    [INFO] Execute statement succeeded.
    
    Flink SQL> use catalog kafka_catalog;
    [INFO] Execute statement succeeded.
    
    Flink SQL> show databases;
    +---------------+
    | database name |
    +---------------+
    |       default |
    |       inbound |
    |      outbound |
    +---------------+
    3 rows in set
    
    Flink SQL> use catalog postgres_catalog;
    [INFO] Execute statement succeeded.
    
    Flink SQL> show databases;
    +---------------+
    | database name |
    +---------------+
    |       default |
    |       inbound |
    |      outbound |
    +---------------+
    3 rows in set
    Copy code
    Flink SQL> use catalog kafka_catalog;
    [INFO] Execute statement succeeded.
    
    Flink SQL> create database test;
    [INFO] Execute statement succeeded.
    
    Flink SQL> use catalog postgres_catalog;
    [INFO] Execute statement succeeded.
    
    Flink SQL> show databases;
    +---------------+
    | database name |
    +---------------+
    |       default |
    |       inbound |
    |      outbound |
    |          test |
    +---------------+
  • v

    vasanth loka

    10/16/2025, 6:52 PM
    I am using Flink 2.1.0 with a JM + 2 TM cluster. I want to use a shared S3 storage system between the JM & TMs - any advice with regard to pros and cons?
  • i

    Itamar Weiss

    10/21/2025, 6:04 AM
    Hi all, I'm developing an event-based file source to continuously monitor an S3 bucket. The problem with the existing file source is that continuously listing the bucket is expensive, and the state grows with the number of files. My idea is to use SQS and listen for ObjectCreated events instead of polling the bucket. I'm currently considering two design alternatives:
    1. Periodic enumerator – the enumerator is triggered periodically and drains the SQS queue. Each S3 object becomes a split, similar to how Flink's current file source works.
    2. Single-reader enumerator – the enumerator simply assigns the SQS queue to a single reader, which continuously consumes it. In this model, there is a single split (the SQS queue itself), similar to how FLIP-27 treats Kafka partitions as splits assigned to readers.
    Has anyone worked on a similar approach or explored event-driven file sources before?
  • r

    Royston

    10/21/2025, 6:13 AM
    Hi, is there any documentation for fault tolerance in batch mode? As checkpointing doesn't work in batch mode.
  • v

    Vikas Patil

    10/21/2025, 2:37 PM
    Hello all. Flink uses a configured savepoint path before falling back to HA. So the operator's comment assumes the HA metadata will be used, but with execution.savepoint.path present, Flink never reaches the HA metadata. Code that proves it: • The operator sets the dummy savepoint path when it thinks HA is available:
    Copy code
    //ApplicationReconciler.java
    
    } else if (requireHaMetadata && flinkService.atLeastOneCheckpoint(deployConfig)) {
        // Last state deployment, explicitly set a dummy savepoint path to avoid accidental
        // incorrect state restore in case the HA metadata is deleted by the user
        deployConfig.set(SavepointConfigOptions.SAVEPOINT_PATH, LAST_STATE_DUMMY_SP_PATH);
        status.getJobStatus().setUpgradeSavepointPath(LAST_STATE_DUMMY_SP_PATH);
    } else {
        deployConfig.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH);
    • Flink, given any savepoint/checkpoint pointer, immediately resolves that path and fails fast on the dummy before any HA path is considered:
    Copy code
    public static FsCompletedCheckpointStorageLocation resolveCheckpointPointer(String checkpointPointer) throws IOException {
        final Path path;
        try {
            path = new Path(checkpointPointer);
        } catch (Exception e) {
            throw new IOException("Checkpoint/savepoint path '" + checkpointPointer + "' is not a valid file URI.", e);
        }
        final FileSystem fs = path.getFileSystem();
        final FileStatus status;
        try {
            status = fs.getFileStatus(path);
        } catch (FileNotFoundException e) {
            throw new FileNotFoundException("Cannot find checkpoint or savepoint file/directory '"
                + checkpointPointer + "' on file system '" + fs.getUri().getScheme() + "'.");
        }
        // ...
    }
    Why is this the case? Does anyone have any context on how to solve this?
  • f

    FAS

    10/22/2025, 4:45 AM
    Hi Everyone, I'm encountering an issue with a cross-account Flink SQL setup and would appreciate some clarification on how assume-role is handled by the Iceberg catalog.
    Scenario
    1. Account #1: Runs a Flink (1.19.1) job on an EKS cluster.
    2. Account #2: Hosts Iceberg tables in an S3 bucket (s3://account-2-bucket-iceberg/dbstore1/) and manages metadata using the AWS Glue Catalog (awsAccount2Id).
    3. Permissions: The Flink EKS pod in Account #1 has a Service Account configured with OIDC. This Service Account assumes a cross-account role (arn:aws:iam::awsAccount2Id:role/cross-account-role) in Account #2.
    4. Verification: I have `exec`'d into the running Flink pod. From the pod, I can successfully use the AWS CLI to assume the cross-account role. After assuming the role, I can successfully list the Glue databases and tables in Account #2. This confirms the underlying EKS OIDC, IAM roles, and network access are all correctly configured.
    The Challenge
    In my Flink job, I first define the catalog for Account #2.
    1. Create Catalog (Success). This SQL statement executes successfully, and the Flink logs confirm it:
    2025-10-22 03:57:00,929 INFO ... - SQL statement executed successfully. sql=CREATE CATALOG `awsAccount2Id` ...
    Copy code
    CREATE CATALOG `awsAccount2Id`
    WITH (
      'type' = 'iceberg',
      'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',
      'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
      'warehouse' = 's3://account-2-bucket-iceberg/dbstore1/',
      'client.assume-role.arn' = 'arn:aws:iam::awsAccount2Id:role/cross-account-role',
      'glue.catalog-id' = 'awsAccount2Id',
      'client.region' = 'us-east-1',
      'client.credentials-provider' = 'software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider'
    );
    2. Select from Catalog (Failure). Immediately after the catalog is created, my Flink job executes the following SELECT query:
    Copy code
    SELECT 
    ....
    
    FROM `awsAccount2Id`.`dbstore1`.table1
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='30s') */;
    This query fails with a validation error:
    Copy code
    2025-10-22 03:57:06,710 ERROR ... - Failed to execute SQL statement:
    SELECT ...
    FROM `awsAccount2Id`.`dbstore1`.table1 ...
    ;
    org.apache.flink.table.api.ValidationException: SQL validation failed. From line 11, column 6 to line 11, column 59: Object 'dbstore1' not found within 'awsAccount2Id'
    I also noticed that when Flink logs the list of available databases, it only shows databases from Account #1, not the cross-account ones from Account #2.
    My Question
    My expectation was that by defining client.assume-role.arn and glue.catalog-id in the CREATE CATALOG statement, any subsequent Flink SQL operations referencing the awsAccount2Id catalog (like my SELECT query) would automatically use those settings to assume the role and query the Glue catalog in Account #2. Why is Flink reporting that the database dbstore1 is "not found," even though the catalog was created successfully and configured to assume a role that can see that database? I can see tables from this database when I manually assume the role using the AWS CLI from that pod. It seems the SELECT query is not honoring the catalog's assume-role configuration and is somehow still querying the default Glue catalog in Account #1. Is this expected, or am I missing a configuration step for Flink to correctly use the assumed role for metadata discovery after the catalog is created?
  • j

    Jaya Ananthram

    10/22/2025, 9:10 AM
    Hello 👋 Question about Flink AsyncIO. In my DAG, I have two Flink AsyncIO operators (say Op1 & Op2), both using unordered wait, and by default Flink chains them together. In this case, will Flink guarantee that Op2 is invoked only after Op1 completes/fails for an event (E1), or can Op1 and Op2 be triggered at the same time for event E1?
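    For reference, a minimal sketch of the topology I mean is below (PassThroughAsync stands in for my real Op1/Op2, which call an external service).
    Copy code
    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.TimeUnit;
    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.async.AsyncFunction;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;

    public class ChainedAsyncExample {

        // Placeholder async function; the real Op1/Op2 call an external service.
        static class PassThroughAsync implements AsyncFunction<String, String> {
            @Override
            public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
                CompletableFuture.runAsync(
                        () -> resultFuture.complete(Collections.singletonList(input)));
            }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> input = env.fromElements("E1", "E2", "E3");

            // Op1 and Op2 applied back to back, both with unorderedWait; Flink chains them by default.
            DataStream<String> afterOp1 =
                    AsyncDataStream.unorderedWait(input, new PassThroughAsync(), 1_000, TimeUnit.MILLISECONDS, 100);
            DataStream<String> afterOp2 =
                    AsyncDataStream.unorderedWait(afterOp1, new PassThroughAsync(), 1_000, TimeUnit.MILLISECONDS, 100);

            afterOp2.print();
            env.execute("Chained async I/O example");
        }
    }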
  • u

    מייקי בר יעקב

    10/22/2025, 11:18 PM
    I am trying to deploy Flink 2.1 without the operator in application mode. My main class is org.example.Main, but I get this error: ClassNotFoundError for another class in the project.
  • e

    Elad

    10/23/2025, 8:43 AM
    Hello 😄 I want to configure the Flink cluster logging with a log4j2.xml configuration, but the documentation makes no mention of usage or support for this file, only the legacy log4j properties files. I even got an error when trying to configure a log4j2.xml file on the Flink deployment because it's not supported - even though Flink does use log4j2 under the hood. Is there any documentation I'm missing about configuring Flink with log4j2.xml? Or if there is no current support for that - is there any expected date/release for it?
  • e

    Eric Huang

    10/23/2025, 1:36 PM
    Hello everyone! I am using Flink 1.16 with CEP, and I got this NullPointerException. The job just made a full restart from an inner exception, and around 10 minutes later, the NullPointerException appeared. Appreciate it very much if anyone can help.
    Copy code
    2025-10-23 14:05:32,327 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Match[7] -> Calc[8] -> SinkConversion[9] (21/96) (dcdc4daa8ced8ca9d2b8fc6c58e26129_310a79d541a763d57b050aae3bf30f0a_20_1) switched from INITIALIZING to RUNNING.
    2025-10-23 14:05:32,327 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Match[7] -> Calc[8] -> SinkConversion[9] (90/96) (dcdc4daa8ced8ca9d2b8fc6c58e26129_310a79d541a763d57b050aae3bf30f0a_89_1) switched from INITIALIZING to RUNNING.
    2025-10-23 14:05:32,327 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Match[7] -> Calc[8] -> SinkConversion[9] (59/96) (dcdc4daa8ced8ca9d2b8fc6c58e26129_310a79d541a763d57b050aae3bf30f0a_58_1) switched from INITIALIZING to RUNNING.
    2025-10-23 14:05:32,327 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Match[7] -> Calc[8] -> SinkConversion[9] (17/96) (dcdc4daa8ced8ca9d2b8fc6c58e26129_310a79d541a763d57b050aae3bf30f0a_16_1) switched from INITIALIZING to RUNNING.
    2025-10-23 14:05:32,400 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - DelayableMessageProcess -> Sink: xiaoxiang_reach_system (1/1) (dcdc4daa8ced8ca9d2b8fc6c58e26129_0a53a086337bb3f8a33ad689643a92fc_0_1) switched from INITIALIZING to RUNNING.
    2025-10-23 14:10:28,717 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 191 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1761199828716 for job ffffffffbd30e5570000000000000001.
    2025-10-23 14:10:30,023 INFO  org.apache.flink.runtime.state.SharedStateRegistryImpl       [] - state self-sustained:true, lastCompletedCheckpoint:191, earliestDependent:9223372036854775807, highestNotClaimedCheckpointID:-1
    2025-10-23 14:10:30,023 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - job ffffffffbd30e5570000000000000001 checkpoint 191 completed, job is state-sustained
    2025-10-23 14:10:30,207 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed checkpoint 191 for job ffffffffbd30e5570000000000000001 (280739389 bytes, checkpointDuration=1425 ms, finalizationTime=66 ms).
    2025-10-23 14:15:28,717 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering checkpoint 192 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1761200128716 for job ffffffffbd30e5570000000000000001.
    2025-10-23 14:15:29,030 INFO  org.apache.flink.runtime.state.SharedStateRegistryImpl       [] - state self-sustained:true, lastCompletedCheckpoint:192, earliestDependent:9223372036854775807, highestNotClaimedCheckpointID:-1
    2025-10-23 14:15:29,030 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - job ffffffffbd30e5570000000000000001 checkpoint 192 completed, job is state-sustained
    2025-10-23 14:15:29,096 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed checkpoint 192 for job ffffffffbd30e5570000000000000001 (317081932 bytes, checkpointDuration=335 ms, finalizationTime=45 ms).
    2025-10-23 14:16:37,533 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Match[7] -> Calc[8] -> SinkConversion[9] (12/96) (dcdc4daa8ced8ca9d2b8fc6c58e26129_310a79d541a763d57b050aae3bf30f0a_11_1) switched from RUNNING to FAILED on session-2123414-1761142270-taskmanager-1-10 @ hldy-data-k8s-flink-ssd-node03895.mt (dataPort=23347).
    java.lang.NullPointerException: null
    	at org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor.materializeMatch(SharedBufferAccessor.java:213) ~[flink-cep-1.16.1.jar:1.16.1]
    	at org.apache.flink.cep.nfa.NFA.processMatchesAccordingToSkipStrategy(NFA.java:474) ~[flink-cep-1.16.1.jar:1.16.1]
    	at org.apache.flink.cep.nfa.NFA.advanceTime(NFA.java:337) ~[flink-cep-1.16.1.jar:1.16.1]
    	at org.apache.flink.cep.operator.CepOperator.advanceTime(CepOperator.java:429) ~[flink-cep-1.16.1.jar:1.16.1]
    	at org.apache.flink.cep.operator.CepOperator.onEventTime(CepOperator.java:325) ~[flink-cep-1.16.1.jar:1.16.1]
    	at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:180) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:599) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:239) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:200) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:136) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:552) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:843) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:792) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:969) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:948) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571) ~[flink-dist-1.16.1.jar:1.16.1]
    	at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312]
    2025-10-23 14:16:37,534 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.TaskManagerRestartStrategy [] - Received failure event: TaskFailureEvent{taskManagerId=session-2123414-1761142270-taskmanager-1-10, timestamp=1761200197533, cause=NullPointerException: null}, excluded: false
    2025-10-23 14:16:37,534 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.TaskManagerRestartStrategy [] - Resetting restart strategy state due to stable running period
    2025-10-23 14:16:37,536 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.ContinuousRestartLimitation [] - Earliest failure timestamp: 1761199499926, max continuous restart duration: 28800000 ms
    2025-10-23 14:16:37,536 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - 423 tasks will be restarted to recover the failed task dcdc4daa8ced8ca9d2b8fc6c58e26129_310a79d541a763d57b050aae3bf30f0a_11_1.
    2025-10-23 14:16:37,536 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job rt_scene_51697_staging (ffffffffbd30e5570000000000000001) switched from state RUNNING to RESTARTING.
  • m

    Mrugesh Kadia

    10/24/2025, 9:48 AM
    Hi folks, I'm currently working on a POC using Flink for transforming Kafka events. Our setup involves consuming events from Kafka, applying transformations, and producing the results back to Kafka. Since we have multiple event types with varying structures, I'm evaluating which approach would be best — DataStream API, Table API, or SQL. At the moment, I'm exploring Flink SQL because it's easier for end developers and faster to integrate. For each event type, I've created a Kafka source table and defined a corresponding view to apply transformations before inserting the results into separate Kafka topics. For more complex transformations, we plan to implement custom UDFs within Flink SQL (see the sketch below). Question: Given this setup, which approach would be most suitable — DataStream API, Table API, or SQL — especially when dealing with multiple event types and varying transformation complexity?
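    As an example of the SQL route, this is roughly how I picture registering a UDF from Java and using it in the per-event-type views (a sketch; the function, names, and masking logic are made up).
    Copy code
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.functions.ScalarFunction;

    public class UdfExample {

        // Hypothetical scalar UDF used inside a Flink SQL view/INSERT.
        public static class MaskEmail extends ScalarFunction {
            public String eval(String email) {
                if (email == null) return null;
                int at = email.indexOf('@');
                return at <= 1 ? email : email.charAt(0) + "***" + email.substring(at);
            }
        }

        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            tEnv.createTemporarySystemFunction("MASK_EMAIL", MaskEmail.class);

            // The UDF can then be used in the per-event-type views, e.g.:
            // CREATE VIEW orders_clean AS SELECT MASK_EMAIL(email) AS email, ... FROM orders_source;
        }
    }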
  • m

    Mohsen Rezaei

    10/27/2025, 11:12 PM
    Hey everyone! I was playing around with the ForSt (disaggregated) state backend and ran into an issue with the async (incremental) state and a simple GROUP BY clause. I filed an issue for this to show some details on what's going on, but I was curious if anyone else has run into this, since it's a very basic test against Flink 2.1? Running the same scenario with sync state works fine, but that is not going to be ideal for more complex scenarios.
  • a

    Arman shakeri

    10/28/2025, 10:09 AM
    Hi guys, I'm working on a stateful pipeline and I have some issues. I want to read three streams from Kafka topics and join them to create a flattened table, but the Kafka topics are filled with CDC data, so all records may have updates and I want to keep the last version. I tried using an interval join, but I cannot keep events for a long time (huge state). Is there a better solution for this scenario? I was thinking about storing the last state of records in an external cache DB like Redis. Could you please share your ideas?
  • t

    Tiago Pereira

    10/28/2025, 12:11 PM
    Hey guys, in Flink we don't have a callback when a map, filter, or other function throws an error. Is it possible to introduce that kind of feature in Flink and PyFlink in a future release?
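    For context, the closest workaround I know of today is hand-rolling it with a try/catch and a side output, roughly like this sketch (names are made up), which is the boilerplate a built-in callback would remove.
    Copy code
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    // Wraps the "map" logic in a ProcessFunction so failures can be routed to a side output
    // instead of failing the job; a per-record error callback is emulated by hand.
    public class SafeMap extends ProcessFunction<String, Long> {

        public static final OutputTag<String> ERRORS = new OutputTag<String>("map-errors") {};

        @Override
        public void processElement(String value, Context ctx, Collector<Long> out) {
            try {
                out.collect(Long.parseLong(value)); // the transformation that may throw
            } catch (Exception e) {
                ctx.output(ERRORS, value);          // "error callback": route the bad record to a side output
            }
        }
    }
    The error records are then read with stream.process(new SafeMap()).getSideOutput(SafeMap.ERRORS).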
  • m

    Manish Jain

    10/28/2025, 1:49 PM
    Hi Team, we are using the Flink K8s operator to deploy Flink in our Azure K8s environment. We have some secrets that we use in the Flink jobs, and we are using an azure-keyvault-secrets-mounter to mount these secrets as environment variables in the Flink Job Manager and Task Manager. In our setup, we change the secret every x days. Once the secret is changed, the Azure secret mounter identifies this change and applies it to the K8s cluster. The challenge we are facing is that even though the secret has changed, the Job Manager and Task Manager pods do not restart. This works seamlessly for all other pods in our system. We are using stakater/reloader:v1.0.29 to reload the pods when a config changes, but the annotations that work with other pods are not working with the Flink components. Is anyone using a similar setup and has run into such a problem? We don't want to create a custom solution for job restarts, and manual restarts are not optimal.
  • f

    Francisco Morillo

    10/28/2025, 7:29 PM
    Is there any way to migrate a flink 1.20 state that used collections to flink 2.1?
  • n

    Noufal Rijal

    10/29/2025, 5:59 AM
    Hi Team, I have recently been trying out the flink-k8s-operator in session mode for the auto-scaler feature. Based on the testing we have done to date, we found that:
    1. We won't be able to submit an entirely different Docker image to the existing cluster (which runs on a base image), because the kind FlinkSessionJob does not support any field or arg for passing images.
    2. So we were checking the possibility of passing a zipped virtual env while submitting the job. For this we were focusing on the argument -pyarch and were passing the blob path to the zipped env file. The problem we are facing is that Flink is not picking up the zipped env, and the package we supplied inside the new venv via the zip is reported as not found.
    Copy code
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: flink-session
      namespace: flink-test
    spec:
      image: pyflink-session-test:v1.0
      flinkVersion: v1_20
      imagePullPolicy: Always
      serviceAccount: flink
      mode: standalone
      
      flinkConfiguration:
        # All Flink runtime config keys go here
        fs.allowed-fallback-filesystems: "file"
        io.tmp.dirs: "/tmp"
        taskmanager.numberOfTaskSlots: "4"
        
        # ===== OPERATOR AUTOSCALER =====
        kubernetes.operator.job.autoscaler.enabled: "true"
        kubernetes.operator.job.autoscaler.target.utilization: "0.7"
        kubernetes.operator.job.autoscaler.target.utilization.boundary: "0.2"
        kubernetes.operator.job.autoscaler.stabilization.interval: "1m"
        kubernetes.operator.job.autoscaler.metrics.window: "5m"
        kubernetes.operator.job.autoscaler.scale-up.grace-period: "1m"
        kubernetes.operator.job.autoscaler.scale-down.grace-period: "5m"
        
      # # 💡 MOVED: jobManager must be a direct child of 'spec'
      jobManager:
        replicas: 1
        resource:
          memory: "2048m"
          cpu: 1
      taskManager:
        replicas: 2
        resource:
          # memory: "10240m"
          memory: "2048m"
          cpu: 2
    Copy code
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkSessionJob
    metadata:
      name: ppe-kafka-streaming
      namespace: flink-test
    spec:
      deploymentName: flink-session
      
      job:
    
        jarURI: /opt/flink/opt/flink-python-1.20.3.jar
        # entryClass: org.apache.flink.client.python.PythonDriver
        # args:
        #   # 1. Main PyFlink Script
        #   - "--python"  # Changed from -py
        #   - "/opt/flink/usrlib/streaming_test.py"
          
        #   # 2. Python Archives
        #   - "--pyArchives"  # Changed from -pyarch
        #   - "blob_path#flink_venv"
          
        #   # 3. Python Executable
        #   - "-pyexec"  # This is correct!
        #   - "flink_venv/bin/python" 
    
        args:
        - "-py"
        - "/opt/flink/usrlib/streaming_test.py"
        - "-pyarch"
        - "blob_path#flink_venv"
        - "-pyexec"
        - "flink_venv/bin/python"
        
        parallelism: 2
    Request for your help if you have faced and tackled a similar issue. #C065944F9M2 #C03G7LJTS2G #C03GV7L3G2C
  • s

    Saketh

    10/29/2025, 7:09 AM
    Hi all, I'm facing a serious state/memory scalability issue with my Flink enrichment pipeline.
    Context: Input is streaming audit events, 90 lakh (9 million) unique SIDs per day, ~30 stages per SID; data arrives out of order.
    Goal: Enrich each event with certain metadata (a bitset as a 32-char string, isMBU, isNRI, etc.) that typically arrives in a special "UOS" event for the same SID, but sometimes late or missing.
    Previous approach: We buffered all out-of-order events per SID in keyed state (using MapState/ListState). When the UOS arrived or the "ENDOFFLOW" stage was seen, we would enrich & emit all buffered events for that SID. The RocksDB config is tuned for low memory pressure (a sketch is below): reduced write buffer and block cache sizes (setWriteBufferSize, setBlockCacheSize, fewer write buffers), compression enabled (LZ4, and ZSTD at the bottom-most level), and incremental cleanup and compaction filters set to help with TTL-based state expiration.
    Problem: At our scale, keyed state explodes (potentially millions of active SIDs at peak, each with buffered events), leading to frequent heap space exhaustion, large checkpoint files, and task manager restarts. Within 5-10 minutes of submitting the job, heap usage hits the max limit and the job restarts. Please help me fix this. Thanks in advance.
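    The RocksDB tuning mentioned above is roughly this kind of options factory (a sketch only; the sizes are illustrative and the block cache setup is omitted).
    Copy code
    import java.util.Collection;
    import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
    import org.rocksdb.ColumnFamilyOptions;
    import org.rocksdb.CompressionType;
    import org.rocksdb.DBOptions;

    // Sketch of the low-memory RocksDB tuning described above (sizes are illustrative only).
    public class LowMemoryRocksDBOptions implements RocksDBOptionsFactory {

        @Override
        public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
            return currentOptions;
        }

        @Override
        public ColumnFamilyOptions createColumnOptions(
                ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
            return currentOptions
                    .setWriteBufferSize(16 * 1024 * 1024)   // smaller write buffers
                    .setMaxWriteBufferNumber(2)             // fewer write buffers
                    .setCompressionType(CompressionType.LZ4_COMPRESSION)
                    .setBottommostCompressionType(CompressionType.ZSTD_COMPRESSION);
        }
    }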
  • m

    Mohamed Galal

    10/29/2025, 7:19 AM
    Hi all, can someone please tell me how I can use the MongoDB CDC connector (MongoDBSource) with Flink 2.1? I can't find any release compatible with Flink 2.1 for MongoDB.
  • a

    André Santos

    10/29/2025, 7:13 PM
    Hi folks! Found what looks like a bug in the k8s operator - when submitting session jobs in HA setups, the operator doesn't discover the leader JobManager and just connects to whatever endpoint it has configured. This means job submissions fail several times when they hit a non-leader JM pod. Anyone else run into this? The issue is in AbstractFlinkService.submitJobToSessionCluster() - it bypasses rest pod leader discovery entirely.
  • r

    Royston

    10/30/2025, 11:43 AM
    Hi, I need to publish some metrics in Flink, basically the number of records processed or errors in a flatMap function. I see an issue where the total metrics are not flushed before the job ends; is there a way to do this manually? Any recommendation for which metrics exporter I can use for this?
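    For context, this is roughly how the counters are registered (a sketch; the metric names are examples and it assumes the 1.x-style open() signature).
    Copy code
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Counter;
    import org.apache.flink.util.Collector;

    // Registers two counters against the operator's metric group and bumps them in flatMap();
    // whichever metrics reporter is configured picks them up.
    public class CountingFlatMap extends RichFlatMapFunction<String, String> {

        private transient Counter processed;
        private transient Counter errors;

        @Override
        public void open(Configuration parameters) {
            processed = getRuntimeContext().getMetricGroup().counter("recordsProcessed");
            errors = getRuntimeContext().getMetricGroup().counter("recordErrors");
        }

        @Override
        public void flatMap(String value, Collector<String> out) {
            try {
                out.collect(value.toUpperCase()); // placeholder transformation
                processed.inc();
            } catch (Exception e) {
                errors.inc();
            }
        }
    }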
  • i

    Iain Dixon

    10/30/2025, 1:23 PM
    Hello, my name is Iain. I'm a PhD student at Newcastle University in the UK researching stream benchmarking as my topic. Specifically I'm interested in the effects of backpressure on stream benchmark results, and as such I have a question about metrics which suggest that an operator might become backpressured. According to this blog post (https://flink.apache.org/2019/07/23/flink-network-stack-vol.-2-monitoring-metrics-and-that-backpressure-thing/), the inPoolUsage and outPoolUsage are good metrics with which to assess the presence of backpressure. To test things out I built a really simple setup, as seen in the picture below, where records are generated in the generator (via a loop to create the rate and a Thread.sleep at the end to fill out the rest of the second, based on the DS2 wordcount here: https://github.com/strymon-system/ds2/blob/master/flink-examples/src/main/java/ch/ethz/systems/strymon/ds2/flink/wordcount/sources/RateControlledSourceFunction.java), sent to a pipeline workload simulator (a single operator which counts the number of received records and runs a Thread.sleep at different frequencies to simulate pipeline workload), and exit to a sink where records are received but not saved or sent onwards. I bound the parallelism of each operator to 1 (to create the minimal possible pipeline). The generator produces a constant workload of 1000 records per second, and the workload simulator performs a constant amount of work for every n records.

    [image: experimental_setup]
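    For clarity, the workload simulator is essentially a map operator along these lines (a simplified sketch of what I described; the real one parameterises the sleep frequency per experiment).
    Copy code
    import org.apache.flink.api.common.functions.RichMapFunction;

    // Simulates a fixed processing cost: sleep 1 ms for every n-th record received.
    public class WorkloadSimulator extends RichMapFunction<String, String> {

        private final int sleepEveryNthRecord; // 1 = high utilisation, 2 = roughly 500 ms of work at 1000 rec/s
        private long received;

        public WorkloadSimulator(int sleepEveryNthRecord) {
            this.sleepEveryNthRecord = sleepEveryNthRecord;
        }

        @Override
        public String map(String record) throws Exception {
            received++;
            if (received % sleepEveryNthRecord == 0) {
                Thread.sleep(1); // constant simulated work
            }
            return record;
        }
    }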

    Below I've included an experiment where I run my pipeline at high utilisation (Thread.sleep(1) every record, which should induce backpressure as the amount of work performed within the second exceeds a second, resulting in the buffers filling and backpressure being triggered) and at low pipeline utilisation (Thread.sleep(1) every other record, which should only create around 500 ms of work, which the operator should be able to handle over a second). The high workload hits a maximum outPoolUsage on the source operator, which makes sense given my understanding of how Flink handles backpressure, but the low workload exhibits a sawtooth pattern. Looking at the isBackpressured metric of the same source operator, the low workload never triggers backpressure, which makes sense, but my expectation was that the outPoolUsage would trend up to some value (as roughly 500 ms of "work" should be created every second) and remain relatively constant at that value, rather than dipping back down and jumping up as seen in the graph. I'm not sure what mechanism in Flink would be responsible for this behaviour if the workload is constant, and I was wondering whether anyone working on Flink could explain what's occurring or point me in the right direction. I'm aware (from the linked blog post above) that the outPoolUsage metric is an aggregation of the floatingBuffersUsage and exclusiveBuffersUsage metrics, so the drop to 10% would be one of the exclusiveBuffers, but why would floating buffers come and go if the pipeline workload and arrival rates are constant?

    [image: buffer_question]

    To summarise, I'm wondering if anyone knows which mechanism is responsible for the sawtooth pattern in the outPoolUsage metric when the buffer isn't reaching 100% under a constant workload. My intuition is that it would increase to some threshold and stay (relatively) steady, but it jumps between 40% and 10%. Thank you very much for the time and assistance! For anyone who would like to get in touch, my email is i.g.dixon1@newcastle.ac.uk - Iain
    🙌 2