Felix Angell
12/07/2022, 2:25 PM
Matt Weiss
12/07/2022, 2:33 PM
kafkaSource.setParallelism(5), Flink would make 5 consumer instances, each consuming from a partition. There's gotta be some tradeoff or limit to that, I'd think. What's better: 5 task containers each with a parallelism of 1, or 1 container with a parallelism of 5?
Morey Straus
12/07/2022, 4:23 PM
ding bei
12/07/2022, 4:48 PM
Finished : On successful checkpoints (STREAMING) or at the end of input (BATCH) pending files transition to "Finished"
AND
As part-81fc4980-a6af-41c8-9937-9939408a734b-0 is now pending completion, after the next successful checkpoint, it is finalized
contradict (pic2)
Important Note 3: Flink and the FileSink never overwrites committed data. Given this, when trying to restore from an old checkpoint/savepoint which assumes an in-progress file which was committed by subsequent successful checkpoints, the FileSink will refuse to resume and will throw an exception as it cannot locate the in-progress file.
Since the first quote says only pending files are committed and then become finished files, what does the second quote mean when it says an in-progress file was committed?
Adisun Wheelock
12/07/2022, 7:02 PM
checkpointingMode of EXACTLY_ONCE (the default). Does a restart caused by OOM carry a different meaning in terms of failure recovery than other reasons for restart? There are some internal discussions at my company going back and forth, where some believe OOM will result in different restart semantics, and I wanted to come here for the source of truth.
Dan Hill
12/07/2022, 7:10 PM
Victor
12/07/2022, 8:32 PM
Victor
12/07/2022, 8:32 PM
Deryl Rodrigues
12/07/2022, 10:23 PM
Kafka Stream Table A
LEFT JOIN B ON joining fields
LEFT JOIN C ON joining fields
Deryl Rodrigues
12/07/2022, 10:24 PM
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Temporal Table Join requires primary key in versioned table, but no primary key can be found. The physical plan is:
Deryl Rodrigues
12/07/2022, 10:24 PM
Emmanuel Leroy
12/07/2022, 11:11 PM
Travis Carter
12/08/2022, 12:48 AM
java.lang.NoClassDefFoundError: Could not initialize class com.thepublichealthco.protos.core.protos.ForecastProtos
at com.thepublichealthco.protos.core.protos.Forecast.getDescriptor(Forecast.java:123)
at com.thepublichealthco.statefun.io.elasticsearch.ForecastSinkFn.extractForecast(ForecastSinkFn.java:56)
at com.thepublichealthco.statefun.io.elasticsearch.ForecastSinkFn.process(ForecastSinkFn.java:36)
at com.thepublichealthco.statefun.io.elasticsearch.ForecastSinkFn.process(ForecastSinkFn.java:23)
at com.thepublichealthco.statefun.io.elasticsearch.ElasticsearchSinkFunctionDelegate.process(ElasticsearchSinkFunctionDelegate.java:27)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.invoke(ElasticsearchSinkBase.java:330)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:86)
at org.apache.flink.streaming.runtime.tasks.CopyingBroadcastingOutputCollector.collect(CopyingBroadcastingOutputCollector.java:55)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:62)
at org.apache.flink.statefun.flink.core.functions.SideOutputSink.accept(SideOutputSink.java:47)
at org.apache.flink.statefun.flink.core.functions.ReusableContext.send(ReusableContext.java:99)
at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.handleEgressMessages(RequestReplyFunction.java:248)
at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.handleInvocationResultResponse(RequestReplyFunction.java:207)
at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.onAsyncResult(RequestReplyFunction.java:178)
at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.sendToFunction(RequestReplyFunction.java:356)
at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.sendToFunction(RequestReplyFunction.java:330)
at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.sendToFunction(RequestReplyFunction.java:321)
at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.onRequest(RequestReplyFunction.java:136)
at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.invoke(RequestReplyFunction.java:118)
at org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:48)
at org.apache.flink.statefun.flink.core.functions.ReusableContext.apply(ReusableContext.java:74)
at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.processNextEnvelope(LocalFunctionGroup.java:60)
at org.apache.flink.statefun.flink.core.functions.Reductions.processEnvelopes(Reductions.java:164)
at org.apache.flink.statefun.flink.core.functions.Reductions.apply(Reductions.java:149)
at org.apache.flink.statefun.flink.core.functions.FunctionGroupOperator.processElement(FunctionGroupOperator.java:90)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:99)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:80)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.sendDownstream(FeedbackUnionOperator.java:180)
at org.apache.flink.statefun.flink.core.feedback.FeedbackUnionOperator.processElement(FeedbackUnionOperator.java:86)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Unknown Source)
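A note on reading a trace like the one above: on the JVM, "NoClassDefFoundError: Could not initialize class X" usually means the class was found but its static initializer had already failed once. The first failure is reported as an ExceptionInInitializerError that carries the real cause, so it may be worth searching earlier in the TaskManager log for it. The sketch below shows only that JVM behaviour; BrokenProtos and StaticInitDemo are hypothetical stand-ins, not StateFun or Protobuf classes.

```java
// Hypothetical stand-in for a generated class whose static initializer fails,
// for example because of an incompatible library version on the classpath.
final class BrokenProtos {
    // Not a compile-time constant, so reading it triggers class initialization.
    static final String DESCRIPTOR = load();

    private static String load() {
        throw new IllegalStateException("simulated incompatible runtime");
    }
}

final class StaticInitDemo {
    // Touches BrokenProtos and returns whatever Throwable the JVM raises.
    static Throwable touch() {
        try {
            return new IllegalStateException("unexpected success: " + BrokenProtos.DESCRIPTOR);
        } catch (Throwable t) {
            return t;
        }
    }

    public static void main(String[] args) {
        Throwable first = touch();   // initialization is attempted and fails
        Throwable second = touch();  // the class is now marked as unusable
        System.out.println(first.getClass().getName() + " caused by " + first.getCause());
        System.out.println(second.getClass().getName() + ": " + second.getMessage());
    }
}
```

Only the first access reports the underlying cause; later accesses report the shorter "Could not initialize class" form seen in the trace.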
The type is defined as
public static final Type<Forecast> FORECAST_TYPE =
    TypeUtils.createProtobufType(TYPES_NAMESPACE, Forecast.getDescriptor(), Forecast.parser());
---
public static <T extends GeneratedMessageV3> Type<T> createProtobufType(String typeNamespace,
                                                                        Descriptors.Descriptor descriptor,
                                                                        Parser<T> parser) {
  return SimpleType.simpleImmutableTypeFrom(
      TypeName.typeNameOf(typeNamespace, descriptor.getFullName()), T::toByteArray,
      parser::parseFrom);
}
It seems the issue comes from statefun using Protoc 3.7.1 while I'm using 3.21.7 in my proto library. Any insight or direction would be greatly appreciated.
Emmanuel Leroy
12/08/2022, 3:57 AM
Tsering
12/08/2022, 4:11 AM
Cannot access org.apache.flink.runtime.state.StateBackend
Matthias
12/08/2022, 8:37 AM
LAST_STATE recovery. It states that it recovers from HA metadata (see corresponding section in docs). I would interpret this as relying on Flink's HA metadata. But based on what I found in the code (in AbstractJobReconciler:206), it looks like the operator is relying on the savepoints periodically triggered by the operator itself. Am I right, or am I missing something here? 🤔
RICHARD JOY
12/08/2022, 10:05 AM
....[ERROR] [xyz/basic-checkpoint-ha-example] Exception while listing jobs
java.util.concurrent.TimeoutException.
at java.base/java.util.concurrent.CompletableFuture.timedGet(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)
at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.listJobs(AbstractFlinkService.java:228)
at org.apache.flink.kubernetes.operator.observer.JobStatusObserver.observe(JobStatusObserver.java:68)
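For context on the trace above: the top frames are CompletableFuture.get with a timeout, which throws TimeoutException when the future has not completed in time. That would be consistent with the operator's job-listing call not getting a response from the JobManager quickly enough, though the trace alone does not say why. A minimal sketch of the plain JDK behaviour, where TimedGetDemo and the never-completed future are hypothetical stand-ins for the operator's call:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class TimedGetDemo {
    // Waits up to timeoutMillis for the future; returns true if the wait timed out.
    static boolean timesOut(CompletableFuture<String> pending, long timeoutMillis) {
        try {
            pending.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            // Thrown from the timed get, like the top frames of the trace.
            System.out.println(e);
            return true;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Stands in for a response that never arrives.
        CompletableFuture<String> neverCompleted = new CompletableFuture<>();
        System.out.println(timesOut(neverCompleted, 100));
    }
}
```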
Yang LI
12/08/2022, 1:25 PM
Caused by: java.lang.RuntimeException: Error creating an instance of com.amazonaws.auth.WebIdentityTokenCredentialsProvider for URI <s3p://cs-flink-savepoints-eu-west-1/dev/cs-flink/newflink/_entropy_>
at com.facebook.presto.hive.s3.PrestoS3FileSystem.getCustomAWSCredentialsProvider(PrestoS3FileSystem.java:724)
at com.facebook.presto.hive.s3.PrestoS3FileSystem.getAwsCredentialsProvider(PrestoS3FileSystem.java:708)
at com.facebook.presto.hive.s3.PrestoS3FileSystem.createAmazonS3Client(PrestoS3FileSystem.java:632)
at com.facebook.presto.hive.s3.PrestoS3FileSystem.initialize(PrestoS3FileSystem.java:216)
at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:123)
... 28 more
Caused by: java.lang.NoSuchMethodException: com.amazonaws.auth.WebIdentityTokenCredentialsProvider.<init>(java.net.URI, org.apache.hadoop.conf.Configuration)
at java.base/java.lang.Class.getConstructor0(Unknown Source)
at java.base/java.lang.Class.getConstructor(Unknown Source)
at com.facebook.presto.hive.s3.PrestoS3FileSystem.getCustomAWSCredentialsProvider(PrestoS3FileSystem.java:720)
... 32 more
WebIdentityTokenCredentialsProvider was introduced in the AWS Java SDK in version 1.11.603, so I think flink-s3-fs-presto 1.13.6 is using the AWS SDK version of its Presto dependency rather than the version 1.11.788 specified in flink-s3-fs-base here: https://github.com/apache/flink/blob/release-1.13.6/flink-filesystems/flink-s3-fs-base/pom.xml#L35. And it's weird because we have excluded the AWS Java SDK of the Presto dependency in the Flink project 🤨
Is there anybody who has dealt with this error message before? 🙏
Victor
12/08/2022, 4:18 PM
[cp104/my-event-processor-flinkdep] Error during event processing ExecutionScope{ resource id: ResourceID{name='my-event-processor-flinkdep', namespace='cp104'}, version: 1574921286} failed.
org.apache.flink.kubernetes.operator.exception.ReconciliationException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.rest.util.RestClientException: [org.apache.flink.runtime.rest.NotFoundException: Operation not found under key: org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@c351173
at org.apache.flink.runtime.rest.handler.async.AbstractAsynchronousOperationHandlers$StatusHandler.handleRequest(AbstractAsynchronousOperationHandlers.java:182)
at
Any ideas what could be the root cause?
Adrian Chang
12/08/2022, 4:31 PM
GROUP BY TUMBLE with a user-defined aggregate function. Can I expect the values sent to the function to be in the order they were received?
I know that SQL does not guarantee order when applying an aggregation function. Does Flink SQL behave the same way, or does it respect the order of the events?
Thanks
Krish Narukulla
12/08/2022, 5:29 PM
"org.apache.hive:hive-serde:3.1.3",
"org.apache.hive:hive-metastore:3.1.3",
"org.apache.hive:hive-common:3.1.3",
Exception in thread "main" org.apache.flink.table.api.ValidationException: Could not execute CREATE CATALOG: (catalogName: [myhive], properties: [{hive-conf-dir=./src/main/java/com/xxx/common/flink/examples/sql/resources/, default-database=xxx, type=hive}])
at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalog(TableEnvironmentImpl.java:1435)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1172)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
at com.roku.common.flink.examples.sql.Hive.main(Hive.java:53)
Caused by: org.apache.flink.table.catalog.exceptions.CatalogException: Configured default database roku doesn't exist in catalog myhive.
at org.apache.flink.table.catalog.hive.HiveCatalog.open(HiveCatalog.java:309)
at org.apache.flink.table.catalog.CatalogManager.registerCatalog(CatalogManager.java:211)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalog(TableEnvironmentImpl.java:1431)
... 3 more
Jeremy DeGroot
12/08/2022, 6:34 PM
Rommel
12/08/2022, 6:43 PM
Travis Carter
12/08/2022, 7:10 PM
3.21.x however Statefun has been compiled with Protos version 3.7.1 which includes the Lite library that has been removed in >=3.8.x and is now a plugin.
In traditional Flink I was required to register these protos with KryoSerializer. Is there a similar requirement for this case? Or is there something I'm missing in getting our Protos in line with what StateFun expects?
Hygor Knust
12/08/2022, 7:28 PM
upsert-kafka sink producing an Avro Schema to Schema Registry. It auto-generates an Avro schema like this:
{
  "fields": [...],
  "name": "record",
  "type": "record"
}
The problem is that the "name" attribute is always "record". This breaks compatibility with the gradle-avro-plugin as it tries to create a class using the AvroSchema.name, which in this case would be class Record, a reserved keyword in Java.
Suparn Lele
12/08/2022, 8:15 PM
Jason Politis
12/09/2022, 12:30 AM
Thomas Zhang
12/09/2022, 12:48 AM
/.../flink-playground/venv/lib/python3.8/site-packages/pyflink/plugins/flink-s3-fs-hadoop/flink-s3-fs-hadoop-1.15.2.jar and the Python file I'm running is in /.../flink-playground/main.py. However, I'm still getting Could not find a file system implementation for scheme 's3'. Is there a way to get this to work with Python locally?
Kenny Lu
12/09/2022, 1:11 AM
2022-12-08 16:44:05
org.apache.flink.util.FlinkExpectedException: The TaskExecutor is shutting down.
at org.apache.flink.runtime.taskexecutor.TaskExecutor.onStop(TaskExecutor.java:456)
at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:239)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.lambda$terminate$0(AkkaRpcActor.java:578)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:577)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:196)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
ding bei
12/09/2022, 5:02 AM
Finished : On successful checkpoints (STREAMING) or at the end of input (BATCH) pending files transition to "Finished"
AND
As part-81fc4980-a6af-41c8-9937-9939408a734b-0 is now pending completion, after the next successful checkpoint, it is finalized
contradict (pic2)
Important Note 3: Flink and the FileSink never overwrites committed data. Given this, when trying to restore from an old checkpoint/savepoint which assumes an in-progress file which was committed by subsequent successful checkpoints, the FileSink will refuse to resume and will throw an exception as it cannot locate the in-progress file.
Since the first quote says only pending files are committed and then become finished files, what does the second quote mean when it says an in-progress file was committed?
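One possible reading that reconciles the two quotes: a part file that was still in-progress when an older checkpoint was taken can later be rolled to pending and then finalized by a newer checkpoint. Restoring from the older checkpoint then looks for an in-progress file that no longer exists. The toy model below sketches only that sequence. PartFileLifecycle, its methods and the State enum are hypothetical names for illustration, not FileSink classes, and the real sink's bookkeeping is more involved.

```java
import java.util.HashMap;
import java.util.Map;

final class PartFileLifecycle {
    enum State { IN_PROGRESS, PENDING, FINISHED }

    private final Map<String, State> files = new HashMap<>();

    void open(String name) {
        files.put(name, State.IN_PROGRESS);
    }

    // Rolling closes an in-progress file; it then waits for a checkpoint.
    void roll(String name) {
        if (files.get(name) == State.IN_PROGRESS) {
            files.put(name, State.PENDING);
        }
    }

    // Returns a snapshot of the states as they were when the checkpoint was
    // taken, then finalizes every pending file.
    Map<String, State> checkpoint() {
        Map<String, State> snapshot = new HashMap<>(files);
        files.replaceAll((name, state) -> state == State.PENDING ? State.FINISHED : state);
        return snapshot;
    }

    // Refuses a snapshot that expects an in-progress file which has since been finalized.
    void checkRestorable(Map<String, State> snapshot) {
        snapshot.forEach((name, expected) -> {
            if (expected == State.IN_PROGRESS && files.get(name) == State.FINISHED) {
                throw new IllegalStateException("cannot locate in-progress file " + name);
            }
        });
    }

    State stateOf(String name) {
        return files.get(name);
    }

    public static void main(String[] args) {
        PartFileLifecycle sink = new PartFileLifecycle();
        sink.open("part-0");
        Map<String, State> oldCheckpoint = sink.checkpoint(); // part-0 is in-progress here
        sink.roll("part-0");                                  // part-0 becomes pending
        sink.checkpoint();                                    // part-0 becomes finished
        System.out.println(sink.stateOf("part-0"));
        try {
            sink.checkRestorable(oldCheckpoint);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this reading, "committed" in the second quote refers to what happened to the file after the old checkpoint, not to its state at the time that checkpoint was taken.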