Vedanth Baliga
11/23/2025, 9:42 AM
# ---------- Namespace ----------
apiVersion: v1
kind: Namespace
metadata:
  name: telemetry
---
# ---------- Redpanda-1 ----------
# Broker 1 of the two-node Redpanda cluster. Kafka API on 29092, HTTP proxy on
# 28082, internal RPC on 33145. Addresses are advertised via the Service name
# so in-cluster clients can reach the broker.
apiVersion: v1
kind: Service
metadata:
  name: redpanda-1
  namespace: telemetry
spec:
  selector:
    app: redpanda-1
  ports:
    - name: kafka
      port: 29092
      targetPort: 29092
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: redpanda-1
  namespace: telemetry
spec:
  replicas: 1
  selector:
    matchLabels:
      app: redpanda-1
  template:
    metadata:
      labels:
        app: redpanda-1
    spec:
      containers:
        - name: redpanda
          image: docker.redpanda.com/redpandadata/redpanda:v23.1.8
          args:
            - redpanda
            - start
            - --smp
            - "1"
            - --reserve-memory
            - 0M
            - --overprovisioned
            - --node-id
            - "1"
            - --kafka-addr
            # NOTE: Slack export had wrapped these listener values in <...>;
            # the angle brackets are not part of the configuration.
            - PLAINTEXT://0.0.0.0:29092
            - --advertise-kafka-addr
            - PLAINTEXT://redpanda-1:29092
            - --rpc-addr
            - 0.0.0.0:33145
            - --advertise-rpc-addr
            - redpanda-1:33145
            - --pandaproxy-addr
            - PLAINTEXT://0.0.0.0:28082
            - --advertise-pandaproxy-addr
            - PLAINTEXT://redpanda-1:28082
          ports:
            - containerPort: 29092
            - containerPort: 28082
            - containerPort: 33145
          volumeMounts:
            - name: data
              mountPath: /var/lib/redpanda
      volumes:
        - name: data
          emptyDir: {}
---
# ---------- Redpanda-2 ----------
# Broker 2; joins the cluster by seeding from redpanda-1's RPC address.
# Distinct ports (29093 / 28083 / 33146) so both brokers could also coexist
# on a shared network namespace.
apiVersion: v1
kind: Service
metadata:
  name: redpanda-2
  namespace: telemetry
spec:
  selector:
    app: redpanda-2
  ports:
    - name: kafka
      port: 29093
      targetPort: 29093
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: redpanda-2
  namespace: telemetry
spec:
  replicas: 1
  selector:
    matchLabels:
      app: redpanda-2
  template:
    metadata:
      labels:
        app: redpanda-2
    spec:
      containers:
        - name: redpanda
          image: docker.redpanda.com/redpandadata/redpanda:v23.1.8
          args:
            - redpanda
            - start
            - --smp
            - "1"
            - --reserve-memory
            - 0M
            - --overprovisioned
            - --node-id
            - "2"
            # Join the existing cluster through broker 1's RPC endpoint.
            - --seeds
            - redpanda-1:33145
            - --kafka-addr
            - PLAINTEXT://0.0.0.0:29093
            - --advertise-kafka-addr
            - PLAINTEXT://redpanda-2:29093
            - --rpc-addr
            - 0.0.0.0:33146
            - --advertise-rpc-addr
            - redpanda-2:33146
            - --pandaproxy-addr
            - PLAINTEXT://0.0.0.0:28083
            - --advertise-pandaproxy-addr
            - PLAINTEXT://redpanda-2:28083
          ports:
            - containerPort: 29093
            - containerPort: 28083
            - containerPort: 33146
          volumeMounts:
            - name: data
              mountPath: /var/lib/redpanda
      volumes:
        - name: data
          emptyDir: {}
---
# ---------- Redpanda Console ----------
# Web UI for the Redpanda cluster. The console config file is injected via an
# env var and written to /tmp/config.yml at container start.
apiVersion: v1
kind: Service
metadata:
  name: redpanda-console
  namespace: telemetry
spec:
  selector:
    app: redpanda-console
  ports:
    - name: http
      port: 8080
      targetPort: 8080
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: redpanda-console
  namespace: telemetry
spec:
  replicas: 1
  selector:
    matchLabels:
      app: redpanda-console
  template:
    metadata:
      labels:
        app: redpanda-console
    spec:
      containers:
        - name: redpanda-console
          image: docker.redpanda.com/redpandadata/console:v2.2.4
          command: ["/bin/sh", "-c"]
          args:
            # Materialize the config from the env var, then start the console.
            - 'echo "$CONSOLE_CONFIG_FILE" > /tmp/config.yml; /app/console'
          env:
            - name: CONFIG_FILEPATH
              value: /tmp/config.yml
            - name: CONSOLE_CONFIG_FILE
              value: |
                kafka:
                  brokers: ["redpanda-1:29092", "redpanda-2:29093"]
                schemaRegistry:
                  enabled: false
                redpanda:
                  adminApi:
                    enabled: false
                connect:
                  enabled: false
          ports:
            - containerPort: 8080
---
# ---------- Flink: JobManager ----------
# Flink JobManager: RPC on 6123, web UI / REST on 8081. Configuration is
# passed through FLINK_PROPERTIES, which the official image entrypoint merges
# into flink-conf.
apiVersion: v1
kind: Service
metadata:
  name: jobmanager
  namespace: telemetry
spec:
  selector:
    app: jobmanager
  ports:
    - name: rpc
      port: 6123
      targetPort: 6123
    - name: ui
      port: 8081
      targetPort: 8081
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: jobmanager
  namespace: telemetry
spec:
  replicas: 1
  selector:
    matchLabels:
      app: jobmanager
  template:
    metadata:
      labels:
        app: jobmanager
    spec:
      containers:
        - name: jobmanager
          image: flink-sql-k8s:1.19
          args: ["jobmanager"]
          env:
            - name: FLINK_PROPERTIES
              value: |
                jobmanager.rpc.address: jobmanager
                jobmanager.bind-host: 0.0.0.0
                jobmanager.rpc.port: 6123
                rest.address: jobmanager
                rest.bind-address: 0.0.0.0
                rest.port: 8081
          ports:
            - containerPort: 6123
            - containerPort: 8081
---
# ---------- Flink: TaskManager ----------
# Single TaskManager registering with the JobManager service; 20 task slots.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: taskmanager
  namespace: telemetry
