https://flink.apache.org/ logo
Join Slack
Powered by
# troubleshooting
  • a

    Adrian Chang

    06/22/2023, 3:57 PM
    Hello, I noted PyFlink 1.17.0 is logging secrets variables on this line
    Copy code
    INFO  org.apache.flink.client.python.PythonEnvUtils [] - Starting Python process with environment variables:
    I am using Flink Kubernetes Operator v1.5.0 and setting the secrets using secrets Kubernetes section in the yml file. Is there any way to prevent Flink logging sensitive information ? Thanks
    • 1
    • 1
  • c

    Cmx Uni

    06/22/2023, 5:07 PM
    Hello everyone, I have a question regarding the watermark "awareness" of sinks. I have a source that generates elements with monotonous event-time and therefore I use the built-in
    WatermarkStrategy*.*forMonotonousTimestamps*()*
    for watermark generation. Right after the sourceStream, the elements are sinked into Kafka. My expectation was that the sink would not emit the elements with the latest timestamp because the current watermark is
    latestTimestamp - 1
    , but the latest elements are indeed sinked to kafka before the watermark is emitted. Are sinks not aware of watermarks? Do I need to write a dummy ProcessFunction that holds these elements back? Thanks! Sorry, I could not find anything explaining this in the docs..
    m
    • 2
    • 2
  • r

    Rajat Ahuja

    06/22/2023, 6:47 PM
    Hi team, i am want to set up a session cluster on k8s and then expose a simple SQL UI where customers can write SQL query and set up streaming job. is that possible to achieve so that customers do not need to build image/jar and run the job on k8s ?
    • 1
    • 1
  • c

    Cameron Miller

    06/22/2023, 11:21 PM
    Hi everyone, I'm currently working on a project where we've deployed a Flink Session using the Flink Kubernetes Operator. We are trying to run a multi-language Beam pipeline on the cluster (pipeline is primarily using the Python SDK, but also uses the ReadFromKafka and WriteToKafka PTransforms which require a Java expansion service). We've successfully deployed this pipeline on the cluster, but are unclear about the best way to approach task manager autoscaling. Is there a recommended way to handle this when using a session cluster? Or alternatively, is there a way to deploy a multi-language beam pipeline as an Application deployment?
  • s

    Sumit Nekar

    06/23/2023, 7:11 AM
    Hi Everyone, I am using flink:1.17.1 docker image for my deployment. I want to take heap dump and I dont see jcmd or jmpa utilities bundled with the openjdk installation available with this image. What are other ways to take heap dump of TM pod in k8s based deployment?
  • a

    Amenreet Singh Sodhi

    06/23/2023, 9:55 AM
    Hi team, I am deploying Flink cluster on kubernetes in application mode. 1. Is there any default metrics for denoting the restart of the pod of jobmanager/taskmanager. I saw there is metrics for job restart, but I am specifically looking for if there is any metrics for pod restart. 2. Is there any metrics for jobmanager/taskmanager pods is running successfully(like heartbeat or something). I saw there is metrics for running time, but looking for something similar to pod heartbeat. Thanks
    h
    • 2
    • 2
  • a

    Arun Karthick

    06/23/2023, 11:45 AM
    Hi, Looking to consume from kafka topic using Flink but not sure about the schema used in kafka topic. Is there a way to consume the topic without knowing its schema for deserialization? Or use generic schema instead?
    p
    • 2
    • 1
  • p

    Pankaj Singh

    06/23/2023, 1:50 PM
    Hi, Whats the best way to measure latency (via prometheus metrics with custom labels) from any operator (in pyflink)? I see that histogram is not supported in python https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#histogram
    • 1
    • 1
  • r

    Richard Diekema

    06/23/2023, 10:04 PM
    For those running Flink applications on AWS with the Kinesis Data Analytics 1.13.2 runtime, I have a job with a KeyedProcessFunction that registers a processing time timer in the processElement method. The timer fires as expected when running unit tests with the test harness, when running the job in the LocalEnvironment, and as a JAR in a Flink docker cluster, but it does not fire when deployed as a KDA app on AWS. I see events being received by it in the job graph, but nothing is emitted. Any insight would be appreciated.
  • l

    Leon Xu

    06/24/2023, 4:16 AM
    Hi Flink users, I am trying to use Adaptive Scheduler to auto scale our Flink streaming jobs (NOT batch job). Our jobs are running on YARN with application mode. There isn't much docs around how adaptive scheduler works. So I have some questions: 1. How does Adaptive Scheduler work with YARN/Application mode? If the scheduler decides to request more tasks will it trigger the request to YARN while the job is already running 2. What's the evaluation criteria to trigger a scale-up ? Is it possible to manually trigger a scale-up for testing purpose?
    ➕ 3
    • 1
    • 1
  • o

    Oscar Perez

    06/24/2023, 8:48 PM
    hei, I am interested in running component test with flink. my approach is to use testcontainer to spin a kafka instance and then execute the job that connects to this kafka instance. the problem is that given that is an unbounded stream, the job running in the minicluster keeps executing and I cannot assert anything. One possibility would be to start the job in a different thread but I wonder if there is a better way to do component tests with flink. Thanks in advance!
  • s

    Season

    06/25/2023, 5:59 AM
    I am trying flink on k8s high availablily, start using flink application mode, k8s deployment pod-template-file, but when i want to update the jar in pod-template-file, i found i still run the old logic, the initial container download the jar, but it not execute, still use old logic, how to fix this.
    d
    • 2
    • 2
  • s

    Sumit Nekar

    06/26/2023, 4:16 AM
    Hello Team, We have deployed a flink job in application mode using flink k8s operator. I see that flink job manager memory usage is increasing and reaching 95%. Heap usage of JM is around 60% though. What else could cause increase JM memory usage.
    d
    • 2
    • 2
  • a

    Abhishek Joshi

    06/26/2023, 6:35 AM
    Hi Team, I am using context.timerService() in keyedProcessFunction to schedule the timer for a future time and then onTimer() will be called at a scheduled time. Here I'm using Kubernetes PVC for RocksDB as the state backend. Now when we schedule the timer for a future time and in-between if our pod get down and restarted again, the pod will not be able to trigger the onTimer() function at the scheduled time, even though we use PV for RocksDB as state backend. As per documentation, the timer will get restored when the pod gets restarted. Can anyone help me with this? Thanks.
    g
    m
    • 3
    • 11
  • h

    HJK nomad

    06/26/2023, 6:56 AM
    bro,how to do this...somebody help🙏
    e
    • 2
    • 1
  • y

    Yaroslav Bezruchenko

    06/26/2023, 10:09 AM
    I have a Flink Job with keyBy processing functions. Problem is that it's unevenly distributed. Unfortunately, I can't know for sure which key will have more data. Is there a way to make a better distribution between slots?
    s
    • 2
    • 1
  • t

    Tong Yue

    06/26/2023, 10:34 AM
    Why flink run command don't support submit job with s3 main jar file and s3 dependency jar (with classpath command -C )?
  • c

    Cheguri Vinay Goud

    06/26/2023, 11:01 AM
    Hello, I want to show the life cycle state of flink deployments in a grafana dashboard (datasource: prometheus). Is there any way to include the flinkDeployment name in the metric? I've followed this while setting up observability in the Kubernetes Cluster (using apache flink operator to manage flink deployments).
  • o

    Oscar Perez

    06/26/2023, 11:04 AM
    hei, one question regarding integration tests and MiniClusterWithClientResource. I have implemented integration tests with and without MiniClusterWithClientResource and both work so that makes me wonder on why is this even needed? I see that in the documentation we need to use MiniClusterWithClientResource for integration test but is it mandatory? the integration tests pass even though I dont use this cluster so I wonder what I miss if I dont use this class. Thanks!
  • e

    Eric Shieh

    06/26/2023, 7:05 PM
    Hi, I have a simple data ingestion job
    FileSource
    (S3) ->
    KafkaSink
    with a
    JobListner.onJobExecuted
    to "mv" the source file to a new folder/bucket (cleanup logic). When the job runs, one or more of the fetchers throw
    FileNotFoundException
    indicating the source file is missing; if I remove the
    JobListener
    code, the job completes successfully. It appears that the source file was moved to the new folder/bucket while the
    SplitFetcher
    is still polling for the records. Is there a way to perform clean-up logic after all streaming tasks have been completed? ps. this seems to occur when running in session mode (1 JobManager, 1 TaskManager); when I ran it in my IDE, the job completes successfully with the cleanup logic.
  • a

    Amenreet Singh Sodhi

    06/26/2023, 7:21 PM
    Hi team, I am deploying Flink cluster on kubernetes in application mode. I am passing flink-conf.yaml in flink-configmap, and it contains all the necessary flink configuration parameters(like TM/JM memory params, TM slots, etc). How do i upgrade the flink configuration when the flink cluster is already deployed? I tried using helm upgrade, the changes get reflected in the flink-conf.yaml file, but the actual flink configuration doest change. (eg- i deployed with 4 TM slots, changed it to 8 via helm upgrade, the file got updated, and not the actual deployment). Any workaround for this? I am looking for a way where i dont need to redeploy(helm delete/install) and use helm upgrade directly.
  • f

    Faisal A. Siddiqui

    06/26/2023, 7:48 PM
    Hi, I have configured
    akka.client.timeout: 10min
    and
    akka.ask.timeout:60 s
    but still seeing this error. Am i missing something ??
    Copy code
    java.lang.Exception: Cannot deploy task GroupAggregate[109] -> Calc[110] (5/8) (47e7dd1ba4f795e0dc9357f834c09f55_0122d1f6290e823b79ad614da62c0d8f_4_33) - TaskManager (xxx.xx.x.xx:6122-ca433e @ xxx-xx-x-xx.flink-taskmanager.xxx-flink-xxxx.svc.cluster.local (dataPort=33833)) not responding after a rpcTimeout of 10000 ms
    ...
    ...
    ....
    Caused by: java.util.concurrent.TimeoutException: Invocation of [RemoteRpcInvocation(TaskExecutorGateway.submitTask(TaskDeploymentDescriptor, JobMasterId, Time))] at recipient [<akka.tcp://flink@xxx.xx.x.xx:6122/user/rpc/taskmanager_0>] timed out. This is usually caused by: 1) Akka failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase akka.ask.timeout.
    	at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:59)
    s
    • 2
    • 2
  • s

    Saj Yogeswaran

    06/27/2023, 12:16 AM
    Hi. I’ve got two streams, the first has basic messages and the other has IDs. There isn’t any sort of relation between the two that requires keying logic. If I want to randomly pair an entry from each of the streams to produce a tuple2 that I later processed, would a keyedCoProcessFunction be the right way to go about joining them?
    • 1
    • 2
  • f

    Faisal A. Siddiqui

    06/27/2023, 4:47 AM
    Hi, I noticed in my application all raw tables are created with
    'properties.group.id' = 'testGroup'
    and i noticed this consumer group is quite behind in Kafka. Q#1: Is this information reliable ?? Q#2: What is the purpose of this property when checkpointing is available?
  • a

    Adam Richardson

    06/27/2023, 6:05 AM
    My company is in the process of rebuilding some of our batch Spark-based ETL pipelines in Flink. We use protobuf to define our schemas. One major challenge is that Flink protobuf deserialization has some semantic differences with the ScalaPB encoders we use in our Spark systems. This poses a serious barrier for adoption as moving any given dataset from Spark to Flink will potentially break all downstream consumers. I have a long list of feature requests in this area: 1. Support for mapping protobuf optional wrapper types (StringValue, IntValue, and friends) to nullable primitive types rather than RowTypes 2. Support for mapping the protobuf Timestamp type to a real timestamp rather than RowType 3. A way of defining custom mappings from specific proto types to custom Flink types (the previous two feature requests could be implemented on top of this lower-level feature) 4. Support for nullability semantics for message types (in the status quo, an unset message is treated as equivalent to a message with default values for all fields, which is a confusing user experience) 5. Support for nullability semantics for primitives types (in many of our services, the default value for a field of primitive type is treated as being equivalent to unset or null, so it would be good to offer this as a capability in the data warehouse) Would Flink accept patches for any or all of these feature requests? We're contemplating forking flink-protobuf internally, but it would be better if we could just upstream the relevant changes. (To my mind, 1, 2, and 4 are broadly applicable features that are definitely worthy of upstream support. 3 and 5 may be somewhat more specific to our use case)
    s
    • 2
    • 2
  • b

    Boris Litvak

    06/27/2023, 7:53 AM
    Stateful Flink SQL processing question. I remember reading a while back about: Flink operators unable to pick up the state from the savepoints in an event the SQL changed by the developer and the query optimizer changed the execution plan. Is this so/still the case?
    m
    • 2
    • 1
  • y

    Yang osmondy

    06/27/2023, 8:46 AM
    Hi Team, I'm using Flink SQL to sync from Mongo to Hive. but the Mongo collection has 2.8 billion+ data, how can i spend less time to sync data?
    a
    m
    • 3
    • 7
  • c

    Carlos Santos

    06/27/2023, 10:50 AM
    I'm using flink 1.17.1 and can't find the classes expressed on this documentation: https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/datagen/ https://nightlies.apache.org/flink/flink-docs-release-1.17/api/java//org/apache/flink/connector/datagen/source/GeneratorFunction.html Any tips? I'm trying to mock a kafka source for a test.
    m
    • 2
    • 3
  • g

    Gintaras Matulas

    06/27/2023, 11:44 AM
    Hi. We are using flink 1.17.0 version and recently got this error for kafka sink. And from the ticket I don’t see any workaround for this issues https://issues.apache.org/jira/browse/FLINK-25920 Any ideas for the workaround ?
    Copy code
    java.lang.UnsupportedOperationException: Currently it is not supported to update the CommittableSummary for a checkpoint coming from the same subtask. Please check the status of FLINK-25920
    m
    • 2
    • 4
  • p

    Prathit Malik

    06/27/2023, 12:38 PM
    Hi Team We are using EMR for our Flink workload. Flink version : 1.16.0. EMR version :
    emr-6.10.0
    Our EMR currently contains only master & core nodes. We are trying to use Task Node but we are facing an issue as listed here. Error :
    Caused by: java.nio.file.AccessDeniedException: /var/lib/flink
    Since the stackoverflow question has no resolution for the same, can someone check if they faced the same issue & were able to resolve it ?
    s
    • 2
    • 1
1...909192...98Latest