# troubleshooting
  • j

    James Watkins

    06/08/2023, 10:12 AM
    Hello, I’m trying to enrich a transactions stream with a users stream, both Kafka sources, using the avro-confluent value.format so that I can deserialise the Avro messages using a schema registry. It looks like this format is only available in the Table API, so I have defined my source connectors using that. I’m trying to join the streams using an Event-Time Temporal Table Join. I added a primary key to the users table, but I get the following error message:
    Copy code
    The Kafka table 'default_catalog.default_database.users' with 'avro-confluent' format doesn't support defining PRIMARY KEY constraint on the table, because it can't guarantee the semantic of primary key.
    Is it possible to use an Event-Time Temporal Table Join with the ‘avro-confluent’ data format?
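    A possible workaround (a sketch, not a confirmed solution; the topic, field, broker and registry names below are placeholders): the upsert-kafka connector requires a PRIMARY KEY and also accepts avro-confluent as the value format, so the users topic can be declared as a versioned table and used in the event-time temporal join.
    Copy code
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class UsersVersionedTableSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // 'upsert-kafka' allows (and requires) a PRIMARY KEY, unlike the plain
            // 'kafka' connector with 'avro-confluent'. All names are hypothetical,
            // and update_time is assumed to be a field of the Avro value record.
            tEnv.executeSql(
                "CREATE TABLE users (" +
                "  user_id STRING," +
                "  name STRING," +
                "  update_time TIMESTAMP(3)," +
                "  WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND," +
                "  PRIMARY KEY (user_id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'upsert-kafka'," +
                "  'topic' = 'users'," +
                "  'properties.bootstrap.servers' = 'broker:9092'," +
                "  'key.format' = 'raw'," +
                "  'value.format' = 'avro-confluent'," +
                "  'value.avro-confluent.url' = 'http://schema-registry:8081'" +
                ")");

            // The enrichment can then be written as an event-time temporal join, e.g.
            // SELECT ... FROM transactions AS t
            // JOIN users FOR SYSTEM_TIME AS OF t.tx_time AS u ON t.user_id = u.user_id
        }
    }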
    d
    m
    • 3
    • 19
  • o

    Or Keren

    06/08/2023, 11:11 AM
    Hey everyone, using Flink 1.17.1, I'm currently encountering an issue where I use a KafkaSink with the EXACTLY_ONCE guarantee and a KafkaSource with isolation.level="read_committed". With read_uncommitted I can see messages in the consumer, but with "read_committed" I don't get any messages. The only explanation I can think of is that the producer does not commit the transactions and all of them get aborted even though the checkpoint completed (this is a theory, I have no way of confirming it). Does anyone have any idea what I can do to fix this?
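    Not an answer, but for comparison, a minimal sketch of how an exactly-once KafkaSink is typically configured (broker and topic names are placeholders). With EXACTLY_ONCE, transactions are only committed when a checkpoint completes, so read_committed consumers see data with up to one checkpoint interval of delay, and the broker aborts transactions that exceed transaction.timeout.ms, so that timeout has to be larger than the checkpoint interval.
    Copy code
    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.base.DeliveryGuarantee;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;

    public class ExactlyOnceKafkaSinkSketch {
        public static KafkaSink<String> build() {
            // Must be larger than the checkpoint interval, and within the broker's
            // transaction.max.timeout.ms (15 minutes by default).
            Properties props = new Properties();
            props.setProperty("transaction.timeout.ms", "900000");

            return KafkaSink.<String>builder()
                    .setBootstrapServers("broker:9092")            // placeholder
                    .setKafkaProducerConfig(props)
                    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                    .setTransactionalIdPrefix("my-job-sink")       // unique per job
                    .setRecordSerializer(
                            KafkaRecordSerializationSchema.builder()
                                    .setTopic("output-topic")      // placeholder
                                    .setValueSerializationSchema(new SimpleStringSchema())
                                    .build())
                    .build();
        }
    }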
    m
    • 2
    • 13
  • a

    Abhishek Gupta

    06/08/2023, 11:17 AM
    I have recently upgraded from Flink version 1.14.4 to 1.16.1. On the Flink UI I can observe that 1.14.4 was showing Bytes Sent and Records Sent for all the sources and sinks, but 1.16.1 has stopped showing the Bytes Sent and Records Sent metrics for sources and sinks. I have attached screenshots of the same job for both versions. Any pointers?
  • a

    Abhishek Gupta

    06/08/2023, 11:27 AM
    m
    • 2
    • 6
  • r

    Raghunadh Nittala

    06/08/2023, 2:32 PM
    Hey Everyone, we're using Flink SQL to save data to S3 using the 'filesystem' connector, and we've added partitions to the table as well. The issue we see is that many small files are being created in S3, whereas we intend to create one Parquet file per day. Sink table query:
    Copy code
    CREATE TABLE sink_table_s3 (
      event_id STRING NOT NULL,
      event_type STRING NOT NULL,
      event_name STRING NOT NULL,
      eventId STRING NOT NULL,
      eventName STRING NOT NULL,
      `date` STRING
    ) PARTITIONED BY (eventId, eventName, `date`) WITH (
      'connector' = 'filesystem', 
      'path' = '<path>', 
      'format' = 'parquet',
      'auto-compaction' = 'true'
    );
    Insert query:
    Copy code
    INSERT INTO sink_table_s3
    SELECT event_id, event_type, event_name,
      event_id AS eventId, event_name AS eventName,
      DATE_FORMAT(proc_time, 'yyyy-MM-dd') AS `date`
    FROM source_table;
    I’m adding eventId and eventName just to make sure those columns are also available in the Parquet file in S3. How can we avoid the small files being created?
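    A sketch of the options that usually control file sizes for the filesystem sink (values are placeholders): files are rolled and compacted per checkpoint, so the checkpoint interval, the rolling policy and compaction.file-size together determine how many files end up per partition. Note also that partitioning by eventId and eventName multiplies the number of partitions, and therefore files.
    Copy code
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class S3SinkCompactionSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Same columns as sink_table_s3 above; only the WITH options differ.
            // Files are rolled and compacted on checkpoints, so a very small
            // checkpoint interval still produces many files.
            tEnv.executeSql(
                "CREATE TABLE sink_table_s3 (" +
                "  event_id STRING NOT NULL," +
                "  event_type STRING NOT NULL," +
                "  event_name STRING NOT NULL," +
                "  eventId STRING NOT NULL," +
                "  eventName STRING NOT NULL," +
                "  `date` STRING" +
                ") PARTITIONED BY (eventId, eventName, `date`) WITH (" +
                "  'connector' = 'filesystem'," +
                "  'path' = 's3://bucket/path'," +                       // placeholder
                "  'format' = 'parquet'," +
                "  'auto-compaction' = 'true'," +
                "  'compaction.file-size' = '128MB'," +                  // target size after compaction
                "  'sink.rolling-policy.file-size' = '128MB'," +
                "  'sink.rolling-policy.rollover-interval' = '30 min'" +
                ")");
        }
    }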
    m
    s
    • 3
    • 13
  • e

    Elizaveta Batanina

    06/08/2023, 3:53 PM
    Hello team! We are using watermark latency in our pipeline to track e2e latency (is it the correct metric to use with metrics.latency.granularity: "single"? 🤔). Sometimes the latency is negative; what would that mean? In the Flink code, watermark latency is defined as processing_time - watermark. Another question: when using Python UDFs, the PythonGroupAggregate operator is used, which doesn't track watermark latency. How can we use this metric with Python? P.S. we are using Flink 1.17
    d
    • 2
    • 1
  • g

    Gaurav Gupta

    06/08/2023, 6:22 PM
    Hello Everyone, I am trying to explore the flink-cep library and am running a sample job to start, but it's not producing any output. Please let me know if I am doing something wrong.
    Copy code
    package org.example;
    import org.apache.flink.cep.CEP;
    import org.apache.flink.cep.PatternSelectFunction;
    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import java.util.List;
    import java.util.Map;
    
    public class FlinkCEPExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<Event> events = env.fromElements(
                    new Event(1, "start"),
                    new Event(2, "middle"),
                    new Event(3, "end")
            );
    
            Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
                    .where(SimpleCondition.of(event -> event.getName().equals("start")))
                    .followedBy("middle")
                    .where(SimpleCondition.of(event -> event.getName().equals("middle")))
                    .followedBy("end")
                    .where(SimpleCondition.of(event -> event.getName().equals("end")));
    
            DataStream<String> output = CEP.pattern(events, pattern)
                    .select(new PatternSelectFunction<Event, String>() {
                        @Override
                        public String select(Map<String, List<Event>> pattern) throws Exception {
                            StringBuilder result = new StringBuilder();
                            for (Event event : pattern.get("start")) {
                                result.append(event.getId()).append("-");
                            }
                            result.append(pattern.get("middle").get(0).getId()).append("-");
                            result.append(pattern.get("end").get(0).getId());
                            return result.toString();
                        }
                    });
            output.print();
            // Execute the job
            env.execute("Flink CEP Pattern Example");
        }
    }
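    In case it's useful for comparison, a common reason for CEP emitting nothing on recent Flink versions is that the pattern runs in event time by default while the fromElements input has no timestamps or watermarks. A minimal variant that switches the pattern to processing time (assuming processing time is acceptable for this test; otherwise assign timestamps and watermarks to the input stream instead):
    Copy code
    import java.util.List;
    import java.util.Map;

    import org.apache.flink.cep.CEP;
    import org.apache.flink.cep.PatternSelectFunction;
    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class FlinkCEPProcessingTimeSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> events = env.fromElements("start", "middle", "end");

            Pattern<String, ?> pattern = Pattern.<String>begin("start")
                    .where(SimpleCondition.of("start"::equals))
                    .followedBy("middle")
                    .where(SimpleCondition.of("middle"::equals))
                    .followedBy("end")
                    .where(SimpleCondition.of("end"::equals));

            // Explicitly run the pattern in processing time; since Flink 1.12 the
            // default is event time, and without watermarks event-time CEP emits nothing.
            CEP.pattern(events, pattern)
                    .inProcessingTime()
                    .select(new PatternSelectFunction<String, String>() {
                        @Override
                        public String select(Map<String, List<String>> match) {
                            return match.get("start").get(0) + "-"
                                    + match.get("middle").get(0) + "-"
                                    + match.get("end").get(0);
                        }
                    })
                    .print();

            env.execute("Flink CEP processing-time sketch");
        }
    }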
    • 1
    • 2
  • p

    Pankaj Singh

    06/08/2023, 6:24 PM
    Hi team, I was exploring PyFlink with KDA. Dependencies: Kafka (in-house Kafka, not on MSK) and PyFlink; I have also packaged all the required Python packages as described in "how to use an external package within PyFlink", and created an uber jar for the Kafka and Flink connectors. Properties (consumer - FlinkKafkaConsumer):
    Copy code
    "bootstrap.servers": "kafka-server:9094",
    "group.id": "fin-topic-name",
    "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username='<username>' password='passwd';",
    "sasl.mechanism": "PLAIN",
    "security.protocol": "SASL_PLAINTEXT"
    My program is reading from kafka topic and doing some transformations. Followed this doc https://docs.aws.amazon.com/kinesisanalytics/latest/java/gs-python-createapp.html for configuring KDA. Getting below error when deployed on KDA (but working on local), any idea? Exception: "exception-classifier.filters.user-exception-stack-regex.configuration, Caused by. org.apache.flink.runtime.checkpoint.CheckpointException.+Caused by. java.lang.InterruptedException. sleep interrupted.+at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run;Caused by. org.apache.flink.runtime.checkpoint.CheckpointException.+Caused by. org.apache.flink.kinesis.shaded.com.amazonaws.AbortedException.+at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run.+Caused by. java.lang.InterruptedException. sleep interrupted;Caused by. org.apache.kafka.common.errors.TimeoutException. Timed out waiting for a node assignment;Caused by. org.apache.kafka.common.errors.SaslAuthenticationException. Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-512;Caused by. org.apache.kafka.common.errors.TopicAuthorizationException. Topic authorization failed"
    j
    a
    • 3
    • 33
  • n

    Nihar Rao

    06/08/2023, 7:06 PM
    Hi, what's the recommended way of detecting the Full GC count for the taskmanager and jobmanager? I believe these GC metrics won't give me that, as I only see metrics for the G1 generation when I run an application. Will enabling the logging described in the docs help me detect the Full GC count? Thanks
    • 1
    • 1
  • a

    Ankam Praveen

    06/09/2023, 8:19 AM
    Hello, I’m planning on using S3 as a data source where the objects are partitioned by hour in the following format: s3://test-data/<yyyy-MM-dd>/topic/<HH>. I discovered I can use FileSource with the monitorContinuously function, but I’m trying to find a way to ensure that at any given time, it’s only checking for new files in the current hour directory. If I use s3://test-data/<yyyy-MM-dd>/topic as the base path to monitor for new files, it will require terminating the job every day to prevent it from monitoring old directories. Is there a more efficient way to achieve this so that I can limit the FileSource to only monitor and read from the current hour’s directory? Any advice or insights would be greatly appreciated! Thanks!
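    For reference, a minimal sketch of the FileSource setup being described (the path, format and discovery interval are placeholders). As far as I know there is no built-in way to restrict monitoring to the current hour directory, so the source monitors the day-level prefix; each listing only emits files it has not seen before, but old hour directories are still listed.
    Copy code
    import java.time.Duration;

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.connector.file.src.FileSource;
    import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class HourlyS3FileSourceSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Monitor the whole day prefix; only newly discovered files are read.
            FileSource<String> source = FileSource
                    .forRecordStreamFormat(new TextLineInputFormat(),
                            new Path("s3://test-data/2023-06-09/topic"))   // placeholder path
                    .monitorContinuously(Duration.ofMinutes(1))
                    .build();

            env.fromSource(source, WatermarkStrategy.noWatermarks(), "s3-file-source")
                    .print();

            env.execute("Continuous S3 file source sketch");
        }
    }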
    s
    • 2
    • 7
  • j

    Juan Carlos Gomez Pomar

    06/09/2023, 8:31 AM
    Hi! Our team is getting several errors related to savepointing and S3, in different jobs.
    UnrecoverableS3OperationException
    AmazonS3Exception: The specified key does not exist.
    When we look in S3, the corresponding key does not exist. It happens after a restart, like in this post https://lists.apache.org/thread/qtwho32yzhqhbx7jhnr556bslhpc0x9k. However, there is no error before the above-mentioned exception. I can also see it pointed out in prestodb https://github.com/prestodb/presto/issues/18154#issuecomment-1284415321. • Have you found this error? • Is there a way to debug this error? Note: Apparently this is happening in the jobs that use the Ververica Platform High Availability feature; however, it is difficult to pinpoint whether that is the true root cause since the error does not happen consistently. Any advice or insights would be greatly appreciated!
    👍 2
  • f

    Felix Hildén

    06/09/2023, 12:01 PM
    Hey! A logging issue: I’m running PyFlink locally simply by building a graph and calling execute, but for debugging I’d like to change the logging levels. Currently I only see warnings and up, and I haven’t found a way to show more. In a proper deployment on K8s all logs are visible thanks to configuring logConfiguration: log4j-console.properties. Is there a way to change the level with some configuration, either in code or in a config file? The Flink deployment points to the same exact entry point in Python, so surely there must be, but merely having a log4j-console.properties file didn’t do the job. Where am I going wrong?
    c
    • 2
    • 7
  • h

    Hussain Abbas

    06/09/2023, 1:35 PM
    Since I never got an answer, asking once more: regarding the Flink Autoscaler, has anyone worked with Kinesis as a source? I see in the docs it works best with Kafka, but what about Kinesis? Is it worth spending time to test it? Any suggestions would be welcome.
    j
    d
    • 3
    • 7
  • g

    Gaurav Gupta

    06/09/2023, 2:15 PM
    Hey Everyone, can anyone guide me regarding usage of the flink-cep library? I am able to run sample code with older versions of the library (up to 1.11.6), but newer versions are not producing any output. Any suggestions?
  • c

    Clen Moras

    06/09/2023, 2:30 PM
    Hello, I'm looking around to see if anyone has gone through a Flink SQL client setup on Kubernetes, along with a Hive metastore. I'm currently attempting to read CSV from S3 via a Hive metastore. I have got it working with a Trino cluster, but since I already have a Flink ecosystem running, I would like to relegate this to Flink if possible and not add another component to the mix. Any suggestions would be greatly appreciated.
    o
    • 2
    • 6
  • o

    Oscar Perez

    06/09/2023, 3:14 PM
    Hi! We are building a new job that, the first time it runs, reads data from a CSV file to populate some state; afterwards it will not need this CSV file and will run a KeyedCoProcessFunction on 2 streams, updating the original state. I wonder how I can build the initial state the first time I read from the CSV file? Originally I thought of creating 3 streams and doing a union of the CSV file stream with the account change stream, since they are of the same nature. See the code snippet for reference:
    Copy code
    val accountChangedStream = accountDataStreamFactory.get().keyBy { it.accountUserId }
    val accountActivityStream = accountActivityDataStreamFactory.get().keyBy { it.userId }
    val csvStream = factory.get().keyBy { it.userId }.process(ReadFirstTimeProcessor())
    val joinedStream = accountActivityStream.union(csvStream)
    val clusterChanges: SingleOutputStreamOperator<CustomerClusterEvent> =
        accountChangedStream
            .connect(joinedStream)
            .process(ActivityClusteringProcessor())
            .name(dataStreamName)
    v
    • 2
    • 3
  • s

    Stefan Bumpus-Barnett

    06/09/2023, 4:00 PM
    Hi Team, happy Friday! We are trying to run simple queries on a Confluent Kafka topic. We are able to successfully connect to the topic and run SELECT COUNT(*) queries. But when we try to run SELECT * queries we get records with all null fields. The Kafka topic uses the Confluent Schema Registry to hold the schemas. I was wondering if the fields are not being deserialized properly. Any help would be greatly appreciated! Query:
    Copy code
    CREATE TABLE account
    (
        `ACCT_COND_ID`      INT,
        `ACCT_ID`           INT,
        `ACCT_COND_NM`      STRING,
        `ACCT_COND_STRT_TS` TIMESTAMP(3),
        `ACCT_COND_STOP_TS` TIMESTAMP(3),
        `SRC_ACCT_ID`       STRING,
        `UPDATE_TS`         TIMESTAMP(3),
        `SRC_ACCT_ID2`      STRING,
        `SRC_SYS_CD`        STRING,
        `SRC_ACCT_SYS_CD`   STRING
    ) WITH (
          'connector' = 'kafka',
          'topic' = '**********',
          'properties.bootstrap.servers' = '**********',
          'key.format' = 'avro-confluent',
          'key.fields' = 'ACCT_COND_ID',
          'key.avro-confluent.url' = 'https://**********',
          'key.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
          'key.avro-confluent.basic-auth.user-info' = '**********',
          'value.format' = 'avro-confluent',
          'value.avro-confluent.url' = 'https://**********',
          'value.avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
          'value.avro-confluent.basic-auth.user-info' = '**********',
          'scan.startup.mode' = 'earliest-offset',
          'properties.security.protocol' = 'SASL_SSL',
          'properties.sasl.jaas.config' = '**********'
          );
    
    SELECT *
    FROM account
    LIMIT 10;
    Output: See attached pic
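    Just a guess to rule out, not a confirmed diagnosis: with 'key.fields' = 'ACCT_COND_ID', the default 'value.fields-include' = 'ALL' derives the value deserialization schema from all table columns, including the key field; if the registered value schema does not actually repeat the key, or the column names and types don't match the registered schema exactly, deserialization can go wrong. A sketch of the option to try, keeping everything else as in the DDL above (the topic, servers and registry values here are placeholders):
    Copy code
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class AccountValueFieldsSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Shortened to two columns to keep the sketch small; the real table keeps
            // all columns from the original DDL.
            tEnv.executeSql(
                "CREATE TABLE account (" +
                "  `ACCT_COND_ID` INT," +
                "  `ACCT_ID` INT" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'account-topic'," +
                "  'properties.bootstrap.servers' = 'broker:9092'," +
                "  'key.format' = 'avro-confluent'," +
                "  'key.fields' = 'ACCT_COND_ID'," +
                "  'key.avro-confluent.url' = 'https://schema-registry'," +
                "  'value.format' = 'avro-confluent'," +
                "  'value.avro-confluent.url' = 'https://schema-registry'," +
                "  'value.fields-include' = 'EXCEPT_KEY'," +
                "  'scan.startup.mode' = 'earliest-offset'" +
                ")");
        }
    }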
    ✅ 1
    s
    • 2
    • 3
  • a

    Amir Hossein Sharifzadeh

    06/10/2023, 7:31 PM
    Hey guys. Following the Flink joining documentation, I ended up creating my joined_stream from two streams, but I get this error:
    Caused by: org.apache.flink.util.FlinkException: Long.MIN_VALUE timestamp: Elements used in interval stream joins need to have timestamps meaningful timestamps.
    at org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.processElement(IntervalJoinOperator.java:227)
    at org.apache.flink.streaming.api.operators.co.IntervalJoinOperator.processElement1(IntervalJoinOperator.java:196)
    at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor1$1(RecordProcessorUtils.java:88)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:254)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
  • a

    Amir Hossein Sharifzadeh

    06/10/2023, 7:33 PM
    Both streams have a common key.
  • a

    Amir Hossein Sharifzadeh

    06/10/2023, 8:41 PM
    I figured out what the issue was: I need to call assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create()); on both streams!
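    For completeness, a sketch of the same fix using the built-in watermark strategies (the Event type, the getKey/getTimestamp accessors and the join bounds are all placeholders; IngestionTimeWatermarkStrategy above is presumably a custom strategy):
    Copy code
    import java.time.Duration;

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;

    public class IntervalJoinWatermarkSketch {

        public static DataStream<String> join(DataStream<Event> left, DataStream<Event> right) {
            // Both inputs need timestamps assigned, otherwise IntervalJoinOperator
            // sees Long.MIN_VALUE timestamps and throws the exception above.
            WatermarkStrategy<Event> strategy = WatermarkStrategy
                    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    .withTimestampAssigner((event, previous) -> event.getTimestamp());

            return left.assignTimestampsAndWatermarks(strategy)
                    .keyBy(Event::getKey)
                    .intervalJoin(right.assignTimestampsAndWatermarks(strategy).keyBy(Event::getKey))
                    .between(Time.seconds(-10), Time.seconds(10))   // placeholder bounds
                    .process(new ProcessJoinFunction<Event, Event, String>() {
                        @Override
                        public void processElement(Event l, Event r, Context ctx, Collector<String> out) {
                            out.collect(l.getKey() + " matched");
                        }
                    });
        }

        // Hypothetical event type, only here to make the sketch self-contained.
        public static class Event {
            public String key;
            public long timestamp;
            public String getKey() { return key; }
            public long getTimestamp() { return timestamp; }
        }
    }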
  • g

    guenter hipler

    06/11/2023, 7:30 AM
    [In German] Not normal anymore, book, sociologist
  • p

    Parmveer Randhawa

    06/12/2023, 2:37 AM
    Getting an error on "import pyflink" stating "no module named pyflink" when submitting it to the cluster
    d
    • 2
    • 1
  • p

    Parmveer Randhawa

    06/12/2023, 2:37 AM
    When run on the command line it works
  • d

    dino bin

    06/12/2023, 8:36 AM
    Hey, I have one question: how many FlinkDeployments can the flink-kubernetes-operator manage at most? Thank you
  • z

    Zin

    06/12/2023, 8:45 AM
    Hello, I have a question about the Flink k8s operator. I would like to fetch the Flink job's jar artifact, specified in the jarURI field of the FlinkDeployment CRD, from a JFrog Artifactory that requires basic auth, but I couldn't find any documentation about that. Could anyone please tell me if it's possible and how?
    • 1
    • 3
  • a

    Asish Upadhyay

    06/12/2023, 9:09 AM
    Hey all, I have one doubt. We are using the Flink k8s operator to configure our Flink app, and we are storing the checkpoint and savepoint info in Azure storage. Now suppose we delete our application, or it goes down, and we need to start it from the last savepoint. How can we do that? Any help will be appreciated. If anything is unclear I'll be glad to provide more information. Thank you! Cc: @Dheemanth Gowda
    s
    • 2
    • 1
  • u

    吉春

    06/12/2023, 9:27 AM
    Hi, when I submit a job in YARN application mode with checkpointing enabled and use my UDF, the job fails and the stack trace is the following:
    Copy code
    org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    	at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94) ~[flink-table-runtime-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
    	at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:97) ~[flink-table-runtime-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
    	at org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:120) ~[flink-table-runtime-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
    	at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:51) ~[flink-table-runtime-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
    	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:528) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
    	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:236) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
    	at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:160) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
    	at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:1028) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
    	at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:56) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
    	at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
    	at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:61) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
    	at org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.submitAndGetJobClientFuture(EmbeddedExecutor.java:123) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
    	at org.apache.flink.client.deployment.application.executors.EmbeddedExecutor.execute(EmbeddedExecutor.java:104) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
    	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2197) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
    	at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:189) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
    	at org.apache.flink.table.planner.delegation.DefaultExecutor.executeAsync(DefaultExecutor.java:121) ~[?:?]
    	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:921) ~[flink-table-api-java-uber-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
    	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:883) ~[flink-table-api-java-uber-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
    	at org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:109) ~[flink-table-api-java-uber-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
    	at com.dlink.executor.Executor.executeStatementSet(Executor.java:351) ~[dlink-app-1.17-0.7.0-jar-with-dependencies.jar:?]
    	at com.dlink.executor.Executor.submitStatementSet(Executor.java:367) ~[dlink-app-1.17-0.7.0-jar-with-dependencies.jar:?]
    	at com.dlink.app.flinksql.Submitter.submit(Submitter.java:143) ~[dlink-app-1.17-0.7.0-jar-with-dependencies.jar:?]
    	at com.dlink.app.MainApp.main(MainApp.java:34) ~[dlink-app-1.17-0.7.0-jar-with-dependencies.jar:?]
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_121]
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_121]
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_121]
    	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_121]
    	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
    	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
    	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
    	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:303) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
    	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync$2(ApplicationDispatcherBootstrap.java:256) ~[flink-dist-1.17.0-xxx-RELEASE.jar:1.17.0-xxx-RELEASE]
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_121]
    	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_121]
    	at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:171) ~[flink-rpc-akka_33a72385-51d3-4f53-84cc-668970eb251f.jar:1.17.0-xxx-RELEASE]
    	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_33a72385-51d3-4f53-84cc-668970eb251f.jar:1.17.0-xxx-RELEASE]
    	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41) ~[flink-rpc-akka_33a72385-51d3-4f53-84cc-668970eb251f.jar:1.17.0-xxx-RELEASE]
    	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) [flink-rpc-akka_33a72385-51d3-4f53-84cc-668970eb251f.jar:1.17.0-xxx-RELEASE]
    	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48) [flink-rpc-akka_33a72385-51d3-4f53-84cc-668970eb251f.jar:1.17.0-xxx-RELEASE]
    	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_121]
    	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_121]
    	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_121]
    	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) [?:1.8.0_121]
  • a

    Ari Huttunen

    06/12/2023, 9:39 AM
    We're saving Parquet data to S3 using the flink-s3-fs-hadoop plugin and checkpoints using the flink-s3-fs-presto plugin. We had problems with java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration, which got solved by adding the following to the lib directory with Ansible, but is this how we're supposed to fix this? Edit: version 1.17.1, the plugins were copied as instructed by the manual, and we're using PyFlink.
    Copy code
    flink_dependencies:
      - group_id: org.apache.flink
        artifact_id: flink-sql-parquet
        version: "{{ flink_version }}"
      - group_id: org.apache.hadoop
        artifact_id: hadoop-common
        version: "{{ hadoop_version }}"
      - group_id: org.apache.hadoop
        artifact_id: hadoop-mapreduce-client-core
        version: "{{ hadoop_version }}"
      - group_id: com.fasterxml.woodstox
        artifact_id: woodstox-core
        version: 5.3.0
      - group_id: org.codehaus.woodstox
        artifact_id: stax2-api
        version: 4.2.1
      - group_id: com.google.guava
        artifact_id: guava
        version: 11.0.2
      - group_id: commons-logging
        artifact_id: commons-logging
        version: 1.1.3
    m
    • 2
    • 12
  • z

    Zhang Zhao

    06/12/2023, 10:17 AM
    Hey guys, can the same Flink cluster connect to multiple S3 object storage clusters? The endpoints, access keys, and secret keys are different. How can we do that?
    a
    a
    • 3
    • 3
  • s

    slowratatoskr

    06/12/2023, 2:58 PM
    Why can't I use SUM0 in Flink SQL?
    m
    • 2
    • 21