FAS
10/22/2025, 4:45 AM
assume-role is handled by the Iceberg catalog.
Scenario
1. Account #1: Runs a Flink (1.19.1) job on an EKS cluster.
2. Account #2: Hosts Iceberg tables in an S3 bucket (s3://account-2-bucket-iceberg/dbstore1/) and manages metadata using the AWS Glue Catalog (awsAccount2Id).
3. Permissions:
◦ The Flink EKS pod in Account #1 has a Service Account configured with OIDC.
◦ This Service Account assumes a cross-account role (arn:aws:iam::awsAccount2Id:role/cross-account-role) in Account #2.
4. Verification:
◦ I have `exec`'d into the running Flink pod.
◦ From the pod, I can successfully use the AWS CLI to assume the cross-account role.
◦ After assuming the role, I can successfully list the Glue databases and tables in Account #2.
◦ This confirms the underlying EKS OIDC, IAM roles, and network access are all correctly configured.
The Challenge
In my Flink job, I first define the catalog for Account #2.
1. Create Catalog (Success): This SQL statement executes successfully, and the Flink logs confirm it: 2025-10-22 03:57:00,929 INFO ... - SQL statement executed successfully. sql=CREATE CATALOG `awsAccount2Id` ...
SQL
CREATE CATALOG `awsAccount2Id`
WITH (
  'type' = 'iceberg',
  'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',
  'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
  'warehouse' = 's3://account-2-bucket-iceberg/dbstore1/',
  'client.assume-role.arn' = 'arn:aws:iam::awsAccount2Id:role/cross-account-role',
  'glue.catalog-id' = 'awsAccount2Id',
  'client.region' = 'us-east-1',
  'client.credentials-provider' = 'software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider'
);
2. Select from Catalog (Failure): Immediately after the catalog is created, my Flink job executes the following SELECT query:
SQL
SELECT
....
FROM `awsAccount2Id`.`dbstore1`.table1
/*+ OPTIONS('streaming'='true', 'monitor-interval'='30s') */;
This query fails with a validation error:
2025-10-22 03:57:06,710 ERROR ... - Failed to execute SQL statement:
SELECT ...
FROM `awsAccount2Id`.`dbstore1`.table1 ...
;
org.apache.flink.table.api.ValidationException: SQL validation failed. From line 11, column 6 to line 11, column 59: Object 'dbstore1' not found within 'awsAccount2Id'
I also noticed that when Flink logs the list of available databases, it only shows databases from Account #1, not the cross-account ones from Account #2.
My Question
My expectation was that by defining client.assume-role.arn and glue.catalog-id in the CREATE CATALOG statement, any subsequent Flink SQL operations referencing the awsAccount2Id catalog (like my SELECT query) would automatically use those settings to assume the role and query the Glue catalog in Account #2.
Why is Flink reporting that the database dbstore1 is "not found," even though the catalog was created successfully and configured to assume a role that can see that database? I can see tables from this database when I manually assume the role using the AWS CLI from that pod.
It seems the SELECT query is not honoring the catalog's assume-role configuration and is somehow still querying the default Glue catalog in Account #1. Is this expected, or am I missing a configuration step for Flink to correctly use the assumed role for metadata discovery after the catalog is created?
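One possible lead, hedged and untested against this setup: per the Iceberg AWS docs, client.assume-role.arn only takes effect when client.factory is set to org.apache.iceberg.aws.AssumeRoleAwsClientFactory (which also requires client.assume-role.region); with the default factory the assume-role properties are ignored and the Glue/S3 clients fall back to the pod's base credentials, which would explain why only Account #1 databases are listed. Those docs also name the Glue catalog id property glue.id rather than glue.catalog-id. A sketch of the equivalent catalog definition via the Table API (the class name is illustrative; verify property names against the Iceberg version on the classpath):
Java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Untested sketch: the same catalog, with Iceberg's assume-role client
// factory enabled so the assume-role options actually take effect.
public class CrossAccountCatalogSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        tEnv.executeSql(
            "CREATE CATALOG `awsAccount2Id` WITH (\n" +
            "  'type' = 'iceberg',\n" +
            "  'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',\n" +
            "  'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',\n" +
            "  'warehouse' = 's3://account-2-bucket-iceberg/dbstore1/',\n" +
            // Without this factory, the assume-role properties below are ignored.
            "  'client.factory' = 'org.apache.iceberg.aws.AssumeRoleAwsClientFactory',\n" +
            "  'client.assume-role.arn' = 'arn:aws:iam::awsAccount2Id:role/cross-account-role',\n" +
            "  'client.assume-role.region' = 'us-east-1',\n" +
            "  'glue.id' = 'awsAccount2Id'\n" +
            ")");

        // If the factory takes effect, dbstore1 from Account #2 should appear here.
        tEnv.executeSql("USE CATALOG `awsAccount2Id`");
        tEnv.executeSql("SHOW DATABASES").print();
    }
}
The client.credentials-provider entry can likely be dropped: in an IRSA pod the default credential chain already resolves the web-identity token, and those become the base credentials from which the factory assumes the cross-account role.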
Jaya Ananthram
10/22/2025, 9:10 AM
מייקי בר יעקב
10/22/2025, 11:18 PM
Elad
10/23/2025, 8:43 AM
Eric Huang
10/23/2025, 1:36 PM
2025-10-23 14:05:32,327 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Match[7] -> Calc[8] -> SinkConversion[9] (21/96) (dcdc4daa8ced8ca9d2b8fc6c58e26129_310a79d541a763d57b050aae3bf30f0a_20_1) switched from INITIALIZING to RUNNING.
2025-10-23 14:05:32,327 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Match[7] -> Calc[8] -> SinkConversion[9] (90/96) (dcdc4daa8ced8ca9d2b8fc6c58e26129_310a79d541a763d57b050aae3bf30f0a_89_1) switched from INITIALIZING to RUNNING.
2025-10-23 14:05:32,327 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Match[7] -> Calc[8] -> SinkConversion[9] (59/96) (dcdc4daa8ced8ca9d2b8fc6c58e26129_310a79d541a763d57b050aae3bf30f0a_58_1) switched from INITIALIZING to RUNNING.
2025-10-23 14:05:32,327 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Match[7] -> Calc[8] -> SinkConversion[9] (17/96) (dcdc4daa8ced8ca9d2b8fc6c58e26129_310a79d541a763d57b050aae3bf30f0a_16_1) switched from INITIALIZING to RUNNING.
2025-10-23 14:05:32,400 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - DelayableMessageProcess -> Sink: xiaoxiang_reach_system (1/1) (dcdc4daa8ced8ca9d2b8fc6c58e26129_0a53a086337bb3f8a33ad689643a92fc_0_1) switched from INITIALIZING to RUNNING.
2025-10-23 14:10:28,717 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 191 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1761199828716 for job ffffffffbd30e5570000000000000001.
2025-10-23 14:10:30,023 INFO org.apache.flink.runtime.state.SharedStateRegistryImpl [] - state self-sustained:true, lastCompletedCheckpoint:191, earliestDependent:9223372036854775807, highestNotClaimedCheckpointID:-1
2025-10-23 14:10:30,023 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - job ffffffffbd30e5570000000000000001 checkpoint 191 completed, job is state-sustained
2025-10-23 14:10:30,207 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 191 for job ffffffffbd30e5570000000000000001 (280739389 bytes, checkpointDuration=1425 ms, finalizationTime=66 ms).
2025-10-23 14:15:28,717 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 192 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1761200128716 for job ffffffffbd30e5570000000000000001.
2025-10-23 14:15:29,030 INFO org.apache.flink.runtime.state.SharedStateRegistryImpl [] - state self-sustained:true, lastCompletedCheckpoint:192, earliestDependent:9223372036854775807, highestNotClaimedCheckpointID:-1
2025-10-23 14:15:29,030 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - job ffffffffbd30e5570000000000000001 checkpoint 192 completed, job is state-sustained
2025-10-23 14:15:29,096 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 192 for job ffffffffbd30e5570000000000000001 (317081932 bytes, checkpointDuration=335 ms, finalizationTime=45 ms).
2025-10-23 14:16:37,533 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Match[7] -> Calc[8] -> SinkConversion[9] (12/96) (dcdc4daa8ced8ca9d2b8fc6c58e26129_310a79d541a763d57b050aae3bf30f0a_11_1) switched from RUNNING to FAILED on session-2123414-1761142270-taskmanager-1-10 @ hldy-data-k8s-flink-ssd-node03895.mt (dataPort=23347).
java.lang.NullPointerException: null
at org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor.materializeMatch(SharedBufferAccessor.java:213) ~[flink-cep-1.16.1.jar:1.16.1]
at org.apache.flink.cep.nfa.NFA.processMatchesAccordingToSkipStrategy(NFA.java:474) ~[flink-cep-1.16.1.jar:1.16.1]
at org.apache.flink.cep.nfa.NFA.advanceTime(NFA.java:337) ~[flink-cep-1.16.1.jar:1.16.1]
at org.apache.flink.cep.operator.CepOperator.advanceTime(CepOperator.java:429) ~[flink-cep-1.16.1.jar:1.16.1]
at org.apache.flink.cep.operator.CepOperator.onEventTime(CepOperator.java:325) ~[flink-cep-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:180) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:599) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:239) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:200) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:136) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:552) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:843) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:792) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:969) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:948) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:571) ~[flink-dist-1.16.1.jar:1.16.1]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_312]
2025-10-23 14:16:37,534 INFO org.apache.flink.runtime.executiongraph.failover.flip1.TaskManagerRestartStrategy [] - Received failure event: TaskFailureEvent{taskManagerId=session-2123414-1761142270-taskmanager-1-10, timestamp=1761200197533, cause=NullPointerException: null}, excluded: false
2025-10-23 14:16:37,534 INFO org.apache.flink.runtime.executiongraph.failover.flip1.TaskManagerRestartStrategy [] - Resetting restart strategy state due to stable running period
2025-10-23 14:16:37,536 INFO org.apache.flink.runtime.executiongraph.failover.flip1.ContinuousRestartLimitation [] - Earliest failure timestamp: 1761199499926, max continuous restart duration: 28800000 ms
2025-10-23 14:16:37,536 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - 423 tasks will be restarted to recover the failed task dcdc4daa8ced8ca9d2b8fc6c58e26129_310a79d541a763d57b050aae3bf30f0a_11_1.
2025-10-23 14:16:37,536 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job rt_scene_51697_staging (ffffffffbd30e5570000000000000001) switched from state RUNNING to RESTARTING.
Mrugesh Kadia
10/24/2025, 9:48 AM
Mohsen Rezaei
10/27/2025, 11:12 PM
GROUP BY clause. I filed an issue for this to show some details on what's going on here, but I was curious if anyone else has run into this, since it's a very basic test against Flink 2.1? Running that scenario with sync state works fine, but that is not going to be ideal for more complex scenarios.
Arman shakeri
10/28/2025, 10:09 AM
Tiago Pereira
10/28/2025, 12:11 PM
Manish Jain
10/28/2025, 1:49 PM
stakater/reloader:v1.0.29 to reload the pods when a config changes. But the annotations that work with other pods are not working with the Flink components.
Is anyone using a similar setup and has run into this problem? We don't want to create a custom solution for job restarts, and manual restarts are not optimal.
Francisco Morillo
10/28/2025, 7:29 PM
Noufal Rijal
10/29/2025, 5:59 AM
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: flink-session
  namespace: flink-test
spec:
  image: pyflink-session-test:v1.0
  flinkVersion: v1_20
  imagePullPolicy: Always
  serviceAccount: flink
  mode: standalone
  flinkConfiguration:
    # All Flink runtime config keys go here
    fs.allowed-fallback-filesystems: "file"
    io.tmp.dirs: "/tmp"
    taskmanager.numberOfTaskSlots: "4"
    # ===== OPERATOR AUTOSCALER =====
    kubernetes.operator.job.autoscaler.enabled: "true"
    kubernetes.operator.job.autoscaler.target.utilization: "0.7"
    kubernetes.operator.job.autoscaler.target.utilization.boundary: "0.2"
    kubernetes.operator.job.autoscaler.stabilization.interval: "1m"
    kubernetes.operator.job.autoscaler.metrics.window: "5m"
    kubernetes.operator.job.autoscaler.scale-up.grace-period: "1m"
    kubernetes.operator.job.autoscaler.scale-down.grace-period: "5m"
  # jobManager must be a direct child of 'spec'
  jobManager:
    replicas: 1
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    replicas: 2
    resource:
      # memory: "10240m"
      memory: "2048m"
      cpu: 2
---
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: ppe-kafka-streaming
  namespace: flink-test
spec:
  deploymentName: flink-session
  job:
    jarURI: /opt/flink/opt/flink-python-1.20.3.jar
    # entryClass: org.apache.flink.client.python.PythonDriver
    # args:
    #   # 1. Main PyFlink Script
    #   - "--python"                # Changed from -py
    #   - "/opt/flink/usrlib/streaming_test.py"
    #   # 2. Python Archives
    #   - "--pyArchives"            # Changed from -pyarch
    #   - "blob_path#flink_venv"
    #   # 3. Python Executable
    #   - "-pyexec"                 # This is correct!
    #   - "flink_venv/bin/python"
    args:
      - "-py"
      - "/opt/flink/usrlib/streaming_test.py"
      - "-pyarch"
      - "blob_path#flink_venv"
      - "-pyexec"
      - "flink_venv/bin/python"
    parallelism: 2
Request for your help if you have faced and tackled a similar issue.
Saketh
10/29/2025, 7:09 AM
Mohamed Galal
10/29/2025, 7:19 AM
André Santos
10/29/2025, 7:13 PM
AbstractFlinkService.submitJobToSessionCluster() - it bypasses rest pod leader discovery entirely.
Royston
10/30/2025, 11:43 AM
Iain Dixon
10/30/2025, 1:23 PM
inPoolUsage and outPoolUsage are good metrics with which to assess the presence of backpressure. To test things out I built a really simple setup, as seen in the picture below: records are generated in the generator (via a loop to create the rate, with a Thread.sleep at the end to pad out the rest of the second, based on the DS2 wordcount here https://github.com/strymon-system/ds2/blob/master/flink-examples/src/main/java/ch/ethz/systems/strymon/ds2/flink/wordcount/sources/RateControlledSourceFunction.java), sent to a pipeline workload simulator (a single operator which counts the number of received records and runs a Thread.sleep at different frequencies to simulate pipeline workload), and finally sent to a sink where records are received but not saved or forwarded. I bound the parallelism of each operator to 1 (to create the minimal possible pipeline). The generator produces a constant workload of 1000 records per second, and the workload simulator performs constant work for every n records.
[image: experimental_setup]
My expectation was that outPoolUsage would trend up to some value (as roughly 500ms of "work" should be created every second) and remain relatively constant at that value, rather than dipping back down and jumping up as seen in the graph. I'm not sure what mechanism in Flink would be responsible for this behaviour if the workload is constant, and I was wondering if anyone working on Flink could explain what's occurring or point me in the right direction. I'm aware (from the linked blog post above) that the outPoolUsage metric is an aggregation of the floatingBuffersUsage and exclusiveBuffersUsage metrics, so the dropping to 10% would be one of the exclusiveBuffers, but why would floating buffers come and go if the pipeline workload and arrival rates are constant?
[image: buffer_question]
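For concreteness, a minimal Java sketch of the workload simulator as described above; this is a reconstruction rather than the original code, and N and SLEEP_MS are assumed values:
Java
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

// Reconstruction of the described simulator: count records and sleep once
// every N records, injecting a constant amount of "work" per second.
public class WorkloadSimulator extends ProcessFunction<String, String> {
    private static final int N = 2;          // sleep every N records (assumed)
    private static final long SLEEP_MS = 1;  // simulated work per sleep (assumed)
    private transient long count;

    @Override
    public void processElement(String value, Context ctx, Collector<String> out)
            throws Exception {
        count++;
        if (count % N == 0) {
            // Blocking the task thread is what propagates backpressure upstream.
            Thread.sleep(SLEEP_MS);
        }
        out.collect(value);
    }
}
At 1000 records per second, N = 2 and SLEEP_MS = 1 amount to roughly 500 ms of sleep per second, matching the figure quoted above.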
Noufal Rijal
11/04/2025, 4:19 PM
Sidharth Ramalingam
11/05/2025, 6:12 AM
AsyncIO function with Futures to make external gRPC calls. Currently, we have set the async capacity to 1, and we are using a blocking stub to make those calls. For each event, we trigger 4 Futures (i.e., 4 gRPC calls per event).
Does this mean that Executors.newFixedThreadPool() needs at least 4 threads to avoid queue starvation? Also, if we increase the async capacity to 2, should we increase the thread pool size to 8 to keep up with the parallel calls?
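A sketch of that sizing logic under the stated setup (blocking stubs dispatched onto a shared pool; the class and the stub call are invented placeholders): each in-flight event pins one pool thread per future for the full duration of the blocking call, so the pool needs at least capacity × futures-per-event threads (4 at capacity 1, 8 at capacity 2) for the calls to run in parallel rather than serialize inside the executor.
Java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Hedged sketch: one event fans out into FUTURES_PER_EVENT blocking gRPC
// calls on a shared pool sized for the worst case.
public class GrpcFanOutFunction extends RichAsyncFunction<String, String> {
    private static final int ASYNC_CAPACITY = 2;    // value passed to AsyncDataStream
    private static final int FUTURES_PER_EVENT = 4; // gRPC calls per event

    private transient ExecutorService pool;

    @Override
    public void open(Configuration parameters) {
        // Blocking stubs hold a thread for the whole call, so size the pool
        // to capacity * futures-per-event concurrent calls.
        pool = Executors.newFixedThreadPool(ASYNC_CAPACITY * FUTURES_PER_EVENT);
    }

    @Override
    public void asyncInvoke(String event, ResultFuture<String> resultFuture) {
        List<CompletableFuture<String>> calls = new ArrayList<>();
        for (int i = 0; i < FUTURES_PER_EVENT; i++) {
            final int idx = i;
            // grpcBlockingCall stands in for the real blocking stub invocation.
            calls.add(CompletableFuture.supplyAsync(() -> grpcBlockingCall(event, idx), pool));
        }
        CompletableFuture.allOf(calls.toArray(new CompletableFuture[0]))
            .whenComplete((ignored, err) -> {
                if (err != null) {
                    resultFuture.completeExceptionally(err);
                } else {
                    resultFuture.complete(Collections.singleton(
                        calls.stream().map(CompletableFuture::join)
                             .collect(Collectors.joining("|"))));
                }
            });
    }

    private String grpcBlockingCall(String event, int i) {
        return event + ":" + i; // placeholder for stub.someRpc(request)
    }

    @Override
    public void close() {
        if (pool != null) {
            pool.shutdown();
        }
    }
}
With a smaller pool the futures still complete; they just queue inside the executor, so per-event latency grows and throughput is capped by the pool rather than by the async capacity.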
Jashwanth S J
11/06/2025, 6:50 AM
jarURI: "file:///opt/flink/integration_testing_ui_demo.jar". The operator is failing to find the jar inside the JM/TM pod to bring up TM pods, even though the jar is present in the image. I can exec in and see the jar. Can someone help find the cause?
Operator logs:
2025-11-06 06:36:55,168 o.a.f.k.o.r.ReconciliationUtils [WARN ][ncm-sc/nc-int] Attempt count: 11, last attempt: false
2025-11-06 06:36:55,259 o.a.f.r.u.c.m.ProcessMemoryUtils [INFO ][ncm-sc/fsc-flink-cluster] The derived from fraction jvm overhead memory (102.400mb (107374184 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2025-11-06 06:36:55,362 o.a.f.r.u.c.m.ProcessMemoryUtils [INFO ][ncm-sc/fsc-flink-cluster] The derived from fraction jvm overhead memory (102.400mb (107374184 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2025-11-06 06:36:55,459 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][ncm-sc/fsc-flink-cluster] Resource fully reconciled, nothing to do...
2025-11-06 06:36:55,469 i.j.o.p.e.EventProcessor [ERROR][ncm-sc/nc-int] Error during event processing ExecutionScope{ resource id: ResourceID{name='nc-int', namespace='ncm-sc'}, version: 48211671}
org.apache.flink.kubernetes.operator.exception.ReconciliationException: java.io.FileNotFoundException: /opt/flink/integration_testing_ui_demo.jar (No such file or directory)
at org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController.reconcile(FlinkSessionJobController.java:130)
at org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController.reconcile(FlinkSessionJobController.java:58)
at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:153)
at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:111)
Exec into the JM pod (the jar is inside):
❯ kubectl exec -it -n ncm-sc fsc-flink-cluster-6c47f5964b-t5jhb -- bash
Defaulted container "flink-main-container" out of: flink-main-container, setup-certs-and-plugins (init)
root@fsc-flink-cluster-6c47f5964b-t5jhb:/opt/flink# ls
'${sys:log.file}' bin examples lib licenses NOTICE plugins README.txt
artifacts conf integration_testing_ui_demo.jar LICENSE log opt pod-template
Describe the FlinkSessionJob:
❯ kubectl describe flinksessionjob nc-int -n ncm-sc | tail -n 20
Status:
Error: {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.io.FileNotFoundException: /opt/flink/integration_testing_ui_demo.jar (No such file or directory)","additionalMetadata":{},"throwableList":[{"type":"java.io.FileNotFoundException","message":"/opt/flink/integration_testing_ui_demo.jar (No such file or directory)","additionalMetadata":{}}]}
Job Status:
Checkpoint Info:
Last Periodic Checkpoint Timestamp: 0
Job Id: f72f53c6355212276b75452aa2dc376e
Savepoint Info:
Last Periodic Savepoint Timestamp: 0
Savepoint History:
Lifecycle State: UPGRADING
Observed Generation: 2
Reconciliation Status:
Last Reconciled Spec: {"spec":{"job":{"jarURI":"file:///opt/flink/integration_testing_ui_demo.jar","parallelism":1,"entryClass":"com.beam.screaltime.worker.ncm.process_functions.NcmEventsJob","args":["--jobType","cdc","--cloudProvider","nx","--buildId","dev1-6404","--operatorParallelism","{\"default\":1}"],"state":"suspended","savepointTriggerNonce":null,"initialSavepointPath":null,"checkpointTriggerNonce":null,"upgradeMode":"stateless","allowNonRestoredState":null,"savepointRedeployNonce":null,"autoscalerResetNonce":null},"restartNonce":1,"flinkConfiguration":null,"deploymentName":"fsc-flink-cluster"},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","firstDeployment":true}}
Reconciliation Timestamp: 1762411304062
State: UPGRADING
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Warning SessionJobException 7m5s (x19 over 23m) Job /opt/flink/integration_testing_ui_demo.jar (No such file or directory)
Normal Submit 7m5s (x19 over 23m) Job Starting deployment
windwheel
11/07/2025, 8:59 AM
Kamakshi
11/07/2025, 10:59 PM
Elad
11/09/2025, 1:03 PM
Han You
11/10/2025, 3:53 PM
Vasu Dins
11/11/2025, 6:53 AM
flink-connector-elasticsearch7-3.1.0-1.20.jar.
When I run my job, I'm getting the following error:
Exception in thread "Thread-5" java.lang.NoClassDefFoundError: org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase
py4j.protocol.Py4JError: org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder does not exist in the JVM
Here's a minimal example of my code:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter
import json

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///C:/flink-1.20.1/lib/flink-connector-elasticsearch7-3.1.0-1.20.jar")

data_stream = env.from_collection([
    {"id": "1", "name": "John", "age": 25},
    {"id": "2", "name": "Jane", "age": 30}
])

json_stream = data_stream.map(lambda x: json.dumps(x))

es7_sink = Elasticsearch7SinkBuilder() \
    .set_bulk_flush_max_actions(1) \
    .set_emitter(ElasticsearchEmitter.static_index('my-index', 'id')) \
    .set_hosts(['10.0.0.102:9200']) \
    .build()

json_stream.sink_to(es7_sink)
env.execute("Flink Elasticsearch 7 Job")
I'm running it using:
flink run -py sampletestelastic.py
Has anyone faced this issue before? It seems like the ElasticsearchSinkBuilderBase class is missing from the jar or not visible to PyFlink on the JVM side.
Do I need an extra dependency or a different jar for Flink 1.20.1?
Any guidance or suggestions would be really appreciated.
Vikas Patil
11/11/2025, 9:02 PM
徐平
11/12/2025, 5:16 AM
GAURAV MIGLANI
11/12/2025, 8:30 AM
Hristo Yordanov
11/12/2025, 7:28 PM
Ananth bharadwaj
11/13/2025, 2:44 AM