# troubleshooting
  • a

    Andrej Mitrovic

    06/02/2023, 8:31 AM
    Hello community, does anyone know of any up-to-date Rust SDK for Apache Flink Stateful Functions? This one seems a bit dated: https://github.com/aljoscha/statefun-rust. But maybe it's still usable? I see there was one major new release of Apache StateFun since statefun-rust's last commit: https://flink.apache.org/2021/04/15/stateful-functions-3.0.0-remote-functions-front-and-center/. But I don't see any mention of API breaking changes.
  • m

    Mourad HARMIM

    06/02/2023, 11:45 AM
    Any suggestion?
  • y

    Yuliya Bogdan

    06/02/2023, 12:32 PM
    Hi all, I’m using the FileSystem connector to read Avro files from a path which may contain non-Avro files that have to be filtered out (no partitioning), which I assume is a common case. Is there an easy way to do it? I was looking for a way to pass a custom file filter predicate to the default NonSplittingRecursiveEnumerator, or to specify a FileEnumerator provider to the FileSource without implementing a custom connector, but it seems to be tricky because FileSource.FileSourceBuilder, which has a setter for the FileEnumerator, is instantiated in a private method of FileSystemTableSource.
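    A hedged sketch of one workaround, assuming the DataStream-API FileSource (flink-connector-files) rather than the SQL filesystem connector: the builder exposes setFileEnumerator, and NonSplittingRecursiveEnumerator has a constructor that accepts a file-filter predicate. MyRecord and myAvroFormat are placeholders for your record type and stream format.

    ```java
    // Enumerate only files ending in ".avro"; everything else is skipped before splits are created.
    FileSource<MyRecord> source =
            FileSource.forRecordStreamFormat(myAvroFormat, new Path("/data/input"))
                    .setFileEnumerator(
                            () -> new NonSplittingRecursiveEnumerator(
                                    path -> path.getName().endsWith(".avro")))
                    .build();

    DataStream<MyRecord> records =
            env.fromSource(source, WatermarkStrategy.noWatermarks(), "avro-files");
    ```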
  • j

    Jirawech Siwawut

    06/02/2023, 4:10 PM
    Hi all. Is it possible to set up an init script when starting the Flink SQL Gateway? https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql-gateway/rest/#rest-api. I see that we can do that with the Flink SQL client, but I'm not sure about the Flink SQL Gateway. My goal is to create the Kafka topic used for testing upfront so that people do not need to do that every time.
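    The SQL Gateway has no "-i init.sql" option like the SQL client, but one possible workaround is to replay the init statements over its REST API once the gateway is up. A hedged sketch, assuming the v1 session/statement endpoints from the linked docs and the default port 8083; the statement text and the JSON-parsing helper are placeholders.

    ```java
    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class GatewayInit {
        public static void main(String[] args) throws Exception {
            HttpClient http = HttpClient.newHttpClient();

            // Open a session; the response JSON contains the sessionHandle.
            HttpResponse<String> session = http.send(
                    HttpRequest.newBuilder(URI.create("http://localhost:8083/v1/sessions"))
                            .header("Content-Type", "application/json")
                            .POST(HttpRequest.BodyPublishers.ofString("{}"))
                            .build(),
                    HttpResponse.BodyHandlers.ofString());
            String handle = extractSessionHandle(session.body());

            // Run one init statement, e.g. a CREATE TABLE backed by the test Kafka topic.
            http.send(
                    HttpRequest.newBuilder(URI.create(
                                    "http://localhost:8083/v1/sessions/" + handle + "/statements"))
                            .header("Content-Type", "application/json")
                            .POST(HttpRequest.BodyPublishers.ofString(
                                    "{\"statement\": \"CREATE TABLE test_topic (...) WITH ('connector' = 'kafka')\"}"))
                            .build(),
                    HttpResponse.BodyHandlers.ofString());
        }

        // Placeholder: use a real JSON library instead of this naive extraction.
        private static String extractSessionHandle(String json) {
            return json.replaceAll(".*\"sessionHandle\"\\s*:\\s*\"([^\"]+)\".*", "$1");
        }
    }
    ```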
  • t

    Tarek Ajjour

    06/02/2023, 6:15 PM
    Posting this here from another channel: https://apache-flink.slack.com/archives/C03JKTFFX0S/p1685729088580199
  • v

    Vitor Leal

    06/03/2023, 5:56 PM
    For UPDATEs, can I always assume the DELETE and the subsequent INSERT arrive in the same order?
  • h

    Hygor Knust

    06/03/2023, 8:58 PM
    Hi all, I have a Flink SQL job currently operational with table.exec.resource.default-parallelism=16. It’s performing well generally, but there’s a join operation at the end of the job graph which seems to be causing a bottleneck. Upon retrieving the job INFO, I noticed that it’s using the GLOBAL ship strategy, which seems to be forcing the operator to maintain a parallelism of 1.
    Copy code
    {
      "id": 96,
      "type": "Join[93]",
      "pact": "Operator",
      "contents": "...",
      "parallelism": 1,
      "predecessors": [
        {
          "id": 88,
          "ship_strategy": "GLOBAL",
          "side": "second"
        },
        {
          "id": 94,
          "ship_strategy": "GLOBAL",
          "side": "second"
        }
      ]
    }
    I’ve tried searching through the documentation for more information on this, but I haven’t found anything yet. I would like to know why it is doing this, and whether there is a way to configure it. Thank you!
  • h

    Hangyu Wang

    06/05/2023, 2:53 AM
    Hi all, I'm trying to create a table through the Table & SQL API to read a local protobuf file. The job finishes successfully, but the table has no data loaded, which is pretty weird. Could anyone help me?
    Copy code
    tableEnv.executeSql("CREATE TABLE metric (\n" +
                    "  metric_name STRING,\n" +
                    "  function_name STRING,\n" +
                    "  line_number INT\n" +
                    ") WITH (\n" +
                    " 'connector' = 'filesystem',\n" +
                    " 'path' = '" + parameter.get("input_file") + "', \n" +
                    " 'format' = 'protobuf',\n" +
                    " 'protobuf.message-class-name' = 'MetricTest',\n" +
                    " 'protobuf.ignore-parse-errors' = 'true'\n" +
                    ")");
    Table t = tableEnv.from("metric");
    t.execute().print();
    Here is the result:
    Copy code
    ~/Downloads/flink-1.17.0/bin/flink run target/import-metric-event-1.0-SNAPSHOT.jar --input_file metric_test
    Job has been submitted with JobID 9128383c4d60cccaa252145478f31ee5
    Empty set
    And here is the content of the protobuf file (text format):
    Copy code
    metric_name: "test_metric_name"
    function_name: "test_function_name"
    line_number: 111
    And the protobuf schema (.proto):
    Copy code
    syntax = "proto3";
    
    package metric;
    
    message MetricTest {
      string metric_name = 1;
      string function_name = 2;
      uint32 line_number = 3;
    }
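    One possibility worth checking, stated as an assumption rather than a diagnosis: the file above is protobuf text format, while the 'protobuf' format decodes binary-encoded messages, so each record may fail to parse and be silently dropped because of 'protobuf.ignore-parse-errors' = 'true'. A sketch of producing a binary test file instead, assuming the MetricTest class generated by protoc:

    ```java
    // Build the same record shown above and write it with the binary protobuf encoding.
    MetricTest msg = MetricTest.newBuilder()
            .setMetricName("test_metric_name")
            .setFunctionName("test_function_name")
            .setLineNumber(111)
            .build();
    try (java.io.FileOutputStream out = new java.io.FileOutputStream("metric_test")) {
        msg.writeTo(out);
    }
    ```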
  • s

    Sumit Khaitan

    06/05/2023, 7:44 AM
    Hi, we're seeing this issue in our Flink pipeline as well: https://issues.apache.org/jira/browse/FLINK-23886. Using Flink version 1.13.6. I see that the status of the issue is still open. Is there any solution for this?
  • h

    Hangyu Wang

    06/05/2023, 10:02 AM
    Could anyone help with this error? The iceberg-aws jar is already in the lib/ directory.
    Copy code
    Caused by: java.lang.NoSuchMethodError: 'java.lang.Long org.apache.iceberg.util.PropertyUtil.propertyAsNullableLong(java.util.Map, java.lang.String)'
    	at org.apache.iceberg.aws.AwsProperties.<init>(AwsProperties.java:744) ~[iceberg-aws-1.1.0.jar:?]
    	at org.apache.iceberg.aws.s3.S3FileIO.initialize(S3FileIO.java:355) ~[iceberg-aws-1.1.0.jar:?]
    	at org.apache.iceberg.CatalogUtil.loadFileIO(CatalogUtil.java:295) ~[iceberg-flink-runtime-1.14-1.0.0.jar:?]
    	at org.apache.iceberg.jdbc.JdbcCatalog.initialize(JdbcCatalog.java:98) ~[iceberg-flink-runtime-1.14-1.0.0.jar:?]
    	at autox.sim.eval.datalake.CatalogUtil.initCatalog(CatalogUtil.java:26) ~[?:?]
    	at autox.sim.eval.datalake.ImportMetricEvent.main(ImportMetricEvent.java:60) ~[?:?]
    	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
    	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
    	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
    	at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
    	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105) ~[flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-dist-1.17.0.jar:1.17.0]
    	... 13 more
  • h

    hueiyuan su

    06/05/2023, 3:08 PM
    Hi, everyone. I would like to use Flink as our data flow infrastructure, consuming data from AWS Kinesis and sinking it to AWS Timestream. In particular, because our data has many categories, I want to sink records to the corresponding table in the DB based on their category and schema. How can I implement this with good performance and efficiency? If you have any ideas or suggestions, please share them with me. I would appreciate it, thank you.
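    One common way to fan out by category in the DataStream API is side outputs with one sink per category. A hedged sketch, where Event, getCategory(), and the two sinks are placeholders for your record type and however you write to Timestream:

    ```java
    final OutputTag<Event> metricsTag = new OutputTag<Event>("metrics") {};
    final OutputTag<Event> logsTag = new OutputTag<Event>("logs") {};

    SingleOutputStreamOperator<Event> routed = kinesisEvents
            .process(new ProcessFunction<Event, Event>() {
                @Override
                public void processElement(Event e, Context ctx, Collector<Event> out) {
                    switch (e.getCategory()) {
                        case "metrics": ctx.output(metricsTag, e); break;
                        case "logs":    ctx.output(logsTag, e);    break;
                        default:        out.collect(e);            // unknown categories stay on the main output
                    }
                }
            });

    routed.getSideOutput(metricsTag).sinkTo(metricsTableSink); // one sink per target table
    routed.getSideOutput(logsTag).sinkTo(logsTableSink);
    ```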
  • r

    Raghunadh Nittala

    06/06/2023, 3:50 AM
    Hey everyone, is there any document to go through to understand more about the EXPLAIN plan for SQL queries? The “Physical Execution Plan” shows ship_strategy as Hash/Forward. Also, when I generated the “Optimized Physical Plan” for 2 queries, the results show the same cumulative cost. I'm wondering why the cost of the queries doesn't change. Can someone please explain?
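    For the cost part specifically, a small sketch of asking the planner to include estimated costs (and the JSON execution plan) when explaining, assuming a TableEnvironment named tableEnv; the query string is a placeholder:

    ```java
    String plan = tableEnv.explainSql(
            "SELECT ...",                       // placeholder query
            ExplainDetail.ESTIMATED_COST,
            ExplainDetail.JSON_EXECUTION_PLAN);
    System.out.println(plan);
    ```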
  • a

    Ammar Master

    06/06/2023, 4:08 AM
    Hi folks! Is there a way to set the parallelism for createTemporaryView when converting from DataStream to a Table? I have tried disabling chaining for the upstream operators and setting table.exec.resource.default-parallelism, but it doesn't respect it.
  • s

    sziwei

    06/06/2023, 5:28 AM
    Hi everyone, using AWS EMR (Flink 1.14.2), when I use Flink SQL in my code to submit jobs, we get an error: "Caused by: java.nio.file.FileAlreadyExistsException: /tmp". Is there a solution to this problem?
  • h

    Hangyu Wang

    06/06/2023, 9:31 AM
    Hi, team! I am trying to create a Flink table with connector='kafka'. Now I'm facing this error:
    Copy code
    Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/clients/consumer/ConsumerRecord
            at java.base/java.lang.Class.getDeclaredMethods0(Native Method)
            at java.base/java.lang.Class.privateGetDeclaredMethods(Class.java:3166)
            at java.base/java.lang.Class.getDeclaredMethod(Class.java:2473)
            at java.base/java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1452)
            at java.base/java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:381)
            at java.base/java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:355)
            at java.base/java.security.AccessController.doPrivileged(Native Method)
            at java.base/java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:355)
            at java.base/java.io.ObjectStreamClass$Caches$1.computeValue(ObjectStreamClass.java:98)
            at java.base/java.io.ObjectStreamClass$Caches$1.computeValue(ObjectStreamClass.java:95)
            at java.base/java.io.ClassCache$1.computeValue(ClassCache.java:73)
            at java.base/java.io.ClassCache$1.computeValue(ClassCache.java:70)
            at java.base/java.lang.ClassValue.getFromHashMap(ClassValue.java:228)
            at java.base/java.lang.ClassValue.getFromBackup(ClassValue.java:210)
            at java.base/java.lang.ClassValue.get(ClassValue.java:116)
            at java.base/java.io.ClassCache.get(ClassCache.java:84)
            at java.base/java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:336)
            at java.base/java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:542)
            at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2020)
            at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1870)
            at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2201)
            at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
            at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
            at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
            at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
            at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
            at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
            at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
            at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
            at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
            at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
            at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
            at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:534)
            at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:522)
            at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:476)
            at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:383)
            at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:166)
            at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.<init>(RegularOperatorChain.java:60)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:688)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
            at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
            at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
            at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
            at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.ConsumerRecord
            at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
            at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
            at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
            ... 45 more
    And yes, I have put the kafka-client.jar into flink/lib/.
  • v

    Viktor Hrtanek

    06/06/2023, 2:15 PM
    Hello, it might be a dumb question, but is it possible to pass additional parameters, besides e.g. the Row being processed, to a map function in the PyFlink Table API? The code below complains about AttributeError: 'Test' object has no attribute '_set_takes_row_as_input', and when I add it, the error says TypeError: 'Test' object is not callable
    Copy code
    def method1(input):
        return "#".join([input, input])
    
    class Test(MapFunction):
    
        def __init__(self, testValue):
            self.key = None
            self.testValue = testValue
    
        # def _set_takes_row_as_input(self):
        #     self._takes_row_as_input = True
        #     return self
    
        def map(self, data, testValue, output_type=DataTypes.ROW([DataTypes.FIELD("FIRST_NAME", DataTypes.STRING()),
                                        DataTypes.FIELD("FIRST_NAM", DataTypes.STRING()),
                                        DataTypes.FIELD("FIRST_NA", DataTypes.STRING())])):
            return Row(
                FIRST_NAME = data['FIRST_NAME'],
                FIRST_NAM = data['EMAIL'],
                FIRST_NA = method1(self.testValue)
            )
    
    source_table = table_env.from_path("table")
    source_table.map(Test(testValue="DummyValue")).execute().print()
    Then I tried with a UDF, but no success. The error says TypeError: __call__() got an unexpected keyword argument 'dummyVariable'
    Copy code
    @udf(result_type=DataTypes.ROW([DataTypes.FIELD("FIELD_1", DataTypes.STRING()),
                                        DataTypes.FIELD("FIELD_2", DataTypes.STRING()),
                                        DataTypes.FIELD("FIELD_3", DataTypes.STRING())]))
        def func1(data: Row, dummyVariable: str) -> Row:
            return Row(
                FIELD_1 = data['FIRST_NAME'],
                FIELD_2 = data['EMAIL'],
                FIELD_3 = method1(dummyVariable)
            ) 
        source_table = table_env.from_path("table")
        source_table.map(func1(dummyVariable = "dummyValue")).execute().print()
  • i

    Iqbal Singh

    06/06/2023, 5:20 PM
    Hi everyone, dumb question. I am looking to run a Flink application across multiple machines to read data from different per-node local directories and write to a common storage layer. Is this a feasible design pattern for Flink? Having all the source files in a single shared storage is the preferred way; I am asking here whether Flink can pull from multiple locations and process the data.
  • d

    Daniel Packard

    06/06/2023, 5:34 PM
    Hi all - crossposting here for visibility: https://apache-flink.slack.com/archives/C03FAEU4MJB/p1686072640735839
  • a

    Amir Hossein Sharifzadeh

    06/06/2023, 7:27 PM
    Hi everybody, my code works fine on Mac but has an issue with RocksDB on Windows. Can someone help me? I get this error:
    Caused by: java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:256)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:256)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.base/java.lang.Thread.run(Thread.java:834)
    Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for StreamMap_e4cb5bc1ac70d352198b4a1200f6b296_(4/12) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:353)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:165)
    ... 11 more
    Caused by: java.io.IOException: Could not load the native RocksDB library
    at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.ensureRocksDBIsLoaded(EmbeddedRocksDBStateBackend.java:977)
    at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:448)
    at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:99)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:336)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    ... 13 more
    Caused by: java.lang.UnsatisfiedLinkError: C:\Users\...\AppData\Local\Temp\minicluster_8800a67d6f12e85c690a30d4b08db590\tm_0\tmp\rocksdb-lib-072fe938ffd58de9d936923ddbb55280\librocksdbjni-win64.dll: Can't find dependent libraries
    at java.base/java.lang.ClassLoader$NativeLibrary.load0(Native Method)
    at java.base/java.lang.ClassLoader$NativeLibrary.load(ClassLoader.java:2432)
    at java.base/java.lang.ClassLoader$NativeLibrary.loadLibrary(ClassLoader.java:2489)
    at java.base/java.lang.ClassLoader.loadLibrary0(ClassLoader.java:2689)
    at java.base/java.lang.ClassLoader.loadLibrary(ClassLoader.java:2619)
    at java.base/java.lang.Runtime.load0(Runtime.java:765)
    at java.base/java.lang.System.load(System.java:1835)
    at org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:102)
    at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:82)
    at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.ensureRocksDBIsLoaded(EmbeddedRocksDBStateBackend.java:951)
    This is also my Maven dependency:
    <dependency>
    <groupId>com.ververica</groupId>
    <artifactId>frocksdbjni</artifactId>
    <version>6.20.3-ververica-2.0</version>
    </dependency>
  • t

    Tan Kim

    06/07/2023, 1:20 AM
    I’m trying to relay a topic from one Kafka cluster to another. This is the original record in the source topic:
    Copy code
    "json": {
            "eventName": "event-ABC",
            ...
        }
    The source is in JSON format and the sink is in Avro format with the Confluent Schema Registry. Here is my code:
    Copy code
    tableEnv.executeSql("CREATE TABLE source_table (..) WITH (
    'connector'='kafka', 
    'format'='json',
    )")
    
    tableEnv.executeSql("CREATE TABLE sink_table WITH (
    'connector'='kafka',
    'format'='avro-confluent',
    ..
    ) AS SELECT * FROM source_table")
    If I run this code without ‘value.avro-confluent.subject’ configuration, the record is something like this.
    Copy code
    {
        "json": {
            "org.apache.flink.avro.generated.record_json": {
                "eventName": {
                    "string": "event-ABC"
                },
               ..
             }
    }
    I don’t understand why flink-avro inserts “org.apache.flink.avro.generated.record_json” between json and eventName. Also, eventName is not just ‘event-ABC’ but string: event-ABC. Is this a bug, or something I missed?
  • h

    Hangyu Wang

    06/07/2023, 3:33 AM
    Hi team, if we have a nested protobuf message, like
    Copy code
    message B {
      repeated A a = 1;
    }
    How can we extract message A with the Table API?
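    Assuming the protobuf format maps the repeated field to an ARRAY&lt;ROW&lt;...&gt;&gt; column, one way to flatten it is CROSS JOIN UNNEST. The table name b_table, array column a, and field names f1/f2 below are hypothetical:

    ```java
    // One output row per element of the repeated field.
    tableEnv.executeSql(
            "SELECT f1, f2 " +
            "FROM b_table " +
            "CROSS JOIN UNNEST(b_table.a) AS t (f1, f2)").print();
    ```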
  • n

    Nicolas P

    06/07/2023, 8:53 AM
    Hello there, I'm trying to implement a conditional upsert in a MySQL table from Flink. Currently, Flink outputs rows to a Kafka topic and then Kafka Connect upserts the rows into a MySQL table. What we want to achieve is a conditional upsert based on a timestamp updateTime field, so that only the latest value is kept (this way we could handle late messages properly). As far as I understand, the Kafka connector to MySQL handles neither conditional upserts nor stored procedures. It's also not clear to me whether the Flink Kafka upsert table sink handles conditional upserts or not. Would you know if it does? Would you have any other ideas on how to proceed?
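    One Flink SQL pattern sometimes used for "keep only the latest row per key" is deduplication with ROW_NUMBER() ordered by the timestamp, applied before the upsert sink. A hedged sketch; the sink, table, and column names (my_sink, orders, id, payload, updateTime) are hypothetical:

    ```java
    // Emit only the most recent row per id; older or late rows for the same id are superseded.
    tableEnv.executeSql(
            "INSERT INTO my_sink " +
            "SELECT id, payload, updateTime FROM ( " +
            "  SELECT *, " +
            "         ROW_NUMBER() OVER (PARTITION BY id ORDER BY updateTime DESC) AS rn " +
            "  FROM orders " +
            ") WHERE rn = 1");
    ```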
  • s

    Shubham Saxena

    06/07/2023, 11:33 AM
    We are trying to enable an HDFS-based file system via FileSink on Flink. We are using a third-party cloud provider (like AWS, GCP, Azure) which takes away the overhead of setting up a vanilla Flink cluster running on Kubernetes. We just submit the application jar with the pipeline definition using a vended API, and the cloud provider executes it on our behalf. However, FileSystem initialisation works differently than other connectors in Flink. It gets initialised during TaskManager startup, and the filesystem object can be fetched at runtime in the sink connector using the scheme defined in the file URI. If all Hadoop dependencies and config files are not present at the time of task manager startup, then there will be failures while fetching the filesystem object during sink initialisation after the app jar is submitted. Since we have no control over the task manager lifecycle, as it is controlled by the third-party cloud provider, we are not able to initialise the FileSystem correctly for our use case. I would like to understand why the file system can't be initialised as part of the operator or connector class. It seems like a conscious design choice that the FileSystem initialisation call was intentionally made only once during task manager startup. What is the downside of initialising it again and again in the connector/operator class?
  • z

    Zhong Chen

    06/07/2023, 4:39 PM
    I have an app that uses a Kafka sink. The app was running fine initially. I just found the below error logs from the task manager. I don’t understand why it suddenly started to complain about version compatibility.
    Copy code
    2023-06-07 16:34:45,026 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: data enrichment counters source -> Flat Map -> Process -> (Sink: Writer -> Sink: Committer, Sink: Writer -> Sink: Committer) (1/1)#75129 (8a8c64ebda8a014a3032ac201948250e_cbc357ccb763df2852fee8c4fc7d55f2_0_75129) switched from INITIALIZING to FAILED with failure cause: java.lang.IllegalStateException: Failed to commit KafkaCommittable{producerId=238443, epoch=2, transactionalId=team-data-de-kong-counters-0-13868}
    	at org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl.signalFailedWithUnknownReason(CommitRequestImpl.java:77)
    	at org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:119)
    	at org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl.commit(CheckpointCommittableManagerImpl.java:126)
    	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmit(CommitterOperator.java:176)
    	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.commitAndEmitCheckpoints(CommitterOperator.java:160)
    	at org.apache.flink.streaming.runtime.operators.sink.CommitterOperator.initializeState(CommitterOperator.java:121)
    	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
    	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:283)
    	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726)
    	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669)
    	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
    	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
    	at java.base/java.lang.Thread.run(Unknown Source)
    Caused by: java.lang.RuntimeException: Incompatible KafkaProducer version
    	at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:266)
    	at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:253)
    	at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.resumeTransaction(FlinkKafkaInternalProducer.java:292)
    	at org.apache.flink.connector.kafka.sink.KafkaCommitter.getRecoveryProducer(KafkaCommitter.java:143)
    	at org.apache.flink.connector.kafka.sink.KafkaCommitter.lambda$commit$0(KafkaCommitter.java:72)
    	at java.base/java.util.Optional.orElseGet(Unknown Source)
    	at org.apache.flink.connector.kafka.sink.KafkaCommitter.commit(KafkaCommitter.java:72)
    	... 16 more
    Caused by: java.lang.NoSuchFieldException: topicPartitionBookkeeper
    	at java.base/java.lang.Class.getDeclaredField(Unknown Source)
    	at org.apache.flink.connector.kafka.sink.FlinkKafkaInternalProducer.getField(FlinkKafkaInternalProducer.java:262)
    	... 22 more
  • d

    Daniel Packard

    06/07/2023, 5:35 PM
    Is there a way to tell how late a late-arriving event is? (I'm looking at a side output stream of late arriving data in a window operation, and trying to determine some stats about avg/p99/etc lateness)
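    A small sketch of one way to measure it, assuming the late events already come out of the window's side output as a DataStream and carry event timestamps; Event and the downstream avg/p99 aggregation are left open:

    ```java
    // For each late element, emit how far behind the current watermark it arrived (milliseconds).
    DataStream<Long> latenessMillis = lateStream
            .process(new ProcessFunction<Event, Long>() {
                @Override
                public void processElement(Event e, Context ctx, Collector<Long> out) {
                    long watermark = ctx.timerService().currentWatermark();
                    Long ts = ctx.timestamp();       // the element's assigned event timestamp
                    if (ts != null) {
                        out.collect(watermark - ts); // positive = how late it was
                    }
                }
            });
    ```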
  • c

    Carter Fendley

    06/08/2023, 3:07 AM
    👋 Extremely new user here, just trying to load some data from Parquet files and finding the process really hard compared to Spark. Every option I see within Flink requires me to specify some sort of schema (example). But I would much prefer to just rely on Parquet’s built-in schema… any advice?
  • s

    Sylvia Lin

    06/08/2023, 4:30 AM
    Hi team! I'm trying to mount a volume to flink-main-container; the volume is defined by some annotation injection. Here is the podTemplate snippet, but it gives a "volumeMount name not found" error:
    Copy code
    podTemplate:
       ....   
         containers:
            - name: flink-main-container
              volumeMounts:
                - mountPath: /opt/flink/log
                  name: flink-logs
                - mountPath: /opt/conf
                  name: isc-confbundle
          volumes:
            - name: flink-logs
              emptyDir: { }
    --------
    Error: {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.client.deployment.ClusterDeploymentException: Could not create Kubernetes cluster \"event-router\".","throwableList":[{"type":"org.apache.flink.client.deployment.ClusterDeploymentException","message":"Could not create Kubernetes cluster \"event-router\"."},{"type":"org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException","message":"Failure executing: POST at: <https://192.168.0.1/apis/apps/v1/namespaces/data-infra-event-router/deployments>. Message: Deployment.apps \"event-router\" is invalid: spec.template.spec.containers[0].volumeMounts[1].name: Not found: \"isc-confbundle\". Received status: Status(apiVersion=v1, code=422, details=StatusDetails(causes=[StatusCause(field=spec.template.spec.containers[0].volumeMounts[1].name, message=Not found: \"isc-confbundle\", reason=FieldValueNotFound, additionalProperties={})], group=apps, kind=Deployment, name=event-router, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=Deployment.apps \"event-router\" is invalid: spec.template.spec.containers[0].volumeMounts[1].name: Not found: \"isc-confbundle\", metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, status=Failure, additionalProperties={})."}]}
    So I tried manually adding the volume as below, but then the error turns out to be a duplicate volume name (so I guess the volume should be there?)
    Copy code
    podTemplate:
       ....   
         containers:
            - name: flink-main-container
              volumeMounts:
                - mountPath: /opt/flink/log
                  name: flink-logs
                - mountPath: /opt/conf
                  name: isc-confbundle
          volumes:
            - name: flink-logs
              emptyDir: { }
            - name: isc-confbundle
              emptyDir: { }
    --------
    Error:  
    [ERROR][data-infra-event-router/event-router] Flink Deployment failed
    org.apache.flink.kubernetes.operator.exception.DeploymentFailedException: Pod "event-router-66b68d8b7b-sq6qr" is invalid: spec.volumes[6].name: Duplicate value: "isc-confbundle"
    Any advice here?
  • s

    Sumit Singh

    06/08/2023, 7:22 AM
    Hi, I am using Flink SQL to submit a job that uses the upsert-kafka connector as a sink to publish data to a Confluent topic, but I'm getting an error that the topic is not found. Any advice on how to resolve this?
    Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Topic flink not present in metadata after 60000 ms.
  • h

    Hangyu Wang

    06/08/2023, 8:23 AM
    Hi team! How can we query an ARRAY column with the Table API? For instance, I want to extract all the values equal to 1 from an ARRAY&lt;INT&gt; column.
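    A sketch of one way to do that filtering in SQL by unnesting the array; the table name t, key column id, and array column vals are hypothetical:

    ```java
    // One result row per array element equal to 1.
    tableEnv.executeSql(
            "SELECT id, v " +
            "FROM t " +
            "CROSS JOIN UNNEST(t.vals) AS u (v) " +
            "WHERE v = 1").print();
    ```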