# troubleshooting
  • g

    George Leonard

    09/01/2025, 12:24 PM
    hi all.. anyone willing to help? I have a jsonb (Postgres) column that's being converted to a string. The string contains root-level fields: one of them is a doc (no issues), another is a field/doc that's an array of docs. I need it all flipped into a Flink table complex structure. I've tried a couple of things and it's not coming right.
  • j

    Jashwanth S J

    09/02/2025, 4:27 AM
    Hi All, we're seeing one weird issue while bringing up a FlinkSessionJob through the operator. We're currently using an AWS signed URL for jar access through S3, which works fine, but when we replace it with just the S3 endpoint without public access, it fails. Can someone help here?
  • g

    George Leonard

    09/02/2025, 9:10 AM
    Hi all. Anyone willing to step in and help me please. https://apache-flink.slack.com/archives/C03JKTFFX0S/p1756706228476759
  • m

    Mohammed Salman Ali Pary

    09/02/2025, 10:24 AM
    Is there a way to add an operator ID in Flink SQL?
  • l

    L P V

    09/02/2025, 3:50 PM
    Hi all, I'm facing a problem when creating a catalog from the Flink SQL client -> the catalog only exists in a local context, even when I'm using HiveMetastore (edited). So every time I create a new Flink SQL session, I have to re-create the catalog. This is my sample catalog in the Flink SQL client:
    Copy code
    CREATE CATALOG iceberg_catalog WITH (
       'type' = 'iceberg',
       'catalog-type' = 'hive',
       'uri' = 'thrift://hivems.lakehouse.db-prod:9083'
    )
    Is there any way I could use only one catalog that is persisted with HMS?
  • a

    Alexei Pozdniakov

    09/02/2025, 10:44 PM
    Hello there! I've been studying Flink for a while, especially watermarks. I think I've read all the docs, but I still have some questions. Can you help me?
    1. I declare "WATERMARK FOR column AS column - 5 sec", where "column" stands for the column name with time semantics, and "column - 5 sec" stands for the watermark strategy. Why do we need to write the column name (after "FOR") separately? How will it be used and checked later on?
    2. Will it be checked if I write a streaming operation over a column without a watermark?
    3. Are there any theoretical limitations on the watermark strategy (after "AS")? I've just found out that the strategy's output type should be a timestamp. Can I use something more complex here, e.g. "column + interval", "column1 + column2", or "min(column, now())"? I know those expressions are meaningless, but nevertheless.
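    Not an authoritative answer, but a minimal runnable sketch of the declaration being asked about (the table name and the datagen connector are made up): the name after FOR must be an existing TIMESTAMP(3)/TIMESTAMP_LTZ(3) column of the table and becomes its event-time attribute, while the expression after AS may reference other columns but must itself evaluate to that timestamp type, which is checked when the statement is validated.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class WatermarkDeclarationSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // "event_time" is the time-semantics column named after FOR; the strategy
            // after AS must produce a timestamp, so subtracting an INTERVAL is valid,
            // while expressions that yield another type are rejected by the validator.
            tEnv.executeSql(
                "CREATE TABLE events (" +
                "  id STRING," +
                "  event_time TIMESTAMP(3)," +
                "  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND" +
                ") WITH ('connector' = 'datagen')");
            tEnv.executeSql("DESCRIBE events").print();
        }
    }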
  • e

    Elad

    09/03/2025, 1:54 PM
    Hello! I'm writing a Flink process that writes different data to two Kafka sinks - events and CRUD operations. On running I'm getting a "found duplicated transactionalIdPrefix for multiple Kafka sinks: null" error, BUT I'm not using any transactions (enable idempotence is false, there is no transaction config and the delivery guarantee is NONE). It seems to happen because of the WriteableBackChannel that is used for communication between the committer and the writer, which relies on the transactionalIdPrefix even if it's not configured. The even bigger problem is that even when I try to configure this property, the same error occurs, just without the null but with one of the transactionalIdPrefix values that I set for the sinks (they are different... 🥴) Can anyone help?
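    For comparison with what was tried above: a hedged sketch (broker address, topics and the surrounding job are placeholders) of how two sinks are usually given distinct transactionalIdPrefix values. It only illustrates the configuration and is not a claim that it resolves the behaviour reported here.

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.base.DeliveryGuarantee;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;

    public class TwoKafkaSinksSketch {

        // Builds one sink with its own transactionalIdPrefix. Per the report above,
        // the duplicate-prefix check seems to run even with DeliveryGuarantee.NONE,
        // so each sink in the job gets a different prefix (e.g. "events-sink" and
        // "crud-sink").
        static KafkaSink<String> sinkFor(String topic, String prefix) {
            return KafkaSink.<String>builder()
                    .setBootstrapServers("broker:9092")           // placeholder
                    .setDeliveryGuarantee(DeliveryGuarantee.NONE)
                    .setTransactionalIdPrefix(prefix)
                    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                            .setTopic(topic)
                            .setValueSerializationSchema(new SimpleStringSchema())
                            .build())
                    .build();
        }
    }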
  • v

    Vadim Vararu

    09/04/2025, 8:56 AM
    Hi all. We've got a Flink job that uses state and works well for ~2-3 days. After that period it suddenly starts to fail constantly at checkpointing, with no chance to recover. We've found the exception thrown by the process, which leads to the TM pod being killed by K8s:
    Copy code
    2025-09-04 08:20:53,380 INFO  org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer [] - Kryo serializer scala extensions are not available.
    Exception in thread "Thread-57" java.lang.IllegalArgumentException: classLoader cannot be null.
    	at com.esotericsoftware.kryo.Kryo.setClassLoader(Kryo.java:975)
    	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:553)
    	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:394)
    	at org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:156)
    	at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextElementLastAccessTimestamp(RocksDbTtlCompactFiltersManager.java:205)
    	at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextUnexpiredOffset(RocksDbTtlCompactFiltersManager.java:191)
    #
    # A fatal error has been detected by the Java Runtime Environment:
    #
    #  SIGSEGV (0xb) at pc=0x0000ffff9440bb90, pid=1, tid=391
    #
    # JRE version: OpenJDK Runtime Environment Temurin-11.0.26+4 (11.0.26+4) (build 11.0.26+4)
    # Java VM: OpenJDK 64-Bit Server VM Temurin-11.0.26+4 (11.0.26+4, mixed mode, sharing, tiered, compressed oops, serial gc, linux-aarch64)
    # Problematic frame:
    # V  [libjvm.so+0x5fbb90]  Exceptions::_throw_oop(Thread*, char const*, int, oopDesc*)+0x180
    #
    # Core dump will be written. Default location: /opt/flink/core.1
    #
    # An error report file with more information is saved as:
    # /opt/flink/hs_err_pid1.log
    #
    # If you would like to submit a bug report, please visit:
    #   https://github.com/adoptium/adoptium-support/issues
    #
    
    [error occurred during error reporting (), id 0x5, SIGTRAP (0x5) at pc=0x0000ffff958971ec]
    Looks like something that happens during RocksDB compaction... 😞 The Flink version is 1.18.1.
  • h

    Harish Sharma

    09/04/2025, 12:26 PM
    Hi Team, I am getting the following error with the SQL Server CDC connector:
    Copy code
    2025-09-04 14:51:26,611 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Flink CDC Event Source: sqlserver -> SchemaOperator -> PrePartition (1/2) (ee122e9200a52b9ee0b7a74d889052cb_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FAILED on localhost:55664-e801e4 @ localhost (dataPort=55667).
    java.lang.RuntimeException: One or more fetchers have encountered exception
    	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:333) ~[flink-connector-base-1.20.1.jar:1.20.1]
    	at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:228) ~[flink-connector-base-1.20.1.jar:1.20.1]
    	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:190) ~[flink-connector-base-1.20.1.jar:1.20.1]
    	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:443) ~[flink-dist-1.20.1.jar:1.20.1]
    	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.20.1.jar:1.20.1]
    	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.20.1.jar:1.20.1]
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638) ~[flink-dist-1.20.1.jar:1.20.1]
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.20.1.jar:1.20.1]
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973) ~[flink-dist-1.20.1.jar:1.20.1]
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917) ~[flink-dist-1.20.1.jar:1.20.1]
    	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-dist-1.20.1.jar:1.20.1]
    	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949) ~[flink-dist-1.20.1.jar:1.20.1]
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) ~[flink-dist-1.20.1.jar:1.20.1]
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist-1.20.1.jar:1.20.1]
    	at java.lang.Thread.run(Thread.java:1583) ~[?:?]
    Caused by: java.lang.RuntimeException: SplitFetcher thread 1 received unexpected exception while polling the records
    	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168) ~[flink-connector-base-1.20.1.jar:1.20.1]
    	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) ~[flink-connector-base-1.20.1.jar:1.20.1]
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
    	at java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
    	... 1 more
    Caused by: java.io.IOException: org.apache.flink.util.FlinkRuntimeException: Read split StreamSplit{splitId='stream-split', offset={commit_lsn=0000002a:00000d28:009c, change_lsn=0000002a:00000d28:009c}, endOffset={change_lsn=7f}, isSuspended=false, isSnapshotCompleted=true} error due to Currently unsupported by the SQL Server connector.
    	at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:101) ~[flink-cdc-base-3.4.0.jar:3.4.0]
    	at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-base-1.20.1.jar:1.20.1]
    	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[flink-connector-base-1.20.1.jar:1.20.1]
    	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) ~[flink-connector-base-1.20.1.jar:1.20.1]
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
    	at java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
    	... 1 more
    Caused by: org.apache.flink.util.FlinkRuntimeException: Read split StreamSplit{splitId='stream-split', offset={commit_lsn=0000002a:00000d28:009c, change_lsn=0000002a:00000d28:009c}, endOffset={change_lsn=7f}, isSuspended=false, isSnapshotCompleted=true} error due to Currently unsupported by the SQL Server connector.
    	at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.checkReadException(IncrementalSourceStreamFetcher.java:137) ~[flink-cdc-base-3.4.0.jar:3.4.0]
    	at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.pollSplitRecords(IncrementalSourceStreamFetcher.java:115) ~[flink-cdc-base-3.4.0.jar:3.4.0]
    	at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.pollSplitRecords(IncrementalSourceSplitReader.java:192) ~[flink-cdc-base-3.4.0.jar:3.4.0]
    	at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:98) ~[flink-cdc-base-3.4.0.jar:3.4.0]
    	at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-base-1.20.1.jar:1.20.1]
    	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[flink-connector-base-1.20.1.jar:1.20.1]
    	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) ~[flink-connector-base-1.20.1.jar:1.20.1]
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
    	at java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
    	... 1 more
    Caused by: java.lang.UnsupportedOperationException: Currently unsupported by the SQL Server connector
    	at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:113) ~[debezium-connector-sqlserver-1.9.8.Final.jar:1.9.8.Final]
    	at org.apache.flink.cdc.connectors.sqlserver.source.reader.fetch.SqlServerStreamFetchTask$StreamSplitReadTask.execute(SqlServerStreamFetchTask.java:168) ~[flink-connector-sqlserver-cdc-local-3.4.0.jar:3.4.0]
    	at org.apache.flink.cdc.connectors.sqlserver.source.reader.fetch.SqlServerStreamFetchTask.execute(SqlServerStreamFetchTask.java:77) ~[flink-connector-sqlserver-cdc-local-3.4.0.jar:3.4.0]
    	at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89) ~[flink-cdc-base-3.4.0.jar:3.4.0]
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
    	at java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
    	... 1 more
  • g

    George Leonard

    09/04/2025, 1:50 PM
    In version 1.20.2:
    Copy code
    Flink SQL> SELECT JSON_ARRAY(JSON('{"nested_json": {"value": 42}}'));
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature JSON(<CHARACTER>)
    
    Flink SQL>
  • j

    Jaya Ananthram

    09/05/2025, 7:06 AM
    Hello 👋 Question related to MSF: I want to process some data from MSK and write it back to MSK in at-least-once mode. However, I have not found a way to configure MSF to run in at-least-once mode, as the default is exactly-once and cannot be modified there. Does this mean there are no options at all for running MSF in at-least-once mode?
  • f

    Francis Altomare

    09/05/2025, 7:19 AM
    Hey everyone 👋 — I have a question about using a TumblingProcessingTimeWindow and a reduce function to ensure only a single event within my window is emitted at a time 🧵.
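    A minimal sketch of that combination, assuming a Flink version whose window assigners accept a java.time.Duration (older releases use the Time class instead); the sample words, key selector and "keep the longest" reduce logic are placeholders.

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;

    import java.time.Duration;

    public class OneEventPerWindowSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromElements("apple", "avocado", "banana", "blueberry")
               .keyBy(word -> word.substring(0, 1))
               .window(TumblingProcessingTimeWindows.of(Duration.ofMinutes(5)))
               // The ReduceFunction folds every element of a key inside one 5-minute
               // processing-time window into a single value (here: the longest word),
               // so at most one record per key is emitted when the window fires.
               // (With this short bounded source the job may finish before the window
               // fires; the real job would have a continuous source.)
               .reduce((a, b) -> a.length() >= b.length() ? a : b)
               .print();
            env.execute("one event per window sketch");
        }
    }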
  • m

    Michał Slabik

    09/05/2025, 6:14 PM
    Hello, I'm using Flink with multiple Kafka sinks with checkpointing enabled. After bumping to version 1.20.2 (from 1.19.1), my application started to log warnings:
    Name collision: Group already contains a Metric with the name 'pendingCommittables'. Metric will not be reported.
    I found two related bug tickets: https://issues.apache.org/jira/browse/FLINK-35321, https://issues.apache.org/jira/browse/FLINK-37559. The warnings started after this change: https://issues.apache.org/jira/browse/FLINK-36455, which was applied to versions 2.0.0, 1.19.2 and 1.20.1. As this setup seems rather common, I'm wondering if maybe this is related to some configuration (whether sink or checkpointing) on my side. Could you help me find a way to mitigate these warnings (or can they be safely ignored)? @Arvid Heise, allow me to tag you as the author of the mentioned changes - maybe you'll have some insights on that.
  • c

    Chennupati Gopal

    09/07/2025, 2:55 AM
    Hi all, I'm running a Flink cluster in application mode on an AWS EKS cluster and want to expose JMX metrics on the job and task managers. Can someone please help me with the MBean configuration? I don't want the native Flink metrics exporter.
  • m

    Mohammed Salman Ali Pary

    09/08/2025, 8:34 AM
    https://stackoverflow.com/questions/79758622/how-do-i-reset-offset-for-a-topic-in-flink-sql-and-kafka Does someone know the answer?
  • j

    Jo Wijnant

    09/09/2025, 7:50 AM
    I'm testing the new feature for faster checkpoint recovery using s5cmd on Flink 2.0 but I get an error when my jobs are initializing and recovering their data:
    s3 url should start with "s3://"
    Here are the relevant configurations (I left out some):
    Copy code
    s3:
      s5cmd:
        path: /opt/flink/bin/s5cmd
    execution:
      checkpointing:
        dir: s3p://flink-p/checkpoints
    I use the s3p filesystem for checkpointing and s3a for sinking to Parquet files on S3, but it seems pretty clear that s5cmd needs a proper s3:// URL.
  • g

    George Leonard

    09/09/2025, 2:23 PM
    ... can anyone see anything wrong with this...
    Copy code
    it's failing on 'CAST(JSON_QUERY(data, '$.account') AS ARRAY<STRING>)
    I'm trying to get the array of accounts into a JSON object. More than happy to pre-create the table as a complex table if need be and then do an INSERT INTO ... SELECT; I actually prefer that pattern. I'm just running out of ideas on how to get this done, and my eyes are seeing double from reading docs. -- postgres_catalog.inbound.adultsx -> Target table
    Copy code
    create table c_paimon.outbound.adultsx WITH (
         'file.format'                       = 'parquet'
        ,'compaction.min.file-num'           = '2'
        ,'compaction.early-max.file-num'     = '50'
        ,'snapshot.time-retained'            = '1h'
        ,'snapshot.num-retained.min'         = '5'
        ,'snapshot.num-retained.max'         = '20'
        ,'table.exec.sink.upsert-materialize'= 'NONE'
    ) AS 
    select
         JSON_VALUE(data, '$.nationalid')       AS nationalid
        ,JSON_VALUE(data, '$._id')              AS _id
        ,JSON_VALUE(data, '$.name')             AS name
        ,JSON_VALUE(data, '$.surname')          AS surname
        ,JSON_VALUE(data, '$.gender')           AS gender
        ,JSON_VALUE(data, '$.dob')              AS dob
        ,JSON_VALUE(data, '$.marital_status')   AS marital_status
        ,JSON_VALUE(data, '$.status')           AS status
        ,JSON_QUERY(data, '$.address')          AS address
        ,CAST(JSON_QUERY(data, '$.account')     AS ARRAY<STRING>) AS account
        ,created_at                             AS created_at
    FROM postgres_catalog.inbound.adults;
    I'm consuming the data (see attached) from Postgres using CDC into the below table. -- postgres_catalog.inbound.adults -> CDC Source
    Copy code
    CREATE OR REPLACE TABLE postgres_catalog.inbound.adults (
         id                 BIGINT
        ,nationalid         VARCHAR(14) --NOT NULL
        ,data               STRING     -- The json as per attached sit in here...
        ,created_at         TIMESTAMP_LTZ(3)
        ,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
        ,PRIMARY KEY (nationalid) NOT ENFORCED
    ) WITH (
          'connector'                           = 'postgres-cdc'
         ,'hostname'                            = 'postgrescdc'
         ,'port'                                = '5432'               -- NOTE: this is the port of the db on the container, not the external docker exported port via a port mapping.
         ,'username'                            = 'dbadmin'
         ,'password'                            = 'dbpassword'
         ,'database-name'                       = 'demog'
         ,'schema-name'                         = 'public'
         ,'table-name'                          = 'adults'
         ,'slot.name'                           = 'adults0'
         ,'scan.incremental.snapshot.enabled'   = 'true'               -- experimental feature: incremental snapshot (default off)
         ,'scan.startup.mode'                   = 'initial'            -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position
         ,'decoding.plugin.name'                = 'pgoutput'
    );
    adults.json
  • g

    George Leonard

    09/10/2025, 4:46 AM
    ... wondering... do we have something funny at the data layer? The command looks right, and even the error doesn't hint at a bad command.
    Copy code
    Flink SQL> create table c_paimon.outbound.adultsxx WITH (
    >      'file.format'                       = 'parquet'
    >     ,'compaction.min.file-num'           = '2'
    >     ,'compaction.early-max.file-num'     = '50'
    >     ,'snapshot.time-retained'            = '1h'
    >     ,'snapshot.num-retained.min'         = '5'
    >     ,'snapshot.num-retained.max'         = '20'
    >     ,'table.exec.sink.upsert-materialize'= 'NONE'
    > ) AS 
    > select
    >      JSON_VALUE(data, '$.nationalid')       AS nationalid
    >     ,JSON_VALUE(data, '$._id')              AS _id
    >     ,JSON_VALUE(data, '$.name')             AS name
    >     ,JSON_VALUE(data, '$.surname')          AS surname
    >     ,JSON_VALUE(data, '$.gender')           AS gender
    >     ,JSON_VALUE(data, '$.dob')              AS dob
    >     ,JSON_VALUE(data, '$.marital_status')   AS marital_status
    >     ,JSON_VALUE(data, '$.status')           AS status
    >     ,CAST(JSON_QUERY(data, '$.account') AS ARRAY<STRING>) as account
    >     ,JSON_QUERY(data, '$.address')          AS address
    >     ,created_at                             AS created_at
    > FROM postgres_catalog.inbound.adults;
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.calcite.sql.validate.SqlValidatorException: Cast function cannot convert value of type VARCHAR(2147483647) to type VARCHAR(2147483647) ARRAY
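    Possibly relevant, though it depends on the Flink version in use: newer releases (1.20+, per the JSON function docs; worth verifying) let JSON_QUERY return the array type directly through a RETURNING clause, which sidesteps the rejected VARCHAR-to-ARRAY cast. A hedged, self-contained sketch (the temporary view stands in for postgres_catalog.inbound.adults):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class AccountArraySketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // Stand-in source so the statement runs on its own; in the thread the
            // "data" column comes from the postgres-cdc table instead.
            tEnv.executeSql(
                "CREATE TEMPORARY VIEW adults AS "
                    + "SELECT data FROM (VALUES ('{\"account\": [{\"id\": 1}, {\"id\": 2}]}')) AS t(data)");
            // RETURNING ARRAY<STRING> yields each array element as a STRING value,
            // instead of one VARCHAR that then has to be CAST to an array.
            tEnv.executeSql(
                "SELECT JSON_QUERY(data, '$.account' RETURNING ARRAY<STRING>) AS account "
                    + "FROM adults").print();
        }
    }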
  • g

    George Leonard

    09/10/2025, 4:51 AM
    Example payload: piped into paimon via another route.
  • j

    Jo Wijnant

    09/10/2025, 2:16 PM
    About the variable checkpoint interval introduced in Flink 1.19 (https://cwiki.apache.org/confluence/display/FLINK/FLIP-309%3A+Support+using+larger+checkpointing+interval+when+source+is+processing+backlog) How can I verify that this is effectively working? I restarted my job from scratch, so it is reading from a large Kafka topic where the oldest records have timestamps from months ago. The job may take a few hours to catch up with the latest Kafka offsets. I configured checkpointing in the SQL job:
    Copy code
    SET 'execution.checkpointing.interval' = '3m';
    SET 'execution.checkpointing.interval-during-backlog' = '10m';
    However, all checkpoints are taken every 3m. Is this the expected behavior? I'd have thought it would checkpoint every 10m during the first phase... I'm on Flink 2.0.
  • p

    Pavan

    09/11/2025, 7:47 AM
    Hey everyone 👋 - I'm running a Flink cluster in standalone mode on my local system to stream CDC changes from MySQL using flink-sql-connector-mysql-cdc. Everything works fine until I push an INSERT statement on a table that is captured by Flink; the following error pops up, and after continuous retries of the command the error disappears. I'm new to Flink and still learning how CDC connectors work. Thanks in advance.
    Copy code
    java.lang.RuntimeException: One or more fetchers have encountered exception
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:333)
        at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:228)
        at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:190)
        at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422)
        at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
        at java.base/java.lang.Thread.run(Thread.java:842)
    Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        ... 1 more
    Caused by: java.lang.NullPointerException: Cannot read field "value" because "s2" is null
        at java.base/java.lang.String$CaseInsensitiveComparator.compare(String.java:2055)
        at java.base/java.lang.String$CaseInsensitiveComparator.compare(String.java:2047)
        at java.base/java.lang.String.compareToIgnoreCase(String.java:2091)
        at org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset.compareTo(BinlogOffset.java:241)
        at org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset.isAfter(BinlogOffset.java:272)
        at org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.hasEnterPureBinlogPhase(BinlogSplitReader.java:295)
        at org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.shouldEmit(BinlogSplitReader.java:254)
        at org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.pollSplitRecords(BinlogSplitReader.java:176)
        at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.pollSplitRecords(MySqlSplitReader.java:150)
        at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:84)
        at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
        at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
        ... 6 more
  • p

    Pavan

    09/11/2025, 7:48 AM
    If anyone has come across a similar issue, please point me to any resources you believe will help.
  • j

    Jashwanth S J

    09/11/2025, 10:45 AM
    I've installed the plugin on the JM and TM through a common podTemplate init container, but I'm still seeing this error when bringing up a FlinkSessionJob with a jar from S3. Do we need to install the plugin in the Flink Kubernetes operator as well? If yes, how do we do that through the values file for the community Helm chart used to install the operator?
    Copy code
    Events:
      Type     Reason               Age                    From  Message
      ----     ------               ----                   ----  -------
      Warning  SessionJobException  3m32s (x117 over 77m)  Job   Could not find a file system implementation for scheme 's3'. The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-hadoop, flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/ for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
  • a

    Antonio Davide Cali

    09/11/2025, 1:20 PM
    Hello everyone, out of curiosity: I am trying to do pattern matching in Flink where I want to match as many events of a specific type A as possible (A+), but I want to find a pattern where event B doesn't appear at all in a 30-minute window since the last A. My pattern (A+ B?) complains, saying optional cannot be after greedy. Any idea?
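    A hedged sketch of one possible workaround (the Event POJO and its type field are placeholders): rather than an optional B after a greedy A+, match A's that are notFollowedBy a B. Flink CEP accepts notFollowedBy() as the last part of a pattern only when a within() window is set, which recent releases support; this is worth verifying for the version in use, and Flink 2.0 takes a java.time.Duration instead of Time.

    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class NoBWithin30MinutesSketch {

        // Minimal placeholder event type for the sketch.
        public static class Event {
            public String type;
            public String getType() { return type; }
        }

        public static Pattern<Event, ?> pattern() {
            return Pattern.<Event>begin("as")
                    .where(new SimpleCondition<Event>() {
                        @Override
                        public boolean filter(Event e) {
                            return "A".equals(e.getType());
                        }
                    })
                    .oneOrMore().greedy()
                    // no B at all within 30 minutes of the matched A's
                    .notFollowedBy("b")
                    .where(new SimpleCondition<Event>() {
                        @Override
                        public boolean filter(Event e) {
                            return "B".equals(e.getType());
                        }
                    })
                    .within(Time.minutes(30));
        }
    }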
  • b

    B S Mohammed Ashfaq

    09/13/2025, 1:37 PM
    Hi Everyone, could you please let me know where I can find the list of Flink SQL-supported sources and sinks, along with the respective Flink version compatibility? Thanks!
  • m

    Madhusudhan Reddy

    09/14/2025, 1:23 PM
    Hi Everyone, I have an application that:
    1. Streams data from Kafka
    2. Inserts the received data into Flink Table API tables
    3. Performs a join on the tables and emits an event
    4. Uses StreamExecutionEnvironment and StreamTableEnvironment to execute
    Problem & troubleshooting done: we are trying to write an integration test (BDD) where:
    1. MiniClusterWithClientResource is used to start the Flink service
    Copy code
    miniCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
            .setConfiguration(configuration)
            .setNumberSlotsPerTaskManager(2)
            .setNumberTaskManagers(1)
            .build());
    // Set classloader to parent-first
    configuration.setString("classloader.resolve-order", "parent-first");
    PackagedProgram program = PackagedProgram.newBuilder()
            .setArguments(args)
            .setJarFile(new File(JAR_PATH + "APP-TO-Test.jar"))
            .build();
    // Create Job Graph
    JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, new Configuration(), 1, true);
    assertNotNull(jobGraph, "JobGraph should be created");

    // submit a job
    miniCluster.getClusterClient().submitJob(jobGraph).get();
    2. We are able to push data into embedded Kafka
    3. The app is reading messages from Kafka
    4. But the SQL execute is not called while streaming the data. I think the reason is that when streamExecutionEnvironment.execute() is called while Flink is running inside the MiniCluster, StreamExecutionEnvironment.getExecutionEnvironment() returns a StreamPlanEnvironment, whose overridden executeAsync method throws ProgramAbortException. This causes the stream to be running, but the SQL async execute method is not called because of the ProgramAbortException.
    Copy code
    @Override
        public JobClient executeAsync(StreamGraph streamGraph) {
            pipeline = streamGraph;
    
            // do not go on with anything now!
            throw new ProgramAbortException();
        }
    Questions I am looking for an answer to:
    1. My understanding is that if the call to StreamExecutionEnvironment.getExecutionEnvironment() inside the MiniCluster had returned a LocalStreamEnvironment instead, then the SQL execution would have been performed as part of executeAsync (when the app is executed inside Apache Flink, StreamExecutionEnvironment.getExecutionEnvironment() returns a LocalStreamEnvironment).
    2. How can I make sure the SQL execute call works while the app is running inside the MiniCluster (during the integration test)? @David Anderson
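    A hedged sketch of the alternative that the Flink testing docs describe: instead of submitting a JobGraph extracted with PackagedProgram (which deliberately swaps in StreamPlanEnvironment and throws ProgramAbortException during extraction), let the test build or invoke the pipeline in the same JVM after MiniClusterWithClientResource has started, so getExecutionEnvironment() is bound to the mini cluster and execute() really runs the job. The tiny inline pipeline below is only a stand-in for calling the application's own entry point at that spot.

    import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.test.util.MiniClusterWithClientResource;
    import org.junit.ClassRule;
    import org.junit.Test;

    public class MiniClusterPipelineIT {

        @ClassRule
        public static final MiniClusterWithClientResource FLINK =
                new MiniClusterWithClientResource(
                        new MiniClusterResourceConfiguration.Builder()
                                .setNumberTaskManagers(1)
                                .setNumberSlotsPerTaskManager(2)
                                .build());

        @Test
        public void pipelineRunsOnMiniCluster() throws Exception {
            // The rule registers a test execution context, so this environment is bound
            // to the MiniCluster; calling the application's main()/pipeline code here
            // (instead of submitting a pre-built JobGraph) avoids StreamPlanEnvironment
            // and its ProgramAbortException entirely.
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromElements(1, 2, 3).print();
            env.execute("mini-cluster smoke test");
        }
    }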
  • j

    Jacob Jona Fahlenkamp

    09/18/2025, 12:41 AM
    Hello, I need to do a currency conversion for a stream of events. I thought of using a temporal join. The problem is that the probe side has a bounded out-of-orderness of 40 days, and I cannot wait 40 days for timely events to be processed. Am I correct that a temporal join will not help me here, because it waits for the watermark from both the probe and the build side?
  • j

    Jaya Ananthram

    09/18/2025, 5:36 PM
    Hello, do we have any way to visualize the Flink execution plan in the UI? Previously we had https://flink.apache.org/visualizer/, but it is not working anymore.
  • d

    Darin Amos

    09/18/2025, 5:38 PM
    Hi All! I am getting the following error in my logs when my job starts from a checkpoint in Kubernetes. I was also able to find the oversized-payload error messages showing the size of the incoming message and the max size (akka.framesize). I'm wondering if there is any way to debug the contents of the TaskDeploymentDescriptor? The only change we made was to switch to a new custom file sink that uses the two-phase commit. But the sink runs at-least-once and holds zero state, so I'm not sure why we'd get this issue.
    Caused by: java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(TaskExecutorGateway.submitTask(TaskDeploymentDescriptor, JobMasterId, Time))] at recipient [akka.tcp://flink@172.27.241.201:6122/user/rpc/taskmanager_0] timed out. This is usually caused by: 1) Akka failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase akka.ask.timeout.
  • f

    Francis Altomare

    09/18/2025, 6:27 PM
    Hi everyone, I'm running into an issue with a Kafka source in Flink when using an offset initializer with KafkaPartitionSplit.COMMITTED_OFFSET. In some cases, it looks like Flink falls back to the Kafka broker's committed offsets, even though offsets are already available in Flink's state store. We've observed data loss in some of our stateful jobs after restoring from a savepoint. Our best guess as to what's happening is that the broker's committed offsets may be ahead of the state offsets. Questions:
    • Does anyone know if there are cases where Flink could revert to broker committed offsets even though it has offsets in its state store?
    • If yes, under what circumstances does the Kafka source revert to the broker's committed offsets instead of using the offsets from Flink state?
    • Is there a recommended way to ensure Flink always prefers state offsets to avoid data loss after restore?
    Thanks a lot for your help!
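    For reference, a hedged sketch of the initializer being discussed (broker, topic and group id are placeholders). As far as the documented behaviour goes, the OffsetsInitializer only decides where to start when the source has no restored state; after a successful restore the partition offsets come from Flink state, so broker offsets winning after a "restore" is usually a sign that the source operator state was not actually restored (changed uid, allowNonRestoredState, etc.), which may be worth ruling out first.

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;

    public class CommittedOffsetsSourceSketch {

        static KafkaSource<String> build() {
            return KafkaSource.<String>builder()
                    .setBootstrapServers("broker:9092")   // placeholder
                    .setTopics("events")                  // placeholder
                    .setGroupId("my-group")               // placeholder
                    // Start from broker-committed offsets, falling back to EARLIEST when
                    // none exist; this choice applies only to fresh starts without state.
                    .setStartingOffsets(
                            OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();
        }
    }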