Fariz Hajiyev
01/18/2023, 7:29 AM
Caused by: java.lang.NoSuchMethodError: org.apache.flink.statefun.sdk.reqreply.generated.TypedValue$Builder.setValue(Lcom/google/protobuf/ByteString;)Lorg/apache/flink/statefun/sdk/reqreply/generated/TypedValue$Builder;
at org.apache.flink.statefun.flink.io.common.AutoRoutableProtobufRouter.typedValuePayload(AutoRoutableProtobufRouter.java:72)
at org.apache.flink.statefun.flink.io.common.AutoRoutableProtobufRouter.route(AutoRoutableProtobufRouter.java:51)
at org.apache.flink.statefun.flink.io.common.AutoRoutableProtobufRouter.route(AutoRoutableProtobufRouter.java:36)
at org.apache.flink.statefun.flink.core.translation.IngressRouterOperator.processElement(IngressRouterOperator.java:81)
at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:112)
I am using Statefun dependencies of version 3.1.0, and Flink dependencies of version 1.13.1
I have this protobuf dependency in my pom:
<dependency>
  <groupId>com.google.protobuf</groupId>
  <artifactId>protobuf-java</artifactId>
  <version>3.21.12</version>
</dependency>
Tried all versions from 3.8.0 up to 3.21.12, same error. Can't try lower than 3.8.0 because the java proto files in the project won't compile.
"statefun-flink-io" transitively brings in protobuf 3.7.1, so I tried excluding it, but no success either, same error.
Tried statefun 3.1.1, with Flink 1.13.1, same error.
Tried statefun 3.2.0, with Flink 1.14.0, same error.
I opened the TypedValue proto class (from "External Libraries" in IDEA) and its Builder inner class does have a public setValue(ByteString value) method.
Any idea what I am doing wrong?
Amenreet Singh Sodhi
01/18/2023, 9:19 AM
I found the variable for the log directory (env.log.dir) but couldn't find the respective variable for the log file name.
Dheeraj Panangat
01/18/2023, 1:33 PM
Caused by: Configuration property <accountname>.dfs.core.windows.net not found.
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AbfsConfiguration.getStorageAccountKey(AbfsConfiguration.java:372)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.initializeClient(AzureBlobFileSystemStore.java:1133)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.<init>(AzureBlobFileSystemStore.java:174)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.initialize(AzureBlobFileSystem.java:110)
We have provided the configurations in core-site.xml too, for the following:
fs.hdfs.impl
fs.abfs.impl -> org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem
fs.file.impl
fs.azure.account.auth.type
fs.azure.account.oauth.provider.type
fs.azure.account.oauth2.client.id
fs.azure.account.oauth2.client.secret
fs.azure.account.oauth2.client.endpoint
fs.azure.createRemoteFileSystemDuringInitialization -> true
On debugging, we found that Flink reads from core-default-shaded.xml, but even when the properties are specified there, the default configs are not loaded and we get a different exception:
Caused by: Unable to load key provider class.
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AbfsConfiguration.getTokenProvider(AbfsConfiguration.java:540)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.initializeClient(AzureBlobFileSystemStore.java:1136)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.<init>(AzureBlobFileSystemStore.java:174)
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.initialize(AzureBlobFileSystem.java:110)
Is anyone familiar with this issue? Appreciate any help on this. Thanks
jaiprasad
01/18/2023, 2:18 PM
Prashant Bhardwaj
01/18/2023, 2:45 PM
Does the fixed delay restart strategy work without enabling checkpoints? Basically, can I define any restart strategy apart from none without enabling checkpoints?
Sergio Morales
01/18/2023, 3:52 PM
io.fabric8.kubernetes.client.KubernetesClientException: JcaPEMKeyConverter is provided by BouncyCastle. I added version 1.70 of both libraries: https://mvnrepository.com/artifact/org.bouncycastle/bcprov-jdk15on and https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-jdk15on, but now, when running the command to start Flink on Kubernetes (./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster), it does not start up the jobmanager. Any ideas?
Matt Weiss
01/18/2023, 4:01 PM
PRASHANT GUPTA
01/18/2023, 4:36 PM
Caused by: java.io.IOException: Target file file:/data/datas-template/checkpoints/00000000067179110000000000000000/chk-1/_metadata already exists.
Aeden Jameson
01/18/2023, 5:02 PM
Bhupendra Yadav
01/18/2023, 5:47 PM
DataStream<Integer> input = env.fromElements(1, 2, 3);
DataStream<Integer> input2 = env.fromElements(1, 2, 3, 4, 5);
I want to take the intersection of these two streams. I tried different things, but they fail with this error: https://ideone.com/LdtTmE
Is there an easy way to take the intersection? What I have right now:
// Workaround: collect the first stream into a set on the client side...
Set<Integer> st = new HashSet<>();
DataStream<Integer> input = env.fromElements(1, 2, 3);
CloseableIterator<Integer> integerCloseableIterator = input.executeAndCollect();
while (integerCloseableIterator.hasNext()) {
    st.add(integerCloseableIterator.next());
}
integerCloseableIterator.close();

// ...then filter the second stream against that set.
DataStream<Integer> input2 = env.fromElements(1, 2, 3, 4, 5)
        .filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer value) throws Exception {
                return st.contains(value);
            }
        });
