# troubleshooting
  • Jirawech Siwawut

    11/19/2022, 3:11 PM
    Hi team. Is it possible to read with upsert-kafka from a Kafka topic written by Spark? I use Flink SQL to read with upsert-kafka and get no result when trying to print the table stream. Note: if I sink to upsert-kafka with Flink and read it back, it seems to work fine. I just wonder how it actually works under the hood and whether it would be possible to sink data with Spark and read it back with Flink upsert-kafka.
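    For reference, upsert-kafka interprets the Kafka record key as the table's primary key and each value as an upsert (a null value as a delete), so one thing to check is whether Spark writes the key columns in the same key.format the Flink DDL declares. Below is a minimal sketch of such a table definition via the Table API; the topic, broker, and column names are hypothetical.
    Copy code
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class UpsertKafkaReadSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // The Kafka record key must carry the primary-key columns,
            // serialized with the declared 'key.format'.
            tEnv.executeSql(
                    "CREATE TABLE user_scores (" +
                    "  user_id STRING," +
                    "  score   BIGINT," +
                    "  PRIMARY KEY (user_id) NOT ENFORCED" +
                    ") WITH (" +
                    "  'connector' = 'upsert-kafka'," +
                    "  'topic' = 'user-scores'," +
                    "  'properties.bootstrap.servers' = 'broker:9092'," +
                    "  'key.format' = 'json'," +
                    "  'value.format' = 'json'" +
                    ")");

            tEnv.executeSql("SELECT * FROM user_scores").print();
        }
    }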
  • dino bin

    11/21/2022, 6:03 AM
    Hi team, I want to set up Kerberos using the Kubernetes operator. However, the pod cannot be created because the keytab file in the Docker image cannot be found. Is anyone aware of this issue? ---solved--- copy the keytab file into the Flink Kubernetes operator pod
  • ding bei

    11/21/2022, 7:14 AM
    Hi guys, I notice the Flink operator only supports local jar addresses, so I have to add a sidecar container to download the jar from an HTTP server. Is there any plan to support downloading remote jars in the near future?
  • wuyi yang

    11/21/2022, 8:37 AM
    Hi team. Flink version 1.16.0, Kafka source: how do I extract the timestamp from a message field rather than from the Kafka metadata?
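    A minimal sketch of assigning event time from a field of the deserialized record instead of the Kafka record timestamp; MyEvent, its deserialization schema, and the topic/broker names are hypothetical placeholders.
    Copy code
    import java.time.Duration;

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class EventTimeFromPayload {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            KafkaSource<MyEvent> source = KafkaSource.<MyEvent>builder()
                    .setBootstrapServers("broker:9092")
                    .setTopics("events")
                    .setStartingOffsets(OffsetsInitializer.latest())
                    .setValueOnlyDeserializer(new MyEventDeserializationSchema()) // hypothetical schema
                    .build();

            // The assigner's second argument is the Kafka record timestamp; ignore it
            // and return a field of the payload (in epoch milliseconds) instead.
            WatermarkStrategy<MyEvent> watermarks = WatermarkStrategy
                    .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((event, kafkaTimestamp) -> event.getEventTimeMillis());

            DataStream<MyEvent> events = env.fromSource(source, watermarks, "kafka-source");
            events.print();
            env.execute();
        }
    }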
  • Maher Turifi

    11/21/2022, 11:10 AM
    Hi, can I ask about using the CDC connector vs. using a JDBC lookup join? I'm using a temporal lookup join with the JDBC connector (with the lookup cache enabled) to enrich a table with data queried from a MySQL database. If I used the CDC connector (mysql-cdc) to connect to MySQL and enrich the data in the main stream, it would read a database snapshot and then continue reading the binlog. My question is: how would that work compared to the lookup-join-with-cache approach? How would this affect performance? Would the database snapshot taken at the start be kept and cached on each node? Thanks.
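    For reference, the JDBC lookup join queries MySQL per join key at processing time and only keeps the configured per-task cache, while mysql-cdc materializes the table in Flink state from the initial snapshot plus the binlog. A minimal lookup-join sketch with the JDBC cache options; the table, column, and connection values are hypothetical.
    Copy code
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class JdbcLookupJoinSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Dimension table backed by MySQL; rows are fetched per lookup key and
            // cached on each task for up to the configured TTL.
            tEnv.executeSql(
                    "CREATE TABLE customers (" +
                    "  id BIGINT," +
                    "  name STRING," +
                    "  PRIMARY KEY (id) NOT ENFORCED" +
                    ") WITH (" +
                    "  'connector' = 'jdbc'," +
                    "  'url' = 'jdbc:mysql://mysql-host:3306/mydb'," +
                    "  'table-name' = 'customers'," +
                    "  'lookup.cache.max-rows' = '10000'," +
                    "  'lookup.cache.ttl' = '10min'" +
                    ")");

            // The main stream needs a processing-time attribute for the lookup join;
            // 'orders' stands in for the real source table.
            tEnv.executeSql(
                    "CREATE TEMPORARY VIEW orders_proc AS " +
                    "SELECT *, PROCTIME() AS proc_time FROM orders");

            tEnv.executeSql(
                    "SELECT o.order_id, c.name " +
                    "FROM orders_proc AS o " +
                    "JOIN customers FOR SYSTEM_TIME AS OF o.proc_time AS c " +
                    "ON o.customer_id = c.id").print();
        }
    }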
  • Felix Angell

    11/21/2022, 3:28 PM
    Is there any kind of recommended interval for checkpointing in Flink? We have ours set to 2 minutes and we're not sure whether it should be set differently, either much longer or much more frequent. There's also some weird race condition where our long-running checkpoints could interfere with a snapshot being taken; has anyone had any experience with this? Our current thought is to increase the pause between checkpoints to reduce the chance of this race condition occurring.
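    For what it's worth, the pause between checkpoints is a separate knob from the interval, so overlap between long-running checkpoints can be limited with a minimum pause and a concurrency limit; a minimal sketch, with the specific values only illustrative.
    Copy code
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointTuningSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.enableCheckpointing(120_000, CheckpointingMode.EXACTLY_ONCE); // every 2 minutes

            CheckpointConfig cfg = env.getCheckpointConfig();
            cfg.setMinPauseBetweenCheckpoints(60_000); // at least 60s after one checkpoint ends before the next starts
            cfg.setCheckpointTimeout(10 * 60_000);     // abort checkpoints that run longer than 10 minutes
            cfg.setMaxConcurrentCheckpoints(1);        // never run two checkpoints at once
        }
    }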
  • Jeremy DeGroot

    11/21/2022, 4:01 PM
    I've been seeing something weird in our windowed Flink jobs: during periods of low activity, we start seeing a lot of late records coming out of the window operator and going to our side-output channel. We have max idleness and out-of-orderness both set to what I think are generous values (5s and 20s respectively). The data comes from a FlinkKafkaConsumer source. Any idea what we should be checking for this?
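    For context, a minimal sketch of how out-of-orderness, idleness, and the late-data side output fit together, using the values mentioned above; MyEvent, MyResult, and MyWindowFunction are hypothetical.
    Copy code
    import java.time.Duration;

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.OutputTag;

    public class LateDataSketch {
        public static void wire(DataStream<MyEvent> events) {
            WatermarkStrategy<MyEvent> watermarks = WatermarkStrategy
                    .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                    .withIdleness(Duration.ofSeconds(5)) // idle partitions stop holding back the watermark
                    .withTimestampAssigner((e, ts) -> e.getEventTimeMillis());

            OutputTag<MyEvent> lateTag = new OutputTag<MyEvent>("late-events") {};

            SingleOutputStreamOperator<MyResult> windowed = events
                    .assignTimestampsAndWatermarks(watermarks)
                    .keyBy(MyEvent::getKey)
                    .window(TumblingEventTimeWindows.of(Time.seconds(60)))
                    .sideOutputLateData(lateTag) // records behind the watermark end up here
                    .process(new MyWindowFunction());

            // Counting or inspecting this stream shows exactly what arrived late and when.
            DataStream<MyEvent> late = windowed.getSideOutput(lateTag);
            late.print();
        }
    }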
  • Jeremy Ber

    11/21/2022, 4:02 PM
    Is there any way within a running Flink job to get an approximation of some metrics like:
    • how many windows are currently open?
    • how much state does each window make up?
    • what is causing state accumulation for each of these windows?
  • Padraic McAtee

    11/21/2022, 4:09 PM
    Hey all, wanted to resurface my earlier inquiry. Any pointers on how I could achieve this?
  • Joris Basiglio

    11/21/2022, 4:54 PM
    Hey, does Flink emit any metrics regarding savepoints? There is a whole section for checkpoints but nothing for savepoints.
  • Krish Narukulla

    11/21/2022, 7:39 PM
    I am unable to read a Hive table from GCS; has anyone run into this error?
    Copy code
    Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem not found
            at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2638) ~[?:?]
            at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3341) ~[?:?]
            at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3373) ~[?:?]
            at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:125) ~[?:?]
            at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3424) ~[?:?]
            at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3392) ~[?:?]
    hive-site.xml is
    Copy code
    <configuration>
      <property>
        <name>hive.metastore.uris</name>
    <value>thrift://xxx:9083</value>
        <description>JDBC connect string for a JDBC metastore</description>
      </property>
      <property>
        <name>fs.s3n.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
      </property>
      <property>
        <name>fs.s3.impl</name>
        <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
      </property>
      <property>
        <name>fs.gs.impl</name>
        <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
      </property>
      <property>
        <name>fs.AbstractFileSystem.gs.impl</name>
        <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
        <description>
          The AbstractFileSystem for gs: (GCS) uris. Only necessary for use with Hadoop 2.
        </description>
      </property>
    </configuration>
  • Thiruvenkadesh Someswaran

    11/21/2022, 10:24 PM
    Howdy, I am trying to run the Basic-Checkpoint-HA example https://github.com/apache/flink-kubernetes-operator/blob/main/examples/basic-checkpoint-ha.yaml#L47 and I got the following error on deploy:
    Warning  FailedMount  89s (x21 over 28m)   kubelet            MountVolume.SetUp failed for volume "flink-volume" : hostPath type check failed: /tmp/flink is not a directory
  • Chris Ro

    11/22/2022, 1:15 AM
    I'm using AsyncDataStream.orderedWait with a capacity (let's say 50). My asyncInvoke method is making an HTTP call to look up some data for enrichment, and it's doing this by spawning a background thread that, when it completes, uses resultFuture.complete. If my async operator is processing all 50 records simultaneously, does idleTimeMsPerSecond show as 100% or 0%? This blog post seems to suggest custom threads (noted that they're discouraged) don't get counted correctly for the idle and busy time metrics.
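    For reference, a minimal sketch of the pattern being described, completing the ResultFuture from a custom thread pool; the lookupOverHttp call stands in for the real enrichment client, and whether the idle/busy metrics account for that pool correctly is exactly the open question here.
    Copy code
    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.AsyncDataStream;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.async.ResultFuture;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

    public class EnrichAsyncFunction extends RichAsyncFunction<String, String> {

        private transient ExecutorService executor;

        @Override
        public void open(Configuration parameters) {
            executor = Executors.newFixedThreadPool(10);
        }

        @Override
        public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
            // The blocking HTTP call runs on the custom pool, not on the task thread.
            CompletableFuture
                    .supplyAsync(() -> lookupOverHttp(key), executor)
                    .whenComplete((enriched, error) -> {
                        if (error != null) {
                            resultFuture.completeExceptionally(error);
                        } else {
                            resultFuture.complete(Collections.singleton(enriched));
                        }
                    });
        }

        @Override
        public void close() {
            if (executor != null) {
                executor.shutdown();
            }
        }

        private String lookupOverHttp(String key) {
            return key; // placeholder for the real HTTP enrichment call
        }

        public static DataStream<String> wire(DataStream<String> input) {
            // 1 second timeout, capacity 50, input order preserved
            return AsyncDataStream.orderedWait(
                    input, new EnrichAsyncFunction(), 1, TimeUnit.SECONDS, 50);
        }
    }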
  • 박담

    11/22/2022, 2:45 AM
    Hi guys, I am trying to process a stream using Flink. The data is as below.
    example data)
    time=1669082219148, trans_time=2022-1-11-22T105659, flink_time=20221122105659148
    time=1669082219148, trans_time=2022-1-11-22T105659, flink_time=20221122105659148
    time=1669082220148, trans_time=2022-111-22T105700, flink_time=20221122105700148
    The watermark was created as follows by using flink_time (ymdhms.ms) or time (unix time) of the above data.
    WatermarkStrategy<SystemData> st = WatermarkStrategy
    .<SystemData>forBoundedOutOfOrderness(Duration.ofSeconds(60))
    .withTimestampAssigner((event, timestamp) -> event.getFlink_time());
    I am trying to handle event time using TumblingEventTimeWindows.
    DataStream<SystemResultDto> aggStream = mainStream
    .keyBy((dto) -> {
    SystemData cdto = ((SystemData) dto);
    return cdto.getTxn_id();
    })
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .process(new CustomProcess())
    However, if it is processed based on the actual event time, it does not work. If I process it using processing time instead, it is processed. What's the problem?
  • Emmanuel Leroy

    11/22/2022, 3:49 AM
    Hi, i have this config in my FLink session deployment with the operator:
    Copy code
    kubernetes.operator.periodic.savepoint.interval: 30m
    kubernetes.operator.savepoint.history.max.age: 24h
    kubernetes.operator.savepoint.history.max.count: "25"
    yet my savepoints don’t get deleted and I have hundreds of them… any idea why and how to troubleshoot?
  • Buddhike de Silva

    11/22/2022, 6:26 AM
    Hi everyone, private fields in operators, such as something deriving from the RichFlatMapFunction class, seem to get serialised. You can prevent this behaviour by marking those fields private transient. What is the purpose of this serialisation? When is it appropriate to use private transient and when is it not? Thank you so much in advance for shedding some light here 🙏🏾
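    For reference, user functions are serialized with Java serialization when the job is submitted and shipped to the task managers, which is why non-transient fields must be serializable; anything that is not serializable or not meaningful to ship (clients, connections, caches) is usually marked transient and created in open(). A minimal sketch; MyHttpClient is a hypothetical non-serializable helper.
    Copy code
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class EnrichingFlatMap extends RichFlatMapFunction<String, String> {

        // Shipped with the serialized function, so it must be Serializable.
        private final String endpoint;

        // Not serializable, so it is transient and rebuilt on each task in open().
        private transient MyHttpClient client;

        public EnrichingFlatMap(String endpoint) {
            this.endpoint = endpoint;
        }

        @Override
        public void open(Configuration parameters) {
            client = new MyHttpClient(endpoint);
        }

        @Override
        public void flatMap(String value, Collector<String> out) {
            out.collect(client.enrich(value));
        }
    }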
  • Suparn Lele

    11/22/2022, 6:30 AM
    Hi, we have one table A in the database. We are loading that table into Flink using the Flink SQL JdbcCatalog. Here is how we are loading the data:
    val catalog = new JdbcCatalog("my_catalog", "database_name", username, password, url)
    streamTableEnvironment.registerCatalog("my_catalog", catalog)
    streamTableEnvironment.useCatalog("my_catalog")
    val query = "select timestamp, count from A"
    val sourceTable = streamTableEnvironment.sqlQuery(query)
    streamTableEnvironment.createTemporaryView("innerTable", sourceTable)
    val aggregationQuery = "select window_end, sum(count) from TABLE(TUMBLE(TABLE innerTable, DESCRIPTOR(timestamp), INTERVAL '10' minutes)) group by window_end"
    It throws the following error:
    Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. The window function TUMBLE(TABLE table_name, DESCRIPTOR(timecol), datetime interval[, datetime interval]) requires the timecol is a time attribute type, but is TIMESTAMP(6).
    In short, we want to apply a windowing aggregation on an already existing column. How can we do that? Note: this is batch processing. Please help.
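    For reference, the TUMBLE window TVF needs the column to be a time attribute (a TIMESTAMP(3) column with a watermark, or a processing-time column), which a table taken straight from the JdbcCatalog does not have. A minimal sketch of declaring the source with an explicit watermark so the column becomes an event-time attribute; the connector options and column names are only illustrative, and whether the batch runtime accepts this depends on the Flink version.
    Copy code
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class TimeAttributeSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Declaring the watermark turns `ts` into an event-time attribute,
            // which the TUMBLE window TVF requires (note TIMESTAMP(3), not TIMESTAMP(6)).
            tEnv.executeSql(
                    "CREATE TEMPORARY TABLE a_with_rowtime (" +
                    "  ts  TIMESTAMP(3)," +
                    "  cnt BIGINT," +
                    "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
                    ") WITH (" +
                    "  'connector' = 'jdbc'," +
                    "  'url' = 'jdbc:mysql://mysql-host:3306/database_name'," +
                    "  'table-name' = 'A'" +
                    ")");

            tEnv.executeSql(
                    "SELECT window_start, window_end, SUM(cnt) " +
                    "FROM TABLE(TUMBLE(TABLE a_with_rowtime, DESCRIPTOR(ts), INTERVAL '10' MINUTES)) " +
                    "GROUP BY window_start, window_end").print();
        }
    }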
  • Jeremy DeGroot

    11/22/2022, 10:51 AM
    I should also mention WRT this, we're on Flink 1.15.2
  • chunilal kukreja

    11/22/2022, 11:32 AM
    Hi team, with Flink 1.16.0, retry support has been added to the Async I/O API. But it seems the documentation has a typo in the AsyncRetryStrategies builder listing. In the code snippet it is written as:
    Copy code
    AsyncRetryStrategy asyncRetryStrategy =
    	new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // maxAttempts=3, fixedDelay=100ms
    		.retryIfResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
    		.retryIfException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
    		.build();
    Rather, it should be:
    Copy code
    AsyncRetryStrategy asyncRetryStrategy =
    	new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // maxAttempts=3, fixedDelay=100ms
    		.ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
    		.ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
    		.build();
    If one follows the documentation snippet, it fails to recognise “retryIfResult” and “retryIfException”. Can someone please confirm whether this observation is correct?
  • Aishwarya Raimule

    11/22/2022, 8:34 PM
    Hi all, I want to record the count of late-arriving events in PyFlink 1.13.2. One way to do this is by emitting the late events as a side output, but I don't find OutputTag or SideOutput in the source code of this PyFlink version. Can someone tell me how I can get the count of late events in PyFlink 1.13.2? Thanks in advance.
  • 박담

    11/23/2022, 4:49 AM
    I am trying to design and implement a Flink flow as shown in the attached figure. I have a question about the red part of the picture. I want to merge multiple pieces of data with different formats into a Java data object and send it to the sink as one record. Please let me know if there is a better way.
  • Gaurav Miglani

    11/23/2022, 7:32 AM
    We are migrating our k8s jobs from Flink 1.13 to Flink 1.16 after output validation. One issue I'm facing is that in Flink 1.13 the job ids were 00000000000000000000000000000000, as all our jobs use last-state upgrade mode, but in Flink 1.16 we are getting other job ids like ffffffffd85f74d40000000000000000. On every run it is the same for a given job, though. Can anyone let me know the logic behind it, or is it a random UUID? 🤔
  • 박담

    11/23/2022, 9:12 AM
    Hi guys, I am trying to process a stream using Flink. The Flink flow is shown in the figure below. The data is as below.
    Copy code
    example data)
    time=1669082219148, trans_time=2022-1-11-22T10:56:59, flink_time=20221122105659148
    time=1669082219148, trans_time=2022-1-11-22T10:56:59, flink_time=20221122105659148
    time=1669082220148, trans_time=2022-111-22T10:57:00, flink_time=20221122105700148
    The watermark was created as follows by using flink_time (ymdhms.ms) or time (unix time) of the above data.
    Copy code
    WatermarkStrategy<SystemData> st = WatermarkStrategy
        .<SystemData>forBoundedOutOfOrderness(Duration.ofSeconds(60))
        .withTimestampAssigner((event, timestamp) -> event.getFlink_time());
    I am trying to handle event time using TumblingEventTimeWindows.
    Copy code
    DataStream<SystemResultDto> aggStream=mainStream
    .keyBy((dto)-> {
    SystemData cdto=((SystemData)dto);
    return cdto.getTxn_id();
    })
    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
    .process(new CustomProcess())
    However, when it is processed based on the actual event time it does not work. If I process it using processing time instead, it is processed.
    Copy code
    DataStream<SystemResultDto> aggStream=mainStream
    .keyBy((dto)-> {
    SystemData cdto=((SystemData)dto);
    return cdto.getTxn_id();
    }) .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
    .process(new CustomProcess());
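    One thing worth checking (an assumption, since the field types are not shown above): withTimestampAssigner must return epoch milliseconds, so if getFlink_time() returns the yyyyMMddHHmmssSSS-style number (e.g. 20221122105659148), the assigned timestamps are not real epoch times and the event-time windows will not fire as expected. A minimal sketch of converting that format first (the time field, which already looks like epoch millis, could be assigned directly instead); the system default time zone is assumed.
    Copy code
    import java.time.Duration;
    import java.time.LocalDateTime;
    import java.time.ZoneId;
    import java.time.format.DateTimeFormatter;

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;

    public class FlinkTimeWatermarks {

        private static final DateTimeFormatter SECONDS_PART =
                DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

        /** Converts a value such as 20221122105659148 to epoch milliseconds. */
        static long toEpochMillis(long flinkTime) {
            long millis = flinkTime % 1000;                         // last three digits
            LocalDateTime ldt = LocalDateTime.parse(
                    Long.toString(flinkTime / 1000), SECONDS_PART); // yyyyMMddHHmmss part
            return ldt.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() + millis;
        }

        static WatermarkStrategy<SystemData> strategy() { // SystemData as in the snippet above
            return WatermarkStrategy
                    .<SystemData>forBoundedOutOfOrderness(Duration.ofSeconds(60))
                    .withTimestampAssigner((event, ts) -> toEpochMillis(event.getFlink_time()));
        }
    }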
  • Vishal bharatbhai Vanpariya

    11/23/2022, 11:42 AM
    Hi team, I have one datastream which has JSON data as below.
    Copy code
    {
      "ts": 1669099696691,
      "uuid": "fa8dfe26114c49acb9745ee8ac9bbfdb",
      "count": 95
    }
    The type of this stream is DataStream<String>, but I want to convert it to a keyed stream KeyedStream<T, K> where the key will be count and the data will be the same JSON string, without applying any aggregation. Can somebody guide me on this? Thanks.
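    A minimal sketch of keying the raw JSON strings by the count field without any aggregation, assuming a Jackson dependency is available on the classpath.
    Copy code
    import com.fasterxml.jackson.databind.ObjectMapper;

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;

    public class KeyByJsonField {

        /** Extracts the "count" field from each JSON string and uses it as the key. */
        public static KeyedStream<String, Long> keyByCount(DataStream<String> jsonStream) {
            return jsonStream.keyBy(new KeySelector<String, Long>() {
                private final ObjectMapper mapper = new ObjectMapper(); // ObjectMapper is Serializable

                @Override
                public Long getKey(String json) throws Exception {
                    return mapper.readTree(json).get("count").asLong();
                }
            });
        }
    }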
  • Thiruvenkadesh Someswaran

    11/23/2022, 6:05 PM
    Can I have more than one Flink deployment in a cluster?
    kubectl port-forward svc/basic-checkpoint-ha-example-rest 8081:8081 -n flink
    E1123 12:56:37.974939   96110 portforward.go:406] an error occurred forwarding 8081 -> 8081: error forwarding port 8081 to pod e33822576ad4f60ddbd13d462e29f6641c6cd07fd606136cd2613e020432b5ee, uid : exit status
    I have the HA and regular examples running
  • raghav tandon

    11/24/2022, 6:51 AM
    When my job shuts down using fixed-strategy, it cleans up its state in ZooKeeper as well… How can I avoid that from happening? Due to this I am losing the state of the job and the checkpoints as well.
  • Vishal bharatbhai Vanpariya

    11/24/2022, 7:03 AM
    Hi team, can somebody please help me write a stream to an S3 bucket? What dependency should I add, and how do I write the flink-conf.yaml file?
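    A minimal sketch using the FileSink, assuming the flink-s3-fs-hadoop (or flink-s3-fs-presto) filesystem plugin is placed under plugins/ and the credentials (e.g. s3.access-key / s3.secret-key, or an instance role) are configured in flink-conf.yaml; the bucket and path are hypothetical.
    Copy code
    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.connector.file.sink.FileSink;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;

    public class S3SinkSketch {

        public static void wire(DataStream<String> stream) {
            FileSink<String> sink = FileSink
                    .forRowFormat(new Path("s3://my-bucket/flink-output/"),
                            new SimpleStringEncoder<String>("UTF-8"))
                    .build();

            // Checkpointing must be enabled so that in-progress part files get committed.
            stream.sinkTo(sink);
        }
    }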
  • licho

    11/24/2022, 7:18 AM
    TO_TIMESTAMP(JSON_VALUE( data , '$.dt' )): dt is generated by LocalDateTime.now().toString(). Can't it be used as a lateral key? Flink version 1.16.
  • licho

    11/24/2022, 9:46 AM
    Hi team, my temporal join does not work!
  • Sumit Khaitan

    11/24/2022, 10:09 AM
    Hi team, is reactive mode (autoscaling) supported when we are using the Flink Kubernetes operator?