Brin
11/14/2025, 12:40 PM
The source table (test.vip_record) and the sink table (test.ods_vip_record) currently have exactly the same schema, and no DDL changes have been made during the job execution.
However, the job keeps failing after running for some time, and the logs show the following error:
2025-11-14 20:32:03
java.lang.IllegalStateException: Unable to coerce data record from test.vip_record (schema: null) to test.ods_vip_record (schema: null)
at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.lambda$handleDataChangeEvent$1(SchemaOperator.java:235)
at java.util.Optional.orElseThrow(Optional.java:290)
at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.handleDataChangeEvent(SchemaOperator.java:232)
at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.processElement(SchemaOperator.java:152)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:141)
at java.util.Collections$SingletonList.forEach(Collections.java:4824)
at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.deserialize(DebeziumEventDeserializationSchema.java:93)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:119)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.processElement(MySqlRecordEmitter.java:101)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlPipelineRecordEmitter.processElement(MySqlPipelineRecordEmitter.java:120)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:73)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:46)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:750)
The key part is:
schema: null
which suggests that both the source and sink schemas are unexpectedly lost or not recognized at runtime.
---
### Additional information
* Deployment mode: Flink on Kubernetes (Native K8s Mode)
* Source: MySQL 5.7
* Sink: StarRocks
* Flink CDC performs snapshot + incremental reading normally after restart
* After 1–2 hours of normal running, the job fails with the above error
* No schema changes were made during this time
---
### Questions
1. Under what conditions would SchemaOperator encounter (schema: null) for both source and sink?
2. Could this be related to state inconsistency or schema metadata loss inside checkpoint/savepoint?
3. Is this a known issue in Flink CDC 3.3 with Flink 1.20?
4. Are there recommended configurations to ensure stable schema management for MySQL → StarRocks pipelines?
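For question 4, the knobs that control schema handling live at the pipeline level of the CDC YAML definition. A minimal sketch for illustration only (key names follow the Flink CDC 3.x schema-evolution docs; the values are examples, not a known fix for this error):

source:
  type: mysql
  # ... connection options ...
  schema-change.enabled: true          # forward DDL events from the source
sink:
  type: starrocks
  # ... connection options ...
pipeline:
  name: mysql-to-starrocks
  parallelism: 1
  # how schema changes are applied downstream: exception | evolve | try_evolve | lenient | ignore
  schema.change.behavior: lenient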
Thank you!

melin li
11/17/2025, 7:04 AM

Idan Sheinberg
11/17/2025, 11:54 AM
I'm trying to set taskmanager.numOfTaskSlots via the Flink Kubernetes Operator.
I'm using operator version 1.13.0 and Flink 1.20.3. While the ConfigMap generated by the operator correctly reflects the value I set ('3' in my case), it looks like during the actual TaskManager startup process it resolves taskmanager.numOfTaskSlots=1 as a dynamic configuration property, which overrides the value at runtime. Has anyone successfully set a value larger than 1 on K8s in the past?
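For reference, the canonical key is spelled taskmanager.numberOfTaskSlots, and with the operator it is normally set under spec.flinkConfiguration of the FlinkDeployment. A minimal sketch (names and resource values are placeholders):

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: example-deployment
spec:
  flinkVersion: v1_20
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "3"
  jobManager:
    resource:
      cpu: 1
      memory: 2048m
  taskManager:
    resource:
      cpu: 1
      memory: 2048m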
Roberto Serafin
11/17/2025, 8:03 PM

Lee Wallen
11/17/2025, 11:40 PM
We're seeing SaslClientAuthenticator failing to configure when our Flink Kafka connector authenticates to Confluent Cloud with OAuth.
Relevant configuration:
• Added classloader.parent-first-patterns.additional per Confluent Cloud support to avoid child-first classloading issues with OAuth:
org.apache.kafka.common.security.oauthbearer.;
org.apache.kafka.common.security.auth.;
org.apache.kafka.common.security.plain.;
org.apache.kafka.common.security.scram.
• Both sasl.login.callback.handler.class and sasl.client.callback.handler.class are set to:
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
Runtime classpath (Kafka-related):
+--- io.confluent:kafka-avro-serializer:7.4.12
| +--- io.confluent:kafka-schema-serializer:7.4.12
| | +--- io.confluent:kafka-schema-registry-client:7.4.12
| +--- io.confluent:kafka-schema-registry-client:7.4.12 (*)
+--- org.apache.flink:flink-connector-kafka:3.3.0-1.19
| +--- org.apache.kafka:kafka-clients:3.4.0
Stacktrace excerpt:
Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
Caused by: java.lang.IllegalArgumentException: Callback handler must be castable to org.apache.kafka.common.security.auth.AuthenticateCallbackHandler: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
Has anyone encountered this issue or have suggestions on how to resolve it?
I'll add a thread and include a snippet with the full stacktrace.
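For reference, in the Flink configuration this option is a single semicolon-separated string, so assuming these are the intended packages it would look roughly like:

classloader.parent-first-patterns.additional: org.apache.kafka.common.security.oauthbearer.;org.apache.kafka.common.security.auth.;org.apache.kafka.common.security.plain.;org.apache.kafka.common.security.scram.

Parent-first loading only helps if those classes are actually on the parent classpath (e.g. a kafka-clients jar in /opt/flink/lib), which may be worth double-checking.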
11/18/2025, 6:24 AMif you use helm to install flink-kubernetes-operator, it allows you to specify a postStart hook to download the required plugins.
I need help with how to use this postStart hook. Is there an example anywhere?
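For illustration only, a sketch of what such a hook could look like in the operator's Helm values. This assumes the chart exposes a top-level postStart value (check the values.yaml of your chart version) and that wget exists in the image; the plugin name and URL are placeholders:

postStart:
  exec:
    command:
      - /bin/sh
      - -c
      - >
        mkdir -p /opt/flink/plugins/s3-fs-hadoop &&
        wget -q -P /opt/flink/plugins/s3-fs-hadoop/
        https://repo1.maven.org/maven2/org/apache/flink/flink-s3-fs-hadoop/1.20.0/flink-s3-fs-hadoop-1.20.0.jar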
Ashish Marottickal Gopi
11/19/2025, 11:10 AM

Jatin Kumar
11/19/2025, 2:03 PM

Xianxin Lin
11/19/2025, 8:32 PM
Collected metrics:
Backlog information at each source
Incoming data rate at the sources (e.g. records/sec written into the Kafka topic)
Record processing rate at each job vertex
Busy and backpressured time at each job vertex

Sebastian Tota
11/20/2025, 7:42 PM
We're currently using AvroParquetWriters.forReflectRecord since it returns the T object we pass in, but we know this is the least efficient option. I thought about switching to forGenericRecord, but that method returns a ParquetWriterFactory of GenericRecord, which means we no longer have access to the record's event-time information in our BucketAssigner.
Does anyone have any recommendations for how we can optimize this Parquet writer?
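One direction that might be worth sketching out (not tested against your job; the field name eventTime and the bucket format are assumptions): if the event time is also carried as a field of the Avro record, the bucket assigner can read it from the GenericRecord itself, so forGenericRecord remains usable.

// Sketch only: write GenericRecords with forGenericRecord and bucket by a timestamp
// field carried inside the record. "eventTime" is a hypothetical epoch-millis field.
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

public class GenericRecordParquetSink {

    public static FileSink<GenericRecord> build(Schema schema, String basePath) {
        return FileSink
                .forBulkFormat(new Path(basePath), AvroParquetWriters.forGenericRecord(schema))
                .withBucketAssigner(new EventTimeBucketAssigner())
                .build();
    }

    /** Buckets by the record's own event-time field instead of the original POJO. */
    static class EventTimeBucketAssigner implements BucketAssigner<GenericRecord, String> {
        private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd--HH");

        @Override
        public String getBucketId(GenericRecord record, Context context) {
            long eventTimeMillis = (Long) record.get("eventTime"); // hypothetical field
            return FMT.format(Instant.ofEpochMilli(eventTimeMillis).atOffset(ZoneOffset.UTC));
        }

        @Override
        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }
}

This only works if the event time actually exists in the Avro schema; otherwise it would have to be added as a field during the upstream map to GenericRecord.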
Ricardo Mendoza
11/21/2025, 2:40 AM

Prafful Javare
11/22/2025, 4:52 PM
My Dockerfile:
FROM flink:1.17.1
RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.4.0-1.20/flink-sql-connector-kafka-3.4.0-1.20.jar
RUN chown -R flink:flink /opt/flink/lib
COPY ./exercises/target/*.jar .
My docker-compose.yml:
services:
broker:
image: apache/kafka:4.0.0
platform: linux/amd64
container_name: broker
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://localhost:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_NUM_PARTITIONS: 3
kcat:
image: edenhill/kcat:1.7.1
platform: linux/amd64
container_name: flink_kcat
entrypoint:
- /bin/sh
- -c
- |
apk add jq;
while [ 1 -eq 1 ];do sleep 60;done
sql-client:
build: .
command: bin/sql-client.sh
depends_on:
- jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
rest.address: jobmanager
jobmanager:
build: .
ports:
- "8081:8081"
command: jobmanager
volumes:
- flink_data:/tmp/
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
state.backend: hashmap
state.checkpoints.dir: file:///tmp/flink-checkpoints
heartbeat.interval: 1000
heartbeat.timeout: 5000
rest.flamegraph.enabled: true
web.backpressure.refresh-interval: 10000
taskmanager:
build: .
depends_on:
- jobmanager
command: taskmanager
volumes:
- flink_data:/tmp/
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 3
state.backend: hashmap
state.checkpoints.dir: file:///tmp/flink-checkpoints
heartbeat.interval: 1000
heartbeat.timeout: 5000
volumes:
flink_data:
but when I run docker compose up and try to submit the job to the flink cluster, I get this error:
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.api.lineage.LineageVertexProvider
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source)
at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
... 46 more
I am using Flink 1.17.1 if that helps. I am able to run the WordCount example, but this specific exercise involving writing to a Kafka sink is not working. Any pointers? Thanks!
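For what it's worth, org.apache.flink.streaming.api.lineage.LineageVertexProvider does not exist in Flink 1.17, so a connector built for 1.20 (3.4.0-1.20) on a flink:1.17.1 base image is a likely version mismatch. One hedged way to align them is to move the base image up to match the connector (the exact tag below is an assumption); alternatively, pick a flink-sql-connector-kafka build whose -1.x suffix matches 1.17.

FROM flink:1.20
RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.4.0-1.20/flink-sql-connector-kafka-3.4.0-1.20.jar
RUN chown -R flink:flink /opt/flink/lib
COPY ./exercises/target/*.jar .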
Vedanth Baliga
11/23/2025, 9:42 AM
# ---------- Namespace ----------
apiVersion: v1
kind: Namespace
metadata:
name: telemetry
---
# ---------- Redpanda-1 ----------
apiVersion: v1
kind: Service
metadata:
name: redpanda-1
namespace: telemetry
spec:
selector:
app: redpanda-1
ports:
- name: kafka
port: 29092
targetPort: 29092
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: redpanda-1
namespace: telemetry
spec:
replicas: 1
selector:
matchLabels:
app: redpanda-1
template:
metadata:
labels:
app: redpanda-1
spec:
containers:
- name: redpanda
image: docker.redpanda.com/redpandadata/redpanda:v23.1.8
args:
- redpanda
- start
- --smp
- "1"
- --reserve-memory
- 0M
- --overprovisioned
- --node-id
- "1"
- --kafka-addr
- PLAINTEXT://0.0.0.0:29092
- --advertise-kafka-addr
- PLAINTEXT://redpanda-1:29092
- --rpc-addr
- 0.0.0.0:33145
- --advertise-rpc-addr
- redpanda-1:33145
- --pandaproxy-addr
- PLAINTEXT://0.0.0.0:28082
- --advertise-pandaproxy-addr
- PLAINTEXT://redpanda-1:28082
ports:
- containerPort: 29092
- containerPort: 28082
- containerPort: 33145
volumeMounts:
- name: data
mountPath: /var/lib/redpanda
volumes:
- name: data
emptyDir: {}
---
# ---------- Redpanda-2 ----------
apiVersion: v1
kind: Service
metadata:
name: redpanda-2
namespace: telemetry
spec:
selector:
app: redpanda-2
ports:
- name: kafka
port: 29093
targetPort: 29093
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: redpanda-2
namespace: telemetry
spec:
replicas: 1
selector:
matchLabels:
app: redpanda-2
template:
metadata:
labels:
app: redpanda-2
spec:
containers:
- name: redpanda
image: docker.redpanda.com/redpandadata/redpanda:v23.1.8
args:
- redpanda
- start
- --smp
- "1"
- --reserve-memory
- 0M
- --overprovisioned
- --node-id
- "2"
- --seeds
- redpanda-1:33145
- --kafka-addr
- PLAINTEXT://0.0.0.0:29093
- --advertise-kafka-addr
- PLAINTEXT://redpanda-2:29093
- --rpc-addr
- 0.0.0.0:33146
- --advertise-rpc-addr
- redpanda-2:33146
- --pandaproxy-addr
- PLAINTEXT://0.0.0.0:28083
- --advertise-pandaproxy-addr
- PLAINTEXT://redpanda-2:28083
ports:
- containerPort: 29093
- containerPort: 28083
- containerPort: 33146
volumeMounts:
- name: data
mountPath: /var/lib/redpanda
volumes:
- name: data
emptyDir: {}
---
# ---------- Redpanda Console ----------
apiVersion: v1
kind: Service
metadata:
name: redpanda-console
namespace: telemetry
spec:
selector:
app: redpanda-console
ports:
- name: http
port: 8080
targetPort: 8080
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: redpanda-console
namespace: telemetry
spec:
replicas: 1
selector:
matchLabels:
app: redpanda-console
template:
metadata:
labels:
app: redpanda-console
spec:
containers:
- name: redpanda-console
image: docker.redpanda.com/redpandadata/console:v2.2.4
command: ["/bin/sh", "-c"]
args:
- echo "$CONSOLE_CONFIG_FILE" > /tmp/config.yml; /app/console
env:
- name: CONFIG_FILEPATH
value: /tmp/config.yml
- name: CONSOLE_CONFIG_FILE
value: |
kafka:
brokers: ["redpanda-1:29092", "redpanda-2:29093"]
schemaRegistry:
enabled: false
redpanda:
adminApi:
enabled: false
connect:
enabled: false
ports:
- containerPort: 8080
---
# ---------- Flink: JobManager ----------
apiVersion: v1
kind: Service
metadata:
name: jobmanager
namespace: telemetry
spec:
selector:
app: jobmanager
ports:
- name: rpc
port: 6123
targetPort: 6123
- name: ui
port: 8081
targetPort: 8081
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: jobmanager
namespace: telemetry
spec:
replicas: 1
selector:
matchLabels:
app: jobmanager
template:
metadata:
labels:
app: jobmanager
spec:
containers:
- name: jobmanager
image: flink-sql-k8s:1.19
args: ["jobmanager"]
env:
- name: FLINK_PROPERTIES
value: |
jobmanager.rpc.address: jobmanager
jobmanager.bind-host: 0.0.0.0
jobmanager.rpc.port: 6123
rest.address: jobmanager
rest.bind-address: 0.0.0.0
rest.port: 8081
ports:
- containerPort: 6123
- containerPort: 8081
---
# ---------- Flink: TaskManager ----------
apiVersion: apps/v1
kind: Deployment
metadata:
name: taskmanager
namespace: telemetry
spec:
replicas: 1
selector:
matchLabels:
app: taskmanager
template:
metadata:
labels:
app: taskmanager
spec:
containers:
- name: taskmanager
image: flink-sql-k8s:1.19
args: ["taskmanager"]
env:
- name: FLINK_PROPERTIES
value: |
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 20
taskmanager.bind-host: 0.0.0.0
ports:
- containerPort: 6121
- containerPort: 6122
---
# ---------- Flink: SQL Client ----------
apiVersion: apps/v1
kind: Deployment
metadata:
name: sql-client
namespace: telemetry
spec:
replicas: 1
selector:
matchLabels:
app: sql-client
template:
metadata:
labels:
app: sql-client
spec:
containers:
- name: sql-client
image: flink-sql-k8s:1.19
command: ["bash", "-c", "sleep infinity"]
env:
- name: FLINK_PROPERTIES
value: |
jobmanager.rpc.address: jobmanager
rest.address: jobmanager
---
# ---------- ClickHouse ----------
apiVersion: v1
kind: Service
metadata:
name: clickhouse
namespace: telemetry
spec:
selector:
app: clickhouse
ports:
- name: http
port: 8123
targetPort: 8123
- name: native
port: 9000
targetPort: 9000
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: clickhouse
namespace: telemetry
spec:
replicas: 1
selector:
matchLabels:
app: clickhouse
template:
metadata:
labels:
app: clickhouse
spec:
containers:
- name: clickhouse
image: clickhouse/clickhouse-server:23.8
ports:
- containerPort: 8123
- containerPort: 9000
volumeMounts:
- name: data
mountPath: /var/lib/clickhouse
- name: logs
mountPath: /var/log/clickhouse-server
volumes:
- name: data
emptyDir: {}
- name: logs
emptyDir: {}
---
# ---------- Telemetry Producer (Python) ----------
apiVersion: apps/v1
kind: Deployment
metadata:
name: telemetry-producer
namespace: telemetry
spec:
replicas: 1
selector:
matchLabels:
app: telemetry-producer
template:
metadata:
labels:
app: telemetry-producer
spec:
containers:
- name: telemetry-producer
image: telemetry-producer:latest
imagePullPolicy: IfNotPresent
env:
- name: KAFKA_BOOTSTRAP_SERVERS
value: "redpanda-1:29092"
- name: KAFKA_TOPIC
value: "fleet.prod.telemetry.raw"
My Dockerfile
# base image
FROM flink:1.19-scala_2.12-java11
USER root
RUN wget -P /opt/flink/lib/ \
https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.2.0-1.19/flink-sql-connector-kafka-3.2.0-1.19.jar && \
wget -P /opt/flink/lib/ \
https://repo1.maven.org/maven2/org/apache/flink/flink-json/1.19.0/flink-json-1.19.0.jar

Bhargav Vekariya
11/24/2025, 6:11 PM
source:
type: mysql
hostname: 10.3.4.168
name: ms_mysql_prod_sales_us_ca
port: 3306
username: flink_replication_admin
password: Uq^ZXXXXXXX
tables: |
ms.sales_flat_quote_item,
ms_canada.sales_flat_quote_item
server-id: 8000-8500
server-time-zone: UTC
scan.newly-added-table.enabled: true
schema-change.enabled: true
sink:
type: starrocks
name: StarRocks Sink
jdbc-url: jdbc:mysql://10.231.XX.XX:9030
load-url: 10.231.XX.XX:8030
username: root
password: dpBjIXXXXXX
sink.buffer-flush.interval-ms: 2000
sink.buffer-flush.max-bytes: 67108864
table.create.properties.replication_num: 1
sink.at-least-once.use-transaction-stream-load: false
sink.io.thread-count: 6
sink.label-prefix: flink_sync_custom
route:
- source-table: ms.\.*
sink-table: ms.mage_us_<>
replace-symbol: <>
description: route all tables in ms to ms
- source-table: ms_canada.\.*
sink-table: ms.mage_ca_<>
replace-symbol: <>
description: route all tables in ms_canada to ms
pipeline:
parallelism: 4
name: Sync MySQL Sales US/CA DWH Tables to StarRocks
execution.checkpointing.interval: 30000
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.timeout: 300000
execution.checkpointing.min-pause: 5000
state.backend.changelog.enabled: true

Ashish Marottickal Gopi
11/24/2025, 8:06 PM
I have CDC-style reference data that I broadcast to my main stream, and I update the broadcast state in the processBroadcastElement function.
Question: Is broadcasting the state the right approach? I'm specifically confused on the following points:
1. Will the broadcast state be consistent across the parallel instances of the non-broadcast stream?
2. Since I have CDC-like data in the broadcast stream, can I assume the state updates I make will be consistent across the parallel tasks of the non-broadcast stream?
3. Should I change the parallelism of the broadcast stream to 1?
4. Or would it be better to just use a KeyedCoProcessFunction-style join for this?
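On questions 1-3: each parallel subtask of the keyed side receives every broadcast element and applies the same update in processBroadcastElement, so as long as those updates are deterministic, all copies of the broadcast state converge without reducing the broadcast source to parallelism 1; there is, however, no ordering guarantee between the broadcast stream and the keyed stream. A minimal sketch, with ReferenceUpdate, Event and EnrichedEvent as hypothetical types:

// Sketch: enriching a keyed stream with broadcast reference data (CDC-style upserts).
// "events" and "referenceUpdates" are assumed to be existing DataStreams.
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

MapStateDescriptor<String, ReferenceUpdate> refDescriptor =
        new MapStateDescriptor<>("reference-data", Types.STRING, TypeInformation.of(ReferenceUpdate.class));

BroadcastStream<ReferenceUpdate> referenceStream = referenceUpdates.broadcast(refDescriptor);

DataStream<EnrichedEvent> enriched = events
        .keyBy(Event::getKey) // hypothetical String key
        .connect(referenceStream)
        .process(new KeyedBroadcastProcessFunction<String, Event, ReferenceUpdate, EnrichedEvent>() {

            @Override
            public void processElement(Event event, ReadOnlyContext ctx, Collector<EnrichedEvent> out) throws Exception {
                // Read-only view of the broadcast state on the keyed (non-broadcast) side.
                ReferenceUpdate ref = ctx.getBroadcastState(refDescriptor).get(event.getRefId());
                out.collect(EnrichedEvent.of(event, ref)); // ref may be null if the update hasn't arrived yet
            }

            @Override
            public void processBroadcastElement(ReferenceUpdate update, Context ctx, Collector<EnrichedEvent> out) throws Exception {
                // Runs on every parallel subtask with the same element, so every copy of
                // the broadcast state receives the same upsert/delete.
                if (update.isDelete()) {
                    ctx.getBroadcastState(refDescriptor).remove(update.getId());
                } else {
                    ctx.getBroadcastState(refDescriptor).put(update.getId(), update);
                }
            }
        });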
Eddy Agossou
11/25/2025, 11:51 AM

Ben Mali
11/27/2025, 8:05 AM
In a FlinkDeployment, defining spec.jobManager.replicas works as expected, but when I'm trying to specify spec.taskManager.replicas it seems to be ignored. Does anyone know how I can manually configure the number of TaskManager pods? Should I configure it using HPA?
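One possible explanation: per the operator docs, spec.taskManager.replicas is only honored for standalone-mode deployments; in the default native mode the TaskManager count is derived from the job parallelism and taskmanager.numberOfTaskSlots. A hedged sketch of the standalone variant:

spec:
  mode: standalone
  taskManager:
    replicas: 3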
Philipp
11/27/2025, 10:12 AM

Hiếu trịnh
11/28/2025, 3:32 AM

H.P. Grahsl
12/01/2025, 10:41 AM

Jon Slusher
12/02/2025, 6:52 PM

Aleksei Perminov
12/03/2025, 3:21 PM
2025-11-22 10:53:52.155 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.r.r.a.ActiveResourceManager - Requesting new worker with resource spec WorkerResourceSpec {cpuCores=12.0, taskHeapSize=4.912gb (5274756322 bytes), taskOffHeapSize=0 bytes, networkMemSize=755.200mb (791884607 bytes), managedMemSize=8.850gb (9502615519 bytes), numSlots=24}, current pending count: 2.
2025-11-22 10:53:52.156 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.r.e.ExternalResourceUtils - Enabled external resources: []
2025-11-22 10:53:52.157 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.configuration.Configuration - Config uses fallback configuration key 'kubernetes.service-account' instead of key 'kubernetes.taskmanager.service-account'
2025-11-22 10:53:52.160 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.k.KubernetesResourceManagerDriver - Creating new TaskManager pod with name taskmanager-3-2 and resource <16384,12.0>.
2025-11-22 10:53:52.260 [flink-pekko.actor.default-dispatcher-4] WARN o.a.p.r.t.netty.NettyTransport - Remote connection to [/10.0.0.70:35310] failed with null
2025-11-22 10:53:52.272 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.k.KubernetesResourceManagerDriver - Pod taskmanager-3-2 is created.
2025-11-22 10:53:52.281 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.k.KubernetesResourceManagerDriver - Received new TaskManager pod: taskmanager-3-2
2025-11-22 10:53:52.282 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.r.r.a.ActiveResourceManager - Requested worker taskmanager-3-2 with resource spec WorkerResourceSpec {cpuCores=12.0, taskHeapSize=4.912gb (5274756322 bytes), taskOffHeapSize=0 bytes, networkMemSize=755.200mb (791884607 bytes), managedMemSize=8.850gb (9502615519 bytes), numSlots=24}.
2025-11-22 10:53:52.399 [flink-pekko.actor.default-dispatcher-4] WARN o.a.p.r.ReliableDeliverySupervisor - Association with remote system [pekko.tcp://flink@10.0.0.180:6122] has failed, address is now gated for [50] ms. Reason: [Disassociated]
2025-11-22 10:54:07.260 [flink-pekko.actor.default-dispatcher-19] WARN o.a.p.r.t.netty.NettyTransport - Remote connection to [/10.0.0.70:34164] failed with null
2025-11-22 10:54:15.283 [flink-pekko.actor.default-dispatcher-15] INFO o.a.f.r.r.a.ActiveResourceManager - Registering TaskManager with ResourceID taskmanager-3-2 (pekko.tcp://flink@10.0.0.181:6122/user/rpc/taskmanager_0) at ResourceManager
2025-11-22 10:54:15.291 [flink-pekko.actor.default-dispatcher-19] INFO o.a.f.r.r.s.FineGrainedSlotManager - Registering task executor taskmanager-3-2 under 12848c284ec438712332f1fd3b76a28a at the slot manager.
2025-11-22 10:54:15.292 [flink-pekko.actor.default-dispatcher-19] INFO o.a.f.r.r.a.ActiveResourceManager - Worker taskmanager-3-2 is registered.
2025-11-22 10:54:15.300 [flink-pekko.actor.default-dispatcher-19] INFO o.a.f.r.r.a.ActiveResourceManager - Closing TaskExecutor connection taskmanager-3-2 because: Failed to send initial slot report to ResourceManager. java.lang.IllegalStateException: Cannot decrease, no worker of spec WorkerResourceSpec {cpuCores=12.0, taskHeapSize=4.912gb (5274756322 bytes), taskOffHeapSize=0 bytes, networkMemSize=755.200mb (791884607 bytes), managedMemSize=8.850gb (9502615519 bytes), numSlots=25}.
Basically, in FlinkDeployment the taskmanager.numberOfTaskSlots is set to 24, but for some reason the slot report shows 25 slots in the active Task Manager, which leads to the mentioned exception.
It doesn't matter whether I set numberOfTaskSlots to 24 or 32; the issue is the same: one extra slot in the slot report, followed by the exception.
This happens for about 5 minutes and then suddenly the job starts working fine.
We use Flink Kubernetes Operator 1.12.1 and Flink 2.0.0. However, the issue also appears, though rarely, on Flink 1.20.1.
Thanks in advance.

morti
12/03/2025, 3:50 PM
When submitting a job through the Flink web UI, the programArgs query parameter is dropped, so no program arguments (e.g. --batchSize 5000) are sent to the job.
I'm currently using the flink-2.1.1-java21 image. There is also a related Stack Overflow question:
https://stackoverflow.com/questions/79702619/how-do-i-pass-program-arguments-from-apache-flink-2-0-0-web-gui-to-my-job-proper
Submitting via the REST API mentioned in the Stack Overflow question works, but as I said, program arguments are not sent to the job when submitting from the web UI.
Any help or tip would be appreciated!

Jon Slusher
12/05/2025, 9:18 PM

Rion Williams
12/06/2025, 3:59 AM

George Leonard
12/07/2025, 10:42 AM
# CDC Sources
CREATE OR REPLACE TABLE postgres_catalog.demog.accountholders (
_id BIGINT NOT NULL
,nationalid VARCHAR(16) NOT NULL
,firstname VARCHAR(100)
,lastname VARCHAR(100)
,dob VARCHAR(10)
,gender VARCHAR(10)
,children INT
,address STRING
,accounts STRING
,emailaddress VARCHAR(100)
,mobilephonenumber VARCHAR(20)
,created_at TIMESTAMP_LTZ(3)
,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
,PRIMARY KEY (_id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc'
,'hostname' = 'postgrescdc'
,'port' = '5432'
,'username' = 'dbadmin'
,'password' = 'dbpassword'
,'database-name' = 'demog'
,'schema-name' = 'public'
,'table-name' = 'accountholders'
,'slot.name' = 'accountholders0'
,'scan.incremental.snapshot.enabled' = 'true' -- experimental feature: incremental snapshot (default off)
,'scan.startup.mode' = 'initial' -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position
,'decoding.plugin.name' = 'pgoutput'
);
CREATE OR REPLACE TABLE postgres_catalog.demog.transactions (
_id BIGINT NOT NULL
,eventid VARCHAR(36) NOT NULL
,transactionid VARCHAR(36) NOT NULL
,eventtime VARCHAR(30)
,direction VARCHAR(8)
,eventtype VARCHAR(10)
,creationdate VARCHAR(20)
,accountholdernationalid VARCHAR(16)
,accountholderaccount STRING
,counterpartynationalid VARCHAR(16)
,counterpartyaccount STRING
,tenantid VARCHAR(8)
,fromid VARCHAR(8)
,accountagentid VARCHAR(8)
,fromfibranchid VARCHAR(6)
,accountnumber VARCHAR(16)
,toid VARCHAR(8)
,accountidcode VARCHAR(5)
,counterpartyagentid VARCHAR(8)
,tofibranchid VARCHAR(6)
,counterpartynumber VARCHAR(16)
,counterpartyidcode VARCHAR(5)
,amount STRING
,msgtype VARCHAR(6)
,settlementclearingsystemcode VARCHAR(5)
,paymentclearingsystemreference VARCHAR(12)
,requestexecutiondate VARCHAR(10)
,settlementdate VARCHAR(10)
,destinationcountry VARCHAR(30)
,localinstrument VARCHAR(2)
,msgstatus VARCHAR(12)
,paymentmethod VARCHAR(4)
,settlementmethod VARCHAR(4)
,transactiontype VARCHAR(2)
,verificationresult VARCHAR(4)
,numberoftransactions INT
,schemaversion INT
,usercode VARCHAR(4)
,created_at TIMESTAMP_LTZ(3)
,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
,PRIMARY KEY (_id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc'
,'hostname' = 'postgrescdc'
,'port' = '5432'
,'username' = 'dbadmin'
,'password' = 'dbpassword'
,'database-name' = 'demog'
,'schema-name' = 'public'
,'table-name' = 'transactions'
,'slot.name' = 'transactions0'
,'scan.incremental.snapshot.enabled' = 'true' -- experimental feature: incremental snapshot (default off)
,'scan.startup.mode' = 'initial' -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position
,'decoding.plugin.name' = 'pgoutput'
);
And the CREATE TABLE ... AS SELECT statements:
SET 'execution.checkpointing.interval' = '10s';
SET 'pipeline.name' = 'Persist into Paimon: accountholders table';
CREATE OR REPLACE TABLE c_paimon.finflow.accountholders WITH (
'file.format' = 'parquet'
,'compaction.min.file-num' = '2'
,'compaction.early-max.file-num' = '50'
,'snapshot.time-retained' = '1h'
,'snapshot.num-retained.min' = '5'
,'snapshot.num-retained.max' = '20'
,'table.exec.sink.upsert-materialize'= 'NONE'
) AS
SELECT * FROM postgres_catalog.demog.accountholders;
SET 'pipeline.name' = 'Persist into Paimon: transactions table';
CREATE OR REPLACE TABLE c_paimon.finflow.transactions WITH (
'file.format' = 'parquet'
,'compaction.min.file-num' = '2'
,'compaction.early-max.file-num' = '50'
,'snapshot.time-retained' = '1h'
,'snapshot.num-retained.min' = '5'
,'snapshot.num-retained.max' = '20'
,'table.exec.sink.upsert-materialize'= 'NONE'
) AS
SELECT * FROM postgres_catalog.demog.transactions;

Manish Jain
12/09/2025, 9:43 AM
We are using the following Kafka connector dependency:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>3.4.0-1.20</version>
</dependency>
The kafka-clients version pulled in by this connector has 2 vulnerabilities, which are only fixed in kafka-clients 3.9.1.
We are wondering how to handle this. Is it recommended to exclude kafka-clients from the connector and then override it with the new version?
We are afraid that it might not be as easy as it sounds. 🙂
Is running the Flink test cases sufficient to verify that the new version is Flink-compatible, or are there other things we should keep in mind while doing this upgrade?
PS: In the longer run we might upgrade to Flink 2.0, but right now that is not in our plans.
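For reference, the exclusion-and-override mechanics would look roughly like the snippet below (3.9.1 is the version mentioned above; whether it is actually compatible with flink-connector-kafka 3.4.0-1.20 still needs to be verified, for example by running the connector's test suite against it):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>3.4.0-1.20</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.9.1</version>
</dependency>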
Jon Slusher
12/09/2025, 4:36 PM
Is there a way to set javax.net.debug=ssl in the connector so I can see what certificate it's trying to use?
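If it helps, SSL debugging is a JVM flag rather than a connector option, so one option (config key per the Flink docs, scoped here to the TaskManagers that run the Kafka clients) is roughly:

env.java.opts.taskmanager: -Djavax.net.debug=ssl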
Urs Schoenenberger
12/10/2025, 2:24 PM

Almog Golbar
12/10/2025, 4:10 PM
We're trying to get memory tuning recommendations from the Flink Kubernetes Operator autoscaler. Our autoscaler configuration:
job.autoscaler.enabled: "true"
job.autoscaler.memory.tuning.enabled: "true" #also tried false for dry run
job.autoscaler.metrics.window: "5m"
job.autoscaler.stabilization.interval: "30s"
job.autoscaler.scaling.enabled: "false"
What we've done:
1. Removed explicit memory configs (taskmanager.memory.process.size, taskmanager.memory.managed.size, taskmanager.memory.network.max)
2. Set taskManager.resource.memory: 12g
3. Verified metrics collection (6+ data points in ConfigMap autoscaler-<deployment-name>)
4. Confirmed autoscaler is active (ScalingExecutor running, metrics being collected)
Current situation:
• Memory: 12GB allocated, ~24% heap usage (~1.36GB used)
• Metrics: Being collected and stored in ConfigMap
• No recommendations: Not appearing in operator logs, Kubernetes events, or FlinkDeployment status/annotations
What we've checked:
• Operator logs show autoscaler activity but no memory tuning recommendations
• No errors or warnings related to memory tuning
• Metrics window is filling (5+ minutes of data collected)
• Job is processing data (not idle)
Has anyone seen memory tuning recommendations with this operator version? Are there additional requirements or known issues? Any guidance on where recommendations should appear or how to troubleshoot?