Nikola Stanisavljevic
09/21/2025, 8:47 PM

Ashish Marottickal Gopi
09/22/2025, 10:06 AM
We use the "fixed-delay" strategy for failure restarts. This led to the job ending up in a terminally failed status. As per the documentation, we went for a manual restore, deleting the FlinkDeployment CRD and restoring from a checkpoint. But in our case the application still doesn't start: the FlinkDeployment CRD status stays in the Reconciling state with this error:
JobManager deployment is missing and HA data is not available to make stateful upgrades. It is possible that the job has finished or terminally failed, or the configmaps have been deleted. Manual restore required.
Could someone help with this?
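For context, the restart strategy is configured roughly like this (a minimal sketch using Flink 1.x configuration keys; the attempt count, delay, and placeholder pipeline are illustrative, not our exact setup):
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FixedDelayRestartSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Bounded number of restart attempts with a fixed delay between them;
        // once the attempts are exhausted the job transitions to a terminally FAILED state.
        conf.setString("restart-strategy", "fixed-delay");
        conf.setString("restart-strategy.fixed-delay.attempts", "3");
        conf.setString("restart-strategy.fixed-delay.delay", "10 s");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.fromElements(1, 2, 3).print(); // placeholder pipeline, just to make the sketch runnable
        env.execute("fixed-delay-restart-sketch");
    }
}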
Mourad HARMIM
09/22/2025, 2:06 PM

Carlos Sanabria Miranda
09/23/2025, 1:49 PM
We are using a RichAsyncFunction to call an external HTTP service, with org.apache.http.nio.client.HttpAsyncClient as the HTTP client. It is an async HTTP client that follows the non-blocking I/O (NIO) model, allowing it to handle many concurrent HTTP connections with a very small number of threads, using the Reactor pattern.
I'm trying to figure out the best way to create and configure this HTTP client when used inside a RichAsyncFunction:
1. Client creation: Is it best to create one client for each subtask (in the open() method), or should we share a single static client across all operator subtasks in the same JVM? I've read that this last option is an anti-pattern in Flink, but I just wanted to see if it still makes sense. (A minimal sketch of this first option is below.)
2. Num threads: How should we configure the number of I/O dispatch threads in the client? Should we limit it to one thread per client and rely on Flink's parallelism, or set it higher? By default this library configures 2 threads. Should we set as many threads as there are CPU processors available to the JVM (Runtime.getRuntime().availableProcessors())?
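For reference, here is a minimal sketch of the first option (one client per subtask, created in open()), assuming Flink 1.x and Apache HttpAsyncClient 4.x; the class name and endpoint URL are made up for illustration:
import java.util.Collections;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.util.EntityUtils;

public class HttpEnrichSketch extends RichAsyncFunction<String, String> {

    private transient CloseableHttpAsyncClient client;

    @Override
    public void open(Configuration parameters) {
        // One client per subtask; 2 I/O dispatch threads is the library default.
        client = HttpAsyncClients.custom()
                .setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build())
                .build();
        client.start();
    }

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        // Hypothetical endpoint, just to show the non-blocking callback wiring.
        HttpGet request = new HttpGet("http://example.com/lookup?key=" + key);
        client.execute(request, new FutureCallback<HttpResponse>() {
            @Override
            public void completed(HttpResponse response) {
                try {
                    resultFuture.complete(Collections.singleton(EntityUtils.toString(response.getEntity())));
                } catch (Exception e) {
                    resultFuture.completeExceptionally(e);
                }
            }

            @Override
            public void failed(Exception ex) {
                resultFuture.completeExceptionally(ex);
            }

            @Override
            public void cancelled() {
                resultFuture.completeExceptionally(new IllegalStateException("request cancelled"));
            }
        });
    }

    @Override
    public void close() throws Exception {
        if (client != null) {
            client.close();
        }
    }
}
The function would then be plugged in with something like AsyncDataStream.unorderedWait(input, new HttpEnrichSketch(), 5, TimeUnit.SECONDS, 100).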
Cheers!

Jo Wijnant
09/24/2025, 1:38 PM
It seems that properties.client.id on a source table (Kafka consumer) is ignored, whereas on a sink table (Kafka producer) it is effectively applied.
This is the relevant SQL for all my tables:
CREATE TABLE ...
WITH (
'connector' = 'upsert-kafka',
'properties.client.id' = 'flink-project-usage',
...)
The client.id is applied to the producer (logs of taskmanager):
2025-09-24 13:20:17,676 INFO org.apache.kafka.common.config.AbstractConfig [] - ProducerConfig values:
acks = -1
...
client.id = flink-project-usage
But not for the consumer. It takes {group.id}-{index}
2025-09-24 13:20:17,478 INFO org.apache.kafka.common.config.AbstractConfig [] - ConsumerConfig values:
allow.auto.create.topics = false
...
client.id = coz_irods_project_usage-0
Why can I not set client.id on the consumer?
I'd like to label all consumers with the same client.id so I can easily define Kafka quotas for this client. But now I'd have to define multiple quotas (coz_irods_project_usage-0, coz_irods_project_usage-1, coz_irods_project_usage-2, ...).

Michał Sochoń
09/24/2025, 6:26 PM

Boice Lu
09/26/2025, 2:00 AM
We have a FlinkSessionJob that consumes data from a Kafka topic and writes it to a BigQuery table.
When the JobManager starts, it seems that the initialization step — possibly checking the BigQuery table — takes a long time. During this delay, the web upload temporary file appears to be deleted, causing the job to fail.
We are seeing the following exception:
Caused by: java.io.FileNotFoundException:
/tmp/flink-web-11008d54-f3d5-4458-8b2c-531d6c88ab1a/flink-web-upload/edc628df-da1b-4b0f-b508-88b3d9891edb_flink-datastream-1.0.0.jar
(No such file or directory)
Has anyone encountered this issue before?
Is there a recommended way to prevent the uploaded jar file from being deleted during job initialization?
Thanks in advance for any help! 🙏

Royston
09/29/2025, 2:29 PM

Jeremy Tee
10/01/2025, 7:35 AM

L P V
10/01/2025, 12:03 PM
I'm using 'format' = 'protobuf' for a Kafka source table. The CREATE TABLE succeeds, but EXPLAIN PLAN FOR SELECT * fails with a ClassNotFoundException during schema inference/planning. This happens in a distributed setup with Flink SQL Gateway and a remote Flink Session Cluster. I've tried dynamic JAR loading via GCS (gs://) but the class isn't resolvable at planning time, even though GCS access works (no filesystem errors).
Setup:
• Flink version: 1.19.1
• SQL Client startup: ./bin/sql-client.sh gateway --endpoint 0.0.0.0:8083 -Drest.address=xxxxx -Drest.port=8081
• JAR: trackify_proto_file_deploy.jar (built from Protobuf-generated code via Maven/Gradle). Verified contents with `jar -tvf`: includes vn/com/momo/mle/proto/v1/UserCountResult.class and inner classes (e.g., UserCountResult$1.class). It's a fat JAR with protobuf-java shaded in.
Steps to Reproduce:
• Start SQL Client (as above) and connect to Gateway session.
• Add JAR and set config:
ADD JAR 'gs://..../trackify_proto_file_deploy.jar';
SET 'pipeline.jars' = 'gs://ml-game-short-term-storage-sing/flink_sql/jars/trackify_proto_file_deploy.jar';
◦ Both succeed: [INFO] Execute statement succeed. / [INFO] JAR added successfully.
• Create table (parses fine, registers in catalog):
CREATE TABLE src.kafka.test (
.......
) WITH (
'connector' = 'kafka',
'topic' = '',
'properties.bootstrap.servers' = '...',
'format' = 'protobuf',
'protobuf.message-class-name' = 'vn.com.momo.mle.proto.v1.UserCountResult',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'SCRAM-SHA-512',
'properties.group.id' = '',
'properties.sasl.jaas.config' = '...',
'scan.startup.mode' = 'earliest-offset'
);
Succeeds: [INFO] Execute statement succeed.
• Run planning:
EXPLAIN PLAN FOR SELECT * FROM src.kafka.test;
• Fails with: [ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: vn.com.momo.mle.proto.v1.UserCountResult
What I've Tried (No Redeploy):
• SHOW JARS; lists the added JAR.
Question/Goal:
• Why does Protobuf class loading fail at planning time (EXPLAIN PLAN) in a Gateway + remote Session Cluster, even with pipeline.jars set to a resolvable URL (gs:// or HTTP via REST upload)? Is planning strictly client-side, or does the Gateway/JM need additional setup for dynamic JARs in custom formats like Protobuf?
• How can I dynamically add/update Protobuf JARs (e.g., for schema evolution in Kafka topics) without redeploying the Flink image or putting them in /opt/flink/lib/?

Lukasz Krawiec
10/01/2025, 1:43 PM

מייקי בר יעקב
10/01/2025, 9:38 PM

מייקי בר יעקב
10/02/2025, 10:20 AM

Madhusudhan Reddy
10/02/2025, 3:42 PM

Victor Babenko
10/03/2025, 5:02 AM

Chennupati Gopal
10/03/2025, 7:35 AM

Jaya Ananthram
10/04/2025, 11:17 AM
I have a few questions about when Flink serializes and deserializes records:
1. Between operators chained together in the same slot (e.g., Source → Map). Does serialization/deserialization occur?
2. Between slots within the same TaskManager
◦ If two subtasks are running in different slots of the same TaskManager, does Flink serialize/deserialize the record when passing data between them?
3. Between slots across different TaskManagers
◦ When data is exchanged between subtasks (e.g., after keyBy) located in different TaskManagers, does Flink always perform serialization/deserialization during transfer? I believe this is obviously yes, as it crosses JVM boundaries.
(Context: I am doing some performance fine-tuning, so I would like to understand this behaviour to find my job's bottleneck.)
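To make the three scenarios concrete, here is a minimal sketch of the kind of topology I have in mind (purely illustrative: the class name, operators, and the explicit disableChaining() call are my own, assuming the Flink 1.x DataStream API):
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingIllustration {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> source = env.fromElements(1, 2, 3, 4);

        // Question 1: with the same parallelism and no explicit chain break,
        // source -> map is normally chained into a single task.
        DataStream<Integer> mapped = source
                .map(new MapFunction<Integer, Integer>() {
                    @Override
                    public Integer map(Integer value) {
                        return value * 10;
                    }
                })
                .name("chained-map");

        // Question 2: disableChaining() forces this operator into its own task,
        // so records cross a task boundary even when both subtasks run on the
        // same TaskManager.
        DataStream<Integer> unchained = mapped
                .map(new MapFunction<Integer, Integer>() {
                    @Override
                    public Integer map(Integer value) {
                        return value + 1;
                    }
                })
                .name("unchained-map")
                .disableChaining();

        // Question 3: keyBy repartitions records by key, so they may be shipped
        // to subtasks on other TaskManagers.
        unchained
                .keyBy(new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer value) {
                        return value % 2;
                    }
                })
                .reduce(new ReduceFunction<Integer>() {
                    @Override
                    public Integer reduce(Integer a, Integer b) {
                        return a + b;
                    }
                })
                .print();

        env.execute("chaining-illustration");
    }
}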
Royston
10/04/2025, 7:27 PM

raphaelauv
10/05/2025, 10:46 PM
It seems there is no support for io.confluent.kafka.serializers.subject.TopicNameStrategy, only for an explicit subject. Has anyone already found a solution? Thanks all.

Grzegorz Liter
10/06/2025, 9:21 AM
at app//org.rocksdb.Checkpoint.createCheckpoint(Native Method)
at app//org.rocksdb.Checkpoint.createCheckpoint(Checkpoint.java:50)
at app//org.apache.flink.state.rocksdb.snapshot.RocksDBSnapshotStrategyBase.takeDBNativeCheckpoint(RocksDBSnapshotStrategyBase.java:174)
Evan NotEvan
10/06/2025, 11:36 AM
Given an input of 1024, the output sequence should be:
1024 -> 512 -> 256 -> ... -> 1
The interesting part is that I'm using Flink 2.1, where DataStream.iterate() and `IterativeStream` (basically the built-in Iteration API) have been removed. On top of that, Flink ML currently only supports Flink versions up to 1.17.
Without these options, it’s up to the user to implement their own iteration. So far, I’ve put together the following code, which attempts to demonstrate a feedback loop. I’ve also included a diagram to better illustrate the implementation.
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class FeedbackLoopExample {
// Define a side output for feedback
private static final OutputTag<Integer> FEEDBACK_TAG = new OutputTag<Integer>("feedback"){};
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Initial input
DataStream<Integer> input = env.fromElements(1024)
.assignTimestampsAndWatermarks(WatermarkStrategy.noWatermarks());
// Process function that splits into converged vs non-converged
SingleOutputStreamOperator<Integer> processed = input.process(new ProcessFunction<Integer, Integer>() {
@Override
public void processElement(Integer value, Context ctx, Collector<Integer> out) {
if (value > 1) {
// Not yet converged → send to feedback
ctx.output(FEEDBACK_TAG, value / 2); // iteration step
} else {
// Converged → emit as output
out.collect(value);
}
}
});
// Feedback stream (non-converged)
DataStream<Integer> feedback = processed.getSideOutput(FEEDBACK_TAG);
// Apply the same process again (iterative feedback)
SingleOutputStreamOperator<Integer> processedFeedback = feedback.process(new ProcessFunction<Integer, Integer>() {
@Override
public void processElement(Integer value, Context ctx, Collector<Integer> out) {
if (value > 1) {
ctx.output(FEEDBACK_TAG, value / 2);
} else {
out.collect(value);
}
}
});
// Union feedback outputs → continue iteration
DataStream<Integer> loopFeedback = processedFeedback.getSideOutput(FEEDBACK_TAG);
// Final results = outputs from both initial and feedback iterations
DataStream<Integer> finalResults = processed.union(processedFeedback);
finalResults.print("Converged");
loopFeedback.print("Feedback"); // shows values still circulating
env.execute("Flink Feedback Loop Example");
}
}
This is the output if you run the code
1024 -> 512 -> 256 (stops here)
It seems like the job only performs two iterations and then stops instead of counting down to 1. This makes me think that my attempt to loop back the feedback stream didn't actually work.
My first question is: what's the correct way to implement a feedback loop in Flink 2.0+ given the constraints mentioned above?
Second question: is it OK if two different process functions use the same side output to write data?
Third question: is there any way to extract or analyze the graph topology of a job in Flink?

Fabrizzio Chavez
10/06/2025, 9:55 PM

Dennis Sosnoski
10/06/2025, 10:55 PM

Ajay Sarjoo
10/07/2025, 3:53 PM

Royston
10/09/2025, 10:50 AM

Trystan
10/09/2025, 3:12 PM

Victor Babenko
10/09/2025, 8:22 PM

Royston
10/10/2025, 3:08 PM

Trystan
10/10/2025, 5:19 PM
Event | Info | SCALINGREPORT | Scaling execution enabled, begin scaling vertices:
{ Vertex ID 5591cebca2a45b74833df01196d1a431 | Parallelism 74 -> 80 | Processing capacity 6671.45 -> 7131.00 | Target data rate 3889.81}
{ Vertex ID 162a29c968c94eac6fbdfc2f8ccc2080 | Parallelism 180 -> 128 | Processing capacity Infinity -> Infinity | Target data rate 50.67}
{ Vertex ID 009b60dbc7737a8a2e91e7b3c30d9949 | Parallelism 128 -> 169 | Processing capacity 70.68 -> 93.00 | Target data rate 50.67}
these autoscaling decisions don't make a lot of sense to me. why on earth is it scaling UP 009b?
here's the output log from right before:
ue1d.FlinkDeployment.AutoScaler.jobVertexID.009b60dbc7737a8a2e91e7b3c30d9949.TRUE_PROCESSING_RATE.Average: 71.325
ue1d.FlinkDeployment.AutoScaler.jobVertexID.009b60dbc7737a8a2e91e7b3c30d9949.SCALE_DOWN_RATE_THRESHOLD.Current: 176.0
ue1d.FlinkDeployment.AutoScaler.jobVertexID.009b60dbc7737a8a2e91e7b3c30d9949.SCALE_UP_RATE_THRESHOLD.Current: 56.0
is this a catchup buffer problem? maybe it thinks it would be unable to catch up within the desired window?

Barak Ben-Nathan
10/12/2025, 7:35 AM
Is it possible to use the State Processor API to generate both a keyed state and a broadcast state for the same operator?