Jacob Jona Fahlenkamp
09/18/2025, 12:41 AM
Jaya Ananthram
09/18/2025, 5:36 PM
Darin Amos
09/18/2025, 5:38 PM
Has anyone seen timeouts when submitting the TaskDeploymentDescriptor? The only change we made was to switch to a new custom file sink that uses two-phase commit. But the sink is running at-least-once and holds zero state, so I’m not sure why we’d get this issue.
Caused by: java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(TaskExecutorGateway.submitTask(TaskDeploymentDescriptor, JobMasterId, Time))] at recipient [akka.tcp://flink@172.27.241.201:6122/user/rpc/taskmanager_0] timed out. This is usually caused by: 1) Akka failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase akka.ask.timeout.
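For reference, the timeout the error message points at can be raised in the cluster configuration while we investigate; a minimal flink-conf.yaml sketch (the value is only illustrative):
# Give the TaskExecutor more time to acknowledge submitTask before the RPC call times out
akka.ask.timeout: 60 s
# On Flink 1.18+ the same setting is, as far as I know, also exposed under the pekko.ask.timeout key
That only treats the symptom, though; if the TaskDeploymentDescriptor payload is oversized (point 1 in the error text), the JobManager/TaskManager logs should show the underlying serialization or frame-size error.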
Francis Altomare
09/18/2025, 6:27 PM
We have a question about the Kafka source and KafkaPartitionSplit.COMMITTED_OFFSET.
In some cases, it looks like Flink falls back to the Kafka broker’s committed offsets, even though offsets are already available in Flink’s state store. We’ve observed data loss in some of our stateful jobs after restoring from a savepoint. Our best guess as to what’s happening is that the broker’s committed offsets may be ahead of the state offsets.
Question:
• Does anyone know if there are cases when Flink could revert to broker committed offsets even though it has offsets in its state store?
• If yes, under what circumstances does the Kafka source revert to the broker’s committed offsets instead of using the offsets from Flink state?
• Is there a recommended way to ensure Flink always prefers state offsets to avoid data loss after restore?
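For context, this is roughly the shape of source we are talking about (a minimal sketch; broker, topic and group are placeholders, not our real setup). As far as I understand, the starting-offset initializer below should only be consulted for splits that were not restored from state, which is why the fallback surprises us:
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")        // placeholder
        .setTopics("events")                       // placeholder
        .setGroupId("my-consumer-group")           // placeholder
        // Used for splits without restored state: broker committed offsets,
        // falling back to EARLIEST when no committed offset exists.
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();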
Thanks a lot for your help!
Nikola Stanisavljevic
09/21/2025, 8:47 PM
Ashish Marottickal Gopi
09/22/2025, 10:06 AM
We use the "fixed-delay" strategy for failure restarts, and this led to the job ending up in a terminally failed status. As per the documentation, we went for a manual restore by deleting the FlinkDeployment CRD and restoring from a checkpoint. But in our case the application still doesn't start: the FlinkDeployment CRD status stays in the Reconciling state with this error:
JobManager deployment is missing and HA data is not available to make stateful upgrades. It is possible that the job has finished or terminally failed, or the configmaps have been deleted. Manual restore required.
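For completeness, this is roughly what we are applying for the manual restore (a sketch only; the name, jar path and checkpoint URI are placeholders from our side, based on our reading of the operator docs):
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-job                                   # placeholder
spec:
  # ... image, flinkConfiguration, jobManager/taskManager resources ...
  job:
    jarURI: local:///opt/flink/usrlib/my-job.jar # placeholder
    upgradeMode: savepoint
    # point the recreated deployment at the last good checkpoint so it restores state
    initialSavepointPath: s3://bucket/checkpoints/<job-id>/chk-1234   # placeholder
    state: running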
Could someone help with this?
Mourad HARMIM
09/22/2025, 2:06 PM
Carlos Sanabria Miranda
09/23/2025, 1:49 PM
We use a RichAsyncFunction to call an external HTTP service, with org.apache.http.nio.client.HttpAsyncClient as the HTTP client. It is an async HTTP client that follows the non-blocking I/O (NIO) model, allowing it to handle many concurrent HTTP connections with a very small number of threads, using the Reactor pattern.
I'm trying to figure out the best way to create and configure this HTTP client when used inside a RichAsyncFunction:
1. Client creation: Is it best to create one client for each subtask (in the open() method), or should we share a single static client across all operator subtasks in the same JVM? I've read this last option is an anti-pattern in Flink, but just wanted to see if it still makes sense.
2. Num threads: How should we configure the number of IO dispatch threads in the client? Should we limit it to one thread per client and rely on Flink's parallelism, or set it higher? By default this library configures 2 threads. Should we set as many threads as there are CPU processors available to the JVM (Runtime.getRuntime().availableProcessors())?
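To make the question concrete, this is roughly the shape I have in mind for option 1 (one client per subtask, created in open()); a sketch only, class names are from httpasyncclient 4.x and the thread count and URL are just examples:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.impl.nio.reactor.IOReactorConfig;

import java.util.Collections;

public class HttpEnrichmentFunction extends RichAsyncFunction<String, String> {

    private transient CloseableHttpAsyncClient client;

    @Override
    public void open(Configuration parameters) {
        // One NIO client per subtask, with a small fixed number of IO dispatch threads
        IOReactorConfig ioConfig = IOReactorConfig.custom()
                .setIoThreadCount(2)   // example value, not a recommendation
                .build();
        client = HttpAsyncClients.custom()
                .setDefaultIOReactorConfig(ioConfig)
                .build();
        client.start();
    }

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        client.execute(new HttpGet("http://example.com/lookup?key=" + key),   // placeholder URL
                new FutureCallback<HttpResponse>() {
                    @Override
                    public void completed(HttpResponse response) {
                        resultFuture.complete(Collections.singleton(response.getStatusLine().toString()));
                    }

                    @Override
                    public void failed(Exception ex) {
                        resultFuture.completeExceptionally(ex);
                    }

                    @Override
                    public void cancelled() {
                        resultFuture.complete(Collections.emptyList());
                    }
                });
    }

    @Override
    public void close() throws Exception {
        if (client != null) {
            client.close();
        }
    }
}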
Cheers!
Jo Wijnant
09/24/2025, 1:38 PM
Setting properties.client.id on a source table (Kafka consumer) is ignored, whereas on a sink table (Kafka producer) it is applied as expected.
This is the relevant SQL for all my tables:
CREATE TABLE ...
WITH (
'connector' = 'upsert-kafka',
'properties.client.id' = 'flink-project-usage',
...)
The client.id is applied to the producer (logs of taskmanager):
2025-09-24 13:20:17,676 INFO org.apache.kafka.common.config.AbstractConfig [] - ProducerConfig values:
acks = -1
...
client.id = flink-project-usage
But it is not applied to the consumer, which uses {group.id}-{index} instead:
2025-09-24 13:20:17,478 INFO org.apache.kafka.common.config.AbstractConfig [] - ConsumerConfig values:
allow.auto.create.topics = false
...
client.id = coz_irods_project_usage-0
Why can I not set client.id on the consumer?
I'd like to label all consumers with the same client.id so I can easily define Kafka quotas for this client. But now I'd have to define multiple quotas (coz_irods_project_usage-0, coz_irods_project_usage-1, coz_irods_project_usage-2 ...)
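One thing I may try, based on how the consumer id looks ({group.id}-{index}): the Kafka source seems to build its client.id from a prefix plus the subtask index, and that prefix might be settable through the table options. Just a guess on my side, not something I've confirmed:
CREATE TABLE ...
WITH (
  'connector' = 'upsert-kafka',
  -- applied to the producer as-is
  'properties.client.id' = 'flink-project-usage',
  -- guess: the consumer would then use flink-project-usage-0, flink-project-usage-1, ...
  'properties.client.id.prefix' = 'flink-project-usage',
  ...)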
Michał Sochoń
09/24/2025, 6:26 PM
Boice Lu
09/26/2025, 2:00 AM
We have a FlinkSessionJob that consumes data from a Kafka topic and writes it to a BigQuery table.
When the JobManager starts, it seems that the initialization step — possibly checking the BigQuery table — takes a long time. During this delay, the web upload temporary file appears to be deleted, causing the job to fail.
We are seeing the following exception:
Caused by: java.io.FileNotFoundException:
/tmp/flink-web-11008d54-f3d5-4458-8b2c-531d6c88ab1a/flink-web-upload/edc628df-da1b-4b0f-b508-88b3d9891edb_flink-datastream-1.0.0.jar
(No such file or directory)
Has anyone encountered this issue before?
Is there a recommended way to prevent the uploaded jar file from being deleted during job initialization?
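One thing we are going to test, in case the root cause is simply that the upload directory lives under /tmp and gets cleaned up before initialization finishes: pointing the web upload directory at a more durable path in the Flink configuration (the path below is just an example):
# keep uploaded job jars out of /tmp so temp-cleanup doesn't remove them mid-initialization
web.upload.dir: /opt/flink/artifacts/web-upload
# web.tmpdir is the fallback location used when web.upload.dir is not set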
Thanks in advance for any help! 🙏
Royston
09/29/2025, 2:29 PM
Jeremy Tee
10/01/2025, 7:35 AM
L P V
10/01/2025, 12:03 PM
I'm using 'format' = 'protobuf' for a Kafka source table. The CREATE TABLE succeeds, but EXPLAIN PLAN FOR SELECT * fails with a ClassNotFoundException during schema inference/planning. This happens in a distributed setup with Flink SQL Gateway and a remote Flink Session Cluster. I've tried dynamic JAR loading via GCS (gs://), but the class isn't resolvable at planning time, even though GCS access works (no filesystem errors).
Setup:
• Flink version: 1.19.1
• SQL Client startup: ./bin/sql-client.sh gateway --endpoint 0.0.0.0:8083 -Drest.address=xxxxx -Drest.port=8081
• JAR: trackify_proto_file_deploy.jar (built from Protobuf-generated code via Maven/Gradle). Verified contents with `jar -tvf`: Includes vn/com/momo/mle/proto/v1/UserCountResult.class and inner classes (e.g., UserCountResult$1.class). It's a fat JAR with protobuf-java shaded in.
Steps to Reproduce:
• Start SQL Client (as above) and connect to Gateway session.
• Add JAR and set config:
ADD JAR 'gs://..../trackify_proto_file_deploy.jar';
SET 'pipeline.jars' = 'gs://ml-game-short-term-storage-sing/flink_sql/jars/trackify_proto_file_deploy.jar';
◦ Both succeed: [INFO] Execute statement succeed. / [INFO] JAR added successfully.
• Create table (parses fine, registers in catalog):
CREATE TABLE src.kafka.test (
.......
) WITH (
'connector' = 'kafka',
'topic' = '',
'properties.bootstrap.servers' = '...',
'format' = 'protobuf',
'protobuf.message-class-name' = 'vn.com.momo.mle.proto.v1.UserCountResult',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'SCRAM-SHA-512',
'properties.group.id' = '',
'properties.sasl.jaas.config' = '...',
'scan.startup.mode' = 'earliest-offset'
);
Succeeds: [INFO] Execute statement succeed.
• Run planning:
EXPLAIN PLAN FOR SELECT * FROM src.kafka.test;
• Fails with: [ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: vn.com.momo.mle.proto.v1.UserCountResult
What I've Tried (No Redeploy):
• SHOW JARS; lists the added JAR.
Question/Goal:
• Why does Protobuf class loading fail at planning time (EXPLAIN PLAN) in a Gateway + remote Session Cluster, even with pipeline.jars set to a resolvable URL (gs:// or HTTP via REST upload)? Is planning strictly client-side, or does the Gateway/JM need additional setup for dynamic JARs in custom formats like Protobuf?
• How to dynamically add/update Protobuf JARs (e.g., for schema evolution in Kafka topics) without redeploying the Flink image or putting them in /opt/flink/lib/?
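One workaround I'm currently evaluating (not confirmed to work yet): making the JAR visible on the SQL Gateway host's local filesystem and adding it by local path, so the session classloader used during planning can resolve the generated classes; the local path below is just an example:
-- assumes the JAR was first copied onto the Gateway host, e.g.
--   gsutil cp gs://<bucket>/trackify_proto_file_deploy.jar /opt/flink/usrlib/
ADD JAR '/opt/flink/usrlib/trackify_proto_file_deploy.jar';
-- pipeline.jars can keep pointing at the GCS copy so the cluster side can fetch it
SET 'pipeline.jars' = 'gs://ml-game-short-term-storage-sing/flink_sql/jars/trackify_proto_file_deploy.jar';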
Lukasz Krawiec
10/01/2025, 1:43 PM
מייקי בר יעקב
10/01/2025, 9:38 PM
מייקי בר יעקב
10/02/2025, 10:20 AM
Madhusudhan Reddy
10/02/2025, 3:42 PM
Victor Babenko
10/03/2025, 5:02 AM
Chennupati Gopal
10/03/2025, 7:35 AM
Jaya Ananthram
10/04/2025, 11:17 AM
A few questions on when Flink serializes/deserializes records:
1. Between chained operators in the same task (e.g., Source → Map). Does serialization/deserialization occur?
2. Between slots within the same TaskManager
◦ If two subtasks are running in different slots of the same TaskManager, does Flink serialize/deserialize the record when passing data between them?
3. Between slots across different TaskManagers
◦ When data is exchanged between subtasks (e.g., after keyBy) located in different TaskManagers, does Flink always perform serialization/deserialization during transfer? I believe this is obviously yes, as it is communication between different JVMs.
(Context: I am doing some performance fine-tuning, so I would like to understand this behaviour to find my job's bottleneck.)
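To make it concrete, this is the kind of probe job I have in mind to see where the boundaries are (a rough sketch, names are only for illustration). My current understanding: chained operators hand records over inside one task (copied through the TypeSerializer unless object reuse is enabled), while anything crossing a task boundary, whether the other task is in the same TaskManager or not, goes through the network stack serialized:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingProbe {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // With object reuse enabled, records passed between chained operators are handed
        // over by reference instead of being copied through the TypeSerializer
        env.getConfig().enableObjectReuse();

        DataStream<Long> source = env.fromSequence(0, 1_000_000);

        source
                .map(v -> v + 1)       // chained to the source: same task, same thread
                .disableChaining()     // task boundary: records cross it serialized
                .keyBy(v -> v % 16)    // network shuffle: serialized, local or remote
                .map(v -> v * 2)
                .print();

        env.execute("chaining-probe");
    }
}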
Royston
10/04/2025, 7:27 PM
raphaelauv
10/05/2025, 10:46 PM
It seems io.confluent.kafka.serializers.subject.TopicNameStrategy is not supported, only an explicit subject. Has anyone already found a solution? Thanks all
Grzegorz Liter
10/06/2025, 9:21 AM
at app//org.rocksdb.Checkpoint.createCheckpoint(Native Method)
at app//org.rocksdb.Checkpoint.createCheckpoint(Checkpoint.java:50)
at app//org.apache.flink.state.rocksdb.snapshot.RocksDBSnapshotStrategyBase.takeDBNativeCheckpoint(RocksDBSnapshotStrategyBase.java:174)
Evan NotEvan
10/06/2025, 11:36 AM
Starting from 1024, the output sequence should be:
1024 -> 512 -> 256 -> ... -> 1
The interesting part is that I’m using Flink 2.1, where DataStream.iterate() and `IterativeStream`—basically the built-in Iteration API—have been removed. On top of that, Flink ML currently only supports Flink versions up to 1.17.
Without these options, it’s up to the user to implement their own iteration. So far, I’ve put together the following code, which attempts to demonstrate a feedback loop. I’ve also included a diagram to better illustrate the implementation.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class FeedbackLoopExample {

    // Define a side output for feedback
    private static final OutputTag<Integer> FEEDBACK_TAG = new OutputTag<Integer>("feedback"){};

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Initial input
        DataStream<Integer> input = env.fromElements(1024)
                .assignTimestampsAndWatermarks(WatermarkStrategy.noWatermarks());

        // Process function that splits into converged vs non-converged
        SingleOutputStreamOperator<Integer> processed = input.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                if (value > 1) {
                    // Not yet converged → send to feedback
                    ctx.output(FEEDBACK_TAG, value / 2); // iteration step
                } else {
                    // Converged → emit as output
                    out.collect(value);
                }
            }
        });

        // Feedback stream (non-converged)
        DataStream<Integer> feedback = processed.getSideOutput(FEEDBACK_TAG);

        // Apply the same process again (iterative feedback)
        SingleOutputStreamOperator<Integer> processedFeedback = feedback.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) {
                if (value > 1) {
                    ctx.output(FEEDBACK_TAG, value / 2);
                } else {
                    out.collect(value);
                }
            }
        });

        // Union feedback outputs → continue iteration
        DataStream<Integer> loopFeedback = processedFeedback.getSideOutput(FEEDBACK_TAG);

        // Final results = outputs from both initial and feedback iterations
        DataStream<Integer> finalResults = processed.union(processedFeedback);

        finalResults.print("Converged");
        loopFeedback.print("Feedback"); // shows values still circulating

        env.execute("Flink Feedback Loop Example");
    }
}
This is the output if you run the code
1024 -> 512 -> 256 (stops here)
It seems like the job only performs two iterations and then stops instead of counting down to 1. This makes me think that my attempt to loop back the feedback stream didn’t actually work.
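For comparison, a version that sidesteps the feedback edge entirely and does the whole countdown inside a single operator does produce the full 1024 -> 512 -> ... -> 1 sequence. It's a minimal sketch and obviously only works because this particular loop body doesn't need a distributed exchange per iteration:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class InOperatorIterationExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1024)
                // the "iteration" lives inside the operator: keep halving until convergence
                // and emit every intermediate value, so downstream sees 1024, 512, ..., 1
                .flatMap((FlatMapFunction<Integer, Integer>) (value, out) -> {
                    for (int v = value; v >= 1; v /= 2) {
                        out.collect(v);
                    }
                })
                .returns(Integer.class)
                .print("Converged");

        env.execute("In-operator iteration example");
    }
}
For a loop that genuinely needs the distributed dataflow on every step, the options I'm aware of are the flink-ml Iterations module (once it supports 2.x) or an external feedback channel, e.g. writing not-yet-converged records back to a Kafka topic that the job also consumes.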
My first question is: what’s the correct way to implement a feedback loop in Flink 2.0+ given the constraints mentioned above?
Second question: is it ok if two different process functions use the same side output to write data?
Third question: is there any way to extract or analyze the graph topology of a job in Flink?
Fabrizzio Chavez
10/06/2025, 9:55 PM
Dennis Sosnoski
10/06/2025, 10:55 PM
Ajay Sarjoo
10/07/2025, 3:53 PM
Royston
10/09/2025, 10:50 AM
Trystan
10/09/2025, 3:12 PM