# troubleshooting
  • f

    Fabio Carvalho

    08/18/2025, 9:53 AM
    BTW, on each K8s cluster, we have now 19 Flink clusters in K8s HA managed by ArgoCD.
  • j

    Jina Mizrahi

    08/18/2025, 12:14 PM
    Hey all, I am facing an issue with flink-connector-jdbc-core:4.0.0-2.0 and clickhouse-jdbc 0.8.6. I have created a JdbcSink using ClickHouseDriver, and every time SimpleBatchStatementExecutor::executeBatch is called, all the previous batches are inserted again. I looked at the code and saw that the batchValues field (of com.clickhouse.jdbc.PreparedStatementImpl) is never cleared, only when a new PreparedStatement is created. So using the two together is not possible, since each insert command re-inserts every earlier batch. Is this a known issue? Is there any fix for it? Can we create a new PreparedStatement after each executeBatch call?
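    A minimal sketch of the clearBatch workaround in plain JDBC (outside Flink; the table name, columns and connection URL are made up for illustration). Standard JDBC says executeBatch() resets the batch, so an explicit clearBatch() after each flush is harmless with compliant drivers and works around drivers that keep accumulating rows:
    Copy code
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;

    public class ClickHouseBatchWorkaround {
        public static void main(String[] args) throws Exception {
            // Hypothetical connection settings; adjust for your ClickHouse instance.
            try (Connection conn = DriverManager.getConnection("jdbc:clickhouse://localhost:8123/default");
                 PreparedStatement ps = conn.prepareStatement("INSERT INTO t (id, v) VALUES (?, ?)")) {
                for (int batch = 0; batch < 3; batch++) {
                    for (int i = 0; i < 100; i++) {
                        ps.setInt(1, batch * 100 + i);
                        ps.setString(2, "value-" + i);
                        ps.addBatch();
                    }
                    ps.executeBatch();
                    // If the driver does not reset its internal batch here (as reported above),
                    // clearing it explicitly prevents earlier batches from being re-inserted.
                    ps.clearBatch();
                }
            }
        }
    }
    Inside the Flink connector the equivalent would have to happen in the batch statement executor, so if clearBatch() is also ignored by the driver, recreating the PreparedStatement after each executeBatch (as suggested) looks like the remaining option until a driver fix lands.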
  • r

    raphaelauv

    08/18/2025, 12:26 PM
    Question: Flink & Confluent Schema Registry. Hi all, how can I set schema-registry settings on the Kafka producer, like auto.register.schemas or use.schema.id? I have shared a full code example on Stack Overflow: https://stackoverflow.com/questions/79738462/flink-confluentregistryavroserializationschema-not-respecting-registryconfigs Thanks all
  • r

    Rehan Sayed

    08/19/2025, 9:28 AM
    Hey folks, the TaskManager pods in my Flink pipeline that use the RocksDB state backend have been consistently experiencing very high memory utilization, with usage remaining steadily around 95% over an extended period. Despite normal fluctuations in JVM heap memory, the overall memory consumption within the pods stays persistently high, leading to resource pressure and intermittent pod failures due to OOM. We are using Flink 1.18.1, flink-kubernetes-operator 1.11.0, jemalloc as the memory allocator, and n2d-highmem-32 VMs. Following are the task manager configs:
    Copy code
    taskManager:
      managedMemoryFraction: "0.01"
      nodeSelector:
        flinknode: taskmanager
        type: highmem32
      tolerations:
        - key: "flinknode"
          operator: "Equal"
          value: "taskmanager"
          effect: "NoSchedule"
      taskSlots: 32
      resource:
        memoryFactor: 110
        processMemory: 220000 # 220 GB. 245 GB is allocatable. Total memory assigned to the pod is 220 * 1.1 = 242 GB
        jvmMetaspace: 256
        offHeap: 16000 # 16 Gb
        cpu: 30
    These are the current rocksDB configs :
    Copy code
    rocksDbSettings: |
      state.backend.type: "rocksdb"
      state.backend.incremental: "true"
      state.backend.rocksdb.checkpoint.transfer.thread.num: "8"
      state.backend.rocksdb.thread.num: "8"
      state.backend.rocksdb.memory.managed: "false"
      state.backend.rocksdb.writebuffer.size: "256mb"
      state.backend.rocksdb.block.cache-size: "128mb"
      state.backend.rocksdb.block.blocksize: "64kb"
      state.backend.rocksdb.writebuffer.count: "4"
      state.backend.rocksdb.writebuffer.number-to-merge: "2"
      state.backend.rocksdb.files.open: "400"
      state.backend.rocksdb.predefined-options: "FLASH_SSD_OPTIMIZED"
    Attached are diagrams of the block cache usage alongside the task manager memory graph (where increases in block cache size coincide with pod OOMs), and the heap usage graph. I would appreciate any advice on how to investigate or resolve this issue.
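    For what it's worth, with state.backend.rocksdb.memory.managed: false and managedMemoryFraction: 0.01, RocksDB's block cache, memtables and index/filter blocks are plain native allocations with no common cap, which matches the picture of the pod creeping toward its limit. One direction worth trying (an assumption, not a verified fix for this job) is handing RocksDB back to Flink's managed memory so those caches are bounded. Shown below as the equivalent configuration keys set from Java; in an operator deployment they would normally go under flinkConfiguration instead:
    Copy code
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RocksDbManagedMemorySketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.setString("state.backend.type", "rocksdb");
            // Size RocksDB's block cache and write buffers out of Flink managed memory
            // instead of leaving them as unbounded native allocations.
            conf.setString("state.backend.rocksdb.memory.managed", "true");
            // Give managed memory a meaningful share of the process; 0.01 leaves almost nothing.
            conf.setString("taskmanager.memory.managed.fraction", "0.4");
            // Alternatively, give each slot a fixed RocksDB budget instead of a fraction:
            // conf.setString("state.backend.rocksdb.memory.fixed-per-slot", "4gb");

            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment(conf);
            // ... build and execute the job as usual ...
        }
    }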
  • b

    B S Mohammed Ashfaq

    08/20/2025, 1:30 AM
    I'm using Apache Flink 1.19.2. When I submit a Flink SQL job (e.g. Kafka to DB), the whole CREATE TABLE / SELECT statement is logged, which exposes sensitive info like credentials. How can this be fixed? Please let me know.
  • b

    Brice Loustau

    08/20/2025, 9:22 PM
    Hi folks, this is my first message here. The website https://nightlies.apache.org/, which hosts the Flink documentation, seems to be down. Not sure where to report this.
  • i

    Ian Stewart

    08/22/2025, 6:43 PM
    Does anyone have suggestions on a pattern for deduplicating messages using processing time? We have an event stream that can sometimes become backed up. When it is, we want to dedupe those events so we're only processing the latest ones. We were hoping to use Window Deduplication, but it currently does not support processing-time dedup: https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/dev/table/sql/queries/window-deduplication/#limitation-on-time-attribute-of-order-key
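    One workaround, since Window Deduplication rejects processing time as the order key: a keyed process function that keeps only the latest element per key and flushes it on a processing-time timer. A minimal sketch (Event stands in for your event type; state is initialized lazily to stay version-agnostic):
    Copy code
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Emits at most one element per key every intervalMs of processing time,
    // keeping only the latest element seen during that interval.
    public class LatestPerKeyDeduper extends KeyedProcessFunction<String, Event, Event> {

        private final long intervalMs;
        private transient ValueState<Event> latest;
        private transient ValueState<Long> pendingTimer;

        public LatestPerKeyDeduper(long intervalMs) {
            this.intervalMs = intervalMs;
        }

        private void initState() {
            if (latest == null) {
                latest = getRuntimeContext().getState(new ValueStateDescriptor<>("latest", Event.class));
                pendingTimer = getRuntimeContext().getState(new ValueStateDescriptor<>("timer", Long.class));
            }
        }

        @Override
        public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
            initState();
            latest.update(value); // newer elements overwrite older ones within the interval
            if (pendingTimer.value() == null) {
                long fireAt = ctx.timerService().currentProcessingTime() + intervalMs;
                ctx.timerService().registerProcessingTimeTimer(fireAt);
                pendingTimer.update(fireAt);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
            initState();
            Event e = latest.value();
            if (e != null) {
                out.collect(e); // only the latest element for this key survives the interval
            }
            latest.clear();
            pendingTimer.clear();
        }
    }
    Applied as stream.keyBy(Event::getKey).process(new LatestPerKeyDeduper(intervalMs)); the interval is the extra latency you accept in exchange for only emitting the latest element per key while backed up.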
  • e

    Elad

    08/24/2025, 8:15 PM
    Hello folks. First time using Slack, but I really need a hand here. In the DataStream API I'm using session windows. Sometimes a window has already fired and emitted downstream, but then a late event arrives that causes this window to merge into another session window. I'd like to run custom logic at the time of the merge (for example, emit a retraction or compensation event for the earlier window that already fired). I know that Trigger.onMerge exists, but it does not let me hook directly into the fact that "window A that had already fired is now being absorbed into window B" and act on it. To get more specific: in my use case, I would like to send to a side output the ids of the events that existed in the merged window, so I could delete them from the database and consider them irrelevant. 👉 Is there a recommended way in Flink's DataStream API to implement a custom callback on window merge so I can handle these "already-fired-but-now-merged" windows?
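    There is no public callback that fires specifically for "an already-fired window was absorbed by a merge", so one workaround is to detect it downstream: have the window function emit its window bounds and event ids, then compare each emitted session against the previous emission for the same key and route the superseded one to a side output as a retraction. A sketch under the assumption that SessionResult is a hypothetical POJO carrying windowStart, windowEnd and the event ids, and that only the most recently emitted session per key needs to be tracked (a fuller version would keep a MapState of emitted sessions):
    Copy code
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    // Retraction detector placed after the session window + aggregate:
    // if a newly emitted session fully covers the previously emitted one for the
    // same key, the old window was merged away, so its ids go to a side output.
    public class RetractMergedSessions
            extends KeyedProcessFunction<String, SessionResult, SessionResult> {

        public static final OutputTag<SessionResult> RETRACTIONS =
                new OutputTag<SessionResult>("retracted-sessions") {};

        private transient ValueState<SessionResult> lastEmitted;

        @Override
        public void processElement(SessionResult current, Context ctx, Collector<SessionResult> out)
                throws Exception {
            if (lastEmitted == null) {
                lastEmitted = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("last-emitted", SessionResult.class));
            }
            SessionResult previous = lastEmitted.value();
            if (previous != null
                    && current.windowStart <= previous.windowStart
                    && current.windowEnd >= previous.windowEnd) {
                // The previous session now lies inside the new, merged session:
                // emit its event ids so they can be deleted from the database.
                ctx.output(RETRACTIONS, previous);
            }
            lastEmitted.update(current);
            out.collect(current);
        }
    }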
  • c

    Clemens Valiente

    08/25/2025, 2:27 AM
    hi, is there any way to fix this?
    Copy code
    2025-08-22 06:51:30,525 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - class com.google.protobuf.ByteString does not contain a getter for field hash
    2025-08-22 06:51:30,525 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - class com.google.protobuf.ByteString does not contain a setter for field hash
    2025-08-22 06:51:30,526 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - Class class com.google.protobuf.ByteString cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
    the hash field definitely shouldn't be set in the first place, and I couldn't figure out how to write/define a custom TypeExtractor? 🤔
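    Those INFO lines are just the TypeExtractor falling back to GenericType (Kryo) for ByteString; there is no user-pluggable TypeExtractor, but you can at least make the fallback deterministic and cheap by giving Kryo a default serializer for ByteString. A sketch (the serializer class itself is hypothetical):
    Copy code
    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.Serializer;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;
    import com.google.protobuf.ByteString;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // Writes ByteString as a length-prefixed byte array instead of letting Kryo
    // reflect over its internals (including the lazily computed hash field).
    public class ByteStringKryoSerializer extends Serializer<ByteString> {

        @Override
        public void write(Kryo kryo, Output output, ByteString object) {
            byte[] bytes = object.toByteArray();
            output.writeInt(bytes.length, true); // varint length prefix
            output.writeBytes(bytes);
        }

        @Override
        public ByteString read(Kryo kryo, Input input, Class<ByteString> type) {
            int length = input.readInt(true);
            return ByteString.copyFrom(input.readBytes(length));
        }

        // Registered where the environment is set up:
        public static void register(StreamExecutionEnvironment env) {
            env.getConfig().addDefaultKryoSerializer(ByteString.class, ByteStringKryoSerializer.class);
        }
    }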
  • c

    Clemens Valiente

    08/25/2025, 2:37 AM
    because I noticed that the Flink app spends a lot of time going through this exception:
  • c

    Clemens Valiente

    08/25/2025, 2:37 AM
    over and over again and I don't think that should be necessary
  • c

    Clemens Valiente

    08/25/2025, 4:40 AM
    It seems like it is ignoring my registrations? With the debugger I can see this entry in kryoRegistrations:
    Copy code
    "streams.tis.trust.fhpstream.filtered_hydra_payload.RequestSource" -> {KryoRegistration@26135} 
     key = "streams.tis.trust.fhpstream.filtered_hydra_payload.RequestSource"
     value = {KryoRegistration@26135} 
      registeredClass = {Class@7694} "class streams.tis.trust.fhpstream.filtered_hydra_payload.RequestSource"
      serializerClass = {Class@21986} "class com.grab.grabdefence.acorn.proto.ScalaPbEnumSerializer"
      serializableSerializerInstance = null
      serializerDefinitionType = {KryoRegistration$SerializerDefinitionType@26334} "CLASS"
    but for some reason a different serializer is used
    Copy code
    type = {Class@7694} "class streams.tis.trust.fhpstream.filtered_hydra_payload.RequestSource"
    kryo = {KryoBase@21592} com.twitter.chill.KryoBase@7a852
     objSer = {ObjectSerializer@26028} com.twitter.chill.ObjectSerializer@11d3192e
    with a debug breakpoint on copy here:
    Copy code
    try {
                checkKryoInitialized();
                try {
                    return kryo.copy(from);
                } catch (KryoException ke) {
  • j

    Jashwanth S J

    08/25/2025, 8:59 AM
    We're seeing one weird issue while bringing up a Flink session job through the operator. We're currently using an AWS signed URL for jar access through S3, which works fine, but when we replace it with just the S3 endpoint without public access, it fails. Can someone help here?
  • f

    Fabricio Lemos

    08/25/2025, 8:16 PM
    Any idea when we can expect the JDBC connector to become compatible with Flink 2.1? It's the only thing blocking us from upgrading.
  • d

    Dheeraj Panangat

    08/26/2025, 11:41 AM
    Hi all, when is the Flink Kubernetes Operator planned to be released with support for Flink version 2.1.0?
  • b

    Brad Murry

    08/26/2025, 3:33 PM
    Hello all, can anyone provide guidance on whether we can reliably set metrics reporters programmatically? In many Java apps I can just register new MetricRegistries if I want to push to a new place, but I'm not clear if Flink's flink-conf.yml and config toolchain support this use case. I'm running Flink applications in a managed environment, so I don't have access to flink-conf.yml. Thanks for any pointers!
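    For context (an assumption about the setup, not a definitive answer): reporters are instantiated per JobManager/TaskManager process from the cluster configuration, so registering one from job code generally has no effect in a managed environment. What does work programmatically is registering your own metrics from a rich function; they are then exported by whatever reporters the platform has configured. A small sketch:
    Copy code
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.metrics.Counter;

    // Registers a custom counter on the operator's metric group; it is exported by
    // whichever reporters the cluster/platform has configured.
    public class CountingMapper extends RichMapFunction<String, String> {

        private transient Counter processed;

        @Override
        public String map(String value) {
            if (processed == null) {
                processed = getRuntimeContext().getMetricGroup().counter("myProcessedRecords");
            }
            processed.inc();
            return value.toUpperCase();
        }
    }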
  • l

    L P V

    08/27/2025, 8:57 AM
    Hi guys, I'm working with Flink SQL on a Kafka topic that uses protobuf with Schema Registry. It seems Flink does not support the Confluent protobuf format natively: https://issues.apache.org/jira/browse/FLINK-29731 Is there any way to work around this, other than building another job to convert protobuf-confluent to plain protobuf?
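    Until FLINK-29731 is resolved, one workaround that avoids a separate conversion job is to read the topic with the DataStream API, wrapping Confluent's own protobuf deserializer (which understands the magic byte, schema id and message indexes) in a Flink DeserializationSchema, and then convert to a table if needed. A sketch; the generated protobuf class, topic and registry URL are placeholders, and the exact Confluent configuration keys should be checked against your client version:
    Copy code
    import com.google.protobuf.Message;
    import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
    import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

    import java.util.HashMap;
    import java.util.Map;

    // Wraps Confluent's protobuf deserializer so Flink's Kafka source can consume
    // Schema-Registry-framed protobuf records.
    public class ConfluentProtobufSchema<T extends Message> extends AbstractDeserializationSchema<T> {

        private final String topic;
        private final String schemaRegistryUrl;
        private final Class<T> messageClass;
        private transient KafkaProtobufDeserializer<T> inner;

        public ConfluentProtobufSchema(String topic, String schemaRegistryUrl, Class<T> messageClass) {
            super(messageClass);
            this.topic = topic;
            this.schemaRegistryUrl = schemaRegistryUrl;
            this.messageClass = messageClass;
        }

        @Override
        public T deserialize(byte[] message) {
            if (inner == null) {
                Map<String, Object> props = new HashMap<>();
                props.put("schema.registry.url", schemaRegistryUrl);
                props.put("specific.protobuf.value.type", messageClass.getName());
                inner = new KafkaProtobufDeserializer<>();
                inner.configure(props, false); // false = value deserializer
            }
            return inner.deserialize(topic, message);
        }
    }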
  • d

    Dennis Sosnoski

    08/27/2025, 9:05 AM
    I'm trying to collect operator-level metrics from Flink jobs, but it looks like nothing shows up if the whole job runs with parallelism 1 (meaning it's just using a single task manager, as I understand it, with no serialization between operator steps) - not even application-level metrics. Is this a known issue, and is there any way around it (short of creating an artificial parallelism difference just to split it into chunks)?
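    One thing worth ruling out (an assumption about the cause, not a confirmed bug): with parallelism 1 everything usually chains into a single task, and Flink does not report the per-operator numRecordsIn/numRecordsOut style metrics across chained operators. Breaking the chain, either globally or at one operator, keeps parallelism 1 but makes those boundaries visible. A sketch:
    Copy code
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ChainingSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Option 1: disable chaining globally (every operator becomes its own task).
            env.disableOperatorChaining();

            env.fromSequence(0, 1_000)
                    .filter(x -> x % 2 == 0).name("evens")
                    // Option 2: break the chain only at one operator instead of globally.
                    .filter(x -> x % 3 == 0).name("multiples-of-6").disableChaining()
                    .print();

            env.execute("chaining-metrics-sketch");
        }
    }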
  • y

    Yoshi Nagasaki

    08/27/2025, 3:41 PM
    Hi folks. I'm playing with Flink and having some trouble -- I have my job workflow listening to a DDB stream, processing the CDC events, and writing some stats to another DDB table. My workflow looks like this:
    Copy code
    final var watermarkStrategy = WatermarkStrategy.<CDCEvent>forMonotonousTimestamps()
                    .withIdleness(Duration.ofSeconds(1))
                    .withTimestampAssigner((event, timestamp) -> event.getTimestamp());
            final var lateTag = new OutputTag<Metric>("late") {
            };
    
            final var workflow = env.fromSource(dynamoDbStreamsSource, watermarkStrategy, "DynamoDB Streams source")
                    .returns(TypeInformation.of(CDCEvent.class))
                    .uid("custom-uid")
                    .filter(...)
                    .map(...compute deltas based on changes...)
                    .filter(...)
                    .flatMap(...map deltas to 1 or more "Metrics"...)
                    .keyBy(...key by the metric key...)
                    .window(TumblingEventTimeWindows.of(Duration.ofSeconds(10)))
                    .sideOutputLateData(lateTag)
                    .aggregate(new MetricAggregator()); // this is an AggregateFunction<> that merges the delta values across metric instances in the window
    
            final var lateMetrics = workflow.getSideOutput(lateTag)
                    .sinkTo(new PrintSink<>(true));
    
            workflow.sinkTo(...dynamo sink...);
    As you can see, nothing fancy, pretty straightforward... My problem is: when I set the InitialPosition of the stream to TRIM_HORIZON, I see in the debug output that it's receiving, processing, and sinking all the historical events from the last 24 hours... but once it reaches the end, it is completely stuck. I make requests to the DDB table that result in new CDC events, and I see in the output that it receives each event immediately and does all the stateless processing (all the filters and maps before the windowing), but then it just disappears and nothing happens. I have logging in the MetricAggregator and my custom sink, and neither log gets triggered for these new events. When I set the InitialPosition to LATEST and feed new events, it flows through fine! (I still have to submit a new change before the previous change gets fully processed, despite the idleness setting in the watermark strategy, but at least I can get it to process.) My custom sink is modeled closely after the existing DDB connector, and has these properties:
    Copy code
    maxBatchSize        = 10
    maxInFlightRequests = 20
    maxBufferedRequests = 1000
    maxTimeInBufferMS   = 5000
    Just adding this in case it's relevant, but I don't think it should matter. Based on what I've read on watermarking, I assume this has to do with the strategy, and specifically the idleness, which I set arbitrarily low, but it doesn't seem to matter or do anything (with either TRIM_HORIZON or LATEST). I've tried both built-in watermarking strategies and have the same problem. Any ideas? Thanks.
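    For reference, forMonotonousTimestamps only advances the watermark when new events arrive, and withIdleness only marks the source idle so it doesn't hold back other inputs; it never pushes event time forward by itself. With a single quiet stream, the last 10-second window therefore stays open until the next event shows up, which matches what you describe. One workaround (a sketch, trading tolerance for very late data against liveness; CDCEvent and getTimestamp() are taken from the snippet above) is a periodic generator that also lets wall-clock time advance the watermark after a bounded lag:
    Copy code
    import org.apache.flink.api.common.eventtime.Watermark;
    import org.apache.flink.api.common.eventtime.WatermarkGenerator;
    import org.apache.flink.api.common.eventtime.WatermarkOutput;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;

    public class ProcessingTimeAdvancingWatermarks {

        // Emits watermarks from event timestamps, but never lets them lag more than
        // maxLagMillis behind processing time, so windows close even when the stream is quiet.
        public static WatermarkStrategy<CDCEvent> withMaxLag(long maxLagMillis) {
            WatermarkStrategy<CDCEvent> base = context -> new WatermarkGenerator<CDCEvent>() {
                private long maxSeenTimestamp = Long.MIN_VALUE;

                @Override
                public void onEvent(CDCEvent event, long eventTimestamp, WatermarkOutput output) {
                    maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTimestamp);
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    long fromWallClock = System.currentTimeMillis() - maxLagMillis;
                    output.emitWatermark(new Watermark(Math.max(maxSeenTimestamp, fromWallClock)));
                }
            };
            return base.withTimestampAssigner((event, ts) -> event.getTimestamp());
        }
    }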
  • r

    raphaelauv

    08/27/2025, 4:34 PM
    Flink Kafka Table API: Hi, is it possible to set a custom class in the Table API for a Kafka sink to serialize the value, the same way I can do it with setValueSerializationSchema in the Java API? Here SafeConfluentRegistryAvroSerializationSchema is my custom serialization class:
    Copy code
    KafkaSink.<Car>builder()
            .setRecordSerializer(
                    KafkaRecordSerializationSchema.builder()
                            .setValueSerializationSchema(
                                    new SafeConfluentRegistryAvroSerializationSchema(
                                            Car.class,
                                            "flink-output-value",
                                            "http://confluent-schema-registry-local:8081",
                                            sr_settings
                                    ))
    thanks all
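    The Kafka table connector only accepts value formats that are registered as a factory (SPI), so there is no direct Table API equivalent of setValueSerializationSchema; the two usual routes are implementing a custom SerializationFormatFactory, or bridging the table to a DataStream and reusing the KafkaSink code above. A sketch of the bridging route; Car and SafeConfluentRegistryAvroSerializationSchema come from the snippet above, and the table, topic and bootstrap-server names are placeholders:
    Copy code
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class TableToCustomKafkaSink {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // Any table produced by Table API / SQL.
            Table cars = tEnv.sqlQuery("SELECT * FROM cars_source");

            // Bridge the table to a DataStream of the POJO the custom schema expects.
            DataStream<Car> stream = tEnv.toDataStream(cars, Car.class);

            Map<String, Object> srSettings = new HashMap<>(); // schema-registry client settings

            KafkaSink<Car> sink = KafkaSink.<Car>builder()
                    .setBootstrapServers("kafka:9092")
                    .setRecordSerializer(
                            KafkaRecordSerializationSchema.builder()
                                    .setTopic("flink-output")
                                    .setValueSerializationSchema(
                                            new SafeConfluentRegistryAvroSerializationSchema(
                                                    Car.class,
                                                    "flink-output-value",
                                                    "http://confluent-schema-registry-local:8081",
                                                    srSettings))
                                    .build())
                    .build();

            stream.sinkTo(sink);
            env.execute("table-to-custom-kafka-sink");
        }
    }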
  • g

    George Leonard

    08/28/2025, 6:28 AM
    Just realised I maybe should have posted my query for assistance here. Please see the flink-cdc channel: https://apache-flink.slack.com/archives/C076E3QGX8A/p1756362179848529
  • r

    Rushikesh Gulve

    08/28/2025, 7:08 AM
    Hi everyone, I am trying to deploy a PyFlink application using the Kubernetes operator. My goal is to have a parallelism of 1, but distribute the different subtasks among multiple task managers, so ultimately something like 7 task managers handling different tasks of a single job with parallelism 1. I tried multiple configurations to achieve this, but I end up getting a number of task managers equal to the parallelism, which is not my goal. Is it possible to achieve this configuration? If yes, how?
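    With the default slot sharing, one slot can host one subtask of every operator, so a parallelism-1 job fits into a single slot and the operator only needs one TaskManager. To spread operators across TaskManagers, assign them to different slot sharing groups and run TaskManagers with taskmanager.numberOfTaskSlots: 1, so each group needs its own slot and therefore its own TM. Shown below with the Java DataStream API; PyFlink exposes a corresponding slot_sharing_group(...) setter (worth verifying against your PyFlink version), and the operator names here are placeholders:
    Copy code
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SlotSharingGroupSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            // Each slot sharing group needs its own slot; with one slot per
            // TaskManager, each group therefore lands on its own TaskManager.
            env.fromSequence(0, 1_000)
                    .filter(x -> x % 2 == 0).slotSharingGroup("stage-1")
                    .filter(x -> x % 3 == 0).slotSharingGroup("stage-2")
                    .print().slotSharingGroup("stage-3");

            env.execute("slot-sharing-sketch");
        }
    }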
  • u

    Urs Schoenenberger

    08/28/2025, 8:36 AM
    Hi folks, for the Kafka Sink - is it possible to enforce that the "Sink: Committer" will be chained to the "Sink: Writer"? My job graph always has the committers attached via a forward, rather than chained.
  • g

    George Leonard

    08/28/2025, 10:07 AM
    RESOLVED 🎉 ... case matters.
  • g

    George Leonard

    08/28/2025, 10:07 AM
    😉
  • g

    George Leonard

    08/28/2025, 11:03 AM
    ... Why would I be getting data delivered to the Flink writer (output to Paimon), while it gets stuck there and does not go any further?
    Copy code
    INSERT INTO c_paimon.outbound.children
    SELECT 
        JSON_VALUE(data, '$._id') as _id,
        JSON_VALUE(data, '$.name') as name,
        JSON_VALUE(data, '$.surname') as surname,
        JSON_VALUE(data, '$.gender') as gender,
        JSON_VALUE(data, '$.dob') as dob,
        JSON_VALUE(data, '$.nationalid') as nationalid,
        JSON_VALUE(data, '$.family_id') as family_id,
        JSON_VALUE(data, '$.father_nationalid') as father_nationalid,
        JSON_VALUE(data, '$.mother_nationalid') as mother_nationalid,
        ROW(
            JSON_VALUE(data, '$.address.street_1'),
            JSON_VALUE(data, '$.address.street_2'),
            JSON_VALUE(data, '$.address.neighbourhood'),
            JSON_VALUE(data, '$.address.town'),
            JSON_VALUE(data, '$.address.county'),
            JSON_VALUE(data, '$.address.province'),
            JSON_VALUE(data, '$.address.country'),
            JSON_VALUE(data, '$.address.country_code'),
            JSON_VALUE(data, '$.address.postal_code'),
            JSON_VALUE(data, '$.address.parcel_id')
        ) as address,
        created_at
    FROM postgres_catalog.inbound.children;
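    One common cause of this symptom (stated as an assumption about the setup, not a diagnosis): Paimon's sink buffers rows in the writer and only commits them to the table when a checkpoint completes, so with checkpointing disabled or a very long interval the data reaches the writer and never becomes visible. A minimal sketch of enabling it programmatically; the same key can be set with SET in the SQL client or in the Flink configuration, and the 10 s interval is only an example:
    Copy code
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class PaimonCheckpointingSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Paimon (like most two-phase-commit sinks) only makes data visible when a
            // checkpoint completes, so streaming INSERTs need checkpointing enabled.
            tEnv.getConfig().getConfiguration()
                    .setString("execution.checkpointing.interval", "10 s");

            // ... register catalogs and run the INSERT INTO statement as before ...
        }
    }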
  • g

    George Leonard

    08/28/2025, 1:26 PM
    Hi all. I'm stuck trying to move some data around, please see the attached. I can see my problem is the INSERT statement; it's having trouble with the array of accounts, and I'm not sure how to fix it. Attached is my code from the PG source DB, through to the CDC table in Flink and then out to Paimon, including an example payload.
    code.md, adults.json
  • g

    George Leonard

    08/29/2025, 5:42 AM
    I'd like to unpack my original Postgres structure into a Flink record and then into a Paimon structured record. Please help.
  • j

    Jashwanth S J

    08/29/2025, 8:57 AM
    Hi team, we are currently using Apache Flink 1.20.2 and planning to upgrade to Java 21 for our next release. However, Flink's Docker Hub repository does not offer a Flink 1.20.2 image with Java 21; only versions up to Java 17 are available. What are our options to move forward?
  • g

    George Leonard

    08/29/2025, 12:06 PM
    I have something strange: I kick off 2 inserts from Flink tables into Paimon-based tables, and they run fine for some minutes, using 2 task slots out of 60 available. Then suddenly it all goes bad: the jobs start failing and eating up task slots until they run out, and then Flink dies. At that point I need to kill the jobs and restart the Flink stack.
    jobmanager_log.txt