Jashwanth S J
04/13/2023, 8:27 AM
Warning Failed 3s (x3 over 21s) kubelet Error: failed to create containerd task: failed to create shim task: OCI runtime create failed: runc create failed: unable to start container process: exec: "/docker-entrypoint.sh": stat /docker-entrypoint.sh: no such file or directory: unknown
jobManager:
  resource:
    memory: "1048m"
    cpu: 0.5
  podTemplate:
    spec:
      containers:
        - args:
            - -n
            - -c
            - /etc/supervisor/supervisord.conf
          command:
            - /usr/bin/supervisord
      tolerations:
        - key: "env"
          operator: "Equal"
          value: "dev"
          effect: NoSchedule
taskManager:
  resource:
    memory: "1048m"
    cpu: 0.5
  podTemplate:
    spec:
      containers:
        - args:
            - -n
            - -c
            - /etc/supervisor/supervisord.conf
          command:
            - /usr/bin/supervisord
      tolerations:
        - key: "env"
          operator: "Equal"
          value: "dev"
          effect: NoSchedule
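One possible cause worth checking (a guess, untested): the podTemplate containers above have no name, and the operator merges template containers into its main container by the name flink-main-container. Without that name the command/args override may never reach the main container, which then starts with the image's default /docker-entrypoint.sh, a file that does not exist in a custom supervisord-based image. A sketch of the named override:

jobManager:
  podTemplate:
    spec:
      containers:
        - name: flink-main-container   # assumption: overrides are merged by container name
          command: ["/usr/bin/supervisord"]
          args: ["-n", "-c", "/etc/supervisor/supervisord.conf"]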
Slackbot
04/13/2023, 8:35 AM
Sumit Nekar
04/13/2023, 8:41 AM
Jirawech Siwawut
04/13/2023, 9:33 AM
WARN org.apache.flink.hive.shaded.parquet.hadoop.MemoryManager - Total allocation exceeds 50.00% (2,130,706,432 bytes) of heap memory
What is the meaning of this warning? Does it mean that the resources used to write Parquet exceed 50% of the heap?
If yes, what are some root causes for it suddenly occurring after the job has run for months? I tried restarting all taskmanagers and everything seems to work fine.
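For context: this warning comes from Parquet's MemoryManager, and it does mean that the combined row-group buffers of all Parquet writers open in the JVM exceeded the configured pool ratio (here 50% of heap); Parquet reacts by scaling row-group sizes down. It often starts appearing when the number of concurrently open writers grows, for example more partitions or files being written at once. A sketch of the knobs involved (assumption: these Hadoop properties are forwarded to the writers):

parquet.memory.pool.ratio: 0.7   # fraction of heap all Parquet writers may share
parquet.block.size: 67108864     # smaller row groups reduce per-writer memory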
Deepyaman Datta
04/13/2023, 12:45 PM
SELECT
  cc_num,
  merchant,
  (SUM(amt) OVER w) / (
    SUM(amt) OVER (
      PARTITION BY cc_num
      ORDER BY ts_ltz RANGE BETWEEN INTERVAL '30' MINUTES PRECEDING AND CURRENT ROW
    )
  )
FROM transaction_amount
WINDOW w AS (
  PARTITION BY cc_num, merchant
  ORDER BY ts_ltz RANGE BETWEEN INTERVAL '30' MINUTES PRECEDING AND CURRENT ROW
);
(approach similar to https://stackoverflow.com/a/6207658)
If it's not clear, the idea above is to get the breakdown of transaction amounts by merchant for each cc_num.
However, from https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/over-agg/:
"You can define multiple OVER window aggregates in a SELECT clause. However, for streaming queries, the OVER windows for all aggregates must be identical due to the current limitation."
(and I do get an error when trying it) Is there a way to get a similar result given the current limitations?
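One possible workaround (a sketch, not verified against the streaming planner's restrictions, and note a regular join like this keeps state): compute each OVER aggregate in its own subquery, so each SELECT has exactly one OVER window, then join the results on cc_num and ts_ltz:

WITH merchant_sums AS (
  SELECT cc_num, merchant, ts_ltz,
    SUM(amt) OVER (
      PARTITION BY cc_num, merchant
      ORDER BY ts_ltz RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW
    ) AS merchant_amt
  FROM transaction_amount
),
total_sums AS (
  SELECT cc_num, ts_ltz,
    SUM(amt) OVER (
      PARTITION BY cc_num
      ORDER BY ts_ltz RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW
    ) AS total_amt
  FROM transaction_amount
)
SELECT m.cc_num, m.merchant, m.merchant_amt / t.total_amt
FROM merchant_sums m
JOIN total_sums t ON m.cc_num = t.cc_num AND m.ts_ltz = t.ts_ltz;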
Simon Lawrence
04/13/2023, 2:11 PM
Eric Xiao
04/13/2023, 10:10 PM
flink-main-container for both TM and JM) have changed... to make the migration to the Flink operator seamless, we were hoping there was a way to make sure those two values could remain consistent, but we haven't been able to do so in the manifest file...
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  labels:
    name: {{ .name }}
    app: {{ .appName }}
    env: {{ .env }}
    runtime-component: trickle
  annotations:
    cloudplatform.shopify.io/scale-down-for-maintenance: "true"
spec:
  ...
  jobManager:
    ...
    podTemplate:
      spec:
        containers:
          - name: jobmanager
  taskManager:
    ...
    podTemplate:
      spec:
        containers:
          - name: taskmanager
  podTemplate:
    ...
    spec:
      ...
      containers:
        - name: flink-main-container
This is an example of what my manifest file looks like. We would also like to have some reusable configuration defined in the general outer podTemplate, instead of duplicating configs in both the TM and JM podTemplate.
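On the reuse part, a sketch based on the operator's documented template merging, which appears to match containers by name (values are hypothetical): put shared settings on the container named flink-main-container in the outer podTemplate, and have the JM/TM templates extend that same container name. The main container name itself appears to be fixed to flink-main-container by Flink's native Kubernetes integration, so keeping the old jobmanager/taskmanager names is likely not supported.

spec:
  podTemplate:
    spec:
      containers:
        - name: flink-main-container    # base settings shared by JM and TM
          env:
            - name: SHARED_EXAMPLE_VAR  # hypothetical shared config
              value: "example"
  jobManager:
    podTemplate:
      spec:
        containers:
          - name: flink-main-container  # merged with the outer template by name
            resources:
              requests:
                cpu: "1"
  taskManager:
    podTemplate:
      spec:
        containers:
          - name: flink-main-container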
Slackbot
04/14/2023, 9:24 AM
chunilal kukreja
04/14/2023, 10:00 AM
WARN org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 1 for job 000000006e6b13320000000000000000. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Failure to finalize checkpoint.
Caused by: java.io.IOException: Target file file:/opt/flink/pm/checkpoint/000000006e6b13320000000000000000/chk-1/_metadata already exists.
at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.getOutputStreamWrapper(FsCheckpointMetadataOutputStream.java:168)
Expectation: ideally it should either skip this folder name and use another, or overwrite the contents of the existing folder.
Can someone help me figure out whether this is expected behaviour, or whether a workaround is available?
Dimitris Kalouris
04/14/2023, 11:36 AM
Amir Hossein Sharifzadeh
04/14/2023, 12:41 PM
tableEnv.createTemporaryView("tbl1", stream1);
tableEnv.createTemporaryView("tbl2", stream2);
String data_query = "select ....";
Table raw_table = tableEnv.sqlQuery(data_query);
DataStream<Row> mystream = tableEnv.toDataStream(raw_table);
mystream.process(new DataProcessor()).setParallelism(4);
In the ProcessFunction class:
public class DataProcessor extends ProcessFunction<Row, List<double[][]>> {
    public void processElement(Row row, ProcessFunction<Row, List<double[][]>>.Context context, Collector<List<double[][]>> collector) throws Exception {
        int id = Integer.parseInt("" + row.getField(0));
        String data1 = (String) row.getField(1);
        String data2 = (String) row.getField(2);
        double[][] results = DataUtils.compute(id, data1, data2); // results is computed but never emitted
    }
}
My questions:
1. How to send the results to the output?
2. How to access the output from the main file?
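A sketch of one way to do both (assumptions: DataStream API as above, emitting a single double[][] per input row rather than a List):

// 1) Emit results downstream via the Collector:
public class DataProcessor extends ProcessFunction<Row, double[][]> {
    @Override
    public void processElement(Row row, Context context, Collector<double[][]> collector) throws Exception {
        int id = Integer.parseInt(String.valueOf(row.getField(0)));
        String data1 = (String) row.getField(1);
        String data2 = (String) row.getField(2);
        collector.collect(DataUtils.compute(id, data1, data2)); // sends the result to the output
    }
}

// 2) The output is just another DataStream in the main program:
DataStream<double[][]> results = mystream.process(new DataProcessor()).setParallelism(4);
results.print(); // attach any sink here, or use results.executeAndCollect() to iterate locally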
AK
04/15/2023, 11:41 AM
mohammadreza khedri
04/16/2023, 7:30 AM
py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
...
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
...
Caused by: java.lang.RuntimeException: Failed to create stage bundle factory!
INFO:root:Starting up Python harness in loopback mode.
....
Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 0
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
...
Caused by: java.lang.IllegalStateException: Process died with exit code 0
at org.apache.beam.runners.fnexecution.environment.ProcessManager$RunningProcess.isAliveOrThrow(ProcessManager.java:75)
...
Slackbot
04/17/2023, 5:17 AM
Kush Rohra
04/17/2023, 5:25 AM
Likhith Kanigolla
04/17/2023, 6:36 AM
CREATE TABLE customers_inv (
  `id` STRING,
  `first_name` STRING,
  `last_name` STRING,
  `email` VARCHAR,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbserver1.inventory.customers',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'eventsGroup=customersinv',
  'value.debezium-json.schema-include' = 'true',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-json'
);
CREATE TABLE products_inv (
  `id` STRING,
  `name` VARCHAR,
  `description` VARCHAR,
  `weight` FLOAT,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbserver1.inventory.products',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'eventsGroup=productsinv',
  'value.debezium-json.schema-include' = 'true',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-json'
);
CREATE TABLE orders_inv (
  `id` STRING,
  `order_date` STRING,
  `purchaser` STRING,
  `quantity` INT,
  `product_id` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbserver1.inventory.orders',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'eventsGroup=ordersinv',
  'value.debezium-json.schema-include' = 'true',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'debezium-json'
);
How can I proceed further and send the data to Elasticsearch?
I tried to create a new table and insert the data, but it didn't work.
CREATE TABLE users_sink (
  `name` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'sink-topic-json',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'SinkGroup',
  'value.format' = 'json'
);
INSERT INTO users_sink
SELECT first_name FROM customers_inv;
I am getting this error and couldn't find much information about it:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.users_sink' doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, customers_inv]], fields=[id, first_name, last_name, email])
Any other approaches would also be helpful. Thanks in advance.
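The error means the plain 'kafka' sink is append-only, while a debezium-json source emits update/delete changes. One possible fix (a sketch, untested): declare the sink with the upsert-kafka connector and a primary key so it can consume the changelog:

CREATE TABLE users_sink (
  `id` STRING,
  `first_name` STRING,
  PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'sink-topic-json',
  'properties.bootstrap.servers' = 'kafka:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

INSERT INTO users_sink
SELECT id, first_name FROM customers_inv;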
barak ben natan
04/17/2023, 9:00 AM
张夏昭
04/17/2023, 9:43 AM
create view dataset_v as
SELECT a.id AS dataset_id, a.dataset_name_zh, a.dataset_name_en, a.introduction,
  (SELECT c.level_path FROM dict_detail c WHERE c.id IN mysplit(a.industry_type_ids, ";")) AS level_path
FROM dataset_info a
LEFT JOIN usage_statistics b ON a.id = b.dataset_info_id
This is my function:
@FunctionHint(output = @DataTypeHint("ARRAY<f1 INT>"))
public static class MySplit extends TableFunction<List<Integer>> {
    public void eval(String str, String delimiter) {
        List<Integer> ss = new ArrayList<>();
        for (String s : str.split(delimiter)) {
            ss.add(Integer.parseInt(s));
        }
        collect(ss);
    }
}
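If the goal is to join each split id against dict_detail, a TableFunction usually emits one row per value instead of collecting a List; a sketch (assumes org.apache.flink.types.Row):

@FunctionHint(output = @DataTypeHint("ROW<id INT>"))
public static class MySplit extends TableFunction<Row> {
    public void eval(String str, String delimiter) {
        for (String s : str.split(delimiter)) {
            collect(Row.of(Integer.parseInt(s))); // one output row per split value
        }
    }
}

It can then be used in SQL as: LEFT JOIN LATERAL TABLE(mysplit(a.industry_type_ids, ';')) AS t(id) ON TRUE.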
Nithin kharvi
04/17/2023, 10:18 AM
Thijs van de Poll
04/17/2023, 3:53 PM
.jar dependencies. Both pipelines work properly if I add the dependencies to the /opt/flink/lib folder. As a solution, I wanted to add, in one of the pipelines, an additional pipeline.jars configuration which adds some more jars required for that specific pipeline. However, I keep getting errors which indicate that the jars are not loaded correctly.
I also tried to run the pipeline using ./bin/flink run --python pipeline.py --jarfile jar_dependency.jar, but that also did not work.
Does anyone understand why this does not work? A potential workaround would be to build different images for the different pipelines, but I'd rather not do that. Thanks in advance!
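One thing worth double-checking (a sketch; the path is hypothetical): pipeline.jars entries must be well-formed file:// URLs, and from PyFlink they can also be registered programmatically:

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# note the file:// scheme; a bare filesystem path here is a common cause of jars silently not loading
env.add_jars("file:///opt/flink/usrlib/jar_dependency.jar")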
Hygor Knust
04/17/2023, 9:49 PM
SessionJob using flink-kubernetes-operator.
However I’m receiving this event on the `SessionJob`: "Connection timed out (Connection timed out)"
Looking in the operator logs I see this:
flink-kubernetes-operator 2023-04-17 21:40:34,212 o.a.f.k.o.c.FlinkDeploymentController [INFO ][data-team/flink-cluster] Starting reconciliation
flink-kubernetes-operator 2023-04-17 21:40:34,213 o.a.f.k.o.s.FlinkResourceContextFactory [INFO ][data-team/flink-cluster] Getting service for flink-cluster
flink-kubernetes-operator 2023-04-17 21:40:34,223 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][data-team/flink-cluster] Resource fully reconciled, nothing to do...
flink-kubernetes-operator 2023-04-17 21:40:34,223 o.a.f.k.o.c.FlinkDeploymentController [INFO ][data-team/flink-cluster] End of reconciliation
flink-kubernetes-operator 2023-04-17 21:40:49,224 o.a.f.k.o.c.FlinkDeploymentController [INFO ][data-team/flink-cluster] Starting reconciliation
flink-kubernetes-operator 2023-04-17 21:40:49,225 o.a.f.k.o.s.FlinkResourceContextFactory [INFO ][data-team/flink-cluster] Getting service for flink-cluster
flink-kubernetes-operator 2023-04-17 21:40:49,233 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][data-team/flink-cluster] Resource fully reconciled, nothing to do...
flink-kubernetes-operator 2023-04-17 21:40:49,233 o.a.f.k.o.c.FlinkDeploymentController [INFO ][data-team/flink-cluster] End of reconciliation
flink-kubernetes-operator 2023-04-17 21:41:04,234 o.a.f.k.o.c.FlinkDeploymentController [INFO ][data-team/flink-cluster] Starting reconciliation
flink-kubernetes-operator 2023-04-17 21:41:04,235 o.a.f.k.o.s.FlinkResourceContextFactory [INFO ][data-team/flink-cluster] Getting service for flink-cluster
flink-kubernetes-operator 2023-04-17 21:41:04,244 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][data-team/flink-cluster] Resource fully reconciled, nothing to do...
flink-kubernetes-operator 2023-04-17 21:41:04,244 o.a.f.k.o.c.FlinkDeploymentController [INFO ][data-team/flink-cluster] End of reconciliation
flink-kubernetes-operator 2023-04-17 21:41:19,246 o.a.f.k.o.c.FlinkDeploymentController [INFO ][data-team/flink-cluster] Starting reconciliation
flink-kubernetes-operator 2023-04-17 21:41:19,246 o.a.f.k.o.s.FlinkResourceContextFactory [INFO ][data-team/flink-cluster] Getting service for flink-cluster
flink-kubernetes-operator 2023-04-17 21:41:19,254 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][data-team/flink-cluster] Resource fully reconciled, nothing to do...
flink-kubernetes-operator 2023-04-17 21:41:19,254 o.a.f.k.o.c.FlinkDeploymentController [INFO ][data-team/flink-cluster] End of reconciliation
flink-kubernetes-operator 2023-04-17 21:41:30,342 o.a.f.k.o.l.AuditUtils [INFO ][data-team/flink-job] >>> Event | Warning | SESSIONJOBEXCEPTION | Connection timed out (Connection timed out)
flink-kubernetes-operator 2023-04-17 21:41:30,342 o.a.f.k.o.r.ReconciliationUtils [WARN ][data-team/flink-job] Attempt count: 3, last attempt: false
flink-kubernetes-operator 2023-04-17 21:41:30,356 o.a.f.k.o.c.FlinkDeploymentController [INFO ][data-team/flink-cluster] Starting reconciliation
flink-kubernetes-operator 2023-04-17 21:41:30,356 o.a.f.k.o.s.FlinkResourceContextFactory [INFO ][data-team/flink-cluster] Getting service for flink-cluster
flink-kubernetes-operator 2023-04-17 21:41:30,364 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][data-team/flink-cluster] Resource fully reconciled, nothing to do...
flink-kubernetes-operator 2023-04-17 21:41:30,364 o.a.f.k.o.c.FlinkDeploymentController [INFO ][data-team/flink-cluster] End of reconciliation
flink-kubernetes-operator 2023-04-17 21:41:30,375 o.a.f.k.o.l.AuditUtils [INFO ][data-team/flink-job] >>> Status | Error | UPGRADING | {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.net.ConnectException: Connection timed out (Connection timed out)","throwableList":[{"type":"java.net.ConnectException","message":"Connection timed out (Connection timed out)"}]}
flink-kubernetes-operator 2023-04-17 21:41:30,375 i.j.o.p.e.ReconciliationDispatcher [ERROR][data-team/flink-job] Error during event processing ExecutionScope{ resource id: ResourceID{name='flink-job', namespace='data-team'}, version: 1307816475} failed.
flink-kubernetes-operator org.apache.flink.kubernetes.operator.exception.ReconciliationException: java.net.ConnectException: Connection timed out (Connection timed out)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController.reconcile(FlinkSessionJobController.java:112)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController.reconcile(FlinkSessionJobController.java:52)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:145)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:103)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:102)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:119)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:89)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:62)
flink-kubernetes-operator at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:406)
flink-kubernetes-operator at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
flink-kubernetes-operator at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
flink-kubernetes-operator at java.base/java.lang.Thread.run(Unknown Source)
flink-kubernetes-operator Caused by: java.net.ConnectException: Connection timed out (Connection timed out)
flink-kubernetes-operator at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
flink-kubernetes-operator at java.base/java.net.AbstractPlainSocketImpl.doConnect(Unknown Source)
flink-kubernetes-operator at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(Unknown Source)
flink-kubernetes-operator at java.base/java.net.AbstractPlainSocketImpl.connect(Unknown Source)
flink-kubernetes-operator at java.base/java.net.SocksSocketImpl.connect(Unknown Source)
flink-kubernetes-operator at java.base/java.net.Socket.connect(Unknown Source)
flink-kubernetes-operator at java.base/sun.security.ssl.SSLSocketImpl.connect(Unknown Source)
flink-kubernetes-operator at java.base/sun.security.ssl.BaseSSLSocketImpl.connect(Unknown Source)
flink-kubernetes-operator at java.base/sun.net.NetworkClient.doConnect(Unknown Source)
flink-kubernetes-operator at java.base/sun.net.www.http.HttpClient.openServer(Unknown Source)
flink-kubernetes-operator at java.base/sun.net.www.http.HttpClient.openServer(Unknown Source)
flink-kubernetes-operator at java.base/sun.net.www.protocol.https.HttpsClient.<init>(Unknown Source)
flink-kubernetes-operator at java.base/sun.net.www.protocol.https.HttpsClient.New(Unknown Source)
flink-kubernetes-operator at java.base/sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.getNewHttpClient(Unknown Source)
flink-kubernetes-operator at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(Unknown Source)
flink-kubernetes-operator at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(Unknown Source)
flink-kubernetes-operator at java.base/sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(Unknown Source)
flink-kubernetes-operator at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(Unknown Source)
flink-kubernetes-operator at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(Unknown Source)
flink-kubernetes-operator at java.base/sun.net.www.protocol.https.HttpsURLConnectionImpl.getInputStream(Unknown Source)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.artifact.HttpArtifactFetcher.fetch(HttpArtifactFetcher.java:59)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.artifact.ArtifactManager.fetch(ArtifactManager.java:61)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.uploadJar(AbstractFlinkService.java:724)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.submitJobToSessionCluster(AbstractFlinkService.java:207)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler.deploy(SessionJobReconciler.java:70)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler.deploy(SessionJobReconciler.java:41)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:115)
flink-kubernetes-operator at org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController.reconcile(FlinkSessionJobController.java:104)
flink-kubernetes-operator ... 13 more
Could anyone help me debug that?
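For what it's worth, the stack trace shows the operator pod itself fetching the job jar (HttpArtifactFetcher.fetch, called from AbstractFlinkService.uploadJar) before submitting it to the session cluster, so the SessionJob's jarURI must be reachable from the operator's network, not just from the Flink pods. A sketch (the URL is hypothetical):

apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: flink-job
spec:
  deploymentName: flink-cluster
  job:
    # the operator downloads this URL itself; a connect timeout here usually
    # means the endpoint is unreachable from the operator pod
    jarURI: https://example.com/artifacts/flink-job.jar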
张夏昭
04/18/2023, 6:14 AM
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions
at org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7DynamicSinkFactory.<clinit>(Elasticsearch7DynamicSinkFactory.java:61) ~[flink-connector-elasticsearch7_2.12-1.14.5.jar:1.14.5]
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_333]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_333]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_333]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_333]
at java.lang.Class.newInstance(Class.java:442) ~[?:1.8.0_333]
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380) ~[?:1.8.0_333]
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404) ~[?:1.8.0_333]
at java.util.ServiceLoader$1.next(ServiceLoader.java:480) ~[?:1.8.0_333]
at java.util.Iterator.forEachRemaining(Iterator.java:116) ~[?:1.8.0_333]
at org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:623) ~[flink-table_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:378) ~[flink-table_2.12-1.14.5.jar:1.14.5]
at org.apache.flink.table.client.gateway.context.ExecutionContext.lookupExecutor(ExecutionContext.java:156) ~[flink-sql-client_2.12-1.14.5.jar:1.14.5]
... 8 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions
at java.net.URLClassLoader.findClass(URLClassLoader.java:387) ~[?:1.8.0_333]
at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_333]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355) ~[?:1.8.0_333]
at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_333]
at org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7DynamicSinkFactory.<clinit>(Elasticsearch7DynamicSinkFactory.java:61) ~[flink-connector-elasticsearch7_2.12-1.14.5.jar:1.14.5]
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_333]
Any other approaches would also be helpful. Thanks in advance.
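A likely cause (an assumption, based on where the missing class lives): flink-connector-elasticsearch7 does not bundle flink-connector-elasticsearch-base, which contains ElasticsearchConnectorOptions. For the SQL client, the self-contained bundled jar is usually what goes into lib/; a sketch (the URL assumes the Maven Central layout):

# replace the plain connector jar in lib/ with the bundled SQL connector
rm lib/flink-connector-elasticsearch7_2.12-1.14.5.jar
wget -P lib/ https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7_2.12/1.14.5/flink-sql-connector-elasticsearch7_2.12-1.14.5.jar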
Gintaras Matulas
04/18/2023, 6:47 AM
Narges
04/18/2023, 7:51 AM
Max Dubinin
04/18/2023, 12:23 PM
KeyBy operator?
I have a Kinesis stream with 350+ event types; each event is eventually streamed into a different sink (using side outputs).
Is it a good idea to use KeyBy before the process function that tags the events?
Does it have any positive effect on the pipeline in terms of durability/performance?
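A sketch of the tagging step without keyBy (Event and sinkForTypeA are hypothetical names): side outputs work on a plain ProcessFunction, so keyBy is only needed if the tagging logic uses keyed state or timers; otherwise it mainly adds a network shuffle.

final OutputTag<Event> typeATag = new OutputTag<Event>("type-a") {};

SingleOutputStreamOperator<Event> tagged = events
    .process(new ProcessFunction<Event, Event>() {   // no keyBy required for side outputs
        @Override
        public void processElement(Event e, Context ctx, Collector<Event> out) {
            if ("A".equals(e.type)) {
                ctx.output(typeATag, e);              // route to a side output
            } else {
                out.collect(e);                       // main output
            }
        }
    });

tagged.getSideOutput(typeATag).sinkTo(sinkForTypeA);  // one sink per event type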
Raghunadh Nittala
04/18/2023, 12:30 PM
Raghunadh Nittala
04/18/2023, 1:14 PM
INSERT INTO sink_table_s3
SELECT event_id, event_type, event_name,
  DATE_FORMAT(TUMBLE_END(proc_time, INTERVAL '1' HOUR), 'yyyy-MM-dd') AS record_date,
  COUNT(*) AS results_count
FROM source_table
GROUP BY event_id, event_type, event_name, TUMBLE(proc_time, INTERVAL '1' HOUR);
I am partitioning the table on the event_id, event_type, and date columns. I observed that the parquet files are getting saved for an event_id and event_type, but the date is not changing. The data being processed today is being saved to the 2023-04-14 folder. As I am using proc_time to derive the date, I expect the data to be saved to the 2023-04-18 folder.
PS: The deployment had been rolled back due to some errors in between, but it has been running fine for a day. I'm curious what would lead to this behavior?
Felix Angell
04/18/2023, 1:54 PM
java.lang.NoSuchMethodError: 'org.apache.flink.metrics.MetricGroup org.apache.flink.api.common.functions.RuntimeContext.getMetricGroup()'
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:416)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:365)
Essentially, we've upgraded to Flink 1.15.2 (on KDA) and we're using flink-sql-connector-kinesis v1.15.3, which exposes a FlinkKinesisConsumer. We are using PyFlink, and so we mapped this FlinkKinesisConsumer class with the jvm_gateway. On using this, we get the above exception since doing the upgrade.
I can still resolve this getMetricGroup symbol manually if I add the dependency to the flink-sql-connector-kinesis pom in a typical Java project, but in the PyFlink environment it doesn't seem to be picked up.
Any ideas what is up here? Let me know if you need more details.
Zachary Piazza
04/18/2023, 2:33 PM
Sucheth Shivakumar
04/18/2023, 6:44 PM
upgradeMode: stateless doesn't seem to work as expected (read from the earliest offset), but it was working fine with Flink version 1.15 and operator version 1.15.
Does anyone have any idea why it is not working and what has changed?