spec:
  replicas: 1
  selector:
    matchLabels:
      app: taskmanager
  template:
    metadata:
      labels:
        app: taskmanager
    spec:
      containers:
        - name: taskmanager
          image: flink-sql-k8s:1.19
          args: ["taskmanager"]
          env:
            - name: FLINK_PROPERTIES
              value: |
                jobmanager.rpc.address: jobmanager
                taskmanager.numberOfTaskSlots: 20
                taskmanager.bind-host: 0.0.0.0
          ports:
            - containerPort: 6121
            - containerPort: 6122
---
# ---------- Flink: SQL Client ----------
# Idle pod used to exec into and run the Flink SQL client interactively.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: sql-client
  namespace: telemetry
spec:
  replicas: 1
  selector:
    matchLabels:
      app: sql-client
  template:
    metadata:
      labels:
        app: sql-client
    spec:
      containers:
        - name: sql-client
          image: flink-sql-k8s:1.19
          # Keep the container alive; SQL client sessions are started manually.
          command: ["bash", "-c", "sleep infinity"]
          env:
            - name: FLINK_PROPERTIES
              value: |
                jobmanager.rpc.address: jobmanager
                rest.address: jobmanager
---
# ---------- ClickHouse ----------
# ClickHouse server: HTTP interface on 8123, native protocol on 9000.
# emptyDir volumes mean data and logs are lost on pod restart.
apiVersion: v1
kind: Service
metadata:
  name: clickhouse
  namespace: telemetry
spec:
  selector:
    app: clickhouse
  ports:
    - name: http
      port: 8123
      targetPort: 8123
    - name: native
      port: 9000
      targetPort: 9000
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: clickhouse
  namespace: telemetry
spec:
  replicas: 1
  selector:
    matchLabels:
      app: clickhouse
  template:
    metadata:
      labels:
        app: clickhouse
    spec:
      containers:
        - name: clickhouse
          image: clickhouse/clickhouse-server:23.8
          ports:
            - containerPort: 8123
            - containerPort: 9000
          volumeMounts:
            - name: data
              mountPath: /var/lib/clickhouse
            - name: logs
              mountPath: /var/log/clickhouse-server
      volumes:
        - name: data
          emptyDir: {}
        - name: logs
          emptyDir: {}
---
# ---------- Telemetry Producer (Python) ----------
# Locally-built producer image; publishes to the raw telemetry topic on
# redpanda-1.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: telemetry-producer
  namespace: telemetry
spec:
  replicas: 1
  selector:
    matchLabels:
      app: telemetry-producer
  template:
    metadata:
      labels:
        app: telemetry-producer
    spec:
      containers:
        - name: telemetry-producer
          image: telemetry-producer:latest
          imagePullPolicy: IfNotPresent
          env:
            - name: KAFKA_BOOTSTRAP_SERVERS
              value: "redpanda-1:29092"
            - name: KAFKA_TOPIC
              value: "fleet.prod.telemetry.raw"
My Dockerfile
# base image
# Flink 1.19 with the Kafka SQL connector and JSON format jars added to lib/.
# NOTE: Slack export had wrapped the URLs in <...>; the angle brackets are not
# part of the commands.
FROM flink:1.19-scala_2.12-java11
USER root
RUN wget -P /opt/flink/lib/ \
      https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.2.0-1.19/flink-sql-connector-kafka-3.2.0-1.19.jar && \
    wget -P /opt/flink/lib/ \
      https://repo1.maven.org/maven2/org/apache/flink/flink-json/1.19.0/flink-json-1.19.0.jar
Bhargav Vekariya
11/24/2025, 6:11 PM
# Flink CDC pipeline: MySQL (US + CA sales schemas) -> StarRocks, with routing
# that prefixes synced tables per source schema.
source:
  type: mysql
  hostname: 10.3.4.168
  name: ms_mysql_prod_sales_us_ca
  port: 3306
  username: flink_replication_admin
  password: Uq^ZXXXXXXX
  tables: |
    ms.sales_flat_quote_item,
    ms_canada.sales_flat_quote_item
  server-id: 8000-8500
  server-time-zone: UTC
  scan.newly-added-table.enabled: true
  schema-change.enabled: true

sink:
  type: starrocks
  name: StarRocks Sink
  # NOTE: Slack export had mangled this to jdbc:<mysql://...>; the angle
  # brackets are not part of the URL.
  jdbc-url: jdbc:mysql://10.231.XX.XX:9030
  load-url: 10.231.XX.XX:8030
  username: root
  password: dpBjIXXXXXX
  sink.buffer-flush.interval-ms: 2000
  sink.buffer-flush.max-bytes: 67108864
  table.create.properties.replication_num: 1
  sink.at-least-once.use-transaction-stream-load: false
  sink.io.thread-count: 6
  sink.label-prefix: flink_sync_custom

route:
  - source-table: ms.\.*
    sink-table: ms.mage_us_<>
    replace-symbol: "<>"
    description: route all tables in ms to ms
  - source-table: ms_canada.\.*
    sink-table: ms.mage_ca_<>
    replace-symbol: "<>"
    description: route all tables in ms_canada to ms

pipeline:
  parallelism: 4
  name: Sync MySQL Sales US/CA DWH Tables to StarRocks
  execution.checkpointing.interval: 30000
  execution.checkpointing.max-concurrent-checkpoints: 1
  execution.checkpointing.mode: EXACTLY_ONCE
  execution.checkpointing.timeout: 300000
  execution.checkpointing.min-pause: 5000
  state.backend.changelog.enabled: true
