Jason Politis
03/17/2023, 9:49 PM

Rion Williams
03/19/2023, 4:02 AM

Aviv Dozorets
03/19/2023, 2:26 PM
flink_taskmanager_job_task_operator_KinesisConsumer_stream_shardId_loopFrequencyHz
flink_taskmanager_job_task_mailboxLatencyMs
this is in flink-conf.yaml:
metrics.reporter.prom.filter.excludes: "*:KinesisConsumer*, *:mailboxLatencyMs:*"
However, when both values are present, 0 metrics are emitted. When only one is present, the filter is applied and the metric is filtered out. It looks like some syntax issue that I'm missing.
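One possibility worth checking (a guess based on how Flink parses list-type options, not verified here): entries of metrics.reporter.<name>.filter.excludes are delimited by semicolons, while commas inside a single entry separate metric names within that one filter. The comma-separated value above would then be read as a single filter whose name list contains a bare *, which matches every metric and would explain why nothing is emitted when both patterns are present. A sketch of the semicolon-delimited form:

metrics.reporter.prom.filter.excludes: "*:KinesisConsumer*;*:mailboxLatencyMs:*"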
Kiran Shivappa Japannavar
03/20/2023, 6:03 AM

Kiran Shivappa Japannavar
03/20/2023, 6:03 AM
Caused by: org.apache.flink.util.FlinkRuntimeException: The sorter is closed already
at org.apache.flink.runtime.operators.sort.CircularQueues.take(CircularQueues.java:89)
at org.apache.flink.runtime.operators.sort.SorterInputGateway.writeRecord(SorterInputGateway.java:68)
at org.apache.flink.runtime.operators.sort.SorterInputGateway.writeRecord(SorterInputGateway.java:88)
at org.apache.flink.runtime.operators.sort.ExternalSorterBuilder$1.writeRecord(ExternalSorterBuilder.java:199)
at org.apache.flink.streaming.api.operators.sort.SortingDataInput$ForwardingDataOutput.emitRecord(SortingDataInput.java:172)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.api.operators.sort.SortingDataInput.emitNext(SortingDataInput.java:193)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run(Thread.java:750)

Kiran Shivappa Japannavar
03/20/2023, 6:04 AM

kingsathurthi
03/20/2023, 7:23 AM
2023-03-16 15:32:14,402 INFO org.apache.flink.runtime.filecache.FileCache [] - User file cache uses directory /tmp/flink-dist-cache-95e40d9c-1117-41d4-9633-ca1bf329f5b6
2023-03-16 15:32:17,491 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Connecting to ResourceManager <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1(9600fae37136677bec744b7260164c38)>.
2023-03-16 15:32:17,558 WARN akka.remote.transport.netty.NettyTransport [] - Remote connection to [/192.168.121.75:6123] failed with java.io.IOException: Connection reset by peer
2023-03-16 15:32:17,564 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [<akka.tcp://flink@192.168.121.75:6123>] has failed, address is now gated for [50] ms. Reason: [Association failed with [<akka.tcp://flink@192.168.121.75:6123>]] Caused by: [The remote system explicitly disassociated (reason unknown).]
2023-03-16 15:32:17,571 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>, retrying in 10000 ms: Could not connect to rpc endpoint under address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>.
2023-03-16 15:32:27,589 WARN akka.remote.transport.netty.NettyTransport [] - Remote connection to [/192.168.121.75:6123] failed with java.io.IOException: Connection reset by peer
2023-03-16 15:32:27,589 WARN akka.remote.ReliableDeliverySupervisor [] - Association with remote system [<akka.tcp://flink@192.168.121.75:6123>] has failed, address is now gated for [50] ms. Reason: [Association failed with [<akka.tcp://flink@192.168.121.75:6123>]] Caused by: [The remote system explicitly disassociated (reason unknown).]
2023-03-16 15:32:27,590 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>, retrying in 10000 ms: Could not connect to rpc endpoint under address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>.
[… the same WARN/WARN/INFO retry cycle repeats every ~10 seconds through 15:34:07 …]
2023-03-16 15:34:17,808 WARN akka.remote.transport.netty.NettyTransport [] - Remote connection to [/192.168.121.75:6123] failed with java.io.IOException: Connection reset by peer

Tsering
03/20/2023, 9:28 AM
build.gradle when deploying flink app on KDA?

Slackbot
03/20/2023, 11:21 AM

Fredrik
03/20/2023, 11:23 AM
How is the parallelism setting in the job-spec supposed to work?
If I change the replica count on the taskManager or change the taskmanager.numberOfTaskSlots: "2" but keep the job.parallelism setting as is, the parallelism of the job (according to the UI) changes.
Changing the job.parallelism does not seem to have an effect.
I tried to read the documentation for the CRD but it did not clarify the issue.
Am I misunderstanding something?
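A tentative pointer rather than an authoritative answer: in the operator's FlinkDeployment CRD those three knobs live in different places, roughly as sketched below (field names from the v1beta1 spec; values are placeholders). One guess about the observed behavior: if the deployment runs in standalone mode with scheduler-mode: reactive, the effective parallelism follows the available slots (replicas × numberOfTaskSlots) and spec.job.parallelism is ignored, which would match what is described. Changes to spec.job.parallelism also only take effect through a job upgrade/redeploy rather than immediately.

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
spec:
  mode: native                      # or standalone; affects how task managers are sized
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  taskManager:
    replicas: 2                     # as far as I understand, only meaningful in standalone mode
  job:
    jarURI: local:///opt/flink/usrlib/app.jar   # placeholder
    parallelism: 4
    upgradeMode: savepoint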
Mohit Aggarwal
03/20/2023, 2:00 PM
2023-03-20 13:53:34,477 INFO org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Stopping DefaultJobGraphStore.
2023-03-20 13:53:34,478 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) [?:?]
at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2720) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:107) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:102) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:451) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:338) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:300) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:575) ~[?:?]
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getUgiUserName(GoogleHadoopFileSystemBase.java:1226) ~[?:?]
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.listStatus(GoogleHadoopFileSystemBase.java:858) ~[?:?]
at org.apache.flink.fs.gs.org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:170) ~[?:?]
at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.listStatus(PluginFileSystemFactory.java:141) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:180) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
... 4 more
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2712) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:107) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:102) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:451) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:338) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:300) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:575) ~[?:?]
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getUgiUserName(GoogleHadoopFileSystemBase.java:1226) ~[?:?]
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.listStatus(GoogleHadoopFileSystemBase.java:858) ~[?:?]
at org.apache.flink.fs.gs.org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:170) ~[?:?]
at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.listStatus(PluginFileSystemFactory.java:141) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:180) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
... 4 more
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2592) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2686) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2712) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:107) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:102) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:451) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:338) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:300) ~[?:?]
at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:575) ~[?:?]
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getUgiUserName(GoogleHadoopFileSystemBase.java:1226) ~[?:?]
at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.listStatus(GoogleHadoopFileSystemBase.java:858) ~[?:?]
at org.apache.flink.fs.gs.org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:170) ~[?:?]
at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.listStatus(PluginFileSystemFactory.java:141) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:180) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
Has anyone faced a similar issue?

Thijs van de Poll
03/20/2023, 2:03 PM

Rafał Trójczak
03/20/2023, 2:49 PM
15:05:22.357 [main] INFO o.a.f.a.java.typeutils.TypeExtractor -- Field Person#hobbies will be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
I have the following class:
public class Person {

    private String name;
    private List<String> hobbies;

    public Person() {
    }

    public Person(String name, List<String> hobbies) {
        this.name = name;
        this.hobbies = hobbies;
    }

    // getters and setters
}
and I prepared this `TypeInformation`:
TypeInformation<Person> personTypeInformation = Types.POJO(Person.class, Map.of(
        "name", Types.STRING,
        "hobbies", Types.LIST(Types.STRING)));
I saw a few options that don't work for me:
• @TypeInfo(PersonSerializerFactory.class) - but I can't use this approach because the Person class is in a different module.
• returns method, e.g.:
env.fromCollection(persons)
.returns(personTypeInformation)
but this doesn't seem to remove the problem.
How can I add this type information to the environment?

Siddhesh Kalgaonkar
03/20/2023, 6:09 PM

Usman Ismail
03/20/2023, 9:37 PM
Caused by: java.io.IOException: Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 200 OK}
at org.opensearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1942)
at org.opensearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:662)
at org.opensearch.client.RestClient$1.completed(RestClient.java:396)
at org.opensearch.client.RestClient$1.completed(RestClient.java:390)
....
Caused by: java.lang.NullPointerException
at java.base/java.util.Objects.requireNonNull(Objects.java:208)
at org.opensearch.action.DocWriteResponse.<init>(DocWriteResponse.java:140)
at org.opensearch.action.index.IndexResponse.<init>(IndexResponse.java:67)
This seems to be a known issue for Elasticsearch (https://github.com/elastic/elasticsearch/issues/84173), but I don't have a clear path for OpenSearch.

Simon Lawrence
03/21/2023, 9:58 AM

Thijs van de Poll
03/21/2023, 10:21 AM
I am using the flink-connector-opensearch connector. However, it fails with the following error: java.lang.ClassNotFoundException: org.opensearch.common.Strings. I am unsure what is causing it, because I have been loading the .jar dependency into ${FLINK_HOME}/lib. Can anyone help me out? 🙂

Thijs van de Poll
03/21/2023, 11:46 AM
Caused by: java.io.IOException: Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=http://host.docker.internal:9200, response=HTTP/1.1 200 OK}
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1942)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:662)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.client.RestClient$1.completed(RestClient.java:396)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.client.RestClient$1.completed(RestClient.java:390)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:181)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
jobmanager | at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
jobmanager | ... 1 more
jobmanager | Caused by: java.lang.NullPointerException
jobmanager | at java.base/java.util.Objects.requireNonNull(Unknown Source)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.action.DocWriteResponse.<init>(DocWriteResponse.java:140)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.action.update.UpdateResponse.<init>(UpdateResponse.java:86)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.action.update.UpdateResponse$Builder.build(UpdateResponse.java:193)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.action.update.UpdateResponse$Builder.build(UpdateResponse.java:181)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:172)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:208)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:2075)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$10(RestHighLevelClient.java:1836)
jobmanager | at org.apache.flink.opensearch.shaded.org.opensearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1940)
jobmanager | ... 18 more

Oscar Perez
03/21/2023, 1:06 PM

Oscar Perez
03/21/2023, 1:07 PM

Oscar Perez
03/21/2023, 1:12 PM

Oscar Perez
03/21/2023, 1:19 PM

Ari Huttunen
03/21/2023, 1:44 PM
package fi.elisa.datalake.flink.flinktools.aggregation;

import org.apache.flink.table.api.*;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.annotation.DataTypeHint;
import static org.apache.flink.table.api.Expressions.*;
import com.tdunning.math.stats.MergingDigest;

public class MergingDigestMedian extends AggregateFunction<Double, MergingDigest> {

    @Override
    public @DataTypeHint("RAW") MergingDigest createAccumulator() {
        return new MergingDigest(100); // 100 is a common value for normal uses.
    }

    @Override
    public Double getValue(@DataTypeHint("RAW") MergingDigest acc) {
        return acc.quantile(0.5);
    }

    public void accumulate(@DataTypeHint("RAW") MergingDigest acc, Double value) {
        acc.add(value);
    }
}
I'm calling it in pyflink code by defining it as a temporary system function.
table_env.create_java_temporary_system_function("udf_median", "fi.elisa.datalake.flink.flinktools.aggregation.MergingDigestMedian")
It fails. I'll put the logs in the comments.

Raghunadh Nittala
03/21/2023, 1:45 PM
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Temporal Table Join requires primary key in versioned table, but no primary key can be found. The table has a primary key, but when a view is created on top of it, there is no primary key.
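One pattern from the Flink docs that may apply here (sketched from memory; the table and column names below are made up, and it assumes the source table has an event-time attribute): a view can serve as a versioned table for a temporal join if it is written as a deduplication query, because the planner can then infer the primary key and event time from it. A plain projection or join view does not carry the primary key through, which matches the error above.

CREATE VIEW versioned_rates AS
SELECT currency, rate, update_time
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY currency ORDER BY update_time DESC) AS row_num
  FROM rates
) WHERE row_num = 1;

-- the temporal join side then reads: ... JOIN versioned_rates FOR SYSTEM_TIME AS OF o.order_time AS r ON ...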
Virender Bhargav
03/21/2023, 4:34 PM
ExistingSavepoint savepoint = Savepoint.load(bEnv, savepointPath, new EmbeddedRocksDBStateBackend());
DataSet<KeyedState> keyedState = savepoint.readKeyedState("uid1", new CoreProcessStateReaderFunction());

BootstrapTransformation<KeyedState> transformation = OperatorTransformation
        .bootstrapWith(keyedState)
        .keyBy(state -> state.id)
        .transform(new StateBootstrapper());

Savepoint.load(bEnv, savepointPath, new EmbeddedRocksDBStateBackend())
        .withOperator("uid1", transformation)
        .removeOperator("uid1")
        .write(newSavePointPath);
The intent is to perform a "transformation" on the KeyedState and replace the older operator state with the modified one.
• CoreProcessStateReaderFunction : KeyedStateReaderFunction to read existing savepoint
• StateBootstrapper : KeyedStateBootstrapFunction for state modification/cleanup
I end up with an empty new savepoint (it has only the _metadata folder and nothing else). Can someone help me with what I might be doing wrong?
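Not a confirmed diagnosis, but one thing that stands out: the savepoint-modification examples in the Flink docs remove the old operator first and then register the replacement under the same uid, whereas the snippet above calls withOperator before removeOperator, so the remove may also be dropping the freshly bootstrapped state. A sketch with the order flipped (same placeholder names as above):

Savepoint.load(bEnv, savepointPath, new EmbeddedRocksDBStateBackend())
        .removeOperator("uid1")                // drop the existing state for this uid first
        .withOperator("uid1", transformation)  // then register the bootstrapped replacement
        .write(newSavePointPath);

If that changes nothing, it may also be worth confirming that CoreProcessStateReaderFunction actually emits elements, since an empty input to the bootstrap transformation would also leave little to write.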
Huaqiang Kang
03/21/2023, 7:59 PM
auth-no-challenge in Flink?

Amir Hossein Sharifzadeh
03/21/2023, 9:15 PM
static void processWorkflow(
        StreamTableEnvironment tableEnv,
        DataStream<DataFileChunk> rawDataStream,
        DataStream<DataFileChunk> bkgdDataStream,
        String jsonCalibrationData
) {

    tableEnv.createTemporaryView("EMPAD_RAW_TBL", rawDataStream);
    tableEnv.createTemporaryView("EMPAD_BKGD_TBL", bkgdDataStream);

    String data_query = "select EMPAD_RAW_TBL.chunk_i as chunk_i, EMPAD_RAW_TBL.data as raw_enc_data, EMPAD_BKGD_TBL.data as bkgd_enc_data " +
            "FROM EMPAD_RAW_TBL join EMPAD_BKGD_TBL on EMPAD_RAW_TBL.chunk_i = EMPAD_BKGD_TBL.chunk_i";

    Table resultTable = tableEnv.sqlQuery(data_query);
    DataStream<Row> resultStream = tableEnv.toChangelogStream(resultTable);
and this is the implementation of my ProcessFunction:
package org.varimat.process;

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

public class EMPADProcessor extends ProcessFunction<Row, Row> {

    @Override
    public void processElement(Row row, ProcessFunction<Row, Row>.Context context,
                               Collector<Row> collector) throws Exception {
        // process row here...
        System.out.println(row.getField(0));
        String rawObject = String.valueOf(row.getField(1));
        String bkgdObject = String.valueOf(row.getField(2));
    }
}
When I run my application, it never stops and prints duplicated data for row.getField(0) in the processElement method. Is there any suggestion? In other words, do I need to implement the EMPADProcessor class differently?
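A hedged observation rather than a definitive answer: a regular (non-windowed) streaming join over unbounded inputs never terminates by design, and toChangelogStream emits the join result as a changelog, so the same chunk_i can legitimately appear several times as insert/update-before/update-after rows, which looks like duplicates. If only the latest value per key matters, one option is to check the row's kind and skip retractions; a minimal sketch of processElement along those lines (names reused from the snippet above):

import org.apache.flink.types.RowKind;

@Override
public void processElement(Row row, Context context, Collector<Row> collector) throws Exception {
    // The changelog stream emits +I / -U / +U (and possibly -D) rows for the same chunk_i
    // as both sides of the join evolve; skip the retraction rows and keep inserts/updated values.
    RowKind kind = row.getKind();
    if (kind == RowKind.UPDATE_BEFORE || kind == RowKind.DELETE) {
        return;
    }
    String rawObject = String.valueOf(row.getField(1));
    String bkgdObject = String.valueOf(row.getField(2));
    collector.collect(row);
}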
Ben Thomson
03/21/2023, 9:24 PM

Thijs van de Poll
03/22/2023, 6:36 AM
D = select * from A left join B on a.pk = b.fk left join C on a.pk = c.fk
• table E: E = select <some aggregations> from D group by D.group_key
• table F: F = select *, <some expensive transformations> from E
• F gets inserted in Elasticsearch.
So what I notice is that, because of the left joins and some events arriving later than others, table F gets updated a couple of times for the same key, which is logical, I think. However, since the transformations made to create F are expensive, they get recomputed over and over again.
For me it would be better if there were some sort of timed buffer in which events for E could accumulate, so that the expensive transformations do not need to be recalculated on every event updating E. I am trying to understand if windows can help me out here, but I am unsure. Also, it is very important that the aggregations in E contain all of the group elements.
Thanks!
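A thought, offered tentatively: rather than windowing, the Table/SQL mini-batch options are designed for this kind of situation — they buffer incoming changes for a short interval and fire the aggregation once per batch, so the downstream operators (the expensive transformations in F) see far fewer intermediate updates per key. A sketch of the relevant settings (values are placeholders to tune):

table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 5 s
table.exec.mini-batch.size: 5000

These can go in flink-conf.yaml or be set on the TableEnvironment's TableConfig. They reduce how often E and F are recomputed but do not eliminate updates entirely; each surviving update to E still re-runs the transformations for the affected key.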
chenlu
03/22/2023, 9:00 AM