# troubleshooting
  • Angelo Kastroulis

    10/11/2022, 1:33 AM
    Does anyone know if the intent of Table Store is to be SQL-CLI only? I can't seem to get the table environment in code to recognize it. It works perfectly from the CLI.
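    For reference: Table Store is not intended to be SQL-CLI only. One common reason it works in the CLI but not in code is that the flink-table-store dist jar sits in the cluster's lib/ directory but not on the application classpath. A minimal sketch, assuming that dependency is added to the job, of registering the catalog programmatically (the warehouse path is a placeholder):

    ```java
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class TableStoreCatalogExample {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());
            // Point the warehouse at the same location the SQL CLI uses.
            tEnv.executeSql(
                "CREATE CATALOG my_catalog WITH ("
                    + " 'type' = 'table-store',"
                    + " 'warehouse' = 'file:/tmp/table_store'"
                    + ")");
            tEnv.executeSql("USE CATALOG my_catalog");
            tEnv.executeSql("SHOW TABLES").print();
        }
    }
    ```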
  • czchen

    10/11/2022, 2:36 AM
    Does the future created by SourceReader.isAvailable need to be thread-safe? We use a static variable in SourceReader.isAvailable to pass available data to pollNext. From https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SourceReader.html#isAvailable--, there will be more than one future, so we wonder whether the future returned from SourceReader.isAvailable needs to be thread-safe.
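    For reference: completing a CompletableFuture from another thread is itself thread-safe; the bigger risk with a static field is that every reader subtask shares the same future and the same "available data". A minimal per-instance sketch (an assumed shape, not the connector-base implementation, which uses FutureCompletingBlockingQueue):

    ```java
    import java.util.concurrent.CompletableFuture;

    // Sketch: one availability future per reader instance, completed by the fetcher thread.
    public class ReaderAvailability {
        private volatile CompletableFuture<Void> available = new CompletableFuture<>();

        /** Returned from SourceReader#isAvailable(). */
        public CompletableFuture<Void> isAvailable() {
            return available;
        }

        /** Called from the fetching thread when new records arrive. */
        public void notifyDataAvailable() {
            available.complete(null);
        }

        /** Called from pollNext() once the buffered data has been drained. */
        public void resetToUnavailable() {
            available = new CompletableFuture<>();
        }
    }
    ```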
  • Sucheth Shivakumar

    10/11/2022, 5:31 AM
    Hi All, Flink Avro deserialization and serialization into GenericRecord is giving the exception below. Not sure why Kryo is even in the picture. Can someone point out what I'm doing wrong?
    Copy code
    com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
    Serialization trace:
    reserved (org.apache.avro.Schema$Field)
    fieldMap (org.apache.avro.Schema$RecordSchema)
    schema (org.apache.avro.generic.GenericData$Record)
    	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
    	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
    	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:321)
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:80)
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    	at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:77)
    	at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
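    For context: Kryo shows up here because GenericRecord carries no Flink-native type information, so the runtime falls back to Kryo, which then trips over the embedded Avro Schema. The usual remedy is to declare the type explicitly with GenericRecordAvroTypeInfo from flink-avro so Flink's Avro serializer is used instead. A sketch with placeholder names (schemaString, sourceStream and deserialize are assumed from the existing job):

    ```java
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
    import org.apache.flink.streaming.api.datastream.DataStream;

    // The key point is the explicit .returns(...) type information.
    Schema schema = new Schema.Parser().parse(schemaString);
    DataStream<GenericRecord> records = sourceStream
            .map(bytes -> deserialize(bytes, schema))
            .returns(new GenericRecordAvroTypeInfo(schema)); // avoids the Kryo fallback
    ```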
  • Aqib Mehmood

    10/11/2022, 10:32 AM
    Hi, I'm trying to use a WINDOW FUNCTION on the Flink table API. Here is the query
    Copy code
    "   SELECT \n" +
            "   sku,\n" +
            "   name,\n" +
            "   price,\n" +
            "   updatedAt,\n" +
            "   LAG(price) OVER (PARTITION BY sku ORDER BY updatedAt ASC) as last_price); "
    But I keep getting this error
    Copy code
    Caused by: org.apache.flink.table.api.TableException: OVER windows' ordering in stream mode must be defined on a time attribute.
    But the updatedAt column is already of the TIMESTAMP_LTZ data type. How can I fix this? TIA
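    For context: in streaming mode the ORDER BY column of an OVER window must be a time attribute, and a plain TIMESTAMP_LTZ column does not qualify on its own. Declaring a watermark on updatedAt in the source DDL is the usual way to promote it. A sketch with a placeholder table name and connector options, assuming the same tableEnv the query runs on:

    ```java
    // Placeholder DDL: the WATERMARK clause is what makes updatedAt a time attribute.
    tableEnv.executeSql(
        "CREATE TABLE products ("
            + "  sku STRING,"
            + "  name STRING,"
            + "  price DECIMAL(15, 4),"
            + "  updatedAt TIMESTAMP_LTZ(3),"
            + "  WATERMARK FOR updatedAt AS updatedAt - INTERVAL '5' SECOND"
            + ") WITH ("
            + "  'connector' = '...'"   // placeholder connector options
            + ")");
    ```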
  • Troy Coombs

    10/11/2022, 11:54 AM
    Hi Everyone! I'm new to Flink, so I'm looking for a little guidance. We are building a pipeline by sourcing data from our Postgres DB into Kafka using the Debezium source connector (also using Avro for the schema). What I would like to do is connect OpenSearch to a topic to receive that data. I have two requirements though:
    1. I want to be able to route the messages to certain indexes in OpenSearch based on the index retention policy. This is tied to the user's retention policy plan (3 day, 7 day, etc.), so we need the retention days for each message coming through the pipeline. Perhaps a call to our API and then cache it in Flink. 🤷‍♂️
    2. Instead of appending the messages to the index, I would like to use an upsert strategy: if the message already exists in OpenSearch, update it, otherwise add it.
    I've read about the DataSet, DataStream and Table API connectors, but I'm not clear on which approach is best for this scenario, nor about any potential pitfalls. Any guidance on this would be greatly appreciated! Thanks in advance!
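    For reference, one possible Table API shape for the upsert requirement plus per-record index routing (an assumption about a workable setup, not a verified OpenSearch recipe): with Flink's Elasticsearch SQL connector, declaring a PRIMARY KEY switches the sink to upsert mode (the key becomes the document id), and the index option accepts a {field} placeholder, so a retention_days column computed upstream could steer each row to the matching index. Whether the elasticsearch-7 connector works against a given OpenSearch cluster depends on its compatibility settings; newer Flink releases also ship a dedicated OpenSearch connector.

    ```java
    // Placeholder table, columns and hosts; retention_days would be enriched upstream.
    tableEnv.executeSql(
        "CREATE TABLE search_sink ("
            + "  id STRING,"
            + "  payload STRING,"
            + "  retention_days INT,"
            + "  PRIMARY KEY (id) NOT ENFORCED"
            + ") WITH ("
            + "  'connector' = 'elasticsearch-7',"
            + "  'hosts' = 'https://opensearch.example.com:9200',"
            + "  'index' = 'events-{retention_days}'"
            + ")");
    ```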
  • Aqib Mehmood

    10/11/2022, 12:47 PM
    Hi, I'm using the LAG function in Flink SQL on the price column, which has data type DECIMAL(15, 4), as below:
    Copy code
    "   SELECT \n" +
            "   sku,\n" +
            "   name,\n" +
            "   price,\n" +
            "   updatedAt,\n" +
            "   LAG(price) AS last_price FROM orders"
    I'm getting this error,
    Copy code
    Caused by: org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call: LAG(DECIMAL(15, 4)) 
    If you think this function should be supported, you can create an issue and start a discussion for it.
    	at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$generateCallExpression$10(ExprCodeGenerator.scala:837) ~[?:?]
    	at scala.Option.getOrElse(Option.scala:121) ~[flink-scala_2.12-1.15.0.jar:1.15.0]
    	at org.apache.flink.table.planner.codegen.ExprCodeGenerator.$anonfun$generateCallExpression$8(ExprCodeGenerator.scala:834) ~[?:?]
    	at scala.Option.getOrElse(Option.scala:121) ~[flink-scala_2.12-1.15.0.jar:1.15.0]
    	at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:838) ~[?:?]
    	at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:487) ~[?:?]
    	at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:58) ~[?:?]
    	at org.apache.calcite.rex.RexCall.accept(RexCall.java:174) ~[?:?]
    Is there something I'm doing wrong?
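    For context: the planner in this version has no code-generation path for LAG over DECIMAL. A workaround people sometimes try (an assumption, not a confirmed fix) is casting to DOUBLE for the LAG call and casting back, accepting the precision trade-off; note that LAG still needs an OVER clause ordered on a time attribute, as in the earlier question.

    ```java
    // Sketch using the user's column names; tableEnv is the existing TableEnvironment.
    Table result = tableEnv.sqlQuery(
        "SELECT sku, name, price, updatedAt,"
            + " CAST(LAG(CAST(price AS DOUBLE)) OVER (PARTITION BY sku ORDER BY updatedAt)"
            + "      AS DECIMAL(15, 4)) AS last_price"
            + " FROM orders");
    ```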
  • Kyle Ahn

    10/11/2022, 1:27 PM
    [Question] Is it possible to make a synchronous HTTP request using java.net.http.HttpClient in a ProcessWindowFunction?
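    For reference: nothing stops a blocking java.net.http.HttpClient call inside a ProcessWindowFunction, but it stalls the task thread for the whole round trip and backpressures the pipeline, which is why Flink's Async I/O is usually preferred for enrichment. A minimal sketch, with a hypothetical endpoint:

    ```java
    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;

    public class EnrichingWindowFunction
            extends ProcessWindowFunction<String, String, String, TimeWindow> {

        private transient HttpClient client;

        @Override
        public void open(Configuration parameters) {
            client = HttpClient.newHttpClient();
        }

        @Override
        public void process(String key, Context ctx, Iterable<String> elements,
                            Collector<String> out) throws Exception {
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create("https://example.com/lookup?key=" + key)) // hypothetical endpoint
                    .build();
            // Synchronous call: blocks the task thread until the response arrives.
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            out.collect(key + ":" + response.body());
        }
    }
    ```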
  • Tiansu Yu

    10/11/2022, 2:28 PM
    I have a small, semi-static table (updated daily) that I want to join to my event stream to add one more field. Are there any recommended ways to do this?
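    For reference, one commonly suggested shape for this (a sketch under assumptions, with placeholder table and column names): a processing-time lookup join against the dimension table, e.g. a JDBC-backed table with a lookup cache whose TTL roughly matches the daily update cadence. A broadcast-state join on the DataStream API is the other usual option.

    ```java
    // 'events' needs a processing-time attribute (proc_time AS PROCTIME() in its DDL);
    // 'dim_table' is the small daily-updated table, e.g. defined on the JDBC connector
    // with lookup.cache.max-rows / lookup.cache.ttl set.
    tableEnv.executeSql(
        "SELECT e.*, d.extra_field"
            + " FROM events AS e"
            + " JOIN dim_table FOR SYSTEM_TIME AS OF e.proc_time AS d"
            + "   ON e.dim_id = d.id");
    ```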
  • Jeremy Ber

    10/11/2022, 3:23 PM
    Hi, I'm a bit confused about Flink temporal table joins with processing time. The documentation suggests they are supported, but https://issues.apache.org/jira/plugins/servlet/mobile#issue/FLINK-19830 suggests they are not. I've read through some mailing lists and read the code, but it's still unclear to me in which scenarios they are supported.
  • Prasanth Kothuri

    10/11/2022, 6:24 PM
    The code I inherited has pretty wide windows (e.g. year and month). What happens to the state if the application crashes halfway through these windows?
  • Steven Zhang

    10/11/2022, 7:47 PM
    Hi there, I'm new to working with the Flink operator. How strict is it that the flink service account needs to be in the default namespace? I deployed the Helm chart into the flink-cloud namespace and I see the service account present in that namespace. I would assume the operator would check the namespace it is deployed in for the service account, rather than always the default namespace.
  • Lydian Lee

    10/12/2022, 12:33 AM
    Hi, what's the suggestion for configuring S3 as the checkpoint storage with WebIdentityTokenCredentialsProvider?
    1. I've tested with the Presto plugin, but it failed to init at all: https://apache-flink.slack.com/archives/C03G7LJTS2G/p1665009572470739
    2. Therefore I am testing with Hadoop; it does seem to authenticate successfully, but it always writes 0-length data.
    The only suggestion I got is to use Presto, but because of 1, I am unable to use Presto. Now I am in a deadlock and I have no idea how to fix it. Would someone please help me out of this situation? Thanks!
  • Tim Bauer

    10/12/2022, 8:55 AM
    Hi there, does anyone have a tip on how to parse Scala Option types as nullable columns in the Table API? I have a DataStream[Row] with one column of Option[Instant]. I have provided the return type information explicitly with Types.OPTION(Types.INSTANT). When I print the resulting table schema, I get RAW('scala.Option', '...') and subsequent operations fail. I found this SO question and have contemplated simply using empty strings as a workaround and re-casting everything on the Table API side, but I wonder if people here who had similar problems have found better ways? Thank you 🙏
  • Jeesmon Jacob

    10/12/2022, 11:49 AM
    Just FYI that we are hitting CVE-2022-42003 in our container scanning of the Flink Kubernetes operator (v1.1), for jackson-databind.
  • Duc Anh Khu

    10/12/2022, 12:23 PM
    Hi, we have a Flink application (AWS KDA 1.13) that consumes from both Kafka and Kinesis. We ran into an issue where, depending on the parallelism set for the application, a small number of tasks sometimes get stuck in the Initializing state indefinitely, which in turn causes the application to stall (stop consuming from the data sources). Has anyone come across this issue before? At the moment the application runs (with backpressure) if we have a low parallelism per KPU, but if we scale it up by increasing parallelism or increasing parallelism per KPU, the issue occurs. AWS KDA aside, what can cause a task to stay in the Initializing state, and how can we debug it? Thanks
  • Hannah Hagen

    10/12/2022, 11:41 PM
    Hello! I'm running into an error running a pyflink tutorial job and wondering if anyone here might be able to help find my folly. I'm running Flink as a standalone cluster on a single container on a remote machine. When I submit one of the pyflink tutorial jobs:
    ./bin/flink run --python examples/python/datastream/word_count.py
    it runs successfully the first time, but then fails after that with the error:
    Failed to create stage bundle factory! INFO:root:Initializing Python harness:
    I'm not sure how to interpret this error and what might be the problem. Anyone have suggestions? It's also strange to me that the job succeeds the first time I submit it, but fails only on subsequent runs. 🤔 The only info I was able to find online is this post which suggests it is caused by a bug which was fixed in 1.15.0 (I'm on 1.15.2 so it should be fixed). I'm using Flink 1.15.2, python 3.8 and jdk 11. Thanks in advance for any help.
  • Krish Narukulla

    10/13/2022, 2:50 AM
    What is the recommended approach to define a UDF for Flink? registerFunction is deprecated, but if I don't use registerFunction, it throws the error below. Code:
    Copy code
    public static void registerScalaUDF(String classPath, String funcName, TableEnvironment tableEnv,
                      ClassLoader classLoader){
      try{
        ScalarFunction udfFunc = Class.forName(classPath, false, classLoader)
            .asSubclass(ScalarFunction.class).newInstance();
        tableEnv.registerFunction(funcName, udfFunc);
        logger.info("register scala function:{} success.", funcName);
      }catch (Exception e){
        logger.error("", e);
        throw new RuntimeException("register UDF exception:" + e.getMessage());
      }
    }
    
    Exception:
    Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature dim_lookup(<CHARACTER>, <CHARACTER>, <CHARACTER>)
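    For reference: the replacement for the deprecated registerFunction is createTemporarySystemFunction (or createTemporaryFunction for catalog-scoped functions). A sketch of the same reflective registration, meant to sit inside the same try/catch as above:

    ```java
    // Same reflective load as above, registered through the newer API.
    ScalarFunction udf = Class.forName(classPath, false, classLoader)
            .asSubclass(ScalarFunction.class)
            .getDeclaredConstructor()
            .newInstance();
    tableEnv.createTemporarySystemFunction(funcName, udf);
    ```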
  • Carl Choi

    10/13/2022, 4:11 AM
    Hello! I have an issue with the state backend. I set
    Copy code
    state.backend: rocksdb
    state.backend.incremental: true
    state.checkpoints.dir: hdfs:///flink-checkpoints # location to store checkpoints
    in my jobmanager and taskmanager. After that, my jobs, which had no issues before, constantly restart with a NullPointerException but no specific error message. I guess the messages below might be related to my problem.
    Copy code
    2022-10-13 12:53:27,806 ERROR org.apache.beam.runners.fnexecution.control.FnApiControlClient [] - FnApiControlClient closed, clearing outstanding requests {1=java.util.concurrent.CompletableFuture@6b1a3cf9[Not completed, 1 dependents]}
    2022-10-13 12:53:27,814 WARN  org.apache.beam.sdk.fn.data.BeamFnDataGrpcMultiplexer        [] - Hanged up for unknown endpoint.
    2022-10-13 12:53:27,977 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.common.metrics.Metrics [] - Metrics scheduler closed
    2022-10-13 12:53:27,977 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.common.metrics.Metrics [] - Closing reporter org.apache.flink.kafka.shaded.org.apache.kafka.common.metrics.JmxReporter
    2022-10-13 12:53:27,977 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.common.metrics.Metrics [] - Metrics reporters closed
    2022-10-13 12:53:27,978 INFO  org.apache.flink.kafka.shaded.org.apache.kafka.common.utils.AppInfoParser [] - App info kafka.consumer for consumer-labs_lmlt08_enroll_cdc-6940 unregistered
    2022-10-13 12:53:27,978 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Custom Source -> Map -> *anonymous_datastream_source$1*[1] -> LookupJoin[2] -> Calc[3] -> LookupJoin[4] -> Calc[5] -> LookupJoin[6] -> Calc[7] -> LookupJoin[8] -> Calc[9] -> LookupJoin[10] -> Calc[11] -> ConstraintEnforcer[12] (1/1)#100 (ab8c8b80fe52ba2dd38838873d5b76d3) switched from RUNNING to FAILED with failure cause: java.lang.NullPointerException
    What's stranger is that the Java version works properly but the Python version causes the problem. Is anyone there to share precious knowledge?
  • dario bonino

    10/13/2022, 10:08 AM
    Hi all, we are encountering an issue in one of our pipelines, which fails with an akka.framesize exceeded exception:
    Copy code
    the rpc invocation size 27551907 exceeds the maximum akka framesize
    While we initially increased the akka.framesize parameter, we also investigated the causes by following some of the advice found on the Flink mailing list. In particular, we found that some states in one of our operators take less space than the minimum required to generate a data file in checkpoints and are therefore inlined in the root checkpoint metadata file. This behavior is probably at the root of the above error (this hypothesis seems confirmed by the size of the metadata file as well as by direct inspection of its contents). Although one possible solution could be reducing the value of state.storage.fs.memory-threshold, we are wondering if there are some mitigation actions that we can take at the operator level. Any suggestions?
  • Jin Yi

    10/13/2022, 11:54 AM
    Context: I'm trying to do a top-n filter using the DataStream API. The general idea is that I keep track of the elements that are in the top n in broadcast state. Inside the KeyedBroadcastProcessFunction, the processBroadcastElement function receives a tuple of the form (T element, boolean keep?) and accordingly keeps or evicts the element from the broadcast state. From there, the processElement method simply checks whether the broadcast state contains the element to decide whether to pass it through. Issue: the stream used to populate the broadcast stream needs a global count of all elements and a heap of the top n elements as state. The idea is that it should emit those (element, keep?) tuples whenever elements are added to or removed from the heap. I implemented this as a RichFlatMapFunction and am relying on MapState (keyed state) to keep track of the counts and the heap. Will this work? IIUC it will not, since the heap is keyed state, so the heap will NOT be global across partitions. Can I explicitly set the parallelism to 1, or effectively do so by using a key function that maps everything to the same key? Are these bad ideas? Is there a way to make this work?
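    For reference, a sketch of the "same key for everything" variant mentioned above (an illustration of the trade-off, not a recommendation): routing every element to one constant key makes a single subtask own all of the counting state, so the state is effectively global, but that operator then runs as if its parallelism were 1. The stream and function names are placeholders:

    ```java
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;

    // 'elements' is the incoming DataStream<String>; TopNTrackingFunction is the
    // hypothetical RichFlatMapFunction holding the global count and the top-n heap.
    DataStream<Tuple2<String, Boolean>> keepOrEvict = elements
            .keyBy(e -> 0)                      // constant key: one key group owns all state
            .flatMap(new TopNTrackingFunction())
            .name("global-top-n-tracker");
    ```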
  • Jirawech Siwawut

    10/13/2022, 12:40 PM
    Hi. I am currently facing an issue where the checkpoint time is quite high for a GroupWindowAggregation and a WindowJoin task. Are there any suggestions to reduce the checkpoint time, e.g. using the RocksDB backend? FYI, I run checkpoints every 1 minute.
  • Tommy Gunnarsson

    10/13/2022, 12:44 PM
    Hey 👋 a question regarding the Kubernetes operator. We would like to control which instances our resources run on, so that we have dedicated machines for our Flink application. We've tried setting matchLabels as well as nodeSelector, but the Flink operator is ignoring them. Is there any way to assign the Flink operator to certain nodes?
  • Yegna Subramanian Jambunath

    10/13/2022, 5:30 PM
    Hi everyone, I am using Kafka as the data source and consuming with a PyFlink consumer. My data are value-only records containing a list of lists or a list of dicts. Example: [[0.33, 0.7, 1, "sample"], [0.33, 0.7, 1, "sample"]]. Example: [{"ID": "S01", "value": [0, 1], "version": "v1.0.0"}, {"ID": "S01", "value": [0, 1], "version": "v2.0.0"}]. I used the JSON Row deserializer but cannot get what I want, as the list of values can have one or more elements. I tried the string serializer and deserializer, which works, but then mapping it to other data doesn't work. Please advise which deserializer is ideal for the above example data.
  • karthik

    10/13/2022, 5:49 PM
    Hi folks, when submitting a job to a session cluster via REST API, I run into this error
    Caused by: java.lang.IllegalStateException: The library registration references a different set of library BLOBs than previous registrations for this job:
    old:[p-bbc27e21a2305f48c44c62091549f3b61d7cbcd2-01a55b50631d6f7791a0cf0600392315]
    new:[p-bbc27e21a2305f48c44c62091549f3b61d7cbcd2-14c2626071f3700e7b7328358464cb8d]
  • Jonathan Hoyt

    10/13/2022, 8:57 PM
    I’m trying to test out the flink-protobuf functionality in 1.16.0 RC1 and can’t seem to get it to find the class generated by protoc and then compiled into a jar using maven.
    Copy code
    [ERROR] Could not execute SQL statement. Reason:
    java.lang.ClassNotFoundException: com.example.SimpleTest
    I’ve put up an example with all of the steps I’ve taken in https://github.com/jonmagic/flink-experiments/pull/1. I used all of the example code from the documentation at https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/formats/protobuf/. Anyone successfully used flink-protobuf in the release candidate and have tips for how to get it to recognize the java protobuf descriptor classes that are compiled in the referenced jar?
  • Yaroslav Bezruchenko

    10/13/2022, 9:20 PM
    Hey guys, I'm setting up a new Flink application and trying to run it, but I get:
    Exception in thread "main" java.lang.reflect.InaccessibleObjectException: Unable to make field private final byte[] java.lang.String.value accessible: module java.base does not "opens java.lang" to unnamed module @544a2ea6
    which basically means that it can't serialize String, somehow. Any tips on what to do in this case?
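    For context: this exception usually means the job is running on JDK 16+ where reflective access into java.lang is blocked, while Flink 1.15 officially targets Java 8/11. If staying on the newer JDK, a commonly used workaround (an assumption about the environment, not a confirmed fix for this exact setup) is to open the package to unnamed modules, e.g. as a VM option when running from the IDE, or via env.java.opts in flink-conf.yaml on a cluster:

    ```
    --add-opens=java.base/java.lang=ALL-UNNAMED
    ```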
  • Jin Yi

    10/13/2022, 10:07 PM
    I'm debugging a TypeExtractor issue where I want to output some more information in the exception being thrown. I made some local changes to flink-core on the version I want. This package is transitively provided by flink-streaming-java, AFAICT. My typical way of overriding the package for explicit dependencies doesn't seem to be working.
  • Avinash

    10/14/2022, 6:07 AM
    Hi guys 👋 I'm trying to access Hive with the Hive connector from a PyFlink job. I've included the Hive jar as follows:
    Copy code
    batch_table_env.get_config().get_configuration().set_string("pipeline.jars", "file:///Users/avinashkumar/dev/flinky/flink-sql-connector-hive-2.3.6_2.12-1.15.2.jar")
        batch_table_env.get_config().get_configuration().set_string("pipeline.classpaths", "file:///avinashkumar/***/dev/flinky/flink-sql-connector-hive-2.3.6_2.12-1.15.2.jar")
    
        catalog_name = "flink_hive"
        default_database = "test_catalog"
        hive_conf_dir = "/opt/hive-conf"
        hive_catalog = HiveCatalog(catalog_name, default_database, hive_conf_dir)
        batch_table_env.register_catalog(hive_catalog)
    Getting the following error
    py4j.protocol.Py4JError: org.apache.flink.table.catalog.hive.HiveCatalog does not exist in the JVM
    Should I be using a different jar, a different way to bundle the jar, or am I not setting the path right? Any help is greatly appreciated.
  • ding bei

    10/14/2022, 1:30 PM
    Hey guys, I am new to Flink. I run a Flink job on k8s with the Kubernetes operator. The thing is, the Flink taskmanager keeps writing things to its disk even though there is no incoming data, which means the operators are not doing any work at all. Any idea what might cause this? And what is it that the Flink taskmanager writes?
  • Varun Sayal

    10/14/2022, 2:39 PM
    How do folks handle application evolution for stateful processing apps? One thing we have noticed is that adding a Kafka source to our DAG invalidates the savepoint of the old application.