Jason Politis
03/17/2023, 9:49 PM
Rion Williams
03/19/2023, 4:02 AM
Aviv Dozorets
03/19/2023, 2:26 PM
flink_taskmanager_job_task_operator_KinesisConsumer_stream_shardId_loopFrequencyHz
flink_taskmanager_job_task_mailboxLatencyMs
This is in flink-conf.yaml:
metrics.reporter.prom.filter.excludes: "*:KinesisConsumer*, *:mailboxLatencyMs:*"
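A simplified sketch of how that excludes value might be read, assuming (a) Flink's usual `;` separator for list-valued options and (b) the documented reporter filter format `<scope>[:<name>[,<name>][:<type>[,<type>]]]`. The class `ExcludesSketch` is hypothetical and is not Flink's actual parser:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of reading a reporter excludes value.
// Assumptions: list entries are separated by ';' and each entry follows
// <scope>[:<name>[,<name>][:<type>[,<type>]]]. NOT Flink's real parser.
final class ExcludesSketch {

    // Split the raw option value into individual filter entries.
    static List<String> splitEntries(String value) {
        List<String> entries = new ArrayList<>();
        for (String entry : value.split(";")) {
            if (!entry.isBlank()) {
                entries.add(entry.trim());
            }
        }
        return entries;
    }

    // Return the comma-separated <name> patterns (second ':' segment) of one entry.
    static List<String> namePatterns(String entry) {
        String[] segments = entry.split(":", -1);
        List<String> names = new ArrayList<>();
        if (segments.length < 2) {
            return names; // no name segment present
        }
        for (String name : segments[1].split(",")) {
            names.add(name.trim());
        }
        return names;
    }

    public static void main(String[] args) {
        String original = "*:KinesisConsumer*, *:mailboxLatencyMs:*";
        List<String> entries = splitEntries(original);
        System.out.println(entries.size() + " entry: " + entries);
        System.out.println("name patterns: " + namePatterns(entries.get(0)));

        String separated = "*:KinesisConsumer*;*:mailboxLatencyMs";
        System.out.println(splitEntries(separated).size() + " entries: " + splitEntries(separated));
    }
}
```

Under these assumptions, the comma-separated value is a single filter whose name patterns are `KinesisConsumer*` and a bare `*`, and a bare `*` name pattern would plausibly match every metric. Writing the two filters as `"*:KinesisConsumer*;*:mailboxLatencyMs"` keeps them separate (the trailing `:*` type segment is omitted here for simplicity). Check this against the reporter documentation for your Flink version.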
However, when both values are present, 0 metrics are emitted. When only one is present, the filter is applied and that metric is filtered out. It looks like a syntax issue that I'm missing.
Kiran Shivappa Japannavar
03/20/2023, 6:03 AM
Kiran Shivappa Japannavar
03/20/2023, 6:03 AM
Caused by: org.apache.flink.util.FlinkRuntimeException: The sorter is closed already
at org.apache.flink.runtime.operators.sort.CircularQueues.take(CircularQueues.java:89)
at org.apache.flink.runtime.operators.sort.SorterInputGateway.writeRecord(SorterInputGateway.java:68)
at org.apache.flink.runtime.operators.sort.SorterInputGateway.writeRecord(SorterInputGateway.java:88)
at org.apache.flink.runtime.operators.sort.ExternalSorterBuilder$1.writeRecord(ExternalSorterBuilder.java:199)
at org.apache.flink.streaming.api.operators.sort.SortingDataInput$ForwardingDataOutput.emitRecord(SortingDataInput.java:172)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.api.operators.sort.SortingDataInput.emitNext(SortingDataInput.java:193)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run(Thread.java:750)
Kiran Shivappa Japannavar
03/20/2023, 6:04 AM
kingsathurthi
03/20/2023, 7:23 AM
2023-03-16 15:32:14,402 INFO org.apache.flink.runtime.filecache.FileCache [] - User file cache uses directory /tmp/flink-dist-cache-95e40d9c-1117-41d4-9633-ca1bf329f5b6
2023-03-16 15:32:17,491 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Connecting to ResourceManager <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1(9600fae37136677bec744b7260164c38)>.
2023-03-16 15:32:17,558 WARN akka.remote.transport.netty.NettyTransport [] - Remote connection to [/192.168.121.75:6123] failed with java.io.IOException: Connection reset by peer
2023-03-16 15:32:17,564 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [<akka.tcp://flink@192.168.121.75:6123>] has failed, address is now gated for [50] ms. Reason: [Association failed with [<akka.tcp://flink@192.168.121.75:6123>]] Caused by: [The remote system explicitly disassociated (reason unknown).]
2023-03-16 15:32:17,571 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>, retrying in 10000 ms: Could not connect to rpc endpoint under address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>.
2023-03-16 15:32:27,589 WARN akka.remote.transport.netty.NettyTransport [] - Remote connection to [/192.168.121.75:6123] failed with java.io.IOException: Connection reset by peer
2023-03-16 15:32:27,589 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [<akka.tcp://flink@192.168.121.75:6123>] has failed, address is now gated for [50] ms. Reason: [Association failed with [<akka.tcp://flink@192.168.121.75:6123>]] Caused by: [The remote system explicitly disassociated (reason unknown).]
2023-03-16 15:32:27,590 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>, retrying in 10000 ms: Could not connect to rpc endpoint under address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>.
2023-03-16 15:32:37,610 WARN akka.remote.transport.netty.NettyTransport [] - Remote connection to [/192.168.121.75:6123] failed with java.io.IOException: Connection reset by peer
2023-03-16 15:32:37,610 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [<akka.tcp://flink@192.168.121.75:6123>] has failed, address is now gated for [50] ms. Reason: [Association failed with [<akka.tcp://flink@192.168.121.75:6123>]] Caused by: [The remote system explicitly disassociated (reason unknown).]
2023-03-16 15:32:37,610 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>, retrying in 10000 ms: Could not connect to rpc endpoint under address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>.
2023-03-16 15:32:47,629 WARN akka.remote.transport.netty.NettyTransport [] - Remote connection to [/192.168.121.75:6123] failed with java.io.IOException: Connection reset by peer
2023-03-16 15:32:47,629 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [<akka.tcp://flink@192.168.121.75:6123>] has failed, address is now gated for [50] ms. Reason: [Association failed with [<akka.tcp://flink@192.168.121.75:6123>]] Caused by: [The remote system explicitly disassociated (reason unknown).]
2023-03-16 15:32:47,629 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>, retrying in 10000 ms: Could not connect to rpc endpoint under address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>.
2023-03-16 15:32:57,649 WARN akka.remote.transport.netty.NettyTransport [] - Remote connection to [/192.168.121.75:6123] failed with java.io.IOException: Connection reset by peer
2023-03-16 15:32:57,649 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [<akka.tcp://flink@192.168.121.75:6123>] has failed, address is now gated for [50] ms. Reason: [Association failed with [<akka.tcp://flink@192.168.121.75:6123>]] Caused by: [The remote system explicitly disassociated (reason unknown).]
2023-03-16 15:32:57,649 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>, retrying in 10000 ms: Could not connect to rpc endpoint under address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>.
2023-03-16 15:33:07,668 WARN akka.remote.transport.netty.NettyTransport [] - Remote connection to [/192.168.121.75:6123] failed with java.io.IOException: Connection reset by peer
2023-03-16 15:33:07,669 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [<akka.tcp://flink@192.168.121.75:6123>] has failed, address is now gated for [50] ms. Reason: [Association failed with [<akka.tcp://flink@192.168.121.75:6123>]] Caused by: [The remote system explicitly disassociated (reason unknown).]
2023-03-16 15:33:07,669 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>, retrying in 10000 ms: Could not connect to rpc endpoint under address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>.
2023-03-16 15:33:17,691 WARN akka.remote.transport.netty.NettyTransport [] - Remote connection to [/192.168.121.75:6123] failed with java.io.IOException: Connection reset by peer
2023-03-16 15:33:17,691 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [<akka.tcp://flink@192.168.121.75:6123>] has failed, address is now gated for [50] ms. Reason: [Association failed with [<akka.tcp://flink@192.168.121.75:6123>]] Caused by: [The remote system explicitly disassociated (reason unknown).]
2023-03-16 15:33:17,692 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>, retrying in 10000 ms: Could not connect to rpc endpoint under address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>.
2023-03-16 15:33:27,710 WARN akka.remote.transport.netty.NettyTransport [] - Remote connection to [/192.168.121.75:6123] failed with java.io.IOException: Connection reset by peer
2023-03-16 15:33:27,710 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [<akka.tcp://flink@192.168.121.75:6123>] has failed, address is now gated for [50] ms. Reason: [Association failed with [<akka.tcp://flink@192.168.121.75:6123>]] Caused by: [The remote system explicitly disassociated (reason unknown).]
2023-03-16 15:33:27,711 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>, retrying in 10000 ms: Could not connect to rpc endpoint under address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>.
2023-03-16 15:33:37,728 WARN akka.remote.transport.netty.NettyTransport [] - Remote connection to [/192.168.121.75:6123] failed with java.io.IOException: Connection reset by peer
2023-03-16 15:33:37,729 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [<akka.tcp://flink@192.168.121.75:6123>] has failed, address is now gated for [50] ms. Reason: [Association failed with [<akka.tcp://flink@192.168.121.75:6123>]] Caused by: [The remote system explicitly disassociated (reason unknown).]
2023-03-16 15:33:37,729 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>, retrying in 10000 ms: Could not connect to rpc endpoint under address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>.
2023-03-16 15:33:47,749 WARN akka.remote.transport.netty.NettyTransport [] - Remote connection to [/192.168.121.75:6123] failed with java.io.IOException: Connection reset by peer
2023-03-16 15:33:47,749 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [<akka.tcp://flink@192.168.121.75:6123>] has failed, address is now gated for [50] ms. Reason: [Association failed with [<akka.tcp://flink@192.168.121.75:6123>]] Caused by: [The remote system explicitly disassociated (reason unknown).]
2023-03-16 15:33:47,750 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>, retrying in 10000 ms: Could not connect to rpc endpoint under address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>.
2023-03-16 15:33:57,769 WARN akka.remote.transport.netty.NettyTransport [] - Remote connection to [/192.168.121.75:6123] failed with java.io.IOException: Connection reset by peer
2023-03-16 15:33:57,769 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [<akka.tcp://flink@192.168.121.75:6123>] has failed, address is now gated for [50] ms. Reason: [Association failed with [<akka.tcp://flink@192.168.121.75:6123>]] Caused by: [The remote system explicitly disassociated (reason unknown).]
2023-03-16 15:33:57,770 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>, retrying in 10000 ms: Could not connect to rpc endpoint under address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>.
2023-03-16 15:34:07,788 WARN akka.remote.transport.netty.NettyTransport [] - Remote connection to [/192.168.121.75:6123] failed with java.io.IOException: Connection reset by peer
2023-03-16 15:34:07,789 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [<akka.tcp://flink@192.168.121.75:6123>] has failed, address is now gated for [50] ms. Reason: [Association failed with [<akka.tcp://flink@192.168.121.75:6123>]] Caused by: [The remote system explicitly disassociated (reason unknown).]
2023-03-16 15:34:07,789 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>, retrying in 10000 ms: Could not connect to rpc endpoint under address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>.
2023-03-16 15:34:17,808 WARN akka.remote.transport.netty.NettyTransport [] - Remote connection to [/192.168.121.75:6123] failed with java.io.IOException: Connection reset by peer
Tsering
03/20/2023, 9:28 AM
build.gradle when deploying a Flink app on KDA?
Slackbot
03/20/2023, 11:21 AM
Fredrik
03/20/2023, 11:23 AM
parallelism setting in the job spec supposed to work?
If I change the replica count on the taskManager, or change taskmanager.numberOfTaskSlots: "2", but keep the job.parallelism setting as is, the parallelism of the job (according to the UI) changes. Changing job.parallelism does not seem to have an effect.
I tried to read the documentation for the CRD, but it did not clarify the issue. Am I misunderstanding something?
Mohit Aggarwal
03/20/2023, 2:00 PM
2023-03-20 13:53:34,477 INFO org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Stopping DefaultJobGraphStore.
2023-03-20 13:53:34,478 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) [?:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2720) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:107) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:102) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:451) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:338) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:300) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:575) ~[?:?]
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getUgiUserName(GoogleHadoopFileSystemBase.java:1226) ~[?:?]
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.listStatus(GoogleHadoopFileSystemBase.java:858) ~[?:?]
at org.apache.flink.fs.gs.org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:170) ~[?:?]
at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.listStatus(PluginFileSystemFactory.java:141) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:180) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
... 4 more
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2712) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:107) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:102) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:451) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:338) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:300) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:575) ~[?:?]
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getUgiUserName(GoogleHadoopFileSystemBase.java:1226) ~[?:?]
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.listStatus(GoogleHadoopFileSystemBase.java:858) ~[?:?]
at org.apache.flink.fs.gs.org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:170) ~[?:?]
at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.listStatus(PluginFileSystemFactory.java:141) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:180) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
... 4 more
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2592) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2686) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2712) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:107) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:102) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:451) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:338) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:300) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:575) ~[?:?]
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getUgiUserName(GoogleHadoopFileSystemBase.java:1226) ~[?:?]
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.listStatus(GoogleHadoopFileSystemBase.java:858) ~[?:?]
at org.apache.flink.fs.gs.org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:170) ~[?:?]
at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.listStatus(PluginFileSystemFactory.java:141) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:180) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
Has anyone faced a similar issue?
Thijs van de Poll
03/20/2023, 2:03 PM
Rafał Trójczak
03/20/2023, 2:49 PM
15:05:22.357 [main] INFO o.a.f.a.java.typeutils.TypeExtractor -- Field Person#hobbies will be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
I have the following class:
public class Person {
private String name;
private List<String> hobbies;
public Person() {
}
public Person(String name, List<String> hobbies) {
this.name = name;
this.hobbies = hobbies;
}
// getters and setters
}
and I prepared this `TypeInformation`:
TypeInformation<Person> personTypeInformation = Types.POJO(Person.class, Map.of(
"name", Types.STRING,
"hobbies", Types.LIST(Types.STRING)));
I saw a few options that don't work for me:
• @TypeInfo(PersonSerializerFactory.class), but I can't use this approach because the Person class is in a different module.
• the returns method, e.g.:
env.fromCollection(persons)
.returns(personTypeInformation)
but this doesn't seem to remove the problem.
How can I add this type information to the environment?
Siddhesh Kalgaonkar
03/20/2023, 6:09 PM
Usman Ismail
03/20/2023, 9:37 PM
Caused by: java.io.IOException: Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 200 OK}
at org.opensearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1942)
at org.opensearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:662)
at org.opensearch.client.RestClient$1.completed(RestClient.java:396)
at org.opensearch.client.RestClient$1.completed(RestClient.java:390)
....
Caused by: java.lang.NullPointerException
at java.base/java.util.Objects.requireNonNull(Objects.java:208)
at org.opensearch.action.DocWriteResponse.<init>(DocWriteResponse.java:140)
at org.opensearch.action.index.IndexResponse.<init>(IndexResponse.java:67)
This seems to be a known issue for Elasticsearch (https://github.com/elastic/elasticsearch/issues/84173), but I don't have a clear path for OpenSearch.
Simon Lawrence
03/21/2023, 9:58 AM
Thijs van de Poll
03/21/2023, 10:21 AM
flink-connector-opensearch connector. However, it fails with the following error: java.lang.ClassNotFoundException: org.opensearch.common.Strings. I am unsure what is causing it, because I have been loading the .jar dependency into ${FLINK_HOME}/lib. Can anyone help me out? 🙂
Thijs van de Poll
03/21/2023, 11:46 AM
Caused by: java.io.IOException: Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=http://host.docker.internal:9200, response=HTTP/1.1 200 OK}
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1942)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:662)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.client.RestClient$1.completed(RestClient.java:396)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.client.RestClient$1.completed(RestClient.java:390)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:181)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
jobmanager | ... 1 more
jobmanager | Caused by: java.lang.NullPointerException
jobmanager | at java.base/java.util.Objects.requireNonNull(Unknown Source)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.action.DocWriteResponse.<init>(DocWriteResponse.java:140)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.action.update.UpdateResponse.<init>(UpdateResponse.java:86)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.action.update.UpdateResponse$Builder.build(UpdateResponse.java:193)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.action.update.UpdateResponse$Builder.build(UpdateResponse.java:181)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:172)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:208)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:2075)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$10(RestHighLevelClient.java:1836)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1940)
jobmanager | ... 18 more
Oscar Perez
03/21/2023, 1:06 PM
Oscar Perez
03/21/2023, 1:07 PM
Oscar Perez
03/21/2023, 1:12 PM
Oscar Perez
03/21/2023, 1:19 PM
Ari Huttunen
03/21/2023, 1:44 PM
package fi.elisa.datalake.flink.flinktools.aggregation;
import org.apache.flink.table.api.*;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.annotation.DataTypeHint;
import static org.apache.flink.table.api.Expressions.*;
import com.tdunning.math.stats.MergingDigest;
public class MergingDigestMedian extends AggregateFunction<Double, MergingDigest> {
@Override
public @DataTypeHint("RAW") MergingDigest createAccumulator() {
return new MergingDigest(100); // 100 is a common value for normal uses.
}
@Override
public Double getValue(@DataTypeHint("RAW") MergingDigest acc) {
return acc.quantile(0.5);
}
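public void accumulate(@DataTypeHint("RAW") MergingDigest acc, Double value) {
// skip NULL inputs: unboxing a null Double in acc.add(value) would throw a NullPointerException
if (value != null) {
acc.add(value);
}
}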
}
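For checking results, a plain-Java sketch of the lifecycle Flink uses for aggregate functions (createAccumulator once per group, accumulate per input row, getValue for the result), computing an exact median with a sorted list instead of a t-digest. `ExactMedianSketch` is a hypothetical test oracle, not a Flink `AggregateFunction`, and a `MergingDigest` median is an approximation, so the two need not agree exactly on every input:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical plain-Java stand-in mirroring the aggregate lifecycle:
// createAccumulator() once, accumulate() per row, getValue() for the result.
// Computes an EXACT median, unlike MergingDigest, which approximates it.
final class ExactMedianSketch {

    static List<Double> createAccumulator() {
        return new ArrayList<>();
    }

    static void accumulate(List<Double> acc, Double value) {
        if (value != null) { // skip NULL inputs, as SQL aggregates usually do
            acc.add(value);
        }
    }

    static Double getValue(List<Double> acc) {
        if (acc.isEmpty()) {
            return null; // no non-null input rows
        }
        List<Double> sorted = new ArrayList<>(acc);
        Collections.sort(sorted);
        int mid = sorted.size() / 2;
        return sorted.size() % 2 == 1
                ? sorted.get(mid)
                : (sorted.get(mid - 1) + sorted.get(mid)) / 2.0;
    }

    public static void main(String[] args) {
        List<Double> acc = createAccumulator();
        for (Double v : new Double[] {3.0, null, 1.0, 2.0, 4.0}) {
            accumulate(acc, v);
        }
        System.out.println(getValue(acc)); // prints 2.5 (median of 1, 2, 3, 4)
    }
}
```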
I'm calling it in PyFlink code by defining it as a temporary system function:
table_env.create_java_temporary_system_function("udf_median", "fi.elisa.datalake.flink.flinktools.aggregation.MergingDigestMedian")
It fails. I'll put the logs in the comments.
Raghunadh Nittala
03/21/2023, 1:45 PM
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Temporal Table Join requires primary key in versioned table, but no primary key can be found. The table has a primary key, but when a view is created on top of it, there is no primary key.
Virender Bhargav
03/21/2023, 4:34 PM
ExistingSavepoint savepoint = Savepoint.load(bEnv, savepointPath, new EmbeddedRocksDBStateBackend());
DataSet<KeyedState> keyedState = savepoint.readKeyedState("uid1", new CoreProcessStateReaderFunction());
BootstrapTransformation<KeyedState> transformation = OperatorTransformation
.bootstrapWith(keyedState)
.keyBy(state -> state.id)
.transform(new StateBootstrapper());
Savepoint.load(bEnv, savepointPath, new EmbeddedRocksDBStateBackend())
.withOperator("uid1", transformation)
.removeOperator("uid1")
.write(newSavePointPath);
The intent is to perform a "transformation" on KeyedState and replace older operate state with modified one.
• CoreProcessStateReaderFunction : KeyedStateReaderFunction to read existing savepoint
• StateBootstrapper : KeyedStateBootstrapFunction for state modification/cleanup
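A toy model of the call order in the snippet, assuming only that `withOperator(uid, ...)` and `removeOperator(uid)` act on one uid-keyed set of operators and are applied in call order. `SavepointOpsSketch` is hypothetical and does not model the State Processor API's real checks (for example, whether a duplicate uid is rejected):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model, NOT the State Processor API: assumes withOperator/removeOperator
// both act on a single registry of operators keyed by uid, in call order.
final class SavepointOpsSketch {
    private final Map<String, String> operators = new LinkedHashMap<>();

    SavepointOpsSketch(Map<String, String> existing) {
        operators.putAll(existing); // operators already present in the loaded savepoint
    }

    SavepointOpsSketch withOperator(String uid, String state) {
        operators.put(uid, state); // register (or, in this toy, overwrite) the uid
        return this;
    }

    SavepointOpsSketch removeOperator(String uid) {
        operators.remove(uid); // drops whatever is registered under uid, old or new
        return this;
    }

    Map<String, String> operators() {
        return operators;
    }

    public static void main(String[] args) {
        Map<String, String> existing = Map.of("uid1", "old state");
        // order used in the snippet: add, then remove the same uid
        System.out.println(new SavepointOpsSketch(existing)
                .withOperator("uid1", "new state").removeOperator("uid1").operators()); // {}
        // reversed order: remove the old operator, then add the new one
        System.out.println(new SavepointOpsSketch(existing)
                .removeOperator("uid1").withOperator("uid1", "new state").operators()); // {uid1=new state}
    }
}
```

Under that assumption, `withOperator("uid1", ...)` followed by `removeOperator("uid1")` leaves nothing registered under uid1, which would be consistent with a new savepoint containing only `_metadata`; calling `removeOperator("uid1")` first and then `withOperator("uid1", transformation)` keeps the bootstrapped state.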
I end up with an empty new savepoint (it contains only _metadata and nothing else). Can someone help me with what I might be doing wrong?
Huaqiang Kang
03/21/2023, 7:59 PM
auth-no-challenge in Flink?
Amir Hossein Sharifzadeh
03/21/2023, 9:15 PM
static void processWorkflow(
StreamTableEnvironment tableEnv,
DataStream<DataFileChunk> rawDataStream,
DataStream<DataFileChunk> bkgdDataStream,
String jsonCalibrationData
) {
tableEnv.createTemporaryView("EMPAD_RAW_TBL", rawDataStream);
tableEnv.createTemporaryView("EMPAD_BKGD_TBL", bkgdDataStream);
String data_query = "select EMPAD_RAW_TBL.chunk_i as chunk_i, EMPAD_RAW_TBL.data as raw_enc_data, EMPAD_BKGD_TBL.data as bkgd_enc_data " +
"FROM EMPAD_RAW_TBL join EMPAD_BKGD_TBL on EMPAD_RAW_TBL.chunk_i = EMPAD_BKGD_TBL.chunk_i";
Table resultTable =
tableEnv.sqlQuery(data_query);
DataStream<Row> resultStream = tableEnv.toChangelogStream(resultTable);
and this is the implementation of my ProcessFunction:
package org.varimat.process;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
public class EMPADProcessor extends ProcessFunction<Row, Row> {
@Override
public void processElement(Row row, ProcessFunction<Row, Row>.Context context,
Collector<Row> collector) throws Exception {
// process row here...
System.out.println(row.getField(0));
String rawObject = String.valueOf(row.getField(1));
String bkgdObject = String.valueOf(row.getField(2));
}
}
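A toy model of a regular (non-windowed) streaming inner join like the one in data_query, assuming both inputs are unbounded: every row from either side is kept in state and joined against all stored rows from the other side, so a chunk_i that arrives more than once on either side produces one output row per matching pair, and with unbounded sources the job has no natural end. `RegularJoinSketch` and its methods are hypothetical, not Flink's implementation:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a regular streaming inner join on one key (chunk_i):
// both sides are stored indefinitely, and each new row is joined
// against every stored row of the other side. Not Flink's implementation.
final class RegularJoinSketch {
    private final Map<Integer, List<String>> rawByKey = new HashMap<>();
    private final Map<Integer, List<String>> bkgdByKey = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    void onRaw(int chunkI, String data) {
        rawByKey.computeIfAbsent(chunkI, k -> new ArrayList<>()).add(data);
        for (String b : bkgdByKey.getOrDefault(chunkI, List.of())) {
            output.add(chunkI + "," + data + "," + b);
        }
    }

    void onBkgd(int chunkI, String data) {
        bkgdByKey.computeIfAbsent(chunkI, k -> new ArrayList<>()).add(data);
        for (String r : rawByKey.getOrDefault(chunkI, List.of())) {
            output.add(chunkI + "," + r + "," + data);
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        RegularJoinSketch join = new RegularJoinSketch();
        join.onRaw(0, "raw-a");
        join.onBkgd(0, "bkgd-x");
        join.onRaw(0, "raw-b");  // the same chunk_i arrives again on the raw side
        join.onBkgd(0, "bkgd-y");
        join.output().forEach(System.out::println); // 4 rows, all with chunk_i = 0
    }
}
```

In Flink SQL, state for such a join is retained indefinitely unless a state TTL (`table.exec.state.ttl`) is configured. Separately, EMPADProcessor as posted never calls collector.collect(...), so it emits nothing downstream.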
When I run my application, it never stops and prints duplicated values of row.getField(0) in the processElement method. Is there any suggestion? In other words, do I need to implement the EMPADProcessor class differently?
Ben Thomson
03/21/2023, 9:24 PM
Thijs van de Poll
03/22/2023, 6:36 AM
D = select * from A left join B on a.pk = b.fk left join C on a.pk = c.fk
• table E: E = select <some aggregations> from D group by D.group_key
• table F: F = select *, <some expensive transformations> from E
• F gets inserted in Elasticsearch.
So what I notice is that, because of the left joins and some events arriving later than others, table F gets updated a couple of times for the same key, which I think is logical. However, since the transformations used to create F are expensive, they are recomputed over and over again.
It would be better for me if there were some sort of timed buffer in which updates to E could accumulate, so that the expensive transformations do not need to be recomputed on every event that updates E. I am trying to understand whether windows can help me here, but I am unsure. Also, it is very important that the aggregations in E contain all of the group elements.
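A plain-Java sketch of the "timed buffer" idea: updates for the same key are collapsed so the expensive step runs once per key per interval instead of once per update. `DebounceBufferSketch` is hypothetical, uses logical timestamps passed in by the caller (a real job would need timers), and only reduces how often the expensive step runs; it does not by itself guarantee that all late rows of a group have arrived before a flush:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical per-key "latest value wins" buffer: earlier, not-yet-flushed
// updates for a key are overwritten, and only the newest one reaches the
// expensive downstream step when the interval has elapsed.
final class DebounceBufferSketch<K, V> {
    private final long intervalMillis;
    private final BiConsumer<K, V> expensiveStep;
    private final Map<K, V> pending = new LinkedHashMap<>();
    private long lastFlushMillis;

    DebounceBufferSketch(long intervalMillis, long startMillis, BiConsumer<K, V> expensiveStep) {
        this.intervalMillis = intervalMillis;
        this.lastFlushMillis = startMillis;
        this.expensiveStep = expensiveStep;
    }

    // Record an update at logical time nowMillis, then flush if the interval has elapsed.
    void onUpdate(K key, V value, long nowMillis) {
        pending.put(key, value);
        maybeFlush(nowMillis);
    }

    // Run the expensive step once per buffered key and clear the buffer.
    void maybeFlush(long nowMillis) {
        if (nowMillis - lastFlushMillis >= intervalMillis) {
            pending.forEach(expensiveStep);
            pending.clear();
            lastFlushMillis = nowMillis;
        }
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        DebounceBufferSketch<String, Integer> buffer =
                new DebounceBufferSketch<>(1000, 0, (k, v) -> calls.add(k + "=" + v));
        buffer.onUpdate("group-1", 1, 100); // partial aggregate
        buffer.onUpdate("group-1", 2, 400); // a late left-join row updates the aggregate
        buffer.onUpdate("group-1", 3, 900);
        buffer.maybeFlush(1000);            // interval elapsed: one expensive call
        System.out.println(calls);          // prints [group-1=3]
    }
}
```

On the Flink SQL side, the mini-batch options (`table.exec.mini-batch.enabled`, `table.exec.mini-batch.allow-latency`, `table.exec.mini-batch.size`) are documented for buffering input to group aggregations; whether they cut enough of the downstream updates here is an assumption worth testing.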
Thanks!
chenlu
03/22/2023, 9:00 AM