# troubleshooting
  • e

    Eric Xiao

    09/23/2022, 6:26 PM
Hi, we were looking to use the async I/O retry strategies in our Flink Scala pipeline, but we noticed that some of the functionality is missing from the Scala side - what's the best way for us to leverage the retry strategies in Scala? We tried to convert things to a Java datastream and then back to a Scala datastream, but ran into serialization issues.
  • m

    Michael LeGore

    09/23/2022, 8:43 PM
    Bumping this question back up: https://apache-flink.slack.com/archives/C03G7LJTS2G/p1663276461024109
  • h

    Hari Krishna Poturi

    09/24/2022, 8:54 AM
Hello.. deploying a Flink application in a K8s cluster using ArgoCD. When we change the image and click the Sync button in Argo, the application does not do a rolling deployment; we have to explicitly delete and re-deploy the app. Am I missing any configuration in my Helm chart to enable rolling deployments?
  • k

    Kristian Grimsby

    09/24/2022, 4:52 PM
Hi all! I’m trying to build the latest 1.16-rc0 on my M1 Mac. It has now been running for 7 hours. Is this expected, or am I doing something wrong?
  • k

    Kristian Grimsby

    09/24/2022, 8:05 PM
Another question regarding 1.16-rc0 (because of the M1 Mac): when running a Python script I get
    AttributeError: 'NoneType' object has no attribute 'message_types_by_name'
. Does this look familiar to anyone? Sorry if this is obvious, but I’m fairly new to Flink, so I'm still trying to figure things out…
    Copy code
    Traceback (most recent call last):
      File "/Users/grimsby/Dev/stimle/flink/wc.py", line 134, in <module>
        word_count(known_args.input, known_args.output)
      File "/Users/grimsby/Dev/stimle/flink/wc.py", line 66, in word_count
        env = StreamExecutionEnvironment.get_execution_environment()
      File "/opt/homebrew/Caskroom/miniforge/base/envs/flink-test-310/lib/python3.10/site-packages/pyflink/datastream/stream_execution_environment.py", line 837, in get_execution_environment
        return StreamExecutionEnvironment(j_stream_exection_environment)
      File "/opt/homebrew/Caskroom/miniforge/base/envs/flink-test-310/lib/python3.10/site-packages/pyflink/datastream/stream_execution_environment.py", line 65, in __init__
        self._open()
      File "/opt/homebrew/Caskroom/miniforge/base/envs/flink-test-310/lib/python3.10/site-packages/pyflink/datastream/stream_execution_environment.py", line 1033, in _open
        startup_loopback_server()
      File "/opt/homebrew/Caskroom/miniforge/base/envs/flink-test-310/lib/python3.10/site-packages/pyflink/datastream/stream_execution_environment.py", line 1023, in startup_loopback_server
        from pyflink.fn_execution.beam.beam_worker_pool_service import \
      File "/opt/homebrew/Caskroom/miniforge/base/envs/flink-test-310/lib/python3.10/site-packages/pyflink/fn_execution/beam/beam_worker_pool_service.py", line 44, in <module>
        from pyflink.fn_execution.beam import beam_sdk_worker_main  # noqa # pylint: disable=unused-import
      File "/opt/homebrew/Caskroom/miniforge/base/envs/flink-test-310/lib/python3.10/site-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py", line 21, in <module>
        import pyflink.fn_execution.beam.beam_operations # noqa # pylint: disable=unused-import
      File "/opt/homebrew/Caskroom/miniforge/base/envs/flink-test-310/lib/python3.10/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 24, in <module>
        from pyflink.fn_execution import flink_fn_execution_pb2
      File "/opt/homebrew/Caskroom/miniforge/base/envs/flink-test-310/lib/python3.10/site-packages/pyflink/fn_execution/flink_fn_execution_pb2.py", line 38, in <module>
        _INPUT = DESCRIPTOR.message_types_by_name['Input']
AttributeError: 'NoneType' object has no attribute 'message_types_by_name'
  • i

    Isaac Pohl-Zaretsky

    09/26/2022, 3:32 AM
Am I correct in thinking it's impossible to use an AsyncTableFunction UDF in Flink SQL? I'm trying to use one but the function is not being recognized. I found this (inactive) ticket that made it seem like it's not possible. If it is impossible, does anyone have a recommendation for how we should work around this?
  • a

    Abhinav sharma

    09/26/2022, 7:35 AM
Hi, I am aggregating data from one datastream and the aggregated result looks like [{id: 1}, {id: 2}]. There is another datastream which is being read from Kafka, where I get data like {name: abc, index: 1}; a 2nd message might be something like {name: xyz, index: 1}. I want to aggregate the 2nd datastream too, and whenever the id from the 1st datastream's aggregated result matches the index of the 2nd datastream, it should group that data. Is it possible to pick fields from the 1st datastream and group the 2nd datastream's messages by them?
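For what it's worth, a common pattern for this kind of matching is to key both streams on the matching field (id on one side, index on the other), connect them, and keep the stream-1 value in keyed state. Below is a rough, untested sketch using the PyFlink DataStream API; the class name JoinOnId, the commented usage lines, and the dictionary field names are only assumptions based on the example records above.
Copy code
from pyflink.common.typeinfo import Types
from pyflink.datastream.functions import KeyedCoProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor

class JoinOnId(KeyedCoProcessFunction):
    """Keeps the latest stream-1 aggregate per key and groups matching
    stream-2 records with it (names are illustrative)."""

    def open(self, runtime_context: RuntimeContext):
        # Keyed state holding the last stream-1 element seen for this key.
        self.left = runtime_context.get_state(
            ValueStateDescriptor("left", Types.PICKLED_BYTE_ARRAY()))

    def process_element1(self, value, ctx):
        # Element from the aggregated stream, e.g. {'id': 1}.
        self.left.update(value)

    def process_element2(self, value, ctx):
        # Element from the Kafka stream, e.g. {'name': 'abc', 'index': 1};
        # emit it together with the stream-1 element once the key has been seen.
        if self.left.value() is not None:
            yield (self.left.value(), value)

# Hypothetical wiring:
# joined = (agg_stream
#           .connect(kafka_stream)
#           .key_by(lambda a: a['id'], lambda b: b['index'])
#           .process(JoinOnId()))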
  • s

    Sevvy Yusuf

    09/26/2022, 10:22 AM
    Hi team 👋🏼 I'm wondering if we can get the success/fail status of the job as a Datadog metric for jobs running in batch mode with no checkpoints? Currently all I can see is
    flink.jobmanager.job.numberOfFailedCheckpoints
    and
    flink.jobmanager.job.downtime
    and neither of them solve our problem. Thank you in advance
  • k

    Kristian Grimsby

    09/26/2022, 11:45 AM
Sorry for all the newbie questions, but here's one more. Is there support for the Python DataStream API and Confluent Kafka with schema registry (Avro)? I see that the Table API supports this, but I’m looking to run a calculation on a topic row-by-row (guessing a UDF for this). I gather I need to use the DataStream API to achieve this?
  • f

    Felix Angell

    09/26/2022, 7:38 PM
    Hey, in PyFlink I see the docs on some of the functions for a
    KeyedProcessFunction
mention that elements can be output via the Collector param, though I don't see any methods on the functions specified here that take a Collector argument. How can I go about writing out an element in this case?
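For reference, PyFlink process functions do not take a Collector argument; elements are emitted by yielding them from process_element (and from on_timer). A minimal sketch; the class name PassThrough is just illustrative.
Copy code
from pyflink.datastream import KeyedProcessFunction

class PassThrough(KeyedProcessFunction):

    def process_element(self, value, ctx):
        # No Collector in PyFlink: emit results by yielding them
        # (zero, one, or many per input element).
        yield value

    def on_timer(self, timestamp, ctx):
        # Timer callbacks emit results the same way.
        yield ('timer-fired', timestamp)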
  • h

    Hunter

    09/27/2022, 6:30 AM
Hi team, is there any way to convert a Java POJO to RowData or Row in Flink?
  • a

    Adesh Dsilva

    09/27/2022, 7:30 AM
Are there any hooks to tap into on task manager or slot initialisation? I am creating an HTTP client object that I pass to multiple rich functions from the main method. Since they are chained, I am assuming they use that same instance, and the open() method is called unnecessarily multiple times on the client to initialise it. One way to avoid this would be to create a static client in a static initialiser; another would be to just create separate objects for each function. What’s the recommended approach in Flink?
  • p

    Prathit Malik

    09/27/2022, 8:36 AM
Hi All, • We are using Flink SQL to build a final reporting aggregation table. Currently we are facing an issue when a backfill happens after the pipeline has been down for a few hours due to an outage. ◦ Source: Kafka (we are using the
upsert-kafka
connector because the source is
cdc
type) ◦ Destination: Postgres ◦ Issue: if the Flink job was down for, say, 3 hours, then when the pipeline resumes, 3 hours of data is backfilled, but we suspect the records are not coming in order from Kafka (in the backfill case) and the watermark is not behaving as expected. Because some records have a greater event timestamp, the watermark progresses too far and the records before it are dropped. ▪︎ watermark strategy:
WATERMARK FOR event_created_at AS event_created_at - INTERVAL '5' MINUTE
• Things we have tried so far: ◦ setting
SET 'pipeline.auto-watermark-interval' = '0';
to get punctuated watermarks. Can someone please provide some insights if anyone has faced this kind of issue before? Thanks in advance 🙏
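Not a definitive fix, but one commonly suggested mitigation for backfills is to make the bounded-out-of-orderness interval large enough to cover the reordering seen during catch-up, at the cost of higher latency before windows fire. A rough sketch of such a DDL in PyFlink; the table name, columns, topic, and connector options below are made up for illustration, and only the larger watermark delay differs from the strategy quoted in the question.
Copy code
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Illustrative upsert-kafka source with a (much) wider watermark delay.
t_env.execute_sql("""
    CREATE TABLE events (
        id STRING,
        amount DOUBLE,
        event_created_at TIMESTAMP(3),
        PRIMARY KEY (id) NOT ENFORCED,
        WATERMARK FOR event_created_at AS event_created_at - INTERVAL '3' HOUR
    ) WITH (
        'connector' = 'upsert-kafka',
        'topic' = 'events',
        'properties.bootstrap.servers' = 'localhost:9092',
        'key.format' = 'json',
        'value.format' = 'json'
    )
""")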
  • p

    Paul Lam

    09/27/2022, 8:37 AM
Hi team, it seems that there's no code snippet about using a UDTAF in Flink SQL statements, while the other 3 UDXFs have Flink SQL snippets. I'm wondering whether UDTAFs are supported in pure Flink SQL? If yes, what's the syntax? Thanks a lot!
  • t

    toe

    09/27/2022, 9:55 AM
Hi guys, I had a job run for about 2 hours but the TM cannot display the JVM summary. Are there any parameters that can be adjusted? Thanks
  • p

    Pouria Modaresi

    09/27/2022, 2:33 PM
Hi everyone, please help: as you know, we can easily join and enrich multiple sets of streaming data with Amazon Kinesis Data Analytics (KDA), creating streams with "CREATE OR REPLACE STREAM "STREAM"" in AWS KDA. It is possible to create many in-application streams inside a single Kinesis Data Analytics application, and in-application pumps are basically insert queries that continuously move data from one in-application stream to another. How can we use the same approach with Flink, especially with %flink.ssql?
  • a

    Adrian Chang

    09/27/2022, 3:45 PM
Hello, can I use asyncio in a Python UDF? In my UDF I am calling SageMaker and waiting for the response. If I use asyncio, will this release the CPU and execute another task until I get the response? Is PyFlink compatible with asyncio?
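For context, here is a minimal sketch of calling a coroutine from a (synchronous) PyFlink scalar UDF. Note that on its own this does not overlap work across rows, since asyncio.run blocks until the coroutine finishes; the _call_sagemaker coroutine is a hypothetical stand-in for the real SageMaker request.
Copy code
import asyncio

from pyflink.table import DataTypes
from pyflink.table.udf import udf

async def _call_sagemaker(payload: str) -> str:
    # Hypothetical placeholder for the real async SageMaker invocation;
    # here it only simulates I/O latency.
    await asyncio.sleep(0.1)
    return payload.upper()

@udf(result_type=DataTypes.STRING())
def score(payload: str) -> str:
    # The UDF is invoked synchronously per row, so asyncio.run blocks here
    # until the coroutine completes.
    return asyncio.run(_call_sagemaker(payload))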
  • j

    Justin

    09/27/2022, 8:16 PM
Using Flink’s SQL client, I’m observing some strange behavior when querying complex, deeply nested fields. Query 1 in the attached file has an example of a field I’m able to query without any issues, tfmsStatusOutput. Query 2 in the attached file has a field that is much more complex and causes the SQL client to dump a stack trace whenever I run any sort of query, such as a bounded query with LIMIT 3. Stack trace dump (screenshot 1). When I press any key on my keyboard it opens the result window again as if it’s waiting for results (screenshot 2). Another strange behavior I noticed is that I have bounded my query with a
    LIMIT 5
. The Flink job’s topology seems to acknowledge this, yet it continues to receive records (1,269 when I took the screenshot below) and the job continues to run (screenshot 3). Output from the logs is in screenshots 4 and 5.
    FlinkQueries.txt
  • a

    Aeden Jameson

    09/27/2022, 11:05 PM
Even though 1.16 isn’t officially released yet, are the jars posted anywhere publicly accessible at the moment?
  • s

    Sergio Sainz

    09/28/2022, 1:01 AM
Hi, I could not find this info online: is there a way to configure
    conf/flink-conf.yaml
    with environment variables? like
    jobmanager.rpc.port=$MYPORT
    ?
  • k

    Krish Narukulla

    09/28/2022, 3:27 AM
Incompatible
avro versions
across jars:
flink-avro
and
flink-connector-hive
for the same Flink version (1.15). Any idea how to resolve this when building a fat JAR?
flink-avro
brings in Avro version
1.10.0
.
flink-connector-hive
brings in the
hive-exec
fat JAR, which contains Avro classes for
1.8.2
. 1. https://mvnrepository.com/artifact/org.apache.flink/flink-avro/1.15.2 2. https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive_2.12/1.15.2 https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/hive/overview/#user-defined-dependencies Is there a
hive-exec
slim JAR which only brings in the Hive UDF interface?
  • r

    Rashmin Patel

    09/28/2022, 5:51 AM
Hi all, when reading from a Kafka source, is a per-partition watermark strategy applicable if source parallelism < no. of topic partitions? What I see from the Flink code is that
KafkaSourceFetcherManager
implements
SingleThreadFetcherManager
, so a single-threaded Kafka consumer will be assigned multiple partitions, and in
KafkaRecordEmitter
it emits records via
output.collect(element.f0, element.f2)
where f0 is the actual data record and f2 is the Kafka ingestion timestamp. So we are not maintaining any partition info along with the record, which will not allow us to induce partition-aware watermarks in the subsequent execution flow. Can someone confirm whether my understanding of this is correct?
  • z

    Zhiqiang Du

    09/28/2022, 7:33 AM
Hi guys, I need help. Background: 1. Started a Flink job with
flink run -s /chk-1
2. Checkpointing is enabled every 2 minutes. 3. My understanding is that if a task fails while it’s running, Flink will recover the job from the latest successful checkpoint. 4. So why does Flink recover the job from the savepoint
chk-1
?
  • h

    Hartmut

    09/28/2022, 2:49 PM
Hi, not sure where to put this - but I’m experimenting with Flink 1.15.2 and Kotlin. So far things have gone fairly well with stream processing. Today I’ve started to dip into the Flink Table API and noticed Flink is not able to automatically derive a table schema from a Kotlin
    data class
but resolves it as a
    RAW
    type…
    Copy code
    +------+--------------------------------+------+-----+--------+-----------+
    | name |                           type | null | key | extras | watermark |
    +------+--------------------------------+------+-----+--------+-----------+
    |   f0 | RAW('com.example.User', '...') | TRUE |     |        |           |
    +------+--------------------------------+------+-----+--------+-----------+
    Questions: 1. Is this a known issue? 2. Is Kotlin support anywhere on the roadmap? 3. If so, where can I create a feature request for this specific use case?
  • j

    Jin Yi

    09/28/2022, 5:56 PM
How does broadcast state recovery work (or does it just not)? Reading the docs, it says it's only in memory, so not RocksDB-backed. So is it just heap (HashMapStateBackend) and therefore still subject to checkpoint/savepoint behaviors for recovery (albeit with some caveats for the additional memory-to-disk serialization cost)?
  • y

    Yaroslav Bezruchenko

    09/28/2022, 8:33 PM
    Hey guys, I'm running Flink 1.14.0 on Kubernetes. Question is: is it better to store savepoints in PVC or in S3 bucket? What is the best practice here?
  • a

    Avinash

    09/28/2022, 8:54 PM
Hi Folks 👋 I’m trying to read Kafka topic data with a timestamp column named
    timestamp
    Copy code
    input_topic_schema_builder = Schema.new_builder() \
            .watermark("timestamp", "timestamp - INTERVAL '5' SECOND")\
When I try to create this schema, it tells me there is a parsing error at
    Copy code
    org.apache.flink.sql.parser.impl.ParseException: Encountered "timestamp -" at line 1, column 1.
    Is
    timestamp
a reserved word? What can I do if the underlying Kafka topic field name is
    timestamp
    ? Should I rename it somehow? TIA
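TIMESTAMP is indeed a reserved keyword in Flink SQL, so inside SQL expressions it normally has to be escaped with backticks. Below is an untested sketch of the schema from the question with the column escaped in the watermark expression; the other column name and type are made up for illustration.
Copy code
from pyflink.table import Schema

input_topic_schema = (
    Schema.new_builder()
    .column("name", "STRING")
    .column("timestamp", "TIMESTAMP(3)")
    # The reserved word is backtick-quoted inside the SQL expression.
    .watermark("timestamp", "`timestamp` - INTERVAL '5' SECOND")
    .build()
)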
  • s

    Sumit Nekar

    09/29/2022, 3:38 AM
Hi Team, asking to understand better: FlinkDeployment in the Flink K8s operator has a flinkVersion field to support multiple versions of Flink. How does it work if the image used in my FlinkDeployment is built from a Flink 1.13 base image?
  • t

    Taruj Goyal

    09/29/2022, 3:48 AM
We are currently running a high-throughput Flink job that has several
Async
operators interacting with external services. We are trying to batch calls via keyBy and windowing operations, and would like to know some good ways to do this that minimize the external network calls while evenly distributing the load across all the task slots. What are some obvious approaches and good patterns to follow for this?
  • t

    Taruj Goyal

    09/29/2022, 3:51 AM
Additionally, I have another question about Flink in general. We are currently using a Flink Kubernetes deployment and want it to connect to another microservice, potentially in the same namespace. Would a Stateful Functions container deployment provide any advantage or an obvious pattern here if we want to query this service, or should we expose it via an endpoint and query it via the async I/O based approach shown in the Flink docs? We would want this external service to scale to match the requests that we send out to it.