Jashwanth S J
08/29/2025, 8:57 AM
George Leonard
08/29/2025, 12:06 PM
Vikas Patil
08/30/2025, 10:26 PM
Or Keren
09/01/2025, 6:48 AM
Zeyu Qiu
09/01/2025, 8:02 AM
parallelism = 1
env = StreamExecutionEnvironment.get_execution_environment(config)
env.set_parallelism(parallelism)  # not sure what this should be; maybe try 2
env.enable_checkpointing(10 * 60 * 1000) # milliseconds
# checkpoints have to complete within ten minutes, or are discarded
env.get_checkpoint_config().set_checkpoint_timeout(int(10 * 60 * 1000))
env.get_checkpoint_config().set_checkpointing_mode(
CheckpointingMode.EXACTLY_ONCE
)
env.disable_operator_chaining()  # if this line is dropped, creating multiple pipelines in one Python job raises an error in the Flink GUI
settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=settings)
ss = t_env.create_statement_set()
source_sql = f"""
CREATE TABLE mysql_source (
db_name STRING METADATA FROM 'database_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
id BIGINT,
content STRING,
update_time TIMESTAMP(3)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '{db_secret["host"]}',
'port' = '{db_secret["port"]}',
'username' = '{db_secret["username"]}',
'password' = '{db_secret["password"]}',
'database-name' = 'my_db',
'table-name' = 'my_table',
'server-id' = '{server_id}',
'debezium.snapshot.mode' = 'schema_only',
'scan.startup.mode' = 'timestamp',
'scan.startup.timestamp-millis' = '{binlog_start_time}',
'scan.incremental.snapshot.enabled' = 'true',
'scan.incremental.snapshot.chunk.key-column' = 'id'
);
"""
t_env.execute_sql(source_sql)
sink_sql = f"""
CREATE TABLE hudi_sink (
id BIGINT,
content STRING,
update_time TIMESTAMP(3),
hudi_ts double,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'hudi',
'path' = 's3a://xxxx',
'table.type' = 'COPY_ON_WRITE',
'write.precombine.field' = 'hudi_ts',
'write.operation' = 'upsert',
'hoodie.datasource.write.recordkey.field' = 'id',
'hoodie.datasource.write.partitionpath.field' = '',
'write.tasks' = '1',
'compaction.tasks' = '1',
'clean.retain_commits' = '6',
'hoodie.keep.min.commits' = '7',
'hoodie.keep.max.commits' = '8',
'compaction.async.enabled' = 'true',
'hoodie.write.lock.provider' = 'org.apache.hudi.client.transaction.lock.InProcessLockProvider',
'index.partition.regex' = 'false',
'index.bootstrap.enabled' = 'false',
'write.rate.limit' = '2000'
);
"""
t_env.execute_sql(sink_sql)
insert_sql = f"""
INSERT INTO hudi_sink
SELECT id,content,update_time,
UNIX_TIMESTAMP() as hudi_ts FROM mysql_source where db_name = 'my_db'
and table_name = 'my_table';
"""
ss.add_insert_sql(insert_sql)
ss.attach_as_datastream()
env.execute(f'mysql_cdc_to_hudi')
After I submit this job to Flink, I expect the Hudi table in S3 to have real-time data matching the MySQL table. But in fact, when I execute an update statement in MySQL, two duplicate rows appear in the Hudi table. For example, here's the data in MySQL:
id |content |update_time
1 |a |2025-08-01 00:00:00
And I execute:
update my_table set content = 'b' where id = 1
Then the data in the Hudi table looks like this:
id |content |update_time
1 |a |2025-08-01 00:00:00
1 |b |2025-09-01 00:00:00
Do you guys have any idea why that's happening? Since I've set 'hoodie.datasource.write.recordkey.field' = 'id', I suppose there shouldn't be duplicate rows in the Hudi table. The error is not reliably reproducible: sometimes the result is normal and sometimes it is wrong. It is especially likely to occur when there are multiple Hudi sinks in one job.
I’m using Flink 1.16.2
, Hudi 0.13.0
, FlinkCDC 3.1.1
In addition, I found a log line in Flink like:
2025-09-01 01:10:20,612 INFO org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer [] - The state is cleared because of state ttl. This will result in incorrect result. You can increase the state ttl to avoid this.
Not sure if this is related to the problem I met.
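The SinkUpsertMaterializer warning is very likely related: once its state is cleared by TTL, the materializer can no longer retract the previously emitted row, so the old and new versions both survive downstream as duplicates. A hedged sketch of two table options that are commonly adjusted in this situation, assuming the same t_env as in the job code above (whether either one fixes this particular job is an assumption):
config = t_env.get_config()
# Option 1: stop the materializer state from expiring ('0' means never expire).
config.set("table.exec.state.ttl", "0")
# Option 2: skip the materializer entirely and let Hudi deduplicate on the
# record key plus the 'hudi_ts' precombine field defined in the sink.
config.set("table.exec.sink.upsert-materialize", "NONE")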
George Leonard
09/01/2025, 12:24 PM
Jashwanth S J
09/02/2025, 4:27 AM
George Leonard
09/02/2025, 9:10 AM
Mohammed Salman Ali Pary
09/02/2025, 10:24 AM
L P V
09/02/2025, 3:50 PM
CREATE CATALOG iceberg_catalog WITH (
'type' = 'iceberg',
'catalog-type' = 'hive',
'uri' = 'thrift://hivems.lakehouse.db-prod:9083'
)
Is there any way I could use only one catalog that is persisted in HMS?
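For reference, a minimal PyFlink sketch of the same declaration (the thrift URI is the one from the message and is assumed reachable, and the Iceberg Flink runtime jars are assumed to be on the classpath). The CREATE CATALOG statement itself is normally session-scoped in Flink, but the Iceberg tables it manages are registered in HMS, so re-declaring the single Hive-backed catalog at startup is usually the only part that has to be repeated:
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.execute_sql("""
    CREATE CATALOG iceberg_catalog WITH (
        'type' = 'iceberg',
        'catalog-type' = 'hive',
        'uri' = 'thrift://hivems.lakehouse.db-prod:9083'
    )
""")
# Tables created under this catalog live in HMS and persist across sessions.
t_env.execute_sql("USE CATALOG iceberg_catalog")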
Alexei Pozdniakov
09/02/2025, 10:44 PM
Elad
09/03/2025, 1:54 PM
Vadim Vararu
09/04/2025, 8:56 AM
2025-09-04 08:20:53,380 INFO org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer [] - Kryo serializer scala extensions are not available.
Exception in thread "Thread-57" java.lang.IllegalArgumentException: classLoader cannot be null.
at com.esotericsoftware.kryo.Kryo.setClassLoader(Kryo.java:975)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:553)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:394)
at org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:156)
at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextElementLastAccessTimestamp(RocksDbTtlCompactFiltersManager.java:205)
at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextUnexpiredOffset(RocksDbTtlCompactFiltersManager.java:191)
#
# A fatal error has been detected by the Java Runtime Environment:
#
# SIGSEGV (0xb) at pc=0x0000ffff9440bb90, pid=1, tid=391
#
# JRE version: OpenJDK Runtime Environment Temurin-11.0.26+4 (11.0.26+4) (build 11.0.26+4)
# Java VM: OpenJDK 64-Bit Server VM Temurin-11.0.26+4 (11.0.26+4, mixed mode, sharing, tiered, compressed oops, serial gc, linux-aarch64)
# Problematic frame:
# V [libjvm.so+0x5fbb90] Exceptions::_throw_oop(Thread*, char const*, int, oopDesc*)+0x180
#
# Core dump will be written. Default location: /opt/flink/core.1
#
# An error report file with more information is saved as:
# /opt/flink/hs_err_pid1.log
#
# If you would like to submit a bug report, please visit:
# <https://github.com/adoptium/adoptium-support/issues>
#
[error occurred during error reporting (), id 0x5, SIGTRAP (0x5) at pc=0x0000ffff958971ec]
Looks like something that happens during the RocksDB compaction... 😞
The Flink version is 1.18.1.
Harish Sharma
09/04/2025, 12:26 PM
2025-09-04 14:51:26,611 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Flink CDC Event Source: sqlserver -> SchemaOperator -> PrePartition (1/2) (ee122e9200a52b9ee0b7a74d889052cb_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FAILED on localhost:55664-e801e4 @ localhost (dataPort=55667).
java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:333) ~[flink-connector-base-1.20.1.jar:1.20.1]
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:228) ~[flink-connector-base-1.20.1.jar:1.20.1]
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:190) ~[flink-connector-base-1.20.1.jar:1.20.1]
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:443) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist-1.20.1.jar:1.20.1]
at java.lang.Thread.run(Thread.java:1583) ~[?:?]
Caused by: java.lang.RuntimeException: SplitFetcher thread 1 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168) ~[flink-connector-base-1.20.1.jar:1.20.1]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) ~[flink-connector-base-1.20.1.jar:1.20.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
... 1 more
Caused by: java.io.IOException: org.apache.flink.util.FlinkRuntimeException: Read split StreamSplit{splitId='stream-split', offset={commit_lsn=0000002a:00000d28:009c, change_lsn=0000002a:00000d28:009c}, endOffset={change_lsn=7f}, isSuspended=false, isSnapshotCompleted=true} error due to Currently unsupported by the SQL Server connector.
at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:101) ~[flink-cdc-base-3.4.0.jar:3.4.0]
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-base-1.20.1.jar:1.20.1]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[flink-connector-base-1.20.1.jar:1.20.1]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) ~[flink-connector-base-1.20.1.jar:1.20.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
... 1 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Read split StreamSplit{splitId='stream-split', offset={commit_lsn=0000002a:00000d28:009c, change_lsn=0000002a:00000d28:009c}, endOffset={change_lsn=7f}, isSuspended=false, isSnapshotCompleted=true} error due to Currently unsupported by the SQL Server connector.
at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.checkReadException(IncrementalSourceStreamFetcher.java:137) ~[flink-cdc-base-3.4.0.jar:3.4.0]
at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.pollSplitRecords(IncrementalSourceStreamFetcher.java:115) ~[flink-cdc-base-3.4.0.jar:3.4.0]
at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.pollSplitRecords(IncrementalSourceSplitReader.java:192) ~[flink-cdc-base-3.4.0.jar:3.4.0]
at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:98) ~[flink-cdc-base-3.4.0.jar:3.4.0]
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-base-1.20.1.jar:1.20.1]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[flink-connector-base-1.20.1.jar:1.20.1]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) ~[flink-connector-base-1.20.1.jar:1.20.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
... 1 more
Caused by: java.lang.UnsupportedOperationException: Currently unsupported by the SQL Server connector
at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:113) ~[debezium-connector-sqlserver-1.9.8.Final.jar:1.9.8.Final]
at org.apache.flink.cdc.connectors.sqlserver.source.reader.fetch.SqlServerStreamFetchTask$StreamSplitReadTask.execute(SqlServerStreamFetchTask.java:168) ~[flink-connector-sqlserver-cdc-local-3.4.0.jar:3.4.0]
at org.apache.flink.cdc.connectors.sqlserver.source.reader.fetch.SqlServerStreamFetchTask.execute(SqlServerStreamFetchTask.java:77) ~[flink-connector-sqlserver-cdc-local-3.4.0.jar:3.4.0]
at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89) ~[flink-cdc-base-3.4.0.jar:3.4.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
... 1 more
George Leonard
09/04/2025, 1:50 PM
Flink SQL> SELECT JSON_ARRAY(JSON('{"nested_json": {"value": 42}}'));
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature JSON(<CHARACTER>)
Flink SQL>
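If the JSON() constructor simply isn't available in that Flink version, one possible workaround is to build the nested document with JSON_OBJECT, since nested JSON construction calls are inserted as raw JSON rather than as escaped strings. A minimal PyFlink sketch of the idea (illustrative only, not a confirmed fix for this setup):
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# Expected result: [{"nested_json":{"value":42}}]
t_env.execute_sql(
    "SELECT JSON_ARRAY(JSON_OBJECT('nested_json' VALUE JSON_OBJECT('value' VALUE 42)))"
).print()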
Jaya Ananthram
09/05/2025, 7:06 AM
Francis Altomare
09/05/2025, 7:19 AM
TumblingProcessingTimeWindow and a reduce function to ensure only a single event within my window is emitted at a time 🧵.
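For context, a minimal PyFlink sketch of that pattern (the key extractor, the 10-second window size, and the sample input are assumptions): a keyed TumblingProcessingTimeWindows assigner with a reduce function that keeps only the last element, so each key emits at most one event per window.
from pyflink.common.time import Time
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.window import TumblingProcessingTimeWindows

env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_collection([("k1", 1), ("k1", 2), ("k2", 3)])  # illustrative input

(ds.key_by(lambda e: e[0])
   .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
   .reduce(lambda a, b: b)  # keep only the latest element seen in the window
   .print())

env.execute("tumbling-window-reduce-sketch")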
Michał Slabik
09/05/2025, 6:14 PM
Name collision: Group already contains a Metric with the name 'pendingCommittables'. Metric will not be reported.
I found two related bug tickets: https://issues.apache.org/jira/browse/FLINK-35321 and https://issues.apache.org/jira/browse/FLINK-37559. The warnings started after this change was introduced: https://issues.apache.org/jira/browse/FLINK-36455, which was applied to versions 2.0.0, 1.19.2, and 1.20.1. As this setup seems rather common, I'm wondering whether this is related to some configuration (of the sink or of checkpointing) on my side. Could you help find a way to mitigate these warnings (or can they be safely ignored)?
@Arvid Heise allow me to tag you as you are the author of the mentioned changes - maybe you'll have some insights on that.
Chennupati Gopal
09/07/2025, 2:55 AM
Mohammed Salman Ali Pary
09/08/2025, 8:34 AM
Jo Wijnant
09/09/2025, 7:50 AM
s3 url should start with "s3://"
Here are the relevant configurations (I left out some):
s3:
  s5cmd:
    path: /opt/flink/bin/s5cmd
execution:
  checkpointing:
    dir: s3p://flink-p/checkpoints
I use the s3p filesystem for checkpointing and s3a for sinking to Parquet files on S3, but it seems pretty clear that s5cmd needs a proper s3:// URL.
George Leonard
09/09/2025, 2:23 PM
It's failing on CAST(JSON_QUERY(data, '$.account') AS ARRAY<STRING>). I'm trying to get the array of accounts into a JSON object. More than happy to pre-create the target table as a complex table if need be and then do an INSERT INTO ... SELECT; I actually prefer that pattern. I'm just running out of ideas on how to get this done, and my eyes are seeing double from reading docs.
-- postgres_catalog.inbound.adultsx -> Target table
create table c_paimon.outbound.adultsx WITH (
'file.format' = 'parquet'
,'compaction.min.file-num' = '2'
,'compaction.early-max.file-num' = '50'
,'snapshot.time-retained' = '1h'
,'snapshot.num-retained.min' = '5'
,'snapshot.num-retained.max' = '20'
,'table.exec.sink.upsert-materialize'= 'NONE'
) AS
select
JSON_VALUE(data, '$.nationalid') AS nationalid
,JSON_VALUE(data, '$._id') AS _id
,JSON_VALUE(data, '$.name') AS name
,JSON_VALUE(data, '$.surname') AS surname
,JSON_VALUE(data, '$.gender') AS gender
,JSON_VALUE(data, '$.dob') AS dob
,JSON_VALUE(data, '$.marital_status') AS marital_status
,JSON_VALUE(data, '$.status') AS status
,JSON_QUERY(data, '$.address') AS address
,CAST(JSON_QUERY(data, '$.account') AS ARRAY<STRING>) AS account
,created_at AS created_at
FROM postgres_catalog.inbound.adults;
I'm consuming the data (see attached) from Postgres using CDC into the table below.
-- postgres_catalog.inbound.adults -> CDC Source
CREATE OR REPLACE TABLE postgres_catalog.inbound.adults (
id BIGINT
,nationalid VARCHAR(14) --NOT NULL
,data STRING -- The json as per attached sit in here...
,created_at TIMESTAMP_LTZ(3)
,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
,PRIMARY KEY (nationalid) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc'
,'hostname' = 'postgrescdc'
,'port' = '5432' -- NOTE: this is the port of the db on the container, not the external docker exported port via a port mapping.
,'username' = 'dbadmin'
,'password' = 'dbpassword'
,'database-name' = 'demog'
,'schema-name' = 'public'
,'table-name' = 'adults'
,'slot.name' = 'adults0'
,'scan.incremental.snapshot.enabled' = 'true' -- experimental feature: incremental snapshot (default off)
,'scan.startup.mode' = 'initial' -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position
,'decoding.plugin.name' = 'pgoutput'
);
George Leonard
09/10/2025, 4:46 AM
Flink SQL> create table c_paimon.outbound.adultsxx WITH (
> 'file.format' = 'parquet'
> ,'compaction.min.file-num' = '2'
> ,'compaction.early-max.file-num' = '50'
> ,'snapshot.time-retained' = '1h'
> ,'snapshot.num-retained.min' = '5'
> ,'snapshot.num-retained.max' = '20'
> ,'table.exec.sink.upsert-materialize'= 'NONE'
> ) AS
> select
> JSON_VALUE(data, '$.nationalid') AS nationalid
> ,JSON_VALUE(data, '$._id') AS _id
> ,JSON_VALUE(data, '$.name') AS name
> ,JSON_VALUE(data, '$.surname') AS surname
> ,JSON_VALUE(data, '$.gender') AS gender
> ,JSON_VALUE(data, '$.dob') AS dob
> ,JSON_VALUE(data, '$.marital_status') AS marital_status
> ,JSON_VALUE(data, '$.status') AS status
> ,CAST(JSON_QUERY(data, '$.account') AS ARRAY<STRING>) as account
> ,JSON_QUERY(data, '$.address') AS address
> ,created_at AS created_at
> FROM postgres_catalog.inbound.adults;
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Cast function cannot convert value of type VARCHAR(2147483647) to type VARCHAR(2147483647) ARRAY
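One hedged way around that validator error, sketched in PyFlink (json_to_string_array is a hypothetical helper, not an existing Flink function): since Flink SQL cannot CAST a JSON string straight to ARRAY<STRING>, a small scalar UDF can parse the JSON_QUERY output into an array instead.
import json
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.udf import udf

# Hypothetical helper: parse a JSON array string, e.g. the output of
# JSON_QUERY(data, '$.account'), into ARRAY<STRING>.
@udf(result_type=DataTypes.ARRAY(DataTypes.STRING()))
def json_to_string_array(s):
    if s is None:
        return None
    return [v if isinstance(v, str) else json.dumps(v) for v in json.loads(s)]

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.create_temporary_function("json_to_string_array", json_to_string_array)
# In the CTAS, the failing CAST would then become:
#   json_to_string_array(JSON_QUERY(data, '$.account')) AS account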
George Leonard
09/10/2025, 4:51 AM
Jo Wijnant
09/10/2025, 2:16 PM
SET 'execution.checkpointing.interval' = '3m';
SET 'execution.checkpointing.interval-during-backlog' = '10m';
However, all checkpoints are taken every 3m. Is this the expected behavior? I'd thought it would checkpoint every 10m in the first phase...
I'm on Flink 2.0.
Pavan
09/11/2025, 7:47 AM
flink-sql-connector-mysql-cdc: everything works fine until I push an insert statement on a table that is captured by Flink. The following error pops up, and after continuous retries of the command the error disappears.
I'm new to Flink and still learning how CDC connectors work. Thanks in advance.
java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:333)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:228)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:190)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
... 1 more
Caused by: java.lang.NullPointerException: Cannot read field "value" because "s2" is null
at java.base/java.lang.String$CaseInsensitiveComparator.compare(String.java:2055)
at java.base/java.lang.String$CaseInsensitiveComparator.compare(String.java:2047)
at java.base/java.lang.String.compareToIgnoreCase(String.java:2091)
at org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset.compareTo(BinlogOffset.java:241)
at org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset.isAfter(BinlogOffset.java:272)
at org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.hasEnterPureBinlogPhase(BinlogSplitReader.java:295)
at org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.shouldEmit(BinlogSplitReader.java:254)
at org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.pollSplitRecords(BinlogSplitReader.java:176)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.pollSplitRecords(MySqlSplitReader.java:150)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:84)
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
... 6 more
Pavan
09/11/2025, 7:48 AM
Jashwanth S J
09/11/2025, 10:45 AM
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Warning SessionJobException 3m32s (x117 over 77m) Job Could not find a file system implementation for scheme 's3'. The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-hadoop, flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See <https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/> for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see <https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/>.
Antonio Davide Cali
09/11/2025, 1:20 PM
B S Mohammed Ashfaq
09/13/2025, 1:37 PM