# troubleshooting
  • a

    aditya arya

    08/04/2025, 8:33 PM
    Hi Team, I am new to this Slack, so asking here in case someone can help. We are using Flink 1.18.2 and we are facing a non-heap memory issue. We are running Flink on Linux, and as we run jobs multiple times we see resident memory increasing until it eventually fills up. Our jobs are simple DAGs with MySQL as both source and target. We also ran WordCount multiple times and hit the same issue. Does anyone know of this issue and can help? We have tried the same with various Java Flink versions (1.19, 1.20, and 2.0); all show the same behavior. Also, we are using the DataSet API for now, and it is difficult to switch to the streaming API immediately.
  • v

    Vikas Patil

    08/04/2025, 9:27 PM
    Hey Team, we rolled out the newly released flink-k8s-operator version 1.12 and we notice a "Job Not Found" warning in the k8s events of the target namespace. The operator is able to see the correct jobId (I can see it in the logs), but for some reason it does not use it and instead uses some other value. Any idea what is going on? Relevant logs here:
    Copy code
    flink-kubernetes-operator-569944464f-4dd8c flink-kubernetes-operator 2025-08-04 21:24:39,941 o.a.f.r.r.RestClient           [DEBUG] Received response {"jobs":[{"jid":"000000002edab9a50000000000000000","name":"streaming-canary-job","start-time":1754342059808,"end-time":-1,"duration":620135,"state":"RUNNING","last-modification":1754342500559,"tasks":{"running":4,"canceling":0,"canceled":0,"total":4,"created":0,"scheduled":0,"deploying":0,"reconciling":0,"finished":0,"initializing":0,"failed":0}}]}.
    
    // more logs here
    
    flink-kubernetes-operator-569944464f-4dd8c flink-kubernetes-operator 2025-08-04 21:25:25,158 o.a.f.r.r.RestClient           [DEBUG] Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to flink-canary-rest.flink-canary:8081/v1/jobs/overview
    flink-kubernetes-operator-569944464f-4dd8c flink-kubernetes-operator 2025-08-04 21:25:25,197 o.a.f.r.r.RestClient           [DEBUG] Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to flink-canary-rest.flink-canary:8081/v1/config
    flink-kubernetes-operator-569944464f-4dd8c flink-kubernetes-operator 2025-08-04 21:25:25,202 o.a.f.r.r.RestClient           [DEBUG] Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to flink-canary-rest.flink-canary:8081/v1/taskmanagers
    flink-kubernetes-operator-569944464f-4dd8c flink-kubernetes-operator 2025-08-04 21:25:25,210 o.a.f.r.r.RestClient           [DEBUG] Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to flink-canary-rest.flink-canary:8081/v1/jobs/9bfdca22ec0b09ca745d20cd99c5cb6a/checkpoints
  • k

    Krishnakumar K

    08/05/2025, 2:55 PM
    Hey folks, I'm working on setting up a Flink job that consumes data from Kafka, and I'm getting an error when setting the starting offset of the Kafka source to KafkaOffsetsInitializer.committed_offsets():
    Copy code
    File "/opt/flink/usrlib/uco_flink_job/flink_job.py", line 507, in main
        .set_starting_offsets(KafkaOffsetsInitializer.committed_offsets())
      File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/connectors/kafka.py", line 750, in committed_offsets
      File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
      File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
      File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 330, in get_return_value
    py4j.protocol.Py4JError: An error occurred while calling z:org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer.committedOffsets. Trace:
    org.apache.flink.api.python.shaded.py4j.Py4JException: Method committedOffsets([class org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy]) does not exist
            at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:321)
            at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
            at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:276)
            at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
            at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
            at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
            at java.base/java.lang.Thread.run(Thread.java:829)
    
    
    org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
            at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
            at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
            at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.base/java.lang.reflect.Method.invoke(Method.java:566)
            at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356)
            at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)
            at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:113)
            at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
            at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
            at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
            at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
            at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
            at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
            at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
    Caused by: java.lang.RuntimeException: Python process exits with code: 1
            at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
            ... 14 more
    Has anyone faced a similar issue before, or built pipelines that configure the Kafka source starting offset to committed_offsets? These are the jar files I'm using and their versions:
    Copy code
    flink-connector-kafka-3.3.0-1.20.jar
    flink-sql-connector-kafka-3.3.0-1.20.jar
    kafka-clients-3.4.0.jar
    Thanks!
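    For reference, the call the PyFlink wrapper is trying to make does exist on the Java side; below is a minimal Java sketch of the same starting-offset setup (bootstrap server, topic, and group id are placeholders, not taken from your job). The trace above shows the shaded org.apache.flink.kafka.shaded...OffsetResetStrategy being passed to an unshaded OffsetsInitializer, which can happen when both flink-connector-kafka and flink-sql-connector-kafka are on the classpath at the same time, so it may be worth retrying with only the fat flink-sql-connector-kafka jar.
    Copy code
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;

    // Java-side equivalent of the failing PyFlink call: start from committed offsets,
    // falling back to EARLIEST when no committed offset exists for the group.
    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("broker:9092")   // placeholder
            .setTopics("my-topic")                // placeholder
            .setGroupId("my-group")               // placeholder
            .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .build();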
  • r

    Renan Nogueira

    08/05/2025, 3:09 PM
    SQL Server CDC connector incorrectly handles special characters in database names. When using the sqlserver-cdc connector with a database that contains a hyphen (-) in its name, the source fails during the discovery phase. This happens even in the latest version (3.4.0). The connector is not able to escape or quote database names that contain special characters such as hyphens (e.g., sql-db-dev-dbname in SQL Server). The connector logs the following warning and skips the database:
    skipping database 'sql-db-dev-dbname' due to error reading tables: Incorrect syntax near '-'
    This prevents Flink from reading any table from that database using CDC. Is there any workaround or configuration that allows using databases with hyphens in their names, without having to rename the database?
  • e

    Ethan Brown

    08/05/2025, 9:35 PM
    Hi all, IRSA + Presto S3 FS question.
    Setup:
    • Flink 2.0.0 on EKS (Flink-K8s-Operator 1.11.0)
    • Pods use IRSA (no static keys)
    • Migrating to flink-s3-fs-presto (s3p://) for state & sinks
    Observed:
    1. presto.s3.credentials-provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider → JM dies (NoSuchMethodException on that class).
    2. org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider → starts only if s3.roleArn is also set, which defeats IRSA.
    Questions:
    • Is IRSA officially supported by the Presto FS yet?
    • If not, is the workaround to keep checkpoints/HA on Hadoop S3A with IRSA and use Presto only for sinks (static keys)?
    • Any JIRA/PR tracking full IRSA support?
    Thanks!
  • v

    Vikas Patil

    08/06/2025, 4:08 AM
    Is it possible that the job id on the running jobs page is different from the one in the configuration? Why are these two not in sync?
  • m

    Marco Villalobos

    08/06/2025, 4:15 AM
    Hi, does ObjectNode actually work with json-format and Kafka with a JsonDeserializationSchema? I tried this:
    Copy code
    final JsonDeserializationSchema<ObjectNode> jsonDeserializationSchema = new JsonDeserializationSchema<>(ObjectNode.class);
    		final KafkaSource<ObjectNode> kafkaSource = KafkaSource.<ObjectNode>builder()
    				.setBootstrapServers(kafkaBootstrapServers)
    				.setTopics(kafkaTopic)
    				.setGroupId(kafkaGroupId)
    				.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    				.setValueOnlyDeserializer(jsonDeserializationSchema)
    				.build();
    but when I run it I get the following stack trace:
    Copy code
    2025-08-06 04:09:44,311 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: flame-inf kafka source -> Sink: Print to Std. Out (1/2)#0 (e119be7364394210a5290c812216a46b_9a1186299580bdec0729e3077675ce1a_0_0) switched from RUNNING to FAILED with failure cause:
    java.io.IOException: Failed to deserialize consumer record due to
    	at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56) ~[blob_p-da5c3d686c1802bf53f64aa7321d844d6ada3eb1-fc90b5255bf6e1738c8dc5257b589a2f:?]
    	at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33) ~[blob_p-da5c3d686c1802bf53f64aa7321d844d6ada3eb1-fc90b5255bf6e1738c8dc5257b589a2f:?]
    	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203) ~[flink-connector-files-1.20.2.jar:1.20.2]
    	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:443) ~[flink-dist-1.20.2.jar:1.20.2]
    	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.20.2.jar:1.20.2]
    	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.20.2.jar:1.20.2]
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638) ~[flink-dist-1.20.2.jar:1.20.2]
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.20.2.jar:1.20.2]
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973) ~[flink-dist-1.20.2.jar:1.20.2]
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917) ~[flink-dist-1.20.2.jar:1.20.2]
    	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-dist-1.20.2.jar:1.20.2]
    	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949) [flink-dist-1.20.2.jar:1.20.2]
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) [flink-dist-1.20.2.jar:1.20.2]
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) [flink-dist-1.20.2.jar:1.20.2]
    	at java.base/java.lang.Thread.run(Unknown Source) [?:?]
    Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Conflicting setter definitions for property "all": com.fasterxml.jackson.databind.node.ObjectNode#setAll(com.fasterxml.jackson.databind.node.ObjectNode) vs com.fasterxml.jackson.databind.node.ObjectNode#setAll(java.util.Map)
     at [Source: (byte[])"{"type": "xxxx", "sha256": "not used", "frame": 1, "top_left": {"x": 101, "y": 1}, "bottom_right": {"x": 390, "y": 189}}"; line: 1, column: 1]
    	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:67) ~[flink-dist-1.20.2.jar:1.20.2]
    	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext.reportBadDefinition(DeserializationContext.java:1909) ~[flink-dist-1.20.2.jar:1.20.2]
    	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:268) ~[flink-dist-1.20.2.jar:1.20.2]
    	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCacheValueDeserializer(DeserializerCache.java:244) ~[flink-dist-1.20.2.jar:1.20.2]
    	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DeserializerCache.findValueDeserializer(DeserializerCache.java:142) ~[flink-dist-1.20.2.jar:1.20.2]
    	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext.findRootValueDeserializer(DeserializationContext.java:648) ~[flink-dist-1.20.2.jar:1.20.2]
    	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._findRootDeserializer(ObjectMapper.java:4861) ~[flink-dist-1.20.2.jar:1.20.2]
    	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4731) ~[flink-dist-1.20.2.jar:1.20.2]
    	at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3738) ~[flink-dist-1.20.2.jar:1.20.2]
    	at org.apache.flink.formats.json.JsonDeserializationSchema.deserialize(JsonDeserializationSchema.java:69) ~[flink-json-1.20.2.jar:1.20.2]
    	at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) ~[flink-dist-1.20.2.jar:1.20.2]
    	at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51) ~[blob_p-da5c3d686c1802bf53f64aa7321d844d6ada3eb1-fc90b5255bf6e1738c8dc5257b589a2f:?]
    	at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ~[blob_p-da5c3d686c1802bf53f64aa7321d844d6ada3eb1-fc90b5255bf6e1738c8dc5257b589a2f:?]
    	... 14 more
    I'm also wondering whether it is because print does not know how to serialize ObjectNode. Must I register a serializer for it or something? I even mapped the source to a string before the print, like this:
    Copy code
    jsonNodeSource.map(BaseJsonNode::toString).print();
    but I still get this error:
    Copy code
    org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Conflicting setter definitions for property "all": com.fasterxml.jackson.databind.node.ObjectNode#setAll(java.util.Map) vs com.fasterxml.jackson.databind.node.ObjectNode#setAll(com.fasterxml.jackson.databind.node.ObjectNode)
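    In case it helps, the trace suggests the shaded Jackson ObjectMapper inside JsonDeserializationSchema is treating the unshaded com.fasterxml ObjectNode as a plain bean, hence the conflicting setAll setters. One way to sidestep the shaded/unshaded mix is a small custom DeserializationSchema that keeps everything in one Jackson world; a minimal sketch, assuming an unshaded jackson-databind is on the job classpath (it appears to be, since ObjectNode/BaseJsonNode are referenced):
    Copy code
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;

    import java.io.IOException;

    /** Deserializes Kafka record values into (unshaded) Jackson ObjectNode instances. */
    public class ObjectNodeDeserializationSchema implements DeserializationSchema<ObjectNode> {

        private static final long serialVersionUID = 1L;

        // ObjectMapper is not serializable; create it lazily on the task side.
        private transient ObjectMapper mapper;

        @Override
        public ObjectNode deserialize(byte[] message) throws IOException {
            if (mapper == null) {
                mapper = new ObjectMapper();
            }
            // readTree returns a JsonNode; the cast is safe for JSON object payloads.
            return (ObjectNode) mapper.readTree(message);
        }

        @Override
        public boolean isEndOfStream(ObjectNode nextElement) {
            return false;
        }

        @Override
        public TypeInformation<ObjectNode> getProducedType() {
            return TypeInformation.of(ObjectNode.class);
        }
    }
    It would then be passed to setValueOnlyDeserializer(new ObjectNodeDeserializationSchema()) in place of the JsonDeserializationSchema above; downstream ObjectNode records will typically fall back to Kryo serialization, which works but is one reason mapping to String early (as you did) is common.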
  • u

    מייקי בר יעקב

    08/07/2025, 7:49 AM
    Does anyone know where I can find the complete Flink 1.16 Java 11 Dockerfile?
  • g

    Grzegorz Liter

    08/07/2025, 10:35 AM
    I am using Flink 2.1 but had the same problem with 1.20. We have fairly large snapshots (40-50 GB). Is it possible that a savepoint at some point needs to fit in TM memory? During a savepoint, pod memory usage jumps by roughly the size of the snapshot (half of the size on each of the 2 TMs). No Flink memory metric reflects this usage, only the Kubernetes pod memory metrics.
  • a

    Anshuta Awasthi

    08/09/2025, 10:50 AM
    Hi all, we are stuck on a problem in our company. We are using a Kafka consumer to read messages and the Confluent Avro schema registry to deserialize them. We need to pass keystore and truststore secret file paths in the connector configuration properties of Flink's Kafka source table. The constraint is that these properties can only read from a local path. How can we make these secret files available locally on all nodes? We cannot use HDFS (S3 is the only option where secrets can be stored). Is there any way the secrets can be pulled from S3 at runtime and saved in the /tmp directory of each node, so that this local file path can be set in properties.ssl.keystore.location? Any help would be much appreciated. 🙏
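    If pulling from S3 at runtime is acceptable, one pattern is a small helper that downloads the keystore to a local path before the Kafka client is created, run on every node (for example from a RichFunction#open or an entrypoint wrapper). A minimal sketch, assuming the AWS SDK v2 is on the classpath and that bucket/key/path values are placeholders; in practice a Kubernetes init container or pod-template mount that writes the file to /tmp is often the simpler route:
    Copy code
    import software.amazon.awssdk.services.s3.S3Client;
    import software.amazon.awssdk.services.s3.model.GetObjectRequest;

    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    public final class KeystoreFetcher {

        /**
         * Downloads the keystore from S3 to a local file (e.g. /tmp/kafka.keystore.jks)
         * so that the path can be used in properties.ssl.keystore.location.
         */
        public static Path fetch(String bucket, String key, String localPath) throws Exception {
            Path target = Paths.get(localPath);
            if (Files.notExists(target)) {
                // Uses the default credentials chain (IAM role / IRSA / env vars).
                try (S3Client s3 = S3Client.create()) {
                    s3.getObject(GetObjectRequest.builder().bucket(bucket).key(key).build(), target);
                }
            }
            return target;
        }
    }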
  • u

    מייקי בר יעקב

    08/09/2025, 2:52 PM
    What are the main features of the Flink autoscaler?
  • f

    Fabrizzio Chavez

    08/09/2025, 8:40 PM
    Hi, I am using flink-cdc with the mongodb-cdc connector, and I'd like to know if it is possible to get the fullDocument from MongoDB change streams directly?
    Copy code
    CREATE TABLE users (
        db_name         STRING METADATA FROM 'database_name' VIRTUAL,
        collection_name STRING METADATA  FROM 'collection_name' VIRTUAL,
        operation_ts    TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
        operation       STRING METADATA FROM 'row_kind' VIRTUAL,
        _id STRING,
        fullDocument STRING,
      PRIMARY KEY(_id) NOT ENFORCED
    ) WITH (
        'connector' = 'mongodb-cdc',
        'hosts' = 'localhost:27017',
        'connection.options' = 'directConnection=true&tls=false',
        'username' = 'admin',
        'password' = 'admin',
        'database' = 'cdc-test',
        'collection' = 'users',
        'scan.full-changelog' = 'true'
    );
    I don't want to deserialize it because I need to store the raw data through a JDBC sink, but fullDocument is returning null.
  • m

    MohammadReza Shahmorady

    08/10/2025, 9:12 PM
    Hello friends, I’ve run into an issue with my Flink job designs, and I could use a bit of help. Basically, we can’t have an operator with more than two inputs. However, there are cases where we need to join three topics together. The solution that comes to mind is to join the first two topics, then join the result with the third topic. But there’s a problem here. For example, I’m joining the price and quantity topics together, and both of them are stored in their respective states. Now, in the next operator, I join the result with the third topic. In the second join, I need to store both the result of the previous join and the third topic! This means I’m storing something again that I already had in the previous step. Now, imagine this requirement keeps growing and I might need more joins — but in every step, I’m repeatedly storing duplicate data. What’s the proper way to solve this problem? Is there something I don’t know that’s causing me to hit this issue, or is this a normal problem — and if so, is there a better solution for it? For context: my code is written using the DataStream API.
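    One pattern that avoids re-storing the intermediate join result is to map every topic to a common tagged record, union the streams, key them by the shared join key, and keep one piece of state per input inside a single KeyedProcessFunction. A rough sketch, assuming all three topics can be keyed by the same key and that keeping only the latest value per side is acceptable (the Tuple3 layout is key / source tag / value and is purely illustrative):
    Copy code
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Input: Tuple3<joinKey, sourceTag, value>; each topic is mapped into this shape
    // and the three streams are unioned before keyBy(t -> t.f0).process(new ThreeWayJoin()).
    public class ThreeWayJoin
            extends KeyedProcessFunction<String, Tuple3<String, String, Double>, String> {

        private transient ValueState<Double> price;
        private transient ValueState<Double> quantity;
        private transient ValueState<Double> discount;

        @Override
        public void open(Configuration parameters) {
            price = getRuntimeContext().getState(new ValueStateDescriptor<>("price", Double.class));
            quantity = getRuntimeContext().getState(new ValueStateDescriptor<>("quantity", Double.class));
            discount = getRuntimeContext().getState(new ValueStateDescriptor<>("discount", Double.class));
        }

        @Override
        public void processElement(Tuple3<String, String, Double> in, Context ctx, Collector<String> out)
                throws Exception {
            // Each input is stored exactly once; no intermediate join result is materialized.
            switch (in.f1) {
                case "price":    price.update(in.f2);    break;
                case "quantity": quantity.update(in.f2); break;
                case "discount": discount.update(in.f2); break;
            }
            if (price.value() != null && quantity.value() != null && discount.value() != null) {
                out.collect(ctx.getCurrentKey() + ": "
                        + price.value() * quantity.value() * (1 - discount.value()));
            }
        }
    }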
  • e

    Evan NotEvan

    08/10/2025, 11:09 PM
    Hi everyone, I’m starting a new project that will use Apache Flink and I’m wondering which version you’d recommend for development. I initially started with Flink v2.0.0, but I ran into a lot of broken APIs that made it very difficult to work with. My plan was to use the iteration API from flink-ml, but the latest supported version there is 1.17. I tried porting flink-ml to work with Flink 2.0.0 — I can get it to compile, but it required so many hacks that it’s not really usable. My project will require some kind of iteration, since the algorithm I’m developing is iterative by nature. Do you recommend sticking with a stable 1.x release for now, or is there a better approach for iterative workloads on newer versions? Thanks!
  • f

    Fabrizzio Chavez

    08/11/2025, 1:31 AM
    Hello, I am testing with Flink 1.20.2 and the mongodb-cdc connector with the following configuration:
    Copy code
    SET 'sql-client.execution.result-mode' = 'tableau';
    
    CREATE TABLE mongo_users (
        db_name         STRING METADATA FROM 'database_name' VIRTUAL,
        collection_name STRING METADATA  FROM 'collection_name' VIRTUAL,
        operation_ts    TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
        row_kind       STRING METADATA FROM 'row_kind' VIRTUAL,
        _id             STRING,
        name         STRING,
        PRIMARY KEY(_id) NOT ENFORCED
    ) WITH (
        'connector' = 'mongodb-cdc',
        'hosts' = 'localhost:27017',
        'connection.options' = 'directConnection=true&tls=false',
        'username' = 'admin',
        'password' = 'admin',
        'database' = 'cdc-test',
        'collection' = 'users'
    );
    SELECT * FROM mongo_users;
    But I am not sure the row_kind is returned correctly: when a document is deleted, the row_kind still says +U (the previous row_kind). Shouldn't it be -D, like the op column shown in the SQL client?
  • m

    Madhusudhan Reddy

    08/11/2025, 1:42 PM
    Hi Team, we are attempting to implement the following logic using Flink's Table API, with the goal of publishing only the latest change.
    Scenario:
    1. A Flink table (Table1) is created from a CDC DataStream and has multiple rows per composite primary key.
    2. Select all rows for one part of the composite key.
    3. Apply LISTAGG to aggregate all rows for that key part.
    4. One of the existing rows gets updated via CDC.
    5. Emit an event with the changed row together with all the remaining rows (via LISTAGG).
    Issue we face now:
    1. The first event is emitted without the row that got updated.
    2. A second event is emitted with the updated row along with the other existing rows for the key.
    Expectation: emit only the second event, which contains the updated row along with the other existing rows for the key.
    More details: when there is an update on Table1, we want to publish only that update, but we observe two events being published, one without the actual update and one with it.
    Behavior observed:
    • If an ID has only one row, a single event is emitted as expected.
    • If an ID has multiple rows and only one row is updated (e.g., a row with c1), Flink internally performs the following steps:
    1. -U removes the row with c1 (old value)
    2. +U adds an updated intermediate row (e.g., only c2 without c1)
    3. An event is published here (incomplete)
    4. -U removes c2, seemingly clearing the intermediate state
    5. +U adds both updated rows (c1 and c2)
    6. Another event is published (correct, with all data)
    Ideally, only the final +U should be published, and the intermediate steps filtered out.
    Copy code
    SELECT  
        t1.ID,  
        LISTAGG( 
            CONCAT_WS(
                CHR(31),  -- Unit Separator
                t1.c1, 
                t1.c2
            ), 
            '\u001E'  -- Record Separator
        ) AS t1_concat  
    FROM table1 t1  
    JOIN table2 t2 ON t1.ID = t2.ID  
    GROUP BY t1.ID
    Issue: this intermediate emission leads to duplicate events, one partial and one final, whereas only the final consolidated state should be emitted.
    Question: how can I make sure that I emit only the latest event that contains the change, instead of emitting 1) an event that does not have the row I am updating and 2) an event that does have it? On an update of one of the rows (through the LISTAGG, join, and GROUP BY) we see the row kinds -U +U -U +U. How can I filter for only the last +U and emit that event? Please let me know your view. Here is the code:
    Copy code
    StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment);

    FlinkConfigurationManager flinkConfigurationManager = new FlinkConfigurationManager(streamExecutionEnvironment);

    // Run the query that provides publishable data
    Table viewsAggregateQueryResult = streamTableEnvironment.sqlQuery(sqlQuery);

    // Process the result rows
    DataStream<JsonNode> eventDataStream = streamTableEnvironment
            .toChangelogStream(viewsAggregateQueryResult)
            .filter(new UpdateBeforeFilter())                  // filter out -U row kinds
            .uid(PROC_NAME + "-update-before-filter-" + "-v1")
            .name("update before filter")
            .map(new RowToJsonNodeMapper(mappings.toString()));

    // Output the transformed data to the Kafka sink
    eventDataStream
            .map(new EventTransformerMapFunction(configs.getKafkaSchemaHeaders()))
            .keyBy(EventData::getKey)
            .sinkTo(new KafkaSinkWriter(getOutboundKafkaSink(configs)));

    streamExecutionEnvironment.execute();
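    One knob that sometimes reduces this intermediate -U/+U churn for group aggregations is mini-batching, which lets the aggregate operator fold several changelog records before emitting. A minimal sketch of enabling it right after creating the table environment (the latency and size values are placeholders to tune); the -U rows that remain can still be dropped by the UpdateBeforeFilter above:
    Copy code
    // Buffer input records so the LISTAGG/GROUP BY emits fewer intermediate -U/+U pairs.
    org.apache.flink.configuration.Configuration tableConf =
            streamTableEnvironment.getConfig().getConfiguration();
    tableConf.setString("table.exec.mini-batch.enabled", "true");
    tableConf.setString("table.exec.mini-batch.allow-latency", "2 s");
    tableConf.setString("table.exec.mini-batch.size", "1000");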
  • h

    Harish Sharma

    08/12/2025, 6:00 AM
    Hi Team, I am trying to create a DataStream source based on the incremental snapshot implementation, but I do not see anything getting printed in the console. Can someone please suggest what the issue could be?
  • m

    Mahesh Gupta

    08/12/2025, 8:29 AM
    Hi Team, I am continuously facing this issue. Can someone here help? I see this ticket has been open for too long and many users are facing it too. https://issues.apache.org/jira/browse/FLINK-32212
    Copy code
    cdp-customer-profile-flink-hudi-taskmanager-1-3 @ ip-10-230-15-69.ap-south-1.compute.internal (dataPort=38719).
    java.lang.IllegalStateException: The library registration references a different set of library BLOBs than previous registrations for this job:
    old:[p-de4b1680d065109d59ef7958483755d33ccd99e2-2edc04a09cff45f9714e59581b6bb2dd]
    new:[p-de4b1680d065109d59ef7958483755d33ccd99e2-bb81a92443d3bd2a99dc8fcfaad94962]
    	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:437)
    	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.access$600(BlobLibraryCacheManager.java:373)
    	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:249)
    	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1200(BlobLibraryCacheManager.java:210)
    	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:350)
    	at org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1059)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:637)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    	at java.base/java.lang.Thread.run(Unknown Source)
    2025-08-11T05:54:03.092Z [flink-pekko.actor.default-dispatcher-13] INFO  o.a.f.r.s.adaptive.AdaptiveScheduler - Restarting job.
    java.lang.IllegalStateException: The library registration references a different set of library BLOBs than previous registrations for this job:
    old:[p-de4b1680d065109d59ef7958483755d33ccd99e2-2edc04a09cff45f9714e59581b6bb2dd]
    new:[p-de4b1680d065109d59ef7958483755d33ccd99e2-bb81a92443d3bd2a99dc8fcfaad94962]
    	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:437)
    	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.access$600(BlobLibraryCacheManager.java:373)
    	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:249)
    	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1200(BlobLibraryCacheManager.java:210)
    	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:350)
    	at org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1059)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:637)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    	at java.base/java.lang.Thread.run(Unknown Source)
    and I am seeing that the task managers are not being released, as mentioned in the above ticket:
    Copy code
    pod/customer-flink-hudi-6c654cddbd-bkx6l   1/1     Running   17 (62m ago)    18h
    pod/customer-flink-hudi-taskmanager-17-1   1/1     Running       0               118m
    pod/customer-flink-hudi-taskmanager-17-2   1/1     Running       0               118m
    pod/customer-flink-hudi-taskmanager-17-3   1/1     Running       0               118m
    pod/customer-flink-hudi-taskmanager-17-4   1/1     Running       0               118m
    pod/customer-flink-hudi-taskmanager-17-5   1/1     Running       0               118m
    pod/customer-flink-hudi-taskmanager-17-6   1/1     Running       0               118m
    pod/customer-flink-hudi-taskmanager-18-1   1/1     Running       0               61m
    pod/customer-flink-hudi-taskmanager-18-2   1/1     Running       0               61m
    pod/customer-flink-hudi-taskmanager-18-3   1/1     Running       0               61m
    pod/customer-flink-hudi-taskmanager-18-4   1/1     Running       0               61m
    pod/customer-flink-hudi-taskmanager-18-5   1/1     Running       0               61m
    pod/customer-flink-hudi-taskmanager-18-6   1/1     Running       0               61m
  • a

    AMRIT SARKAR

    08/13/2025, 5:23 AM
    Consistent RTE job failures for a Flink job. Our Flink task managers are currently being swapped in and out frequently due to Karpenter autoscaling. This causes task managers to repeatedly hit RemoteTransportException (RTE), leading to job failures. We are looking for the ideal configuration to prevent these RTEs and ensure that the Flink JobManager can automatically retry and recover rather than failing the entire job. We have already tried various approaches without success. Could you please provide guidance on the optimal configuration parameters to address this issue? We are open to exploring all possible solutions. Configurations:
    Copy code
    $internal.flink.version	v1_16
    akka.ask.timeout	601s
    akka.framesize	41943040b
    akka.lookup.timeout	30s
    akka.ssl.enabled	false
    akka.tcp.timeout	610s
    blob.server.port	6124
    blob.service.ssl.enabled	false
    classloader.parent-first-patterns.additional	org.apache.flink.statefun;org.apache.kafka
    classloader.resolve-order	parent-first
    cluster.evenly-spread-out-slots	true
    cluster.health-check.checkpoint-progress.enabled	true
    execution.checkpointing.interval	60s
    execution.target	kubernetes-session
    fs.s3a.aws.credentials.provider	com.amazonaws.auth.WebIdentityTokenCredentialsProvider
    fs.s3a.multipart.size	10M
    heartbeat.rpc-failure-threshold	5
    high-availability	org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.cluster-id	[REDACTED]
    high-availability.jobmanager.port	6123
    high-availability.storageDir	[REDACTED]
    internal.cluster.execution-mode	NORMAL
    jobmanager.execution.failover-strategy	region
    jobmanager.memory.heap.size	66437775360b
    jobmanager.memory.jvm-metaspace.size	1073741824b
    jobmanager.memory.jvm-overhead.max	1073741824b
    jobmanager.memory.jvm-overhead.min	1073741824b
    jobmanager.memory.off-heap.size	134217728b
    jobmanager.memory.process.size	64g
    jobmanager.rpc.address	[REDACTED]
    jobmanager.rpc.port	6123
    jobstore.expiration-time	86400
    kubernetes.cluster-id	[REDACTED]
    kubernetes.container.image	[REDACTED]
    kubernetes.container.image.pull-policy	Always
    kubernetes.flink.conf.dir	/opt/flink/conf
    kubernetes.internal.jobmanager.entrypoint.class	org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
    kubernetes.internal.taskmanager.replicas	2
    kubernetes.jobmanager.annotations	flinkdeployment.flink.apache.org/generation:18
    kubernetes.jobmanager.cpu	16.0
    kubernetes.jobmanager.owner.reference	[REDACTED]
    kubernetes.jobmanager.replicas	2
    kubernetes.namespace	[REDACTED]
    kubernetes.pod-template-file.jobmanager	[REDACTED]
    kubernetes.pod-template-file.taskmanager	[REDACTED]
    kubernetes.rest-service.exposed.type	ClusterIP
    kubernetes.service-account	[REDACTED]
    kubernetes.taskmanager.cpu	22.0
    metrics.internal.query-service.port	6120
    metrics.reporter.prom.factory.class	org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prom.port	9999
    metrics.reporter.prom.whitelist	_Custom_Source.0
    metrics.reporters	prom
    parallelism.default	16
    resourcemanager.taskmanager-registration.timeout	15min
    resourcemanager.taskmanager-timeout	600000
    rest.address	10.69.99.191
    rest.connection-timeout	120000
    rest.flamegraph.enabled	true
    restart-strategy	exponential-delay
    restart-strategy.exponential-delay.initial-backoff	60s
    restart-strategy.exponential-delay.jitter-factor	0.1
    restart-strategy.exponential-delay.max-backoff	2min
    restart-strategy.exponential-delay.reset-backoff-threshold	10min
    restart-strategy.fixed-delay.attempts	10
    s3.entropy.key	_entropy_
    s3.entropy.length	4
    security.ssl.internal.enabled	false
    security.ssl.rest.enabled	false
    security.ssl.verify-hostname	false
    slot.idle.timeout	300000
    slot.request.timeout	600000
    slotmanager.redundant-taskmanager-num	2
    state.backend	rocksdb
    state.backend.fs.checkpointdir	[REDACTED]
    state.backend.incremental	true
    state.backend.local-recovery	false
    state.backend.rocksdb.block.cache-size	4mb
    state.backend.rocksdb.localdir	/data/flink/rocksdb
    state.backend.rocksdb.memory.managed	true
    state.backend.rocksdb.memory.write-buffer-ratio	0.70
    state.backend.rocksdb.timer-service.factory	ROCKSDB
    state.backend.rocksdb.writebuffer.size	16mb
    state.checkpoints.dir	[REDACTED]
    state.savepoints.dir	[REDACTED]
    taskmanager.data.port	6121
    taskmanager.data.ssl.enabled	false
    taskmanager.heartbeat.interval	60000
    taskmanager.heartbeat.timeout	180000
    taskmanager.memory.framework.heap.size	256mb
    taskmanager.memory.framework.off-heap.size	256mb
    taskmanager.memory.jvm-metaspace.size	1gb
    taskmanager.memory.jvm-overhead.max	10gb
    taskmanager.memory.jvm-overhead.min	10gb
    taskmanager.memory.managed.size	40gb
    taskmanager.memory.network.max	20gb
    taskmanager.memory.network.min	20gb
    taskmanager.memory.process.size	176g
    taskmanager.memory.task.off-heap.size	1gb
    taskmanager.network.memory.exclusive-buffers-request-timeout-ms	120000
    taskmanager.network.request-backoff.max	120000
    taskmanager.network.retries	10
    taskmanager.numberOfTaskSlots	44
    taskmanager.registration.timeout	30 min
    taskmanager.rpc.port	6122
    web.cancel.enable	false
    web.tmpdir	[REDACTED]
    
    version	OpenJDK 64-Bit Server VM - Azul Systems, Inc. - 11/11.0.21.0.102+1-LTS
    arch	amd64
    options	-Dlog4j2.formatMsgNoLookups=true -XX:CompileCommand=quiet -XX:CompileCommand=exclude,example/security/rng/RdRandSecureRandomSpi.generatePRF -Xmx66437775360 -Xms66437775360 -XX:MaxMetaspaceSize=1073741824 -Dlog.file=[REDACTED] -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
    Thank you for your time and expertise.
  • r

    Rushikesh Gulve

    08/13/2025, 10:15 AM
    Hi all, I am trying to deploy a PyFlink application using the Kubernetes operator. I am constantly hitting an error, after which my Task Manager pod goes unresponsive and dies after a certain interval.
    Copy code
    File "/opt/flink/usrlib/data_processing_v9/stream_processing/process/prepare_data.py", line 71, in process
        filtered_elements = self.filter_elements(raw_data)
      File "/opt/flink/usrlib/data_processing_v9/stream_processing/process/prepare_data.py", line 104, in filter_elements
        raw_mapping_dict = set(self.raw_io_mapping_dict.keys())
      File "/opt/flink/usrlib/data_processing_v9/stream_processing/utils/state_utils/map_state_utils.py", line 66, in keys
        return self.map_state.keys()
      File "/usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/state_impl.py", line 934, in keys
        return self.get_internal_state().keys()
      File "/usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/state_impl.py", line 840, in keys
        self.remote_data_iterator(IterateType.KEYS))
      File "/usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/state_impl.py", line 879, in remote_data_iterator
        return self._map_state_handler.lazy_iterator(
      File "/usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/state_impl.py", line 437, in lazy_iterator
        current_batch, iterator_token = self._iterate_raw(
      File "/usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/state_impl.py", line 621, in _iterate_raw
        data, response_token = self._underlying.get_raw(state_key, continuation_token)
      File "/usr/local/lib/python3.10/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 1064, in get_raw
        response = self._blocking_request(
      File "/usr/local/lib/python3.10/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 1104, in _blocking_request
        raise self._exception
      File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
        self.run()
      File "/usr/lib/python3.10/threading.py", line 953, in run
        self._target(*self._args, **self._kwargs)
      File "/usr/local/lib/python3.10/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 1039, in pull_responses
        for response in responses:
      File "/usr/local/lib/python3.10/dist-packages/grpc/_channel.py", line 543, in __next__
        return self._next()
      File "/usr/local/lib/python3.10/dist-packages/grpc/_channel.py", line 969, in _next
        raise self
    grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    	status = StatusCode.UNAVAILABLE
    	details = "recvmsg:Connection reset by peer"
    	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"recvmsg:Connection reset by peer", grpc_status:14}"
    >
    This is the error that I am facing. I can see some changes in metrics around the same time, but I am not able to find the root cause of this issue. Can someone guide me on tuning the configuration parameters? I am using the RocksDB state backend.
  • l

    L P V

    08/13/2025, 10:18 AM
    Hi guys, in Flink SQL I want to create a table with Kafka credentials:
    Copy code
    CREATE TABLE src.kafka.kafka_transactions (
      id BIGINT,
      type STRING,
      amount DOUBLE
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'core-transaction-item',
      'properties.bootstrap.servers' = 'generic.kafka.db-dev:9092',
      'properties.group.id' = 'flink-sql-transactionitem',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'protobuf',
      'protobuf.message-class-name' = 'TransactionItem',
      'properties.security.protocol' = 'SASL_PLAINTEXT',
      'properties.sasl.mechanism' = 'SCRAM-SHA-512',
      'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="${KAFKA_USERNAME}" password="${KAFKA_PASSWORD}";'
    );
    ---> but it does not work. It seems the Flink SQL client cannot read KAFKA_USERNAME directly, even though I passed the variables when starting the SQL client like this:
    Copy code
    ./bin/sql-client.sh gateway --endpoint 0.0.0.0:8083  -Drest.address=xxxx  -Drest.port=8081 \
        -DKAFKA_USERNAME="$KAFKA_USERNAME" \
        -DKAFKA_PASSWORD="$KAFKA_PASSWORD"
    So is there any way I could pass Kafka credentials to Flink SQL? The syntax runs correctly if I use the plaintext KAFKA_USERNAME / password instead of the env variables.
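    As far as I know, the SQL client does not expand environment variables inside DDL strings, so one workaround (a minimal sketch, assuming it is acceptable to run the DDL from a small Table API program instead of the interactive client, and using an unqualified table name) is to splice the credentials in before executing the statement:
    Copy code
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class CreateKafkaTransactionsTable {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Read the secrets from the environment and build the JAAS config string.
            String user = System.getenv("KAFKA_USERNAME");
            String pass = System.getenv("KAFKA_PASSWORD");
            String jaas = String.format(
                    "org.apache.kafka.common.security.scram.ScramLoginModule required "
                            + "username=\"%s\" password=\"%s\";", user, pass);

            tEnv.executeSql(
                    "CREATE TABLE kafka_transactions (\n"
                            + "  id BIGINT,\n"
                            + "  type STRING,\n"
                            + "  amount DOUBLE\n"
                            + ") WITH (\n"
                            + "  'connector' = 'kafka',\n"
                            + "  'topic' = 'core-transaction-item',\n"
                            + "  'properties.bootstrap.servers' = 'generic.kafka.db-dev:9092',\n"
                            + "  'properties.group.id' = 'flink-sql-transactionitem',\n"
                            + "  'scan.startup.mode' = 'earliest-offset',\n"
                            + "  'format' = 'protobuf',\n"
                            + "  'protobuf.message-class-name' = 'TransactionItem',\n"
                            + "  'properties.security.protocol' = 'SASL_PLAINTEXT',\n"
                            + "  'properties.sasl.mechanism' = 'SCRAM-SHA-512',\n"
                            + "  'properties.sasl.jaas.config' = '" + jaas + "'\n"
                            + ")");
        }
    }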
  • u

    מייקי בר יעקב

    08/14/2025, 9:08 PM
    Do newer Flink versions have a tracing feature? I mean, a way to see the flow of every single message.
  • h

    Harish Sharma

    08/15/2025, 6:33 AM
    Hi Team, is there any way to integrate MySQL with an AWS S3 table?
  • r

    Richard Moorhead

    08/16/2025, 2:30 AM
    documentation links from nightlies seem to be down: https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.12/docs/try-flink-kubernetes-operator/quick-start/
  • h

    Harish Sharma

    08/18/2025, 7:15 AM
    Hi Team, am I missing anything with the Postgres CDC incremental source? I do not see any change being printed on upsert.
    Copy code
    JdbcIncrementalSource<String> pgSource =
                        PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
                                .hostname("localhost")
                                .port(5432)
                                .database("postgres")
                                .schemaList("public")
                                .tableList("public.orders")
                                .username("postgres")
                                .password("postgres")
                                .slotName("orders_slot")
                                .decodingPluginName("pgoutput")
    
                                // optional: chunk key if no PK; see docs
                                //.chunkKeyColumn("id")
                                .deserializer(deserializer)
                                .build();
    
                DataStreamSource<String> stream =
                        env.fromSource(pgSource, WatermarkStrategy.noWatermarks(), "pg-cdc-incremental");
    
                stream.print();
                env.execute();
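    In case it helps, the incremental-snapshot CDC sources generally need checkpointing enabled before the streaming (WAL-reading) phase makes progress, and being explicit about the startup mode can help rule out a snapshot-only read. A small sketch of those two additions (the interval is a placeholder, and startupOptions is optional since initial() is the usual default):
    Copy code
    // Checkpointing is generally required for the incremental CDC source
    // to commit offsets and move past the snapshot phase (interval is illustrative).
    env.enableCheckpointing(10_000L);

    JdbcIncrementalSource<String> pgSource =
            PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
                    // ... same connection options as above ...
                    .startupOptions(StartupOptions.initial())   // snapshot first, then stream changes
                    .deserializer(deserializer)
                    .build();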
  • f

    Fabio Carvalho

    08/18/2025, 9:51 AM
    Hello guys! I need some light on a problem; thanks in advance if you can share your 2 cents here. We're using Flink with FlinkOperator v1.11 in our K8s clusters (dev, qa, uat, prod). In the dev cluster, the FlinkDeployments just stopped creating the ConfigMaps with the configuration files (Flink config and pod template) that are mounted in the pods, so the deployment fails as it cannot mount the files. This started all of a sudden; the stack traces in the FlinkOperator logs (in DEBUG) are insufficient, and we couldn't pinpoint anything further. We tried upgrading FlinkOperator to v1.12 with no success. The codebase is the same for all K8s clusters, just changing some values regarding databases and resources to connect to. We rebuilt the dev K8s cluster and it worked fine for the extent of a morning, then it started happening again. Any ideas?
  • f

    Fabio Carvalho

    08/18/2025, 9:53 AM
    BTW, on each K8s cluster we now have 19 Flink clusters in K8s HA mode, managed by ArgoCD.
  • j

    Jina Mizrahi

    08/18/2025, 12:14 PM
    Hey all, I am facing an issue with flink-connector-jdbc-core:4.0.0-2.0 and ClickHouse JDBC 0.8.6. I have created a JdbcSink using ClickHouseDriver, and every time SimpleBatchStatementExecutor::executeBatch is called, we insert all the previous batches as well. I looked at the code and saw that the batchValues field (of com.clickhouse.jdbc.PreparedStatementImpl) is never cleared at any phase, only when a new prepared statement is created. So using the two together is not possible, since every insert command re-inserts each previous batch. Is this a known issue? Is there a fix for it? Can we create a new PreparedStatement after each executeBatch call?
  • r

    raphaelauv

    08/18/2025, 12:26 PM
    Question: Flink & Confluent schema registry. Hi all, how can I set schema-registry settings on the Kafka producer, like auto.register.schemas or use.schema.id? I have shared a full code example on Stack Overflow: https://stackoverflow.com/questions/79738462/flink-confluentregistryavroserializationschema-not-respecting-registryconfigs Thanks all!
  • r

    Rehan Sayed

    08/19/2025, 9:28 AM
    Hey folks, the TaskManager pods in my Flink pipeline that use the RocksDB state backend have been consistently experiencing very high memory utilization, with memory usage remaining steadily around 95% over an extended period. Despite normal fluctuations in JVM heap memory, the overall memory consumption within the pods stays persistently high, which has been leading to resource pressure and intermittent pod failures due to OOM. We are using Flink 1.18.1, flink-kubernetes-operator 1.11.0, jemalloc as the memory allocator, and n2d-highmem-32 VMs. Following are the task manager configs:
    Copy code
    taskManager:
      managedMemoryFraction: "0.01"
      nodeSelector:
        flinknode: taskmanager
        type: highmem32
      tolerations:
        - key: "flinknode"
          operator: "Equal"
          value: "taskmanager"
          effect: "NoSchedule"
      taskSlots: 32
      resource:
        memoryFactor: 110
        processMemory: 220000 # 220GB. 245 GB is allocatable.Total memory assigned to pod is 220*1.1=242 GB
        jvmMetaspace: 256
        offHeap: 16000 # 16 Gb
        cpu: 30
    These are the current rocksDB configs :
    Copy code
    rocksDbSettings: |
      state.backend.type: "rocksdb"
      state.backend.incremental: "true"
      state.backend.rocksdb.checkpoint.transfer.thread.num: "8"
      state.backend.rocksdb.thread.num: "8"
      state.backend.rocksdb.memory.managed: "false"
      state.backend.rocksdb.writebuffer.size: "256mb"
      state.backend.rocksdb.block.cache-size: "128mb"
      state.backend.rocksdb.block.blocksize: "64kb"
      state.backend.rocksdb.writebuffer.count: "4"
      state.backend.rocksdb.writebuffer.number-to-merge: "2"
      state.backend.rocksdb.files.open: "400"
      state.backend.rocksdb.predefined-options: "FLASH_SSD_OPTIMIZED"
    Attached are diagrams of the block cache usage alongside the task manager memory graph (where increases in block cache size coincide with pod OOMs) and the heap usage graph. I would appreciate any advice on how to investigate or resolve this issue.