https://flink.apache.org/ logo
Join Slack
Powered by
# troubleshooting
  • a

    Artun Duman

    01/20/2023, 4:55 PM
    Hey everyone, we have been evaluating whether we should adopt StateFun. We think that its model fits our use case well, but we are concerned about the repo being quite outdated (looks like the latest release was in January 2022). Is it recommended that we use the core Flink runtime instead? Do people deploy StateFun with relatively high scale? Thoughts and feedback appreciated!
    👀 2
    v
    • 2
    • 1
  • g

    Guruguha Marur Sreenivasa

    01/20/2023, 4:59 PM
    I'm observing a strange behavior using
    KafkaSource
    vs
    FlinkKafkaConsumer
    . My streaming application runs just fine using
    FlinkKafkaConsumer
    but when I use the
    KafkaSource
    API, it errors out saying, "unable to fetch topic metadata"! Anyone faced this issue before? The configuration to both clients are exactly the same.
    m
    • 2
    • 9
  • l

    Leon Xu

    01/20/2023, 5:11 PM
    Hi Team, I am upgrading Flink from 1.12 to 1.16. We were using
    ParquetPojoInputFormat
    in 1.12 and looks like this class was removed starting from 1.14 (part of this PR). I wonder if there’s docs on what substitute class or migration we can reference? Thanks
    s
    • 2
    • 3
  • d

    Dheeraj Balakavi

    01/20/2023, 6:42 PM
    Hey folks! Question on Flink Stateful functions. Any reason we’ve stopped publishing snapshots to the maven snapshot repository? I’m looking at https://cwiki.apache.org/confluence/display/FLINK/Snapshots and https://repository.apache.org/content/groups/snapshots/org/apache/flink/statefun-flink-distribution/. This is the feature that I’m expecting to be on
    3.4-SNAPSHOT
    for some of the work that I’m doing: https://issues.apache.org/jira/browse/FLINK-29814.
    👀 1
    • 1
    • 3
  • m

    Mustafa Akın

    01/20/2023, 8:26 PM
    Hello, how can I make a batch table environment to make use of a remote environment in my program? I can convert a StreamExecutionEnvironment to a a StreamTableEnvironment, but I need batch.
    Copy code
    EnvironmentSettings settings = EnvironmentSettings
        .newInstance()
        .inBatchMode()
        .build();
    
    TableEnvironment tEnv = TableEnvironment.create(settings);
  • e

    Eric Liu

    01/20/2023, 9:15 PM
    Is there a metric or tool to monitor the Blob Server Connections? Keep seeing below error
    Copy code
    java.io.IOException: Unknown operation 71
    	at org.apache.flink.runtime.blob.BlobServerConnection.run(BlobServerConnection.java:116) [flink-dist-1.15.3.jar:1.15.3]
    2023-01-19 16:43:37,448 ERROR org.apache.flink.runtime.blob.BlobServerConnection           [] - Error while executing BLOB connection.
  • d

    DJ

    01/21/2023, 1:41 AM
    Hi, folks, questions on flink consuming debezium generated JSON, getting corrupt debezium JSON message exception
    s
    • 2
    • 16
  • n

    Nipuna Shantha

    01/21/2023, 8:34 PM
    Hi team, Is there a way to set a waiting time for the
    taskmanager
    if the
    jobmanager
    is terminated in the middle of the process until I make the
    jobmanager
    to a running state again? (Also I do not use the cluster mode. Need a solution other than using the cluster mode)
  • k

    Kyle Ahn

    01/21/2023, 9:31 PM
    [Quick Question - Duplicate/Missing records] I have a streaming pipeline as shown in the screenshot. The job uses
    EXACTLY_ONCE
    checkpointing mode (k8s Flink operator). When I change the job’s state to
    suspended
    to manually and change back to
    running
    . It seems that the raw data sink, which does not involve
    TumblingEventTimeWindows
    +
    ProcessWindowFunction
    , do not write duplicate records, but the processed data seem to either miss some records or write duplicate records. The two sinks are sharing the same sink. How is this happening?
    a
    m
    h
    • 4
    • 48
  • c

    chunilal kukreja

    01/23/2023, 5:08 AM
    Hi Team, While using asynretry stratergies with async i/o operator, is there a way to know the retry count in asyncInvoke() api?
    🆘 1
  • e

    Eric Liu

    01/23/2023, 7:05 AM
    Has anyone ever seen the
    RocksDBException: file is too short (xxxx bytes) to be an sstable
    error?
    c
    • 2
    • 2
  • b

    Bastien DINE

    01/23/2023, 8:01 AM
    Hello, We are migrating our HA setup from ZK to K8S, and we have a question regarding the RPC port. Previously with ZK, the RPC connection config was the : high-availability.jobmanager.port We were expecting that the config will be the same with K8S HA, as the doc says : "The port (range) used by the Flink Master for its RPC connections in highly-available setups. In highly-available setups, this value is used instead of 'jobmanager.rpc.port" (Flink 1.14) Plus this is in #advanced-high-availability-options section But it seems to use the common "jobmanager.rpc.port" (as the high-availability.jobmanager.port is closed) Does anyone know about this ? Best Regards, Bastien+
  • a

    Aviv Dozorets

    01/23/2023, 4:07 PM
    Question regarding
    FlinkKinesisConsumer
    : as it doesn’t have committed offsets like kafka, after a restart (or new deploy), i’d love to see flink to continue reading data from the point he left and not from
    latest
    or
    timestamp
    . Can anyone share how they approached this issue ? I’m sure that i’m not the only one with this usecase ? I don’t see flink starting from the checkpoint/savepoint.
    m
    • 2
    • 6
  • f

    Felix Angell

    01/23/2023, 4:08 PM
    Hey there, is there any reason why in the flink docs here the examples for a counter metric extend
    RichMapFunction
    whereas in Python (PyFlink) it is just a normal
    MapFunction
    ? How does the open method lifecycle work in the context of PyFlink in this case?
    d
    • 2
    • 2
  • m

    Marcelo Miranda

    01/23/2023, 4:50 PM
    Hello. We are having trouble understanding whether a Flink (1.14) job with a Kafka sink configured with EoS is able to recover in the case where the Kafka transaction expires. The documentation states that data loss occurs in this scenario, but it’s not clear whether the job is able to recover (with this data loss) of whether it aborts. This happened to us, where after a transaction timeout the job was unable to continue with ProducerFencedException. We are trying to understand if this is the root cause or if we’re still missing something else.
    d
    • 2
    • 1
  • m

    Maryam

    01/23/2023, 5:19 PM
    👋 I want to dedupe my stream and keep records with maximum processedCount. Would the following be correct to use if I need to do event time processing afterwards?
    Copy code
    SELECT *
    FROM (
      SELECT *,
        ROW_NUMBER() OVER (PARTITION BY id ORDER BY processedCount DESC) AS row_num
      FROM event)
    WHERE row_num = 1
    note
    ORDER BY processedCount
    . I know it does not translate to Deduplicate optimized plan because I am not using time attribute as the Flink doc suggests. My question is that whether the result preserve the watermark?
    • 1
    • 1
  • j

    Jason Politis

    01/23/2023, 6:10 PM
    Hello all. I need help diagnosing why a flink sql job with only 3 tables has only read 30 million records out from the main kafka topic, which contains 170million records. As time is progressing, it is getting slower and slower. We have checkpointing on and are using rocksdb. Could I be pointed to documentation/blog post/anything that will shake my noodle a bit so I can help my team solve this problem? Much appreciated.
    s
    • 2
    • 17
  • e

    Emmanuel Leroy

    01/23/2023, 7:02 PM
    In a Flink ReduceFunction, there is value1 and value2 and then the returned output. Is there any kind of guarantee that on the next call, value1 is the next value and value2 is the previously combined one or vice-versa? If I wanted to implement a simple way to get the latest value in a window, I’d do
    reduce(next, agg) and return next
    but if there are no guarantees that doesn’t work.
  • a

    Artun Duman

    01/23/2023, 9:42 PM
    Hello folks, I am curious about the current state of development for Stateful Functions. The repo seems to be quite inactive and the last release seems to be a year old and the docker image is based on Flink 1.14.x. Has the community moved to a new solution that I need to be aware of? Is anyone actively using this project? Or is the lack of new development is because this is a thin layer on top of Flink and doesn't need many new features? Thank you so much!!
    plusone 1
    s
    d
    +6
    • 9
    • 16
  • a

    Amir Halatzi

    01/24/2023, 8:09 AM
    Hey all! I’m trying to write a test for late arriving messages for a Flink DataStream pipeline with a tumbling window, and I can’t find any example of how to do it. The documentation in the site is a bit out of date (I couldn’t find any of the objects used there in 1.16) Where can I find a reference?
    m
    • 2
    • 5
  • p

    PRASHANT GUPTA

    01/24/2023, 8:35 AM
    Unable to cleanly uninstall my flink deployment CONTEXT - Flink kubernetes operator i am trying to uninstall my flinkdeployment but its giving the below errors and not letting me do anything. Its stuck in a deadlock condition. Status of flinkdeployment is RECONCILING , even if i try to delete it gets stuck
    Untitled.txt
    k
    c
    • 3
    • 5
  • k

    kingsathurthi

    01/24/2023, 10:28 AM
    Is it possible to submit multiple jobs in flinkdeployment? (flink operator)
    m
    • 2
    • 3
  • g

    Giannis Polyzos

    01/24/2023, 12:48 PM
    Flink SQL API: Let's assume two table that im running a join using sql.. when using rocksdb state backend i see that there are 2 column families being created - whether its an inner join left join etc -
    right-records
    and
    left-records
    . 1. Is there any way to specify the name for the column families? How are these names assigned? 2. Do we always keep both tables in state? for example if the first table is an append only stream how does state get expired in sql? if i have billions of events how do i make sure they dont stay around and fill up the disk?
    m
    • 2
    • 9
  • s

    Slackbot

    01/24/2023, 1:07 PM
    This message was deleted.
    a
    w
    • 3
    • 2
  • p

    Pradeep Ramachandra

    01/24/2023, 1:28 PM
    Hello Everyone. I'm trying to run the sample program outlined in flink documentation here, and also included all the relevant dependencies in pom file. During runtime I get an exception mentioned in the thread. Let me know if you have any pointers/suggestions. Thanks!
    ✅ 1
    • 1
    • 3
  • a

    Amyth

    01/24/2023, 3:26 PM
    Hi Flink community, I need a design suggestion for Flink use case of data migration, Problem Statement: We have huge number of records in our DB, and we wanted to copy data from source database to destination DB. Description: We have Source DB where we have almost 50+ tables in Database, all we wanted to migrate data from source DB[TimescaleDB/PostgreSQL] to destination DB[TimescaleDB/PostgreSQL] using Flink and scala. If we insert all these records sequentially, would not be feasible for huge number of records. Having said, we need to make sure our Job should maintain integrity. [All or None records should transfer] Pseudo Code for Batch Job[1-HR Interval]: 1. Initialize stream and table execution env. 2. Connect to default db 3. Check if data migration request present if no stop the flow 4 Otherwise read source and destination db details 5. Connect to source DB and start reading tables data 6. Temporarily stores data in dataset 7. Connect to destination DB and start adding data from temporary storage/Dataset to respective destination table 8. Follow steps 5 to 7 for all tables 9. Update status all data copy has been completed 10. Stop the job So I need a suggestion on 1. Is there Flink plugin or module support for data migration? 2. What is the feasible way to migrate data from one DB to other DB using Flink? 3. Does sequential record copy recommended in this scenario where table count and record count is huge? 4. Please share link/resources if any for this use case.
    s
    s
    • 3
    • 2
  • s

    Sumit Nekar

    01/24/2023, 5:46 PM
    Hello, I get following events when a job manager is starting during upgrade and job manager gets restarted after these events and it comes up fine. I suspect this is something related leader election process. Please suggest if you have any inputs. Job is being deployed using flink kuberenetes operator. }
    2023-01-24 17:43:25,532 INFO  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Job 7eef2349fa26a94509d5126440972a25 is submitted.
    2023-01-24 17:43:25,532 INFO  org.apache.flink.client.deployment.application.executors.EmbeddedExecutor [] - Submitting Job with JobId=7eef2349fa26a94509d5126440972a25.
    2023-01-24 17:43:27,040 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
    2023-01-24 17:43:27,042 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
    2023-01-24 17:43:27,044 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint.
    2023-01-24 17:43:27,299 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Shutting down remote daemon.
    2023-01-24 17:43:27,299 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Shutting down remote daemon.
    2023-01-24 17:43:27,301 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Remote daemon shut down; proceeding with flushing remote transports.
    2023-01-24 17:43:27,301 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Remote daemon shut down; proceeding with flushing remote transports.
    2023-01-24 17:43:27,313 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Remoting shut down.
    2023-01-24 17:43:27,313 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Remoting shut down.
    g
    w
    • 3
    • 5
  • g

    Guruguha Marur Sreenivasa

    01/24/2023, 6:22 PM
    Hi all, quick question: does anyone know the wiring format for AWS Glue AvroSerializer? I know the confluent one - it is well documented. But for Glue, I'm unable to find it. My goal is to retrieve the schema ID from the byte array record.
    d
    • 2
    • 4
  • s

    Sergii Mikhtoniuk

    01/24/2023, 7:17 PM
    Hi all, I'm migrating working code from Flink 1.13 to 1.16 but running into issues with temporal tables. I have a table created from a simple stream with schema:
    Copy code
    (
      `offset` BIGINT NOT NULL,
      `system_time` TIMESTAMP(3) NOT NULL,
      `event_time` TIMESTAMP(3) NOT NULL *ROWTIME*,
      `symbol` STRING NOT NULL,
      `price` INT NOT NULL,
    )
    when I call
    table.createTemporalTableFunction($"event_time", $"symbol")
    I get a following error:
    TableException: Unsupported conversion from data type 'TIMESTAMP(3) NOT NULL' (conversion class: org.apache.flink.table.data.TimestampData) to type information. Only data types that originated from type information fully support a reverse conversion.
    Is anyone familiar with migration to the new type system? I'm at a loss.
    m
    • 2
    • 6
  • r

    Rommel

    01/24/2023, 7:25 PM
    [Flink-Kubernetes-Operator]: Version 1.2. In JobManagerSpec and TaskManagerSpec, I look into Pod podTemplate -> ObjectMeta metadata -> Map<String, String> labels, is this the place where I define the label of the nodes that this pod can deploy to ? I don’t think i am familiar with the code enough to find the config setup in the operator, can someone help me on this? Many many thanks.
    g
    • 2
    • 5
1...495051...98Latest