# troubleshooting
  • j

    Jashwanth S J

    08/29/2025, 8:57 AM
    Hi Team, We are currently using Apache Flink 1.20.2 and planning to upgrade to Java 21 for our next release. However, Flink's Docker Hub does not offer a Flink 1.20.2 image with Java 21; only variants up to Java 17 are available. What are our options to move forward?
  • g

    George Leonard

    08/29/2025, 12:06 PM
    I have something strange. I kick off 2 inserts from Flink tables into Paimon-based tables and they run fine for some minutes, using 2 task slots out of 60 available. Then suddenly it all goes bad: the jobs start failing and eat up task slots until none are left, and then Flink dies. At that point I need to kill the jobs and restart the Flink stack.
    jobmanager_log.txt
  • v

    Vikas Patil

    08/30/2025, 10:26 PM
    Is using the Prometheus endpoint of a TaskManager as a liveness probe alright, or does it have any inherent risks? We are not able to use the recommended TCP probes, as those do not seem to detect JVM stalls. Any insights on this?
  • o

    Or Keren

    09/01/2025, 6:48 AM
    Does anyone know if there's a due date for the full release of disaggregated state?
  • z

    Zeyu Qiu

    09/01/2025, 8:02 AM
    Hi team, I'm using Flink CDC + Hudi to transfer data from MySQL to AWS S3, but I've run into a problem. My Flink job looks like this:
    Copy code
        from pyflink.datastream import StreamExecutionEnvironment, CheckpointingMode
        from pyflink.table import EnvironmentSettings, StreamTableEnvironment

        parallelism = 1
        env = StreamExecutionEnvironment.get_execution_environment(config)
        env.set_parallelism(parallelism)  # overall job parallelism (also tried 2)
        env.enable_checkpointing(10 * 60 * 1000)  # checkpoint interval in milliseconds
        # checkpoints have to complete within ten minutes, or they are discarded
        env.get_checkpoint_config().set_checkpoint_timeout(10 * 60 * 1000)
        env.get_checkpoint_config().set_checkpointing_mode(
            CheckpointingMode.EXACTLY_ONCE
        )
        # without this line, creating multiple pipelines in one Python job raises an error in the Flink GUI
        env.disable_operator_chaining()
        settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
        t_env = StreamTableEnvironment.create(env, environment_settings=settings)
        ss = t_env.create_statement_set()
    
        source_sql = f"""
                            CREATE TABLE mysql_source (
                                db_name STRING METADATA FROM 'database_name' VIRTUAL,
                                table_name STRING METADATA  FROM 'table_name' VIRTUAL,
                                id BIGINT,
                                content STRING,
                                update_time TIMESTAMP(3)
                            ) WITH (
                                'connector' = 'mysql-cdc',
                                'hostname' = '{db_secret["host"]}',
                                'port' = '{db_secret["port"]}',
                                'username' = '{db_secret["username"]}',
                                'password' = '{db_secret["password"]}',
                                'database-name' = 'my_db',
                                'table-name' = 'my_table',
                                'server-id' = '{server_id}',
                                'debezium.snapshot.mode' = 'schema_only',
                                'scan.startup.mode' = 'timestamp',
                                'scan.startup.timestamp-millis' = '{binlog_start_time}', 
                                'scan.incremental.snapshot.enabled' = 'true',
                                'scan.incremental.snapshot.chunk.key-column' = 'id'
                            );
                        """
        t_env.execute_sql(source_sql)
    
        sink_sql = f"""
                        CREATE TABLE hudi_sink (
                            id BIGINT,
                            content STRING,
                            update_time TIMESTAMP(3),
                            hudi_ts double,
                            PRIMARY KEY (id) NOT ENFORCED
                        ) WITH (
                            'connector' = 'hudi',
                            'path' = 's3a://xxxx',
                            'table.type' = 'COPY_ON_WRITE',
                            'write.precombine.field' = 'hudi_ts',
                            'write.operation' = 'upsert',
                            'hoodie.datasource.write.recordkey.field' = 'id',
                            'hoodie.datasource.write.partitionpath.field' = '',
                            'write.tasks' = '1',
                            'compaction.tasks' = '1',
                            'clean.retain_commits' = '6',
                            'hoodie.keep.min.commits' = '7',
                            'hoodie.keep.max.commits' = '8',
                            'compaction.async.enabled' = 'true',
                            'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.InProcessLockProvider',
                            'index.partition.regex' = 'false',
                            'index.bootstrap.enabled' = 'false',
                            'write.rate.limit' = '2000'
                        );
                    """
        t_env.execute_sql(sink_sql)
        insert_sql = f"""
                                    INSERT INTO hudi_sink
                                    SELECT id,content,update_time, 
                                    UNIX_TIMESTAMP() as hudi_ts FROM mysql_source where db_name = 'my_db' 
                                    and table_name = 'my_table';
                                """
        ss.add_insert_sql(insert_sql)
        ss.attach_as_datastream()
        env.execute(f'mysql_cdc_to_hudi')
    After I submit this job to Flink, I expect the Hudi table in S3 to have the same real-time data as the MySQL table. But in fact, when I execute an update statement in MySQL, 2 duplicate rows appear in the Hudi table. For example, here's the data in MySQL:
    Copy code
    id       |content     |update_time
    1        |a           |2025-08-01 00:00:00
    And I execute:
    Copy code
    update my_table set content = 'b' where id = 1
    Then the data in the Hudi table looks like this:
    Copy code
    id       |content     |update_time
    1        |a           |2025-08-01 00:00:00
    1        |b           |2025-09-01 00:00:00
    Do you have any idea why that's happening? Since I've set 'hoodie.datasource.write.recordkey.field' = 'id', I would not expect duplicate rows in the Hudi table. The error cannot be reproduced reliably: sometimes the result is correct and sometimes it isn't, and it is especially likely to occur when there are multiple Hudi sinks in one job. I'm using Flink 1.16.2, Hudi 0.13.0, and Flink CDC 3.1.1. In addition, I found a log line in Flink like:
    Copy code
    2025-09-01 01:10:20,612 INFO  org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer [] - The state is cleared because of state ttl. This will result in incorrect result. You can increase the state ttl to avoid this.
    Not sure if this is related to the problem I'm seeing.
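    For reference, that SinkUpsertMaterializer message is governed by the Table API state TTL. A minimal sketch of raising it, assuming a TTL was configured somewhere in the job (the value below is only an example; the default of 0 ms means the state never expires):
    Copy code
    -- increase the Table/SQL state retention so SinkUpsertMaterializer keeps the
    -- state it needs to collapse a retraction and the following upsert into one row
    SET 'table.exec.state.ttl' = '1 d';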
  • g

    George Leonard

    09/01/2025, 12:24 PM
    Hi all.. anyone willing to help? I have a jsonb (Postgres) column that's being converted to a string. The string contains root-level fields: one of them is a document (no issues), another is a field that is an array of documents. I need all of it mapped into a Flink table with a complex (nested) structure. I've tried a couple of things and I'm not getting it right.
  • j

    Jashwanth S J

    09/02/2025, 4:27 AM
    Hi All, We're seeing a weird issue while bringing up a FlinkSessionJob through the operator. We're currently using an AWS signed URL for jar access through S3, which works fine, but when we replace it with a plain s3 endpoint without public access, it fails. Can someone help here?
  • g

    George Leonard

    09/02/2025, 9:10 AM
    Hi all. Anyone willing to step in and help me, please? https://apache-flink.slack.com/archives/C03JKTFFX0S/p1756706228476759
  • m

    Mohammed Salman Ali Pary

    09/02/2025, 10:24 AM
    Is there a way to set an operator ID in Flink SQL?
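    If the goal is stable operator IDs for savepoint compatibility, the Table API exposes a config option for generating deterministic uids; a minimal sketch, assuming a recent Flink version (please verify the option against your version's docs):
    Copy code
    -- always attach deterministic uids to the operators generated from SQL;
    -- the default PLAN_ONLY only does this for jobs restored from a compiled plan
    SET 'table.exec.uid.generation' = 'ALWAYS';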
  • l

    L P V

    09/02/2025, 3:50 PM
    Hi all, I'm facing a problem when creating a catalog from the Flink SQL client: the catalog only exists in the local session context, even though I'm using the Hive Metastore. So every time I start a new Flink SQL session, I have to re-create the catalog. This is my sample catalog definition in the Flink SQL client:
    Copy code
    CREATE CATALOG iceberg_catalog WITH (
       'type' = 'iceberg',
       'catalog-type' = 'hive',
       'uri' = 'thrift://hivems.lakehouse.db-prod:9083'
    )
    Is there any way I could use a single catalog that is persisted in HMS?
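    One common workaround is to keep the catalog DDL in an initialization script that the SQL client runs on startup (sql-client.sh -i), so the session-scoped catalog is re-registered automatically; a minimal sketch reusing the definition above:
    Copy code
    -- init.sql, passed via: ./bin/sql-client.sh -i init.sql
    CREATE CATALOG iceberg_catalog WITH (
       'type' = 'iceberg',
       'catalog-type' = 'hive',
       'uri' = 'thrift://hivems.lakehouse.db-prod:9083'
    );
    USE CATALOG iceberg_catalog;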
  • a

    Alexei Pozdniakov

    09/02/2025, 10:44 PM
    Hello there! I've been studying Flink for a while, especially watermarks. I think I've read all the docs, but I still have some questions. Can you help me?
    1. I declare "WATERMARK FOR column AS column - 5 sec", where "column" is the column with time semantics and "column - 5 sec" is the watermark strategy. Why do we need to write the column name (after "FOR") separately? How will it be used and checked later on?
    2. Will it be checked if I write a streaming operation over a column without a watermark?
    3. Are there any theoretical limitations on the watermark strategy (after "AS")? I've only found out that the strategy's output type should be a timestamp. Can I use something more complex here, e.g. "column + interval", "column1 + column2", or "min(column, now())"? I know those expressions are meaningless, but nevertheless.
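    For context on question 1: the column after FOR is what Flink marks as the table's event-time attribute (the rowtime used by windows and interval joins), while the expression after AS only defines how the watermark advances; the expression may reference the table's columns but must evaluate to TIMESTAMP(3)/TIMESTAMP_LTZ(3). A minimal sketch of the actual DDL syntax:
    Copy code
    CREATE TABLE events (
        user_id BIGINT,
        event_time TIMESTAMP(3),
        -- bounded out-of-orderness: the watermark trails event_time by 5 seconds
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'datagen'
    );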
  • e

    Elad

    09/03/2025, 1:54 PM
    Hello! I'm writing a Flink process that writes different data to two Kafka sinks: events and CRUD operations. At runtime I'm getting a "found duplicated transactionalIdPrefix for multiple Kafka sinks: null" error, BUT I'm not using any transactions (enable.idempotence is false, there is no transaction config, and the delivery guarantee is NONE). It seems to happen because of the WriteableBackChannel that is used for communication between the committer and the writer, which relies on the transactionalIdPrefix even when it's not configured. The even bigger problem is that even when I do configure this property, the same error occurs, just without the null but with one of the transactionalIdPrefixes that I set for the sinks (they are different... 🥴) Can anyone help?
  • v

    Vadim Vararu

    09/04/2025, 8:56 AM
    Hi all. We've got a Flink job that uses state and works well for ~2-3 days. After that period, it suddenly starts failing constantly at checkpoints with no chance of recovering. We've found the exception thrown by the process that leads to the TM pod being killed by K8s:
    Copy code
    2025-09-04 08:20:53,380 INFO  org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer [] - Kryo serializer scala extensions are not available.
    Exception in thread "Thread-57" java.lang.IllegalArgumentException: classLoader cannot be null.
    	at com.esotericsoftware.kryo.Kryo.setClassLoader(Kryo.java:975)
    	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:553)
    	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:394)
    	at org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:156)
    	at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextElementLastAccessTimestamp(RocksDbTtlCompactFiltersManager.java:205)
    	at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextUnexpiredOffset(RocksDbTtlCompactFiltersManager.java:191)
    #
    # A fatal error has been detected by the Java Runtime Environment:
    #
    #  SIGSEGV (0xb) at pc=0x0000ffff9440bb90, pid=1, tid=391
    #
    # JRE version: OpenJDK Runtime Environment Temurin-11.0.26+4 (11.0.26+4) (build 11.0.26+4)
    # Java VM: OpenJDK 64-Bit Server VM Temurin-11.0.26+4 (11.0.26+4, mixed mode, sharing, tiered, compressed oops, serial gc, linux-aarch64)
    # Problematic frame:
    # V  [libjvm.so+0x5fbb90]  Exceptions::_throw_oop(Thread*, char const*, int, oopDesc*)+0x180
    #
    # Core dump will be written. Default location: /opt/flink/core.1
    #
    # An error report file with more information is saved as:
    # /opt/flink/hs_err_pid1.log
    #
    # If you would like to submit a bug report, please visit:
    #   <https://github.com/adoptium/adoptium-support/issues>
    #
    
    [error occurred during error reporting (), id 0x5, SIGTRAP (0x5) at pc=0x0000ffff958971ec]
    Looks like something that happens during RocksDB compaction... 😞 The Flink version is 1.18.1.
  • h

    Harish Sharma

    09/04/2025, 12:26 PM
    Hi Team, I am getting the following error with the SQL Server CDC connector:
    Copy code
    2025-09-04 14:51:26,611 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Flink CDC Event Source: sqlserver -> SchemaOperator -> PrePartition (1/2) (ee122e9200a52b9ee0b7a74d889052cb_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FAILED on localhost:55664-e801e4 @ localhost (dataPort=55667).
    java.lang.RuntimeException: One or more fetchers have encountered exception
    	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:333) ~[flink-connector-base-1.20.1.jar:1.20.1]
    	at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:228) ~[flink-connector-base-1.20.1.jar:1.20.1]
    	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:190) ~[flink-connector-base-1.20.1.jar:1.20.1]
    	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:443) ~[flink-dist-1.20.1.jar:1.20.1]
    	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.20.1.jar:1.20.1]
    	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.20.1.jar:1.20.1]
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638) ~[flink-dist-1.20.1.jar:1.20.1]
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.20.1.jar:1.20.1]
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973) ~[flink-dist-1.20.1.jar:1.20.1]
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917) ~[flink-dist-1.20.1.jar:1.20.1]
    	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-dist-1.20.1.jar:1.20.1]
    	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949) ~[flink-dist-1.20.1.jar:1.20.1]
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) ~[flink-dist-1.20.1.jar:1.20.1]
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist-1.20.1.jar:1.20.1]
    	at java.lang.Thread.run(Thread.java:1583) ~[?:?]
    Caused by: java.lang.RuntimeException: SplitFetcher thread 1 received unexpected exception while polling the records
    	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168) ~[flink-connector-base-1.20.1.jar:1.20.1]
    	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) ~[flink-connector-base-1.20.1.jar:1.20.1]
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
    	at java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
    	... 1 more
    Caused by: java.io.IOException: org.apache.flink.util.FlinkRuntimeException: Read split StreamSplit{splitId='stream-split', offset={commit_lsn=0000002a:00000d28:009c, change_lsn=0000002a:00000d28:009c}, endOffset={change_lsn=7f}, isSuspended=false, isSnapshotCompleted=true} error due to Currently unsupported by the SQL Server connector.
    	at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:101) ~[flink-cdc-base-3.4.0.jar:3.4.0]
    	at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-base-1.20.1.jar:1.20.1]
    	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[flink-connector-base-1.20.1.jar:1.20.1]
    	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) ~[flink-connector-base-1.20.1.jar:1.20.1]
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
    	at java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
    	... 1 more
    Caused by: org.apache.flink.util.FlinkRuntimeException: Read split StreamSplit{splitId='stream-split', offset={commit_lsn=0000002a:00000d28:009c, change_lsn=0000002a:00000d28:009c}, endOffset={change_lsn=7f}, isSuspended=false, isSnapshotCompleted=true} error due to Currently unsupported by the SQL Server connector.
    	at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.checkReadException(IncrementalSourceStreamFetcher.java:137) ~[flink-cdc-base-3.4.0.jar:3.4.0]
    	at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.pollSplitRecords(IncrementalSourceStreamFetcher.java:115) ~[flink-cdc-base-3.4.0.jar:3.4.0]
    	at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.pollSplitRecords(IncrementalSourceSplitReader.java:192) ~[flink-cdc-base-3.4.0.jar:3.4.0]
    	at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:98) ~[flink-cdc-base-3.4.0.jar:3.4.0]
    	at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-base-1.20.1.jar:1.20.1]
    	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[flink-connector-base-1.20.1.jar:1.20.1]
    	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) ~[flink-connector-base-1.20.1.jar:1.20.1]
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
    	at java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
    	... 1 more
    Caused by: java.lang.UnsupportedOperationException: Currently unsupported by the SQL Server connector
    	at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:113) ~[debezium-connector-sqlserver-1.9.8.Final.jar:1.9.8.Final]
    	at org.apache.flink.cdc.connectors.sqlserver.source.reader.fetch.SqlServerStreamFetchTask$StreamSplitReadTask.execute(SqlServerStreamFetchTask.java:168) ~[flink-connector-sqlserver-cdc-local-3.4.0.jar:3.4.0]
    	at org.apache.flink.cdc.connectors.sqlserver.source.reader.fetch.SqlServerStreamFetchTask.execute(SqlServerStreamFetchTask.java:77) ~[flink-connector-sqlserver-cdc-local-3.4.0.jar:3.4.0]
    	at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89) ~[flink-cdc-base-3.4.0.jar:3.4.0]
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
    	at java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
    	... 1 more
  • g

    George Leonard

    09/04/2025, 1:50 PM
    In version 1.20.2:
    Copy code
    Flink SQL> SELECT JSON_ARRAY(JSON('{"nested_json": {"value": 42}}'));
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature JSON(<CHARACTER>)
    
    Flink SQL>
  • j

    Jaya Ananthram

    09/05/2025, 7:06 AM
    Hello 👋 Question related to MSF: I want to process some data from MSK and write it back to MSK in at-least-once mode. However, I have not found a way to configure MSF to run in at-least-once mode, as the default is exactly-once and cannot be modified here. Does this mean there are no options at all for running MSF in at-least-once mode?
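    Not MSF-specific, but on the connector side the guarantee is a per-sink option in Flink SQL, so if the platform lets the job configure its own sinks it can be declared on the sink table itself; a minimal sketch with placeholder topic/broker values:
    Copy code
    CREATE TABLE kafka_sink (
        id BIGINT,
        payload STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'output-topic',                        -- placeholder
        'properties.bootstrap.servers' = 'broker:9092',  -- placeholder
        'format' = 'json',
        -- per-sink delivery guarantee: none / at-least-once / exactly-once
        'sink.delivery-guarantee' = 'at-least-once'
    );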
  • f

    Francis Altomare

    09/05/2025, 7:19 AM
    Hey everyone 👋 - I have a question about using a TumblingProcessingTimeWindow and a reduce function to ensure only a single event within my window is emitted at a time 🧵.
  • m

    Michał Slabik

    09/05/2025, 6:14 PM
    Hello, I'm using Flink with multiple Kafka sinks with checkpointing enabled. After bumping to version 1.20.2 (from 1.19.1), my application started to log warnings:
    Name collision: Group already contains a Metric with the name 'pendingCommittables'. Metric will not be reported.
    I found two related bug tickets: https://issues.apache.org/jira/browse/FLINK-35321 and https://issues.apache.org/jira/browse/FLINK-37559. The warnings started after this change was introduced: https://issues.apache.org/jira/browse/FLINK-36455, which was applied to versions 2.0.0, 1.19.2, and 1.20.1. As this setup seems rather common, I'm wondering if maybe this is related to some configuration (whether sink or checkpointing) on my side. Could you help find a way to mitigate those warnings (or maybe they can be safely ignored?) @Arvid Heise, allow me to tag you as the author of the mentioned changes - maybe you'll have some insights on this.
  • c

    Chennupati Gopal

    09/07/2025, 2:55 AM
    Hi all, I'm running a Flink cluster in application mode on an AWS EKS cluster and want to expose JMX metrics on the job and task managers. Can someone please help me with the MBean configuration? I don't want to use the native Flink metrics exporter.
  • m

    Mohammed Salman Ali Pary

    09/08/2025, 8:34 AM
    https://stackoverflow.com/questions/79758622/how-do-i-reset-offset-for-a-topic-in-flink-sql-and-kafka Does someone know the answer?
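    For anyone searching later: with the Kafka SQL connector the start position is driven by 'scan.startup.mode', and it can be overridden per query with a dynamic table options hint; a minimal sketch, assuming a source table named kafka_source (the timestamp is a placeholder):
    Copy code
    -- re-read the topic from the beginning for this query only
    SELECT * FROM kafka_source /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */;

    -- or start from a specific point in time
    SELECT * FROM kafka_source
      /*+ OPTIONS('scan.startup.mode' = 'timestamp',
                  'scan.startup.timestamp-millis' = '1694304000000') */;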
  • j

    Jo Wijnant

    09/09/2025, 7:50 AM
    I'm testing the new feature for faster checkpoint recovery using s5cmd on Flink 2.0 but I get an error when my jobs are initializing and recovering their data:
    s3 url should start with "s3://"
    Here are the relevant configurations (I left out some settings):
    Copy code
    s3:
      s5cmd:
        path: /opt/flink/bin/s5cmd
    execution:
      checkpointing:
        dir: s3p://flink-p/checkpoints
    I use the s3p filesystem for checkpointing and s3a for sinking Parquet files to S3, but it seems pretty clear that s5cmd needs a proper s3:// URL.
  • g

    George Leonard

    09/09/2025, 2:23 PM
    ... can anyone see anything wrong with this? It's failing on:
    Copy code
    CAST(JSON_QUERY(data, '$.account') AS ARRAY<STRING>)
    I'm trying to get the array of accounts into a JSON object. I'm more than happy to pre-create the table as a complex table if need be and then do an INSERT INTO ... SELECT; I actually prefer that pattern. I'm just running out of ideas on how to get this done, and my eyes are seeing double from reading the docs. -- postgres_catalog.inbound.adultsx -> Target table
    Copy code
    create table c_paimon.outbound.adultsx WITH (
         'file.format'                       = 'parquet'
        ,'compaction.min.file-num'           = '2'
        ,'compaction.early-max.file-num'     = '50'
        ,'snapshot.time-retained'            = '1h'
        ,'snapshot.num-retained.min'         = '5'
        ,'snapshot.num-retained.max'         = '20'
        ,'table.exec.sink.upsert-materialize'= 'NONE'
    ) AS 
    select
         JSON_VALUE(data, '$.nationalid')       AS nationalid
        ,JSON_VALUE(data, '$._id')              AS _id
        ,JSON_VALUE(data, '$.name')             AS name
        ,JSON_VALUE(data, '$.surname')          AS surname
        ,JSON_VALUE(data, '$.gender')           AS gender
        ,JSON_VALUE(data, '$.dob')              AS dob
        ,JSON_VALUE(data, '$.marital_status')   AS marital_status
        ,JSON_VALUE(data, '$.status')           AS status
        ,JSON_QUERY(data, '$.address')          AS address
        ,CAST(JSON_QUERY(data, '$.account')     AS ARRAY<STRING>) AS account
        ,created_at                             AS created_at
    FROM postgres_catalog.inbound.adults;
    I'm consuming the data (see attached) from Postgres via CDC into the table below. -- postgres_catalog.inbound.adults -> CDC Source
    Copy code
    CREATE OR REPLACE TABLE postgres_catalog.inbound.adults (
         id                 BIGINT
        ,nationalid         VARCHAR(14) --NOT NULL
        ,data               STRING     -- The json as per attached sit in here...
        ,created_at         TIMESTAMP_LTZ(3)
        ,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
        ,PRIMARY KEY (nationalid) NOT ENFORCED
    ) WITH (
          'connector'                           = 'postgres-cdc'
         ,'hostname'                            = 'postgrescdc'
         ,'port'                                = '5432'               -- NOTE: this is the port of the db on the container, not the external docker exported port via a port mapping.
         ,'username'                            = 'dbadmin'
         ,'password'                            = 'dbpassword'
         ,'database-name'                       = 'demog'
         ,'schema-name'                         = 'public'
         ,'table-name'                          = 'adults'
         ,'slot.name'                           = 'adults0'
         ,'scan.incremental.snapshot.enabled'   = 'true'               -- experimental feature: incremental snapshot (default off)
         ,'scan.startup.mode'                   = 'initial'            -- <https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position>
         ,'decoding.plugin.name'                = 'pgoutput'
    );
    adults.json
  • g

    George Leonard

    09/10/2025, 4:46 AM
    ... wondering... do we have something funny at the data layer? The command looks right, and even the error does not hint at a bad command.
    Copy code
    Flink SQL> create table c_paimon.outbound.adultsxx WITH (
    >      'file.format'                       = 'parquet'
    >     ,'compaction.min.file-num'           = '2'
    >     ,'compaction.early-max.file-num'     = '50'
    >     ,'snapshot.time-retained'            = '1h'
    >     ,'snapshot.num-retained.min'         = '5'
    >     ,'snapshot.num-retained.max'         = '20'
    >     ,'table.exec.sink.upsert-materialize'= 'NONE'
    > ) AS 
    > select
    >      JSON_VALUE(data, '$.nationalid')       AS nationalid
    >     ,JSON_VALUE(data, '$._id')              AS _id
    >     ,JSON_VALUE(data, '$.name')             AS name
    >     ,JSON_VALUE(data, '$.surname')          AS surname
    >     ,JSON_VALUE(data, '$.gender')           AS gender
    >     ,JSON_VALUE(data, '$.dob')              AS dob
    >     ,JSON_VALUE(data, '$.marital_status')   AS marital_status
    >     ,JSON_VALUE(data, '$.status')           AS status
    >     ,CAST(JSON_QUERY(data, '$.account') AS ARRAY<STRING>) as account
    >     ,JSON_QUERY(data, '$.address')          AS address
    >     ,created_at                             AS created_at
    > FROM postgres_catalog.inbound.adults;
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.calcite.sql.validate.SqlValidatorException: Cast function cannot convert value of type VARCHAR(2147483647) to type VARCHAR(2147483647) ARRAY
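    A possible direction: CAST cannot turn a JSON string into ARRAY<STRING>, but newer Flink versions add a RETURNING clause to JSON_QUERY that produces the array directly; a sketch under the assumption that the running version supports it (check the JSON_QUERY entry in the docs for your release):
    Copy code
    select
         JSON_VALUE(data, '$.nationalid')                        AS nationalid
        ,JSON_QUERY(data, '$.account' RETURNING ARRAY<STRING>)   AS account
    FROM postgres_catalog.inbound.adults;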
  • g

    George Leonard

    09/10/2025, 4:51 AM
    Example payload: piped into paimon via another route.
  • j

    Jo Wijnant

    09/10/2025, 2:16 PM
    About the variable checkpoint interval introduced in Flink 1.19 (https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog) How can I verify that this is effectively working? I restarted my job from scratch, so it is reading from a large Kafka topic where the oldest records have timestamps from months ago. The job may take a few hours to catch up with the latest kafka offsets. I configured checkpointing in the SQL job:
    Copy code
    SET 'execution.checkpointing.interval' = '3m';
    SET 'execution.checkpointing.interval-during-backlog' = '10m';
    However, all checkpoints are taken every 3m. Is this the expected behavior? I'd have thought it would checkpoint every 10m during the initial catch-up phase... I'm on Flink 2.0.
  • p

    Pavan

    09/11/2025, 7:47 AM
    Hey everyone 👋 - I'm running a Flink cluster in standalone mode on my local system to stream CDC changes from MySQL using flink-sql-connector-mysql-cdc. Everything works fine until I push an insert statement to a table that is captured by Flink; then the following error pops up, and after continuous retries of the command the error disappears. I'm new to Flink and still learning how the CDC connectors work. Thanks in advance.
    Copy code
    java.lang.RuntimeException: One or more fetchers have encountered exception
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:333)
        at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:228)
        at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:190)
        at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422)
        at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
        at java.base/java.lang.Thread.run(Thread.java:842)
    Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        ... 1 more
    Caused by: java.lang.NullPointerException: Cannot read field "value" because "s2" is null
        at java.base/java.lang.String$CaseInsensitiveComparator.compare(String.java:2055)
        at java.base/java.lang.String$CaseInsensitiveComparator.compare(String.java:2047)
        at java.base/java.lang.String.compareToIgnoreCase(String.java:2091)
        at org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset.compareTo(BinlogOffset.java:241)
        at org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset.isAfter(BinlogOffset.java:272)
        at org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.hasEnterPureBinlogPhase(BinlogSplitReader.java:295)
        at org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.shouldEmit(BinlogSplitReader.java:254)
        at org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.pollSplitRecords(BinlogSplitReader.java:176)
        at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.pollSplitRecords(MySqlSplitReader.java:150)
        at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:84)
        at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
        ... 6 more
  • p

    Pavan

    09/11/2025, 7:48 AM
    If anyone has come across a similar issue, please point me to any resources you believe will help.
  • j

    Jashwanth S J

    09/11/2025, 10:45 AM
    I've installed the plugin on the JM and TM through a common pod template init container, but I'm still seeing this error when bringing up a FlinkSessionJob with a jar from S3. Do we need to install the plugin in the Flink Kubernetes operator as well? If yes, how do we do that through the Helm values file provided by the community to install the operator?
    Copy code
    Events:
      Type     Reason               Age                    From  Message
      ----     ------               ----                   ----  -------
      Warning  SessionJobException  3m32s (x117 over 77m)  Job   Could not find a file system implementation for scheme 's3'. The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-hadoop, flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See <https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/> for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see <https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/>.
  • a

    Antonio Davide Cali

    09/11/2025, 1:20 PM
    Hello everyone, out of curiosity: I am trying to do pattern matching in Flink where I want to match as many events of a specific type A as possible (A+), but only when event B doesn't appear at all in a 30-minute window since the last A. My pattern (A+ B?) complains that an optional pattern cannot follow a greedy one. Any idea?
  • b

    B S Mohammed Ashfaq

    09/13/2025, 1:37 PM
    Hi Everyone, Could you please let me know where I can find the list of Flink SQL-supported sources and sinks, along with their respective Flink version compatibility? Thanks!