# troubleshooting
  • v

    Varun Sayal

    10/20/2022, 2:35 PM
    I’d like to see if a specific running version of our Flink app has any stateful operators that don’t have UIDs specified.
    c
    • 2
    • 1
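    A minimal sketch of one way to surface this for the question above (an assumption on my part: it requires resubmitting the job with auto-generated UIDs disabled, and it flags every operator without an explicit uid, not only stateful ones):
    Copy code
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class MissingUidCheck {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Equivalent config option: pipeline.auto-generated-uids: false
            env.getConfig().disableAutoGeneratedUIDs();

            env.fromElements(1, 2, 3).uid("numbers")
               .map(i -> i * 2).uid("double-values")   // explicitly named operator
               .print();                               // no uid on the sink; job-graph generation is expected to fail here and name it

            env.execute("uid-check");
        }
    }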
  • e

    Eric Xiao

    10/20/2022, 2:38 PM
    We are exploring the Flink Table/SQL APIs and were wondering how Flink determines a column's nullability from the DataStream? i.e.
    Copy code
    val source: DataStream[...] = ...
    val inputTable = tableEnv.fromDataStream(source)
    When we printed out the schema we noticed it had set all the string columns to nullable and all the integer/timestamp columns to not nullable.
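    On the nullability question above: as far as I know it is derived from the extracted TypeInformation of the stream's element type, so primitive-backed fields come out NOT NULL while object-typed fields (String, boxed numbers, etc.) stay nullable; an explicit Schema can also be passed to fromDataStream to override the derived types. A minimal Java sketch to see the derivation (Event is a made-up POJO):
    Copy code
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class NullabilitySketch {

        // One object-typed field, one primitive field.
        public static class Event {
            public String name; // object type   -> nullable STRING
            public int count;   // primitive int -> INT NOT NULL
            public Event() {}
            public Event(String name, int count) { this.name = name; this.count = count; }
        }

        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

            DataStream<Event> source = env.fromElements(new Event("a", 1));
            Table table = tableEnv.fromDataStream(source);
            table.printSchema(); // expected: name STRING, count INT NOT NULL
        }
    }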
  • n

    Nikhil Mishra

    10/20/2022, 5:42 PM
    Let me know if posting my Stack Overflow question here is not the right thing to do. I just wanted to post it here for better reach.
    s
    m
    • 3
    • 12
  • s

    Slackbot

    10/20/2022, 9:06 PM
    This message was deleted.
  • m

    Matt Fysh

    10/21/2022, 12:55 AM
    I’m hoping to produce a DataStream[RowData] type to pass into a Delta Sink, but the following is not working, I think because RowData is an abstract class. Does anyone know which class I should be using here?
    Copy code
    kinesis.map(value => {
      val jsonNode: JsonNode = jsonParser.readValue(value, classOf[JsonNode])
      new RowData(jsonNode.get("user_id").toString(), 1)
    })
    k
    • 2
    • 80
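    On the question above: RowData is an interface, and the concrete class usually instantiated by hand is GenericRowData, with string fields wrapped in StringData. A minimal Java sketch (the JSON parsing from the original snippet is replaced by a plain String input, so the field layout is only illustrative):
    Copy code
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;

    // Maps a user id to a two-field RowData (STRING, INT).
    public class ToRowData implements MapFunction<String, RowData> {
        @Override
        public RowData map(String userId) {
            GenericRowData row = new GenericRowData(2);
            row.setField(0, StringData.fromString(userId)); // STRING columns expect StringData, not java.lang.String
            row.setField(1, 1);                             // INT column
            return row;
        }
    }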
  • c

    chunilal kukreja

    10/21/2022, 4:39 AM
    Hi Team, I need to implement a custom Kafka sink in order to add checkpointing (using CheckpointedFunction), but I am facing an issue sinking elements in the overridden invoke method. I followed this sample: https://github.com/deadwind4/flink-training/blob/master/src/main/java/me/training/flink/state/BufferingSink.java Can someone please point me to an example or sample of how to do the same?
    m
    • 2
    • 3
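    A rough sketch along the lines of the BufferingSink sample linked above, just to show the CheckpointedFunction mechanics (sendToKafka is a hypothetical placeholder for whatever producer call is used; this is not the recommended way to get exactly-once Kafka writes):
    Copy code
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    public class BufferingKafkaSink extends RichSinkFunction<String> implements CheckpointedFunction {

        private final int threshold = 100;
        private final List<String> buffer = new ArrayList<>();
        private transient ListState<String> checkpointedState;

        @Override
        public void invoke(String value, Context context) throws Exception {
            buffer.add(value);
            if (buffer.size() >= threshold) {
                for (String element : buffer) {
                    sendToKafka(element); // hypothetical helper, not a Flink API
                }
                buffer.clear();
            }
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // Store whatever has not been sent yet, so it is replayed after a restore.
            checkpointedState.clear();
            checkpointedState.addAll(buffer);
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<String> descriptor =
                    new ListStateDescriptor<>("buffered-elements", Types.STRING);
            checkpointedState = context.getOperatorStateStore().getListState(descriptor);

            if (context.isRestored()) {
                for (String element : checkpointedState.get()) {
                    buffer.add(element);
                }
            }
        }

        private void sendToKafka(String element) {
            // placeholder: wire up the actual Kafka producer here
        }
    }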
  • s

    Slackbot

    10/21/2022, 6:23 AM
    This message was deleted.
    b
    • 2
    • 1
  • s

    Soumya Ghosh

    10/21/2022, 7:33 AM
    Hi team, I am trying to use flink-elasticsearch to sink data to Elasticsearch (Table connector). In my project POM I’ve added this library and created a fat jar; through the IDE it runs fine locally. When I try to run it through a Docker image (via flink-kubernetes-operator) it fails to run. On inspecting the JobManager logs we observed various ClassNotFoundExceptions, so we tried to add the flink-elasticsearch-related libraries to the Flink lib path in the Docker image. When I added the extra jars in the Dockerfile I marked the flink-elasticsearch library as provided in the project pom.xml. Dockerfile:
    Copy code
    FROM flink:1.13.6-scala_2.12-java11
    RUN mkdir -p $FLINK_HOME/usrlib
    
    RUN wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-avro-confluent-registry/1.13.6/flink-sql-avro-confluent-registry-1.13.6.jar -P $FLINK_HOME/lib/
    RUN wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.13.6/flink-connector-jdbc_2.12-1.13.6.jar -P $FLINK_HOME/lib/
    RUN wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-elasticsearch7_2.12/1.13.6/flink-connector-elasticsearch7_2.12-1.13.6.jar -P $FLINK_HOME/lib/
    RUN wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-elasticsearch-base_2.12/1.13.6/flink-connector-elasticsearch-base_2.12-1.13.6.jar -P $FLINK_HOME/lib/
    RUN mkdir $FLINK_HOME/plugins/s3-fs-presto && cp $FLINK_HOME/opt/flink-s3-fs-presto-1.13.6.jar $FLINK_HOME/plugins/s3-fs-presto/
    RUN mkdir  $FLINK_HOME/plugins/s3-fs-hadoop && cp $FLINK_HOME/opt/flink-s3-fs-hadoop-1.13.6.jar $FLINK_HOME/plugins/s3-fs-hadoop/
    RUN mkdir $FLINK_HOME/plugins/s3 && cp $FLINK_HOME/opt/flink-s3-fs-presto-1.13.6.jar $FLINK_HOME/plugins/s3/ && cp $FLINK_HOME/opt/flink-s3-fs-hadoop-1.13.6.jar $FLINK_HOME/plugins/s3/
    COPY my-project/target/jars/my-project-1.0-SNAPSHOT-jar-with-dependencies.jar $FLINK_HOME/usrlib/my-project-1.0-SNAPSHOT-jar-with-dependencies.jar
    Eventually I kept adding more and more transitive dependencies to the Docker image, and after a point the TaskManager boots and then fails with
    NoClassDefFoundError: Could not initialize class org.elasticsearch.client.RestClient
    Any idea on how to address this? As per Flink’s documentation, external connectors are not part of the Flink distribution and should either be added to the project’s fat jar or be placed in the Flink lib path.
    m
    s
    g
    • 4
    • 18
  • b

    Bastien DINE

    10/21/2022, 8:26 AM
    Hello everyone, how can I register a TypeInfoFactory without using the @TypeInfo annotation on the bean (I cannot modify the bean code for several reasons)? The TypeInfo Javadoc says: "In a hierarchy of types the closest annotation that defines a factory will be chosen while traversing upwards, however, a globally registered factory has highest precedence (see TypeExtractor#registerFactory(Type, Class))." But registerFactory does not exist anymore. How can I register a global factory? https://nightlies.apache.org/flink/flink-docs-release-1.15/api/java/org/apache/flink/api/common/typeinfo/TypeInfo.html
    👍 1
    c
    s
    • 3
    • 4
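    Not an answer to the global-registration part, but one workaround sketch for the situation above (assuming the pipeline code can be touched even though the bean cannot): build the TypeInformation yourself, for example via your factory, and hand it to the operators that need it with returns(...), which bypasses the annotation lookup entirely. LegacyBean and LegacyBeanTypeInfoFactory are made-up stand-ins:
    Copy code
    import java.lang.reflect.Type;
    import java.util.Collections;
    import java.util.Map;

    import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ExplicitTypeInfoSketch {

        public static class LegacyBean {
            public String name;
        }

        public static class LegacyBeanTypeInfoFactory extends TypeInfoFactory<LegacyBean> {
            @Override
            public TypeInformation<LegacyBean> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
                return Types.POJO(LegacyBean.class); // whatever custom TypeInformation is needed
            }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Instantiate the factory directly instead of relying on the (missing) global registration.
            TypeInformation<LegacyBean> info =
                    new LegacyBeanTypeInfoFactory().createTypeInfo(LegacyBean.class, Collections.emptyMap());

            env.fromElements("alice", "bob")
               .map(name -> { LegacyBean b = new LegacyBean(); b.name = name; return b; })
               .returns(info) // bypasses the annotation-based factory lookup
               .print();

            env.execute("explicit-typeinfo");
        }
    }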
  • r

    Rashmin Patel

    10/21/2022, 9:48 AM
    Hi everyone, is there a way in Flink to stream/send the output of an operator (C here) to more than one downstream operator (D1 and D2 here)?
                         /---->D1
    Ex. A-->B-->C---|
                         \---->D2
    d
    d
    • 3
    • 4
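    For the question above: yes, in the DataStream API consuming the same stream variable from several operators fans the records out to all of them, no special construct needed. A minimal sketch (operator contents are made up):
    Copy code
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class FanOutSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Stand-in for A --> B --> C
            DataStream<Integer> c = env.fromElements(1, 2, 3)
                    .map(i -> i + 1)    // B
                    .map(i -> i * 10);  // C

            // Consuming the same stream twice sends every element of C to both branches.
            c.map(i -> "D1 got " + i).print();  // D1
            c.map(i -> "D2 got " + i).print();  // D2

            env.execute("fan-out sketch");
        }
    }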
  • j

    Jasmin Redzepovic

    10/21/2022, 12:03 PM
    Hello everyone! 👋 We encountered an issue when we deployed a Flink streaming job on k8s. The job has extremely low throughput of 15 records per second per partition while reading data from a Kafka topic. I tried changing various configuration parameters, but no luck. The most confusing thing is that when executing the job locally through the IDE, I get a throughput of ~150 records per second per partition. Can someone please help with this or point me in the right direction? 🙂 (this is the first job I’ve ever deployed, so sorry if this is a noob question) More info in thread ⬇️
    s
    • 2
    • 5
  • t

    Tiansu Yu

    10/21/2022, 1:40 PM
    Encountered this problem while using keySelector
    Copy code
    Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Tuple2' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.java.functions.KeySelector' interface. Otherwise the type has to be specified explicitly using type information.
    In general, is it a bad idea to use Tuple2 + a lambda in a KeySelector at all?
    Copy code
    .keyBy(u -> new Tuple2<>(u.getUserId(), u.getProductId()))
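    One common fix for the error above is to keep the lambda but pass the key TypeInformation explicitly via the two-argument keyBy overload. A sketch, with a made-up Order POJO standing in for the real event type (both key fields are assumed to be Strings):
    Copy code
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TupleKeySketch {

        public static class Order {
            public String userId;
            public String productId;
            public Order() {}
            public Order(String userId, String productId) { this.userId = userId; this.productId = productId; }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.fromElements(new Order("u1", "p1"), new Order("u1", "p2"))
               // keyBy overload that takes the key TypeInformation explicitly,
               // so the lambda + generic Tuple2 no longer trips up type extraction.
               .keyBy(o -> Tuple2.of(o.userId, o.productId),
                      Types.TUPLE(Types.STRING, Types.STRING))
               .map(o -> o.userId + "/" + o.productId)
               .print();

            env.execute("tuple-key sketch");
        }
    }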
  • r

    RM

    10/21/2022, 3:27 PM
    Hi all! We have a job graph something to the order of:
                                      /---->C1---(hash)--->C2---(hash)--->C3
    A--(rebalance)-->B---(hash)---|---->D1---(hash)--->D2---(hash)--->D3
                                      \---->E1---(hash)--->E2---(hash)--->E3
    The moment I increased parallelism, the Flink logs complained with the following, which was easily addressed by bumping
    taskmanager.memory.network.max
    Copy code
    java.io.IOException: Insufficient number of network buffers: required 2, but only 0 available. The total number of network buffers is currently set to 65536 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max'.
    Curious to know if there is an easier way to pre-determine the minimum number of network buffers needed as we scale out the job. I tried following this link https://stackoverflow.com/questions/49283934/flink-ioexception-insufficient-number-of-network-buffers but the math doesn't add up:
    #slots-per-TM^2 * #TMs * 4 * #no_of_shuffles
    ==> ( 8 * 8 ) * 10 * 4 * 10 ?
    d
    • 2
    • 2
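    For reference, plugging the question's numbers into that heuristic (8 slots per TM, 10 TMs, 10 shuffles; the formula comes from the linked answer and is an estimate, not an exact requirement):
    Copy code
    (8 * 8) * 10 * 4 * 10 = 25,600 buffers
    25,600 buffers * 32 KiB each ≈ 800 MiB of network memory
    That is below the 65,536 buffers the error message says are configured, which is presumably why the math doesn't add up.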
  • a

    Abel Lamjiri

    10/21/2022, 10:23 PM
    Hi folks! Do I need any special configs for FlinkDeployment to signal Cluster Autoscaler to add nodes? When I run a Flink job with 3 task managers, 1 is scheduled and runs, but the other 2 stay unschedulable for a while:
    Copy code
    status:
      conditions:
      - lastProbeTime: null
        lastTransitionTime: "2022-10-21T22:07:46Z"
        message: '0/3 nodes are available: 1 node(s) had volume node affinity conflict,
          2 Insufficient cpu, 2 Insufficient memory.'
        reason: Unschedulable
        status: "False"
        type: PodScheduled
      phase: Pending
      qosClass: Guaranteed
    Although the CA plugin is there, it does not add new nodes. CA does work fine if I scale up any normal deployment, though.
    k
    • 2
    • 8
  • m

    Matt Fysh

    10/21/2022, 11:00 PM
    Hi all, I’m running AWS Kinesis Analytics trying to write to S3. When I first ran this job I got an S3 access denied error, so I added the permissions to my role and re-ran the job. It appears to complete successfully now, but I don’t see any data written to S3. Wondering what the missing piece is:
    Copy code
    object MyJob {
        def main(args: Array[String]) {
            val env = StreamExecutionEnvironment.getExecutionEnvironment
            val input = env.fromElements("a", "b", "c")
            val sink: StreamingFileSink[String] = StreamingFileSink
                .forRowFormat(new Path("s3://myflink-sandbox/tmp/d1"), new SimpleStringEncoder[String]("UTF-8"))
                .build()
            input.addSink(sink)
            env.execute()
        }
    }
    
    MyJob.main(Array())
    s
    d
    d
    • 4
    • 14
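    A guess rather than a confirmed diagnosis for the job above: row-format StreamingFileSink part files are only finalized on checkpoints, and with a tiny bounded source like fromElements the job can finish before any checkpoint fires, so nothing visible lands in S3. A minimal Java sketch of making checkpointing explicit (the original snippet is Scala):
    Copy code
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointingSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Checkpoint every 60s so the StreamingFileSink can commit its part files.
            env.enableCheckpointing(60_000);
            // ... build the same pipeline as in the snippet above, then:
            // env.execute()
        }
    }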
  • t

    Thiruvenkadesh Someswaran

    10/22/2022, 6:21 PM
    Hi, I am an engineer (DevOps) tasked with setting up an HA Flink cluster in our AWS EKS cluster. I am not clear on exactly how to do this, and ideally I'd like to test killing JobManagers. Has anybody seen examples of this? (I tried googling but nothing obvious showed up.)
    m
    s
    +2
    • 5
    • 9
  • k

    Krish Narukulla

    10/22/2022, 6:59 PM
    How do I query an array datatype using Flink SQL? I want to query people from the protobuf below.
    Copy code
    message PersonRecord {
      string name = 1;
      int32 age = 2;
    
      enum PhoneType {
        MOBILE = 0;
        HOME = 1;
        WORK = 2;
      }
    
      message PhoneNumber {
        string number = 1;
        PhoneType type = 2;
      }
    
      repeated PhoneNumber phones = 4;
    }
    
    // Our address book file is just one of these.
    message AddressBookRecord {
      repeated PersonRecord people = 1;
    }
    • 1
    • 1
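    Assuming the topic ends up registered as a table whose people column is an ARRAY of ROWs, one way to flatten it is CROSS JOIN UNNEST. A sketch (the table name address_book and its registration are assumptions; only the column names come from the protobuf above):
    Copy code
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;

    public class UnnestPeopleSketch {
        public static void main(String[] args) {
            TableEnvironment tableEnv =
                    TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

            // Registration of address_book (CREATE TABLE ... WITH (...)) is omitted; `people`
            // is assumed to map to ARRAY<ROW<name STRING, age INT, phones ARRAY<...>>>.
            Table flattened = tableEnv.sqlQuery(
                    "SELECT t.name, t.age " +
                    "FROM address_book " +
                    "CROSS JOIN UNNEST(people) AS t (name, age, phones)");

            flattened.execute().print();
        }
    }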
  • a

    Akashkiran Shivakumar

    10/22/2022, 11:12 PM
    Hello, I am trying to run the example program word_count.py using AWS EMR's Apache Flink, but it is failing for me. I am running the same example using Java, and it executes without a flaw. Can someone help me with this? https://stackoverflow.com/questions/74162504/not-able-to-run-simple-pyflink-word-count-py-on-aws-emr
    x
    • 2
    • 2
  • o

    Owen Lee

    10/24/2022, 6:02 AM
    Hello, when reading from multi-partitioned Kafka with WatermarksWithIdleness, does the IdlenessTimer start immediately, before all partitions are ready (= begin consuming records)? If so, and if initializing/connecting to some Kafka partitions takes more time than the specified idleness time, the output will be marked as idle and some records will be lost.
    ✅ 1
    • 1
    • 2
  • d

    ding bei

    10/24/2022, 6:07 AM
    Hey guys, I am using StreamingFileSink to write to S3 and got an AccessDeniedException when initiating a MultiPartUpload on xxx. Then I got into the EKS pod and tried the AWS CLI to multipart-upload a file, and it works. Then I wrote a service with aws-java-sdk (the same version as inside flink-s3-fs-hadoop), and it works too. What might be the problem here?
    a
    v
    • 3
    • 9
  • s

    salvalcantara

    10/24/2022, 9:40 AM
    Can someone provide some feedback on https://issues.apache.org/jira/browse/FLINK-29480, either on the ticket itself or just here in reply to my comment (or alternatively here: https://lists.apache.org/thread/bd8g562m9709y9jxjgg10qk7dc3ktlbf)? The main issue being discussed is adding support for skipping malformed messages when writing with the Kafka connector. In the same way that one can simply return null from the deserializer, it makes sense to consider the same outcome/strategy for the serializer. However, as explained in the ticket, returning null from the serializer makes the job crash at runtime (instead of simply skipping the message).
    m
    • 2
    • 5
  • h

    haim ari

    10/24/2022, 11:58 AM
    Hello, is there an option to store the Flink checkpoint files in a k8s volume and not on the k8s node (host path)? Currently I have a Flink session cluster, and when a Flink app crashes and tries to restore from checkpoints, it fails if the pod is scheduled on a different node than the previous one.
    g
    t
    h
    • 4
    • 8
  • s

    Steven Zhang

    10/24/2022, 6:38 PM
    Hi, I'm having trouble getting S3 working with the Flink operator. I'm trying to have a SessionJob specify a jar that I've built and uploaded to S3. I've been following this documentation: https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#flinksessionjob-spec-overview This is my Dockerfile for my operator image (I downloaded the s3-presto jar to my local directory):
    Copy code
    FROM apache/flink-kubernetes-operator:1.2.0
    
    ENV FLINK_PLUGINS_DIR=/opt/flink/plugins
    COPY flink-s3-fs-presto-1.15.2.jar $FLINK_PLUGINS_DIR/flink-s3-fs-presto/
    And I deployed my custom image into my k8s cluster. When I exec into the operator pod, I see the plugin present
    Copy code
    flink@flink-kubernetes-operator-855bb64d96-t6ztb:~/plugins$ pwd
    /opt/flink/plugins
    
    flink@flink-kubernetes-operator-855bb64d96-t6ztb:~/plugins$ ls
    flink-metrics-datadog  flink-metrics-graphite  flink-metrics-influxdb  flink-metrics-jmx  flink-metrics-prometheus  flink-metrics-slf4j  flink-metrics-statsd  flink-s3-fs-presto
    but my sessionJob still fails with
    Copy code
    org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3'. The scheme is directly supported by Flink through the following plugins: flink-s3-fs-hadoop, flink-s3-fs-presto. Please ensure that each plugin resides within its own subfolder within the plugins directory.
    t
    p
    • 3
    • 2
  • z

    Zgeorge2

    10/24/2022, 7:00 PM
    Hello (and thanks for your help in advance). (Newbie to Flink) ... Is there a way to do this: Assume these classes ...
    Copy code
    class Foo {
        String attr; // will be used for the first keyBy
        String global; // several foo instances with the same attr value will have a common global value
    }
    class ReducedFoo {
            Set<Foo> fooList;
            String key; // all the fooList Foo instances have the same key
            String global; // several foo instances with the same attr value will have a common global value
    }
    class GlobalFoo {
            Set<Foo> fooList;
            String global; // several foo instances with the same attr value will have a common global value
    }
    Assume the following pipeline:
    Copy code
    Ex.  SRC<Foo> --> Some Oper1<Foo> --> keyBy(Foo::attr) --> reduce(ReducedFoo with Key = Foo::attr)
    My understanding of the above is that at the end of the pipeline, depending on the number of unique Foo::attr values in the actual stream that's processed, I would get as many ReducedFoo instances (each carrying the same Foo::attr value). E.g., assume that the stream has the following objects of class Foo generated at the SRC<Foo> starting point:
    Copy code
    Foo1(attr = AA; global = g1)
    Foo2(attr = BB; global = g1)
    Foo3(attr = CC; global = g1)
    Foo4(attr = AA; global = g1)
    Foo5(attr = BB; global = g1)
    Notice that all the Foo instances have the same value for global. Hence, assume also that the reduce operation at the end of the pipeline creates the following:
    Copy code
    ReducedFoo( List.of(Foo1, Foo4); key = AA; global = g1 )
    ReducedFoo( List.of(Foo2, Foo5); key = BB; global = g1 )
    ReducedFoo( List.of(Foo3)      ; key = CC; global = g1 )
    What I'd like to do here is "join" the keyed streams that produce the three ReducedFoo instances into a single GlobalFoo, e.g. in the above pipeline only one GlobalFoo would be produced (not 3, one for each unique value of attr):
    Copy code
    GlobalFoo( List.of(Foo1, Foo2, Foo3, Foo4, Foo5); global = g1 )
    In other words, is there a way to take the pipeline and do a "keyBy" on the "global" attribute in ReducedFoo, so that the final result from the pipeline is a SINGLE GlobalFoo for each value of global? Is this possible, or am I completely misunderstanding what can be done with stream processing? (code snippets available in the thread -> )
    c
    • 2
    • 2
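    On the question above: conceptually yes, the output of the first reduce can be re-keyed by global and reduced again. The catch in unbounded streaming is that a rolling reduce emits an updated result for every input, so a real job would typically use windows or some end-of-stream signal before treating the GlobalFoo as final. A structural Java sketch with Foo collapsed to plain strings:
    Copy code
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class TwoStageKeyBySketch {

        // Stand-in for Foo/ReducedFoo/GlobalFoo: accumulated names plus the two key attributes.
        public static class Foo {
            public String names;
            public String attr;
            public String global;
            public Foo() {}
            public Foo(String names, String attr, String global) {
                this.names = names; this.attr = attr; this.global = global;
            }
            @Override public String toString() { return names + " attr=" + attr + " global=" + global; }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<Foo> foos = env.fromElements(
                    new Foo("Foo1", "AA", "g1"),
                    new Foo("Foo2", "BB", "g1"),
                    new Foo("Foo3", "CC", "g1"),
                    new Foo("Foo4", "AA", "g1"),
                    new Foo("Foo5", "BB", "g1"));

            // Stage 1: key by attr and fold names together (stand-in for ReducedFoo).
            DataStream<Foo> reducedFoo = foos
                    .keyBy(f -> f.attr)
                    .reduce((a, b) -> new Foo(a.names + "," + b.names, a.attr, a.global));

            // Stage 2: re-key the reduced stream by global and fold again (stand-in for GlobalFoo).
            DataStream<Foo> globalFoo = reducedFoo
                    .keyBy(f -> f.global)
                    .reduce((a, b) -> new Foo(a.names + "," + b.names, "*", a.global));

            globalFoo.print(); // note: intermediate updates are printed too, not only the final result
            env.execute("two-stage keyBy sketch");
        }
    }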
  • e

    Erwin Cabral

    10/24/2022, 8:55 PM
    Hi. I am currently using Apache Flink K8s Operator to deploy a streaming pipeline written in Apache Beam using Kafka as input. I just wanted to know the best practice for doing blue-green deployment when updating the pipeline jar without losing any records (duplicates are fine) . Thanks in advance.
    m
    • 2
    • 4
  • m

    Matt Fysh

    10/25/2022, 12:51 AM
    I can’t seem to get Python apache-flink to install on my machine, as it wants to build pandas and numpy from source, which takes a long time and then eventually throws build errors (numpy). Does anyone know of an existing Docker env I can use to run PyFlink?
    x
    • 2
    • 2
  • m

    Matt Fysh

    10/25/2022, 4:13 AM
    This may be a Kinesis Analytics issue, but I can’t seem to get my UDF to work (code and error in thread). I needed to return a pyflink.common.Row where I was returning a dict.
    x
    • 2
    • 4
  • p

    Prathit Malik

    10/25/2022, 5:24 AM
    Hi everyone, I have created a pipeline with lineage as below :-
    Copy code
    A join B---\
                \
                 \
    A join C------>---->E
                 /
                /
    A join D-->/
    A is the common source being joined with sources B, C, and D, while E is a common sink (I am using UNION ALL to combine the above 3 streams into 1 and writing to the sink).
    • A is initialised using the DataStream API, while B, C, D use the Table API.
    • For the join operation, A is converted to a view via the DataStream-to-Table API conversion.
    But when running the job, 2 separate DAGs are created: one with operators created with the DataStream API and the other created using the Table API. Is there a way to chain them into a single DAG? Thanks in advance!
    m
    • 2
    • 7
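    One common reason for two separate DAGs in a mixed DataStream/Table job is that the Table side is submitted with its own execute/executeInsert call while the DataStream side is submitted with env.execute(), so each call produces a job. A sketch of keeping everything in one pipeline by bridging the Table result back to the DataStream side and calling execute() exactly once (sources here are faked from elements; column names are made up):
    Copy code
    import java.util.Arrays;

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class SingleDagSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

            // "A" built with the DataStream API, then exposed as a view for the Table side.
            DataStream<Row> a = env.fromCollection(
                    Arrays.asList(Row.of("k1", 1), Row.of("k2", 2)),
                    Types.ROW_NAMED(new String[]{"id", "amount"}, Types.STRING, Types.INT));
            tableEnv.createTemporaryView("A", a);

            // "B" would normally come from a Table API source; faked from elements here.
            DataStream<Row> b = env.fromCollection(
                    Arrays.asList(Row.of("k1", "x")),
                    Types.ROW_NAMED(new String[]{"id", "tag"}, Types.STRING, Types.STRING));
            tableEnv.createTemporaryView("B", b);

            Table joined = tableEnv.sqlQuery(
                    "SELECT A.id, A.amount, B.tag FROM A JOIN B ON A.id = B.id");

            // Bridge back and submit exactly once, so everything lands in a single job graph.
            tableEnv.toDataStream(joined).print();
            env.execute("single-dag sketch");
        }
    }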
  • m

    Matt Fysh

    10/25/2022, 6:43 AM
    Are there any workarounds to "The precision specified in DataTypes.TIMESTAMP(p) must be 3 currently"?
  • i

    Ivan M

    10/25/2022, 10:02 AM
    Hey guys, I have a question related to batch jobs in Flink. Let me describe the case first. I need to send to some Kafka topic the messages built from the result of some query (a join of 2 filtered and grouped streams). As I understand it, I need batch mode in that case, because in streaming mode it sends the changelog to the sink, not the "materialized" table itself. Now I need to figure out how to run the batch job periodically. Is there any scheduler logic in Flink, or should I implement it myself (with k8s CronJobs, for example)?
    l
    k
    • 3
    • 4
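    On the question above: as far as I know Flink itself has no built-in cron-style scheduler, so an external trigger (k8s CronJob, Airflow, etc.) is the usual approach; the job itself can simply be built in batch mode. A minimal sketch of the batch-mode setup (table names and the query are placeholders):
    Copy code
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class PeriodicBatchJob {
        public static void main(String[] args) {
            // Batch mode: the query result is produced once instead of as a continuous changelog.
            TableEnvironment tableEnv =
                    TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

            // Placeholders: register sources/sinks, then run the aggregation and write it out.
            // tableEnv.executeSql("CREATE TABLE src (...) WITH (...)");
            // tableEnv.executeSql("CREATE TABLE kafka_sink (...) WITH ('connector' = 'kafka', ...)");
            // tableEnv.executeSql("INSERT INTO kafka_sink SELECT ... FROM src ...");
        }
    }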