https://flink.apache.org/ logo
Join Slack
Powered by
# troubleshooting
  • j

    Jonathan Rozmarin

    07/06/2023, 11:35 AM
    This message contains interactive elements.
    a
    • 2
    • 3
  • r

    Rashmin Patel

    07/06/2023, 12:20 PM
    Hii All I observed that Flink KafkaSource is not auto-detecting the increase in partitions of a source topic ? Is it expected behaviour ?
    m
    • 2
    • 3
  • l

    Leong Wai Leong

    07/06/2023, 1:52 PM
    This message contains interactive elements.
  • n

    Nishanth S

    07/06/2023, 3:02 PM
    Hi all, When I try to stop a Flink 1.17.1 job with savepoint, I am getting the following error:
    org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint Coordinator is suspending.
    After this the job restarts, to recover from this exception. Is this expected, as I am able to reproduce this issue during every update to the job. Appreciate any pointers on this. Attached taskmanager and jobmanager error logs. Please let me know if any other logs are required.
    • 1
    • 2
  • a

    Adam Richardson

    07/06/2023, 6:14 PM
    • Do operators inherit their upstream operators' slot sharing group, if all upstreams share the same group? • Do operators inherit their upstream operator's parallelism if there is only a single upstream operator?
  • d

    Daiyan Chowdhury

    07/06/2023, 7:31 PM
    Hi, I am trying to download the asc file from flink website for v1.17.1 but my downloaded file’s hash does not match the provided hash from flink
    • 1
    • 1
  • s

    Sai Sharath Dandi

    07/07/2023, 12:12 AM
    Hi, I'm using a percentileUDAF which computes percentile by storing all the data in memory for the window. I know this is not the best way but I see very poor performance for the exact same UDAF in flink 1.12 vs flink 1.4/1.9. Is there any major change between the versions that could cause this issue? I have checked my configurations multiple times and nothing stands out in particular.
    m
    • 2
    • 9
  • a

    Amenreet Singh Sodhi

    07/07/2023, 6:52 AM
    Hi Team, I am deploying Flink cluster on Kubernetes in HA mode. But i noticed, whenever i deploy Flink cluster for first time on K8s cluster, it is not able to populate the cluster configmap, and due to which JM fails with the following exception:
    Copy code
    2023-07-06 16:46:11,428 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error occurred in the cluster entrypoint.
    java.util.concurrent.CompletionException: java.lang.IllegalStateException: The base directory of the JobResultStore isn't accessible. No dirty JobResults can be restored.
    	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) ~[?:?]
    	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319) [?:?]
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702) [?:?]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    	at java.lang.Thread.run(Thread.java:834) [?:?]
    Caused by: java.lang.IllegalStateException: The base directory of the JobResultStore isn't accessible. No dirty JobResults can be restored.
    	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) ~[event_executor-1.1.20.jar:?]
    	at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:182) ~[event_executor-1.1.20.jar:?]
    	at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[event_executor-1.1.20.jar:?]
    	at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[event_executor-1.1.20.jar:?]
    	at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194) ~[event_executor-1.1.20.jar:?]
    	at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[event_executor-1.1.20.jar:?]
    	at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[event_executor-1.1.20.jar:?]
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ~[?:?]
    Once we reinstall/helm upgrade then this exception goes away. How can this be resolved, any additional configuration required to resolve this?
    g
    • 2
    • 2
  • и

    Иван Борисов

    07/07/2023, 7:15 AM
    Hello, I found that lib https://github.com/andrewuhl/RollingWindow/tree/master for R, do you know same for Flink? Is it possible to do same (same fast) on Flink?
  • y

    Yang Li

    07/07/2023, 10:07 AM
    Hello, I am using a flink job 1.17 with flink operator , I have enabled HA configuration in kubernetes with storage dir in aws S3, I recognise that sometimes I can have the logs about HA info missing (404 from aws S3)
    Copy code
    com.facebook.presto.hive.s3.PrestoS3FileSystem$UnrecoverableS3OperationException: com.amazonaws.services.s3.model.AmazonS3Exception: The specified key does not exist. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchKey;
    Has anyone experienced something like this before and has a insight about root of cause of this? 🙏
  • l

    Leong Wai Leong

    07/07/2023, 10:38 AM
    Hi all, when using flink HybridSource with FileSource, the file needs to be sorted in order?
  • v

    Vivien Budavölgyi

    07/07/2023, 1:29 PM
    Hello everyone, I am reaching because of a challenge we are facing with Apache Flink and the sqlserver-cdc connector. Our current setup involves pulling changes from tables in MSSQL, using the connector, and creating views from these tables. We then write these changes to a Kafka topic with the upsert-kafka connector. However, we are encountering some difficulties due to the complexity of the views and the large size of the SQL tables (some tables are ranging from 9 to 25 million rows). Running the Flink job with a parallelism of 1 is not feasible in our case, as it significantly affects the performance. On the other hand, running it in parallel introduces ordering issues. Considering these constraints, what is the best practice to handle this scenario?
    m
    • 2
    • 1
  • s

    sophia wu

    07/07/2023, 11:17 PM
    Hi guys. I am using Flink 1.15 DataStream api to do ETL job. I want to set my job as BATCH execution mode, so I use code provided in official webstie.
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    However, I encountered the following error when I run in Kineses Data Analytics:
    java.lang.UnsupportedOperationException at <http://org.apache.flink.runtime.io|org.apache.flink.runtime.io>.network.partition.ResultPartition.getAllDataProcessedFuture(ResultPartition.java:233)
    My whole code logic
    Copy code
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    DataStream<String> text = env.readTextFile("file:///path/to/file");
    
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    
    DataStream<OutputType> result = text
    .map(/* map logic here */ )
    .keyBy(/* keyby logic here */)
    .reduce(/* reduce logic here */)
    
    result.writeAsText("filePath")
    Can I know why the error happen? which part has "`UnsupportedOperationException` "? Thanks in advance!❤️
  • m

    Marco Villalobos

    07/08/2023, 2:30 AM
    Hi Everybody. I have a Flink job deployed to Kubernetes trying to produce data to a AWS MSK Kafka topic, however, when I write to that topic, it never receives data. I tested my code using Docker Compose and Docker and it can send messages in that environment, but in the cloud deployment in Kubernetes, I notice the following. The cloud deployment in the metrics section shows the following metrics:
    Copy code
    0.my_sink_name_my_group_id__Committer.numRecordsIn = 343
    0.my_sink_name_my_group_id__Writer.numRecordsIn = 0
    The writer metric is always zero. Consequently, that topic never receives any messages. In my functional Docker Compose deployment where Kafka messages are received the
    Writer
    metric is always
    positive non-zero
    . Does anybody understand what
    Committer
    and
    Writer
    do, and what I should consider to resolve this issue? This job uses three kafka topics. It uses AT_LEAST_ONCE delivery guarantee for Kafka, and EXACTLY_ONCE checkpointing. 1. kafka source 2. kafka sink for telegraf 3. kafka sink to statefun (but writing to that topic fails as described above). This particular Kafka cluster is running with only two bootstrap servers (in case that makes a difference).
    s
    • 2
    • 14
  • o

    Or Keren

    07/09/2023, 2:14 PM
    Hey everyone 🙂 I have a usecase where I have a single flink application who listens to kafka topics with some consumer group id. I want to duplicate that app to listen to the same topics, and each duplicate should read all of the messages in the topic. Therefore, I need to change the consumer group id for each one of the consumers. When doing the migration (stopping the single app and initializing the duplicate ones), I want each of them to start at the last committed offsets by the previous single app consumer id. What's the best approach here?
    s
    • 2
    • 5
  • r

    Rajat Ahuja

    07/10/2023, 3:30 AM
    Hi Folks, I am trying to run Flink Sql to connect to my Session cluster running on k8s via K8s operator but somehow i am unable to do so. I have tried quite a few approaches but no luck. Writing this email hoping someone would chime in to fix my problem. Step 1) Set up the Session Cluster on k8s via K8s operator. Here's my yaml file apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: session-six-deployment-only-example spec: image: flink:1.16 flinkVersion: v1_16 flinkConfiguration: taskmanager.numberOfTaskSlots: "10" rest.server.max-content-length: "1209715200" ( Bold those properties as i think it is required to fix the issue) rest.client.max-content-length: "1209715200" serviceAccount: flink jobManager: resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 Step 2) Set up ingress file to access the web ui apiVersion: networking.k8s.io/v1 kind: Ingress metadata: annotations: kubernetes.io/ingress.class: k8-cps-dev name: my-docker-app-ingress-2 spec: rules: - host: flink-prod-dns.usb.cloud.bank-dns.com ( Assume i have set up the DNS ) http: paths: - backend: service: name: session-six-deployment-only-example-rest port: number: 8081 path: / pathType: Prefix Now i am able to access the UI via DNS flink-prod-dns.usb.cloud.bank-dns.com Step 3) I download the flink 1.16 bundle and updates its conf/fink-conf.yaml with following properties ( Keeping rest of the properties as it is) jobmanager.bind-host: flink-prod-dns.usb.cloud.bank-dns.com rest.address: flink-prod-dns.usb.cloud.bank-dns.com rest.server.max-content-length: 1209715200 rest.client.max-content-length: 1209715200 Step 4) ./bin/sql-client.sh embedded ( not calling via SQL GATEWAY as it is not supported until the lates version of flink 1.17) ( now any command that i run leads the following error) .
    select 1;
    WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/Users/rxahuja/flink-kubernetes-operator/examples/flink-1.16.2/lib/flink-dist-1.16.2.jar) to field java.lang.Class.ANNOTATION WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release [ERROR] Could not execute SQL statement. Reason: org.apache.flink.runtime.rest.util.RestClientException: 413 Request Entity Too Large. Try to raise [rest.client.max-content-length] [ERROR] Could not execute SQL statement. Reason: org.apache.flink.runtime.rest.util.RestClientException: 413 Request Entity Too Large. Try to raise [rest.client.max-content-length] Since error suggests that i shall set the property rest.client.max-content-length. hence i updated my server and clients with rest.client.max-content-length and rest.server.max-conent-length but still getting this issue. i am not able to figure out how to get pass this issue. Thanks
    g
    • 2
    • 5
  • e

    Eugenio Gastelum

    07/10/2023, 6:08 AM
    Hello everyone, hope you had a great weekend! I am writing this to see if I can get some help. I have found some trouble following an official AWS flink Kinesis Analytics github sample repo about using UDF in a stream. This is the example I am trying to reproduce https://github.com/aws-samples/pyflink-getting-started/tree/main/pyflink-examples/UDF But It's not running. I've tried to see if I have implemented it differently but can't spot the difference. Does someone knows if I am doing something wrong? What I have done so far, is that I've tried doing two scripts: 1.
    flink_simple_functional.py
    This one does not uses the UDF and just does a very simple transformation like multiplying an integer column by 2 and sinking it to the console. This works fine 2.
    flink_simple.py
    This other one is a copy of the above mentioned code, but besides doing a multiplication by 2, it also derives another column more by using a UDF. That UDF just converts a string column to lowercase However, the first code works (the one that does not uses the UDF), and the second does not, despite it's a very simple UDF. Can someone spot what might be the mistake? Or if UDF are not supported anymore for AWS Kinesis? I am basically reproducing the AWS repo for flink-kinesis. Here I attach both of my flink files, as well as a third file which is the one I use to populate my Kinesis
    input_stream
    from which both the flink files fetch as source. Just in case it might help to take a look at how that stream looks like. It only has one shard and I confirm it has data
    stock.pyflink_simple_functional.pyflink_simple.py
  • c

    Cheguri Vinay Goud

    07/10/2023, 11:43 AM
    Hello, I've created flink deployment in native mode (kubernetes) and tried to submit 10 FlinkSessionJobs, but I see task manager pods are created for each of the flinksessionjob and are getting terminated and many of the submitted flink sessionjobs are stuck in upgrading state. Can someone share the configuration so that task managers are utilized by the other submitted jobs instead of getting terminated
    g
    • 2
    • 5
  • m

    Mikhail Spirin

    07/10/2023, 12:42 PM
    Hi all! Is there any option to control partition key in datastream api + kinesis sink + PyFlink? In Table API, i use PARTITIONED BY ( some_id ) and it works pretty well. In Datastream API, i see only to options when asetting up kinesis streaming sink - PartitionKeyGenerator.random() and PartitionKeyGenerator.fixed(), and i can’t find way to pass some part of item there. Is there such way?
    a
    • 2
    • 8
  • j

    Jagan Nalla

    07/10/2023, 2:21 PM
    I have below code and it is working if all the values in json object/dictinoary is string, if any field in the json object/dictionary is integer it fails. I'm not sure how to fix this, I hope some one have any idea to achieve this. Code:
    Copy code
    @udf(input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), <http://DataTypes.INT|DataTypes.INT>()],
         result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())
         )
    def p_record(first_seen,last_seen,src,rec_count):
        record_map ={
            "first_seen": first_seen,
            "last_seen": last_seen,
            "srcid": src,
            "rec_count": rec_count
        }
        return record_map
    
    query ="""select pdns_record(first_seen,last_seen,srcid,rec_count) from src_table"""
    
    ftable = st_env.sql_query(query)
    ftable.execute().print()
    Error:
    bytes_value = value.encode("utf-8")
    AttributeError: 'int' object has no attribute 'encode'
    at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:180)
    at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
    at <http://org.apache.beam.vendor.grpc.v1p43p2.io|org.apache.beam.vendor.grpc.v1p43p2.io>.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:262)
    at <http://org.apache.beam.vendor.grpc.v1p43p2.io|org.apache.beam.vendor.grpc.v1p43p2.io>.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
    at <http://org.apache.beam.vendor.grpc.v1p43p2.io|org.apache.beam.vendor.grpc.v1p43p2.io>.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
    at <http://org.apache.beam.vendor.grpc.v1p43p2.io|org.apache.beam.vendor.grpc.v1p43p2.io>.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:318)
    at <http://org.apache.beam.vendor.grpc.v1p43p2.io|org.apache.beam.vendor.grpc.v1p43p2.io>.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:301)
    at <http://org.apache.beam.vendor.grpc.v1p43p2.io|org.apache.beam.vendor.grpc.v1p43p2.io>.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:834)
    at <http://org.apache.beam.vendor.grpc.v1p43p2.io|org.apache.beam.vendor.grpc.v1p43p2.io>.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at <http://org.apache.beam.vendor.grpc.v1p43p2.io|org.apache.beam.vendor.grpc.v1p43p2.io>.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    ... 3 more
    e
    d
    • 3
    • 48
  • s

    saqlain pasha

    07/10/2023, 4:41 PM
    Hi all! We updated the Flink version in our services from 1.13.6 to 1.17.0, but we are seeing considerable performance degradation. The same load that our services used to handle with 3 TMs with 2 CPU cores each (1.13.6 version) now requires either 3 CPU cores per TM or 5 TMs with 2 CPU cores each (1.17.0 version). I wanted to know if this is something that is expected or if anyone has any pointers regarding this
    s
    • 2
    • 6
  • e

    Eugenio Gastelum

    07/10/2023, 8:05 PM
    Hey everyon, I attach the java errors I get from trying to run the python UDF file in case it helps. I've attached it as a comment on this thread
    h
    • 2
    • 17
  • r

    Ramya Edpuganti

    07/10/2023, 9:28 PM
    I have flink application deployed in k8s in standalone mode. Below are configurations set I have used in flink-config.yaml jobmanager.rpc.address: flink-jobmanager taskmanager.numberOfTaskSlots: 2 blob.server.port: 5124 jobmanager.rpc.port: 5123 taskmanager.rpc.port: 5122 queryable-state.proxy.ports: 5125 jobmanager.memory.process.size: 4092m jobmanager.memory.heap.size: 3092m taskmanager.memory.process.size: 3500m taskmanager.memory.task.heap.size: 1000m parallelism.default: 2 state.checkpoints.dir: gs://example web.upload.dir: /opt/flink/ state.backend: rocksdb state.checkpoints.dir: gs://example state.savepoints.dir: gs://example state.backend.incremental: true After submitting job with in few min JVM heap memory is filling up and getting restarted(I did enabled checkpoints). Job is running fine in my local but I am seeing this issue when submitting to k8s Any leads here will be very helpful. Thank you
    a
    • 2
    • 2
  • p

    Pappu Yadav

    07/11/2023, 10:10 AM
    Hi All, I have the below requirement for Flink batch mode. I have to backfill data for around 120 days, which involves below operations. 1. Read data for that day from s3 2. Process the data. 3. Push data to Kafka Sink. I am executing these steps in Loop from startDate to endDate. But instead of job executing in sequence for each day, Read step for all 120 days are being executed first and then 2nd Steps followed by 3rd Step. I want data to be pushed one day at a time in sequence manner. Can anyone help here Here is code snippet
    Copy code
    ApplicationConfig config = context.getApplicationConfig();
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Set run time env to batch
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        // env.setParallelism(10);
        env.getConfig().setGlobalJobParameters(context.getParameterTool());
        LocalDate date = startDate;
        while (endDate.compareTo(date) >= 0) {
          StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
          for (String eventName : EVENT_TYPES_LIST) {
            // Map and Kafka Sink
            processData(tableEnv, date, context, eventName, env);
          }
          date = date.plusDays(1);
        }
    
     
        env.execute(config.getServiceIdentifier() + "{" + startDate + " to " + endDate + "}");
    Attaching screenshot of Execution Graph for reference, which shows it reading the data at once from s3
    a
    • 2
    • 4
  • p

    Pappu Yadav

    07/11/2023, 10:20 AM
  • a

    amarjeet pasrija

    07/11/2023, 11:07 AM
    Hi I am trying to do checkpointing on S3 and creating the config but it is not working, The config is not getting change for the JOB.
    Copy code
    s_env = StreamExecutionEnvironment.get_execution_environment()
    conf.set_string('checkpointing.mode', "EXACTLY_ONCE")
    conf.set_string('state.backend', "filesystem")
    conf.set_string('state.checkpoints.dir', checkpoint_location)
    
    environment_settings = EnvironmentSettings.new_instance()\
        .in_streaming_mode() \
         .with_configuration(conf) \
          .build()
    
    t_env = TableEnvironment.create(environment_settings)
    
    Even Tried.
    
    t_env.get_config().set('checkpointing.mode', "EXACTLY_ONCE")
    t_env.get_config().set('state.backend', "filesystem")
    t_env.get_config().set('state.checkpoints.dir', checkpoint_location)
  • n

    nitin

    07/11/2023, 11:40 AM
    Hi, my application is not coming up and I noticed below information in kibana, related to change is spec. However there no change upgrade mode is last-state only. Any idea why it is still having the STATELESS upgrade mode which is wrong value
  • n

    nitin

    07/11/2023, 11:41 AM
    Untitled
    Untitled
    g
    • 2
    • 5
  • a

    André Midea Jasiskis

    07/11/2023, 1:00 PM
    Hi! I have a nested table declaration using the TableAPI and I'd like to use a nested field as a primary-key, when doing that I get an error of column not found.
    Copy code
    // Caused by: org.apache.flink.table.api.ValidationException: Invalid primary key
    // 'PK_data.abc_event__id'. Column 'data.abc_event__id' does not
    // exist.
    The code snippet for the table declaration is the following:
    Copy code
    private static final DataType abcTransferOutRequestRow =
                    DataTypes.ROW(
                            DataTypes.FIELD("abc_event__id", DataTypes.STRING()),
                            DataTypes.FIELD("abc_event__cancelled_at", DataTypes.TIMESTAMP_LTZ()),
                            DataTypes.FIELD("abc_event__created_at", DataTypes.TIMESTAMP_LTZ()),
                            DataTypes.FIELD("abc_event__e_2e_id", DataTypes.STRING()),
                            DataTypes.FIELD("abc_event__failed_at", DataTypes.TIMESTAMP_LTZ()),
                            DataTypes.FIELD("abc_event__initiation_type", DataTypes.STRING()),
                            DataTypes.FIELD("abc_event__message", DataTypes.STRING()),
                            DataTypes.FIELD("abc_event__transfer_in_id_to_refund", DataTypes.STRING()));
    
            public static final Schema schema =
                    Schema.newBuilder()
                            .column("meta", EntitySnapshotPayload.metadataType)
                            .column("data", abcTransferOutRequestRow)
                            .primaryKey("data.abc_event__id")
                            .watermark("db__tx_instant", "db__tx_instant")
                            .build();
    I tried to work around that by using a columnByExpression but I get an error related to the column not being physical. Any ideas for work arounds or is that just a limitation of the table api?
    ✅ 1
    • 1
    • 1
  • d

    Daiyan Chowdhury

    07/11/2023, 8:19 PM
    Hi, how can I create a gpg key for flink 1.17.1?
    f
    m
    • 3
    • 3
1...939495...98Latest