# troubleshooting
  • j

    Jason Politis

    03/17/2023, 9:49 PM
    Hello team! I have a scenario that is plaguing me. I am using Flink SQL. I'm sourcing 2 tables from Kafka, using the debezium-avro-confluent format. I inner join them with a filter (the filter is on purpose for this test). I'm sending the results back to Kafka, again using debezium-avro-confluent because I want all the CDC messages. Meaning, when a message is no longer in the result set I want to see an op 'd'. When a message is updated, I want to see 2 messages, op 'd' and op 'c'. For the most part this works, but not 100% of the time. In my current test, I'm updating the filtered field so that results will be removed, and out of the 5 rows that I expect to emit delete messages I only receive 2 or 3 (it's not consistent). The same happens when I update any other field: I expect to receive 5 delete messages followed by 5 create messages, but most of the time I only receive between 2 and 4 delete messages, whereas I always receive the create messages. After our data gets sent back to Kafka, our client uses ClickHouse as their data warehouse. It doesn't currently support deletes from a stream (meaning someone would have to manually manage the data to delete anything), so we are attempting to use ClickHouse's CollapsingMergeTree. This is why the messages from Flink back to Kafka must replicate the CDC stream exactly. I am grateful for any help you may provide, including hints on how to solve our problem another way.
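To make the requirement above concrete, here is a minimal sketch of how change events could map onto the sign column that a CollapsingMergeTree table relies on (1 for a state row, -1 for the row that cancels it). The class name, the `SignedRow` type, and the string payloads are hypothetical, and this is plain Java rather than anything Flink or ClickHouse ships. It mainly shows why a missing delete matters: without the -1 row, the old state never collapses.

```java
import java.util.ArrayList;
import java.util.List;

final class CdcToSignedRows {

    /** Hypothetical row shape: the payload plus the sign column used for collapsing. */
    static final class SignedRow {
        final String payload;
        final int sign;

        SignedRow(String payload, int sign) {
            this.payload = payload;
            this.sign = sign;
        }

        @Override
        public String toString() {
            return (sign > 0 ? "+1 " : "-1 ") + payload;
        }
    }

    /**
     * Maps one Debezium-style change event to signed rows.
     * "c" (create) and "r" (snapshot read) add a state row, "d" (delete) cancels one,
     * and "u" (update) cancels the old state and then adds the new one.
     */
    static List<SignedRow> toSignedRows(String op, String before, String after) {
        List<SignedRow> rows = new ArrayList<>();
        switch (op) {
            case "c":
            case "r":
                rows.add(new SignedRow(after, 1));
                break;
            case "d":
                rows.add(new SignedRow(before, -1));
                break;
            case "u":
                rows.add(new SignedRow(before, -1));
                rows.add(new SignedRow(after, 1));
                break;
            default:
                throw new IllegalArgumentException("Unknown op: " + op);
        }
        return rows;
    }

    public static void main(String[] args) {
        // An update yields a cancel row for the old state followed by a state row for the new one.
        for (SignedRow row : toSignedRows("u", "id=1,status=old", "id=1,status=new")) {
            System.out.println(row);
        }
    }
}
```

In the setup described above the sink already splits an update into an op 'd' and an op 'c', so only the "c" and "d" branches would be exercised there; the "u" branch is included for completeness.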
  • r

    Rion Williams

    03/19/2023, 4:02 AM
    Hi all, I’m reaching out today for some suggestions (and hopefully a solution) for a Flink job that I’m working on. The job reads JSON strings from a Kafka topic and parses them into JSONObjects (currently via Gson), which are then operated on before ultimately being written out to Kafka again. The problem is that the shape of the data can vary wildly and dynamically. Some records may have properties unique to only that record, which makes defining a POJO difficult. In addition, the JSONObjects fall back to Kryo serialization, which is leading to atrocious throughput. I basically need to read in JSON strings, enrich properties on these objects, and ultimately write them to various sinks. Is there some type of JSON-based class, library, or approach I could use to accomplish this efficiently? Or is there possibly a way to partially write a POJO that would allow me to interact with sections/properties of the JSON while retaining other properties that might be dynamically present or unique to the message? Any advice or suggestions would be welcome! I’ll also be happy to provide any additional context if it would help!
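One shape the "partial POJO" idea could take is a class with typed fields for the properties the job touches plus a catch-all map for everything else. The sketch below uses only the JDK and assumes the JSON has already been parsed into a `Map<String, Object>`; the `PartialEvent` class and its `id` field are hypothetical. Jackson's `@JsonAnySetter` and `@JsonAnyGetter` annotations offer the same pattern on top of a real parser. Whether Flink serializes the catch-all map efficiently is a separate question that this sketch does not settle.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class PartialEvent {

    // Hypothetical known property that the job reads and enriches.
    String id;

    // Every other property, kept untouched so it can be written back out.
    final Map<String, Object> extras = new LinkedHashMap<>();

    /** Splits an already-parsed JSON object into the typed field and the catch-all map. */
    static PartialEvent fromMap(Map<String, Object> parsed) {
        PartialEvent event = new PartialEvent();
        for (Map.Entry<String, Object> entry : parsed.entrySet()) {
            if ("id".equals(entry.getKey())) {
                Object value = entry.getValue();
                event.id = value == null ? null : value.toString();
            } else {
                event.extras.put(entry.getKey(), entry.getValue());
            }
        }
        return event;
    }

    /** Recombines the typed field and the untouched extras for serialization. */
    Map<String, Object> toMap() {
        Map<String, Object> out = new LinkedHashMap<>();
        out.put("id", id);
        out.putAll(extras);
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> parsed = new LinkedHashMap<>();
        parsed.put("id", "42");
        parsed.put("onlyOnThisRecord", "some value");

        PartialEvent event = fromMap(parsed);
        event.extras.put("enriched", Boolean.TRUE); // enrichment step
        System.out.println(event.toMap());
    }
}
```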
  • a

    Aviv Dozorets

    03/19/2023, 2:26 PM
    Any help here would be very much appreciated: Flink 1.16.1, trying to filter out multiple metrics, e.g.:
    flink_taskmanager_job_task_operator_KinesisConsumer_stream_shardId_loopFrequencyHz
    flink_taskmanager_job_task_mailboxLatencyMs
    this is in flink-conf.yaml:
    metrics.reporter.prom.filter.excludes: "*:KinesisConsumer*, *:mailboxLatencyMs:*"
    However, when both values are present, 0 metrics are emitted. When only one is present, the filter is applied and the metric is filtered out. Looks like some syntax issue that I’m missing.
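One way to look at the value above is to split it the way the documented filter shape, `<scope>[:<name>[,<name>][:<type>[,<type>]]]`, suggests. The sketch below is a stand-alone illustration and not Flink's parser. It assumes that list-valued options separate their entries with `;` and that `,` only separates names or types inside one filter. The `MetricFilterSketch` class is hypothetical. If that reading is right, the configured string is a single filter whose name list contains a bare `*`, rather than two filters.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class MetricFilterSketch {

    /** One filter split into its three documented slots. */
    static final class Filter {
        final String scope;
        final List<String> names;
        final List<String> types;

        Filter(String scope, List<String> names, List<String> types) {
            this.scope = scope;
            this.names = names;
            this.types = types;
        }

        @Override
        public String toString() {
            return "scope=" + scope + " names=" + names + " types=" + types;
        }
    }

    /** Splits an option value into filters on ';' (assumed list separator), then into slots on ':'. */
    static List<Filter> parse(String optionValue) {
        List<Filter> filters = new ArrayList<>();
        for (String raw : optionValue.split(";")) {
            String[] parts = raw.trim().split(":");
            String scope = parts[0].trim();
            // Missing slots default to the wildcard; anything after the third slot is ignored here.
            List<String> names = parts.length > 1 ? splitList(parts[1]) : Collections.singletonList("*");
            List<String> types = parts.length > 2 ? splitList(parts[2]) : Collections.singletonList("*");
            filters.add(new Filter(scope, names, types));
        }
        return filters;
    }

    private static List<String> splitList(String slot) {
        List<String> items = new ArrayList<>();
        for (String item : slot.split(",")) {
            items.add(item.trim());
        }
        return items;
    }

    public static void main(String[] args) {
        // The value from the message: comma between the two intended filters.
        System.out.println(parse("*:KinesisConsumer*, *:mailboxLatencyMs:*"));
        // The same two filters separated by ';' instead.
        System.out.println(parse("*:KinesisConsumer*;*:mailboxLatencyMs:*"));
    }
}
```

Under these assumptions, separating the two entries with `;` would be the first thing to try.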
  • k

    Kiran Shivappa Japannavar

    03/20/2023, 6:03 AM
    I am running a Flink batch job on 1.16.1. The job reads from GCS, does a keyBy and a KeyedProcessFunction (for some business logic), and finally writes the output to the GCS sink. The job is able to read from GCS but fails in the second stage (keyBy and process function) with an exception:
  • k

    Kiran Shivappa Japannavar

    03/20/2023, 6:03 AM
    Caused by: org.apache.flink.util.FlinkRuntimeException: The sorter is closed already
    	at org.apache.flink.runtime.operators.sort.CircularQueues.take(CircularQueues.java:89)
    	at org.apache.flink.runtime.operators.sort.SorterInputGateway.writeRecord(SorterInputGateway.java:68)
    	at org.apache.flink.runtime.operators.sort.SorterInputGateway.writeRecord(SorterInputGateway.java:88)
    	at org.apache.flink.runtime.operators.sort.ExternalSorterBuilder$1.writeRecord(ExternalSorterBuilder.java:199)
    	at org.apache.flink.streaming.api.operators.sort.SortingDataInput$ForwardingDataOutput.emitRecord(SortingDataInput.java:172)
    	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    	at org.apache.flink.streaming.api.operators.sort.SortingDataInput.emitNext(SortingDataInput.java:193)
    	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
    	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    	at java.lang.Thread.run(Thread.java:750)
  • k

    Kiran Shivappa Japannavar

    03/20/2023, 6:04 AM
    Any pointers to debug this issue?
  • k

    kingsathurthi

    03/20/2023, 7:23 AM
    Hi all, while the Istio sidecar proxy is enabled we are getting the error below and the task manager keeps restarting. How can we fix this? How can we make the Istio proxy exclude the Akka resource manager communication?
    2023-03-16 15:32:14,402 INFO  org.apache.flink.runtime.filecache.FileCache                 [] - User file cache uses directory /tmp/flink-dist-cache-95e40d9c-1117-41d4-9633-ca1bf329f5b6
    2023-03-16 15:32:17,491 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Connecting to ResourceManager <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1(9600fae37136677bec744b7260164c38)>.
    2023-03-16 15:32:17,558 WARN  akka.remote.transport.netty.NettyTransport                   [] - Remote connection to [/192.168.121.75:6123] failed with java.io.IOException: Connection reset by peer
    2023-03-16 15:32:17,564 WARN  akka.remote.ReliableDeliverySupervisor                       [] - Association with remote system [<akka.tcp://flink@192.168.121.75:6123>] has failed, address is now gated for [50] ms. Reason: [Association failed with [<akka.tcp://flink@192.168.121.75:6123>]] Caused by: [The remote system explicitly disassociated (reason unknown).]
    2023-03-16 15:32:17,571 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not resolve ResourceManager address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>, retrying in 10000 ms: Could not connect to rpc endpoint under address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>.
    2023-03-16 15:32:27,589 WARN  akka.remote.transport.netty.NettyTransport                   [] - Remote connection to [/192.168.121.75:6123] failed with java.io.IOException: Connection reset by peer
    2023-03-16 15:32:27,589 WARN  akka.remote.ReliableDeliverySupervisor                       [] - Association with remote system [<akka.tcp://flink@192.168.121.75:6123>] has failed, address is now gated for [50] ms. Reason: [Association failed with [<akka.tcp://flink@192.168.121.75:6123>]] Caused by: [The remote system explicitly disassociated (reason unknown).]
    2023-03-16 15:32:27,590 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not resolve ResourceManager address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>, retrying in 10000 ms: Could not connect to rpc endpoint under address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>.
    2023-03-16 15:32:37,610 WARN  akka.remote.transport.netty.NettyTransport                   [] - Remote connection to [/192.168.121.75:6123] failed with java.io.IOException: Connection reset by peer
    2023-03-16 15:32:37,610 WARN  akka.remote.ReliableDeliverySupervisor                       [] - Association with remote system [<akka.tcp://flink@192.168.121.75:6123>] has failed, address is now gated for [50] ms. Reason: [Association failed with [<akka.tcp://flink@192.168.121.75:6123>]] Caused by: [The remote system explicitly disassociated (reason unknown).]
    2023-03-16 15:32:37,610 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not resolve ResourceManager address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>, retrying in 10000 ms: Could not connect to rpc endpoint under address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>.
    2023-03-16 15:32:47,629 WARN  akka.remote.transport.netty.NettyTransport                   [] - Remote connection to [/192.168.121.75:6123] failed with java.io.IOException: Connection reset by peer
    2023-03-16 15:32:47,629 WARN  akka.remote.ReliableDeliverySupervisor                       [] - Association with remote system [<akka.tcp://flink@192.168.121.75:6123>] has failed, address is now gated for [50] ms. Reason: [Association failed with [<akka.tcp://flink@192.168.121.75:6123>]] Caused by: [The remote system explicitly disassociated (reason unknown).]
    2023-03-16 15:32:47,629 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not resolve ResourceManager address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>, retrying in 10000 ms: Could not connect to rpc endpoint under address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>.
    2023-03-16 15:32:57,649 WARN  akka.remote.transport.netty.NettyTransport                   [] - Remote connection to [/192.168.121.75:6123] failed with java.io.IOException: Connection reset by peer
    2023-03-16 15:32:57,649 WARN  akka.remote.ReliableDeliverySupervisor                       [] - Association with remote system [<akka.tcp://flink@192.168.121.75:6123>] has failed, address is now gated for [50] ms. Reason: [Association failed with [<akka.tcp://flink@192.168.121.75:6123>]] Caused by: [The remote system explicitly disassociated (reason unknown).]
    2023-03-16 15:32:57,649 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not resolve ResourceManager address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>, retrying in 10000 ms: Could not connect to rpc endpoint under address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>.
    2023-03-16 15:33:07,668 WARN  akka.remote.transport.netty.NettyTransport                   [] - Remote connection to [/192.168.121.75:6123] failed with java.io.IOException: Connection reset by peer
    2023-03-16 15:33:07,669 WARN  akka.remote.ReliableDeliverySupervisor                       [] - Association with remote system [<akka.tcp://flink@192.168.121.75:6123>] has failed, address is now gated for [50] ms. Reason: [Association failed with [<akka.tcp://flink@192.168.121.75:6123>]] Caused by: [The remote system explicitly disassociated (reason unknown).]
    2023-03-16 15:33:07,669 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not resolve ResourceManager address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>, retrying in 10000 ms: Could not connect to rpc endpoint under address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>.
    2023-03-16 15:33:17,691 WARN  akka.remote.transport.netty.NettyTransport                   [] - Remote connection to [/192.168.121.75:6123] failed with java.io.IOException: Connection reset by peer
    2023-03-16 15:33:17,691 WARN  akka.remote.ReliableDeliverySupervisor                       [] - Association with remote system [<akka.tcp://flink@192.168.121.75:6123>] has failed, address is now gated for [50] ms. Reason: [Association failed with [<akka.tcp://flink@192.168.121.75:6123>]] Caused by: [The remote system explicitly disassociated (reason unknown).]
    2023-03-16 15:33:17,692 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not resolve ResourceManager address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>, retrying in 10000 ms: Could not connect to rpc endpoint under address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>.
    2023-03-16 15:33:27,710 WARN  akka.remote.transport.netty.NettyTransport                   [] - Remote connection to [/192.168.121.75:6123] failed with java.io.IOException: Connection reset by peer
    2023-03-16 15:33:27,710 WARN  akka.remote.ReliableDeliverySupervisor                       [] - Association with remote system [<akka.tcp://flink@192.168.121.75:6123>] has failed, address is now gated for [50] ms. Reason: [Association failed with [<akka.tcp://flink@192.168.121.75:6123>]] Caused by: [The remote system explicitly disassociated (reason unknown).]
    2023-03-16 15:33:27,711 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not resolve ResourceManager address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>, retrying in 10000 ms: Could not connect to rpc endpoint under address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>.
    2023-03-16 15:33:37,728 WARN  akka.remote.transport.netty.NettyTransport                   [] - Remote connection to [/192.168.121.75:6123] failed with java.io.IOException: Connection reset by peer
    2023-03-16 15:33:37,729 WARN  akka.remote.ReliableDeliverySupervisor                       [] - Association with remote system [<akka.tcp://flink@192.168.121.75:6123>] has failed, address is now gated for [50] ms. Reason: [Association failed with [<akka.tcp://flink@192.168.121.75:6123>]] Caused by: [The remote system explicitly disassociated (reason unknown).]
    2023-03-16 15:33:37,729 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not resolve ResourceManager address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>, retrying in 10000 ms: Could not connect to rpc endpoint under address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>.
    2023-03-16 15:33:47,749 WARN  akka.remote.transport.netty.NettyTransport                   [] - Remote connection to [/192.168.121.75:6123] failed with java.io.IOException: Connection reset by peer
    2023-03-16 15:33:47,749 WARN  akka.remote.ReliableDeliverySupervisor                       [] - Association with remote system [<akka.tcp://flink@192.168.121.75:6123>] has failed, address is now gated for [50] ms. Reason: [Association failed with [<akka.tcp://flink@192.168.121.75:6123>]] Caused by: [The remote system explicitly disassociated (reason unknown).]
    2023-03-16 15:33:47,750 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not resolve ResourceManager address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>, retrying in 10000 ms: Could not connect to rpc endpoint under address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>.
    2023-03-16 15:33:57,769 WARN  akka.remote.transport.netty.NettyTransport                   [] - Remote connection to [/192.168.121.75:6123] failed with java.io.IOException: Connection reset by peer
    2023-03-16 15:33:57,769 WARN  akka.remote.ReliableDeliverySupervisor                       [] - Association with remote system [<akka.tcp://flink@192.168.121.75:6123>] has failed, address is now gated for [50] ms. Reason: [Association failed with [<akka.tcp://flink@192.168.121.75:6123>]] Caused by: [The remote system explicitly disassociated (reason unknown).]
    2023-03-16 15:33:57,770 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not resolve ResourceManager address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>, retrying in 10000 ms: Could not connect to rpc endpoint under address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>.
    2023-03-16 15:34:07,788 WARN  akka.remote.transport.netty.NettyTransport                   [] - Remote connection to [/192.168.121.75:6123] failed with java.io.IOException: Connection reset by peer
    2023-03-16 15:34:07,789 WARN  akka.remote.ReliableDeliverySupervisor                       [] - Association with remote system [<akka.tcp://flink@192.168.121.75:6123>] has failed, address is now gated for [50] ms. Reason: [Association failed with [<akka.tcp://flink@192.168.121.75:6123>]] Caused by: [The remote system explicitly disassociated (reason unknown).]
    2023-03-16 15:34:07,789 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not resolve ResourceManager address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>, retrying in 10000 ms: Could not connect to rpc endpoint under address <akka.tcp://flink@192.168.121.75:6123/user/rpc/resourcemanager_1>.
    2023-03-16 15:34:17,808 WARN  akka.remote.transport.netty.NettyTransport                   [] - Remote connection to [/192.168.121.75:6123] failed with java.io.IOException: Connection reset by peer
  • t

    Tsering

    03/20/2023, 9:28 AM
    Is there any doc about the config of `build.gradle` when deploying a Flink app on KDA?
  • s

    Slackbot

    03/20/2023, 11:21 AM
    This message was deleted.
  • f

    Fredrik

    03/20/2023, 11:23 AM
    👋 I’m a newbie trying to figure out how parallelism is supposed to work using the K8s operator. I’m working based on the basic example. How is the `parallelism` setting in the `job` spec supposed to work? If I change the replica count on the `taskManager` or change `taskmanager.numberOfTaskSlots: "2"` but keep the `job.parallelism` setting as is, the parallelism of the job (according to the UI) changes. Changing `job.parallelism` does not seem to have an effect. I tried to read the documentation for the CRD but it did not clarify the issue. Am I misunderstanding something?
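As plain arithmetic, and not as a description of what the operator does: the slots a cluster offers are the TaskManager replica count times `taskmanager.numberOfTaskSlots`, and with default slot sharing a job typically needs as many slots as its parallelism. The `SlotMath` class and its method names are made up for illustration, and the sketch does not explain why `job.parallelism` appears to be ignored in this setup.

```java
final class SlotMath {

    /** Slots offered by the cluster: replicas times slots per TaskManager. */
    static int totalSlots(int taskManagerReplicas, int slotsPerTaskManager) {
        return taskManagerReplicas * slotsPerTaskManager;
    }

    /** TaskManagers needed to host a given parallelism, rounding up. */
    static int taskManagersNeeded(int parallelism, int slotsPerTaskManager) {
        return (parallelism + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        int slotsPerTaskManager = 2; // taskmanager.numberOfTaskSlots: "2" from the message
        System.out.println("slots with 3 replicas: " + totalSlots(3, slotsPerTaskManager));
        System.out.println("TaskManagers for parallelism 5: " + taskManagersNeeded(5, slotsPerTaskManager));
    }
}
```

If the parallelism shown in the UI always equals replicas times slots, the job may be taking whatever slots are available instead of the configured value, which is worth checking against the deployment mode and scheduler settings in use.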
  • m

    Mohit Aggarwal

    03/20/2023, 2:00 PM
    Hi, when I try to run a Flink job in HA mode with a GCS path as the HA directory (e.g. gs://flame-poc/ha), or when starting a job from checkpoints in GCS (the job is able to write checkpoints to GCS successfully), I get the following exception:
    2023-03-20 13:53:34,477 INFO  org.apache.flink.runtime.jobmanager.DefaultJobGraphStore     [] - Stopping DefaultJobGraphStore.
    2023-03-20 13:53:34,478 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error occurred in the cluster entrypoint.
    java.util.concurrent.CompletionException: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
    	at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
    	at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) [?:?]
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) [?:?]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
    	at java.lang.Thread.run(Unknown Source) [?:?]
    Caused by: java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2720) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:107) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:102) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:451) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:338) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:300) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:575) ~[?:?]
    	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getUgiUserName(GoogleHadoopFileSystemBase.java:1226) ~[?:?]
    	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.listStatus(GoogleHadoopFileSystemBase.java:858) ~[?:?]
    	at org.apache.flink.fs.gs.org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:170) ~[?:?]
    	at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.listStatus(PluginFileSystemFactory.java:141) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:180) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	... 4 more
    Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2712) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:107) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:102) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:451) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:338) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:300) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:575) ~[?:?]
    	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getUgiUserName(GoogleHadoopFileSystemBase.java:1226) ~[?:?]
    	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.listStatus(GoogleHadoopFileSystemBase.java:858) ~[?:?]
    	at org.apache.flink.fs.gs.org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:170) ~[?:?]
    	at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.listStatus(PluginFileSystemFactory.java:141) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:180) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	... 4 more
    Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback not found
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2592) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2686) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2712) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:107) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.<init>(Groups.java:102) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:451) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:338) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:300) ~[?:?]
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:575) ~[?:?]
    	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getUgiUserName(GoogleHadoopFileSystemBase.java:1226) ~[?:?]
    	at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.listStatus(GoogleHadoopFileSystemBase.java:858) ~[?:?]
    	at org.apache.flink.fs.gs.org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.listStatus(HadoopFileSystem.java:170) ~[?:?]
    	at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.listStatus(PluginFileSystemFactory.java:141) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:180) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    	at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[flink-dist-1.17-SNAPSHOT.jar:1.17-SNAPSHOT]
    Has anyone faced a similar issue?
  • t

    Thijs van de Poll

    03/20/2023, 2:03 PM
    Hi all, I am attempting to write to an Elasticsearch 8 cluster. Is it correct that a Flink connector for ES 8 is not available yet? And also that the ES 7 Flink connector is not compatible with Elasticsearch 8?
  • r

    Rafał Trójczak

    03/20/2023, 2:49 PM
    Hi, All! I have a question concerning the following info from Flink:
    15:05:22.357 [main] INFO  o.a.f.a.java.typeutils.TypeExtractor -- Field Person#hobbies will be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance and schema evolution.
    I have the following class:
    public class Person {
    
        private String name;
        private List<String> hobbies;
    
        public Person() {
        }
    
        public Person(String name, List<String> hobbies) {
            this.name = name;
            this.hobbies = hobbies;
        }
    // getters and setters
    }
    and I prepared this `TypeInformation`:
    TypeInformation<Person> personTypeInformation = Types.POJO(Person.class, Map.of(
        "name", Types.STRING,
        "hobbies", Types.LIST(Types.STRING)));
    I saw a few options that don't work for me:
    • `@TypeInfo(PersonSerializerFactory.class)` - but I can't use this approach because the `Person` class is in a different module.
    • the `returns` method, e.g.:
    env.fromCollection(persons)
       .returns(personTypeInformation)
    but this doesn't seem to remove the problem. How can I add this type information to the environment?
  • s

    Siddhesh Kalgaonkar

    03/20/2023, 6:09 PM
    Hi #C03G7LJTS2G, I know that PyFlink uses Java APIs under the hood, so does that mean it uses a JVM for each process? Or does it use a PVM? I didn't find anything about this, so I wanted to clarify how it works internally. Can somebody enlighten me on this? Any links would also help. TIA
  • u

    Usman Ismail

    03/20/2023, 9:37 PM
    Hi #C03G7LJTS2G, does anyone have experience using the OpenSearch Flink connector? I am trying to follow the guide here and am getting the following exception:
    Caused by: java.io.IOException: Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=<http://localhost:9200>, response=HTTP/1.1 200 OK}
    	at org.opensearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1942)
    	at org.opensearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:662)
    	at org.opensearch.client.RestClient$1.completed(RestClient.java:396)
    	at org.opensearch.client.RestClient$1.completed(RestClient.java:390)
    ....
    Caused by: java.lang.NullPointerException
    	at java.base/java.util.Objects.requireNonNull(Objects.java:208)
    	at org.opensearch.action.DocWriteResponse.<init>(DocWriteResponse.java:140)
    	at org.opensearch.action.index.IndexResponse.<init>(IndexResponse.java:67)
    This seems to be a known issue for Elasticsearch (https://github.com/elastic/elasticsearch/issues/84173), but I don’t have a clear path for OpenSearch.
  • s

    Simon Lawrence

    03/21/2023, 9:58 AM
    Hi all, I am currently looking at Flink deployment, with high availability using ZooKeeper as the goal. The documentation provides examples for deploying a Flink cluster running in session mode; however, I am wondering whether it is possible to deploy in application mode using ZooKeeper for high availability. If anyone could point me in the right direction that would be amazing. Thank you!
  • t

    Thijs van de Poll

    03/21/2023, 10:21 AM
    Hi all, I am trying to write to an OpenSearch instance using the flink-connector-opensearch connector. However, it fails with the following error: java.lang.ClassNotFoundException: org.opensearch.common.Strings. I am unsure what is causing it, because I have been loading the .jar dependency to ${FLINK_HOME}/lib. Can anyone help me out? 🙂
  • t

    Thijs van de Poll

    03/21/2023, 11:46 AM
    @Martijn Visser Is it correct that the DataStream API is compatible with OpenSearch 2.x and the Table API is not compatible with OpenSearch 2.x yet? I am currently using the Table API with OpenSearch 2.6, and I am getting the following error:
    Caused by: java.io.IOException: Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=http://host.docker.internal:9200, response=HTTP/1.1 200 OK}
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.opensearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1942)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.opensearch.client.RestClient$FailureTrackingResponseListener.onSuccess(RestClient.java:662)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.opensearch.client.RestClient$1.completed(RestClient.java:396)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.opensearch.client.RestClient$1.completed(RestClient.java:390)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:181)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
    jobmanager     |        ... 1 more
    jobmanager     | Caused by: java.lang.NullPointerException
    jobmanager     |        at java.base/java.util.Objects.requireNonNull(Unknown Source)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.opensearch.action.DocWriteResponse.<init>(DocWriteResponse.java:140)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.opensearch.action.update.UpdateResponse.<init>(UpdateResponse.java:86)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.opensearch.action.update.UpdateResponse$Builder.build(UpdateResponse.java:193)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.opensearch.action.update.UpdateResponse$Builder.build(UpdateResponse.java:181)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.opensearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:172)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.opensearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:208)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.opensearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:2075)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.opensearch.client.RestHighLevelClient.lambda$performRequestAsyncAndParseEntity$10(RestHighLevelClient.java:1836)
    jobmanager     |        at org.apache.flink.opensearch.shaded.org.opensearch.client.RestHighLevelClient$1.onSuccess(RestHighLevelClient.java:1940)
    jobmanager     |        ... 18 more
  • o

    Oscar Perez

    03/21/2023, 1:06 PM
    Hi, we are using the test harness to test Flink event time. I wonder how to use the processElement method in the test harness: should we pass the event time as the second parameter?
  • o

    Oscar Perez

    03/21/2023, 1:07 PM
    We are facing the problem that ctx.currentWatermark always returns Instant.MIN.toEpochTime(), even though we call processWatermark, with values, before and after calling processElement.
  • o

    Oscar Perez

    03/21/2023, 1:12 PM
    Also, the onTimer method is never called even though we register a timer in the processElement function. We suspect this is because ctx.currentWatermark is always Instant.MIN, no matter what.
  • o

    Oscar Perez

    03/21/2023, 1:19 PM
    As a matter of fact, testHarness.processElement can also be called with a StreamRecord, like this: testHarness.processElement(StreamRecord(event)). In this case, no processing time is set. I wonder what the use case for this is?
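For readers following the messages above, here is a small toy model in plain Python (not Flink's API; the class and method names are hypothetical) of the event-time behaviour being described: elements carry an explicit timestamp, processing an element does not advance the watermark, and an event-time timer fires only once a watermark at or beyond its timestamp is processed. In Flink itself, timers are only available on keyed streams, which may be worth checking in the harness setup.

```python
import heapq


class ToyEventTimeOperator:
    """Toy stand-in for an operator under test; hypothetical, not a Flink class."""

    def __init__(self, timer_delay):
        self.timer_delay = timer_delay
        self.current_watermark = None  # no watermark seen yet
        self.timers = []               # min-heap of pending event-time timers
        self.fired = []                # timestamps of timers that have fired

    def process_element(self, value, timestamp):
        # The element's event timestamp is passed explicitly, alongside the value.
        # Registering a timer does not fire it and does not move the watermark.
        heapq.heappush(self.timers, timestamp + self.timer_delay)

    def process_watermark(self, watermark):
        # Advancing the watermark is what fires every timer at or before it.
        self.current_watermark = watermark
        while self.timers and self.timers[0] <= watermark:
            self.fired.append(heapq.heappop(self.timers))


op = ToyEventTimeOperator(timer_delay=10)
op.process_element("event", timestamp=100)   # registers a timer for t=110
op.process_watermark(105)                    # too early, nothing fires
fired_early = list(op.fired)
op.process_watermark(110)                    # reaches the timer, it fires
print(fired_early, op.fired)
```

In this model the timer registered at t=100 fires only when the second watermark arrives, which is the ordering a harness-based test would normally need to reproduce.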
  • a

    Ari Huttunen

    03/21/2023, 1:44 PM
    I made this function for calculating the median with the t-digest library written in Java. It doesn't work 😭.
    package fi.elisa.datalake.flink.flinktools.aggregation;
    
    import org.apache.flink.table.api.*;
    import org.apache.flink.table.functions.AggregateFunction;
    import org.apache.flink.table.annotation.DataTypeHint;
    import static org.apache.flink.table.api.Expressions.*;
    
    import com.tdunning.math.stats.MergingDigest;
    
    public class MergingDigestMedian extends AggregateFunction<Double, MergingDigest> {
    
      @Override
      public @DataTypeHint("RAW") MergingDigest createAccumulator() {
        return new MergingDigest(100); // 100 is a common value for normal uses.
      }
    
      @Override
      public Double getValue(@DataTypeHint("RAW") MergingDigest acc) {
        return acc.quantile(0.5);
      }
    
      public void accumulate(@DataTypeHint("RAW") MergingDigest acc, Double value) {
        acc.add(value);
      }
    
    }
    I'm calling it in PyFlink code by defining it as a temporary system function.
    table_env.create_java_temporary_system_function("udf_median", "fi.elisa.datalake.flink.flinktools.aggregation.MergingDigestMedian")
    It fails. I'll put the logs in the comments.
  • r

    Raghunadh Nittala

    03/21/2023, 1:45 PM
    Hi team, can a temporal join be achieved on temporary views? I'm creating two tables using the 'kafka' and 'upsert-kafka' connectors respectively, and creating temporary views with some transformations on these tables. Finally, I join both temporary views, but this is an equi-join and not a temporal join. When I tried to do a temporal join, the client threw this exception:
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.flink.table.api.ValidationException: Temporal Table Join requires primary key in versioned table, but no primary key can be found
    The table has a primary key, but when a view is created on top of it, there is no primary key.
  • v

    Virender Bhargav

    03/21/2023, 4:34 PM
    Hey, I am trying to use the State Processor API to clean up an existing savepoint.
    ExistingSavepoint savepoint = Savepoint.load(bEnv, savepointPath, new EmbeddedRocksDBStateBackend());
    DataSet<KeyedState> keyedState = savepoint.readKeyedState("uid1", new CoreProcessStateReaderFunction());
    BootstrapTransformation<KeyedState> transformation = OperatorTransformation
                    .bootstrapWith(keyedState)
                    .keyBy(state -> state.id )
                    .transform(new StateBootstrapper());
    Savepoint.load(bEnv, savepointPath, new EmbeddedRocksDBStateBackend())
            .withOperator("uid1", transformation)
            .removeOperator("uid1")
            .write(newSavePointPath);
    The intent is to perform a "transformation" on the KeyedState and replace the older operator state with the modified one.
    • CoreProcessStateReaderFunction: KeyedStateReaderFunction to read the existing savepoint
    • StateBootstrapper: KeyedStateBootstrapFunction for state modification/cleanup
    I end up with an empty new savepoint (it has only a _metadata folder and nothing else). Can someone help me with what I might be doing wrong?
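One thing that may be worth checking in the snippet above is the order of the chained calls: withOperator("uid1", ...) is followed by removeOperator("uid1") for the same uid. The toy model below (plain Python, hypothetical names, not the State Processor API) assumes that such builder calls are applied in the order written. Under that assumption it shows why removing after adding would leave nothing to write, whereas removing first and then adding keeps the new state. Whether this is the actual cause here is an assumption.

```python
class ToySavepoint:
    """Hypothetical stand-in for a savepoint's operator-uid -> state mapping."""

    def __init__(self, operators):
        self.operators = dict(operators)

    def with_operator(self, uid, state):
        # Add (or replace) the state registered under this uid.
        self.operators[uid] = state
        return self

    def remove_operator(self, uid):
        # Drop whatever is currently registered under this uid.
        self.operators.pop(uid, None)
        return self


old = {"uid1": "old-state"}

# Same order as in the question: add the new state, then remove the same uid.
add_then_remove = (
    ToySavepoint(old).with_operator("uid1", "new-state").remove_operator("uid1")
)

# Reversed order: remove the old state first, then add the new one.
remove_then_add = (
    ToySavepoint(old).remove_operator("uid1").with_operator("uid1", "new-state")
)

print(add_then_remove.operators)
print(remove_then_add.operators)
```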
  • h

    Huaqiang Kang

    03/21/2023, 7:59 PM
    Is it possible to set auth-no-challenge in Flink?
  • a

    Amir Hossein Sharifzadeh

    03/21/2023, 9:15 PM
    I need help understanding how process functions work and debugging my code. Here is the code that acquires the DataStream; I need to pass the results to a process function for further data analysis.
    static void processWorkflow(
            StreamTableEnvironment tableEnv,
            DataStream<DataFileChunk> rawDataStream,
            DataStream<DataFileChunk> bkgdDataStream,
            String jsonCalibrationData
            ) {
        tableEnv.createTemporaryView("EMPAD_RAW_TBL", rawDataStream);
        tableEnv.createTemporaryView("EMPAD_BKGD_TBL", bkgdDataStream);
    
        String data_query = "select EMPAD_RAW_TBL.chunk_i as chunk_i, EMPAD_RAW_TBL.data as raw_enc_data, EMPAD_BKGD_TBL.data as bkgd_enc_data " +
                "FROM EMPAD_RAW_TBL join EMPAD_BKGD_TBL on EMPAD_RAW_TBL.chunk_i = EMPAD_BKGD_TBL.chunk_i";
    
        Table resultTable =
                tableEnv.sqlQuery(data_query);
    
        DataStream<Row> resultStream = tableEnv.toChangelogStream(resultTable);
    and this is the implementation of my ProcessFunction:
    package org.varimat.process;
    
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Collector;
    
    public class EMPADProcessor extends ProcessFunction<Row, Row> {
    
        @Override
        public void processElement(Row row, ProcessFunction<Row, Row>.Context context,
                                   Collector<Row> collector) throws Exception {
    
            // process row here...
            System.out.println(row.getField(0));
            String rawObject = String.valueOf(row.getField(1));
            String bkgdObject = String.valueOf(row.getField(2));
        }
    }
    When I run my application, it never stops and prints duplicated data of row.getField(0) in the processElement method. Is there any suggestion? In other words, do I need to implement the EMPADProcessor class differently?
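A possible reading of the symptoms above, sketched as a toy model in plain Python (hypothetical names, not Flink's implementation): a regular join over two unbounded streams keeps both sides in state and emits a result row whenever a new row finds a match. The job therefore does not terminate on its own, and if a chunk_i value appears more than once on either side, every pairing is emitted. Whether the inputs here actually contain repeated chunk_i values is an assumption.

```python
from collections import defaultdict


class ToyStreamingJoin:
    """Toy incremental inner join on a key; illustrative only."""

    def __init__(self):
        self.left = defaultdict(list)    # rows seen so far on the raw side
        self.right = defaultdict(list)   # rows seen so far on the background side

    def on_left(self, key, value):
        self.left[key].append(value)
        # Emit one joined row per matching row already seen on the other side.
        return [(key, value, other) for other in self.right[key]]

    def on_right(self, key, value):
        self.right[key].append(value)
        return [(key, other, value) for other in self.left[key]]


join = ToyStreamingJoin()
emitted = []
emitted += join.on_left(1, "raw-a")
emitted += join.on_right(1, "bkgd-a")   # first match for chunk_i = 1
emitted += join.on_left(1, "raw-b")     # second raw row with the same chunk_i
emitted += join.on_right(1, "bkgd-b")   # pairs with both raw rows

for row in emitted:
    print(row)
```

Here two rows per side yield four output rows, all carrying chunk_i = 1 in the first field.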
  • b

    Ben Thomson

    03/21/2023, 9:24 PM
    Does anyone know the correct format for submitting program arguments via the dashboard?
  • t

    Thijs van de Poll

    03/22/2023, 6:36 AM
    Hi all! I am trying to understand whether windows are relevant for me. My goal is to sync a Postgres database (source) with Elasticsearch (sink) while applying transformations to the data. I have the following conceptual problem:
    • source A: CDC
    • source B: CDC
    • source C: CDC
    • table D: D = select * from A left join B on a.pk=b.fk left join C on a.pk=c.fk
    • table E: E = select <some aggregations> from D group by D.group_key
    • table F: F = select *, <some expensive transformations> from E
    • F gets inserted in Elasticsearch.
    So what I notice is that because of the left joins, and some events arriving later than others, table F gets updated a couple of times for the same key, which is logical, I think. However, since the transformations made to create F are expensive, they are recomputed over and over again. For me it would be better if there were some sort of timed buffer in which events for E could accumulate, so that the expensive transformations do not need to be calculated on every event updating E. I am trying to understand whether windows can help me here, but I am unsure. Also, it is very important that the aggregations in E contain all of the group elements. Thanks!
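The "timed buffer" idea described above can be sketched as a toy model in plain Python (hypothetical names, not a Flink API): updates are coalesced per key, and the expensive transformation runs once per key at flush time rather than once per incoming update. Flink SQL has mini-batch options in this spirit (table.exec.mini-batch.enabled, table.exec.mini-batch.allow-latency, table.exec.mini-batch.size); whether they fit this pipeline is not something the sketch establishes.

```python
class CoalescingBuffer:
    """Keeps only the latest value per key until flush() is called."""

    def __init__(self, expensive_transform):
        self.expensive_transform = expensive_transform
        self.pending = {}

    def update(self, key, value):
        # A later update for the same key overwrites the earlier one.
        self.pending[key] = value

    def flush(self):
        # Run the expensive step once per key, then clear the buffer.
        results = {k: self.expensive_transform(v) for k, v in self.pending.items()}
        self.pending.clear()
        return results


calls = []


def expensive(value):
    calls.append(value)          # record how often the costly step runs
    return value * 2


buf = CoalescingBuffer(expensive)
for key, value in [("g1", 1), ("g1", 2), ("g2", 5), ("g1", 3)]:
    buf.update(key, value)

flushed = buf.flush()            # in a real job this would be driven by a timer
print(flushed, len(calls))
```

Four updates arrive, but the costly step runs only twice, once for each distinct key, using the latest value seen for that key.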
  • c

    chenlu

    03/22/2023, 9:00 AM
    Hi team, I have a question I need to confirm: no matter what version of Flink, its 8082 port (HistoryServer) will not show running jobs?