Or Keren
05/25/2023, 7:33 AMSumit Nekar
05/25/2023, 10:55 AM# Restart of unhealthy job deployments by flink-kubernetes-operator
# <https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.3/docs/custom-resource/job-management/#restart-of-unhealthy-job-deployments>
kubernetes.operator.cluster.health-check.enabled: true
kubernetes.operator.cluster.health-check.restarts.threshold: 2
kubernetes.operator.cluster.health-check.restarts.window: 15 min
kubernetes.operator.job.restart.failed: true
As per the above configs, if the job has restarted more than 2 times, the Flink operator redeploys the job with the last available state. That means that in application mode, a new JM pod comes up after the running job has restarted 2 times within 15 min. I need some clarification on this:
1. In case the job is not able to recover at all, will the Flink operator continue restarting it forever? Is there a threshold count after which the operator gives up?
2. How is the restart-strategy configured at the JM level honoured by the Flink operator in this case (see the sketch after this list)?
3. The Flink operator refers to the "flink_jobmanager_job_numRestarts" metric to decide whether the restart-count threshold is breached, but every time the operator redeploys the job a new JM comes up and the flink_jobmanager_job_numRestarts value starts from 0 again, so the operator keeps redeploying after every 2 restarts. In this case, when will the job be marked as failed?
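For clarity on question 2, this is the kind of JM-level restart strategy I mean, sketched with placeholder values in the job's flinkConfiguration next to the operator options above:
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
restart-strategy.fixed-delay.delay: 30 s
Oleksandr Nitavskyi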
05/25/2023, 12:15 PMPluginClassLoader
, see details in the thread.
Removal of monitorInterval
(introduced in FLINK-20510) helps to mitigate the issue.
I wonder if this is a known issue and whether Flink should provide any workaround:
1. Disable monitorInterval
by default in the log configuration, so users who do not tune Flink don't hit the memory leak (sketched below the list).
2. Add some mechanism on unload which properly stops such a Log4jThread.
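A sketch of option 1, assuming the default log4j.properties shipped in Flink's conf/ directory (the surrounding lines differ per version):
# conf/log4j.properties
# Commenting out the watch interval added by FLINK-20510 disables the Log4j
# configuration-watcher thread that can keep the PluginClassLoader alive after unload.
# monitorInterval=30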
cc @Nicolas Fraison
slowratatoskr
05/25/2023, 3:21 PMCarlos Santos
05/25/2023, 5:43 PMRichAsyncFunction
is missing an onError method.
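For context, the closest thing I've found is signalling failures through the ResultFuture inside asyncInvoke; a minimal sketch (EnrichAsync and callExternalService are made up):
import java.util.Collections
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}
import org.apache.flink.streaming.api.functions.async.{ResultFuture, RichAsyncFunction}

class EnrichAsync extends RichAsyncFunction[String, String] {

  // Made-up stand-in for the real asynchronous call.
  private def callExternalService(in: String): Future[String] = Future(in.toUpperCase)

  override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit =
    callExternalService(input).onComplete {
      case Success(value) => resultFuture.complete(Collections.singleton(value))
      case Failure(err)   => resultFuture.completeExceptionally(err) // the "onError" path today
    }
}
Jeremy Ber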
05/25/2023, 9:29 PMtrue
, but I've purposely set it all to false
. I run this app for some time to gather checkpoints, then I read that checkpoint into my State Processor API app.
Within that app, I want to read the state and modify some elements to true
, so when I restore the state, some keys will emit results. However, after modifying the state and reading it back in, there appear to be no elements in my written checkpoint.
The examples of reading / modifying and writing state seem disconnected. Is there an end-to-end example of reading state, modifying it, then writing it back out that I can refer to? Or am I using this incorrectly?
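For reference, this is roughly the end-to-end flow I'm attempting, sketched with the older DataSet-based State Processor API (newer releases expose SavepointReader / SavepointWriter instead); the paths, the "flags" uid, the "flag" state name and the Flag type are made-up placeholders:
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
import org.apache.flink.state.api.{OperatorTransformation, Savepoint}
import org.apache.flink.state.api.functions.{KeyedStateBootstrapFunction, KeyedStateReaderFunction}
import org.apache.flink.util.Collector

case class Flag(key: String, value: Boolean)

// 1. Read the existing keyed ValueState ("flag") of the operator with uid "flags".
class FlagReader extends KeyedStateReaderFunction[String, Flag] {
  var state: ValueState[java.lang.Boolean] = _
  override def open(conf: Configuration): Unit =
    state = getRuntimeContext.getState(new ValueStateDescriptor("flag", classOf[java.lang.Boolean]))
  override def readKey(key: String, ctx: KeyedStateReaderFunction.Context, out: Collector[Flag]): Unit =
    out.collect(Flag(key, state.value()))
}

// 2. Bootstrap the modified state back into the same uid, flipping every flag to true.
class FlagWriter extends KeyedStateBootstrapFunction[String, Flag] {
  var state: ValueState[java.lang.Boolean] = _
  override def open(conf: Configuration): Unit =
    state = getRuntimeContext.getState(new ValueStateDescriptor("flag", classOf[java.lang.Boolean]))
  override def processElement(value: Flag, ctx: KeyedStateBootstrapFunction[String, Flag]#Context): Unit =
    state.update(true)
}

object RewriteState {
  def main(args: Array[String]): Unit = {
    val bEnv = ExecutionEnvironment.getExecutionEnvironment
    val existing = Savepoint.load(bEnv, "s3://bucket/checkpoints/chk-42", new HashMapStateBackend())

    val flags = existing.readKeyedState("flags", new FlagReader)

    val bootstrap = OperatorTransformation
      .bootstrapWith(flags)
      .keyBy(new KeySelector[Flag, String] { override def getKey(f: Flag): String = f.key })
      .transform(new FlagWriter)

    // 3. Write a new savepoint with that operator's state replaced, then restore the job from it.
    existing
      .removeOperator("flags")
      .withOperator("flags", bootstrap)
      .write("s3://bucket/savepoints/modified")

    bEnv.execute("rewrite-state")
  }
}
Amir Hossein Sharifzadeh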
05/26/2023, 3:38 AMsetParallelism
, EmbeddedRocksDBStateBackend,...
)?Sudhan Madhavan
05/26/2023, 6:52 AMJan Kevin Dick
05/26/2023, 9:25 AMIn the close
method, the sessions are not closed if an exception happens during the call to flush(). Is this correct? Or should the connectionProvider.closeConnection() call be put into a finally block?
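A minimal sketch of the try/finally pattern I mean (ConnectionProvider, SessionSink and flush() are made-up stand-ins, not the actual connector classes):
trait ConnectionProvider { def closeConnection(): Unit }

class SessionSink(connectionProvider: ConnectionProvider) extends AutoCloseable {

  private def flush(): Unit = () // stand-in for the real flush that may throw

  override def close(): Unit =
    try flush()                                  // may throw
    finally connectionProvider.closeConnection() // still runs, so the connection is always released
}
ē°ęå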
05/26/2023, 10:37 AMSlackbot
05/26/2023, 10:53 AMē°ęå
05/26/2023, 11:21 AMē°ęå
05/26/2023, 11:36 AMOscar Perez
05/26/2023, 1:29 PMOscar Perez
05/26/2023, 1:58 PMPritam Agarwala
05/26/2023, 4:59 PMVitor Leal
05/26/2023, 5:16 PMflink-sqlclient
, which sends SQL queries to Apache Flink (init script):
flink-jobmanager:
  build:
    dockerfile: ./apache-flink/Dockerfile
  ports:
    - '8081:8081'
  command: jobmanager
  environment:
    - |
      FLINK_PROPERTIES=
      jobmanager.rpc.address: flink-jobmanager
flink-taskmanager:
  build:
    dockerfile: ./apache-flink/Dockerfile
  depends_on:
    - flink-jobmanager
  command: taskmanager
  environment:
    - |
      FLINK_PROPERTIES=
      jobmanager.rpc.address: flink-jobmanager
      taskmanager.numberOfTaskSlots: 2
flink-sqlclient:
  build:
    dockerfile: ./apache-flink/Dockerfile
  command: bin/sql-client.sh -f init.sql
  depends_on:
    - flink-taskmanager
  environment:
    - |
      FLINK_PROPERTIES=
      jobmanager.rpc.address: flink-jobmanager
      rest.address: flink-jobmanager
it says "Execute statement succeed" for each statement in init.sql
but nothing is actually created? Also if I change
jobmanager.rpc.address: flink-jobmanager
rest.address: flink-jobmanager
to gibberish:
jobmanager.rpc.address: adsasd
rest.address: asdasdf
it still works somehowē°ęå
05/27/2023, 1:09 AMē°ęå
05/27/2023, 4:21 AMOr Keren
05/27/2023, 7:45 AMMohan M
05/29/2023, 4:42 AMSuparn Lele
05/29/2023, 6:51 AMjava.lang.RuntimeException: Failed to fetch next result
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
at org.apache.flink.streaming.api.scala.CloseableIterator$$anon$1.hasNext(CloseableIterator.scala:36)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.flink.streaming.api.scala.CloseableIterator$$anon$1.foreach(CloseableIterator.scala:35)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:183)
at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at org.apache.flink.streaming.api.scala.CloseableIterator$$anon$1.to(CloseableIterator.scala:35)
...
Cause: java.io.IOException: Failed to fetch job execution result
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:177)
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
at org.apache.flink.streaming.api.scala.CloseableIterator$$anon$1.hasNext(CloseableIterator.scala:36)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.flink.streaming.api.scala.CloseableIterator$$anon$1.foreach(CloseableIterator.scala:35)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:183)
at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
...
Cause: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175)
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
at org.apache.flink.streaming.api.scala.CloseableIterator$$anon$1.hasNext(CloseableIterator.scala:36)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at org.apache.flink.streaming.api.scala.CloseableIterator$$anon$1.foreach(CloseableIterator.scala:35)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
...
Cause: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:614)
at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1983)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:134)
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:174)
at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120)
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
...
Cause: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:228)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:218)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:209)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:679)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444)
at sun.reflect.GeneratedMethodAccessor46.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
...
Cause: com.esotericsoftware.kryo.KryoException: Unable to find class: sun.reflect.GeneratedConstructorAccessor88
Serialization trace:
delegate (sun.reflect.DelegatingConstructorAccessorImpl)
constructorAccessor (java.lang.reflect.Constructor)
_constructor (com.fasterxml.jackson.databind.introspect.AnnotatedConstructor)
_fromLongCreator (com.fasterxml.jackson.databind.deser.std.StdValueInstantiator)
_valueInstantiator (com.fasterxml.jackson.databind.deser.BeanDeserializer)
_rootDeserializers (com.fasterxml.jackson.databind.ObjectMapper)
objectMapper (com.jayway.jsonpath.spi.mapper.JacksonMappingProvider)
mappingProvider (com.jayway.jsonpath.Configuration)
configuration (com.jayway.jsonpath.internal.JsonContext)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:730)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
...
Cause: java.lang.ClassNotFoundException: sun.reflect.GeneratedConstructorAccessor88
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
I am using an object to initialize the StreamExecutionEnvironment. The code goes like the following:
def getStreamExecutionEnvironment: StreamExecutionEnvironment = {
  val streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  val unmodifiableCollection = Class.forName("java.util.Collections$UnmodifiableCollection")
  streamExecutionEnvironment.getConfig.addDefaultKryoSerializer(unmodifiableCollection, classOf[UnmodifiableCollectionsSerializer])
  streamExecutionEnvironment
}
I suspect this to be a memory issue. Could someone please help??Keyur Makwana
05/29/2023, 8:55 AMSumit Lakra
05/29/2023, 8:59 AMOscar Perez
05/29/2023, 9:52 AMAri Huttunen
05/29/2023, 12:32 PMs3.connection-timeout
, the hashmap state backend configuration, and execution.checkpointing.interval.
There are these places I could put them, in my case:
• The flink-conf.yaml file
• StreamExecutionEnvironment
• StreamTableEnvironment
• The filesystem SQL connector settings
• (The Kafka connector settings)
Usually when the manual explains a setting, it doesn't say where the setting should be placed. It can also have different syntax in different places. Any chance of making the manual clearer on this?
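To make the question concrete, here is one of those settings, execution.checkpointing.interval, expressed in the first three places from the list, as far as I understand them (a sketch; the flink-conf.yaml form is shown as a comment):
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

// flink-conf.yaml form:  execution.checkpointing.interval: 60 s

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000) // StreamExecutionEnvironment API, interval in milliseconds

val tEnv = StreamTableEnvironment.create(env)
tEnv.getConfig.getConfiguration
  .setString("execution.checkpointing.interval", "60 s") // StreamTableEnvironment level
Oscar Perez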
05/29/2023, 5:01 PMCREATE TEMPORARY TABLE users_tnc
(
userId STRING,
country STRING,
acceptedTime TIMESTAMP(3),
eventMetadata row(eventId STRING, eventTime TIMESTAMP(3)),
PRIMARY KEY (userId) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'usertnc.v1beta1',
'properties.bootstrap.servers' = 'localhost:9094',
'properties.group.id' = 'ethanol-bmp',
'key.format' = 'raw',
'value.format' = 'raw',
'format' = 'protobuf',
'protobuf.message-class-name' = 'com.test.UserTnCChangedEvent',
'protobuf.ignore-parse-errors' = 'true'
);
we are facing the following exception:
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'protobuf' that implements 'org.apache.flink.table.factories.DeserializationFormatFactory' in the classpath.
We are using Flink 1.16 and made sure that the flink-protobuf library is on the classpath (packaged as part of the uber jar).
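In case it matters: my understanding is that the format factory is discovered through META-INF/services/org.apache.flink.table.factories.Factory, so the uber jar needs those service files merged rather than overwritten. A sketch of what I mean with sbt-assembly (we may be on a different build tool; Maven shade has the ServicesResourceTransformer for the same purpose):
// build.sbt sketch: concatenate service files so the protobuf factory stays discoverable
// (the default merge strategy may already handle this).
assembly / assemblyMergeStrategy := {
  case PathList("META-INF", "services", _*) => MergeStrategy.concat
  case x                                    => MergeStrategy.defaultMergeStrategy(x)
}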
thanks!Yaroslav Bezruchenko
05/29/2023, 8:47 PMwill be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema
On 1.15 it was totally fine and used the POJO serializer. Is there something I'm missing here? Why isn't Flink using the POJO serializer anymore?
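To illustrate what I'm checking: as far as I understand, a class is only handled by the POJO serializer if it is public, has a public no-arg constructor, and every field is either public or reachable via getters/setters; otherwise it falls back to Kryo as a GenericType. A small sketch I use to see what 1.16 infers (EventPojo is a made-up example class):
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import scala.beans.BeanProperty

// Made-up example that satisfies the POJO rules: public class, no-arg constructor,
// fields reachable through the getters/setters generated by @BeanProperty.
class EventPojo {
  @BeanProperty var id: String = _
  @BeanProperty var amount: Long = _
}

object PojoCheck {
  def main(args: Array[String]): Unit = {
    // Prints a PojoType when the rules are met, a GenericType otherwise.
    println(TypeInformation.of(classOf[EventPojo]))

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getConfig.disableGenericTypes() // fail fast wherever Kryo / GenericType would be used
  }
}
Oscar Perez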
05/30/2023, 8:08 AMCREATE TEMPORARY TABLE IF NOT EXISTS devices
(
deviceId STRING PRIMARY KEY NOT ENFORCED,
userId STRING,
status STRING,
pairedOn TIMESTAMP,
eventTime TIMESTAMP
) WITH (
'connector' = 'kafka',
'topic' = 'devices',
'format' = 'json',
'properties.bootstrap.servers' = 'kafka-rt:9093',
'properties.group.id' = 'ethanol-test',
'scan.startup.mode' = 'earliest-offset'
);
but when running the select query I get the following error:
org.apache.flink.table.api.ValidationException: The Kafka table 'default_catalog.default_database.devices' with 'json' format doesn't support defining PRIMARY KEY constraint on the table, because it can't guarantee the semantic of primary key.
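For comparison, my understanding is that a keyed/upsert view of the topic would go through the upsert-kafka connector instead, which does accept a primary key; an untested sketch with the same columns:
CREATE TEMPORARY TABLE IF NOT EXISTS devices_upsert
(
deviceId STRING,
userId STRING,
status STRING,
pairedOn TIMESTAMP,
eventTime TIMESTAMP,
PRIMARY KEY (deviceId) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'devices',
'properties.bootstrap.servers' = 'kafka-rt:9093',
'key.format' = 'json',
'value.format' = 'json'
);
Eli Golin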
05/30/2023, 8:23 AMobject ParquetSink {
  def parquetFileSink[A <: Message: ClassTag](
      assigner: A => String,
      config: Config
  )(implicit lc: LoggingConfigs): FileSink[A] = {
    val bucketAssigner = new BucketAssigner[A, String] {
      override def getBucketId(element: A, context: BucketAssigner.Context): String = {
        val path = assigner(element)
        logger.info(LogMessage(-1, s"Writing file to ${config.getString(baseDirKey)}/$path", "NA"))
        path
      }

      override def getSerializer: SimpleVersionedSerializer[String] =
        SimpleVersionedStringSerializer.INSTANCE
    }

    def builder(outFile: OutputFile): ParquetWriter[A] =
      new ParquetProtoWriters.ParquetProtoWriterBuilder(
        outFile,
        implicitly[ClassTag[A]].runtimeClass.asInstanceOf[Class[A]]
      ).withCompressionCodec(config.getCompression(compressionKey)).build()

    val parquetBuilder: ParquetBuilder[A] = path => builder(path)

    FileSink
      .forBulkFormat(
        new Path(s"wasbs://${config.getString(baseDirKey)}@${config.getString(accountNameKey)}.blob.core.windows.net"),
        new ParquetWriterFactory[A](parquetBuilder)
      )
      .withBucketAssigner(bucketAssigner)
      .withOutputFileConfig(
        OutputFileConfig
          .builder()
          .withPartSuffix(".parquet")
          .build()
      )
      .build()
  }
}
After deploying the job I get the following exception:
Caused by: java.lang.UnsupportedOperationException: Recoverable writers on AzureBlob are only supported for ABFS
at org.apache.flink.fs.azurefs.AzureBlobRecoverableWriter.checkSupportedFSSchemes(AzureBlobRecoverableWriter.java:44) ~[?:?]
at org.apache.flink.fs.azure.common.hadoop.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57) ~[?:?]
at org.apache.flink.fs.azurefs.AzureBlobRecoverableWriter.<init>(AzureBlobRecoverableWriter.java:37) ~[?:?]
at org.apache.flink.fs.azurefs.AzureBlobFileSystem.createRecoverableWriter(AzureBlobFileSystem.java:44) ~[?:?]
at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.createRecoverableWriter(PluginFileSystemFactory.java:134) ~[flink-dist-1.18-SNAPSHOT.jar:1.18-SNAPSHOT]
The question is whether I can make any changes to this code/config to make it work with the wasbs protocol and not abfs?
Tnx everyone.
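For reference, the direction I'm considering based on the exception text (recoverable writers on Azure Blob are only supported for ABFS): point the same sink at the container through the abfss scheme and the Data Lake Gen2 endpoint instead of wasbs. An untested sketch reusing the builders from the code above, assuming the storage account exposes the abfss endpoint:
    // Untested: same container and account config keys as above, addressed via abfss instead of wasbs.
    FileSink
      .forBulkFormat(
        new Path(s"abfss://${config.getString(baseDirKey)}@${config.getString(accountNameKey)}.dfs.core.windows.net"),
        new ParquetWriterFactory[A](parquetBuilder)
      )
      .withBucketAssigner(bucketAssigner)
      .build()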