Amir Hossein Sharifzadeh
05/05/2023, 5:26 PM

Danila Maksimenko
05/05/2023, 6:41 PM

Danila Maksimenko
05/05/2023, 6:54 PM
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:postgresql://postgres:5432/metastore</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>org.postgresql.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>hive</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>hive</value>
</property>
<property>
<name>hive.metastore.schema.verification</name>
<value>true</value>
</property>
<property>
<name>metastore.thrift.port</name>
<value>9083</value>
</property>
<property>
<name>aws.region</name>
<value>us-west-2</value>
</property>
<property>
<name>metastore.task.threads.always</name>
<!-- <value>org.apache.hadoop.hive.metastore.events.EventCleanerTask,org.apache.hadoop.hive.metastore.MaterializationsCacheCleanerTask</value>-->
<value>org.apache.hadoop.hive.metastore.events.EventCleanerTask</value>
</property>
<property>
<name>metastore.expression.proxy</name>
<value>org.apache.hadoop.hive.metastore.DefaultPartitionExpressionProxy</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>s3://tenant1/tenant1/data/</value>
</property>
<property>
<name>fs.s3a.access.key</name>
<value>sql-demo</value>
</property>
<property>
<name>fs.s3a.secret.key</name>
<value>demo-sql</value>
</property>
<property>
<name>fs.s3a.connection.ssl.enabled</name>
<value>true</value>
</property>
<property>
<name>fs.s3a.path.style.access</name>
<value>true</value>
</property>
<property>
<name>fs.s3.access.key</name>
<value>sql-demo</value>
</property>
<property>
<name>fs.s3.secret.key</name>
<value>demo-sql</value>
</property>
<property>
<name>fs.s3.connection.ssl.enabled</name>
<value>true</value>
</property>
<property>
<name>fs.s3.path.style.access</name>
<value>true</value>
</property>
<property>
<name>s3.access.key</name>
<value>sql-demo</value>
</property>
<property>
<name>s3.secret.key</name>
<value>demo-sql</value>
</property>
<property>
<name>s3.connection.ssl.enabled</name>
<value>true</value>
</property>
<property>
<name>s3.path.style.access</name>
<value>true</value>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://localhost:9083</value>
</property>
<property>
<name>hive.metastore.client.factory.class</name>
<value>com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory</value>
</property>
<property>
<name>fs.s3a.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
</configuration>
Danila Maksimenko
05/05/2023, 6:55 PM
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hive.s3.endpoint</name>
<value>http://minio:9000</value>
</property>
<property>
<name>hive.s3.aws-access-key</name>
<value>hive</value>
</property>
<property>
<name>hive.s3.aws-secret-key</name>
<value>hive</value>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:postgresql://postgres:5432/metastore</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>org.postgresql.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>hive</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>hive</value>
</property>
<property>
<name>hive.metastore.schema.verification</name>
<value>true</value>
</property>
<property>
<name>metastore.thrift.port</name>
<value>9083</value>
</property>
<property>
<name>metastore.task.threads.always</name>
<!-- <value>org.apache.hadoop.hive.metastore.events.EventCleanerTask,org.apache.hadoop.hive.metastore.MaterializationsCacheCleanerTask</value>-->
<value>org.apache.hadoop.hive.metastore.events.EventCleanerTask</value>
</property>
<property>
<name>aws.region</name>
<value>us-west-2</value>
</property>
<property>
<name>metastore.expression.proxy</name>
<value>org.apache.hadoop.hive.metastore.DefaultPartitionExpressionProxy</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>s3://tenant1/tenant1/data/</value>
</property>
<property>
<name>fs.s3a.connection.ssl.enabled</name>
<value>true</value>
</property>
<property>
<name>fs.s3a.path.style.access</name>
<value>true</value>
</property>
<property>
<name>fs.s3.access.key</name>
<value>sql-demo</value>
</property>
<property>
<name>fs.s3.secret.key</name>
<value>demo-sql</value>
</property>
<property>
<name>fs.s3.connection.ssl.enabled</name>
<value>true</value>
</property>
<property>
<name>fs.s3.path.style.access</name>
<value>true</value>
</property>
<property>
<name>s3.access.key</name>
<value>sql-demo</value>
</property>
<property>
<name>s3.secret.key</name>
<value>demo-sql</value>
</property>
<property>
<name>s3.connection.ssl.enabled</name>
<value>true</value>
</property>
<property>
<name>s3.path.style.access</name>
<value>true</value>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://localhost:9083</value>
</property>
<property>
<name>hive.metastore.client.factory.class</name>
<value>com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory</value>
</property>
<property>
<name>fs.s3a.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<property>
<name>hive.s3.endpoint</name>
<value>http://minio:9000</value>
</property>
</configuration>
Vladimir Tiukhtin
05/06/2023, 9:26 PM
kubernetes.container.image.ref or kubernetes.pod-template-file.taskmanager has no effect? The job manager still spins up task managers with some default image, ignoring the values from the config.

Alex Brekken
05/06/2023, 10:02 PM
flinkConfiguration:
restart-strategy.type: "failure-rate"
restart-strategy.failure-rate.max-failures-per-interval: "10"
restart-strategy.failure-rate.failure-rate-interval: "1 min"
restart-strategy.failure-rate.delay: "5 s"
And here is the log output when the job is submitted:
INFO org.apache.flink.runtime.jobmaster.JobMaster - Using restart back off time strategy FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, backoffTimeMS=1000) for Flink Streaming Job (000000005a6cddc20000000000000001).
Any ideas? Do I have it configured incorrectly?
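(For reference, a minimal sketch of setting the same failure-rate strategy programmatically via the DataStream API. One possible cause of the log above is that a strategy set in code takes precedence over flinkConfiguration; this sketch mirrors the YAML values and uses only standard Flink APIs.)

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FailureRateRestartExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Mirrors the YAML above: at most 10 failures per 1-minute interval,
        // with a 5-second delay between restart attempts.
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                10,
                Time.of(1, TimeUnit.MINUTES),
                Time.of(5, TimeUnit.SECONDS)));
        // ... define the job on env, then env.execute(...)
    }
}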
Brandon
05/07/2023, 12:35 AM

Tommaso Garuglieri
05/07/2023, 2:22 PM

Tudor Plugaru
05/08/2023, 7:53 AM
bulkFlushMaxActions parameter? Say that, because ES is struggling to index a bulk of 1000 actions and responding slowly, I'd like Flink to adjust the bulk size based on the latency from ES: if it takes 5 s to index 1k actions, the next bulk should be 500 actions, and so on. Is this doable in Flink?
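(As far as I know the connector's bulk thresholds are static, so latency-adaptive sizing would need a custom sink. A minimal sketch of the static setting, assuming the Elasticsearch 7 sink; the host, index, and field names are hypothetical.)

import org.apache.http.HttpHost;
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink;
import org.elasticsearch.client.Requests;

public class BulkFlushExample {
    public static ElasticsearchSink<String> buildSink() {
        return new Elasticsearch7SinkBuilder<String>()
                // Hypothetical host; replace with your cluster address.
                .setHosts(new HttpHost("es-host", 9200, "http"))
                // Static threshold: flush after this many buffered actions.
                .setBulkFlushMaxActions(1000)
                // Hypothetical index and document layout.
                .setEmitter((element, context, indexer) ->
                        indexer.add(Requests.indexRequest()
                                .index("my-index")
                                .source("raw", element)))
                .build();
    }
}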
Krzysztof Chmielewski
05/08/2023, 10:02 AM

Saleh
05/08/2023, 12:35 PM
NoClassDefFoundError
Here's what I did:
1. Run the Maven project archetype:generate command.
2. cd frauddetection
3. Run mvn clean package
4. Run java -jar target/frauddetection-0.1.jar
What I get:
Error: Unable to initialize main class spendreport.FraudDetectionJob
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/functions/source/SourceFunction
Note that I did not touch any of the project files. I created a Slack account to post this after scouring the internet for two hours without finding a solution. Any help would be appreciated.

Pritam Agarwala
05/08/2023, 12:57 PM
If consumer.setCommitOffsetsOnCheckpoints is set to false, then offsets will not be committed at all, even if enable.auto.commit is set to true. So when consumer.setCommitOffsetsOnCheckpoints is set to false, shouldn't it fall back on enable.auto.commit to commit offsets?
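(For context, a minimal sketch showing where the two knobs live, assuming the legacy FlinkKafkaConsumer; the broker, group, and topic names are placeholders. This only illustrates the configuration; it doesn't answer the fallback question.)

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class OffsetCommitExample {
    public static FlinkKafkaConsumer<String> buildConsumer() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
        props.setProperty("group.id", "my-group");            // placeholder
        // Kafka-client-level periodic auto-commit.
        props.setProperty("enable.auto.commit", "true");

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);
        // Disable committing offsets as part of Flink checkpoints.
        consumer.setCommitOffsetsOnCheckpoints(false);
        return consumer;
    }
}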
Alex T
05/08/2023, 1:37 PM

Kevin L
05/08/2023, 4:57 PM
SlicingWindowOperator, which appears to build a sliding window from multiple non-overlapping slices and aggregate them together upon emission (source code reference). However, I am having trouble figuring out where this operator is invoked when a hopping/tumbling window is used in the Table API. From the comments in the source code it appears to be related, but I wanted to reach out to see if anyone here knows. Additionally, any further documentation on the window implementation would be helpful. Thanks!
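(For anyone following along, a minimal sketch of the kind of window TVF query where the planner can choose a slice-based window aggregation; whether SlicingWindowOperator is actually selected depends on the Flink version and the resulting plan. The orders table and order_time column are hypothetical and assumed to be registered with a watermark.)

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class WindowTvfExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Tumbling-window aggregation via the window TVF syntax.
        tEnv.executeSql(
                "SELECT window_start, window_end, COUNT(*) AS cnt "
                + "FROM TABLE(TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '10' MINUTES)) "
                + "GROUP BY window_start, window_end");
    }
}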
Iris Grace Endozo
05/09/2023, 11:14 AM
<metric name>:<value>|c[|@<sample rate>]
but it seems it's defaulting to "%s:%s|g" in the above. Wondering if anyone else has hit this issue?

Trevor Burke
05/09/2023, 10:07 PM
ParquetProtoWriters to allow for adding additional columns. We'd like to add Flink processingTime and eventTime columns, but our Protobuf models do not already have these fields, and we do not want to add them to all of our existing models.

Sucheth Shivakumar
05/09/2023, 11:42 PM
surface visited events with a visitId field.
topic2 will have surface visits linked events, which have sourceVisitId and destinationVisitId fields and basically link two surface visited events, say SV1 and SV2.
The requirement is to stream events from both topics and copy the attributes of SV1 to SV2 using the surface visits linked event coming from topic2.
We can think of the window as 30 minutes.
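(A minimal interval-join sketch for this pattern, assuming both topics are already consumed as DataStreams with event-time timestamps/watermarks assigned; the event classes and field names below are hypothetical stand-ins for the real schemas.)

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class VisitLinkJoin {
    // Hypothetical event types standing in for the real schemas.
    public static class Visited { public String visitId; public String attributes; }
    public static class Linked { public String sourceVisitId; public String destinationVisitId; }

    public static DataStream<Visited> join(DataStream<Visited> visits, DataStream<Linked> links) {
        return visits
                .keyBy(v -> v.visitId)
                .intervalJoin(links.keyBy(l -> l.sourceVisitId))
                // Match link events within +/- 30 minutes of the visited event.
                .between(Time.minutes(-30), Time.minutes(30))
                .process(new ProcessJoinFunction<Visited, Linked, Visited>() {
                    @Override
                    public void processElement(Visited sv1, Linked link,
                                               Context ctx, Collector<Visited> out) {
                        // Emit SV1's attributes tagged with the destination visit id;
                        // a second keyed step would apply them to SV2.
                        Visited enriched = new Visited();
                        enriched.visitId = link.destinationVisitId;
                        enriched.attributes = sv1.attributes;
                        out.collect(enriched);
                    }
                });
    }
}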
Christophe Bornet
05/09/2023, 11:46 PM

Christophe Bornet
05/09/2023, 11:47 PM
2023-05-09 23:42:36,211 INFO org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=000000000f85d61c0000000000000001.
org.apache.flink.table.api.TableException: Failed to execute sql
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:867)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:827)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:918)
at org.apache.flink.table.api.internal.TablePipelineImpl.execute(TablePipelineImpl.java:56)
at com.example.flink.SqlRunner.call(SqlRunner.java:123)
at com.example.flink.SqlRunner.call(SqlRunner.java:38)
at picocli.CommandLine.executeUserObject(CommandLine.java:2041)
at picocli.CommandLine.access$1500(CommandLine.java:148)
at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2461)
at picocli.CommandLine$RunLast.handle(CommandLine.java:2453)
at picocli.CommandLine$RunLast.handle(CommandLine.java:2415)
at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2273)
at picocli.CommandLine$RunLast.execute(CommandLine.java:2417)
at picocli.CommandLine.execute(CommandLine.java:2170)
at com.example.flink.SqlRunner.main(SqlRunner.java:94)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:107)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'insert-into_pulsar.test-12/flink.temp-925ce66e-9c15-4b64-bbb1-1099060712fc'.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2203)
at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:206)
at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:95)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:850)
... 26 more
Caused by: org.apache.flink.util.FlinkException: Could not upload job files.
at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:86)
at org.apache.flink.runtime.client.ClientUtils.extractAndUploadJobGraphFiles(ClientUtils.java:62)
at org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.lambda$submitJob$6(EmbeddedExecutor.java:177)
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:268)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1277)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.complete(Unknown Source)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
at akka.dispatch.OnComplete.internal(Future.scala:300)
at akka.dispatch.OnComplete.internal(Future.scala:297)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Caused by: java.io.FileNotFoundException: /tmp/flink-web-878fc539-68bb-46d9-b0e4-4720c05b75f3/flink-web-upload/c86d309c-2108-45e3-840e-f8434bcef27c_flink-sql-runner-0.0.1-SNAPSHOT.jar (No such file or directory)
at java.base/java.io.FileInputStream.open0(Native Method)
at java.base/java.io.FileInputStream.open(Unknown Source)
at java.base/java.io.FileInputStream.<init>(Unknown Source)
at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134)
at org.apache.flink.runtime.blob.BlobClient.uploadFile(BlobClient.java:405)
at org.apache.flink.runtime.client.ClientUtils.uploadUserJars(ClientUtils.java:113)
at org.apache.flink.runtime.client.ClientUtils.uploadAndSetUserJars(ClientUtils.java:105)
at org.apache.flink.runtime.client.ClientUtils.uploadJobGraphFiles(ClientUtils.java:83)
... 48 more
2023-05-09 23:42:36,249 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting KubernetesSessionClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
Stream closed EOF for default/session-cluster-d795455b-j86nh (flink-main-container)
I see that the session cluster started a task manager pod when I submitted the job.
But it seems the session cluster should send the application jar to it and doesn't find it?
I also see that the session cluster starts to execute part of the SQL queries before the error occurs. Does it use the TM only when it gets to an INSERT/SELECT query?

刘路
05/10/2023, 6:26 AM

刘路
05/10/2023, 6:26 AM

刘路
05/10/2023, 6:27 AM

Sumit Singh
05/10/2023, 7:51 AM
[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Catalog for 'org.apache.flink.connector.jdbc.dialect.MySQLDialect@5b4e61ef' is not supported yet.
Abolfazl Ghahremani
05/10/2023, 8:58 AM

Tsering
05/10/2023, 10:42 AM
ProcessFunction, and after that process function I just sink the data. The problem is that I am facing an uneven distribution of data across the parallel subtasks. Can someone please enlighten me on this?
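(If the skew comes from uneven upstream partitioning rather than a hot key, one common mitigation is to force a round-robin redistribution before the next operator. A minimal sketch using the standard rebalance(); note it discards any keyed locality, so it only fits where the downstream step doesn't need keyed state.)

import org.apache.flink.streaming.api.datastream.DataStream;

public class RebalanceExample {
    public static <T> DataStream<T> evenlyDistributed(DataStream<T> processed) {
        // Round-robin the records across all downstream parallel subtasks
        // instead of inheriting the (possibly skewed) upstream partitioning.
        return processed.rebalance();
    }
}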
Dheeraj Panangat
05/10/2023, 11:48 AM

Abolfazl Ghahremani
05/10/2023, 2:42 PM

Tsering
05/10/2023, 5:03 PM

Control the Guh
05/10/2023, 6:10 PM

Cody Lightfoot
05/11/2023, 12:10 AM