Pankaj Singh
06/20/2023, 10:08 AM
I only see DateTimeBucketAssigner and BasePathBucketAssigner. Is it not possible to write a custom bucket assigner in Python, or is there a better way to do this?
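As far as I know, PyFlink only exposes those two built-in assigners, and BucketAssigner cannot be subclassed on the Python side (it just wraps a Java object), so a custom assigner would have to be implemented in Java and put on the classpath. A minimal sketch of wiring the built-in date-time assigner into a FileSink (the sink path, format string, and sample data are assumptions):

```python
from pyflink.common.serialization import Encoder
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import (
    BucketAssigner, FileSink, RollingPolicy)

env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_collection(["a", "b", "c"])

# Only the built-in assigners are exposed in PyFlink:
# BucketAssigner.date_time_bucket_assigner(...) and
# BucketAssigner.base_path_bucket_assigner().
sink = (FileSink
        .for_row_format("/tmp/output", Encoder.simple_string_encoder())
        .with_bucket_assigner(
            BucketAssigner.date_time_bucket_assigner("yyyy-MM-dd--HH"))
        .with_rolling_policy(RollingPolicy.default_rolling_policy())
        .build())

ds.sink_to(sink)
env.execute("bucket-assigner-demo")
```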
Raghunadh Nittala
06/20/2023, 11:44 AM
I have a table created with the upsert-kafka connector, and I'm converting it to a DataStream using .toChangelogStream. I have another DataStream created from the source Kafka topic using the kafka connector. Now, I'm doing a keyBy on both of these and connecting them with a KeyedCoProcessFunction implementation. I'm facing an issue where I see a huge number of records in the source Kafka operator in the Flink UI, even though nowhere near that many records are being published to that topic. When I avoid .connect, I see fewer records in the source operator. Any idea what could be wrong here?
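For reference, a minimal PyFlink sketch of the topology being described, with made-up (key, payload) tuples standing in for the two Kafka-backed streams; the Java API has the same shape:

```python
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedCoProcessFunction

class MergeFn(KeyedCoProcessFunction):
    # Stream 1: the changelog side (from .toChangelogStream).
    def process_element1(self, value, ctx):
        yield ("changelog", value)

    # Stream 2: the plain kafka-connector source.
    def process_element2(self, value, ctx):
        yield ("source", value)

env = StreamExecutionEnvironment.get_execution_environment()
changelog_stream = env.from_collection([("k1", "u1"), ("k2", "u2")])
source_stream = env.from_collection([("k1", "e1"), ("k2", "e2")])

merged = (changelog_stream
          .connect(source_stream)
          .key_by(lambda r: r[0], lambda r: r[0])
          .process(MergeFn()))
merged.print()
env.execute("keyed-co-process-sketch")
```

One thing to keep in mind when reading the UI numbers: operator metrics count every StreamRecord, and a changelog stream carries UPDATE_BEFORE/UPDATE_AFTER pairs, so counts downstream of the changelog conversion can legitimately exceed the number of Kafka messages.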
Nishant Goenka
06/20/2023, 12:32 PM

Jirawech Siwawut
06/20/2023, 1:26 PM

Theodore Curtil
06/20/2023, 1:31 PM
SET 'state.checkpoints.dir' = 's3://state/checkpoints';
SET 'state.backend.incremental' = 'true';
SET 'execution.checkpointing.unaligned' = 'true';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.interval' = '10s';
SET 'execution.checkpointing.min-pause' = '10s';
SET 'sql-client.execution.result-mode'='TABLEAU';
SET 'parallelism.default' = '1';
ADD JAR '/opt/sql-client/lib/flink-sql-connector-kafka-1.16.0.jar';
ADD JAR '/opt/sql-client/lib/flink-protobuf-1.16.0.jar';
ADD JAR '/opt/sql-client/lib/original-kafka-send-proto-0.1.0.jar';
-- SYNTHETIC EVENTS GEO
DROP TABLE IF EXISTS TEST;
CREATE TABLE TEST (
name STRING
) WITH (
'connector' = 'kafka',
'topic' = 'protos_topic_cards',
'properties.bootstrap.servers' = 'kafka_broker:29092',
'format' = 'protobuf',
'protobuf.message-class-name' = 'com.example.CardData',
'protobuf.ignore-parse-errors' = 'true'
);
SELECT * FROM TEST;
And the original-kafka-send-proto-0.1.0.jar
does contain the protobuf Java class com.example.CardData.
Still, the SQL script fails in the SQL Client with the error:
Flink SQL> CREATE TABLE TEST (
name STRING
) WITH (
'connector' = 'kafka',
'topic' = 'protos_topic_cards',
'properties.bootstrap.servers' = 'kafka_broker:29092',
'format' = 'protobuf',
'protobuf.message-class-name' = 'com.example.CardData',
'protobuf.ignore-parse-errors' = 'true'
)
[INFO] Execute statement succeed.

Flink SQL> [ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: com.example.CardData

Shutting down the session... done.

Anyone ever faced this issue with protobuf?
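One thing worth trying: the protobuf format resolves protobuf.message-class-name reflectively when the query is compiled, so the generated class has to be visible to that classloader; placing the jar in FLINK_HOME/lib (and restarting the SQL Client/cluster) sometimes behaves differently from ADD JAR. To reproduce the same DDL outside the SQL Client, here is a PyFlink sketch that passes the jars explicitly via pipeline.jars (paths taken from the script above):

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Make the jar containing the generated com.example.CardData class (plus
# the connector/format jars) visible to the job's user classloader.
t_env.get_config().set(
    "pipeline.jars",
    "file:///opt/sql-client/lib/original-kafka-send-proto-0.1.0.jar;"
    "file:///opt/sql-client/lib/flink-sql-connector-kafka-1.16.0.jar;"
    "file:///opt/sql-client/lib/flink-protobuf-1.16.0.jar")

t_env.execute_sql("""
    CREATE TABLE TEST (
        name STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'protos_topic_cards',
        'properties.bootstrap.servers' = 'kafka_broker:29092',
        'format' = 'protobuf',
        'protobuf.message-class-name' = 'com.example.CardData',
        'protobuf.ignore-parse-errors' = 'true'
    )
""")
t_env.execute_sql("SELECT * FROM TEST").print()
```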
Carlos Santos
06/20/2023, 2:43 PM

Bruno Filippone
06/20/2023, 4:34 PM
I'm trying to use an s3 path as the job.jarURI of a session job, and I get the error: Could not find a file system implementation for scheme 's3'. What am I missing?
Here are my Kubernetes resources for context (shortened versions):
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: session-cluster
  namespace: flink-jobs
spec:
  image: flink:1.17
  flinkVersion: v1_17
  jobManager:
    [...]
    podTemplate:
      spec:
        containers:
          - name: flink-main-container
            env:
              - name: ENABLE_BUILT_IN_PLUGINS
                value: flink-s3-fs-presto-1.17.1.jar;flink-s3-fs-hadoop-1.17.1.jar
  taskManager:
    [...]
    podTemplate:
      spec:
        containers:
          - name: flink-main-container
            env:
              - name: ENABLE_BUILT_IN_PLUGINS
                value: flink-s3-fs-presto-1.17.1.jar;flink-s3-fs-hadoop-1.17.1.jar
  serviceAccount: flink
---
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: session-cluster-job
  namespace: flink-jobs
spec:
  deploymentName: session-cluster
  job:
    jarURI: s3://flink-test-bucket/TopSpeedWindowing.jar
    parallelism: 4
    upgradeMode: stateless
Ilya Sterin
06/20/2023, 5:35 PM

kiran kumar
06/20/2023, 5:55 PM
sh standalone-job.sh start-foreground -jar ~/Documents/Repository/sherlocksvc/sherlock/target/sherlock-0.0.1.jar --job-classname com.cashfree.sherlock.Application
Zhong Chen
06/20/2023, 11:03 PM
>>> Status | Error | STABLE | {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.","throwableList":[{"type":"java.util.concurrent.ExecutionException","message":"java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed."},{"type":"org.apache.flink.util.SerializedThrowable","message":"java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed."}]}
Ravi Nishant
06/20/2023, 11:11 PM

Vishal bharatbhai Vanpariya
06/21/2023, 5:48 AM

Dr Ravi Tomar
06/21/2023, 7:52 AM

Diogo Santos
06/21/2023, 10:45 AM

Keyur Makwana
06/21/2023, 1:07 PM

Akshat
06/21/2023, 1:25 PM

Mikhail Spirin
06/21/2023, 1:28 PM

Dr Ravi Tomar
06/21/2023, 4:35 PM

Giannis Polyzos
06/21/2023, 4:48 PM

dp api
06/21/2023, 4:49 PM
jdbc_url = os.environ.get('MYSQL_JDBC_URL')
jdbc_table_name = os.environ.get('MYSQL_TABLE_NAME')
jdbc_username = os.environ.get('MYSQL_USERNAME')
jdbc_password = os.environ.get('MYSQL_PASSWORD')
sink_mysql = f"""
    CREATE TABLE pending_orders_table (
        rest_id VARCHAR,
        pending_count INT,
        PRIMARY KEY (rest_id) NOT ENFORCED
    ) WITH (
        'connector' = 'jdbc',
        'url' = '{jdbc_url}',
        'table-name' = '{jdbc_table_name}',
        'username' = '{jdbc_username}',
        'password' = '{jdbc_password}'
    )
"""
t_env.execute_sql(sink_mysql)
I am getting an error when I run this script in a Docker container with the environment variables set accordingly. The error is something like:
Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "jdbc" at line 8, column 19.
Was expecting one of:
"UESCAPE" ...
<QUOTED_STRING> ...
")" ...
"," ...
What needs to be changed syntactically for this?
Thanks in advance!!
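Judging by the "Was expecting ... <QUOTED_STRING>" hint, the parser trips immediately after an empty string literal in the 'url' value, which typically means the substituted value itself carries quotes: an env file that sets MYSQL_JDBC_URL='jdbc:mysql://...' with literal quotes renders the DDL as 'url' = ''jdbc:mysql://...'', and parsing stops at "jdbc". A defensive sketch (the quote-stripping is an assumption about how the Docker environment is set):

```python
import os

def clean_env(name: str) -> str:
    # Strip whitespace plus any literal surrounding quotes that leaked
    # in from an env file or docker-compose definition (an assumption).
    return os.environ.get(name, '').strip().strip('\'"')

jdbc_url = clean_env('MYSQL_JDBC_URL')
jdbc_table_name = clean_env('MYSQL_TABLE_NAME')
jdbc_username = clean_env('MYSQL_USERNAME')
jdbc_password = clean_env('MYSQL_PASSWORD')
```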
Razin Bouzar
06/21/2023, 6:17 PM

Oscar Perez
06/21/2023, 6:42 PM

Kyle Ahn
06/21/2023, 7:33 PM
When submitting a batch job via the flinksessionjob CRD, I am seeing the [FLINK-24883] "Use flink web ui to submit the jar throw Job client must be a CoordinationRequestGateway. This is a bug" exception. The exception occurs here, but the TL;DR is that the batch job needs a long-lived client, and Flink jobs submitted via the REST API don't have one:
"Collecting results requires a client that continuously fetches the result until the end, or explicitly closes the iterator (causing the job to be cancelled). If the client goes away without fetching all the results or terminating the job, it will hang forever. Submitting by web UI does not fulfill this requirement (as there is no long-lived client), so we're not supporting collect in the web UI. I'll give a more proper exception message and refine the documentation of collect."
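For what it's worth, the constraint described in that quote is easy to see in a minimal sketch: collecting results only works while the submitting process stays alive as the job client, which a web-UI/REST submission does not provide. A PyFlink example, assuming a local run:

```python
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_collection([1, 2, 3])

# The local client keeps fetching results until the iterator is
# exhausted or closed; if the client goes away first, the job hangs.
results = ds.execute_and_collect()
try:
    for value in results:
        print(value)
finally:
    results.close()
```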
Faisal A. Siddiqui
06/21/2023, 10:20 PM
state.backend: rocksdb
state.backend.incremental: true
state.backend.rocksdb.metrics.enable: true
state.backend.rocksdb.write-buffer-size: 128mb
state.backend.rocksdb.max-write-buffer-number: 10
state.backend.rocksdb.checkpoint.transfer.thread.num: 4
taskmanager.memory.network.fraction: 0.25
taskmanager.memory.managed.fraction: 0.5
Since the physical memory for this task manager is 8 GB, shouldn't I see 4 GB of managed memory in the Flink UI, or am I missing something?
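For what it's worth, taskmanager.memory.managed.fraction is a fraction of total Flink memory, i.e. the process size minus JVM metaspace and JVM overhead, not of the raw 8 GB. A back-of-the-envelope calculation, assuming taskmanager.memory.process.size is 8g and default metaspace/overhead settings:

```python
# Flink TaskManager memory math (defaults assumed: jvm-metaspace = 256mb,
# jvm-overhead.fraction = 0.1, bounded to [192mb, 1gb]).
process_mb = 8 * 1024                                      # 8g process size
jvm_metaspace_mb = 256
jvm_overhead_mb = min(1024, max(192, 0.1 * process_mb))    # -> 819.2
total_flink_mb = process_mb - jvm_metaspace_mb - jvm_overhead_mb  # ~7116.8

managed_mb = 0.5 * total_flink_mb   # taskmanager.memory.managed.fraction
print(f"managed memory = {managed_mb / 1024:.2f} GB")      # ~3.47 GB, not 4
```

So roughly 3.5 GB of managed memory in the UI is the expected figure here.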
Rion Williams
06/22/2023, 1:34 AM
• Take a union of all of my other source streams (all mapped to a single common schema) and connect that with the previous broadcast stream.
• Apply the enrichments in an appropriate BroadcastProcessFunction (see the sketch below) where:
  ◦ Broadcast elements are simply stored in broadcast state (something like context.getBroadcastState(descriptor).put(x.key, x.value)); not sure if there's anything more needed.
  ◦ Non-broadcast elements would simply check broadcast state for the given tenant configuration and apply the enrichment.
Since I haven't used broadcast state previously, my questions are:
• Would the addition of a new stream/join/broadcast like this be a breaking change? I.e., would the job be able to be restored from the state/savepoint taken prior to the change?
• Is broadcast state safe for a scenario like this? The cardinality of the data is fairly small, possibly a thousand or so entries, with the objects containing only two fields. Small enough to store in memory pretty easily.
• Does parallelism introduce any issues here? My assumption is that for parallelism > 1, performing the broadcastState.put(…) operation would rebroadcast that new/updated state to all of the task managers/tasks to ensure that the same data is eventually available.
• Since broadcast state is checkpointed, how does that factor in when the job is restarted or recovers from a failure? Will it be restored/available when data begins flowing again?
I'm happy to provide any more context/detail/etc. if it helps!
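Not an authoritative answer to the questions, but here is a minimal PyFlink (1.16+) sketch of the broadcast pattern described above, with hypothetical tenant-config records. Note that the broadcast side is delivered to every parallel subtask and process_broadcast_element runs on each of them, which is what keeps the per-instance copies identical (rather than put() itself re-broadcasting anything):

```python
from pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import BroadcastProcessFunction
from pyflink.datastream.state import MapStateDescriptor

# Hypothetical tenant-config entries: (tenant_id, config_value).
CONFIG_DESC = MapStateDescriptor(
    "tenant-config", Types.STRING(), Types.STRING())

class EnrichFn(BroadcastProcessFunction):
    def process_broadcast_element(self, value, ctx):
        # Runs on every parallel instance for each broadcast element.
        ctx.get_broadcast_state(CONFIG_DESC).put(value[0], value[1])

    def process_element(self, value, ctx):
        # Read-only view of broadcast state on the non-broadcast side.
        config = ctx.get_broadcast_state(CONFIG_DESC).get(value[0])
        yield (value[0], value[1], config)

env = StreamExecutionEnvironment.get_execution_environment()
configs = env.from_collection([("tenant-a", "cfg-a")])
events = env.from_collection([("tenant-a", "event-1")])

enriched = events.connect(configs.broadcast(CONFIG_DESC)).process(EnrichFn())
enriched.print()
env.execute("broadcast-enrichment-sketch")
```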
Dr Ravi Tomar
06/22/2023, 1:45 AM

Dheeraj Panangat
06/22/2023, 7:52 AM

Dheeraj Panangat
06/22/2023, 7:53 AM

Ilia Kapanadze
06/22/2023, 9:23 AM

Hussain Abbas
06/22/2023, 11:04 AM