Tal Sheldon
03/22/2023, 10:47 AM

Jalil Alchy
03/22/2023, 12:59 PM
public KafkaSink<OutboxRecord> getKafkaSink() {
    KafkaRecordSerializationSchema<OutboxRecord> serializer =
            KafkaRecordSerializationSchema.<OutboxRecord>builder()
                    .setTopicSelector(x -> x.topic)
                    .setValueSerializationSchema(new KafkaOutboxRecordSerializer())
                    .build();
    // Anonymous Properties subclass, configured via an instance-initializer block
    Properties p = new Properties() {
        {
            put("transaction.timeout.ms", (Integer) 60000);
        }
    };
    return KafkaSink.<OutboxRecord>builder()
            .setBootstrapServers("localhost:9092")
            .setKafkaProducerConfig(p)
            .setRecordSerializer(serializer)
            .build();
}
However, this method causes the application to throw a not-serializable error. If I make the method static, the error goes away. Is there a better way to do this that I am missing?
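One likely culprit, offered as a hedged guess: the anonymous Properties subclass is an inner class, so it implicitly captures the enclosing (non-serializable) instance, which would also explain why making the method static helps. A minimal sketch of an alternative, reusing the serializer built above:

import java.util.Properties;
import org.apache.flink.connector.kafka.sink.KafkaSink;

// A plain Properties instance carries no hidden reference to the enclosing
// class, so the sink and its closure stay serializable.
Properties p = new Properties();
p.setProperty("transaction.timeout.ms", "60000");

KafkaSink<OutboxRecord> sink = KafkaSink.<OutboxRecord>builder()
        .setBootstrapServers("localhost:9092")
        .setKafkaProducerConfig(p)
        .setRecordSerializer(serializer)
        .build();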
Amir Hossein Sharifzadeh
03/22/2023, 5:17 PM
I have two tables, EMPAD_RAW_TBL and EMPAD_BKGD_TBL, where each table has the same number of rows (64). Both tables have a chunk_i field with unique values (1..64). I am trying to join the two tables (as streams), and I would expect my joined query to give me 64 rows, but I see duplicated rows there.
String data_query =
    "SELECT EMPAD_RAW_TBL.chunk_i AS chunk_i, EMPAD_RAW_TBL.data AS raw_enc_data, " +
    "EMPAD_RAW_TBL.n_total_chunks AS n_total_chunks, EMPAD_BKGD_TBL.data AS bkgd_enc_data " +
    "FROM EMPAD_RAW_TBL JOIN EMPAD_BKGD_TBL ON EMPAD_RAW_TBL.chunk_i = EMPAD_BKGD_TBL.chunk_i";
Table raw_table = tableEnv.sqlQuery(data_query);
DataStream<Row> raw_stream = tableEnv.toDataStream(raw_table);
raw_table contains 128 rows, but I expect 64 rows. I don't know how to fix the issue here. Thank you.
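A hedged thought: if either input can carry more than one row per chunk_i over time (for example from replays), a streaming join emits every matching pair. Flink SQL's documented deduplication pattern can force at most one row per key on each side before joining; a sketch, assuming a processing-time attribute named proctime exists on the table:

String dedup_raw_query =
    "SELECT chunk_i, data, n_total_chunks FROM (" +
    "  SELECT *, ROW_NUMBER() OVER (PARTITION BY chunk_i ORDER BY proctime ASC) AS rn" +
    "  FROM EMPAD_RAW_TBL" +
    ") WHERE rn = 1";  // keeps only the first row seen per chunk_i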
Herat Acharya
03/22/2023, 11:38 PM
We start a session cluster with kubernetes-session.sh and specify taskmanager.numberOfTaskSlots=8. These denote task slots per task manager, right? So how does Flink know how many task managers to create? BTW, our source is Kafka and the sink is a database... Kafka will constantly have events.
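For what it's worth, my understanding (hedged): yes, taskmanager.numberOfTaskSlots is slots per TaskManager, and in native Kubernetes session mode Flink requests TaskManager pods on demand as jobs ask for slots, so the count works out to roughly ceil(job parallelism / slots per TaskManager). A sketch, assuming a single job with parallelism 16:

# 8 slots per TM; a job with parallelism 16 would drive Flink to request
# roughly ceil(16 / 8) = 2 TaskManager pods on demand.
./bin/kubernetes-session.sh \
    -Dkubernetes.cluster-id=my-session-cluster \
    -Dtaskmanager.numberOfTaskSlots=8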
Lee xu
03/23/2023, 1:07 AM

Chen-Che Huang
03/23/2023, 2:02 AM
Flink provides three restore modes: CLAIM, NO_CLAIM, and LEGACY. Assume that my Flink application restores from a savepoint SVP-1 with restore mode CLAIM. As time goes by, my Flink application creates new savepoints SVP-2, SVP-3, and so on. From the doc, the CLAIM mode may delete SVP-1 when Flink thinks SVP-1 is not needed for recovery anymore. How about SVP-2, SVP-3, and future savepoints? Will they also be deleted if Flink thinks they are no longer required? Thanks in advance for any reply.

Hu Guang
03/23/2023, 3:48 AM
keyedStream
    .window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(5)))
    .aggregate(new Last24HAggregator(), new MyProcessWindowFunction())
    .addSink(new MySink());
Before we added the aggregation function, everything seemed fine. My guess is that maybe the aggregation function is not compatible with event-time processing semantics.
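A hedged sketch for context: AggregateFunction does work with event-time windows, but the window only fires once the watermark passes the window end, so a missing or stalled watermark strategy is a common reason output stops after adding an aggregate. Everything below other than the operator names from the message is assumed:

// Assumed Event type and timestamp accessor; bounded-out-of-orderness watermarks.
DataStream<Event> withTimestamps = events.assignTimestampsAndWatermarks(
        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((event, ts) -> event.getEventTimeMillis()));

withTimestamps
        .keyBy(Event::getKey)
        .window(SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(5)))
        .aggregate(new Last24HAggregator(), new MyProcessWindowFunction())
        .addSink(new MySink());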
Ashutosh Joshi
03/23/2023, 10:38 AM
1 - define the function -
public class SelectAllFields extends TableFunction<Row> {
    public void eval(Row row) {
        collect(row);
    }
}
2 - register it -
env.createTemporarySystemFunction("SelectAllFields", SelectAllFields.class);
3 - use it through SQL -
select name, SelectAllFields(*) as meta from table1
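A hedged aside: as far as I know, * cannot be passed as a function argument, and in Flink SQL a TableFunction is applied through a lateral join rather than called in the SELECT list. A sketch along those lines (the argument and output column list are assumptions):

// Lateral-join pattern for table functions; adjust the argument and the
// column alias list T(...) to the function's actual row type.
env.sqlQuery(
    "SELECT name, meta " +
    "FROM table1, LATERAL TABLE(SelectAllFields(name)) AS T(meta)");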
kingsathurthi
03/23/2023, 10:47 AM

Tiansu Yu
03/23/2023, 12:37 PM

Tsering
03/23/2023, 1:47 PM
This is the onElement of the EventTimeTrigger from Flink:
@Override
public TriggerResult onElement(
        Object element, long timestamp, TimeWindow window, TriggerContext ctx)
        throws Exception {
    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        // if the watermark is already past the window fire immediately
        return TriggerResult.FIRE;
    } else {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }
}
Suppose five elements of the same window arrive at the onElement function and hit registerEventTimeTimer five times. Will onEventTime get invoked five times?
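My understanding, hedged: no. Event-time timers are deduplicated per key, window (namespace), and timestamp, and all five registrations here use the same window.maxTimestamp():

ctx.registerEventTimeTimer(window.maxTimestamp()); // first call: timer created
ctx.registerEventTimeTimer(window.maxTimestamp()); // later calls: no-ops, same timestamp
// => onEventTime should fire once per key and window when the watermark
//    passes window.maxTimestamp().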
Trevor Burke
03/23/2023, 6:56 PM
Checkpoint expired before completing

Amir Hossein Sharifzadeh
03/23/2023, 7:56 PM
String raw_query = "SELECT DISTINCT chunk_i FROM EMPAD_RAW_TBL";
Table join_table = tableEnv.sqlQuery(raw_query);
DataStream<Row> bkgd_stream = tableEnv.toDataStream(join_table);
I get this error:
org.apache.flink.table.api.TableException: Table sink 'anonymous_datastream_sink$1' doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[chunk_i], select=[chunk_i])
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:405)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:185)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:366)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:355)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:354)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.immutable.Range.foreach(Range.scala:155)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:354)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:128)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:53)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram.optimize(FlinkChangelogModeInferenceProgram.scala:43)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.immutable.Range.foreach(Range.scala:155)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:176)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:83)
at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:315)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:195)
at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:224)
at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:219)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:253)
at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:226)
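A hedged note on the error itself: SELECT DISTINCT produces an updating (changelog) result, and toDataStream() only handles insert-only tables; toChangelogStream() is the documented way to consume updates:

Table join_table = tableEnv.sqlQuery("SELECT DISTINCT chunk_i FROM EMPAD_RAW_TBL");
// toChangelogStream accepts updating results (each Row carries a RowKind).
DataStream<Row> bkgd_stream = tableEnv.toChangelogStream(join_table);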
Mitchell Jeppson
03/23/2023, 11:45 PM

Abhinav Ittekot
03/24/2023, 6:42 AM
s3:// prefix passed for the checkpoint location. Is there a way we can indicate to Flink to use EMRFS, e.g. by changing the s3 prefix?
Siddhesh Kalgaonkar
03/24/2023, 9:43 AM
"A shorter checkpointing interval causes higher overhead during regular processing but can enable faster recovery because less data needs to be reprocessed."
I get that a shorter interval means higher overhead during regular processing, but how does it help in faster recovery of the state? I am not sure about that. Can somebody help me understand this in a better way?
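One hedged way to see it: on failure, Flink restores the last completed checkpoint and the sources rewind to the offsets stored in it, so the checkpoint interval bounds how much input must be replayed:

// Worst-case replay after a crash ~ input since the last completed checkpoint,
// i.e. roughly the interval (plus the time the checkpoint itself took).
env.enableCheckpointing(60_000);    // 1-minute interval: replay at most ~1 minute
// env.enableCheckpointing(600_000); // 10-minute interval: replay up to ~10 minutes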
Siddhesh Kalgaonkar
03/24/2023, 9:52 AM
callback method?

Dan Sisson
03/24/2023, 12:29 PM
env.java.opts: -Dmetrics.reporter.stsd.host=${DD_AGENT_HOST}
However, at startup, the reporter crashes with:
2023-03-24 12:13:55,011 ERROR org.apache.flink.runtime.metrics.ReporterSetup [] - Could not instantiate metrics reporter stsd. Metrics might not be exposed/reported.
java.lang.IllegalArgumentException: Invalid host/port configuration. Host: null Port: 1125
at org.apache.flink.metrics.statsd.StatsDReporter.open(StatsDReporter.java:72) ~[?:?]
...
... so it seems metrics.reporter.stsd.host is not being picked up from the dynamic properties...
Should it be possible to set metric reporter properties using env.java.opts?
Thank you for any pointers!
Rion Williams
03/24/2023, 2:50 PM
The ElasticsearchSink.Builder class exposed an onFailureHandler to capture these types of situations:
builder.onFailureHandler { request, throwable, statusCode, indexer ->
    // Handle failures here
}
However, after the sink was recently updated to use the Elasticsearch7SinkBuilder class, it doesn't appear to expose the same level of flexibility for capturing failures within the sink outside of adjusting the backoff strategy. Ideally, I'd need the flexibility to process records in batches (via the internal BulkProcessor) with the ability to discern between the types of failures that might occur, similar to the following example:
builder.onFailureHandler { request, throwable, statusCode, indexer ->
    if (ExceptionUtils.findThrowable(throwable, EsRejectedExecutionException.class).isPresent()) {
        // Potentially transient/ES-related error, retry indexing
        indexer.add(action);
    } else if (ExceptionUtils.findThrowable(throwable, ElasticsearchParseException.class).isPresent()) {
        // Bad/invalid document, need to log and potentially send to a DLQ for further investigation
    } else {
        // Anything else, crash the job via an exception
    }
}
Additionally, when processing in batches, how are single failures within a BulkProcessor call typically handled? Is the entire batch retried, or does the processor only reattempt the requests that initially failed? For example, if I had a single request that attempted to index 100 documents, but one of them was a duplicate or could not be inserted into the index, how would that be handled? Ideally, I'd like to capture the document(s) that couldn't be written (i.e. throw them to a DLQ or something) or have some way to isolate what failed in the batch.
Yufei Chen
03/24/2023, 3:58 PM

Kevin Lam
03/24/2023, 4:03 PM

Gil Kirkpatrick
03/24/2023, 8:47 PM
CREATE TABLE FOO (id STRING, name STRING)
WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'foo',
    'format.type' = 'json',
    'connector.properties.bootstrap.servers' = 'kafka:29091',
    'connector.properties.group.id' = 'flink'
);
I get this error when I try to select from the table:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.factories.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.
Reason: Required context properties mismatch.
The following properties are requested:
connector.properties.bootstrap.servers=kafka:29091
connector.properties.group.id=flink
connector.topic=foo
connector.type=kafka
connector.version=universal
format.type=json
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=id
schema.1.data-type=VARCHAR(2147483647)
schema.1.name=name
The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
I interpreted the error as a symptom of not having the appropriate JAR files in /opt/flink/lib, so I added flink-sql-connector-kafka.jar, flink-connector-kafka.jar, and flink-table-api-java-1.17.0.jar to the image, but the error persists. The container is based on the Flink 1.17.0 image from Docker Hub.
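A hedged guess at the cause: the connector.type/connector.version keys are the legacy (pre-FLIP-122) property set, which the factory shipped in flink-sql-connector-kafka on 1.17 does not match. The current option style would be something like:

CREATE TABLE FOO (id STRING, name STRING)
WITH (
    'connector' = 'kafka',
    'topic' = 'foo',
    'properties.bootstrap.servers' = 'kafka:29091',
    'properties.group.id' = 'flink',
    'format' = 'json',
    'scan.startup.mode' = 'earliest-offset'  -- assumption; pick what fits
);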
Tawfik Yasser
03/24/2023, 9:46 PM
Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
My event structure is as follows: {ip: 152.89.196.211, ts: 2023-03-04T23:12:26Z}, where ip is a String and ts is an Instant. Can anyone help me? TIA.
Sai Sharath Dandi
03/24/2023, 10:11 PM
person_array is an array of rows like ARRAY<ROW<name STRING, age INT>> and I want to extract just the name fields into another array ARRAY<STRING>. I'd like to know if there's a way to do this with the built-in functions before writing a UDF.
SELECT
    ARRAY[person_array.name] AS person_names
FROM
    my_table
-- this is obviously wrong syntax
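In case no built-in fits, a hedged UDF sketch; the function name and the exact row type are assumptions taken from the message:

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

public class ExtractNames extends ScalarFunction {
    public String[] eval(@DataTypeHint("ARRAY<ROW<name STRING, age INT>>") Row[] persons) {
        if (persons == null) {
            return null;
        }
        String[] names = new String[persons.length];
        for (int i = 0; i < persons.length; i++) {
            // name is field 0 in ROW<name, age>
            names[i] = persons[i] == null ? null : (String) persons[i].getField(0);
        }
        return names;
    }
}
// usage after registering: SELECT ExtractNames(person_array) AS person_names FROM my_table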
czchen
03/25/2023, 1:41 AM
We get java.lang.ClassNotFoundException: Class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found when upgrading from Flink 1.16.0 to Flink 1.17.0 (with GCS as persistent storage). Any idea how to fix the problem?
We have /opt/flink/plugins/gs-fs-hadoop/flink-gs-fs-hadoop.jar linked to /opt/flink/opt/flink-gs-fs-hadoop-1.17.0.jar, so GCS should work fine, and GCS does work fine in Flink 1.16.
The following is how we configure the Flink container in the Dockerfile:
RUN echo start \
&& echo "networkaddress.cache.ttl=0" >> $JAVA_HOME/lib/security/java.security \
&& ln -s /opt/flink/opt/flink-queryable-state-runtime-*.jar /opt/flink/lib/ \
&& ln -s /opt/flink/opt/flink-state-processor-api-*.jar /opt/flink/lib/ \
&& mkdir -p /opt/flink/plugins/gs-fs-hadoop \
&& ln -s /opt/flink/opt/flink-gs-fs-hadoop-*.jar /opt/flink/plugins/gs-fs-hadoop/flink-gs-fs-hadoop.jar \
&& mkdir -p /opt/flink/plugins/s3-fs-hadoop \
&& ln -s /opt/flink/opt/flink-s3-fs-hadoop-*.jar /opt/flink/plugins/s3-fs-hadoop/flink-s3-fs-hadoop.jar \
&& chown flink:flink /opt/flink/lib/* \
&& chmod 0644 /opt/flink/lib/* \
&& echo end
The following is the full exception log:
java.util.concurrent.CompletionException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) [?:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2720) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:107) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:102) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:451) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:338) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:300) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:575) ~[?:?]
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getUgiUserName(GoogleHadoopFileSystemBase.java:1226) ~[?:?]
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.listStatus(GoogleHadoopFileSystemBase.java:858) ~[?:?]
at org.apache.flink.fs.gs.org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:170) ~[?:?]
at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.listStatus(PluginFileSystemFactory.java:141) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:180) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[flink-dist-1.17.0.jar:1.17.0]
... 4 more
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2712) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:107) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:102) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:451) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:338) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:300) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:575) ~[?:?]
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getUgiUserName(GoogleHadoopFileSystemBase.java:1226) ~[?:?]
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.listStatus(GoogleHadoopFileSystemBase.java:858) ~[?:?]
at org.apache.flink.fs.gs.org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:170) ~[?:?]
at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.listStatus(PluginFileSystemFactory.java:141) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:180) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[flink-dist-1.17.0.jar:1.17.0]
... 4 more
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2592) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2686) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2712) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:107) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:102) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:451) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:338) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:300) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:575) ~[?:?]
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getUgiUserName(GoogleHadoopFileSystemBase.java:1226) ~[?:?]
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.listStatus(GoogleHadoopFileSystemBase.java:858) ~[?:?]
at org.apache.flink.fs.gs.org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:170) ~[?:?]
at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.listStatus(PluginFileSystemFactory.java:141) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:180) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.17.0.jar:1.17.0]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[flink-dist-1.17.0.jar:1.17.0]
... 4 more
ConradJam
03/26/2023, 4:38 AM

Pavan kalyan athukuri
03/26/2023, 10:18 AM

Niels Basjes
03/26/2023, 9:47 PM
Error: nl.basjes.parse.useragent.flink.table.TestTableFunctionClientHints.testMapFunctionReturnMap Time elapsed: 5.42 s <<< ERROR!
java.lang.IllegalArgumentException: fromIndex(2) > toIndex(0)
at java.base/java.util.AbstractList.subListRangeCheck(AbstractList.java:509)
at java.base/java.util.AbstractList.subList(AbstractList.java:497)
at org.apache.calcite.rel.metadata.janino.CacheGeneratorUtil$CacheKeyStrategy$1.safeArgList(CacheGeneratorUtil.java:213)
at org.apache.calcite.rel.metadata.janino.CacheGeneratorUtil$CacheKeyStrategy$1.cacheKeyBlock(CacheGeneratorUtil.java:205)
at org.apache.calcite.rel.metadata.janino.CacheGeneratorUtil.cachedMethod(CacheGeneratorUtil.java:68)
at org.apache.calcite.rel.metadata.janino.RelMetadataHandlerGeneratorUtil.generateHandler(RelMetadataHandlerGeneratorUtil.java:121)
at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.generateCompileAndInstantiate(JaninoRelMetadataProvider.java:138)
at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.lambda$static$0(JaninoRelMetadataProvider.java:73)
at org.apache.flink.calcite.shaded.com.google.common.cache.CacheLoader$FunctionToCacheLoader.load(CacheLoader.java:165)
at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3951)
at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3974)
at org.apache.flink.calcite.shaded.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
at org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.revise(JaninoRelMetadataProvider.java:197)
at org.apache.calcite.rel.metadata.RelMetadataQueryBase.revise(RelMetadataQueryBase.java:118)
at org.apache.calcite.rel.metadata.RelMetadataQuery.getPulledUpPredicates(RelMetadataQuery.java:844)
at org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:307)
at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:337)
...
刘路
03/27/2023, 8:11 AM

Rafael Jeon
03/27/2023, 8:44 AM