# troubleshooting
  • g

    George Leonard

    06/15/2025, 4:02 PM
    Want to confirm: personally written connectors can be placed in $FLINK_HOME/lib or a subdirectory of it. I've got my Prometheus connector in ...HOME/lib/flink/ and other connectors/jars in HOME/lib/fluss and HOME/lib/hive. I can see the relevant jars loaded in the echo of the CLASSPATH in the standalone log file.
  • r

    Rushikesh Gulve

    06/16/2025, 6:47 AM
    Hi everyone, I am trying to deploy Flink using the Kubernetes operator. After introducing checkpointing, my task manager pods are failing continuously on the same resources and the same workload. I tried profiling to find the root cause, but I could only see a spike in resource consumption right before the task manager pods fail.
    Copy code
    kind: FlinkDeployment
    metadata:
      name: data-processing-flink
      namespace: flink
    spec:
      image: 624083781302.dkr.ecr.ap-south-1.amazonaws.com/product/data-streaming-flink:latest
      flinkVersion: "v1_20"
      serviceAccount: flink
      flinkConfiguration:
        rest.port: "8081"
        jobmanager.rpc.address: "data-processing-flink"
        jobmanager.rpc.port: "6123"
        taskmanager.numberOfTaskSlots: "2"
    
        # High Availability
        high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
        high-availability.storageDir: file:///flink-data/ha
    
        # Checkpoints and Savepoints
        state.checkpoints.dir: file:///flink-data/checkpoints
        state.savepoints.dir: file:///flink-data/savepoints
        state.backend: rocksdb
        state.backend.incremental: true
    
        rest.profiling.enabled: "true"
        env.java.opts.taskmanager: "-Dorg.apache.beam.sdk.fn=DEBUG -Dorg.apache.beam.runners.fnexecution.control.FnApiControlClient=DEBUG"
    
      podTemplate:
        spec:
          imagePullSecrets:
            - name: ecrscr-credentials
          containers:
            - name: flink-main-container
              imagePullPolicy: Always
              envFrom:
                - secretRef:
                    name: redis-secret-main
              env:
                - name: KAFKA_BROKER
                  value: "kafka.flink.svc.cluster.local:9092"
                - name: BOOTSTRAP_SERVERS
                  value: "kafka.flink.svc.cluster.local:9092"
                - name: MINIO_SERVER
                  value: "http://myminio-hl.flink.svc.cluster.local:9000"
                - name: MINIO_USER
                  value: "minio"
                - name: MINIO_PASSWORD
                  value: "minio123"
                - name: MAX_PARALLELISM
                  value: "128"
              volumeMounts:
                - mountPath: /flink-data
                  name: flink-pvc
          volumes:
            - name: flink-pvc
              persistentVolumeClaim:
                claimName: flink-data-pvc
          securityContext:
            runAsUser: 9999
            runAsGroup: 9999
    
      jobManager:
        resource:
          memory: "3072m"
          cpu: 0.4
    
      taskManager:
        resource:
          memory: "3584m"
          cpu: 2
        replicas: 2
    
      job:
        entryClass: "org.apache.flink.client.python.PythonDriver"
        args: ["-pyclientexec", "/usr/bin/python3", "-py", "/opt/flink/usrlib/data_processing_v9/stream_processing/process_dependency/create_env.py"]
        upgradeMode: last-state
    This is the deployment file I am using. My application is already consuming a lot of resources and I cannot afford to allocate more to it. The checkpoint size is also only 2-3 MB for the few successful checkpoints I observed. Can anyone guide me on where to start debugging this issue? Thanks
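    For reference, and not as a diagnosis: with the RocksDB state backend, a usual first step for OOMKilled task managers is to make sure RocksDB's native memory stays inside Flink's managed memory and to leave more headroom for JVM/native overhead. A minimal Java sketch of the relevant options (the keys are standard Flink options, the values are illustrative assumptions; in a FlinkDeployment they would normally go under spec.flinkConfiguration rather than into code):
    Copy code
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    Configuration conf = new Configuration();
    // Keep RocksDB's block cache / write buffers inside Flink's managed memory,
    // so native allocations are bounded instead of growing past the pod limit.
    conf.setString("state.backend.rocksdb.memory.managed", "true");
    // Give the state backend a larger slice of the process memory (illustrative value).
    conf.setString("taskmanager.memory.managed.fraction", "0.5");
    // Reserve more room for native/JVM overhead before the container is OOMKilled.
    conf.setString("taskmanager.memory.jvm-overhead.max", "768m");

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);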
  • n

    Nick Mosin

    06/16/2025, 12:40 PM
    hi all, I have a simple k8s session cluster deployment. When I submit my job it fails (I know why, that's not the problem). But now I see that the job is always in RESTARTING state and I can't stop it via the UI because there is no cancel button or other option. How do I stop and remove such a job? Flink 1.20.1
  • n

    Nick Mosin

    06/16/2025, 2:20 PM
    another question: how can I enable loading flink-s3-fs-presto-1.20.1.jar if it is in the opt dir in Docker?
  • n

    Nick Mosin

    06/16/2025, 3:46 PM
    and more S3 problems. I already set the libs in ENABLE_BUILT_IN_PLUGINS, as seen in the jobmanager logs:
    Copy code
    Enabling required built-in plugins
    Linking flink-s3-fs-presto-1.20.1.jar to plugin directory
    Successfully enabled flink-s3-fs-presto-1.20.1.jar
    Linking flink-s3-fs-hadoop-1.20.1.jar to plugin directory
    Successfully enabled flink-s3-fs-hadoop-1.20.1.jar
    and even
    Copy code
    Multiple providers loaded with the same prefix: s3. This might lead to unintended consequences, please consider using only one of them.
    but anyway I got
    Copy code
    Could not find a file system implementation for scheme 's3'. The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-hadoop, flink-s3-fs-presto.
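    For reference, a quick way to check whether the s3 scheme actually resolves in a given JVM is to ask Flink's FileSystem for it directly from job code; a small diagnostic sketch (the bucket/key is a placeholder):
    Copy code
    import java.net.URI;
    import org.apache.flink.core.fs.FileSystem;

    // Throws UnsupportedFileSystemSchemeException if no provider for "s3" is loaded
    // in this process; otherwise prints which implementation (presto or hadoop) won.
    FileSystem fs = FileSystem.get(new URI("s3://some-bucket/some-key"));
    System.out.println(fs.getClass().getName());
    Plugins are loaded per process, so the jars need to be linked in every container that touches the s3 path (jobmanager, taskmanagers, and any client container), not just one of them.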
  • t

    Tiansu Yu

    06/17/2025, 8:06 AM
    Hi, I have just noticed that flink confluent avro will use legacy timestamp mapping, which results in
    Copy code
    java.lang.UnsupportedOperationException: Unsupported to derive Schema for type: TIMESTAMP_LTZ(3)
    at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:494) ~[flink-sql-avro-1.19.2.jar:1.19.2]
    however, this option is only available for the flink avro format, not the flink confluent avro format. Therefore it is not possible to cast a unix timestamp in Avro properly as a source table in Flink while using the confluent-avro format (where you can hook up the schema registry). Is this a known bug already?
  • r

    Rushikesh Gulve

    06/17/2025, 12:09 PM
    Hi everyone, I am trying to deploy a PyFlink application on Kubernetes. I am facing an issue when trying to configure checkpointing. I have 2 task manager pods and 3.5 GB memory per task manager. This configuration works fine when I do not have checkpointing. When I configure checkpointing, I get OOMKilled in the task manager pods, and when I configure unaligned checkpointing, I get a SIGSEGV crash. This is my deployment file.
    Copy code
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: data-processing-flink
      namespace: flink
    spec:
      image: 624083781302.dkr.ecr.ap-south-1.amazonaws.com/product/data-streaming-flink:latest
      flinkVersion: "v1_20"
      serviceAccount: flink
      flinkConfiguration:
        rest.port: "8081"
        jobmanager.rpc.address: "data-processing-flink"
        jobmanager.rpc.port: "6123"
        taskmanager.numberOfTaskSlots: "2"
    
        # High Availability
        high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
        high-availability.storageDir: file:///flink-data/ha
    
        # Checkpoints and Savepoints
        state.checkpoints.dir: file:///flink-data/checkpoints
        state.savepoints.dir: file:///flink-data/savepoints
        state.backend.type: rocksdb
        
        execution.checkpointing.incremental: "true"
       # execution.checkpointing.alignment-timeout: "3000"  # ms, after which checkpoint switches to unaligned mode dynamically
    
        #execution.checkpointing.unaligned: "true"                  # Enable unaligned checkpoints
    
        #rest.profiling.enabled: "true"
        #env.java.opts.taskmanager: "-Dorg.apache.beam.sdk.fn=DEBUG -Dorg.apache.beam.runners.fnexecution.control.FnApiControlClient=DEBUG"
    
      podTemplate:
        spec:
          imagePullSecrets:
            - name: ecrscr-credentials
          containers:
            - name: flink-main-container
              imagePullPolicy: Always
              envFrom:
                - secretRef:
                    name: redis-secret-main
              env:
                - name: KAFKA_BROKER
                  value: "kafka.flink.svc.cluster.local:9092"
                - name: BOOTSTRAP_SERVERS
                  value: "kafka.flink.svc.cluster.local:9092"
                - name: MINIO_SERVER
                  value: "http://myminio-hl.flink.svc.cluster.local:9000"
                - name: MINIO_USER
                  value: "minio"
                - name: MINIO_PASSWORD
                  value: "minio123"
                - name: MAX_PARALLELISM
                  value: "128"
              volumeMounts:
                - mountPath: /flink-data
                  name: flink-pvc
          volumes:
            - name: flink-pvc
              persistentVolumeClaim:
                claimName: flink-data-pvc
          securityContext:
            runAsUser: 9999
            runAsGroup: 9999
    
      jobManager:
        resource:
          memory: "3072m"
          cpu: 0.4
    
      taskManager:
        resource:
          memory: "3584m"
          cpu: 2
        replicas: 2
    
      job:
        entryClass: "org.apache.flink.client.python.PythonDriver"
        args: ["-pyclientexec", "/usr/bin/python3", "-py", "/opt/flink/usrlib/data_processing_v9/stream_processing/process_dependency/create_env.py"]
        upgradeMode: stateless
    I am not able to find the root cause or the best way to configure checkpointing. Does anyone have any idea how I should proceed with this?
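    For what it's worth, the checkpoint knobs that usually matter here exist both as execution.checkpointing.* keys for flinkConfiguration and as DataStream API calls; a minimal Java sketch of the latter (interval/timeout values are illustrative assumptions, not recommendations):
    Copy code
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Checkpoint once a minute (execution.checkpointing.interval).
    env.enableCheckpointing(60_000);

    CheckpointConfig checkpoints = env.getCheckpointConfig();
    // Let slow checkpoints finish instead of timing out (execution.checkpointing.timeout).
    checkpoints.setCheckpointTimeout(600_000);
    // Guarantee breathing room between checkpoints (execution.checkpointing.min-pause).
    checkpoints.setMinPauseBetweenCheckpoints(30_000);
    checkpoints.setMaxConcurrentCheckpoints(1);
    // Tolerate a few failed checkpoints before failing the job
    // (execution.checkpointing.tolerable-failed-checkpoints).
    checkpoints.setTolerableCheckpointFailureNumber(3);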
  • l

    Leong Wai Leong

    06/17/2025, 2:10 PM
    I have been looking for dynamic CEP under FLIP-200. I understand that this is offered by Ververica and Alicloud; is there any implementation available for on-prem usage instead?
  • a

    Apollo Elon

    06/18/2025, 4:05 PM
    Hello everyone! Urgent help needed! I encountered some serialization problems when using flink-connector-kafka, but even after tracing it I don't know exactly what the problem is. Attached are my code and error messages.
    Copy code
    KafkaSourceBuilder<RichCdcMultiplexRecord> kafkaSourceBuilder = KafkaSource.builder();
    kafkaSourceBuilder.setTopics(Context.KafkaOptions.topic)
            .setBootstrapServers(Context.KafkaOptions.bootstrapServer)
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setValueOnlyDeserializer(new RichCdcMultiplexRecordDeserializer())
            .setGroupId(Context.KafkaOptions.groupId);
    public class RichCdcMultiplexRecordDeserializer
            implements DeserializationSchema<RichCdcMultiplexRecord> {
    
        @Override
        public RichCdcMultiplexRecord deserialize(byte[] message) throws IOException {
            RichCdcMultiplexRecord richCdcMultiplexRecord = RichCdcMultiplexRecordDeserializer.deserializeForValue(message);
            return richCdcMultiplexRecord;
        }
    
        @Override
        public boolean isEndOfStream(RichCdcMultiplexRecord nextElement) {
            return false;
        }
    
        @Override
        public TypeInformation<RichCdcMultiplexRecord> getProducedType() {
            return TypeInformation.of(RichCdcMultiplexRecord.class);
        }
    
        public static RichCdcMultiplexRecord deserializeForValue(byte[] value) {
            return KryoSerializerUtil.deserialize(value, RichCdcMultiplexRecord.class);
        }
    }
    I'm sure the value of the data in Kafka is in the RichCdcMultiplexRecord format. The data has been read from Kafka, because I observed it while debugging the deserialization method. However, there was a problem when writing to the next operator. Thank you all!!!
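    One thing worth narrowing down (a sketch, not a diagnosis): getProducedType() returns TypeInformation.of(RichCdcMultiplexRecord.class), which for a non-POJO class is a generic type, so the record is serialized again with Flink's Kryo fallback when it is handed to the next operator, and any field Kryo cannot handle only fails at that point. Two ExecutionConfig switches that often make the real culprit visible (assumes the RichCdcMultiplexRecord import from the job's own dependencies):
    Copy code
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Option 1: fail loudly as soon as any type falls back to Kryo,
    // which names the exact class that cannot be handled as a POJO.
    env.getConfig().disableGenericTypes();

    // Option 2: keep Kryo, but register the record class (and non-trivial
    // field types) explicitly instead of letting Kryo discover them lazily.
    env.getConfig().registerKryoType(RichCdcMultiplexRecord.class);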
  • s

    Sumit Nekar

    06/19/2025, 6:04 AM
    Query regarding the Flink autoscaler. If anyone is facing a similar problem (I had posted on Stack Overflow as well), please help address it. Thanks https://stackoverflow.com/questions/79665550/flink-autoscaler-not-scaling-down-task-manger-pods?noredirect=1#comment140518279_79665550
  • a

    Artsiom Yudovin

    06/20/2025, 10:17 AM
    Hi, I use a SQL hint to set state TTL:
    Copy code
    SELECT /*+ STATE_TTL('session'='12h', 'userState'='1d', 'authorizedUserState'='365d') */
    It looks like it is not working. Does anybody have any idea why this could happen?
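    As a cross-check while debugging the hint, the pipeline-wide TTL option can confirm whether TTL takes effect at all (the STATE_TTL hint is meant to override it per state); a small Table API sketch in Java, with '12 h' purely as an example value:
    Copy code
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    // Global idle-state retention; per-operator STATE_TTL hints take precedence where supported.
    tableEnv.getConfig().set("table.exec.state.ttl", "12 h");
    Separately, it may be worth checking that the hint targets operators it supports (as far as I know, regular stream joins and group aggregations) and that the names in the hint match the table aliases used in the query.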
  • a

    Akash Patel

    06/20/2025, 2:04 PM
    Hi, we deploy Flink in session cluster mode with the FlinkDeployment CRD and need guidance on two issues: a. Separate resource requests vs. limits: can we specify different values for requests and limits? At the moment, the CPU and memory values defined for the JobManager or TaskManager are applied to both; I don't see a separate setting. b. TaskManager out-of-memory (Metaspace) errors: some TaskManager pods crash with Metaspace OOM. We could raise the cluster-level memory, but a single session cluster may run many jobs, and only a few need the extra memory. For example, in a cluster running 20 jobs, 5 of them trigger OOM; the affected pod is killed, a new one is created, and the job restarts. How can we isolate or control these heavy jobs so they don't fail with out-of-memory issues? Currently we don't want to move them to a new session cluster. Thank you
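    On (a), Flink's native Kubernetes integration has limit-factor options so that limits can exceed the requested resources, and Metaspace is sized by its own option independent of heap; whether the operator passes these through unchanged for a session cluster is worth verifying against the operator version in use. A sketch with illustrative values (these would normally be set in flinkConfiguration):
    Copy code
    import org.apache.flink.configuration.Configuration;

    Configuration conf = new Configuration();
    // Requests stay at the declared resource; the limit becomes factor * request.
    conf.setString("kubernetes.taskmanager.cpu.limit-factor", "2.0");
    conf.setString("kubernetes.taskmanager.memory.limit-factor", "1.5");
    // Metaspace is a fixed-size pool (default 256m); raising it targets Metaspace
    // OOMs without growing heap or managed memory.
    conf.setString("taskmanager.memory.jvm-metaspace.size", "512m");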
  • r

    Rushikesh Gulve

    06/23/2025, 10:31 AM
    Hi everyone, I have a Flink application deployed using the Kubernetes operator. I have 40 task manager pods and I am using unaligned checkpointing. The tolerable failed checkpoints setting is 3. I got these logs in one of my task managers, where the task manager did not log anything for 15 minutes and then logged a SIGSEGV error. During this interval 3 checkpoints failed and all task managers were killed.
    Copy code
    # JRE version: OpenJDK Runtime Environment Temurin-11.0.27+6 (11.0.27+6) (build 11.0.27+6)
    2025-06-23 11:43:03.749	
    #
    2025-06-23 11:43:03.749	
    #  SIGSEGV (0xb) at pc=0x00007fa5f75ff898, pid=1, tid=3479
    2025-06-23 11:43:03.748	
    #
    2025-06-23 11:43:03.748	
    # A fatal error has been detected by the Java Runtime Environment:
    2025-06-23 11:43:03.748	
    #
    2025-06-23 11:43:03.748	
    pthread lock: Invalid argument
    2025-06-23 11:29:59.808	
    2025-06-23 09:29:59,808 INFO  /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - after minio data_output: WindowProcessComputeIOStates-5-1736416800000, 64558e0e8caa4ebabc262d76e8117256, 1736416800000
    During this time, other task managers are logging a failure in checkpointing.
    Copy code
    org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:182) ~[flink-dist-1.20.1.jar:1.20.1]
    2025-06-23 11:33:00.661	
    	at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:127) ~[flink-dist-1.20.1.jar:1.20.1]
    2025-06-23 11:33:00.661	
    2025-06-23 09:33:00,634 INFO  org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - KEYED PROCESS -> (Extract-Timestamp -> Timestamps/Watermarks -> Remove-Timestamp -> Map -> Sink: Writer -> Sink: Committer, Map, Map -> Sink: Writer -> Sink: Committer) (5/40)#0 - asynchronous part of checkpoint 15 could not be completed.
    2025-06-23 11:33:00.661	
    2025-06-23 09:33:00,629 INFO  org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - Source: prepareData->ProcessComputePartTypeTransactions -> _stream_key_by_map_operator (5/40)#0 - asynchronous part of checkpoint 15 could not be completed.
    2025-06-23 11:33:00.661	
    java.util.concurrent.ExecutionException: org.apache.flink.runtime.checkpoint.CheckpointException: The checkpoint was aborted due to exception of other subtasks sharing the ChannelState file.
    I am guessing the one task manager which became unresponsive was responsible for the checkpoint failure, which happened due to a timeout. Can anyone help with identifying the root cause?
  • t

    Tudor Plugaru

    06/24/2025, 1:45 PM
    Hello team! Has anyone added distributed tracing capabilities to their Flink pipelines, as in reading Kafka records with tracing headers and writing to Kafka also with tracing headers while preserving the same traceparent? Currently, from the work I did, it seems that it's very hard to do properly, unless you pass tracing metadata as part of the POJO that gets passed from operator to operator, but even in this case you'll have to "activate" the trace in each operator of your pipeline. I found this to be not very DX-friendly and very error-prone. We're on Flink 1.18 if it's important. I'm looking for inspiration mainly on how to do it in a nicer and more DX-friendly way, so I'm open to suggestions/ideas. Thanks
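    For concreteness, the POJO-carrying approach described above boils down to copying the header onto the payload in the source's deserialization schema and restoring it as a Kafka header in the sink's serialization schema; a rough sketch against the Kafka connector interfaces (Event, its fields, and the topic name are hypothetical placeholders, not an existing API):
    Copy code
    import java.nio.charset.StandardCharsets;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
    import org.apache.flink.util.Collector;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.header.Header;

    // Hypothetical payload type: whatever POJO flows through the pipeline,
    // extended with a traceparent field.
    class Event {
        public String traceparent;
        public byte[] body;
    }

    class TracedDeserializer implements KafkaRecordDeserializationSchema<Event> {
        @Override
        public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<Event> out) {
            Event event = new Event();
            event.body = record.value();
            Header tp = record.headers().lastHeader("traceparent");
            if (tp != null) {
                event.traceparent = new String(tp.value(), StandardCharsets.UTF_8);
            }
            out.collect(event);
        }

        @Override
        public TypeInformation<Event> getProducedType() {
            return TypeInformation.of(Event.class);
        }
    }

    class TracedSerializer implements KafkaRecordSerializationSchema<Event> {
        @Override
        public ProducerRecord<byte[], byte[]> serialize(Event event, KafkaSinkContext ctx, Long ts) {
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("output-topic", event.body);
            if (event.traceparent != null) {
                record.headers().add("traceparent", event.traceparent.getBytes(StandardCharsets.UTF_8));
            }
            return record;
        }
    }
    Operators that want a span of their own can still open one from event.traceparent, but nothing breaks for operators that ignore it, since the field simply rides along with the record.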
  • g

    George Leonard

    06/25/2025, 5:32 AM
    Wonder if anyone can help. I have the following import, and here is the associated pom entry; the import is still failing as far as VS Code and mvn compile are concerned. From the code:
    Copy code
    import org.apache.flink.api.common.serialization.SerializableSerializer;
    ...
    
        @Override
        public SimpleVersionedSerializer<SnmpSourceSplit> getSplitSerializer() {
            return new SerializableSerializer<>();
        }
    
        @Override
        public SimpleVersionedSerializer<List<SnmpSourceSplit>> getEnumeratorCheckpointSerializer() {
            return new SerializableSerializer<>();
        }
    parent pom.xml
    Copy code
    <properties>
            <flink.version>1.20.1</flink.version>
            <java.version>17</java.version>
            <maven-compiler.version>3.11.0</maven-compiler.version>
            <maven-shade.version>3.5.2</maven-shade.version>
            <maven-compiler.source>${java.version}</maven-compiler.source>
            <maven-compiler.target>${java.version}</maven-compiler.target>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         ...
    local pom.xml
    Copy code
    <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-core</artifactId>
                <version>${flink.version}</version>
                <scope>provided</scope>
            </dependency>
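    If that SerializableSerializer import won't resolve from flink-core, one self-contained alternative is to write the split serializer by hand against SimpleVersionedSerializer (which is in flink-core) using plain Java serialization; a sketch that assumes SnmpSourceSplit implements java.io.Serializable:
    Copy code
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import org.apache.flink.core.io.SimpleVersionedSerializer;

    // Sketch: serializes the split with plain Java serialization.
    public class SnmpSourceSplitSerializer implements SimpleVersionedSerializer<SnmpSourceSplit> {

        private static final int VERSION = 1;

        @Override
        public int getVersion() {
            return VERSION;
        }

        @Override
        public byte[] serialize(SnmpSourceSplit split) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(split);
            }
            return bytes.toByteArray();
        }

        @Override
        public SnmpSourceSplit deserialize(int version, byte[] serialized) throws IOException {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serialized))) {
                return (SnmpSourceSplit) in.readObject();
            } catch (ClassNotFoundException e) {
                throw new IOException("Failed to deserialize SnmpSourceSplit", e);
            }
        }
    }
    getSplitSerializer() would then return new SnmpSourceSplitSerializer(), and the List<SnmpSourceSplit> enumerator checkpoint can be handled the same way with a second serializer (or a small generic variant).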
  • f

    Fabrizzio Chavez

    06/25/2025, 1:50 PM
    Hello! I would like to know if anyone has experience configuring the Flink SQL gateway with the Flink Kubernetes operator. I've tried to declare a deployment running the command ./bin/sql-gateway.sh start-foreground -Dsql-gateway.endpoint.rest-address=0.0.0.0; the endpoint /v1/info works fine, but when I send a SQL query using the Flink JDBC driver, it returns an error after 1 minute. How can I configure the jobmanager URLs?
  • a

    Adam Richardson

    06/26/2025, 3:27 AM
    Hi all. I have a few questions about the legacy two-phase commit sink. My application reads data from Kafka and writes it to a Delta Lake via the open-source Delta sink, which is implemented as a 2PC sink using the legacy APIs (org.apache.flink.api.connector.sink.{Writer,Committer,GlobalCommitter,Sink,...}). I'm observing strange behavior where consumer group latency on Kafka is close to zero after a checkpoint, but the data in the output Delta table lags significantly. I can see in the TM logs that Delta commits are happening quickly and reliably after each checkpoint, but recent data is still missing. After some digging I found a reference in an unrelated issue on Flink: "it is expected to global committer architecture lag one commit behind in reference to rest of the pipeline". This lines up very well with the symptoms I'm seeing -- my app checkpoints every 5m, and I'm seeing latency in the output table fluctuating from 5m (immediately after a checkpoint) to 10m (immediately before the next checkpoint). My questions: • Is it expected behavior that the committed data consistently lags behind by one checkpoint? Are there any more authoritative docs on this behavior? • Is there any workaround (config or code changes) to avoid/fix this behavior? • Does the v2 sink API have this problem? Thank you!
  • m

    Magdalena Kobusch

    06/26/2025, 8:19 AM
    Does Flink support Amazon S3 Tables buckets for catalog definition, similar to this example for Spark? I'm attempting to define a catalog on an S3 Tables warehouse in the Flink Table API, like this:
    Copy code
    tEnv.executeSql(
                        "CREATE CATALOG pulse_table WITH ("
                                + "'type'='iceberg',"
                                + "'warehouse'='arn:aws:s3tables:us-east-1:058264243434:bucket/kyk-dataeng-table-iceberg',"
                                + "'catalog-impl'='org.apache.iceberg.aws.s3tables.S3TablesCatalog',"
                                + ")"
                );
    But that is not working for me. I have added:
    Copy code
    dependencies {
        implementation 'software.amazon.awssdk:s3tables:2.29.26'
        implementation 'software.amazon.s3tables:s3-tables-catalog-for-iceberg:0.1.7'
    }
    But I'm still getting:
    Copy code
    Exception in thread "main" java.lang.IllegalArgumentException: Cannot initialize Catalog implementation org.apache.iceberg.aws.s3tables.S3TablesCatalog: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog
    	Missing org.apache.iceberg.aws.s3tables.S3TablesCatalog [java.lang.ClassNotFoundException: org.apache.iceberg.aws.s3tables.S3TablesCatalog]
  • j

    Jan Bergeson

    06/26/2025, 5:42 PM
    Is there any "blessed" solution for doing zero-downtime Flink deployments? What I've tried to do so far is something like this: 1. Make a 2nd Kafka topic to output the new data to. 2. Make a 2nd deployment of the Flink job with the new changes and wait for it to catch up on all the data from the input topics. 3. Once it's caught up, switch over the consumers to the new Kafka topic. 4. Delete the old Kafka topic and Flink deployment. At the moment, each step is manual, which is not great. I could of course automate this process but it seems a bit tricky. You'd need: • A) An abstraction over Kafka topics - some central config that producers & consumers would have to look at to get their Kafka topic assigned (my_topic.v1 for the old job, my_topic.v2 for the new job) • B) An abstraction over Flink deployments so you could automatically have multiple deployments (my_flink_deployment_v1, my_flink_deployment_v2) which originate from different versions of the same code (we're using the k8s operator). So, questions: • Is there a better way to do this? (Maybe the flow is different/easier when using AWS managed Flink?) • If this is the way to do it, are there any existing libs that could help with automating this flow?
  • d

    dontu balu

    06/27/2025, 12:56 AM
    Hi team, I am working on a Flink CDC solution from MySQL to Iceberg. The user input is a database and table; the user will not specify any schema, so I do not know the schema of the MySQL table. Is it possible for Flink CDC to automatically infer the schema when using Flink SQL? Or is it always mandatory to specify the schema when creating a source table in Flink SQL?
  • g

    George Leonard

    06/27/2025, 2:27 PM
    ... guys, I'm stuck. None of my log4j messages are making it into the console or the specified files. Attached are my log4j.properties files... and the snippet from docker-compose.yml. I've now even added some direct System.out.println calls, trying both to see what works and what gives me output, as I'm trying to find a bug.
    Copy code
    System.out.println(
                Thread.currentThread().getName() + " SNMP Source Reader initialized. s(Direct System.out)"
            );
            LOG.debug("{} SNMP Source Reader initialized.",
                Thread.currentThread().getName()
            );
    Copy code
    environment:
          - ENV_ROOTLOG_LEVEL=INFO
          - ENV_FLINKLOG_LEVEL=INFO
          - ENV_SNMPLOG_LEVEL=DEBUG      
          - ENV_ZOOKEEPERLOG_LEVEL=INFO
          - ENV_PEKKOLOG_LEVEL=INFO
          - ENV_KAFKALOG_LEVEL=INFO
          - ENV_HADOOPLOG_LEVEL=INFO  
          - FLINK_PROPERTIES_JAVA_OPTS=-Dlog4j.configurationFile=file:///opt/flink/conf/log4j.properties
    (attached: log4j-session.properties, log4j-console.properties, log4j.properties)
  • g

    George Leonard

    06/27/2025, 5:26 PM
    we're talking Flink 1.20.1 / Java 17 / SLF4J 1.7.36 / Log4j 2.17.1
  • d

    dmitri

    06/30/2025, 11:41 AM
    Hi all, I have a problem with tumbling window aggregation. Flink version 1.20.1 with this environment:
    Copy code
    java 11.0.26 2025-01-21 LTS
    Java(TM) SE Runtime Environment 18.9 (build 11.0.26+7-LTS-187)
    Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.26+7-LTS-187, mixed mode)
    OS: Windows 11
    IDE: VSCode
    I send the data from a Python producer and the Flink consumer uses the logic below. 1. Read the Kafka stock topic with the DataStream API. This is how I read the Kafka topic:
    Copy code
    KafkaSource<Stock> kafkaSource = createKafkaSource(env, inputProperties, new JsonDeserializationSchema<>(Stock.class));
    DataStream<Stock> stockNoWatermark = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka source");
    2. After that, I convert the data to a Table because I prefer to manipulate the data using SQL:
    Copy code
    Table stockTableWatermark = tableEnv.fromDataStream(
    	stockNoWatermark,
    	Schema.newBuilder()
     .column("event_type", DataTypes.STRING())
     .column("exchange", DataTypes.INT())
     .column("id", DataTypes.BIGINT())
     .column("price", DataTypes.FLOAT())
     .column("sequence_number", DataTypes.BIGINT())
     .column("size", DataTypes.INT())
     .column("symbol", DataTypes.STRING())
     .column("tape", DataTypes.STRING())
     .column("timestamp", DataTypes.TIMESTAMP_LTZ(3))
     .column("trf_id", DataTypes.STRING())
     .column("trf_timestamp", DataTypes.STRING())
     .column("actual_timestamp", DataTypes.STRING())
     .watermark("timestamp", "`timestamp` - INTERVAL '1' SECOND")
     .build());
    The result from stockTableWatermark looks like this:
    Copy code
    5> +I[T, 2015, 4991, 158.85, 282034, 95, GOOG, null, +57465-02-11T05:36:48Z, null, null, 2025-06-30 00:10:53.808]
    5> +I[T, 4231, 4642, 181.31, 751310, 35, NVDA, null, +57465-02-11T05:36:51Z, null, null, 2025-06-30 00:10:53.811]
    5> +I[T, 2692, 2536, 236.31, 435106, 50, AAPL, null, +57465-02-11T05:36:58Z, null, null, 2025-06-30 00:10:53.818]
    5> +I[T, 3531, 1780, 137.95, 879217, 15, NVDA, null, +57465-02-11T05:37:31Z, null, null, 2025-06-30 00:10:53.851]
    5> +I[T, 2046, 2779, 340.58, 658954, 24, NVDA, null, +57465-02-11T05:37:37Z, null, null, 2025-06-30 00:10:53.857]
    3. I aggregate with a tumbling window based on the timestamp column:
    Copy code
    Table resultTable = stockTableWatermark
    .window(Tumble.over(lit(1).minutes()).on($("timestamp")).as("window")) // define window
    	.groupBy($("symbol"), $("window")) // group by key and window
    	.select(
     $("symbol").as("ticker"),
     $("window").start(),
     $("window").end(),
     $("sequence_number").count().as("trades")
    	);
    But why, when I print out the output of resultTable, is it empty? And this is the last log message:
    Copy code
    14:48:34,340 INFO org.apache.kafka.clients.consumer.ConsumerConfig      [] - ConsumerConfig values: 
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.include.jmx.reporter = true
        auto.offset.reset = earliest
        bootstrap.servers = [localhost:9092]
        check.crcs = true
        client.dns.lookup = use_all_dns_ips
        client.id = flink-dev-group-stock-consumer-4
        client.rack =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = flink-dev-group-stock-consumer
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        internal.throw.on.fetch.stable.offset.unsupported = false
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.connect.timeout.ms = null
        sasl.login.read.timeout.ms = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.login.retry.backoff.max.ms = 10000
        sasl.login.retry.backoff.ms = 100
        sasl.mechanism = GSSAPI
        sasl.oauthbearer.clock.skew.seconds = 30
        sasl.oauthbearer.expected.audience = null
        sasl.oauthbearer.expected.issuer = null
        sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
        sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
        sasl.oauthbearer.jwks.endpoint.url = null
        sasl.oauthbearer.scope.claim.name = scope
        sasl.oauthbearer.sub.claim.name = sub
        sasl.oauthbearer.token.endpoint.url = null
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 45000
        socket.connection.setup.timeout.max.ms = 30000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.3
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    14:48:34,413 WARN org.apache.kafka.clients.consumer.ConsumerConfig      [] - These configurations '[client.id.prefix, partition.discovery.interval.ms, aws.secret.username, aws.database.username, environment, kafka.bootstrap.servers, aws.database.password, aws.database.database, aws.database.hostname, kafka.topic, aws.secret.password]' were supplied but are not used yet.
    14:48:34,413 INFO org.apache.kafka.common.utils.AppInfoParser         [] - Kafka version: 3.4.0
    14:48:34,414 INFO org.apache.kafka.common.utils.AppInfoParser         [] - Kafka commitId: 2e1947d240607d53
    14:48:34,414 INFO org.apache.kafka.common.utils.AppInfoParser         [] - Kafka startTimeMs: 1751269714413
    14:48:34,427 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 0
    14:48:34,436 INFO org.apache.kafka.clients.consumer.KafkaConsumer       [] - [Consumer clientId=flink-dev-group-stock-consumer-4, groupId=flink-dev-group-stock-consumer] Assigned to partition(s): dev-stock-topic-0
    14:48:34,444 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=flink-dev-group-stock-consumer-4, groupId=flink-dev-group-stock-consumer] Seeking to earliest offset of partition dev-stock-topic-0
    14:48:34,470 INFO org.apache.kafka.clients.Metadata              [] - [Consumer clientId=flink-dev-group-stock-consumer-4, groupId=flink-dev-group-stock-consumer] Resetting the last seen epoch of partition dev-stock-topic-0 to 0 since the associated topicId changed from null to jKk4sUaiRfSsg8h4GfqpbQ
    14:48:34,471 INFO org.apache.kafka.clients.Metadata              [] - [Consumer clientId=flink-dev-group-stock-consumer-4, groupId=flink-dev-group-stock-consumer] Cluster ID: MkU3OEVBNTcwNTJENDM2Qk
    14:48:34,491 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=flink-dev-group-stock-consumer-4, groupId=flink-dev-group-stock-consumer] Resetting offset for partition dev-stock-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}.
    As you can see, there is no row from resultTable showing in the console. Could anyone help me address the issue? I assigned the event time and watermark, and when I print the output the timestamp is there, but the group by window_start and window_end produces nothing.
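    Two things stand out here, offered as hypotheses rather than a diagnosis: the +57465-... values suggest the raw timestamp field is in micro- or nanoseconds but is being read as milliseconds, and with WatermarkStrategy.noWatermarks() on the source plus a watermark declared only in the table schema, a single-partition topic read with parallelism > 1 leaves idle subtasks that hold the overall watermark back, so the window never closes. A sketch of declaring the watermark on the DataStream instead, with idleness handling (getTimestampMillis() is a hypothetical accessor for however Stock exposes event time in epoch milliseconds):
    Copy code
    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStream;

    WatermarkStrategy<Stock> watermarks = WatermarkStrategy
            .<Stock>forBoundedOutOfOrderness(Duration.ofSeconds(1))
            .withTimestampAssigner((stock, previousTimestamp) -> stock.getTimestampMillis())
            // Let the watermark advance even when some source subtasks see no data
            // (single-partition topic with parallelism > 1).
            .withIdleness(Duration.ofSeconds(30));

    DataStream<Stock> stockWithWatermarks = env.fromSource(kafkaSource, watermarks, "Kafka source");

    // When converting with fromDataStream, the DataStream watermark can be reused in the
    // table schema via .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)") together with
    // .watermark("rowtime", "SOURCE_WATERMARK()").
    Alternatively, setting table.exec.source.idle-timeout should cover the idle-subtask case on the Table API side without changing the DataStream code.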
  • j

    Jaehyeon Kim

    06/30/2025, 9:45 PM
    I use Flink 1.20.1 and an iceberg catalog is set up using a hive metastore backend.
    Copy code
    CREATE CATALOG demo_ib WITH (
      'type' = 'iceberg',
      'catalog-type' = 'hive',
      'uri' = 'thrift://hive-metastore:9083'
    );
    I can create an Iceberg table (CREATE TABLE ... USING) and describe it without an issue. However, I cannot create a view from it, i.e. CREATE VIEW ... AS SELECT ... FROM <iceberg_table> fails with java.lang.IllegalArgumentException: table should be resolved. The SELECT part is not an issue, as EXPLAIN SELECT ... FROM <iceberg_table> works. Note that I can create a view in a Hive catalog. Can you tell me how to fix this issue?
  • n

    Niharika Sakuru (Niha)

    07/01/2025, 3:14 PM
    Cross-posting: Hey #C065944F9M2 — I’ve been looking into how Flink starts Python processes via PythonEnvUtils.java, and I noticed that it logs the entire environment variable map when launching PyFlink jobs. This seems like it could be a security issue, especially in Kubernetes setups using the Flink Kubernetes Operator — since secrets are commonly mounted as env vars in pods. That means things like AWS_SECRET_ACCESS_KEY, DB_PASSWORD, etc. could end up in plaintext JobManager or TaskManager logs. 📌 Here's an example of what’s being logged:
    Copy code
    Starting Python process with environment variables: AWS_SECRET_ACCESS_KEY=..., DB_PASSWORD=...
    Has anyone else run into this? Curious if there's already been discussion or a fix proposed upstream. Would love thoughts from others who are deploying PyFlink in production or using secrets in K8s environments. I've created https://issues.apache.org/jira/browse/FLINK-38035 with all details
  • l

    L P V

    07/02/2025, 9:29 AM
    Hello guys, I'm trying to test Flink 2.0 and Paimon, but the jar file for Paimon is not found: https://repository.apache.org/snapshots/org/apache/paimon/paimon-flink-2.0/1.3-SNAPSHOT/ shows a not-found error. Has anyone else tried this?
  • m

    Martin Egri

    07/02/2025, 3:57 PM
    I'm getting the error java.lang.ClassCastException: class org.apache.flink.table.types.logical.LegacyTypeInformationType cannot be cast to class org.apache.flink.table.types.logical.DecimalType during runtime using a homebrew KafkaSource that deserialises Avro data. I'm not sure where the LegacyTypeInformationType comes from; I've googled but the only thing that looks similar is this: https://github.com/apache/flink/pull/11874#discussion_r414251140 I'm doing the conversion like this:
    Copy code
    case byteBuffer: ByteBuffer =>
        logicalType match
            case decimal: LogicalTypes.Decimal =>
                val precision = decimal.getPrecision
                val scale     = decimal.getScale
                DecimalConversion().fromBytes(byteBuffer, schema, LogicalTypes.decimal(precision, scale))
            case _: LogicalTypes.BigDecimal =>
                BigDecimalConversion().fromBytes(byteBuffer, schema, LogicalTypes.bigDecimal)
            case _ =>
                val bytes = Array.ofDim[Byte](byteBuffer.remaining)
                byteBuffer.get(bytes)
                bytes
    case genericFixed: GenericFixed =>
        logicalType match
            case decimal: LogicalTypes.Decimal =>
                val precision = decimal.getPrecision
                val scale     = decimal.getScale
                DecimalConversion().fromFixed(genericFixed, schema, LogicalTypes.decimal(precision, scale))
            case _ => genericFixed.bytes
    … and for the `TypeInformation`s I return the corresponding:
    Copy code
    case Schema.Type.BYTES =>
        avroSchema.getLogicalType match
            case _: LogicalTypes.Decimal    => Types.BIG_DEC
            case _: LogicalTypes.BigDecimal => Types.BIG_DEC
            case _                          => Types.PRIMITIVE_ARRAY(Types.BYTE)
    
    case Schema.Type.FIXED =>
        avroSchema.getLogicalType match
            case _: LogicalTypes.Decimal => Types.BIG_DEC
            case _                       => Types.PRIMITIVE_ARRAY(Types.BYTE)
  • v

    Vikas Patil

    07/02/2025, 7:10 PM
    Which version of ZooKeeper do I need to use for Flink 1.19? I am not able to find proper documentation for this.
  • s

    Sachin

    07/04/2025, 5:37 AM
    Can someone please share their approach if they have disaster recovery support for a Flink cluster in production, and how exactly-once is managed in such setups?