# troubleshooting
  • a

    Abhinav sharma

    08/09/2022, 1:50 PM
    Can someone help me with Flink and Java for streaming data from Kafka?
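    A minimal sketch of what this usually looks like with the Java DataStream API and the KafkaSource connector (assuming Flink 1.15 and the flink-connector-kafka dependency; broker, topic, and group id are placeholders):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaToFlink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read string records from a Kafka topic (connection details are placeholders).
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("input-topic")
                .setGroupId("my-consumer-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStream<String> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");

        stream.print();
        env.execute("kafka-streaming-example");
    }
}
```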
  • a

    Adesh Dsilva

    08/09/2022, 3:35 PM
    Hi, what is the difference between doing everything in one operator vs multiple operators? For example:
    datastream.map(someFunctionThatDoesXandY).sinkTo(sink)
    vs
    datastream.map(someFunctionThatDoesX).map(someFunctionThatDoesY).sinkTo(sink)
    I guess Flink will automatically chain the second sequence and both should give similar performance? Is there any use case for splitting your code into multiple operators?
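    A small sketch of the point in question (made-up functions): two map operators are chained into one task by default, so performance is close to a single combined map, while splitting still gives per-operator names and metrics in the web UI and the option to break the chain explicitly for a heavy step:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
                // Two separate map operators; Flink chains them into the same task by default,
                // so there is no serialization or network hop between them.
                .map(s -> s + "-x").returns(Types.STRING).name("do-x")
                .map(s -> s + "-y").returns(Types.STRING).name("do-y")
                // Splitting still helps with readability and per-operator metrics, and lets
                // you break the chain for an expensive step if you ever need to:
                .disableChaining()
                .print();

        env.execute("chaining-sketch");
    }
}
```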
  • t

    Ty Brooks

    08/09/2022, 3:59 PM
    Hi everyone. Has anyone else run into issues with intermittent socket timeout exceptions from the schema registry client when trying to pull Avro records from Kafka? When it fails, all the calls from all operators seem to fail together. FWIW I’m running a cluster of 3 registries on AWS m5.large EC2 instances, and all the routing infrastructure should be scalable (HAProxy behind an AWS ALB). I can’t really see any reason why a burst of a few dozen calls would be problematic given my infrastructure, but my Flink app does seem to see fewer failures at a lower level of parallelism.
  • s

    Stefan

    08/09/2022, 6:32 PM
    Hi everyone. I am new to Flink, so please bear with me. I am using the SQL CLI to insert into a Kafka table (connector: upsert-kafka) after a multi-table join. The output on the Kafka topic uses the retract stream. Is there a way to configure the CLI to use the upsert stream for the table-to-stream conversion? Or is the only option to rely on code to achieve that? Thanks
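    For reference, a sketch of the kind of sink declaration involved (table name, topic, and schema are made up): an upsert-kafka table declared with a primary key consumes an upsert changelog, writing updates as keyed values and deletes as tombstones rather than retract messages.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertKafkaSinkSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Hypothetical sink table: the PRIMARY KEY is what lets upsert-kafka key the
        // Kafka records and write changes as upserts instead of retractions.
        tEnv.executeSql(
                "CREATE TABLE enriched_orders (" +
                "  order_id STRING," +
                "  total DECIMAL(10, 2)," +
                "  PRIMARY KEY (order_id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'upsert-kafka'," +
                "  'topic' = 'enriched-orders'," +
                "  'properties.bootstrap.servers' = 'localhost:9092'," +
                "  'key.format' = 'json'," +
                "  'value.format' = 'json'" +
                ")");

        // The INSERT INTO enriched_orders SELECT ... (the multi-table join) would follow here.
    }
}
```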
  • j

    Justin

    08/09/2022, 8:34 PM
    The Flink documentation for working with data serialized in Avro format states: "Currently, the Avro schema is derived from table schema." My requirement involves working with complex, deeply nested Avro schemas that are registered in the Confluent Schema Registry (CSR). My understanding is that even though the CSR contains the schema for the data in the Kafka topic, I still need to declare the schema in the Flink DDL using Flink's data types. Ksqldb didn't require this. Does anyone know of a utility that I can use to produce a Flink schema given an Avro schema as input? My initial thought is that something like this should exist or I am overcomplicating things. Thank you!
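    One possible starting point, assuming the flink-avro dependency is available: org.apache.flink.formats.avro.typeutils.AvroSchemaConverter can turn an Avro schema string into a Flink DataType, which maps directly onto the column types for the DDL. A sketch with a made-up nested schema:

```java
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.types.DataType;

public class AvroToFlinkSchema {
    public static void main(String[] args) {
        // Hypothetical nested Avro schema; in practice this would be fetched from the
        // Confluent Schema Registry.
        String avroSchema =
                "{\"type\":\"record\",\"name\":\"Order\",\"fields\":["
                + "{\"name\":\"id\",\"type\":\"string\"},"
                + "{\"name\":\"customer\",\"type\":{\"type\":\"record\",\"name\":\"Customer\","
                + "\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}}]}";

        // Produces a ROW<...> type (with nested rows) in Flink's type system.
        DataType dataType = AvroSchemaConverter.convertToDataType(avroSchema);
        System.out.println(dataType);
    }
}
```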
  • j

    Jirawech Siwawut

    08/09/2022, 9:03 PM
    Hi everyone. Has anyone here come across a similar issue where Flink cannot create a HiveMetastoreClient? I am trying to access it using Kerberos.
    Caused by: MetaException(message:Could not connect to meta store using any of the URIs provided. Most recent failure: org.apache.thrift.transport.TTransportException: GSS initiate failed
  • d

    Derek Liu

    08/10/2022, 7:21 AM
    Hi everyone. Can anyone help describe how FlinkHiveSQLParserImpl.java is generated by JavaCC, and what source files are involved?
  • j

    Jaya Ananthram

    08/10/2022, 9:24 AM
    Hello, I have a few questions about the Flink Kubernetes operator.
    1. Are there any limitations on the maximum number of namespaces the operator can watch, and is there any limitation on the maximum number of jobs per namespace or across namespaces? I see some configs are available to fine-tune, like thread count (based on CPU cores), but I would like to know whether there are any hard limitations or a safe count to consider. I am doing a PoC, so I would like to know if there is any hard/soft limit on this.
    2. Does the Kubernetes operator support Graphite metrics? In the docs I see some examples for Slf4j and Prometheus, so I am wondering whether Graphite is possible. Here the docs say that
    "The Flink Kubernetes Operator (Operator) extends the Flink Metric System"
    so I assume it should support Graphite. Am I right?
    3. In the logs, I am seeing an exception while starting the job (which runs on top of the Kubernetes operator):
    Error while trying to split key and value in configuration file /opt/flink/conf/flink-conf.yaml:35: "$internal.application.program-args: "
    Is it something to worry about, or is the dynamic configuration just skipped and there is nothing to worry about for the job behavior?
  • a

    Abhinav sharma

    08/10/2022, 2:22 PM
    If we create a keyed stream over a data stream based on a specific key, is the keyed stream's data grouped and streamed based on that key?
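    A tiny sketch of the semantics (made-up data): keyBy partitions the stream so that all records with the same key are routed to the same parallel subtask, and keyed state and aggregations are maintained per key; it does not physically collect the records into batches per key.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyBySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(
                Tuple2.of("user-a", 1), Tuple2.of("user-b", 2), Tuple2.of("user-a", 3))
                // Records with the same key are routed to the same parallel subtask, and
                // keyed state / aggregations like sum() are maintained per key.
                .keyBy(t -> t.f0, Types.STRING)
                .sum(1)
                .print();

        env.execute("keyby-sketch");
    }
}
```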
  • a

    Abhinav sharma

    08/10/2022, 2:36 PM
    Also, can someone please explain what 6> or 7> or 5> in the Flink Java output means?
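    For reference: the N> prefix produced by print() is the 1-based index of the parallel subtask that emitted the record (it only appears when the sink's parallelism is greater than 1); it is not part of the data. A tiny sketch:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PrintPrefixSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // With parallelism 4 the console output looks like "3> hello" / "1> world":
        // the number before '>' is the printing subtask's index, not part of the data.
        env.fromElements("hello", "world").print();

        env.execute("print-prefix-sketch");
    }
}
```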
  • a

    Akhlaq Malik

    08/10/2022, 3:39 PM
    Hi there, I have an issue joining two streams (KafkaSource) based on event time. I use
    .join(otherStream).where(selectedMutualKey).equalTo(selectedMutualKey).window(TumblingEventTimeWindows.of(Time.seconds(20)))
    but this does not emit any data. When switching to, e.g., TumblingProcessingTimeWindows it emits data. I have also configured .withTimestampAssigner to use the event time of the message (a long value in ms). Flink version 1.15.x
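    A common cause of "processing time works, event time emits nothing" is that the watermark never advances past the window end, for example because of idle Kafka partitions. A self-contained sketch of the watermark setup and the event-time window join (placeholder event type and key; in the real job the sources would be KafkaSources):

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class EventTimeJoinSketch {

    // Minimal placeholder event: a key plus an event-time timestamp in milliseconds.
    public static class Event {
        public String key;
        public long timestampMillis;
        public Event() {}
        public Event(String key, long timestampMillis) {
            this.key = key;
            this.timestampMillis = timestampMillis;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // In the real job these would be KafkaSources; fromElements keeps the sketch runnable.
        DataStream<Event> left = env.fromElements(new Event("k", 1_000L), new Event("k", 5_000L));
        DataStream<Event> right = env.fromElements(new Event("k", 2_000L), new Event("k", 6_000L));

        // Both inputs need timestamps AND advancing watermarks, otherwise the event-time
        // window never fires. withIdleness() guards against idle Kafka partitions
        // holding the watermark back.
        WatermarkStrategy<Event> wm =
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((e, ts) -> e.timestampMillis)
                        .withIdleness(Duration.ofMinutes(1));

        left.assignTimestampsAndWatermarks(wm)
                .join(right.assignTimestampsAndWatermarks(wm))
                .where(e -> e.key)
                .equalTo(e -> e.key)
                .window(TumblingEventTimeWindows.of(Time.seconds(20)))
                .apply(new JoinFunction<Event, Event, String>() {
                    @Override
                    public String join(Event l, Event r) {
                        return l.key + ": " + l.timestampMillis + " joined " + r.timestampMillis;
                    }
                })
                .print();

        env.execute("event-time-join-sketch");
    }
}
```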
  • i

    Ivan M

    08/10/2022, 4:23 PM
    Hi all! I'm trying to create a partitioned table via Ververica and receive the following error:
    Cause: Partitioned tables are not supported yet.
    Flink version 1.15.0. Does this mean Flink doesn't support partitioned tables? If so, why is it in the documentation?
  • s

    sap1ens

    08/10/2022, 4:46 PM
    Hi folks! In Flink SQL, is there a function or other way to obtain the RowKind of the underlying Row/RowData object?
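    There does not appear to be a SQL built-in for this, but at the DataStream boundary the kind is exposed on each Row. A sketch with a made-up aggregation so the changelog actually contains updates:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class RowKindSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Made-up updating query: the COUNT produces insert and update changes per key.
        tEnv.executeSql(
                "CREATE TEMPORARY VIEW clicks AS "
                + "SELECT * FROM (VALUES ('alice'), ('bob'), ('alice')) AS t(user_name)");
        Table counts = tEnv.sqlQuery(
                "SELECT user_name, COUNT(*) AS c FROM clicks GROUP BY user_name");

        // Each element of the changelog stream is a Row whose RowKind says whether it
        // is an insert, update-before, update-after, or delete.
        tEnv.toChangelogStream(counts)
                .map(row -> row.getKind().shortString() + " " + row)
                .returns(Types.STRING)
                .print();

        env.execute("rowkind-sketch");
    }
}
```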
  • j

    Jin Yi

    08/10/2022, 9:59 PM
    Is there a way to guarantee better savepoint and recovery behavior when using Flink SQL or the Table API? Outside of obvious underlying operator/function changes within the query itself, how stable are the Flink-controlled operator mappings when optimizations (if any) come into play with Flink SQL/Table API? Back when we started with Flink a few years ago, it was advised to use the DataStream API since the underlying operator mapping wouldn't be as consistent as we expected. Is this assumption still largely true?
  • j

    Jirawech Siwawut

    08/11/2022, 10:21 AM
    Hi. Has anyone here seen this error before?
    org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not find any factory for identifier 'hive' that implements 'org.apache.flink.table.planner.delegation.ParserFactory' in the classpath.
    Basically, I am trying to use the Hive sink and get this. I am able to run it fine in IntelliJ, but cannot make it work with the Docker image. I only add flink-sql-connector-hive-2.3.6_2.12 to the shaded jar build.
  • k

    Krishna Chaithanya M A

    08/11/2022, 12:28 PM
    Hi, we have a job where we have to set up S3 for HA. I am new to this and am trying to follow the template, but I'm facing an error. Any guidance on setting up S3 would be helpful.
  • a

    Adesh Dsilva

    08/11/2022, 3:19 PM
    Hi
    "You can test whether your class adheres to the POJO requirements via org.apache.flink.types.PojoTestUtils#assertSerializedAsPojo() from the flink-test-utils."
    Where do I find this? It doesn't seem to be there in flink-test-utils?
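    For reference, a sketch of how that utility is used, assuming Flink 1.15+ with org.apache.flink:flink-test-utils as a test dependency (in earlier Flink versions the class does not appear to exist yet, which may be why it seems to be missing):

```java
import org.apache.flink.types.PojoTestUtils;
import org.junit.jupiter.api.Test;

class MyPojoSerializationTest {

    // Hypothetical event class that we expect Flink to serialize as a POJO.
    public static class MyEvent {
        public String id;
        public long timestamp;
        public MyEvent() {}
    }

    @Test
    void isSerializedAsPojo() {
        // Fails the test if Flink would fall back to Kryo/GenericType for this class.
        PojoTestUtils.assertSerializedAsPojo(MyEvent.class);
    }
}
```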
  • a

    Adrian Chang

    08/11/2022, 6:14 PM
    Hello, I am trying to use Table Aggregate Functions https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/table/udfs/python_udfs/#table-aggregate-functions and call one from SQL, but I haven't found any example. Could you please provide me with an example of how to call the function in a SELECT statement?
  • a

    Aeden Jameson

    08/11/2022, 7:42 PM
    Does the Flink project have an aspirational release schedule anywhere? I'm specifically wondering when 1.16 will be released.
  • i

    iLem0n

    08/11/2022, 8:48 PM
    Hello everyone, I'm currently trying to write unit tests for Flink and was wondering what the best way is to mock the KafkaSource/Sink implemented with the new Source API. The old source API could be mocked using simple SourceFunctions, but implementing the whole new API seems a little heavy?
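    One common pattern that avoids mocking the new Source API altogether (a sketch, not an official recommendation): keep the pipeline logic behind a method that takes an already-built DataStream, then feed it from env.fromElements(...) in tests and from the real KafkaSource in production.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TestableJob {

    // The pipeline logic only sees a DataStream, not the Kafka-specific source.
    public static DataStream<String> buildPipeline(DataStream<String> input) {
        return input
                .filter(s -> !s.isEmpty())
                .map(String::toUpperCase)
                .returns(Types.STRING);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // In production this would be env.fromSource(kafkaSource, watermarks, "kafka");
        // in a test, a bounded in-memory source stands in for Kafka.
        buildPipeline(env.fromElements("a", "", "b")).print();
        env.execute("testable-job-sketch");
    }
}
```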
  • s

    Samin Ahbab

    08/12/2022, 8:21 AM
    Hey guys, just looking for advice here. I have a Kafka source giving a <date, string1, string2> and I am looking for a pipeline where, every hour, I get the n most occurring string1 values, and for those n only, find the m most occurring <string1, string2> tuples, but I am not sure how this would be represented with Flink functions. I was thinking it would be done through multiple keyBy and aggregation steps. This feels like a simple case, but as I am new to Flink, I am not really sure how to do this. Does anyone have any ideas?
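    A deliberately simple, non-scalable sketch of one possible shape (all names, the Tuple3 layout, and processing-time windows are assumptions): a one-hour tumbling window over the whole stream and a process function that counts string1 values, keeps the top n, and then the top m (string1, string2) pairs for those. A keyBy(string1) pre-aggregation per window can be added later for scale.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class HourlyTopNSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Tiny bounded demo input; the real pipeline's Kafka source is unbounded, so the
        // hourly window fires continuously there (here the job may end before it fires).
        env.fromElements(
                Tuple3.of("2022-08-12", "foo", "x"),
                Tuple3.of("2022-08-12", "foo", "y"),
                Tuple3.of("2022-08-12", "bar", "x"))
                // One window over the whole stream per hour; fine for a first version,
                // but it funnels everything through a single subtask.
                .windowAll(TumblingProcessingTimeWindows.of(Time.hours(1)))
                .process(new TopNPerHour(2, 1))
                .print();

        env.execute("hourly-top-n-sketch");
    }

    /** Counts string1 occurrences, keeps the top n, then the top m (string1, string2) pairs. */
    public static class TopNPerHour
            extends ProcessAllWindowFunction<Tuple3<String, String, String>, String, TimeWindow> {

        private final int n;
        private final int m;

        public TopNPerHour(int n, int m) {
            this.n = n;
            this.m = m;
        }

        @Override
        public void process(Context ctx,
                            Iterable<Tuple3<String, String, String>> elements,
                            Collector<String> out) {
            Map<String, Long> countsByString1 = new HashMap<>();
            Map<String, Map<String, Long>> pairCounts = new HashMap<>();
            for (Tuple3<String, String, String> e : elements) {
                countsByString1.merge(e.f1, 1L, Long::sum);
                pairCounts.computeIfAbsent(e.f1, k -> new HashMap<>()).merge(e.f2, 1L, Long::sum);
            }

            List<Map.Entry<String, Long>> top1 = new ArrayList<>(countsByString1.entrySet());
            top1.sort(Map.Entry.<String, Long>comparingByValue().reversed());

            for (Map.Entry<String, Long> s1 : top1.subList(0, Math.min(n, top1.size()))) {
                List<Map.Entry<String, Long>> pairs =
                        new ArrayList<>(pairCounts.get(s1.getKey()).entrySet());
                pairs.sort(Map.Entry.<String, Long>comparingByValue().reversed());
                for (Map.Entry<String, Long> s2 : pairs.subList(0, Math.min(m, pairs.size()))) {
                    out.collect(s1.getKey() + "," + s2.getKey() + " -> " + s2.getValue());
                }
            }
        }
    }
}
```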
  • a

    Aviv Dozorets

    08/14/2022, 4:23 PM
    Trying one more time: has anyone tried enabling rack awareness for the Kafka consumer? I enabled broker.rack on the Kafka side and client.rack during client init, but am still getting:
    "[Consumer clientId=consumer-Consumer-perseus2-7, groupId=Consumer-perseus2] Discovered group coordinator kafka-us-east-1-stage-1.s1:9093 (id: 2147483646 rack: null)"
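    Not a diagnosis of the log line above, but for reference, a sketch of passing client.rack through the Flink KafkaSource (rack id and connection details are placeholders); note that fetching from the closest replica also requires replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector on the brokers (KIP-392):

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

public class RackAwareKafkaSourceSketch {
    public static void main(String[] args) {
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("kafka:9093")
                .setTopics("events")
                .setGroupId("my-group")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Forwarded to the underlying KafkaConsumer; should match the AZ/rack
                // that the task managers run in.
                .setProperty("client.rack", "us-east-1a")
                .build();

        // env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka") as usual.
    }
}
```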
  • t

    Tom Alkalak

    08/15/2022, 10:48 AM
    Hey everyone, I have a question regarding excluding metrics that are sent to one of the reporters we have defined. I am working with Flink 1.14.3 and both flink-metrics-datadog and flink-metrics-dropwizard. I am trying to have control over which metrics are being sent to Datadog. I have tried modifying the JobManager's config to look like this:
    metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
    metrics.reporter.dghttp.excludes: <metricName>
    metrics.reporter.dghttp.useSubstringMatching: true
    but unfortunately I was unable to filter out the metrics I wanted. Does anyone have a clue whether it's even possible and how to achieve it?
  • m

    Mustafa Akur

    08/15/2022, 10:54 AM
    Hello all, I have written a correlated subquery in Flink SQL, but the result is not what I expect. I have asked a question on Stack Overflow (https://stackoverflow.com/questions/73359877/correlated-subquery-in-flink). Has anyone experienced this, or is this really the expected behavior? Kind regards.
  • p

    Prasanth Kothuri

    08/15/2022, 1:39 PM
    Hello, I am writing unit tests for Flink jobs using ScalaTest and the Flink test harness. I think I am almost there, but when accessing the output from the test harness I am unable to convert it back to the type (case class) I need before the assert/comparison.
    testHarness.getOutput.poll().asInstanceOf[myCaseClass].operatorDecision should contain ("threat")
    gives
    org.apache.flink.streaming.runtime.streamrecord.StreamRecord cannot be cast to stats.myCaseClass
    Any clues on how to convert the output to what I want? Thanks a ton
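    The queue returned by getOutput() holds StreamRecord wrappers (plus watermarks and latency markers), so the element has to be unwrapped via StreamRecord before casting to the payload type; in Scala that is roughly .asInstanceOf[StreamRecord[myCaseClass]].getValue. A Java sketch of the same idea with a hypothetical result type:

```java
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class HarnessOutputHelper {

    // Hypothetical result type standing in for the case class from the question.
    public static class Decision {
        public String operatorDecision;
    }

    /**
     * Unwraps one element polled from the harness output, e.g.
     * Decision d = unwrap(testHarness.getOutput().poll());
     */
    @SuppressWarnings("unchecked")
    public static Decision unwrap(Object harnessOutputElement) {
        return ((StreamRecord<Decision>) harnessOutputElement).getValue();
    }
}
```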
  • r

    Ron Cohen

    08/15/2022, 2:46 PM
    Hi! I'm trying to use a HashMap<String, String> and a java.util.ImmutableCollections map, but I'm getting "it cannot be used as a POJO type and must be processed as GenericType". I've read https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/fault-tolerance/serialization/types_serialization/ and tried to add type hints, but to no avail. Is it correctly understood that Flink does not support native serialization of any kind of map?
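    Maps are indeed not POJO types as far as Flink's type extraction is concerned, but they do not have to fall back to Kryo: explicit type information can be supplied so Flink uses its map serializer. A sketch (the surrounding job is made up):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MapTypeHintSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Map<String, String> attrs = new HashMap<>();
        attrs.put("k", "v");

        // Explicit type info at the source and after the lambda keeps Flink on its
        // MapSerializer instead of treating the value as a GenericType (Kryo).
        env.fromCollection(Collections.singletonList(attrs),
                        Types.MAP(Types.STRING, Types.STRING))
                .map(m -> {
                    Map<String, String> copy = new HashMap<>(m);
                    copy.put("seen", "true");
                    return copy;
                })
                .returns(Types.MAP(Types.STRING, Types.STRING))
                .print();

        env.execute("map-type-hint-sketch");
    }
}
```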
  • k

    Krish Narukulla

    08/15/2022, 3:46 PM
    Is there anyone good with the Table API? I have tried the code below, which results in this error: `org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Cannot find table '`default_catalog`.default_database
    val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tEnv = StreamTableEnvironment.create(env)
        tEnv.executeSql(sourceSql)
        tEnv.executeSql(sinkSql)
        tEnv.sqlQuery("insert into <sink table> select .... from <source table>")
        env.execute("FlinkApp")
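    One thing that stands out in the snippet, said tentatively since the DDL is not shown: sqlQuery() only accepts SELECT queries, while INSERT INTO statements go through executeSql() (or a StatementSet), and executeSql submits the job itself, so env.execute() is not needed for the SQL part. A Java sketch of that shape with made-up tables:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SqlInsertSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Placeholder DDL; the real sourceSql/sinkSql from the question go here.
        tEnv.executeSql("CREATE TABLE source_table (id STRING) WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE sink_table (id STRING) WITH ('connector' = 'print')");

        // INSERT INTO is a statement, not a query: use executeSql (or a StatementSet),
        // which also submits the job, so no env.execute() is needed for this part.
        tEnv.executeSql("INSERT INTO sink_table SELECT id FROM source_table");
    }
}
```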
  • s

    Stefan

    08/15/2022, 4:51 PM
    Hello, I am trying to perform a join over multiple tables created with upsert-kafka, all of them having a primary key. The join works, but I need help understanding the output behaviour. Here is the code (with the query):
    val salesOrderFactory =
        s"""
           |SELECT VBAK.MANDT || VBAK.VBELN                    AS ID,
           |       'Sales Order'                               AS Type,
           |       TO_TIMESTAMP(VBAK.ERDAT || ' ' ||  VBAK.ERZET, 'yyyy-MM-dd HH:mm:ss') AS CreationTime,
           |       CAST (NULL AS TIMESTAMP)                    AS DeletionTime,
           |       VBAK.ERNAM                                  AS Creator,
           |       VBAK.OBJNR                                  AS ObjectNumber,
           |       VBAK.AUART                                  AS SalesDocumentTypeID,
           |       TVAKT.BEZEI                                 AS SalesDocumentType,
           |       VBAK_SQ.MANDT || VBAK_SQ.VBELN               AS QuotationID,
           |       T001.BUTXT                                  AS CompanyName,
           |       VBUK.GBSTK                                  AS Status,
           |       VBUK.BESTK                                  AS OrderConfirmationStatus,
           |       VBKD.VSART                                  AS ShippingType
           |FROM   VBAK AS VBAK
           |       LEFT JOIN VBUK AS VBUK
           |              ON VBAK.MANDT = VBUK.MANDT
           |                 AND VBAK.VBELN = VBUK.VBELN
           |       LEFT JOIN VBKD AS VBKD
           |              ON VBAK.MANDT = VBKD.MANDT
           |                 AND VBAK.VBELN = VBKD.VBELN
           |                 AND VBKD.POSNR = '000000'
           |       LEFT JOIN VBAK AS VBAK_SQ
           |              ON VBAK.MANDT = VBAK_SQ.MANDT
           |                 AND VBAK.VGBEL = VBAK_SQ.VBELN
           |                 AND VBAK_SQ.VBTYP = 'B'
           |       LEFT JOIN T001
           |              ON VBAK.MANDT = T001.MANDT
           |                 AND VBAK.BUKRS_VF = T001.BUKRS
           |       LEFT JOIN TVAKT AS TVAKT
           |              ON VBAK.MANDT = TVAKT.MANDT
           |                 AND VBAK.AUART = TVAKT.AUART
           |                 AND TVAKT.SPRAS = 'E'
           |WHERE  VBAK.VBTYP = 'C';
           |""".stripMargin
    
      val factoryQueryTable = tenv.sqlQuery(salesOrderFactory)
      val salesOrderTable   = tenv.from("SALESORDER")
    
      val stream = tenv.toChangelogStream(
        factoryQueryTable,
        Schema.newBuilder().fromResolvedSchema(salesOrderTable.getResolvedSchema).build(),
        ChangelogMode.newBuilder.addContainedKind(RowKind.INSERT).addContainedKind(RowKind.UPDATE_AFTER).addContainedKind(
          RowKind.DELETE,
        ).build,
      )
    
      stream.print()
    When I update a record in TVAKT I get the following output:
    >-D[1, Sales Order, 2021-10-13T12:16:24, null, USR-abc, 0, ZSV, BEZEI2, null, Company, GBSTK3, BESTK2, VSART3]
    >+I[1, Sales Order, 2021-10-13T12:16:24, null, USR-abc,  0, V, null, null, Company , GBSTK3, BESTK2, VSART3]
    >-D[1, Sales Order, 2021-10-13T12:16:24, null, USR-abc,  0, V, null, null, Company, GBSTK3, BESTK2, VSART3]
    >+I[1, Sales Order, 2021-10-13T12:16:24, null, USR-abc,  0, V, BEZEI2_updated, null, Company, GBSTK3, BESTK2, VSART3]
    I would expect to have only one +I (insert), or actually a +U (update after). I don't understand why the first 3 entries are created; ultimately I updated a record in TVAKT and there was no delete. In Kafka Streams, I would get one record in the output with the updated value. How can I achieve the same with Flink? Thanks
  • s

    Saurabh Khare

    08/15/2022, 7:38 PM
    Hi, I am trying to expose Flink metrics on a Wavefront dashboard (VMware Tanzu Observability) using the statsD reporter and the Telegraf server agent. I do see Flink's default metrics in the Flink UI and noticed that metrics emitted from the source or sink operators are prefixed with Source_ and Sink_ respectively. Though we see the metrics for the source and sink operators in the Flink UI, they do not show up in Wavefront. I do see all the metrics correctly for the intermediate operators in Wavefront. Can someone help me with some insights? Let me know if you need more details from my end. Thanks
  • i

    Ildar Almakaev

    08/15/2022, 8:10 PM
    Hello, community. I’ve got a question about the Temporal Table Function and Temporal Join. I’m looking at this example and was wondering if you could clarify the following points for me:
    1. Is it true that the left-side table Orders will be stateless?
    2. What about the function rates? Will it delete old records from the RateHistory table, since its old records are not relevant for new joins with Orders for a given PK?
    3. How does Flink manage the state under the hood?
    Overall, I have the following use case. There is transactions data (a hot Kafka topic, mostly append-only) which should be enriched with users data (a slow-changing table). In Kafka Streams/ksqlDB it would be a Stream-Table join, so I’d like to implement similar logic using Flink. What do you think about using a Temporal Table Function for the users table with a timestamp column PROCTIME() and then joining with the transactions table?