André Santos
10/29/2025, 7:13 PM
AbstractFlinkService.submitJobToSessionCluster() - it bypasses rest pod leader discovery entirely.
Royston
10/30/2025, 11:43 AM
Iain Dixon
10/30/2025, 1:23 PM
inPoolUsage and outPoolUsage are good metrics with which to assess the presence of backpressure. To test things out I built a really simple setup, as seen in the picture below (see also the code sketch further down). Records are generated in the generator (via a loop to create the rate and a Thread.sleep at the end to buff out the rest of the second, based on the DS2 wordcount source here: https://github.com/strymon-system/ds2/blob/master/flink-examples/src/main/java/ch/ethz/systems/strymon/ds2/flink/wordcount/sources/RateControlledSourceFunction.java), sent to a pipeline workload simulator (a single operator which counts the number of received records and runs a Thread.sleep at different frequencies to simulate pipeline workload), and finally to a sink where records are received but not saved or sent onwards. I bound the parallelism of each operator to 1 (to create the minimal possible pipeline). The generator produces a constant load of 1000 records per second, and the workload simulator performs a constant amount of work for every n records.
[image: experimental_setup]
My expectation was that outPoolUsage would trend up to some value (as roughly 500ms of "work" should be created every second) and remain relatively constant at that value, rather than dipping back down and jumping up as seen in the graph. I'm not sure what mechanism in Flink would be responsible for this behaviour if the workload is constant, and I was wondering if anyone working on Flink could explain what's occurring or point me in the right direction. I'm aware (from the linked blog post above) that the outPoolUsage metric is an aggregation of the floatingBuffersUsage and exclusiveBuffersUsage metrics, so the drop down to 10% would be one of the exclusive buffers, but why would floating buffers come and go if the pipeline workload and arrival rates are constant?
[image: buffer_question]
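For reference, a minimal sketch of the generator and workload simulator described above, assuming the legacy SourceFunction API; the class names, the String record type, and the sleep parameters are illustrative, and the generator only loosely follows the linked DS2 RateControlledSourceFunction:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class ConstantWorkloadPipeline {

    /** Emits a fixed number of records per second, sleeping away the rest of each second. */
    public static class RateControlledSource implements SourceFunction<String> {
        private final int recordsPerSecond;          // e.g. 1000
        private volatile boolean running = true;

        public RateControlledSource(int recordsPerSecond) {
            this.recordsPerSecond = recordsPerSecond;
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            long counter = 0;
            while (running) {
                long start = System.currentTimeMillis();
                for (int i = 0; i < recordsPerSecond; i++) {
                    ctx.collect("record-" + counter++);
                }
                long elapsed = System.currentTimeMillis() - start;
                if (elapsed < 1000) {
                    Thread.sleep(1000 - elapsed);    // buff out the rest of the second
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }

    /** Counts received records and sleeps every n records to simulate constant work. */
    public static class WorkloadSimulator implements MapFunction<String, String> {
        private final int sleepEveryN;    // sleep once every n records
        private final long sleepMillis;   // length of each pause
        private long received = 0;

        public WorkloadSimulator(int sleepEveryN, long sleepMillis) {
            this.sleepEveryN = sleepEveryN;
            this.sleepMillis = sleepMillis;
        }

        @Override
        public String map(String value) throws Exception {
            if (++received % sleepEveryN == 0) {
                Thread.sleep(sleepMillis);   // e.g. 1 ms every 2 records at 1000 rec/s ≈ 500 ms of "work" per second
            }
            return value;
        }
    }
}
```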
Noufal Rijal
11/04/2025, 4:19 PM
Sidharth Ramalingam
11/05/2025, 6:12 AM
We have an AsyncIO function with Futures to make external gRPC calls. Currently, we have set the async capacity to 1, and we are using a blocking stub to make those calls. For each event, we trigger 4 Futures (i.e., 4 gRPC calls per event).
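For what it's worth, a minimal sketch of that pattern, assuming Flink's RichAsyncFunction with a fixed thread pool and the blocking calls wrapped in CompletableFutures (the String types and the fake call() helper are placeholders, not the poster's code):

```java
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class GrpcFanOutAsyncFunction extends RichAsyncFunction<String, String> {

    private transient ExecutorService pool;

    @Override
    public void open(Configuration parameters) {
        // Each in-flight event keeps 4 pool threads busy while its blocking calls run.
        pool = Executors.newFixedThreadPool(4);
    }

    @Override
    public void asyncInvoke(String event, ResultFuture<String> resultFuture) {
        // Fan out the 4 blocking gRPC calls for this event onto the pool.
        CompletableFuture<String> r1 = CompletableFuture.supplyAsync(() -> call(event, 1), pool);
        CompletableFuture<String> r2 = CompletableFuture.supplyAsync(() -> call(event, 2), pool);
        CompletableFuture<String> r3 = CompletableFuture.supplyAsync(() -> call(event, 3), pool);
        CompletableFuture<String> r4 = CompletableFuture.supplyAsync(() -> call(event, 4), pool);

        CompletableFuture.allOf(r1, r2, r3, r4).whenComplete((ignored, error) -> {
            if (error != null) {
                resultFuture.completeExceptionally(error);
            } else {
                resultFuture.complete(Collections.singleton(
                        r1.join() + "|" + r2.join() + "|" + r3.join() + "|" + r4.join()));
            }
        });
    }

    @Override
    public void close() {
        if (pool != null) {
            pool.shutdown();
        }
    }

    // Stand-in for the blocking gRPC stub call.
    private String call(String event, int i) {
        return event + "-result" + i;
    }
}
```

Under these assumptions, with async capacity C and 4 blocking calls per event, up to C * 4 calls per subtask can be in flight at once, which is the sizing intuition the question below asks about.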
Does this mean that the Executors.newFixedThreadPool() needs to have at least 4 threads to avoid queue starvation? Also, if we increase the async capacity to 2, should we increase the thread pool size to 8 to keep up with the parallel calls?
Jashwanth S J
11/06/2025, 6:50 AM
jarURI: "file:///opt/flink/integration_testing_ui_demo.jar". The operator is failing to find the jar inside the JM/TM pod to bring up TM pods, even though the jar is present within the image. I can exec in and see the jar. Can someone help me find the cause?
Operator logs:
2025-11-06 06:36:55,168 o.a.f.k.o.r.ReconciliationUtils [WARN ][ncm-sc/nc-int] Attempt count: 11, last attempt: false
2025-11-06 06:36:55,259 o.a.f.r.u.c.m.ProcessMemoryUtils [INFO ][ncm-sc/fsc-flink-cluster] The derived from fraction jvm overhead memory (102.400mb (107374184 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2025-11-06 06:36:55,362 o.a.f.r.u.c.m.ProcessMemoryUtils [INFO ][ncm-sc/fsc-flink-cluster] The derived from fraction jvm overhead memory (102.400mb (107374184 bytes)) is less than its min value 192.000mb (201326592 bytes), min value will be used instead
2025-11-06 06:36:55,459 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][ncm-sc/fsc-flink-cluster] Resource fully reconciled, nothing to do...
2025-11-06 06:36:55,469 i.j.o.p.e.EventProcessor [ERROR][ncm-sc/nc-int] Error during event processing ExecutionScope{ resource id: ResourceID{name='nc-int', namespace='ncm-sc'}, version: 48211671}
org.apache.flink.kubernetes.operator.exception.ReconciliationException: java.io.FileNotFoundException: /opt/flink/integration_testing_ui_demo.jar (No such file or directory)
at org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController.reconcile(FlinkSessionJobController.java:130)
at org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController.reconcile(FlinkSessionJobController.java:58)
at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:153)
at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:111)
Exec into the JM pod (the jar is inside):
❯ kubectl exec -it -n ncm-sc fsc-flink-cluster-6c47f5964b-t5jhb -- bash
Defaulted container "flink-main-container" out of: flink-main-container, setup-certs-and-plugins (init)
root@fsc-flink-cluster-6c47f5964b-t5jhb:/opt/flink# ls
'${sys:log.file}' bin examples lib licenses NOTICE plugins README.txt
artifacts conf integration_testing_ui_demo.jar LICENSE log opt pod-template
Describe the flinksessionjob:
❯ kubectl describe flinksessionjob nc-int -n ncm-sc | tail -n 20
Status:
Error: {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.io.FileNotFoundException: /opt/flink/integration_testing_ui_demo.jar (No such file or directory)","additionalMetadata":{},"throwableList":[{"type":"java.io.FileNotFoundException","message":"/opt/flink/integration_testing_ui_demo.jar (No such file or directory)","additionalMetadata":{}}]}
Job Status:
Checkpoint Info:
Last Periodic Checkpoint Timestamp: 0
Job Id: f72f53c6355212276b75452aa2dc376e
Savepoint Info:
Last Periodic Savepoint Timestamp: 0
Savepoint History:
Lifecycle State: UPGRADING
Observed Generation: 2
Reconciliation Status:
Last Reconciled Spec: {"spec":{"job":{"jarURI":"file:///opt/flink/integration_testing_ui_demo.jar","parallelism":1,"entryClass":"com.beam.screaltime.worker.ncm.process_functions.NcmEventsJob","args":["--jobType","cdc","--cloudProvider","nx","--buildId","dev1-6404","--operatorParallelism","{\"default\":1}"],"state":"suspended","savepointTriggerNonce":null,"initialSavepointPath":null,"checkpointTriggerNonce":null,"upgradeMode":"stateless","allowNonRestoredState":null,"savepointRedeployNonce":null,"autoscalerResetNonce":null},"restartNonce":1,"flinkConfiguration":null,"deploymentName":"fsc-flink-cluster"},"resource_metadata":{"apiVersion":"flink.apache.org/v1beta1","firstDeployment":true}}
Reconciliation Timestamp: 1762411304062
State: UPGRADING
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Warning SessionJobException 7m5s (x19 over 23m) Job /opt/flink/integration_testing_ui_demo.jar (No such file or directory)
Normal Submit 7m5s (x19 over 23m) Job Starting deployment
windwheel
11/07/2025, 8:59 AM
Kamakshi
11/07/2025, 10:59 PM
Elad
11/09/2025, 1:03 PM
Han You
11/10/2025, 3:53 PM
Vasu Dins
11/11/2025, 6:53 AM
I'm using flink-connector-elasticsearch7-3.1.0-1.20.jar.
When I run my job, I'm getting the following error:
Exception in thread "Thread-5" java.lang.NoClassDefFoundError: org/apache/flink/connector/elasticsearch/sink/ElasticsearchSinkBuilderBase
py4j.protocol.Py4JError: org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder does not exist in the JVM
Here's a minimal example of my code:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.elasticsearch import Elasticsearch7SinkBuilder, ElasticsearchEmitter
import json
env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:///C:/flink-1.20.1/lib/flink-connector-elasticsearch7-3.1.0-1.20.jar")
data_stream = env.from_collection([
{"id": "1", "name": "John", "age": 25},
{"id": "2", "name": "Jane", "age": 30}
])
json_stream = data_stream.map(lambda x: json.dumps(x))
es7_sink = Elasticsearch7SinkBuilder() \
.set_bulk_flush_max_actions(1) \
.set_emitter(ElasticsearchEmitter.static_index('my-index', 'id')) \
.set_hosts(['10.0.0.102:9200']) \
.build()
json_stream.sink_to(es7_sink)
env.execute("Flink Elasticsearch 7 Job")
I'm running it using:
flink run -py sampletestelastic.py
Has anyone faced this issue before? It seems like the ElasticsearchSinkBuilderBase class is missing from the jar or not visible to PyFlink.
Do I need an extra dependency or a different jar for Flink 1.20.1?
It seems like ElasticsearchSinkBuilderBase might be missing or not accessible from the JVM side.
Any guidance or suggestions would be really appreciated.
Vikas Patil
11/11/2025, 9:02 PM
徐平
11/12/2025, 5:16 AM
GAURAV MIGLANI
11/12/2025, 8:30 AM
Hristo Yordanov
11/12/2025, 7:28 PM
Ananth bharadwaj
11/13/2025, 2:44 AM
Mehdi Jalili
11/13/2025, 2:45 PM
I have a question about scan.watermark.idle-timeout. I'm using the SQL Gateway to join two tables, backed by Kafka topics. One is a fact table that receives a constant stream of data. The other is backed by a compacted topic populated by Debezium that contains dimension data, and I'm joining the two to enrich the facts.
I want to sink the results to an append-only Kafka table sink which is why I have made the dimension table versioned and use a Temporal join to get the snapshot of the dimensions at the specified fact timestamp.
Because the dimension stream rarely gets any new data, the watermark will usually be hours if not days behind the fact stream watermark, and this will stop the output watermark from progressing. I have used scan.watermark.idle-timeout on the dimension table to let the watermark generator know to ignore the dimension table watermark, but I still see the watermark on the TemporalJoin operator as the minimum of the two. What am I doing wrong?
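A generic illustration of this kind of setup, under assumptions: the table and column names, connector options, and the one-minute idle timeout are made up, and this is not the poster's snippet (which was run through the SQL Gateway); it is only meant to make the described pieces concrete:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class TemporalJoinSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Fact stream: a constant flow of events with an event-time watermark.
        tEnv.executeSql(
            "CREATE TABLE facts (" +
            "  order_id STRING," +
            "  product_id STRING," +
            "  order_time TIMESTAMP(3)," +
            "  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'orders'," +
            "  'properties.bootstrap.servers' = 'broker:9092'," +
            "  'scan.startup.mode' = 'earliest-offset'," +
            "  'format' = 'json')");

        // Versioned dimension table over a Debezium changelog; the idle timeout is the
        // option the poster mentions for keeping its slow watermark from holding things back.
        tEnv.executeSql(
            "CREATE TABLE products (" +
            "  product_id STRING," +
            "  product_name STRING," +
            "  update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL," +
            "  PRIMARY KEY (product_id) NOT ENFORCED," +
            "  WATERMARK FOR update_time AS update_time" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'products'," +
            "  'properties.bootstrap.servers' = 'broker:9092'," +
            "  'scan.startup.mode' = 'earliest-offset'," +
            "  'value.format' = 'debezium-json'," +
            "  'scan.watermark.idle-timeout' = '1 min')");

        // Event-time temporal join: each fact is enriched with the dimension version
        // valid as of the fact's timestamp.
        tEnv.executeSql(
            "SELECT f.order_id, f.order_time, p.product_name " +
            "FROM facts AS f " +
            "LEFT JOIN products FOR SYSTEM_TIME AS OF f.order_time AS p " +
            "ON f.product_id = p.product_id").print();
    }
}
```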
Sorry if my grasp of these concepts is somewhat shaky. I'd really appreciate the help. Thank you.
Here's some simplified code to demonstrate the setup:
Brin
11/14/2025, 12:40 PM
The source table (test.vip_record) and the sink table (test.ods_vip_record) currently have exactly the same schema, and no DDL changes have been made during the job execution.
However, the job keeps failing after running for some time, and the logs show the following error:
2025-11-14 20:32:03
java.lang.IllegalStateException: Unable to coerce data record from test.vip_record (schema: null) to test.ods_vip_record (schema: null)
at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.lambda$handleDataChangeEvent$1(SchemaOperator.java:235)
at java.util.Optional.orElseThrow(Optional.java:290)
at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.handleDataChangeEvent(SchemaOperator.java:232)
at org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator.processElement(SchemaOperator.java:152)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter$OutputCollector.collect(MySqlRecordEmitter.java:141)
at java.util.Collections$SingletonList.forEach(Collections.java:4824)
at org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema.deserialize(DebeziumEventDeserializationSchema.java:93)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitElement(MySqlRecordEmitter.java:119)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.processElement(MySqlRecordEmitter.java:101)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlPipelineRecordEmitter.processElement(MySqlPipelineRecordEmitter.java:120)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:73)
at org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:46)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:203)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:750)
The key part is:
schema: null
which suggests that both the source and sink schemas are unexpectedly lost or not recognized at runtime.
---
### Additional information
* Deployment mode: Flink on Kubernetes (Native K8s Mode)
* Source: MySQL 5.7
* Sink: StarRocks
* Flink CDC performs snapshot + incremental reading normally after restart
* After 1–2 hours of normal running, the job fails with the above error
* No schema changes were made during this time
---
### Questions
1. Under what conditions would SchemaOperator encounter (schema: null) for both source and sink?
2. Could this be related to state inconsistency or schema metadata loss inside checkpoint/savepoint?
3. Is this a known issue in Flink CDC 3.3 with Flink 1.20?
4. Are there recommended configurations to ensure stable schema management for MySQL → StarRocks pipelines?
Thank you!
melin li
11/17/2025, 7:04 AM
Idan Sheinberg
11/17/2025, 11:54 AM
I'm trying to set taskmanager.numOfTaskSlots via the Flink Kubernetes Operator.
I'm using operator version 1.13.0 and Flink 1.20.3. While the configmap generated by the operator correctly reflects the set value ('3' in my case), it looks like during the actual taskmanager startup process it resolves taskmanager.numOfTaskSlots=1 as a dynamic configuration property, which overrides the value at runtime. Has anyone successfully set a value larger than 1 in K8s in the past?
Roberto Serafin
11/17/2025, 8:03 PM
Lee Wallen
11/17/2025, 11:40 PM
I'm running into SaslClientAuthenticator failing to configure.
Relevant configuration:
• Added classloader.parent-first-patterns.additional per Confluent Cloud support to avoid child-first classloading issues with OAuth:
org.apache.kafka.common.security.oauthbearer.;
org.apache.kafka.common.security.auth.;
org.apache.kafka.common.security.plain.;
org.apache.kafka.common.security.scram.
• Both sasl.login.callback.handler.class and sasl.client.callback.handler.class are set to:
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
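For context, a sketch of how the properties above are typically passed to the Kafka source builder; the broker address, topic, group id, and JAAS clientId/clientSecret values are placeholders, and only the two callback-handler properties come from the message:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

public class OAuthKafkaSourceSketch {

    public static KafkaSource<String> build() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("<bootstrap-servers>")          // placeholder
                .setTopics("input-topic")                            // placeholder
                .setGroupId("my-consumer-group")                     // placeholder
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperty("security.protocol", "SASL_SSL")
                .setProperty("sasl.mechanism", "OAUTHBEARER")
                // The two handlers mentioned in the message:
                .setProperty("sasl.login.callback.handler.class",
                        "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler")
                .setProperty("sasl.client.callback.handler.class",
                        "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler")
                .setProperty("sasl.jaas.config",
                        "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
                                + " clientId=\"<client-id>\" clientSecret=\"<client-secret>\";")
                .build();
    }
}
```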
Runtime classpath (Kafka-related):
+--- io.confluent:kafka-avro-serializer:7.4.12
| +--- io.confluent:kafka-schema-serializer:7.4.12
| | +--- io.confluent:kafka-schema-registry-client:7.4.12
| +--- io.confluent:kafka-schema-registry-client:7.4.12 (*)
+--- org.apache.flink:flink-connector-kafka:3.3.0-1.19
| +--- org.apache.kafka:kafka-clients:3.4.0
Stacktrace excerpt:
Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: Failed to configure SaslClientAuthenticator
Caused by: java.lang.IllegalArgumentException: Callback handler must be castable to org.apache.kafka.common.security.auth.AuthenticateCallbackHandler: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
Has anyone encountered this issue or have suggestions on how to resolve it?
I'll add a thread and include a snippet with the full stacktrace.
Anuj Jain
11/18/2025, 6:24 AM
If you use Helm to install flink-kubernetes-operator, it allows you to specify a postStart hook to download the required plugins.
I need help with how to use this postStart hook. Is there any example?
Ashish Marottickal Gopi
11/19/2025, 11:10 AM
Jatin Kumar
11/19/2025, 2:03 PM
Xianxin Lin
11/19/2025, 8:32 PM
Collected metrics:
Backlog information at each source
Incoming data rate at the sources (e.g. records/sec written into the Kafka topic)
Record processing rate at each job vertex
Busy and backpressured time at each job vertex
Sebastian Tota
11/20/2025, 7:42 PM
We're currently using AvroParquetWriters.forReflectRecord since it specifically returns the T object we pass in, but we know this is the least efficient option. I thought about switching to forGenericRecord, but that method returns a ParquetWriterFactory of GenericRecord, which means we no longer have access to the event-time information of the record in our BucketAssigner.
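One possible direction, sketched under assumptions: if the stream is mapped to GenericRecord before the sink, the bucket assigner can read the event time straight off the record, so the original POJO is not needed there. The field name "eventTime", the output path, and the date-based bucket layout are all illustrative:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.formats.parquet.avro.AvroParquetWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

import java.time.Instant;
import java.time.ZoneOffset;

public class GenericRecordParquetSinkSketch {

    public static FileSink<GenericRecord> build(Schema avroSchema) {
        // Bucket by an event-time field carried on the GenericRecord itself.
        BucketAssigner<GenericRecord, String> eventTimeAssigner =
                new BucketAssigner<GenericRecord, String>() {
                    @Override
                    public String getBucketId(GenericRecord element, Context context) {
                        long eventTimeMillis = (long) element.get("eventTime"); // hypothetical field
                        return "dt=" + Instant.ofEpochMilli(eventTimeMillis)
                                .atZone(ZoneOffset.UTC)
                                .toLocalDate();
                    }

                    @Override
                    public SimpleVersionedSerializer<String> getSerializer() {
                        return SimpleVersionedStringSerializer.INSTANCE;
                    }
                };

        return FileSink
                .forBulkFormat(new Path("s3://some-bucket/output"),
                        AvroParquetWriters.forGenericRecord(avroSchema))
                .withBucketAssigner(eventTimeAssigner)
                .build();
    }
}
```

Whether this fits depends on the event time actually being present as a field in the Avro schema; if it is only available as the stream record's timestamp, the BucketAssigner.Context timestamp may be worth looking at instead.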
Does anyone have any recommendations for how we can optimize this Parquet writer?
Ricardo Mendoza
11/21/2025, 2:40 AM
Prafful Javare
11/22/2025, 4:52 PM
FROM flink:1.17.1
RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.4.0-1.20/flink-sql-connector-kafka-3.4.0-1.20.jar
RUN chown -R flink:flink /opt/flink/lib
COPY ./exercises/target/*.jar .
services:
broker:
image: apache/kafka:4.0.0
platform: linux/amd64
container_name: broker
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_LISTENERS: <PLAINTEXT://0.0.0.0:9092>,<CONTROLLER://localhost:9093>
KAFKA_ADVERTISED_LISTENERS: <PLAINTEXT://broker:9092>
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@localhost:9093
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_NUM_PARTITIONS: 3
kcat:
image: edenhill/kcat:1.7.1
platform: linux/amd64
container_name: flink_kcat
entrypoint:
- /bin/sh
- -c
- |
apk add jq;
while [ 1 -eq 1 ];do sleep 60;done
sql-client:
build: .
command: bin/sql-client.sh
depends_on:
- jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
rest.address: jobmanager
jobmanager:
build: .
ports:
- "8081:8081"
command: jobmanager
volumes:
- flink_data:/tmp/
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
state.backend: hashmap
state.checkpoints.dir: file:///tmp/flink-checkpoints
heartbeat.interval: 1000
heartbeat.timeout: 5000
rest.flamegraph.enabled: true
web.backpressure.refresh-interval: 10000
taskmanager:
build: .
depends_on:
- jobmanager
command: taskmanager
volumes:
- flink_data:/tmp/
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 3
state.backend: hashmap
state.checkpoints.dir: file:///tmp/flink-checkpoints
heartbeat.interval: 1000
heartbeat.timeout: 5000
volumes:
flink_data:
but when I run docker compose up and try to submit the job to the flink cluster, I get this error:
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.api.lineage.LineageVertexProvider
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source)
at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
... 46 more
I am using Flink 1.17.1 if that helps. I am able to run the WordCount example, but this specific exercise involving writing to a Kafka sink is not working. Any pointers? Thanks!
Vedanth Baliga
11/23/2025, 9:42 AM
# ---------- Namespace ----------
apiVersion: v1
kind: Namespace
metadata:
name: telemetry
---
# ---------- Redpanda-1 ----------
apiVersion: v1
kind: Service
metadata:
name: redpanda-1
namespace: telemetry
spec:
selector:
app: redpanda-1
ports:
- name: kafka
port: 29092
targetPort: 29092
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: redpanda-1
namespace: telemetry
spec:
replicas: 1
selector:
matchLabels:
app: redpanda-1
template:
metadata:
labels:
app: redpanda-1
spec:
containers:
- name: redpanda
image: docker.redpanda.com/redpandadata/redpanda:v23.1.8
args:
- redpanda
- start
- --smp
- "1"
- --reserve-memory
- 0M
- --overprovisioned
- --node-id
- "1"
- --kafka-addr
- <PLAINTEXT://0.0.0.0:29092>
- --advertise-kafka-addr
- <PLAINTEXT://redpanda-1:29092>
- --rpc-addr
- 0.0.0.0:33145
- --advertise-rpc-addr
- redpanda-1:33145
- --pandaproxy-addr
- <PLAINTEXT://0.0.0.0:28082>
- --advertise-pandaproxy-addr
- <PLAINTEXT://redpanda-1:28082>
ports:
- containerPort: 29092
- containerPort: 28082
- containerPort: 33145
volumeMounts:
- name: data
mountPath: /var/lib/redpanda
volumes:
- name: data
emptyDir: {}
---
# ---------- Redpanda-2 ----------
apiVersion: v1
kind: Service
metadata:
name: redpanda-2
namespace: telemetry
spec:
selector:
app: redpanda-2
ports:
- name: kafka
port: 29093
targetPort: 29093
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: redpanda-2
namespace: telemetry
spec:
replicas: 1
selector:
matchLabels:
app: redpanda-2
template:
metadata:
labels:
app: redpanda-2
spec:
containers:
- name: redpanda
image: docker.redpanda.com/redpandadata/redpanda:v23.1.8
args:
- redpanda
- start
- --smp
- "1"
- --reserve-memory
- 0M
- --overprovisioned
- --node-id
- "2"
- --seeds
- redpanda-1:33145
- --kafka-addr
- <PLAINTEXT://0.0.0.0:29093>
- --advertise-kafka-addr
- <PLAINTEXT://redpanda-2:29093>
- --rpc-addr
- 0.0.0.0:33146
- --advertise-rpc-addr
- redpanda-2:33146
- --pandaproxy-addr
- <PLAINTEXT://0.0.0.0:28083>
- --advertise-pandaproxy-addr
- <PLAINTEXT://redpanda-2:28083>
ports:
- containerPort: 29093
- containerPort: 28083
- containerPort: 33146
volumeMounts:
- name: data
mountPath: /var/lib/redpanda
volumes:
- name: data
emptyDir: {}
---
# ---------- Redpanda Console ----------
apiVersion: v1
kind: Service
metadata:
name: redpanda-console
namespace: telemetry
spec:
selector:
app: redpanda-console
ports:
- name: http
port: 8080
targetPort: 8080
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: redpanda-console
namespace: telemetry
spec:
replicas: 1
selector:
matchLabels:
app: redpanda-console
template:
metadata:
labels:
app: redpanda-console
spec:
containers:
- name: redpanda-console
image: docker.redpanda.com/redpandadata/console:v2.2.4
command: ["/bin/sh", "-c"]
args:
- echo "$CONSOLE_CONFIG_FILE" > /tmp/config.yml; /app/console
env:
- name: CONFIG_FILEPATH
value: /tmp/config.yml
- name: CONSOLE_CONFIG_FILE
value: |
kafka:
brokers: ["redpanda-1:29092", "redpanda-2:29093"]
schemaRegistry:
enabled: false
redpanda:
adminApi:
enabled: false
connect:
enabled: false
ports:
- containerPort: 8080
---
# ---------- Flink: JobManager ----------
apiVersion: v1
kind: Service
metadata:
name: jobmanager
namespace: telemetry
spec:
selector:
app: jobmanager
ports:
- name: rpc
port: 6123
targetPort: 6123
- name: ui
port: 8081
targetPort: 8081
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: jobmanager
namespace: telemetry
spec:
replicas: 1
selector:
matchLabels:
app: jobmanager
template:
metadata:
labels:
app: jobmanager
spec:
containers:
- name: jobmanager
image: flink-sql-k8s:1.19
args: ["jobmanager"]
env:
- name: FLINK_PROPERTIES
value: |
jobmanager.rpc.address: jobmanager
jobmanager.bind-host: 0.0.0.0
jobmanager.rpc.port: 6123
rest.address: jobmanager
rest.bind-address: 0.0.0.0
rest.port: 8081
ports:
- containerPort: 6123
- containerPort: 8081
---
# ---------- Flink: TaskManager ----------
apiVersion: apps/v1
kind: Deployment
metadata:
name: taskmanager
namespace: telemetry
spec:
replicas: 1
selector:
matchLabels:
app: taskmanager
template:
metadata:
labels:
app: taskmanager
spec:
containers:
- name: taskmanager
image: flink-sql-k8s:1.19
args: ["taskmanager"]
env:
- name: FLINK_PROPERTIES
value: |
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 20
taskmanager.bind-host: 0.0.0.0
ports:
- containerPort: 6121
- containerPort: 6122
---
# ---------- Flink: SQL Client ----------
apiVersion: apps/v1
kind: Deployment
metadata:
name: sql-client
namespace: telemetry
spec:
replicas: 1
selector:
matchLabels:
app: sql-client
template:
metadata:
labels:
app: sql-client
spec:
containers:
- name: sql-client
image: flink-sql-k8s:1.19
command: ["bash", "-c", "sleep infinity"]
env:
- name: FLINK_PROPERTIES
value: |
jobmanager.rpc.address: jobmanager
rest.address: jobmanager
---
# ---------- ClickHouse ----------
apiVersion: v1
kind: Service
metadata:
name: clickhouse
namespace: telemetry
spec:
selector:
app: clickhouse
ports:
- name: http
port: 8123
targetPort: 8123
- name: native
port: 9000
targetPort: 9000
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: clickhouse
namespace: telemetry
spec:
replicas: 1
selector:
matchLabels:
app: clickhouse
template:
metadata:
labels:
app: clickhouse
spec:
containers:
- name: clickhouse
image: clickhouse/clickhouse-server:23.8
ports:
- containerPort: 8123
- containerPort: 9000
volumeMounts:
- name: data
mountPath: /var/lib/clickhouse
- name: logs
mountPath: /var/log/clickhouse-server
volumes:
- name: data
emptyDir: {}
- name: logs
emptyDir: {}
---
# ---------- Telemetry Producer (Python) ----------
apiVersion: apps/v1
kind: Deployment
metadata:
name: telemetry-producer
namespace: telemetry
spec:
replicas: 1
selector:
matchLabels:
app: telemetry-producer
template:
metadata:
labels:
app: telemetry-producer
spec:
containers:
- name: telemetry-producer
image: telemetry-producer:latest
imagePullPolicy: IfNotPresent
env:
- name: KAFKA_BOOTSTRAP_SERVERS
value: "redpanda-1:29092"
- name: KAFKA_TOPIC
value: "fleet.prod.telemetry.raw"
My Dockerfile
# base image
FROM flink:1.19-scala_2.12-java11
USER root
RUN wget -P /opt/flink/lib/ \
https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.2.0-1.19/flink-sql-connector-kafka-3.2.0-1.19.jar && \
wget -P /opt/flink/lib/ \
https://repo1.maven.org/maven2/org/apache/flink/flink-json/1.19.0/flink-json-1.19.0.jar