徐平
11/12/2025, 5:16 AM
GAURAV MIGLANI
11/12/2025, 8:30 AM
Hristo Yordanov
11/12/2025, 7:28 PM
Ananth bharadwaj
11/13/2025, 2:44 AM
Mehdi Jalili
11/13/2025, 2:45 PM
My question is about scan.watermark.idle-timeout. I'm using the SQL Gateway to join two tables backed by Kafka topics. One is a fact table that receives a constant stream of data. The other is a compacted topic populated by Debezium that contains dimension data, and I'm joining the two to enrich the facts.
I want to sink the results to an append-only Kafka table sink, which is why I have made the dimension table versioned and use a temporal join to get the snapshot of the dimensions at the specified fact timestamp.
Because the dimension stream rarely gets any new data, its watermark will usually be hours if not days behind the fact stream's watermark, and this stops the output watermark from progressing. I have set scan.watermark.idle-timeout on the dimension table to tell the watermark generator to ignore the dimension table's watermark, but I still see the watermark on the TemporalJoin operator as the minimum of the two. What am I doing wrong?
Sorry if my grasp of these concepts is somewhat shaky. I'd really appreciate the help. Thank you.
Here’s some simplified code to demonstrate the setup:
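A minimal reconstruction of the setup as described, expressed through the Java Table API rather than the SQL Gateway; the table names, columns, and connector options below are assumptions, not the original snippet:
```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TemporalJoinSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Fact stream: receives a constant flow of events.
        tEnv.executeSql("""
            CREATE TABLE facts (
              item_id STRING,
              amount  DECIMAL(10, 2),
              ts      TIMESTAMP(3),
              WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            ) WITH (
              'connector' = 'kafka',
              'topic' = 'facts',
              'properties.bootstrap.servers' = 'broker:9092',
              'format' = 'json'
            )""");

        // Versioned dimension table over a compacted Debezium-fed topic.
        // scan.watermark.idle-timeout is intended to mark the source idle so
        // its stalled watermark stops holding back downstream operators
        // (whether a given connector honors it depends on its version).
        tEnv.executeSql("""
            CREATE TABLE dims (
              item_id   STRING,
              item_name STRING,
              ts        TIMESTAMP(3),
              WATERMARK FOR ts AS ts,
              PRIMARY KEY (item_id) NOT ENFORCED
            ) WITH (
              'connector' = 'upsert-kafka',
              'topic' = 'dims',
              'properties.bootstrap.servers' = 'broker:9092',
              'key.format' = 'json',
              'value.format' = 'json',
              'scan.watermark.idle-timeout' = '1min'
            )""");

        // Temporal join: dimension snapshot as of each fact's event time.
        tEnv.executeSql("""
            SELECT f.item_id, f.amount, d.item_name, f.ts
            FROM facts AS f
            LEFT JOIN dims FOR SYSTEM_TIME AS OF f.ts AS d
              ON f.item_id = d.item_id""").print();
    }
}
```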
Brin
11/14/2025, 12:40 PM
The source table (test.vip_record) and the sink table (test.ods_vip_record) currently have exactly the same schema, and no DDL changes have been made during the job execution.
However, the job keeps failing after running for some time, and the logs show the following error:
2025-11-14 20:32:03
java.lang.IllegalStateException: Unable to coerce data record from test.vip_record (schema: null) to test.ods_vip_record (schema: null)
at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.lambda$handleDataChangeEvent$1(SchemaOperator.java:235)
at java.util.Optional.orElseThrow(Optional.java:290)
at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.handleDataChangeEvent(SchemaOperator.java:232)
at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.processElement(SchemaOperator.java:152)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:141)
at java.util.Collections$SingletonList.forEach(Collections.java:4824)
at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.deserialize(DebeziumEventDeserializationSchema.java:93)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:119)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.processElement(MySqlRecordEmitter.java:101)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlPipelineRecordEmitter.processElement(MySqlPipelineRecordEmitter.java:120)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:73)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:46)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:750)
The key part is:
schema: null
which suggests that both the source and sink schemas are unexpectedly lost or not recognized at runtime.
---
### Additional information
* Deployment mode: Flink on Kubernetes (Native K8s Mode)
* Source: MySQL 5.7
* Sink: StarRocks
* Flink CDC performs snapshot + incremental reading normally after restart
* After 1–2 hours of normal running, the job fails with the above error
* No schema changes were made during this time
---
### Questions
1. Under what conditions would SchemaOperator encounter (schema: null) for both source and sink?
2. Could this be related to state inconsistency or schema metadata loss inside checkpoint/savepoint?
3. Is this a known issue in Flink CDC 3.3 with Flink 1.20?
4. Are there recommended configurations to ensure stable schema management for MySQL → StarRocks pipelines?
Thank you!
melin li
11/17/2025, 7:04 AM
Idan Sheinberg
11/17/2025, 11:54 AM
I'm having trouble setting taskmanager.numOfTaskSlots via the Flink Kubernetes Operator.
I'm using operator version 1.13.0 and Flink 1.20.3. While the configmap generated by the operator correctly reflects the set value ('3' in my case), it looks like during the actual taskmanager startup process it resolves taskmanager.numOfTaskSlots=1 as a dynamic configuration property, which overrides the value at runtime. Has anyone successfully set a value larger than 1 in K8s in the past?
Roberto Serafin
11/17/2025, 8:03 PM
Lee Wallen
11/17/2025, 11:40 PM
I'm running into an issue with the SaslClientAuthenticator failing to configure.
Relevant configuration:
• Added classloader.parent-first-patterns.additional per Confluent Cloud support to avoid child-first classloading issues with OAuth:
org.apache.kafka.common.security.oauthbearer.;
org.apache.kafka.common.security.auth.;
org.apache.kafka.common.security.plain.;
org.apache.kafka.common.security.scram.
• Both sasl.login.callback.handler.class and sasl.client.callback.handler.class are set to:
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
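In code terms, the configuration described in the bullets above amounts to roughly the following; this is a sketch, and the token endpoint and JAAS values are placeholders rather than the actual setup:
```java
import java.util.Properties;

/** Sketch of the SASL/OAUTHBEARER client properties described above. */
public class OAuthClientProps {
    public static Properties saslProps() {
        Properties p = new Properties();
        p.setProperty("security.protocol", "SASL_SSL");
        p.setProperty("sasl.mechanism", "OAUTHBEARER");
        // Placeholder endpoint; the real one comes from the identity provider.
        p.setProperty("sasl.oauthbearer.token.endpoint.url",
                "https://example.invalid/oauth/token");
        p.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
                        + " clientId=\"<id>\" clientSecret=\"<secret>\";");
        // Both handlers point at the same Kafka-provided class. The reported
        // "must be castable" error usually means this class and the
        // AuthenticateCallbackHandler interface were loaded by different
        // classloaders, which is what the parent-first patterns aim to avoid.
        p.setProperty("sasl.login.callback.handler.class",
                "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler");
        p.setProperty("sasl.client.callback.handler.class",
                "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler");
        return p;
    }
}
```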
Runtime classpath (Kafka-related):
+--- io.confluent:kafka-avro-serializer:7.4.12
| +--- io.confluent:kafka-schema-serializer:7.4.12
| | +--- io.confluent:kafka-schema-registry-client:7.4.12
| +--- io.confluent:kafka-schema-registry-client:7.4.12 (*)
+--- org.apache.flink:flink-connector-kafka:3.3.0-1.19
| +--- org.apache.kafka:kafka-clients:3.4.0
Stacktrace excerpt:
Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
Caused by: java.lang.IllegalArgumentException: Callback handler must be castable to org.apache.kafka.common.security.auth.AuthenticateCallbackHandler: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
Has anyone encountered this issue or have suggestions on how to resolve it?
I'll add a thread and include a snippet with the full stacktrace.
Anuj Jain
11/18/2025, 6:24 AM
If you use Helm to install flink-kubernetes-operator, it allows you to specify a postStart hook to download the required plugins.
I need help with how to use this postStart hook. Is there an example?
Ashish Marottickal Gopi
11/19/2025, 11:10 AM
Jatin Kumar
11/19/2025, 2:03 PM
Xianxin Lin
11/19/2025, 8:32 PM
Collected metrics:
• Backlog information at each source
• Incoming data rate at the sources (e.g. records/sec written into the Kafka topic)
• Record processing rate at each job vertex
• Busy and backpressured time at each job vertex
Sebastian Tota
11/20/2025, 7:42 PM
We're currently using AvroParquetWriters.forReflectRecord since it specifically returns back the T object we pass in, but we know this is the least efficient option. I thought about switching to forGenericRecord, but that method returns a ParquetWriterFactory of GenericRecord, which means we no longer have access to the record's event time information in our Bucket Assigner.
Does anyone have any recommendations for how we can optimize this Parquet writer?
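One direction that might help, sketched below under the assumption that the Avro schema carries an epoch-millis field named event_time: let the bucket assigner read the event time out of the GenericRecord itself, so forGenericRecord can still drive event-time bucketing.
```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

/** Buckets GenericRecords by an event-time field carried in the record itself. */
public class EventTimeFieldBucketAssigner implements BucketAssigner<GenericRecord, String> {

    private static final DateTimeFormatter HOURLY =
            DateTimeFormatter.ofPattern("'dt='yyyy-MM-dd'/hr='HH").withZone(ZoneOffset.UTC);

    @Override
    public String getBucketId(GenericRecord record, BucketAssigner.Context context) {
        // Assumption: epoch millis live in a field named "event_time".
        long eventTimeMillis = (Long) record.get("event_time");
        return HOURLY.format(Instant.ofEpochMilli(eventTimeMillis));
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}
```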
Ricardo Mendoza
11/21/2025, 2:40 AM
Prafful Javare
11/22/2025, 4:52 PM
FROM flink:1.17.1
RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.4.0-1.20/flink-sql-connector-kafka-3.4.0-1.20.jar
RUN chown -R flink:flink /opt/flink/lib
COPY ./exercises/target/*.jar .
services:
  broker:
    image: apache/kafka:4.0.0
    platform: linux/amd64
    container_name: broker
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://localhost:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 3
  kcat:
    image: edenhill/kcat:1.7.1
    platform: linux/amd64
    container_name: flink_kcat
    entrypoint:
      - /bin/sh
      - -c
      - |
        apk add jq;
        while [ 1 -eq 1 ];do sleep 60;done
  sql-client:
    build: .
    command: bin/sql-client.sh
    depends_on:
      - jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        rest.address: jobmanager
  jobmanager:
    build: .
    ports:
      - "8081:8081"
    command: jobmanager
    volumes:
      - flink_data:/tmp/
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        state.backend: hashmap
        state.checkpoints.dir: file:///tmp/flink-checkpoints
        heartbeat.interval: 1000
        heartbeat.timeout: 5000
        rest.flamegraph.enabled: true
        web.backpressure.refresh-interval: 10000
  taskmanager:
    build: .
    depends_on:
      - jobmanager
    command: taskmanager
    volumes:
      - flink_data:/tmp/
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 3
        state.backend: hashmap
        state.checkpoints.dir: file:///tmp/flink-checkpoints
        heartbeat.interval: 1000
        heartbeat.timeout: 5000
volumes:
  flink_data:
But when I run docker compose up and try to submit the job to the Flink cluster, I get this error:
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.api.lineage.LineageVertexProvider
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source)
at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
... 46 more
I am using Flink 1.17.1 if that helps. I am able to run the WordCount example, but this specific exercise involving writing to a Kafka sink is not working. Any pointers? Thanks!
Vedanth Baliga
11/23/2025, 9:42 AM
# ---------- Namespace ----------
apiVersion: v1
kind: Namespace
metadata:
  name: telemetry
---
# ---------- Redpanda-1 ----------
apiVersion: v1
kind: Service
metadata:
  name: redpanda-1
  namespace: telemetry
spec:
  selector:
    app: redpanda-1
  ports:
    - name: kafka
      port: 29092
      targetPort: 29092
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: redpanda-1
  namespace: telemetry
spec:
  replicas: 1
  selector:
    matchLabels:
      app: redpanda-1
  template:
    metadata:
      labels:
        app: redpanda-1
    spec:
      containers:
        - name: redpanda
          image: docker.redpanda.com/redpandadata/redpanda:v23.1.8
          args:
            - redpanda
            - start
            - --smp
            - "1"
            - --reserve-memory
            - 0M
            - --overprovisioned
            - --node-id
            - "1"
            - --kafka-addr
            - PLAINTEXT://0.0.0.0:29092
            - --advertise-kafka-addr
            - PLAINTEXT://redpanda-1:29092
            - --rpc-addr
            - 0.0.0.0:33145
            - --advertise-rpc-addr
            - redpanda-1:33145
            - --pandaproxy-addr
            - PLAINTEXT://0.0.0.0:28082
            - --advertise-pandaproxy-addr
            - PLAINTEXT://redpanda-1:28082
          ports:
            - containerPort: 29092
            - containerPort: 28082
            - containerPort: 33145
          volumeMounts:
            - name: data
              mountPath: /var/lib/redpanda
      volumes:
        - name: data
          emptyDir: {}
---
# ---------- Redpanda-2 ----------
apiVersion: v1
kind: Service
metadata:
  name: redpanda-2
  namespace: telemetry
spec:
  selector:
    app: redpanda-2
  ports:
    - name: kafka
      port: 29093
      targetPort: 29093
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: redpanda-2
  namespace: telemetry
spec:
  replicas: 1
  selector:
    matchLabels:
      app: redpanda-2
  template:
    metadata:
      labels:
        app: redpanda-2
    spec:
      containers:
        - name: redpanda
          image: docker.redpanda.com/redpandadata/redpanda:v23.1.8
          args:
            - redpanda
            - start
            - --smp
            - "1"
            - --reserve-memory
            - 0M
            - --overprovisioned
            - --node-id
            - "2"
            - --seeds
            - redpanda-1:33145
            - --kafka-addr
            - PLAINTEXT://0.0.0.0:29093
            - --advertise-kafka-addr
            - PLAINTEXT://redpanda-2:29093
            - --rpc-addr
            - 0.0.0.0:33146
            - --advertise-rpc-addr
            - redpanda-2:33146
            - --pandaproxy-addr
            - PLAINTEXT://0.0.0.0:28083
            - --advertise-pandaproxy-addr
            - PLAINTEXT://redpanda-2:28083
          ports:
            - containerPort: 29093
            - containerPort: 28083
            - containerPort: 33146
          volumeMounts:
            - name: data
              mountPath: /var/lib/redpanda
      volumes:
        - name: data
          emptyDir: {}
---
# ---------- Redpanda Console ----------
apiVersion: v1
kind: Service
metadata:
  name: redpanda-console
  namespace: telemetry
spec:
  selector:
    app: redpanda-console
  ports:
    - name: http
      port: 8080
      targetPort: 8080
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: redpanda-console
  namespace: telemetry
spec:
  replicas: 1
  selector:
    matchLabels:
      app: redpanda-console
  template:
    metadata:
      labels:
        app: redpanda-console
    spec:
      containers:
        - name: redpanda-console
          image: docker.redpanda.com/redpandadata/console:v2.2.4
          command: ["/bin/sh", "-c"]
          args:
            - echo "$CONSOLE_CONFIG_FILE" > /tmp/config.yml; /app/console
          env:
            - name: CONFIG_FILEPATH
              value: /tmp/config.yml
            - name: CONSOLE_CONFIG_FILE
              value: |
                kafka:
                  brokers: ["redpanda-1:29092", "redpanda-2:29093"]
                  schemaRegistry:
                    enabled: false
                redpanda:
                  adminApi:
                    enabled: false
                connect:
                  enabled: false
          ports:
            - containerPort: 8080
---
# ---------- Flink: JobManager ----------
apiVersion: v1
kind: Service
metadata:
  name: jobmanager
  namespace: telemetry
spec:
  selector:
    app: jobmanager
  ports:
    - name: rpc
      port: 6123
      targetPort: 6123
    - name: ui
      port: 8081
      targetPort: 8081
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: jobmanager
  namespace: telemetry
spec:
  replicas: 1
  selector:
    matchLabels:
      app: jobmanager
  template:
    metadata:
      labels:
        app: jobmanager
    spec:
      containers:
        - name: jobmanager
          image: flink-sql-k8s:1.19
          args: ["jobmanager"]
          env:
            - name: FLINK_PROPERTIES
              value: |
                jobmanager.rpc.address: jobmanager
                jobmanager.bind-host: 0.0.0.0
                jobmanager.rpc.port: 6123
                rest.address: jobmanager
                rest.bind-address: 0.0.0.0
                rest.port: 8081
          ports:
            - containerPort: 6123
            - containerPort: 8081
---
# ---------- Flink: TaskManager ----------
apiVersion: apps/v1
kind: Deployment
metadata:
  name: taskmanager
  namespace: telemetry
spec:
  replicas: 1
  selector:
    matchLabels:
      app: taskmanager
  template:
    metadata:
      labels:
        app: taskmanager
    spec:
      containers:
        - name: taskmanager
          image: flink-sql-k8s:1.19
          args: ["taskmanager"]
          env:
            - name: FLINK_PROPERTIES
              value: |
                jobmanager.rpc.address: jobmanager
                taskmanager.numberOfTaskSlots: 20
                taskmanager.bind-host: 0.0.0.0
          ports:
            - containerPort: 6121
            - containerPort: 6122
---
# ---------- Flink: SQL Client ----------
apiVersion: apps/v1
kind: Deployment
metadata:
  name: sql-client
  namespace: telemetry
spec:
  replicas: 1
  selector:
    matchLabels:
      app: sql-client
  template:
    metadata:
      labels:
        app: sql-client
    spec:
      containers:
        - name: sql-client
          image: flink-sql-k8s:1.19
          command: ["bash", "-c", "sleep infinity"]
          env:
            - name: FLINK_PROPERTIES
              value: |
                jobmanager.rpc.address: jobmanager
                rest.address: jobmanager
---
# ---------- ClickHouse ----------
apiVersion: v1
kind: Service
metadata:
  name: clickhouse
  namespace: telemetry
spec:
  selector:
    app: clickhouse
  ports:
    - name: http
      port: 8123
      targetPort: 8123
    - name: native
      port: 9000
      targetPort: 9000
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: clickhouse
  namespace: telemetry
spec:
  replicas: 1
  selector:
    matchLabels:
      app: clickhouse
  template:
    metadata:
      labels:
        app: clickhouse
    spec:
      containers:
        - name: clickhouse
          image: clickhouse/clickhouse-server:23.8
          ports:
            - containerPort: 8123
            - containerPort: 9000
          volumeMounts:
            - name: data
              mountPath: /var/lib/clickhouse
            - name: logs
              mountPath: /var/log/clickhouse-server
      volumes:
        - name: data
          emptyDir: {}
        - name: logs
          emptyDir: {}
---
# ---------- Telemetry Producer (Python) ----------
apiVersion: apps/v1
kind: Deployment
metadata:
  name: telemetry-producer
  namespace: telemetry
spec:
  replicas: 1
  selector:
    matchLabels:
      app: telemetry-producer
  template:
    metadata:
      labels:
        app: telemetry-producer
    spec:
      containers:
        - name: telemetry-producer
          image: telemetry-producer:latest
          imagePullPolicy: IfNotPresent
          env:
            - name: KAFKA_BOOTSTRAP_SERVERS
              value: "redpanda-1:29092"
            - name: KAFKA_TOPIC
              value: "fleet.prod.telemetry.raw"
My Dockerfile
# base image
FROM flink:1.19-scala_2.12-java11
USER root
RUN wget -P /opt/flink/lib/ \
      https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.2.0-1.19/flink-sql-connector-kafka-3.2.0-1.19.jar && \
    wget -P /opt/flink/lib/ \
      https://repo1.maven.org/maven2/org/apache/flink/flink-json/1.19.0/flink-json-1.19.0.jar
Bhargav Vekariya
11/24/2025, 6:11 PM
source:
  type: mysql
  hostname: 10.3.4.168
  name: ms_mysql_prod_sales_us_ca
  port: 3306
  username: flink_replication_admin
  password: Uq^ZXXXXXXX
  tables: |
    ms.sales_flat_quote_item,
    ms_canada.sales_flat_quote_item
  server-id: 8000-8500
  server-time-zone: UTC
  scan.newly-added-table.enabled: true
  schema-change.enabled: true
sink:
  type: starrocks
  name: StarRocks Sink
  jdbc-url: jdbc:mysql://10.231.XX.XX:9030
  load-url: 10.231.XX.XX:8030
  username: root
  password: dpBjIXXXXXX
  sink.buffer-flush.interval-ms: 2000
  sink.buffer-flush.max-bytes: 67108864
  table.create.properties.replication_num: 1
  sink.at-least-once.use-transaction-stream-load: false
  sink.io.thread-count: 6
  sink.label-prefix: flink_sync_custom
route:
  - source-table: ms.\.*
    sink-table: ms.mage_us_<>
    replace-symbol: <>
    description: route all tables in ms to ms
  - source-table: ms_canada.\.*
    sink-table: ms.mage_ca_<>
    replace-symbol: <>
    description: route all tables in ms_canada to ms
pipeline:
  parallelism: 4
  name: Sync MySQL Sales US/CA DWH Tables to StarRocks
  execution.checkpointing.interval: 30000
  execution.checkpointing.max-concurrent-checkpoints: 1
  execution.checkpointing.mode: EXACTLY_ONCE
  execution.checkpointing.timeout: 300000
  execution.checkpointing.min-pause: 5000
  state.backend.changelog.enabled: true
Ashish Marottickal Gopi
11/24/2025, 8:06 PM
I'm updating broadcast state from a CDC-style stream inside the processBroadcastElement function.
Question: Is broadcasting the state the right approach? I'm specifically confused on the following points (a sketch of the pattern follows the list):
1. Will the broadcast state be consistent across the parallel instances of the non-broadcast stream?
2. Since I have CDC-like data in the broadcast stream, can I assume the state updates I make will be consistent across the parallel tasks of the non-broadcast stream?
3. Should I change the parallelism of the broadcast stream to 1?
4. Or would it be better to just use a KeyedCoProcessFunction for this?
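For reference, a minimal sketch of the broadcast-state pattern being asked about; the types and names are illustrative, not from the original job. Every parallel task receives every broadcast record, so the per-task copies of the state stay in sync as long as processBroadcastElement applies the same deterministic update on each task:
```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastEnrichmentSketch {

    // The descriptor must be identical on the read and write sides.
    static final MapStateDescriptor<String, String> DIM_STATE =
            new MapStateDescriptor<>("dims", Types.STRING, Types.STRING);

    public static DataStream<String> wire(DataStream<String> facts, DataStream<String> cdc) {
        BroadcastStream<String> dims = cdc.broadcast(DIM_STATE);
        return facts
                .keyBy(f -> f) // hypothetical key extractor
                .connect(dims)
                .process(new Enricher());
    }

    static class Enricher
            extends KeyedBroadcastProcessFunction<String, String, String, String> {

        @Override
        public void processElement(String fact, ReadOnlyContext ctx,
                                   Collector<String> out) throws Exception {
            // The keyed side gets a read-only view of the broadcast state.
            String dim = ctx.getBroadcastState(DIM_STATE).get(fact);
            out.collect(fact + "|" + (dim == null ? "?" : dim));
        }

        @Override
        public void processBroadcastElement(String cdcRecord, Context ctx,
                                            Collector<String> out) throws Exception {
            // Runs on every parallel task for every broadcast record; the
            // update must be deterministic so all copies stay consistent.
            ctx.getBroadcastState(DIM_STATE).put(cdcRecord, cdcRecord);
        }
    }
}
```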
Eddy Agossou
11/25/2025, 11:51 AM
Ben Mali
11/27/2025, 8:05 AM
In my FlinkDeployment, defining spec.jobManager.replicas works as expected, but when I try to specify spec.taskManager.replicas it seems to be ignored. Does anyone know how I can manually configure the number of task manager pods? Should I configure it using HPA?
Philipp
11/27/2025, 10:12 AM
Hiếu trịnh
11/28/2025, 3:32 AM
H.P. Grahsl
12/01/2025, 10:41 AM
Jon Slusher
12/02/2025, 6:52 PM
Aleksei Perminov
12/03/2025, 3:21 PM
2025-11-22 10:53:52.155 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.r.r.a.ActiveResourceManager - Requesting new worker with resource spec WorkerResourceSpec {cpuCores=12.0, taskHeapSize=4.912gb (5274756322 bytes), taskOffHeapSize=0 bytes, networkMemSize=755.200mb (791884607 bytes), managedMemSize=8.850gb (9502615519 bytes), numSlots=24}, current pending count: 2.
2025-11-22 10:53:52.156 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.r.e.ExternalResourceUtils - Enabled external resources: []
2025-11-22 10:53:52.157 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.configuration.Configuration - Config uses fallback configuration key 'kubernetes.service-account' instead of key 'kubernetes.taskmanager.service-account'
2025-11-22 10:53:52.160 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.k.KubernetesResourceManagerDriver - Creating new TaskManager pod with name taskmanager-3-2 and resource <16384,12.0>.
2025-11-22 10:53:52.260 [flink-pekko.actor.default-dispatcher-4] WARN o.a.p.r.t.netty.NettyTransport - Remote connection to [/10.0.0.70:35310] failed with null
2025-11-22 10:53:52.272 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.k.KubernetesResourceManagerDriver - Pod taskmanager-3-2 is created.
2025-11-22 10:53:52.281 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.k.KubernetesResourceManagerDriver - Received new TaskManager pod: taskmanager-3-2
2025-11-22 10:53:52.282 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.r.r.a.ActiveResourceManager - Requested worker taskmanager-3-2 with resource spec WorkerResourceSpec {cpuCores=12.0, taskHeapSize=4.912gb (5274756322 bytes), taskOffHeapSize=0 bytes, networkMemSize=755.200mb (791884607 bytes), managedMemSize=8.850gb (9502615519 bytes), numSlots=24}.
2025-11-22 10:53:52.399 [flink-pekko.actor.default-dispatcher-4] WARN o.a.p.r.ReliableDeliverySupervisor - Association with remote system [<pekko.tcp://flink@10.0.0.180:6122>] has failed, address is now gated for [50] ms. Reason: [Disassociated]
2025-11-22 10:54:07.260 [flink-pekko.actor.default-dispatcher-19] WARN o.a.p.r.t.netty.NettyTransport - Remote connection to [/10.0.0.70:34164] failed with null
2025-11-22 10:54:15.283 [flink-pekko.actor.default-dispatcher-15] INFO o.a.f.r.r.a.ActiveResourceManager - Registering TaskManager with ResourceID taskmanager-3-2 (<pekko.tcp://flink@10.0.0.181:6122/user/rpc/taskmanager_0>) at ResourceManager
2025-11-22 10:54:15.291 [flink-pekko.actor.default-dispatcher-19] INFO o.a.f.r.r.s.FineGrainedSlotManager - Registering task executor taskmanager-3-2 under 12848c284ec438712332f1fd3b76a28a at the slot manager.
2025-11-22 10:54:15.292 [flink-pekko.actor.default-dispatcher-19] INFO o.a.f.r.r.a.ActiveResourceManager - Worker taskmanager-3-2 is registered.
2025-11-22 10:54:15.300 [flink-pekko.actor.default-dispatcher-19] INFO o.a.f.r.r.a.ActiveResourceManager - Closing TaskExecutor connection taskmanager-3-2 because: Failed to send initial slot report to ResourceManager. java.lang.IllegalStateException: Cannot decrease, no worker of spec WorkerResourceSpec {cpuCores=12.0, taskHeapSize=4.912gb (5274756322 bytes), taskOffHeapSize=0 bytes, networkMemSize=755.200mb (791884607 bytes), managedMemSize=8.850gb (9502615519 bytes), numSlots=25}.
Basically, in FlinkDeployment the taskmanager.numberOfTaskSlots is set to 24, but for some reason the slot report shows 25 slots in the active Task Manager, which leads to the mentioned exception.
It doesn't matter whether I set numberOfTaskSlots to 24 or 32; the issue is the same: one extra slot in the slot report, followed by the exception.
This happens for about 5 minutes and then suddenly the job starts working fine.
We use Flink K8s Operator version 1.12.1 and Flink 2.0.0. However, this issue occasionally appears on Flink 1.20.1 as well.
Thanks in advance.
morti
12/03/2025, 3:50 PM
Since the programArgs query parameter was deleted, no program arguments (e.g. --batchSize 5000) are sent to the job when submitting from the web UI.
I'm currently using the flink-2.1.1-java21 image. There is also a related Stack Overflow question:
https://stackoverflow.com/questions/79702619/how-do-i-pass-program-arguments-from-apache-flink-2-0-0-web-gui-to-my-job-proper
It works with the REST API mentioned in the Stack Overflow question (sketched below), but as I said, program arguments are not sent to the job via the web UI.
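For reference, the kind of REST call that does work is roughly the following; the jar id is a placeholder for the value returned by POST /jars/upload:
```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SubmitJobViaRest {
    public static void main(String[] args) throws Exception {
        // Placeholder; use the id returned by POST /jars/upload.
        String jarId = "placeholder_job.jar";
        String body = "{\"programArgsList\": [\"--batchSize\", \"5000\"]}";
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8081/jars/" + jarId + "/run"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // e.g. {"jobid":"..."}
    }
}
```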
Any help or tip would be appreciated!
Jon Slusher
12/05/2025, 9:18 PM
Rion Williams
12/06/2025, 3:59 AM