# troubleshooting
  • l

    licho

    04/10/2023, 8:11 AM
    How can I use CEP SQL to implement a one-time timeout match? For example: after receiving event A, if B is not received within 5 minutes, trigger an event.
  • s

    Sameer Chandra

    04/10/2023, 8:39 AM
    Hi all! We are planning to use setUnbounded(OffsetsInitializer) in the Flink Kafka connector so that the Flink job stops reading at a specific timestamp offset on every partition. I have a few questions, since the documentation of Flink's behaviour here is sparse.
    1. I want to process data from Kafka based on timestamp offsets and stop at a specific timestamp offset on each partition. The job will run once per day, read the data ingested into Kafka for that day, and finish once it reaches the end-of-day timestamp offset on all partitions. Because late events arrive and we want to process them, we must keep the state of the window aggregations for at least 3 days. Is there a way to take a savepoint at the end of the job and restore from it the next day with only the unbounded offsets initializer changed?
    2. If so, is there any configuration that lets us take a savepoint once the final offsets are reached?
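For the once-per-day run described above, the start and stop timestamps could be derived as in the sketch below. It rests on assumptions not stated in the thread: the helper name `day_bounds_epoch_ms` is hypothetical, day boundaries are taken in UTC, and the timestamp-based offsets initializer is assumed to take epoch milliseconds. It uses only the Python standard library and does not call Flink or Kafka.

```python
from datetime import date, datetime, time, timedelta, timezone
from typing import Tuple


def day_bounds_epoch_ms(day: date, tz: timezone = timezone.utc) -> Tuple[int, int]:
    """Return (start_ms, end_ms) for `day`.

    end_ms is the first instant of the following day, so it acts as an
    exclusive upper bound for the day's records.
    """
    start = datetime.combine(day, time.min, tzinfo=tz)
    end = start + timedelta(days=1)
    return int(start.timestamp() * 1000), int(end.timestamp() * 1000)


# Hypothetical run for the data ingested on 2023-04-10 (UTC).
start_ms, end_ms = day_bounds_epoch_ms(date(2023, 4, 10))
print(start_ms, end_ms)
```

The two values would then be candidates for the starting and stopping timestamp offsets of that day's run; whether an exclusive or inclusive end is wanted depends on how the connector resolves a timestamp to an offset.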
  • b

    Bharath Reddy

    04/10/2023, 9:15 AM
    Dear Flink team, is it possible to use S3 as a backend for storing the jars and download them during job execution? If so, can someone provide an example of how to do this?
  • p

    piby 180

    04/10/2023, 11:25 AM
    Hey! I want to store streaming data into parquet files on S3. I am testing it with the following code but it is not working. I have double checked the IAM role and permissions and there are no S3 access issues.
    from pyflink.table import EnvironmentSettings, TableEnvironment
    
    
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    source_ddl = f"""
                CREATE TABLE source_table (
                      user_id STRING,
                      order_amount DOUBLE,
                      ts TIMESTAMP(3),
                      WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
                ) WITH (
                  'connector' = 'datagen',
                  'rows-per-second' = '10'
                )
                """
                
    
    
    sink_ddl = f"""
            CREATE TABLE sink_table (
                  user_id STRING,
                  order_amount DOUBLE,
                  ts TIMESTAMP(3),
                  dt STRING
            ) PARTITIONED BY (dt) WITH (
              'connector' = 'filesystem',
              'path' = 's3a://<bucket-name>/flink/data',
              'format' = 'parquet',
              'auto-compaction' = 'true',
              'partition.time-extractor.timestamp-pattern' = '$dt',
              'sink.rolling-policy.file-size'='1MB',
              'sink.rolling-policy.rollover-interval'='60s',
              'sink.partition-commit.delay'='0s',
              'sink.partition-commit.trigger'='partition-time',
              'sink.partition-commit.policy.kind'='success-file'
    
            );
    """
    
    t_env.execute_sql(source_ddl)
    t_env.execute_sql(sink_ddl)
    
    statement_set = t_env.create_statement_set()
    
    
    statement_set.add_insert_sql("INSERT INTO sink_table SELECT user_id, order_amount, ts, DATE_FORMAT(ts, 'yyyy/MM/dd/HH/mm') as dt FROM source_table")
    statement_set.execute().wait()
    The code runs indefinitely, but no files appear on S3. Here are the logs:
    2023-04-10 11:20:45,212 INFO  org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new compressor [.snappy]
    2023-04-10 11:20:45,211 INFO  org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new compressor [.snappy]
    2023-04-10 11:21:01,046 INFO  org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new compressor [.snappy]
    2023-04-10 11:21:01,047 INFO  org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new compressor [.snappy]
    2023-04-10 11:22:00,964 INFO  org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new compressor [.snappy]
    2023-04-10 11:22:00,999 INFO  org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new compressor [.snappy]
  • m

    mohammadreza khedri

    04/10/2023, 11:43 AM
    Hi, I just want to run a PyFlink example from the official Apache Flink website: https://nightlies.apache.org/flink/flink-docs-master/api/python/examples/datastream/connectors.html#kafka-with-csv-format When I run it, producing data to the Kafka topic succeeds, but an error occurs when consuming. This is my error:
    raise Py4JJavaError(
    py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.
    : org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    .
    .
    .
    Caused by: java.lang.RuntimeException: Failed to create stage bundle factory! INFO:root:Initializing Python harness
    This is my question on the StackOverflow website: https://stackoverflow.com/questions/75976664/reading-from-kafka-with-pyflink-not-working
  • p

    piby 180

    04/10/2023, 2:00 PM
    Hey guys, I am testing how to store streaming data as Parquet files on S3 and have run into the following issues:
    1. The S3 key is not parsed properly: "/" is replaced by "%2F".
    2. How can I customize the filename stored in S3? I couldn't find any setting for it.
    3. I get the following error when I use `'sink.partition-commit.trigger'='partition-time'`: `Caused by: java.time.format.DateTimeParseException: Text '2023/04/10/13/49' could not be parsed at index 4`. It looks like it is not able to commit the partition. When I use `'sink.partition-commit.trigger'='process-time'`, it works and I can see empty _SUCCESS files being committed.
    4. Is it better to use the DataStream API for the S3 sink? From the documentation, it looks like the DataStream API can set a file prefix and suffix: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/
    Here is my code:
    from pyflink.table import EnvironmentSettings, TableEnvironment
    
    
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    t_env.get_config().set("restart-strategy.type", "fixed-delay")
    t_env.get_config().set("restart-strategy.fixed-delay.attempts", "3")
    t_env.get_config().set("restart-strategy.fixed-delay.delay", "30s")
    
    jar_list = """
        file:///home/ubuntu/environment/flink/lib/flink-sql-connector-kafka-1.17.0.jar;
        file:///home/ubuntu/environment/flink/lib/flink-sql-parquet-1.17.0.jar;
        file:///home/ubuntu/environment/flink/lib/flink-connector-files-1.17.0.jar;
        file:///home/ubuntu/environment/flink/lib/flink-s3-fs-hadoop-1.17.0.jar;
        file:///home/ubuntu/environment/flink/lib/flink-s3-fs-presto-1.17.0.jar;
        file:///home/ubuntu/environment/flink/lib/hadoop-mapreduce-client-core-3.3.5.jar
    """
    t_env.get_config().set("pipeline.jars", jar_list)
    t_env.get_config().set("pipeline.classpaths", jar_list)
    # set the checkpoint mode to EXACTLY_ONCE
    t_env.get_config().set("execution.checkpointing.mode", "EXACTLY_ONCE")
    t_env.get_config().set("execution.checkpointing.interval", "1min")
    
    # set the statebackend type to "rocksdb", other available options are "filesystem" and "jobmanager"
    # you can also set the full qualified Java class name of the StateBackendFactory to this option
    # e.g. org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
    t_env.get_config().set("state.backend.type", "rocksdb")
    
    # set the checkpoint directory, which is required by the RocksDB statebackend
    t_env.get_config().set("state.checkpoints.dir", "file:///tmp/checkpoints/")
    source_ddl = f"""
                CREATE TABLE source_table (
                      user_id STRING,
                      order_amount DOUBLE,
                      ts TIMESTAMP(3),
                      WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
                ) WITH (
                  'connector' = 'datagen',
                  'rows-per-second' = '10'
                )
                """
                
    
    
    sink_ddl = f"""
            CREATE TABLE sink_table (
                  user_id STRING,
                  order_amount DOUBLE,
                  ts TIMESTAMP(3),
                  dt VARCHAR
            ) PARTITIONED BY (dt) WITH (
              'connector' = 'filesystem',
              'path' = 's3a://<bucket_name>/flink/data',
              'format' = 'parquet',
              'auto-compaction' = 'true',
              'sink.rolling-policy.file-size'='1MB',
              'sink.rolling-policy.rollover-interval'='60s',
              'sink.partition-commit.delay'='0s',
              'partition.time-extractor.timestamp-pattern' = '$dt',
              'sink.partition-commit.trigger'='partition-time',
              'sink.partition-commit.policy.kind'='success-file'
    
            );
    """
    
    t_env.execute_sql(source_ddl)
    t_env.execute_sql(sink_ddl)
    
    statement_set = t_env.create_statement_set()
    
    
    statement_set.add_insert_sql("INSERT INTO sink_table SELECT user_id, order_amount, ts, DATE_FORMAT(ts, 'yyyy/MM/dd/HH/mm') as dt FROM source_table")
    statement_set.execute().wait()
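A minimal sketch of what may be going on in issues 1 and 3 of the message above. It assumes (this is not confirmed in the thread) that the default partition-time extractor expects a value shaped like `yyyy-MM-dd HH:mm:ss`, and that the filesystem sink URL-escapes partition values when it builds directory names. The Python below only mimics those two behaviours with the standard library; it does not call Flink, and the `parses` helper is hypothetical.

```python
from datetime import datetime
from urllib.parse import quote

# Value produced by DATE_FORMAT(ts, 'yyyy/MM/dd/HH/mm') in the INSERT above.
partition_value = "2023/04/10/13/49"

# Assumption: layout the default partition-time extractor expects,
# written here as the Python equivalent of 'yyyy-MM-dd HH:mm:ss'.
ASSUMED_DEFAULT_FORMAT = "%Y-%m-%d %H:%M:%S"


def parses(value: str, fmt: str = ASSUMED_DEFAULT_FORMAT) -> bool:
    """Return True if `value` matches `fmt`, False otherwise."""
    try:
        datetime.strptime(value, fmt)
        return True
    except ValueError:
        return False


# The slash-separated value does not match the assumed layout,
# which is consistent with a parse failure at index 4 (the first "/").
print(parses(partition_value))

# A value in the assumed layout parses fine.
print(parses("2023-04-10 13:49:00"))

# Escaping every reserved character turns "/" into "%2F",
# matching the key names reported in issue 1.
print(quote(partition_value, safe=""))
```

If that assumption holds, one layout described in the Flink filesystem connector documentation is to partition by separate columns (for example `dt` and `hour`) and combine them with a pattern such as `$dt $hour:00:00`, so that no single partition value contains a "/".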
  • m

    Max Dubinin

    04/10/2023, 6:48 PM
    Hey guys, I’m a Flink newbie, and any help would be greatly appreciated. I have a Kinesis source created with the SQL API (`CREATE TABLE ...`), which is then filtered twice by `SELECT x,y,z FROM KinesisSource WHERE ...` queries. Each of those `select` streams is inserted into a separate sink (also created with `CREATE TABLE ...`). I run this setup in a `statementSet`.
    KinesisSource -> SELECT * where name = 'A' -> Sink 'A'
                  -> SELECT * where name = 'B' -> Sink 'B'
    1. Does anyone know why I see two tasks named “Sink: end”? They don’t seem to be the two sinks I created, because their bytes/records sent values are always the same (the records sent of the first row divided by two).
    2. The “records sent” value keeps increasing although I don’t send anything. Can anyone explain why? (This happens in a `FlinkDeployment` but not in a local cluster setup.)
    3. Is there a way to configure the job so I can see the source, the filters, and the sinks in the graph, along with their correct data?
    4. Am I doing this wrong?
  • m

    Mali

    04/10/2023, 7:16 PM
    Hello everyone, I am trying to authenticate to S3 via a service account. I created a service account named “flink” and an IAM role in AWS with “s3 full access”. I added the role ARN as an annotation in the “jobServiceAccount” part of values.yaml (I am using flink-k8s-operator). I also added:
    fs.s3a.aws.credentials.provider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider
    fs.s3a.path.style.access: 'true'
    fs.s3a.endpoint: s3.<region>.amazonaws.com
    Note: I enabled HA, but I am getting the following errors:
    -> Caused by: org.apache.flink.util.FlinkException: Could not create the ha services from the instantiated HighAvailabilityServicesFactory org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory.
    
    -> Caused by: java.nio.file.AccessDeniedException: s3://<bucket_name>/flink/recovery/test/blob: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials provided by WebIdentityTokenCredentialsProvider : com.amazonaws.SdkClientException: Unable to execute HTTP request: sts.<region>.amazonaws.com
    What should I do? (Flink version is 1.16.0)
  • j

    Jakub Janowski

    04/11/2023, 8:16 AM
    Hi, is there any support for PubSub in the Python DataStream API? I see that there is a Java connector however I don’t know if cross-language usage exists in Flink as it does in Beam.
  • n

    Nikola Stanisavljevic

    04/11/2023, 9:24 AM
    Hi, there seems to be a problem with the GCS plugin for version `1.17.0`. This error appears. If I try the plugin of version `1.16.1` with Flink 1.17, it works fine. So there seems to be a problem with the build of this jar file and its classes.
    gcs 1.17 issue
  • a

    Amenreet Singh Sodhi

    04/11/2023, 11:56 AM
    Hey team, I am using an NFS mount to store checkpoints from my Flink job, which runs in HA application mode with a checkpoint retention count of 10. But I get the following error:
    WARN  org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 1 for job 000000006e6b13320000000000000000. (0 consecutive failed attempts so far)
    org.apache.flink.runtime.checkpoint.CheckpointException: Failure to finalize checkpoint.
    
    Caused by: java.io.IOException: Target file file:/opt/flink/pm/checkpoint/000000006e6b13320000000000000000/chk-1/_metadata already exists.
    	at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.getOutputStreamWrapper(FsCheckpointMetadataOutputStream.java:168)
    Every time Flink finds that the folder for the checkpoint it is about to create already exists, it throws this error and the job restarts. How can I handle this gracefully? Or is there any way for the Flink job to overwrite the metadata file in this folder, so that the job does not restart? Thanks in advance!
  • c

    chendan

    04/11/2023, 1:35 PM
    Hey team, I found a strange problem. I use Flink SQL to do ETL. However, sometimes I need to change the SQL statements due to requirement changes. I found that when I delete one statement from the SQL queue or change the statement order, the job restored from the checkpoint still performs the old SQL sequence. For example, I originally designed the SQL with four operators in the sequence 1->2->3->4, where 1 to 4 are the operator IDs. Then I deleted operator 2. I want Flink to perform operators 1->3->4 in sequence, but in fact Flink, resumed from the checkpoint, still performs 1->2->3->4. The checkpoint seems to remember the old SQL queue by IDs and performs it regardless of the new SQL. Since the SQL changed, I want Flink to pick up the new sequence, with new IDs assigned to the statements automatically. But it seems that Flink did not assign new IDs, or the new IDs did not take effect. I think this is because I cannot assign names and IDs to SQL operators manually in Flink SQL; it does not seem to provide a way to do that. If I designed the ETL with the DataStream API, this should not happen. So is this a defect of Flink SQL?
  • a

    Alex Brekken

    04/11/2023, 1:38 PM
    Hi all, I’m trying to deploy my Flink app using the Flink k8s operator, and I’m a little confused about the “flink-volume” volume mount. I’ve created a PVC for the flink-volume. (I’m using Longhorn as the storage layer, if that matters.) When I try deploying it, it creates a pod that starts up and goes to a ready state successfully. Then a 2nd pod gets created, named <my-application-name>-taskmanager-1-1. (It’s using the same image and container as my application.) However, this pod is never able to start because it’s unable to attach to the PVC, which is already attached to the first pod. Since the PVC is `ReadWriteOnce`, this isn’t surprising. If I use `hostPath` instead of a PVC, then everything works fine, though I doubt that’s the recommended approach? Why are both pods trying to use the same PVC? Do I need to have a PVC with ReadWriteMany? (I hope not.. 🙂) Thanks for any help!
  • v

    Virender Bhargav

    04/11/2023, 3:50 PM
    == Flink State Processor API | SavepointWriter == I am trying to use the State Processor API to clean up an existing savepoint. The savepoint size is around 10 TB. While writing the modified savepoint using SavepointWriter.write(), I am hitting a bottleneck. I wanted to understand the intent behind the piece of code below: why is the parallelism of the sink forcefully set to 1? Flink version: 1.15.1 https://github.com/apache/flink/blob/release-1.15.0-rc1/flink-libraries/flink-stat[…]i/src/main/java/org/apache/flink/state/api/SavepointWriter.java
    DataStream<OperatorState> existingOperatorStates = newOperatorStates.getExecutionEnvironment().fromCollection(existingOperators).name("existingOperatorStates");
                existingOperatorStates.flatMap(new StatePathExtractor()).setParallelism(1).addSink(new OutputFormatSinkFunction(new FileCopyFunction(path)));
                finalOperatorStates = newOperatorStates.union(new DataStream[]{existingOperatorStates});
  • t

    Thijs van de Poll

    04/11/2023, 4:18 PM
    Hi all, I have a question about Flink CDC for Postgres and its parallelism. According to the documentation, the parallelism should be set to 1 to ensure ordering. How does this setting relate to the number of TaskManagers used in the cluster? Some background about my use case:
      • I use the Table API,
      • Postgres CDC as the source connector for a bunch of different tables (8),
      • in the pipeline I join the different tables and perform transformations/group-by operations,
      • OpenSearch as the sink.
    Because there are multiple CDC sources, would it be possible to scale up the number of TaskManagers from 1 to, say, 8 (= the number of CDC connectors) to speed up the process?
  • g

    Guruguha Marur Sreenivasa

    04/11/2023, 4:55 PM
    Hi all, I wanted to understand a few things about Flink's session clusters.
    1. We observed that the JVM Metaspace keeps increasing as we deploy / cancel jobs on the cluster. Is this expected? Is there any documentation that explains this behavior? We tend to deploy new jobs almost every day, and they are long-running streaming pipelines. We want to capacity plan and make sure that enough memory is allocated on the JobManager so it doesn't crash (it has happened before due to a Metaspace issue).
    2. When we restart a particular pipeline, the checkpointed state is lost and we send duplicate data again, as the `MapState` we use is empty on restart and starts building up again. We are saving checkpoints on S3. Below is my checkpointing config:
    this.env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE);
        this.env.setStateBackend(new HashMapStateBackend());
        final CheckpointConfig config = env.getCheckpointConfig();
        config.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        config.setCheckpointStorage(checkPointBucket);
        config.setTolerableCheckpointFailureNumber(applicationConfiguration.getCheckpointFailureTolerance());
    3. Are there ways to optimize the pipelines so there is less data transfer across task managers as streaming data progresses through to further stages in the pipeline?
  • j

    James Watkins

    04/11/2023, 10:16 PM
    Hello, I’m getting the following error when setting up the Flink SQL Client on my machine with a 3rd party connector. I am using Gradle to build the project and I have included that dependency in my build.gradle file:
    dependencies {
        ...
        implementation 'com.ververica:flink-sql-connector-postgres-cdc:2.2.1'
    }
    This is the error I’m getting:
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'postgres-cdc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
    I will add more details in-thread for the steps I have taken. Any help would be much appreciated.
  • a

    Amir Hossein Sharifzadeh

    04/12/2023, 4:50 AM
    I am running a process and wondering how to kill or stop the process:
    Table raw_table = tableEnv.sqlQuery(data_query);
    DataStream<Row> join_stream = tableEnv.toDataStream(raw_table);
    join_stream.process(new EMPADProcessor()).setParallelism(4);
    My Processor class:
    public class EMPADProcessor extends ProcessFunction<Row, String> {
    
        public EMPADProcessor() {
        }
    
        @Override
        public void processElement(Row row, ProcessFunction<Row, String>.Context context, Collector<String> collector) {
    
    
            try {
    
                int chunk_id = Integer.parseInt("" + row.getField(0));
    ...............
    I want to stop/kill the process when `chunk_id=100`.
  • a

    abhishek sidana

    04/12/2023, 7:08 AM
    Hi team, I deployed a Flink application to an AKS cluster, and some of its pods are being killed due to OOM issues. But I couldn't find any logs that would let me raise an alert when a pod is being killed by the task manager. Is there a way to log when Flink pods are restarted due to OOM?
  • m

    Mali

    04/12/2023, 7:23 AM
    According to my JobManager log, the JobManager is stuck at this point:
    INFO  org.apache.flink.runtime.blob.FileSystemBlobStore            [] - Creating highly available BLOB storage directory at s3://<my_path>/blob
    My flink.conf is:
    s3.access-key: <my_access_key>
    s3.secret-key: <my_secret_key>
    s3.path.style.access: 'true'
    s3.endpoint: s3.<my_region>.amazonaws.com
    #fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
    #fs.s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
    I tried changing “s3.” to “fs.s3a.”, but nothing changed. Does anyone have an idea about this?
  • i

    Iain Dixon

    04/12/2023, 10:17 AM
    Hello all, I’m a PhD student at Newcastle University and we’re doing some research in stream processing using Flink. I was wondering if there is a way to modify the DEFAULT_TIME_SPAN_IN_SECONDS attribute in MeterView and UPDATE_INTERVAL_SECONDS in ViewUpdater. Is the best way to do this to fork the GitHub repo and rebuild from source locally for my experimental runs, or is there a better way? Any information about how to go about that would be helpful, thank you!
  • c

    Chirag Dewan

    04/12/2023, 11:21 AM
    Can anyone share any experience with running Flink jobs across data centers? I am trying to create a multi-site/geo-replicated Kafka cluster, and I want my Flink job to be closely colocated with it. If the Flink job is bound to a single data center, I believe we will observe a lot of client latency when accessing brokers in another DC. If instead I can make my Flink Kafka collectors rack-aware so they fetch data from the closest Kafka broker, I should get better results. I will be deploying Flink 1.16 on Kubernetes with Strimzi-managed Apache Kafka. Thanks.
  • m

    Miguel Ángel Fernández Fernández

    04/12/2023, 11:47 AM
    Hi team, I have a multi-tenancy use case where data is ingested via Kafka and stored in Cassandra. Depending on the data, records are stored in different keyspaces (multi-tenancy with a shared database and separate schemas). We currently have an implementation with multiple Cassandra sinks (one per keyspace), but this implies multiple Cassandra connections. Looking at the documentation, we saw that the sink can be configured with a DefaultKeyspace. Is it possible to change that keyspace for each tuple? That way, we would not need multiple connections to Cassandra.
  • m

    Monika Hristova

    04/12/2023, 1:32 PM
    Hello, I am trying to set up a Python virtual environment by following the steps here. I use the script from here, but after executing it I get the following error. Can someone help me?
  • c

    chunilal kukreja

    04/12/2023, 2:47 PM
    Hi team, during checkpointing, if the folder where the snapshot is to be saved is already present (in my case, “chk-1”), I get the exception below, and after that the job gets restarted.
    WARN  org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 1 for job 000000006e6b13320000000000000000. (0 consecutive failed attempts so far)
    org.apache.flink.runtime.checkpoint.CheckpointException: Failure to finalize checkpoint.
    
    Caused by: java.io.IOException: Target file file:/opt/flink/pm/checkpoint/000000006e6b13320000000000000000/chk-1/_metadata already exists.
        at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.getOutputStreamWrapper(FsCheckpointMetadataOutputStream.java:168)
    Expectation: ideally, it should either skip this folder name and use another, or overwrite the content of the existing folder. Can someone help me understand whether this is expected behaviour, or whether a workaround is available?
  • j

    Jirawech Siwawut

    04/12/2023, 4:29 PM
    Hi all. I am trying to use S3 for checkpointing, and I got this error when I tried:
    org.apache.flink.runtime.checkpoint.CheckpointException: Failure to finalize checkpoint.
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.finalizeCheckpoint(CheckpointCoordinator.java:1346) ~[flink-runtime-1.15.1.jar:1.15.1]
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1241) ~[flink-runtime-1.15.1.jar:1.15.1]
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1133) ~[flink-runtime-1.15.1.jar:1.15.1]
    	at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89) ~[flink-runtime-1.15.1.jar:1.15.1]
    	at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119) ~[flink-runtime-1.15.1.jar:1.15.1]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_301]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_301]
    	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_301]
    Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The specified bucket does not exist (Service: Amazon S3; Status Code: 404; Error Code: NoSuchBucket;
    Here is my setup
    flink-conf.yaml
    s3.access-key: accces_key
    s3.secret-key: secret_key
    s3.endpoint: myendpoint.com
    ---
    env.setStateBackend(new FsStateBackend("s3://mybucket/checkpoint"));
    From the debug log, I found that it is still connecting to the default S3 endpoint.
  • b

    Bhupendra Yadav

    04/12/2023, 5:14 PM
    Hi everyone, we were trying to use S3 in jarURI for a FlinkSessionJob. It always failed, saying no S3 scheme implementation was found for downloading the jar, even though we put the hadoop-s3-fs jar in the plugins directory and also set the HADOOP_CLASSPATH env var in the pod. After doing all this, we saw that the Flink operator logged on startup that no Hadoop dependency was found on the classpath. So we looked at docker-entrypoint.sh for flink-operator and realized that the HADOOP_CLASSPATH env var was not added to the Java classpath argument. To fix it, we edited docker-entrypoint.sh to add our S3 jar path to the classpath (-cp argument). After that, we could see the hadoop-s3 jar on the classpath (confirmed via logs), and it worked fine. I was wondering, is there an easier way to put jar(s) on the classpath in the Flink operator? Modifying docker-entrypoint.sh might lead to more bugs if we upgrade the operator and the entrypoint file changes. Thanks.
  • h

    Hunter Medney

    04/12/2023, 7:32 PM
    I'm looking to use a temporal table function in Flink 1.16.1 using a view / query as input instead of a normal source table, as I need to perform a CROSS JOIN UNNEST on an array in the original source table (Kafka connector). The best I could come up with is to run the CROSS JOIN UNNEST query, convert to DataStream<Row>, and convert back to a Table with PK and watermark defined. The DataStream<Row> was an insert-only stream. The job runs, but nothing is output when calling the temporal table function in a LATERAL table. I simulated similar output from the CROSS JOIN UNNEST query with a normal table via the datagen connector, and I was able to see the expected output in the LATERAL table. Are temporal table functions incompatible with DataStream-based tables?
  • j

    Jeesmon Jacob

    04/12/2023, 9:22 PM
    Hi team, any suggestions for recovering from this error? We recently removed a class that previously defined our state, and now during upgrade we see this error because it still tries to look up the old class from the previous checkpoint. We tried `execution.savepoint.ignore-unclaimed-state: true`, but no luck. We are on Flink 1.15. Thanks!
    2023-04-12 21:04:57,615 WARN  org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - Exception while restoring operator state backend for CoBroadcastWithNonKeyedOperator_fda18ba392f9dc50769c0bd716347531_(1/8) from alternative (1/1), will
    retry while more alternatives are available.
    org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
            at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83) ~[flink-dist-1.15.3.jar:1.15.3]
            at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createOperatorStateBackend(HashMapStateBackend.java:160) ~[flink-dist-1.15.3.jar:1.15.3]
            at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277) ~[flink-dist-1.15.3.jar:1.15.3]
            at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist-1.15.3.jar:1.15.3]
            at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist-1.15.3.jar:1.15.3]
            at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286) ~[flink-dist-1.15.3.jar:1.15.3]
            at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174) ~[flink-dist-1.15.3.jar:1.15.3]
            at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:265) ~[flink-dist-1.15.3.jar:1.15.3]
            at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106) ~[flink-dist-1.15.3.jar:1.15.3]
            at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:703) ~[flink-dist-1.15.3.jar:1.15.3]
            at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-1.15.3.jar:1.15.3]
            at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:679) ~[flink-dist-1.15.3.jar:1.15.3]
            at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:646) ~[flink-dist-1.15.3.jar:1.15.3]
            at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) ~[flink-dist-1.15.3.jar:1.15.3]
            at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) ~[flink-dist-1.15.3.jar:1.15.3]
            at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) ~[flink-dist-1.15.3.jar:1.15.3]
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) ~[flink-dist-1.15.3.jar:1.15.3]
            at java.lang.Thread.run(Unknown Source) ~[?:?]