kingsathurthi
03/09/2023, 8:42 AM
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: test_job
spec:
  deploymentName: job_1
  job:
    jarURI: https://repo.com/jar_1.jar
    parallelism: 1
    upgradeMode: savepoint
    state: running
    savepointTriggerNonce: 0
    entryClass: <entryClass>
    args:
      - "-false,localhost,8081"
      - "-1,<service_name>,50051,3,30,false"
Job 2
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
metadata:
  name: test_job
spec:
  deploymentName: job_2
  job:
    jarURI: https://repo.com/jar_1.jar
    parallelism: 1
    upgradeMode: savepoint
    state: running
    savepointTriggerNonce: 0
    entryClass: <entryClass>
    args:
      - "-false,localhost,8082"
      - "-1,<service_name>,50051,3,30,false"
Raghunadh Nittala
03/09/2023, 8:57 AM
SELECT event_id, event_type, data_source_type FROM event_changes
Here event_id, event_type, and data_source_type are the primary key fields in the table. When run in the SQL client shell, this query gives proper results. When run as part of the job, it throws the error below:
Exception in thread "main" org.apache.flink.table.api.TableException: Table sink '*anonymous_datastream_sink$1*' doesn't support consuming update and delete changes which is produced by node ChangelogNormalize(key=[event_id, event_type, data_source_type])
This is observed only when I run using the tableEnv.sqlQuery() method; when I tried the tableEnv.executeSql() method, the query ran fine and I was able to see the TableResult type. I am trying to create a DataStream out of the result, so I am using the .sqlQuery() method. Any inputs on this are appreciated. Thanks in advance.
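A minimal sketch of one way around this, assuming the Java Table API and the event_changes table registered above: toChangelogStream() accepts updating results (it forwards -U/+U/-D rows via RowKind), unlike toDataStream(), which only supports insert-only tables and is what the anonymous datastream sink in the error implies.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class ChangelogToStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        Table result = tableEnv.sqlQuery(
                "SELECT event_id, event_type, data_source_type FROM event_changes");

        // toChangelogStream() carries update/delete rows, so a query that
        // produces ChangelogNormalize can still be consumed as a DataStream.
        DataStream<Row> stream = tableEnv.toChangelogStream(result);
        stream.print();
        env.execute("changelog-to-stream");
    }
}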
Mali
03/09/2023, 10:43 AM
Luis Calado
03/09/2023, 1:32 PM
Abhinav sharma
03/09/2023, 1:39 PM
Jeesmon Jacob
03/09/2023, 3:58 PM
Francisco Morillo
03/09/2023, 4:58 PM
Andreas Kunze
03/09/2023, 5:45 PM
class MyCustomMetric extends RichMapFunction[String, String] {
  @transient private var valueToExpose = 0

  override def open(parameters: Configuration): Unit = {
    getRuntimeContext()
      .getMetricGroup()
      .addGroup("MyMetricKey", "MyMetricValue")
      .gauge[Int, ScalaGauge[Int]]("MyGauge", ScalaGauge[Int](() => valueToExpose))
  }

  override def map(value: String): String = {
    valueToExpose += 1
    value
  }
}
First of all, from my understanding, addGroup(key, value) will define a user variable, and this will be exported to Prometheus as a label. Is that correct?
I configured the necessary settings for exporting metrics to Prometheus and it's working for the default Flink system metrics.
However, I have some problems with my custom metric:
1. When using the code above as it is, the metric is not accessible at all, neither from the Flink Web UI nor from Prometheus
2. Without the addGroup(key, value) part, the metric is accessible from the Flink Web UI and Prometheus but, of course, without any additional custom label
Can anyone help me understand what I'm doing wrong or what's missing here?
Reme Ajayi
03/09/2023, 6:47 PM
memory.size set to 4096. However, I can't set config properties because Kinesis does not allow editing flink-conf.yaml. The error stack trace is below. Does anyone have suggestions on how to resolve this?
2023-03-09 12:40:24
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_7e145440e6a4bf9d4540b5d7293494cc_(1/1) from any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
... 11 more
Caused by: java.io.IOException: Failed to acquire shared cache resource for RocksDB
at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:296)
at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:444)
at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:97)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
... 13 more
Caused by: java.lang.IllegalArgumentException: The fraction of memory to allocate should not be 0. Please make sure that all types of managed memory consumers contained in the job are configured with a non-negative weight via `taskmanager.memory.managed.consumer-weights`.
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
at org.apache.flink.runtime.memory.MemoryManager.validateFraction(MemoryManager.java:672)
at org.apache.flink.runtime.memory.MemoryManager.computeMemorySize(MemoryManager.java:653)
at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:521)
at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:292)
... 18 more
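For reference, a minimal sketch of the option named in the root cause, with the defaults documented for recent Flink versions (assumed values; whether a managed service like Kinesis Data Analytics exposes this setting at all is a separate question):

# flink-conf.yaml (where it is editable); the failing check wants a non-zero
# weight for the STATE_BACKEND consumer so RocksDB gets a managed-memory share
taskmanager.memory.managed.consumer-weights: OPERATOR:70,STATE_BACKEND:70,PYTHON:30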
Ricco Førgaard
03/10/2023, 7:52 AM
spec.taskManager.podTemplate.spec.containers. Values from the base get overwritten in the output.
My setup is 3 levels of FlinkDeployment:
• base (common to all jobs)
• job base (common to all environments for a given job)
• overlay (configuration for a particular job in a particular environment)
In base I would for example mount a configmap to get a Hadoop config, in job base I would reference a common configmap, and in the overlay I would set resources for the pod.
I added the openapi reference to the CRD definition, too.
I did a quick test with a regular Kubernetes deployment and it seems to work as I expect.
Any tips?
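As a point of comparison, a minimal sketch of that openapi wiring (hypothetical file names): Kustomize only strategic-merges CRD list fields like containers by key when the supplied OpenAPI schema carries the patch-merge hints; otherwise the whole list is replaced.

# overlay/kustomization.yaml
resources:
  - ../job-base
openapi:
  path: flinkdeployment-schema.json  # schema with x-kubernetes-patch-merge-key / x-kubernetes-patch-strategy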
Jiri Holusa
03/10/2023, 12:30 PM
java.net.ConnectException: Connection refused. That makes sense. I dug through the code and it seems that the script metric-server.sh is actually never called anywhere, and I also don't see it mentioned in the README. So I guess it's not a surprise that it fails. Questions:
• Am I missing something?
• Is this a bug in the Nexmark benchmark? How was it ever supposed to run?
• If it is a bug, it seems that the Nexmark benchmark is actually not really used, is it? Are there better ways to test Flink from a performance perspective?
Konstantinos Samaras-Tsakiris
03/10/2023, 1:57 PM
deviceTimestamp as ROWTIME:
streamTableEnv.create_table("source",
    .schema(Schema.new_builder()
        .column('deviceTimestamp' ...)
        ...
        .watermark('deviceTimestamp', "deviceTimestamp - INTERVAL '0.04' SECOND")
2. then join it with the output of a UDTF that adds a new sampleTimestamp column
3. then try to perform a window operation on the sampleTimestamp column
I realize that I can't do this because windowing can only happen on the ROWTIME column.
Do you know how I can transform the ROWTIME column to be sampleTimestamp instead of deviceTimestamp?
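A minimal sketch of one known pattern, in the Java Table API for concreteness (PyFlink exposes the same to_data_stream/from_data_stream pair): round-trip through a DataStream and declare a new rowtime attribute on sampleTimestamp on the way back in. Names here are assumptions from the question above.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RowtimeSwap {
    // 'joined' is assumed to be the result of the UDTF join,
    // containing the sampleTimestamp column.
    static Table swapRowtime(StreamTableEnvironment tableEnv, Table joined) {
        DataStream<Row> ds = tableEnv.toDataStream(joined);
        return tableEnv.fromDataStream(
                ds,
                Schema.newBuilder()
                        // new event-time attribute derived from sampleTimestamp
                        .columnByExpression("rowtime", "CAST(sampleTimestamp AS TIMESTAMP_LTZ(3))")
                        .watermark("rowtime", "rowtime - INTERVAL '0.04' SECOND")
                        .build());
        // window operations can now use 'rowtime'
    }
}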
Reme Ajayi
03/10/2023, 2:51 PM
Reme Ajayi
03/10/2023, 5:57 PM
Gerald Schmidt
03/10/2023, 6:26 PM
Yufei Chen
03/10/2023, 11:26 PM
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: myauto
spec:
  image: ghcr.io/apache/flink-docker:1.17-snapshot-scala_2.12-java11-debian
  flinkVersion: v1_17
  flinkConfiguration:
    kubernetes.operator.job.autoscaler.enabled: "true"
    kubernetes.operator.job.autoscaler.scaling.enabled: "true"
    kubernetes.operator.job.autoscaler.stabilization.interval: "1m"
    kubernetes.operator.job.autoscaler.metrics.window: "1m"
    kubernetes.operator.job.autoscaler.vertex.max-parallelism: "16"
    kubernetes.operator.job.autoscaler.vertex.min-parallelism: "1"
    kubernetes.operator.job.autoscaler.catch-up.duration: "1m"
    pipeline.max-parallelism: "24"
    taskmanager.numberOfTaskSlots: "8"
    execution.checkpointing.interval: "1m"
  serviceAccount: flink
Amir Hossein Sharifzadeh
03/10/2023, 11:38 PM
Kyle Ahn
03/11/2023, 12:32 AM
Sumit Nekar
03/11/2023, 5:16 AM
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: The server disconnected before a response was received.
Caused by: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
Yonatan Veksler
03/11/2023, 3:08 PM
./bin/start-cluster.sh
I can't open the web UI on localhost:8081.
I'm using this version: flink-1.16.1-bin-scala_2.12.tgz.
Amir Hossein Sharifzadeh
03/12/2023, 1:27 AM
defineWorkflow(
    tableEnv,
    customerStream,
    transactionStream,
    workflow -> workflow.sinkTo(new PrintSink<>()));
sinkTo passes the DataStream to print (PrintSink). Should I create a new class (e.g. DataProcessingSink implements Sink) and override the createWriter method?
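A minimal sketch of that idea, assuming the Sink V2 interface (org.apache.flink.api.connector.sink2) and a hypothetical DataProcessingSink; createWriter() is the one method a simple stateless sink has to provide.

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;

public class DataProcessingSink implements Sink<String> {
    @Override
    public SinkWriter<String> createWriter(InitContext context) {
        return new SinkWriter<String>() {
            @Override
            public void write(String element, Context context) {
                // hypothetical delivery logic goes here instead of printing
                System.out.println(element);
            }

            @Override
            public void flush(boolean endOfInput) {
                // flush any buffered records at checkpoints / end of input
            }

            @Override
            public void close() {
                // release resources
            }
        };
    }
}
// usage: workflow -> workflow.sinkTo(new DataProcessingSink())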
Junqi Xie
03/13/2023, 5:49 AM
ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: java.lang.IllegalStateException: The base directory of the JobResultStore isn't accessible. No dirty JobResults can be restored.
at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) [?:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.IllegalStateException: The base directory of the JobResultStore isn't accessible. No dirty JobResults can be restored.
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:182) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.16.1.jar:1.16.1]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[flink-dist-1.16.1.jar:1.16.1]
... 4 more
From previous discussions I learned this might be related to permission issues, so I tried the following fix:
securityContext:
  runAsUser: 9999
  runAsGroup: 9999
  fsGroup: 9999
under podTemplate, but it's still not working and I'm getting the same error.
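One possible thing to double-check (an assumption, not a verified fix): the security context has to sit at the pod-spec level of the template, roughly:

spec:
  podTemplate:
    spec:
      securityContext:
        runAsUser: 9999
        runAsGroup: 9999
        fsGroup: 9999  # fsGroup applies to mounted volumes only at the pod level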
I'm wondering if I'm missing any configurations, or if anything not default should be applied to minikube.
Any help is appreciated and thanks in advance.
I'm using minikube v1.25.2 and Flink v1.16.1, if it matters.
Amenreet Singh Sodhi
03/13/2023, 7:44 AM
Thijs van de Poll
03/13/2023, 8:54 AM
The program finished with the following exception:
org.apache.flink.util.FlinkException: Could not stop with a savepoint job "6a334909c5d48819afce473ded3fd599".
at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:587)
at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1033)
at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:570)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1102)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1165)
at java.base/java.security.AccessController.doPrivileged(Native Method)
at java.base/javax.security.auth.Subject.doAs(Unknown Source)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1165)
Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)
at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:583)
... 9 more
Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
at java.base/java.util.concurrent.CompletableFuture.encodeRelay(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.completeRelay(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$UniRelay.tryFire(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:261)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1275)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:45)
at akka.dispatch.OnComplete.internal(Future.scala:299)
at akka.dispatch.OnComplete.internal(Future.scala:297)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:25)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abort(PendingCheckpoint.java:550)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2096)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1062)
at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: org.apache.flink.util.SerializedThrowable: org.apache.flink.runtime.checkpoint.CheckpointException: Task name with subtask : Source: pg_source[1] -> ConstraintEnforcer[2] -> Sink: print_sink[2] (1/1)#1 Failure reason: Task has failed.
at org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1375)
at org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1318)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$12(StreamTask.java:1122)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:861)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:786)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
... 1 more
Note: I can create a savepoint manually, then cancel the job, and restart the job from the manually created savepoint.
Any ideas why this happens and how to resolve it? Thanks!
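For reference, that manual workaround spelled out as CLI calls (target directory and jar are placeholders):

./bin/flink savepoint 6a334909c5d48819afce473ded3fd599 <target-directory>
./bin/flink cancel 6a334909c5d48819afce473ded3fd599
./bin/flink run -s <savepoint-path> <job-jar>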
Vitalii
03/13/2023, 9:41 AM
piby 180
03/13/2023, 10:54 AM
Dheeraj Panangat
03/13/2023, 1:26 PM
Thijs van de Poll
03/13/2023, 1:43 PM
group_id. I would like to compute some statistic over the group, and add it to the record that is being streamed to Iceberg.
If I understand the documentation correctly (not sure?), there are different approaches to computing statistics on windows. However, they all depend on a slice of data being available (defined either by a time interval or a row-count interval) to compute the aggregations.
Would there be a way to ensure that the calculations are computed over the entire group? Thanks! 🙂
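A minimal sketch of the non-windowed alternative (hypothetical table and column names): a plain GROUP BY aggregates over all rows seen so far per group, with no window slice, producing an updating changelog stream. The downstream sink then has to accept updates (e.g. upsert mode).

SELECT group_id, COUNT(*) AS record_count, AVG(some_metric) AS avg_metric
FROM source_table
GROUP BY group_id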
Andrew Otto
03/13/2023, 1:57 PM
Mali
03/13/2023, 2:27 PM
table_env.execute_sql("""CREATE CATALOG flink_catalog WITH (
    'type'='iceberg',
    'warehouse'='s3://test/test1/test2',
    'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
    'io-impl'='org.apache.iceberg.aws.s3.S3FileIO')""")
But I am getting this error:
table_env.execute_sql("""CREATE CATALOG flink_catalog WITH (
  File "/opt/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 836, in execute_sql
  File "/opt/flink/opt/python/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1321, in __call__
  File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
  File "/opt/flink/opt/python/py4j-0.10.9.3-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o54.executeSql.
: java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
at org.apache.iceberg.flink.FlinkCatalogFactory.clusterHadoopConf(FlinkCatalogFactory.java:207)
at org.apache.iceberg.flink.FlinkCatalogFactory.createCatalog(FlinkCatalogFactory.java:135)
at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:413)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalog(TableEnvironmentImpl.java:1426)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1172)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source)
at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
... 17 more
I think it is not seeing the jars. The Iceberg jars need to be inside /opt/flink/lib, right?
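For context (an assumption, not a verified fix): the missing class org/apache/hadoop/conf/Configuration is a Hadoop class, not an Iceberg one, so the Hadoop dependencies need to be on Flink's classpath alongside the Iceberg runtime jar, for example:

# jar names and versions are placeholders
cp iceberg-flink-runtime-<flink-version>-<iceberg-version>.jar /opt/flink/lib/
cp hadoop-common-<version>.jar /opt/flink/lib/  # provides org.apache.hadoop.conf.Configuration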