# troubleshooting
  • Francisco Morillo

    10/28/2025, 7:29 PM
    Is there any way to migrate Flink 1.20 state that used collections to Flink 2.1?
  • Noufal Rijal

    10/29/2025, 5:59 AM
    Hi Team, I have recently been trying out the flink-k8s-operator in session mode for the autoscaler feature. Based on the testing we have done so far, we found that:
    1. We cannot submit an entirely different Docker image to the existing cluster (which runs on a base image):
       a. The FlinkSessionJob kind does not support any field or argument for passing an image.
    2. We are now checking the possibility of passing a zipped virtual env while submitting the job:
       a. For this we focused on the -pyarch argument and passed the blob path of the zipped env file.
       b. The problem we are facing is that Flink does not pick up the zipped env, and the package we supplied inside the new venv via the zip is reported as not found.
    Copy code
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: flink-session
      namespace: flink-test
    spec:
      image: pyflink-session-test:v1.0
      flinkVersion: v1_20
      imagePullPolicy: Always
      serviceAccount: flink
      mode: standalone
      
      flinkConfiguration:
        # All Flink runtime config keys go here
        fs.allowed-fallback-filesystems: "file"
        io.tmp.dirs: "/tmp"
        taskmanager.numberOfTaskSlots: "4"
        
        # ===== OPERATOR AUTOSCALER =====
        kubernetes.operator.job.autoscaler.enabled: "true"
        kubernetes.operator.job.autoscaler.target.utilization: "0.7"
        kubernetes.operator.job.autoscaler.target.utilization.boundary: "0.2"
        kubernetes.operator.job.autoscaler.stabilization.interval: "1m"
        kubernetes.operator.job.autoscaler.metrics.window: "5m"
        kubernetes.operator.job.autoscaler.scale-up.grace-period: "1m"
        kubernetes.operator.job.autoscaler.scale-down.grace-period: "5m"
        
      # jobManager must be a direct child of 'spec'
      jobManager:
        replicas: 1
        resource:
          memory: "2048m"
          cpu: 1
      taskManager:
        replicas: 2
        resource:
          # memory: "10240m"
          memory: "2048m"
          cpu: 2
    Copy code
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkSessionJob
    metadata:
      name: ppe-kafka-streaming
      namespace: flink-test
    spec:
      deploymentName: flink-session
      
      job:
    
        jarURI: /opt/flink/opt/flink-python-1.20.3.jar
        # entryClass: org.apache.flink.client.python.PythonDriver
        # args:
        #   # 1. Main PyFlink Script
        #   - "--python"  # Changed from -py
        #   - "/opt/flink/usrlib/streaming_test.py"
          
        #   # 2. Python Archives
        #   - "--pyArchives"  # Changed from -pyarch
        #   - "blob_path#flink_venv"
          
        #   # 3. Python Executable
        #   - "-pyexec"  # This is correct!
        #   - "flink_venv/bin/python" 
    
        args:
        - "-py"
        - "/opt/flink/usrlib/streaming_test.py"
        - "-pyarch"
        - "blob_path#flink_venv"
        - "-pyexec"
        - "flink_venv/bin/python"
        
        parallelism: 2
    Request for your help if you have faced and tackled a similar issue. #C065944F9M2 #C03G7LJTS2G #C03GV7L3G2C
  • Saketh

    10/29/2025, 7:09 AM
    Hi all, I'm facing a serious state/memory scalability issue with my Flink enrichment pipeline.
    Context: input is streaming audit events, 90 lakh (9 million) unique SIDs per day, ~30 stages per SID, and data arrives out of order.
    Goal: enrich each event with certain metadata (bitset (32-char string), isMBU, isNRI, etc.) that typically arrives in a special "UOS" event for the same SID, but sometimes late or missing.
    Previous approach: we buffered all out-of-order events per SID in keyed state (using MapState/ListState). When UOS arrived or an "ENDOFFLOW" stage was seen, we would enrich and emit all buffered events for that SID.
    RocksDB config is tuned for low memory pressure:
    • Reduced write buffer and block cache sizes (setWriteBufferSize, setBlockCacheSize, fewer write buffers)
    • Enabled compression (LZ4, and ZSTD at the bottom-most level)
    • Set incremental cleanup and compaction filters to help with TTL-based state expiration
    Problem: at our scale, keyed state explodes (potentially millions of active SIDs at peak, each with buffered events), leading to frequent heap space exhaustion, large checkpoint files, and task manager restarts. Within 5-10 minutes of submitting the job, the heap reaches its max limit and the job restarts. Please help me in fixing this. Thanks in advance.
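    For reference, a minimal sketch of attaching TTL to the kind of per-SID buffer described above, so abandoned SIDs eventually expire out of RocksDB (class name, key/value types, and the 24h retention are illustrative assumptions, not a confirmed fix for the heap issue):

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Sketch: per-SID event buffer with TTL so entries for SIDs that never see UOS/ENDOFFLOW expire.
    public class BufferWithTtl extends KeyedProcessFunction<String, String, String> {
        private transient MapState<Long, String> buffer;

        @Override
        public void open(Configuration parameters) {
            MapStateDescriptor<Long, String> desc =
                    new MapStateDescriptor<>("bufferedEvents", Types.LONG, Types.STRING);
            StateTtlConfig ttl = StateTtlConfig
                    .newBuilder(Time.hours(24))                  // illustrative retention window
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    .cleanupInRocksdbCompactFilter(1000)         // drop expired entries during compaction
                    .build();
            desc.enableTimeToLive(ttl);
            buffer = getRuntimeContext().getMapState(desc);
        }

        @Override
        public void processElement(String event, Context ctx, Collector<String> out) throws Exception {
            long ts = ctx.timestamp() == null ? System.currentTimeMillis() : ctx.timestamp();
            buffer.put(ts, event); // buffer until the UOS event or ENDOFFLOW stage arrives
        }
    }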
  • Mohamed Galal

    10/29/2025, 7:19 AM
    Hi all, can someone please tell me how I can use the MongoDB CDC source connector with Flink 2.1? I can't find any MongoDB connector release compatible with Flink 2.1.
  • André Santos

    10/29/2025, 7:13 PM
    Hi folks! Found what looks like a bug in the k8s operator: when submitting session jobs in HA setups, the operator doesn't discover the leader JobManager and just connects to whatever endpoint it has configured. This means job submissions fail several times when they hit a non-leader JM pod. Anyone else run into this? The issue is in AbstractFlinkService.submitJobToSessionCluster() - it bypasses REST pod leader discovery entirely.
  • Royston

    10/30/2025, 11:43 AM
    Hi, I need to publish some metrics in Flink, basically the number of records processed or errors in a flatMap function. I see an issue where the final metric totals are not flushed before the job ends; is there a way to do this manually? Any recommendation for which metrics reporter/exporter I can use for this?
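    For reference, a minimal sketch of registering custom counters in a RichFlatMapFunction (metric and class names are illustrative); the counters are exposed through whichever metrics reporter is configured, e.g. the bundled Prometheus reporter:

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Counter;
    import org.apache.flink.util.Collector;

    // Sketch: counts processed records and errors inside a flatMap.
    public class CountingFlatMap extends RichFlatMapFunction<String, String> {
        private transient Counter recordsProcessed;
        private transient Counter recordErrors;

        @Override
        public void open(Configuration parameters) {
            recordsProcessed = getRuntimeContext().getMetricGroup().counter("recordsProcessed");
            recordErrors = getRuntimeContext().getMetricGroup().counter("recordErrors");
        }

        @Override
        public void flatMap(String value, Collector<String> out) {
            try {
                out.collect(value.toUpperCase()); // placeholder transformation
                recordsProcessed.inc();
            } catch (Exception e) {
                recordErrors.inc();
            }
        }
    }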
  • Iain Dixon

    10/30/2025, 1:23 PM
    Hello, my name is Iain. I'm a PhD student at Newcastle University in the UK researching stream benchmarking as my topic. Specifically I'm interested in the effects of backpressure on stream benchmark results, and as such I have a question about metrics which suggest that an operator might become backpressured. According to this blog post (https://flink.apache.org/2019/07/23/flink-network-stack-vol.-2-monitoring-metrics-and-that-backpressure-thing/), the inPoolUsage and outPoolUsage are good metrics with which to assess the presence of backpressure. To test things out I built a really simple setup, as seen in the picture below, where records are generated in the generator (via a loop to create the rate and a Thread.sleep at the end to pad out the rest of the second, based on the DS2 wordcount here: https://github.com/strymon-system/ds2/blob/master/flink-examples/src/main/java/ch/ethz/systems/strymon/ds2/flink/wordcount/sources/RateControlledSourceFunction.java), sent to a pipeline workload simulator (a single operator which counts the number of received records and runs a Thread.sleep at different frequencies in order to simulate pipeline workload), and finally to a sink where records are received but not saved or sent onwards. I bound the parallelism of each operator to 1 (to create the minimal possible pipeline). The generator produces a constant workload of 1000 records per second, and the workload simulator produces a constant amount of work for every n records.

    [image attachment: experimental_setup]

    Below I've included an experiment where I run my pipeline at high utilisation (Thread.sleep(1) every record, which should induce backpressure as the amount of work performed within the second exceeds a second, resulting in the buffers filling and backpressure being triggered) and at low pipeline utilisation (Thread.sleep(1) every other record, which should only create around 500ms of work, which the operator should be able to handle over a second). The high workload hits the maximum outPoolUsage of the source operator, which makes sense given my understanding of how Flink handles backpressure, but the low workload exhibits a sawtooth pattern. Looking at the isBackpressured metric of the same source operator, the low workload never triggers backpressure, which makes sense, but my expectation was that the outPoolUsage would trend up to some value (as roughly 500ms of "work" should be created every second) and remain relatively constant at that value, rather than dipping back down and jumping up as seen in the graph. I'm not sure what mechanism in Flink would be responsible for this behaviour if the workload is constant, and I was wondering if anyone working on Flink could explain what's occurring or point me in the right direction. I'm aware (from the linked blog post above) that the outPoolUsage metric is an aggregation of the floatingBuffersUsage and exclusiveBuffersUsage metrics, so the drop to 10% would be one of the exclusive buffers, but why would floating buffers come and go if the pipeline workload and arrival rates are constant?

    [image attachment: buffer_question]

    To summarise, I'm wondering if anyone knows which mechanism is responsible for the sawtooth pattern in the outPoolUsage metric when the buffer isn't reaching 100% under a constant workload. My intuition is that it would increase to some threshold and stay (relatively) steady, but it jumps between 40% and 10%. Thank you very much for the time and assistance! For anyone who would like to get in touch, my email is i.g.dixon1@newcastle.ac.uk - Iain
  • Noufal Rijal

    11/04/2025, 4:19 PM
    Hi Team, I was going through the external resources implementation in Flink. Our use case is that we have to do model inference over the data that we receive, which requires a GPU. As per the doc we have tried implementing the same by setting up the values for nvidia. What we have observed is that when we set amount = 1, each task manager that we spawn takes the entire GPU. This in turn is causing a critical resource scaling issue: if we want 10 TMs then we need 10 GPUs, which is pretty costly. Is there a way we can solve this case, any kind of resource sharing mechanism, or do we need to go ahead with a model server architecture? Note - we are using Nvidia T4 GPU nodes with CPU=4, memory=28GB #C065944F9M2 #C03G7LJTS2G
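    For reference, a minimal sketch of how an operator can read the GPU assigned to its task manager through the external resource API mentioned above (the resource name "gpu" follows the docs; the property lookup and class names are illustrative assumptions):

    import java.util.Set;
    import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    // Sketch: look up the GPU index handed to this task by the external resource framework.
    public class GpuAwareInference extends RichMapFunction<String, String> {
        private transient String assignedGpu;

        @Override
        public void open(Configuration parameters) {
            Set<ExternalResourceInfo> gpuInfos = getRuntimeContext().getExternalResourceInfos("gpu");
            // With external-resource.gpu.amount: 1 there is at most one entry per task manager.
            assignedGpu = gpuInfos.isEmpty()
                    ? "none"
                    : gpuInfos.iterator().next().getProperty("index").orElse("unknown");
        }

        @Override
        public String map(String value) {
            // Hand the device index to the (hypothetical) inference library here.
            return value + " scored on GPU " + assignedGpu;
        }
    }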
  • Sidharth Ramalingam

    11/05/2025, 6:12 AM
    We are using Flink's AsyncIO function with Futures to make external gRPC calls. Currently, we have set the async capacity to 1, and we are using a blocking stub to make those calls. For each event, we trigger 4 Futures (i.e., 4 gRPC calls per event). Does this mean that the Executors.newFixedThreadPool() needs to have at least 4 threads to avoid queue starvation? Also, if we increase the async capacity to 2, should we increase the thread pool size to 8 to keep up with the parallel calls?
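    For reference, a minimal sketch of the fan-out pattern described above, sizing the pool as capacity * calls-per-event (class, method, and constant names are illustrative assumptions):

    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

    // Sketch: each event fans out into 4 blocking gRPC calls on a dedicated pool. With async
    // capacity C and 4 calls per event, up to C * 4 calls can be in flight at once, so the pool
    // is sized C * 4 to avoid later events queuing behind earlier ones.
    public class GrpcEnricher extends RichAsyncFunction<String, String> {
        private static final int CALLS_PER_EVENT = 4;
        private static final int ASYNC_CAPACITY = 1; // keep in sync with the operator capacity
        private transient ExecutorService pool;

        @Override
        public void open(Configuration parameters) {
            pool = Executors.newFixedThreadPool(ASYNC_CAPACITY * CALLS_PER_EVENT);
        }

        @Override
        public void asyncInvoke(String event, ResultFuture<String> resultFuture) {
            CompletableFuture<String> c1 = CompletableFuture.supplyAsync(() -> call(event, 1), pool);
            CompletableFuture<String> c2 = CompletableFuture.supplyAsync(() -> call(event, 2), pool);
            CompletableFuture<String> c3 = CompletableFuture.supplyAsync(() -> call(event, 3), pool);
            CompletableFuture<String> c4 = CompletableFuture.supplyAsync(() -> call(event, 4), pool);
            CompletableFuture.allOf(c1, c2, c3, c4).whenComplete((ignored, error) -> {
                if (error != null) {
                    resultFuture.completeExceptionally(error);
                } else {
                    resultFuture.complete(Collections.singleton(
                            c1.join() + "|" + c2.join() + "|" + c3.join() + "|" + c4.join()));
                }
            });
        }

        @Override
        public void close() {
            if (pool != null) {
                pool.shutdown();
            }
        }

        // Placeholder for the blocking stub call; purely illustrative.
        private String call(String event, int n) {
            return event + "-resp" + n;
        }
    }

    The operator would then be wired in with something like AsyncDataStream.unorderedWait(stream, new GrpcEnricher(), 30, TimeUnit.SECONDS, ASYNC_CAPACITY).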
  • Jashwanth S J

    11/06/2025, 6:50 AM
    Team, we've baked the JM and TM pod image to include the jar files required for FlinkSessionJob submission. We're submitting jobs using jarURI: "file:///opt/flink/integration_testing_ui_demo.jar". The operator is failing to find the jar inside the JM/TM pod to bring up TM pods, even though the jar is present within the image; I could exec in and see the jar. Can someone help find the cause? Operator logs:
    Copy code
    2025-11-06 06:36:55,168 o.a.f.k.o.r.ReconciliationUtils [WARN ][ncm-sc/nc-int] Attempt count: 11, last attempt: false
    2025-11-06 06:36:55,259 o.a.f.r.u.c.m.ProcessMemoryUtils [INFO ][ncm-sc/fsc-flink-cluster] The derived from fraction jvm overhead memory (102.400mb (107374184 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
    2025-11-06 06:36:55,362 o.a.f.r.u.c.m.ProcessMemoryUtils [INFO ][ncm-sc/fsc-flink-cluster] The derived from fraction jvm overhead memory (102.400mb (107374184 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
    2025-11-06 06:36:55,459 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][ncm-sc/fsc-flink-cluster] Resource fully reconciled, nothing to do...
    2025-11-06 06:36:55,469 i.j.o.p.e.EventProcessor       [ERROR][ncm-sc/nc-int] Error during event processing ExecutionScope{ resource id: ResourceID{name='nc-int', namespace='ncm-sc'}, version: 48211671}
    org.apache.flink.kubernetes.operator.exception.ReconciliationException: java.io.FileNotFoundException: /opt/flink/integration_testing_ui_demo.jar (No such file or directory)
    	at org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController.reconcile(FlinkSessionJobController.java:130)
    	at org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController.reconcile(FlinkSessionJobController.java:58)
    	at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:153)
    	at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:111)
    Exec into the JM pod - the jar is inside:
    Copy code
    ❯ kubectl exec -it -n ncm-sc fsc-flink-cluster-6c47f5964b-t5jhb -- bash
    Defaulted container "flink-main-container" out of: flink-main-container, setup-certs-and-plugins (init)
    root@fsc-flink-cluster-6c47f5964b-t5jhb:/opt/flink# ls
    '${sys:log.file}'   bin    examples                          lib       licenses   NOTICE   plugins        README.txt
     artifacts          conf   integration_testing_ui_demo.jar   LICENSE   log        opt      pod-template
    Describe output for the flinksessionjob:
    Copy code
    ❯ kubectl describe flinksessionjob nc-int -n ncm-sc | tail -n 20
    
    Status:
      Error:  {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.io.FileNotFoundException: /opt/flink/integration_testing_ui_demo.jar (No such file or directory)","additionalMetadata":{},"throwableList":[{"type":"java.io.FileNotFoundException","message":"/opt/flink/integration_testing_ui_demo.jar (No such file or directory)","additionalMetadata":{}}]}
      Job Status:
        Checkpoint Info:
          Last Periodic Checkpoint Timestamp:  0
        Job Id:                                f72f53c6355212276b75452aa2dc376e
        Savepoint Info:
          Last Periodic Savepoint Timestamp:  0
          Savepoint History:
      Lifecycle State:      UPGRADING
      Observed Generation:  2
      Reconciliation Status:
        Last Reconciled Spec:      {"spec":{"job":{"jarURI":"file:///opt/flink/integration_testing_ui_demo.jar","parallelism":1,"entryClass":"com.beam.screaltime.worker.ncm.process_functions.NcmEventsJob","args":["--jobType","cdc","--cloudProvider","nx","--buildId","dev1-6404","--operatorParallelism","{\"default\":1}"],"state":"suspended","savepointTriggerNonce":null,"initialSavepointPath":null,"checkpointTriggerNonce":null,"upgradeMode":"stateless","allowNonRestoredState":null,"savepointRedeployNonce":null,"autoscalerResetNonce":null},"restartNonce":1,"flinkConfiguration":null,"deploymentName":"fsc-flink-cluster"},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","firstDeployment":true}}
        Reconciliation Timestamp:  1762411304062
        State:                     UPGRADING
    Events:
      Type     Reason               Age                  From  Message
      ----     ------               ----                 ----  -------
      Warning  SessionJobException  7m5s (x19 over 23m)  Job   /opt/flink/integration_testing_ui_demo.jar (No such file or directory)
      Normal   Submit               7m5s (x19 over 23m)  Job   Starting deployment
  • windwheel

    11/07/2025, 8:59 AM
    https://github.com/apache/flink/pull/27152 Is anyone interested in this PR? I'm the author of this PR, and I've completed the FLIP code since its creation and fixed as many e2e tests as possible. However, no one has asked any questions in the discussion thread so far. Are there any community members with free time who could offer some suggestions? I'd really like to push this PR to be merged into the main branch.
  • Kamakshi

    11/07/2025, 10:59 PM
    Hi Team, I am trying to run BDD tests with Flink tables on Docker. Is there any example of this?
  • Elad

    11/09/2025, 1:03 PM
    Hi there, trying to deploy Flink as a standalone cluster on k8s WITHOUT the operator (unfortunately I cannot install the required CRDs on my k8s cluster). I'm failing to deploy the cluster in high availability mode. I have read the documentation and added the needed configuration (running the job manager with the args ["standalone-job", "--host", "$POD_IP"] and setting the pod IP env variable dynamically, of course); see deployment/resource-providers/standalone/kubernetes/#application-cluster-resource-definitions. When reading the generated high availability ConfigMap, it seems like the job managers registered themselves with their localhost address (127.0.0.1) instead of using the pod IP as needed. It also matches the TaskManagers trying to connect to a localhost address. I'm wondering if the job managers are using the --host property, and if not, what should I do to make them work? TL;DR - trying to configure high availability on k8s WITHOUT the operator and failing; I would love to see examples of a working setup.
  • Han You

    11/10/2025, 3:53 PM
    Hi team! I think there has been a change in k8s operator which breaks autoscaler standalone. In this hotfix commit, we stop all autoscaler config keys from being set on the jobmanager. However the standalone autoscaler reads those configs from jobmanager as shown in this line. The effect is that standalone autoscaler no longer works in operator version 1.13. Can someone more familiar with the operator and autoscaler confirm? I can also help make a PR to fix. Thank you!
  • Vasu Dins

    11/11/2025, 6:53 AM
    Hi everyone, I'm trying to integrate Elasticsearch 7 with PyFlink (version 1.20.1) using the jar flink-connector-elasticsearch7-3.1.0-1.20.jar. When I run my job, I'm getting the following error:
    Copy code
    Exception in thread "Thread-5" java.lang.NoClassDefFoundError: org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase
    py4j.protocol.Py4JError: org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder does not exist in the JVM
    here’s a minimal example of my code
    Copy code
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter
    import json
    
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///C:/flink-1.20.1/lib/flink-connector-elasticsearch7-3.1.0-1.20.jar")
    
    data_stream = env.from_collection([
        {"id": "1", "name": "John", "age": 25},
        {"id": "2", "name": "Jane", "age": 30}
    ])
    
    json_stream = data_stream.map(lambda x: json.dumps(x))
    
    es7_sink = Elasticsearch7SinkBuilder() \
        .set_bulk_flush_max_actions(1) \
        .set_emitter(ElasticsearchEmitter.static_index('my-index', 'id')) \
        .set_hosts(['10.0.0.102:9200']) \
        .build()
    
    json_stream.sink_to(es7_sink)
    env.execute("Flink Elasticsearch 7 Job")
    I'm running it using:
    Copy code
    flink run -py sampletestelastic.py
    Has anyone faced this issue before? It seems like the ElasticsearchSinkBuilderBase class is missing from the jar or not visible to PyFlink. Do I need an extra dependency or a different jar for Flink 1.20.1? It seems like ElasticsearchSinkBuilderBase might be missing or not accessible from the JVM side. Any guidance or suggestions would be really appreciated.
  • Vikas Patil

    11/11/2025, 9:02 PM
    Hello all, our pekko framesize keeps increasing, which is causing job failures. We use incremental checkpointing and Flink version 1.18.1. Is there a relation between the number of SST files and the pekko payload? The number of SST files is well over 100k for this job.
  • 垐嚳

    11/12/2025, 5:16 AM
    Hi everyone, I resumed the task from a savepoint, but it suddenly failed with this problem. Can anyone give me some tips? Thank you very much.
    2025-11-12 050248.618 [Source: dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546] WARN org.apache.flink.runtime.taskmanager.Task - Source: dw_realtime_inner_ods_sensor_events[1] -> Calc[2] -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Writer -> dw_realtime_inner_dwd_sensor_deal_loginresult_dtl_000[3]: Committer (1/1)#546 (765f7cffb5ee6a037a1ae2a15d0f3ed5_cbc357ccb763df2852fee8c4fc7d55f2_0_546) switched from RUNNING to FAILED with failure cause:
    java.io.IOException: Failed to deserialize consumer record due to
      at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
      at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
      at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203)
      at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:443)
      at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
      at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
      at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
      at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
      at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
      at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
      at java.lang.Thread.run(Thread.java:748)
    Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = event_topic, partition = 1, leaderEpoch = 7, offset = 2399381243, CreateTime = 1762853256453, serialized key size = 37, serialized value size = 2917, headers = RecordHeaders(headers = [], isReadOnly = false), key = [B@54e6a925, value = [B@16f80c51).
      at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:59)
      at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
      ... 14 common frames omitted
    Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
      at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
      at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
      at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
      at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310)
      at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
      at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:67)
      at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
      at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:115)
      at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
      ... 15 common frames omitted
    Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
      at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
      at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
      at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
      at StreamExecCalc$150.processElement_0_0_rewriteGroup82(Unknown Source)
      at StreamExecCalc$150.processElement(Unknown Source)
      at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
      ... 23 common frames omitted
    Caused by: java.lang.IllegalStateException: Received element after endOfInput: Record @ (undef) : +I(17653228541,33333888241751361,6931888464607053,deal_loginresult,true,8548827403,,false,fido,101.230.113.1577001894070038941:????????3,HUNDSUN,CTP,b7afd566c360b4104d44d46bc4cc972e,cf1b93e8c889f5acaa5956a01a1edf30,,,188201457,,,,null,null,null,null,null,null,null,null,null,??????,app,null,null,null,null,null,null,null)
      at org.apache.flink.util.Preconditions.checkState(Preconditions.java:215)
      at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:203)
      at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
      ... 28 common frames omitted
  • GAURAV MIGLANI

    11/12/2025, 8:30 AM
    In Flink 1.19.0, is there any known issue with a simple pipeline (Kafka source -> SQL transformation -> Kafka sink) on the Kubernetes Flink operator? The at-least-once guarantee is failing: at the time of Kubernetes consolidation, data is getting dropped in Flink. The pipeline uses processing time and a 1-to-1 sink only. Please help.
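    For reference, a minimal sketch of the pieces at-least-once delivery depends on with the Kafka sink (checkpointing enabled plus DeliveryGuarantee.AT_LEAST_ONCE, so the sink flushes on every checkpoint); broker, topic, and interval are illustrative assumptions:

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.base.DeliveryGuarantee;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // Sketch: at-least-once Kafka sink; without checkpointing the sink has no flush barrier.
    public class AtLeastOnceKafkaSink {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(30_000); // records buffered in the sink are flushed on each checkpoint

            KafkaSink<String> sink = KafkaSink.<String>builder()
                    .setBootstrapServers("broker:9092")
                    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                            .setTopic("output-topic")
                            .setValueSerializationSchema(new SimpleStringSchema())
                            .build())
                    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                    .build();

            env.fromElements("a", "b", "c").sinkTo(sink);
            env.execute("at-least-once-sketch");
        }
    }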
  • Hristo Yordanov

    11/12/2025, 7:28 PM
    Hi there, I'm trying to explore a Flink CDC pipeline (source Postgres and sink Fluss). Hot storage to Fluss is working well. What I'm trying now is activating the cold store (MinIO, Paimon) using the tiering service, which runs in a separate container. All containers start well and CDC is working as expected with hot storage in Fluss, but no cold storage is happening at all. I know that normally tiering should be activated, but in pipelines with a Fluss sink I don't see how to do it. My question is: does a pipeline with a Fluss sink support tiering to Paimon? Or should I use another approach and not pipelines?
  • Ananth bharadwaj

    11/13/2025, 2:44 AM
    Hi. I am trying to use the postgres-cdc connector where my source and destination are both Postgres. I have a fairly large table (about 4 GB) whose initial snapshot takes considerable time. I would like to make use of the snapshot.select.statement.overrides feature to only snapshot rows from a timestamp onwards. For example: snapshot.select.statement.overrides.public.orders=SELECT * FROM public.orders WHERE order_date > '2020-11-06'. But even with that, the full snapshot gets triggered. I have set scan.incremental.snapshot.enabled to false but the behaviour is still the same. Is there a way to snapshot only with a timestamp or id based condition?
  • Mehdi Jalili

    11/13/2025, 2:45 PM
    Hello all. I could really use your help understanding scan.watermark.idle-timeout. I'm using the SQL Gateway to join two tables backed by Kafka topics. One is a fact table that receives a constant stream of data. The other is a compacted topic populated by Debezium that contains dimension data, and I'm joining the two to enrich the facts. I want to sink the results to an append-only Kafka table sink, which is why I have made the dimension table versioned and use a temporal join to get the snapshot of the dimensions at the specified fact timestamp. Because the dimension stream rarely gets any new data, its watermark will usually be hours if not days behind the fact stream watermark, and this stops the output watermark from progressing. I have set scan.watermark.idle-timeout on the dimension table to let the watermark generator know to ignore the dimension table watermark, but I still see the watermark on the TemporalJoin operator as the minimum of the two. What am I doing wrong? Sorry if my grasp of these concepts is somewhat shaky. I'd really appreciate the help. Thank you. Here's some simplified code to demonstrate the setup:
    Untitled.sql
  • Brin

    11/14/2025, 12:40 PM
    Hello all: I am using Flink 1.20 with Flink CDC 3.3, deployed on a Kubernetes cluster, to synchronize data from MySQL to StarRocks. Both the source table (test.vip_record) and the sink table (test.ods_vip_record) currently have exactly the same schema, and no DDL changes have been made during job execution. However, the job keeps failing after running for some time, and the logs show the following error:
    Copy code
    2025-11-14 20:32:03
    java.lang.IllegalStateException: Unable to coerce data record from test.vip_record (schema: null) to test.ods_vip_record (schema: null)
      at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.lambda$handleDataChangeEvent$1(SchemaOperator.java:235)
      at java.util.Optional.orElseThrow(Optional.java:290)
      at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.handleDataChangeEvent(SchemaOperator.java:232)
      at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.processElement(SchemaOperator.java:152)
      at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
      at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
      at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
      at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310)
      at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
      at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
      at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:141)
      at java.util.Collections$SingletonList.forEach(Collections.java:4824)
      at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.deserialize(DebeziumEventDeserializationSchema.java:93)
      at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:119)
      at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.processElement(MySqlRecordEmitter.java:101)
      at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlPipelineRecordEmitter.processElement(MySqlPipelineRecordEmitter.java:120)
      at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:73)
      at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:46)
      at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203)
      at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422)
      at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
      at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
      at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
      at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
      at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
      at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
      at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
      at java.lang.Thread.run(Thread.java:750)
    The key part is:
    Copy code
    schema: null
    which suggests that both the source and sink schemas are unexpectedly lost or not recognized at runtime.
    Additional information:
    • Deployment mode: Flink on Kubernetes (Native K8s Mode)
    • Source: MySQL 5.7
    • Sink: StarRocks
    • Flink CDC performs snapshot + incremental reading normally after restart
    • After 1–2 hours of normal running, the job fails with the above error
    • No schema changes were made during this time
    Questions:
    1. Under what conditions would SchemaOperator encounter (schema: null) for both source and sink?
    2. Could this be related to state inconsistency or schema metadata loss inside checkpoint/savepoint?
    3. Is this a known issue in Flink CDC 3.3 with Flink 1.20?
    4. Are there recommended configurations to ensure stable schema management for MySQL → StarRocks pipelines?
    Thank you!
  • melin li

    11/17/2025, 7:04 AM
    Hello all. I deploy Flink on native Kubernetes and set "user.artifacts.artifact-list" to upload local JAR files to the pod, and also set pipeline.classpaths. The JVM classpath does not include the jars that were set in pipeline.classpaths.
  • Idan Sheinberg

    11/17/2025, 11:54 AM
    Hello all, I'm trying to figure out how to set taskmanager.numOfTaskSlots via the Flink Kubernetes Operator. I'm using operator version 1.13.0 and Flink 1.20.3. While the configmap generated by the operator correctly reflects the set value ('3' in my case), it looks like during the actual taskmanager startup process it resolves taskmanager.numOfTaskSlots=1 as a dynamic configuration property, which overrides the value at runtime. Has anyone successfully set a value larger than 1 on K8s in the past?
  • Roberto Serafin

    11/17/2025, 8:03 PM
    Hi everyone, We are using Flink to write with fanout DynamicIcebergSink into about 300 Iceberg tables and we need to run TableMaintenance (expireSnapshots, vacuum, etc.) on all of them. Running a separate maintenance job for each table would consume a huge number of taskSlots. Is there any best practice, tool, or pattern to efficiently manage maintenance for a large number of tables, without exhausting taskSlots with hundreds of concurrent jobs? Has anyone solved this problem or implemented scalable TableMaintenance orchestration across many tables? Thanks a lot!
  • Lee Wallen

    11/17/2025, 11:40 PM
    Hi everyone! We're running a Flink 1.19.3 app on AWS EKS, using flink-connector-kafka:3.3.0-1.19 for KafkaSource and Sink. The Kafka cluster is hosted on Confluent Cloud.
    Issue: when the JobManager is replaced (e.g., during failover), the new JobManager starts fine, but then TaskManagers begin throwing exceptions related to SaslClientAuthenticator failing to configure.
    Relevant configuration:
    • Added classloader.parent-first-patterns.additional per Confluent Cloud support to avoid child-first classloading issues with OAuth:
    Copy code
    org.apache.kafka.common.security.oauthbearer.;
    org.apache.kafka.common.security.auth.;
    org.apache.kafka.common.security.plain.;
    org.apache.kafka.common.security.scram.
    • Both sasl.login.callback.handler.class and sasl.client.callback.handler.class are set to:
    Copy code
    org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
    Runtime classpath (Kafka-related):
    Copy code
    +--- io.confluent:kafka-avro-serializer:7.4.12
    |    +--- io.confluent:kafka-schema-serializer:7.4.12
    |    |    +--- io.confluent:kafka-schema-registry-client:7.4.12
    |    +--- io.confluent:kafka-schema-registry-client:7.4.12 (*)
    +--- org.apache.flink:flink-connector-kafka:3.3.0-1.19
    |    +--- org.apache.kafka:kafka-clients:3.4.0
    Stacktrace excerpt:
    Copy code
    Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
    Caused by: java.lang.IllegalArgumentException: Callback handler must be castable to org.apache.kafka.common.security.auth.AuthenticateCallbackHandler: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
    Has anyone encountered this issue or have suggestions on how to resolve it? I'll add a thread and include a snippet with the full stacktrace.
  • Anuj Jain

    11/18/2025, 6:24 AM
    Hello everyone, I have deployed the Flink K8s operator but it is not supporting jarURI from S3. I have read the documentation and understand that we need to install the S3 plugin while installing the operator. I am using Helm to deploy the operator.
    Copy code
    if you use helm to install flink-kubernetes-operator, it allows you to specify a postStart hook to download the required plugins.
    I need help with how to use this postStart hook; is there any example?
  • Ashish Marottickal Gopi

    11/19/2025, 11:10 AM
    Hi everyone, I'm developing a new Flink application (real-time stock out) which will read from various Kafka topics (6 so far). I have some questions I'd like to clarify with the experts here:
    1. All topics have different schemas, so I started by creating an individual source in Flink per topic. I believe this is correct?
    2. I wanted to use event time as it is important for this application (important with respect to the order of events), but it is not needed for all the topics I consume from, as some of them are low-throughput reference data. When I say event time, I don't want to consider late-arriving data. I added the watermark in the Kafka source itself, as I read about per-partition watermarks. I wanted to clarify whether this is necessary, or whether simply reading with processing-time semantics and, in the joins (KeyedCoProcessFunctions etc.), keeping the last read business event timestamp and only considering newer ones will do?
       a. One reason for this thought is that the topic contains data from all markets, and by introducing event time and lateness configs I don't want to ignore a different market's event as late-arriving. Another is, as mentioned above, that the latest event for a key is what's important for me.
       b. The thing that worries me about event time and watermarks is that, since I'm reading from multiple topics with different throughputs, I don't want a situation where data is not emitted due to watermark issues, as this application needs minimum end-to-end latency (see the watermark sketch below).
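    For reference, a minimal sketch of per-partition, bounded-out-of-orderness watermarks with idleness handling on a single Kafka source, roughly matching the concern in point 2 (topic, broker, durations, and the timestamp extractor are illustrative assumptions):

    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // Sketch: event-time watermarks on the high-throughput topic; low-throughput reference topics
    // could be read without watermarks (WatermarkStrategy.noWatermarks()) instead.
    public class StockOutWatermarks {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            KafkaSource<String> orders = KafkaSource.<String>builder()
                    .setBootstrapServers("broker:9092")
                    .setTopics("orders") // illustrative topic
                    .setGroupId("stock-out")
                    .setStartingOffsets(OffsetsInitializer.latest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();

            // Watermarks are tracked per Kafka partition inside the source; withIdleness keeps a
            // quiet partition (or market) from stalling the combined watermark indefinitely.
            WatermarkStrategy<String> watermarks = WatermarkStrategy
                    .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withIdleness(Duration.ofMinutes(1))
                    .withTimestampAssigner((record, ts) -> extractEventTime(record));

            DataStream<String> stream = env.fromSource(orders, watermarks, "orders-source");
            stream.print();
            env.execute("stock-out-sketch");
        }

        // Hypothetical helper: parse the business timestamp out of the record payload.
        private static long extractEventTime(String record) {
            return System.currentTimeMillis();
        }
    }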
  • Jatin Kumar

    11/19/2025, 2:03 PM
    Hello, I have a use case: read from S3, then write to Iceberg (on S3). In addition, I want to build RocksDB state and take a savepoint/checkpoint when the job finishes, so that I can start another job from that savepoint/checkpoint. This is the warmup/backfill phase; once the backfill is done I will start the job that reads from Kafka, and from there checkpointing is quite easy. Any thoughts?
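    For reference, a minimal sketch of the checkpoint-related settings such a bounded backfill job would typically need (paths, intervals, and names are illustrative assumptions; the final savepoint itself is taken via the CLI):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // Sketch: RocksDB state plus checkpoints that keep running while parts of the bounded
    // S3 source have already finished, so a usable checkpoint exists near the end of the backfill.
    public class BackfillCheckpointSetup {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.setString("execution.checkpointing.checkpoints-after-tasks-finish.enabled", "true");

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
            env.enableCheckpointing(60_000);                            // illustrative interval
            env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // incremental RocksDB
            env.getCheckpointConfig().setCheckpointStorage("s3://bucket/checkpoints"); // illustrative path

            // ... build the S3 -> Iceberg backfill pipeline here ...
            env.fromElements("warmup").print(); // placeholder so the sketch runs as-is

            env.execute("backfill-sketch");
        }
    }

    Once the backfill has caught up, one option is to take the handover snapshot with something like flink stop --savepointPath s3://bucket/savepoints <jobId> and start the Kafka job with flink run -s <savepointPath> ..., so the second job restores the warmed-up RocksDB state, provided the stateful operators keep the same uids.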
  • Xianxin Lin

    11/19/2025, 8:32 PM
    Hi everyone, looking into the k8s operator autoscaler doc: is there any way to support out-of-the-box metrics for scaling decisions?
    Copy code
    Collected metrics:
    
    Backlog information at each source
    Incoming data rate at the sources (e.g. records/sec written into the Kafka topic)
    Record processing rate at each job vertex
    Busy and backpressured time at each job vertex