# troubleshooting
  • d

    David Wisecup

    06/12/2023, 6:21 PM
    Any thoughts why this timer won't fire?
    Copy code
    public class GroupedAttributesFunction extends KeyedProcessFunction<String, TransactionAttribute, GroupedAttributes> {
        @Override
        public void processElement(
                TransactionAttribute txnAttr,
                Context ctx,
                Collector<GroupedAttributes> out) throws Exception {
    ...
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + Time.days(maxWindowForAllRules).toMilliseconds());
        }
    
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<GroupedAttributes> out) throws Exception {
    
    
    // FIXME HELP I never enter this code!
    
            GroupedAttributes groupedAttributes = this.groupedAttributesValueState.value();
            if (groupedAttributes != null) {
                log.info("TODO clean out groupedAttribute state if older than 30 days");
    
            }
        }
    }
  • d

    David Wisecup

    06/12/2023, 7:42 PM
    I'm using event time. The time range is large enough to have had this triggered. I've also set it to 1 second but still didn't get the function called.
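    For context: an event-time timer only fires once the operator's watermark passes the registered timestamp, so a common reason onTimer is never called is that no watermarks are generated or they never advance (e.g. an idle source). A minimal sketch of attaching a watermark strategy upstream of the keyed process function; TransactionAttribute.getEventTime() and the 5-second bound are assumptions, not part of the original job:

    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStream;

    public class WatermarkSetup {
        public static DataStream<TransactionAttribute> withWatermarks(DataStream<TransactionAttribute> txnAttrStream) {
            return txnAttrStream.assignTimestampsAndWatermarks(
                    WatermarkStrategy
                            .<TransactionAttribute>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                            // use whatever field carries the event timestamp; withIdleness() can also help
                            // when some partitions stop producing and hold the watermark back
                            .withTimestampAssigner((attr, recordTimestamp) -> attr.getEventTime()));
        }
    }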
  • a

    Arijit Dasgupta

    06/12/2023, 7:52 PM
    https://wicaksonodiaz.medium.com/setup-pyflink-development-environment-76d8491a9ad7 I would like to set up Flink locally and then build some test applications using PyFlink. I looked at this link but I am not sure how current it is. Any useful links for me to look at?
    • I wish to run Flink with PyFlink locally in a Docker container.
    • Ideally, I'd like to have a Python script running that generates data in real time. I will process that data as an input stream and also create an output stream in PyFlink.
    I also see this Dockerfile, but it does not use the latest version of Flink: https://apache.googlesource.com/flink-playgrounds/+/HEAD/pyflink-walkthrough/Dockerfile
  • a

    Arijit Dasgupta

    06/12/2023, 8:20 PM
    https://github.com/pyflink/playgrounds Is there an update on something like this? I am trying to use the latest PyFlink libraries.
  • a

    Arijit Dasgupta

    06/12/2023, 9:14 PM
    I tried building a Docker image with this Dockerfile.
    Copy code
    FROM flink:1.17.1
    
    
    # install python3 and pip3
    RUN apt-get update -y && \
    apt-get install -y python3 python3-pip python3-dev && rm -rf /var/lib/apt/lists/*
    RUN ln -s /usr/bin/python3 /usr/bin/python
    
    # install PyFlink
    RUN pip3 install apache-flink==1.17.1
    However, I received this error. What do I need to change? It's working fine on my Linux machine, but I receive this error on my Mac:
    #0 35.81  Downloading pemja-0.3.0.tar.gz (48 kB)
    #0 35.81 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 48.5/48.5 KB 14.6 MB/s eta 00000
    #0 35.90 Installing build dependencies: started
    #0 39.29 Installing build dependencies: finished with status 'done'
    #0 39.30 Getting requirements to build wheel: started
    #0 39.36 Getting requirements to build wheel: finished with status 'error'
    #0 39.37 error: subprocess-exited-with-error
    #0 39.37
    #0 39.37 × Getting requirements to build wheel did not run successfully.
    #0 39.37 │ exit code: 255
    #0 39.37 ╰─> [1 lines of output]
    #0 39.37     Include folder should be at '/opt/java/openjdk/include' but doesn't exist. Please check you've installed the JDK properly.
    #0 39.37     [end of output]
    #0 39.37
    #0 39.37 note: This error originates from a subprocess, and is likely not a problem with pip.
    #0 39.37 error: subprocess-exited-with-error
    #0 39.37
    #0 39.37 × Getting requirements to build wheel did not run successfully.
    #0 39.37 │ exit code: 255
    #0 39.37 ╰─> See above for output.
    #0 39.37
    #0 39.37 note: This error originates from a subprocess, and is likely not a problem with pip.
  • a

    Amir Hossein Sharifzadeh

    06/12/2023, 9:21 PM
    Hi Flink Team: I am going to report an issue with MapState here. In my code I have created a map state like
    MapStateDescriptor<String, Integer> totalMapStateDescriptor =
            new MapStateDescriptor<>(
                    "totalMapState",
                    Types.STRING,
                    Types.INT);
    totalMap = getRuntimeContext().getMapState(totalMapStateDescriptor);
    After I insert some values like totalMap.put(rawPath, rawTotalChunk); I see that totalMap.get(rawPath) is null for one of those values. In fact Iterable<String> signalKeys = totalMap.keys(); always returns one key and the other keys disappear! So, why is that? I forgot to mention that I get rawPath from a joined_stream:
    public void processElement(Row left, Row right, ProcessJoinFunction<Row, Row, List<double[][][]>>.Context ctx, Collector<List<double[][][]>> out) throws Exception {
        String rawPath = String.valueOf(left.getField(SUBDIR_STR)).toLowerCase();
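    One thing worth checking (a hedged note, not a confirmed diagnosis): keyed MapState is scoped to the current key of the stream, so each distinct key sees its own independent map, and entries written while processing one key are invisible, and keys() looks almost empty, while processing another key. A minimal sketch illustrating the scoping; the class name, types, and field index are illustrative:

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Collector;

    public class TotalsFunction extends KeyedProcessFunction<String, Row, Row> {
        private transient MapState<String, Integer> totalMap;

        @Override
        public void open(Configuration parameters) {
            totalMap = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("totalMapState", Types.STRING, Types.INT));
        }

        @Override
        public void processElement(Row row, Context ctx, Collector<Row> out) throws Exception {
            // Every read/write below touches the map belonging to ctx.getCurrentKey();
            // entries stored under a different key will not show up in totalMap.keys() here.
            String rawPath = String.valueOf(row.getField(0)).toLowerCase(); // field index is illustrative
            totalMap.put(rawPath, 1);
        }
    }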
  • r

    RICHARD JOY

    06/12/2023, 10:34 PM
    Hello everyone! I’m using Kubernetes operator 1.4.0 and it was running without any issue. However, now I see a sporadic informer-related issue that prevents the operator from starting up. Please help: can this be turned off, since I’m not using the flinksessionjobs CR at all, only Flink application jobs?
    Copy code
    "InformerWrapper [ERROR] Informer startup error. Operator will be stopped. Informer: <http://flink.apache.org/b1beta1/namespaces/namespace/flinksessionjobs|flink.apache.org/b1beta1/namespaces/namespace/flinksessionjobs>
    
    java.util.concurrent.ExecutionException: java.net.ConnectException: Failed to connect to /10.9.20.1:443"
  • i

    Ishan

    06/13/2023, 3:40 AM
    Hi, noob question: when I create a new Hive catalog instance in the Flink SQL client, I expected it to persist after I exit the client and connect again (reset the session), but I am not seeing that happen. Is that expected? Isn't the catalog persisted in HMS?
    Copy code
    CREATE CATALOG myhive WITH (
        'type' = 'hive',
        'default-database' = 'mydatabase',
        'hive-conf-dir' = '/opt/hive-conf'
    );
    -- set the HiveCatalog as the current catalog of the session
    USE CATALOG myhive;
    cc @Danny Chen
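    For what it's worth, the CREATE CATALOG registration itself is session-scoped; only the databases and tables created inside the Hive catalog live in the Hive Metastore, so each new SQL client session has to re-register the catalog, e.g. via an SQL init file or programmatically. A hedged Java sketch of the programmatic variant, reusing the names and conf dir from the DDL above:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.catalog.hive.HiveCatalog;

    public class HiveCatalogSetup {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // Re-register the catalog for this session; the objects created inside it are
            // what the Hive Metastore actually persists.
            HiveCatalog hive = new HiveCatalog("myhive", "mydatabase", "/opt/hive-conf");
            tEnv.registerCatalog("myhive", hive);
            tEnv.useCatalog("myhive");
        }
    }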
  • h

    Hu Guang

    06/13/2023, 6:29 AM
    Hi, folks. I have a batch job with an operator that takes a protobuf object as a parameter, but the following error comes up:
    Copy code
    Exception in thread "main" java.lang.IncompatibleClassChangeError: class com.google.protobuf.Descriptors$OneofDescriptor has interface com.google.protobuf.Descriptors$GenericDescriptor as super class
    	at java.lang.ClassLoader.defineClass1(Native Method)
    	at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
    	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    	at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
    	at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    	at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    	at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    	at java.security.AccessController.doPrivileged(Native Method)
    	at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    	at java.lang.Class.getDeclaredMethods0(Native Method)
    	at java.lang.Class.privateGetDeclaredMethods(Class.java:2729)
    	at java.lang.Class.getDeclaredMethod(Class.java:2156)
    	at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1643)
    	at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
    	at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
    	at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
    	at java.security.AccessController.doPrivileged(Native Method)
    	at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
    	at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
    	at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:490)
    	at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
    	at java.io.ObjectOutputStream.writeClass(ObjectOutputStream.java:1213)
    	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1120)
    	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    	at java.util.LinkedHashMap.internalWriteEntries(LinkedHashMap.java:332)
    	at java.util.HashMap.writeObject(HashMap.java:1363)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1154)
    	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
    	at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62)
    	at org.apache.flink.runtime.jobgraph.JobGraphBuilder.setExecutionConfig(JobGraphBuilder.java:85)
    	at org.apache.flink.optimizer.plantranslate.JobGraphGenerator.compileJobGraph(JobGraphGenerator.java:264)
    	at org.apache.flink.client.PlanTranslator.compilePlan(PlanTranslator.java:87)
    	at org.apache.flink.client.PlanTranslator.translateToJobGraph(PlanTranslator.java:50)
    	at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:39)
    	at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:56)
    	at org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:104)
    	at org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:82)
    	at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1053)
    	at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:958)
    	at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:942)
    Can anyone tell me how I could fix it? Is it related to a protobuf version issue?
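    This error usually indicates that two different protobuf-java versions end up on the classpath, so a class compiled against one version is linked against another. A small diagnostic sketch (not a fix) to see which jar the conflicting class is actually loaded from when run with the job's classpath:

    public class ProtobufOriginCheck {
        public static void main(String[] args) {
            // Prints the jar that Descriptors$OneofDescriptor was loaded from; a second,
            // conflicting protobuf-java jar elsewhere on the classpath is the usual culprit.
            System.out.println(com.google.protobuf.Descriptors.OneofDescriptor.class
                    .getProtectionDomain().getCodeSource().getLocation());
        }
    }

    Aligning the protobuf-java version across the job and its dependencies (or shading it in the job jar) is the usual way out.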
  • v

    Vivek

    06/13/2023, 1:58 PM
    Question on TwoPhaseCommittingSink. 1. Is there any open Jira issue or existing requirement for supporting single parallelism for the Committer and preCommitTopology? My understanding is that the existing Sink Committer can only support the same parallelism as the Sink writer, and that there are additional checks in CommitterOperator/CommittableCollector for the committables summary and the committables received by each subtask committable manager. I believe that single parallelism is potentially supported in postCommitTopology, and that's how the Global committer is translated for the new sink. But we had a requirement to have single parallelism before the committer, as the committer/postCommitTopology is invoked after the snapshot/checkpoint.
  • r

    Robert Steele

    06/13/2023, 2:27 PM
    Hello, I would like to disable the web submit function in the web UI. I see that setting web.submit.enabled=false will also disable the REST API, so instead I would like to use our Envoy proxy RPAC to deny traffic to the url/#/submit page. I have tried this, but the page still loads. Can someone help me understand what happens on the server when I reach the url/#/submit page, or which URL I should be denying?
  • y

    Yaroslav Bezruchenko

    06/13/2023, 3:05 PM
    How to safely upgrade from Flink Kubernetes Operator 1.4.0 to 1.5.0 (or 1.6.0)? Ideally without downtime
  • z

    Zhang Zhao

    06/13/2023, 3:48 PM
    Hello everyone! May I ask how, in a Flink job, to write the records of a MySQL table into different Hudi tables according to a field value? For example, if a record's name field = 'test1', write it into Hudi's 'test1' table; if the name field = 'test2', write it into Hudi's 'test2' table...
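    One common pattern for this kind of routing is to split the stream by the field value and attach one Hudi sink per branch; a hedged DataStream sketch under that assumption (the field position and the sink wiring are placeholders, not real Hudi configuration):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.types.Row;

    public class NameBasedRouting {
        public static void route(DataStream<Row> source) {
            // Branch on the value of the name field (position 0 is illustrative).
            DataStream<Row> test1Records = source.filter(r -> "test1".equals(r.getField(0)));
            DataStream<Row> test2Records = source.filter(r -> "test2".equals(r.getField(0)));
            // Attach one sink per branch, e.g. test1Records -> Hudi table 'test1',
            // test2Records -> Hudi table 'test2'.
        }
    }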
  • m

    Mingfeng Tan

    06/13/2023, 7:38 PM
    Hello, we hit an exception:
    org.rocksdb.RocksDBException: file is too short (7451 bytes) to be an sstable
    Searched this channel and this has happened to others as well. Does anybody know how to resolve it?
  • z

    Zhong Chen

    06/13/2023, 10:04 PM
    I was trying to report metrics to Datadog by using the dd integration. I found the below error in the logs:
  • z

    Zhong Chen

    06/13/2023, 10:04 PM
    Copy code
    ERROR org.apache.flink.runtime.metrics.ReporterSetup               [] - Could not instantiate metrics reporter dghttp. Metrics might not be exposed/reported.
  • h

    Herat Acharya

    06/13/2023, 10:09 PM
    How do we increase the number of task managers when deploying Flink natively? Using ./kubernetes-session.sh we have a parallelism of 1 and the number of task slots set to 8. Kafka is the source and we do have a lot of messages, but the number of task managers is still 1. Why does it stay at 1? Shouldn't it increase based on the number of messages?
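    For context: in a native Kubernetes session, TaskManagers are requested on demand when jobs ask for slots, not based on message volume, so a job with parallelism 1 keeps fitting into a single 8-slot TaskManager. Raising the job's parallelism is what drives additional slot (and TaskManager) requests; a sketch with an illustrative value:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ParallelismExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // Illustrative: with 8 slots per TaskManager, a parallelism above 8 makes the session
            // request a second TaskManager pod.
            env.setParallelism(16);
            // ... define the Kafka source and the rest of the pipeline, then call env.execute()
        }
    }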
  • h

    HJK nomad

    06/14/2023, 3:43 AM
    Anybody here?
  • d

    dp api

    06/14/2023, 5:52 AM
    Just writing some simple code in PyFlink that consumes data from a Kafka topic and then transforms one of the columns in the orders_sum SQL query. However, I am not able to run this; it gives the error "Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath."
    from pyflink.table import EnvironmentSettings, TableEnvironment

    def log_processing():
        env_settings = EnvironmentSettings.in_streaming_mode()
        t_env = TableEnvironment.create(env_settings)
        t_env.get_config().set("pipeline.jars", "file:///flink-sql-connector-kafka-1.17.1.jar")
        t_env.get_config().set("table.exec.source.idle-timeout", "1000")
        source_ddl = """
        CREATE TABLE restuarant_live_pending_orders(
            rest_id VARCHAR,
            status VARCHAR
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'live_order_status',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'rest_group',
            'scan.startup.mode' = 'specific-offsets',
            'scan.startup.specific-offsets' = 'partition:0,offset:0',
            'json.fail-on-missing-field' = 'false',
            'json.ignore-parse-errors' = 'true',
            'format' = 'json'
        )
        """
        t_env.execute_sql(source_ddl)
        tbl = t_env.from_path('restuarant_live_pending_orders')
        tbl.print_schema()
        orders_sum = t_env.sql_query("SELECT rest_id, SUM(CASE WHEN status = 'NEW' THEN 1 ELSE -1 END) AS status_count FROM %s GROUP BY rest_id" % tbl).execute()
        orders_sum.print_schema()
  • d

    dev Jiang

    06/14/2023, 7:55 AM
    Hi all. I used Flink 1.16 to consume Kafka data and found a warning exception in the taskmanager.log file:
    Copy code
    2023-06-14 15:35:36,897 WARN  org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics [] - Error when getting Kafka consumer metric "records-lag" for partition "topic-21". Metric "pendingRecords" may not be reported correctly.
    java.lang.IllegalStateException: Cannot find Kafka metric matching current filter.
            at org.apache.flink.connector.kafka.MetricUtil.lambda$getKafkaMetric$1(MetricUtil.java:63) ~[flink-iceberg-sink-1.0.jar:?]
            at java.util.Optional.orElseThrow(Optional.java:290) ~[?:1.8.0_202]
            at org.apache.flink.connector.kafka.MetricUtil.getKafkaMetric(MetricUtil.java:61) ~[flink-iceberg-sink-1.0.jar:?]
            at org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.getRecordsLagMetric(KafkaSourceReaderMetrics.java:308) ~[flink-iceberg-sink-1.0.jar:?]
            at org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.lambda$maybeAddRecordsLagMetric$4(KafkaSourceReaderMetrics.java:231) ~[flink-iceberg-sink-1.0.jar:?]
            at java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1660) [?:1.8.0_202]
            at org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics.maybeAddRecordsLagMetric(KafkaSourceReaderMetrics.java:230) [flink-iceberg-sink-1.0.jar:?]
            at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:139) [flink-iceberg-sink-1.0.jar:?]
            at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) [flink-iceberg-sink-1.0.jar:?]
            at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) [flink-iceberg-sink-1.0.jar:?]
            at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) [flink-iceberg-sink-1.0.jar:?]
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_202]
            at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_202]
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_202]
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_202]
            at java.lang.Thread.run(Thread.java:748) [?:1.8.0_202]
    Have you ever met this?
  • h

    Hangyu Wang

    06/14/2023, 8:41 AM
    Hi team! I am planning to stop the Flink job after reading a specific flag from a partition in Kafka. Is it possible for us to define an ending boundary for the Kafka source?
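    If the boundary can be expressed as Kafka offsets or timestamps, the KafkaSource can be made bounded so it finishes once the stopping offsets are reached; stopping on a data-driven flag instead usually means triggering stop-with-savepoint externally once the flag record is observed. A hedged sketch of the bounded variant (topic, servers, and group id are placeholders):

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

    public class BoundedKafkaSource {
        public static KafkaSource<String> build() {
            return KafkaSource.<String>builder()
                    .setBootstrapServers("localhost:9092")
                    .setTopics("my-topic")
                    .setGroupId("my-group")
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    // finish the source (and the job) once these offsets are reached
                    .setBounded(OffsetsInitializer.latest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();
        }
    }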
  • k

    kingsathurthi

    06/14/2023, 10:27 AM
    Hi all, I want to understand the webhook in the Flink operator. Why is it required alongside the Flink operator main container? Is there any pointer to start with?
  • m

    Michael Parkin

    06/14/2023, 11:18 AM
    Hi - small question - when using file compaction, is there any way to remove the
    compacted-
    prefix? https://github.com/apache/flink/blob/c270a741526def82699a9accbda2e99f42b5a121/flin[…]link/connector/file/sink/compactor/operator/CompactService.java
  • b

    BX B

    06/14/2023, 2:28 PM
    Using the Table & SQL API to build a stateful table which will have just 2 columns:
    (1) rest_id - there will be 100 restaurant IDs. They will randomly receive statuses of either +1 or -1, implying a pending order count. The same rest_ids can continue to receive +1s and then -1s later.
    (2) status count - as it implies, will just be either 1 or -1.
    Data is ingested from an unbounded Kafka stream and looks like this -> {"rest_id": "6487f4c6fc13ae161d9008f6", "status": "-1"} {"rest_id": "6487f4c6fc13ae161d9008e5", "status": "1"}...
    I would like to showcase a stateful table (Apache Flink Runtime) where column 1 can have 100 DISTINCT rest_ids (static) and column 2 keeps updating the status_count (the sum of the 1 and -1 statuses) as data is ingested from Kafka. I have written the boilerplate code below to ingest the Kafka data; I would like help to complete the code to create a stateful table and print out the results with the changing status_count.
    from pyflink.table import EnvironmentSettings, TableEnvironment
    from pyflink.table.expressions import *

    def log_processing():
        env_settings = EnvironmentSettings.in_streaming_mode()
        t_env = TableEnvironment.create(env_settings)
        ##### specify connector and format jars
        t_env.get_config().set("pipeline.jars", "file:///Users/karanbawejapro/Desktop/flink-sql-connector-kafka-1.17.0.jar")
        t_env.get_config().set("table.exec.source.idle-timeout", "1000")
        source_ddl = """
        CREATE TABLE restaurant_live_pending_orders(
            rest_id VARCHAR,
            status VARCHAR
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'live_order_status',
            'properties.bootstrap.servers' = 'localhost:9092',
            'properties.group.id' = 'rest_group',
            'scan.startup.mode' = 'specific-offsets',
            'scan.startup.specific-offsets' = 'partition:0,offset:0',
            'json.fail-on-missing-field' = 'false',
            'json.ignore-parse-errors' = 'true',
            'format' = 'json'
        )
        """
        tbl = t_env.execute_sql(source_ddl)
        tbl = t_env.from_path('restaurant_live_pending_orders')
  • a

    André Luiz Diniz da Silva

    06/14/2023, 5:48 PM
    Hello everyone! How are you? I’m having a problem related, I think, to class loading. I’m getting the following error:
    Caused by: java.lang.NoClassDefFoundError: com/google/common/util/concurrent/internal/InternalFutureFailureAccess
    (complete stack trace in the thread). The context is that I have a job that writes Parquet files, and after adding a new sink to write data to a Kafka topic using Avro with schema registry this error began to happen. Any idea what it could be? Am I missing some dependency? More details in the thread.
  • m

    mralfredgui

    06/14/2023, 5:51 PM
    Hi, is there anyone familiar with running a Flink app in AWS Kinesis Analytics? I was trying to run a Python Flink app and it failed to start with the error message below. I couldn’t find any useful information there. Does anyone know how to troubleshoot? Thanks!
    Untitled
  • i

    Ilya Sterin

    06/14/2023, 7:12 PM
    Hey all. We're working on transitioning our batch ETL infrastructure to streaming using Flink. One of our stream jobs is written in Flink SQL. It's a query with probably 15 joins (inner and left). We're troubleshooting performance, missing data, and scale issues... First, trying to understand how the joins work with partitioning - how Kafka partitions relate to Flink joins, etc.
    1. Do all the topics that we're joining on need to be partitioned on the same key with the data collocated? I can't seem to find this info for Flink, but found information about this for KSQL, which seems to state this.
    2. If data isn't collocated and partitioned consistently across all join topics, does that mean we will be missing data?
    3. How does a Flink SQL job with parallelism work with partitioned topics?
    Can someone shed some light on this for us and/or point to some documentation/tutorials?
  • m

    Matt Wagner

    06/15/2023, 12:37 AM
    👋 We're trying to enable event-time alignment on a pipeline using a Kinesis source and struggling to understand whether this should be possible as of Flink 1.15. I see FLIP-182 describes adding this functionality to the newer Source interface added in FLIP-27; does that mean watermark alignment isn't available on pre-FLIP-27 sources?
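    As far as I know, FLIP-182's alignment is hooked into the new FLIP-27 Source framework, so pre-FLIP-27 sources don't participate; the knob itself lives on the WatermarkStrategy. A sketch of the 1.15-style API with illustrative values:

    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;

    public class AlignmentExample {
        public static WatermarkStrategy<String> alignedStrategy() {
            return WatermarkStrategy
                    .<String>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                    // group name, max drift, and update interval are illustrative
                    .withWatermarkAlignment("alignment-group", Duration.ofSeconds(20), Duration.ofSeconds(1));
        }
    }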
  • r

    Raghunadh Nittala

    06/15/2023, 3:51 AM
    Hi all, is there a way we can get metrics when working with Flink SQL? When working with DataStreams, I’m able to add out-of-the-box metrics as mentioned in the docs, but I'm trying to figure out if that’s possible with SQL. Thanks
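    One option worth mentioning: the SQL operators already expose the standard operator metrics, and for custom metrics a UDF can register its own through FunctionContext. A hedged sketch (names are illustrative):

    import org.apache.flink.metrics.Counter;
    import org.apache.flink.table.functions.FunctionContext;
    import org.apache.flink.table.functions.ScalarFunction;

    public class CountingUpper extends ScalarFunction {
        private transient Counter processed;

        @Override
        public void open(FunctionContext context) throws Exception {
            // Register a custom counter that shows up alongside the operator's metrics.
            processed = context.getMetricGroup().counter("processedRecords");
        }

        public String eval(String value) {
            processed.inc();
            return value == null ? null : value.toUpperCase();
        }
    }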
  • z

    Zhang Zhao

    06/15/2023, 9:24 AM
    Hello everyone! May I ask how, in a Flink job, to write the records of a MySQL table into different Hudi tables according to a field value? For example, if a record's name field = 'test1', write it into Hudi's 'test1' table; if the name field = 'test2', write it into Hudi's 'test2' table...