# troubleshooting

    Pratik Datta

    05/23/2025, 2:34 AM
    Is the Elasticsearch 8 connector released? I found https://issues.apache.org/jira/browse/FLINK-26088, which was merged, but it doesn't include the Table API connector. I found the Table API Dynamic Table Sink connector in a separate branch in https://github.com/apache/flink-connector-elasticsearch/pull/53#issuecomment-1862265394, but that was never merged. @Rion Williams I found a message from you saying:
    "Yup, in my situation I eventually just forked the existing ES 7.x connector and used the updated version of the underlying REST client to support compatibility mode and passed the headers to the sinks which seems to work without any major code changes to the jobs themselves."
    (https://www.linen.dev/s/apache-flink/t/22890175/bumping-this-again-for-visibility-i-ve-been-struggling-with-#a13f3e4c-9544-40db-945c-d056e042eea4) Did you make a PR for this? I am planning to do the same thing. I wish this was officially supported. The Elasticsearch license says that even for the updated version, the client license is still Apache, so it should be OK.
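    In case it helps anyone going down the same forked-ES7-connector route described above: the compatibility-mode part usually boils down to setting default headers on the low-level Elasticsearch REST client. A minimal sketch of that piece only (not the official connector API; the class and method names here are my own):
    ```
    import org.apache.http.Header;
    import org.apache.http.HttpHost;
    import org.apache.http.message.BasicHeader;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestClientBuilder;

    public class Es8CompatibleClientFactory {

        // Builds a low-level REST client that asks an Elasticsearch 8 cluster to behave
        // in 7.x compatibility mode, which is what a forked ES 7 connector needs.
        public static RestClientBuilder compatibilityBuilder(HttpHost... hosts) {
            Header[] compatibilityHeaders = new Header[] {
                new BasicHeader("Accept", "application/vnd.elasticsearch+json; compatible-with=7"),
                new BasicHeader("Content-Type", "application/vnd.elasticsearch+json; compatible-with=7")
            };
            return RestClient.builder(hosts).setDefaultHeaders(compatibilityHeaders);
        }
    }
    ```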

    Sachin Mittal

    05/24/2025, 5:37 AM
    Hello folks, I have a DataStream application that pulls data from MongoDB using CDC, and after the process runs for a few days it fails with the following stack trace:
    ```
    com.mongodb.MongoCommandException: Command failed with error 286 (ChangeStreamHistoryLost): 'PlanExecutor error during aggregation :: caused by :: Resume of change stream was not possible, as the resume point may no longer be in the oplog.' on server xxxx.yyy.mongodb.net:27017. The full response is {"errorLabels": ["NonResumableChangeStreamError"], "ok": 0.0, "errmsg": "PlanExecutor error during aggregation :: caused by :: Resume of change stream was not possible, as the resume point may no longer be in the oplog.", "code": 286, "codeName": "ChangeStreamHistoryLost", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1747876810, "i": 30}}, "signature": {"hash": {"$binary": {"base64": "8rIRtcVyp/u8ddzGnA4Z5r1L79A=", "subType": "00"}}, "keyId": 7448674344508588123}}, "operationTime": {"$timestamp": {"t": 1747876810, "i": 30}}}
    ```
    Now I have instantiated my Source as:
    ```
    MongoDBSource.<T>builder()
        ...
        .startupOptions(StartupOptions.initial())
        .batchSize(2048)
        .connectionOptions("heartbeat.interval.ms=5000")
        .heartbeatIntervalMillis(5000)
        .closeIdleReaders(true)
        .deserializer(...)
        .build();
    ```
    As mentioned in the docs, I added the heartbeat so that the job does not fail on infrequently changed oplogs; however, these settings seem to have no effect, and it still fails the same way it did without the heartbeat. Can anyone tell me what else I could fix here? Are there any settings missing on the MongoDB side?

    Fabrizzio Chavez

    05/26/2025, 4:43 AM
    Hi, why are some connectors abandoned? For example, there are many contributions to the Flink Pulsar connector and the RabbitMQ connector, but nobody is approving and merging the PRs, or at least rejecting them with a valid argument; there are no answers in the GitHub threads and no answers on the dev mailing list. So it looks like there is no value in contributing and spending time on those connectors.

    Олег Спица

    05/28/2025, 10:23 AM
    Hi folks, I'm migrating to the Apache Flink Operator with FlinkDeployments. Everything works fine when all jars are located in the /lib folder of the Docker image (I'm not using a fat jar), but when I move the job's jar and its dependency jars into /usrlib it doesn't work, because the job's jar can't be found. The old deployment approach (a Helm chart with a Deployment) works fine. Do I need to tell the operator (or the Flink config) somehow that the jars are split between the /lib and /usrlib folders? Any suggestions, please. In the logs I see that the classpath contains only jars from /lib and none from /usrlib.
    Classpath: /opt/flink/lib/flink-cep-1.19.1.jar:/opt/flink/lib/flink-connector-files-1.19.1.jar:/opt/flink/lib/flink-csv-1.19.1.jar:/opt/flink/lib/flink-json-1.19.1.jar:/opt/flink/lib/flink-scala_2.12-1.19.1.jar:/opt/flink/lib/flink-table-api-java-uber-1.19.1.jar:/opt/flink/lib/flink-table-planner-loader-1.19.1.jar:/opt/flink/lib/flink-table-runtime-1.19.1.jar:/opt/flink/lib/jackson-annotations-2.15.3.jar:/opt/flink/lib/jackson-core-2.15.3.jar:/opt/flink/lib/jackson-core-asl-1.9.2.jar:/opt/flink/lib/jackson-databind-2.15.3.jar:/opt/flink/lib/log4j-1.2-api-2.17.1.jar:/opt/flink/lib/log4j-api-2.17.1.jar:/opt/flink/lib/logback-classic-1.2.11.jar:/opt/flink/lib/logback-core-1.2.11.jar:/opt/flink/lib/logstash-logback-encoder-5.3.jar:/opt/flink/lib/flink-dist-1.19.1.jar:/etc/hadoop/conf
    Flink version: 1.19.1. Apache Flink Operator version: 1.11.0. Application & Standalone Mode.

    Daniel Maciel

    05/28/2025, 4:26 PM
    Hey, do you have any idea if/when the Apache Pulsar connector for Flink 2.0 will be released? Do you know the impact on performance/latency of using the Kafka connector with Pulsar (through KoP)?

    Antonio Davide Cali

    05/29/2025, 10:14 AM
    Hello folks, I have a question. Is there any mechanism on a dynamic table to trigger a reset of its state? For example, during a stream I'm doing some sort of aggregation, but I want to trigger a reset of that aggregation's state because of some re-seeding of data on the stream.
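    As far as I know, the Table API doesn't expose a direct "reset this state" hook; a common workaround one level down, in the DataStream API, is to connect the data stream with a small control stream and clear the keyed state when a reset signal arrives. A minimal sketch, with made-up class and field names:
    ```
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
    import org.apache.flink.util.Collector;

    // Hypothetical running-sum aggregation that can be reset per key by a control message.
    public class ResettableSumFunction extends KeyedCoProcessFunction<String, Long, String, Long> {

        private transient ValueState<Long> sum;

        private ValueState<Long> sumState() {
            // Lazily fetch the state handle so the sketch works across Flink versions
            // without depending on a particular open(...) signature.
            if (sum == null) {
                sum = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Long.class));
            }
            return sum;
        }

        @Override
        public void processElement1(Long value, Context ctx, Collector<Long> out) throws Exception {
            Long current = sumState().value();
            long updated = (current == null ? 0L : current) + value;
            sumState().update(updated);
            out.collect(updated);
        }

        @Override
        public void processElement2(String resetSignal, Context ctx, Collector<Long> out) throws Exception {
            // A control element for this key wipes the aggregate before re-seeded data arrives.
            sumState().clear();
        }
    }
    ```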

    Sindhu P R

    05/29/2025, 11:23 AM
    I am trying to implement the Flink autoscaler in the Flink Kubernetes operator. I am seeing the following error in the operator logs:
    ```
    java.lang.RuntimeException: Missing required TM metrics
    at org.apache.flink.autoscaler.RestApiMetricsCollector.queryTmMetrics(RestApiMetricsCollector.java:179)
    at org.apache.flink.autoscaler.ScalingMetricCollector.updateMetrics(ScalingMetricCollector.java:137)
    at org.apache.flink.autoscaler.JobAutoScalerImpl.runScalingLogic(JobAutoScalerImpl.java:183)
    at org.apache.flink.autoscaler.JobAutoScalerImpl.scale(JobAutoScalerImpl.java:103)
    at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.applyAutoscaler(AbstractFlinkResourceReconciler.java:219)
    at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:142)
    at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:155)
    at org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.reconcile(FlinkDeploymentController.java:62)
    at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:153)
    at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:111)
    at org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
    at io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:110)
    at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:136)
    at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:117)
    at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:91)
    at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:64)
    at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:452)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
    ```
    I have used the following flinkConfiguration:
    ```
    job.autoscaler.metrics.window: 3m
    taskmanager.memory.jvm-metaspace.size: 256 mb
    metrics.system-resource: 'true'
    pipeline.max-parallelism: '24'
    taskmanager.network.detailed-metrics: 'true'
    job.autoscaler.target.utilization.boundary: '0.1'
    job.autoscaler.catch-up.duration: 5m
    job.autoscaler.restart.time: 2m
    job.autoscaler.scaling.enabled: 'true'
    job.autoscaler.stabilization.interval: 1m
    job.autoscaler.enabled: 'true'
    jobmanager.scheduler: adaptive
    ```
    How do I make the metrics available to the operator?

    Younes Naguib

    05/29/2025, 6:10 PM
    Hi, I'm using Flink on K8s with the Flink Operator, and I want to set up rack awareness for Kafka by setting TM_NODE_AZ. I have that value at the node level, and I'm looking for a good way to get it down to the pod level. Other than running an init script, any ideas? Thanks

    Mariusz Ołownia

    05/30/2025, 5:58 AM
    Hi, I have a question. Is it possible to map over an array of rows in Flink SQL? For example, something like:
    ```
    SELECT TRANSFORM(arr, x -> ROW(x.a + 1, x.b * 2)) AS new_arr
    FROM my_table;
    ```
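    As far as I know, Flink SQL has no built-in higher-order TRANSFORM with a lambda, so the usual workarounds are UNNEST plus re-aggregation, or a scalar UDF that maps over the array. A rough sketch of such a UDF, assuming the row fields are a and b as in the example (the type hints and bridging classes may need adjusting to your exact schema):
    ```
    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.annotation.FunctionHint;
    import org.apache.flink.table.functions.ScalarFunction;
    import org.apache.flink.types.Row;

    // Maps ROW(a, b) -> ROW(a + 1, b * 2) over every element of the array,
    // mirroring the TRANSFORM example above.
    @FunctionHint(
            input = @DataTypeHint("ARRAY<ROW<a INT, b INT>>"),
            output = @DataTypeHint("ARRAY<ROW<a INT, b INT>>"))
    public class TransformRows extends ScalarFunction {

        public Row[] eval(Row[] arr) {
            if (arr == null) {
                return null;
            }
            Row[] result = new Row[arr.length];
            for (int i = 0; i < arr.length; i++) {
                Integer a = (Integer) arr[i].getField(0);
                Integer b = (Integer) arr[i].getField(1);
                result[i] = Row.of(a == null ? null : a + 1, b == null ? null : b * 2);
            }
            return result;
        }
    }
    ```
    It could then be registered with tableEnv.createTemporarySystemFunction("TRANSFORM_ROWS", TransformRows.class) and used as SELECT TRANSFORM_ROWS(arr) AS new_arr FROM my_table.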

    hajime ni

    05/30/2025, 9:01 AM
    Hi, I have a question regarding the KinesisStreamsSource. I'm currently working on upgrading an application from Flink 1.18.1 to Flink 1.20, using flink-connector-aws with an AWS Kinesis Data Stream as the source. I couldn't find the SHARD_GETRECORDS_INTERVAL_MILLIS configuration for the new KinesisStreamsSource within flink-connector-aws-kinesis-streams for Flink 1.20. Was this intentional, and if so, what is the recommended way to manage the GetRecords call frequency with the new KinesisStreamsSource?
    According to the AWS documentation, the GetRecords API has a call limit of 5 TPS per shard. The 200 ms interval (1000 ms / 5 TPS = 200 ms) configurable in FlinkKinesisConsumer seemed appropriate to respect this limit. In fact, when testing with KinesisStreamsSource, I've observed through AWS CloudWatch metrics that approximately 10-20% of the GetRecords API calls are failing. I suspect this might be due to hitting the API limits, as there's no explicit interval control.
    Thanks

    Phước Hồ

    05/30/2025, 10:40 AM
    Hi, has anyone here successfully connected to the Flink SQL Gateway behind an HTTPS API gateway using flink-sql-jdbc-driver? It always connects to the HTTP port, not the HTTPS port.

    Zackary Naas

    05/30/2025, 2:57 PM
    Hi! We are using Flink 1.19.2, and we are seeing an interesting issue. Hoping someone may have experience with something similar:
    • Kafka sources, consuming data from multiple topics in the same upstream Kafka cluster
    • joining those sources together in Flink using regular inner and left outer joins (not temporal or anything else; we need all state from all time for each upstream source)
    • using 64 parallelism, 8 slots
    We are seeing that occasionally one of the consumer operators, which reads from the Kafka topic, seems to hit some sort of bug where it consumes endlessly and never actually progresses. The upstream Kafka topic has a total of 16M records, but the source operator will show that it sent 170M records through to the next step. There's no logic in this operator; it reads from the source and selects columns, and that is it. We are using avro-debezium and upsert-kafka for this source. Most of our other jobs like this just work, but there are a handful that get stuck in this state, and they basically show a static value for pending records and for remaining Kafka records via metrics reporting, meaning it looks like they are never making progress. Any ideas? Anyone faced this before? Thanks y'all!

    Michael LeGore

    05/30/2025, 10:44 PM
    Is there a way to get timers to fire correctly in BATCH mode? I want them to fire as if they were happening in streaming mode, at the event time they were set. I was hoping that DataStream V2 would allow this, but it seems like it still does not. The use case we have for BATCH mode is to simulate what would have happened in streaming mode, to produce historical ML feature values, and the differences between streaming and batch make that less viable.

    MohammadReza Shahmorady

    05/31/2025, 2:28 PM
    I'm trying to add a field to my state, but when I run my job with the new version and pass the last savepoint path with Allow Non Restored State, I get this error:
    com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index 50 out of bounds for length 2
    Serialization trace:
    productId (models.Quantity)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:394)
    at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:88)
    at org.apache.flink.runtime.state.metrics.LatencyTrackingValueState.lambda$value$0(LatencyTrackingValueState.java:60)
    at org.apache.flink.runtime.state.metrics.AbstractLatencyTrackState.trackLatencyWithIOException(AbstractLatencyTrackState.java:112)
    at org.apache.flink.runtime.state.metrics.LatencyTrackingValueState.value(LatencyTrackingValueState.java:59)
    at ir.okala.flink.operators.QuantityPriceAggregatorFunction.processElement1(QuantityPriceAggregatorFunction.java:49)
    at ir.okala.flink.operators.QuantityPriceAggregatorFunction.processElement1(QuantityPriceAggregatorFunction.java:19)
    at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:78)
    at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor1$1(RecordProcessorUtils.java:95)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:254)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.base/java.lang.Thread.run(Unknown Source)
    Caused by: java.lang.IndexOutOfBoundsException: Index 50 out of bounds for length 2
    at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
    at java.base/java.util.Objects.checkIndex(Unknown Source)
    at java.base/java.util.ArrayList.get(Unknown Source)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
    ... 25 more
    productId is a String. What should I do?
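    For context, this IndexOutOfBoundsException on restore is the classic symptom of Kryo-serialized state whose class gained a field: Kryo has no schema evolution, so the old bytes no longer line up with the new class layout. One hedged way to avoid this going forward is to make the state class a Flink POJO so the PojoSerializer (which does support adding and removing fields) is used instead of Kryo; note that state already written with Kryo can't simply be re-read by the POJO serializer, so a one-off migration (e.g. with the State Processor API) or a fresh start would still be needed. A sketch of a POJO-style model with made-up fields:
    ```
    import java.io.Serializable;

    // A POJO in Flink's sense: public no-argument constructor plus public fields
    // (or getters/setters), so Flink picks its PojoSerializer, which tolerates
    // added and removed fields across savepoints, unlike Kryo.
    public class Quantity implements Serializable {

        public String productId;
        public long amount;

        // Field added in the new job version; the PojoSerializer would restore old
        // records and leave this as null.
        public String unit;

        public Quantity() {}
    }
    ```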

    מייקי בר יעקב

    06/01/2025, 10:25 PM
    Does Flink 2.0 include built-in end-to-end traces?

    Jonathan Du

    06/02/2025, 12:50 AM
    Hey folks, I've run into what I think is a race-condition bug 🪲 with the new KinesisStreamsSource with EFO enabled. I'll share some details in the thread 🧵. Keen to hear if anyone has experience with this issue before I raise a bug ticket.

    Monika Bednarz

    06/02/2025, 8:16 AM
    Hi Team 🙂 We have a problem with non-nullable enums in an Avro schema of a Kafka topic. I hope some of you have faced it and/or have a workaround. The issue is detailed in FLINK-24544. Basically, non-null enums can't be consumed using the Avro connector, no matter whether you declare them as STRING or anything else in Flink SQL. We can declare the source, but when it tries consuming, it fails with "Found org.example.MyEnumType, expecting union". We don't have the option to change the producer schema to make it nullable. If you know of a working trick or can point me to a good place to ask, I'd be grateful 🙏
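    One workaround, if dropping to the DataStream API for this particular source is acceptable, is to bypass the Table avro format and deserialize with the exact writer schema, reading the enum as a string before handing the data to SQL. A sketch with made-up broker, topic, schema file and field names:
    ```
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.formats.avro.AvroDeserializationSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class NonNullableEnumWorkaround {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // The exact writer schema, including the non-nullable enum (assumption: shipped as a resource).
            Schema writerSchema = new Schema.Parser().parse(
                    NonNullableEnumWorkaround.class.getResourceAsStream("/my_record.avsc"));

            KafkaSource<GenericRecord> source = KafkaSource.<GenericRecord>builder()
                    .setBootstrapServers("broker:9092")                 // placeholder
                    .setTopics("my-topic")                              // placeholder
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setValueOnlyDeserializer(AvroDeserializationSchema.forGeneric(writerSchema))
                    .build();

            // Read the enum symbol as a plain string; the rest of the record can be mapped similarly
            // (or the stream can be converted to a Table with the types you need).
            DataStream<String> enumAsString = env
                    .fromSource(source, WatermarkStrategy.noWatermarks(), "kafka")
                    .map(record -> record.get("myEnumField").toString()) // assumption: field name
                    .returns(Types.STRING);

            enumAsString.print();
            env.execute("avro enum workaround");
        }
    }
    ```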

    Sandip Nayak

    06/03/2025, 6:59 AM
    Hi All, just looking for some information: has anyone ever used Kubernetes operator v1.11 with Flink 2.0.0 successfully? I am getting an error:
    ```
    Could not parse command line arguments [<some IP>, --pyClientExecutable, /usr/local/bin/python, -py, /opt/flink/usrlib/app.py, --jarfile, /opt/flink/usrlib/dependencies-1.0.0.jar]
    ```
    The same config works fine for Kubernetes operator v1.9 with Flink 1.18. These are the args I am passing:
    ```
    args: ["--pyClientExecutable", "/usr/local/bin/python",
           "--py", "/opt/flink/usrlib/app.py",
           "--jarfile", "/opt/flink/usrlib/dependencies-1.0.0.jar"]
    ```

    Jacob Jona Fahlenkamp

    06/03/2025, 7:29 PM
    Hello, my batch job is in trouble:
    ```
    2025-06-03 21:27:21
    java.lang.NullPointerException: Initial Segment may not be null
    	at org.apache.flink.runtime.memory.AbstractPagedOutputView.<init>(AbstractPagedOutputView.java:67)
    	at org.apache.flink.runtime.io.disk.SimpleCollectingOutputView.<init>(SimpleCollectingOutputView.java:46)
    	at org.apache.flink.table.runtime.util.collections.binary.AbstractBytesHashMap$RecordArea.<init>(AbstractBytesHashMap.java:268)
    	at org.apache.flink.table.runtime.util.collections.binary.AbstractBytesHashMap.<init>(AbstractBytesHashMap.java:107)
    	at org.apache.flink.table.runtime.util.collections.binary.AbstractBytesHashMap.<init>(AbstractBytesHashMap.java:96)
    	at org.apache.flink.table.runtime.util.collections.binary.BytesHashMap.<init>(BytesHashMap.java:43)
    	at LocalHashAggregateWithKeys$35.open(Unknown Source)
    	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
    	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
    	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
    	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    	at java.base/java.lang.Thread.run(Unknown Source)
    	Suppressed: java.lang.NullPointerException
    		at LocalHashAggregateWithKeys$35.close(Unknown Source)
    		at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163)
    		at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125)
    		at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1181)
    		at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255)
    		at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72)
    		at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
    		at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:1085)
    		at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:952)
    		at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
    		at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:952)
    		... 3 more
    ```

    Rushikesh Gulve

    06/04/2025, 8:12 AM
    Hi All, I am trying to deploy my PyFlink application on Kubernetes. The process works fine with 1 Task Manager (2 CPU cores and 4 GB memory). Now I increase the number of Task Managers to 10 with the same configuration for each. Initially it works fine, but soon after increasing the TMs the job fails. The errors in the Job Manager logs are too long to paste here; I think the root cause is:
    ```
    2025-06-04 07:28:49,048 WARN  org.apache.pekko.remote.ReliableDeliverySupervisor           [] - Association with remote system [pekko.tcp://flink@192.168.164.24:6122] has failed, address is now gated for [50] ms. Reason: [Association failed with [pekko.tcp://flink@192.168.164.24:6122]] Caused by: [org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /192.168.164.24:6122, caused by: java.net.ConnectException: Connection refused]
    ```
    I am not able to understand the cause of the issue. I am also sharing my deployment file:
    ```
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: data-processing-flink
      namespace: flink
    spec:
      image: 624083781302.dkr.ecr.ap-south-1.amazonaws.com/product/data-streaming-flink:latest
      flinkVersion: "v1_20"
      serviceAccount: flink
      flinkConfiguration:
        rest.port: "8081"
        jobmanager.rpc.address: "data-processing-flink"
        jobmanager.rpc.port: "6123"
        taskmanager.numberOfTaskSlots: "2"
      podTemplate:
        spec:
          imagePullSecrets:
            - name: ecrscr-credentials
          containers:
            - name: flink-main-container
              imagePullPolicy: Always
              envFrom:
                - secretRef:
                    name: redis-secret-main
              env:
                - name: KAFKA_BROKER
                  value: "kafka.flink.svc.cluster.local:9092"
                - name: BOOTSTRAP_SERVERS
                  value: "kafka.flink.svc.cluster.local:9092"
                - name: MINIO_SERVER
                  value: "http://myminio-hl.flink.svc.cluster.local:9000"
                - name: MINIO_USER
                  value: "minio"
                - name: MINIO_PASSWORD
                  value: "minio123"
          securityContext:
            runAsUser: 9999
            runAsGroup: 9999
      jobManager:
        resource:
          memory: "2048m"
          cpu: 1
      taskManager:
        resource:
          memory: "4096m"
          cpu: 2
        replicas: 10
      job:
        entryClass: "org.apache.flink.client.python.PythonDriver"
        args: ["-pyclientexec", "/usr/bin/python3", "-py", "/opt/flink/usrlib/data_processing_v9/stream_processing/process_dependency/create_env.py"]
        upgradeMode: stateless
    ```
    Can anyone suggest possible causes? Any help would be greatly appreciated.

    nick christidis

    06/04/2025, 1:42 PM
    Hello everyone, I'm considering forking and modifying Flink's KafkaSource so that it can, if needed, read a starting offset for each topic partition from an external state store and override the offset that would normally be chosen based on checkpointing. In other words, I want to force KafkaSource to start from specific offsets I provide, even though checkpoints usually dictate where the source resumes.
    • Is it feasible to achieve this by customizing components like KafkaSourceEnumerator, KafkaSourceReader, or KafkaSourceSplitReader?
    • Or do Flink's source mechanics make this approach impossible, regardless of how much the KafkaSource code is modified?
    Any insights on whether this kind of override is supported (or fundamentally blocked) by the current KafkaSource architecture would be greatly appreciated.
    UPDATE: I do not want to use startingOffsets, as I want to achieve this at runtime, not through DAG recreation (job resubmission, restart, etc.).
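    For reference (a hedged note): pinning explicit per-partition offsets is already possible without a fork via OffsetsInitializer.offsets(...), but it is only honored when the source starts without restored split state; after recovery the splits come from the checkpoint, so a true runtime override would indeed mean customizing the enumerator/reader as you describe. A sketch of the non-forked baseline, with placeholder broker, topic and group:
    ```
    import java.util.Map;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.kafka.common.TopicPartition;

    public class PinnedOffsetsExample {

        // offsetsFromExternalStore is assumed to be filled by your own lookup code
        // before the job is submitted.
        public static KafkaSource<String> build(Map<TopicPartition, Long> offsetsFromExternalStore) {
            return KafkaSource.<String>builder()
                    .setBootstrapServers("broker:9092")   // placeholder
                    .setTopics("my-topic")                // placeholder
                    .setGroupId("my-group")               // placeholder
                    .setStartingOffsets(OffsetsInitializer.offsets(offsetsFromExternalStore))
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();
        }
    }
    ```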

    Anshum Verma

    06/04/2025, 4:26 PM
    I have been trying to use Flink ML in streaming mode, but I don't know if that's the right choice. The documentation does not explain how to enable streaming mode for Flink ML. Has anyone here done this, or can you recommend an alternate approach? I have tried agglomerative clustering using this, but the pipeline never executes the sink task and eventually fails with a Java heap space error.

    Dheeraj Panangat

    06/05/2025, 6:37 AM
    Hi Team, I have a custom logger and want to initialise it at the beginning of the JM and TM. How can I call the CustomLogger.initialise() method from the docker-entrypoint.sh code? I am using flink-kubernetes-operator. Appreciate any inputs. This is subsequent to another issue we faced implementing logback.xml, as mentioned here. Thanks.

    Rushikesh Gulve

    06/06/2025, 9:12 AM
    Hi All, I am working on a Flink project and I want to maximize utilization of resources. Currently I have 10 Task Managers in my cluster, each with 1 task slot. I am producing data with 10 distinct keys and I expect all 10 Task Managers to be used, but I end up using only 6 of them. How do I maximize utilization? I have tried the MurmurHash-based key-group code below to find keys that map to different parallel instances of a subtask, but nothing seems to work.
    ```
    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
    
    public class FlinkKeyGroupMapper {
    
        public static void main(String[] args) {
            final int maxParallelism = 128;  // Flink maxParallelism
            final int parallelism = 10;      // Number of subtasks
            final int keysNeeded = parallelism;
    
            // Map: subtask index -> key found
            java.util.Map<Integer, String> subtaskToKey = new java.util.HashMap<>();
    
            int i = 100;
            int configId = 20003;
    
            while (subtaskToKey.size() < keysNeeded) {
                String key = configId + "_" + i;
    
                // Compute key group for this key (Flink does MurmurHash inside)
                int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
    
                // Find which operator subtask the key group maps to
                int subtaskIndex = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
                        maxParallelism, parallelism, keyGroup);
    
                // If this subtask index not assigned a key, assign it
                if (!subtaskToKey.containsKey(subtaskIndex)) {
                    subtaskToKey.put(subtaskIndex, key);
                    System.out.println("Subtask " + subtaskIndex + " -> Key: " + key);
                }
    
                i++;
            }
        }
    }
    ```
    This is how I have generated the keys, and my key selector in Flink also uses the same key. Can anyone guide me on how to determine which parallel subtask instance a given key will be routed to? Thanks
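    One small, hedged note on the probing code above: with only 10 distinct keys and hash-based key-group assignment, some subtasks can end up with no keys at all unless the keys are deliberately chosen per subtask, which is exactly what the loop does. KeyGroupRangeAssignment also has a one-call helper that combines the two steps, which may shorten the loop:
    ```
    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

    public class KeyToSubtask {

        // Returns the subtask index (0..parallelism-1) that a given key is routed to,
        // combining key-group assignment and operator-index computation in one call.
        public static int subtaskFor(String key, int maxParallelism, int parallelism) {
            return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism);
        }

        public static void main(String[] args) {
            // Same semantics as the loop above for a sample key.
            System.out.println(subtaskFor("20003_100", 128, 10));
        }
    }
    ```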

    Sandip Nayak

    06/08/2025, 10:32 PM
    Hi All, I am exploring Disaggregated State Management with the ForSt state backend on Flink 2.0. Even with the following config, when the Flink app restores from state I see that the full checkpoints are downloaded into the taskmanager pod. Do you know what I might be missing?
    ```
    state.backend.type: "forst"
    table.exec.async-state.enabled: "true"
    execution.checkpointing.incremental: "true"
    table.exec.mini-batch.enabled: "false"
    table.optimizer.agg-phase-strategy: "ONE_PHASE"
    ```

    Yarden BenMoshe

    06/10/2025, 7:00 AM
    Hi everyone, I am working on a use case that involves writing Avro records to a Kafka topic from pipeline-a using the DataStream API (without a schema registry, for performance reasons). Then, in pipeline-b, I create a table with the expected format and try to consume data from the topic further down the stream. I have an issue with serialization, as the Avro connector for the Table API generates a schema with record name = "record" and namespace = "org.apache.flink.avro.generated". Has anyone experienced a similar scenario, and can you maybe advise what my options are here?
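    One hedged option is to have pipeline-a write with exactly the schema the Table API's avro format will derive in pipeline-b, by converting the table's row type with AvroSchemaConverter instead of hand-writing an .avsc, so the record name and namespace match by construction. A sketch (the column names are placeholders):
    ```
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.formats.avro.AvroSerializationSchema;
    import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.types.DataType;

    public class PipelineASchema {

        // Builds the Avro writer schema from the table's row type, so it carries the same
        // record name ("record") and namespace ("org.apache.flink.avro.generated") that the
        // Table API's 'avro' format expects on the reading side.
        public static AvroSerializationSchema<GenericRecord> writerFor(DataType rowType) {
            Schema schema = AvroSchemaConverter.convertToSchema(rowType.getLogicalType());
            return AvroSerializationSchema.forGeneric(schema);
        }

        public static void main(String[] args) {
            DataType rowType = DataTypes.ROW(
                    DataTypes.FIELD("id", DataTypes.BIGINT()),      // placeholder columns
                    DataTypes.FIELD("name", DataTypes.STRING()));
            System.out.println(AvroSchemaConverter.convertToSchema(rowType.getLogicalType()));
        }
    }
    ```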

    Monika Bednarz

    06/11/2025, 9:29 AM
    Hi Team! I truly hope I can get some inspiration or a push in the right direction from you. Context: We deployed Flink on K8s with the official operator. We want to deploy the SQL Gateway, which will allow users to query Kafka topics. We use the JAAS config in normal jobs and the SQL client CLI, like below. We want to eliminate passing the password in cleartext in the API call to the gateway. How to do that properly? The container can have the password as env variable or a file config, but I couldn't make it work (JAAS config path seems to be hardcoded and sql client doesn't support jinja and embedding the env variables). Please share ideas and what worked for you 🙂 🙏 SQL we run:
    ```
    CREATE TABLE source__topic ( ... ) WITH (
                  'connector' = 'kafka',
                  'value.format' = 'avro-confluent',
    ...
                  'properties.security.protocol' = 'SASL_SSL',
                  'properties.sasl.mechanism' = 'SCRAM-SHA-512',
              'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="..." password="...";');
    ```
    We want to get rid of the explicit config passing. (if there's some form of authentication that doesn't require this in cleartext, also please point me to it 🙏 )

    Tiansu Yu

    06/11/2025, 12:00 PM
    Hi, I have a date column and I would like to use the date at midnight (00:00:00) as the event timestamp of my table (i.e., convert the DATE to TIMESTAMP_LTZ). How could I do that in Flink SQL?
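    A sketch of one way to do it, assuming the DATE column is called d: casting DATE to TIMESTAMP(3) yields midnight of that day, and the computed column can then serve as the event-time attribute (TO_TIMESTAMP_LTZ on an epoch value is an alternative if you specifically need TIMESTAMP_LTZ). The connector below is just a placeholder:
    ```
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class DateAsRowtimeExample {

        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Computed column: the DATE column `d` cast to TIMESTAMP(3) is midnight of that day
            // and is declared as the event-time attribute with a watermark.
            tEnv.executeSql(
                    "CREATE TABLE events (" +
                    "  d DATE," +
                    "  payload STRING," +
                    "  ts AS CAST(d AS TIMESTAMP(3))," +
                    "  WATERMARK FOR ts AS ts" +
                    ") WITH (" +
                    "  'connector' = 'datagen'" +   // placeholder: swap in your real connector
                    ")");

            // Shows ts as the rowtime attribute derived from the date column.
            tEnv.executeSql("DESCRIBE events").print();
        }
    }
    ```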

    Dheeraj Panangat

    06/11/2025, 1:23 PM
    Hi All, running into an issue with our Flink autoscaling implementation:
    • Using Flink 1.20.1 with Flink Operator 1.12.0 for deployment
    Issue:
    • The job stops processing data during autoscaling whenever it scales up
    • We want to autoscale jobs both with and without exactly-once semantics
    What we tried:
    • Enabled unaligned checkpoints -> the job never stopped processing, but this resulted in data loss
    Appreciate any inputs here. Is there no way to ensure autoscaling without data loss and with exactly-once semantics? CC: @Abhishek Joshi Thanks.

    MohammadReza Shahmorady

    06/12/2025, 1:17 AM
    Hi everyone, I'm experiencing an issue with batched producing using the Kafka sink. I've configured the following:
    ```
    linger.ms=500
    batch.size=1048576
    compression.type=lz4
    ```
    However, when there's significant backpressure, the producer doesn't start sending data as expected and takes a very long time. When I remove the batch.size configuration, performance slightly improves, but it's still not ideal. I've attached a screenshot with more details in the thread. Does anyone have any suggestions on how to resolve this?
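    In case it helps, a sketch of how these producer properties are typically passed to KafkaSink, with buffer.memory added, since under backpressure the default 32 MiB producer buffer can become the bottleneck when 1 MiB batches are in flight for many partitions (broker, topic and the sizes are placeholders to experiment with):
    ```
    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.base.DeliveryGuarantee;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;

    public class TunedKafkaSink {

        public static KafkaSink<String> build() {
            Properties producerConfig = new Properties();
            producerConfig.setProperty("linger.ms", "500");
            producerConfig.setProperty("batch.size", "262144");      // placeholder: try smaller than 1 MiB
            producerConfig.setProperty("compression.type", "lz4");
            producerConfig.setProperty("buffer.memory", "67108864"); // placeholder: raise if many partitions are in flight

            return KafkaSink.<String>builder()
                    .setBootstrapServers("broker:9092")               // placeholder
                    .setKafkaProducerConfig(producerConfig)
                    .setRecordSerializer(
                            KafkaRecordSerializationSchema.builder()
                                    .setTopic("my-topic")             // placeholder
                                    .setValueSerializationSchema(new SimpleStringSchema())
                                    .build())
                    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                    .build();
        }
    }
    ```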