George Leonard
09/01/2025, 12:24 PM
Jashwanth S J
09/02/2025, 4:27 AM
George Leonard
09/02/2025, 9:10 AM
Mohammed Salman Ali Pary
09/02/2025, 10:24 AM
L P V
09/02/2025, 3:50 PM
CREATE CATALOG iceberg_catalog WITH (
'type' = 'iceberg',
'catalog-type' = 'hive',
'uri' = 'thrift://hivems.lakehouse.db-prod:9083'
)
Is there any way I could use only one catalog that is persistent with HMS?
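A hedged Java sketch of one possible direction for the question above: register a Hive catalog backed by the same HMS so that table definitions created in Flink persist there. The catalog name, the hive-conf-dir path, and the assumption that the Hive catalog module is on the classpath are illustrative, not taken from the thread.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HmsCatalogSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // hive-conf-dir must contain a hive-site.xml pointing at the HMS above
        // (thrift://hivems.lakehouse.db-prod:9083); the path here is an assumption.
        HiveCatalog hmsCatalog =
                new HiveCatalog("hms_catalog", "default", "/opt/hive/conf");

        tEnv.registerCatalog("hms_catalog", hmsCatalog);
        tEnv.useCatalog("hms_catalog");
        // Table definitions created from here on are stored in HMS and survive
        // SQL client / session restarts, unlike in-memory catalogs.
    }
}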
Alexei Pozdniakov
09/02/2025, 10:44 PM
Elad
09/03/2025, 1:54 PM
Vadim Vararu
09/04/2025, 8:56 AM
2025-09-04 08:20:53,380 INFO org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer [] - Kryo serializer scala extensions are not available.
Exception in thread "Thread-57" java.lang.IllegalArgumentException: classLoader cannot be null.
at com.esotericsoftware.kryo.Kryo.setClassLoader(Kryo.java:975)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.checkKryoInitialized(KryoSerializer.java:553)
at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:394)
at org.apache.flink.api.common.typeutils.CompositeSerializer.deserialize(CompositeSerializer.java:156)
at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextElementLastAccessTimestamp(RocksDbTtlCompactFiltersManager.java:205)
at org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager$ListElementFilter.nextUnexpiredOffset(RocksDbTtlCompactFiltersManager.java:191)
#
# A fatal error has been detected by the Java Runtime Environment:
#
# SIGSEGV (0xb) at pc=0x0000ffff9440bb90, pid=1, tid=391
#
# JRE version: OpenJDK Runtime Environment Temurin-11.0.26+4 (11.0.26+4) (build 11.0.26+4)
# Java VM: OpenJDK 64-Bit Server VM Temurin-11.0.26+4 (11.0.26+4, mixed mode, sharing, tiered, compressed oops, serial gc, linux-aarch64)
# Problematic frame:
# V [libjvm.so+0x5fbb90] Exceptions::_throw_oop(Thread*, char const*, int, oopDesc*)+0x180
#
# Core dump will be written. Default location: /opt/flink/core.1
#
# An error report file with more information is saved as:
# /opt/flink/hs_err_pid1.log
#
# If you would like to submit a bug report, please visit:
# <https://github.com/adoptium/adoptium-support/issues>
#
[error occurred during error reporting (), id 0x5, SIGTRAP (0x5) at pc=0x0000ffff958971ec]
Looks like something that happens during the RocksDb compaction... 😞
The Flink version - 1.18.1
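For context, a hedged Java sketch of the kind of state-TTL configuration that exercises the code path in the trace above; the descriptor name and TTL value are assumptions. The cleanupInRocksdbCompactFilter call is what wires the TTL filter into RocksDB compaction (RocksDbTtlCompactFiltersManager in the stack), so leaving it out is one hypothetical thing to try, though whether that avoids the crash is unverified.
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

public class TtlCompactFilterSketch {
    public static void main(String[] args) {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.hours(24))
                // This enables TTL cleanup inside RocksDB compaction; the list
                // element filter from the stack trace runs as part of it.
                .cleanupInRocksdbCompactFilter(1000)
                .build();

        ListStateDescriptor<String> descriptor =
                new ListStateDescriptor<>("events", String.class);
        descriptor.enableTimeToLive(ttlConfig);
    }
}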
Harish Sharma
09/04/2025, 12:26 PM
2025-09-04 14:51:26,611 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Flink CDC Event Source: sqlserver -> SchemaOperator -> PrePartition (1/2) (ee122e9200a52b9ee0b7a74d889052cb_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FAILED on localhost:55664-e801e4 @ localhost (dataPort=55667).
java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:333) ~[flink-connector-base-1.20.1.jar:1.20.1]
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:228) ~[flink-connector-base-1.20.1.jar:1.20.1]
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:190) ~[flink-connector-base-1.20.1.jar:1.20.1]
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:443) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist-1.20.1.jar:1.20.1]
at java.lang.Thread.run(Thread.java:1583) ~[?:?]
Caused by: java.lang.RuntimeException: SplitFetcher thread 1 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168) ~[flink-connector-base-1.20.1.jar:1.20.1]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) ~[flink-connector-base-1.20.1.jar:1.20.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
... 1 more
Caused by: java.io.IOException: org.apache.flink.util.FlinkRuntimeException: Read split StreamSplit{splitId='stream-split', offset={commit_lsn=0000002a:00000d28:009c, change_lsn=0000002a:00000d28:009c}, endOffset={change_lsn=7f}, isSuspended=false, isSnapshotCompleted=true} error due to Currently unsupported by the SQL Server connector.
at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:101) ~[flink-cdc-base-3.4.0.jar:3.4.0]
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-base-1.20.1.jar:1.20.1]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[flink-connector-base-1.20.1.jar:1.20.1]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) ~[flink-connector-base-1.20.1.jar:1.20.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
... 1 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Read split StreamSplit{splitId='stream-split', offset={commit_lsn=0000002a:00000d28:009c, change_lsn=0000002a:00000d28:009c}, endOffset={change_lsn=7f}, isSuspended=false, isSnapshotCompleted=true} error due to Currently unsupported by the SQL Server connector.
at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.checkReadException(IncrementalSourceStreamFetcher.java:137) ~[flink-cdc-base-3.4.0.jar:3.4.0]
at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.pollSplitRecords(IncrementalSourceStreamFetcher.java:115) ~[flink-cdc-base-3.4.0.jar:3.4.0]
at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.pollSplitRecords(IncrementalSourceSplitReader.java:192) ~[flink-cdc-base-3.4.0.jar:3.4.0]
at org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader.fetch(IncrementalSourceSplitReader.java:98) ~[flink-cdc-base-3.4.0.jar:3.4.0]
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-base-1.20.1.jar:1.20.1]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) ~[flink-connector-base-1.20.1.jar:1.20.1]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117) ~[flink-connector-base-1.20.1.jar:1.20.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
... 1 more
Caused by: java.lang.UnsupportedOperationException: Currently unsupported by the SQL Server connector
at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:113) ~[debezium-connector-sqlserver-1.9.8.Final.jar:1.9.8.Final]
at org.apache.flink.cdc.connectors.sqlserver.source.reader.fetch.SqlServerStreamFetchTask$StreamSplitReadTask.execute(SqlServerStreamFetchTask.java:168) ~[flink-connector-sqlserver-cdc-local-3.4.0.jar:3.4.0]
at org.apache.flink.cdc.connectors.sqlserver.source.reader.fetch.SqlServerStreamFetchTask.execute(SqlServerStreamFetchTask.java:77) ~[flink-connector-sqlserver-cdc-local-3.4.0.jar:3.4.0]
at org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceStreamFetcher.lambda$submitTask$0(IncrementalSourceStreamFetcher.java:89) ~[flink-cdc-base-3.4.0.jar:3.4.0]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
at java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
... 1 more
George Leonard
09/04/2025, 1:50 PM
Flink SQL> SELECT JSON_ARRAY(JSON('{"nested_json": {"value": 42}}'));
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature JSON(<CHARACTER>)
Flink SQL>
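A hedged Java sketch of a possible workaround: the JSON() constructor may simply not exist in the Flink version behind this SQL client (an assumption based on the validator error). Nested JSON can also be built with JSON_OBJECT/JSON_ARRAY, whose nested results are inserted as JSON nodes rather than escaped strings.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JsonArraySketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Builds a JSON array containing {"nested_json": {"value": 42}} without JSON().
        tEnv.executeSql(
                "SELECT JSON_ARRAY(JSON_OBJECT('nested_json' VALUE JSON_OBJECT('value' VALUE 42)))")
            .print();
    }
}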
Jaya Ananthram
09/05/2025, 7:06 AM
Francis Altomare
09/05/2025, 7:19 AM
TumblingProcessingTimeWindow and a reduce function to ensure only a single event within my window is emitted at a time 🧵.
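A hedged Java sketch of the pattern described above; the Tuple2 element type, key selector, and 5-minute window size are assumptions (on older Flink releases without the Duration overload, TumblingProcessingTimeWindows.of(Time.minutes(5)) is the equivalent).
import java.time.Duration;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;

public class WindowReduceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder finite input (key, timestamp); in the real job this would
        // be the actual event stream.
        DataStream<Tuple2<String, Long>> events = env.fromElements(
                Tuple2.of("a", 1L), Tuple2.of("a", 2L), Tuple2.of("b", 3L));

        events.keyBy(t -> t.f0)
              // Processing-time tumbling window, so at most one result per key
              // is emitted every 5 minutes of wall-clock time.
              .window(TumblingProcessingTimeWindows.of(Duration.ofMinutes(5)))
              // The reduce keeps a single element per key and window, e.g. the latest one.
              .reduce((a, b) -> a.f1 >= b.f1 ? a : b)
              .print();

        env.execute("window-reduce-sketch");
    }
}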
Michał Slabik
09/05/2025, 6:14 PM
Name collision: Group already contains a Metric with the name 'pendingCommittables'. Metric will not be reported.
I found two related bug tickets: https://issues.apache.org/jira/browse/FLINK-35321, https://issues.apache.org/jira/browse/FLINK-37559. Those warnings started after this change was introduced: https://issues.apache.org/jira/browse/FLINK-36455, which was applied to versions 2.0.0, 1.19.2, and 1.20.1. As this setup seems rather common, I'm wondering if maybe this is related to some configuration (whether sink or checkpointing) on my side. Could you help find a solution to mitigate those warnings (or maybe they can be safely ignored)?
@Arvid Heise allow me to tag you as you are the author of the mentioned changes - maybe you'll have some insights on that.
Chennupati Gopal
09/07/2025, 2:55 AM
Mohammed Salman Ali Pary
09/08/2025, 8:34 AM
Jo Wijnant
09/09/2025, 7:50 AM
s3 url should start with "s3://"
Here are the relevant configurations (I left out some configs):
s3:
s5cmd:
path: /opt/flink/bin/s5cmd
execution:
checkpointing:
dir: s3p://flink-p/checkpoints
I use the s3p filesystem for checkpointing and s3a for sinking to parquet files on S3, but it seems pretty clear that s5cmd needs a proper s3:// URL.
George Leonard
09/09/2025, 2:23 PM
It's failing on CAST(JSON_QUERY(data, '$.account') AS ARRAY<STRING>), trying to get the array of accounts into a JSON object.
More than happy to pre-create the table as a complex table if need be and then do an insert into select... I actually prefer that pattern, just running out of ideas how to get this done, and my eyes are seeing double from reading docs.
-- postgres_catalog.inbound.adultsx -> Target table
create table c_paimon.outbound.adultsx WITH (
'file.format' = 'parquet'
,'compaction.min.file-num' = '2'
,'compaction.early-max.file-num' = '50'
,'snapshot.time-retained' = '1h'
,'snapshot.num-retained.min' = '5'
,'snapshot.num-retained.max' = '20'
,'table.exec.sink.upsert-materialize'= 'NONE'
) AS
select
JSON_VALUE(data, '$.nationalid') AS nationalid
,JSON_VALUE(data, '$._id') AS _id
,JSON_VALUE(data, '$.name') AS name
,JSON_VALUE(data, '$.surname') AS surname
,JSON_VALUE(data, '$.gender') AS gender
,JSON_VALUE(data, '$.dob') AS dob
,JSON_VALUE(data, '$.marital_status') AS marital_status
,JSON_VALUE(data, '$.status') AS status
,JSON_QUERY(data, '$.address') AS address
,CAST(JSON_QUERY(data, '$.account') AS ARRAY<STRING>) AS account
,created_at AS created_at
FROM postgres_catalog.inbound.adults;
I'm consuming the data (see attached) from Postgres using CDC into the table below.
-- postgres_catalog.inbound.adults -> CDC Source
CREATE OR REPLACE TABLE postgres_catalog.inbound.adults (
id BIGINT
,nationalid VARCHAR(14) --NOT NULL
,data STRING -- The json as per attached sit in here...
,created_at TIMESTAMP_LTZ(3)
,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
,PRIMARY KEY (nationalid) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc'
,'hostname' = 'postgrescdc'
,'port' = '5432' -- NOTE: this is the port of the db on the container, not the external docker exported port via a port mapping.
,'username' = 'dbadmin'
,'password' = 'dbpassword'
,'database-name' = 'demog'
,'schema-name' = 'public'
,'table-name' = 'adults'
,'slot.name' = 'adults0'
,'scan.incremental.snapshot.enabled' = 'true' -- experimental feature: incremental snapshot (default off)
,'scan.startup.mode' = 'initial' -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position
,'decoding.plugin.name' = 'pgoutput'
);
George Leonard
09/10/2025, 4:46 AM
Flink SQL> create table c_paimon.outbound.adultsxx WITH (
> 'file.format' = 'parquet'
> ,'compaction.min.file-num' = '2'
> ,'compaction.early-max.file-num' = '50'
> ,'snapshot.time-retained' = '1h'
> ,'snapshot.num-retained.min' = '5'
> ,'snapshot.num-retained.max' = '20'
> ,'table.exec.sink.upsert-materialize'= 'NONE'
> ) AS
> select
> JSON_VALUE(data, '$.nationalid') AS nationalid
> ,JSON_VALUE(data, '$._id') AS _id
> ,JSON_VALUE(data, '$.name') AS name
> ,JSON_VALUE(data, '$.surname') AS surname
> ,JSON_VALUE(data, '$.gender') AS gender
> ,JSON_VALUE(data, '$.dob') AS dob
> ,JSON_VALUE(data, '$.marital_status') AS marital_status
> ,JSON_VALUE(data, '$.status') AS status
> ,CAST(JSON_QUERY(data, '$.account') AS ARRAY<STRING>) as account
> ,JSON_QUERY(data, '$.address') AS address
> ,created_at AS created_at
> FROM postgres_catalog.inbound.adults;
[ERROR] Could not execute SQL statement. Reason:
org.apache.calcite.sql.validate.SqlValidatorException: Cast function cannot convert value of type VARCHAR(2147483647) to type VARCHAR(2147483647) ARRAY
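A hedged Java sketch related to the CAST error above: newer Flink releases are supposed to support a RETURNING clause on JSON_QUERY that yields the array directly, which would avoid casting the VARCHAR result to ARRAY<STRING>. Whether your Flink/Paimon combination has it is an assumption worth checking against the JSON function docs for your version.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JsonQueryArraySketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Extracts $.account as ARRAY<STRING> directly instead of CAST(... AS ARRAY<STRING>).
        tEnv.executeSql(
                "SELECT JSON_QUERY(data, '$.account' RETURNING ARRAY<STRING>) AS account "
              + "FROM postgres_catalog.inbound.adults");
    }
}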
George Leonard
09/10/2025, 4:51 AM
Jo Wijnant
09/10/2025, 2:16 PM
SET 'execution.checkpointing.interval' = '3m';
SET 'execution.checkpointing.interval-during-backlog' = '10m';
However, all checkpoints are taken every 3m. Is this the expected behavior? I'd have thought it would checkpoint every 10m in the first phase...
I'm on Flink 2.0.
Pavan
09/11/2025, 7:47 AM
flink-sql-connector-mysql-cdc
Everything works fine until I push an insert statement on a table that is captured by Flink; the following error pops up, and after continuous retries of the command the error disappears.
I'm new to Flink and still learning how CDC connectors work. Thanks in advance.
java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:333)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:228)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:190)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:168)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:117)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    ... 1 more
Caused by: java.lang.NullPointerException: Cannot read field "value" because "s2" is null
    at java.base/java.lang.String$CaseInsensitiveComparator.compare(String.java:2055)
    at java.base/java.lang.String$CaseInsensitiveComparator.compare(String.java:2047)
    at java.base/java.lang.String.compareToIgnoreCase(String.java:2091)
    at org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset.compareTo(BinlogOffset.java:241)
    at org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset.isAfter(BinlogOffset.java:272)
    at org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.hasEnterPureBinlogPhase(BinlogSplitReader.java:295)
    at org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.shouldEmit(BinlogSplitReader.java:254)
    at org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.pollSplitRecords(BinlogSplitReader.java:176)
    at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.pollSplitRecords(MySqlSplitReader.java:150)
    at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:84)
    at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
    ... 6 more
Pavan
09/11/2025, 7:48 AM
Jashwanth S J
09/11/2025, 10:45 AM
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Warning SessionJobException 3m32s (x117 over 77m) Job Could not find a file system implementation for scheme 's3'. The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-hadoop, flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory. See <https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/> for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see <https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/>.
Antonio Davide Cali
09/11/2025, 1:20 PM
B S Mohammed Ashfaq
09/13/2025, 1:37 PM
Madhusudhan Reddy
09/14/2025, 1:23 PM
miniCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
.setConfiguration(configuration)
.setNumberSlotsPerTaskManager(2)
.setNumberTaskManagers(1)
.build());
// Set classloader to parent-first
configuration.setString("classloader.resolve-order", "parent-first");
PackagedProgram program = PackagedProgram.newBuilder()
.setArguments(args)
.setJarFile(new File(JAR_PATH + "APP-TO-Test.jar"))
.build();
// Create Job Graph
JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, new Configuration(), 1, true);
assertNotNull(jobGraph, "JobGraph should be created");
// submit a job
miniCluster.getClusterClient().submitJob(jobGraph).get();
2. We are able to push data into embedded Kafka.
3. The app is reading messages from Kafka.
4. But the SQL execute is not called while streaming the data. I think the reason is that when streamExecutionEnvironment.execute(); is called while Flink is running inside the MiniCluster, StreamExecutionEnvironment.getExecutionEnvironment() returns a StreamPlanEnvironment, which has an overridden executeAsync method that throws ProgramAbortException. This causes the stream to be running, but the SQL async execute method is not called due to the ProgramAbortException.
@Override
public JobClient executeAsync(StreamGraph streamGraph) {
pipeline = streamGraph;
// do not go on with anything now!
throw new ProgramAbortException();
}
Questions I am looking for answers to:
1. My understanding is that if the call to StreamExecutionEnvironment.getExecutionEnvironment() inside the MiniCluster had returned a LocalStreamEnvironment, then the SQL execution would have been performed as part of executeAsync (when the app is executed inside Apache Flink, StreamExecutionEnvironment.getExecutionEnvironment() returns a LocalStreamEnvironment).
2. How can I make sure the SQL execute call works while the app is running inside the MiniCluster (during the integration test)?
@David Anderson
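A hedged Java sketch of the MiniClusterWithClientResource pattern from the Flink testing docs, assuming the pipeline can be built inside the test itself instead of being loaded through PackagedProgram; with the resource installed, getExecutionEnvironment() targets the mini cluster and execute()/executeAsync() actually runs the job (including SQL submitted through a table environment). The pipeline-builder name is hypothetical.
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;

public class PipelineIT {

    @ClassRule
    public static final MiniClusterWithClientResource FLINK_CLUSTER =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(2)
                            .setNumberTaskManagers(1)
                            .build());

    @Test
    public void runsPipelineOnMiniCluster() throws Exception {
        // Returns an environment bound to the mini cluster, not a StreamPlanEnvironment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Hypothetical: build the same pipeline the packaged jar would build,
        // e.g. MyJob.buildPipeline(env); a trivial pipeline keeps this runnable.
        env.fromElements(1, 2, 3).print();

        env.execute("mini-cluster-integration-test");
    }
}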
Jacob Jona Fahlenkamp
09/18/2025, 12:41 AM
Jaya Ananthram
09/18/2025, 5:36 PM
Darin Amos
09/18/2025, 5:38 PM
TaskDeploymentDescriptor
? The only change we made was to switch to a new custom file sink that uses the Two-Phase-Commit. But the sink is running at-least-once and holds zero state, so I’m not sure why we’d get this issue.
Caused by: java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(TaskExecutorGateway.submitTask(TaskDeploymentDescriptor, JobMasterId, Time))] at recipient [akka.tcp://flink@172.27.241.201:6122/user/rpc/taskmanager_0] timed out. This is usually caused by: 1) Akka failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase akka.ask.timeout.
Francis Altomare
09/18/2025, 6:27 PM
KafkaPartitionSplit.COMMITTED_OFFSET.
In some cases, it looks like Flink falls back to the Kafka broker’s committed offsets, even though offsets are already available in Flink’s state store. We’ve observed data loss in some of our stateful jobs after restoring from a savepoint. Our best guess as to what’s happening is that the broker’s committed offsets may be ahead of the state offsets.
Question:
• Does anyone know if there are cases when Flink could revert to broker committed offsets even though it has offsets in its state store?
• If yes, under what circumstances does the Kafka source revert to the broker’s committed offsets instead of using the offsets from Flink state?
• Is there a recommended way to ensure Flink always prefers state offsets to avoid data loss after restore?
Thanks a lot for your help!
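A hedged Java sketch of the Kafka source setting under discussion (bootstrap servers, topic, and group id are placeholders). OffsetsInitializer.committedOffsets corresponds to KafkaPartitionSplit.COMMITTED_OFFSET; as far as I understand, starting offsets are only consulted for splits that have no offset yet (fresh start or newly discovered partitions), while offsets restored from a savepoint should take precedence, but that is worth confirming against the connector version in use.
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

public class KafkaOffsetsSketch {
    public static void main(String[] args) {
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")
                .setTopics("events")
                .setGroupId("my-consumer-group")
                // COMMITTED_OFFSET behavior: start from the broker's committed
                // offsets, falling back to EARLIEST where none exist yet.
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // The source would then be attached via env.fromSource(source, ...).
    }
}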