Neha
03/20/2023, 11:21 AM

Fredrik
03/20/2023, 11:23 AM

How is the parallelism setting in the job-spec supposed to work?
If I change the replica count on the taskManager or change taskmanager.numberOfTaskSlots: "2" but keep the job.parallelism setting as is, the parallelism of the job (according to the UI) changes. Changing the job.parallelism setting does not seem to have an effect. I tried to read the documentation for the CRD, but it did not clarify the issue. Am I misunderstanding something?

Mohit Aggarwal
03/20/2023, 2:00 PM

2023-03-20 13:53:34,477 INFO org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Stopping DefaultJobGraphStore.
2023-03-20 13:53:34,478 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) [?:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2720) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:107) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:102) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:451) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:338) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:300) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:575) ~[?:?]
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getUgiUserName(GoogleHadoopFileSystemBase.java:1226) ~[?:?]
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.listStatus(GoogleHadoopFileSystemBase.java:858) ~[?:?]
at org.apache.flink.fs.gs.org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:170) ~[?:?]
at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.listStatus(PluginFileSystemFactory.java:141) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:180) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
... 4 more
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2712) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:107) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:102) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:451) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:338) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:300) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:575) ~[?:?]
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getUgiUserName(GoogleHadoopFileSystemBase.java:1226) ~[?:?]
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.listStatus(GoogleHadoopFileSystemBase.java:858) ~[?:?]
at org.apache.flink.fs.gs.org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:170) ~[?:?]
at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.listStatus(PluginFileSystemFactory.java:141) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:180) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
... 4 more
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2592) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2686) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2712) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:107) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:102) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:451) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:338) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:300) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:575) ~[?:?]
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getUgiUserName(GoogleHadoopFileSystemBase.java:1226) ~[?:?]
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.listStatus(GoogleHadoopFileSystemBase.java:858) ~[?:?]
at org.apache.flink.fs.gs.org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:170) ~[?:?]
at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.listStatus(PluginFileSystemFactory.java:141) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:180) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
Has anyone faced a similar issue?
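One workaround that comes up for this class of error is pointing the shaded Hadoop client at the pure-Java group mapping, since the JNI-based mapping class is not bundled with the filesystem plugins. A minimal sketch, assuming Flink's flink.hadoop.* key forwarding; because this failure happens in the cluster entrypoint, the key would normally go into flink-conf.yaml, and the Java Configuration API is shown here only for illustration:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hedged sketch: the flink.hadoop.* prefix forwards the key into the Hadoop
// configuration; ShellBasedUnixGroupsMapping avoids the missing JNI class.
// Both the prefix and the mapping class are assumptions to verify.
Configuration conf = new Configuration();
conf.setString("flink.hadoop.hadoop.security.group.mapping",
        "org.apache.hadoop.security.ShellBasedUnixGroupsMapping");
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(conf);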
Thijs van de Poll
03/20/2023, 2:03 PM

Rafał Trójczak
03/20/2023, 2:49 PM

I get this warning:

15:05:22.357 [main] INFO o.a.f.a.java.typeutils.TypeExtractor -- Field Person#hobbies will be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
I have the following class:
public class Person {

    private String name;
    private List<String> hobbies;

    public Person() {
    }

    public Person(String name, List<String> hobbies) {
        this.name = name;
        this.hobbies = hobbies;
    }

    // getters and setters
}
and I prepared this `TypeInformation`:
TypeInformation<Person> personTypeInformation = Types.POJO(Person.class, Map.of(
        "name", Types.STRING,
        "hobbies", Types.LIST(Types.STRING)));
I saw a few options that don't work for me:
• @TypeInfo(PersonSerializerFactory.class) - but I can't use this approach because the Person class is in a different module.
• the returns method, e.g.:

env.fromCollection(persons)
    .returns(personTypeInformation)

but this doesn't seem to remove the problem.
How can I add this type information to the environment?
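One more option, offered as a minimal sketch under the assumption that the standard DataStream API is in play: fromCollection has an overload that takes the TypeInformation directly, so the declared types are attached at source creation instead of being reflectively extracted:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Pass the prepared TypeInformation straight to the source; `persons` and
// `personTypeInformation` are the variables from the message above.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Person> stream = env.fromCollection(persons, personTypeInformation);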
Siddhesh Kalgaonkar
03/20/2023, 6:09 PM

Usman Ismail
03/20/2023, 9:37 PM

I am seeing the following error:

Caused by: java.io.IOException: Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 200 OK}
at org.opensearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1942)
at org.opensearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:662)
at org.opensearch.client.RestClient$1.completed(RestClient.java:396)
at org.opensearch.client.RestClient$1.completed(RestClient.java:390)
....
Caused by: java.lang.NullPointerException
at java.base/java.util.Objects.requireNonNull(Objects.java:208)
at org.opensearch.action.DocWriteResponse.<init>(DocWriteResponse.java:140)
at org.opensearch.action.index.IndexResponse.<init>(IndexResponse.java:67)
This seems to be a known issue for Elasticsearch (https://github.com/elastic/elasticsearch/issues/84173), but I don't see a clear path forward for OpenSearch.

Simon Lawrence
03/21/2023, 9:58 AM

Thijs van de Poll
03/21/2023, 10:21 AM

I am trying to use the flink-connector-opensearch connector. However, it fails with the following error: java.lang.ClassNotFoundException: org.opensearch.common.Strings. I am unsure what is causing it, because I have been loading the .jar dependency to ${FLINK_HOME}/lib. Can anyone help me out? 🙂

Thijs van de Poll
03/21/2023, 11:46 AM

Caused by: java.io.IOException: Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=http://host.docker.internal:9200, response=HTTP/1.1 200 OK}
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1942)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:662)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.client.RestClient$1.completed(RestClient.java:396)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.client.RestClient$1.completed(RestClient.java:390)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:181)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
jobmanager | ... 1 more
jobmanager | Caused by: java.lang.NullPointerException
jobmanager | at java.base/java.util.Objects.requireNonNull(Unknown Source)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.action.DocWriteResponse.<init>(DocWriteResponse.java:140)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.action.update.UpdateResponse.<init>(UpdateResponse.java:86)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.action.update.UpdateResponse$Builder.build(UpdateResponse.java:193)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.action.update.UpdateResponse$Builder.build(UpdateResponse.java:181)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:172)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:208)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:2075)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$10(RestHighLevelClient.java:1836)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1940)
jobmanager | ... 18 more
Oscar Perez
03/21/2023, 1:06 PM

Oscar Perez
03/21/2023, 1:07 PM

Oscar Perez
03/21/2023, 1:12 PM

Oscar Perez
03/21/2023, 1:19 PM

Ari Huttunen
03/21/2023, 1:44 PM

I have written this aggregate function:

package fi.elisa.datalake.flink.flinktools.aggregation;

import org.apache.flink.table.api.*;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.annotation.DataTypeHint;
import static org.apache.flink.table.api.Expressions.*;

import com.tdunning.math.stats.MergingDigest;

public class MergingDigestMedian extends AggregateFunction<Double, MergingDigest> {

    @Override
    public @DataTypeHint("RAW") MergingDigest createAccumulator() {
        return new MergingDigest(100); // 100 is a common value for normal uses.
    }

    @Override
    public Double getValue(@DataTypeHint("RAW") MergingDigest acc) {
        return acc.quantile(0.5);
    }

    public void accumulate(@DataTypeHint("RAW") MergingDigest acc, Double value) {
        acc.add(value);
    }
}
I'm calling it in pyflink code by defining it as a temporary system function:

table_env.create_java_temporary_system_function("udf_median", "fi.elisa.datalake.flink.flinktools.aggregation.MergingDigestMedian")

It fails. I'll put the logs in the comments.
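One thing worth checking while the logs land, as a sketch rather than a confirmed fix: RAW hints usually need a concrete conversion class so the planner can pick a serializer. A hypothetical variant using a function-level hint bridged to MergingDigest (the annotation values are assumptions to verify against your Flink version):

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.AggregateFunction;

import com.tdunning.math.stats.MergingDigest;

// Hedged sketch: declare the accumulator once as a RAW type bridged to the
// concrete MergingDigest class, instead of repeating per-parameter hints.
@FunctionHint(accumulator = @DataTypeHint(value = "RAW", bridgedTo = MergingDigest.class))
public class MergingDigestMedian extends AggregateFunction<Double, MergingDigest> {

    @Override
    public MergingDigest createAccumulator() {
        return new MergingDigest(100);
    }

    @Override
    public Double getValue(MergingDigest acc) {
        return acc.quantile(0.5);
    }

    public void accumulate(MergingDigest acc, Double value) {
        acc.add(value);
    }
}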
Raghunadh Nittala
03/21/2023, 1:45 PM

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Temporal Table Join requires primary key in versioned table, but no primary key can be found

The table has a primary key, but when a view is created on top of it, there is no primary key.
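If the versioned side has to be a view, one documented pattern is a deduplication query, since the planner can infer a primary key from a ROW_NUMBER()-based dedup. A sketch with invented table and column names (rates, currency, update_time), written against the Table API style used elsewhere in this thread, and assuming update_time is a time attribute:

// Hedged sketch: the outer WHERE rn = 1 lets the planner derive a primary key
// on `currency`, making the view usable as the versioned side of a temporal join.
tableEnv.executeSql(
        "CREATE TEMPORARY VIEW versioned_rates AS " +
        "SELECT currency, rate, update_time FROM (" +
        "  SELECT *, ROW_NUMBER() OVER (PARTITION BY currency ORDER BY update_time DESC) AS rn" +
        "  FROM rates" +
        ") WHERE rn = 1");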
Virender Bhargav
03/21/2023, 4:34 PM

ExistingSavepoint savepoint = Savepoint.load(bEnv, savepointPath, new EmbeddedRocksDBStateBackend());
DataSet<KeyedState> keyedState = savepoint.readKeyedState("uid1", new CoreProcessStateReaderFunction());

BootstrapTransformation<KeyedState> transformation = OperatorTransformation
    .bootstrapWith(keyedState)
    .keyBy(state -> state.id)
    .transform(new StateBootstrapper());

Savepoint.load(bEnv, savepointPath, new EmbeddedRocksDBStateBackend())
    .withOperator("uid1", transformation)
    .removeOperator("uid1")
    .write(newSavePointPath);

The intent is to perform a "transformation" on KeyedState and replace the older operator state with the modified one.
• CoreProcessStateReaderFunction: a KeyedStateReaderFunction to read the existing savepoint
• StateBootstrapper: a KeyedStateBootstrapFunction for state modification/cleanup
I end up with an empty new savepoint (it has only _metadata and nothing else). Can someone help me with what I might be doing wrong?
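One hedged observation: removeOperator("uid1") is applied after withOperator("uid1", ...) on the same uid, and depending on how the writer resolves that, it may drop the replacement state as well. A sketch of the replace pattern with the removal first, reusing the names from the message above:

// Hedged sketch: drop the old operator state first, then attach the rewritten
// state under the same uid, and write everything out as a new savepoint.
Savepoint.load(bEnv, savepointPath, new EmbeddedRocksDBStateBackend())
    .removeOperator("uid1")
    .withOperator("uid1", transformation)
    .write(newSavePointPath);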
Huaqiang Kang
03/21/2023, 7:59 PM

Is there something like auth-no-challenge in Flink?

Amir Hossein Sharifzadeh
03/21/2023, 9:15 PM

This is my code:

static void processWorkflow(
        StreamTableEnvironment tableEnv,
        DataStream<DataFileChunk> rawDataStream,
        DataStream<DataFileChunk> bkgdDataStream,
        String jsonCalibrationData) {

    tableEnv.createTemporaryView("EMPAD_RAW_TBL", rawDataStream);
    tableEnv.createTemporaryView("EMPAD_BKGD_TBL", bkgdDataStream);

    String data_query = "select EMPAD_RAW_TBL.chunk_i as chunk_i, EMPAD_RAW_TBL.data as raw_enc_data, EMPAD_BKGD_TBL.data as bkgd_enc_data " +
            "FROM EMPAD_RAW_TBL join EMPAD_BKGD_TBL on EMPAD_RAW_TBL.chunk_i = EMPAD_BKGD_TBL.chunk_i";

    Table resultTable = tableEnv.sqlQuery(data_query);
    DataStream<Row> resultStream = tableEnv.toChangelogStream(resultTable);
and this is the implementation of my ProcessFunction:
package org.varimat.process;

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class EMPADProcessor extends ProcessFunction<Row, Row> {

    @Override
    public void processElement(Row row, ProcessFunction<Row, Row>.Context context,
                               Collector<Row> collector) throws Exception {
        // process row here...
        System.out.println(row.getField(0));
        String rawObject = String.valueOf(row.getField(1));
        String bkgdObject = String.valueOf(row.getField(2));
    }
}
When I run my application, it never stops and keeps printing duplicated values of row.getField(0) in the processElement method. Is there any suggestion? In other words, do I need to implement the EMPADProcessor class differently?
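A regular streaming join emits a changelog: each new row on either side can produce update-before/update-after pairs for keys that already matched, so repeated values of row.getField(0) may be retractions rather than true duplicates. A streaming job also keeps running by design unless its sources are bounded. A small diagnostic sketch, reusing resultStream from the snippet above:

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

// Print the RowKind (+I, -U, +U, -D) next to the field to tell genuine
// duplicates apart from the join's retraction/update pairs.
resultStream.process(new ProcessFunction<Row, Row>() {
    @Override
    public void processElement(Row row, Context ctx, Collector<Row> out) {
        System.out.println(row.getKind() + " " + row.getField(0));
        out.collect(row);
    }
});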
Ben Thomson
03/21/2023, 9:24 PM

Neha
03/22/2023, 4:02 AM

Thijs van de Poll
03/22/2023, 6:36 AM

I have a pipeline like this:
• table D: D = select * from A left join B on a.pk=b.fk left join C on a.pk=c.fk
• table E: E = select <some aggregations> from D group by D.group_key
• table F: F = select *, <some expensive transformations> from E
• F gets inserted into Elasticsearch.
So what I notice is that because of the left joins, and some events arriving later than others, table F gets updated a couple of times for the same key, which is logical I think. However, since the transformations made to create F are expensive, they are recomputed over and over again.
For me it would be better if there were some sort of timed buffer on events arriving at E, such that the expensive transformations do not need to be recalculated on every event updating E. I am trying to understand if windows can help me out here, but I am unsure. Also, it is very important that the aggregations in E contain all of the group elements.
Thanks!
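The "timed buffer" described here sounds close to what the planner's mini-batch optimization does for group aggregations: input rows are buffered for up to a configured latency before the aggregation fires, so downstream operators see far fewer intermediate updates. A sketch of the relevant options, assuming a tableEnv as in the other snippets in this thread (option names per the Flink docs; verify for your version):

// Hedged sketch: buffer up to 5 seconds or 5000 rows before emitting
// aggregation updates from E, reducing how often F is recomputed.
tableEnv.getConfig().getConfiguration()
        .setString("table.exec.mini-batch.enabled", "true");
tableEnv.getConfig().getConfiguration()
        .setString("table.exec.mini-batch.allow-latency", "5 s");
tableEnv.getConfig().getConfiguration()
        .setString("table.exec.mini-batch.size", "5000");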
chenlu
03/22/2023, 9:00 AM

Tal Sheldon
03/22/2023, 10:47 AM

Neha
03/22/2023, 12:32 PM

Jalil Alchy
03/22/2023, 12:59 PM

I have this method:

public KafkaSink<OutboxRecord> getKafkaSink() {
    KafkaRecordSerializationSchema<OutboxRecord> serializer = KafkaRecordSerializationSchema.<OutboxRecord>builder()
            .setTopicSelector(x -> x.topic)
            .setValueSerializationSchema(new KafkaOutboxRecordSerializer())
            .build();

    Properties p = new Properties() {
        {
            put("transaction.timeout.ms", (Integer) 60000);
        }
    };

    return KafkaSink.<OutboxRecord>builder()
            .setBootstrapServers("localhost:9092")
            .setKafkaProducerConfig(p)
            .setRecordSerializer(serializer)
            .build();
}

However, this method causes the application to throw a not-serializable error. If I make the method static, it gets better. Is there a better way to do this that I am missing?
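The double-brace initializer creates an anonymous inner class that keeps a reference to the enclosing instance, which is the usual reason this pattern trips Flink's closure serialization (and why making the method static helps). A sketch of the same configuration without the hidden capture:

import java.util.Properties;

// A plain Properties instance carries no reference to the enclosing class,
// so the sink's configuration stays serializable.
Properties p = new Properties();
p.setProperty("transaction.timeout.ms", "60000");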
Amir Hossein Sharifzadeh
03/22/2023, 5:17 PM

I have two tables, EMPAD_RAW_TBL and EMPAD_BKGD_TBL, where each table has the same number of rows (64). Both tables have a chunk_i field with unique values (1..64). I am trying to create a join on both tables (streams), and I would expect my join query to give me 64 rows, but I see duplicated rows there.

String data_query = "select EMPAD_RAW_TBL.chunk_i as chunk_i, EMPAD_RAW_TBL.data as raw_enc_data, EMPAD_RAW_TBL.n_total_chunks as n_total_chunks, " +
        "EMPAD_BKGD_TBL.data as bkgd_enc_data FROM EMPAD_RAW_TBL join EMPAD_BKGD_TBL on EMPAD_RAW_TBL.chunk_i = EMPAD_BKGD_TBL.chunk_i";

Table raw_table = tableEnv.sqlQuery(raw_query);
DataStream<Row> raw_stream = tableEnv.toDataStream(raw_table);

raw_table contains 128 rows, but I expect to have 64 rows. I don't know how to fix the issue here. Thank you.
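A hedged diagnostic before changing the join itself: count rows per chunk_i on each input, which shows whether the duplication already exists upstream of the join (table and column names are taken from the message above):

// If any cnt is greater than 1, the input view itself carries duplicates and the
// join is only multiplying them (64 keys x 2 rows on one side = 128 would fit).
Table rawCounts = tableEnv.sqlQuery(
        "SELECT chunk_i, COUNT(*) AS cnt FROM EMPAD_RAW_TBL GROUP BY chunk_i");
tableEnv.toChangelogStream(rawCounts).print();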
Herat Acharya
03/22/2023, 11:38 PM

When using kubernetes-session.sh and specifying taskmanager.numberOfTaskSlots=8, these denote task slots per task manager, right? So how does Flink know how many task managers to create? Btw, our source is Kafka and our sink is a database... Kafka will constantly have events.
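As far as I understand native Kubernetes session mode (worth verifying for your deployment type), TaskManagers are allocated on demand: each submitted job requests slots, and Flink starts enough TaskManager pods to cover them, so the count follows from job parallelism rather than a fixed setting. A sketch of the arithmetic, with an assumed parallelism:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hedged sketch: with taskmanager.numberOfTaskSlots=8, a job submitted at
// parallelism 16 needs 16 slots, so the session requests ceil(16 / 8) = 2
// TaskManager pods (assuming no other jobs already hold slots).
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(16);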
Lee xu
03/23/2023, 1:07 AM

Chen-Che Huang
03/23/2023, 2:02 AM

I have a question about the three restore modes: CLAIM, NO_CLAIM, and LEGACY. Assume that my Flink application restores from a savepoint SVP-1 with restore mode CLAIM. As time goes by, my Flink application creates new savepoints SVP-2, SVP-3, and so on. From the doc, the CLAIM mode may delete SVP-1 when Flink thinks SVP-1 is not needed for recovery anymore. How about SVP-2, SVP-3, and future savepoints? Will they also be deleted if Flink thinks they are no longer required? Thanks in advance for any reply.
Martijn Visser
03/23/2023, 4:51 AM

Chen-Che Huang
03/23/2023, 5:34 AM

The Restore Mode determines who takes ownership of the files that make up savepoints or externalized checkpoints after they are restored. Snapshots, which are either checkpoints or savepoints in this context, can be owned either by a user or Flink itself. (doc)
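To tie the quote back to the question, a sketch of where the mode is chosen (option keys follow Flink 1.15+ and should be verified for your version). The mode applies to the snapshot being restored from; as I read the docs, savepoints the user triggers later, such as SVP-2 and SVP-3, are user-owned and are not deleted by Flink:

import org.apache.flink.configuration.Configuration;

// Hedged sketch: restore SVP-1 in CLAIM mode; only SVP-1 is claimed and may be
// cleaned up by Flink once a newer retained snapshot supersedes it. The path
// is hypothetical.
Configuration conf = new Configuration();
conf.setString("execution.savepoint.path", "gs://my-bucket/savepoints/SVP-1");
conf.setString("execution.savepoint-restore-mode", "CLAIM"); // or NO_CLAIM / LEGACY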