# troubleshooting
  • v

    Varun Sayal

    10/14/2022, 2:40 PM
    Does anyone have a good solution for adding more sources/processing without having to discard previous state?
  • v

    Varun Sayal

    10/14/2022, 2:41 PM
    We keep having to do “clean” restarts as we enhance our application
  • k

    Krish Narukulla

    10/14/2022, 5:49 PM
    Does the FlinkDeployment k8s operator support reading from s3/gcs? Is there a plan to support it?
    Copy code
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: xxx
      namespace: %{K8S_NAMESPACE}
    spec:
      image:  %{IMAGE_REGISTRY}/dea/xxx:%{IMAGE_TAG}
      flinkVersion: v1_15
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: "2"
      serviceAccount: flink
      jobManager:
        resource:
          memory: "2048m"
          cpu: 1
      taskManager:
        resource:
          memory: "2048m"
          cpu: 1
      job:
        jarURI: s3://bucket/krish/airstream.jar
        args: ["--config", "s3://bucket/pipeline-test.yaml"]
        parallelism: 2
        upgradeMode: stateless
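    A minimal sketch of enabling the built-in S3 filesystem plugin through the operator's pod template, so the cluster can read objects such as the --config file in args from S3 (podTemplate and ENABLE_BUILT_IN_PLUGINS are documented mechanisms; the choice of the s3-fs-hadoop plugin and the 1.15.2 jar name here are assumptions):
    Copy code
    spec:
      podTemplate:
        spec:
          containers:
            # the operator patches the container with this exact name
            - name: flink-main-container
              env:
                # unpacks the named plugin from opt/ into plugins/ at startup
                - name: ENABLE_BUILT_IN_PLUGINS
                  value: flink-s3-fs-hadoop-1.15.2.jar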
  • a

    Angelo Kastroulis

    10/15/2022, 4:36 AM
    Hi everyone. I’m struggling a bit trying to get Flink SQL/Table API to work with an S3 source in either Parquet or ORC format. I installed the flink-s3-fs-hadoop plugin (v1.15.2) in the plugins folder and also put flink-shaded-hadoop-2-uber-2.4.1-10.0.jar into lib, but I can’t get it working. The error I get is:
    Copy code
    Caused by: java.io.IOException: No FileSystem for scheme: s3a
    	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2385)
    	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2392)
    	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
    	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
    	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
    	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
    	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
    	at org.apache.orc.impl.ReaderImpl.getFileSystem(ReaderImpl.java:395)
    	at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:368)
    	at org.apache.orc.OrcFile.createReader(OrcFile.java:343)
    	at org.apache.flink.orc.shim.OrcShimV230.createReader(OrcShimV230.java:43)
    	at org.apache.flink.orc.shim.OrcShimV200.createRecordReader(OrcShimV200.java:98)
    	at org.apache.flink.orc.AbstractOrcFileInputFormat.createReader(AbstractOrcFileInputFormat.java:106)
    	at org.apache.flink.orc.AbstractOrcFileInputFormat.createReader(AbstractOrcFileInputFormat.java:52)
    	at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112)
    	at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65)
    For reference, the Parquet version:
    Copy code
    Caused by: java.io.IOException: No FileSystem for scheme: s3a
    	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2385)
    	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2392)
    	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:89)
    	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2431)
    	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2413)
    	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:368)
    	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
    	at org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:38)
    	at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:469)
    	at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:119)
    	at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat.createReader(ParquetVectorizedInputFormat.java:78)
    	at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.checkSplitOrStartNext(FileSourceSplitReader.java:112)
    	at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:65)
    	at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
    	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
    It looks like ORC (Parquet does the same thing) is not using the plugin to load the file system, but something else, so it fails. If I remove the Hadoop uber jar from the equation (which the docs never say I need; in fact it’s never even included in the examples that show the s3-fs configuration, for YARN, Kubernetes, Docker, plugins, you name it), I get this error:
    Copy code
    java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
    	at org.apache.flink.orc.OrcFileFormatFactory$OrcBulkDecodingFormat.createRuntimeDecoder(OrcFileFormatFactory.java:146)
    	at org.apache.flink.orc.OrcFileFormatFactory$OrcBulkDecodingFormat.createRuntimeDecoder(OrcFileFormatFactory.java:118)
    	at org.apache.flink.connector.file.table.FileSystemTableSource.getScanRuntimeProvider(FileSystemTableSource.java:157)
    	at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:461)
    	at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:158)
    	at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:122) at
    I’m just at a loss.
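    For reference, a sketch of the plugin layout the Flink docs describe for flink-s3-fs-hadoop (the jar must sit in its own subdirectory under plugins/, not in lib/; the folder name itself is arbitrary):
    Copy code
    plugins/
    └── s3-fs-hadoop/
        └── flink-s3-fs-hadoop-1.15.2.jar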
  • k

    Krish Narukulla

    10/15/2022, 5:00 AM
    I am unable to write Parquet data to a GCS sink. It looks like my fat jar has duplicate classes.
    Copy code
    CREATE TEMPORARY TABLE table (
            `x` BIGINT,
            `y` STRING,
            `z` STRING
            ) WITH (
            'connector' = 'filesystem',
    'path' = 'gs://xxx-yyy-dev-1/dev/krish',
            'format' = 'parquet'
     )
    
    
    Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'default' that implement 'org.apache.flink.table.delegation.ExecutorFactory' found in the classpath.
    
    Ambiguous factory classes are:
    
    org.apache.flink.table.planner.delegation.DefaultExecutorFactory
    org.apache.flink.table.planner.loader.DelegateExecutorFactory
            at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:553) ~[flink-table-api-java-uber-1.15.2.jar:1.15.2]
            at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.lookupExecutor(AbstractStreamTableEnvironmentImpl.java:105) ~[flink-table-api-java-uber-1.15.2.jar:1.15.2]
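    The two ambiguous factories usually mean both flink-table-planner and flink-table-planner-loader ended up on the classpath. A sketch of excluding the legacy planner in a Gradle build, purely as an illustration (the module name assumes the Scala 2.12 artifact of Flink 1.15):
    Copy code
    configurations.all {
        // keep flink-table-planner-loader (shipped with the distribution) and drop
        // the legacy planner so only one ExecutorFactory is discovered at runtime
        exclude group: 'org.apache.flink', module: 'flink-table-planner_2.12'
    }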
  • a

    Avinash

    10/15/2022, 6:30 AM
    Has anyone successfully connected to a Hive metastore installation to read tables from the Hive catalog in a PyFlink program? We’re using Hive version 2.3.2, and I’ve been using the bundled jar flink-sql-connector-hive-2.3.6 as mentioned here, with no success. I’m facing some issues and would appreciate it if someone could give me some ideas.
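    A minimal PyFlink sketch of registering a Hive catalog, assuming the Hive connector jar is already on the classpath and a hive-site.xml lives under /opt/hive-conf (catalog name and paths are placeholders):
    Copy code
    from pyflink.table import EnvironmentSettings, TableEnvironment
    from pyflink.table.catalog import HiveCatalog

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

    # hive-site.xml is read from the given conf dir; "default" is the database
    catalog = HiveCatalog("myhive", "default", "/opt/hive-conf")
    t_env.register_catalog("myhive", catalog)
    t_env.use_catalog("myhive")

    t_env.execute_sql("SHOW TABLES").print()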
  • a

    Alex Riedler

    10/15/2022, 10:33 PM
    👋 Hi everyone, I was trying to test my custom operators and noticed that the classes referenced in the docs are no longer in the test-utils and have been moved to be test classes of streaming-java; was this intentional?
  • v

    Vamshi Gandrapu

    10/16/2022, 2:57 AM
    We are developing a polyglot streaming pipeline with Flink Statefun 3.2.0 and Flink v1.14.3, with embedded functions in Java and remote functions in Python, using the Statefun stream builder API as below.
    Copy code
    StatefulFunctionEgressStreams egresses =
    				StatefulFunctionDataStreamBuilder.builder(Identifiers.LOGS_STREAMS)
    						.withDataStreamAsIngress(logsIngress)
    						.withFunctionProvider(Identifiers.LOGS_INGRESS, fn -> new LogsStatefulFn())
    						.withRequestReplyRemoteFunction(
    								RequestReplyFunctionBuilder.requestReplyFunctionBuilder(Identifiers.LOGS_REMOTE_FN,
    								 URI.create("http://logs-svc:8000/statefun"))
    										.withMaxRequestDuration(Duration.ofSeconds(15))
    										.withMaxNumBatchRequests(500))
    						.withEgressId(Identifiers.LOGS_ENGRESS)
    						.withConfiguration(statefunConfig)
    						.build(env);
    The Python fn is set up with the async RequestReply protocol over grpc/protobuf. We are able to successfully invoke the remote fn and send a packed org.apache.flink.statefun.sdk.reqreply.generated.TypedValue with the message; however, the pipeline fails to parse the response sent by the remote grpc Python fn with the error below. I expect the response to be inferred as a TypedValue. Not sure what is causing this issue.
    Copy code
    Caused by: java.lang.NoSuchMethodError: 'com.google.protobuf.Parser org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.parser()'
    	at org.apache.flink.statefun.flink.core.httpfn.DefaultHttpRequestReplyClient.parseResponse(DefaultHttpRequestReplyClient.java:76)
  • t

    Thien Nguyen

    10/17/2022, 7:39 AM
    Hi everyone! I have a Hive metastore and existing Iceberg tables that work with Spark, and now I want to connect Flink to the Hive metastore to query the Iceberg tables. Where can I configure the Flink SQL client to connect to the Hive metastore? I use Flink 1.15.2. Thanks all.
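    A sketch of what the catalog definition could look like in the Flink SQL client, assuming the iceberg-flink-runtime and Hive dependencies are on the classpath (the thrift URI and warehouse path are placeholders):
    Copy code
    CREATE CATALOG iceberg_catalog WITH (
      'type' = 'iceberg',
      'catalog-type' = 'hive',
      'uri' = 'thrift://hive-metastore-host:9083',
      'warehouse' = 'hdfs://namenode:8020/warehouse'
    );

    USE CATALOG iceberg_catalog;
    SHOW TABLES;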
  • a

    Alagappan Palanisamy

    10/17/2022, 8:36 AM
    Hadoop data streaming is not working in version 1.15.2.
  • s

    Slackbot

    10/17/2022, 8:36 AM
    This message was deleted.
  • a

    Alagappan Palanisamy

    10/17/2022, 8:37 AM
    can you give me any solution?
  • t

    Tiansu Yu

    10/17/2022, 8:43 AM
    com.amazonaws.handlers.* in flink-s3-fs-hadoop-1.13.2.jar shadows my own com.amazonaws.handlers. Details in 🧵
  • a

    Ameenur Rahman

    10/17/2022, 10:59 AM
    Copy code
    package org.apache.flink.streaming.examples.wordcount.util;
    
    //import com.bidsopt.gpb.adresponse.AdResponse;
    
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    //import com.bidsopt.gpb.*;
    import com.twitter.elephantbird.mapreduce.input.LzoTextInputFormat;
    
    
    
    /**
     * Implements a word count which takes the input file and counts the number of occurrences of each
     * word in the file and writes the result back to disk.
     *
     * <p>This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to common
     * Java types for better usage in a Flink job and how to use Hadoop Output Formats.
     */
    @SuppressWarnings("serial")
    public class FileWordCountHadoop {
    
        public static void main(String[] args) throws Exception {
    
           
            final String inputPath = "hdfs://localhost:9000/notes";
            final String outputPath = "hdfs://localhost:9000/notes_out";
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            Job job = Job.getInstance();
            HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(
                    new TextInputFormat(),
                    LongWritable.class,
                    Text.class,
                    job);
            TextInputFormat.addInputPath(job, new Path(inputPath));
            // Create a Flink job with it
            DataStream<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);
            
            text.print();
            env.execute("Word Count");
        }
    }
    As you can see from the above code, I used env.createInput(hadoopInputFormat), but continuous file processing does not happen here for the Hadoop input format. Please help with a solution.
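    For context, env.createInput(...) with a HadoopInputFormat produces a bounded read. A sketch of the newer FileSource, which can keep monitoring a directory for new files, assuming plain text input and a Flink 1.15-style API (path and poll interval are placeholders):
    Copy code
    import java.time.Duration;

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.connector.file.src.FileSource;
    import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class ContinuousFileRead {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            FileSource<String> source =
                    FileSource.forRecordStreamFormat(
                                    new TextLineInputFormat(),
                                    new Path("hdfs://localhost:9000/notes"))
                            // keep scanning the directory for new files every 10 seconds
                            .monitorContinuously(Duration.ofSeconds(10))
                            .build();

            DataStream<String> lines =
                    env.fromSource(source, WatermarkStrategy.noWatermarks(), "hdfs-text-source");
            lines.print();

            env.execute("Continuous File Read");
        }
    }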
  • m

    M Harsha

    10/17/2022, 11:38 AM
    Hi all, I was trying to bring up a Flink (latest Docker image) cluster on local Kubernetes (minikube). I picked up the resource definitions from the official documentation (here) and deployed them using the following commands:
    Copy code
    $ kubectl create -f flink-configuration-configmap.yaml
    $ kubectl create -f jobmanager-service.yaml
    $ kubectl create -f jobmanager-session-deployment-ha.yaml
    $ kubectl create -f taskmanager-session-deployment.yaml
    The taskmanager pods keep restarting with the following log
    Copy code
    INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Could not resolve ResourceManager address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_, retrying in 10000 ms: Could not connect to rpc endpoint under address akka.tcp://flink@flink-jobmanager:6123/user/rpc/resourcemanager_.
    Tried a curl from the taskmanager pod
    Copy code
    flink@flink-taskmanager-76cb7df9d6-jl5dk:~$ curl flink-jobmanager:6123
    curl: (52) Empty reply from server
    Upon checking the logs at the job manager, found this:
    Copy code
    2022-10-17 11:34:50,695 ERROR akka.remote.EndpointWriter [] - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@flink-jobmanager:6123/]] arriving at [akka.tcp://flink@flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@172.17.0.3:6123]
    2022-10-17 11:35:03,582 ERROR akka.remote.EndpointWriter [] - dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient [Actor[akka.tcp://flink@flink-jobmanager:6123/]] arriving at [akka.tcp://flink@flink-jobmanager:6123] inbound addresses are [akka.tcp://flink@172.17.0.3:6123]
    Is any config missing? This issue seems to be the same as FLINK-24031. Any workaround for this?
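    For reference, a sketch of the relevant part of flink-configuration-configmap.yaml from the docs; the HA session deployment starts the jobmanager with the pod IP as its host, which matches the "inbound addresses" line above, so this is only the non-HA baseline rather than a confirmed fix:
    Copy code
    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: flink-config
    data:
      flink-conf.yaml: |+
        jobmanager.rpc.address: flink-jobmanager
        jobmanager.rpc.port: 6123
        taskmanager.numberOfTaskSlots: 2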
  • m

    M Harsha

    10/17/2022, 4:39 PM
    Hi all, I was trying to load test Flink with 2 taskmanagers. The job that I'm running consumes messages from Kafka, and I'm loading the Kafka topic with around 80k messages per second. The Flink task that consumes messages from Kafka errors out after a while with an OutOfMemory error, and the entire task manager crashes. The task is then spawned on the other task manager, where the same behaviour is seen. Ideally, just the thread is supposed to be killed and not the task manager, right? Any idea why this is happening?
  • k

    Kevin Lam

    10/17/2022, 5:16 PM
    Hi all, What do folks do when it comes to generic exception handling? Specifically, I'm investigating what it might take to report the exceptions that appear in the Exceptions tab of the Flink UI to some observability tool like bugsnag.
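    One hedged option: the Exceptions tab is backed by a REST endpoint, so a sidecar or scheduled job could poll it and forward the payload to the observability tool. A sketch (host and job ID are placeholders):
    Copy code
    # root cause plus recent exception history for one job, as JSON
    curl -s "http://<jobmanager-host>:8081/jobs/<job-id>/exceptions?maxExceptions=10"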
  • a

    Abel Lamjiri

    10/17/2022, 6:08 PM
    Question on RocksDb backend / FlinkOperator in AWS EKS: Error:
    Copy code
    The application contains no execute() calls.
    When we switch to using RocksDB as the state backend, with the following Flink configs:
    Copy code
    flinkConfiguration:
        state.backend: rocksdb
        state.backend.rocksdb.localdir: file:///data/rocksdb-hvf
        state.backend.incremental: "true"
        state.backend.rocksdb.timer-service.factory: heap
    we get the above exception. The following is part of the volume mount and PVC:
    Copy code
    spec:
          serviceAccount: <%= var['serviceaccount'] %>
          containers:
            # Do not change the main container name
            - name: flink-main-container
              env:
                - name: ENABLE_BUILT_IN_PLUGINS
                  value: flink-s3-fs-presto-1.15.2.jar
              volumeMounts:
                - name: data
                  mountPath: /data
          volumes:
            - name: data
              persistentVolumeClaim:
                claimName: filesystem-pvc
    Stack Trace:
    Copy code
    2022-10-14T20:25:03.606223513Z Caused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: The application contains no execute() calls.
    2022-10-14T20:25:03.606226983Z 	... 14 more
    2022-10-14T20:25:03.607635319Z 2022-10-14 20:25:03,607 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error occurred in the cluster entrypoint.
    2022-10-14T20:25:03.607650643Z java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: The application contains no execute() calls.
  • a

    Abel Lamjiri

    10/17/2022, 6:22 PM
    Question on the format for specifying an S3 bucket for checkpointing data: is this the correct format?
    Copy code
    s3p://pocstreamingstack-some-bluh-bluh-5kxj/checkpoint
    Or do I need to specify the region and AWS endpoint as well? With the above, I’m getting a Forbidden (Service: Amazon S3; Status Code: 403) error.
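    For reference, s3p:// is the scheme registered by the presto S3 plugin; against AWS S3 the region and endpoint normally don't need to be spelled out, and a 403 more often points at credentials or bucket policy. A sketch of the related flink-conf entries, purely as an illustration (values are placeholders; with EKS an IAM role for the service account would usually replace the keys):
    Copy code
    state.checkpoints.dir: s3p://pocstreamingstack-some-bluh-bluh-5kxj/checkpoint
    # only needed when not relying on instance/role credentials
    s3.access-key: <access-key>
    s3.secret-key: <secret-key>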
  • s

    Steven Zhang

    10/17/2022, 6:34 PM
    Hi, I'm trying to run through the Flink SQL runner example from the operator repo https://github.com/apache/flink-kubernetes-operator/tree/main/examples/flink-sql-runner-example, except I'm trying to get it to work in session mode. When I create the Flink session CRDs, they get stuck in UPGRADING, and when I describe them, I see
    Copy code
    Status:
      Error:  java.io.FileNotFoundException: /opt/flink/usrlib/sql-runner.jar (No such file or directory)
    I kubectl exec into the deployment's pod and see the jar file at the specified path, so I'm not sure what I'm missing.
  • y

    Yaroslav Bezruchenko

    10/17/2022, 7:55 PM
    Hey, can someone help me with the following task: I need a unique ID generator with a guarantee of zero collisions (across all workers, but within one operator). Any ideas on how to approach it? I have done a Snowflake ID generator previously, but it requires an instance ID to be initialized. Can we somehow get the ID of the current worker, ideally sequential? P.S.: I can't use UUID, as our target IDs are Longs.
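    One hedged approach: inside an operator's open(), the subtask index is unique within that operator and dense from 0, so it can stand in for the Snowflake instance id. A sketch (the bit layout is illustrative, not a hardened implementation):
    Copy code
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    /** Assigns a collision-free long id to every input element. */
    public class SnowflakeIdAssigner<T> extends RichMapFunction<T, Long> {

        private int workerId;        // subtask index, unique within this operator
        private long lastTimestamp;  // last millisecond used
        private long sequence;       // per-millisecond counter

        @Override
        public void open(Configuration parameters) {
            // dense, per-parallel-instance id provided by the runtime
            workerId = getRuntimeContext().getIndexOfThisSubtask();
        }

        @Override
        public Long map(T value) {
            long now = System.currentTimeMillis();
            if (now == lastTimestamp) {
                sequence++;          // same millisecond: bump the counter
            } else {
                sequence = 0;
                lastTimestamp = now;
            }
            // 41 bits time | 10 bits worker | 12 bits sequence (illustrative layout)
            return (now << 22) | ((long) workerId << 12) | (sequence & 0xFFF);
        }
    }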
  • j

    Jin S

    10/17/2022, 8:40 PM
    Hi all :) We’re looking at upgrading to Flink 1.15. Flink 1.15 now uses Kafka Client 2.8.1 by default https://issues.apache.org/jira/browse/FLINK-24765 Kafka 2.8.1 seems to work with Scala >2.12.13. https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.12/2.8.1 But Flink compiles with Scala 2.12.7. Does that mean if we’re using Flink with Scala, the Kafka Client will fall back on v2.4.1? I understand Scala API in Flink is not really the focus anymore from tickets like the following. https://issues.apache.org/jira/browse/FLINK-13414 But I wonder if Flink+Scala 2.12.15 will be looked into? (saw a draft PR linked to the ticket https://github.com/apache/flink/pull/19634 ) Thank you.
  • x

    Xi Cheng

    10/18/2022, 1:30 AM
    Hey, I may need some clarification on the Flink SQL grouping window trigger frequency: for the SESSION(time_attr, interval) grouping window function https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-agg/#group-window-aggregation, it seems that the evaluation is only triggered upon window expiration; is that the case? It also seems that custom triggers are not supported (https://issues.apache.org/jira/browse/FLINK-16868).
  • s

    Sachin Saikrishna Manikandan

    10/18/2022, 6:41 AM
    Hello team, we are using Flink in our production deployment, with the MySQL JDBC connector as a lookup join source. We have an incoming stream of around 50K messages per second, which we join with the MySQL table to filter the stream. When inspecting the Flink flamegraph UI, we are completely busy on the task that calls the lookup join, where the socket.read() calls take the most time. This doesn't happen in other environments where the load is comparatively less, around 30K. Could this be a genuine load issue? If so, are there optimizations we can apply? We have enabled the lookup cache as well, along with a TTL.
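    For reference, a sketch of the JDBC lookup-cache options on the dimension table (option names are from the JDBC connector; URL, table, and sizes are placeholders), since an undersized cache or short TTL would keep most lookups going to MySQL:
    Copy code
    CREATE TEMPORARY TABLE dim_filter (
      id BIGINT,
      allowed BOOLEAN,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'jdbc',
      'url' = 'jdbc:mysql://mysql-host:3306/db',
      'table-name' = 'dim_filter',
      'lookup.cache.max-rows' = '100000',
      'lookup.cache.ttl' = '10min',
      'lookup.max-retries' = '3'
    );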
  • m

    Maher Turifi

    10/18/2022, 12:12 PM
    Hi, I want to ask about using Scala or Java code in my PyFlink application. I'm using Kinesis Data Analytics on Flink to run my application, which is written in Python. I'm trying to do a processing-time temporal join, and with the Flink version supported in AWS (1.13.2) I have to use the temporal table function syntax, as follows:
    Copy code
    SELECT 
      o_amount, r_rate
    FROM
      Orders,  LATERAL TABLE (Rates(o_proctime))
    WHERE
      r_currency = o_currency
    Assuming Rates is a temporal table function, which can only be created in Scala or Java since ".createTemporalTableFunction()" is still not supported in the Python API, it then registers the function in the Table API. I've created Rates (the temporal table function) in Scala and registered it, and now I'm able to use it in SQL (as in the above statement). I ran all of the statements in a Zeppelin Notebook, where I can mix different APIs (I'm using SQL to create the dynamic tables, Scala to create the temporal table function, and Python for creating the UDFs). However, my question is: can you give me any guidance on how to create this temporal table function (Scala code) and use it in my PyFlink application in Kinesis? Can I call the Scala environment inside a PyFlink application? I will appreciate any guidance, links, or instructions. Many thanks.
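    A sketch of the Scala side only, assuming a registered RatesHistory table with r_currency, r_rate, and a processing-time attribute r_proctime (how to hand this over from a PyFlink program in KDA is the open part of the question):
    Copy code
    import org.apache.flink.table.api._

    // derive the temporal table function from the history table
    val rates = tEnv
      .from("RatesHistory")
      .createTemporalTableFunction($"r_proctime", $"r_currency")

    // temporal table functions still go through the old registerFunction call in 1.13
    tEnv.registerFunction("Rates", rates)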
  • s

    Sachin Saikrishna Manikandan

    10/18/2022, 12:55 PM
    What is the recommended way to enrich an incoming Kafka stream with records from a MySQL table? Is it better to join on the primary key and create a temporary view, or are there other alternatives?
  • m

    M Harsha

    10/18/2022, 1:12 PM
    Hi all, is there a way a job submitted via the SQL client can be restarted from a savepoint? I didn't find it in the official documentation. I tried this, but it didn't work: https://stackoverflow.com/questions/66696413/flink-job-submitted-from-sql-client-sh-how-to-resume-from-savepoint. Any leads on how to achieve this?
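    For reference, a hedged sketch of the option the 1.15 SQL client understands (the savepoint path is a placeholder):
    Copy code
    -- in the SQL client, before submitting the INSERT job
    SET 'execution.savepoint.path' = 's3://bucket/savepoints/savepoint-xxxx';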
  • d

    Denis Cosmin

    10/18/2022, 1:22 PM
    Hello everyone! Does the Flink operator support application parameters for jobs? I have the below YAML file and I want to pass --kafka_group_id test to the job:
    Copy code
    job:
        jarURI: local:///opt/flink/usrlib/my-flink-job.jar
        state: running
        parallelism: 2
        upgradeMode: last-state
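    For reference, the FlinkDeployment job spec also takes an args list that is handed to the program's main method; a sketch with the flag from above:
    Copy code
    job:
        jarURI: local:///opt/flink/usrlib/my-flink-job.jar
        args: ["--kafka_group_id", "test"]
        state: running
        parallelism: 2
        upgradeMode: last-state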
  • k

    Kosta Sovaridis

    10/18/2022, 2:18 PM
    Hi everyone, I am having issues trying to use the Avro & Avro Confluent registry connectors; it seems like the dependencies are not found inside my Docker image. It works locally, but once I try to dockerize and run it, I keep getting class-not-found errors. Here is my Gradle file:
    Copy code
    // ...
    configurations {
        flinkShadowJar // dependencies which go into the shadowJar
        // always exclude these (also from transitive dependencies) since they are provided by Flink
        flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
        flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
        flinkShadowJar.exclude group: 'org.slf4j'
        flinkShadowJar.exclude group: 'org.apache.logging.log4j'
    }
    
    
    dependencies {
        implementation "org.apache.flink:flink-table-api-java-bridge:${flinkVersion}"
        implementation "org.apache.flink:flink-table-api-java:${flinkVersion}"
        implementation "org.apache.flink:flink-clients:${flinkVersion}"
        // allows using Flink's web UI when running in the IDE:
        implementation "org.apache.flink:flink-runtime-web:${flinkVersion}"
        implementation "org.apache.flink:flink-table-planner-loader:${flinkVersion}"
        implementation "org.apache.flink:flink-table-runtime:${flinkVersion}"
    
        implementation group: 'org.postgresql', name: 'postgresql', version: '42.5.0'
    
        compileOnly "org.projectlombok:lombok:${lombokVersion}"
        annotationProcessor "org.projectlombok:lombok:${lombokVersion}"
    
        // --------------------------------------------------------------
        // Dependencies that should be part of the shadow jar, e.g.
        // connectors. These must be in the flinkShadowJar configuration!
        // --------------------------------------------------------------
        flinkShadowJar "org.apache.flink:flink-connector-kafka:${flinkVersion}"
        flinkShadowJar "org.apache.flink:flink-connector-jdbc:${flinkVersion}"
        flinkShadowJar "org.apache.flink:flink-avro-confluent-registry:${flinkVersion}"
        flinkShadowJar "org.apache.flink:flink-avro:${flinkVersion}"
    
    
        runtimeOnly "org.apache.logging.log4j:log4j-slf4j-impl:${log4jVersion}"
        runtimeOnly "org.apache.logging.log4j:log4j-api:${log4jVersion}"
        runtimeOnly "org.apache.logging.log4j:log4j-core:${log4jVersion}"
    
        testImplementation "org.apache.flink:flink-test-utils:${flinkVersion}"
        testImplementation "org.apache.flink:flink-table-test-utils:${flinkVersion}"
        testImplementation 'org.hamcrest:hamcrest-all:1.3'
        testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.1'
        testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.1'
    
        testCompileOnly "org.projectlombok:lombok:${lombokVersion}"
        testAnnotationProcessor "org.projectlombok:lombok:${lombokVersion}"
    }
    // ...
    Dockerfile:
    Copy code
    FROM gradle:7.4.2-jdk11 AS builder
    
    COPY ./build.gradle /opt/build.gradle
    COPY ./settings.gradle /opt/settings.gradle
    COPY ./src /opt/src
    RUN cd /opt; gradle installShadowDist;
    
    FROM apache/flink:1.15.0-java11
    
    RUN mkdir /opt/flink/usrlib
    
    ADD --chown=flink:flink https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.15.0/flink-sql-connector-kafka-1.15.0.jar /opt/flink/lib/
    ADD --chown=flink:flink https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/1.15.0/flink-connector-jdbc-1.15.0.jar /opt/flink/lib/
    ADD --chown=flink:flink https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro/1.15.0/flink-avro-1.15.0.jar /opt/flink/lib/
    ADD --chown=flink:flink https://repo.maven.apache.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.15.0/flink-avro-confluent-registry-1.15.0.jar /opt/flink/lib/
    ADD --chown=flink:flink https://repo.maven.apache.org/maven2/org/apache/avro/avro/1.10.0/avro-1.10.0.jar /opt/flink/lib/
    ADD --chown=flink:flink https://jdbc.postgresql.org/download/postgresql-42.5.0.jar /opt/flink/lib/
    ADD --chown=flink:flink https://repo.maven.apache.org/maven2/com/fasterxml/jackson/core/jackson-databind/2.11.0/jackson-databind-2.11.0.jar /opt/flink/lib/
    ADD --chown=flink:flink https://repo.maven.apache.org/maven2/com/fasterxml/jackson/core/jackson-core/2.11.0/jackson-core-2.11.0.jar /opt/flink/lib/
    ADD --chown=flink:flink https://repo.maven.apache.org/maven2/com/fasterxml/jackson/core/jackson-annotations/2.11.0/jackson-annotations-2.11.0.jar /opt/flink/lib/
    ADD --chown=flink:flink https://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/6.2.2/kafka-schema-registry-client-6.2.2.jar /opt/flink/lib/
    ADD --chown=flink:flink https://repo.maven.apache.org/maven2/com/google/guava/guava/30.1-jre/guava-30.1-jre.jar /opt/flink/lib/
    ADD --chown=flink:flink https://packages.confluent.io/maven/org/apache/kafka/kafka-clients/6.2.2-ccs/kafka-clients-6.2.2-ccs.jar /opt/flink/lib/
    
    COPY --from=builder /opt/build/libs/flink*.jar /opt/flink/usrlib/spend-report.jar
    Error:
    Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.formats.avro.AvroRowDataDeserializationSchema
    ClassLoader info: URL ClassLoader:
    file: 'usrlib/spend-report.jar' (missing)
    Class not resolvable through given classloader.
    I am really not sure what I am doing wrong. I also have trouble understanding why I need to declare the dependency in Gradle as well as download it into the container.
  • t

    Tim Bauer

    10/18/2022, 3:43 PM
    Question on Table API window triggers. I'm trying out a simple aggregation: counting rows in a sliding window of 1 minute every 30 seconds. The watermark is set to rowtime - INTERVAL '10' SECOND. My understanding is that each window should be triggered 10 seconds after it ends. When I start my application connected to a local Kafka with a dummy event producer emitting every 10s, nothing happens, not after 1:10 and not later either. 😖
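    For reference, a sketch of that aggregation as a HOP window TVF, assuming a table events whose rowtime attribute already carries the 10-second watermark (names are placeholders); with event time, a window only emits once the watermark passes its end, so the source has to keep producing rows with advancing timestamps:
    Copy code
    SELECT
      window_start,
      window_end,
      COUNT(*) AS cnt
    FROM TABLE(
      HOP(TABLE events, DESCRIPTOR(rowtime), INTERVAL '30' SECOND, INTERVAL '1' MINUTE))
    GROUP BY window_start, window_end;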