Pratik Datta
05/23/2025, 2:34 AM
Yup, in my situation I eventually just forked the existing ES 7.x connector, used the updated version of the underlying REST client to support compatibility mode, and passed the headers to the sinks, which seems to work without any major code changes to the jobs themselves: https://www.linen.dev/s/apache-flink/t/22890175/bumping-this-again-for-visibility-i-ve-been-struggling-with-#a13f3e4c-9544-40db-945c-d056e042eea4
Did you make a PR for this? I am planning to do the same thing. I wish this was officially supported. The Elasticsearch license says that even for the updated version, the client license is still Apache, so it should be OK.
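For anyone who finds this later: with the 7.17 low-level REST client, compatibility mode boils down to sending the compatible-with=7 media-type headers on every request. A minimal sketch of that idea (host, port, and class name are placeholders, not from the thread):

import org.apache.http.HttpHost;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;

public class CompatibleClientFactory {
    public static RestClient create() {
        RestClientBuilder builder =
                RestClient.builder(new HttpHost("es-host", 9200, "https")); // placeholder host
        // Ask an 8.x server to accept and emit 7.x-compatible bodies (REST API compatibility mode).
        builder.setDefaultHeaders(new BasicHeader[] {
                new BasicHeader("Accept", "application/vnd.elasticsearch+json; compatible-with=7"),
                new BasicHeader("Content-Type", "application/vnd.elasticsearch+json; compatible-with=7")
        });
        return builder.build();
    }
}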
Sachin Mittal
05/24/2025, 5:37 AM
com.mongodb.MongoCommandException: Command failed with error 286 (ChangeStreamHistoryLost): 'PlanExecutor error during aggregation :: caused by :: Resume of change stream was not possible, as the resume point may no longer be in the oplog.' on server xxxx.yyy.mongodb.net:27017. The full response is {"errorLabels": ["NonResumableChangeStreamError"], "ok": 0.0, "errmsg": "PlanExecutor error during aggregation :: caused by :: Resume of change stream was not possible, as the resume point may no longer be in the oplog.", "code": 286, "codeName": "ChangeStreamHistoryLost", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1747876810, "i": 30}}, "signature": {"hash": {"$binary": {"base64": "8rIRtcVyp/u8ddzGnA4Z5r1L79A=", "subType": "00"}}, "keyId": 7448674344508588123}}, "operationTime": {"$timestamp": {"t": 1747876810, "i": 30}}}
Now I have instantiated my Source as:
MongoDBSource.<T>builder()
    ...
    .startupOptions(StartupOptions.initial())
    .batchSize(2048)
    .connectionOptions("heartbeat.interval.ms=5000")
    .heartbeatIntervalMillis(5000)
    .closeIdleReaders(true)
    .deserializer(...)
    .build();
As mentioned in the docs, I have added a heartbeat so that the job does not fail for infrequently changed oplogs; however, these settings seem to have no effect, and it still fails the same way it did before I added the heartbeat.
Can anyone tell me what else I could fix here?
Are there any settings missing from the MongoDB side?
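On the MongoDB side, ChangeStreamHistoryLost means the resume point has aged out of the oplog, so the main server-side fix is growing oplog retention. On Atlas (the mongodb.net host above) that is configured in the cluster settings rather than by command; on a self-managed replica set it can be done with replSetResizeOplog. A hedged sketch via the MongoDB Java driver (URI and sizes are illustrative, and the command needs admin privileges):

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import org.bson.Document;

public class OplogResize {
    public static void main(String[] args) {
        // Placeholder URI; connect as a user allowed to run replSetResizeOplog.
        try (MongoClient client = MongoClients.create("mongodb://admin@host:27017")) {
            Document cmd = new Document("replSetResizeOplog", 1)
                    .append("size", 16384)              // target oplog size in MB (illustrative)
                    .append("minRetentionHours", 24.0); // keep at least 24h of history (illustrative)
            Document result = client.getDatabase("admin").runCommand(cmd);
            System.out.println(result.toJson());
        }
    }
}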
Sindhu P R
05/29/2025, 11:23 AM
java.lang.RuntimeException: Missing required TM metrics
at org.apache.flink.autoscaler.RestApiMetricsCollector.queryTmMetrics(RestApiMetricsCollector.java:179)
at org.apache.flink.autoscaler.ScalingMetricCollector.updateMetrics(ScalingMetricCollector.java:137)
at org.apache.flink.autoscaler.JobAutoScalerImpl.runScalingLogic(JobAutoScalerImpl.java:183)
at org.apache.flink.autoscaler.JobAutoScalerImpl.scale(JobAutoScalerImpl.java:103)
at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.applyAutoscaler(AbstractFlinkResourceReconciler.java:219)
at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:142)
at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:155)
at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:62)
at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:153)
at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:111)
at org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
at io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:110)
at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:136)
at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:117)
at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:91)
at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:64)
at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:452)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
I have used the following flinkConfiguration:
job.autoscaler.metrics.window: 3m
taskmanager.memory.jvm-metaspace.size: 256 mb
metrics.system-resource: 'true'
pipeline.max-parallelism: '24'
taskmanager.network.detailed-metrics: 'true'
job.autoscaler.target.utilization.boundary: '0.1'
job.autoscaler.catch-up.duration: 5m
job.autoscaler.restart.time: 2m
job.autoscaler.scaling.enabled: 'true'
job.autoscaler.stabilization.interval: 1m
job.autoscaler.enabled: 'true'
jobmanager.scheduler: adaptive
How do I make the metrics available to the operator?
Mariusz Ołownia
05/30/2025, 5:58 AM
SELECT TRANSFORM(arr, x -> ROW(x.a + 1, x.b * 2)) AS new_arr
FROM my_table;
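Flink SQL has no Spark-style higher-order TRANSFORM(arr, x -> ...) with a lambda, so the per-element mapping usually moves into a scalar UDF. A minimal sketch for a flat ARRAY<BIGINT> (the ROW(x.a + 1, x.b * 2) case would additionally need @DataTypeHint annotations on a Row[] signature; names here are illustrative):

import org.apache.flink.table.functions.ScalarFunction;

public class TransformArray extends ScalarFunction {
    // Maps ARRAY<BIGINT> -> ARRAY<BIGINT>, applying x -> x + 1 to each element.
    public Long[] eval(Long[] arr) {
        if (arr == null) return null;
        Long[] out = new Long[arr.length];
        for (int i = 0; i < arr.length; i++) {
            out[i] = arr[i] == null ? null : arr[i] + 1;
        }
        return out;
    }
}

// Registration and use (tEnv is an existing TableEnvironment):
//   tEnv.createTemporarySystemFunction("TRANSFORM_ARR", TransformArray.class);
//   tEnv.executeSql("SELECT TRANSFORM_ARR(arr) AS new_arr FROM my_table");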
Monika Bednarz
06/02/2025, 8:16 AM
We are trying to consume an Avro topic whose schema contains an enum field, and to map that field to STRING or anything else in Flink SQL.
We can declare the source, but when it tries consuming, it fails with: Found org.example.MyEnumType, expecting union
We don't have the option to change the producer schema to make it nullable.
If you know of a working trick or can point me to a good place to ask, I'd be grateful 🙏
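One workaround for enum fields the avro-confluent SQL format won't map: read the topic as GenericRecord in the DataStream API and convert the enum symbol to a String yourself before handing rows to SQL. A hedged sketch with placeholder broker, topic, registry URL, and reader schema; whether it sidesteps the union error depends on the real writer schema:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnumTopicReader {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical reader schema: a record with one enum field; must match the writer's layout.
        Schema reader = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Evt\",\"namespace\":\"org.example\",\"fields\":["
                        + "{\"name\":\"my_enum_field\",\"type\":{\"type\":\"enum\","
                        + "\"name\":\"MyEnumType\",\"symbols\":[\"A\",\"B\"]}}]}");

        KafkaSource<GenericRecord> source = KafkaSource.<GenericRecord>builder()
                .setBootstrapServers("broker:9092")                      // placeholder
                .setTopics("source__topic")
                .setValueOnlyDeserializer(
                        ConfluentRegistryAvroDeserializationSchema.forGeneric(
                                reader, "http://schema-registry:8081"))  // placeholder URL
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "enum-topic")
                .map(rec -> rec.get("my_enum_field").toString())         // enum symbol -> String
                .returns(Types.STRING)
                .print();

        env.execute("enum-to-string");
    }
}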
Sandip Nayak
06/03/2025, 6:59 AM
Has anyone run kubernetes operator v1.11 with Flink 2.0.0 successfully? I am getting an error:
Could not parse command line arguments [<some IP>, --pyClientExecutable, /usr/local/bin/python, -py, /opt/flink/usrlib/app.py, --jarfile, /opt/flink/usrlib/dependencies-1.0.0.jar]
The same config works fine for kubernetes operator v1.9 with Flink 1.18. These are the args I am passing:
args: ["--pyClientExecutable", "/usr/local/bin/python",
"--py", "/opt/flink/usrlib/app.py",
"--jarfile", "/opt/flink/usrlib/dependencies-1.0.0.jar"]
Jacob Jona Fahlenkamp
06/03/2025, 7:29 PM
2025-06-03 21:27:21
java.lang.NullPointerException: Initial Segment may not be null
at org.apache.flink.runtime.memory.AbstractPagedOutputView.<init>(AbstractPagedOutputView.java:67)
at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:46)
at org.apache.flink.table.runtime.util.collections.binary.AbstractBytesHashMap$RecordArea.<init>(AbstractBytesHashMap.java:268)
at org.apache.flink.table.runtime.util.collections.binary.AbstractBytesHashMap.<init>(AbstractBytesHashMap.java:107)
at org.apache.flink.table.runtime.util.collections.binary.AbstractBytesHashMap.<init>(AbstractBytesHashMap.java:96)
at org.apache.flink.table.runtime.util.collections.binary.BytesHashMap.<init>(BytesHashMap.java:43)
at LocalHashAggregateWithKeys$35.open(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Unknown Source)
Suppressed: java.lang.NullPointerException
at LocalHashAggregateWithKeys$35.close(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125)
at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1181)
at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255)
at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72)
at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:1085)
at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:952)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:952)
... 3 more
Rushikesh Gulve
06/04/2025, 8:12 AM
2025-06-04 07:28:49,048 WARN org.apache.pekko.remote.ReliableDeliverySupervisor [] - Association with remote system [pekko.tcp://flink@192.168.164.24:6122] has failed, address is now gated for [50] ms. Reason: [Association failed with [pekko.tcp://flink@192.168.164.24:6122]] Caused by: [org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /192.168.164.24:6122, caused by: java.net.ConnectException: Connection refused]
I am not able to understand the cause of the issue. I am also sharing my deployment file:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: data-processing-flink
  namespace: flink
spec:
  image: 624083781302.dkr.ecr.ap-south-1.amazonaws.com/product/data-streaming-flink:latest
  flinkVersion: "v1_20"
  serviceAccount: flink
  flinkConfiguration:
    rest.port: "8081"
    jobmanager.rpc.address: "data-processing-flink"
    jobmanager.rpc.port: "6123"
    taskmanager.numberOfTaskSlots: "2"
  podTemplate:
    spec:
      imagePullSecrets:
        - name: ecrscr-credentials
      containers:
        - name: flink-main-container
          imagePullPolicy: Always
          envFrom:
            - secretRef:
                name: redis-secret-main
          env:
            - name: KAFKA_BROKER
              value: "kafka.flink.svc.cluster.local:9092"
            - name: BOOTSTRAP_SERVERS
              value: "kafka.flink.svc.cluster.local:9092"
            - name: MINIO_SERVER
              value: "http://myminio-hl.flink.svc.cluster.local:9000"
            - name: MINIO_USER
              value: "minio"
            - name: MINIO_PASSWORD
              value: "minio123"
      securityContext:
        runAsUser: 9999
        runAsGroup: 9999
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "4096m"
      cpu: 2
    replicas: 10
  job:
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/bin/python3", "-py", "/opt/flink/usrlib/data_processing_v9/stream_processing/process_dependency/create_env.py"]
    upgradeMode: stateless
Can anyone suggest possible causes? Any help would be greatly appreciated.
Rushikesh Gulve
06/06/2025, 9:12 AM
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class FlinkKeyGroupMapper {

    public static void main(String[] args) {
        final int maxParallelism = 128; // Flink maxParallelism
        final int parallelism = 10;     // number of subtasks
        final int keysNeeded = parallelism;

        // Map: subtask index -> key found
        java.util.Map<Integer, String> subtaskToKey = new java.util.HashMap<>();
        int i = 100;
        int configId = 20003;

        while (subtaskToKey.size() < keysNeeded) {
            String key = configId + "_" + i;
            // Compute the key group for this key (Flink applies murmur hash internally)
            int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
            // Find which operator subtask the key group maps to
            int subtaskIndex = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
                    maxParallelism, parallelism, keyGroup);
            // If this subtask index has no key yet, assign it
            if (!subtaskToKey.containsKey(subtaskIndex)) {
                subtaskToKey.put(subtaskIndex, key);
                System.out.println("Subtask " + subtaskIndex + " -> Key: " + key);
            }
            i++;
        }
    }
}
This is how I have generated the keys, and my key selector in Flink also uses the same key. Can anyone guide me on how to determine which parallel subtask instance my current key will be routed to? Thanks
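For the routing question: Flink ships the whole lookup as one static call, so the loop above can be cross-checked directly. A minimal sketch using the same maxParallelism/parallelism values (note the key passed in must equal what the KeySelector returns, since its hashCode feeds the murmur hash):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyRouting {
    public static void main(String[] args) {
        String key = "20003_100";
        // hash -> key group -> operator subtask, exactly what keyBy() does internally
        int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, 128, 10);
        System.out.println(key + " -> subtask " + subtask);
    }
}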
Sandip Nayak
06/08/2025, 10:32 PM
Has anyone tried Disaggregated State Management with the ForSt state backend on Flink 2.0? Even with the following config, I see full checkpoints being downloaded into the taskmanager pod while the app restores from state. Do you know what I might be missing?
state.backend.type: "forst"
table.exec.async-state.enabled: "true"
execution.checkpointing.incremental: "true"
table.exec.mini-batch.enabled: "false"
table.optimizer.agg-phase-strategy: "ONE_PHASE"
Monika Bednarz
06/11/2025, 9:29 AM
CREATE TABLE source__topic ( ... ) WITH (
'connector' = 'kafka',
'value.format' = 'avro-confluent',
...
'properties.security.protocol' = 'SASL_SSL',
'properties.sasl.mechanism' = 'SCRAM-SHA-512',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="..." password="...";');
We want to get rid of the explicit config passing.
(If there's some form of authentication that doesn't require credentials in cleartext, please point me to it 🙏)
MohammadReza Shahmorady
06/12/2025, 1:17 AM
My producer is configured with:
linger.ms=500
batch.size=1048576
compression.type=lz4
However, when there's significant backpressure, the producer doesn't start sending data as expected and takes a very long time.
When I remove the batch.size configuration, performance improves slightly, but it's still not ideal.
I've attached a screenshot with more details in the thread.
Does anyone have any suggestions on how to resolve this?
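One possibility worth ruling out: with batch.size=1048576 the producer can hold a full 1 MB batch per partition, so a few dozen backlogged partitions can exhaust the default 32 MB buffer.memory, at which point send() blocks for up to max.block.ms. A hedged sketch of producer overrides for a Flink KafkaSink (values are illustrative starting points, not tuned recommendations):

import java.util.Properties;

public class ProducerTuning {
    // Producer overrides; pass via KafkaSink.builder().setKafkaProducerConfig(tuned())
    public static Properties tuned() {
        Properties props = new Properties();
        props.setProperty("linger.ms", "500");
        props.setProperty("compression.type", "lz4");
        props.setProperty("batch.size", "262144");       // smaller batches flush sooner under load
        props.setProperty("buffer.memory", "134217728"); // room for many in-flight batches
        props.setProperty("max.block.ms", "60000");      // bound how long send() may block
        return props;
    }
}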