George Leonard
06/15/2025, 4:02 PMRushikesh Gulve
06/16/2025, 6:47 AMkind: FlinkDeployment
metadata:
  name: data-processing-flink
  namespace: flink
spec:
  image: 624083781302.dkr.ecr.ap-south-1.amazonaws.com/product/data-streaming-flink:latest
  flinkVersion: "v1_20"
  serviceAccount: flink
  flinkConfiguration:
    rest.port: "8081"
    jobmanager.rpc.address: "data-processing-flink"
    jobmanager.rpc.port: "6123"
    taskmanager.numberOfTaskSlots: "2"
    # High Availability
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: file:///flink-data/ha
    # Checkpoints and Savepoints
    state.checkpoints.dir: file:///flink-data/checkpoints
    state.savepoints.dir: file:///flink-data/savepoints
    state.backend: rocksdb
    state.backend.incremental: true
    rest.profiling.enabled: "true"
    env.java.opts.taskmanager: "-Dorg.apache.beam.sdk.fn=DEBUG -Dorg.apache.beam.runners.fnexecution.control.FnApiControlClient=DEBUG"
  podTemplate:
    spec:
      imagePullSecrets:
        - name: ecrscr-credentials
      containers:
        - name: flink-main-container
          imagePullPolicy: Always
          envFrom:
            - secretRef:
                name: redis-secret-main
          env:
            - name: KAFKA_BROKER
              value: "kafka.flink.svc.cluster.local:9092"
            - name: BOOTSTRAP_SERVERS
              value: "kafka.flink.svc.cluster.local:9092"
            - name: MINIO_SERVER
              value: "http://myminio-hl.flink.svc.cluster.local:9000"
            - name: MINIO_USER
              value: "minio"
            - name: MINIO_PASSWORD
              value: "minio123"
            - name: MAX_PARALLELISM
              value: "128"
          volumeMounts:
            - mountPath: /flink-data
              name: flink-pvc
      volumes:
        - name: flink-pvc
          persistentVolumeClaim:
            claimName: flink-data-pvc
      securityContext:
        runAsUser: 9999
        runAsGroup: 9999
  jobManager:
    resource:
      memory: "3072m"
      cpu: 0.4
  taskManager:
    resource:
      memory: "3584m"
      cpu: 2
    replicas: 2
  job:
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/bin/python3", "-py", "/opt/flink/usrlib/data_processing_v9/stream_processing/process_dependency/create_env.py"]
    upgradeMode: last-state
This is the deployment file I am using. My application is already consuming a lot of resources and I cannot afford to allocate more to it. The checkpoint size is also only 2-3 MB for the few successful checkpoints I observed. Can anyone guide me on where to start debugging this issue? ThanksNick Mosin
06/16/2025, 12:40 PMNick Mosin
06/16/2025, 2:20 PMNick Mosin
06/16/2025, 3:46 PMEnabling required built-in plugins
Linking flink-s3-fs-presto-1.20.1.jar to plugin directory
Successfully enabled flink-s3-fs-presto-1.20.1.jar
Linking flink-s3-fs-hadoop-1.20.1.jar to plugin directory
Successfully enabled flink-s3-fs-hadoop-1.20.1.jar
and even
Multiple providers loaded with the same prefix: s3. This might lead to unintended consequences, please consider using only one of them.
but anyway I got
Could not find a file system implementation for scheme 's3'. The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-hadoop, flink-s3-fs-presto.
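One way to narrow this down, as a hedged diagnostic sketch (the bucket path is a placeholder): a couple of lines dropped into the job's main(), or anywhere that runs on the component reporting the error, show how that particular JVM resolves the s3 scheme, since the error typically comes from a process whose plugins directory was not populated even if another component's entrypoint log shows the jars being linked.
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

// Throws UnsupportedFileSystemSchemeException ("Could not find a file system implementation for
// scheme 's3'") if no s3 plugin is visible to this JVM; otherwise prints which of
// flink-s3-fs-presto / flink-s3-fs-hadoop won the 's3' prefix.
FileSystem fs = new Path("s3://some-bucket/some-key").getFileSystem();
System.out.println("s3 resolved to " + fs.getClass().getName());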
Tiansu Yu
06/17/2025, 8:06 AMjava.lang.UnsupportedOperationException: Unsupported to derive Schema for type: TIMESTAMP_LTZ(3)
at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:494) ~[flink-sql-avro-1.19.2.jar:1.19.2]
however, this option is only available for the flink avro format, not for flink confluent avro. Therefore it is unable to cast a unix timestamp in Avro properly for a source table in Flink while using the confluent-avro format (where you can hook in the schema registry).
Is this a known bug already?Rushikesh Gulve
06/17/2025, 12:09 PMapiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: data-processing-flink
  namespace: flink
spec:
  image: 624083781302.dkr.ecr.ap-south-1.amazonaws.com/product/data-streaming-flink:latest
  flinkVersion: "v1_20"
  serviceAccount: flink
  flinkConfiguration:
    rest.port: "8081"
    jobmanager.rpc.address: "data-processing-flink"
    jobmanager.rpc.port: "6123"
    taskmanager.numberOfTaskSlots: "2"
    # High Availability
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: file:///flink-data/ha
    # Checkpoints and Savepoints
    state.checkpoints.dir: file:///flink-data/checkpoints
    state.savepoints.dir: file:///flink-data/savepoints
    state.backend.type: rocksdb
    execution.checkpointing.incremental: "true"
    # execution.checkpointing.alignment-timeout: "3000" # ms, after which checkpoint switches to unaligned mode dynamically
    # execution.checkpointing.unaligned: "true" # Enable unaligned checkpoints
    # rest.profiling.enabled: "true"
    # env.java.opts.taskmanager: "-Dorg.apache.beam.sdk.fn=DEBUG -Dorg.apache.beam.runners.fnexecution.control.FnApiControlClient=DEBUG"
  podTemplate:
    spec:
      imagePullSecrets:
        - name: ecrscr-credentials
      containers:
        - name: flink-main-container
          imagePullPolicy: Always
          envFrom:
            - secretRef:
                name: redis-secret-main
          env:
            - name: KAFKA_BROKER
              value: "kafka.flink.svc.cluster.local:9092"
            - name: BOOTSTRAP_SERVERS
              value: "kafka.flink.svc.cluster.local:9092"
            - name: MINIO_SERVER
              value: "http://myminio-hl.flink.svc.cluster.local:9000"
            - name: MINIO_USER
              value: "minio"
            - name: MINIO_PASSWORD
              value: "minio123"
            - name: MAX_PARALLELISM
              value: "128"
          volumeMounts:
            - mountPath: /flink-data
              name: flink-pvc
      volumes:
        - name: flink-pvc
          persistentVolumeClaim:
            claimName: flink-data-pvc
      securityContext:
        runAsUser: 9999
        runAsGroup: 9999
  jobManager:
    resource:
      memory: "3072m"
      cpu: 0.4
  taskManager:
    resource:
      memory: "3584m"
      cpu: 2
    replicas: 2
  job:
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/bin/python3", "-py", "/opt/flink/usrlib/data_processing_v9/stream_processing/process_dependency/create_env.py"]
    upgradeMode: stateless
I am not able to find the root cause or the best way to configure checkpointing. Does anyone have any idea how I should proceed with this?
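As a starting point, a sketch only (arbitrary example values, Java DataStream API shown for brevity; the same options exist as execution.checkpointing.* keys in the flinkConfiguration of a deployment like the one above): pinning the checkpointing behaviour down explicitly usually makes the failures easier to reason about.
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE); // e.g. one checkpoint per minute

CheckpointConfig cc = env.getCheckpointConfig();
cc.setCheckpointTimeout(300_000L);         // only fail a checkpoint after 5 minutes
cc.setMinPauseBetweenCheckpoints(30_000L); // let the job make progress between checkpoints
cc.setMaxConcurrentCheckpoints(1);
cc.setTolerableCheckpointFailureNumber(3); // do not fail the job on a single timed-out checkpoint
cc.enableUnalignedCheckpoints();           // helps when barriers queue behind backpressure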
Leong Wai Leong
06/17/2025, 2:10 PMApollo Elon
06/18/2025, 4:05 PMKafkaSourceBuilder<RichCdcMultiplexRecord> kafkaSourceBuilder = KafkaSource.builder();
kafkaSourceBuilder.setTopics(Context.KafkaOptions.topic)
        .setBootstrapServers(Context.KafkaOptions.bootstrapServer)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new RichCdcMultiplexRecordDeserializer())
        .setGroupId(Context.KafkaOptions.groupId);

public class RichCdcMultiplexRecordDeserializer
        implements DeserializationSchema<RichCdcMultiplexRecord> {

    @Override
    public RichCdcMultiplexRecord deserialize(byte[] message) throws IOException {
        RichCdcMultiplexRecord richCdcMultiplexRecord = RichCdcMultiplexRecordDeserializer.deserializeForValue(message);
        return richCdcMultiplexRecord;
    }

    @Override
    public boolean isEndOfStream(RichCdcMultiplexRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<RichCdcMultiplexRecord> getProducedType() {
        return TypeInformation.of(RichCdcMultiplexRecord.class);
    }

    public static RichCdcMultiplexRecord deserializeForValue(byte[] value) {
        return KryoSerializerUtil.deserialize(value, RichCdcMultiplexRecord.class);
    }
}
I'm sure the value of the data in Kafka is in the RichCdcMultiplexRecord format, and I have confirmed while debugging the deserialization method that the data is read from Kafka. However, there is a problem when it is written to the next operator. Thank you all!!!
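A hedged debugging sketch rather than a confirmed fix, assuming env is the StreamExecutionEnvironment: if RichCdcMultiplexRecord is handled as a generic type, Flink serializes it with Kryo between operators, which is a frequent source of records that deserialize fine from Kafka but break when handed to the next operator. Disabling generic types makes the job fail fast at the exact operator where Kryo would be used.
env.getConfig().disableGenericTypes();
// or keep the Kryo fallback but make it explicit:
// env.getConfig().registerKryoType(RichCdcMultiplexRecord.class);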
Sumit Nekar
06/19/2025, 6:04 AMArtsiom Yudovin
06/20/2025, 10:17 AMSELECT /*+ STATE_TTL('session'='12h', 'userState'='1d', 'authorizedUserState'='365d') */
it looks like it is not working; does anybody have any idea why this might happen?
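For comparison, a sketch of the documented hint form, assuming a TableEnvironment named tableEnv and placeholder tables: the keys in STATE_TTL must match the table names or aliases used in that same query block, and the hint applies to stream regular joins and group aggregations on Flink 1.18+.
tableEnv.executeSql(
    "SELECT /*+ STATE_TTL('o' = '12h', 'c' = '1d') */ o.id, c.name "
        + "FROM Orders o "
        + "LEFT JOIN Customers c ON o.customer_id = c.id");
If 'session', 'userState' and 'authorizedUserState' are not the names or aliases visible in the FROM/JOIN of the block the hint sits on, that would explain the TTLs not taking effect.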
Akash Patel
06/20/2025, 2:04 PMRushikesh Gulve
06/23/2025, 10:31 AM# JRE version: OpenJDK Runtime Environment Temurin-11.0.27+6 (11.0.27+6) (build 11.0.27+6)
2025-06-23 11:43:03.749
#
2025-06-23 11:43:03.749
# SIGSEGV (0xb) at pc=0x00007fa5f75ff898, pid=1, tid=3479
2025-06-23 11:43:03.748
#
2025-06-23 11:43:03.748
# A fatal error has been detected by the Java Runtime Environment:
2025-06-23 11:43:03.748
#
2025-06-23 11:43:03.748
pthread lock: Invalid argument
2025-06-23 11:29:59.808
2025-06-23 09:29:59,808 INFO /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - after minio data_output: WindowProcessComputeIOStates-5-1736416800000, 64558e0e8caa4ebabc262d76e8117256, 1736416800000
During this time, other task managers are logging a failure in checkpointing.
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:182) ~[flink-dist-1.20.1.jar:1.20.1]
2025-06-23 11:33:00.661
at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:127) ~[flink-dist-1.20.1.jar:1.20.1]
2025-06-23 11:33:00.661
2025-06-23 09:33:00,634 INFO org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - KEYED PROCESS -> (Extract-Timestamp -> Timestamps/Watermarks -> Remove-Timestamp -> Map -> Sink: Writer -> Sink: Committer, Map, Map -> Sink: Writer -> Sink: Committer) (5/40)#0 - asynchronous part of checkpoint 15 could not be completed.
2025-06-23 11:33:00.661
2025-06-23 09:33:00,629 INFO org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - Source: prepareData->ProcessComputePartTypeTransactions -> _stream_key_by_map_operator (5/40)#0 - asynchronous part of checkpoint 15 could not be completed.
2025-06-23 11:33:00.661
java.util.concurrent.ExecutionException: org.apache.flink.runtime.checkpoint.CheckpointException: The checkpoint was aborted due to exception of other subtasks sharing the ChannelState file.
I am guessing the one task manager that became unresponsive was responsible for the checkpoint failure, which happened due to a timeout. Can anyone help with identifying the root cause?Tudor Plugaru
06/24/2025, 1:45 PMtraceparent?
Currently, from the work I did, it seems that it's very hard to do properly unless you pass tracing metadata as part of the POJO that gets passed from operator to operator, but even in this case you'll have to "activate" the trace in each operator of your pipeline. I found this to be not very DX friendly and very error prone. We're on Flink 1.18 if it's important.
I'm looking for inspiration mainly on how to do it nicer and more DX friendly, so open for suggestions/ideas.
Thanks
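For what it's worth, a minimal sketch of the approach described above (carrying the W3C traceparent on the event POJO and re-activating it inside each operator) using the OpenTelemetry API; the MyEvent class and its getTraceparent() accessor are hypothetical, and it assumes an OpenTelemetry SDK is initialized on the task managers.
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapGetter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class TracedEnrichment extends RichMapFunction<MyEvent, MyEvent> {

    private transient Tracer tracer;

    @Override
    public void open(Configuration parameters) {
        tracer = GlobalOpenTelemetry.getTracer("flink-pipeline");
    }

    @Override
    public MyEvent map(MyEvent event) {
        // Rebuild the parent context from the traceparent string carried on the event.
        Context parent = W3CTraceContextPropagator.getInstance()
                .extract(Context.root(), event, new TextMapGetter<MyEvent>() {
                    @Override
                    public Iterable<String> keys(MyEvent carrier) {
                        return java.util.Collections.singletonList("traceparent");
                    }

                    @Override
                    public String get(MyEvent carrier, String key) {
                        return "traceparent".equals(key) ? carrier.getTraceparent() : null;
                    }
                });

        Span span = tracer.spanBuilder("enrich").setParent(parent).startSpan();
        try (Scope ignored = span.makeCurrent()) {
            // ... actual operator logic ...
            return event;
        } finally {
            span.end();
        }
    }
}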
George Leonard
06/25/2025, 5:32 AMimport org.apache.flink.api.common.serialization.SerializableSerializer;
...
@Override
public SimpleVersionedSerializer<SnmpSourceSplit> getSplitSerializer() {
return new SerializableSerializer<>();
}
@Override
public SimpleVersionedSerializer<List<SnmpSourceSplit>> getEnumeratorCheckpointSerializer() {
return new SerializableSerializer<>();
}
parent pom.xml
<properties>
<flink.version>1.20.1</flink.version>
<java.version>17</java.version>
<maven-compiler.version>3.11.0</maven-compiler.version>
<maven-shade.version>3.5.2</maven-shade.version>
<maven-compiler.source>${java.version}</maven-compiler.source>
<maven-compiler.target>${java.version}</maven-compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
...
local pom.xml
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
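In case the SerializableSerializer import does not resolve against the dependencies above, a hand-written SimpleVersionedSerializer is only a few lines; a sketch assuming SnmpSourceSplit implements java.io.Serializable (the enumerator-checkpoint serializer for List<SnmpSourceSplit> can follow the same pattern).
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.apache.flink.core.io.SimpleVersionedSerializer;

public class SnmpSourceSplitSerializer implements SimpleVersionedSerializer<SnmpSourceSplit> {

    private static final int VERSION = 1;

    @Override
    public int getVersion() {
        return VERSION;
    }

    @Override
    public byte[] serialize(SnmpSourceSplit split) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(split);
        }
        return bytes.toByteArray();
    }

    @Override
    public SnmpSourceSplit deserialize(int version, byte[] serialized) throws IOException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serialized))) {
            return (SnmpSourceSplit) in.readObject();
        } catch (ClassNotFoundException e) {
            throw new IOException(e);
        }
    }
}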
Fabrizzio Chavez
06/25/2025, 1:50 PMAdam Richardson
06/26/2025, 3:27 AMorg.apache.flink.api.connector.sink.{Writer,Committer,GlobalCommitter,Sink,...}
). I'm observing strange behavior where consumer group latency on Kafka is close to zero after a checkpoint, but the data in the output Delta table lags significantly. I can see in the TM logs that Delta commits are happening quickly and reliably after each checkpoint, but recent data is still missing.
After some digging I found a reference in an unrelated issue on Flink: "it is expected to global committer architecture lag one commit behind in reference to rest of the pipeline". This lines up very well with the symptoms I'm seeing -- my app checkpoints every 5m, and I'm seeing latency in the output table fluctuating from 5m (immediately after a checkpoint) to 10m (immediately before the next checkpoint).
My questions:
• Is it expected behavior that the committed data consistently lags behind by one checkpoint? Are there any more authoritative docs on this behavior?
• Is there any workaround (config or code changes) to avoid/fix this behavior?
• Does the v2 sink API have this problem?
Thank you!Rushikesh Gulve
06/26/2025, 6:49 AM# JRE version: OpenJDK Runtime Environment Temurin-11.0.27+6 (11.0.27+6) (build 11.0.27+6)
2025-06-23 11:43:03.749
#
2025-06-23 11:43:03.749
# SIGSEGV (0xb) at pc=0x00007fa5f75ff898, pid=1, tid=3479
2025-06-23 11:43:03.748
#
2025-06-23 11:43:03.748
# A fatal error has been detected by the Java Runtime Environment:
2025-06-23 11:43:03.748
#
2025-06-23 11:43:03.748
pthread lock: Invalid argument
2025-06-23 11:29:59.808
2025-06-23 09:29:59,808 INFO /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - after minio data_output: WindowProcessComputeIOStates-5-1736416800000, 64558e0e8caa4ebabc262d76e8117256, 1736416800000
During this time, other task managers are logging a failure in checkpointing.
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:182) ~[flink-dist-1.20.1.jar:1.20.1]
2025-06-23 11:33:00.661
at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:127) ~[flink-dist-1.20.1.jar:1.20.1]
2025-06-23 11:33:00.661
2025-06-23 09:33:00,634 INFO org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - KEYED PROCESS -> (Extract-Timestamp -> Timestamps/Watermarks -> Remove-Timestamp -> Map -> Sink: Writer -> Sink: Committer, Map, Map -> Sink: Writer -> Sink: Committer) (5/40)#0 - asynchronous part of checkpoint 15 could not be completed.
2025-06-23 11:33:00.661
2025-06-23 09:33:00,629 INFO org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - Source: prepareData->ProcessComputePartTypeTransactions -> _stream_key_by_map_operator (5/40)#0 - asynchronous part of checkpoint 15 could not be completed.
2025-06-23 11:33:00.661
java.util.concurrent.ExecutionException: org.apache.flink.runtime.checkpoint.CheckpointException: The checkpoint was aborted due to exception of other subtasks sharing the ChannelState file.
I am guessing the one task manager that became unresponsive was responsible for the checkpoint failure, which happened due to a timeout. Can anyone help with identifying the root cause?Magdalena Kobusch
06/26/2025, 8:19 AMtEnv.executeSql(
"CREATE CATALOG pulse_table WITH ("
+ "'type'='iceberg',"
+ "'warehouse'='arn:aws:s3tables:us-east-1:058264243434:bucket/kyk-dataeng-table-iceberg',"
+ "'catalog-impl'='org.apache.iceberg.aws.s3tables.S3TablesCatalog',"
+ ")"
);
But that is not working for me.
I have added:
dependencies {
implementation 'software.amazon.awssdk:s3tables:2.29.26'
implementation 'software.amazon.s3tables:s3-tables-catalog-for-iceberg:0.1.7'
}
But I'm still getting:
Exception in thread "main" java.lang.IllegalArgumentException: Cannot initialize Catalog implementation org.apache.iceberg.aws.s3tables.S3TablesCatalog: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog
Missing org.apache.iceberg.aws.s3tables.S3TablesCatalog [java.lang.ClassNotFoundException: org.apache.iceberg.aws.s3tables.S3TablesCatalog]
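A hedged sketch, not a confirmed fix: the ClassNotFoundException suggests the catalog-impl value may not match the class actually shipped in s3-tables-catalog-for-iceberg; to the best of my knowledge that artifact's catalog class lives under software.amazon.s3tables.iceberg, so it is worth checking the package name inside the jar, making sure the jar actually ends up on the job's classpath, and trying something like:
tEnv.executeSql(
    "CREATE CATALOG pulse_table WITH ("
        + "'type'='iceberg',"
        + "'catalog-impl'='software.amazon.s3tables.iceberg.S3TablesCatalog'," // assumed class name, verify against the jar
        + "'warehouse'='arn:aws:s3tables:us-east-1:058264243434:bucket/kyk-dataeng-table-iceberg'"
        + ")");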
Jan Bergeson
06/26/2025, 5:42 PMmy_topic.v1 for the old job, my_topic.v2 for the new job)
• B) An abstraction over Flink deployments so you could automatically have multiple deployments (my_flink_deployment_v1, my_flink_deployment_v2) which originate from different versions of the same code (We're using the k8s operator)
So, questions:
• Is there a better way to do this? ^ (maybe the flow is different/easier when using AWS managed Flink?)
• If this is the way to do it, are there any existing libs that could help with automating this flow?dontu balu
06/27/2025, 12:56 AMGeorge Leonard
06/27/2025, 2:27 PMSystem.out.println(
Thread.currentThread().getName() + " SNMP Source Reader initialized. s(Direct System.out)"
);
LOG.debug("{} SNMP Source Reader initialized.",
Thread.currentThread().getName()
);
environment:
- ENV_ROOTLOG_LEVEL=INFO
- ENV_FLINKLOG_LEVEL=INFO
- ENV_SNMPLOG_LEVEL=DEBUG
- ENV_ZOOKEEPERLOG_LEVEL=INFO
- ENV_PEKKOLOG_LEVEL=INFO
- ENV_KAFKALOG_LEVEL=INFO
- ENV_HADOOPLOG_LEVEL=INFO
- FLINK_PROPERTIES_JAVA_OPTS=-Dlog4j.configurationFile=file:///opt/flink/conf/log4j.properties
George Leonard
06/27/2025, 5:26 PMdmitri
06/30/2025, 11:41 AMjava 11.0.26 2025-01-21 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.26+7-LTS-187)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.26+7-LTS-187, mixed mode)
OS: Windows 11
IDE: VSCode
I send the data from the Python producer, and the Flink consumer uses the logic below.
1. Read the Kafka stock topic with the DataStream API:
This is how I read the Kafka topic:
KafkaSource<Stock> kafkaSource = createKafkaSource(env, inputProperties, new JsonDeserializationSchema<>(Stock.class));
DataStream<Stock> stockNoWatermark = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "Kafka source");
2. After that, I convert the DataStream to a Table because I prefer to manipulate the data using SQL:
Table stockTableWatermark = tableEnv.fromDataStream(
        stockNoWatermark,
        Schema.newBuilder()
                .column("event_type", DataTypes.STRING())
                .column("exchange", DataTypes.INT())
                .column("id", DataTypes.BIGINT())
                .column("price", DataTypes.FLOAT())
                .column("sequence_number", DataTypes.BIGINT())
                .column("size", DataTypes.INT())
                .column("symbol", DataTypes.STRING())
                .column("tape", DataTypes.STRING())
                .column("timestamp", DataTypes.TIMESTAMP_LTZ(3))
                .column("trf_id", DataTypes.STRING())
                .column("trf_timestamp", DataTypes.STRING())
                .column("actual_timestamp", DataTypes.STRING())
                .watermark("timestamp", "`timestamp` - INTERVAL '1' SECOND")
                .build());
The result from stockTableWatermark looks like this:
5> +I[T, 2015, 4991, 158.85, 282034, 95, GOOG, null, +57465-02-11T05:36:48Z, null, null, 2025-06-30 00:10:53.808]
5> +I[T, 4231, 4642, 181.31, 751310, 35, NVDA, null, +57465-02-11T05:36:51Z, null, null, 2025-06-30 00:10:53.811]
5> +I[T, 2692, 2536, 236.31, 435106, 50, AAPL, null, +57465-02-11T05:36:58Z, null, null, 2025-06-30 00:10:53.818]
5> +I[T, 3531, 1780, 137.95, 879217, 15, NVDA, null, +57465-02-11T05:37:31Z, null, null, 2025-06-30 00:10:53.851]
5> +I[T, 2046, 2779, 340.58, 658954, 24, NVDA, null, +57465-02-11T05:37:37Z, null, null, 2025-06-30 00:10:53.857]
3. I aggregate with a tumbling window based on the timestamp column:
Table resultTable = stockTableWatermark
        .window(Tumble.over(lit(1).minutes()).on($("timestamp")).as("window")) // define window
        .groupBy($("symbol"), $("window")) // group by key and window
        .select(
                $("symbol").as("ticker"),
                $("window").start(),
                $("window").end(),
                $("sequence_number").count().as("trades")
        );
But why, when I print the output of resultTable, is it empty?
And this is the last log message:
14:48:34,340 INFO org.apache.kafka.clients.consumer.ConsumerConfig [] - ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.include.jmx.reporter = true
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = flink-dev-group-stock-consumer-4
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = flink-dev-group-stock-consumer
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor, class org.apache.kafka.clients.consumer.CooperativeStickyAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.connect.timeout.ms = null
sasl.login.read.timeout.ms = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.login.retry.backoff.max.ms = 10000
sasl.login.retry.backoff.ms = 100
sasl.mechanism = GSSAPI
sasl.oauthbearer.clock.skew.seconds = 30
sasl.oauthbearer.expected.audience = null
sasl.oauthbearer.expected.issuer = null
sasl.oauthbearer.jwks.endpoint.refresh.ms = 3600000
sasl.oauthbearer.jwks.endpoint.retry.backoff.max.ms = 10000
sasl.oauthbearer.jwks.endpoint.retry.backoff.ms = 100
sasl.oauthbearer.jwks.endpoint.url = null
sasl.oauthbearer.scope.claim.name = scope
sasl.oauthbearer.sub.claim.name = sub
sasl.oauthbearer.token.endpoint.url = null
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 45000
socket.connection.setup.timeout.max.ms = 30000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
14:48:34,413 WARN org.apache.kafka.clients.consumer.ConsumerConfig [] - These configurations '[client.id.prefix, partition.discovery.interval.ms, aws.secret.username, aws.database.username, environment, kafka.bootstrap.servers, aws.database.password, aws.database.database, aws.database.hostname, kafka.topic, aws.secret.password]' were supplied but are not used yet.
14:48:34,413 INFO org.apache.kafka.common.utils.AppInfoParser [] - Kafka version: 3.4.0
14:48:34,414 INFO org.apache.kafka.common.utils.AppInfoParser [] - Kafka commitId: 2e1947d240607d53
14:48:34,414 INFO org.apache.kafka.common.utils.AppInfoParser [] - Kafka startTimeMs: 1751269714413
14:48:34,427 INFO org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher [] - Starting split fetcher 0
14:48:34,436 INFO org.apache.kafka.clients.consumer.KafkaConsumer [] - [Consumer clientId=flink-dev-group-stock-consumer-4, groupId=flink-dev-group-stock-consumer] Assigned to partition(s): dev-stock-topic-0
14:48:34,444 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=flink-dev-group-stock-consumer-4, groupId=flink-dev-group-stock-consumer] Seeking to earliest offset of partition dev-stock-topic-0
14:48:34,470 INFO org.apache.kafka.clients.Metadata [] - [Consumer clientId=flink-dev-group-stock-consumer-4, groupId=flink-dev-group-stock-consumer] Resetting the last seen epoch of partition dev-stock-topic-0 to 0 since the associated topicId changed from null to jKk4sUaiRfSsg8h4GfqpbQ
14:48:34,471 INFO org.apache.kafka.clients.Metadata [] - [Consumer clientId=flink-dev-group-stock-consumer-4, groupId=flink-dev-group-stock-consumer] Cluster ID: MkU3OEVBNTcwNTJENDM2Qk
14:48:34,491 INFO org.apache.kafka.clients.consumer.internals.SubscriptionState [] - [Consumer clientId=flink-dev-group-stock-consumer-4, groupId=flink-dev-group-stock-consumer] Resetting offset for partition dev-stock-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}.
As you can see, no rows from resultTable are showing in the console.
Could anyone help me address the issue?
I assigned the event time and watermark, and when I print the output the timestamp is there, but the group by window_start and window_end produces nothing.
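Two things stand out in the output above, so here is a hedged diagnostic sketch rather than a confirmed fix: the event times land in year +57465, which usually points at an epoch-unit mix-up (for example milliseconds being scaled as if they were seconds) before the value reaches TIMESTAMP_LTZ(3), and dev-stock-topic appears to have a single partition while the job runs at a higher parallelism, so idle source subtasks can hold the combined watermark back and keep the tumbling windows from ever firing.
// Quick test for the idle-subtask hypothesis: run the whole pipeline at parallelism 1.
env.setParallelism(1);

// If the windows then fire, either match the source parallelism to the partition count or let
// watermarks advance past idle source subtasks (option name from the Table API configuration):
tableEnv.getConfig().set("table.exec.source.idle-timeout", "30 s");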
Jaehyeon Kim
06/30/2025, 9:45 PMCREATE CATALOG demo_ib WITH (
'type' = 'iceberg',
'catalog-type' = 'hive',
'uri' = 'thrift://hive-metastore:9083'
);
I can create an iceberg table (CREATE TABLE ... USING) and describe it without an issue.
However I cannot create a view from it. i.e.
CREATE VIEW ... AS SELECT ... FROM <iceberg_table> fails with java.lang.IllegalArgumentException: table should be resolved.
The select part is not an issue as EXPLAIN SELECT ... FROM <iceberg_table> works.
Note I can create a view on a hive catalog.
Can you inform me how to fix this issue?
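Not a confirmed fix, but one workaround sketch worth trying (shown here through tEnv.executeSql; the plain SQL statements are the same): keep the view out of the Iceberg catalog entirely, either as a temporary view or as a view in the default in-memory catalog, and reference the Iceberg table by its fully qualified name. Database and table names below are placeholders.
tEnv.executeSql("USE CATALOG default_catalog");
tEnv.executeSql(
    "CREATE TEMPORARY VIEW my_view AS "
        + "SELECT * FROM demo_ib.`db`.`iceberg_table`");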
Niharika Sakuru (Niha)
07/01/2025, 3:14 PMPythonEnvUtils.java, and I noticed that it logs the entire environment variable map when launching PyFlink jobs.
This seems like it could be a security issue, especially in Kubernetes setups using the Flink Kubernetes Operator — since secrets are commonly mounted as env vars in pods. That means things like AWS_SECRET_ACCESS_KEY
, DB_PASSWORD
, etc. could end up in plaintext JobManager or TaskManager logs.
📌 Here's an example of what’s being logged:
Starting Python process with environment variables: AWS_SECRET_ACCESS_KEY=..., DB_PASSWORD=...
Has anyone else run into this?
Curious if there's already been discussion or a fix proposed upstream.
Would love thoughts from others who are deploying PyFlink in production or using secrets in K8s environments.
I've created https://issues.apache.org/jira/browse/FLINK-38035 with all detailsL P V
07/02/2025, 9:29 AMMartin Egri
07/02/2025, 3:57 PMjava.lang.ClassCastException: class org.apache.flink.table.types.logical.LegacyTypeInformationType cannot be cast to class org.apache.flink.table.types.logical.DecimalType
during runtime using a homebrew KafkaSource that deserialises Avro data. I'm not sure where the LegacyTypeInformationType comes from; I've googled but the only thing that looks similar is this: https://github.com/apache/flink/pull/11874#discussion_r414251140
I'm doing the conversion like this:
case byteBuffer: ByteBuffer =>
  logicalType match
    case decimal: LogicalTypes.Decimal =>
      val precision = decimal.getPrecision
      val scale = decimal.getScale
      DecimalConversion().fromBytes(byteBuffer, schema, LogicalTypes.decimal(precision, scale))
    case _: LogicalTypes.BigDecimal =>
      BigDecimalConversion().fromBytes(byteBuffer, schema, LogicalTypes.bigDecimal)
    case _ =>
      val bytes = Array.ofDim[Byte](byteBuffer.remaining)
      byteBuffer.get(bytes)
      bytes
case genericFixed: GenericFixed =>
  logicalType match
    case decimal: LogicalTypes.Decimal =>
      val precision = decimal.getPrecision
      val scale = decimal.getScale
      DecimalConversion().fromFixed(genericFixed, schema, LogicalTypes.decimal(precision, scale))
    case _ => genericFixed.bytes
… and for the `TypeInformation`s I return the corresponding:
case Schema.Type.BYTES =>
  avroSchema.getLogicalType match
    case _: LogicalTypes.Decimal => Types.BIG_DEC
    case _: LogicalTypes.BigDecimal => Types.BIG_DEC
    case _ => Types.PRIMITIVE_ARRAY(Types.BYTE)
case Schema.Type.FIXED =>
  avroSchema.getLogicalType match
    case _: LogicalTypes.Decimal => Types.BIG_DEC
    case _ => Types.PRIMITIVE_ARRAY(Types.BYTE)
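One hedged thought, shown in Java for brevity and assuming the stream is later bridged into the Table API (which is where LegacyTypeInformationType lives): Types.BIG_DEC is carried over as a legacy type when a TypeInformation-based schema is converted, so declaring the column's DataType explicitly when building the table avoids that mapping. The field name and precision/scale are placeholders that should match the Avro decimal.
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;

Table t = tableEnv.fromDataStream(
        decodedStream,
        Schema.newBuilder()
                .column("amount", DataTypes.DECIMAL(38, 18)) // match the Avro decimal's precision and scale
                .build());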
Vikas Patil
07/02/2025, 7:10 PMSachin
07/04/2025, 5:37 AM