# troubleshooting
  • Richard Noble
    04/05/2023, 11:22 AM
    Any ideas?
  • piby 180
    04/05/2023, 11:49 AM
    Hi, I am trying to run a simple wordcount example using Flink + Beam on Kubernetes with the Flink Kubernetes Operator. I am getting this error and don't know what it means:
    org.apache.flink.client.deployment.application.ApplicationExecutionException: The application contains no execute() calls.
    Could someone provide me a hint?
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: beamapp
    spec:
      image: beamapp:latest
      imagePullPolicy: Always
      flinkVersion: v1_15
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: "1"
      serviceAccount: flink
      jobManager:
        resource:
          memory: "2048m"
          cpu: 1
      taskManager:
        resource:
          memory: "2048m"
          cpu: 1
      job:
        jarURI: local:///opt/flink/opt/flink-python_2.12-1.16.1.jar # Note, this jarURI is actually a placeholder
        entryClass: "org.apache.flink.client.python.PythonDriver"
        args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/main.py"]
        parallelism: 1
        upgradeMode: stateless
    main.py
    import re
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    
    input_file = '/opt/flink/usrlib/kinglear.txt'
    output_path = '/opt/flink/usrlib/output.txt'
    
    pipeline_options = PipelineOptions(
        runner='FlinkRunner',
        project='my-project-id',
        job_name='unique-job-name',
    )
    
    def main():
        with beam.Pipeline(options=pipeline_options) as p:
            output = (p | 'Read lines' >> beam.io.ReadFromText(input_file)
                  | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
                  | 'Combine per element' >> beam.combiners.Count.PerElement()
                  | 'Convert to string' >> beam.MapTuple(lambda word, count: '%s: %s' % (word, count))
                  | 'Write output to file' >> beam.io.WriteToText(output_path))
      
      
      
    main()
    Dockerfile
    FROM flink:1.15
    
    # Install Python 3.7 from source: Debian 11 ships Python 3.9, but PyFlink
    # currently only supports Python 3.6, 3.7 and 3.8 officially.
    
    RUN apt-get update -y && \
    apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev libffi-dev && \
    wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \
    tar -xvf Python-3.7.9.tgz && \
    cd Python-3.7.9 && \
    ./configure --without-tests --enable-shared && \
    make -j6 && \
    make install && \
    ldconfig /usr/local/lib && \
    cd .. && rm -f Python-3.7.9.tgz && rm -rf Python-3.7.9 && \
    ln -s /usr/local/bin/python3 /usr/local/bin/python && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*
    
    # Install dependencies
    COPY requirements.txt requirements.txt
    RUN pip3 install -r requirements.txt
    
    # add python script
    USER flink
    RUN mkdir /opt/flink/usrlib
    RUN mkdir /opt/flink/usrlib/src
    COPY . /opt/flink/usrlib/
    requirements.txt
    apache-flink==1.15
    apache_beam==2.27.0
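    Not a fix for the operator error itself, but a minimal local sanity check (a sketch, assuming apache_beam is installed on your workstation): run the same pipeline with Beam's DirectRunner, with no Flink involved, to separate pipeline bugs from deployment problems. File paths here are hypothetical local ones.
    # Hedged sketch: the same word count on Beam's DirectRunner (no Flink involved).
    import re

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions


    def run_local(input_file='kinglear.txt', output_path='counts'):
        options = PipelineOptions(runner='DirectRunner')
        with beam.Pipeline(options=options) as p:
            (p
             | 'Read lines' >> beam.io.ReadFromText(input_file)
             | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r"[A-Za-z']+", x))
             | 'Combine per element' >> beam.combiners.Count.PerElement()
             | 'Convert to string' >> beam.MapTuple(lambda word, count: '%s: %s' % (word, count))
             | 'Write output to file' >> beam.io.WriteToText(output_path))


    if __name__ == '__main__':
        run_local()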
  • Raghunadh Nittala
    04/05/2023, 2:02 PM
    Hi everyone, I'm using the 'filesystem' connector to sink data into S3 in 'parquet' format using the Table API. What I observed is that the PARTITIONED BY columns are missing from the parquet files. Here are the queries I'm using:
    CREATE TABLE data_to_sink (
      record_id STRING NOT NULL,
      request_id STRING NOT NULL,
      source_name STRING NOT NULL,
      event_type STRING NOT NULL,
      event_name STRING NOT NULL,
      `date` STRING,
      results_count BIGINT
    ) PARTITIONED BY (record_id, source_name, `date`) WITH (
      'connector' = 'filesystem',
      'path' = '<S3 path>',
      'format' = 'parquet'
    );
    INSERT INTO data_to_sink
    SELECT record_id, request_id, source_name, event_type, event_name,
    DATE_FORMAT(TUMBLE_END(proc_time, INTERVAL '2' MINUTE), 'yyyy-MM-dd') AS record_date, COUNT(*) results_count
    FROM data_from_source
    GROUP BY record_id, request_id, source_name, event_type, event_name, TUMBLE(proc_time, INTERVAL '2' MINUTE);
    I can see the parquet files being created, but when I verified the schema using the parquet-cli tool, it doesn't show the record_id, source_name, and `date` fields. I checked the docs but didn't find any setting for this. Is this expected?
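    A hedged illustration rather than a confirmed answer: with a partitioned filesystem sink the partition columns normally end up encoded in the directory names (Hive-style record_id=.../source_name=.../date=... paths) rather than inside the parquet payload, so parquet-cli on a single file only shows the non-partition columns, while reading the path back through the same partitioned DDL restores them. A minimal PyFlink read-back sketch, with the S3 path left as a placeholder:
    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
    t_env.execute_sql("""
        CREATE TABLE data_read_back (
          record_id STRING,
          request_id STRING,
          source_name STRING,
          event_type STRING,
          event_name STRING,
          `date` STRING,
          results_count BIGINT
        ) PARTITIONED BY (record_id, source_name, `date`) WITH (
          'connector' = 'filesystem',
          'path' = '<S3 path>',
          'format' = 'parquet'
        )
    """)
    # record_id, source_name and `date` are filled in from the partition directories,
    # not from the parquet files themselves.
    t_env.execute_sql("SELECT * FROM data_read_back").print()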
  • Mehul Batra
    04/05/2023, 4:06 PM
    Hi team, we faced the error below: Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 15 record(s) for topic-4:120000 ms has passed since batch creation. Will Flink retry to send those events, or does it just drop them and move forward?
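    From what I understand (hedged, please verify against your setup): when the producer hits this expiry the sink throws, the task fails, and with checkpointing enabled the job restarts and replays from the last checkpoint, so the records are retried through replay rather than silently dropped. The usual mitigation is to give the producer more time before it expires batches. A sketch using the legacy PyFlink FlinkKafkaProducer (matching the connector in the stack trace) with standard Kafka producer configs; broker and topic names are placeholders:
    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors import FlinkKafkaProducer

    env = StreamExecutionEnvironment.get_execution_environment()

    producer_config = {
        'bootstrap.servers': 'broker:9092',   # placeholder
        'delivery.timeout.ms': '300000',      # let batches wait longer before being expired client-side
        'request.timeout.ms': '60000',
        'linger.ms': '100',
    }
    sink = FlinkKafkaProducer(
        topic='topic',                        # placeholder
        serialization_schema=SimpleStringSchema(),
        producer_config=producer_config,
    )
    env.from_collection(['a', 'b', 'c']).add_sink(sink)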
  • Dominik Prester
    04/05/2023, 6:47 PM
    Hello friends, I have a timestamp in Zulu format (e.g. 2023-04-05T11:40:00Z) stored as a STRING column and I tried casting it to the TIMESTAMP type with:
    • TO_TIMESTAMP(<col_name>, '%Y-%m-%dT%H:%M:%SZ')
    • TO_TIMESTAMP(<col_name>, 'yyyy-MM-ddTHH:mm:ssZ')
    but both result in NULL values. What am I missing?
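    A hedged sketch of two options (please double-check against your Flink version): TO_TIMESTAMP's format argument follows Java date-format patterns, so the literal T and Z have to be quoted (with the single quotes doubled inside the SQL string literal); alternatively, normalize the string first so the default 'yyyy-MM-dd HH:mm:ss' format applies. Table and column names below are placeholders:
    from pyflink.table import EnvironmentSettings, TableEnvironment

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    parsed = t_env.sql_query("""
        SELECT
          -- option 1: strip the literal 'T' and 'Z' and rely on the default format
          TO_TIMESTAMP(REPLACE(REPLACE(ts_col, 'T', ' '), 'Z', '')) AS ts_a,
          -- option 2: quote the literals in the pattern
          TO_TIMESTAMP(ts_col, 'yyyy-MM-dd''T''HH:mm:ss''Z''') AS ts_b
        FROM my_table
    """)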
  • Ivan Webber
    04/05/2023, 9:06 PM
    I am trying to use Flink 1.17.0 to write to ADLS Gen 2. I've tried building the container from the latest code on the release-1.17 branch, and also using the stock 1.17 container with just the Azure plugin jar replaced by my own build that includes the fix for FLINK-31612. I am getting the following error and wondering if anyone could point me in the right direction:
    org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "file"
    The full stack trace (see my reply) makes me wonder if it is trying to allocate a local temporary file and failing. I am using the flink-operator and running on an AKS cluster.
  • craig fan
    04/05/2023, 9:11 PM
    Hello, more of some generic questions:
    • What happens if you change the source operator from FlinkKafkaConsumer to KafkaSource? How does Flink handle this? We did not change the name or UID of the operator.
      ◦ Asking because I was playing with some of our checkpoints and noticed what appear to be different naming conventions for different sources:
      ◦ https://github.com/apache/flink/blob/d4e3b6646f389eeb395a4dbb951d13bab02cb8db/flin[…]va/org/apache/flink/streaming/api/operators/SourceOperator.java
      ◦ https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka[…]he/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
    • Does changing versions affect anything related to this?
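    Not authoritative, but as far as I know the two sources keep incompatible operator state, so the usually documented migration is to give the new KafkaSource a fresh UID, start it from the committed group offsets, and restore the savepoint with allowNonRestoredState so the old FlinkKafkaConsumer state is dropped. A PyFlink-flavoured sketch of that wiring; broker, topic, group and UID names are placeholders:
    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.common.watermark_strategy import WatermarkStrategy
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaSource

    env = StreamExecutionEnvironment.get_execution_environment()

    source = (KafkaSource.builder()
              .set_bootstrap_servers('broker:9092')
              .set_topics('topic')
              .set_group_id('my-group')
              # pick up where the old consumer's committed offsets left off
              .set_starting_offsets(KafkaOffsetsInitializer.committed_offsets())
              .set_value_only_deserializer(SimpleStringSchema())
              .build())

    stream = (env.from_source(source, WatermarkStrategy.no_watermarks(), 'Kafka Source')
                 .uid('kafka-source-v2'))  # new UID, so Flink does not look for the old operator state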
  • Herat Acharya
    04/06/2023, 12:22 AM
    Hello, is there any documentation on how to set up a reverse proxy using NGINX to access the Flink REST API? I have Flink in Kubernetes and I want to do something like this: https://stackoverflow.com/questions/56965460/how-implement-reverse-proxy-on-nginx-to-add-authentication-on-flink-job-manager Are there any examples?
  • kingsathurthi
    04/06/2023, 6:14 AM
    📌 What is the best practice for running an Istio proxy with the Flink Kubernetes operator? We are facing a lot of issues due to the Istio proxy container, and only disabling Istio injection makes the application work fine. Are there any specific Flink ports to exclude (inbound/outbound) in the Istio proxy?
  • Ari Huttunen
    04/06/2023, 7:47 AM
    I see that for the streaming file sink you can
    OutputFileConfig config = OutputFileConfig
     .builder()
     .withPartPrefix("prefix")
     .withPartSuffix(".ext")
     .build();
    but I have written the file sink using SQL syntax. How can I make this generate files with the suffix .parquet?
    CREATE TABLE save_polystar_aggregates_core (
    ...
    ) 
    PARTITIONED BY (EVENT_DAY, EVENT_HOUR)
    WITH (
      'connector' = 'filesystem',
      'path' = 's3://.../core',
      'format' = 'parquet',
      'sink.partition-commit.delay'='1 h',
      'sink.partition-commit.policy.kind'='success-file',
      'sink.partition-commit.trigger'='partition-time'
    );
    Also, can I change the directory names from
    path
    └── EVENT_DAY=2019-08-25
    to
    path
    └── 2019-08-25
    ?
  • Tsering
    04/06/2023, 1:01 PM
    Good morning, afternoon and evening! My Flink app works fine when I use single parallelism, but when I run it with more than one it throws this exception:
    key group from 0 to 6 does not contain 120.........
    Does anyone know what exactly is happening? Thank you in advance 🙏.
  • Krzysztof Chmielewski
    04/06/2023, 2:56 PM
    Hi, were there any specific tickets/changes for memory footprint optimization between Flink 1.13 and 1.16.1? A user is reporting a case where they observed an OOM (expected) on both versions, but on 1.16.1 the OOM happens much later compared to 1.13. It's a streaming job. Thanks.
  • Felix Terkhorn
    04/06/2023, 3:05 PM
    Hi, this is more of a general sanity check than a specific bug or small-scale problem. Hopefully I have the right channel! We are considering using Flink's Timer capabilities in the following way: if our project is successful, we'll have potentially billions of individual states that may need to use processElement/onTimer to schedule events into the future, potentially the far future, like a year out. Is Flink considered reliable for this type of long-term, delayed processing? Running a proof of concept with delays of a few seconds was super easy and successful, and on my local machine I pushed a ton of events through. But testing these sorts of very long-term delays isn't feasible, so we're really interested to hear feedback from the community about when (in time), and if, such Timer usage would begin to break down. How does Flink perform when scheduling things over multiple days and months? Are there additional best practices and considerations we should adhere to if we're using `Timer`s that are dated for execution far into the future? Are there any concerns around prioritization of the `Timer`s firing at their due date, in the case where individual jobs experience high levels of traffic and contention? Thanks very much for taking a look at my question!
  • Elizaveta Batanina
    04/06/2023, 3:34 PM
    Hi! I have encountered a weird error when using a Flink UDF in the schema builder method column_by_expression:
    org.apache.flink.table.api.TableException: Expression 'parse_bq_datetime(window_start)' is not string serializable. Currently, only expressions that originated from a SQL expression have a well-defined string representation.
    Has anyone faced this problem? I will add more details in the thread. Thanks!
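    Going only by what the error message itself says (only expressions that originate from SQL have a well-defined string representation), a hedged workaround sketch: register the UDF under a name and pass column_by_expression a SQL string instead of a Python expression object. The function and window_start column come from the message above; everything else is hypothetical:
    from pyflink.table import DataTypes, EnvironmentSettings, Schema, TableEnvironment
    from pyflink.table.udf import udf

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

    @udf(result_type=DataTypes.TIMESTAMP(3))
    def parse_bq_datetime(s):
        return None  # your parsing logic goes here

    # make the function addressable from SQL text
    t_env.create_temporary_system_function('parse_bq_datetime', parse_bq_datetime)

    schema = (Schema.new_builder()
              .column('window_start', DataTypes.STRING())
              # a plain SQL string, which does have a well-defined string representation
              .column_by_expression('event_ts', 'parse_bq_datetime(window_start)')
              .build())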
  • Nancy Yang
    04/06/2023, 3:47 PM
    Hi, I am having an issue writing checkpoint data out to a volume on k3d/k3s. I am using this example yaml file: https://github.com/apache/flink-kubernetes-operator/blob/main/examples/basic-checkpoint-ha.yaml The problem is with the "volumes" part: it complains that /tmp/flink is not a directory. If I comment out "type: Directory", the Flink app starts to run but fails with the following error:
    Caused by: java.io.IOException: Failed to create directory for shared state: file:/flink-data/checkpoints/0fd46de24a2483b788f329c179239967/shared
    If I access the pod k3d-k3s-default-server-0, I can see that "/tmp/flink" was created. Has anybody hit the same issue when deploying to a k3d/k3s cluster running in Docker? Thanks!
  • Adam Augusta
    04/06/2023, 7:20 PM
    I get the impression that a UDF AggregateFunction will never retract rows, correct? Even if all records are retracted from the group. So if I want even a scalar streaming aggregation to behave "correctly" in this sense, I'll need a TableAggregateFunction, yeah?
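    For what it's worth (hedged, and it may not cover the "all rows retracted" case meant here): an aggregate UDF can declare a retract method, which the planner calls when the upstream changelog sends retraction rows, e.g. under non-windowed group aggregates. A small PyFlink sketch of the shape:
    from pyflink.common import Row
    from pyflink.table import AggregateFunction, DataTypes
    from pyflink.table.udf import udaf

    class CountAgg(AggregateFunction):

        def create_accumulator(self):
            return Row(0)

        def get_value(self, accumulator):
            return accumulator[0]

        def accumulate(self, accumulator, *args):
            accumulator[0] += 1

        def retract(self, accumulator, *args):
            # invoked for retraction rows coming from upstream
            accumulator[0] -= 1

    count_rows = udaf(CountAgg(),
                      result_type=DataTypes.BIGINT(),
                      accumulator_type=DataTypes.ROW([DataTypes.FIELD('cnt', DataTypes.BIGINT())]))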
  • Jalil Alchy
    04/06/2023, 8:49 PM
    Hey folks, currently seeing this kind of error:
    Sink: Committer (1/1)#0 (0c5abffd2f065e1edef3036b20ec42a5) switched from RUNNING to FAILED with failure cause: java.nio.file.AccessDeniedException: <path>/part-8ad796e1-3c85-4b54-9eed-eaaf06621185-0.snappy.parquet: initiate MultiPartUpload on <path>/part-8ad796e1-3c85-4b54-9eed-eaaf06621185-0.snappy.parquet: com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: KC4SFHVXBGJC6XFW; S3 Extended Request ID: VQXppEZpXzKyDoc7u32AHkMAr608cMLxLOh4QIWwZVWIg0JBYgkMTxAT4M6+xmyVrzmMmnyMcUucHlBZmFBzSs/GrYJ+LmmlADwr4wjTjDs=; Proxy: null), S3 Extended Request ID: VQXppEZpXzKyDoc7u32AHkMAr608cMLxLOh4QIWwZVWIg0JBYgkMTxAT4M6+xmyVrzmMmnyMcUucHlBZmFBzSs/GrYJ+LmmlADwr4wjTjDs=:AccessDenied
    My sink is configured like so:
    return DeltaSink.forRowData(
                            new Path("s3a://<bucket/<path>"),
                            new Configuration() {
                                {
                                    set(
                                            "spark.hadoop.fs.s3a.aws.credentials.provider",
                                            "com.amazonaws.auth.DefaultAWSCredentialsProviderChain");
                                }
                            },
                            FULL_SCHEMA_ROW_TYPE)
                    .withMergeSchema(true)
                    .build();
    Writes to my local filesystem work, but writes to S3 are failing. The credentials on the machine are admin for the account. Any thoughts?
  • Dmitry Koudryavtsev
    04/07/2023, 9:32 AM
    Hi! We got the following exception:
    java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults of globally-terminated jobs from JobResultStore
       at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
       at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
       at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
       at java.base/java.lang.Thread.run(Unknown Source)
    Caused by: org.apache.flink.util.FlinkRuntimeException: Could not retrieve JobResults of globally-terminated jobs from JobResultStore
       at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:196)
       at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198)
       at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188)
       ... 4 more
    Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: No content to map due to end-of-input
     at [Source: (org.apache.flink.core.fs.local.LocalDataInputStream); line: 1, column: 0]
       at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:59)
       at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4688)
       at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4586)
       at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3585)
       at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:190)
       at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118)
       at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100)
       at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194)
       ... 6 more
    We removed the empty ..._DIRTY.json file from the /flink/recovery/job-result-store/<our cluster ID>/ dir (local filesystem) and Flink now starts successfully. Flink version: 1.15.3
    Is it possible to validate the dirty job result file (..._DIRTY.json) length/content before opening it, and/or overwrite it automatically if the file is broken?
  • Max Dubinin
    04/07/2023, 10:59 AM
    Hey guys, is there an example or a clear explanation anywhere of streaming POJOs to Parquet without Avro?
  • nick christidis
    04/07/2023, 2:20 PM
    Hello, apologies in advance for the lengthy question.
    I have a problem and I need some opinions on how to deal with it; the least intrusive, the better. I have some re-design ideas, but I would like to avoid those, as they have an impact and, most importantly, there are other political reasons that will produce problems if I push for a re-design.
    Intro to the problem: for our business need, we wanted the KafkaSource to read from a topic only if a predicate is satisfied, and that predicate can change (true/false) during the run of the job. For example, in pseudo code:
    void kafkaConsumeLogic() {
    
      kafkaSource.run();
      boolean kafkaSourcePaused = false;
    
      boolean okState;
    
      while (true) {
        
        okState = areWeInDesiredState();
        
        if (!okState && !kafkaSourcePaused) {
          kafkaSource.pause();
          kafkaSourcePaused = true;
        }
    
        if (okState && kafkaSourcePaused) {
          kafkaSource.resume();
          kafkaSourcePaused = false;
        }
    
        paceWait();
      }
    
    }
    So, we have achieved the above by leveraging HybridSource (https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/hybridsource/) and using the first registered source as a predicate-check bounded source which:
    • checks for that areWeInDesiredState() predicate/condition, and if all is ok...
    • ...returns InputStatus.END_OF_INPUT from the pollNext method of the SourceReader it provides, so the next registered source, the KafkaSource, takes over.
    So the above pseudo code has been implemented like the following:
    class PredicateSourceReader<T> implements SourceReader<T, MySourceSplit> {
    
       ....
    
        @Override
        public InputStatus pollNext(ReaderOutput<T> output) {
            if (areWeInDesiredState()) {
                return InputStatus.END_OF_INPUT;
            }
            return InputStatus.MORE_AVAILABLE;
        }
    
      .....
    
    }
    and the wiring:
    PredicateSourceReader<SomeType> sourceActivator = new PredicateSourceReader<>(....);
    Source<SomeType> initialSource = new Source<>(sourceActivator);
    
    KafkaSource<SomeType> kafkaSource = buildKafkaSource(....);
    
    HybridSource<SomeType> hybridsource = HybridSource
                    .builder(initialSource)
                    .addSource(context -> kafkaSource, Boundedness.CONTINUOUS_UNBOUNDED)
                    .build();
    Problem: unfortunately the above is not the most elegant or correct usage of HybridSource in my eyes, but more importantly it does not work in the following case, which is where I need some opinions (or tell me if I am missing something). Based on the above, when the condition/predicate is satisfied we switch to the Kafka source. But there is another business need: when we identify another bad condition/predicate we fail on purpose with an exception, and due to the restart strategy (exponential) and failover strategy (region) this HybridSource gets restarted. Because it is re-initialized, it should go back to the first initialSource and the KafkaSource should not run, but this is not the case; we see Kafka consumption still taking place. I have some ideas in mind, but first I want to understand why, after the region restart, the KafkaSource is still open and not closed. I also tried to extend KafkaSourceReader and wire the logic into its pollNext method, but KafkaSource is very closed to extension and only accessible through KafkaSourceBuilder, which from a framework user's point of view does not give a lot of options.
  • Adesh Dsilva
    04/07/2023, 5:27 PM
    Hi, I am running into an index-out-of-bounds issue reported here: https://stackoverflow.com/questions/65342630/flink-sql-read-hive-table-throw-java-lang-arrayindexoutofboundsexception-1024/65351402#65351402 Has this been fixed? If yes, how can I configure it in the ORC reader? This is my reader code:
    tableEnv.createTemporaryTable("myTable", TableDescriptor.forConnector("filesystem")
            .schema(schema)
            .option("path", "path-to-file.orc")
            .format("orc")
            .build());
    The exception I get:
    Caused by: java.lang.ArrayIndexOutOfBoundsException: 1024
    	at org.apache.orc.impl.TreeReaderFactory$TreeReader.nextVector(TreeReaderFactory.java:255)
    	at org.apache.orc.impl.TreeReaderFactory$DoubleTreeReader.nextVector(TreeReaderFactory.java:762)
    	at org.apache.orc.impl.ConvertTreeReaderFactory$DecimalFromDoubleTreeReader.nextVector(ConvertTreeReaderFactory.java:1297)
  • Adam Augusta
    04/07/2023, 7:56 PM
    The reference docs and javadocs both state that my TableAggregateFunction needs emitValue or emitUpdateWithRetract. But if I implement the latter and not the former, I get a validation error. If I implement both, only emitValue is used. Poking around the Flink source code, ImperativeAggCodeGen seems agnostic to emitUpdateWithRetract. Alarmingly, there's not a single Flink test that uses a class with an emitUpdateWithRetract method. Can anyone shed light on my confusion?
  • Duc Anh Khu
    04/07/2023, 10:19 PM
    Hi, I'm using PyFlink 1.13.2 and running into an odd issue with KeyedProcessFunction in unit tests. When unit testing the function on its own (from_collection source and test sink), everything works fine. However, when it gets mixed into a bigger program, only process_element is being called and on_timer is not being called at all. The sources and sink in these two cases are exactly the same (from_collection and test sink). What are the possible causes of on_timer not being called?
    Update: in a bigger program with 2 KeyedProcessFunctions, only the 1st function's on_timer is called. The 2nd function's is not.
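    A hedged guess based only on the symptoms: if these are event-time timers, on_timer fires only after the watermark passes the registered timestamp, so in the bigger program everything between the source and the second KeyedProcessFunction has to receive and forward watermarks (an idle parallel subtask or a missing assigner is enough to hold them back). Making the watermarks explicit on the test stream, with hypothetical (key, epoch_millis) elements:
    from pyflink.common.time import Duration
    from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy
    from pyflink.datastream import StreamExecutionEnvironment

    class SecondFieldTimestampAssigner(TimestampAssigner):
        def extract_timestamp(self, value, record_timestamp):
            return value[1]  # assumes (key, epoch_millis) tuples

    env = StreamExecutionEnvironment.get_execution_environment()
    stream = (env
              .from_collection([('a', 1000), ('a', 2000), ('b', 3000)])
              .assign_timestamps_and_watermarks(
                  WatermarkStrategy
                  .for_bounded_out_of_orderness(Duration.of_seconds(1))
                  .with_timestamp_assigner(SecondFieldTimestampAssigner())))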
  • Alex Brekken
    04/08/2023, 1:56 PM
    Hey all, I'm trying out the K8s operator and trying to deploy a simple job. For now, I've deployed the "basic-session-deployment" (https://github.com/apache/flink-kubernetes-operator/blob/main/examples/basic-session-deployment-only.yaml) and then uploaded the jar file for my job using the "Submit new job" section of the web UI. However, I expected to see that jar file in the filesystem of the deployment pod, but I can't find it. My next step was to deploy the job (using the FlinkSessionJob CRD) and reference the location of my jar file via the jarURI. So, first question: any ideas where I should be looking for my jar file? And second, is this the right way to handle deployments? I'll be deploying several jobs eventually, and assumed I would want to separate the FlinkDeployment from the actual jobs (but maybe that's not the best way?).
  • Maksim Aniskov
    04/08/2023, 7:19 PM
    Hi all, why do I see only the records-lag-max metric for my Kafka consumer? Where is the records-lag metric?
  • Tsering
    04/09/2023, 12:55 PM
    Hi all! I am running my Flink app on AWS KDA. There is no problem when running with one KPU, but when I try to run my app with more than one KPU I am facing this error:
    java.lang.IllegalArgumentException: key group from 0 to 6 does not contain 120
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
    at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
    at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
    at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
    at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerEventTimeTimer(WindowOperator.java:922)
    Please let me know if you have any idea what's going on here. I have been stuck for a few days but have found literally no clue.
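    One hedged direction to check, since this class of error usually only shows up once parallelism > 1: "key group from X to Y does not contain Z" is the classic symptom of keys whose hash is not stable across subtasks or invocations (keying by an array, by an object without a deterministic hashCode, or by a value that changes between keying and timer registration). Keying by a plain immutable value such as a string avoids it; a tiny sketch with hypothetical field names (PyFlink, but the same idea applies to the Java API):
    from pyflink.common import Types
    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    events = env.from_collection([{'user_id': 1, 'value': 10}, {'user_id': 2, 'value': 20}])

    # a deterministic, immutable key gives a stable key-group assignment at any parallelism
    keyed = events.key_by(lambda e: str(e['user_id']), key_type=Types.STRING())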
  • Guruguha Marur Sreenivasa
    04/09/2023, 7:10 PM
    Hi all, I'm having issues checking my Flink job logs, both on the dashboard and on Datadog, where the logs are exported. When I click on the task manager's log, I see this error printed on Datadog:
    2023-04-09 19:08:29,483 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler [] - Failed to transfer file from TaskExecutor 100.80.157.200:6122-0224ee.
    I'm using Flink 1.14.3. Any idea why this happens?
  • piby 180
    04/09/2023, 10:58 PM
    Hi, I am struggling to use S3 as a FileSink locally (in Python). I have added the jar list in my code as follows:
    jar_list = """
        file:///home/ubuntu/environment/flink/lib/flink-sql-connector-kafka-1.17.0.jar;
        file:///home/ubuntu/environment/flink/lib/flink-sql-parquet-1.17.0.jar;
        file:///home/ubuntu/environment/flink/lib/flink-connector-files-1.17.0.jar;
        file:///home/ubuntu/environment/flink/lib/flink-s3-fs-hadoop-1.17.0.jar;
        file:///home/ubuntu/environment/flink/lib/hadoop-mapreduce-client-core-3.3.5.jar
    """
    
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    t_env.get_config().set("parallelism.default", "1")
    t_env.get_config().set("pipeline.jars", jar_list)
    t_env.get_config().set("pipeline.classpaths", jar_list)
    t_env.get_config().set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.profile.ProfileCredentialsProvider")
    t_env.get_config().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    I have also read somewhere that the flink-s3-fs-hadoop jar needs to be placed in its own folder under the plugins dir, not in the lib dir. So I have also tried setting up the plugins dir, but it didn't work:
    os.environ["FLINK_PLUGINS_DIR"] = "/home/ubuntu/environment/flink/plugins"
    Here is the error I get:
    Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3a'. The scheme is directly supported by Flink through the following plugin(s): flink-s3-fs-hadoop. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/filesystems/plugins/ for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
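    Following what the exception text itself suggests, a hedged sketch: the S3 filesystem is loaded as a plugin from its own subfolder (not via pipeline.jars), and FLINK_PLUGINS_DIR has to be set before the TableEnvironment and its local cluster are created. Paths are the ones from the message above:
    import os
    import shutil

    plugins_subdir = '/home/ubuntu/environment/flink/plugins/s3-fs-hadoop'
    os.makedirs(plugins_subdir, exist_ok=True)
    shutil.copy('/home/ubuntu/environment/flink/lib/flink-s3-fs-hadoop-1.17.0.jar', plugins_subdir)

    # must point at the parent plugins directory and be set before TableEnvironment.create(...)
    os.environ['FLINK_PLUGINS_DIR'] = '/home/ubuntu/environment/flink/plugins'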
  • Junqi Xie
    04/10/2023, 2:48 AM
    Hi all! I'm learning Statefun with the DataStream API. However, the link in https://nightlies.apache.org/flink/flink-statefun-docs-release-3.2/docs/sdk/flink-datastream/ has expired and the content on this page is quite out of date. I'm wondering if anyone could give a more concrete example of the Statefun DataStream API?