Francisco Morillo
10/28/2025, 7:29 PM
Noufal Rijal
10/29/2025, 5:59 AM
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-session
  namespace: flink-test
spec:
  image: pyflink-session-test:v1.0
  flinkVersion: v1_20
  imagePullPolicy: Always
  serviceAccount: flink
  mode: standalone
  flinkConfiguration:
    # All Flink runtime config keys go here
    fs.allowed-fallback-filesystems: "file"
    io.tmp.dirs: "/tmp"
    taskmanager.numberOfTaskSlots: "4"
    # ===== OPERATOR AUTOSCALER =====
    kubernetes.operator.job.autoscaler.enabled: "true"
    kubernetes.operator.job.autoscaler.target.utilization: "0.7"
    kubernetes.operator.job.autoscaler.target.utilization.boundary: "0.2"
    kubernetes.operator.job.autoscaler.stabilization.interval: "1m"
    kubernetes.operator.job.autoscaler.metrics.window: "5m"
    kubernetes.operator.job.autoscaler.scale-up.grace-period: "1m"
    kubernetes.operator.job.autoscaler.scale-down.grace-period: "5m"
  # 💡 MOVED: jobManager must be a direct child of 'spec'
  jobManager:
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    replicas: 2
    resource:
      # memory: "10240m"
      memory: "2048m"
      cpu: 2
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: ppe-kafka-streaming
  namespace: flink-test
spec:
  deploymentName: flink-session
  job:
    jarURI: /opt/flink/opt/flink-python-1.20.3.jar
    # entryClass: org.apache.flink.client.python.PythonDriver
    # args:
    #   # 1. Main PyFlink script
    #   - "--python"        # Changed from -py
    #   - "/opt/flink/usrlib/streaming_test.py"
    #   # 2. Python archives
    #   - "--pyArchives"    # Changed from -pyarch
    #   - "blob_path#flink_venv"
    #   # 3. Python executable
    #   - "-pyexec"         # This is correct!
    #   - "flink_venv/bin/python"
    args:
      - "-py"
      - "/opt/flink/usrlib/streaming_test.py"
      - "-pyarch"
      - "blob_path#flink_venv"
      - "-pyexec"
      - "flink_venv/bin/python"
    parallelism: 2
Request for your help if you have faced and tackled a similar issue.
Saketh
10/29/2025, 7:09 AM
Mohamed Galal
10/29/2025, 7:19 AM
André Santos
10/29/2025, 7:13 PM
AbstractFlinkService.submitJobToSessionCluster() - it bypasses REST pod leader discovery entirely.
Royston
10/30/2025, 11:43 AM
Iain Dixon
10/30/2025, 1:23 PM
inPoolUsage and outPoolUsage are good metrics with which to assess the presence of backpressure. To test things out I built a really simple setup, as seen in the picture below, where records are generated in the generator (via a loop to create the rate, with a Thread.sleep at the end to pad out the rest of the second, based on the DS2 wordcount source here: https://github.com/strymon-system/ds2/blob/master/flink-examples/src/main/java/ch/ethz/systems/strymon/ds2/flink/wordcount/sources/RateControlledSourceFunction.java), sent to a pipeline workload simulator (a single operator which counts the number of received records and runs a Thread.sleep at different frequencies to simulate pipeline workload), and finally to a sink where records are received but not saved or sent onwards. I bound the parallelism of each operator to 1 (to create the minimal possible pipeline). The generator produces a constant workload of 1000 records per second, and the workload simulator produces a constant amount of work for every n records.
[image: experimental_setup]
I expected that outPoolUsage would trend up to some value (as roughly 500ms of "work" should be created every second) and remain relatively constant at that value, rather than dipping back down and jumping up as seen in the graph. I'm not sure what mechanism in Flink would be responsible for this behaviour if the workload is constant, and I was wondering if anyone working on Flink could explain what's occurring or point me in the right direction. I'm aware (from the linked blog post above) that the outPoolUsage metric is an aggregation of the floatingBuffersUsage and exclusiveBuffersUsage metrics, so the dropping to 10% would be one of the exclusiveBuffers, but why would floating buffers come and go if the pipeline workload and arrival rates are constant?
[image: buffer_question]
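For reference, a minimal sketch of the kind of rate-controlled source and busy-work operator described above; the class names, the per-second rate, and the sleep interval are illustrative, and this is not the poster's actual code:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

/** Emits a fixed number of records per second: a tight loop produces the rate,
 *  then Thread.sleep pads out the remainder of each one-second window. */
public class RateControlledSource extends RichParallelSourceFunction<String> {
    private final int recordsPerSecond;            // 1000 in the experiment above
    private volatile boolean running = true;

    public RateControlledSource(int recordsPerSecond) {
        this.recordsPerSecond = recordsPerSecond;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            long start = System.currentTimeMillis();
            for (int i = 0; i < recordsPerSecond && running; i++) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect("record-" + i);
                }
            }
            long elapsed = System.currentTimeMillis() - start;
            if (elapsed < 1000) {
                Thread.sleep(1000 - elapsed);      // pad out the rest of the second
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

/** "Workload simulator": counts received records and sleeps every n-th record,
 *  so a constant arrival rate produces a constant amount of work per second. */
class WorkloadSimulator implements MapFunction<String, String> {
    private final int sleepEveryNRecords;          // illustrative knob
    private long received = 0;

    WorkloadSimulator(int sleepEveryNRecords) {
        this.sleepEveryNRecords = sleepEveryNRecords;
    }

    @Override
    public String map(String value) throws Exception {
        if (++received % sleepEveryNRecords == 0) {
            Thread.sleep(1);                       // simulate processing cost
        }
        return value;
    }
}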
Noufal Rijal
11/04/2025, 4:19 PM
Sidharth Ramalingam
11/05/2025, 6:12 AM
We use an AsyncIO function with Futures to make external gRPC calls. Currently, we have set the async capacity to 1, and we are using a blocking stub to make those calls. For each event, we trigger 4 Futures (i.e., 4 gRPC calls per event).
Does this mean that the Executors.newFixedThreadPool() needs to have at least 4 threads to avoid queue starvation? Also, if we increase the async capacity to 2, should we increase the thread pool size to 8 to keep up with the parallel calls?
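A minimal sketch of the pattern being described, with one element fanning out 4 blocking calls onto a fixed pool; the callsPerEvent x asyncCapacity sizing intuition from the question maps to the pool size chosen in open(). The gRPC stub is replaced by a placeholder blockingCall(), and names and sizes are illustrative rather than the poster's code:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class EnrichAsyncFunction extends RichAsyncFunction<String, String> {

    private static final int CALLS_PER_EVENT = 4;   // 4 external calls per event, as described
    private transient ExecutorService pool;

    @Override
    public void open(Configuration parameters) {
        // With async capacity 1, up to 4 blocking calls can be in flight at once, so the
        // pool needs at least 4 threads for them to actually run in parallel; capacity 2
        // would allow up to 8 concurrent blocking calls, and so on.
        pool = Executors.newFixedThreadPool(CALLS_PER_EVENT /* * asyncCapacity */);
    }

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        List<CompletableFuture<String>> calls = new ArrayList<>();
        for (int i = 0; i < CALLS_PER_EVENT; i++) {
            final int callId = i;
            // blockingCall() stands in for the blocking gRPC stub invocation.
            calls.add(CompletableFuture.supplyAsync(() -> blockingCall(input, callId), pool));
        }
        CompletableFuture.allOf(calls.toArray(new CompletableFuture[0]))
            .thenAccept(ignored -> resultFuture.complete(
                calls.stream().map(CompletableFuture::join).collect(Collectors.toList())));
        // Timeout and error handling (timeout(), exceptionally(...)) are omitted for brevity.
    }

    private String blockingCall(String input, int callId) {
        return input + "-" + callId;                 // placeholder for the real external call
    }

    @Override
    public void close() {
        if (pool != null) {
            pool.shutdown();
        }
    }
}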
Jashwanth S J
11/06/2025, 6:50 AM
jarURI: "file:///opt/flink/integration_testing_ui_demo.jar". The operator is failing to find the jar inside the JM/TM pod to bring up TM pods, even though the jar is present within the image. I can exec in and see the jar. Can someone help me find the cause?
Operator logs:
2025-11-06 06:36:55,168 o.a.f.k.o.r.ReconciliationUtils [WARN ][ncm-sc/nc-int] Attempt count: 11, last attempt: false
2025-11-06 06:36:55,259 o.a.f.r.u.c.m.ProcessMemoryUtils [INFO ][ncm-sc/fsc-flink-cluster] The derived from fraction jvm overhead memory (102.400mb (107374184 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2025-11-06 06:36:55,362 o.a.f.r.u.c.m.ProcessMemoryUtils [INFO ][ncm-sc/fsc-flink-cluster] The derived from fraction jvm overhead memory (102.400mb (107374184 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2025-11-06 06:36:55,459 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][ncm-sc/fsc-flink-cluster] Resource fully reconciled, nothing to do...
2025-11-06 06:36:55,469 i.j.o.p.e.EventProcessor [ERROR][ncm-sc/nc-int] Error during event processing ExecutionScope{ resource id: ResourceID{name='nc-int', namespace='ncm-sc'}, version: 48211671}
org.apache.flink.kubernetes.operator.exception.ReconciliationException: java.io.FileNotFoundException: /opt/flink/integration_testing_ui_demo.jar (No such file or directory)
at org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController.reconcile(FlinkSessionJobController.java:130)
at org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController.reconcile(FlinkSessionJobController.java:58)
at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:153)
at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:111)
Exec into the JM pod: the jar is inside
❯ kubectl exec -it -n ncm-sc fsc-flink-cluster-6c47f5964b-t5jhb -- bash
Defaulted container "flink-main-container" out of: flink-main-container, setup-certs-and-plugins (init)
root@fsc-flink-cluster-6c47f5964b-t5jhb:/opt/flink# ls
'${sys:log.file}' bin examples lib licenses NOTICE plugins README.txt
artifacts conf integration_testing_ui_demo.jar LICENSE log opt pod-template
Describe the flinksessionjob:
❯ kubectl describe flinksessionjob nc-int -n ncm-sc | tail -n 20
Status:
Error: {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.io.FileNotFoundException: /opt/flink/integration_testing_ui_demo.jar (No such file or directory)","additionalMetadata":{},"throwableList":[{"type":"java.io.FileNotFoundException","message":"/opt/flink/integration_testing_ui_demo.jar (No such file or directory)","additionalMetadata":{}}]}
Job Status:
Checkpoint Info:
Last Periodic Checkpoint Timestamp: 0
Job Id: f72f53c6355212276b75452aa2dc376e
Savepoint Info:
Last Periodic Savepoint Timestamp: 0
Savepoint History:
Lifecycle State: UPGRADING
Observed Generation: 2
Reconciliation Status:
Last Reconciled Spec: {"spec":{"job":{"jarURI":"file:///opt/flink/integration_testing_ui_demo.jar","parallelism":1,"entryClass":"com.beam.screaltime.worker.ncm.process_functions.NcmEventsJob","args":["--jobType","cdc","--cloudProvider","nx","--buildId","dev1-6404","--operatorParallelism","{\"default\":1}"],"state":"suspended","savepointTriggerNonce":null,"initialSavepointPath":null,"checkpointTriggerNonce":null,"upgradeMode":"stateless","allowNonRestoredState":null,"savepointRedeployNonce":null,"autoscalerResetNonce":null},"restartNonce":1,"flinkConfiguration":null,"deploymentName":"fsc-flink-cluster"},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","firstDeployment":true}}
Reconciliation Timestamp: 1762411304062
State: UPGRADING
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Warning SessionJobException 7m5s (x19 over 23m) Job /opt/flink/integration_testing_ui_demo.jar (No such file or directory)
Normal Submit 7m5s (x19 over 23m) Job Starting deployment
windwheel
11/07/2025, 8:59 AM
Kamakshi
11/07/2025, 10:59 PM
Elad
11/09/2025, 1:03 PM
Han You
11/10/2025, 3:53 PM
Vasu Dins
11/11/2025, 6:53 AM
I'm using flink-connector-elasticsearch7-3.1.0-1.20.jar.
When I run my job, I'm getting the following error:
Exception in thread "Thread-5" java.lang.NoClassDefFoundError: org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase
py4j.protocol.Py4JError: org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder does not exist in the JVM
Here's a minimal example of my code:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter
import json

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///C:/flink-1.20.1/lib/flink-connector-elasticsearch7-3.1.0-1.20.jar")

data_stream = env.from_collection([
    {"id": "1", "name": "John", "age": 25},
    {"id": "2", "name": "Jane", "age": 30}
])

json_stream = data_stream.map(lambda x: json.dumps(x))

es7_sink = Elasticsearch7SinkBuilder() \
    .set_bulk_flush_max_actions(1) \
    .set_emitter(ElasticsearchEmitter.static_index('my-index', 'id')) \
    .set_hosts(['10.0.0.102:9200']) \
    .build()

json_stream.sink_to(es7_sink)
env.execute("Flink Elasticsearch 7 Job")
I'm running it using:
flink run -py sampletestelastic.py
Has anyone faced this issue before? It seems like the ElasticsearchSinkBuilderBase class is missing from the jar or not visible to PyFlink.
Do I need an extra dependency or a different jar for Flink 1.20.1?
It seems like ElasticsearchSinkBuilderBase might be missing or not accessible from the JVM side.
Any guidance or suggestions would be really appreciated.
Vikas Patil
11/11/2025, 9:02 PM
ĺžĺšł
11/12/2025, 5:16 AM
GAURAV MIGLANI
11/12/2025, 8:30 AM
Hristo Yordanov
11/12/2025, 7:28 PM
Ananth bharadwaj
11/13/2025, 2:44 AM
Mehdi Jalili
11/13/2025, 2:45 PM
I have a question about scan.watermark.idle-timeout. I'm using the SQL Gateway to join two tables, backed by Kafka topics. One is a fact table that receives a constant stream of data. The other one is a compacted topic populated by Debezium that contains dimension data, and I'm joining the two to enrich the facts.
I want to sink the results to an append-only Kafka table sink, which is why I have made the dimension table versioned and use a temporal join to get the snapshot of the dimensions at the specified fact timestamp.
Because the dimension stream rarely gets any new data, its watermark will usually be hours if not days behind the fact stream watermark, and this stops the output watermark from progressing. I have used scan.watermark.idle-timeout on the dimension table to let the watermark generator know to ignore the dimension table watermark, but I still see the watermark on the TemporalJoin operator as the minimum of the two. What am I doing wrong?
Sorry if my grasp of these concepts is somewhat shaky. I'd really appreciate the help. Thank you.
Here's some simplified code to demonstrate the setup:
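(The snippet itself did not make it into this export. Purely as an illustration of the setup described above, and not the poster's code, a comparable fact/dimension temporal join might look roughly like the following; table names, columns, and connector options are assumed, and it is wrapped in a small Table API program here rather than submitted through the SQL Gateway.)

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TemporalJoinSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Fact stream with a constant flow of data.
        tEnv.executeSql(
            "CREATE TABLE facts (" +
            "  order_id STRING, product_id STRING, amount DECIMAL(10, 2)," +
            "  ts TIMESTAMP(3)," +
            "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
            ") WITH (" +
            "  'connector' = 'kafka', 'topic' = 'facts', 'format' = 'json'," +
            "  'properties.bootstrap.servers' = 'broker:9092'" +
            ")");

        // Slow-changing dimension from a compacted Debezium topic; the idle timeout
        // marks this source idle when it has produced no data for a while.
        tEnv.executeSql(
            "CREATE TABLE dims (" +
            "  product_id STRING, product_name STRING," +
            "  update_time TIMESTAMP(3)," +
            "  PRIMARY KEY (product_id) NOT ENFORCED," +
            "  WATERMARK FOR update_time AS update_time" +
            ") WITH (" +
            "  'connector' = 'kafka', 'topic' = 'dims', 'format' = 'debezium-json'," +
            "  'scan.watermark.idle-timeout' = '1 min'," +
            "  'properties.bootstrap.servers' = 'broker:9092'" +
            ")");

        // Event-time temporal join: dimension snapshot as of the fact's timestamp.
        tEnv.executeSql(
            "SELECT f.order_id, f.amount, d.product_name " +
            "FROM facts AS f " +
            "LEFT JOIN dims FOR SYSTEM_TIME AS OF f.ts AS d " +
            "ON f.product_id = d.product_id").print();
    }
}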
Brin
11/14/2025, 12:40 PM
The source table (test.vip_record) and the sink table (test.ods_vip_record) currently have exactly the same schema, and no DDL changes have been made during the job execution.
However, the job keeps failing after running for some time, and the logs show the following error:
2025-11-14 20:32:03
java.lang.IllegalStateException: Unable to coerce data record from test.vip_record (schema: null) to test.ods_vip_record (schema: null)
at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.lambda$handleDataChangeEvent$1(SchemaOperator.java:235)
at java.util.Optional.orElseThrow(Optional.java:290)
at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.handleDataChangeEvent(SchemaOperator.java:232)
at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.processElement(SchemaOperator.java:152)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:141)
at java.util.Collections$SingletonList.forEach(Collections.java:4824)
at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.deserialize(DebeziumEventDeserializationSchema.java:93)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:119)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.processElement(MySqlRecordEmitter.java:101)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlPipelineRecordEmitter.processElement(MySqlPipelineRecordEmitter.java:120)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:73)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:46)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:750)
The key part is:
schema: null
which suggests that both the source and sink schemas are unexpectedly lost or not recognized at runtime.
---
### Additional information
* Deployment mode: Flink on Kubernetes (Native K8s Mode)
* Source: MySQL 5.7
* Sink: StarRocks
* Flink CDC performs snapshot + incremental reading normally after restart
* After 1–2 hours of normal running, the job fails with the above error
* No schema changes were made during this time
---
### Questions
1. Under what conditions would SchemaOperator encounter (schema: null) for both source and sink?
2. Could this be related to state inconsistency or schema metadata loss inside checkpoint/savepoint?
3. Is this a known issue in Flink CDC 3.3 with Flink 1.20?
4. Are there recommended configurations to ensure stable schema management for MySQL → StarRocks pipelines?
Thank you!
melin li
11/17/2025, 7:04 AM
Idan Sheinberg
11/17/2025, 11:54 AM
I'm trying to set taskmanager.numOfTaskSlots via the Flink Kubernetes Operator.
I'm using operator version 1.13.0 and Flink 1.20.3. While the ConfigMap generated by the operator correctly reflects the set value ('3' in my case), it looks like during the actual TaskManager startup process it resolves taskmanager.numOfTaskSlots=1 as a dynamic configuration property, which overrides the value at runtime. Has anyone successfully set a value larger than 1 in K8s in the past?
Roberto Serafin
11/17/2025, 8:03 PM
Lee Wallen
11/17/2025, 11:40 PM
We're seeing the SaslClientAuthenticator failing to configure.
Relevant configuration:
• Added classloader.parent-first-patterns.additional per Confluent Cloud support to avoid child-first classloading issues with OAuth:
org.apache.kafka.common.security.oauthbearer.;
org.apache.kafka.common.security.auth.;
org.apache.kafka.common.security.plain.;
org.apache.kafka.common.security.scram.
• Both sasl.login.callback.handler.class and sasl.client.callback.handler.class are set to:
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
Runtime classpath (Kafka-related):
+--- io.confluent:kafka-avro-serializer:7.4.12
| +--- io.confluent:kafka-schema-serializer:7.4.12
| | +--- io.confluent:kafka-schema-registry-client:7.4.12
| +--- io.confluent:kafka-schema-registry-client:7.4.12 (*)
+--- org.apache.flink:flink-connector-kafka:3.3.0-1.19
| +--- org.apache.kafka:kafka-clients:3.4.0
Stacktrace excerpt:
Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
Caused by: java.lang.IllegalArgumentException: Callback handler must be castable to org.apache.kafka.common.security.auth.AuthenticateCallbackHandler: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
Has anyone encountered this issue or have suggestions on how to resolve it?
I'll add a thread and include a snippet with the full stacktrace.
Anuj Jain
11/18/2025, 6:24 AM
If you use Helm to install flink-kubernetes-operator, it allows you to specify a postStart hook to download the required plugins.
I need help with how to use this postStart hook; is there any example?
11/19/2025, 11:10 AM
Jatin Kumar
11/19/2025, 2:03 PM
Xianxin Lin
11/19/2025, 8:32 PM
Collected metrics:
• Backlog information at each source
• Incoming data rate at the sources (e.g. records/sec written into the Kafka topic)
• Record processing rate at each job vertex
• Busy and backpressured time at each job vertex