# troubleshooting
  • chunilal kukreja

    08/04/2022, 12:09 PM
    Hi folks, what happens if I run the same Flink application twice, reading data from a Kafka stream with only one partition? Expectation: one Flink application would process the message while the other one wouldn't get any messages, as they are both in the same consumer group. Actual: both apps are getting the same message, even though the consumer group name is the same.
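    A note on why this happens: Flink's Kafka source assigns topic partitions to its own subtasks and tracks offsets in checkpoints, so the group.id is only used for committing offsets back to Kafka, not for coordinating partition assignment between separately submitted jobs. A minimal sketch, assuming Flink 1.15's KafkaSource (broker, topic, and group names are placeholders):

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class KafkaGroupSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            KafkaSource<String> source = KafkaSource.<String>builder()
                    .setBootstrapServers("broker:9092")                 // placeholder
                    .setTopics("input-topic")                           // placeholder
                    .setGroupId("my-group")                             // shared by both jobs, yet both read all partitions
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();
            // Each submitted job builds its own source and reads the partition
            // independently; Kafka's group coordinator is never asked to balance them.
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka").print();
            env.execute();
        }
    }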
  • Yui H

    08/04/2022, 12:28 PM
    Hi folks, Flink isn't able to load the savepoint (it seems the savepoint becomes incompatible) after I renamed classes and moved files to a different folder, even though each operator is assigned a unique id and name. When I deploy the Flink app, I see errors like:
    • Caused by: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: …
    • Caused by: java.lang.RuntimeException: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: …
    • Caused by: java.lang.ClassNotFoundException: com.some.flink.pipeline.events.blah
    If so, can we really not rename or move files in Flink applications?
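    For what it's worth, operator uid/name only ties checkpointed state to operators; the state payload itself is deserialized by class name, so a class that was serialized into the savepoint (POJO, Kryo, or Java serialization) still fails with ClassNotFoundException after a rename or package move. A minimal sketch of the distinction, with hypothetical names:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class UidVsClassNameSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromElements("a", "b", "c")
               .uid("letters-source")      // uid: lets a savepoint find this operator after topology edits
               .keyBy(s -> s)
               .map(new MapFunction<String, String>() {
                   @Override
                   public String map(String s) {
                       return s.toUpperCase();
                   }
               })
               .uid("uppercase")           // renaming/moving *operators* is fine with stable uids...
               .name("uppercase")
               .print();
            // ...but renaming/moving a *class stored in state* (like the old
            // com.some.flink.pipeline.events.blah above) breaks restore, since the
            // savepoint records the fully qualified class name.
            env.execute();
        }
    }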
  • Qinghui Xu

    08/04/2022, 3:03 PM
    Hello folks, we are running a session cluster on Mesos with Flink 1.13, with around 10 jobs on it. The setup of the cluster:
    • Runs in session mode
    • Uses the Mesos scheduler
    • Zookeeper HA enabled
    • Job scheduling timeout set to 10 minutes, with a tolerance of maximum 6 failures
    From time to time we see the scheduler cannot fulfill the resource requirements of the jobs:
    • A job is submitted with a requirement of 120 slots, but the jobmanager is only able to allocate 119 slots for it.
    • The job times out its scheduling after 10 minutes, and then retries.
    • It still only gets 119 slots out of the 120 required, and so on, until it fails after 6 retries.
    • From the metrics of the Mesos master, the jobmanager only negotiates with Mesos when the job is submitted, and there are no negotiations during those retries.
    Does it ring a bell?
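    Not an answer to the Mesos negotiation question itself, but for reference, a sketch of how the two knobs described above are commonly expressed, assuming the timeout refers to slot.request.timeout and the failure tolerance to a fixed-delay restart strategy:

    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SchedulingKnobsSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // How long the scheduler waits for all requested slots (here: 10 minutes)
            // before the submission fails and is retried.
            conf.setString("slot.request.timeout", "600000");
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.createLocalEnvironment(1, conf);
            // "tolerance of maximum 6 failures" as a fixed-delay restart strategy.
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(6, Time.seconds(10)));
            env.fromElements(1, 2, 3).print();
            env.execute();
        }
    }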
  • Jeesmon Jacob

    08/04/2022, 3:15 PM
    Hi team, we are in the process of gathering details for migrating existing standalone Flink deployments to Kubernetes-operator-based deployments. Existing jobs are running in HA mode, and checkpoints/HA metadata are stored in an S3 bucket. We can also trigger a savepoint to store in an S3 bucket. In the new setup, the S3 locations where the checkpoint/savepoint/HA metadata are stored are different (just an additional prefix with the FlinkDeployment name, so we can use the bucket for multiple deployments). We are not planning to change the Flink version or job topology. So in this case, is it possible to copy over the HA metadata and checkpoint data to the new location in S3 and start the FlinkDeployment to continue running? Anything else we need to worry about when migrating from standalone to the Kubernetes-native integration? Thanks.
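    A sketch of the settings that have to agree with wherever the data is copied (all paths hypothetical). One caveat worth checking: savepoints have been relocatable since Flink 1.11, but checkpoint metadata can reference absolute paths, so pointing the new deployment at the old (or copied) checkpoint explicitly may be safer than copying HA metadata:

    import org.apache.flink.configuration.Configuration;

    public class MigrationPathsSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // New per-deployment prefixes, as described above.
            conf.setString("state.checkpoints.dir", "s3://bucket/my-deployment/checkpoints");
            conf.setString("state.savepoints.dir", "s3://bucket/my-deployment/savepoints");
            conf.setString("high-availability.storageDir", "s3://bucket/my-deployment/ha");
            // Alternative to copying HA metadata: start the new deployment directly
            // from the last retained checkpoint or a fresh savepoint.
            conf.setString("execution.savepoint.path", "s3://bucket/old/checkpoints/chk-123");
            System.out.println(conf);
        }
    }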
  • RM

    08/04/2022, 4:16 PM
    Hi users! We observed strange behavior in our streaming job on restoring from an externalized checkpoint which was generated 10 minutes prior to the deployment. On only one shard (say S1), the Kinesis iterator age shot up by 2 days, causing the global watermark to lag by 2 days. Looking through the CloudWatch logs from 2 days prior, I see a sudden drop in outgoing bytes from S1 despite a steady incoming rate. There were no restarts or checkpoint failures in the pipeline all along. I'd appreciate any pointers or insights as to why a streaming job would abruptly stop consuming from an open shard.
  • Don Li

    08/04/2022, 7:13 PM
    Hello, is there a way to create a DataStream<String> from a FileSource without using the execution environment to generate the datastream? I have a file path inside of another DataStream<String> and I want to go through the lines of the file.
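    One possible approach, as a sketch: since FileSource has to be wired into the execution environment, a file path arriving as a record can instead be read directly with Flink's FileSystem abstraction inside a flatMap. Note this reads the whole file within one record's processing, and the read position is not checkpointed:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.core.fs.FSDataInputStream;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.util.Collector;

    // usage: pathStream.flatMap(new ReadLinesFn())
    public class ReadLinesFn extends RichFlatMapFunction<String, String> {
        @Override
        public void flatMap(String pathString, Collector<String> out) throws Exception {
            Path path = new Path(pathString);
            FileSystem fs = path.getFileSystem();   // resolves s3://, hdfs://, file://, ...
            try (FSDataInputStream in = fs.open(path);
                 BufferedReader reader =
                         new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    out.collect(line);   // one output record per line of the file
                }
            }
        }
    }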
  • Ivan M

    08/04/2022, 7:25 PM
    Hi all! I'm playing around with temporal joins. https://gist.github.com/rmt-ivan/f971ef02689f7b756219404e20a5e1bf But the last query doesn't produce any data. On the other hand, when I try a regular join, I can see data. Basically, I try to join CDC data (c) to a fact table (a) and produce enriched events. What am I doing wrong? :))
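    Without seeing the gist output it's hard to say, but the usual reason an event-time temporal join emits nothing while a regular join works is watermarks: the join only emits a row once the watermark on both inputs passes its timestamp, and the versioned (CDC) side additionally needs a primary key and a rowtime. A skeleton of the shape, with placeholder schemas and connector options (not runnable as-is):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class TemporalJoinSkeleton {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // Probe side (a): needs an event-time attribute.
            tEnv.executeSql(
                "CREATE TABLE a (" +
                "  id STRING, ts TIMESTAMP(3)," +
                "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
                ") WITH ('connector' = 'kafka' /* options elided */)");
            // Build side (c): versioned table, needs PRIMARY KEY + rowtime.
            tEnv.executeSql(
                "CREATE TABLE c (" +
                "  id STRING, attr STRING, ts TIMESTAMP(3)," +
                "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH ('connector' = 'upsert-kafka' /* options elided */)");
            // Emits only once watermarks on BOTH sides pass a row's timestamp.
            tEnv.executeSql(
                "SELECT a.id, c.attr " +
                "FROM a LEFT JOIN c FOR SYSTEM_TIME AS OF a.ts ON a.id = c.id").print();
        }
    }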
  • Adrian Chang

    08/04/2022, 7:46 PM
    Hello, I am trying to use a SQL window join, but it's not producing any data. My code is very similar to the documentation at https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/sql/queries/window-join/#innerleftrightfull-outer:
    SELECT
        motion.window_start AS window_start,
        motion.window_end AS window_end,
        motion.serial AS serial, 
        motion.groupId AS motion_group, 
        motion.tsMs AS motion_tsMs, 
        CAST(location.tsMs AS STRING) AS location_tsMs, 
        location.resourceId AS location_group,
        motion.groupId AS resourceId
    FROM (
        SELECT * FROM TABLE(TUMBLE(TABLE stream_motion, DESCRIPTOR(tsMs), INTERVAL '1' MINUTES))
    ) motion
    LEFT JOIN (
        SELECT * FROM TABLE(TUMBLE(TABLE stream_location, DESCRIPTOR(tsMs), INTERVAL '1' MINUTES))
    ) location
    ON motion.groupId = location.resourceId AND motion.window_start = location.window_start AND motion.window_end = location.window_end;
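    A common cause of a windowing TVF producing no output is the time attribute: tsMs must be declared as event time with a watermark on both input tables, and the watermark must actually advance (an idle source stalls it). A minimal runnable sketch of the required declaration, with datagen standing in for the real sources:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class WindowWatermarkSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // datagen stands in for the real source; the part that matters is the
            // WATERMARK declaration on tsMs, without which the TUMBLE TVF never fires.
            tEnv.executeSql(
                "CREATE TABLE stream_motion (" +
                "  groupId STRING," +
                "  tsMs AS LOCALTIMESTAMP," +
                "  WATERMARK FOR tsMs AS tsMs - INTERVAL '5' SECOND" +
                ") WITH ('connector' = 'datagen', 'rows-per-second' = '5')");
            tEnv.executeSql(
                "SELECT window_start, window_end, COUNT(*) AS cnt " +
                "FROM TABLE(TUMBLE(TABLE stream_motion, DESCRIPTOR(tsMs), INTERVAL '1' MINUTES)) " +
                "GROUP BY window_start, window_end").print();
        }
    }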
  • Roman Bohdan

    08/04/2022, 7:50 PM
    Hello guys, this channel is really helpful, thank you a lot. I would like to ask you about multiple taskmanager replicas: I want to run one jobmanager and two taskmanagers together to increase processing speed. I'm using a Kubernetes deployment and set replicas to 2.
    spec:
      replicas: 2
    The result:
    journey-jobmanager-t6x2v                          3/3     Running   0          63m
    journey-taskmanager-6fb96b7bbc-4s8fn              3/3     Running   0          24m
    journey-taskmanager-6fb96b7bbc-q26cp              3/3     Running   0          18m
    It was deployed successfully, but the second taskmanager didn't start up properly. It's frozen and stays with these logs:
    2022-08-04 19:30:30,072 INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] [flink-akka.actor.default-dispatcher-6] Start job leader service.
    2022-08-04 19:30:30,076 INFO  org.apache.flink.runtime.filecache.FileCache                 [] [flink-akka.actor.default-dispatcher-6] User file cache uses directory /tmp/flink-dist-cache-003af8c3-14b3-4df4-974c-5c42e200acbf
    2022-08-04 19:30:30,081 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] [flink-akka.actor.default-dispatcher-6] Connecting to ResourceManager <akka.tcp://flink@journey-jobmanager:6123/user/rpc/resourcemanager_*(00000000000000000000000000000000)>.
    2022-08-04 19:30:30,405 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] [flink-akka.actor.default-dispatcher-6] Resolved ResourceManager address, beginning registration
    WARNING: An illegal reflective access operation has occurred
    WARNING: Illegal reflective access by org.jboss.netty.util.internal.ByteBufferUtil (file:/tmp/flink-rpc-akka_33e6c155-85e6-4a50-9ee1-1e530ec97636.jar) to method java.nio.DirectByteBuffer.cleaner()
    WARNING: Please consider reporting this to the maintainers of org.jboss.netty.util.internal.ByteBufferUtil
    WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
    WARNING: All illegal access operations will be denied in a future release
    2022-08-04 19:30:30,499 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] [flink-akka.actor.default-dispatcher-15] Successful registration at resource manager <akka.tcp://flink@journey-jobmanager:6123/user/rpc/resourcemanager_*> under registration id 25c8fc1abec5034cf9e6d9fce90f9b7d.
    Could you please suggest what I forgot to check, or how I can fix it?
  • chunilal kukreja

    08/05/2022, 6:21 AM
    Hi team, as per the documentation https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#consumer-offset-committing, the concept of a consumer group across multiple Flink jobs doesn't fit well. So is it safe to call out that multiple instances of the same Flink job won't follow the concept of a consumer group? All the scalability & parallelism has to be achieved within the job itself.
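    That matches the documented behavior: scaling is done inside a single job by raising the source parallelism, which splits the topic's partitions across parallel subtasks of that job. A fragment, reusing the KafkaSource shape from the sketch further above:

    // At most min(parallelism, #partitions) subtasks will receive data; a
    // one-partition topic is read by exactly one subtask of this one job.
    env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka")
       .setParallelism(4)
       .print();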
  • Felix Angell

    08/05/2022, 9:35 AM
    Heya, am I correct in assuming that the PyFlink Kinesis connector was not added for data streams until recently (1.15)? Or is there a way I can do this on an earlier version (1.13)? For now I've just copied over the connector, since it seems to be a simple wrapper over the jvm_gateway; would this cause any problems?
  • Ali Zia

    08/05/2022, 2:05 PM
    Quick question: how are null values treated when doing aggregates on windows? For example, if I do AVG on a field, are NULL values just ignored in calculating the average, or are they treated as 0? Thanks!
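    Per standard SQL semantics, which Flink SQL follows, aggregate functions like AVG skip NULL inputs rather than treating them as 0; COALESCE can force the other behavior. A runnable sketch:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class AvgNullSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // avg_skip_nulls   -> 2.0      (the NULL row is skipped entirely)
            // avg_null_as_zero -> 1.333... (COALESCE makes NULL count as 0)
            tEnv.executeSql(
                "SELECT AVG(v) AS avg_skip_nulls," +
                "       AVG(COALESCE(v, 0)) AS avg_null_as_zero " +
                "FROM (VALUES (CAST(1.0 AS DOUBLE)), (CAST(3.0 AS DOUBLE))," +
                "             (CAST(NULL AS DOUBLE))) AS t(v)").print();
        }
    }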
  • Ashwini Padhy

    08/05/2022, 5:49 PM
    Hi team, I have a weird scenario: when we swapped the EC2 node/pod in Kubernetes, the Flink job stopped working and did not restart automatically. How do we handle such scenarios so that Flink will restart the job and run from the last checkpoint?
  • Sergey G

    08/05/2022, 8:19 PM
    org.apache.flink.table.api.ValidationException: The UNION operation on two unbounded tables is currently not supported.
        at org.apache.flink.table.operations.utils.SetOperationFactory.failIfStreaming(SetOperationFactory.java:108) ~[flink-table-api-java-1.15.1.jar:1.15.1]
    I use StreamTableEnvironment. Do you know how to fix such an issue?
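    If the UNION comes from the Table API's union() call, that is the operation the validator rejects on unbounded input, because UNION implies a distinct. unionAll() is supported, and a distinct() can be layered on top as a regular (stateful, updating) aggregation. A minimal sketch:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class UnionSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
            Table a = tEnv.fromValues(1, 2, 3).as("v");
            Table b = tEnv.fromValues(3, 4, 5).as("v");
            // a.union(b) would imply a distinct and is rejected on unbounded input;
            // unionAll() plus distinct() expresses the same result as an updating
            // aggregation instead.
            Table unioned = a.unionAll(b).distinct();
            tEnv.toChangelogStream(unioned).print();
            env.execute();
        }
    }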
  • Guoqin Zheng

    08/05/2022, 9:57 PM
    Hi team, quick question: I built a (batch) Flink job to process data from S3 with the Table API. My S3 data is organized by date, i.e. $s3_bucket/path/to/my/folder/yyyy/mm/dd. Now let's say I want to process a range of dates' data, e.g. 2022/08/01 ~ 2022/08/05. Is there an out-of-the-box solution to fetch all the data in a single job, or do I need to run the job five times, once per date? Thanks!
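    One option, as a sketch: the filesystem connector supports partitioned tables with partition pruning, but it derives partition values from hive-style "col=value" directory names, so this assumes the data can be laid out (or re-exported) as dt=2022-08-01 rather than yyyy/mm/dd. Paths and format are placeholders:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class PartitionedReadSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
            tEnv.executeSql(
                "CREATE TABLE my_data (" +
                "  payload STRING," +
                "  dt STRING" +
                ") PARTITIONED BY (dt) WITH (" +
                "  'connector' = 'filesystem'," +
                "  'path' = 's3://bucket/path/to/my/folder'," +   // placeholder
                "  'format' = 'json'" +                           // placeholder
                ")");
            // One job covering the whole range; partition pruning skips other dates.
            tEnv.executeSql(
                "SELECT * FROM my_data WHERE dt BETWEEN '2022-08-01' AND '2022-08-05'").print();
        }
    }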
  • Kyle Ahn

    08/06/2022, 9:09 PM
    Quick question: I am receiving the following in an application deployed to k8s.
    Caused by: java.lang.ClassNotFoundException: org.apache.flink.formats.parquet.avro.AvroParquetWriters
    In my pom.xml, I have the following dependency:
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-parquet</artifactId>
        <version>${flink.version}</version>
    </dependency>
    Has anyone experienced the same issue?
  • Sigh

    08/07/2022, 2:07 PM
    Hi all, what will be the problem using RowData in the DataStream API
  • Sigh

    08/07/2022, 2:07 PM
    ?
  • Sigh

    08/07/2022, 2:07 PM
    I usually use Row
  • Hunter Medney

    08/08/2022, 10:43 AM
    Hi all, I'm building a DataStream job with a RichFlatMap operation that updates keyed state based on each input element. Given:
    • keyed state will be a mix of scalars and lists (currently using a generic type with a pojo class)
    • some input elements will have no impact on the keyed state and nothing will be output from the operator in these cases
    • some input elements could clear the entire keyed state
    Which type of keyed state would best fit this scenario? AggregatingState would seem the best fit but I'm not sure how to handle the "no update" and "clear" cases within the AggregateFunction. For the "no update" cases, how would the operator know the AggregateFunction didn't update the accumulator? How would the "clear" case be handled?
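    One alternative worth considering, as a sketch: plain ValueState inside the RichFlatMapFunction. Unlike AggregatingState's AggregateFunction, the flatMap body can freely decide to skip the update, emit nothing, or clear the state. All types and names here are hypothetical stand-ins for the real POJOs:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class StatefulFn extends RichFlatMapFunction<StatefulFn.Input, String> {

        public static class Input {
            public String key;
            public boolean irrelevant;   // the "no impact on keyed state" case
            public boolean reset;        // the "clear the entire keyed state" case
            public long value;
        }

        public static class Acc {        // mix of scalars and lists, as a POJO
            public long total;
            public List<Long> values = new ArrayList<>();
        }

        private transient ValueState<Acc> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("acc", Acc.class));
        }

        @Override
        public void flatMap(Input in, Collector<String> out) throws Exception {
            if (in.irrelevant) {
                return;                  // no state update, no output
            }
            if (in.reset) {
                state.clear();           // drop everything for this key
                return;
            }
            Acc acc = state.value();
            if (acc == null) {
                acc = new Acc();
            }
            acc.total += in.value;
            acc.values.add(in.value);
            state.update(acc);
            out.collect(in.key + " -> " + acc.total);
        }
    }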
  • kevin

    08/08/2022, 2:13 PM
    👋 Hello, team!
  • kevin

    08/08/2022, 2:28 PM
    Hi team, I am a Flink beginner and I have a question. I'm on a DataStream, doing keyBy (using a custom KeySelector), then using window (TumblingEventTimeWindows), and finally doing reduce. When I use the same batch data source, the number of keys obtained in a single thread by using the custom KeySelector is inconsistent with the number of keys obtained by the Flink job (that is, the number of results after reduce). Does this comparison make sense? If it makes sense, why is it inconsistent, and what are the possible reasons?
  • Jirawech Siwawut

    08/08/2022, 4:39 PM
    Hi team. I am trying to implement a Hive sink using the Table API. As far as I understand, our Hive metastore is secured by Kerberos authentication. I copied hive-site.xml from our Spark application and followed the instructions on the Flink website to set hive_conf_dir. I got the following error when I tried to run the Hive sink job. Does anyone know how to solve this? Here is the error:
    Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
    	at org.apache.hadoop.hive.metastore.utils.JavaUtils.newInstance(JavaUtils.java:86)
    	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:95)
    	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:148)
    	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
    	... 29 more
    Caused by: java.lang.reflect.InvocationTargetException
    	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    	at org.apache.hadoop.hive.metastore.utils.JavaUtils.newInstance(JavaUtils.java:84)
    	... 32 more
    Caused by: MetaException(message:Could not connect to meta store using any of the URIs provided. Most recent failure: org.apache.thrift.transport.TTransportException: GSS initiate failed
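    A "GSS initiate failed" from the metastore client usually means the Flink processes have no usable Kerberos credentials. These are Flink's standard Kerberos settings (normally placed in flink-conf.yaml rather than code); the paths and principal below are placeholders:

    import org.apache.flink.configuration.Configuration;

    public class KerberosConfigSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.setString("security.kerberos.login.use-ticket-cache", "false");
            conf.setString("security.kerberos.login.keytab", "/path/to/user.keytab");
            conf.setString("security.kerberos.login.principal", "user@EXAMPLE.COM");
            // hive-site.xml must also carry the metastore settings, e.g.
            // hive.metastore.sasl.enabled=true and hive.metastore.kerberos.principal.
            System.out.println(conf);
        }
    }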
  • Jeremy Ber

    08/08/2022, 6:11 PM
    Any programmatic way in the DataStream API to get the Flink version?
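    One option (an internal runtime utility rather than a stable public API, so hedge accordingly):

    import org.apache.flink.runtime.util.EnvironmentInformation;

    public class FlinkVersionSketch {
        public static void main(String[] args) {
            // Prints the version of the flink-runtime jar on the classpath, e.g. "1.15.1".
            System.out.println(EnvironmentInformation.getVersion());
        }
    }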
  • Adrian Chang

    08/08/2022, 7:05 PM
    Hello, is the ARRAY_AGG SQL aggregation function implemented in Flink?
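    For reference, ARRAY_AGG is not in the built-in aggregate function list as of Flink 1.15; COLLECT (MULTISET result) and LISTAGG (concatenated STRING result) are the closest built-ins. A runnable sketch:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class ArrayAggWorkaroundSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // COLLECT gives a MULTISET per group, LISTAGG a concatenated STRING.
            tEnv.executeSql(
                "SELECT k," +
                "       COLLECT(v) AS vs_multiset," +
                "       LISTAGG(CAST(v AS STRING)) AS vs_csv " +
                "FROM (VALUES ('a', 1), ('a', 2), ('b', 3)) AS t(k, v) " +
                "GROUP BY k").print();
        }
    }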
  • Marco Villalobos

    08/08/2022, 8:33 PM
    Hi everybody, I emailed this question to the mailing list last week, but I have not gotten a reply, so I'll ask here now. I want to know if it is possible through the SQL API to tumble windows by row count instead of a time range. I know it's possible through the DataStream API and also the Table API, but how about in the SQL API? The SQL API documentation does not say anything about it: no examples, no references, no disclaimers. I would rather just use the SQL API without switching approaches. Thank you.
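    For reference, the SQL windowing TVFs (TUMBLE/HOP/CUMULATE) are time-based only, so there appears to be no SQL-level count window; the DataStream equivalent being referred to looks like this (a minimal sketch):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CountWindowSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
               .keyBy(i -> i % 2)
               .countWindow(3)            // fires every 3 elements per key, not per time
               .reduce(Integer::sum)
               .print();
            env.execute();
        }
    }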
  • Jody N

    08/08/2022, 11:31 PM
    Question about the Kafka connector and security. I am using a Kinesis Data Analytics app that runs Flink 1.13 (the latest they support) and am trying to connect it to a Confluent Cloud Kafka stream that requires SASL PLAINTEXT auth. The docs for the Table API connector don't mention this module as being available (though it is mentioned in 1.14), and I receive an exception (missing module) when I run the code. Is this scheme simply not supported with 1.13?
  • Jeff Levesque

    08/09/2022, 2:49 AM
    I'm trying to create a sliding window, using results from a previous tumbling window. However, I'm trying to define a trigger UDF within the sliding window, such that I can perform operations on the given field:
    input_table = table_env.from_path('tumbling_window_table')
    sliding_window_table = (
        input_table.window(
            Slide.over('8.hours')
            .every('1.minutes')
            .on('window_start')
            .alias('eight_hour_window')
        )
        .group_by('ticker, {}'.format('eight_hour_window'))
        .select('''
            ticker,
            trigger(min_price.collect, max_price.collect) as price,
            {0}.start as window_start,
            {0}.end as window_end
        '''.format(
            'eight_hour_window',
            'window_start'
        ))
    )
    When I run the above, where the UDF trigger function only returns the same min_price input value (return min_price), the entire above select is ignored (no errors raised, just nothing outputting). However, if I replace the above collect callback with avg, or min, or max, then the above select will return results correspondingly. I'm not sure if there is a better aggregation function [1] that I can utilize? For example, let's assume I want to create my own averaging UDF (instead of using the avg aggregation callback). If I replace collect with the count callback, I notice the select output for the UDF returns incrementing integers every 1.minutes. --- [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/#aggregate-functions
  • Gyula Fóra

    08/09/2022, 4:39 AM
    Has anyone seen this error on Kubernetes?
    flink-main-container exceeded its local ephemeral storage limit "1Gi"
    Any idea how to configure the Flink pods to solve it?
  • Mustafa Akur

    08/09/2022, 9:45 AM
    Hello everyone. Is there any way I can debug a Flink job (PyFlink) using a debugger? Specifically, I have an error that occurs during checkpoint recovery, and I want to understand what causes it without relying on logging. When I run the Python script directly, it doesn't run as a Flink job, hence I cannot observe the behaviour during recovery. Kind regards.