# troubleshooting
  • Jasmin Redzepovic

    03/07/2023, 12:06 AM
    Hi all! šŸ‘‹ Could someone please help me with this? I’m not sure whether this is a bug or if I misconfigured something. Using Flink 1.15.2 and the Flink Kubernetes Operator 1.2.0. Error: `switched from INITIALIZING to FAILED with failure cause: java.lang.NullPointerException: Initial Segment may not be null`. I have implemented a Window Top-N job that uses a Kafka topic as a source and another Kafka topic as a sink. When running the job locally in the IDE, everything works well and results are written to the Kafka sink. However, when I deploy the job with the Flink Kubernetes Operator, I get an `Initial Segment may not be null` error. Detailed stack trace in thread 🧵
  • Pratyush Sharma

    03/07/2023, 2:17 AM
    Hey šŸ‘‹, Flink version: 1.13, state backend: RocksDB with incremental checkpointing. We recently saw some data loss after a checkpoint completed, followed by a TM shutdown. The resulting job was unable to recover, failing with the error below. We recovered by rolling back to the latest-minus-one checkpoint.
    Copy code
    INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - deduplicate   (193/360) (ccd789f9a61f9a361926927ae4faa43d) switched from INITIALIZING to FAILED on  (dataPort=6121).
    [2023-03-03 09:00:03.121693] org.apache.flink.util.SerializedThrowable: Exception while creating StreamOperatorStateContext.
    [2023-03-03 09:00:03.121733] 	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:254) ~[s3_ha_store_deploy.jar:?]
    [2023-03-03 09:00:03.121829] 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272) 
    [2023-03-03 09:00:03.121869] 	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441) 
    ~~~
    
    [2023-03-03 09:00:03.123298] Can't access /1197217.sst: IO error: while stat a file for size: /pay/flink/tmp/job_e3f516c4bdd0854e35d31b90fa9f3296_op_KeyedProcessOperator_62787344f3050171e45f898702868550__193_360__uuid_f62ff828-56a7-4a12-8abb-670f4ccc65ba/db/1197217.sst: No such file or directory
    [2023-03-03 09:00:03.123347] Can't access /1197223.sst: IO error: while stat a file for size: /pay/flink/tmp/job_e3f516c4bdd0854e35d31b90fa9f3296_op_KeyedProcessOperator_62787344f3050171e45f898702868550__193_360__uuid_f62ff828-56a7-4a12-8abb-670f4ccc65ba/db/1197223.sst: No such file or directory
    [2023-03-03 09:00:03.123394] Can't access /1197237.sst: IO error: while stat a file for size: /pay/flink/tmp/job_e3f516c4bdd0854e35d31b90fa9f3296_op_KeyedProcessOperator_62787344f3050171e45f898702868550__193_360__uuid_f62ff828-56a7-4a12-8abb-670f4ccc65ba/db/1197237.sst: No such file or directory
    [2023-03-03 09:00:03.123441] Can't access /1163418.sst: IO error: while stat a file for size: /pay/flink/tmp/job_e3f516c4bdd0854e35d31b90fa9f3296_op_KeyedProcessOperator_62787344f3050171e45f898702868550__193_360__uuid_f62ff828-56a7-4a12-8abb-670f4ccc65ba/db/1163418.sst: No such file or directory
    [2023-03-03 09:00:03.123514] Can't access /1118503.sst: IO error: while stat a file for size: /pay/flink/tmp/job_e3f516c4bdd0854e35d31b90fa9f3296_op_KeyedProcessOperator_62787344f3050171e45f898702868550__193_360__uuid_f62ff828-56a7-4a12-8abb-670f4ccc65ba/db/1118503.sst: No such file or directory
    Shutdown hook threw the following error:
    Copy code
    [2023-03-03 04:50:12.071855] WARN  org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager [] - Exception while deleting local state directory for allocation id fa09ca77ce735c8e7383rbe61abbf07.
    [2023-03-03 04:50:12.071921] java.nio.file.DirectoryNotEmptyException: /pay/flink/taskmanager-state/localState/aid_fa09ca77ce73995c8e7383be61abbf07/jid_fa98b0d9e39594b63269faf0fe829f/vtx_62787344f3050171e45f8987068550_sti_192/chk_71201
    [2023-03-03 04:50:12.071972] 	at sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:242) ~[?:1.8.0_352]
    [2023-03-03 04:50:12.072021] 	at sun.nio.fs.AbstractFileSystemProvider.deleteIfExists(AbstractFileSystemProvider.java:108) ~[?:1.8.0_352]
    [2023-03-03 04:50:12.072073] 	at java.nio.file.Files.deleteIfExists(Files.java:1165) ~[?:1.8.0_352]
    [2023-03-03 04:50:12.072110] 	at org.apache.flink.util.FileUtils.deleteDirectoryInternal(FileUtils.java:332) ~[?]
    [2023-03-03 04:50:12.072167] 	at org.apache.flink.util.FileUtils.deleteFileOrDirectoryInternal(FileUtils.java:308) ~[?]
    [2023-03-03 04:50:12.072223] 	at org.apache.flink.util.FileUtils.guardIfNotThreadSafe(FileUtils.java:383) ~[?]
    [2023-03-03 04:50:12.072270] 	at org.apache.flink.util.FileUtils.deleteFileOrDirectory(FileUtils.java:247) ~[?]
    [2023-03-03 04:50:12.072330] 	at org.apache.flink.util.FileUtils.cleanDirectoryInternal(FileUtils.java:361) ~[?]
    [2023-03-03 04:50:12.072377] 	at org.apache.flink.util.FileUtils.deleteDirectoryInternal(FileUtils.java:322) ~[?]
    [2023-03-03 04:50:12.072415] 	at org.apache.flink.util.FileUtils.deleteFileOrDirectoryInternal(FileUtils.java:308) ~[?]
    [2023-03-03 04:50:12.072454] 	at org.apache.flink.util.FileUtils.guardIfNotThreadSafe(FileUtils.java:383) ~[?]
    [2023-03-03 04:50:12.072491] 	at org.apache.flink.util.FileUtils.deleteFileOrDirectory(FileUtils.java:247) ~[?]
    [2023-03-03 04:50:12.072531] 	at org.apache.flink.util.FileUtils.cleanDirectoryInternal(FileUtils.java:361) ~[?]
    [2023-03-03 04:50:12.072569] 	at org.apache.flink.util.FileUtils.deleteDirectoryInternal(FileUtils.java:322) ~[?]
    [2023-03-03 04:50:12.072614] 	at org.apache.flink.util.FileUtils.deleteFileOrDirectoryInternal(FileUtils.java:308) ~[?]
    [2023-03-03 04:50:12.072654] 	at org.apache.flink.util.FileUtils.guardIfNotThreadSafe(FileUtils.java:383) ~[?]
    [2023-03-03 04:50:12.072703] 	at org.apache.flink.util.FileUtils.deleteFileOrDirectory(FileUtils.java:247) ~[?]
    [2023-03-03 04:50:12.072752] 	at org.apache.flink.util.FileUtils.cleanDirectoryInternal(FileUtils.java:361) ~[?]
    [2023-03-03 04:50:12.072798] 	at org.apache.flink.util.FileUtils.deleteDirectoryInternal(FileUtils.java:322) ~[?]
    [2023-03-03 04:50:12.072842] 	at org.apache.flink.util.FileUtils.deleteFileOrDirectoryInternal(FileUtils.java:308) ~[?]
    [2023-03-03 04:50:12.072889] 	at org.apache.flink.util.FileUtils.guardIfNotThreadSafe(FileUtils.java:383) ~[?]
    [2023-03-03 04:50:12.072924] 	at org.apache.flink.util.FileUtils.deleteFileOrDirectory(FileUtils.java:247) ~[?]
    [2023-03-03 04:50:12.072970] 	at org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.cleanupAllocationBaseDirs(TaskExecutorLocalStateStoresManager.java:289) ~[?]
    [2023-03-03 04:50:12.073012] 	at org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager.shutdown(TaskExecutorLocalStateStoresManager.java:237) ~[?]
    [2023-03-03 04:50:12.073049] 	at org.apache.flink.util.ShutdownHookUtil.lambda$addShutdownHook$0(ShutdownHookUtil.java:39) ~[?]
    [2023-03-03 04:50:12.073081] 	at java.lang.Thread.run(Thread.java:750) [?:1.8.0_352]
  • kingsathurthi

    03/07/2023, 4:11 AM
    Hi team, how can I change the parallelism of an operator at runtime, while the job is running?
  • chunilal kukreja

    03/07/2023, 4:32 AM
    Hi team, any recommendations or suggestions on how to trigger savepoints in a production system? I am using Flink 1.16.0 with a native Kubernetes deployment, and would like to know the standard approach (or the one most commonly used in production) for taking savepoints and restoring a job from them. Thanks in advance!
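    One common approach is to trigger savepoints through Flink's REST API and restore by resubmitting the job with the savepoint path (the CLI equivalents are `flink savepoint <jobId>` and `flink run -s <path>`). A minimal Java sketch, assuming a reachable REST endpoint and a hypothetical job id and target directory:
    Copy code
    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    // Requires Java 11+ for java.net.http.
    public class SavepointTrigger {
        public static void main(String[] args) throws Exception {
            String restUrl = "http://flink-rest:8081";                 // hypothetical address
            String jobId = "00000000000000000000000000000000";         // hypothetical job id
            String body = "{\"target-directory\": \"s3://my-bucket/savepoints\", \"cancel-job\": false}";

            // POST /jobs/:jobid/savepoints triggers an asynchronous savepoint.
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create(restUrl + "/jobs/" + jobId + "/savepoints"))
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(body))
                    .build();

            // The response carries a trigger id; completion can be polled under
            // /jobs/<jobId>/savepoints/<triggerId>.
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.body());
        }
    }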
  • Lauri SuurvƤli

    03/07/2023, 7:37 AM
    Hi team! I have been experiencing an issue with task manager heap memory when running Flink on an EMR cluster. I have a simple stateless job that consumes messages from Kafka, applies a transformation to each message using a ProcessFunction, and produces the output back to Kafka. When there is a constant flow of messages there are no issues and all the metrics look fine. However, if there are no new messages to consume from Kafka and the job just sits idle, the task manager heap usage starts to slowly creep up. There have been occasions where it crept up so far that I got an OutOfMemoryError and the job restarted. Does anyone have any ideas on what could cause this heap memory issue when there are no messages going through the system? It does not seem to be a problem when Flink is actively processing messages; it only happens when the job sits idle for a long period of time. Any pointers or guesses on what could cause this would be very helpful.
  • Yang LI

    03/07/2023, 9:10 AM
    Hello team! Does anyone know how to disable the taskmanager CPU limit factor in Kubernetes, rather than having it always applied with no way to leave it out? https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#kubernetes-taskmanager-cpu-limit-factor In my project, if I don't set this limit in Kubernetes, my Flink cluster's CPU usage is better.
  • Dima Sheludko

    03/07/2023, 3:47 PM
    hi! Does somebody know how to configure checkpointing to S3 using Presto and AWS IAM, without passing
    Copy code
    s3.access-key: your-access-key
    s3.secret-key: your-secret-key
    I have a serviceAccount and IAM configured, but with the default configuration I receive a 403 error. Currently I am trying to use the `hive.s3.*` options to configure it, but I'm still working on that and not sure it will help.
  • Vishnu Prashaanth Penu Sathiyamoorthy

    03/07/2023, 4:55 PM
    Hi, I'm trying to create a datastream in PyFlink, reading from Azure ADLS2 and writing to ADLS2. Using version 1.17, as writing to ADLS2 is supported from 1.17, I believe. I'm getting the following error. • Added the plugins for Azure in azure-fs-hadoop • Tried adding the plugin for fs-hadoop-shaded (not sure if this will help) Is there any sample project for streaming to/from Azure ADLS2? Thanks for your help
  • Jeesmon Jacob

    03/07/2023, 6:55 PM
    Hi team, in our Flink job we take a checkpoint every 5 minutes. Is there any way to delay the shutdown of a taskmanager (when the taskmanager gets a SIGTERM signal) until the next checkpoint is taken, or to force a checkpoint just before shutdown? The context: in our Kubernetes cluster we recycle nodes older than 7 days. Candidates for recycling are discovered at a 20-minute interval, so we don't know exactly when node recycling is going to happen. If a task manager is killed just before the next checkpoint, we need to reprocess a huge number of events, which we are trying to avoid. We know a node can go down unexpectedly, but we are at least trying to improve the expected node-recycling scenario. We are using the Flink Kubernetes operator for job deployment. Thanks!
  • Superskyyy

    03/07/2023, 9:12 PM
    Hi team, if I want to implement an online machine learning algorithm (stateful processing), is the Table API sufficient? (I need to group by/key by the table source, then send each row to an algorithm.) I'm confused about when the DataStream API is the best fit for stateful jobs.
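    For context, the DataStream API gives direct access to keyed state and timers, which custom per-key online updates usually need. A minimal sketch of a keyed, stateful update (the incremental mean here is a hypothetical stand-in for a real model update):
    Copy code
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Keeps per-key state and updates it for every incoming element.
    public class OnlineModelFunction extends KeyedProcessFunction<String, Double, Double> {
        private transient ValueState<Double> runningMean;
        private transient ValueState<Long> count;

        @Override
        public void open(Configuration parameters) {
            runningMean = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("mean", Double.class));
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Long.class));
        }

        @Override
        public void processElement(Double value, Context ctx, Collector<Double> out) throws Exception {
            long n = count.value() == null ? 0L : count.value();
            double mean = runningMean.value() == null ? 0.0 : runningMean.value();
            mean += (value - mean) / (n + 1);   // incremental "model" update
            count.update(n + 1);
            runningMean.update(mean);
            out.collect(mean);
        }
    }
    A Table API pipeline can also hand rows to such a function by converting to a DataStream (toDataStream) and back.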
  • Sergio Sainz

    03/07/2023, 10:24 PM
    Hi team, I think fine-grained resource management is a very cool feature, but it carries an ā€œMVPā€ note on its documentation page: ā€œNote: This feature is currently an MVP (ā€˜minimum viable product’) feature and only available to the DataStream API.ā€ Do we know when it will become a GA (non-MVP) feature? Thanks for your attention! (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/finegrained_resource/)
  • Sergio Sainz

    03/07/2023, 10:34 PM
    Hi team, I want to use elastic scaling. Do you know when elastic scaling's reactive mode is planned to become a GA feature? (It is currently described as an MVP feature.) https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/elastic_scaling/
  • Sergio Sainz

    03/07/2023, 10:36 PM
    Hi team, regarding the fine-grained resource management feature: if we create a slot sharing group requesting more resources than a task manager's configured memory or CPU capacity (worker node), will the job fail? Thanks for the attention ~
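    For context, under fine-grained resource management the resource request is declared per slot sharing group. A sketch with hypothetical numbers; it only shows how the request is expressed, not how the scheduler reacts when no task manager can satisfy it:
    Copy code
    import org.apache.flink.api.common.operators.SlotSharingGroup;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class FineGrainedSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Resource profile for the whole slot sharing group (hypothetical values;
            // if they exceed what any task manager offers, no slot can match them).
            SlotSharingGroup heavy = SlotSharingGroup.newBuilder("heavy")
                    .setCpuCores(4.0)
                    .setTaskHeapMemoryMB(2048)
                    .build();

            env.fromElements(1, 2, 3)
                    .map(x -> x * 2)
                    .slotSharingGroup(heavy);

            env.execute("fine-grained-sketch");
        }
    }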
  • RICHARD JOY

    03/08/2023, 12:48 AM
    Hi everyone, I have a question about using secrets with a Flink image pulled from a private repository for a FlinkDeployment. I set `kubernetes.container.image.pull-secrets: "certified-v2"` under spec.flinkConfiguration, but it doesn't seem to pick up the secret: the FlinkDeployment pod fails continuously with an ImagePullBackOff authentication error while spinning up. Is this the right way to provide secrets?
  • sharad mishra

    03/08/2023, 1:23 AM
    Hi šŸ‘‹, is the REST API available in Flink's `application-mode` (on YARN)? Or does it always require a cluster?
  • Gerald

    03/08/2023, 9:55 AM
    Hey, we are currently exploring Flink SQL for one of our data sync use cases and we are struggling with Casts from MULTISET to ARRAY for a nested property. Let me briefly explain the (simplified) setup: • Person and Address are Flink tables based on two Kafka source topics backed by Debezium, • PersonUpdated is a sink table writing to a Kafka topic
    Copy code
    CREATE TABLE Person (
        Id int,
        Firstname string,
        Lastname string,
        DateOfBirth date
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'cdc.FlinkResearch.dbo.Person',
      ...
      'format' = 'debezium-avro-confluent',
      'debezium-avro-confluent.url' = '...'
    );
    
    CREATE TABLE Address (
        Id int,
        Street string,
        ZipCode string,
        City string,
        Country string,
        PersonId int
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'cdc.FlinkResearch.dbo.Address',
      ...
      'format' = 'debezium-avro-confluent',
      'debezium-avro-confluent.url' = '...'
    );
    
    CREATE TABLE PersonUpdated (
        Id int,
        Firstname string,
        Lastname string,
        DateOfBirth date,
        Addresses ARRAY<ROW(Street string, City string, ZipCode string, Country string)>,
    
        primary key (Id) not enforced
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'cdc.FlinkResearch.dbo.PersonUpdated',
      ...
      'value.format' = 'avro-confluent',
      'value.avro-confluent.url' = '...',
      'key.format' = 'avro-confluent',
      'key.avro-confluent.url' = '...'
    );
    The idea is to create PersonUpdated entries for every person, including the person's addresses as a list. We have tried various casts in our (insert) queries, but with no success:
    Copy code
    select p.Id, p.Firstname, p.Lastname, p.DateOfBirth, cast((select collect(row(Street, City, ZipCode, Country)) from Address where PersonId = p.Id) as ARRAY<ROW<Street string, City string, ZipCode string, Country string>>) as Addresses from Person p;
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.calcite.sql.validate.SqlValidatorException: Cast function cannot convert value of type RecordType(VARCHAR(2147483647) EXPR$0, VARCHAR(2147483647) EXPR$1, VARCHAR(2147483647) EXPR$2, VARCHAR(2147483647) EXPR$3) MULTISET to type RecordType(VARCHAR(2147483647) Street, VARCHAR(2147483647) City, VARCHAR(2147483647) ZipCode, VARCHAR(2147483647) Country) ARRAY
    Even more specific casts couldn't solve the issue:
    Copy code
    select p.Id, p.Firstname, p.Lastname, p.DateOfBirth, cast((select collect(cast(row(Street, City, ZipCode, Country) as ROW<Street string, City string, ZipCode string, Country string>)) from Address where PersonId = p.Id) as ARRAY<ROW<Street string, City string, ZipCode string, Country string>>) as Addresses from Person p;
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.calcite.sql.validate.SqlValidatorException: Cast function cannot convert value of type RecordType(VARCHAR(2147483647) Street, VARCHAR(2147483647) City, VARCHAR(2147483647) ZipCode, VARCHAR(2147483647) Country) MULTISET to type RecordType(VARCHAR(2147483647) Street, VARCHAR(2147483647) City, VARCHAR(2147483647) ZipCode, VARCHAR(2147483647) Country) ARRAY
    Is there a way to cast nested multisets to an array in plain Flink SQL? Or are there alternatives to the `collect` function that would avoid casting altogether?
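    One workaround sometimes suggested is to avoid collect()/CAST entirely and aggregate the rows into an array with a custom aggregate function. A hedged Java sketch (the class and function names are made up, and depending on the Flink version extra type hints, e.g. for the accumulator, may be required):
    Copy code
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.annotation.FunctionHint;
    import org.apache.flink.table.functions.AggregateFunction;
    import org.apache.flink.types.Row;

    // Aggregates incoming rows directly into ARRAY<ROW<...>>, sidestepping the
    // MULTISET result of collect() and therefore the failing CAST.
    @FunctionHint(
            input = @DataTypeHint("ROW<Street STRING, City STRING, ZipCode STRING, Country STRING>"),
            output = @DataTypeHint("ARRAY<ROW<Street STRING, City STRING, ZipCode STRING, Country STRING>>"))
    public class ToArrayAgg extends AggregateFunction<Row[], List<Row>> {

        @Override
        public List<Row> createAccumulator() {
            return new ArrayList<>();
        }

        public void accumulate(List<Row> acc, Row address) {
            acc.add(address);
        }

        // Needed for updating inputs such as a Debezium-backed source.
        public void retract(List<Row> acc, Row address) {
            acc.remove(address);
        }

        @Override
        public Row[] getValue(List<Row> acc) {
            return acc.toArray(new Row[0]);
        }
    }
    Registered with `tEnv.createTemporarySystemFunction("TO_ARRAY_AGG", ToArrayAgg.class)`, it could replace `collect(row(...))` in the subquery.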
  • Ari Huttunen

    03/08/2023, 10:15 AM
    It took a bit of trial-and-error to figure out the correct syntax, only to find out that this is not supported yet.
    Copy code
    save_sample_data = f"""
    CREATE TABLE save_sample_data
    PARTITIONED BY (EVENT_DAY, EVENT_HOUR)
    WITH (
      'connector' = 'filesystem',
      'path' = '{EGRESS_S3_PATH_FOR_SAMPLES}',
      'format' = 'parquet'
    )
    AS SELECT * FROM input_data
    WHERE MINUTE(ts) = 1 AND HOUR(ts) = 10;
    """
    table_env.execute_sql(save_sample_data)
    I tried to put the PARTITIONED BY clause in every possible location. šŸ˜„ I would also have wanted to sample a percentage of the input data.
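    A possible two-step workaround, since CREATE TABLE ... PARTITIONED BY ... AS SELECT isn't accepted here: declare the partitioned table first, then run a separate INSERT INTO, where RAND() can approximate a percentage sample. A Java sketch assuming input_data is already registered, with a hypothetical schema and path:
    Copy code
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class SaveSampleData {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Step 1: declare the partitioned sink explicitly
            // (columns are hypothetical and must match input_data).
            tEnv.executeSql(
                    "CREATE TABLE save_sample_data ("
                            + "  ts TIMESTAMP(3), payload STRING,"
                            + "  EVENT_DAY STRING, EVENT_HOUR STRING"
                            + ") PARTITIONED BY (EVENT_DAY, EVENT_HOUR) WITH ("
                            + "  'connector' = 'filesystem',"
                            + "  'path' = 's3://example-bucket/samples',"   // hypothetical path
                            + "  'format' = 'parquet')");

            // Step 2: separate INSERT; RAND() < 0.01 keeps roughly 1% of rows.
            tEnv.executeSql(
                    "INSERT INTO save_sample_data "
                            + "SELECT * FROM input_data "
                            + "WHERE MINUTE(ts) = 1 AND HOUR(ts) = 10 AND RAND() < 0.01");
        }
    }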
  • Yuval Itzchakov

    03/08/2023, 11:16 AM
    I have a use case where I need one stream to progress up to a certain point, and only then evaluate the elements of a different stream. Is there a straightforward way to do that?
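    One pattern that fits is connecting the two streams and buffering the second stream in state until the first one signals that the threshold was reached. A hedged sketch (element types and the notion of the ā€œsignalā€ are hypothetical):
    Copy code
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
    import org.apache.flink.util.Collector;

    // Buffers elements of the second stream until the first stream has
    // signalled that the "certain point" was reached for this key.
    public class GateFunction extends KeyedCoProcessFunction<String, String, String, String> {
        private transient ValueState<Boolean> gateOpen;
        private transient ListState<String> buffered;

        @Override
        public void open(Configuration parameters) {
            gateOpen = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("gateOpen", Boolean.class));
            buffered = getRuntimeContext().getListState(
                    new ListStateDescriptor<>("buffered", String.class));
        }

        @Override
        public void processElement1(String signal, Context ctx, Collector<String> out) throws Exception {
            // First stream reached its threshold: open the gate, flush the buffer.
            gateOpen.update(true);
            for (String e : buffered.get()) {
                out.collect(e);
            }
            buffered.clear();
        }

        @Override
        public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
            if (Boolean.TRUE.equals(gateOpen.value())) {
                out.collect(value);
            } else {
                buffered.add(value);
            }
        }
    }
    Wired up as streamA.connect(streamB).keyBy(...).process(new GateFunction()); a broadcast state pattern is the analogous choice when the signal is not keyed.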
  • Yang LI

    03/08/2023, 1:36 PM
    Hello guys, one question about Kafka offsets and savepoints. Can you confirm: if I change the Kafka consumer group name of a Flink job and start it from a savepoint generated by the old job, I can't re-use the offset information saved in that savepoint, right? Or are Kafka offsets not kept in savepoints at all?
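    Not an authoritative answer, but when starting fresh rather than relying on restored source state, the starting offsets can be pinned explicitly on the new group id. A sketch with hypothetical broker/topic/group names:
    Copy code
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

    public class OffsetOptions {
        public static KafkaSource<String> build() {
            return KafkaSource.<String>builder()
                    .setBootstrapServers("kafka:9092")            // hypothetical address
                    .setTopics("events")                          // hypothetical topic
                    .setGroupId("my-new-group")                   // hypothetical group id
                    // Start from the group's committed offsets ...
                    .setStartingOffsets(OffsetsInitializer.committedOffsets())
                    // ... or from a timestamp, earliest, or latest:
                    // .setStartingOffsets(OffsetsInitializer.timestamp(1678190000000L))
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();
        }
    }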
  • Mali

    03/08/2023, 3:12 PM
    hello everyone, I am trying to create a FlinkDeployment and FlinkSessionJobs. My application works well when I create the deployment only, but I have a problem with the jar files. Here is my YAML file:
    Copy code
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: test
      namespace: flink
    spec:
      ingress:
        template: "{{name}}.test.com"
        className: "nginx"
        annotations: {}
      image: <private pyflink repo>
      imagePullPolicy: "Always"
      flinkVersion: v1_16
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: "1"
      serviceAccount: flink
      jobManager:
        podTemplate:
          apiVersion: v1
          kind: Pod
          metadata:
           name: pod-template
          spec:
            serviceAccount: flink
            containers:
            # Do not change the main container name
              - name: flink-main-container
                envFrom:
                - secretRef:
                    name: flink-secret
        resource:
          memory: "2048m"
          cpu: 1
      taskManager:
        resource:
          memory: "2048m"
          cpu: 1
        replicas: 1
    
    ---
    
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkSessionJob
    metadata:
      name: rabbitmq-to-kafka
      namespace: flink
    spec:
      deploymentName: test
      job:
        jarURI: local:///opt/flink/lib/flink-sql-connector-rabbitmq-1.16.0.jar;local:///opt/flink/lib/flink-sql-connector-kafka-1.16.0.jar
        entryClass: "org.apache.flink.client.python.PythonDriver"
        args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/rabbitmq_to_kafka.py"]
        parallelism: 4
        upgradeMode: stateless
        
    ---
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkSessionJob
    metadata:
      name: kafka-to-s3-raw
      namespace: flink
    spec:
      deploymentName: test
      job:
        jarURI: local:///opt/flink/lib/flink-sql-connector-rabbitmq-1.16.0.jar;local:///opt/flink/lib/flink-sql-connector-kafka-1.16.0.jar
        entryClass: "org.apache.flink.client.python.PythonDriver"
        args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/kafka_to_s3_raw.py"]
        parallelism: 4
        upgradeMode: stateless
    I am getting ā€œresource id: ResourceID{name=ā€˜kafka-to-s3-raw’, namespace=ā€˜flink’}, version: 6850788} failed. org.apache.flink.kubernetes.operator.exception.ReconciliationException: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme ā€˜local’. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.ā€ It works well when I put the jarURI inside the deployment. I tried different alternatives but couldn't figure it out. Can you help me with this error? Many thanks.
  • sharad mishra

    03/08/2023, 3:26 PM
    Hello, I’m using Flink (1.16) on YARN in application mode. I’ve provided a log4j.properties file under the `conf` folder for Flink, and I can see that my `taskmanager.log` output is in the expected format. However, logs that are redirected to `taskmanager.err` are missing the `timestamp` field and are not in the expected format. E.g. one taskmanager.err file (missing timestamps):
    Copy code
    [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 4
    [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 2
    [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: cluster.evenly-spread-out-slots, false
    [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: yarn.application.name, DCN-LogParser-Workflow
    [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: yarn.application.queue, default
    [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: $internal.application.main, ca.ix.dcn.job.JobRunner
    [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.memory.process.size, 8192m
    [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: metrics.reporter.prom.port, 9250-9260
    [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: execution.savepoint-restore-mode, NO_CLAIM
    [main] INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: execution.savepoint.path, <hdfs://blackhole-auditlog/data/dcn-log-parser/checkpointing/073f246a185b81d373d218cd80abefd5/chk-762>
    Here is the log4j configuration file that I’m using:
    Copy code
    #Define root logger options
    log4j.rootLogger=INFO, file, console
    
    #Define console appender
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    logrj.appender.console.Target=System.out
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    
    #Define rolling file appender
    log4j.appender.file=org.apache.log4j.RollingFileAppender
    log4j.appender.file.File=${log.file}
    log4j.appender.file.Append=true
    log4j.appender.file.ImmediateFlush=true
    log4j.appender.file.MaxFileSize=10MB
    log4j.appender.file.MaxBackupIndex=5
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    What could be causing this?
  • Amir Hossein Sharifzadeh

    03/08/2023, 3:28 PM
    Is there any real example of creating a datastream for the Kafka connector in Java? There are many examples online using FlinkKafkaProducer09, FlinkKafkaProducer011, etc., but the latest version of the Flink API does not contain those classes, and the FlinkKafkaProducer class is deprecated!
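    The deprecated producer/consumer classes were superseded by KafkaSource and KafkaSink. A minimal end-to-end sketch (broker address and topic names are placeholders):
    Copy code
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.base.DeliveryGuarantee;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class KafkaPipeline {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            KafkaSource<String> source = KafkaSource.<String>builder()
                    .setBootstrapServers("kafka:9092")      // hypothetical address
                    .setTopics("input-topic")               // hypothetical topic
                    .setGroupId("my-group")
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();

            KafkaSink<String> sink = KafkaSink.<String>builder()
                    .setBootstrapServers("kafka:9092")
                    .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                            .setTopic("output-topic")       // hypothetical topic
                            .setValueSerializationSchema(new SimpleStringSchema())
                            .build())
                    .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                    .build();

            DataStream<String> stream =
                    env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source");
            stream.map(String::toUpperCase).sinkTo(sink);
            env.execute("kafka-pipeline");
        }
    }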
  • Ari Huttunen

    03/08/2023, 6:38 PM
    I need a median function, and some quantiles. I wrote this
    Copy code
    @udaf(result_type=DataTypes.FLOAT(), func_type="pandas")
    def median_udaf(v):
        return v.median()
    unfortunately it doesn't work due to
    Copy code
    Pandas UDAFs are not supported in streaming mode currently.
    Please check the documentation for the set of currently supported SQL features.
    What can I do? We currently use 1.16, but I figure 1.17 is just around the corner, so that could be used as well.
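    One possible fallback, since pandas UDAFs are rejected in streaming mode: a plain (non-pandas) aggregate function, e.g. implemented in Java and registered for use from SQL (PyFlink has create_java_temporary_function for this). A sketch that keeps all values per group in the accumulator, so it only suits modest group sizes; accumulator type hints may be needed depending on version:
    Copy code
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    import org.apache.flink.table.functions.AggregateFunction;

    // Exact median over all values seen per group.
    public class MedianAgg extends AggregateFunction<Double, List<Double>> {

        @Override
        public List<Double> createAccumulator() {
            return new ArrayList<>();
        }

        public void accumulate(List<Double> acc, Double v) {
            if (v != null) {
                acc.add(v);
            }
        }

        @Override
        public Double getValue(List<Double> acc) {
            if (acc.isEmpty()) {
                return null;
            }
            List<Double> sorted = new ArrayList<>(acc);
            Collections.sort(sorted);
            int n = sorted.size();
            return n % 2 == 1
                    ? sorted.get(n / 2)
                    : (sorted.get(n / 2 - 1) + sorted.get(n / 2)) / 2.0;
        }
    }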
  • Amir Hossein Sharifzadeh

    03/09/2023, 12:33 AM
    When I run this code with IntelliJ IDEA:
    Copy code
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    I get this error: Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
    at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.lookupExecutor(AbstractStreamTableEnvironmentImpl.java:109)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:101)
    at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:122)
    at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:94)
    my pom.xml also contains:
    Copy code
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>${flink.version}</version>
        <scope>test</scope>
    </dependency>
    where:
    Copy code
    <properties>
        <flink.version>1.16.0</flink.version>
  • ęŽå®‡čˆŖ

    03/09/2023, 1:12 AM
    Hi all, I would like to know how to capture, as an event, the end of message consumption in Flink.
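    If ā€œend of consumptionā€ means catching up to the offsets that were current when the job started, one option is a bounded Kafka source: the job reaches end-of-input and finishes once those offsets are consumed. A sketch with hypothetical names:
    Copy code
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

    public class BoundedKafka {
        public static KafkaSource<String> build() {
            return KafkaSource.<String>builder()
                    .setBootstrapServers("kafka:9092")       // hypothetical address
                    .setTopics("events")                     // hypothetical topic
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    // Stop at the offsets that are latest at job start;
                    // downstream operators then see end-of-input.
                    .setBounded(OffsetsInitializer.latest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();
        }
    }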
  • Zhiyu Tian

    03/09/2023, 2:34 AM
    Hi all, are there settings that need to be tuned when the Flink WebUI says ā€œNo Available Metricā€? Sometimes the Flink WebUI shows the metrics, but sometimes it does not.
  • sziwei

    03/09/2023, 3:49 AM
    Flink 1.16.0. Writing data to S3 with Flink SQL reports the following error: Caused by: java.lang.UnsupportedOperationException: Cannot sync state to system like S3. Use persist() to create a persistent recoverable intermediate point.
  • sziwei

    03/09/2023, 3:51 AM
    After I used persist(), I found that I still got this error.
  • kingsathurthi

    03/09/2023, 7:45 AM
    Where do I find the system requirements for the Flink Kubernetes operator?