Rushikesh Gulve
06/06/2025, 9:12 AM
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class FlinkKeyGroupMapper {

    public static void main(String[] args) {
        final int maxParallelism = 128; // Flink maxParallelism
        final int parallelism = 10;     // Number of subtasks
        final int keysNeeded = parallelism;

        // Map: subtask index -> key found
        java.util.Map<Integer, String> subtaskToKey = new java.util.HashMap<>();

        int i = 100;
        int configId = 20003;

        while (subtaskToKey.size() < keysNeeded) {
            String key = configId + "_" + i;
            // Compute key group for this key (Flink does MurmurHash inside)
            int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
            // Find which operator subtask the key group maps to
            int subtaskIndex = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
                    maxParallelism, parallelism, keyGroup);
            // If this subtask index not assigned a key, assign it
            if (!subtaskToKey.containsKey(subtaskIndex)) {
                subtaskToKey.put(subtaskIndex, key);
                System.out.println("Subtask " + subtaskIndex + " -> Key: " + key);
            }
            i++;
        }
    }
}
This is how I have generated the keys, and my key selector in Flink uses the same key. Can anyone guide me on how to determine which parallel subtask instance a given key will be routed to? Thanks
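For reference, a minimal sketch of a KeySelector that builds the key exactly like the generator above, so keyBy routes each record to the subtask computed by FlinkKeyGroupMapper. The Message type and its getters are hypothetical placeholders, not part of the original job:

import org.apache.flink.api.java.functions.KeySelector;

// Hypothetical event type; replace with the actual message class used in the job.
public class ConfigKeySelector implements KeySelector<Message, String> {
    @Override
    public String getKey(Message msg) {
        // Must concatenate exactly like the generator: configId + "_" + i
        return msg.getConfigId() + "_" + msg.getIndex();
    }
}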
Sandip Nayak
06/08/2025, 10:32 PM
Disaggregated State Management with the ForSt state backend on Flink 2.0: even with the following config, I see the full checkpoints being downloaded into the taskmanager pod while the Flink app restores from state. Do you know what I might be missing?
state.backend.type: "forst"
table.exec.async-state.enabled: "true"
execution.checkpointing.incremental: "true"
table.exec.mini-batch.enabled: "false"
table.optimizer.agg-phase-strategy: "ONE_PHASE"
Yarden BenMoshe
06/10/2025, 7:00 AM
Monika Bednarz
06/11/2025, 9:29 AM
CREATE TABLE source__topic ( ... ) WITH (
  'connector' = 'kafka',
  'value.format' = 'avro-confluent',
  ...
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'SCRAM-SHA-512',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username="..." password="...";');
We want to get rid of the explicit config passing.
(If there's some form of authentication that doesn't require this in cleartext, please also point me to it 🙏)
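Not an authoritative fix, just a sketch of one common workaround: template the DDL at submission time so the credentials come from the environment (e.g. injected as Kubernetes secrets) rather than living in the SQL text. The KAFKA_USER / KAFKA_PASS variable names and the tableEnv handle are assumptions:

// Credentials injected via environment variables (assumed names), not hard-coded in the DDL.
String user = System.getenv("KAFKA_USER");
String pass = System.getenv("KAFKA_PASS");
String jaas = String.format(
        "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";",
        user, pass);

tableEnv.executeSql(
        "CREATE TABLE source__topic ( /* ... */ ) WITH ("
                + "'connector' = 'kafka',"
                + "'value.format' = 'avro-confluent',"
                + "'properties.security.protocol' = 'SASL_SSL',"
                + "'properties.sasl.mechanism' = 'SCRAM-SHA-512',"
                + "'properties.sasl.jaas.config' = '" + jaas + "')");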
Tiansu Yu
06/11/2025, 12:00 PM
Dheeraj Panangat
06/11/2025, 1:23 PM
MohammadReza Shahmorady
06/12/2025, 1:17 AM
linger.ms=500
batch.size=1048576
compression.type=lz4
However, when there's significant backpressure, the producer doesn't start sending data as expected and takes a very long time.
When I remove the batch.size configuration, performance improves slightly, but it's still not ideal.
I've attached a screenshot with more details in the thread.
Does anyone have any suggestions on how to resolve this?
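For reference, a minimal sketch of how those producer options are usually handed to a Flink KafkaSink; the broker address, topic, and String payload type are placeholders, and this sketch does not by itself explain the backpressure behaviour:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

Properties producerProps = new Properties();
producerProps.setProperty("linger.ms", "500");
producerProps.setProperty("batch.size", "1048576"); // 1 MiB batches
producerProps.setProperty("compression.type", "lz4");

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092")          // placeholder
        .setKafkaProducerConfig(producerProps)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("output-topic")            // placeholder
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();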
L P V
06/12/2025, 7:51 AM
DataStream<Row> merchantQuasiFeatureCreditFilterRow = merchantQuasiFeatureCreditFilter
        .map(message -> Row.of(
                message.getEntity(),          // Field 0
                message.getFeatureSet(),      // Field 1
                message.getTimestamp(),       // Field 2
                sha256(message.getEntity())   // Field 3
        ))
        .returns(Row.class);

tableEnv.createTemporaryView("datastream_table", merchantQuasiFeatureCreditFilterRow,
        Schema.newBuilder()
                .columnByExpression("entity", "cast(f0 as string)")
                .columnByExpression("proctime", "PROCTIME()")
                .columnByExpression("entityHash", "cast(f3 as string)") // Hashed entity
                .build()
);
It shows an error when deploying the Flink job:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Invalid expression for computed column 'entityHash'.
I don't know why, because I've already added field 3 to the data stream.
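A possible explanation, offered as an assumption rather than a confirmed diagnosis: returns(Row.class) keeps only the generic Row type, so the planner does not know the individual field types and cannot resolve f3 for the computed column. A sketch of declaring the field types explicitly (the concrete per-field types here are assumptions):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.types.Row;

DataStream<Row> merchantQuasiFeatureCreditFilterRow = merchantQuasiFeatureCreditFilter
        .map(message -> Row.of(
                message.getEntity(),
                message.getFeatureSet(),
                message.getTimestamp(),
                sha256(message.getEntity())))
        // Declare per-field types so the Table API can see f0..f3
        .returns(Types.ROW(Types.STRING, Types.STRING, Types.LONG, Types.STRING));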
06/12/2025, 9:44 AMapiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: data-processing-flink
namespace: flink
spec:
image: 624083781302.dkr.ecr.ap-south-1.amazonaws.com/product/data-streaming-flink:latest
flinkVersion: "v1_20"
serviceAccount: flink
flinkConfiguration:
rest.port: "8081"
jobmanager.rpc.address: "data-processing-flink"
jobmanager.rpc.port: "6123"
taskmanager.numberOfTaskSlots: "4"
podTemplate:
spec:
imagePullSecrets:
- name: ecrscr-credentials
containers:
- name: flink-main-container
imagePullPolicy: Always
envFrom:
- secretRef:
name: redis-secret-main
env:
- name: KAFKA_BROKER
value: "kafka.flink.svc.cluster.local:9092"
- name: BOOTSTRAP_SERVERS
value: "kafka.flink.svc.cluster.local:9092"
- name: MINIO_SERVER
value: "<http://myminio-hl.flink.svc.cluster.local:9000>"
- name: MINIO_USER
value: "minio"
- name: MINIO_PASSWORD
value: "minio123"
- name: MAX_PARALLELISM
value: "128"
securityContext:
runAsUser: 9999
runAsGroup: 9999
jobManager:
resource:
memory: "2048m"
cpu: 0.4
taskManager:
resource:
memory: "4096m"
cpu: 2
replicas: 8
job:
entryClass: "org.apache.flink.client.python.PythonDriver"
args: ["-pyclientexec", "/usr/bin/python3", "-py", "/opt/flink/usrlib/data_processing_v9/stream_processing/process_dependency/create_env.py"]
upgradeMode: stateless
Though I have set 4 GB of memory explicitly per Task Manager, it ends up using a lot more than that and ultimately goes Out Of Memory:
Caused by: java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(TaskExecutorGateway.submitTask(TaskDeploymentDescriptor, JobMasterId, Time))] at recipient [<pekko.tcp://flink@192.168.164.1:6122/user/rpc/taskmanager_0>] timed out. This is usually caused by: 1) Pekko failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase pekko.ask.timeout.
I have 2 questions:
1. Can I set any limits on resource utilization?
2. Is it normal to use this much memory, or is there something I can do to reduce consumption?
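As a starting point only, a sketch of the knobs usually involved in capping memory (all values are placeholders, not recommendations): the operator sizes the pod from spec.taskManager.resource, the Flink memory model can be constrained further in flinkConfiguration, and a hard Kubernetes limit can be pinned in the podTemplate:

flinkConfiguration:
  taskmanager.memory.process.size: "4096m"     # usually derived from spec.taskManager.resource by the operator
  taskmanager.memory.managed.fraction: "0.25"  # example value; managed memory competes with the Python workers
  taskmanager.memory.jvm-overhead.max: "512m"
podTemplate:
  spec:
    containers:
      - name: flink-main-container
        resources:
          limits:
            memory: "4608Mi"                   # hard cap at the container level (placeholder)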
Fabrizzio Chavez
06/12/2025, 2:04 PM
Fabrizzio Chavez
06/14/2025, 2:52 PM
INSERT INTO player_scores_sink
SELECT
  playerId,
  MAX(score) AS maxScore
FROM
  players_source
GROUP BY
  playerId;
According to the documentation, the GROUP BY will make the app stateful, but my doubt is what gets stored: only the playerId and the maxScore (two integer values), or also the history of events that came from players_source?
I only want to store the playerId and the maxScore, so that when a new event for this player arrives I compare only against the previous aggregation, not against all historical events, to prevent uncontrollable storage growth.
George Leonard
06/15/2025, 4:02 PM
Rushikesh Gulve
06/16/2025, 6:47 AMkind: FlinkDeployment
metadata:
name: data-processing-flink
namespace: flink
spec:
image: <http://624083781302.dkr.ecr.ap-south-1.amazonaws.com/product/data-streaming-flink:latest|624083781302.dkr.ecr.ap-south-1.amazonaws.com/product/data-streaming-flink:latest>
flinkVersion: "v1_20"
serviceAccount: flink
flinkConfiguration:
rest.port: "8081"
jobmanager.rpc.address: "data-processing-flink"
jobmanager.rpc.port: "6123"
taskmanager.numberOfTaskSlots: "2"
# High Availability
high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
high-availability.storageDir: file:///flink-data/ha
# Checkpoints and Savepoints
state.checkpoints.dir: file:///flink-data/checkpoints
state.savepoints.dir: file:///flink-data/savepoints
state.backend: rocksdb
state.backend.incremental: true
rest.profiling.enabled: "true"
env.java.opts.taskmanager: "-Dorg.apache.beam.sdk.fn=DEBUG -Dorg.apache.beam.runners.fnexecution.control.FnApiControlClient=DEBUG"
podTemplate:
spec:
imagePullSecrets:
- name: ecrscr-credentials
containers:
- name: flink-main-container
imagePullPolicy: Always
envFrom:
- secretRef:
name: redis-secret-main
env:
- name: KAFKA_BROKER
value: "kafka.flink.svc.cluster.local:9092"
- name: BOOTSTRAP_SERVERS
value: "kafka.flink.svc.cluster.local:9092"
- name: MINIO_SERVER
value: "<http://myminio-hl.flink.svc.cluster.local:9000>"
- name: MINIO_USER
value: "minio"
- name: MINIO_PASSWORD
value: "minio123"
- name: MAX_PARALLELISM
value: "128"
volumeMounts:
- mountPath: /flink-data
name: flink-pvc
volumes:
- name: flink-pvc
persistentVolumeClaim:
claimName: flink-data-pvc
securityContext:
runAsUser: 9999
runAsGroup: 9999
jobManager:
resource:
memory: "3072m"
cpu: 0.4
taskManager:
resource:
memory: "3584m"
cpu: 2
replicas: 2
job:
entryClass: "org.apache.flink.client.python.PythonDriver"
args: ["-pyclientexec", "/usr/bin/python3", "-py", "/opt/flink/usrlib/data_processing_v9/stream_processing/process_dependency/create_env.py"]
upgradeMode: last-state
This is the deployment file I am using. My application is already consuming a lot of resources and I cannot afford to allocate more. Also, the checkpoint size is 2-3 MB for the few successful checkpoints I observed. Can anyone guide me on where to start debugging this issue? Thanks
Nick Mosin
06/16/2025, 12:40 PM
Nick Mosin
06/16/2025, 2:20 PM
Nick Mosin
06/16/2025, 3:46 PM
Enabling required built-in plugins
Linking flink-s3-fs-presto-1.20.1.jar to plugin directory
Successfully enabled flink-s3-fs-presto-1.20.1.jar
Linking flink-s3-fs-hadoop-1.20.1.jar to plugin directory
Successfully enabled flink-s3-fs-hadoop-1.20.1.jar
and even
Multiple providers loaded with the same prefix: s3. This might lead to unintended consequences, please consider using only one of them.
but anyway I got
Could not find a file system implementation for scheme 's3'. The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-hadoop, flink-s3-fs-presto.
Tiansu Yu
06/17/2025, 8:06 AM
java.lang.UnsupportedOperationException: Unsupported to derive Schema for type: TIMESTAMP_LTZ(3)
at org.apache.flink.formats.avro.typeutils.AvroSchemaConverter.convertToSchema(AvroSchemaConverter.java:494) ~[flink-sql-avro-1.19.2.jar:1.19.2]
However, this argument is only available for the flink avro format, not the flink confluent avro format. Therefore it cannot cast a unix timestamp in Avro properly as a source table in Flink while using the avro-confluent format (where you can hook in the schema registry).
Is this a known bug already?
Rushikesh Gulve
06/17/2025, 12:09 PM
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: data-processing-flink
  namespace: flink
spec:
  image: 624083781302.dkr.ecr.ap-south-1.amazonaws.com/product/data-streaming-flink:latest
  flinkVersion: "v1_20"
  serviceAccount: flink
  flinkConfiguration:
    rest.port: "8081"
    jobmanager.rpc.address: "data-processing-flink"
    jobmanager.rpc.port: "6123"
    taskmanager.numberOfTaskSlots: "2"
    # High Availability
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: file:///flink-data/ha
    # Checkpoints and Savepoints
    state.checkpoints.dir: file:///flink-data/checkpoints
    state.savepoints.dir: file:///flink-data/savepoints
    state.backend.type: rocksdb
    execution.checkpointing.incremental: "true"
    # execution.checkpointing.alignment-timeout: "3000" # ms, after which checkpoint switches to unaligned mode dynamically
    # execution.checkpointing.unaligned: "true" # Enable unaligned checkpoints
    # rest.profiling.enabled: "true"
    # env.java.opts.taskmanager: "-Dorg.apache.beam.sdk.fn=DEBUG -Dorg.apache.beam.runners.fnexecution.control.FnApiControlClient=DEBUG"
  podTemplate:
    spec:
      imagePullSecrets:
        - name: ecrscr-credentials
      containers:
        - name: flink-main-container
          imagePullPolicy: Always
          envFrom:
            - secretRef:
                name: redis-secret-main
          env:
            - name: KAFKA_BROKER
              value: "kafka.flink.svc.cluster.local:9092"
            - name: BOOTSTRAP_SERVERS
              value: "kafka.flink.svc.cluster.local:9092"
            - name: MINIO_SERVER
              value: "http://myminio-hl.flink.svc.cluster.local:9000"
            - name: MINIO_USER
              value: "minio"
            - name: MINIO_PASSWORD
              value: "minio123"
            - name: MAX_PARALLELISM
              value: "128"
          volumeMounts:
            - mountPath: /flink-data
              name: flink-pvc
      volumes:
        - name: flink-pvc
          persistentVolumeClaim:
            claimName: flink-data-pvc
      securityContext:
        runAsUser: 9999
        runAsGroup: 9999
  jobManager:
    resource:
      memory: "3072m"
      cpu: 0.4
  taskManager:
    resource:
      memory: "3584m"
      cpu: 2
    replicas: 2
  job:
    entryClass: "org.apache.flink.client.python.PythonDriver"
    args: ["-pyclientexec", "/usr/bin/python3", "-py", "/opt/flink/usrlib/data_processing_v9/stream_processing/process_dependency/create_env.py"]
    upgradeMode: stateless
I am not able to find the root cause or the best way to configure checkpointing. Does anyone have an idea how I should proceed with this?
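For orientation, a sketch of checkpoint-tuning options commonly set in flinkConfiguration when checkpoints time out; the values are placeholders, not tuned for this job:

flinkConfiguration:
  execution.checkpointing.interval: "60s"
  execution.checkpointing.timeout: "10min"
  execution.checkpointing.min-pause: "30s"
  execution.checkpointing.max-concurrent-checkpoints: "1"
  execution.checkpointing.tolerable-failed-checkpoints: "3"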
Leong Wai Leong
06/17/2025, 2:10 PM
Apollo Elon
06/18/2025, 4:05 PM
KafkaSourceBuilder<RichCdcMultiplexRecord> kafkaSourceBuilder = KafkaSource.builder();
kafkaSourceBuilder.setTopics(Context.KafkaOptions.topic)
        .setBootstrapServers(Context.KafkaOptions.bootstrapServer)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new RichCdcMultiplexRecordDeserializer())
        .setGroupId(Context.KafkaOptions.groupId);

public class RichCdcMultiplexRecordDeserializer
        implements DeserializationSchema<RichCdcMultiplexRecord> {

    @Override
    public RichCdcMultiplexRecord deserialize(byte[] message) throws IOException {
        RichCdcMultiplexRecord richCdcMultiplexRecord =
                RichCdcMultiplexRecordDeserializer.deserializeForValue(message);
        return richCdcMultiplexRecord;
    }

    @Override
    public boolean isEndOfStream(RichCdcMultiplexRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<RichCdcMultiplexRecord> getProducedType() {
        return TypeInformation.of(RichCdcMultiplexRecord.class);
    }

    public static RichCdcMultiplexRecord deserializeForValue(byte[] value) {
        return KryoSerializerUtil.deserialize(value, RichCdcMultiplexRecord.class);
    }
}
I'm sure the value of the data in Kafka is in the RichCdcMultiplexRecord format. The data has been read from Kafka, because I observed it while debugging the deserialization method. However, there is a problem when writing to the next operator. Thank you all!!!
Sumit Nekar
06/19/2025, 6:04 AM
Artsiom Yudovin
06/20/2025, 10:17 AM
SELECT /*+ STATE_TTL('session'='12h', 'userState'='1d', 'authorizedUserState'='365d') */
It looks like it is not working. Does anybody have any idea what could be happening?
Akash Patel
06/20/2025, 2:04 PM
Rushikesh Gulve
06/23/2025, 10:31 AM
2025-06-23 09:29:59,808 INFO /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - after minio data_output: WindowProcessComputeIOStates-5-1736416800000, 64558e0e8caa4ebabc262d76e8117256, 1736416800000
2025-06-23 11:43:03.748
pthread lock: Invalid argument
#
# A fatal error has been detected by the Java Runtime Environment:
#
# SIGSEGV (0xb) at pc=0x00007fa5f75ff898, pid=1, tid=3479
#
# JRE version: OpenJDK Runtime Environment Temurin-11.0.27+6 (11.0.27+6) (build 11.0.27+6)
During this time, other task managers are logging a failure in checkpointing.
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:182) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:127) ~[flink-dist-1.20.1.jar:1.20.1]
2025-06-23 09:33:00,634 INFO org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - KEYED PROCESS -> (Extract-Timestamp -> Timestamps/Watermarks -> Remove-Timestamp -> Map -> Sink: Writer -> Sink: Committer, Map, Map -> Sink: Writer -> Sink: Committer) (5/40)#0 - asynchronous part of checkpoint 15 could not be completed.
2025-06-23 09:33:00,629 INFO org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - Source: prepareData->ProcessComputePartTypeTransactions -> _stream_key_by_map_operator (5/40)#0 - asynchronous part of checkpoint 15 could not be completed.
java.util.concurrent.ExecutionException: org.apache.flink.runtime.checkpoint.CheckpointException: The checkpoint was aborted due to exception of other subtasks sharing the ChannelState file.
I am guessing the one task manager which became unresponsive was responsible for the checkpoint failure, which happened due to a timeout. Can anyone help with identifying the root cause?
Tudor Plugaru
06/24/2025, 1:45 PM
How do you propagate traceparent?
Currently, from the work I did, it seems very hard to do properly, unless you pass tracing metadata as part of the POJO that gets passed from operator to operator; but even in this case, you have to "activate" the trace in each operator of your pipeline. I found this to be not very DX friendly and very error prone. We're on Flink 1.18, if that's important.
I'm looking for inspiration mainly on how to do it nicer and more DX friendly, so open for suggestions/ideas.
Thanks
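Purely as inspiration, not an established Flink pattern: a sketch of re-activating the upstream trace inside an operator from a traceparent string carried on the event, using the OpenTelemetry API. The MyEvent type and its getTraceparent() accessor are hypothetical:

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.context.propagation.TextMapGetter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// MyEvent is a hypothetical POJO that carries the W3C "traceparent" value as a field.
public class TracedMap extends RichMapFunction<MyEvent, MyEvent> {

    private static final TextMapGetter<MyEvent> GETTER = new TextMapGetter<MyEvent>() {
        @Override
        public Iterable<String> keys(MyEvent carrier) {
            return java.util.Collections.singletonList("traceparent");
        }

        @Override
        public String get(MyEvent carrier, String key) {
            return carrier == null ? null : carrier.getTraceparent();
        }
    };

    private transient Tracer tracer;

    @Override
    public void open(Configuration parameters) {
        tracer = GlobalOpenTelemetry.getTracer("flink-pipeline");
    }

    @Override
    public MyEvent map(MyEvent event) {
        // Rebuild the upstream context from the traceparent carried on the event
        Context parent = W3CTraceContextPropagator.getInstance()
                .extract(Context.root(), event, GETTER);
        Span span = tracer.spanBuilder("map-operator").setParent(parent).startSpan();
        try (Scope ignored = span.makeCurrent()) {
            // ... actual per-record logic ...
            return event;
        } finally {
            span.end();
        }
    }
}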
George Leonard
06/25/2025, 5:32 AM
import org.apache.flink.api.common.serialization.SerializableSerializer;
...

@Override
public SimpleVersionedSerializer<SnmpSourceSplit> getSplitSerializer() {
    return new SerializableSerializer<>();
}

@Override
public SimpleVersionedSerializer<List<SnmpSourceSplit>> getEnumeratorCheckpointSerializer() {
    return new SerializableSerializer<>();
}
parent pom.xml
<properties>
    <flink.version>1.20.1</flink.version>
    <java.version>17</java.version>
    <maven-compiler.version>3.11.0</maven-compiler.version>
    <maven-shade.version>3.5.2</maven-shade.version>
    <maven-compiler.source>${java.version}</maven-compiler.source>
    <maven-compiler.target>${java.version}</maven-compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
...
local pom.xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
Fabrizzio Chavez
06/25/2025, 1:50 PM
Adam Richardson
06/26/2025, 3:27 AM
org.apache.flink.api.connector.sink.{Writer,Committer,GlobalCommitter,Sink,...}
). I'm observing strange behavior where consumer group latency on Kafka is close to zero after a checkpoint, but the data in the output Delta table lags significantly. I can see in the TM logs that Delta commits are happening quickly and reliably after each checkpoint, but recent data is still missing.
After some digging I found a reference in an unrelated issue on Flink: "it is expected to global committer architecture lag one commit behind in reference to rest of the pipeline". This lines up very well with the symptoms I'm seeing -- my app checkpoints every 5m, and I'm seeing latency in the output table fluctuating from 5m (immediately after a checkpoint) to 10m (immediately before the next checkpoint).
My questions:
• Is it expected behavior that the committed data consistently lags behind by one checkpoint? Are there any more authoritative docs on this behavior?
• Is there any workaround (config or code changes) to avoid/fix this behavior?
• Does the v2 sink API have this problem?
Thank you!
Rushikesh Gulve
06/26/2025, 6:49 AM
2025-06-23 09:29:59,808 INFO /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - after minio data_output: WindowProcessComputeIOStates-5-1736416800000, 64558e0e8caa4ebabc262d76e8117256, 1736416800000
2025-06-23 11:43:03.748
pthread lock: Invalid argument
#
# A fatal error has been detected by the Java Runtime Environment:
#
# SIGSEGV (0xb) at pc=0x00007fa5f75ff898, pid=1, tid=3479
#
# JRE version: OpenJDK Runtime Environment Temurin-11.0.27+6 (11.0.27+6) (build 11.0.27+6)
During this time, other task managers are logging a failure in checkpointing.
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorImpl.loop(ChannelStateWriteRequestExecutorImpl.java:182) ~[flink-dist-1.20.1.jar:1.20.1]
at org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl.dispatchInternal(ChannelStateWriteRequestDispatcherImpl.java:127) ~[flink-dist-1.20.1.jar:1.20.1]
2025-06-23 09:33:00,634 INFO org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - KEYED PROCESS -> (Extract-Timestamp -> Timestamps/Watermarks -> Remove-Timestamp -> Map -> Sink: Writer -> Sink: Committer, Map, Map -> Sink: Writer -> Sink: Committer) (5/40)#0 - asynchronous part of checkpoint 15 could not be completed.
2025-06-23 09:33:00,629 INFO org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - Source: prepareData->ProcessComputePartTypeTransactions -> _stream_key_by_map_operator (5/40)#0 - asynchronous part of checkpoint 15 could not be completed.
java.util.concurrent.ExecutionException: org.apache.flink.runtime.checkpoint.CheckpointException: The checkpoint was aborted due to exception of other subtasks sharing the ChannelState file.
I am guessing the one task manager which became unresponsive was responsible for the checkpoint failure, which happened due to a timeout. Can anyone help with identifying the root cause?
Magdalena Kobusch
06/26/2025, 8:19 AM
tEnv.executeSql(
    "CREATE CATALOG pulse_table WITH ("
        + "'type'='iceberg',"
        + "'warehouse'='arn:aws:s3tables:us-east-1:058264243434:bucket/kyk-dataeng-table-iceberg',"
        + "'catalog-impl'='org.apache.iceberg.aws.s3tables.S3TablesCatalog'"
        + ")"
);
But that is not working for me.
I have added:
dependencies {
    implementation 'software.amazon.awssdk:s3tables:2.29.26'
    implementation 'software.amazon.s3tables:s3-tables-catalog-for-iceberg:0.1.7'
}
But I'm still getting:
Exception in thread "main" java.lang.IllegalArgumentException: Cannot initialize Catalog implementation org.apache.iceberg.aws.s3tables.S3TablesCatalog: Cannot find constructor for interface org.apache.iceberg.catalog.Catalog
Missing org.apache.iceberg.aws.s3tables.S3TablesCatalog [java.lang.ClassNotFoundException: org.apache.iceberg.aws.s3tables.S3TablesCatalog]