Victor Babenko
10/09/2025, 8:22 PM
Royston
10/10/2025, 3:08 PM
Trystan
10/10/2025, 5:19 PM
Event | Info | SCALINGREPORT | Scaling execution enabled, begin scaling vertices:
{ Vertex ID 5591cebca2a45b74833df01196d1a431 | Parallelism 74 -> 80 | Processing capacity 6671.45 -> 7131.00 | Target data rate 3889.81}
{ Vertex ID 162a29c968c94eac6fbdfc2f8ccc2080 | Parallelism 180 -> 128 | Processing capacity Infinity -> Infinity | Target data rate 50.67}
{ Vertex ID 009b60dbc7737a8a2e91e7b3c30d9949 | Parallelism 128 -> 169 | Processing capacity 70.68 -> 93.00 | Target data rate 50.67}
these autoscaling decisions don't make a lot of sense to me. why on earth is it scaling UP 009b?
here's the output log from right before:
ue1d.FlinkDeployment.AutoScaler.jobVertexID.009b60dbc7737a8a2e91e7b3c30d9949.TRUE_PROCESSING_RATE.Average: 71.325
ue1d.FlinkDeployment.AutoScaler.jobVertexID.009b60dbc7737a8a2e91e7b3c30d9949.SCALE_DOWN_RATE_THRESHOLD.Current: 176.0
ue1d.FlinkDeployment.AutoScaler.jobVertexID.009b60dbc7737a8a2e91e7b3c30d9949.SCALE_UP_RATE_THRESHOLD.Current: 56.0
is this a catchup buffer problem? maybe it thinks it would be unable to catch up within the desired window?
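Not an authoritative answer, but here is a rough sketch (all numbers and option names are made up for illustration, not taken from the report above) of how the catch-up buffer can produce exactly this kind of scale-up: when a vertex has accumulated lag, the autoscaler sizes it for the steady-state input rate at the utilization target plus the extra rate needed to drain that lag within the configured catch-up window, so the required capacity can exceed the measured processing rate even though the plain input rate (50.67 here) is well below it. As far as I understand, the real implementation also folds in restart time and bounds on the scale factor.

public class CatchUpBufferSketch {
    public static void main(String[] args) {
        // All values are hypothetical and only illustrate the shape of the calculation.
        double avgInputRate = 50.0;         // steady-state records/s arriving at the vertex
        double trueProcessingRate = 70.0;   // measured vertex-level processing rate at current parallelism
        double targetUtilization = 0.7;     // e.g. kubernetes.operator.job.autoscaler.target.utilization
        double backlogRecords = 40_000;     // accumulated lag the autoscaler wants drained
        double catchUpSeconds = 1_800;      // e.g. a 30-minute catch-up window

        // Simplified model: required capacity = steady-state rate at the utilization target
        // plus the extra rate needed to drain the backlog within the catch-up window.
        double requiredCapacity = avgInputRate / targetUtilization + backlogRecords / catchUpSeconds;

        int currentParallelism = 128;
        int newParallelism = (int) Math.ceil(currentParallelism * requiredCapacity / trueProcessingRate);

        System.out.printf("requiredCapacity=%.2f newParallelism=%d%n", requiredCapacity, newParallelism);
        // requiredCapacity ~= 71.4 + 22.2 = 93.6 records/s, newParallelism ~= 172, i.e. a scale-up
        // even though the steady-state input (50/s) is below what the vertex already processes (70/s).
    }
}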
Barak Ben-Nathan
10/12/2025, 7:35 AM
State Processor API to generate both a keyed state and a broadcast state for the same operator?
ilililil
10/15/2025, 4:19 AM
// Asynchronously read the current value of the keyed state (async state API).
final var stateFuture = state.asyncValue();
stateFuture.thenAccept(value -> {
    // Once the value arrives: register an event-time timer and emit a derived record.
    ctx.timerService().registerEventTimeTimer(timestamp + someValue);
    out.collect(foo(value));
});
Elad
10/15/2025, 6:18 AM
Sergio Morales
10/15/2025, 12:01 PM
Royston
10/16/2025, 8:18 AM
George Leonard
10/16/2025, 11:46 AM
Flink SQL> CREATE CATALOG kafka_catalog WITH (
> 'type' = 'hive',
> 'hive-conf-dir' = './conf/'
> );
[INFO] Execute statement succeeded.
Flink SQL> use catalog kafka_catalog;
>
[INFO] Execute statement succeeded.
Flink SQL> CREATE DATABASE IF NOT EXISTS kafka_catalog.inbound;
>
[INFO] Execute statement succeeded.
Flink SQL> CREATE DATABASE IF NOT EXISTS kafka_catalog.outbound;
>
[INFO] Execute statement succeeded.
Flink SQL> CREATE CATALOG postgres_catalog WITH (
> 'type' = 'hive',
> 'hive-conf-dir' = './conf/'
> );
[INFO] Execute statement succeeded.
Flink SQL> use catalog kafka_catalog;
[INFO] Execute statement succeeded.
Flink SQL> show databases;
+---------------+
| database name |
+---------------+
| default |
| inbound |
| outbound |
+---------------+
3 rows in set
Flink SQL> use catalog postgres_catalog;
[INFO] Execute statement succeeded.
Flink SQL> show databases;
+---------------+
| database name |
+---------------+
| default |
| inbound |
| outbound |
+---------------+
3 rows in set
Flink SQL> use catalog kafka_catalog;
[INFO] Execute statement succeeded.
Flink SQL> create database test;
[INFO] Execute statement succeeded.
Flink SQL> use catalog postgres_catalog;
[INFO] Execute statement succeeded.
Flink SQL> show databases;
+---------------+
| database name |
+---------------+
| default |
| inbound |
| outbound |
| test |
+---------------+
vasanth loka
10/16/2025, 6:52 PM
Itamar Weiss
10/21/2025, 6:04 AM
Royston
10/21/2025, 6:13 AM
Vikas Patil
10/21/2025, 2:37 PM
// ApplicationReconciler.java
} else if (requireHaMetadata && flinkService.atLeastOneCheckpoint(deployConfig)) {
    // Last state deployment, explicitly set a dummy savepoint path to avoid accidental
    // incorrect state restore in case the HA metadata is deleted by the user
    deployConfig.set(SavepointConfigOptions.SAVEPOINT_PATH, LAST_STATE_DUMMY_SP_PATH);
    status.getJobStatus().setUpgradeSavepointPath(LAST_STATE_DUMMY_SP_PATH);
} else {
    deployConfig.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH);
• Flink, given any savepoint/checkpoint pointer, immediately resolves that path and fails fast on the dummy before any HA path is considered:
public static FsCompletedCheckpointStorageLocation resolveCheckpointPointer(String checkpointPointer) throws IOException {
    final Path path;
    try { path = new Path(checkpointPointer); } catch (Exception e) { /* invalid URI */ }
    final FileSystem fs = path.getFileSystem();
    final FileStatus status;
    try { status = fs.getFileStatus(path); } catch (FileNotFoundException e) {
        throw new FileNotFoundException("Cannot find checkpoint or savepoint file/directory '"
                + checkpointPointer + "' on file system '" + fs.getUri().getScheme() + "'.");
    }
    // ...
}
Why is this the case? Does anyone have any context on how to solve this?
FAS
10/22/2025, 4:45 AM
assume-role is handled by the Iceberg catalog.
Scenario
1. Account #1: Runs a Flink (1.19.1) job on an EKS cluster.
2. Account #2: Hosts Iceberg tables in an S3 bucket (s3://account-2-bucket-iceberg/dbstore1/) and manages metadata using the AWS Glue Catalog (awsAccount2Id).
3. Permissions:
◦ The Flink EKS pod in Account #1 has a Service Account configured with OIDC.
◦ This Service Account assumes a cross-account role (arn:aws:iam::awsAccount2Id:role/cross-account-role) in Account #2.
4. Verification:
◦ I have `exec`'d into the running Flink pod.
◦ From the pod, I can successfully use the AWS CLI to assume the cross-account role.
◦ After assuming the role, I can successfully list the Glue databases and tables in Account #2.
◦ This confirms the underlying EKS OIDC, IAM roles, and network access are all correctly configured.
The Challenge
In my Flink job, I first define the catalog for Account #2.
1. Create Catalog (Success) This SQL statement executes successfully, and the Flink logs confirm it: 2025-10-22 03:57:00,929 INFO ... - SQL statement executed successfully. sql=CREATE CATALOG `awsAccount2Id` ...
SQL
CREATE CATALOG `awsAccount2Id`
WITH (
  'type' = 'iceberg',
  'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',
  'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
  'warehouse' = 's3://account-2-bucket-iceberg/dbstore1/',
  'client.assume-role.arn' = 'arn:aws:iam::awsAccount2Id:role/cross-account-role',
  'glue.catalog-id' = 'awsAccount2Id',
  'client.region' = 'us-east-1',
  'client.credentials-provider' = 'software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider'
);
2. Select from Catalog (Failure) Immediately after the catalog is created, my Flink job executes the following SELECT query:
SQL
SELECT
....
FROM `awsAccount2Id`.`dbstore1`.table1
/*+ OPTIONS('streaming'='true', 'monitor-interval'='30s') */;
This query fails with a validation error:
2025-10-22 03:57:06,710 ERROR ... - Failed to execute SQL statement:
SELECT ...
FROM `awsAccount2Id`.`dbstore1`.table1 ...
;
org.apache.flink.table.api.ValidationException: SQL validation failed. From line 11, column 6 to line 11, column 59: Object 'dbstore1' not found within 'awsAccount2Id'
I also noticed that when Flink logs the list of available databases, it only shows databases from Account #1, not the cross-account ones from Account #2.
My Question
My expectation was that by defining client.assume-role.arn and glue.catalog-id in the CREATE CATALOG statement, any subsequent Flink SQL operations referencing the awsAccount2Id catalog (like my SELECT query) would automatically use those settings to assume the role and query the Glue catalog in Account #2.
Why is Flink reporting that the database dbstore1 is "not found," even though the catalog was created successfully and configured to assume a role that can see that database? I can see tables from this database when I manually assume-role using aws-cli from that pod.
It seems the SELECT query is not honoring the catalog's assume-role configuration and is somehow still querying the default Glue catalog in Account #1. Is this expected, or am I missing a configuration step for Flink to correctly use the assumed role for metadata discovery after the catalog is created?
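Not a definitive answer, but one thing to check: in Iceberg, client.assume-role.arn is (as far as I know) only honored when the catalog is also told to build its AWS clients via the assume-role client factory; with the default factory, or with a client.credentials-provider override, the Glue and S3 clients can still be created from the pod's base credentials, i.e. Account #1's default Glue catalog. Below is a hedged sketch of the catalog definition I would try. The property names client.factory, client.assume-role.region and glue.id are from my recollection of the Iceberg AWS docs (the key may be glue.id rather than glue.catalog-id), so please verify them against your Iceberg version.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CrossAccountGlueCatalogSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Sketch only: 'client.factory' switches Iceberg to the assume-role client factory so that
        // 'client.assume-role.arn' is actually used when creating the Glue and S3 clients.
        // 'glue.id' is (I believe) the property selecting the Glue catalog of the other account.
        tEnv.executeSql(
                "CREATE CATALOG `awsAccount2Id` WITH (\n"
                        + "  'type' = 'iceberg',\n"
                        + "  'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',\n"
                        + "  'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',\n"
                        + "  'warehouse' = 's3://account-2-bucket-iceberg/dbstore1/',\n"
                        + "  'client.factory' = 'org.apache.iceberg.aws.AssumeRoleAwsClientFactory',\n"
                        + "  'client.assume-role.arn' = 'arn:aws:iam::awsAccount2Id:role/cross-account-role',\n"
                        + "  'client.assume-role.region' = 'us-east-1',\n"
                        + "  'glue.id' = 'awsAccount2Id'\n"
                        + ")");

        // If the assumed role is picked up for metadata discovery, dbstore1 should show up here.
        tEnv.executeSql("USE CATALOG `awsAccount2Id`");
        tEnv.executeSql("SHOW DATABASES").print();
    }
}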
Jaya Ananthram
10/22/2025, 9:10 AM
מייקי בר יעקב
10/22/2025, 11:18 PM
Elad
10/23/2025, 8:43 AM
Eric Huang
10/23/2025, 1:36 PM
2025-10-23 14:05:32,327 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Match[7] -> Calc[8] -> SinkConversion[9] (21/96) (dcdc4daa8ced8ca9d2b8fc6c58e26129_310a79d541a763d57b050aae3bf30f0a_20_1) switched from INITIALIZING to RUNNING.
2025-10-23 14:05:32,327 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Match[7] -> Calc[8] -> SinkConversion[9] (90/96) (dcdc4daa8ced8ca9d2b8fc6c58e26129_310a79d541a763d57b050aae3bf30f0a_89_1) switched from INITIALIZING to RUNNING.
2025-10-23 14:05:32,327 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Match[7] -> Calc[8] -> SinkConversion[9] (59/96) (dcdc4daa8ced8ca9d2b8fc6c58e26129_310a79d541a763d57b050aae3bf30f0a_58_1) switched from INITIALIZING to RUNNING.
2025-10-23 14:05:32,327 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Match[7] -> Calc[8] -> SinkConversion[9] (17/96) (dcdc4daa8ced8ca9d2b8fc6c58e26129_310a79d541a763d57b050aae3bf30f0a_16_1) switched from INITIALIZING to RUNNING.
2025-10-23 14:05:32,400 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - DelayableMessageProcess -> Sink: xiaoxiang_reach_system (1/1) (dcdc4daa8ced8ca9d2b8fc6c58e26129_0a53a086337bb3f8a33ad689643a92fc_0_1) switched from INITIALIZING to RUNNING.
2025-10-23 14:10:28,717 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 191 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1761199828716 for job ffffffffbd30e5570000000000000001.
2025-10-23 14:10:30,023 INFO org.apache.flink.runtime.state.SharedStateRegistryImpl [] - state self-sustained:true, lastCompletedCheckpoint:191, earliestDependent:9223372036854775807, highestNotClaimedCheckpointID:-1
2025-10-23 14:10:30,023 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - job ffffffffbd30e5570000000000000001 checkpoint 191 completed, job is state-sustained
2025-10-23 14:10:30,207 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 191 for job ffffffffbd30e5570000000000000001 (280739389 bytes, checkpointDuration=1425 ms, finalizationTime=66 ms).
2025-10-23 14:15:28,717 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 192 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1761200128716 for job ffffffffbd30e5570000000000000001.
2025-10-23 14:15:29,030 INFO org.apache.flink.runtime.state.SharedStateRegistryImpl [] - state self-sustained:true, lastCompletedCheckpoint:192, earliestDependent:9223372036854775807, highestNotClaimedCheckpointID:-1
2025-10-23 14:15:29,030 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - job ffffffffbd30e5570000000000000001 checkpoint 192 completed, job is state-sustained
2025-10-23 14:15:29,096 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 192 for job ffffffffbd30e5570000000000000001 (317081932 bytes, checkpointDuration=335 ms, finalizationTime=45 ms).
2025-10-23 14:16:37,533 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Match[7] -> Calc[8] -> SinkConversion[9] (12/96) (dcdc4daa8ced8ca9d2b8fc6c58e26129_310a79d541a763d57b050aae3bf30f0a_11_1) switched from RUNNING to FAILED on session-2123414-1761142270-taskmanager-1-10 @ hldy-data-k8s-flink-ssd-node03895.mt (dataPort=23347).
java.lang.NullPointerException: null
at org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor.materializeMatch(SharedBufferAccessor.java:213) ~[flink-cep-1.16.1.jar:1.16.1]
at org.apache.flink.cep.nfa.NFA.processMatchesAccordingToSkipStrategy(NFA.java:474) ~[flink-cep-1.16.1.jar:1.16.1]
at org.apache.flink.cep.nfa.NFA.advanceTime(NFA.java:337) ~[flink-cep-1.16.1.jar:1.16.1]
at org.apache.flink.cep.operator.CepOperator.advanceTime(CepOperator.java:429) ~[flink-cep-1.16.1.jar:1.16.1]
at org.apache.flink.cep.operator.CepOperator.onEventTime(CepOperator.java:325) ~[flink-cep-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:180) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:599) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:239) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:200) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:136) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:552) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:843) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:792) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:969) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:948) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571) ~[flink-dist-1.16.1.jar:1.16.1]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312]
2025-10-23 14:16:37,534 INFO org.apache.flink.runtime.executiongraph.failover.flip1.TaskManagerRestartStrategy [] - Received failure event: TaskFailureEvent{taskManagerId=session-2123414-1761142270-taskmanager-1-10, timestamp=1761200197533, cause=NullPointerException: null}, excluded: false
2025-10-23 14:16:37,534 INFO org.apache.flink.runtime.executiongraph.failover.flip1.TaskManagerRestartStrategy [] - Resetting restart strategy state due to stable running period
2025-10-23 14:16:37,536 INFO org.apache.flink.runtime.executiongraph.failover.flip1.ContinuousRestartLimitation [] - Earliest failure timestamp: 1761199499926, max continuous restart duration: 28800000 ms
2025-10-23 14:16:37,536 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - 423 tasks will be restarted to recover the failed task dcdc4daa8ced8ca9d2b8fc6c58e26129_310a79d541a763d57b050aae3bf30f0a_11_1.
2025-10-23 14:16:37,536 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job rt_scene_51697_staging (ffffffffbd30e5570000000000000001) switched from state RUNNING to RESTARTING.
Mrugesh Kadia
10/24/2025, 9:48 AM
Mohsen Rezaei
10/27/2025, 11:12 PM
GROUP BY clause. I filed an issue for this to show some details on what's going on here, but I was curious if anyone else has run into this since it's a very basic test against Flink 2.1? Running that scenario in a sync state works fine, but is not going to be ideal for more complex scenarios
Arman shakeri
10/28/2025, 10:09 AM
Tiago Pereira
10/28/2025, 12:11 PM
Manish Jain
10/28/2025, 1:49 PM
stakater/reloader:v1.0.29 to reload the pods when a config changes. But the annotations that work with other pods are not working with the Flink components.
Is anyone using a similar setup and has run into such a problem? We don't want to create a custom solution for job restarts, and manual restarts are not optimal.
Francisco Morillo
10/28/2025, 7:29 PM
Noufal Rijal
10/29/2025, 5:59 AM
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-session
  namespace: flink-test
spec:
  image: pyflink-session-test:v1.0
  flinkVersion: v1_20
  imagePullPolicy: Always
  serviceAccount: flink
  mode: standalone
  flinkConfiguration:
    # All Flink runtime config keys go here
    fs.allowed-fallback-filesystems: "file"
    io.tmp.dirs: "/tmp"
    taskmanager.numberOfTaskSlots: "4"
    # ===== OPERATOR AUTOSCALER =====
    kubernetes.operator.job.autoscaler.enabled: "true"
    kubernetes.operator.job.autoscaler.target.utilization: "0.7"
    kubernetes.operator.job.autoscaler.target.utilization.boundary: "0.2"
    kubernetes.operator.job.autoscaler.stabilization.interval: "1m"
    kubernetes.operator.job.autoscaler.metrics.window: "5m"
    kubernetes.operator.job.autoscaler.scale-up.grace-period: "1m"
    kubernetes.operator.job.autoscaler.scale-down.grace-period: "5m"
  # # 💡 MOVED: jobManager must be a direct child of 'spec'
  jobManager:
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    replicas: 2
    resource:
      # memory: "10240m"
      memory: "2048m"
      cpu: 2
---
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: ppe-kafka-streaming
  namespace: flink-test
spec:
  deploymentName: flink-session
  job:
    jarURI: /opt/flink/opt/flink-python-1.20.3.jar
    # entryClass: org.apache.flink.client.python.PythonDriver
    # args:
    #   # 1. Main PyFlink Script
    #   - "--python"   # Changed from -py
    #   - "/opt/flink/usrlib/streaming_test.py"
    #   # 2. Python Archives
    #   - "--pyArchives"   # Changed from -pyarch
    #   - "blob_path#flink_venv"
    #   # 3. Python Executable
    #   - "-pyexec"   # This is correct!
    #   - "flink_venv/bin/python"
    args:
      - "-py"
      - "/opt/flink/usrlib/streaming_test.py"
      - "-pyarch"
      - "blob_path#flink_venv"
      - "-pyexec"
      - "flink_venv/bin/python"
    parallelism: 2
Request for your help if you have faced and tackled a similar issue.
#C065944F9M2 #C03G7LJTS2G #C03GV7L3G2C
Saketh
10/29/2025, 7:09 AM
Mohamed Galal
10/29/2025, 7:19 AM
André Santos
10/29/2025, 7:13 PM
AbstractFlinkService.submitJobToSessionCluster() - it bypasses rest pod leader discovery entirely.
Royston
10/30/2025, 11:43 AM
Iain Dixon
10/30/2025, 1:23 PM
inPoolUsage and outPoolUsage are good metrics with which to assess the presence of backpressure. To test things out I built a really simple setup, as seen in the picture below: records are generated in the generator (via a loop to create the rate and a Thread.sleep at the end to buff out the rest of the second, based on the DS2 wordcount here https://github.com/strymon-system/ds2/blob/master/flink-examples/src/main/java/ch/ethz/systems/strymon/ds2/flink/wordcount/sources/RateControlledSourceFunction.java), sent to a pipeline workload simulator (a single operator which counts the number of received records and runs a Thread.sleep at different frequencies in order to simulate pipeline workload), and finally to a sink where records are received but not saved or sent onwards. I bound the parallelism of each operator to 1 (to create the minimal possible pipeline). The generator produces a constant workload of 1000 records per second, and the workload simulator produces a constant amount of work for every n records.
[image: experimental_setup]
I expected that outPoolUsage would trend up to some value (as roughly 500ms of "work" should be created every second) and remain relatively constant at that value, rather than dipping back down and jumping up as seen in the graph. I'm not sure what mechanism in Flink would be responsible for this behaviour if the workload is constant, and I was wondering if anyone working on Flink could explain what's occurring or point me in the right direction. I'm aware (from the linked blog post above) that the outPoolUsage metric is an aggregation of the floatingBuffersUsage and exclusiveBuffersUsage metrics, so the dropping to 10% would be one of the exclusiveBuffers, but why would floating buffers come and go if the pipeline workload and arrival rates are constant?
[image: buffer_question]
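In case it helps anyone reproduce this, below is a minimal sketch of the kind of rate-controlled generator described above, modeled loosely on the linked DS2 RateControlledSourceFunction (class and record names here are made up): emit a fixed number of records in a loop, then sleep out the remainder of the second.

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class RateControlledSource implements SourceFunction<String> {
    private final int recordsPerSecond;
    private volatile boolean running = true;

    public RateControlledSource(int recordsPerSecond) {
        this.recordsPerSecond = recordsPerSecond;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            long start = System.currentTimeMillis();
            // Emit the per-second batch as fast as possible...
            for (int i = 0; i < recordsPerSecond && running; i++) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect("record-" + i);
                }
            }
            // ...then sleep out ("buff out") whatever is left of the second.
            long elapsed = System.currentTimeMillis() - start;
            if (elapsed < 1000) {
                Thread.sleep(1000 - elapsed);
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}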