input2.print();
env.execute();
Mustafa Akın
01/18/2023, 7:56 PM
Mustafa Akın
01/18/2023, 8:21 PM
Yaroslav Bezruchenko
01/18/2023, 10:19 PM
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2143) [flink-dist-1.15.2.jar:1.15.2]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
2023-01-18 22:15:47,640 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:206) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:169) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:122) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2082) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2061) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:98) ~[flink-dist-1.15.2.jar:1.15.2]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2143) ~[flink-dist-1.15.2.jar:1.15.2]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
at java.lang.Thread.run(Unknown Source) ~[?:?]
Guruguha Marur Sreenivasa
01/19/2023, 2:09 AM
I have a question about the ConfluentRegistryAvroDeserializationSchema.forGeneric(...) method in the flink-avro-confluent-registry package.
1. What is the use of passing the schema as an argument? The way I understood it, it's only used to set the writer schema and nothing else.
2. Say I have a topic with two records: r1 with schema s1 and r2 with schema s2 (s1 and s2 are slightly different: s2 has one more column than s1). Both schemas are registered in the schema registry. When I run a FlinkKafkaConsumer and configure the deserialization schema as ConfluentRegistryAvroDeserializationSchema.forGeneric(s1, URL) (setup sketched below), I see that the new column in r2 is completely ignored. I was under the impression that if the latest schema s2 is compatible with the record that was read, it would be used to deserialize the record. But that is not happening. What is the point of passing the URL?
How do I set up my consumers so that they don't need to know the schema beforehand but instead use the writer schema from the registry?
Additionally, I tried to follow this post to write my own deserialization logic, but it fails with a Kryo exception. 😞
Can someone please elaborate on this?
Sumit Nekar
01/19/2023, 5:04 AM
2023-01-19 02:04:13,975 ERROR io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector [] - Exception occurred while acquiring lock 'ConfigMapLock: streaming-application streaming-application-resourcemanager-leader (dc2d523f-ffba-493c-9192-e0e4ddc8d299)'
io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get] for kind: [ConfigMap] with name: [streaming-application-resourcemanager-leader] in namespace: [streaming-application] failed.
at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
Sumit Aich
01/19/2023, 7:24 AM
nick christidis
01/19/2023, 10:24 AM
---- before introduction of flink kubernetes operator ----
A component in the platform I am working on provides framework code so that a user can instruct all jobs of a JobManager to gracefully shut down with a savepoint.
Behind the scenes, it pretty much leverages the Flink REST API, in the following order (sketched in code after the list):
1. it calls the GET /jobs endpoint of the jobmanager to get all the job ids [see attached screen]
2. then, for every job id, it stops the job with POST /jobs/:jobid/stop [see attached screen]
So everything was working as expected.
But....
---- after introduction of flink kubernetes operator ----
because the state is declared in the YAML (e.g. state: running) and is managed by the Kubernetes control plane, the above does not work -- it seems to be ignored by the Flink Kubernetes operator, so the whole mechanism described above no longer works in a Flink Kubernetes operator deployment setup.
So my question is the following:
Is there a similar API provided by the Kubernetes operator so that we can instruct the JobManager to do what we need (e.g. stop jobs) and solve the above issue?
Sami Badawi
01/19/2023, 11:45 AM
Yaroslav Bezruchenko
01/19/2023, 11:48 AM
Aman Sharma
01/19/2023, 3:32 PM
Matyas Orhidi
01/19/2023, 6:01 PM
Heartbeat of TaskManager with id <app-name>-2-1 timed out.
Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
Yang LI
01/19/2023, 7:31 PM
org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
Do you know what could be the cause of this? 🙏
Krish Narukulla
01/19/2023, 10:08 PM
insert into table1(col1) select null;
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 159 to line 1, column 162: Illegal use of 'NULL'
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261)
at org.apache.flin
Nathanael England
01/19/2023, 10:31 PM
Ikvir Singh
01/20/2023, 12:52 AM
ERROR org.apache.flink.connector.kafka.sink.KafkaCommitter - Unable to commit transaction (org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl@f2f21e9) because it's in an invalid state. Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.
org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state
I did not have this error in Flink 1.14. Is this a known bug?
abhishek sidana
01/20/2023, 5:36 AM
Abdul Rafehi
01/20/2023, 8:37 AM
We're adding connector jars to /opt/flink/lib. No issues with the postgres driver & the flink jdbc connector, but we're having issues with flink-connector-kinesis:4.0.0-1.16 (running on the Flink image flink:1.16.0-scala_2.12-java11).
Adding this jar to the lib folder gives the following error:
jobmanager_1 | java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfiguration
jobmanager_1 | at org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfigurationFactory.getDefaultConfig(ClientConfigurationFactory.java:46) ~[flink-connector-kinesis-4.0.0-1.16.jar:4.0.0-1.16]
jobmanager_1 | at org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfigurationFactory.getConfig(ClientConfigurationFactory.java:36) ~[flink-connector-kinesis-4.0.0-1.16.jar:4.0.0-1.16]
jobmanager_1 | at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.createKinesisClient(KinesisProxy.java:268) ~[flink-connector-kinesis-4.0.0-1.16.jar:4.0.0-1.16]
jobmanager_1 | at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.<init>(KinesisProxy.java:152) ~[flink-connector-kinesis-4.0.0-1.16.jar:4.0.0-1.16]
jobmanager_1 | at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.create(KinesisProxy.java:280) ~[flink-connector-kinesis-4.0.0-1.16.jar:4.0.0-1.16]
jobmanager_1 | at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:412) ~[flink-connector-kinesis-4.0.0-1.16.jar:4.0.0-1.16]
Any thoughts on what might be causing this?
Sami Badawi
01/20/2023, 8:54 AM
Sudhan Madhavan
01/20/2023, 9:53 AM
chunilal kukreja
01/20/2023, 3:33 PM