aditya arya
08/04/2025, 8:33 PM

Vikas Patil
08/04/2025, 9:27 PM
flink-k8s-operator version 1.12
We notice a "Job Not Found" warning in the k8s events of the target namespace. The operator is able to see the correct jobId (I can see it in the logs), but for some reason it does not use it and instead uses some other value. Any idea what is going on?
Relevant logs here:
flink-kubernetes-operator-569944464f-4dd8c flink-kubernetes-operator 2025-08-04 21:24:39,941 o.a.f.r.r.RestClient [DEBUG] Received response {"jobs":[{"jid":"000000002edab9a50000000000000000","name":"streaming-canary-job","start-time":1754342059808,"end-time":-1,"duration":620135,"state":"RUNNING","last-modification":1754342500559,"tasks":{"running":4,"canceling":0,"canceled":0,"total":4,"created":0,"scheduled":0,"deploying":0,"reconciling":0,"finished":0,"initializing":0,"failed":0}}]}.
// more logs here
flink-kubernetes-operator-569944464f-4dd8c flink-kubernetes-operator 2025-08-04 21:25:25,158 o.a.f.r.r.RestClient [DEBUG] Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to flink-canary-rest.flink-canary:8081/v1/jobs/overview
flink-kubernetes-operator-569944464f-4dd8c flink-kubernetes-operator 2025-08-04 21:25:25,197 o.a.f.r.r.RestClient [DEBUG] Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to flink-canary-rest.flink-canary:8081/v1/config
flink-kubernetes-operator-569944464f-4dd8c flink-kubernetes-operator 2025-08-04 21:25:25,202 o.a.f.r.r.RestClient [DEBUG] Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to flink-canary-rest.flink-canary:8081/v1/taskmanagers
flink-kubernetes-operator-569944464f-4dd8c flink-kubernetes-operator 2025-08-04 21:25:25,210 o.a.f.r.r.RestClient [DEBUG] Sending request of class class org.apache.flink.runtime.rest.messages.EmptyRequestBody to flink-canary-rest.flink-canary:8081/v1/jobs/9bfdca22ec0b09ca745d20cd99c5cb6a/checkpoints
Krishnakumar K
08/05/2025, 2:55 PM
KafkaOffsetsInitializer.committed_offsets()
File "/opt/flink/usrlib/uco_flink_job/flink_job.py", line 507, in main
.set_starting_offsets(KafkaOffsetsInitializer.committed_offsets())
File "/opt/flink/opt/python/pyflink.zip/pyflink/datastream/connectors/kafka.py", line 750, in committed_offsets
File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
File "/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 330, in get_return_value
py4j.protocol.Py4JError: An error occurred while calling z:org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer.committedOffsets. Trace:
org.apache.flink.api.python.shaded.py4j.Py4JException: Method committedOffsets([class org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy]) does not exist
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:321)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:342)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:276)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
org.apache.flink.client.program.ProgramAbortException: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:134)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:113)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:1026)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1270)
at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$10(CliFrontend.java:1367)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1367)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1335)
Caused by: java.lang.RuntimeException: Python process exits with code: 1
at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:124)
... 14 more
Has anyone faced a similar issue before, or built pipelines that use a Kafka source with the starting offsets set to committed_offsets?
These are the jar files I'm using and their versions:
flink-connector-kafka-3.3.0-1.20.jar
flink-sql-connector-kafka-3.3.0-1.20.jar
kafka-clients-3.4.0.jar
Thanks!
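One likely cause, judging from the trace: the argument class is the shaded org.apache.flink.kafka.shaded...OffsetResetStrategy, while the OffsetsInitializer whose method is being looked up expects the unshaded one. That is what happens when both flink-connector-kafka (unshaded) and flink-sql-connector-kafka (shaded), plus a separate kafka-clients jar, sit on the same classpath. A minimal sketch of the setup worth trying, assuming only the fat sql-connector jar is kept (broker, topic, group id, and paths are placeholders):

# Sketch: PyFlink Kafka source starting from committed offsets. The key point is
# the classpath: ship ONLY flink-sql-connector-kafka-3.3.0-1.20.jar (it already
# bundles shaded kafka-clients); drop flink-connector-kafka and kafka-clients.
from pyflink.common import WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///opt/flink/usrlib/flink-sql-connector-kafka-3.3.0-1.20.jar")

source = KafkaSource.builder() \
    .set_bootstrap_servers("broker:9092") \
    .set_topics("my-topic") \
    .set_group_id("my-group") \
    .set_starting_offsets(KafkaOffsetsInitializer.committed_offsets()) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()

env.from_source(source, WatermarkStrategy.no_watermarks(), "kafka-source").print()
env.execute("committed-offsets-smoke-test")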
Renan Nogueira
08/05/2025, 3:09 PM
skipping database 'sql-db-dev-dbname' due to error reading tables: Incorrect syntax near '-'
This prevents Flink from reading any table from that database using CDC.
Is there any workaround or configuration that allows using databases with hyphens in their names, without having to rename the database?

Ethan Brown
08/05/2025, 9:35 PM
flink-s3-fs-presto (s3p://) for state & sinks
Observed
1. presto.s3.credentials-provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider → JM dies (NoSuchMethodException on that class).
2. org.apache.flink.fs.s3.common.token.DynamicTemporaryAWSCredentialsProvider → starts only if s3.roleArn is also set, which defeats IRSA.
Questions
• Is IRSA officially supported by Presto FS yet?
• If not, is the workaround to keep checkpoints/HA on Hadoop S3A with IRSA and use Presto only for sinks (static keys)?
• Any JIRA/PR tracking full IRSA support?
Thanks!
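On the second question, a flink-conf sketch of that split setup, assuming the s3-fs-hadoop and s3-fs-presto plugins are both on the plugin path and each is kept on its own scheme (bucket names and keys are placeholders):

# Checkpoints/HA via Hadoop S3A (s3a://), which honors IRSA through the
# web-identity credentials provider in the AWS SDK:
state.checkpoints.dir: s3a://my-bucket/checkpoints
high-availability.storageDir: s3a://my-bucket/ha
fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider

# Presto FS only for sink paths (s3p://), with static keys:
presto.s3.access-key: [REDACTED]
presto.s3.secret-key: [REDACTED]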
Vikas Patil
08/06/2025, 4:08 AM

Marco Villalobos
08/06/2025, 4:15 AM
final JsonDeserializationSchema<ObjectNode> jsonDeserializationSchema = new JsonDeserializationSchema<>(ObjectNode.class);
final KafkaSource<ObjectNode> kafkaSource = KafkaSource.<ObjectNode>builder()
.setBootstrapServers(kafkaBootstrapServers)
.setTopics(kafkaTopic)
.setGroupId(kafkaGroupId)
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.setValueOnlyDeserializer(jsonDeserializationSchema)
.build();
but when I run it I get the following stack trace:
2025-08-06 04:09:44,311 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: flame-inf kafka source -> Sink: Print to Std. Out (1/2)#0 (e119be7364394210a5290c812216a46b_9a1186299580bdec0729e3077675ce1a_0_0) switched from RUNNING to FAILED with failure cause:
java.io.IOException: Failed to deserialize consumer record due to
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56) ~[blob_p-da5c3d686c1802bf53f64aa7321d844d6ada3eb1-fc90b5255bf6e1738c8dc5257b589a2f:?]
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33) ~[blob_p-da5c3d686c1802bf53f64aa7321d844d6ada3eb1-fc90b5255bf6e1738c8dc5257b589a2f:?]
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203) ~[flink-connector-files-1.20.2.jar:1.20.2]
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:443) ~[flink-dist-1.20.2.jar:1.20.2]
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.20.2.jar:1.20.2]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.20.2.jar:1.20.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638) ~[flink-dist-1.20.2.jar:1.20.2]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.20.2.jar:1.20.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973) ~[flink-dist-1.20.2.jar:1.20.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917) ~[flink-dist-1.20.2.jar:1.20.2]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970) ~[flink-dist-1.20.2.jar:1.20.2]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949) [flink-dist-1.20.2.jar:1.20.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763) [flink-dist-1.20.2.jar:1.20.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) [flink-dist-1.20.2.jar:1.20.2]
at java.base/java.lang.Thread.run(Unknown Source) [?:?]
Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Conflicting setter definitions for property "all": com.fasterxml.jackson.databind.node.ObjectNode#setAll(com.fasterxml.jackson.databind.node.ObjectNode) vs com.fasterxml.jackson.databind.node.ObjectNode#setAll(java.util.Map)
at [Source: (byte[])"{"type": "xxxx", "sha256": "not used", "frame": 1, "top_left": {"x": 101, "y": 1}, "bottom_right": {"x": 390, "y": 189}}"; line: 1, column: 1]
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:67) ~[flink-dist-1.20.2.jar:1.20.2]
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext.reportBadDefinition(DeserializationContext.java:1909) ~[flink-dist-1.20.2.jar:1.20.2]
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:268) ~[flink-dist-1.20.2.jar:1.20.2]
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCacheValueDeserializer(DeserializerCache.java:244) ~[flink-dist-1.20.2.jar:1.20.2]
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.DeserializerCache.findValueDeserializer(DeserializerCache.java:142) ~[flink-dist-1.20.2.jar:1.20.2]
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext.findRootValueDeserializer(DeserializationContext.java:648) ~[flink-dist-1.20.2.jar:1.20.2]
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._findRootDeserializer(ObjectMapper.java:4861) ~[flink-dist-1.20.2.jar:1.20.2]
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4731) ~[flink-dist-1.20.2.jar:1.20.2]
at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3738) ~[flink-dist-1.20.2.jar:1.20.2]
at org.apache.flink.formats.json.JsonDeserializationSchema.deserialize(JsonDeserializationSchema.java:69) ~[flink-json-1.20.2.jar:1.20.2]
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) ~[flink-dist-1.20.2.jar:1.20.2]
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51) ~[blob_p-da5c3d686c1802bf53f64aa7321d844d6ada3eb1-fc90b5255bf6e1738c8dc5257b589a2f:?]
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) ~[blob_p-da5c3d686c1802bf53f64aa7321d844d6ada3eb1-fc90b5255bf6e1738c8dc5257b589a2f:?]
... 14 more
I'm also wondering: is it because print does not know how to serialize ObjectNode? Must I register a serializer for it or something?
I even mapped the source into a string before the print, like this:
jsonNodeSource.map(BaseJsonNode::toString).print();
but I still get this error:
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Conflicting setter definitions for property "all": com.fasterxml.jackson.databind.node.ObjectNode#setAll(java.util.Map) vs com.fasterxml.jackson.databind.node.ObjectNode#setAll(com.fasterxml.jackson.databind.node.ObjectNode)
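The trace points elsewhere: the conflicting class is the unshaded com.fasterxml ObjectNode being materialized by flink-json's shaded ObjectMapper, which introspects it as a plain bean and trips over the two setAll overloads. The print sink is not involved, which is why mapping to String afterwards does not help (the failure happens at deserialization). A sketch of one way around it, assuming it is acceptable to read raw strings and parse them with an application-owned Jackson mapper (builder values are from the snippet above, the rest is illustrative):

// Sketch: bypass flink-json's shaded mapper entirely; deserialize to String,
// then parse with your own (unshaded) Jackson ObjectMapper in a map function.
final KafkaSource<String> rawSource = KafkaSource.<String>builder()
        .setBootstrapServers(kafkaBootstrapServers)
        .setTopics(kafkaTopic)
        .setGroupId(kafkaGroupId)
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

env.fromSource(rawSource, WatermarkStrategy.noWatermarks(), "flame-inf kafka source")
        .map(new RichMapFunction<String, String>() {
            private transient ObjectMapper mapper; // com.fasterxml.jackson.databind.ObjectMapper

            @Override
            public void open(Configuration parameters) {
                mapper = new ObjectMapper();
            }

            @Override
            public String map(String value) throws Exception {
                ObjectNode node = (ObjectNode) mapper.readTree(value); // unshaded ObjectNode
                return node.toString(); // or extract fields here
            }
        })
        .print();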
מייקי בר יעקב
08/07/2025, 7:49 AM

Grzegorz Liter
08/07/2025, 10:35 AM

Anshuta Awasthi
08/09/2025, 10:50 AM

מייקי בר יעקב
08/09/2025, 2:52 PM

Fabrizzio Chavez
08/09/2025, 8:40 PM
CREATE TABLE users (
db_name STRING METADATA FROM 'database_name' VIRTUAL,
collection_name STRING METADATA FROM 'collection_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
operation STRING METADATA FROM 'row_kind' VIRTUAL,
_id STRING,
fullDocument STRING,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
'hosts' = 'localhost:27017',
'connection.options' = 'directConnection=true&tls=false',
'username' = 'admin',
'password' = 'admin',
'database' = 'cdc-test',
'collection' = 'users',
'scan.full-changelog' = 'true'
);
I don't want to deserialize it, because I need to store the raw data through a JDBC sink; fullDocument is returning null.

MohammadReza Shahmorady
08/10/2025, 9:12 PM

Evan NotEvan
08/10/2025, 11:09 PM

Fabrizzio Chavez
08/11/2025, 1:31 AM
SET 'sql-client.execution.result-mode' = 'tableau';
CREATE TABLE mongo_users (
db_name STRING METADATA FROM 'database_name' VIRTUAL,
collection_name STRING METADATA FROM 'collection_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
row_kind STRING METADATA FROM 'row_kind' VIRTUAL,
_id STRING,
name STRING,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
'hosts' = 'localhost:27017',
'connection.options' = 'directConnection=true&tls=false',
'username' = 'admin',
'password' = 'admin',
'database' = 'cdc-test',
'collection' = 'users'
);
SELECT * FROM mongo_users;
But I am not sure the row_kind is returned correctly: when a document is deleted, row_kind shows +U (the previous row_kind). Shouldn't it be -D, like the op column the SQL client adds?

Madhusudhan Reddy
08/11/2025, 1:42 PM
SELECT
t1.ID,
LISTAGG(
CONCAT_WS(
CHR(31), -- Unit Separator
t1.c1,
t1.c2
),
'\u001E' -- Record Separator
) AS t1_concat
FROM table1 t1
JOIN table2 t2 ON t1.ID = t2.ID
GROUP BY t1.ID
Issue:
This intermediate emission leads to duplicate events — one partial and one final — whereas only the final consolidated state should be emitted.
Question:
How can I make sure that I emit only the latest event that has the change, instead of emitting both
1) the event that doesn't yet have the row I am updating, and
2) the event that does have the row I am updating?
On an update to one of the rows, the Flink table (LISTAGG, join, and GROUP BY) emits ROWKIND -U +U -U +U.
How can I filter for only the last +U and emit just that event?
Please let me know your views.
Here is the code:
StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment);
FlinkConfigurationManager flinkConfigurationManager = new FlinkConfigurationManager(streamExecutionEnvironment);
// Run the query that provides publishable data
Table viewsAggregateQueryResult = streamTableEnvironment.sqlQuery(sqlQuery);
// Process the result rows
DataStream<JsonNode> eventDataStream = streamTableEnvironment
.toChangelogStream(viewsAggregateQueryResult)
.filter(new UpdateBeforeFilter())
.uid(PROC_NAME + "-update-before-filter-" + "-v1")
.name("update before filter:") // filter out -U Rowkind
.map(new RowToJsonNodeMapper(mappings.toString()));
// Output the transformed data to Kafka
eventDataStream
.map(new EventTransformerMapFunction(
configs.getKafkaSchemaHeaders()))
.keyBy(EventData::getKey)
.sinkTo(new KafkaSinkWriter(
getOutboundKafkaSink(configs)));
streamExecutionEnvironment.execute();
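You cannot get exactly one final +U per key on an unbounded stream (the operator cannot know that a given +U is the last one), but mini-batching collapses most of the intermediate -U/+U pairs inside the aggregation before they ever reach toChangelogStream. A sketch using the standard table options, applied to the streamTableEnvironment above (values are illustrative):

// Sketch: fold many input rows into one emission per interval so the LISTAGG /
// join / GROUP BY pipeline emits far fewer intermediate retract pairs.
streamTableEnvironment.getConfig().set("table.exec.mini-batch.enabled", "true");
streamTableEnvironment.getConfig().set("table.exec.mini-batch.allow-latency", "5 s");
streamTableEnvironment.getConfig().set("table.exec.mini-batch.size", "5000");

If downstream only needs the latest state per key, another common pattern is to sink to a compacted/upsert target (e.g. upsert-kafka keyed on ID) and let compaction discard the intermediates.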
Harish Sharma
08/12/2025, 6:00 AM

Mahesh Gupta
08/12/2025, 8:29 AM
cdp-customer-profile-flink-hudi-taskmanager-1-3 @ ip-10-230-15-69.ap-south-1.compute.internal (dataPort=38719).
java.lang.IllegalStateException: The library registration references a different set of library BLOBs than previous registrations for this job:
old:[p-de4b1680d065109d59ef7958483755d33ccd99e2-2edc04a09cff45f9714e59581b6bb2dd]
new:[p-de4b1680d065109d59ef7958483755d33ccd99e2-bb81a92443d3bd2a99dc8fcfaad94962]
at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:437)
at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.access$600(BlobLibraryCacheManager.java:373)
at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:249)
at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1200(BlobLibraryCacheManager.java:210)
at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:350)
at org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1059)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:637)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Unknown Source)
2025-08-11T05:54:03.092Z [flink-pekko.actor.default-dispatcher-13] INFO o.a.f.r.s.adaptive.AdaptiveScheduler - Restarting job.
java.lang.IllegalStateException: The library registration references a different set of library BLOBs than previous registrations for this job:
old:[p-de4b1680d065109d59ef7958483755d33ccd99e2-2edc04a09cff45f9714e59581b6bb2dd]
new:[p-de4b1680d065109d59ef7958483755d33ccd99e2-bb81a92443d3bd2a99dc8fcfaad94962]
at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:437)
at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.access$600(BlobLibraryCacheManager.java:373)
at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:249)
at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1200(BlobLibraryCacheManager.java:210)
at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:350)
at org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1059)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:637)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Unknown Source)
and we are seeing that the TaskManagers are not being released, as mentioned in the ticket above:
pod/customer-flink-hudi-6c654cddbd-bkx6l 1/1 Running 17 (62m ago) 18h
pod/customer-flink-hudi-taskmanager-17-1 1/1 Running 0 118m
pod/customer-flink-hudi-taskmanager-17-2 1/1 Running 0 118m
pod/customer-flink-hudi-taskmanager-17-3 1/1 Running 0 118m
pod/customer-flink-hudi-taskmanager-17-4 1/1 Running 0 118m
pod/customer-flink-hudi-taskmanager-17-5 1/1 Running 0 118m
pod/customer-flink-hudi-taskmanager-17-6 1/1 Running 0 118m
pod/customer-flink-hudi-taskmanager-18-1 1/1 Running 0 61m
pod/customer-flink-hudi-taskmanager-18-2 1/1 Running 0 61m
pod/customer-flink-hudi-taskmanager-18-3 1/1 Running 0 61m
pod/customer-flink-hudi-taskmanager-18-4 1/1 Running 0 61m
pod/customer-flink-hudi-taskmanager-18-5 1/1 Running 0 61m
pod/customer-flink-hudi-taskmanager-18-6 1/1 Running 0 61m
AMRIT SARKAR
08/13/2025, 5:23 AM
$internal.flink.version v1_16
akka.ask.timeout 601s
akka.framesize 41943040b
akka.lookup.timeout 30s
akka.ssl.enabled false
akka.tcp.timeout 610s
blob.server.port 6124
blob.service.ssl.enabled false
classloader.parent-first-patterns.additional org.apache.flink.statefun;org.apache.kafka
classloader.resolve-order parent-first
cluster.evenly-spread-out-slots true
cluster.health-check.checkpoint-progress.enabled true
execution.checkpointing.interval 60s
execution.target kubernetes-session
fs.s3a.aws.credentials.provider com.amazonaws.auth.WebIdentityTokenCredentialsProvider
fs.s3a.multipart.size 10M
heartbeat.rpc-failure-threshold 5
high-availability org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.cluster-id [REDACTED]
high-availability.jobmanager.port 6123
high-availability.storageDir [REDACTED]
internal.cluster.execution-mode NORMAL
jobmanager.execution.failover-strategy region
jobmanager.memory.heap.size 66437775360b
jobmanager.memory.jvm-metaspace.size 1073741824b
jobmanager.memory.jvm-overhead.max 1073741824b
jobmanager.memory.jvm-overhead.min 1073741824b
jobmanager.memory.off-heap.size 134217728b
jobmanager.memory.process.size 64g
jobmanager.rpc.address [REDACTED]
jobmanager.rpc.port 6123
jobstore.expiration-time 86400
kubernetes.cluster-id [REDACTED]
kubernetes.container.image [REDACTED]
kubernetes.container.image.pull-policy Always
kubernetes.flink.conf.dir /opt/flink/conf
kubernetes.internal.jobmanager.entrypoint.class org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint
kubernetes.internal.taskmanager.replicas 2
kubernetes.jobmanager.annotations flinkdeployment.flink.apache.org/generation:18
kubernetes.jobmanager.cpu 16.0
kubernetes.jobmanager.owner.reference [REDACTED]
kubernetes.jobmanager.replicas 2
kubernetes.namespace [REDACTED]
kubernetes.pod-template-file.jobmanager [REDACTED]
kubernetes.pod-template-file.taskmanager [REDACTED]
kubernetes.rest-service.exposed.type ClusterIP
kubernetes.service-account [REDACTED]
kubernetes.taskmanager.cpu 22.0
metrics.internal.query-service.port 6120
metrics.reporter.prom.factory.class org.apache.flink.metrics.prometheus.PrometheusReporterFactory
metrics.reporter.prom.port 9999
metrics.reporter.prom.whitelist _Custom_Source.0
metrics.reporters prom
parallelism.default 16
resourcemanager.taskmanager-registration.timeout 15min
resourcemanager.taskmanager-timeout 600000
rest.address 10.69.99.191
rest.connection-timeout 120000
rest.flamegraph.enabled true
restart-strategy exponential-delay
restart-strategy.exponential-delay.initial-backoff 60s
restart-strategy.exponential-delay.jitter-factor 0.1
restart-strategy.exponential-delay.max-backoff 2min
restart-strategy.exponential-delay.reset-backoff-threshold 10min
restart-strategy.fixed-delay.attempts 10
s3.entropy.key _entropy_
s3.entropy.length 4
security.ssl.internal.enabled false
security.ssl.rest.enabled false
security.ssl.verify-hostname false
slot.idle.timeout 300000
slot.request.timeout 600000
slotmanager.redundant-taskmanager-num 2
state.backend rocksdb
state.backend.fs.checkpointdir [REDACTED]
state.backend.incremental true
state.backend.local-recovery false
state.backend.rocksdb.block.cache-size 4mb
state.backend.rocksdb.localdir /data/flink/rocksdb
state.backend.rocksdb.memory.managed true
state.backend.rocksdb.memory.write-buffer-ratio 0.70
state.backend.rocksdb.timer-service.factory ROCKSDB
state.backend.rocksdb.writebuffer.size 16mb
state.checkpoints.dir [REDACTED]
state.savepoints.dir [REDACTED]
taskmanager.data.port 6121
taskmanager.data.ssl.enabled false
taskmanager.heartbeat.interval 60000
taskmanager.heartbeat.timeout 180000
taskmanager.memory.framework.heap.size 256mb
taskmanager.memory.framework.off-heap.size 256mb
taskmanager.memory.jvm-metaspace.size 1gb
taskmanager.memory.jvm-overhead.max 10gb
taskmanager.memory.jvm-overhead.min 10gb
taskmanager.memory.managed.size 40gb
taskmanager.memory.network.max 20gb
taskmanager.memory.network.min 20gb
taskmanager.memory.process.size 176g
taskmanager.memory.task.off-heap.size 1gb
taskmanager.network.memory.exclusive-buffers-request-timeout-ms 120000
taskmanager.network.request-backoff.max 120000
taskmanager.network.retries 10
taskmanager.numberOfTaskSlots 44
taskmanager.registration.timeout 30 min
taskmanager.rpc.port 6122
web.cancel.enable false
web.tmpdir [REDACTED]
version OpenJDK 64-Bit Server VM - Azul Systems, Inc. - 11/11.0.21.0.102+1-LTS
arch amd64
options -Dlog4j2.formatMsgNoLookups=true -XX:CompileCommand=quiet -XX:CompileCommand=exclude,example/security/rng/RdRandSecureRandomSpi.generatePRF -Xmx66437775360 -Xms66437775360 -XX:MaxMetaspaceSize=1073741824 -Dlog.file=[REDACTED] -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties -Dlog4j.configurationFile=file:/opt/flink/conf/log4j-console.properties -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
Thank you for your time and expertise.

Rushikesh Gulve
08/13/2025, 10:15 AM
File "/opt/flink/usrlib/data_processing_v9/stream_processing/process/prepare_data.py", line 71, in process
filtered_elements = self.filter_elements(raw_data)
File "/opt/flink/usrlib/data_processing_v9/stream_processing/process/prepare_data.py", line 104, in filter_elements
raw_mapping_dict = set(self.raw_io_mapping_dict.keys())
File "/opt/flink/usrlib/data_processing_v9/stream_processing/utils/state_utils/map_state_utils.py", line 66, in keys
return self.map_state.keys()
File "/usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/state_impl.py", line 934, in keys
return self.get_internal_state().keys()
File "/usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/state_impl.py", line 840, in keys
self.remote_data_iterator(IterateType.KEYS))
File "/usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/state_impl.py", line 879, in remote_data_iterator
return self._map_state_handler.lazy_iterator(
File "/usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/state_impl.py", line 437, in lazy_iterator
current_batch, iterator_token = self._iterate_raw(
File "/usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/state_impl.py", line 621, in _iterate_raw
data, response_token = self._underlying.get_raw(state_key, continuation_token)
File "/usr/local/lib/python3.10/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 1064, in get_raw
response = self._blocking_request(
File "/usr/local/lib/python3.10/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 1104, in _blocking_request
raise self._exception
File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
self.run()
File "/usr/lib/python3.10/threading.py", line 953, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.10/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 1039, in pull_responses
for response in responses:
File "/usr/local/lib/python3.10/dist-packages/grpc/_channel.py", line 543, in __next__
return self._next()
File "/usr/local/lib/python3.10/dist-packages/grpc/_channel.py", line 969, in _next
raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
status = StatusCode.UNAVAILABLE
details = "recvmsg:Connection reset by peer"
debug_error_string = "UNKNOWN:Error received from peer {grpc_message:"recvmsg:Connection reset by peer", grpc_status:14}"
>
This is the error that I am facing. I can see some changes in metrics around the same time, but I am not able to find the root cause of this issue. Can someone guide me on tuning the configuration parameters? I am using the RocksDB backend.
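Not a root-cause answer, but the failing call is the Beam state channel between the Python worker and the JVM dying mid-iteration (map_state.keys() streams every key over gRPC), so the usual levers are the bundle and Python state-cache options. A sketch with purely illustrative values:

# Sketch: smaller bundles bound how much work is in flight per checkpoint;
# larger map-state caches cut the number of gRPC round-trips for key iteration.
from pyflink.common import Configuration
from pyflink.datastream import StreamExecutionEnvironment

conf = Configuration()
conf.set_string("python.fn-execution.bundle.size", "1000")
conf.set_string("python.fn-execution.bundle.time", "500")
conf.set_string("python.state.cache-size", "4000")
conf.set_string("python.map-state.read-cache-size", "2000")
conf.set_string("python.map-state.write-cache-size", "2000")
conf.set_string("python.map-state.iterate-response-batch-size", "2000")

env = StreamExecutionEnvironment.get_execution_environment(conf)

It is also worth checking whether the Python worker process was OOM-killed around the same timestamp, since that produces exactly this "Connection reset by peer" on the client side.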
L P V
08/13/2025, 10:18 AM
CREATE TABLE src.kafka.kafka_transactions (
id BIGINT,
type STRING,
amount DOUBLE
) WITH (
'connector' = 'kafka',
'topic' = 'core-transaction-item',
'properties.bootstrap.servers' = 'generic.kafka.db-dev:9092',
'properties.group.id' = 'flink-sql-transactionitem',
'scan.startup.mode' = 'earliest-offset',
'format' = 'protobuf',
'protobuf.message-class-name' = 'TransactionItem',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'SCRAM-SHA-512',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="${KAFKA_USERNAME}" password="${KAFKA_PASSWORD}";'
);
---> but it doesn't work. It seems the Flink SQL client cannot read KAFKA_USERNAME directly, even though I passed the variables when starting the SQL client like this:
./bin/sql-client.sh gateway --endpoint 0.0.0.0:8083 -Drest.address=xxxx -Drest.port=8081 \
-DKAFKA_USERNAME="$KAFKA_USERNAME" \
-DKAFKA_PASSWORD="$KAFKA_PASSWORD"
So is there any way I can pass Kafka credentials to Flink SQL?
The statement runs correctly if I use plaintext values for KAFKA_USERNAME/KAFKA_PASSWORD instead of env variables.
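The -D flags only set Flink configuration entries; neither the SQL client nor the gateway substitutes ${...} placeholders inside DDL strings, so the JAAS line reaches Kafka verbatim. One workaround (a sketch, assuming GNU envsubst and a template file name made up here; check that your client version supports -f) is to render the statement file before handing it to the client:

export KAFKA_USERNAME KAFKA_PASSWORD
envsubst < kafka_transactions.sql.tpl > /tmp/kafka_transactions.sql
./bin/sql-client.sh gateway --endpoint 0.0.0.0:8083 -f /tmp/kafka_transactions.sql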
מייקי בר יעקב
08/14/2025, 9:08 PM

Harish Sharma
08/15/2025, 6:33 AM

Richard Moorhead
08/16/2025, 2:30 AM

Harish Sharma
08/18/2025, 7:15 AM
// Note: incremental snapshot reading assumes checkpointing is enabled; the
// 'env' and 'deserializer' lines below are filled in with typical choices.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(3000);
JsonDebeziumDeserializationSchema deserializer = new JsonDebeziumDeserializationSchema();
JdbcIncrementalSource<String> pgSource =
    PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
        .hostname("localhost")
        .port(5432)
        .database("postgres")
        .schemaList("public")
        .tableList("public.orders")
        .username("postgres")
        .password("postgres")
        .slotName("orders_slot")
        .decodingPluginName("pgoutput")
        // optional: chunk key if no PK; see docs
        //.chunkKeyColumn("id")
        .deserializer(deserializer)
        .build();
DataStreamSource<String> stream =
    env.fromSource(pgSource, WatermarkStrategy.noWatermarks(), "pg-cdc-incremental");
stream.print();
env.execute();
Fabio Carvalho
08/18/2025, 9:51 AM

Fabio Carvalho
08/18/2025, 9:53 AM

Jina Mizrahi
08/18/2025, 12:14 PM

raphaelauv
08/18/2025, 12:26 PM
auto.register.schemas
or use.schema.id
... ?
I have shared a full code example on Stack Overflow:
https://stackoverflow.com/questions/79738462/flink-confluentregistryavroserializationschema-not-respecting-registryconfigs
Thanks all

Rehan Sayed
08/19/2025, 9:28 AM
taskManager:
managedMemoryFraction: "0.01"
nodeSelector:
flinknode: taskmanager
type: highmem32
tolerations:
- key: "flinknode"
operator: "Equal"
value: "taskmanager"
effect: "NoSchedule"
taskSlots: 32
resource:
memoryFactor: 110
processMemory: 220000 # 220 GB; 245 GB is allocatable. Total memory assigned to the pod is 220 * 1.1 = 242 GB
jvmMetaspace: 256
offHeap: 16000 # 16 Gb
cpu: 30
These are the current rocksDB configs :
rocksDbSettings: |
state.backend.type: "rocksdb"
state.backend.incremental: "true"
state.backend.rocksdb.checkpoint.transfer.thread.num: "8"
state.backend.rocksdb.thread.num: "8"
state.backend.rocksdb.memory.managed: "false"
state.backend.rocksdb.writebuffer.size: "256mb"
state.backend.rocksdb.block.cache-size: "128mb"
state.backend.rocksdb.block.blocksize: "64kb"
state.backend.rocksdb.writebuffer.count: "4"
state.backend.rocksdb.writebuffer.number-to-merge: "2"
state.backend.rocksdb.files.open: "400"
state.backend.rocksdb.predefined-options: "FLASH_SSD_OPTIMIZED"
Attached are diagrams of the block-cache usage alongside the TaskManager memory graph (where increases in block-cache size coincide with pod OOMs) and the heap usage graph.
I would appreciate any advice on how to investigate or resolve this issue.
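One thing stands out in the settings above (a direction to test, not a definitive diagnosis): with state.backend.rocksdb.memory.managed: "false", every RocksDB instance sizes its own write buffers and block cache outside Flink's memory accounting, and with 32 slots per pod that native usage is effectively unbounded, which would match block-cache growth ending in pod OOMs. Two bounded alternatives, as a sketch:

# Option A: let Flink's managed memory own RocksDB and size it explicitly
state.backend.rocksdb.memory.managed: "true"
taskmanager.memory.managed.fraction: 0.4   # rather than managedMemoryFraction: "0.01"

# Option B: keep manual tuning, but give each slot a hard RocksDB budget
# state.backend.rocksdb.memory.managed: "false"
# state.backend.rocksdb.memory.fixed-per-slot: 512mb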