# troubleshooting
  • Nikola Stanisavljevic
    09/21/2025, 8:47 PM
    Hi everyone, with Flink SQL, is there a way to handle a poison-pill message so that it is sent to a DLQ? Something like the side output in the DataStream API. Or what would be an alternative for handling such a scenario so that processing of the message is skipped?
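    A minimal DataStream-style sketch of the side-output pattern being asked about; the parsing logic and the printed "dlq" stream stand in for the real deserializer and a DLQ Kafka sink, and plain Flink SQL has no direct equivalent:
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    public class DlqSideOutputSketch {

        // Side output for records that cannot be processed (poison pills)
        static final OutputTag<String> DLQ = new OutputTag<String>("dlq") {};

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Stand-in for a Kafka source of raw JSON strings
            DataStream<String> raw = env.fromElements("{\"id\":1}", "not-json");

            SingleOutputStreamOperator<Long> parsed = raw.process(new ProcessFunction<String, Long>() {
                @Override
                public void processElement(String value, Context ctx, Collector<Long> out) {
                    try {
                        out.collect(parseId(value));   // hypothetical parsing logic
                    } catch (Exception e) {
                        ctx.output(DLQ, value);        // route the poison pill to the DLQ
                    }
                }
            });

            parsed.print("good");
            // In a real job this would be a KafkaSink pointing at the DLQ topic
            parsed.getSideOutput(DLQ).print("dlq");

            env.execute("dlq-sketch");
        }

        private static long parseId(String json) {
            if (!json.startsWith("{")) {
                throw new IllegalArgumentException("bad record");
            }
            return json.length();  // placeholder for real parsing
        }
    }
    In SQL-only pipelines, the closest built-in knob is usually the format's error-tolerance option (e.g. 'json.ignore-parse-errors' = 'true'), which drops unparseable rows rather than dead-lettering them.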
  • Ashish Marottickal Gopi
    09/22/2025, 10:06 AM
    Hi Everyone, We are using the Kubernetes operator to manage Flink application clusters. In my case, for one of the jobs I had set the
    "fixed-delay"
    strategy for failure restarts. This led to the job ending up in a terminally failed status. As per the documentation, we went for a manual restore by deleting the FlinkDeployment CRD and restoring from a checkpoint. But in our case the application still doesn't start, with the FlinkDeployment CRD status stuck in the Reconciling state and the following error:
    JobManager deployment is missing and HA data is not available to make stateful upgrades. It is possible that the job has finished or terminally failed, or the configmaps have been deleted. Manual restore required.
    Could someone help with this ?
  • Mourad HARMIM
    09/22/2025, 2:06 PM
    Hello Everyone, asking the community for a little help. In production, my team and I are using state with TTL. However, we noticed that our savepoints keep increasing in size. It seems that there are some keys that the job is not accessing during its run, and to confirm it I'd like to check the timestamps stored along with the values in the savepoint, but I couldn't find a way to access them. Do you have any tips? I also tried using .cleanupFullSnapshot() to trigger the cleanup along with a savepoint, but it doesn't seem to have an effect; that's why I want to access the timestamps of the last-updated entries in the savepoint. Thank you in advance
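    For reference, a minimal sketch of the TTL configuration under discussion, assuming Flink 1.19+ where StateTtlConfig accepts a Duration. cleanupFullSnapshot() only prunes entries when a full (canonical) snapshot is taken, not for incremental RocksDB checkpoints, while the compaction-filter cleanup is what shrinks live RocksDB state over time; the TTL timestamp itself is stored next to the value and is not exposed through the regular state API:
    import java.time.Duration;

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueStateDescriptor;

    public class TtlConfigSketch {
        public static void main(String[] args) {
            StateTtlConfig ttlConfig = StateTtlConfig
                    .newBuilder(Duration.ofDays(7))
                    // TTL timestamp is refreshed on creation and writes only
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    // prune expired entries when a full (canonical) snapshot is taken
                    .cleanupFullSnapshot()
                    // prune expired entries during RocksDB compaction; the filter re-reads
                    // the current time after every 1000 processed state entries
                    .cleanupInRocksdbCompactFilter(1000)
                    .build();

            ValueStateDescriptor<Long> descriptor =
                    new ValueStateDescriptor<>("lastSeen", Long.class);
            descriptor.enableTimeToLive(ttlConfig);
        }
    }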
  • Carlos Sanabria Miranda
    09/23/2025, 1:49 PM
    👋 Hello everyone, My team is using a
    RichAsyncFunction
    to call an external HTTP service, with
    org.apache.http.nio.client.HttpAsyncClient
    as HTTP client. It is an async HTTP client that follows the non-blocking I/O (NIO) model, allowing it to handle many concurrent HTTP connections with a very small number of threads, using the Reactor pattern. I'm trying to figure out the best way to create and configure this HTTP client when used inside a RichAsyncFunction: 1. Client creation: Is it best to create one client for each subtask (in the
    open()
    method), or should we share a single static client across all operator subtasks in the same JVM? I've read this last option is an anti-pattern in Flink, but just wanted to see if it still makes sense. 2. Num threads: How should we configure the number of IO dispatch threads in the client? Should we limit it to one thread per client and rely on Flink's parallelism, or set it higher? By default this library configures 2 threads. Should we set as many threads as CPU processors available to the JVM (
    Runtime.getRuntime.availableProcessors()
    )? Cheers!
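    A minimal sketch of the one-client-per-subtask variant (created in open(), closed in close()), assuming Flink 1.x's open(Configuration) signature and Apache HttpAsyncClient 4.x; the URL and response handling are placeholders:
    import java.util.Collections;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
    import org.apache.http.HttpResponse;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.concurrent.FutureCallback;
    import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
    import org.apache.http.impl.nio.client.HttpAsyncClients;
    import org.apache.http.impl.nio.reactor.IOReactorConfig;

    public class HttpEnrichmentFunction extends RichAsyncFunction<String, String> {

        private transient CloseableHttpAsyncClient client;

        @Override
        public void open(Configuration parameters) {
            // One client per subtask; a couple of IO reactor threads is usually plenty,
            // since the reactor multiplexes many concurrent connections.
            client = HttpAsyncClients.custom()
                    .setDefaultIOReactorConfig(IOReactorConfig.custom()
                            .setIoThreadCount(2)
                            .build())
                    .build();
            client.start();
        }

        @Override
        public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
            HttpGet request = new HttpGet("http://example.com/lookup/" + key); // placeholder URL
            client.execute(request, new FutureCallback<HttpResponse>() {
                @Override
                public void completed(HttpResponse response) {
                    resultFuture.complete(Collections.singleton(
                            key + ":" + response.getStatusLine().getStatusCode()));
                }

                @Override
                public void failed(Exception ex) {
                    resultFuture.completeExceptionally(ex);
                }

                @Override
                public void cancelled() {
                    resultFuture.complete(Collections.emptyList());
                }
            });
        }

        @Override
        public void close() throws Exception {
            if (client != null) {
                client.close();
            }
        }
    }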
  • Jo Wijnant
    09/24/2025, 1:38 PM
    Hello, I'm playing around with Flink SQL and Kafka and I wonder why
    properties.client.id
    on a source table (Kafka consumer) is ignored, whereas on a sink table (Kafka producer) it is applied as expected. This is the relevant SQL for all my tables:
    CREATE TABLE ...
      WITH (
      'connector' = 'upsert-kafka',
      'properties.client.id' = 'flink-project-usage',
       ...)
    The client.id is applied to the producer (logs of taskmanager):
    2025-09-24 13:20:17,676 INFO  org.apache.kafka.common.config.AbstractConfig                [] - ProducerConfig values:
            acks = -1
            ...
            client.id = flink-project-usage
    But not for the consumer. It takes {group.id}-{index}
    2025-09-24 13:20:17,478 INFO  org.apache.kafka.common.config.AbstractConfig                [] - ConsumerConfig values:
            allow.auto.create.topics = false
            ...
            client.id = coz_irods_project_usage-0
    Why can't I set client.id on the consumer? I'd like to label all consumers with the same client.id so I can easily define Kafka quotas for this client. But now I'd have to define multiple quotas (coz_irods_project_usage-0, coz_irods_project_usage-1, coz_irods_project_usage-2 ...)
  • Michał Sochoń
    09/24/2025, 6:26 PM
    Hey, I just started using Flink 2.1.0 with the kubernetes-operator on AWS EKS in session mode (our focus is on batch processing, not streaming). I've set up a session cluster with an example app that finishes processing, and generally it works (the JobManager is up), but my current problem is that if we get an error in the processing and the TaskManager pod is gone, then trying to get logs via the JobManager ends in HTTP 500 because the corresponding TaskManager instance is gone (pod shut down). Can someone point me to how I can achieve some kind of integrated log durability for those? I know I can direct logs to another system, but that's a bit inconvenient; I would prefer some kind of redirection in the JobManager so it is seamless (i.e., in the JobManager UI and so on, and not in an external system such as Loki+Grafana or the Elastic Stack). Thanks in advance.
  • Boice Lu
    09/26/2025, 2:00 AM
    Hi everyone, My team is running into an issue with the Flink operator in session mode. We deploy a
    FlinkSessionJob
    that consumes data from a Kafka topic and writes it to a BigQuery table. When the JobManager starts, it seems that the initialization step — possibly checking the BigQuery table — takes a long time. During this delay, the web upload temporary file appears to be deleted, causing the job to fail. We are seeing the following exception:
    Caused by: java.io.FileNotFoundException: 
    /tmp/flink-web-11008d54-f3d5-4458-8b2c-531d6c88ab1a/flink-web-upload/edc628df-da1b-4b0f-b508-88b3d9891edb_flink-datastream-1.0.0.jar 
    (No such file or directory)
    Has anyone encountered this issue before? Is there a recommended way to prevent the uploaded jar file from being deleted during job initialization? Thanks in advance for any help! 🙏
  • Royston
    09/29/2025, 2:29 PM
    Hi Everyone, I have a use case where I have to read CSV files off GCS/S3. The problem I am facing is that the formats in the filesystem connector seem not to be splittable, so only one subtask reads the file. I have written a custom StreamFormat to resolve this, but was wondering if there is a built-in solution. Thank you
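    For reference, a minimal sketch of the FileSource wiring in question (the bucket path is a placeholder). TextLineInputFormat is a SimpleStreamFormat, which is not splittable, so each file becomes a single split and parallelism only helps across many files:
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.connector.file.src.FileSource;
    import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CsvFileSourceSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Non-splittable formats such as TextLineInputFormat produce one split per file,
            // so a single large file is read by a single subtask.
            FileSource<String> source = FileSource
                    .forRecordStreamFormat(new TextLineInputFormat(),
                            new Path("s3://my-bucket/csv/")) // placeholder path
                    .build();

            DataStream<String> lines =
                    env.fromSource(source, WatermarkStrategy.noWatermarks(), "csv-files");

            lines.print();
            env.execute("csv-file-source-sketch");
        }
    }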
  • Jeremy Tee
    10/01/2025, 7:35 AM
    Hello Flink community, I am wondering: is it possible for the Kafka source to always send out the writer schema and then pipe those records to Iceberg?
  • L P V
    10/01/2025, 12:03 PM
    ClassNotFoundException for Protobuf message class during EXPLAIN PLAN in Flink SQL Gateway with a remote Session Cluster. I'm facing an issue with loading a custom Protobuf message class in Flink SQL using the built-in
    'format' = 'protobuf'
    for a Kafka source table. The
    CREATE TABLE
    succeeds, but
    EXPLAIN PLAN FOR SELECT *
    fails with
    ClassNotFoundException
    during schema inference/planning. This happens in a distributed setup with Flink SQL Gateway and a remote Flink Session Cluster. I've tried dynamic JAR loading via GCS (
    gs://
    ) but the class isn't resolvable at planning time, even though GCS access works (no filesystem errors). Setup: • Flink version: 1.19.1 • SQL Client startup:
    ./bin/sql-client.sh gateway --endpoint 0.0.0.0:8083 -Drest.address=xxxxx -Drest.port=8081
    • JAR:
    trackify_proto_file_deploy.jar
    (built from Protobuf-generated code via Maven/Gradle). Verified contents with `jar -tvf`: Includes
    vn/com/momo/mle/proto/v1/UserCountResult.class
    and inner classes (e.g.,
    UserCountResult$1.class
    ). It's a fat JAR with
    protobuf-java
    shaded in. Steps to Reproduce: • Start SQL Client (as above) and connect to Gateway session. • Add JAR and set config: ADD JAR 'gs://..../trackify_proto_file_deploy.jar'; SET 'pipeline.jars' = 'gs://ml-game-short-term-storage-sing/flink_sql/jars/trackify_proto_file_deploy.jar'; ◦ Both succeed:
    [INFO] Execute statement succeed.
    /
    [INFO] JAR added successfully.
    • Create table (parses fine, registers in catalog): CREATE TABLE src.kafka.test ( ....... ) WITH ( 'connector' = 'kafka', 'topic' = '', 'properties.bootstrap.servers' = '...', 'format' = 'protobuf', 'protobuf.message-class-name' = 'vn.com.momo.mle.proto.v1.UserCountResult', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.sasl.mechanism' = 'SCRAM-SHA-512', 'properties.group.id' = ', 'properties.sasl.jaas.config' = '...', 'scan.startup.mode' = 'earliest-offset' ); Succeeds:
    [INFO] Execute statement succeed.
    • Run planning: EXPLAIN PLAN FOR SELECT * FROM src.kafka.test; • Fails with: [ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: vn.com.momo.mle.proto.v1.UserCountResult What I've Tried (No Redeploy): •
    SHOW JARS;
    lists the added JAR. Question/Goal: • Why does Protobuf class loading fail at planning time (
    EXPLAIN PLAN
    ) in a Gateway + remote Session Cluster, even with
    pipeline.jars
    set to a resolvable URL (gs:// or HTTP via REST upload)? Is planning strictly client-side, or does the Gateway/JM need additional setup for dynamic JARs in custom formats like Protobuf? • How to dynamically add/update Protobuf JARs (e.g., for schema evolution in Kafka topics) without redeploying the Flink image or putting in
    /opt/flink/lib/
    ?
  • Lukasz Krawiec
    10/01/2025, 1:43 PM
    Hello community, am I missing something or is it not possible to get a metric with state size per state store and TaskManager/slot? Currently we are using checkpoint size as a proxy, but it's not ideal given there can be data skew on some partitions. (Using in-memory state.)
  • מייקי בר יעקב
    10/01/2025, 9:38 PM
    I am exploring the new ForSt state backend in Flink 2.0 (Windows 11 and Java 11). I used it and set the local-dir to a directory on my computer, and I get EXCEPTION_ACCESS_VIOLATION from native C++ code. Can someone help?
  • Madhusudhan Reddy
    10/02/2025, 3:42 PM
    Hello Everyone, We’ve developed an application with the following workflow: • Streaming data from Kafka • Performing UPSERT operations (insert/update/delete) into a Table API object after filtering • Executing a SQL join across approximately 12 Table API views • Emitting an event with the SQL results only if the join conditions are satisfied. Questions: 1. How can I debug the contents of the internal Flink tables when the SQL join conditions are not met? 2. Does Flink provide any built-in API to side-output records in such scenarios? 3. Is there an API available to dynamically query all 12 tables on the fly for a specific key? Could you please let me know your views on these questions @David Anderson
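    On question 1, one hedged option, assuming a StreamTableEnvironment is available in the job, is to convert an intermediate view to a changelog stream and print or filter it for the key under investigation; the view and columns below are placeholders:
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class DebugViewSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // Placeholder: in the real job this would be one of the ~12 upsert views.
            tEnv.executeSql(
                    "CREATE TEMPORARY VIEW debug_view AS "
                            + "SELECT * FROM (VALUES (1, 'a'), (2, 'b')) AS t(id, name)");

            Table view = tEnv.from("debug_view");

            // toChangelogStream keeps the +I/-U/+U/-D change flags, which helps to see
            // why a join key never matches.
            tEnv.toChangelogStream(view)
                    .filter((Row row) -> true) // e.g. filter on a specific key under investigation
                    .print("debug_view");

            env.execute("debug-view-sketch");
        }
    }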
  • Victor Babenko
    10/03/2025, 5:02 AM
    We have a Flink job that subscribes to a topic (with 10+k/sec volume) and applies tumbling windows with aggregation to create hourly buckets; each window contains a simple counter and also a HyperLogLog structure to track uniqueness estimates. We’re using the RocksDB backend. The application should be fairly simple, but it is very slow with the RocksDB backend because the windowing operation has to read, deserialize, update, serialize and write back to disk for every incoming element, which is incredibly wasteful. With HashMap backend, it works orders of magnitude faster. Is there a way to optimize how it runs on RocksDB and reduce state reads/writes in the window? For example, process incoming events in batches rather than one at a time?
  • Chennupati Gopal
    10/03/2025, 7:35 AM
    Hi All, I have deployed Apache Flink 2.0 in Application mode in a Kubernetes (EKS) cluster with a parallelism of 25, as a streaming job consuming data from a Kafka topic. I could not receive any JMX metrics until I explicitly enabled the JMX metrics exporter; after enabling it, I see a bad memory leak on my streaming TaskManager pods. Has anyone faced a similar issue, and how can it be fixed?
  • Jaya Ananthram
    10/04/2025, 11:17 AM
    Hello 👋 Question on Serialization/Deserialization in Flink Message Processing. Assume a job topology that uses 2 TaskManagers with 4 slots each. Looking for an answer with an official documentation link 😉 What is the expected behavior of serialization/deserialization in the following scenarios? 1. Within the same subtask ◦ When a record is passed between chained operators (e.g.,
    Source → Map
    ). Does serialization/deserialization occur? 2. Between slots within the same TaskManager ◦ If two subtasks are running in different slots of the same TaskManager, does Flink serialize/deserialize the record when passing data between them? 3. Between slots across different TaskManagers ◦ When data is exchanged between subtasks (e.g., after
    keyBy
    ) located in different TaskManagers, does Flink always perform serialization/deserialization during transfer? I believe this is obviously yes, as it is cross-JVM communication. (Context: I am dealing with some performance fine-tuning, so I would like to know the behaviour to understand my job's bottleneck.)
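    A small annotated sketch of where serialization happens under default settings; the relevant documentation is the "Tasks and Operator Chains" section of the Flink Architecture page. In short: no network serialization within a chain (1), but any cross-task exchange, whether between slots of one TaskManager (2) or across TaskManagers (3), goes through serialization into network buffers:
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SerdeBoundariesSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<String> source = env.fromElements("a", "bb", "ccc");

            // Hop 1: source -> map -> filter are chained into one task by default, so the
            // record stays in the same thread and is never written to network buffers.
            // Flink still makes a defensive copy via the type serializer between chained
            // operators unless env.getConfig().enableObjectReuse() is set.
            DataStream<Integer> lengths = source
                    .map(String::length)
                    .filter(len -> len > 1);

            // Hops 2 and 3: keyBy breaks the chain and introduces a data exchange.
            // Records are serialized into network buffers even when the producing and
            // consuming subtasks sit in different slots of the same TaskManager, and,
            // of course, when they sit on different TaskManagers.
            lengths
                    .keyBy(len -> len % 2)
                    .reduce(Integer::sum)
                    .print();

            env.execute("serde-boundaries-sketch");
        }
    }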
  • Royston
    10/04/2025, 7:27 PM
    Hi, I’m building a Flink web client to trigger jobs, and I’m running into an issue. My web client runs in a separate deployment from the Flink cluster. Right now, I’m triggering jobs using PackagedProgram, which internally calls the main() method of the JAR. The problem is that this causes all the initialization code (like configuration setup) to execute inside the web client instead of inside the Flink JobManager/TaskManagers. Is there a way to submit a Flink job without: creating a JobGraph manually via PackagedProgram, or invoking the JAR’s main() function on the client side? I know this can be done through the Flink Web UI client using /jars/run, but I was wondering if there’s a way to achieve the same thing programmatically using the RestClusterClient.
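    One hedged option is to drive the JobManager's REST API directly, which is what the Web UI does: the jar's main() then runs on the JobManager side rather than in the client. A rough sketch with java.net.http, assuming the jar was already uploaded via POST /jars/upload and its returned id is known (address, jar id and entry class are placeholders):
    import java.net.URI;
    import java.net.URLEncoder;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;
    import java.nio.charset.StandardCharsets;

    public class RestJarRunSketch {
        public static void main(String[] args) throws Exception {
            String jobManager = "http://flink-jobmanager:8081";         // placeholder address
            String jarId = "edc628df_my-job-1.0.0.jar";                 // placeholder id from /jars/upload
            String entryClass = URLEncoder.encode("com.example.MyJob",  // placeholder entry class
                    StandardCharsets.UTF_8);

            // POST /jars/:jarid/run executes the jar's main() in the JobManager/REST server
            // process and returns the job id as JSON.
            HttpRequest run = HttpRequest.newBuilder()
                    .uri(URI.create(jobManager + "/jars/" + jarId + "/run?entry-class=" + entryClass))
                    .POST(HttpRequest.BodyPublishers.noBody())
                    .build();

            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(run, HttpResponse.BodyHandlers.ofString());

            System.out.println(response.statusCode() + " " + response.body());
        }
    }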
  • raphaelauv
    10/05/2025, 10:46 PM
    Dynamic routing and Confluent Schema Registry: it looks like the current implementation of ConfluentRegistryAvroSerializationSchema does not support a dynamic subject based on a strategy like
    io.confluent.kafka.serializers.subject.TopicNameStrategy
    but only an explicit
    subject
    Has anyone already found a solution? Thanks all
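    A rough sketch of one workaround sometimes used, assuming the Confluent kafka-avro-serializer dependency is available: wrap Confluent's own KafkaAvroSerializer, which applies TopicNameStrategy by default, in a KafkaRecordSerializationSchema for the KafkaSink instead of using ConfluentRegistryAvroSerializationSchema (topic and registry URL are placeholders):
    import java.util.Map;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.serialization.SerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import io.confluent.kafka.serializers.KafkaAvroSerializer;

    public class TopicNameStrategySerializationSchema
            implements KafkaRecordSerializationSchema<GenericRecord> {

        private final String topic;
        private final String schemaRegistryUrl;
        private transient KafkaAvroSerializer valueSerializer;

        public TopicNameStrategySerializationSchema(String topic, String schemaRegistryUrl) {
            this.topic = topic;                         // could also be chosen per record for dynamic routing
            this.schemaRegistryUrl = schemaRegistryUrl;
        }

        @Override
        public void open(SerializationSchema.InitializationContext context, KafkaSinkContext sinkContext) {
            valueSerializer = new KafkaAvroSerializer();
            // isKey = false -> subject resolves to "<topic>-value" under TopicNameStrategy
            valueSerializer.configure(Map.of("schema.registry.url", schemaRegistryUrl), false);
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(
                GenericRecord element, KafkaSinkContext context, Long timestamp) {
            // KafkaAvroSerializer derives the subject from the topic name passed here.
            byte[] value = valueSerializer.serialize(topic, element);
            return new ProducerRecord<>(topic, value);
        }
    }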
  • Grzegorz Liter
    10/06/2025, 9:21 AM
    Hi, we have one operator that holds around 60 GB of state. One of the subtasks takes considerably longer than the others in the "sync" phase, e.g. 34 minutes vs 2-2.5 minutes. There is no significant skew: the subtask that takes 34 minutes has 3.05 GB while the others have 1.9 GB - 2.98 GB. Any ideas what might be the cause? A thread dump just shows the code is in the native part:
    at app//org.rocksdb.Checkpoint.createCheckpoint(Native Method)
    	at app//org.rocksdb.Checkpoint.createCheckpoint(Checkpoint.java:50)
    	at app//org.apache.flink.state.rocksdb.snapshot.RocksDBSnapshotStrategyBase.takeDBNativeCheckpoint(RocksDBSnapshotStrategyBase.java:174)
  • Evan NotEvan
    10/06/2025, 11:36 AM
    Hello, I am having trouble implementing an iterative computation in Flink via feedback loop. I’ve prepared a sample code and a diagram to illustrate the idea. The concept is straightforward: - Take an input number. - Repeatedly divide it by 2 until the result is less than or equal to 1. - Along the way, print all intermediate results. For example, given the input
    1024
    , the output sequence should be:
    1024 -> 512 -> 256 -> ... -> 1
    The interesting part is that I’m using Flink 2.1, where
    DataStream.iterate()
    and `IterativeStream`—basically the built-in Iteration API—have been removed. On top of that, Flink ML currently only supports Flink versions up to 1.17. Without these options, it’s up to the user to implement their own iteration. So far, I’ve put together the following code, which attempts to demonstrate a feedback loop. I’ve also included a diagram to better illustrate the implementation.
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;
    
    public class FeedbackLoopExample {
    
        // Define a side output for feedback
        private static final OutputTag<Integer> FEEDBACK_TAG = new OutputTag<Integer>("feedback"){};
    
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // Initial input
            DataStream<Integer> input = env.fromElements(1024)
                    .assignTimestampsAndWatermarks(WatermarkStrategy.noWatermarks());
    
            // Process function that splits into converged vs non-converged
            SingleOutputStreamOperator<Integer> processed = input.process(new ProcessFunction<Integer, Integer>() {
                @Override
                public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                    if (value > 1) {
                        // Not yet converged → send to feedback
                        ctx.output(FEEDBACK_TAG, value / 2);  // iteration step
                    } else {
                        // Converged → emit as output
                        out.collect(value);
                    }
                }
            });
    
            // Feedback stream (non-converged)
            DataStream<Integer> feedback = processed.getSideOutput(FEEDBACK_TAG);
    
            // Apply the same process again (iterative feedback)
            SingleOutputStreamOperator<Integer> processedFeedback = feedback.process(new ProcessFunction<Integer, Integer>() {
                @Override
                public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                    if (value > 1) {
                        ctx.output(FEEDBACK_TAG, value / 2);
                    } else {
                        out.collect(value);
                    }
                }
            });
    
            // Union feedback outputs → continue iteration
            DataStream<Integer> loopFeedback = processedFeedback.getSideOutput(FEEDBACK_TAG);
    
            // Final results = outputs from both initial and feedback iterations
            DataStream<Integer> finalResults = processed.union(processedFeedback);
    
            finalResults.print("Converged");
            loopFeedback.print("Feedback"); // shows values still circulating
    
            env.execute("Flink Feedback Loop Example");
        }
    }
    This is the output if you run the code
    1024 -> 512 -> 256 (stops here)
    It seems like the job only performs two iterations and then stops instead of counting down to 1. This makes me think that my attempt to loop back the feedback stream didn't actually work. My first question is: what's the correct way to implement a feedback loop in Flink 2.0+ given the constraints mentioned above? Second question: is it OK if two different process functions use the same side output to write data? Third question: is there any way to extract or analyze the graph topology of a job in Flink?
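    Since the iterate() API was removed, a Flink 2.x job graph is a DAG, so a side output cannot be fed back into an upstream operator within a single job; general feedback loops need an external channel (e.g. writing the feedback stream back to a Kafka topic that the job also consumes) or Flink ML's Iteration API on the versions it supports. For an iteration like this one, whose next step depends only on the current element, a hedged alternative is to unroll the loop inside a single function:
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class UnrolledIterationExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.fromElements(1024)
                    .flatMap((FlatMapFunction<Integer, Integer>) (value, out) -> {
                        // Element-local iteration: loop until convergence and emit every step.
                        for (int v = value; v >= 1; v /= 2) {
                            out.collect(v);
                            if (v == 1) {
                                break;
                            }
                        }
                    })
                    .returns(Integer.class)
                    .print();   // prints 1024, 512, 256, ..., 1

            env.execute("Unrolled Iteration Example");
        }
    }
    On the third question, StreamExecutionEnvironment#getExecutionPlan() returns a JSON description of the topology, and a running job's plan is also exposed via the REST endpoint /jobs/:jobid/plan.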
  • Fabrizzio Chavez
    10/06/2025, 9:55 PM
    Hello, is it possible to debug a Flink app with Visual Studio Code? I can only see guidelines for IntelliJ IDEA.
  • Dennis Sosnoski
    10/06/2025, 10:55 PM
    Question about handling Avro schemas in Flink jobs: we have multiple Kafka topics we're reading from, each containing different Avro messages with different schemas (actually a couple of schema versions per topic). We'd like to run with high parallelism, but when we do, we exceed the rate limits on the schema registry (either Confluent Cloud or Glue) at startup as all the parallel instances load their schemas. How do people deal with this issue? For streaming jobs we've implemented our own try/catch handling with exponential backoff to deal with the registry limits, but for Table API jobs there doesn't appear to be any easy way to intercept the exceptions.
  • Ajay Sarjoo
    10/07/2025, 3:53 PM
    Hi folks, apologies in advance if this is not the right place to ask. My team and I work on a data pipeline composed of Flink microservices with Kafka as the message bus in between services, and we are looking into approaches for tracking the lifetime of events between services to enable latency analysis. What are some approaches folks have used to enable distributed tracing? I know OTel is a common choice; have folks found success with others?
  • Royston
    10/09/2025, 10:50 AM
    Hi, is TextLineInputFormat splittable? What alternative can I use to split a CSV file so it can be read in parallel?
  • Trystan
    10/09/2025, 3:12 PM
    Some of our jobs register a lot of metrics, primarily for Prometheus scraping. Some have very high cardinality (which, while not great, we can adjust Prometheus to cope with to a degree). The problem seems to be that the JobManager also scrapes these metrics, which is completely unnecessary for our use case. Is there any config we might've missed that lets the JobManager scrape only the metrics it finds critical, like records/sec, or disable this functionality completely? It can get bad enough that the JM gets overloaded, misses heartbeats, and causes the job to restart.
  • Victor Babenko
    10/09/2025, 8:22 PM
    Does Flink support any kind of micro-batching in Datastream API? We’d like to reduce the number of times state is modified in a windowing operation (it updates state on every incoming message). Ideally we’d batch the incoming updates for each window (by count and time), and update the window in batches that way. However it doesn’t look like there is an easy way to do that without either updating state on every message anyway, or risking data loss. Has anyone tried implementing micro-batching in Datastream?
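    A minimal sketch of one hedged approach, assuming the data is keyed the same way as the window and a Flink 1.x open(Configuration) signature: append incoming records to a ListState buffer (on RocksDB an append is a blind write with no read) and fold the whole batch into the expensive aggregate only when a processing-time timer fires. Timers registered for the same key and timestamp are deduplicated, and the buffer survives failures because it lives in state:
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    /** Buffers raw events per key and folds them into the aggregate every flushIntervalMs. */
    public class MicroBatchingFunction extends KeyedProcessFunction<String, String, Long> {

        private final long flushIntervalMs;
        private transient ListState<String> buffer;
        private transient ValueState<Long> runningCount;  // stand-in for the expensive aggregate (e.g. HLL)

        public MicroBatchingFunction(long flushIntervalMs) {
            this.flushIntervalMs = flushIntervalMs;
        }

        @Override
        public void open(Configuration parameters) {
            buffer = getRuntimeContext().getListState(
                    new ListStateDescriptor<>("buffer", String.class));
            runningCount = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void processElement(String value, Context ctx, Collector<Long> out) throws Exception {
            // Blind append: on RocksDB this is a merge, not a read-modify-write.
            buffer.add(value);

            // Register a flush timer aligned to the interval; timers for the same
            // key and timestamp are deduplicated by the timer service.
            long now = ctx.timerService().currentProcessingTime();
            long nextFlush = now - (now % flushIntervalMs) + flushIntervalMs;
            ctx.timerService().registerProcessingTimeTimer(nextFlush);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
            // Fold the whole batch into the aggregate with a single state read/write.
            long count = runningCount.value() == null ? 0L : runningCount.value();
            for (String ignored : buffer.get()) {
                count++;                  // real job: update the counter and the HLL here
            }
            runningCount.update(count);
            buffer.clear();
            out.collect(count);
        }
    }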
  • Royston
    10/10/2025, 3:08 PM
    Does Flink support dynamic credentials for GCS/S3, basically per job? From the Flink code itself, it seems credentials are registered at cluster creation. Is there an option to do it using some Flink config at runtime?
  • Trystan
    10/10/2025, 5:19 PM
    Event  | Info    | SCALINGREPORT   | Scaling execution enabled, begin scaling vertices:
    { Vertex ID 5591cebca2a45b74833df01196d1a431 | Parallelism 74 -> 80 | Processing capacity 6671.45 -> 7131.00 | Target data rate 3889.81}
    { Vertex ID 162a29c968c94eac6fbdfc2f8ccc2080 | Parallelism 180 -> 128 | Processing capacity Infinity -> Infinity | Target data rate 50.67}
    { Vertex ID 009b60dbc7737a8a2e91e7b3c30d9949 | Parallelism 128 -> 169 | Processing capacity 70.68 -> 93.00 | Target data rate 50.67}
    These autoscaling decisions don't make a lot of sense to me. Why on earth is it scaling UP
    009b
    ? Here's the output log from right before:
    ue1d.FlinkDeployment.AutoScaler.jobVertexID.009b60dbc7737a8a2e91e7b3c30d9949.TRUE_PROCESSING_RATE.Average: 71.325
    ue1d.FlinkDeployment.AutoScaler.jobVertexID.009b60dbc7737a8a2e91e7b3c30d9949.SCALE_DOWN_RATE_THRESHOLD.Current: 176.0
    ue1d.FlinkDeployment.AutoScaler.jobVertexID.009b60dbc7737a8a2e91e7b3c30d9949.SCALE_UP_RATE_THRESHOLD.Current: 56.0
    Is this a catch-up buffer problem? Maybe it thinks it would be unable to catch up within the desired window?
  • Barak Ben-Nathan
    10/12/2025, 7:35 AM
    hi guys, I am using Flink 1.18 Does anyone know if it's possible to use the
    State Processor API
    to generate both a keyed state and a broadcast state for the same operator?