# troubleshooting
  • Rushikesh Gulve
    06/06/2025, 9:12 AM
    Hi All, I am working on a Flink project and I want to maximize utilization of resources. Currently I have 10 Task Managers in my cluster, each having 1 task slot. I am producing data with 10 distinct keys and I expect to use all 10 task managers, but I end up using only 6. How do I maximize utilization? I have tried using the MurmurHash code to find the keys that should be directed to different parallel instances of a subtask, but nothing seems to work.
    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    
    public class FlinkKeyGroupMapper {
    
        public static void main(String[] args) {
            final int maxParallelism = 128;  // Flink maxParallelism
            final int parallelism = 10;      // Number of subtasks
            final int keysNeeded = parallelism;
    
            // Map: subtask index -> key found
            java.util.Map<Integer, String> subtaskToKey = new java.util.HashMap<>();
    
            int i = 100;
            int configId = 20003;
    
            while (subtaskToKey.size() < keysNeeded) {
                String key = configId + "_" + i;
    
                // Compute key group for this key (Flink does MurmurHash inside)
                int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
    
                // Find which operator subtask the key group maps to
                int subtaskIndex = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
                        maxParallelism, parallelism, keyGroup);
    
                // If this subtask index not assigned a key, assign it
                if (!subtaskToKey.containsKey(subtaskIndex)) {
                    subtaskToKey.put(subtaskIndex, key);
                    System.out.println("Subtask " + subtaskIndex + " -> Key: " + key);
                }
    
                i++;
            }
        }
    }
    This is how I have generated the keys, and my key selector in Flink also uses the same key. Can anyone guide me on how to determine which parallel subtask instance my current key will be directed to? Thanks
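    A minimal sketch of that lookup (assuming the job's maximum parallelism is also 128 and the KeySelector returns exactly the same string that was pre-computed above; the key value below is purely illustrative): KeyGroupRangeAssignment.assignKeyToParallelOperator combines the two steps from the snippet above into a single call.
    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

    public class KeyToSubtask {
        public static void main(String[] args) {
            final int maxParallelism = 128; // must match the job's configured max parallelism
            final int parallelism = 10;     // operator parallelism (10 task slots)

            // Hypothetical key; it must be the exact value the job's KeySelector returns.
            String key = "20003_100";

            // Equivalent to assignToKeyGroup(...) followed by computeOperatorIndexForKeyGroup(...).
            int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism);
            System.out.println("Key " + key + " -> subtask " + subtask);
        }
    }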
  • Sandip Nayak
    06/08/2025, 10:32 PM
    Hi All, I am exploring Disaggregated State Management with the ForSt state backend on Flink 2.0. Even with the following config, during Flink app restore from state I see that the full checkpoints are downloaded into the taskmanager pod. Do you know what I might be missing?
    state.backend.type: "forst"
    table.exec.async-state.enabled: "true"
    execution.checkpointing.incremental: "true"
    table.exec.mini-batch.enabled: "false"
    table.optimizer.agg-phase-strategy: "ONE_PHASE"
  • Yarden BenMoshe
    06/10/2025, 7:00 AM
    Hi everyone, I am working on a use case that involves writing Avro records to a Kafka topic from pipeline-a using the DataStream API (without a schema registry, for performance reasons). Then, using pipeline-b, I create a table with the expected format and try to consume data from the topic further down the stream. I have an issue with serialization, as the Avro connector for the Table API generates a schema with record name = "record" and namespace = "org.apache.flink.avro.generated". Has anyone experienced a similar scenario and can maybe advise what my options are here?
  • Monika Bednarz
    06/11/2025, 9:29 AM
    Hi Team! I truly hope I can get some inspiration or a push in the right direction from you. Context: we deployed Flink on K8s with the official operator. We want to deploy the SQL Gateway, which will allow users to query Kafka topics. We use the JAAS config in normal jobs and the SQL client CLI, like below. We want to eliminate passing the password in cleartext in the API call to the gateway. How do we do that properly? The container can have the password as an env variable or a file config, but I couldn't make it work (the JAAS config path seems to be hardcoded, and the SQL client doesn't support Jinja or embedding env variables). Please share ideas and what worked for you 🙂 🙏 SQL we run:
    CREATE TABLE source__topic ( ... ) WITH (
                  'connector' = 'kafka',
                  'value.format' = 'avro-confluent',
    ...
                  'properties.security.protocol' = 'SASL_SSL',
                  'properties.sasl.mechanism' = 'SCRAM-SHA-512',
                  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="..." password="...";');
    We want to get rid of the explicit config passing. (if there's some form of authentication that doesn't require this in cleartext, also please point me to it 🙏 )
  • Tiansu Yu
    06/11/2025, 12:00 PM
    Hi, I have a date column and I would like to use the date at midnight (00:00:00.000) as the event timestamp of my table (converting the DATE to TIMESTAMP_LTZ). How can I do that in Flink SQL?
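    A minimal Flink SQL sketch (table, column, and connector names are assumed placeholders): casting DATE to TIMESTAMP(3) yields midnight, and casting that to TIMESTAMP_LTZ(3) interprets it in the session time zone, so the result can serve as the event-time attribute.
    CREATE TABLE orders (
        order_date DATE,
        -- DATE -> TIMESTAMP(3) gives 00:00:00.000; TIMESTAMP -> TIMESTAMP_LTZ applies the session time zone
        event_ts AS CAST(CAST(order_date AS TIMESTAMP(3)) AS TIMESTAMP_LTZ(3)),
        WATERMARK FOR event_ts AS event_ts - INTERVAL '1' SECOND
    ) WITH (
        'connector' = 'datagen'
    );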
  • Dheeraj Panangat
    06/11/2025, 1:23 PM
    Hi All, Running into an issue with the Flink autoscaling implementation: • Using Flink 1.20.1 with Flink Operator 1.12.0 for deployment. Issue: • The job stops processing data during autoscaling whenever it scales up. • We want to autoscale jobs both with and without exactly-once semantics. What we tried: • Enabled unaligned checkpoints -> though the job never stopped processing, this resulted in data loss. Appreciate any inputs here. Is there no way to ensure autoscaling without data loss and with exactly-once semantics? CC: @Abhishek Joshi Thanks.
  • MohammadReza Shahmorady
    06/12/2025, 1:17 AM
    Hi everyone, I'm experiencing an issue with batch producing using the Kafka sink. I've configured the following settings:
    linger.ms=500
    batch.size=1048576
    compression.type=lz4
    However, when there's significant backpressure, the producer doesn't start sending data as expected and takes a very long time. When I remove the batch.size configuration, performance slightly improves, but it's still not ideal. I've attached a screenshot with more details in the thread. Does anyone have any suggestions on how to resolve this?
  • L P V
    06/12/2025, 7:51 AM
    Hi everyone. I'm facing an issue with converting a DataStream to a Table:
    DataStream<Row> merchantQuasiFeatureCreditFilterRow = merchantQuasiFeatureCreditFilter
            .map(message -> Row.of(
                    message.getEntity(),         // Field 0
                    message.getFeatureSet(),     // Field 1
                    message.getTimestamp(),      // Field 2
                    sha256(message.getEntity())  // Field 3
            ))
            .returns(Row.class);
    tableEnv.createTemporaryView("datastream_table", merchantQuasiFeatureCreditFilterRow,
            Schema.newBuilder()
                    .columnByExpression("entity", "cast(f0 as string)")
                    .columnByExpression("proctime", "PROCTIME()")
                    .columnByExpression("entityHash", "cast(f3 as string)") // Hashed entity
                    .build());
    It shows this error when deploying the Flink job:
    org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Invalid expression for computed column 'entityHash'.
    I don't know why, because I've already added field 3 to the data stream.
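    A sketch of one likely cause, keeping the original pipeline and assuming the shown field types: with .returns(Row.class) the stream only carries a generic Row type, so the view has no f0..f3 fields for the computed-column expressions to reference. Supplying an explicit row type exposes them:
    import org.apache.flink.api.common.typeinfo.Types;

    DataStream<Row> merchantQuasiFeatureCreditFilterRow = merchantQuasiFeatureCreditFilter
            .map(message -> Row.of(
                    message.getEntity(),         // f0
                    message.getFeatureSet(),     // f1
                    message.getTimestamp(),      // f2
                    sha256(message.getEntity())  // f3
            ))
            // Assumed field types; adjust to the actual getter types.
            .returns(Types.ROW(Types.STRING, Types.STRING, Types.LONG, Types.STRING));

    tableEnv.createTemporaryView("datastream_table", merchantQuasiFeatureCreditFilterRow,
            Schema.newBuilder()
                    .columnByExpression("entity", "CAST(f0 AS STRING)")
                    .columnByExpression("proctime", "PROCTIME()")
                    .columnByExpression("entityHash", "CAST(f3 AS STRING)")
                    .build());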
  • Rushikesh Gulve
    06/12/2025, 9:44 AM
    Hi everyone, I am trying to deploy Flink using the Kubernetes operator.
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: data-processing-flink
      namespace: flink
    spec:
      image: 624083781302.dkr.ecr.ap-south-1.amazonaws.com/product/data-streaming-flink:latest
      flinkVersion: "v1_20"
      serviceAccount: flink
      flinkConfiguration:
        rest.port: "8081"
        jobmanager.rpc.address: "data-processing-flink"
        jobmanager.rpc.port: "6123"
        taskmanager.numberOfTaskSlots: "4"
      podTemplate:
        spec:
          imagePullSecrets:
            - name: ecrscr-credentials
          containers:
            - name: flink-main-container
              imagePullPolicy: Always
              envFrom:
                - secretRef:
                    name: redis-secret-main
              env:
                - name: KAFKA_BROKER
                  value: "kafka.flink.svc.cluster.local:9092"
                - name: BOOTSTRAP_SERVERS
                  value: "kafka.flink.svc.cluster.local:9092"
                - name: MINIO_SERVER
                  value: "http://myminio-hl.flink.svc.cluster.local:9000"
                - name: MINIO_USER
                  value: "minio"
                - name: MINIO_PASSWORD
                  value: "minio123"
                - name: MAX_PARALLELISM
                  value: "128"
          securityContext:
            runAsUser: 9999
            runAsGroup: 9999
      jobManager:
        resource:
          memory: "2048m"
          cpu: 0.4
      taskManager:
        resource:
          memory: "4096m"
          cpu: 2
        replicas: 8
      job:
        entryClass: "org.apache.flink.client.python.PythonDriver"
        args: ["-pyclientexec", "/usr/bin/python3", "-py", "/opt/flink/usrlib/data_processing_v9/stream_processing/process_dependency/create_env.py"]
        upgradeMode: stateless
    Though I have set 4 GB of memory explicitly per Task Manager, it ends up using a lot more than that and ultimately goes out of memory. I have 2 questions:
    Caused by: java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(TaskExecutorGateway.submitTask(TaskDeploymentDescriptor, JobMasterId, Time))] at recipient [pekko.tcp://flink@192.168.164.1:6122/user/rpc/taskmanager_0] timed out. This is usually caused by: 1) Pekko failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase pekko.ask.timeout.
    1. Can I set any limits on resource utilization? 2. Is it normal to use this much memory, or is there something I can do to reduce consumption?
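    On question 1, a config sketch (assuming the standard Flink Kubernetes limit-factor options are honored for operator-managed pods; the values are illustrative, not recommendations): by default the container request and limit are both derived from resource.memory/cpu, the limit-factor options let the limit exceed the request, and the JVM overhead share gives native allocations more headroom inside the process budget. On question 2, note that PyFlink's Python workers run as separate OS processes, so their memory also counts toward the container limit.
    flinkConfiguration:
      # Let the pod limit exceed the request instead of pinning both to resource.cpu/memory
      kubernetes.taskmanager.cpu.limit-factor: "1.5"
      kubernetes.taskmanager.memory.limit-factor: "1.2"
      # Reserve more of the process budget for native/off-heap allocations
      taskmanager.memory.jvm-overhead.fraction: "0.15"
      taskmanager.memory.jvm-overhead.max: "1536m"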
  • Fabrizzio Chavez
    06/12/2025, 2:04 PM
    Hi, I can see this diagram in the Flink SQL Gateway documentation, but I can't find a guideline for connecting with DBeaver. Do you know how to configure it?
  • Fabrizzio Chavez
    06/14/2025, 2:52 PM
    Hello, I have a question related to Flink SQL and a streaming flow: I will receive an unbounded event stream from a Pulsar source called player, and I want to sink to another Pulsar topic with this query:
    INSERT INTO player_scores_sink
    SELECT
        playerId,
        MAX(score) AS maxScore
    FROM
        players_source
    GROUP BY 
        playerId;
    According to the documentation the GROUP BY will make the job stateful, but my doubt is what is stored: only the playerId and the maxScore (two integer values), or will the history of events from players_source also be stored? I only want to store the playerId and the maxScore, so when a new event for this player arrives I want to compare it only with the previous aggregate, not with all historical events, to prevent uncontrollable storage growth.
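    For what it's worth: for an unbounded GROUP BY aggregation over an append-only source, the state kept per key is the aggregate accumulator (here essentially the playerId and its current MAX(score)), not the history of input rows. A state TTL can still cap growth from keys that stop arriving; a minimal sketch, assuming the job is submitted as SQL where SET is available:
    SET 'table.exec.state.ttl' = '30 d';

    INSERT INTO player_scores_sink
    SELECT
        playerId,
        MAX(score) AS maxScore
    FROM
        players_source
    GROUP BY
        playerId;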
  • George Leonard
    06/15/2025, 4:02 PM
    Want to confirm: personally written connectors can be placed in $FLINK_HOME/lib or a subdirectory? I've got my Prometheus connector in ...HOME/lib/flink/ and other connectors/jars in HOME/lib/fluss and HOME/lib/hive, and I can see the relevant jars loaded in the echo of the CLASSPATH in the standalone log file.
  • Rushikesh Gulve
    06/16/2025, 6:47 AM
    Hi everyone, I am trying to deploy Flink using the Kubernetes operator. After introducing checkpointing, my task manager pods are failing continuously with the same resources and the same workload. I tried profiling to find the root cause, but I could only see a spike in resource consumption right before the task manager pods fail.
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: data-processing-flink
      namespace: flink
    spec:
      image: 624083781302.dkr.ecr.ap-south-1.amazonaws.com/product/data-streaming-flink:latest
      flinkVersion: "v1_20"
      serviceAccount: flink
      flinkConfiguration:
        rest.port: "8081"
        jobmanager.rpc.address: "data-processing-flink"
        jobmanager.rpc.port: "6123"
        taskmanager.numberOfTaskSlots: "2"
    
        # High Availability
        high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
        high-availability.storageDir: file:///flink-data/ha
    
        # Checkpoints and Savepoints
        state.checkpoints.dir: file:///flink-data/checkpoints
        state.savepoints.dir: file:///flink-data/savepoints
        state.backend: rocksdb
        state.backend.incremental: true
    
        rest.profiling.enabled: "true"
        env.java.opts.taskmanager: "-Dorg.apache.beam.sdk.fn=DEBUG -Dorg.apache.beam.runners.fnexecution.control.FnApiControlClient=DEBUG"
    
      podTemplate:
        spec:
          imagePullSecrets:
            - name: ecrscr-credentials
          containers:
            - name: flink-main-container
              imagePullPolicy: Always
              envFrom:
                - secretRef:
                    name: redis-secret-main
              env:
                - name: KAFKA_BROKER
                  value: "kafka.flink.svc.cluster.local:9092"
                - name: BOOTSTRAP_SERVERS
                  value: "kafka.flink.svc.cluster.local:9092"
                - name: MINIO_SERVER
                  value: "http://myminio-hl.flink.svc.cluster.local:9000"
                - name: MINIO_USER
                  value: "minio"
                - name: MINIO_PASSWORD
                  value: "minio123"
                - name: MAX_PARALLELISM
                  value: "128"
              volumeMounts:
                - mountPath: /flink-data
                  name: flink-pvc
          volumes:
            - name: flink-pvc
              persistentVolumeClaim:
                claimName: flink-data-pvc
          securityContext:
            runAsUser: 9999
            runAsGroup: 9999
    
      jobManager:
        resource:
          memory: "3072m"
          cpu: 0.4
    
      taskManager:
        resource:
          memory: "3584m"
          cpu: 2
        replicas: 2
    
      job:
        entryClass: "org.apache.flink.client.python.PythonDriver"
        args: ["-pyclientexec", "/usr/bin/python3", "-py", "/opt/flink/usrlib/data_processing_v9/stream_processing/process_dependency/create_env.py"]
        upgradeMode: last-state
    This is the deployment file I am using. My application is already consuming a lot of resources and I cannot afford to allocate more to it. Also, the checkpoint size is only 2-3 MB for the few successful checkpoints that I observed. Can anyone guide me on where to start debugging this issue? Thanks
  • Nick Mosin
    06/16/2025, 12:40 PM
    Hi all, I have a simple k8s session cluster deployment. When I submit my job it fails (I know why; that is not the problem). But now I see that the job is always in RESTARTING state and I can't stop it via the UI because there is no cancel button or other option. How do I stop and remove such a job? Flink 1.20.1
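    A minimal sketch of the usual ways to terminate such a job from outside the UI (host, port, and job ID are placeholders):
    # CLI against the session cluster's REST endpoint
    ./bin/flink cancel -m <jobmanager-host>:8081 <job-id>

    # or the REST API directly
    curl -X PATCH "http://<jobmanager-host>:8081/jobs/<job-id>?mode=cancel"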
  • Nick Mosin
    06/16/2025, 2:20 PM
    Another question: how can I enable loading flink-s3-fs-presto-1.20.1.jar if it is in the opt dir in the Docker image?
  • Nick Mosin
    06/16/2025, 3:46 PM
    And more S3 problems. I already set the libs in ENABLE_BUILT_IN_PLUGINS, as seen in the jobmanager logs:
    Enabling required built-in plugins
    Linking flink-s3-fs-presto-1.20.1.jar to plugin directory
    Successfully enabled flink-s3-fs-presto-1.20.1.jar
    Linking flink-s3-fs-hadoop-1.20.1.jar to plugin directory
    Successfully enabled flink-s3-fs-hadoop-1.20.1.jar
    and even
    Multiple providers loaded with the same prefix: s3. This might lead to unintended consequences, please consider using only one of them.
    but anyway I got
    Could not find a file system implementation for scheme 's3'. The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-hadoop, flink-s3-fs-presto.
  • Tiansu Yu
    06/17/2025, 8:06 AM
    Hi, I have just noticed that the Flink confluent-avro format uses the legacy timestamp mapping, which results in:
    java.lang.UnsupportedOperationException: Unsupported to derive Schema for type: TIMESTAMP_LTZ(3)
    at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:494) ~[flink-sql-avro-1.19.2.jar:1.19.2]
    However, this option is only available for the flink avro format, not the flink confluent-avro format. Therefore it is not possible to map a Unix timestamp in Avro properly for a source table in Flink while using the confluent-avro format (where you can hook in the schema registry). Is this a known bug already?
  • Rushikesh Gulve
    06/17/2025, 12:09 PM
    Hi everyone, I am trying to deploy a PyFlink application in Kubernetes. I am facing an issue when trying to configure checkpointing. I have 2 task manager pods and 3.5 GB of memory per task manager. This configuration works fine when I do not have checkpointing. When I configure checkpointing, I get OOMKilled in the task manager pods, and when I configure unaligned checkpointing, I get a SIGSEGV crash. This is my deployment file.
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: data-processing-flink
      namespace: flink
    spec:
      image: 624083781302.dkr.ecr.ap-south-1.amazonaws.com/product/data-streaming-flink:latest
      flinkVersion: "v1_20"
      serviceAccount: flink
      flinkConfiguration:
        rest.port: "8081"
        jobmanager.rpc.address: "data-processing-flink"
        jobmanager.rpc.port: "6123"
        taskmanager.numberOfTaskSlots: "2"
    
        # High Availability
        high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
        high-availability.storageDir: file:///flink-data/ha
    
        # Checkpoints and Savepoints
        state.checkpoints.dir: file:///flink-data/checkpoints
        state.savepoints.dir: file:///flink-data/savepoints
        state.backend.type: rocksdb
        
        execution.checkpointing.incremental: "true"
       # execution.checkpointing.alignment-timeout: "3000"  # ms, after which checkpoint switches to unaligned mode dynamically
    
        #execution.checkpointing.unaligned: "true"                  # Enable unaligned checkpoints
    
        #rest.profiling.enabled: "true"
        #env.java.opts.taskmanager: "-Dorg.apache.beam.sdk.fn=DEBUG -Dorg.apache.beam.runners.fnexecution.control.FnApiControlClient=DEBUG"
    
      podTemplate:
        spec:
          imagePullSecrets:
            - name: ecrscr-credentials
          containers:
            - name: flink-main-container
              imagePullPolicy: Always
              envFrom:
                - secretRef:
                    name: redis-secret-main
              env:
                - name: KAFKA_BROKER
                  value: "kafka.flink.svc.cluster.local:9092"
                - name: BOOTSTRAP_SERVERS
                  value: "kafka.flink.svc.cluster.local:9092"
                - name: MINIO_SERVER
                  value: "http://myminio-hl.flink.svc.cluster.local:9000"
                - name: MINIO_USER
                  value: "minio"
                - name: MINIO_PASSWORD
                  value: "minio123"
                - name: MAX_PARALLELISM
                  value: "128"
              volumeMounts:
                - mountPath: /flink-data
                  name: flink-pvc
          volumes:
            - name: flink-pvc
              persistentVolumeClaim:
                claimName: flink-data-pvc
          securityContext:
            runAsUser: 9999
            runAsGroup: 9999
    
      jobManager:
        resource:
          memory: "3072m"
          cpu: 0.4
    
      taskManager:
        resource:
          memory: "3584m"
          cpu: 2
        replicas: 2
    
      job:
        entryClass: "org.apache.flink.client.python.PythonDriver"
        args: ["-pyclientexec", "/usr/bin/python3", "-py", "/opt/flink/usrlib/data_processing_v9/stream_processing/process_dependency/create_env.py"]
        upgradeMode: stateless
    I am not able to find the root cause or the best way to configure checkpointing. Does anyone have an idea how I should proceed with this?
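    A sketch of the memory knobs usually checked first for this combination (values are illustrative assumptions, not recommendations): RocksDB and checkpoint/channel state are native, off-heap allocations on top of the PyFlink worker processes, so keeping RocksDB bound to managed memory and giving managed memory and JVM overhead a larger share reduces the chance of the container exceeding its limit.
    flinkConfiguration:
      # Keep RocksDB inside Flink's managed memory budget (this is the default)
      state.backend.rocksdb.memory.managed: "true"
      # Illustrative shares of taskmanager.memory.process.size
      taskmanager.memory.managed.fraction: "0.5"
      taskmanager.memory.jvm-overhead.fraction: "0.15"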
  • Leong Wai Leong
    06/17/2025, 2:10 PM
    I have been looking for dynamic CEP under FLIP-200. I understand that this is being offered by Ververica and Alicloud; is there any implementation available for on-prem usage instead?
  • Apollo Elon
    06/18/2025, 4:05 PM
    Hello everyone! Urgent help needed!! I encountered some serialization problems when using flink-connector-kafka, but even after tracing it down I don't know exactly what the problem is. Attached are my code and error messages.
    KafkaSourceBuilder<RichCdcMultiplexRecord> kafkaSourceBuilder = KafkaSource.builder();
    kafkaSourceBuilder.setTopics(Context.KafkaOptions.topic)
            .setBootstrapServers(Context.KafkaOptions.bootstrapServer)
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new RichCdcMultiplexRecordDeserializer())
            .setGroupId(Context.KafkaOptions.groupId);
    public class RichCdcMultiplexRecordDeserializer
            implements DeserializationSchema<RichCdcMultiplexRecord> {
    
        @Override
        public RichCdcMultiplexRecord deserialize(byte[] message) throws IOException {
            RichCdcMultiplexRecord richCdcMultiplexRecord = RichCdcMultiplexRecordDeserializer.deserializeForValue(message);
            return richCdcMultiplexRecord;
        }
    
        @Override
        public boolean isEndOfStream(RichCdcMultiplexRecord nextElement) {
            return false;
        }
    
        @Override
        public TypeInformation<RichCdcMultiplexRecord> getProducedType() {
            return TypeInformation.of(RichCdcMultiplexRecord.class);
        }
    
        public static RichCdcMultiplexRecord deserializeForValue(byte[] value) {
            return KryoSerializerUtil.deserialize(value, RichCdcMultiplexRecord.class);
        }
    }
    I'm sure the value of the data in Kafka is in the RichCdcMultiplexRecord format. The data has been read from Kafka, because I observed it while debugging the deserialization method. However, there is a problem when the record is passed to the next operator. Thank you all!!!
  • Sumit Nekar
    06/19/2025, 6:04 AM
    Query regarding the Flink autoscaler: if anyone is facing a similar problem (I have also posted it on Stack Overflow), please help address it. Thanks. https://stackoverflow.com/questions/79665550/flink-autoscaler-not-scaling-down-task-manger-pods?noredirect=1#comment140518279_79665550
  • Artsiom Yudovin
    06/20/2025, 10:17 AM
    Hi, I use an SQL hint to set state TTL:
    SELECT /*+ STATE_TTL('session'='12h', 'userState'='1d', 'authorizedUserState'='365d') */
    It looks like it is not working. Does anybody have any idea why that could happen?
  • Akash Patel
    06/20/2025, 2:04 PM
    Hi, we deploy Flink in session cluster mode with the FlinkDeployment CRD and need guidance on two issues: a. Separate resource requests vs. limits: can we specify different values for requests and limits? At the moment, the CPU and memory values defined for the JobManager or TaskManager are applied to both; I don't see a separate setting. b. TaskManager out-of-memory (Metaspace) errors: some TaskManager pods crash with Metaspace OOM. We could raise the cluster-level memory, but a single session cluster may run many jobs, and only a few need the extra memory. For example, in a cluster running 20 jobs, 5 of them trigger OOM, the affected pod is killed, a new one is created, and the job restarts. How can we isolate or control these heavy jobs so they don't fail with out-of-memory issues? Currently we don't want to move them to a new session cluster. Thank you
  • Rushikesh Gulve
    06/23/2025, 10:31 AM
    Hi everyone, I have a Flink application deployed using the Kubernetes operator. I have 40 task manager pods and I am using unaligned checkpointing. The tolerable failed checkpoint count is 3. I got these logs in one of my task managers, where the task manager did not log anything for 15 minutes and then logged a SIGSEGV error. During this interval 3 checkpoints failed and all task managers were killed.
    # JRE version: OpenJDK Runtime Environment Temurin-11.0.27+6 (11.0.27+6) (build 11.0.27+6)
    2025-06-23 11:43:03.749	
    #
    2025-06-23 11:43:03.749	
    #  SIGSEGV (0xb) at pc=0x00007fa5f75ff898, pid=1, tid=3479
    2025-06-23 11:43:03.748	
    #
    2025-06-23 11:43:03.748	
    # A fatal error has been detected by the Java Runtime Environment:
    2025-06-23 11:43:03.748	
    #
    2025-06-23 11:43:03.748	
    pthread lock: Invalid argument
    2025-06-23 11:29:59.808	
    2025-06-23 09:29:59,808 INFO  /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - after minio data_output: WindowProcessComputeIOStates-5-1736416800000, 64558e0e8caa4ebabc262d76e8117256, 1736416800000
    During this time, other task managers are logging a failure in checkpointing:
    org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:182) ~[flink-dist-1.20.1.jar:1.20.1]
    2025-06-23 11:33:00.661	
    	at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:127) ~[flink-dist-1.20.1.jar:1.20.1]
    2025-06-23 11:33:00.661	
    2025-06-23 09:33:00,634 INFO  org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - KEYED PROCESS -> (Extract-Timestamp -> Timestamps/Watermarks -> Remove-Timestamp -> Map -> Sink: Writer -> Sink: Committer, Map, Map -> Sink: Writer -> Sink: Committer) (5/40)#0 - asynchronous part of checkpoint 15 could not be completed.
    2025-06-23 11:33:00.661	
    2025-06-23 09:33:00,629 INFO  org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - Source: prepareData->ProcessComputePartTypeTransactions -> _stream_key_by_map_operator (5/40)#0 - asynchronous part of checkpoint 15 could not be completed.
    2025-06-23 11:33:00.661	
    java.util.concurrent.ExecutionException: org.apache.flink.runtime.checkpoint.CheckpointException: The checkpoint was aborted due to exception of other subtasks sharing the ChannelState file.
    I am guessing the one task manager that became unresponsive was responsible for the checkpoint failure, which happened due to a timeout. Can anyone help with identifying the root cause?
  • Tudor Plugaru
    06/24/2025, 1:45 PM
    Hello team! Has anyone added distributed tracing capabilities to their Flink pipelines, as in reading Kafka records with tracing headers and writing to Kafka also with tracing headers while preserving the same traceparent? Currently, from the work I did, it seems that it's very hard to do properly, unless you pass tracing metadata as part of the POJO that gets passed from operator to operator, but even in this case you'll have to "activate" the trace in each operator of your pipeline. I found this to be not very DX friendly and very error prone. We're on Flink 1.18, if that's important. I'm looking for inspiration mainly on how to do it more nicely and in a more DX-friendly way, so I'm open to suggestions/ideas. Thanks
  • George Leonard
    06/25/2025, 5:32 AM
    Wondering if anyone can help. I have the following import, and here are the associated pom entries, but the import is still failing as far as VS Code and mvn compile are concerned. From the code:
    import org.apache.flink.api.common.serialization.SerializableSerializer;
    ...
    
        @Override
        public SimpleVersionedSerializer<SnmpSourceSplit> getSplitSerializer() {
            return new SerializableSerializer<>();
        }
    
        @Override
        public SimpleVersionedSerializer<List<SnmpSourceSplit>> getEnumeratorCheckpointSerializer() {
            return new SerializableSerializer<>();
        }
    parent pom.xml
    <properties>
            <flink.version>1.20.1</flink.version>
            <java.version>17</java.version>
            <maven-compiler.version>3.11.0</maven-compiler.version>
            <maven-shade.version>3.5.2</maven-shade.version>
            <maven-compiler.source>${java.version}</maven-compiler.source>
            <maven-compiler.target>${java.version}</maven-compiler.target>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         ...
    local pom.xml
    <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-core</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
  • Fabrizzio Chavez
    06/25/2025, 1:50 PM
    Hello! I would like to know if anyone has experience configuring the Flink SQL Gateway with the Flink Kubernetes operator. I've tried to declare a deployment running the command ./bin/sql-gateway.sh start-foreground -Dsql-gateway.endpoint.rest-address=0.0.0.0; the endpoint /v1/info works fine, but when I send a SQL query using the Flink JDBC driver, it returns an error after 1 min. How can I configure the JobManager URLs?
  • Adam Richardson
    06/26/2025, 3:27 AM
    Hi all. I have a few questions about the legacy two-phase commit sink. My application reads data from Kafka and writes it to a Delta Lake via the open-source Delta sink, which is implemented as a 2PC sink using the legacy APIs (org.apache.flink.api.connector.sink.{Writer,Committer,GlobalCommitter,Sink,...}). I'm observing strange behavior where consumer group latency on Kafka is close to zero after a checkpoint, but the data in the output Delta table lags significantly. I can see in the TM logs that Delta commits are happening quickly and reliably after each checkpoint, but recent data is still missing. After some digging I found a reference in an unrelated issue on Flink: "it is expected to global committer architecture lag one commit behind in reference to rest of the pipeline". This lines up very well with the symptoms I'm seeing -- my app checkpoints every 5m, and I'm seeing latency in the output table fluctuating from 5m (immediately after a checkpoint) to 10m (immediately before the next checkpoint). My questions: • Is it expected behavior that the committed data consistently lags behind by one checkpoint? Are there any more authoritative docs on this behavior? • Is there any workaround (config or code changes) to avoid/fix this behavior? • Does the v2 sink API have this problem? Thank you!
  • Rushikesh Gulve
    06/26/2025, 6:49 AM
    Hi everyone, I have a Flink application deployed using the Kubernetes operator. I have 40 task manager pods and I am using unaligned checkpointing. The tolerable failed checkpoint count is 3. I got these logs in one of my task managers, where the task manager did not log anything for 15 minutes and then logged a SIGSEGV error. During this interval 3 checkpoints failed and all task managers were killed.
    # JRE version: OpenJDK Runtime Environment Temurin-11.0.27+6 (11.0.27+6) (build 11.0.27+6)
    2025-06-23 11:43:03.749	
    #
    2025-06-23 11:43:03.749	
    #  SIGSEGV (0xb) at pc=0x00007fa5f75ff898, pid=1, tid=3479
    2025-06-23 11:43:03.748	
    #
    2025-06-23 11:43:03.748	
    # A fatal error has been detected by the Java Runtime Environment:
    2025-06-23 11:43:03.748	
    #
    2025-06-23 11:43:03.748	
    pthread lock: Invalid argument
    2025-06-23 11:29:59.808	
    2025-06-23 09:29:59,808 INFO  /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - after minio data_output: WindowProcessComputeIOStates-5-1736416800000, 64558e0e8caa4ebabc262d76e8117256, 1736416800000
    During this time, other task managers are logging a failure in checkpointing:
    org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:182) ~[flink-dist-1.20.1.jar:1.20.1]
    2025-06-23 11:33:00.661	
    	at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:127) ~[flink-dist-1.20.1.jar:1.20.1]
    2025-06-23 11:33:00.661	
    2025-06-23 09:33:00,634 INFO  org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - KEYED PROCESS -> (Extract-Timestamp -> Timestamps/Watermarks -> Remove-Timestamp -> Map -> Sink: Writer -> Sink: Committer, Map, Map -> Sink: Writer -> Sink: Committer) (5/40)#0 - asynchronous part of checkpoint 15 could not be completed.
    2025-06-23 11:33:00.661	
    2025-06-23 09:33:00,629 INFO  org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - Source: prepareData->ProcessComputePartTypeTransactions -> _stream_key_by_map_operator (5/40)#0 - asynchronous part of checkpoint 15 could not be completed.
    2025-06-23 11:33:00.661	
    java.util.concurrent.ExecutionException: org.apache.flink.runtime.checkpoint.CheckpointException: The checkpoint was aborted due to exception of other subtasks sharing the ChannelState file.
    I am guessing the one task manager that became unresponsive was responsible for the checkpoint failure, which happened due to a timeout. Can anyone help with identifying the root cause?
  • Magdalena Kobusch
    06/26/2025, 8:19 AM
    Does Flink support Amazon S3 Table buckets for catalog definition, similar to this example for Spark? I'm attempting to define a catalog on an S3 Tables warehouse in the Flink Table API, like this:
    tEnv.executeSql(
                        "CREATE CATALOG pulse_table WITH ("
                                + "'type'='iceberg',"
                                + "'warehouse'='arn:aws:s3tables:us-east-1:058264243434:bucket/kyk-dataeng-table-iceberg',"
                                + "'catalog-impl'='org.apache.iceberg.aws.s3tables.S3TablesCatalog',"
                                + ")"
                );
    But that is not working for me. I have added:
    dependencies {
        implementation 'software.amazon.awssdk:s3tables:2.29.26'
        implementation 'software.amazon.s3tables:s3-tables-catalog-for-iceberg:0.1.7'
    }
    But I'm still getting:
    Exception in thread "main" java.lang.IllegalArgumentException: Cannot initialize Catalog implementation org.apache.iceberg.aws.s3tables.S3TablesCatalog: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog
    	Missing org.apache.iceberg.aws.s3tables.S3TablesCatalog [java.lang.ClassNotFoundException: org.apache.iceberg.aws.s3tables.S3TablesCatalog]
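    One thing worth checking, sketched under the assumption that the class name is the mismatch: the s3-tables-catalog-for-iceberg artifact ships the catalog class as software.amazon.s3tables.iceberg.S3TablesCatalog rather than org.apache.iceberg.aws.s3tables.S3TablesCatalog, and the ClassNotFoundException also suggests the jar is not on the classpath Flink actually uses at runtime (not just a Gradle implementation dependency of the driver program).
    tEnv.executeSql(
            "CREATE CATALOG pulse_table WITH ("
                    + "'type'='iceberg',"
                    + "'warehouse'='arn:aws:s3tables:us-east-1:058264243434:bucket/kyk-dataeng-table-iceberg',"
                    // Assumed class name from the AWS S3 Tables catalog library
                    + "'catalog-impl'='software.amazon.s3tables.iceberg.S3TablesCatalog'"
                    + ")");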