# troubleshooting
  • b

    B S Mohammed Ashfaq

    08/20/2025, 1:30 AM
    I'm using Apache Flink 1.19.2. When I submit a Flink SQL job (e.g. Kafka to a DB), the whole CREATE TABLE / SELECT statement is written to the logs, which exposes sensitive info like credentials. How can this be fixed? Please let me know.
  • b

    Brice Loustau

    08/20/2025, 9:22 PM
    Hi folks, this is my first message here. The website https://nightlies.apache.org/ (which hosts the Flink documentation) seems to be down. Not sure where to report this.
  • i

    Ian Stewart

    08/22/2025, 6:43 PM
    Does anyone have suggestions on a pattern for deduplicating messages using processing time? We have an event stream that can sometimes become backed up; when it is, we want to dedupe those events so we're only processing the latest ones. We were hoping to use Window Deduplication, but it currently does not support processing-time dedup: https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/dev/table/sql/queries/window-deduplication/#limitation-on-time-attribute-of-order-key
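    A processing-time alternative to Window Deduplication (a sketch, not from the thread): keep only the newest element per key in state and flush it on a processing-time timer. Event, the key selector, and the 5-second flush interval below are assumptions, and the open signature is the Flink 1.x RichFunction one.
    Copy code
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;
    
    // Buffers the most recent Event per key and emits it when a processing-time
    // timer fires, so older duplicates that piled up while the stream was backed
    // up are dropped.
    public class LatestOnlyDeduper extends KeyedProcessFunction<String, Event, Event> {
    
        private static final long FLUSH_INTERVAL_MS = 5_000L; // assumed interval
    
        private transient ValueState<Event> latest;
        private transient ValueState<Long> timer;
    
        @Override
        public void open(Configuration parameters) {
            latest = getRuntimeContext().getState(new ValueStateDescriptor<>("latest", Event.class));
            timer = getRuntimeContext().getState(new ValueStateDescriptor<>("timer", Long.class));
        }
    
        @Override
        public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
            latest.update(event); // a newer event simply replaces the buffered one
            if (timer.value() == null) {
                long ts = ctx.timerService().currentProcessingTime() + FLUSH_INTERVAL_MS;
                ctx.timerService().registerProcessingTimeTimer(ts);
                timer.update(ts);
            }
        }
    
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
            Event e = latest.value();
            if (e != null) {
                out.collect(e); // only the latest element per key survives the interval
            }
            latest.clear();
            timer.clear();
        }
    }
    Applied as events.keyBy(e -> e.getKey()).process(new LatestOnlyDeduper()). On the SQL side, the non-windowed Deduplication query (ROW_NUMBER over a processing-time attribute, keeping row 1) should give a similar processing-time effect.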
  • e

    Elad

    08/24/2025, 8:15 PM
    Hello folks. First time using Slack, but I do really need a hand here. In the DataStream API I’m using session windows. Sometimes a window has already fired and emitted downstream, but then a late event arrives that causes this window to merge into another session window. I’d like to run custom logic at the time of the merge (for example, emit a retraction or compensation event for the earlier window that already fired). I know that Trigger.onMerge exists, but it does not allow me to directly hook into the fact that “window A that had already fired is now being absorbed into window B” and use it. To get more specific: in my use case, I would like to send to a side output the ids of the events that existed in the merged window, so I could delete them from the database and consider them irrelevant 👉 Is there a recommended way in Flink’s DataStream API to implement a custom callback on window merge so I can handle these “already-fired-but-now-merged” windows?
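    One pattern for this (a sketch, not an official merge hook; Event with a getId() and the SessionResult shape are assumptions): let the merged session fire again, e.g. via allowed lateness, and have the ProcessWindowFunction compare the firing window against per-key global state, retracting the ids it emitted last time if that earlier range is now absorbed.
    Copy code
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;
    
    import java.util.ArrayList;
    import java.util.List;
    
    // Fires for every (possibly merged) session window. Before emitting the new
    // result it checks per-key global state for a previous firing whose time range
    // is now covered by this window; if found, the ids from that firing go to a
    // side output so the earlier result can be deleted downstream.
    public class RetractingSessionFunction
            extends ProcessWindowFunction<Event, RetractingSessionFunction.SessionResult, String, TimeWindow> {
    
        public static final OutputTag<String> RETRACTED_IDS = new OutputTag<String>("retracted-ids") {};
    
        private final ValueStateDescriptor<SessionResult> lastEmittedDesc =
                new ValueStateDescriptor<>("last-emitted", SessionResult.class);
    
        @Override
        public void process(String key, Context ctx, Iterable<Event> events,
                            Collector<SessionResult> out) throws Exception {
            ValueState<SessionResult> lastEmitted = ctx.globalState().getState(lastEmittedDesc);
            SessionResult previous = lastEmitted.value();
            TimeWindow window = ctx.window();
    
            boolean absorbedPrevious = previous != null
                    && window.getStart() <= previous.windowStart
                    && window.getEnd() >= previous.windowEnd
                    && !(window.getStart() == previous.windowStart && window.getEnd() == previous.windowEnd);
            if (absorbedPrevious) {
                // the previously fired session is now part of this one: retract its ids
                for (String id : previous.eventIds) {
                    ctx.output(RETRACTED_IDS, id);
                }
            }
    
            List<String> ids = new ArrayList<>();
            for (Event e : events) {
                ids.add(e.getId());
            }
            SessionResult result = new SessionResult(window.getStart(), window.getEnd(), ids);
            lastEmitted.update(result);
            out.collect(result);
        }
    
        /** Minimal record of what the last firing emitted for this key. */
        public static class SessionResult {
            public long windowStart;
            public long windowEnd;
            public List<String> eventIds = new ArrayList<>();
    
            public SessionResult() {}
    
            public SessionResult(long windowStart, long windowEnd, List<String> eventIds) {
                this.windowStart = windowStart;
                this.windowEnd = windowEnd;
                this.eventIds = eventIds;
            }
        }
    }
    This only helps when the merged window fires again for the same key (allowed lateness on the session window), which matches the already-fired-but-now-merged case described; the side output is read with getSideOutput(RetractingSessionFunction.RETRACTED_IDS).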
  • c

    Clemens Valiente

    08/25/2025, 2:27 AM
    hi, is there any way to fix this?
    Copy code
    2025-08-22 06:51:30,525 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - class com.google.protobuf.ByteString does not contain a getter for field hash
    2025-08-22 06:51:30,525 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - class com.google.protobuf.ByteString does not contain a setter for field hash
    2025-08-22 06:51:30,526 INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - Class class com.google.protobuf.ByteString cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
    the hash field definitely shouldn't be set in the first place, and I couldn't figure out how to write/define a custom TypeExtractor? 🤔
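    Those INFO lines just mean the TypeExtractor falls back to GenericType (Kryo) for ByteString; the TypeExtractor itself is not something you swap out for a foreign class, but you can pin how Kryo handles the type instead of letting it reflect over the internal fields. A sketch against the Flink 1.x ExecutionConfig API:
    Copy code
    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.Serializer;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;
    import com.google.protobuf.ByteString;
    
    // Writes a ByteString as a length-prefixed byte array instead of letting Kryo
    // reflect over its internal fields (including the lazily computed hash).
    public class ByteStringSerializer extends Serializer<ByteString> {
    
        @Override
        public void write(Kryo kryo, Output output, ByteString object) {
            byte[] bytes = object.toByteArray();
            output.writeInt(bytes.length, true);
            output.writeBytes(bytes);
        }
    
        @Override
        public ByteString read(Kryo kryo, Input input, Class<ByteString> type) {
            int length = input.readInt(true);
            return ByteString.copyFrom(input.readBytes(length));
        }
    }
    
    // registered once on the environment before building the job graph:
    // env.getConfig().addDefaultKryoSerializer(ByteString.class, ByteStringSerializer.class);
    addDefaultKryoSerializer also applies to subclasses (ByteString is abstract, with LiteralByteString and friends underneath), which is why it is used here rather than registerTypeWithKryoSerializer.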
  • c

    Clemens Valiente

    08/25/2025, 2:37 AM
    because I noticed that the Flink app spends a lot of time going through this exception:
  • c

    Clemens Valiente

    08/25/2025, 2:37 AM
    over and over again and I don't think that should be necessary
  • c

    Clemens Valiente

    08/25/2025, 4:40 AM
    it seems like it is ignoring my registrations? With the debugger I can see this entry in kryoRegistrations:
    Copy code
    "streams.tis.trust.fhpstream.filtered_hydra_payload.RequestSource" -> {KryoRegistration@26135} 
     key = "streams.tis.trust.fhpstream.filtered_hydra_payload.RequestSource"
     value = {KryoRegistration@26135} 
      registeredClass = {Class@7694} "class streams.tis.trust.fhpstream.filtered_hydra_payload.RequestSource"
      serializerClass = {Class@21986} "class com.grab.grabdefence.acorn.proto.ScalaPbEnumSerializer"
      serializableSerializerInstance = null
      serializerDefinitionType = {KryoRegistration$SerializerDefinitionType@26334} "CLASS"
    but for some reason a different serializer is used
    Copy code
    type = {Class@7694} "class streams.tis.trust.fhpstream.filtered_hydra_payload.RequestSource"
    kryo = {KryoBase@21592} com.twitter.chill.KryoBase@7a852
     objSer = {ObjectSerializer@26028} com.twitter.chill.ObjectSerializer@11d3192e
    with a debug breakpoint on copy here:
    Copy code
    try {
                checkKryoInitialized();
                try {
                    return kryo.copy(from);
                } catch (KryoException ke) {
  • j

    Jashwanth S J

    08/25/2025, 8:59 AM
    We're seeing a weird issue while bringing up a Flink session job through the operator. We're currently using an AWS signed URL for jar access through S3, which works fine, but when we replace it with just the S3 endpoint without public access, it fails. Can someone help here?
  • f

    Fabricio Lemos

    08/25/2025, 8:16 PM
    Any idea when we can expect the JDBC connector to become compatible with Flink 2.1? It's the only thing blocking us from upgrading.
  • d

    Dheeraj Panangat

    08/26/2025, 11:41 AM
    Hi All, when is the Flink Kubernetes Operator planned to be released with support for Flink version 2.1.0?
  • b

    Brad Murry

    08/26/2025, 3:33 PM
    Hello all, can anyone provide guidance on whether we can reliably set metrics reporters programmatically? In many Java apps I can just register new MetricRegistries if I want to push to a new place, but I'm not clear if Flink's flink-conf.yaml and config toolchain support this use case. I'm running Flink applications in a managed environment, so I don't have access to flink-conf.yaml. Thanks for any pointers!
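    Reporters are instantiated from the cluster configuration at JobManager/TaskManager startup rather than from a registry that job code can add to, so "programmatic" setup mostly means handing the same keys to the environment's Configuration, and that only takes effect where this process actually starts the cluster (local / application mode you control); a managed service keeps its own reporter config. A sketch, assuming the Prometheus reporter jar is on the classpath:
    Copy code
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    public final class ProgrammaticReporterExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Same keys that would normally live in flink-conf.yaml.
            conf.setString("metrics.reporter.prom.factory.class",
                    "org.apache.flink.metrics.prometheus.PrometheusReporterFactory");
            conf.setString("metrics.reporter.prom.port", "9249");
    
            // Only guaranteed to be picked up where this process starts the cluster
            // (local / MiniCluster); a managed service builds its TaskManagers from
            // its own configuration.
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment(conf);
    
            env.fromElements(1, 2, 3).print();
            env.execute("reporter-config-sketch");
        }
    }
    What is always available from job code is registering custom metrics on getRuntimeContext().getMetricGroup(); where they end up still follows the cluster's reporter configuration.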
  • l

    L P V

    08/27/2025, 8:57 AM
    Hi guys, I'm working with Flink SQL on a Kafka topic that uses protobuf with Schema Registry. It seems Flink does not natively support the Confluent protobuf format: https://issues.apache.org/jira/browse/FLINK-29731 Is there any way to work around this, other than building another job to convert protobuf-confluent to plain protobuf?
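    Until FLINK-29731 lands there is no table format for it, but one workaround (a sketch, not a supported format; MyMessage stands in for the generated protobuf class and only the simplest message-index case is handled) is to consume the topic with the DataStream API, strip the Confluent wire-format prefix by hand, and register the stream as a table:
    Copy code
    import com.google.protobuf.InvalidProtocolBufferException;
    import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
    
    import java.io.IOException;
    
    // Strips the Confluent framing (magic byte 0x0, 4-byte schema id, message-index
    // bytes) and parses the remainder with the generated protobuf class. Only the
    // common case of a single leading 0x0 index byte is handled here.
    public class ConfluentProtobufDeserializer extends AbstractDeserializationSchema<MyMessage> {
    
        @Override
        public MyMessage deserialize(byte[] message) throws IOException {
            if (message == null || message.length < 6 || message[0] != 0) {
                throw new IOException("Not a Confluent-framed protobuf record");
            }
            int offset = 1 + 4;          // magic byte + schema id
            if (message[offset] == 0) {  // message-indexes == [0], the usual case
                offset += 1;
            } else {
                throw new IOException("Unsupported message-index encoding in this sketch");
            }
            try {
                return MyMessage.parseFrom(
                        java.util.Arrays.copyOfRange(message, offset, message.length));
            } catch (InvalidProtocolBufferException e) {
                throw new IOException("Failed to parse protobuf payload", e);
            }
        }
    }
    Wired in with KafkaSource.builder()'s setValueOnlyDeserializer(new ConfluentProtobufDeserializer()) and then StreamTableEnvironment.fromDataStream, this avoids the extra conversion job at the cost of ignoring the schema id.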
  • d

    Dennis Sosnoski

    08/27/2025, 9:05 AM
    I'm trying to collect operator-level metrics from Flink jobs, but it looks like nothing shows up if the whole job runs with parallelism 1 (meaning it's just using a single task manager, as I understand it, with no serialization between operator steps) - not even application-level metrics. Is this a known issue, and is there any way around it (short of creating an artificial parallelism difference just to split it into chunks)?
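    If the numbers that stay at zero are the records-sent/received style figures, the usual culprit is operator chaining rather than parallelism 1 itself: with the whole pipeline fused into one task there is no boundary to measure. A quick (if costly) way to confirm, as a sketch:
    Copy code
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    public final class ChainingMetricsExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // With default chaining, source -> map -> sink fuse into a single task.
            // Breaking the chain makes each operator its own task (and reintroduces
            // serialization between them), so the per-task record metrics show up.
            env.disableOperatorChaining();
    
            env.fromElements("a", "b", "c")
                    .map(String::toUpperCase).name("uppercase")
                    .print();
    
            env.execute("chaining-metrics-sketch");
        }
    }
    If even custom metrics registered via getRuntimeContext().getMetricGroup() are missing, that points at the reporter configuration instead, since those are reported regardless of chaining.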
  • y

    Yoshi Nagasaki

    08/27/2025, 3:41 PM
    Hi folks. I'm playing with Flink and having some trouble -- I have my job workflow listening to a DDB stream, processing the CDC events, and writing some stats to another DDB table. My workflow looks like this:
    Copy code
    final var watermarkStrategy = WatermarkStrategy.<CDCEvent>forMonotonousTimestamps()
                    .withIdleness(Duration.ofSeconds(1))
                    .withTimestampAssigner((event, timestamp) -> event.getTimestamp());
            final var lateTag = new OutputTag<Metric>("late") {
            };
    
            final var workflow = env.fromSource(dynamoDbStreamsSource, watermarkStrategy, "DynamoDB Streams source")
                    .returns(TypeInformation.of(CDCEvent.class))
                    .uid("custom-uid")
                    .filter(...)
                    .map(...compute deltas based on changes...)
                    .filter(...)
                    .flatMap(...map deltas to 1 or more "Metrics"...)
                    .keyBy(...key by the metric key...)
                    .window(TumblingEventTimeWindows.of(Duration.ofSeconds(10)))
                    .sideOutputLateData(lateTag)
                    .aggregate(new MetricAggregator()); // this is an AggregateFunction<> that merges the delta values across metric instances in the window
    
            final var lateMetrics = workflow.getSideOutput(lateTag)
                    .sinkTo(new PrintSink<>(true));
    
            workflow.sinkTo(...dynamo sink...);
    As you can see, nothing fancy, pretty straightforward... My problem is: when I set the InitialPosition of the stream to TRIM_HORIZON, I see in the debug output that it's receiving, processing, and sinking all the historical events from the last 24 hours... but once it reaches the end, it is completely stuck. I make requests to the DDB table that result in new CDC events, I see in the output that it receives the event immediately, does all the stateless processing (all the filters and maps before the windowing), and then it just disappears and nothing happens. I have logging in the MetricAggregator and my custom sink, and neither log gets triggered for these new events. When I set the InitialPosition to LATEST and feed new events, it flows through fine! (I still have to submit a new change before the previous change gets fully processed, despite the idleness setting in the watermark strategy, but at least I can get it to process.) My custom sink is modeled closely after the existing DDB connector, and has these properties:
    Copy code
    maxBatchSize        = 10
    maxInFlightRequests = 20
    maxBufferedRequests = 1000
    maxTimeInBufferMS   = 5000
    Just adding this in case it's relevant, but I don't think it should matter. Based on what I've read on watermarking, I assume this has to do with the strategy and specifically the idleness, which I set arbitrarily low, but it doesn't seem to matter/do anything (with either TRIM_HORIZON or LATEST). I've tried both built-in watermarking strategies and have the same problem. Any ideas? Thanks.
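    What is described matches how event-time windows close: the watermark only moves when new timestamps arrive, and withIdleness only stops an idle split from holding back other splits, it never advances the watermark on its own, so the last window stays open until the next event. A hedged workaround, assuming wall-clock progress is acceptable once the source goes quiet (the 5-second idle timeout is an assumption; CDCEvent is the type from the snippet above):
    Copy code
    import org.apache.flink.api.common.eventtime.Watermark;
    import org.apache.flink.api.common.eventtime.WatermarkGenerator;
    import org.apache.flink.api.common.eventtime.WatermarkOutput;
    
    import java.time.Duration;
    
    // Emits watermarks from the highest seen event timestamp, but if no event has
    // arrived for idleTimeoutMs, lets the watermark trail the wall clock so open
    // event-time windows can still close.
    public class ProcessingTimeTrailingWatermarks implements WatermarkGenerator<CDCEvent> {
    
        private final long idleTimeoutMs = Duration.ofSeconds(5).toMillis(); // assumed
        private long maxTimestamp = Long.MIN_VALUE;
        private long lastEventWallClock = System.currentTimeMillis();
    
        @Override
        public void onEvent(CDCEvent event, long eventTimestamp, WatermarkOutput output) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
            lastEventWallClock = System.currentTimeMillis();
        }
    
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            if (maxTimestamp == Long.MIN_VALUE) {
                return; // nothing seen yet
            }
            long now = System.currentTimeMillis();
            if (now - lastEventWallClock > idleTimeoutMs) {
                // no new events: let the watermark follow the wall clock instead
                output.emitWatermark(new Watermark(now - idleTimeoutMs));
            } else {
                output.emitWatermark(new Watermark(maxTimestamp - 1));
            }
        }
    }
    
    // used instead of forMonotonousTimestamps():
    // WatermarkStrategy.<CDCEvent>forGenerator(ctx -> new ProcessingTimeTrailingWatermarks())
    //         .withTimestampAssigner((event, ts) -> event.getTimestamp());
    The trade-off is that genuinely delayed events can now land in the late side output, since the watermark keeps moving while the source is quiet.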
  • r

    raphaelauv

    08/27/2025, 4:34 PM
    Flink Kafka Table API: Hi, is it possible to set a custom class in the Table API for a Kafka sink for the serialization of the value, the same way I can do it with setValueSerializationSchema in the Java API? Here SafeConfluentRegistryAvroSerializationSchema is my custom serialization class:
    Copy code
    KafkaSink.<Car>builder()
            .setRecordSerializer(
                    KafkaRecordSerializationSchema.builder()
                            .setValueSerializationSchema(
                                    new SafeConfluentRegistryAvroSerializationSchema(
                                            Car.class,
                                            "flink-output-value",
                                            "<http://confluent-schema-registry-local:8081>",
                                            sr_settings
                                    ))
    thanks all
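    The Kafka table connector only accepts a format identifier in WITH (...), so a custom SerializationSchema class can't be plugged in directly; the options are packaging it as a custom format factory or, more simply, converting the table to a DataStream and reusing the Java-API KafkaSink builder. A sketch of the latter (the query, topic, bootstrap servers, and registry settings are placeholders; Car and SafeConfluentRegistryAvroSerializationSchema are the classes from the snippet above):
    Copy code
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    
    import java.util.HashMap;
    import java.util.Map;
    
    public final class TableToKafkaWithCustomSerializer {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
    
            // Whatever Table API / SQL query produces the rows to write.
            Table cars = tEnv.sqlQuery("SELECT * FROM cars_view"); // placeholder query
    
            // Bridge back to the DataStream API so the custom schema can be plugged in.
            DataStream<Car> carStream = tEnv.toDataStream(cars, Car.class);
    
            Map<String, String> srSettings = new HashMap<>(); // same registry settings as before
    
            KafkaSink<Car> sink = KafkaSink.<Car>builder()
                    .setBootstrapServers("kafka:9092") // placeholder
                    .setRecordSerializer(
                            KafkaRecordSerializationSchema.builder()
                                    .setTopic("flink-output") // placeholder
                                    .setValueSerializationSchema(
                                            new SafeConfluentRegistryAvroSerializationSchema(
                                                    Car.class,
                                                    "flink-output-value",
                                                    "http://confluent-schema-registry-local:8081",
                                                    srSettings))
                                    .build())
                    .build();
    
            carStream.sinkTo(sink);
            env.execute("table-to-kafka-with-custom-serializer");
        }
    }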
  • g

    George Leonard

    08/28/2025, 6:28 AM
    Just realised I maybe should have posted my query for assistance here. Please see my message in the flink-cdc channel: https://apache-flink.slack.com/archives/C076E3QGX8A/p1756362179848529
  • r

    Rushikesh Gulve

    08/28/2025, 7:08 AM
    Hi everyone, I am trying to deploy a PyFlink application using the Kubernetes operator. My goal is to have a parallelism of 1 and to distribute the different subtasks among multiple task managers, so ultimately something like 7 task managers handling different tasks of a single job with parallelism 1. I tried multiple configurations to achieve this, but I end up with a number of task managers equal to the parallelism, which is not my goal. Is it possible to achieve this configuration? If yes, how?
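    With the default slot sharing, one parallelism-1 pipeline fits into a single slot, so the scheduler only ever asks for one TaskManager. Putting operators into different slot sharing groups (and keeping taskmanager.numberOfTaskSlots at 1) forces them into separate slots and therefore separate TaskManagers. Shown as a Java sketch with illustrative names; PyFlink exposes an equivalent slot_sharing_group call on its DataStream API.
    Copy code
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    public final class SpreadTasksAcrossTaskManagers {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            // Chaining would fuse the operators back into one task, so break the chains too.
            env.disableOperatorChaining();
    
            env.fromElements("a", "b", "c")
                    .slotSharingGroup("source-group")
                    .map(String::toUpperCase)
                    .slotSharingGroup("map-group")   // needs a second slot, hence a second TaskManager
                    .print();
    
            env.execute("slot-sharing-sketch");
        }
    }
    With 7 distinct groups across the pipeline and one slot per TaskManager in the FlinkDeployment, the operator should then bring up 7 single-slot TaskManagers for the one job.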
  • u

    Urs Schoenenberger

    08/28/2025, 8:36 AM
    Hi folks, for the Kafka Sink - is it possible to enforce that the "Sink: Committer" will be chained to the "Sink: Writer"? My job graph always has the committers attached via a forward, rather than chained.
  • g

    George Leonard

    08/28/2025, 10:07 AM
    RESOLVED 🎉 ... case matters.
  • g

    George Leonard

    08/28/2025, 10:07 AM
    😉
  • g

    George Leonard

    08/28/2025, 11:03 AM
    ... why would I be getting data delivered to the Flink writer (output to Paimon), but it gets stuck there and does not go any further?
    Copy code
    INSERT INTO c_paimon.outbound.children
    SELECT 
        JSON_VALUE(data, '$._id') as _id,
        JSON_VALUE(data, '$.name') as name,
        JSON_VALUE(data, '$.surname') as surname,
        JSON_VALUE(data, '$.gender') as gender,
        JSON_VALUE(data, '$.dob') as dob,
        JSON_VALUE(data, '$.nationalid') as nationalid,
        JSON_VALUE(data, '$.family_id') as family_id,
        JSON_VALUE(data, '$.father_nationalid') as father_nationalid,
        JSON_VALUE(data, '$.mother_nationalid') as mother_nationalid,
        ROW(
            JSON_VALUE(data, '$.address.street_1'),
            JSON_VALUE(data, '$.address.street_2'),
            JSON_VALUE(data, '$.address.neighbourhood'),
            JSON_VALUE(data, '$.address.town'),
            JSON_VALUE(data, '$.address.county'),
            JSON_VALUE(data, '$.address.province'),
            JSON_VALUE(data, '$.address.country'),
            JSON_VALUE(data, '$.address.country_code'),
            JSON_VALUE(data, '$.address.postal_code'),
            JSON_VALUE(data, '$.address.parcel_id')
        ) as address,
        created_at
    FROM postgres_catalog.inbound.children;
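    Worth checking first (a guess from the symptom, not from the attached logs): the Paimon sink only commits files when a checkpoint completes, so with checkpointing disabled the writer keeps receiving data that never becomes visible in the table. A sketch of enabling it from the Table API before running the INSERT:
    Copy code
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    
    public final class EnableCheckpointingForPaimon {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    
            // Paimon commits on checkpoint completion, so give the job an interval.
            tEnv.getConfig().getConfiguration()
                    .setString("execution.checkpointing.interval", "30 s");
    
            // ... CREATE CATALOG / CREATE TABLE statements as before, then submit the
            // same INSERT INTO c_paimon.outbound.children SELECT ... statement ...
        }
    }
    From the SQL client the equivalent is SET 'execution.checkpointing.interval' = '30 s'; before submitting the INSERT.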
  • g

    George Leonard

    08/28/2025, 1:26 PM
    Hi all. I'm stuck trying to move some data around, please see the attached. I can see my problem is the INSERT statement, and it's having trouble with the array of accounts... not sure how to fix it. Attached is my code from the PG source DB, through to the CDC table in Flink and then out to Paimon, including an example payload.
    code.md, adults.json
  • g

    George Leonard

    08/29/2025, 5:42 AM
    I'd like to unpack my original Postgres structure into a Flink table and then into a Paimon structured record. Please help.
  • j

    Jashwanth S J

    08/29/2025, 8:57 AM
    Hi Team, we are currently using Apache Flink 1.20.2 and planning to upgrade to Java 21 for our next release, but Flink’s Docker Hub does not offer a 1.20.2 image with Java 21; only versions up to Java 17 are available. What are our options to move forward?
  • g

    George Leonard

    08/29/2025, 12:06 PM
    Having something strange: I kick off 2 inserts from Flink tables into Paimon-based tables and they run fine for some minutes, using 2 task slots out of 60 available. Then suddenly it all goes bad and starts failing, eating up task slots until it runs out, and then Flink dies... at that point I need to kill the jobs and restart the Flink stack.
    jobmanager_log.txt
  • v

    Vikas Patil

    08/30/2025, 10:26 PM
    Is using the prometheus endpoint of a taskmanager as a liveness probe alright ? Or does it have any inherent risks ? We are not able to use the recommended TCP probes as that does not seem to detect JVM stalls. Any insights on this ?
  • o

    Or Keren

    09/01/2025, 6:48 AM
    Does anyone know if there's a due date for the full release of disaggregated state?
  • z

    Zeyu Qiu

    09/01/2025, 8:02 AM
    Hi team, I’m using Flink CDC + Hudi to transfer data from MySQL to AWS S3, but I’ve run into a problem. My Flink job looks like:
    Copy code
    parallelism = 1
        env = StreamExecutionEnvironment.get_execution_environment(config)
        env.set_parallelism(parallelism)  # I don't know what's that, try 2
        env.enable_checkpointing(10 * 60 * 1000)  # milliseconds
        # checkpoints have to complete within ten minutes, or are discarded
        env.get_checkpoint_config().set_checkpoint_timeout(int(10 * 60 * 1000))
        env.get_checkpoint_config().set_checkpointing_mode(
            CheckpointingMode.EXACTLY_ONCE
        )
        env.disable_operator_chaining()  # If drop this line, create multiple pipeline in one python job will raise error in Flink GUI
        settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
        t_env = StreamTableEnvironment.create(env, environment_settings=settings)
        ss = t_env.create_statement_set()
    
        source_sql = f"""
                            CREATE TABLE mysql_source (
                                db_name STRING METADATA FROM 'database_name' VIRTUAL,
                                table_name STRING METADATA  FROM 'table_name' VIRTUAL,
                                id BIGINT,
                                content STRING,
                                update_time TIMESTAMP(3)
                            ) WITH (
                                'connector' = 'mysql-cdc',
                                'hostname' = '{db_secret["host"]}',
                                'port' = '{db_secret["port"]}',
                                'username' = '{db_secret["username"]}',
                                'password' = '{db_secret["password"]}',
                                'database-name' = 'my_db',
                                'table-name' = 'my_table',
                                'server-id' = '{server_id}',
                                'debezium.snapshot.mode' = 'schema_only',
                                'scan.startup.mode' = 'timestamp',
                                'scan.startup.timestamp-millis' = '{binlog_start_time}', 
                                'scan.incremental.snapshot.enabled' = 'true',
                                'scan.incremental.snapshot.chunk.key-column' = 'id'
                            );
                        """
        t_env.execute_sql(source_sql)
    
        sink_sql = f"""
                        CREATE TABLE hudi_sink (
                            id BIGINT,
                            content STRING,
                            update_time TIMESTAMP(3),
                            hudi_ts double,
                            PRIMARY KEY (id) NOT ENFORCED
                        ) WITH (
                            'connector' = 'hudi',
                            'path' = 's3a://xxxx',
                            'table.type' = 'COPY_ON_WRITE',
                            'write.precombine.field' = 'hudi_ts',
                            'write.operation' = 'upsert',
                            'hoodie.datasource.write.recordkey.field' = 'id',
                            'hoodie.datasource.write.partitionpath.field' = '',
                            'write.tasks' = '1',
                            'compaction.tasks' = '1',
                            'clean.retain_commits' = '6',
                            'hoodie.keep.min.commits' = '7',
                            'hoodie.keep.max.commits' = '8',
                            'compaction.async.enabled' = 'true',
                            'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.InProcessLockProvider',
                            'index.partition.regex' = 'false',
                            'index.bootstrap.enabled' = 'false',
                            'write.rate.limit' = '2000'
                        );
                    """
        t_env.execute_sql(sink_sql)
        insert_sql = f"""
                                    INSERT INTO hudi_sink
                                    SELECT id,content,update_time, 
                                    UNIX_TIMESTAMP() as hudi_ts FROM mysql_source where db_name = 'my_db' 
                                    and table_name = 'my_table';
                                """
        ss.add_insert_sql(insert_sql)
        ss.attach_as_datastream()
        env.execute(f'mysql_cdc_to_hudi')
    After I submit this job to Flink, I expect the Hudi table in S3 to have real-time data matching the MySQL table. But in fact, when I execute an update statement in MySQL, 2 duplicate rows appear in the Hudi table. For example, here’s the data in MySQL:
    Copy code
    id       |content     |update_time
    1        |a           |2025-08-01 00:00:00
    And I execute:
    Copy code
    update my_table set content = 'b' where id = 1
    Then the data in the Hudi table looks like this:
    Copy code
    id       |content     |update_time
    1        |a           |2025-08-01 00:00:00
    1        |b           |2025-09-01 00:00:00
    Do you guys have any idea why that’s happening? As I’ve set 'hoodie.datasource.write.recordkey.field' = 'id', I suppose there shouldn’t be duplicate rows in the Hudi table. And this error cannot be reproduced reliably: sometimes the result is normal, and sometimes there is an error. It is especially likely to occur when there are multiple Hudi sinks in a job. I’m using Flink 1.16.2, Hudi 0.13.0, FlinkCDC 3.1.1. In addition, I found a log line in Flink like:
    Copy code
    2025-09-01 01:10:20,612 INFO  org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer [] - The state is cleared because of state ttl. This will result in incorrect result. You can increase the state ttl to avoid this.
    Not sure if this is related to the problem I met.
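    That SinkUpsertMaterializer line is a strong hint: the operator that turns the changelog into clean upserts dropped its state because of table.exec.state.ttl, and once the old row for a key is forgotten an update can come out as an extra insert. A hedged first step is raising (or disabling) that TTL; shown in Java here, the same key can be set on the table config from PyFlink:
    Copy code
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    
    public final class RaiseStateTtlExample {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    
            // 0 disables the TTL entirely; otherwise pick something comfortably larger
            // than the longest gap between updates to the same key.
            tEnv.getConfig().getConfiguration()
                    .setString("table.exec.state.ttl", "0 ms");
    
            // ... register mysql_source / hudi_sink and add the INSERT as before ...
        }
    }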
  • g

    George Leonard

    09/01/2025, 12:24 PM
    Hi all... anyone willing to help? I have a jsonb (Postgres) column that's being converted to a string. The string contains root-level fields; one of them is a doc (no issues), another is a field/doc that's an array of docs. I need it all flipped into a Flink table complex structure. I've tried a couple of things and it's not coming right.
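    Since JSON_VALUE/JSON_QUERY only return strings, the usual way to turn that array of docs into a typed column is a scalar UDF that parses the string with Jackson and returns ARRAY<ROW<...>>. A sketch, with made-up account fields (number, balance) standing in for the real structure:
    Copy code
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.functions.ScalarFunction;
    import org.apache.flink.types.Row;
    
    // Parses a JSON array of account documents into a typed ARRAY<ROW<...>> so it
    // can be inserted into a column with the same nested type.
    public class ParseAccountsFunction extends ScalarFunction {
    
        private static final ObjectMapper MAPPER = new ObjectMapper();
    
        @DataTypeHint("ARRAY<ROW<number STRING, balance DOUBLE>>")
        public Row[] eval(String accountsJson) {
            if (accountsJson == null) {
                return new Row[0];
            }
            try {
                JsonNode array = MAPPER.readTree(accountsJson);
                Row[] rows = new Row[array.size()];
                for (int i = 0; i < array.size(); i++) {
                    JsonNode acc = array.get(i);
                    rows[i] = Row.of(
                            acc.path("number").asText(null),
                            acc.path("balance").isMissingNode() ? null : acc.path("balance").asDouble());
                }
                return rows;
            } catch (Exception e) {
                throw new RuntimeException("Could not parse accounts array: " + accountsJson, e);
            }
        }
    }
    
    // registration and use, e.g.:
    // tEnv.createTemporarySystemFunction("PARSE_ACCOUNTS", ParseAccountsFunction.class);
    // ... SELECT PARSE_ACCOUNTS(JSON_QUERY(data, '$.accounts')) AS accounts ...
    The target Flink/Paimon column then needs the matching ARRAY<ROW<...>> type, and the UDF is applied to the JSON_QUERY result inside the INSERT ... SELECT.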