George Leonard (12/12/2025, 2:48 PM)
Marco Villalobos (12/12/2025, 7:05 PM)
Hristo Yordanov (12/12/2025, 9:37 PM)
Brad Murry (12/15/2025, 3:45 PM)
Royston (12/16/2025, 12:36 PM)
Tiago Pereira (12/16/2025, 1:48 PM)
Tiago Pereira (12/16/2025, 1:48 PM)
Tiago Pereira (12/16/2025, 1:49 PM)
Tiago Pereira (12/16/2025, 1:50 PM)
Tiago Pereira (12/16/2025, 1:50 PM)
Tiago Pereira (12/16/2025, 1:52 PM)
Jon Slusher (12/18/2025, 5:47 PM)
…ReadWriteMany, which is not supported for EBS volumes. Can anyone point me in the right direction? I see that EFS might be an option, but I'm curious what the recommended options are for configuring savepoint/checkpoint and HA volumes for Flink JobManagers in EKS. I'll put my current configuration in a thread.
Bruno Cancelinha (12/18/2025, 6:28 PM)
…alert_definitions) and the other one represents all users’ status (user_status). Because these are continuously updating tables, I used the postgres-cdc connector like so:
CREATE TABLE user_status_binlog (
    id STRING NOT NULL,
    user_id STRING,
    account_id STRING,
    status STRING,
    deleted BOOLEAN,
    inactive BOOLEAN,
    queue_name STRING,
    team_id STRING,
    updated_at TIMESTAMP(3),
    PRIMARY KEY (id) NOT ENFORCED,
    WATERMARK FOR updated_at AS updated_at
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'postgresql',
    'port' = '5432',
    'username' = 'admin',
    'password' = 'admin',
    'schema-name' = 'public',
    'database-name' = 'user-status',
    'table-name' = 'live_user_status',
    'slot.name' = 'flink',
    'debezium.plugin.name' = 'pgoutput',
    'scan.startup.mode' = 'initial',
    'changelog-mode' = 'upsert'
);
CREATE TABLE alert_definitions (
    alert_id STRING NOT NULL,
    metric_id STRING,
    account_id STRING,
    filters STRING,
    min_value NUMERIC,
    max_value NUMERIC,
    filter_status AS JSON_QUERY(filters, '$.status' RETURNING ARRAY<STRING>),
    filter_ring_groups AS JSON_QUERY(filters, '$.ring_groups' RETURNING ARRAY<STRING>),
    filter_team_ids AS JSON_QUERY(filters, '$.team_ids' RETURNING ARRAY<STRING>),
    created_at TIMESTAMP(6),
    PRIMARY KEY (alert_id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'postgresql',
    'port' = '5432',
    'username' = 'admin',
    'password' = 'admin',
    'schema-name' = 'public',
    'database-name' = 'live_sentinel_db',
    'table-name' = 'alert_definition',
    'slot.name' = 'alert_definition',
    'debezium.plugin.name' = 'pgoutput',
    'scan.startup.mode' = 'initial',
    'changelog-mode' = 'upsert'
);
Every time a user changes their status or a new alert is created, I want the query to run and send the results to a Kafka topic. Because my query produces an updating table, I created a Kafka sink with the upsert-kafka connector (although the behaviour I’m looking for is more akin to an append-only table).
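The notification_sink DDL itself isn’t shown in the thread; a minimal sketch of what an upsert-kafka sink matching the INSERT below could look like (the topic name and broker address are placeholders, and the key choice is an assumption):

CREATE TABLE notification_sink (
    account_id STRING,
    alert_id STRING,
    `value` BIGINT,
    `type` STRING,
    event_timestamp TIMESTAMP_LTZ(3),
    -- upsert-kafka requires a primary key; it becomes the Kafka record key
    PRIMARY KEY (account_id, alert_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'alert-notifications',                -- placeholder topic name
    'properties.bootstrap.servers' = 'kafka:9092',  -- placeholder broker address
    'key.format' = 'json',
    'value.format' = 'json'
);

Note that upsert-kafka still writes one Kafka record per changelog update for a given key (plus tombstones for deletes), so intermediate updates remain visible to plain consumers of the topic.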
My query, which counts the number of users logged in, is as follows:
CREATE TEMPORARY VIEW alert_user_counts AS
SELECT
    alert_id,
    account_id,
    active_user_count,
    min_value,
    max_value
FROM (
    SELECT alert_id, account_id, min_value, max_value, filter_ring_groups, filter_status, filter_team_ids
    FROM alert_definitions
    WHERE account_id IS NOT NULL
      AND metric_id = 'count-users-logged-in'
) alerts,
LATERAL (
    SELECT
        COUNT(DISTINCT user_id) AS active_user_count
    FROM user_status_binlog AS us
    WHERE
        us.account_id = alerts.account_id
        AND us.status NOT IN ('offline', 'hidden')
        AND us.deleted = FALSE
        AND us.inactive = FALSE
        AND (COALESCE(CARDINALITY(alerts.filter_status), 0) = 0 OR ARRAY_CONTAINS(alerts.filter_status, us.status))
);
INSERT INTO notification_sink
SELECT
    counts.account_id,
    counts.alert_id,
    counts.active_user_count AS `value`,
    CASE
        WHEN (counts.min_value IS NULL OR counts.active_user_count >= counts.min_value)
         AND (counts.max_value IS NULL OR counts.active_user_count <= counts.max_value)
        THEN 'VIOLATION'
        ELSE 'NOMINAL'
    END AS `type`,
    CURRENT_TIMESTAMP AS event_timestamp
FROM alert_user_counts counts;
Although the idea behind this job seems pretty simple, I keep finding issues with my implementation. I needed to use a LATERAL JOIN because it was the only way not to get multiple values for the same alert.
Currently it’s working fine, except when the alert_definition table is updated: if a new alert is created, a lot of messages are sent to Kafka.
For example, say the account has 3 users logged in. If a new alert is created for that account, we get 3 messages on the Kafka topic, like so:
{"account_id":"account-id","alert_id":"10","value":1,"type":"VIOLATION","event_timestamp":"2025-12-18 18:08:30.991"}
{"account_id":"account-id","alert_id":"10","value":2,"type":"VIOLATION","event_timestamp":"2025-12-18 18:08:30.992"}
{"account_id":"account-id","alert_id":"10","value":3,"type":"VIOLATION","event_timestamp":"2025-12-18 18:08:30.993"}
I don’t really understand this ‘counting’ behaviour. It’s as if the user_status table were an append-only table and this new alert were joining with multiple past versions. Can someone please explain why this behaviour is happening?
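One way to see what kind of changelog the planner actually produces for this pipeline is EXPLAIN with the CHANGELOG_MODE detail (a diagnostic sketch, assuming the tables and the view above are registered in the current session):

-- shows each operator in the optimized plan together with the change kinds
-- (insert / update-before / update-after / delete) it emits
EXPLAIN CHANGELOG_MODE
SELECT alert_id, account_id, active_user_count
FROM alert_user_counts;

That makes it visible where updates and retractions enter the plan, which usually explains why several intermediate counts reach the sink when one side of the join changes.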
P.S. You might notice that this implementation doesn’t really work when no users are logged in (in which case it should return 0, but Flink SQL doesn’t return any rows). I’m aware of that, but it’s not what I’m focusing on right now.
Utkarsh Vashishtha (12/18/2025, 9:02 PM)
syepes (12/19/2025, 1:13 PM)
Dominik (12/23/2025, 8:52 AM)
徐启文 (12/24/2025, 2:08 AM)
source:
  type: mysql
  scan.startup.mode: earliest-offset  # Start from earliest offset
  scan.startup.mode: latest-offset  # Start from latest offset
  scan.startup.mode: specific-offset  # Start from specific offset
  scan.startup.mode: timestamp  # Start from timestamp
  scan.startup.mode: snapshot  # Read snapshot only
  scan.startup.specific-offset.file: 'mysql-bin.000003'  # Binlog filename under specific offset startup mode
  scan.startup.specific-offset.pos: 4  # Binlog position under specific offset mode
  scan.startup.specific-offset.gtid-set: 24DA167-...  # GTID set under specific offset startup mode
  scan.startup.timestamp-millis: 1667232000000  # Timestamp under timestamp startup mode
  # ...
val savepointPath = Option(
  params.get("s", params.get("fromSavepoint", null))
)
val isRestoring = savepointPath.isDefined
if (isRestoring) {
  println(s"Restoring from: ${savepointPath.get}")
  println("Using scan.startup.mode: latest-offset")
} else {
  println("Starting fresh job")
  println("Using scan.startup.mode: initial")
}
val scanMode = if (isRestoring) "latest-offset" else "initial"
Is it necessary for me to check the parameter like this, and what would be the correct way to write it? Looking forward to your reply.
George Leonard (12/24/2025, 6:50 AM)
Anatoliy Samsonov (12/24/2025, 7:46 AM)
…FINISHED) after reading the current snapshot.
How can I read table A in streaming (unbounded) mode, so the job keeps running and picks up new data instead of finishing?
Context:
• Iceberg connector
• Flink Table / SQL API
What is the correct way to enable streaming reads?
George Leonard (12/24/2025, 9:27 AM)
SET 'execution.runtime-mode' = 'streaming';
SET 'execution.planner.type' = 'streaming';
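For the Iceberg connector specifically, the runtime mode alone is usually not enough; the scan itself has to be switched to continuous mode. A sketch using the Iceberg Flink connector’s streaming read options (the table name is a placeholder for table A):

-- continuous (unbounded) scan: keeps monitoring the table for new snapshots
SELECT *
FROM iceberg_catalog.db.table_a
/*+ OPTIONS('streaming'='true', 'monitor-interval'='30s') */;

On older Flink versions, SQL hints like this may need to be enabled via table.dynamic-table-options.enabled.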
George Leonard (12/24/2025, 9:28 AM)
CREATE OR REPLACE TABLE accountholders (
_id BIGINT NOT NULL
,nationalid VARCHAR(16) NOT NULL
,firstname VARCHAR(100)
,lastname VARCHAR(100)
,dob VARCHAR(10)
,gender VARCHAR(10)
,children INT
,address STRING
,accounts STRING
,emailaddress VARCHAR(100)
,mobilephonenumber VARCHAR(20)
,created_at TIMESTAMP_LTZ(3)
,WATERMARK FOR created_at AS created_at - INTERVAL '15' SECOND
,PRIMARY KEY (_id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc'
,'hostname' = 'postgrescdc'
,'port' = '5432'
,'username' = 'dbadmin'
,'password' = 'dbpassword'
,'database-name' = 'demog'
,'schema-name' = 'public'
,'table-name' = 'accountholders'
,'slot.name' = 'accountholders_pyflink' -- Can't include capital letters
,'scan.incremental.snapshot.enabled' = 'true'
,'scan.startup.mode' = 'initial'
,'decoding.plugin.name' = 'pgoutput'
,'scan.incremental.snapshot.chunk.size' = '4096' -- Explicitly set chunk size
,'scan.snapshot.fetch.size' = '512' -- Add fetch size
,'connect.timeout' = '30s' -- Add connection timeout
);
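As an aside on the slot.name comment above: leftover replication slots can be inspected and cleaned up on the Postgres side (plain PostgreSQL run against the source database, not Flink SQL; the slot name is the one from the DDL):

-- list logical replication slots and whether they are currently in use
SELECT slot_name, plugin, active
FROM pg_replication_slots;

-- drop a leftover slot once no Flink job is using it
SELECT pg_drop_replication_slot('accountholders_pyflink');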
Ben Amiel (12/24/2025, 9:37 AM)
Anatoliy Samsonov (12/24/2025, 2:11 PM)
George Leonard (12/24/2025, 2:14 PM)
Anatoliy Samsonov (12/24/2025, 2:28 PM)
Anatoliy Samsonov (12/24/2025, 2:29 PM)
syepes (12/28/2025, 11:52 PM)
…FlinkDeployment, and everything is working as expected. Now I want to migrate some of these jobs to be deployed with FlinkSessionJob so that they can run under the same JobManager.
The main challenge is that each job requires its own set of environment variables, but I have not been able to figure out how to configure "podTemplate.podTemplate.containers.env" when using the FlinkSessionJob CRD.
Does anyone know if it is currently supported to use a podTemplate, specifically for setting environment variables, when deploying a FlinkSessionJob?
徐科 (12/29/2025, 6:19 AM)
徐科 (12/29/2025, 11:18 AM)
Ben Amiel (12/30/2025, 5:50 PM)