# troubleshooting

    Jaehyeon Kim

    02/23/2023, 3:52 AM
Hello, I'm teaching myself the PyFlink Table API. I created a script that aggregates revenue by seller_id, where a tumbling window is set up using event time. The source table is a Kafka topic that holds fake sales items like the one below.
    Copy code
    {
      "seller_id": "LNK",
      "quantity": 5,
      "sale_ts": 1677124093560,
      "name": "Toothpaste",
      "product_price": 4.99
    }
The issue is that it fails to write the aggregated output to the sink tables when the window is based on event time (evttime) - it just hangs forever. On the other hand, it works when the window is based on processing time (proctime), and I'm quite confused. Can you please check the attached script and let me know how to make it work?
    s4_12_eventtime_windows.py
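For context: event-time windows only fire once the watermark passes the window end, so a missing or stalled watermark makes the job look stuck while processing-time windows still fire. A minimal PyFlink sketch of a watermark declaration for a source like the one above (the table and connector details are assumptions, not taken from the attached script):
Copy code
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Hypothetical DDL (topic/connector details assumed); the watermark
# declaration is the essential part.
t_env.execute_sql("""
    CREATE TABLE sales (
        seller_id STRING,
        quantity INT,
        name STRING,
        product_price DOUBLE,
        sale_ts BIGINT,
        -- derive an event-time column from the epoch-millis field
        evttime AS TO_TIMESTAMP_LTZ(sale_ts, 3),
        -- windows fire once the watermark (max evttime - 5s) passes the window end
        WATERMARK FOR evttime AS evttime - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'sales',
        'properties.bootstrap.servers' = 'localhost:9092',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    )
""")

# If some Kafka partitions receive no data, the watermark cannot advance;
# marking idle sources lets windows still fire.
t_env.get_config().set("table.exec.source.idle-timeout", "5 s")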

    Mingfeng Tan

    02/23/2023, 6:29 AM
Hi team, when I upgrade Flink jobs to 1.16, many of the jobs fail to perform further deployments/upgrades due to the error
Caused by: org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
I have tried increasing the Kafka request timeout for the consumer, but it has no effect and the issue is still there. The Flink jobs run in standalone mode and are managed by Apache Flink operator 1.3.1. During an upgrade, the stop-with-savepoint operation hits the error below, but manually triggered savepoints are totally fine. A previous discussion mentioned that this can be resolved by switching from the Kafka consumer/producer to the Kafka source and sink, but we have > 50 Flink jobs, so changing all of them will take time. Does anybody know of a workaround for this issue? Thanks!
    Copy code
    2023-02-22 22:25:42
    org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
    	at org.apache.flink.runtime.messages.checkpoint.SerializedCheckpointException.unwrap(SerializedCheckpointException.java:51)
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1036)
    	at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
    	at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
    	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    	at java.base/java.lang.Thread.run(Unknown Source)
    Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: org.apache.flink.runtime.checkpoint.CheckpointException: Task name with subtask : Source: Event Source -> Extract Value From Kafka Message -> Add Datamart Metadata -> Filter Datamart Orgs And Stores (1/1)#2 Failure reason: Task has failed.
    	at org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1375)
    	at org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1318)
    	at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
    	at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
    	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
    	at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
    	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:344)
    Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
    	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
    	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
    	at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source)
    	... 3 more
    Caused by: org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException: org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
    	at org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177)
    	at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164)
    	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945)
    	at org.apache.flink.streaming.api.operators.StreamSource.stop(StreamSource.java:128)
    	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.stopOperatorForStopWithSavepoint(SourceStreamTask.java:306)
    	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.lambda$triggerStopWithSavepointAsync$1(SourceStreamTask.java:286)
    	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:367)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
    	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    	at java.base/java.lang.Thread.run(Unknown Source)

    Matt Fysh

    02/23/2023, 11:55 AM
In PyFlink, is there a way to perform gzip decompression on events coming from a Kinesis or Kafka source? Both of these require a SerDe schema to be passed through, and the only one available is SimpleStringSchema, which expects UTF-8 encoding. That does not work for other kinds of binary messages.
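One workaround sketch, not an official API: this assumes SimpleStringSchema accepts a charset (as in the Java API). ISO-8859-1 maps every byte value to exactly one code point, so binary payloads survive the string round-trip losslessly and can be gunzipped in a downstream map function:
Copy code
import gzip

from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types

# Assumption: SimpleStringSchema takes a charset argument, and ISO-8859-1
# round-trips arbitrary bytes one-to-one.
schema = SimpleStringSchema('ISO-8859-1')

def gunzip(raw: str) -> str:
    # Recover the original bytes, then decompress the gzip payload.
    return gzip.decompress(raw.encode('iso-8859-1')).decode('utf-8')

# ds = ...  # a Kafka/Kinesis source built with `schema`
# ds.map(gunzip, output_type=Types.STRING())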

    Sumit Nekar

    02/23/2023, 12:16 PM
Hello, why is the default value of taskmanager.memory.task.off-heap.size set to 0? I want to better understand how Flink uses task off-heap memory. Can setting a lower value for off-heap result in OOMKilled errors in a native Kubernetes deployment? Thanks

    Giannis Polyzos

    02/23/2023, 4:16 PM
I'm having a hard time understanding some behavior with temporal joins. Let's assume two Kafka topics - an append-only one and a changelog, i.e. a compacted one and a non-compacted one. Up until now, if I wanted to join these two tables, I used a regular join like
    Copy code
    SELECT *
    FROM orders
    JOIN products
    ON orders.productId = products.id
What I realised is that it's supposedly best practice to use a temporal join for this, which means changing my query to something like this:
    Copy code
    SELECT *
    FROM orders
    JOIN products FOR SYSTEM_TIME AS OF orders.orderTime_ltz
    ON orders.productId = products.id
What I'm trying to understand is: 1. How is it different, and what does SYSTEM_TIME AS OF actually enforce? 2. Why does the first query work and emit results, while when I specify SYSTEM_TIME AS OF I get no output?
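For what it's worth, the two queries have different semantics: the regular join keeps both sides in state indefinitely and re-emits whenever either side changes, while FOR SYSTEM_TIME AS OF orders.orderTime_ltz joins each order against the product version that was valid at that time. The temporal join requires the build side to be a versioned table (primary key plus watermark), and results are only emitted as the watermarks of both inputs advance - a stalled watermark is the usual reason for seeing no output. A sketch of the versioned side (names and connector details are assumptions):
Copy code
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Hypothetical versioned table: PRIMARY KEY and WATERMARK are both required,
# and `orders` needs a watermark on orderTime_ltz as well, since rows are
# only emitted once the watermarks of BOTH sides pass the order's timestamp.
t_env.execute_sql("""
    CREATE TABLE products (
        id STRING,
        price DOUBLE,
        -- version timestamp taken from the Kafka record timestamp
        updated_at TIMESTAMP_LTZ(3) METADATA FROM 'timestamp',
        PRIMARY KEY (id) NOT ENFORCED,
        WATERMARK FOR updated_at AS updated_at - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'products',
        'properties.bootstrap.servers' = 'localhost:9092',
        'key.format' = 'json',
        'value.format' = 'json'
    )
""")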

    Emmanuel Leroy

    02/23/2023, 5:35 PM
Does anyone know what the part size is when using FileSink to S3, where files are split up into multipart uploads? How is this determined?

    Christos Hadjinikolis

    02/23/2023, 9:29 PM
Does anyone know how to use PyFlink to read from and write to a Kafka topic, using schemas to write and a schema registry to read?
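One way to sketch this is with the Table API's avro-confluent format (the broker and registry addresses below are placeholders): the registry is consulted when reading, and records written through the same table are serialized with a registered schema:
Copy code
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Placeholder broker/registry addresses and a hypothetical schema.
t_env.execute_sql("""
    CREATE TABLE events (
        id STRING,
        payload STRING
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'events',
        'properties.bootstrap.servers' = 'localhost:9092',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'avro-confluent',
        'avro-confluent.url' = 'http://localhost:8081'
    )
""")
# Read:  t_env.sql_query("SELECT * FROM events")
# Write: t_env.execute_sql("INSERT INTO events VALUES ('1', 'hello')")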

    Prithvi Dammalapati

    02/23/2023, 10:46 PM
Hello, I have a Flink application on K8s (using the Flink K8s operator) with ~100 operators and a parallelism of 200, producing 30TB of state on S3. On restarting the application, the checkpoint is restored from S3 pretty quickly, but the full checkpoint takes a long time to complete. We've noticed that the Network Out to S3 is very low compared to the Network In. Experimenting with the following settings in flink-conf.yaml did not produce the expected results:
    Copy code
s3a.attempts.maximum: "25"
s3a.threads.max: "512"
s3a.threads.keepalivetime: "240"
s3a.connection.maximum: "2048"
s3a.multipart.size: "100M"
s3a.fast.upload.active.blocks: "8"
s3a.multipart.purge: "true"
    Any recommendations to expedite the checkpointing to S3? TIA

    Zhiyu Tian

    02/24/2023, 12:43 AM
Hello team, I am running a simple Flink KafkaSource job, but it stops fetching data from the Kafka topic after fetching a few records, and no error message is printed in the TaskManager. Could you help with this? Any thoughts? Flink runtime: 1.6.0, flink-connector-kafka: 1.6.0, Kafka client: 3.1.1 (to match the version of the Kafka broker).
Copy code
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final String kafkaBrokers = "xxxxx:xx";
final String topicName = "xxxx";
final String consumerGroup = "Flink-justprint-group-12345";

KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers(kafkaBrokers)
        .setTopics(topicName)
        .setGroupId(consumerGroup)
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

env.getCheckpointConfig().disableCheckpointing();

DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
stream.map(item -> item.length() % 5)
        .map(item -> DateTime.now().toString() + ":" + item)
        .print();
stream.executeAndCollect();

    Amir Hossein Sharifzadeh

    02/24/2023, 12:59 AM
In our current project we only have one topic, and we want to create two or more tables based on that one topic, distributing the columns between the tables. Is there a way to assign multiple tables to one topic?
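For reference, nothing ties a topic to a single table: each CREATE TABLE is just a schema projected over the topic, so several tables can point at the same topic with different column subsets. A sketch with hypothetical names (fields not listed in a table's schema are simply ignored by the json format):
Copy code
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Hypothetical table names, both reading the same 'sales' topic.
ddl = """
    CREATE TABLE {name} ({columns}) WITH (
        'connector' = 'kafka',
        'topic' = 'sales',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = '{name}-reader',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'json'
    )
"""
# Two tables over one topic, each exposing a different column subset.
t_env.execute_sql(ddl.format(name='seller_sales', columns='seller_id STRING, quantity INT'))
t_env.execute_sql(ddl.format(name='product_prices', columns='name STRING, product_price DOUBLE'))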

    ding bei

    02/24/2023, 4:14 AM
Hey guys, I have a question about the plugins mechanism, specifically the hadoop-s3 plugin:
Copy code
1.  My app runs on Kubernetes managed by the Flink Kubernetes operator. I use the env var ENABLE_BUILT_IN_PLUGINS to link the hadoop-s3 plugin to where it is supposed to be, and I exec'd into the pod and confirmed flink-s3-fs-hadoop-1.15.2.jar is in the plugins/hadoop-s3 folder.

2.  My app reads proto records from Kafka, so I wrote a deserializer that calls a Hadoop-related class function, though not directly - through a very deep call chain.

3.  However, it reports NoClassDefFoundError: org/apache/hadoop/conf/Configuration, and the stack trace leads back to the deserializer.

4.  Then I added the hadoop dependency to my application pom and packaged a fat jar, and guess what, it worked.
So can somebody please explain this to me? I have read the Flink docs on plugins and classloading many times and still have no clue.

    Aditya

    02/24/2023, 5:00 AM
Hi team, is there a way I can directly join streamA with a dynamic table without creating a dynamic table for streamA itself?
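One sketch of the usual shortcut (all names here are hypothetical): a DataStream can be registered as a temporary view in one call, with no connector DDL, and then joined in SQL. Under the hood this still creates a (virtual) table for streamA, which may or may not be what you want to avoid:
Copy code
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Stand-in for the real streamA (hypothetical shape: id, label).
stream_a = env.from_collection(
    [(1, 'a'), (2, 'b')],
    type_info=Types.TUPLE([Types.INT(), Types.STRING()]),
)

# Register the stream as a temporary view; no separate table definition needed.
t_env.create_temporary_view("streamA", stream_a)
# `dynamic_table` is assumed to already exist in the catalog.
result = t_env.sql_query(
    "SELECT * FROM streamA AS a JOIN dynamic_table AS d ON a.f0 = d.id"
)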

    Rashmin Patel

    02/24/2023, 6:24 AM
Hello all, I am reading a CDC stream (strictly ordered within each partition) from a Kafka topic (with 5 partitions). During the streaming phase, the watermark advances correctly, but during the backfill phase (i.e. reading historical data from, say, the last 7 days of the topic), the watermark seems to advance abruptly, without first processing messages with ts less than the current watermark. I am using a partition-aware watermark strategy (max bounded out-of-orderness) with env.fromSource[Option[T]](kafkaSource, watermarkStrategy, id). In what scenarios can this behaviour happen?

    chunilal kukreja

    02/24/2023, 8:21 AM
Hi team, while running a job in application mode on a K8s cluster, I'm facing an issue where the last saved checkpoint gets restored every time I delete & create a new Flink cluster. When I take a savepoint & start the cluster without providing any savepoint config details, how come it restores the last saved checkpoint?

    Dimitris Kalouris

    02/24/2023, 2:12 PM
Hey team, I am trying to read JSON files where each object spans multiple lines using the Table API, but the JSON format assumes every object is on a single line. I also tried to eliminate the newlines using the DataStream API, but it reads the file line by line, so removing newlines does not change anything. Any thoughts? Example file:
Copy code
{
  "a": 1,
  "b": "a"
}
Copy code
CREATE TABLE my_table (entry ROW<`a` INT, `b` STRING>) WITH (
  'connector' = 'filesystem',
  'path' = 'my_file.json',
  'format' = 'json'
);
SELECT * FROM my_table;
gives:
Copy code
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.io.JsonEOFException: Unexpected end-of-input: expected close marker for Object (start marker at [Source: (byte[])"{"; line: 1, column: 1]) at [Source: UNKNOWN; line: 1, column: 2]
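One pragmatic workaround sketch, outside Flink: rewrite the pretty-printed file to JSON Lines first, since the json filesystem format expects one object per line:
Copy code
import json

# Rewrite pretty-printed JSON objects into JSON Lines (one object per line),
# the layout Flink's 'json' filesystem format expects. File names are placeholders.
decoder = json.JSONDecoder()
with open('my_file.json') as src, open('my_file.jsonl', 'w') as dst:
    buf = src.read()
    pos = 0
    while pos < len(buf):
        if buf[pos].isspace():
            pos += 1  # skip whitespace between (and around) objects
            continue
        obj, pos = decoder.raw_decode(buf, pos)
        dst.write(json.dumps(obj) + '\n')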

    Sergio Sainz

    02/25/2023, 2:18 AM
Hello 👋, I'm trying to set the slotSharingGroup through the Table API layer and could not quite find the setting on the table or table env object. I wonder if there is a way to set the slot sharing group, or alternatively whether there is a Flink ticket requesting it. https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/finegrained_resource/

    Krish Narukulla

    02/25/2023, 7:04 PM
How can I pass additional Java jar dependencies, such as my custom jars, using pipeline.jars? https://nightlies.apache.org/flink/flink-docs-master/docs/dev/python/dependency_management/
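A sketch of the two usual options (the jar paths are placeholders): pipeline.jars takes a semicolon-separated list of file:// URLs:
Copy code
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Table API: set pipeline.jars on the table environment's config.
# Paths below are placeholders; the value is a ';'-separated URL list.
t_env.get_config().set(
    "pipeline.jars",
    "file:///path/to/custom-a.jar;file:///path/to/custom-b.jar",
)

# DataStream API equivalent:
# env.add_jars("file:///path/to/custom-a.jar", "file:///path/to/custom-b.jar")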

    Sumit Nekar

    02/26/2023, 7:06 AM
Hello folks, we have a Flink pipeline ingesting a high volume of data into Kafka/Event Hubs. Sometimes the sink gets overwhelmed for a short period of time and starts throwing network exceptions. We are using FlinkKafkaProducer. Are there any recommendations or best practices for avoiding a restart of the Flink job because of such network exceptions? Restarting a Flink job is expensive for us.
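One knob worth checking, sketched below under the assumption that the exceptions are retriable: the Kafka producer retries transient network errors internally if given enough headroom, so the failure may never surface to Flink at all:
Copy code
# Standard Kafka producer settings; whether they address this particular
# failure is an assumption, and the broker address is a placeholder.
producer_props = {
    'bootstrap.servers': 'broker:9092',
    # retry transient network errors inside the Kafka client
    # rather than failing the Flink task
    'retries': '2147483647',
    'delivery.timeout.ms': '300000',  # total per-record time budget
    'request.timeout.ms': '60000',
}
# producer = FlinkKafkaProducer(topic, serialization_schema, producer_props)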

    Viacheslav Chernyshev

    02/27/2023, 10:29 AM
Hello, is anyone familiar with how exactly operator chaining affects the data propagation mechanisms? We're facing severe performance issues after adding a number of KeyedCoProcessFunctions to the pipeline. As far as I can see, these operators are currently not chainable. Consequently, they are forced to go through a "network" serialisation barrier, despite having exactly the same parallelism. I find it a bit unexpected and wonder if there is a way to avoid it. Example job graph:
    Copy code
    Source A -> keyBy(A) --+-> KeyedCoProcess -> Sink
                          /
    Source B -> keyBy(B) +
If I have either Source A -> keyBy(A) -> KeyedProcess -> Sink or Source B -> keyBy(B) -> KeyedProcess -> Sink independently, then everything after keyBy is chained as expected and the information is passed without extra serialisation.

    kingsathurthi

    02/27/2023, 11:08 AM
Is it possible to configure a readiness probe for the TaskManager in the Flink operator?

    chunilal kukreja

    02/27/2023, 12:40 PM
Hi team, I am constantly getting the exception below:
    Copy code
    org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
    	at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:206)
    	at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:169)
    	at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:122)
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2104)
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2083)
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1375)
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1265)
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1157)
    	at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
    	at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
    	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    	at java.base/java.lang.Thread.run(Thread.java:834)
And this has become a big problem: once it starts appearing, the job restarts and hardly ever gets back into a stable condition.
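For reference, the threshold in that message is configurable; by default no failed checkpoint is tolerated, so a single failure restarts the job. Raising it buys stability but only masks whatever is making the checkpoints fail - a sketch:
Copy code
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# Tolerate up to 3 checkpoint failures in a row before failing the job
# (the value 3 is arbitrary; tune to taste).
env.get_checkpoint_config().set_tolerable_checkpoint_failure_number(3)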

    Nikhil Kalyankar

    02/27/2023, 2:07 PM
Hi Team. I would appreciate any help with this issue: https://stackoverflow.com/questions/75552943/apache-flink-1-14-streamingfilesink-not-copying-all-files-to-s3 - I have already tried FileSink as well.

    Vibhavari Bellutagi

    02/27/2023, 2:36 PM
Hello team, I started with Flink with Python a few days ago. I'm trying to start with the word count example found on this link but wasn't sure how to test it. Any hints would be appreciated.
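For a quick local test, a PyFlink job runs as a plain Python script with no cluster needed. A minimal word count sketch (assumes apache-flink is pip-installed; this is not the linked example itself):
Copy code
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

counts = (
    env.from_collection(["to be or not to be"], type_info=Types.STRING())
       # split each line into (word, 1) pairs
       .flat_map(lambda line: [(w, 1) for w in line.split()],
                 output_type=Types.TUPLE([Types.STRING(), Types.INT()]))
       .key_by(lambda pair: pair[0])
       .reduce(lambda a, b: (a[0], a[1] + b[1]))
)
counts.print()
env.execute("word_count_test")  # run with: python word_count.py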

    刘路

    02/27/2023, 3:01 PM
I started Flink using ./bin/start-cluster.sh and got the message below, but I can't open the localhost:8081 page to watch the Flink cluster state. Can anyone help me?

    刘路

    02/27/2023, 3:14 PM
Copy code
mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-walkthrough-datastream-java \
  -DarchetypeVersion=1.18-SNAPSHOT \
  -DgroupId=frauddetection \
  -DartifactId=frauddetection \
  -Dversion=0.1 \
  -Dpackage=spendreport \
  -DinteractiveMode=false
I got this error:
Copy code
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-archetype-plugin:3.2.1:generate (default-cli) on project standalone-pom: The desired archetype does not exist (org.apache.flink:flink-walkthrough-datastream-java:1.18-SNAPSHOT) -> [Help 1]

    Tommy May

    02/27/2023, 8:10 PM
Hello, I had a question similar to https://apache-flink.slack.com/archives/C03JKTFFX0S/p1674668351943749. Are there any potential upcoming plans to support this? I also found FLIP-186, which had a proposal for the issue, but from what I can find there hasn't been much detailed discussion or any started implementation. My use case is a join on two high-throughput Kafka topics. Even with the data partitioned on the incoming topics, the keyBy results in hundreds of MB/sec of network shuffling, so we could see a significant performance bump if we could better preserve the upstream partitioning in Flink.

    Aravind

    02/27/2023, 8:19 PM
Hi, I have been exploring Stateful Functions for our project. One thing that's blocking me is the lack of support for Kafka headers, both at ingress and egress; we send some metadata as headers. I see this ticket for OTel tracing: https://issues.apache.org/jira/browse/FLINK-26617, but I have a business use case for headers as well. Please let me know if this ticket is likely to get any traction. Thanks!

    Nathanael England

    02/27/2023, 8:38 PM
What would be the right data type to use for a state descriptor in PyFlink when I expect to have 64-bit unsigned integers? Do I need to use BIG_INT?
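For reference, Types.LONG() is a signed 64-bit Java long, so anything above 2**63 - 1 would overflow; Types.BIG_INT() (backed by java.math.BigInteger) covers the full unsigned range. A sketch (the descriptor name is hypothetical):
Copy code
from pyflink.common.typeinfo import Types
from pyflink.datastream.state import ValueStateDescriptor

# BIG_INT is backed by java.math.BigInteger, so the full uint64
# range (0 .. 2**64 - 1) fits without overflow.
descriptor = ValueStateDescriptor("u64_counter", Types.BIG_INT())
# state = runtime_context.get_state(descriptor)  # e.g. in open() of a KeyedProcessFunction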

    Eric Xiao

    02/27/2023, 8:40 PM
Hello, we have also been observing our jobs and JM getting restarted due to the following exceptions:
    Copy code
    {"instant":{"epochSecond":1677246902,"nanoOfSecond":184000000},"thread":"OkHttp <https://10.28.56.1/>...","level":"WARN","loggerName":"io.fabric8.kubernetes.client.dsl.internal.WatcherWebSocketListener","message":"Exec Failure javax.net.ssl.SSLException Connection reset","endOfBatch":false,"loggerFqcn":"org.apache.logging.slf4j.Log4jLogger","contextMap":{},"threadId":255067,"threadPriority":5}
    
    {"instant":{"epochSecond":1677253016,"nanoOfSecond":460000000},"thread":"pool-278132-thread-1","level":"ERROR","loggerName":"io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector","message":"Exception occurred while acquiring lock 'ConfigMapLock: ...-cluster-config-map (..)'",
We are running Flink version 1.15.1 and are not using the Flink K8s operator. We are not sure how to investigate this issue or what the root cause could be.

    Krish Narukulla

    02/27/2023, 10:29 PM
Would jars under opt/flink be added to the Flink classpath?
    Copy code
    root@examplecode-6d4d45688f-x8k6n:/opt/flink# ls opt/
    flink-azure-fs-hadoop-1.16.1.jar   flink-gs-fs-hadoop-1.16.1.jar             flink-s3-fs-hadoop-1.16.1.jar                              flink-sql-client-1.16.1.jar           python
    flink-cep-scala_2.12-1.16.1.jar    flink-oss-fs-hadoop-1.16.1.jar            flink-s3-fs-presto-1.16.1.jar                              flink-sql-gateway-1.16.1.jar
    flink-gelly-1.16.1.jar             flink-python-1.16.1.jar                   flink-shaded-netty-tcnative-dynamic-2.0.44.Final-15.0.jar  flink-state-processor-api-1.16.1.jar
    flink-gelly-scala_2.12-1.16.1.jar  flink-queryable-state-runtime-1.16.1.jar  flink-shaded-zookeeper-3.6.3.jar                           flink-table-planner_2.12-1.16.1.jar