https://flink.apache.org/ logo
Join Slack
Powered by
# troubleshooting
  • m

    Maryam

    01/24/2023, 7:48 PM
    Are
    Partition By
    in the Table API and
    KeyBy
    in the DataStream API the same in terms of how they process data in streaming mode, specifically in terms of whether data is processed together in the
    same task slot
    ? Additionally, I am asking because my Top-N query seems to be producing non-deterministic results when using parallelism greater than 1.
    s
    • 2
    • 6
  • a

    Adrian Chang

    01/24/2023, 8:01 PM
    Hello, What could be the reasons of watermark not be propagating downstream ?
    j
    • 2
    • 7
  • a

    Andre Cloutier

    01/24/2023, 8:17 PM
    šŸ‘‹ I have a long shot question for y’all. As an alternative to stateful functions, I was thinking of using SideOutputs in the Data Stream API as a means to dynamically dispatch ā€œmessagesā€ to/from different Process Functions. Are there any drawbacks to this approach? (eg. does it impact checkpointing?)
    j
    • 2
    • 7
  • e

    Eric Xiao

    01/24/2023, 10:02 PM
    šŸ‘‹, we are testing Flink's reactive mode deployment and it seems to scale up and down fine when a pipeline is fresh with a few checkpoints, but after a couple of days (more than 100? checkpoints), when we try to scale up or down, the pipeline gets into a weird state where the Task Manager will get into a "Running" state briefly (a couple of seconds) before getting canceled. Combing through the logs and I can't find anything to support my hypothesis that this has to do with the number of checkpoints. But I did find this log in the Job Manager:
    Copy code
    {
      "instant": {
        "epochSecond": 1674595600,
        "nanoOfSecond": 114000000
      },
      "thread": "flink-akka.actor.default-dispatcher-17",
      "level": "INFO",
      "loggerName": "org.apache.flink.runtime.jobmaster.JobMaster",
      "message": "Disconnect TaskExecutor .... because: The TaskExecutor is shutting down.",
      "endOfBatch": false,
      "loggerFqcn": "org.apache.logging.slf4j.Log4jLogger",
      "contextMap": {},
      "threadId": 103,
      "threadPriority": 5
    }
    g
    r
    • 3
    • 13
  • t

    Thomas Zhang

    01/24/2023, 10:15 PM
    Hi, I'm testing StatementSets and I have two sources I want to insert into one sink, like so:
    Copy code
    stmt_set \
            .add_insert_sql("INSERT INTO OFFER_ELIGIBILITY_CURRENT_STATE SELECT * FROM offer_elig_j_1")
        stmt_set \
            .add_insert_sql("INSERT INTO OFFER_ELIGIBILITY_CURRENT_STATE SELECT * FROM offer_elig_j_2")
        
        table_result = stmt_set.execute()
    but I'm running into this error
    Copy code
    Caused by: java.lang.IllegalArgumentException: Hash collision on user-specified ID "uid_hoodie_stream_writeOFFER_ELIGIBILITY_CURRENT_STATE". Most likely cause is a non-unique ID. Please check that all IDs specified via `uid(String)` are unique.
    is how I'm using StatementSets to write into one destination from multiple sources supported?
    m
    d
    • 3
    • 18
  • c

    Colin Williams

    01/24/2023, 10:25 PM
    Hi folks, has anyone been executing SQL jobs via the flink k8s operator? It looks like there's an example running SQL jobs wrapped within the table API https://github.com/apache/flink-kubernetes-operator/tree/main/examples/flink-sql-runner-example . It also looks like there's a newish SQL gateway and REST API. Pardon my ignorance but would it be difficult to modify the operator to accept SQL jobs via the rest interface in https://issues.apache.org/jira/browse/FLINK-15472 / https://cwiki.apache.org/confluence/display/FLINK/FLIP-91%3A+Support+SQL+Gateway? Perhaps I can "tool" around. If anyone has any experience or advice with the following perhaps they could suggest how this might be implemented for a demonstration we could share under the apache license.
    g
    • 2
    • 4
  • s

    Suparn Lele

    01/25/2023, 6:49 AM
    I have one use case where I am getting data from kafka, I am creating a datastream out of it. And I want to filter the contents of the datastream by looking up data in one of our tables sitting in postgres. The table has around 1k rows. And I want to do the lookup say every 1 hour. Is there a way through which I can achieve this in flink?
    s
    • 2
    • 1
  • s

    Samrat Deb

    01/25/2023, 9:37 AM
    Hi, what is the difference between
    CatalogPartitionSpec
    and
    CatalogPartition
    with respect to flink codebase ?
  • k

    kingsathurthi

    01/25/2023, 12:16 PM
    [Flink Kubernetes operator 1.3.0] with watch namespace on openshift when deploying flinkdeployment im getting below error
    Error: container has runAsNonRoot and image has non-numeric user (flink), cannot verify user is non-root (pod: "podname-66bfdf99f6-s6pzr_welk-tx02-vzw-anpd-y-nk-inf-020(3a796c2e-f11b-4de2-8a09-eab801cde6d4)", container: conainer_name)
    flink kubernetes operator has belwo security context enabled
    podSecurityContext:
    runAsUser: 9999
    runAsGroup: 9999
    runAsNonRoot: true
    fsGroup: 9999
    sharing pointer will be helpful
    b
    r
    • 3
    • 7
  • a

    Ari Huttunen

    01/25/2023, 1:44 PM
    Is it possible (using either pyflink or sql) to select all columns except columns "a", "b", and "c"? Or rather, I would like to select all columns and create a couple of calculated columns "d" and "e". As there are a lot of columns, I'd prefer not to explicitly list all columns.
    j
    • 2
    • 2
  • a

    Ari Huttunen

    01/25/2023, 3:36 PM
    The readability of this page could be improved..
    m
    • 2
    • 5
  • t

    Thomas Abraham

    01/25/2023, 6:34 PM
    Quick Question on Flink Is there a way i can replace a value for a nested column in Table API? I am trying to do the following • I want to pick up value from
    __event_headers.header_key1
    • cast it as String • save it under a nested field.
    __event_headers_parsed.header_key1
    Copy code
    table.addOrReplaceColumns($("__event_headers").get("header_key1").cast(DataTypes.STRING()).as(s"__event_headers_parsed.header_key1"))
    But instead it is creating a new field with
    .
    and having it in the root as follows. Output i am getting:
    Copy code
    {
      "id": 3,
      "name": {
        "firstName": "Jon",
        "lastName": "Doe"
      },
      "__event_headers": {
        "header_key1": "aGVhZGVyX3ZhbHVlMQ==",
        "changeAction": "aGVhZGVyX3ZhbHVl"
      },
      "__event_headers_parsed": null,
      "__event_headers_parsed.header_key1": "header_value"
    }
    Expected output
    Copy code
    {
      "id": 3,
      "name": {
        "firstName": "Jon",
        "lastName": "Doe"
      },
      "__event_headers": {
        "header_key1": "aGVhZGVyX3ZhbHVlMQ==",
        "changeAction": "aGVhZGVyX3ZhbHVl"
      },
      "__event_headers_parsed": {
        "header_key1": "header_value"
      }
    }
  • s

    Sami Badawi

    01/25/2023, 7:04 PM
    What is the simplest way to install RC1 for 1.16.1 on a Mac with M1 with both PyFlink and Java/Scala Flink?
    m
    d
    x
    • 4
    • 16
  • y

    Yaroslav Bezruchenko

    01/25/2023, 8:14 PM
    Hey, I'm using Flink Operator 1.3.0 on kubernetes. And Taskmanagers of my job have a dependency on a sidecar that should start in 2-3 mins. Problem is that task managers instantly contact jobmanager and if it's healthy it tries to start a Job. But sidecar of jobmanager is still not ready yet (according to readiness health checks). Can I delay start-up of a job or respect kubernetes readiness health checks somehow?
    k
    • 2
    • 1
  • j

    Jeremy DeGroot

    01/25/2023, 8:18 PM
    I'm getting started running Batch SQL Jobs in Flink, and I've run into this strange behavior a couple of times. One Task completes, a subsequent Task fails and gets cancelled, and rather than the Job failing or getting cancelled it stays in a Running state forever (never completes). Any idea what the cause is or how to keep this from happening?
    m
    w
    • 3
    • 16
  • j

    Jason Politis

    01/25/2023, 11:39 PM
    Hey everyone. In flink(SQL) 1.15.3, is there a way to force the broadcast of a table, similar to the new hints in 1.16? I have a table A with only 225 records, they are date ranges with an ID. Table B that i'm joining to it has 7mil+ records and a date. If the date of a record in table B falls between the date range of a record in table A then it'll receive the ID. This date range comparison is forcing the join operator to parallelism of 1. Or, is there a way to force these tables to partition on a date range somehow? Thanks
  • d

    DJ

    01/26/2023, 12:21 AM
    hi everyone, I noticed jdbc sink doesn’t seem to support writing array type for postgres? can anyone confirm?
    s
    m
    • 3
    • 5
  • r

    Ruslan Danilin

    01/26/2023, 4:12 AM
    Hi Guys! I'm using Flink's Iceberg connector with Hive MetaStore (HMS). Inside, it uses HiveMetaStoreClient class that by default sets "hive" as a value for the catName property (corresponds to CTLG_NAME in the underlying HMS MySSQL table). I need to change "hive" to another value to make it work with our Trino setup. And it's possible to configure it via metastore.catalog.default parameter. I've tried to provide it via flink-conf.yaml and by using inside SQL WITH during the catalog creation. Neither worked. Could you please share any ideas what is the correct way to do it? Thank you
    • 1
    • 1
  • m

    Mikhail Spirin

    01/26/2023, 7:39 AM
    Hi everyone! I’ve met a strange one and i need to ask you guys if im not missing anything. I have an issue with parsing gzipped json in plain file, but i’m cutting this to much more simple case: I have filesystem raw source, and simple sql which counts lines. For non-compressed test file of 1k lines, i get 1k as result of count. for same file, gzipped with terminal, i get 12 as a result. Docs says gzipped files should be decompressed on-the-fly, based on extension, but it doesn’t happen to me. There is no error, result is just wrong. More details in thread
    m
    • 2
    • 3
  • a

    Almog Golbar

    01/26/2023, 9:27 AM
    Hi, i have trouble with communicate with kafka configured with ssl. producer has the following config:
    Copy code
    security.protocol = SSL
            ssl.truststore.type = JKS
            ssl.truststore.location = /store/truststore.jks
            ssl.keystore.type = JKS
            ssl.keystore.location = /store/my-keystore.jks
            ssl.keystore.password=keystore_password
            ssl.truststore.password=trustore_password
    I'm getting SSL handshake error in kafka broker side, and this error in flink job manager:
    Copy code
    java.io.IOException: Connection to ******:9093 (id: -1 rack: null) failed.
    	at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70) ~[validator_2.0.0-RC2-48-d6f38ff4.jar:2.0.0-RC2-48-d6f38ff4]
    	at org.apache.kafka.clients.producer.internals.Sender.awaitNodeReady(Sender.java:526) ~[validator_2.0.0-RC2-48-d6f38ff4.jar:2.0.0-RC2-48-d6f38ff4]
    	at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:447) [validator_2.0.0-RC2-48-d6f38ff4.jar:2.0.0-RC2-48-d6f38ff4]
    	at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316) [validator_2.0.0-RC2-48-d6f38ff4.jar:2.0.0-RC2-48-d6f38ff4]
    	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) [validator_2.0.0-RC2-48-d6f38ff4.jar:2.0.0-RC2-48-d6f38ff4]
    Copy code
    java.io.EOFException: null
    	at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:120) ~[validator_2.0.0-RC2-48-d6f38ff4.jar:2.0.0-RC2-48-d6f38ff4]
    	at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:452) ~[validator_2.0.0-RC2-48-d6f38ff4.jar:2.0.0-RC2-48-d6f38ff4]
    	at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:402) ~[validator_2.0.0-RC2-48-d6f38ff4.jar:2.0.0-RC2-48-d6f38ff4]
    	at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674) ~[validator_2.0.0-RC2-48-d6f38ff4.jar:2.0.0-RC2-48-d6f38ff4]
    	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576) [validator_2.0.0-RC2-48-d6f38ff4.jar:2.0.0-RC2-48-d6f38ff4]
    	at org.apache.kafka.common.network.Selector.poll(Selector.java:481) [validator_2.0.0-RC2-48-d6f38ff4.jar:2.0.0-RC2-48-d6f38ff4]
    	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:560) [validator_2.0.0-RC2-48-d6f38ff4.jar:2.0.0-RC2-48-d6f38ff4]
    	at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:73) [validator_2.0.0-RC2-48-d6f38ff4.jar:2.0.0-RC2-48-d6f38ff4]
    	at org.apache.kafka.clients.producer.internals.Sender.awaitNodeReady(Sender.java:526) [validator_2.0.0-RC2-48-d6f38ff4.jar:2.0.0-RC2-48-d6f38ff4]
    	at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:447) [validator_2.0.0-RC2-48-d6f38ff4.jar:2.0.0-RC2-48-d6f38ff4]
    	at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:316) [validator_2.0.0-RC2-48-d6f38ff4.jar:2.0.0-RC2-48-d6f38ff4]
    	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:243) [validator_2.0.0-RC2-48-d6f38ff4.jar:2.0.0-RC2-48-d6f38ff4]
    Important: i have another microservice that works fine with kafka. any ideas?
    r
    l
    • 3
    • 6
  • s

    Sami Badawi

    01/26/2023, 10:17 AM
    I am trying to recreate: DataGeneratorSource in my test code: But Gradle cannot find these Flink files:
    Copy code
    org.apache.flink.api.connector.source.lib.util.IteratorSourceReaderBase
    org.apache.flink.api.connector.source.util.ratelimit.RateLimitedSourceReader
    I can see them in the main Flink repo on GitHub, but I cannot find the right dependencies for my project. My Gradle dependencies:
    Copy code
    flinkVersion = '1.16.0'
    implementation "org.apache.flink:flink-streaming-java:${flinkVersion}"
    implementation "org.apache.flink:flink-clients:${flinkVersion}"
    implementation "org.apache.flink:flink-table-common:${flinkVersion}"
    implementation "org.apache.flink:flink-table-api-java:${flinkVersion}"
    implementation "org.apache.flink:flink-table-api-java-bridge:${flinkVersion}"
    implementation "org.apache.flink:flink-table-planner_2.12:${flinkVersion}"
    implementation "org.apache.flink:flink-connector-base:${flinkVersion}"
    implementation "org.apache.flink:flink-connectors:${flinkVersion}"
    implementation "org.apache.flink:flink-connector-test-utils:${flinkVersion}"
    implementation "org.apache.flink:flink-core:${flinkVersion}"
    h
    y
    • 3
    • 10
  • f

    Felix Angell

    01/26/2023, 2:10 PM
    Hey there, is there a way that I can measure how many late arrival events occur in PyFlink on 1.15?
    s
    • 2
    • 4
  • a

    Aviv Dozorets

    01/26/2023, 4:07 PM
    Running flink 1.16 as StandaloneApplication, checkpoints are stored in s3 bucket. But whenever i’m passing
    -s <s3://bucket-name/checkpoints/000000/chk-20/>
    or location to the manually created savepoint, i’m getting:
    Copy code
    Exception in thread "main" java.lang.NoSuchMethodError: 'boolean org.apache.commons.cli.CommandLine.hasOption(org.apache.commons.cli.Option)'
    	at org.apache.flink.client.cli.CliFrontendParser.createSavepointRestoreSettings(CliFrontendParser.java:631)
    	at org.apache.flink.container.entrypoint.StandaloneApplicationClusterConfigurationParserFactory.createResult(StandaloneApplicationClusterConfigurationParserFactory.java:90)
    	at org.apache.flink.container.entrypoint.StandaloneApplicationClusterConfigurationParserFactory.createResult(StandaloneApplicationClusterConfigurationParserFactory.java:45)
    	at org.apache.flink.runtime.entrypoint.parser.CommandLineParser.parse(CommandLineParser.java:51)
    	at org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils.parseParametersOrExit(ClusterEntrypointUtils.java:70)
    	at org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.main(StandaloneApplicationClusterEntryPoint.java:58)
    I’ve been all over documentation and feel like i’m missing something. Any help would be appreciated.
    s
    r
    • 3
    • 10
  • a

    Amir Halatzi

    01/26/2023, 4:28 PM
    Hey šŸ™‚ I’m trying to set up a flink pipeline to dedup Kafka events - and send the deduped events to another kafka topic, with exactly once guarantee. Looks like I’m going wrong about the issue and it cannot be done. Can someone explain what I’m doing wrong?
    s
    • 2
    • 15
  • v

    Vivek

    01/26/2023, 6:11 PM
    Trying to understand how to implement checkpoint in custom source reader. My question is how do I know what all events are processed by last step and what to checkpoint? I read about event barriers. Let's say we processing data A1, A2, A3, A4, A5, and checkpoint barriers B are inserted randomly. And it got processed as below A1, A2, B1, A3, A4, B2, A5 At any point if checkpoint on source function is called then how do I know which all events are processed by last step ? Is it A2 or A4 or so on.
  • j

    Jin S

    01/26/2023, 7:45 PM
    Hello, I’m considering a few possibilities for a Flink job, and would like to ask for insights from the community šŸ™ There may have been gotchas that I’ve missed out or things I’m yet to be aware of. Let’s say I have one event per min on a Kafka topic. I need to do some operations on the current element and an element from a certain time offset, X min ago. Thinking about 2 options: 1. 2 consumers, one reading from latest offset, another reading from latest-X offset. Only the leading consumer will help move the event time on the operator. Cons: a. If we need to run the same logic for various offsets (5min, 1440min, 10080min, etc), there would be a fan-out issue (1 new consumer needed for each time offset). Perhaps we can split the topic into multiple smaller topics before applying the same logic on them. b. Anything else that we need to be aware of if the offset is big? e.g. 24 hours? 7 days? Theoretically, as long as the retention time for the Kafka topic exceeds this, it seems fine…? c. Perhaps can consider having 2 kafka topics (replicated), if there’s issues with the above. 2. Use Statefun to send delayed messages (non-blocking, fault tolerant) One single consumer, with 2 downstream operators, one operator process it as is, another operator sends delayed events to the first operator. Question: a. The same logic can be achieved by using a process function and storing all the elements within the time offset in a ListState (can get big if time offset is big); think ā€˜pseudo sliding window’. But instead of having a big ListState, perhaps it’s more optimised/efficient when having a separate operator deal with the delayed events? b. If we need to run the same logic for various offsets (5min, 1440min, 10080min etc, can be on separate threads, parallelism), would the Statefun delayed messages approach be scalable? How does it fare/how does it work compared to Timers? (I’m not familiar with Statefun and would like to know what to watch out for. For timers, we can try RocksDB backend if it’s too much I guess). Thank you very much šŸ™‚
    e
    • 2
    • 2
  • s

    Sandeep Kathula

    01/26/2023, 9:43 PM
    Hi, I am trying to start a Flink application from savepoint. When I check in Google, I see we can run flink application using
    flink run -s <savepoint path>
    But I would like to run from intellij starting from savepoint. Is there a way to specify in Flink to start from savepoint programatically instead of command line? I am trying to debug something and would like to set breakpoints from Intellij. So I would like to start my flink application from intellij from a savepoint
    g
    • 2
    • 2
  • s

    Sergio Sainz

    01/27/2023, 2:52 AM
    Hi , I have a JSON stream with nested arrays like:
    Copy code
    {
      "day": 20230126,
      "stations": [
        {
          "name": "station1",
          "avail_spots": 1
        },
        {
          "name": "station2",
          "avail_spots": 2
        }
      ]
    }
    I am investigating how we can select the list of station names by day:
    Copy code
    SELECT A.day, B.name FROM TOPIC1  as A JOIN INNER_ARRAY as B
    Wonder if something like above is supported? Thanks šŸ™‚ !
  • a

    Artun Duman

    01/27/2023, 7:16 AM
    Hello folks, I think PyFlink doesn't have support for AsyncIO yet. Does anyone have experience making parallel network calls with PyFlink? What do people think the best way for this is? Thanks!
    g
    • 2
    • 3
  • s

    Sajid Samsad

    01/27/2023, 9:00 AM
    Hi folks. In this metrics document for Flink 1.15.0, all the registered metrics are on map function. But there is no example of registering a custom metrics on a filter function. As an experimentation, I registered some custom metrics (counter) in the filter function. But I did not see those metrics. Is metrics collection from filter function not supported?
    s
    • 2
    • 3
1...505152...98Latest