Jon Slusher
12/05/2025, 9:18 PM
Rion Williams
12/06/2025, 3:59 AM
George Leonard
12/07/2025, 10:42 AM
# CDC Sources
CREATE OR REPLACE TABLE postgres_catalog.demog.accountholders (
_id BIGINT NOT NULL
,nationalid VARCHAR(16) NOT NULL
,firstname VARCHAR(100)
,lastname VARCHAR(100)
,dob VARCHAR(10)
,gender VARCHAR(10)
,children INT
,address STRING
,accounts STRING
,emailaddress VARCHAR(100)
,mobilephonenumber VARCHAR(20)
,created_at TIMESTAMP_LTZ(3)
,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
,PRIMARY KEY (_id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc'
,'hostname' = 'postgrescdc'
,'port' = '5432'
,'username' = 'dbadmin'
,'password' = 'dbpassword'
,'database-name' = 'demog'
,'schema-name' = 'public'
,'table-name' = 'accountholders'
,'slot.name' = 'accountholders0'
,'scan.incremental.snapshot.enabled' = 'true' -- experimental feature: incremental snapshot (default off)
,'scan.startup.mode' = 'initial' -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position
,'decoding.plugin.name' = 'pgoutput'
);
CREATE OR REPLACE TABLE postgres_catalog.demog.transactions (
_id BIGINT NOT NULL
,eventid VARCHAR(36) NOT NULL
,transactionid VARCHAR(36) NOT NULL
,eventtime VARCHAR(30)
,direction VARCHAR(8)
,eventtype VARCHAR(10)
,creationdate VARCHAR(20)
,accountholdernationalid VARCHAR(16)
,accountholderaccount STRING
,counterpartynationalid VARCHAR(16)
,counterpartyaccount STRING
,tenantid VARCHAR(8)
,fromid VARCHAR(8)
,accountagentid VARCHAR(8)
,fromfibranchid VARCHAR(6)
,accountnumber VARCHAR(16)
,toid VARCHAR(8)
,accountidcode VARCHAR(5)
,counterpartyagentid VARCHAR(8)
,tofibranchid VARCHAR(6)
,counterpartynumber VARCHAR(16)
,counterpartyidcode VARCHAR(5)
,amount STRING
,msgtype VARCHAR(6)
,settlementclearingsystemcode VARCHAR(5)
,paymentclearingsystemreference VARCHAR(12)
,requestexecutiondate VARCHAR(10)
,settlementdate VARCHAR(10)
,destinationcountry VARCHAR(30)
,localinstrument VARCHAR(2)
,msgstatus VARCHAR(12)
,paymentmethod VARCHAR(4)
,settlementmethod VARCHAR(4)
,transactiontype VARCHAR(2)
,verificationresult VARCHAR(4)
,numberoftransactions INT
,schemaversion INT
,usercode VARCHAR(4)
,created_at TIMESTAMP_LTZ(3)
,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
,PRIMARY KEY (_id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc'
,'hostname' = 'postgrescdc'
,'port' = '5432'
,'username' = 'dbadmin'
,'password' = 'dbpassword'
,'database-name' = 'demog'
,'schema-name' = 'public'
,'table-name' = 'transactions'
,'slot.name' = 'transactions0'
,'scan.incremental.snapshot.enabled' = 'true' -- experimental feature: incremental snapshot (default off)
,'scan.startup.mode' = 'initial' -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position
,'decoding.plugin.name' = 'pgoutput'
);
And the CREATE TABLE AS (CTAS) inserts:
SET 'execution.checkpointing.interval' = '10s';
SET 'pipeline.name' = 'Persist into Paimon: accountholders table';
CREATE OR REPLACE TABLE c_paimon.finflow.accountholders WITH (
'file.format' = 'parquet'
,'compaction.min.file-num' = '2'
,'compaction.early-max.file-num' = '50'
,'snapshot.time-retained' = '1h'
,'snapshot.num-retained.min' = '5'
,'snapshot.num-retained.max' = '20'
,'table.exec.sink.upsert-materialize'= 'NONE'
) AS
SELECT * FROM postgres_catalog.demog.accountholders;
SET 'pipeline.name' = 'Persist into Paimon: transactions table';
CREATE OR REPLACE TABLE c_paimon.finflow.transactions WITH (
'file.format' = 'parquet'
,'compaction.min.file-num' = '2'
,'compaction.early-max.file-num' = '50'
,'snapshot.time-retained' = '1h'
,'snapshot.num-retained.min' = '5'
,'snapshot.num-retained.max' = '20'
,'table.exec.sink.upsert-materialize'= 'NONE'
) AS
SELECT * FROM postgres_catalog.demog.transactions;
Manish Jain
12/09/2025, 9:43 AM
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>3.4.0-1.20</version>
</dependency>
This version of the Kafka client has 2 vulnerabilities. These vulnerabilities are only fixed in kafka-clients v3.9.1.
We are wondering how to handle this. Is it recommended to exclude kafka-clients from the connector and then override it with the new version of kafka-clients?
We are afraid that it might not be as easy as it sounds. 🙂
Is running the Flink test cases sufficient to verify that the new version is compatible with Flink, or are there other things we should keep in mind while doing this upgrade?
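(For concreteness, a rough sketch of the exclusion/override idea in the pom; untested, and the kafka-clients 3.9.1 version here is simply the one mentioned above as carrying the fix:)
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>3.4.0-1.20</version>
  <exclusions>
    <!-- drop the transitive kafka-clients pulled in by the connector -->
    <exclusion>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
    </exclusion>
  </exclusions>
</dependency>
<!-- pin the patched client explicitly -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.9.1</version>
</dependency>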
PS: In the longer run we might upgrade to Flink 2.0, but right now that is not in our plans.
Jon Slusher
12/09/2025, 4:36 PM
Is there a way to set javax.net.debug=ssl in the connector so I can see what certificate it's trying to use?
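(One way this is commonly done, sketched here as an assumption rather than a verified recipe: pass the flag through the JVM options in the Flink configuration so the JobManager/TaskManager JVMs log the SSL handshake.)
# flink-conf.yaml (or flinkConfiguration in the FlinkDeployment spec)
env.java.opts.jobmanager: "-Djavax.net.debug=ssl"
env.java.opts.taskmanager: "-Djavax.net.debug=ssl"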
Urs Schoenenberger
12/10/2025, 2:24 PM
Almog Golbar
12/10/2025, 4:10 PM
job.autoscaler.enabled: "true"
job.autoscaler.memory.tuning.enabled: "true" #also tried false for dry run
job.autoscaler.metrics.window: "5m"
job.autoscaler.stabilization.interval: "30s"
job.autoscaler.scaling.enabled: "false"
What we've done:
1. Removed explicit memory configs (taskmanager.memory.process.size, taskmanager.memory.managed.size, taskmanager.memory.network.max)
2. Set taskManager.resource.memory: 12g
3. Verified metrics collection (6+ data points in ConfigMap autoscaler-<deployment-name>)
4. Confirmed autoscaler is active (ScalingExecutor running, metrics being collected)
Current situation:
• Memory: 12GB allocated, ~24% heap usage (~1.36GB used)
• Metrics: Being collected and stored in ConfigMap
• No recommendations: Not appearing in operator logs, Kubernetes events, or FlinkDeployment status/annotations
What we've checked:
• Operator logs show autoscaler activity but no memory tuning recommendations
• No errors or warnings related to memory tuning
• Metrics window is filling (5+ minutes of data collected)
• Job is processing data (not idle)
Has anyone seen memory tuning recommendations with this operator version? Are there additional requirements or known issues? Any guidance on where recommendations should appear or how to troubleshoot?
Jon Slusher
12/10/2025, 7:15 PM
I'm having trouble connecting a Kafka source connector (using PyFlink) to our Kafka cluster using SASL_SSL and SCRAM-SHA-512.
• Can anyone at least confirm that this should be possible?
• I have some DEBUG logs to share that to me definitely point to the taskManager being the culprit. I see in the logs that the jobManager is definitely connecting successfully to my Kafka brokers, because it can see the topic and partitions I want to source. A taskManager is spawned shortly afterwards, and that's what seems to fail.
• I'm using Flink 1.20.3, and the Flink operator in Kubernetes to manage the job
• I'll post the configuration and a link to a condensed and redacted Gist of the DEBUG logs I captured for my connector in a thread.
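(For context while waiting on the thread, a minimal sketch of the kind of PyFlink KafkaSource configuration involved; the broker address, credentials and truststore path below are placeholders, not the real setup.)
from pyflink.common import WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaSource

env = StreamExecutionEnvironment.get_execution_environment()

# Sketch only: SASL_SSL + SCRAM-SHA-512 client properties passed through the Kafka source.
# Note the truststore file has to exist on every taskManager, not just the jobManager.
source = (
    KafkaSource.builder()
    .set_bootstrap_servers("broker-1:9093")
    .set_topics("my-topic")
    .set_group_id("my-group")
    .set_starting_offsets(KafkaOffsetsInitializer.earliest())
    .set_value_only_deserializer(SimpleStringSchema())
    .set_property("security.protocol", "SASL_SSL")
    .set_property("sasl.mechanism", "SCRAM-SHA-512")
    .set_property("sasl.jaas.config",
                  'org.apache.kafka.common.security.scram.ScramLoginModule required '
                  'username="myuser" password="mypassword";')
    .set_property("ssl.truststore.location", "/opt/flink/certs/truststore.jks")
    .set_property("ssl.truststore.password", "changeit")
    .build()
)

ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "kafka-sasl-source")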
Taranpreet Kaur
12/11/2025, 2:57 AM
George Leonard
12/11/2025, 5:57 PM
# CDC Sources
CREATE OR REPLACE TABLE c_cdcsource.demog.accountholders (
_id BIGINT NOT NULL
,nationalid VARCHAR(16) NOT NULL
,firstname VARCHAR(100)
,lastname VARCHAR(100)
,dob VARCHAR(10)
,gender VARCHAR(10)
,children INT
,emailaddress VARCHAR(100)
,mobilephonenumber VARCHAR(20)
,accounts STRING
,created_at TIMESTAMP_LTZ(3)
,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
,PRIMARY KEY (_id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc'
,'hostname' = 'postgrescdc'
,'port' = '5432'
,'username' = 'dbadmin'
,'password' = 'dbpassword'
,'database-name' = 'demog'
,'schema-name' = 'public'
,'table-name' = 'accountholders'
,'slot.name' = 'accountholders0'
,'scan.incremental.snapshot.enabled' = 'true' -- experimental feature: incremental snapshot (default off)
,'scan.startup.mode' = 'initial' -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position
,'decoding.plugin.name' = 'pgoutput'
);
flink catalog create
CREATE CATALOG c_iceberg WITH (
'type'='iceberg'
,'catalog-type'='rest'
,'uri'='http://polaris:8181/api/catalog'
,'warehouse'='icebergcat'
,'oauth2-server-uri'='http://polaris:8181/api/catalog/v1/oauth/tokens'
,'credential'='root:s3cr3t'
,'scope'='PRINCIPAL_ROLE:ALL'
,'s3.endpoint'='http://minio:9000'
,'s3.access-key-id'='mnadmin'
,'s3.secret-access-key'='mnpassword'
,'s3.path-style-access'='true'
);
USE CATALOG c_iceberg;
-- Create using below
CREATE DATABASE IF NOT EXISTS c_iceberg.finflow;
contents from .env file driving the compose.yaml
ROOT_CLIENT_ID=root
ROOT_CLIENT_SECRET=s3cr3t
CATALOG_NAME=icebergcat
POLARIS_REALM=findept
MINIO_ROOT_USER=mnadmin
MINIO_ROOT_PASSWORD=mnpassword
MINIO_ALIAS=minio
MINIO_ENDPOINT=http://minio:9000
MINIO_BUCKET=warehouse
AWS_ACCESS_KEY_ID=mnadmin
AWS_SECRET_ACCESS_KEY=mnpassword
AWS_REGION=za-south-1
AWS_DEFAULT_REGION=za-south-1
polaris-bootstrap service from docker-compose
polaris-bootstrap:
image: apache/polaris-admin-tool:latest
hostname: polaris-bootstrap
container_name: polaris-bootstrap
depends_on:
postgrescat:
condition: service_healthy
environment:
- POLARIS_PERSISTENCE_TYPE=relational-jdbc
- QUARKUS_DATASOURCE_JDBC_URL=jdbc:postgresql://${POSTGRES_CAT_HOST}:${POSTGRES_CAT_PORT}/${POLARIS_REALM}
- QUARKUS_DATASOURCE_USERNAME=${POSTGRES_CAT_USER}
- QUARKUS_DATASOURCE_PASSWORD=${POSTGRES_CAT_PASSWORD}
- POLARIS_REALM=${POLARIS_REALM}
- ROOT_CLIENT_ID=${ROOT_CLIENT_ID}
- ROOT_CLIENT_SECRET=${ROOT_CLIENT_SECRET}
command:
- "bootstrap"
- "--realm=${POLARIS_REALM}"
- "--credential=${POLARIS_REALM},${ROOT_CLIENT_ID},${ROOT_CLIENT_SECRET}"
polaris-setup service from docker-compose.
polaris-setup:
image: alpine/curl
hostname: polaris-setup
container_name: polaris-setup
depends_on:
polaris:
condition: service_healthy
environment:
- CLIENT_ID=${ROOT_CLIENT_ID}
- CLIENT_SECRET=${ROOT_CLIENT_SECRET}
- CATALOG_NAME=${CATALOG_NAME}
- POLARIS_REALM=${POLARIS_REALM}
- MINIO_BUCKET=${MINIO_BUCKET}
- MINIO_ENDPOINT=${MINIO_ENDPOINT}
volumes:
- ./conf/polaris/:/polaris
entrypoint: "/bin/sh"
command:
- "-c"
- >-
chmod +x /polaris/create-catalog.sh;
chmod +x /polaris/obtain-token.sh;
source /polaris/obtain-token.sh $$POLARIS_REALM;
echo Creating catalog: $$CATALOG_NAME in realm: $$POLARIS_REALM;
export STORAGE_CONFIG_INFO='{"storageType":"S3",
"endpoint":"<http://localhost:9000>",
"endpointInternal":"'$$MINIO_ENDPOINT'",
"pathStyleAccess":true}';
export STORAGE_LOCATION="s3://$$MINIO_BUCKET/iceberg";
/polaris/create-catalog.sh $$POLARIS_REALM $$CATALOG_NAME $$TOKEN;
echo Extra grants...;
curl -H "Authorization: Bearer $$TOKEN" -H 'Content-Type: application/json' \
-X PUT \
http://polaris:8181/api/management/v1/catalogs/${CATALOG_NAME}/catalog-roles/catalog_admin/grants \
-d '{"type":"catalog", "privilege":"CATALOG_MANAGE_CONTENT"}';
echo Polaris Setup Complete.;
output from fetch catalogs api call, verifying the catalog and full path
curl -X GET http://localhost:8181/api/management/v1/catalogs \
> -H "Authorization: Bearer ${TOKEN}" | jq
{
"catalogs": [
{
"type": "INTERNAL",
"name": "icebergcat",
"properties": {
"default-base-location": "<s3://warehouse/iceberg>"
},
"createTimestamp": 1765474601004,
"lastUpdateTimestamp": 1765474601004,
"entityVersion": 1,
"storageConfigInfo": {
"endpoint": "<http://localhost:9000>",
"endpointInternal": "<http://minio:9000>",
"pathStyleAccess": true,
"storageType": "S3",
"allowedLocations": [
"<s3://warehouse/iceberg>"
]
}
}
]
}
ERROR
> CREATE OR REPLACE TABLE c_iceberg.finflow.accountholders WITH (
> 'file.format' = 'parquet'
> ,'compaction.min.file-num' = '2'
> ,'compaction.early-max.file-num' = '50'
> ,'snapshot.time-retained' = '1h'
> ,'snapshot.num-retained.min' = '5'
> ,'snapshot.num-retained.max' = '20'
> ,'table.exec.sink.upsert-materialize'= 'NONE'
> ) AS
> SELECT * FROM c_cdcsource.demog.accountholders;
[ERROR] Could not execute SQL statement. Reason:
java.lang.AbstractMethodError: Receiver class org.apache.iceberg.flink.FlinkCreateTableOptions$$Lambda$1164/0x0000000801826a00 does not define or inherit an implementation of the resolved method 'abstract void generate(org.apache.iceberg.shaded.com.fasterxml.jackson.core.JsonGenerator)' of interface org.apache.iceberg.util.JsonUtil$ToJson.
docker images
arm64v8/flink:1.20.2-scala_2.12-java17 79a54781f5d0 789MB
georgelza/apacheflink-base-1.20.2-scala_2.12-java17:1.0.2 dd8585afe66c 6.84GB
Flink container build / Dockerfile, as searching on the error is saying version incompatibility...
FROM arm64v8/flink:1.20.2-scala_2.12-java17
SHELL ["/bin/bash", "-c"]
ENV FLINK_HOME=/opt/flink
ENV HIVE_HOME=$FLINK_HOME/conf/
WORKDIR $FLINK_HOME
RUN echo "--> Setup Directory Structure" && \
mkdir -p /opt/flink/conf/ && \
mkdir -p /opt/flink/checkpoints && \
mkdir -p /opt/flink/rocksdb
RUN echo "--> Install JARs: Flink's S3 plugin" && \
mkdir -p ./plugins/s3-fs-hadoop && \
mv ./opt/flink-s3-fs-hadoop-1.20.2.jar ./plugins/s3-fs-hadoop/
RUN echo "--> Install Flink JARs: => Generic"
COPY stage/bundle-2.31.9.jar /opt/flink/lib/bundle-2.31.9.jar
COPY stage/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
COPY stage/postgresql-42.7.6.jar /opt/flink/lib/postgresql-42.7.6.jar
# CDC Support https://nightlies.apache.org/flink/flink-cdc-docs-release-3.4/docs/connectors/flink-sources/overview/#supported-flink-versions
COPY stage/flink-sql-connector-postgres-cdc-3.5.0.jar /opt/flink/lib/flink-sql-connector-postgres-cdc-3.5.0.jar
COPY stage/flink-sql-json-1.20.2.jar /opt/flink/lib/flink-sql-json-1.20.2.jar
COPY stage/flink-sql-parquet-1.20.2.jar /opt/flink/lib/flink-sql-parquet-1.20.2.jar
COPY stage/flink-json-1.20.2.jar /opt/flink/lib/flink-json-1.20.2.jar
COPY stage/flink-connector-jdbc-3.3.0-1.20.jar /opt/flink/lib/flink-connector-jdbc-3.3.0-1.20.jar
RUN echo "--> Install Flink JARs: => Open Table Formats"
COPY stage/iceberg-flink-1.20-1.9.1.jar /opt/flink/lib/iceberg-flink-1.20-1.9.1.jar
COPY stage/iceberg-flink-runtime-1.20-1.9.1.jar /opt/flink/lib/iceberg-flink-runtime-1.20-1.9.1.jar
COPY stage/iceberg-aws-bundle-1.9.1.jar /opt/flink/iceberg-aws-bundle-1.9.1.jar
RUN echo "--> Set Ownerships of /opt/flink" && \
chown -R flink:flink $FLINK_HOME
USER flink:flink
CMD ./bin/start-cluster.sh && sleep infinity
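(An aside on the jar layout above, offered as an assumption to verify rather than a confirmed fix: an AbstractMethodError that mentions org.apache.iceberg.shaded Jackson classes can come from having both the unshaded iceberg-flink jar and the shaded iceberg-flink-runtime jar on the same classpath. A sketch of the Iceberg section with only the runtime bundle:)
RUN echo "--> Install Flink JARs: => Open Table Formats"
# keep only the shaded runtime jar so shaded and unshaded Jackson classes don't mix
COPY stage/iceberg-flink-runtime-1.20-1.9.1.jar /opt/flink/lib/iceberg-flink-runtime-1.20-1.9.1.jar
COPY stage/iceberg-aws-bundle-1.9.1.jar /opt/flink/lib/iceberg-aws-bundle-1.9.1.jar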
George Leonard
12/11/2025, 6:33 PM
George Leonard
12/12/2025, 9:55 AM
FROM arm64v8/flink:1.20.1-scala_2.12-java17
SHELL ["/bin/bash", "-c"]
ENV FLINK_HOME=/opt/flink
ENV HIVE_HOME=$FLINK_HOME/conf/
WORKDIR $FLINK_HOME
RUN echo "--> Setup Directory Structure" && \
mkdir -p /opt/flink/conf/ && \
mkdir -p /opt/flink/checkpoints && \
mkdir -p /opt/flink/rocksdb
RUN echo "--> Install JARs: Flink's S3 plugin" && \
mkdir -p ./plugins/s3-fs-hadoop && \
mv ./opt/flink-s3-fs-hadoop-1.20.1.jar ./plugins/s3-fs-hadoop/
RUN echo "--> Install JARs: => Generic"
COPY stage/bundle-2.31.9.jar /opt/flink/lib/bundle-2.31.9.jar
COPY stage/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar /opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
COPY stage/jackson-core-2.20.1.jar /opt/flink/lib/jackson-core-2.20.1.jar
COPY stage/postgresql-42.7.6.jar /opt/flink/lib/postgresql-42.7.6.jar
# CDC Support https://nightlies.apache.org/flink/flink-cdc-docs-release-3.4/docs/connectors/flink-sources/overview/#supported-flink-versions
COPY stage/flink-sql-connector-postgres-cdc-3.5.0.jar /opt/flink/lib/flink-sql-connector-postgres-cdc-3.5.0.jar
# Flink version specific
RUN echo "--> Install JARs: => Flink 1.20.1"
COPY stage/flink-sql-json-1.20.1.jar /opt/flink/lib/flink-sql-json-1.20.1.jar
COPY stage/flink-sql-parquet-1.20.1.jar /opt/flink/lib/flink-sql-parquet-1.20.1.jar
COPY stage/flink-json-1.20.1.jar /opt/flink/lib/flink-json-1.20.1.jar
COPY stage/flink-connector-jdbc-3.3.0-1.20.jar /opt/flink/lib/flink-connector-jdbc-3.3.0-1.20.jar
RUN echo "--> Install JARs: => Iceberg 1.9.1"
COPY stage/iceberg-flink-1.20-1.9.1.jar /opt/flink/lib/iceberg-flink-1.20-1.9.1.jar
COPY stage/iceberg-flink-runtime-1.20-1.9.1.jar /opt/flink/lib/iceberg-flink-runtime-1.20-1.9.1.jar
COPY stage/iceberg-aws-bundle-1.9.1.jar /opt/flink/iceberg-aws-bundle-1.9.1.jar
RUN echo "--> Set Ownerships of /opt/flink" && \
chown -R flink:flink $FLINK_HOME
USER flink:flink
CMD ./bin/start-cluster.sh && sleep infinity
Below are the Polaris bits from my docker-compose file, included because this creates the Polaris catalog and uses the previously created MinIO warehouse/iceberg bucket.
polaris:
image: apache/polaris:latest
hostname: polaris
container_name: polaris
depends_on:
postgrescat:
condition: service_healthy
minio:
condition: service_healthy
polaris-bootstrap:
condition: service_completed_successfully
ports:
# API port
- "8181:8181"
# Management port (metrics and health checks)
- "8182:8182"
# Optional, allows attaching a debugger to the Polaris JVM
- "5005:5005"
environment:
POLARIS_PERSISTENCE_TYPE: relational-jdbc
POLARIS_PERSISTENCE_RELATIONAL_JDBC_MAX_RETRIES: 5
POLARIS_PERSISTENCE_RELATIONAL_JDBC_INITIAL_DELAY_IN_MS: 100
POLARIS_PERSISTENCE_RELATIONAL_JDBC_MAX_DURATION_IN_MS: 5000
QUARKUS_DATASOURCE_JDBC_URL: jdbc:postgresql://${POSTGRES_CAT_HOST}:${POSTGRES_CAT_PORT}/${POLARIS_REALM}
QUARKUS_DATASOURCE_USERNAME: ${POSTGRES_CAT_USER}
QUARKUS_DATASOURCE_PASSWORD: ${POSTGRES_CAT_PASSWORD}
POLARIS_REALM_CONTEXT_REALMS: ${POLARIS_REALM}
QUARKUS_OTEL_SDK_DISABLED: true
polaris.features."ALLOW_INSECURE_STORAGE_TYPES": "true"
polaris.features."SUPPORTED_CATALOG_STORAGE_TYPES": "[\"FILE\",\"S3\"]"
polaris.readiness.ignore-severe-issues: "true"
# S3 endpoint for MinIO
AWS_REGION: ${AWS_REGION}
AWS_ACCESS_KEY_ID: ${MINIO_ROOT_USER}
AWS_SECRET_ACCESS_KEY: ${MINIO_ROOT_PASSWORD}
POLARIS_BOOTSTRAP_CREDENTIALS: ${POLARIS_REALM},${ROOT_CLIENT_ID},${ROOT_CLIENT_SECRET}
polaris.realm-context.realms: ${POLARIS_REALM}
quarkus.otel.sdk.disabled: "true"
volumes:
- ./data/polaris:/var/lib/polaris
- ./conf/polaris:/polaris
healthcheck:
test: ["CMD", "curl", "<http://localhost:8182/q/health>"]
interval: 5s
timeout: 10s
retries: 5
start_period: 30s
restart: unless-stopped
polaris-bootstrap:
image: apache/polaris-admin-tool:latest
hostname: polaris-bootstrap
container_name: polaris-bootstrap
depends_on:
postgrescat:
condition: service_healthy
environment:
- POLARIS_PERSISTENCE_TYPE=relational-jdbc
- QUARKUS_DATASOURCE_JDBC_URL=jdbc:postgresql://${POSTGRES_CAT_HOST}:${POSTGRES_CAT_PORT}/${POLARIS_REALM}
- QUARKUS_DATASOURCE_USERNAME=${POSTGRES_CAT_USER}
- QUARKUS_DATASOURCE_PASSWORD=${POSTGRES_CAT_PASSWORD}
- POLARIS_REALM=${POLARIS_REALM}
- ROOT_CLIENT_ID=${ROOT_CLIENT_ID}
- ROOT_CLIENT_SECRET=${ROOT_CLIENT_SECRET}
command:
- "bootstrap"
- "--realm=${POLARIS_REALM}"
- "--credential=${POLARIS_REALM},${ROOT_CLIENT_ID},${ROOT_CLIENT_SECRET}"
polaris-setup:
image: alpine/curl
hostname: polaris-setup
container_name: polaris-setup
depends_on:
polaris:
condition: service_healthy
environment:
- CLIENT_ID=${ROOT_CLIENT_ID}
- CLIENT_SECRET=${ROOT_CLIENT_SECRET}
- CATALOG_NAME=${CATALOG_NAME}
- POLARIS_REALM=${POLARIS_REALM}
- MINIO_BUCKET=${MINIO_BUCKET}
- MINIO_ENDPOINT=${MINIO_ENDPOINT}
volumes:
- ./conf/polaris/:/polaris
entrypoint: "/bin/sh"
command:
- "-c"
- >-
chmod +x /polaris/create-catalog.sh;
chmod +x /polaris/obtain-token.sh;
source /polaris/obtain-token.sh $$POLARIS_REALM;
echo Creating catalog: $$CATALOG_NAME in realm: $$POLARIS_REALM;
export STORAGE_CONFIG_INFO='{"storageType":"S3",
"endpoint":"<http://localhost:9000>",
"endpointInternal":"'$$MINIO_ENDPOINT'",
"pathStyleAccess":true}';
export STORAGE_LOCATION="s3://$$MINIO_BUCKET/iceberg";
/polaris/create-catalog.sh $$POLARIS_REALM $$CATALOG_NAME $$TOKEN;
echo Extra grants...;
curl -H "Authorization: Bearer $$TOKEN" -H 'Content-Type: application/json' \
-X PUT \
http://polaris:8181/api/management/v1/catalogs/${CATALOG_NAME}/catalog-roles/catalog_admin/grants \
-d '{"type":"catalog", "privilege":"CATALOG_MANAGE_CONTENT"}';
echo Polaris Setup Complete.;
c_iceberg catalog create
CREATE CATALOG c_iceberg WITH (
'type'='iceberg'
,'catalog-type'='rest'
,'uri'='http://polaris:8181/api/catalog'
,'warehouse'='icebergcat'
,'oauth2-server-uri'='http://polaris:8181/api/catalog/v1/oauth/tokens'
,'credential'='root:s3cr3t'
,'scope'='PRINCIPAL_ROLE:ALL'
,'s3.endpoint'='http://minio:9000'
,'s3.access-key-id'='mnadmin'
,'s3.secret-access-key'='mnpassword'
,'s3.path-style-access'='true'
);
USE CATALOG c_iceberg;
-- Create using below
CREATE DATABASE IF NOT EXISTS c_iceberg.finflow;
flink sql being executed
CREATE OR REPLACE TABLE c_cdcsource.demog.accountholders (
_id BIGINT NOT NULL
,nationalid VARCHAR(16) NOT NULL
,firstname VARCHAR(100)
,lastname VARCHAR(100)
,dob VARCHAR(10)
,gender VARCHAR(10)
,children INT
,emailaddress VARCHAR(100)
,mobilephonenumber VARCHAR(20)
,accounts STRING
,created_at TIMESTAMP_LTZ(3)
,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
,PRIMARY KEY (_id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc'
,'hostname' = 'postgrescdc'
,'port' = '5432'
,'username' = 'dbadmin'
,'password' = 'dbpassword'
,'database-name' = 'demog'
,'schema-name' = 'public'
,'table-name' = 'accountholders'
,'slot.name' = 'accountholders0'
,'scan.incremental.snapshot.enabled' = 'true' -- experimental feature: incremental snapshot (default off)
,'scan.startup.mode' = 'initial' -- https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/connectors/flink-sources/postgres-cdc/#startup-reading-position
,'decoding.plugin.name' = 'pgoutput'
);
SET 'execution.checkpointing.interval' = '60s';
SET 'table.exec.sink.upsert-materialize' = 'NONE';
SET 'pipeline.name' = 'Persist into Iceberg: accountholders table';
CREATE OR REPLACE TABLE c_iceberg.finflow.accountholders WITH (
'file.format' = 'parquet'
,'compaction.min.file-num' = '2'
,'compaction.early-max.file-num' = '50'
,'snapshot.time-retained' = '1h'
,'snapshot.num-retained.min' = '5'
,'snapshot.num-retained.max' = '20'
,'table.exec.sink.upsert-materialize'= 'NONE'
) AS
SELECT * FROM c_cdcsource.demog.accountholders;
The above resulted in the below
[ERROR] Could not execute SQL statement. Reason:
org.apache.iceberg.exceptions.NoSuchNamespaceException: Namespace does not exist: finflow
After I got the above namespace problem I executed the following
curl -X POST http://localhost:8181/api/catalog/v1/icebergcat/namespaces \
-H "Authorization: Bearer ${TOKEN}" \
-H 'Content-Type: application/json' \
-d '{"namespace": ["finflow"], "properties": {"description": "Iceberg catalog database"}}' | jq
I then reran my CTAS above; the errors are below.
[ERROR] Could not execute SQL statement. Reason:
java.lang.AbstractMethodError: Receiver class org.apache.iceberg.flink.FlinkCreateTableOptions$$Lambda$1155/0x0000000601838000 does not define or inherit an implementation of the resolved method 'abstract void generate(org.apache.iceberg.shaded.com.fasterxml.jackson.core.JsonGenerator)' of interface org.apache.iceberg.util.JsonUtil$ToJson.
Flink SQL>
George Leonard
12/12/2025, 10:04 AM
George Leonard
12/12/2025, 10:08 AM
Francisco Morillo
12/12/2025, 10:26 AM
Anooj S
12/12/2025, 1:31 PM
George Leonard
12/12/2025, 2:48 PM
Marco Villalobos
12/12/2025, 7:05 PM
Hristo Yordanov
12/12/2025, 9:37 PM
Brad Murry
12/15/2025, 3:45 PM
Royston
12/16/2025, 12:36 PM
Tiago Pereira
12/16/2025, 1:48 PM
Tiago Pereira
12/16/2025, 1:48 PM
Tiago Pereira
12/16/2025, 1:49 PM
Tiago Pereira
12/16/2025, 1:50 PM
Tiago Pereira
12/16/2025, 1:50 PM
Tiago Pereira
12/16/2025, 1:52 PM
Jon Slusher
12/18/2025, 5:47 PM
ReadWriteMany, which is not supported for EBS volumes. Can anyone point me in the right direction? I see that EFS might be an option, but I'm curious what the recommended options are for configuring savepoint/checkpoint and ha volumes for Flink jobManagers in EKS. I'll put my current configuration in a thread.
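(For reference, one commonly used alternative is to keep checkpoints, savepoints and HA metadata in S3 rather than on volumes; a sketch only, assuming an S3 bucket and the S3 filesystem plugin are available, with placeholder names:)
flinkConfiguration:
  # requires the flink-s3-fs-hadoop or flink-s3-fs-presto plugin enabled in the image
  state.checkpoints.dir: s3://my-flink-bucket/checkpoints
  state.savepoints.dir: s3://my-flink-bucket/savepoints
  high-availability.type: kubernetes
  high-availability.storageDir: s3://my-flink-bucket/ha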
Bruno Cancelinha
12/18/2025, 6:28 PM
alert_definitions) and the other one represents all users' status (user_status). Because these are continuously updating tables, I used the postgres-cdc connector like so:
CREATE TABLE user_status_binlog (
id STRING NOT NULL,
user_id STRING,
account_id STRING,
status STRING,
deleted BOOLEAN,
inactive BOOLEAN,
queue_name STRING,
team_id STRING,
updated_at TIMESTAMP(3),
PRIMARY KEY (id) NOT ENFORCED,
WATERMARK FOR updated_at AS updated_at
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'postgresql',
'port' = '5432',
'username' = 'admin',
'password' = 'admin',
'schema-name' = 'public',
'database-name' = 'user-status',
'table-name' = 'live_user_status',
'slot.name' = 'flink',
'debezium.plugin.name' = 'pgoutput',
'scan.startup.mode' = 'initial',
'changelog-mode' = 'upsert'
);
CREATE TABLE alert_definitions(
alert_id STRING NOT NULL,
metric_id STRING,
account_id STRING,
filters STRING,
min_value NUMERIC,
max_value NUMERIC,
filter_status AS JSON_QUERY(filters, '$.status' RETURNING ARRAY<STRING>),
filter_ring_groups AS JSON_QUERY(filters, '$.ring_groups' RETURNING ARRAY<STRING>),
filter_team_ids AS JSON_QUERY(filters, '$.team_ids' RETURNING ARRAY<STRING>),
created_at TIMESTAMP(6),
PRIMARY KEY (alert_id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'postgresql',
'port' = '5432',
'username' = 'admin',
'password' = 'admin',
'schema-name' = 'public',
'database-name' = 'live_sentinel_db',
'table-name' = 'alert_definition',
'slot.name' = 'alert_definition',
'debezium.plugin.name' = 'pgoutput',
'scan.startup.mode' = 'initial',
'changelog-mode' = 'upsert'
);
Every time a new user changes their status or a new alert is created, I want the query to run and send the results to a kafka topic. Because my query is an updating table, I created a Kafka sink with connector upsert-kafka (although the behaviour I’m looking for is more akin to an appending-only table).
My query, which counts the number of users logged in, is as follows:
CREATE TEMPORARY VIEW alert_user_counts AS
SELECT
alert_id,
account_id,
active_user_count,
min_value,
max_value
FROM
(
SELECT alert_id, account_id, min_value, max_value, filter_ring_groups, filter_status, filter_team_ids
FROM alert_definitions
WHERE account_id IS NOT NULL
AND metric_id = 'count-users-logged-in'
) alerts,
LATERAL (
SELECT
COUNT(DISTINCT user_id) AS active_user_count
FROM user_status_binlog AS us
WHERE
us.account_id = alerts.account_id
AND us.status NOT IN ('offline', 'hidden')
AND us.deleted = FALSE
AND us.inactive = FALSE
AND (COALESCE(CARDINALITY(alerts.filter_status), 0) = 0 OR ARRAY_CONTAINS(alerts.filter_status, us.status))
);
INSERT INTO notification_sink
SELECT
counts.account_id,
counts.alert_id,
counts.active_user_count as `value`,
CASE
WHEN (counts.min_value IS NULL OR counts.active_user_count >= counts.min_value)
AND (counts.max_value IS NULL OR counts.active_user_count <= counts.max_value)
THEN 'VIOLATION'
ELSE 'NOMINAL'
END AS `type`,
CURRENT_TIMESTAMP as event_timestamp
FROM alert_user_counts counts
Although the idea behind this job seems pretty simple, I keep finding issues with my implementation. I needed to use a LATERAL JOIN because it was the only way not to get multiple values for the same alert.
Currently it’s working fine except when the alert_definition table is updated. If a new alert is created, then a lot of messages are sent to kafka.
For example: say the account has 3 users logged in. If a new alert is created for that account, we get 3 messages on the kafka topic, like so:
{"account_id":"account-id","alert_id":"10","value":1,"type":"VIOLATION","event_timestamp":"2025-12-18 18:08:30.991"}
{"account_id":"account-id","alert_id":"10","value":2,"type":"VIOLATION","event_timestamp":"2025-12-18 18:08:30.992"}
{"account_id":"account-id","alert_id":"10","value":3,"type":"VIOLATION","event_timestamp":"2025-12-18 18:08:30.993"}
I don't really understand this 'counting' behaviour. It's as if the user_status table were an append-only table and this new alert were joining with multiple past versions. Can someone please explain to me why this behaviour is happening?
P.S. You might realize that this implementation doesn't really work when no users are logged in (in which case it should return 0, but Flink SQL doesn't return any rows). I'm aware of that, but right now it's not what I'm focusing on.
Utkarsh Vashishtha
12/18/2025, 9:02 PM