Ashish Marottickal Gopi
11/24/2025, 8:06 PMprocessBroadcastElement function
Question: Is Broadcasting the state the right approach ? I'm specifically confused on following points:
1. Will the broadcasted state be consistent across the parallel instances of the non-broadcasted stream ?
2. Since I have the CDC kind of data in the broadcasted stream, I believe the state update that i make will be consistent across the parallel tasks of the non-broadcasted stream ?
3. Should I change the parallelism of the broadcast stream to 1 ?
4. Or does it seem good to just use a KeyedCoProcessJoin for this?
Eddy Agossou
11/25/2025, 11:51 AMBen Mali
11/27/2025, 8:05 AMFlinkDeployment , defining spec.jobManager.replicas works as expected, but when I'm trying to specify spec.taskManager.replicas it seems to be ignored. Does anyone know how can I manually configure the number of task manager pods? Should I configure it using HPA?Philipp
11/27/2025, 10:12 AMHiếu trịnh
11/28/2025, 3:32 AMH.P. Grahsl
12/01/2025, 10:41 AMJon Slusher
12/02/2025, 6:52 PMAleksei Perminov
12/03/2025, 3:21 PM2025-11-22 10:53:52.155 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.r.r.a.ActiveResourceManager - Requesting new worker with resource spec WorkerResourceSpec {cpuCores=12.0, taskHeapSize=4.912gb (5274756322 bytes), taskOffHeapSize=0 bytes, networkMemSize=755.200mb (791884607 bytes), managedMemSize=8.850gb (9502615519 bytes), numSlots=24}, current pending count: 2.
2025-11-22 10:53:52.156 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.r.e.ExternalResourceUtils - Enabled external resources: []
2025-11-22 10:53:52.157 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.configuration.Configuration - Config uses fallback configuration key 'kubernetes.service-account' instead of key 'kubernetes.taskmanager.service-account'
2025-11-22 10:53:52.160 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.k.KubernetesResourceManagerDriver - Creating new TaskManager pod with name taskmanager-3-2 and resource <16384,12.0>.
2025-11-22 10:53:52.260 [flink-pekko.actor.default-dispatcher-4] WARN o.a.p.r.t.netty.NettyTransport - Remote connection to [/10.0.0.70:35310] failed with null
2025-11-22 10:53:52.272 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.k.KubernetesResourceManagerDriver - Pod taskmanager-3-2 is created.
2025-11-22 10:53:52.281 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.k.KubernetesResourceManagerDriver - Received new TaskManager pod: taskmanager-3-2
2025-11-22 10:53:52.282 [flink-pekko.actor.default-dispatcher-4] INFO o.a.f.r.r.a.ActiveResourceManager - Requested worker taskmanager-3-2 with resource spec WorkerResourceSpec {cpuCores=12.0, taskHeapSize=4.912gb (5274756322 bytes), taskOffHeapSize=0 bytes, networkMemSize=755.200mb (791884607 bytes), managedMemSize=8.850gb (9502615519 bytes), numSlots=24}.
2025-11-22 10:53:52.399 [flink-pekko.actor.default-dispatcher-4] WARN o.a.p.r.ReliableDeliverySupervisor - Association with remote system [<pekko.tcp://flink@10.0.0.180:6122>] has failed, address is now gated for [50] ms. Reason: [Disassociated]
2025-11-22 10:54:07.260 [flink-pekko.actor.default-dispatcher-19] WARN o.a.p.r.t.netty.NettyTransport - Remote connection to [/10.0.0.70:34164] failed with null
2025-11-22 10:54:15.283 [flink-pekko.actor.default-dispatcher-15] INFO o.a.f.r.r.a.ActiveResourceManager - Registering TaskManager with ResourceID taskmanager-3-2 (<pekko.tcp://flink@10.0.0.181:6122/user/rpc/taskmanager_0>) at ResourceManager
2025-11-22 10:54:15.291 [flink-pekko.actor.default-dispatcher-19] INFO o.a.f.r.r.s.FineGrainedSlotManager - Registering task executor taskmanager-3-2 under 12848c284ec438712332f1fd3b76a28a at the slot manager.
2025-11-22 10:54:15.292 [flink-pekko.actor.default-dispatcher-19] INFO o.a.f.r.r.a.ActiveResourceManager - Worker taskmanager-3-2 is registered.
2025-11-22 10:54:15.300 [flink-pekko.actor.default-dispatcher-19] INFO o.a.f.r.r.a.ActiveResourceManager - Closing TaskExecutor connection taskmanager-3-2 because: Failed to send initial slot report to ResourceManager. java.lang.IllegalStateException: Cannot decrease, no worker of spec WorkerResourceSpec {cpuCores=12.0, taskHeapSize=4.912gb (5274756322 bytes), taskOffHeapSize=0 bytes, networkMemSize=755.200mb (791884607 bytes), managedMemSize=8.850gb (9502615519 bytes), numSlots=25}.
Basically, in FlinkDeployment the taskmanager.numberOfTaskSlots is set to 24, but for some reason the slot report shows 25 slots in the active Task Manager, which leads to the mentioned exception.
It doesn't matter whether I set 24 or 32 numberOfTaskSlots, the issue will be the same - +1 slot in slot report and exception.
This happens for about 5 minutes and then suddenly the job starts working fine.
We use 1.12.1 Flink K8S Operator version and Flink 2.0.0. However this issue rarely appears on 1.20.1 as well.
Thanks in advance.
morti
12/03/2025, 3:50 PMprogramArgs query parameter is deleted, No program arguments (e.g, --batchSize 5000) are sent to the job.
I'm currently using flink-2.1.1-java21 image. There is also related stackoverflow question:
https://stackoverflow.com/questions/79702619/how-do-i-pass-program-arguments-from-apache-flink-2-0-0-web-gui-to-my-job-proper
It works with rest api mentioned in stackoverflow question but as I said, program arguments are not sent to job with webui.
Any help or Tip, would be appreciated!Jon Slusher
12/05/2025, 9:18 PMRion Williams
12/06/2025, 3:59 AMGeorge Leonard
12/07/2025, 10:42 AM# CDC Sources
-- Postgres CDC source table for account holders.
CREATE OR REPLACE TABLE postgres_catalog.demog.accountholders (
     _id                BIGINT        NOT NULL
    ,nationalid         VARCHAR(16)   NOT NULL
    ,firstname          VARCHAR(100)
    ,lastname           VARCHAR(100)
    ,dob                VARCHAR(10)
    ,gender             VARCHAR(10)
    ,children           INT
    ,address            STRING
    ,accounts           STRING
    ,emailaddress       VARCHAR(100)
    ,mobilephonenumber  VARCHAR(20)
    ,created_at         TIMESTAMP_LTZ(3)
    ,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
    ,PRIMARY KEY (_id) NOT ENFORCED
) WITH (
     'connector' = 'postgres-cdc'
    ,'hostname' = 'postgrescdc'
    ,'port' = '5432'
    ,'username' = 'dbadmin'
    ,'password' = 'dbpassword'
    ,'database-name' = 'demog'
    ,'schema-name' = 'public'
    ,'table-name' = 'accountholders'
    ,'slot.name' = 'accountholders0'
    -- Incremental snapshot is an experimental feature (default: off).
    ,'scan.incremental.snapshot.enabled' = 'true'
    -- Startup position; see flink-cdc postgres-cdc docs, "startup-reading-position".
    ,'scan.startup.mode' = 'initial'
    ,'decoding.plugin.name' = 'pgoutput'
);
-- Postgres CDC source table for payment transactions.
CREATE OR REPLACE TABLE postgres_catalog.demog.transactions (
     _id                             BIGINT        NOT NULL
    ,eventid                         VARCHAR(36)   NOT NULL
    ,transactionid                   VARCHAR(36)   NOT NULL
    ,eventtime                       VARCHAR(30)
    ,direction                       VARCHAR(8)
    ,eventtype                       VARCHAR(10)
    ,creationdate                    VARCHAR(20)
    ,accountholdernationalid         VARCHAR(16)
    ,accountholderaccount            STRING
    ,counterpartynationalid          VARCHAR(16)
    ,counterpartyaccount             STRING
    ,tenantid                        VARCHAR(8)
    ,fromid                          VARCHAR(8)
    ,accountagentid                  VARCHAR(8)
    ,fromfibranchid                  VARCHAR(6)
    ,accountnumber                   VARCHAR(16)
    ,toid                            VARCHAR(8)
    ,accountidcode                   VARCHAR(5)
    ,counterpartyagentid             VARCHAR(8)
    ,tofibranchid                    VARCHAR(6)
    ,counterpartynumber              VARCHAR(16)
    ,counterpartyidcode              VARCHAR(5)
    ,amount                          STRING
    ,msgtype                         VARCHAR(6)
    ,settlementclearingsystemcode    VARCHAR(5)
    ,paymentclearingsystemreference  VARCHAR(12)
    ,requestexecutiondate            VARCHAR(10)
    ,settlementdate                  VARCHAR(10)
    ,destinationcountry              VARCHAR(30)
    ,localinstrument                 VARCHAR(2)
    ,msgstatus                       VARCHAR(12)
    ,paymentmethod                   VARCHAR(4)
    ,settlementmethod                VARCHAR(4)
    ,transactiontype                 VARCHAR(2)
    ,verificationresult              VARCHAR(4)
    ,numberoftransactions            INT
    ,schemaversion                   INT
    ,usercode                        VARCHAR(4)
    ,created_at                      TIMESTAMP_LTZ(3)
    ,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
    ,PRIMARY KEY (_id) NOT ENFORCED
) WITH (
     'connector' = 'postgres-cdc'
    ,'hostname' = 'postgrescdc'
    ,'port' = '5432'
    ,'username' = 'dbadmin'
    ,'password' = 'dbpassword'
    ,'database-name' = 'demog'
    ,'schema-name' = 'public'
    ,'table-name' = 'transactions'
    ,'slot.name' = 'transactions0'
    -- Incremental snapshot is an experimental feature (default: off).
    ,'scan.incremental.snapshot.enabled' = 'true'
    -- Startup position; see flink-cdc postgres-cdc docs, "startup-reading-position".
    ,'scan.startup.mode' = 'initial'
    ,'decoding.plugin.name' = 'pgoutput'
);
and the create table as inserts.
SET 'execution.checkpointing.interval' = '10s';
SET 'pipeline.name' = 'Persist into Paimon: accountholders table';

-- CTAS: persist the CDC source into a Paimon parquet table.
CREATE OR REPLACE TABLE c_paimon.finflow.accountholders WITH (
     'file.format' = 'parquet'
    ,'compaction.min.file-num' = '2'
    ,'compaction.early-max.file-num' = '50'
    ,'snapshot.time-retained' = '1h'
    ,'snapshot.num-retained.min' = '5'
    ,'snapshot.num-retained.max' = '20'
    ,'table.exec.sink.upsert-materialize' = 'NONE'
) AS
SELECT * FROM postgres_catalog.demog.accountholders;
SET 'pipeline.name' = 'Persist into Paimon: transactions table';

-- CTAS: persist the CDC source into a Paimon parquet table.
CREATE OR REPLACE TABLE c_paimon.finflow.transactions WITH (
     'file.format' = 'parquet'
    ,'compaction.min.file-num' = '2'
    ,'compaction.early-max.file-num' = '50'
    ,'snapshot.time-retained' = '1h'
    ,'snapshot.num-retained.min' = '5'
    ,'snapshot.num-retained.max' = '20'
    ,'table.exec.sink.upsert-materialize' = 'NONE'
) AS
SELECT * FROM postgres_catalog.demog.transactions;
Manish Jain
12/09/2025, 9:43 AM<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>3.4.0-1.20</version>
</dependency>
This version of kafka client has 2 vulnerabilities. These vulnerabilities are only fixed in v3.9.1 of kafka clients.
We are wondering how to handle this? Is it recommended to exclude the kafka-client from the connector and then override it with the new version of kafka-client.
We are afraid that it might not be as easy as it sounds. 🙂
Is running the test cases for Flink sufficient to verify that the new version is flink compatible. Or are there other things we'd keep in my while doing this upgrade.
PS: In the longer run, we might upgrade to Flink 2.0, but right now that is not on our plans.Jon Slusher
12/09/2025, 4:36 PM
javax.net.debug=ssl in the connector so I can see what certificate it's trying to use?
Urs Schoenenberger
12/10/2025, 2:24 PMAlmog Golbar
12/10/2025, 4:10 PMjob.autoscaler.enabled: "true"
job.autoscaler.memory.tuning.enabled: "true" #also tried false for dry run
job.autoscaler.metrics.window: "5m"
job.autoscaler.stabilization.interval: "30s"
job.autoscaler.scaling.enabled: "false"
What we've done:
1. Removed explicit memory configs (taskmanager.memory.process.size, taskmanager.memory.managed.size, taskmanager.memory.network.max)
2. Set taskManager.resource.memory: 12g
3. Verified metrics collection (6+ data points in ConfigMap autoscaler-<deployment-name>)
4. Confirmed autoscaler is active (ScalingExecutor running, metrics being collected)
Current situation:
• Memory: 12GB allocated, ~24% heap usage (~1.36GB used)
• Metrics: Being collected and stored in ConfigMap
• No recommendations: Not appearing in operator logs, Kubernetes events, or FlinkDeployment status/annotations
What we've checked:
• Operator logs show autoscaler activity but no memory tuning recommendations
• No errors or warnings related to memory tuning
• Metrics window is filling (5+ minutes of data collected)
• Job is processing data (not idle)
Has anyone seen memory tuning recommendations with this operator version? Are there additional requirements or known issues? Any guidance on where recommendations should appear or how to troubleshoot?Jon Slusher
12/10/2025, 7:15 PMkafka source connector (using PyFlink) to our Kafka cluster using SASL_SSL and SCRAM-SHA-512.
• Can anyone at least confirm that this should be possible?
• I have some DEBUG logs to share that to me definitely point to the taskManager being the culprit. I see in the logs the jobManager is definitely connecting successfully to my Kafka brokers because it can see the topic I and partitions want to source. A taskManager is spawned shortly afterwards and that's what seems to file.
• I'm using Flink 1.20.3, and the Flink operator in Kubernetes to manage the job
• I'll post the configuration and a link to a condensed and redacted Gist of the DEBUG logs I captured for my connector in a threadTaranpreet Kaur
12/11/2025, 2:57 AMGeorge Leonard
12/11/2025, 5:57 PM# CDC Sources
-- Postgres CDC source table for account holders (c_cdcsource catalog variant).
CREATE OR REPLACE TABLE c_cdcsource.demog.accountholders (
     _id                BIGINT        NOT NULL
    ,nationalid         VARCHAR(16)   NOT NULL
    ,firstname          VARCHAR(100)
    ,lastname           VARCHAR(100)
    ,dob                VARCHAR(10)
    ,gender             VARCHAR(10)
    ,children           INT
    ,emailaddress       VARCHAR(100)
    ,mobilephonenumber  VARCHAR(20)
    ,accounts           STRING
    ,created_at         TIMESTAMP_LTZ(3)
    ,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
    ,PRIMARY KEY (_id) NOT ENFORCED
) WITH (
     'connector' = 'postgres-cdc'
    ,'hostname' = 'postgrescdc'
    ,'port' = '5432'
    ,'username' = 'dbadmin'
    ,'password' = 'dbpassword'
    ,'database-name' = 'demog'
    ,'schema-name' = 'public'
    ,'table-name' = 'accountholders'
    ,'slot.name' = 'accountholders0'
    -- Incremental snapshot is an experimental feature (default: off).
    ,'scan.incremental.snapshot.enabled' = 'true'
    -- Startup position; see flink-cdc postgres-cdc docs, "startup-reading-position".
    ,'scan.startup.mode' = 'initial'
    ,'decoding.plugin.name' = 'pgoutput'
);
flink catalog create
-- Iceberg REST catalog backed by Polaris, with MinIO as the S3 store.
CREATE CATALOG c_iceberg WITH (
     'type' = 'iceberg'
    ,'catalog-type' = 'rest'
    ,'uri' = 'http://polaris:8181/api/catalog'
    ,'warehouse' = 'icebergcat'
    ,'oauth2-server-uri' = 'http://polaris:8181/api/catalog/v1/oauth/tokens'
    ,'credential' = 'root:s3cr3t'
    ,'scope' = 'PRINCIPAL_ROLE:ALL'
    -- BUG FIX: endpoint was 'http://minio:900' (missing a zero). MinIO listens
    -- on 9000 per the .env (MINIO_ENDPOINT=http://minio:9000) and the Polaris
    -- catalog listing ("endpointInternal": "http://minio:9000").
    ,'s3.endpoint' = 'http://minio:9000'
    ,'s3.access-key-id' = 'mnadmin'
    ,'s3.secret-access-key' = 'mnpassword'
    ,'s3.path-style-access' = 'true'
);

USE CATALOG c_iceberg;

-- Create the target database if it does not exist yet.
CREATE DATABASE IF NOT EXISTS c_iceberg.finflow;
contents from .env file driving the compose.yaml
ROOT_CLIENT_ID=root
ROOT_CLIENT_SECRET=s3cr3t
CATALOG_NAME=icebergcat
POLARIS_REALM=findept
MINIO_ROOT_USER=mnadmin
MINIO_ROOT_PASSWORD=mnpassword
MINIO_ALIAS=minio
MINIO_ENDPOINT=http://minio:9000
MINIO_BUCKET=warehouse
AWS_ACCESS_KEY_ID=mnadmin
AWS_SECRET_ACCESS_KEY=mnpassword
AWS_REGION=za-south-1
AWS_DEFAULT_REGION=za-south-1
polaris-bootsrap service from docker-compose
# One-shot admin-tool container: bootstraps the Polaris realm and the root
# credential against the JDBC metastore.
polaris-bootstrap:
  image: apache/polaris-admin-tool:latest
  hostname: polaris-bootstrap
  container_name: polaris-bootstrap
  depends_on:
    postgrescat:
      condition: service_healthy
  environment:
    - POLARIS_PERSISTENCE_TYPE=relational-jdbc
    - QUARKUS_DATASOURCE_JDBC_URL=jdbc:postgresql://${POSTGRES_CAT_HOST}:${POSTGRES_CAT_PORT}/${POLARIS_REALM}
    - QUARKUS_DATASOURCE_USERNAME=${POSTGRES_CAT_USER}
    - QUARKUS_DATASOURCE_PASSWORD=${POSTGRES_CAT_PASSWORD}
    - POLARIS_REALM=${POLARIS_REALM}
    - ROOT_CLIENT_ID=${ROOT_CLIENT_ID}
    - ROOT_CLIENT_SECRET=${ROOT_CLIENT_SECRET}
  command:
    - "bootstrap"
    - "--realm=${POLARIS_REALM}"
    - "--credential=${POLARIS_REALM},${ROOT_CLIENT_ID},${ROOT_CLIENT_SECRET}"
polaris-setup service from docker-compose.
# One-shot curl container: obtains a token, creates the catalog pointing at
# the MinIO bucket, and grants CATALOG_MANAGE_CONTENT to catalog_admin.
polaris-setup:
  image: alpine/curl
  hostname: polaris-setup
  container_name: polaris-setup
  depends_on:
    polaris:
      condition: service_healthy
  environment:
    - CLIENT_ID=${ROOT_CLIENT_ID}
    - CLIENT_SECRET=${ROOT_CLIENT_SECRET}
    - CATALOG_NAME=${CATALOG_NAME}
    - POLARIS_REALM=${POLARIS_REALM}
    - MINIO_BUCKET=${MINIO_BUCKET}
    - MINIO_ENDPOINT=${MINIO_ENDPOINT}
  volumes:
    - ./conf/polaris/:/polaris
  entrypoint: "/bin/sh"
  command:
    - "-c"
    - >-
      chmod +x /polaris/create-catalog.sh;
      chmod +x /polaris/obtain-token.sh;
      source /polaris/obtain-token.sh $$POLARIS_REALM;
      echo Creating catalog: $$CATALOG_NAME in realm: $$POLARIS_REALM;
      export STORAGE_CONFIG_INFO='{"storageType":"S3",
      "endpoint":"http://localhost:9000",
      "endpointInternal":"'$$MINIO_ENDPOINT'",
      "pathStyleAccess":true}';
      export STORAGE_LOCATION="s3://$$MINIO_BUCKET/iceberg";
      /polaris/create-catalog.sh $$POLARIS_REALM $$CATALOG_NAME $$TOKEN;
      echo Extra grants...;
      curl -H "Authorization: Bearer $$TOKEN" -H 'Content-Type: application/json'
      -X PUT
      http://polaris:8181/api/management/v1/catalogs/${CATALOG_NAME}/catalog-roles/catalog_admin/grants
      -d '{"type":"catalog", "privilege":"CATALOG_MANAGE_CONTENT"}';
      echo Polaris Setup Complete.;
output from fetch catalogs api call, verifying the catalog and full path
curl -X GET <http://localhost:8181/api/management/v1/catalogs> \
> -H "Authorization: Bearer ${TOKEN}" | jq
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 388 100 388 0 0 13756 0 --:--:-- --:--:-- --:--:-- 13857
{
"catalogs": [
{
"type": "INTERNAL",
"name": "icebergcat",
"properties": {
"default-base-location": "<s3://warehouse/iceberg>"
},
"createTimestamp": 1765474601004,
"lastUpdateTimestamp": 1765474601004,
"entityVersion": 1,
"storageConfigInfo": {
"endpoint": "<http://localhost:9000>",
"endpointInternal": "<http://minio:9000>",
"pathStyleAccess": true,
"storageType": "S3",
"allowedLocations": [
"<s3://warehouse/iceberg>"
]
}
}
]
}
ERROR
> CREATE OR REPLACE TABLE c_iceberg.finflow.accountholders WITH (
> 'file.format' = 'parquet'
> ,'compaction.min.file-num' = '2'
> ,'compaction.early-max.file-num' = '50'
> ,'snapshot.time-retained' = '1h'
> ,'snapshot.num-retained.min' = '5'
> ,'snapshot.num-retained.max' = '20'
> ,'table.exec.sink.upsert-materialize'= 'NONE'
> ) AS
> SELECT * FROM c_cdcsource.demog.accountholders;
[ERROR] Could not execute SQL statement. Reason:
java.lang.AbstractMethodError: Receiver class org.apache.iceberg.flink.FlinkCreateTableOptions$$Lambda$1164/0x0000000801826a00 does not define or inherit an implementation of the resolved method 'abstract void generate(org.apache.iceberg.shaded.com.fasterxml.jackson.core.JsonGenerator)' of interface org.apache.iceberg.util.JsonUtil$ToJson.
docker images
arm64v8/flink:1.20.2-scala_2.12-java17 79a54781f5d0 789MB 0B
georgelza/apacheflink-base-1.20.2-scala_2.12-java17:1.0.2 dd8585afe66c 6.84GB 0B U
Flink container build / Dockerfile, as ... error search is saying version incompatibility...
# Flink 1.20.2 image with S3 plugin, Postgres CDC, JDBC, and Iceberg jars.
FROM arm64v8/flink:1.20.2-scala_2.12-java17
SHELL ["/bin/bash", "-c"]

ENV FLINK_HOME=/opt/flink
ENV HIVE_HOME=$FLINK_HOME/conf/
WORKDIR $FLINK_HOME

RUN echo "--> Setup Directory Structure" && \
    mkdir -p /opt/flink/conf/ && \
    mkdir -p /opt/flink/checkpoints && \
    mkdir -p /opt/flink/rocksdb

RUN echo "--> Install JARs: Flink's S3 plugin" && \
    mkdir -p ./plugins/s3-fs-hadoop && \
    mv ./opt/flink-s3-fs-hadoop-1.20.2.jar ./plugins/s3-fs-hadoop/

RUN echo "--> Install Flink JARs: => Generic"
COPY stage/bundle-2.31.9.jar /opt/flink/lib/bundle-2.31.9.jar
COPY stage/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
COPY stage/postgresql-42.7.6.jar /opt/flink/lib/postgresql-42.7.6.jar

# CDC Support: https://nightlies.apache.org/flink/flink-cdc-docs-release-3.4/docs/connectors/flink-sources/overview/#supported-flink-versions
COPY stage/flink-sql-connector-postgres-cdc-3.5.0.jar /opt/flink/lib/flink-sql-connector-postgres-cdc-3.5.0.jar
COPY stage/flink-sql-json-1.20.2.jar /opt/flink/lib/flink-sql-json-1.20.2.jar
COPY stage/flink-sql-parquet-1.20.2.jar /opt/flink/lib/flink-sql-parquet-1.20.2.jar
COPY stage/flink-json-1.20.2.jar /opt/flink/lib/flink-json-1.20.2.jar
COPY stage/flink-connector-jdbc-3.3.0-1.20.jar /opt/flink/lib/flink-connector-jdbc-3.3.0-1.20.jar

RUN echo "--> Install Flink JARs: => Open Table Formats"
# NOTE(review): shipping BOTH iceberg-flink and iceberg-flink-runtime on the
# classpath mixes shaded and unshaded Jackson classes, which matches the
# reported AbstractMethodError on org.apache.iceberg.shaded...JsonGenerator.
# The runtime (uber) jar alone should be sufficient — confirm and drop the
# plain jar:
# COPY stage/iceberg-flink-1.20-1.9.1.jar /opt/flink/lib/iceberg-flink-1.20-1.9.1.jar
COPY stage/iceberg-flink-runtime-1.20-1.9.1.jar /opt/flink/lib/iceberg-flink-runtime-1.20-1.9.1.jar
# BUG FIX: the AWS bundle was copied to /opt/flink/ (not lib/), so it never
# reached the classpath.
COPY stage/iceberg-aws-bundle-1.9.1.jar /opt/flink/lib/iceberg-aws-bundle-1.9.1.jar

RUN echo "--> Set Ownerships of /opt/flink" && \
    chown -R flink:flink $FLINK_HOME
USER flink:flink

CMD ./bin/start-cluster.sh && sleep infinity
George Leonard
12/11/2025, 6:33 PMGeorge Leonard
12/12/2025, 9:55 AMFROM arm64v8/flink:1.20.1-scala_2.12-java17
SHELL ["/bin/bash", "-c"]
ENV FLINK_HOME=/opt/flink
ENV HIVE_HOME=$FLINK_HOME/conf/
WORKDIR $FLINK_HOME
RUN echo "--> Setup Directory Structure" && \
mkdir -p /opt/flink/conf/ && \
mkdir -p /opt/flink/checkpoints && \
mkdir -p /opt/flink/rocksdb
RUN echo "--> Install JARs: Flink's S3 plugin" && \
mkdir -p ./plugins/s3-fs-hadoop && \
mv ./opt/flink-s3-fs-hadoop-1.20.1.jar ./plugins/s3-fs-hadoop/
RUN echo "--> Install JARs: => Generic"
COPY stage/bundle-2.31.9.jar /opt/flink/lib/bundle-2.31.9.jar
COPY stage/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
COPY stage/jackson-core-2.20.1.jar /opt/flink/lib/jackson-core-2.20.1.jar
COPY stage/postgresql-42.7.6.jar /opt/flink/lib/postgresql-42.7.6.jar
# CDC Support <https://nightlies.apache.org/flink/flink-cdc-docs-release-3.4/docs/connectors/flink-sources/overview/#supported-flink-versions>
COPY stage/flink-sql-connector-postgres-cdc-3.5.0.jar /opt/flink/lib/flink-sql-connector-postgres-cdc-3.5.0.jar
# Flink version specific
RUN echo "--> Install JARs: => Flink 1.20.1"
COPY stage/flink-sql-json-1.20.1.jar /opt/flink/lib/flink-sql-json-1.20.1.jar
COPY stage/flink-sql-parquet-1.20.1.jar /opt/flink/lib/flink-sql-parquet-1.20.1.jar
COPY stage/flink-json-1.20.1.jar /opt/flink/lib/flink-json-1.20.1.jar
COPY stage/flink-connector-jdbc-3.3.0-1.20.jar /opt/flink/lib/flink-connector-jdbc-3.3.0-1.20.jar
RUN echo "--> Install JARs: => Iceberg 1.9.1"
COPY stage/iceberg-flink-1.20-1.9.1.jar /opt/flink/lib/iceberg-flink-1.20-1.9.1.jar
COPY stage/iceberg-flink-runtime-1.20-1.9.1.jar /opt/flink/lib/iceberg-flink-runtime-1.20-1.9.1.jar
COPY stage/iceberg-aws-bundle-1.9.1.jar /opt/flink/iceberg-aws-bundle-1.9.1.jar
RUN echo "--> Set Ownerships of /opt/flink" && \
chown -R flink:flink $FLINK_HOME
USER flink:flink
CMD ./bin/start-cluster.sh && sleep infinity
below is the polaris bits from my docker-compose file, included as this creates the polaris catalog, uses the previously created minio warehouse/iceberg bucket
# Polaris REST catalog server backed by a Postgres (JDBC) metastore, serving
# the Iceberg REST API on 8181 and management/health on 8182.
polaris:
  image: apache/polaris:latest
  hostname: polaris
  container_name: polaris
  depends_on:
    postgrescat:
      condition: service_healthy
    minio:
      condition: service_healthy
    polaris-bootstrap:
      condition: service_completed_successfully
  ports:
    # API port
    - "8181:8181"
    # Management port (metrics and health checks)
    - "8182:8182"
    # Optional, allows attaching a debugger to the Polaris JVM
    - "5005:5005"
  environment:
    POLARIS_PERSISTENCE_TYPE: relational-jdbc
    POLARIS_PERSISTENCE_RELATIONAL_JDBC_MAX_RETRIES: 5
    POLARIS_PERSISTENCE_RELATIONAL_JDBC_INITIAL_DELAY_IN_MS: 100
    POLARIS_PERSISTENCE_RELATIONAL_JDBC_MAX_DURATION_IN_MS: 5000
    QUARKUS_DATASOURCE_JDBC_URL: jdbc:postgresql://${POSTGRES_CAT_HOST}:${POSTGRES_CAT_PORT}/${POLARIS_REALM}
    QUARKUS_DATASOURCE_USERNAME: ${POSTGRES_CAT_USER}
    QUARKUS_DATASOURCE_PASSWORD: ${POSTGRES_CAT_PASSWORD}
    POLARIS_REALM_CONTEXT_REALMS: ${POLARIS_REALM}
    # Quoted so the consumer receives the string "true", not a YAML boolean.
    QUARKUS_OTEL_SDK_DISABLED: "true"
    polaris.features."ALLOW_INSECURE_STORAGE_TYPES": "true"
    polaris.features."SUPPORTED_CATALOG_STORAGE_TYPES": "[\"FILE\",\"S3\"]"
    polaris.readiness.ignore-severe-issues: "true"
    # S3 endpoint for MinIO
    AWS_REGION: ${AWS_REGION}
    AWS_ACCESS_KEY_ID: ${MINIO_ROOT_USER}
    AWS_SECRET_ACCESS_KEY: ${MINIO_ROOT_PASSWORD}
    POLARIS_BOOTSTRAP_CREDENTIALS: ${POLARIS_REALM},${ROOT_CLIENT_ID},${ROOT_CLIENT_SECRET}
    polaris.realm-context.realms: ${POLARIS_REALM}
    quarkus.otel.sdk.disabled: "true"
  volumes:
    - ./data/polaris:/var/lib/polaris
    - ./conf/polaris:/polaris
  healthcheck:
    test: ["CMD", "curl", "http://localhost:8182/q/health"]
    interval: 5s
    timeout: 10s
    retries: 5
    start_period: 30s
  restart: unless-stopped
# One-shot admin-tool container: bootstraps the Polaris realm and the root
# credential against the JDBC metastore.
polaris-bootstrap:
  image: apache/polaris-admin-tool:latest
  hostname: polaris-bootstrap
  container_name: polaris-bootstrap
  depends_on:
    postgrescat:
      condition: service_healthy
  environment:
    - POLARIS_PERSISTENCE_TYPE=relational-jdbc
    - QUARKUS_DATASOURCE_JDBC_URL=jdbc:postgresql://${POSTGRES_CAT_HOST}:${POSTGRES_CAT_PORT}/${POLARIS_REALM}
    - QUARKUS_DATASOURCE_USERNAME=${POSTGRES_CAT_USER}
    - QUARKUS_DATASOURCE_PASSWORD=${POSTGRES_CAT_PASSWORD}
    - POLARIS_REALM=${POLARIS_REALM}
    - ROOT_CLIENT_ID=${ROOT_CLIENT_ID}
    - ROOT_CLIENT_SECRET=${ROOT_CLIENT_SECRET}
  command:
    - "bootstrap"
    - "--realm=${POLARIS_REALM}"
    - "--credential=${POLARIS_REALM},${ROOT_CLIENT_ID},${ROOT_CLIENT_SECRET}"
# One-shot curl container: obtains a token, creates the catalog pointing at
# the MinIO bucket, and grants CATALOG_MANAGE_CONTENT to catalog_admin.
polaris-setup:
  image: alpine/curl
  hostname: polaris-setup
  container_name: polaris-setup
  depends_on:
    polaris:
      condition: service_healthy
  environment:
    - CLIENT_ID=${ROOT_CLIENT_ID}
    - CLIENT_SECRET=${ROOT_CLIENT_SECRET}
    - CATALOG_NAME=${CATALOG_NAME}
    - POLARIS_REALM=${POLARIS_REALM}
    - MINIO_BUCKET=${MINIO_BUCKET}
    - MINIO_ENDPOINT=${MINIO_ENDPOINT}
  volumes:
    - ./conf/polaris/:/polaris
  entrypoint: "/bin/sh"
  command:
    - "-c"
    - >-
      chmod +x /polaris/create-catalog.sh;
      chmod +x /polaris/obtain-token.sh;
      source /polaris/obtain-token.sh $$POLARIS_REALM;
      echo Creating catalog: $$CATALOG_NAME in realm: $$POLARIS_REALM;
      export STORAGE_CONFIG_INFO='{"storageType":"S3",
      "endpoint":"http://localhost:9000",
      "endpointInternal":"'$$MINIO_ENDPOINT'",
      "pathStyleAccess":true}';
      export STORAGE_LOCATION="s3://$$MINIO_BUCKET/iceberg";
      /polaris/create-catalog.sh $$POLARIS_REALM $$CATALOG_NAME $$TOKEN;
      echo Extra grants...;
      curl -H "Authorization: Bearer $$TOKEN" -H 'Content-Type: application/json'
      -X PUT
      http://polaris:8181/api/management/v1/catalogs/${CATALOG_NAME}/catalog-roles/catalog_admin/grants
      -d '{"type":"catalog", "privilege":"CATALOG_MANAGE_CONTENT"}';
      echo Polaris Setup Complete.;
Flink SQL executed to create the `c_iceberg` catalog:
-- Register the Polaris (Iceberg REST) catalog in the Flink SQL session.
-- Slack-style <...> wrappers removed from the URIs.
CREATE CATALOG c_iceberg WITH (
  'type' = 'iceberg'
  ,'catalog-type' = 'rest'
  ,'uri' = 'http://polaris:8181/api/catalog'
  ,'warehouse' = 'icebergcat'
  ,'oauth2-server-uri' = 'http://polaris:8181/api/catalog/v1/oauth/tokens'
  ,'credential' = 'root:s3cr3t'
  ,'scope' = 'PRINCIPAL_ROLE:ALL'
  -- Fixed: was 'http://minio:900' (missing a zero); MinIO serves S3 on 9000,
  -- matching the endpoint used in the Polaris setup service.
  ,'s3.endpoint' = 'http://minio:9000'
  ,'s3.access-key-id' = 'mnadmin'
  ,'s3.secret-access-key' = 'mnpassword'
  -- Required for MinIO: bucket in the path, not the hostname
  ,'s3.path-style-access' = 'true'
);

USE CATALOG c_iceberg;

-- The namespace must exist before any CTAS into c_iceberg.finflow.* succeeds
-- (otherwise: NoSuchNamespaceException, as seen later in this thread).
CREATE DATABASE IF NOT EXISTS c_iceberg.finflow;
The following Flink SQL was then executed:
-- Flink CDC source table over Postgres table public.accountholders in
-- database demog, read via logical replication (pgoutput plugin).
CREATE OR REPLACE TABLE c_cdcsource.demog.accountholders (
  _id BIGINT NOT NULL
  ,nationalid VARCHAR(16) NOT NULL
  ,firstname VARCHAR(100)
  ,lastname VARCHAR(100)
  ,dob VARCHAR(10)
  ,gender VARCHAR(10)
  ,children INT
  ,emailaddress VARCHAR(100)
  ,mobilephonenumber VARCHAR(20)
  ,accounts STRING
  ,created_at TIMESTAMP_LTZ(3)
  -- Event-time watermark: tolerate up to 15s of out-of-order rows
  ,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
  -- NOT ENFORCED: Flink uses the key for changelog semantics only
  ,PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc'
  ,'hostname' = 'postgrescdc'
  ,'port' = '5432'
  ,'username' = 'dbadmin'
  ,'password' = 'dbpassword'
  ,'database-name' = 'demog'
  ,'schema-name' = 'public'
  ,'table-name' = 'accountholders'
  -- Replication slot name must be unique per source table
  ,'slot.name' = 'accountholders0'
  -- Experimental feature: incremental snapshot (default off)
  ,'scan.incremental.snapshot.enabled' = 'true'
  -- Snapshot the table first, then stream changes. See:
  -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position
  ,'scan.startup.mode' = 'initial'
  ,'decoding.plugin.name' = 'pgoutput'
);
-- Iceberg sinks commit on checkpoints: 60s interval = 60s commit cadence.
SET 'execution.checkpointing.interval' = '60s';
SET 'table.exec.sink.upsert-materialize' = 'NONE';
SET 'pipeline.name' = 'Persist into Iceberg: accountholders table';
-- CTAS: persist the CDC source stream into an Iceberg table.
-- NOTE(review): the compaction.* and snapshot.* keys below look like Apache
-- Paimon table options rather than Iceberg ones — confirm the Iceberg Flink
-- connector actually recognizes them (unknown keys may be silently stored or
-- rejected depending on version).
CREATE OR REPLACE TABLE c_iceberg.finflow.accountholders WITH (
'file.format' = 'parquet'
,'compaction.min.file-num' = '2'
,'compaction.early-max.file-num' = '50'
,'snapshot.time-retained' = '1h'
,'snapshot.num-retained.min' = '5'
,'snapshot.num-retained.max' = '20'
-- NOTE(review): this is a Flink session config, not a table property; the SET
-- statement above already applies it, so this entry is presumably ignored here.
,'table.exec.sink.upsert-materialize'= 'NONE'
) AS
SELECT * FROM c_cdcsource.demog.accountholders;
Running the CTAS above produced the following error:
[ERROR] Could not execute SQL statement. Reason:
org.apache.iceberg.exceptions.NoSuchNamespaceException: Namespace does not exist: finflow
After hitting the namespace error above, I created the namespace manually:
# Create the missing 'finflow' namespace via the Polaris (Iceberg REST) API.
# Slack-style <...> wrapper removed from the URL — it would have broken curl.
curl -X POST http://localhost:8181/api/catalog/v1/icebergcat/namespaces \
  -H "Authorization: Bearer ${TOKEN}" \
  -H 'Content-Type: application/json' \
  -d '{"namespace": ["finflow"], "properties": {"description": "Iceberg catalog database"}}' | jq
I then reran the CTAS above; it failed with the error below:
[ERROR] Could not execute SQL statement. Reason:
java.lang.AbstractMethodError: Receiver class org.apache.iceberg.flink.FlinkCreateTableOptions$$Lambda$1155/0x0000000601838000 does not define or inherit an implementation of the resolved method 'abstract void generate(org.apache.iceberg.shaded.com.fasterxml.jackson.core.JsonGenerator)' of interface org.apache.iceberg.util.JsonUtil$ToJson.
Flink SQL>George Leonard
12/12/2025, 10:04 AMGeorge Leonard
12/12/2025, 10:08 AMFrancisco Morillo
12/12/2025, 10:26 AMAnooj S
12/12/2025, 1:31 PMGeorge Leonard
12/12/2025, 2:48 PMMarco Villalobos
12/12/2025, 7:05 PMHristo Yordanov
12/12/2025, 9:37 PM