# troubleshooting
  • j

    Jashwanth S J

    04/13/2023, 8:27 AM
    Hi Team, I'm trying to use a custom Docker image for the Flink installation via a Dockerfile. I end up with the below error even though I tried setting args and command in the podTemplate for both jobManager and taskManager in the FlinkDeployment spec:
    Copy code
    Warning  Failed     3s (x3 over 21s)  kubelet            Error: failed to create containerd task: failed to create shim task: OCI runtime create failed: runc create failed: unable to start container process: exec: "/docker-entrypoint.sh": stat /docker-entrypoint.sh: no such file or directory: unknown
    Copy code
    jobManager:
        resource:
          memory: "1048m"
          cpu: 0.5
        podTemplate:
            spec:
              containers:
              - args:
                - -n
                - -c
                - /etc/supervisor/supervisord.conf
                command:
                - /usr/bin/supervisord
              tolerations:
               - key: "env"
                 operator: "Equal"
                 value: "dev"
                 effect: NoSchedule      
    taskManager:
        resource:
          memory: "1048m"
          cpu: 0.5
        podTemplate:
            spec:
              containers:
              - args:
                - -n
                - -c
                - /etc/supervisor/supervisord.conf
                command:
                - /usr/bin/supervisord
              tolerations:
               - key: "env"
                 operator: "Equal"
                 value: "dev"
                 effect: NoSchedule
    b
    • 2
    • 1
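    One thing that stands out in the spec above (an assumption on my part, not confirmed in the thread): the podTemplate containers have no name, and the operator merges pod templates by container name, so command/args overrides only reach the Flink container when it is called flink-main-container. A minimal sketch reusing the values from the message:
    jobManager:
      resource:
        memory: "1048m"
        cpu: 0.5
      podTemplate:
        spec:
          containers:
            - name: flink-main-container   # required so command/args replace the operator defaults
              command: ["/usr/bin/supervisord"]
              args: ["-n", "-c", "/etc/supervisor/supervisord.conf"]
          tolerations:
            - key: "env"
              operator: "Equal"
              value: "dev"
              effect: NoSchedule
    Separately, the stat error means /docker-entrypoint.sh does not exist in the custom image at all; basing the Dockerfile on the official flink image (or copying that script in) avoids it whenever the default entrypoint is still used.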
  • s

    Slackbot

    04/13/2023, 8:35 AM
    This message was deleted.
    m
    s
    • 3
    • 2
  • s

    Sumit Nekar

    04/13/2023, 8:41 AM
    Hello Team, We have a Flink job that reads from a Kafka source and writes to a Cosmos DB sink. It uses broadcast state. When we try to suspend the job with a savepoint, two of the tasks are not getting stopped, whereas the other tasks are in FINISHED state. The savepoint completes successfully. We have the same issue with two other similar jobs. Any thoughts on this?
  • j

    Jirawech Siwawut

    04/13/2023, 9:33 AM
    Hi Team. I would like to ask a question regarding this error I found in the taskmanager log:
    Copy code
    WARN  org.apache.flink.hive.shaded.parquet.hadoop.MemoryManager - Total allocation exceeds 50.00% (2,130,706,432 bytes) of heap memory
    What is the meaning of this warning? Does it mean that the resources used to write Parquet exceed 50% of the heap? If yes, what are some root causes for it suddenly occurring after the job has been running for months? I tried restarting all taskmanagers and everything seems to work fine.
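    For background (not from the thread): this warning comes from parquet-hadoop's MemoryManager, which caps the combined row-group buffers of all Parquet writers open in one JVM at a fixed fraction of the heap. Once open writers x parquet.block.size exceeds that pool, it scales row groups down and logs this line, so a likely trigger is simply more concurrently open files than before (more active partitions/buckets, or higher sink parallelism per TaskManager). A sketch of the knobs involved; the property names are parquet-hadoop's, and whether your Hive/Flink table definition passes them through is an assumption:
    # parquet-hadoop writer settings (values illustrative)
    parquet.block.size=67108864            # smaller row groups mean less memory per open writer
    parquet.memory.pool.ratio=0.5          # fraction of the heap the MemoryManager hands to writers
    parquet.memory.min.chunk.size=1048576  # floor for the scaled-down allocation per writer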
  • d

    Deepyaman Datta

    04/13/2023, 12:45 PM
    Hi! I'm trying to calculate percentages in a GROUP BY, with something like this:
    Copy code
    SELECT
      cc_num,
      merchant,
      (SUM(amt) OVER w) / (
        SUM(amt) OVER (
          PARTITION BY cc_num
          ORDER BY ts_ltz RANGE BETWEEN INTERVAL '30' MINUTES PRECEDING AND CURRENT ROW
        )
      )
    FROM transaction_amount
    WINDOW w AS (
      PARTITION BY cc_num, merchant
      ORDER BY ts_ltz RANGE BETWEEN INTERVAL '30' MINUTES PRECEDING AND CURRENT ROW
    );
    (approach similar to https://stackoverflow.com/a/6207658) If it's not clear, the idea above is to get the breakdown of transaction amounts by merchant for each cc_num. However, from https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/over-agg/: "You can define multiple OVER window aggregates in a SELECT clause. However, for streaming queries, the OVER windows for all aggregates must be identical due to current limitation." (and I do get an error if trying it) Is there a way to get a similar result given the current limitations?
  • s

    Simon Lawrence

    04/13/2023, 2:11 PM
    Hi there, I'm trying to get to grips with temporal table joins in the Table API, and I cannot get a simple example to work. The code compiles and runs successfully, and I would expect the attached code to produce a table of currency-converted "amounts" and print it to the console, but instead nothing appears to be happening. Any help with this would be appreciated.
    Table API
    j
    • 2
    • 8
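    Since the attached code is not visible here, a minimal event-time temporal join for reference; the table and column names are the usual currency-conversion example from the docs, not the poster's code. One common reason such a job appears to do nothing is that with event time, results are only emitted once watermarks on both inputs advance past the probe rows.
    -- orders: append-only table with event-time attribute order_time
    -- currency_rates: versioned table with PRIMARY KEY (currency) NOT ENFORCED and an event-time attribute
    SELECT
      o.order_id,
      o.price,
      o.currency,
      o.price * r.conversion_rate AS converted_amount
    FROM orders AS o
    JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time AS r
      ON o.currency = r.currency;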
  • e

    Eric Xiao

    04/13/2023, 10:10 PM
    👋 we're trying to migrate to the new Flink operator and we've noticed that both the pod and container names (flink-main-container for both TM and JM) have changed... To make the migration to the Flink operator seamless, we were hoping there was a way to make sure those two values could remain consistent, but we haven't been able to do so in the manifest file...
    Copy code
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      labels:
        name: {{ .name }}
        app: {{ .appName }}
        env: {{ .env }}
        runtime-component: trickle
      annotations:
        cloudplatform.shopify.io/scale-down-for-maintenance: "true"
    spec:
      ...
      jobManager:
        ...
        podTemplate:
          spec:
            containers:
              - name: jobmanager
      taskManager:
        ...
        podTemplate:
          spec:
            containers:
            - name: taskmanager
      podTemplate:
        ...
        spec:
          ...
          containers:
            - name: flink-main-container
    This is an example of what my manifest file looks like. We would also like to have some reusable configurations defined in the general outer podTemplate as well, instead of having duplicate configs in both the TM and JM podTemplate.
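    Two hedged notes, based on the operator docs rather than this thread: the operator expects the Flink container to be named flink-main-container and, as far as I know, that name is not configurable, so renaming it to jobmanager/taskmanager as above means the overrides are not merged into the actual Flink container; and the top-level spec.podTemplate is the place for shared settings, since it is merged into both JM and TM pods by container name. A sketch of that layering (the env vars are illustrative):
    spec:
      podTemplate:                         # common template, merged into both JM and TM pods
        spec:
          containers:
            - name: flink-main-container   # fixed name the operator merges on
              env:
                - name: COMMON_SETTING
                  value: "shared"
      jobManager:
        podTemplate:                       # role-specific additions, layered on top of the common template
          spec:
            containers:
              - name: flink-main-container
                env:
                  - name: ROLE
                    value: "jobmanager"
      taskManager:
        podTemplate:
          spec:
            containers:
              - name: flink-main-container
                env:
                  - name: ROLE
                    value: "taskmanager"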
  • s

    Slackbot

    04/14/2023, 9:24 AM
    This message was deleted.
    m
    • 2
    • 3
  • c

    chunilal kukreja

    04/14/2023, 10:00 AM
    ###Urgent help required### During checkpointing, if the folder where the snapshot is to be saved already exists (in my case the folder "chk-1" where the snapshot is to be saved is already present), I get the below exception, and after that the job gets restarted.
    Copy code
    WARN  org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 1 for job 000000006e6b13320000000000000000. (0 consecutive failed attempts so far)
    org.apache.flink.runtime.checkpoint.CheckpointException: Failure to finalize checkpoint.
    
    Caused by: java.io.IOException: Target file file:/opt/flink/pm/checkpoint/000000006e6b13320000000000000000/chk-1/_metadata already exists.
        at org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream.getOutputStreamWrapper(FsCheckpointMetadataOutputStream.java:168)
    Expectation: Ideally it should either skip this folder name and use another, or overwrite the content of the existing folder. Can someone help me out to know whether this is expected behaviour or whether there is a workaround available?
    m
    n
    • 3
    • 15
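    One hedged reading of this (an assumption, not a confirmed diagnosis): the job runs with a fixed job ID (000000006e6b13320000000000000000 looks like a fixed application-mode/HA ID), so after a redeploy the new run starts counting checkpoints from 1 again and collides with the chk-1 directory left over from the previous run; Flink intentionally refuses to overwrite an existing _metadata file. Pointing each run at a fresh checkpoint namespace, or cleaning old chk-* directories before starting, avoids the clash. Illustrative sketch:
    # flink-conf.yaml (path is illustrative)
    # give each deployment its own checkpoint namespace so chk-N directories from an
    # earlier run with the same job ID cannot collide with the new run's first checkpoint
    state.checkpoints.dir: file:///opt/flink/pm/checkpoint/run-2023-04-14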
  • d

    Dimitris Kalouris

    04/14/2023, 11:36 AM
    Hey everyone, I am still new to Flink and I am getting an out-of-memory error while submitting a large number of jobs; it fails before all of them are submitted. Do you know which memory component of Flink I have to increase so that this doesn't happen? Is it the heap of the jobmanager or the taskmanager, or something else? I tried changing some of these values but still got the same error. I am running Flink in batch mode and using the Flink Table API TableEnvironment.executeSql() to submit the jobs.
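    A hedged sketch of the settings usually involved: when many jobs are submitted from a single process, the out-of-memory error is typically either in the submitting client JVM (the process calling executeSql) or in the JobManager, which has to track every submitted job, rather than in the TaskManagers. Which one applies here is an assumption; the config keys below are standard Flink options with illustrative values:
    # flink-conf.yaml
    jobmanager.memory.process.size: 4g     # total JobManager memory; grows with the number of concurrent jobs
    taskmanager.memory.process.size: 4g    # only relevant if the failure is actually on the TM side
    If the OutOfMemoryError is thrown in your own submitter process, raising its -Xmx (or submitting the jobs in smaller batches) is the knob to turn instead.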
  • a

    Amir Hossein Sharifzadeh

    04/14/2023, 12:41 PM
    I am running a query against two topics and calculating the results. In the main class:
    Copy code
    tableEnv.createTemporaryView("tbl1", stream1);
    tableEnv.createTemporaryView("tbl2", stream2);
    String data_query = "select ....";
    Table raw_table = tableEnv.sqlQuery(data_query);
    DataStream<Row> mystream = tableEnv.toDataStream(raw_table);
    mystream.process(new DataProcessor()).setParallelism(4);
    In the ProcessFunction class:
    Copy code
    // requires: import java.util.Collections;
    public class DataProcessor extends ProcessFunction<Row, List<double[][]>> {
        @Override
        public void processElement(Row row, Context context, Collector<List<double[][]>> collector) throws Exception {
            int id = Integer.parseInt(String.valueOf(row.getField(0)));
            String data1 = (String) row.getField(1);
            String data2 = (String) row.getField(2);
            double[][] results = DataUtils.compute(id, data1, data2);
            collector.collect(Collections.singletonList(results)); // emit the computed result downstream
        }
    }
    My questions: 1. How to send the results to the output? 2. How to access the output from the main file?
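    A minimal sketch of both parts, assuming the DataProcessor above emits List<double[][]> via collector.collect(...) as shown; the sink choice and job name are illustrative, not taken from the original code, and env is the StreamExecutionEnvironment backing tableEnv:
    // imports assumed: org.apache.flink.streaming.api.datastream.DataStream,
    // org.apache.flink.util.CloseableIterator, java.util.List
    DataStream<List<double[][]>> results =
            mystream.process(new DataProcessor()).setParallelism(4);

    // Option 1: attach a sink (print() is the simplest one) and run the job.
    results.print();
    env.execute("data-processing-job");

    // Option 2 (instead of option 1): pull the elements back into the main/driver program,
    // e.g. for tests; executeAndCollect() triggers the execution itself.
    // try (CloseableIterator<List<double[][]>> it = results.executeAndCollect()) {
    //     while (it.hasNext()) {
    //         List<double[][]> batch = it.next();
    //         // use batch ...
    //     }
    // }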
  • a

    AK

    04/15/2023, 11:41 AM
    Hello, I have a question about a custom implementation of the window/watermark strategy (using Kafka as a source). According to this doc, Flink supports per-partition watermarks with a custom event time from a POJO (after the key-by operation), which looks fine. In my specific use case, I have Kafka as a source that gets events from two different regions (written by an upstream service). In some cases, one region processes events at a slower rate due to some issue or strange scenario. In that case the per-partition watermark does not handle my case, because if only one of the regions inserts data, the window would still close. Is there a way to handle this? One hacky solution is to have X partitions and use the first X/2 partitions for region X and the second X/2 for region Y; in that case I am able to solve my use case. Is there any other decent way to extend and support this using a custom watermark generator with per-partition + a custom field (region)?
    ✅ 1
    d
    • 2
    • 11
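    One possibility worth mentioning (a sketch, not from the thread): Flink 1.15+ watermark alignment lets sources in the same alignment group pause consumption when they get too far ahead in event time, which bounds how far a fast region can drag the watermark ahead of a slow one (per-split alignment within a single source arrived in later versions). The method names below are the public WatermarkStrategy API; the event class, group name and durations are illustrative:
    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;

    WatermarkStrategy<MyEvent> strategy =
            WatermarkStrategy
                    .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(30))
                    .withTimestampAssigner((event, ts) -> event.getEventTime())
                    .withIdleness(Duration.ofMinutes(1))  // don't stall windows on empty partitions
                    .withWatermarkAlignment("regions", Duration.ofMinutes(5), Duration.ofSeconds(1));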
  • m

    mohammadreza khedri

    04/16/2023, 7:30 AM
    Hello, I just ran this example from the Apache Flink website and I got the following error:
    Copy code
    py4j.protocol.Py4JJavaError: An error occurred while calling o0.execute.
    : org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
            at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
            ...
    
    Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
            at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
            ...
    
    Caused by: java.lang.RuntimeException: Failed to create stage bundle factory!
    INFO:root:Starting up Python harness in loopback mode.
    ....
    
    Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 0
            at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
            ...
    
    Caused by: java.lang.IllegalStateException: Process died with exit code 0
            at org.apache.beam.runners.fnexecution.environment.ProcessManager$RunningProcess.isAliveOrThrow(ProcessManager.java:75)
            ...
  • s

    Slackbot

    04/17/2023, 5:17 AM
    This message was deleted.
    m
    • 2
    • 1
  • k

    Kush Rohra

    04/17/2023, 5:25 AM
    Hi Folks. I have a question. I am using Flink and Scala for my streaming application; it receives data for all of our VMs, like CPU utilization, memory usage, etc. We have 2 categories of VMs: test and non-test. When I read data from Kafka, I need to filter the data for only non-test VMs so that we can monitor those and set up alerting in Grafana. The list of non-test VMs is in a DB table and it is dynamic; I am using JDBCCatalog with a cache to keep a copy of the VM info handy while streaming, so that I can filter the required data. The problem: my application runs fine when consumer parallelism = 8 and processing parallelism = 2 (although it is very slow there). As soon as I increase processing parallelism to 4, the application starts to fail with the following errors:
    1. org.apache.flink.table.api.TableException: Failed to execute sql
    2. org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query: This exception indicates that the query uses an unsupported SQL feature. Caused by: org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=LOGICAL, FlinkRelDistributionTraitDef=any, MiniBatchIntervalTraitDef=None: 0, ModifyKindSetTraitDef=[NONE], UpdateKindTraitDef=[NONE]. Missing conversion is LogicalSink[convention: NONE -> LOGICAL]
    3. java.util.ConcurrentModificationException
    None of these errors occur when processing parallelism = 2. Are these known issues with Flink? Have any of you faced this before?
  • l

    Likhith Kanigolla

    04/17/2023, 6:36 AM
    I have three tables. I want to merge all three tables and send the data to Elasticsearch; currently I am using the SQL CLI in Apache Flink.
    Copy code
    CREATE TABLE customers_inv (
         `id` STRING,
         `first_name` STRING,
         `last_name` STRING,
         `email` VARCHAR,
         PRIMARY KEY (`id`) NOT ENFORCED
       ) WITH (
      'connector' = 'kafka',
      'topic' = 'dbserver1.inventory.customers',
      'properties.bootstrap.servers' = 'kafka:9092',
      'properties.group.id' = 'eventsGroup=customersinv',
      'value.debezium-json.schema-include' = 'true',
      'scan.startup.mode' = 'earliest-offset',
      'value.format' = 'debezium-json'
       );
       
    CREATE TABLE products_inv (
         `id` STRING,
         `name` VARCHAR,
         `description` VARCHAR,
         `weight` FLOAT,
         PRIMARY KEY (`id`) NOT ENFORCED
       ) WITH (
      'connector' = 'kafka',
      'topic' = 'dbserver1.inventory.products',
      'properties.bootstrap.servers' = 'kafka:9092',
      'properties.group.id' = 'eventsGroup=productsinv',
      'value.debezium-json.schema-include' = 'true',
      'scan.startup.mode' = 'earliest-offset',
      'value.format' = 'debezium-json'
       );
    
    CREATE TABLE orders_inv (
         `id` STRING,
         `order_date` STRING,
         `purchaser` STRING,
         `quantity` INT,
         `product_id` STRING,
         PRIMARY KEY (`id`) NOT ENFORCED
       ) WITH (
      'connector' = 'kafka',
      'topic' = 'dbserver1.inventory.orders',
      'properties.bootstrap.servers' = 'kafka:9092',
      'properties.group.id' = 'eventsGroup=ordersinv',
      'value.debezium-json.schema-include' = 'true',
      'scan.startup.mode' = 'earliest-offset',
      'value.format' = 'debezium-json'
       );
    How can I proceed further and send the data to Elasticsearch? I tried to create a new table and insert data, but it didn't work.
    Copy code
    CREATE TABLE users_sink (
         `name` STRING
       ) WITH (
      'connector' = 'kafka',
      'topic' = 'sink-topic-json',
      'properties.bootstrap.servers' = 'kafka:9092',
      'properties.group.id' = 'SinkGroup',
       'value.format' = 'json'
       );
       
    INSERT INTO users_sink   
    SELECT first_name from customers_inv;
    I am getting this error and couldn't find much information about it:
    Copy code
    [ERROR] Could not execute SQL statement. Reason: 
    org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.users_sink' doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[def
    ault_catalog, default_database, customers_inv]], fields=[id, first_name, last_name, email])
    Any other approaches would also be helpful. Thanks in advance.
    g
    • 2
    • 6
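    The error means the debezium-json source produces a changelog (inserts, updates and deletes), while a plain 'kafka' sink with 'json' format only accepts inserts. A sketch of an upsert-capable sink instead; broker/topic names reuse the ones from the message, everything else is illustrative. The Elasticsearch connector works the same way because it writes by primary key:
    -- upsert-kafka can consume the changelog produced by the debezium-json source
    CREATE TABLE users_sink (
      `id` STRING,
      `first_name` STRING,
      PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'topic' = 'sink-topic-json',
      'properties.bootstrap.servers' = 'kafka:9092',
      'key.format' = 'json',
      'value.format' = 'json'
    );

    INSERT INTO users_sink SELECT `id`, `first_name` FROM customers_inv;

    -- or write straight to Elasticsearch (elasticsearch-7 connector jar must be on the classpath)
    CREATE TABLE users_es (
      `id` STRING,
      `first_name` STRING,
      PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'connector' = 'elasticsearch-7',
      'hosts' = 'http://elasticsearch:9200',
      'index' = 'users'
    );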
  • b

    barak ben natan

    04/17/2023, 9:00 AM
    Hi guys, I am upgrading from Flink 1.13.6 to Flink 1.14.4 and I'm getting this weird exception:
    java.lang.ClassCastException: class org.apache.calcite.plan.RelOptCostImpl$Factory cannot be cast to class org.apache.flink.table.planner.plan.cost.FlinkCostFactory (org.apache.calcite.plan.RelOptCostImpl$Factory and org.apache.flink.table.planner.plan.cost.FlinkCostFactory are in unnamed module of loader 'app')
        at org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalExchange.computeSelfCost(CommonPhysicalExchange.scala:54)
        at org.apache.flink.table.planner.plan.metadata.FlinkRelMdNonCumulativeCost.getNonCumulativeCost(FlinkRelMdNonCumulativeCost.scala:41)
        at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown Source)
        at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown Source)
        at org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:288)
        at org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:38)
        at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source)
        at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source)
        at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source)
        at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source)
    Found this ancient similar issue: https://issues.apache.org/jira/browse/FLINK-15333, but it didn't help... Any advice?
  • u

    张夏昭

    04/17/2023, 9:43 AM
    Hi Folks. I have a question. I am using Flink for my Table API job, and I created a function named "mysplit". I want to get an array result for an input like "1;2;3" with my function, but I get an error. What's wrong with my SQL job?
    create view dataset_v as
    SELECT a.id as dataset_id,a.dataset_name_zh ,a.dataset_name_en ,a.introduction,
    (SELECT c.level_path from dict_detail c WHERE c.id  in mysplit(a.industry_type_ids,";")) as level_path,
    FROM dataset_info a
    left join usage_statistics b on a.id=b.dataset_info_id
    This is my function:
    Copy code
    @FunctionHint(output = @DataTypeHint("ARRAY<f1 INT>"))
    public static class MySplit extends TableFunction<List<Integer>> {
        public void eval(String str,String delimiter){
            List<Integer> ss=new ArrayList<>();
            for (String s : str.split(delimiter)) {
                ss.add( Integer.parseInt(s));
            }
            collect(ss);
        }
    }
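    For reference, the usual pattern here (a sketch under the assumption that the goal is to explode the id list and join it): a TableFunction is joined with LATERAL TABLE rather than used inside IN, and it emits one row per element instead of collecting a List:
    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.annotation.FunctionHint;
    import org.apache.flink.table.functions.TableFunction;
    import org.apache.flink.types.Row;

    // emits one row per split token instead of a single List value
    @FunctionHint(output = @DataTypeHint("ROW<industry_id INT>"))
    public static class MySplit extends TableFunction<Row> {
        public void eval(String str, String delimiter) {
            if (str == null) {
                return;
            }
            for (String s : str.split(delimiter)) {
                collect(Row.of(Integer.parseInt(s.trim())));
            }
        }
    }
    and in SQL the exploded ids are joined back to dict_detail (column names reuse the ones in the query):
    SELECT a.id AS dataset_id, a.dataset_name_zh, a.dataset_name_en, a.introduction, c.level_path
    FROM dataset_info a
    CROSS JOIN LATERAL TABLE(mysplit(a.industry_type_ids, ';')) AS t(industry_id)
    LEFT JOIN dict_detail c ON c.id = t.industry_id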
  • n

    Nithin kharvi

    04/17/2023, 10:18 AM
    Hi, we are using a Flink job (version 1.16) to consume messages from Kafka and push them back to Kafka after processing. We are encountering the below error in the jobmanager, and the Flink job restarts after that. Is this error related to Flink, or is there any configuration to recover from it?
    ERROR akka.remote.Remoting [] - Association to [akka.tcp://flink@10.101.212.188:6122] with UID [590651498] irrecoverably failed. Quarantining address.
    java.util.concurrent.TimeoutException: Remote system has been silent for too long. (more than 2 days)
        at akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:454) ~[flink-rpc-akka_75517d49-aee2-4205-be5b-dc2b88de8069.jar:1.16.0]
        at akka.actor.Actor.aroundReceive(Actor.scala:537) ~[flink-rpc-akka_75517d49-aee2-4205-be5b-dc2b88de8069.jar:1.16.0]
        at akka.actor.Actor.aroundReceive$(Actor.scala:535) ~[flink-rpc-akka_75517d49-aee2-4205-be5b-dc2b88de8069.jar:1.16.0]
        at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:249) ~[flink-rpc-akka_75517d49-aee2-4205-be5b-dc2b88de8069.jar:1.16.0]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [flink-rpc-akka_75517d49-aee2-4205-be5b-dc2b88de8069.jar:1.16.0]
        at akka.actor.ActorCell.invoke(ActorCell.scala:548) [flink-rpc-akka_75517d49-aee2-4205-be5b-dc2b88de8069.jar:1.16.0]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_75517d49-aee2-4205-be5b-dc2b88de8069.jar:1.16.0]
        at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_75517d49-aee2-4205-be5b-dc2b88de8069.jar:1.16.0]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_75517d49-aee2-4205-be5b-dc2b88de8069.jar:1.16.0]
        at java.util.concurrent.ForkJoinTask.doExec(Unknown Source) [?:?]
        at java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) [?:?]
        at java.util.concurrent.ForkJoinPool.scan(Unknown Source) [?:?]
        at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) [?:?]
        at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) [?:?]
  • t

    Thijs van de Poll

    04/17/2023, 3:53 PM
    Hi, I have a single Dockerfile that includes PyFlink code to deploy different pipelines. Two of the pipelines somehow require conflicting .jar dependencies. Both pipelines work properly if I add the dependencies to the /opt/flink/lib folder. As a solution I wanted to add, in one of the pipelines, an additional pipeline.jars configuration which adds some more jars required for that specific pipeline. However, I keep getting errors which indicate that the jars are not loaded correctly. I also tried to run the pipeline using ./bin/flink run --python pipeline.py --jarfile jar_dependency.jar, but that also did not work. Does anyone understand why this does not work? A potential workaround would be to build different images for the different pipelines, but I'd rather not do that. Thanks in advance!
    m
    • 2
    • 5
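    A sketch of the two documented ways to attach per-job jars in PyFlink, in case the problem is just how the paths are passed; the jar path is illustrative and every entry must be a file:/// URL reachable inside the container:
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()

    # DataStream API: ship extra jars with this job only (kept out of /opt/flink/lib)
    env.add_jars("file:///opt/flink/usrlib/jar_dependency.jar")

    # Table API: the same thing via configuration; semicolon-separated file:/// URLs
    t_env = StreamTableEnvironment.create(env)
    t_env.get_config().get_configuration().set_string(
        "pipeline.jars", "file:///opt/flink/usrlib/jar_dependency.jar")
    Note that jars left in /opt/flink/lib sit on the parent classloader, so depending on classloader.resolve-order a conflicting class there can still shadow the per-job jar, which could explain errors that look like the jars "are not loaded".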
  • h

    Hygor Knust

    04/17/2023, 9:49 PM
    Hi, I'm trying to deploy a SessionJob using flink-kubernetes-operator. However, I'm receiving this event on the `SessionJob`: "Connection timed out (Connection timed out)". Looking in the operator logs I see this:
    Copy code
    flink-kubernetes-operator 2023-04-17 21:40:34,212 o.a.f.k.o.c.FlinkDeploymentController [INFO ][data-team/flink-cluster] Starting reconciliation
    flink-kubernetes-operator 2023-04-17 21:40:34,213 o.a.f.k.o.s.FlinkResourceContextFactory [INFO ][data-team/flink-cluster] Getting service for flink-cluster
    flink-kubernetes-operator 2023-04-17 21:40:34,223 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][data-team/flink-cluster] Resource fully reconciled, nothing to do...
    flink-kubernetes-operator 2023-04-17 21:40:34,223 o.a.f.k.o.c.FlinkDeploymentController [INFO ][data-team/flink-cluster] End of reconciliation
    flink-kubernetes-operator 2023-04-17 21:40:49,224 o.a.f.k.o.c.FlinkDeploymentController [INFO ][data-team/flink-cluster] Starting reconciliation
    flink-kubernetes-operator 2023-04-17 21:40:49,225 o.a.f.k.o.s.FlinkResourceContextFactory [INFO ][data-team/flink-cluster] Getting service for flink-cluster
    flink-kubernetes-operator 2023-04-17 21:40:49,233 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][data-team/flink-cluster] Resource fully reconciled, nothing to do...
    flink-kubernetes-operator 2023-04-17 21:40:49,233 o.a.f.k.o.c.FlinkDeploymentController [INFO ][data-team/flink-cluster] End of reconciliation
    flink-kubernetes-operator 2023-04-17 21:41:04,234 o.a.f.k.o.c.FlinkDeploymentController [INFO ][data-team/flink-cluster] Starting reconciliation
    flink-kubernetes-operator 2023-04-17 21:41:04,235 o.a.f.k.o.s.FlinkResourceContextFactory [INFO ][data-team/flink-cluster] Getting service for flink-cluster
    flink-kubernetes-operator 2023-04-17 21:41:04,244 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][data-team/flink-cluster] Resource fully reconciled, nothing to do...
    flink-kubernetes-operator 2023-04-17 21:41:04,244 o.a.f.k.o.c.FlinkDeploymentController [INFO ][data-team/flink-cluster] End of reconciliation
    flink-kubernetes-operator 2023-04-17 21:41:19,246 o.a.f.k.o.c.FlinkDeploymentController [INFO ][data-team/flink-cluster] Starting reconciliation
    flink-kubernetes-operator 2023-04-17 21:41:19,246 o.a.f.k.o.s.FlinkResourceContextFactory [INFO ][data-team/flink-cluster] Getting service for flink-cluster
    flink-kubernetes-operator 2023-04-17 21:41:19,254 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][data-team/flink-cluster] Resource fully reconciled, nothing to do...
    flink-kubernetes-operator 2023-04-17 21:41:19,254 o.a.f.k.o.c.FlinkDeploymentController [INFO ][data-team/flink-cluster] End of reconciliation
    flink-kubernetes-operator 2023-04-17 21:41:30,342 o.a.f.k.o.l.AuditUtils         [INFO ][data-team/flink-job] >>> Event  | Warning | SESSIONJOBEXCEPTION | Connection timed out (Connection timed out)
    flink-kubernetes-operator 2023-04-17 21:41:30,342 o.a.f.k.o.r.ReconciliationUtils [WARN ][data-team/flink-job] Attempt count: 3, last attempt: false
    flink-kubernetes-operator 2023-04-17 21:41:30,356 o.a.f.k.o.c.FlinkDeploymentController [INFO ][data-team/flink-cluster] Starting reconciliation
    flink-kubernetes-operator 2023-04-17 21:41:30,356 o.a.f.k.o.s.FlinkResourceContextFactory [INFO ][data-team/flink-cluster] Getting service for flink-cluster
    flink-kubernetes-operator 2023-04-17 21:41:30,364 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler [INFO ][data-team/flink-cluster] Resource fully reconciled, nothing to do...
    flink-kubernetes-operator 2023-04-17 21:41:30,364 o.a.f.k.o.c.FlinkDeploymentController [INFO ][data-team/flink-cluster] End of reconciliation
    flink-kubernetes-operator 2023-04-17 21:41:30,375 o.a.f.k.o.l.AuditUtils         [INFO ][data-team/flink-job] >>> Status | Error   | UPGRADING       | {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.net.ConnectException: Connection timed out (Connection timed out)","throwableList":[{"type":"java.net.ConnectException","message":"Connection timed out (Connection timed out)"}]} 
    flink-kubernetes-operator 2023-04-17 21:41:30,375 i.j.o.p.e.ReconciliationDispatcher [ERROR][data-team/flink-job] Error during event processing ExecutionScope{ resource id: ResourceID{name='flink-job', namespace='data-team'}, version: 1307816475} failed.
    flink-kubernetes-operator org.apache.flink.kubernetes.operator.exception.ReconciliationException: java.net.ConnectException: Connection timed out (Connection timed out)
    flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController.reconcile(FlinkSessionJobController.java:112)
    flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController.reconcile(FlinkSessionJobController.java:52)
    flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:145)
    flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:103)
    flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics.timeControllerExecution(OperatorJosdkMetrics.java:80)
    flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.Controller.reconcile(Controller.java:102)
    flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.reconcileExecution(ReconciliationDispatcher.java:139)
    flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleReconcile(ReconciliationDispatcher.java:119)
    flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleDispatch(ReconciliationDispatcher.java:89)
    flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.handleExecution(ReconciliationDispatcher.java:62)
    flink-kubernetes-operator     at io.javaoperatorsdk.operator.processing.event.EventProcessor$ReconcilerExecutor.run(EventProcessor.java:406)
    flink-kubernetes-operator     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    flink-kubernetes-operator     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    flink-kubernetes-operator     at java.base/java.lang.Thread.run(Unknown Source)
    flink-kubernetes-operator Caused by: java.net.ConnectException: Connection timed out (Connection timed out)
    flink-kubernetes-operator     at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
    flink-kubernetes-operator     at java.base/java.net.AbstractPlainSocketImpl.doConnect(Unknown Source)
    flink-kubernetes-operator     at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(Unknown Source)
    flink-kubernetes-operator     at java.base/java.net.AbstractPlainSocketImpl.connect(Unknown Source)
    flink-kubernetes-operator     at java.base/java.net.SocksSocketImpl.connect(Unknown Source)
    flink-kubernetes-operator     at java.base/java.net.Socket.connect(Unknown Source)
    flink-kubernetes-operator     at java.base/sun.security.ssl.SSLSocketImpl.connect(Unknown Source)
    flink-kubernetes-operator     at java.base/sun.security.ssl.BaseSSLSocketImpl.connect(Unknown Source)
    flink-kubernetes-operator     at java.base/sun.net.NetworkClient.doConnect(Unknown Source)
    flink-kubernetes-operator     at java.base/sun.net.www.http.HttpClient.openServer(Unknown Source)
    flink-kubernetes-operator     at java.base/sun.net.www.http.HttpClient.openServer(Unknown Source)
    flink-kubernetes-operator     at java.base/sun.net.www.protocol.https.HttpsClient.<init>(Unknown Source)
    flink-kubernetes-operator     at java.base/sun.net.www.protocol.https.HttpsClient.New(Unknown Source)
    flink-kubernetes-operator     at java.base/sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.getNewHttpClient(Unknown Source)
    flink-kubernetes-operator     at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect0(Unknown Source)
    flink-kubernetes-operator     at java.base/sun.net.www.protocol.http.HttpURLConnection.plainConnect(Unknown Source)
    flink-kubernetes-operator     at java.base/sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(Unknown Source)
    flink-kubernetes-operator     at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(Unknown Source)
    flink-kubernetes-operator     at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(Unknown Source)
    flink-kubernetes-operator     at java.base/sun.net.www.protocol.https.HttpsURLConnectionImpl.getInputStream(Unknown Source)
    flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.artifact.HttpArtifactFetcher.fetch(HttpArtifactFetcher.java:59)
    flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.artifact.ArtifactManager.fetch(ArtifactManager.java:61)
    flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.uploadJar(AbstractFlinkService.java:724)
    flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.service.AbstractFlinkService.submitJobToSessionCluster(AbstractFlinkService.java:207)
    flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler.deploy(SessionJobReconciler.java:70)
    flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler.deploy(SessionJobReconciler.java:41)
    flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler.reconcile(AbstractFlinkResourceReconciler.java:115)
    flink-kubernetes-operator     at org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController.reconcile(FlinkSessionJobController.java:104)
    flink-kubernetes-operator     ... 13 more
    Could anyone help me debug that?
    t
    m
    • 3
    • 15
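    For what it's worth, the stack trace ends in HttpArtifactFetcher.fetch: the operator pod itself downloads the session job artifact over HTTP(S) and is timing out, so the first thing to check is whether the jarURI is reachable from the operator's pod/namespace (egress rules, proxy, internal DNS), not just from the Flink pods. A sketch of the relevant part of the resource; the URL is illustrative:
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkSessionJob
    metadata:
      name: flink-job
      namespace: data-team
    spec:
      deploymentName: flink-cluster      # the FlinkDeployment hosting the session cluster
      job:
        # downloaded by the operator pod (HttpArtifactFetcher), so this URL must be
        # reachable from the operator itself
        jarURI: https://artifacts.example.com/jobs/flink-job.jar
        parallelism: 2
        upgradeMode: stateless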
  • u

    张夏昭

    04/18/2023, 6:14 AM
    Hi guys, I tried to put "flink-connector-elasticsearch7_2.12-1.14.5.jar" in my flink/lib, but the Flink client can't run and fails with this error:
    Copy code
    Caused by: java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/elasticsearch/table/ElasticsearchConnectorOptions
    	at org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7DynamicSinkFactory.<clinit>(Elasticsearch7DynamicSinkFactory.java:61) ~[flink-connector-elasticsearch7_2.12-1.14.5.jar:1.14.5]
    	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_333]
    	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[?:1.8.0_333]
    	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[?:1.8.0_333]
    	at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[?:1.8.0_333]
    	at java.lang.Class.newInstance(Class.java:442) ~[?:1.8.0_333]
    	at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380) ~[?:1.8.0_333]
    	at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404) ~[?:1.8.0_333]
    	at java.util.ServiceLoader$1.next(ServiceLoader.java:480) ~[?:1.8.0_333]
    	at java.util.Iterator.forEachRemaining(Iterator.java:116) ~[?:1.8.0_333]
    	at org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:623) ~[flink-table_2.12-1.14.5.jar:1.14.5]
    	at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:378) ~[flink-table_2.12-1.14.5.jar:1.14.5]
    	at org.apache.flink.table.client.gateway.context.ExecutionContext.lookupExecutor(ExecutionContext.java:156) ~[flink-sql-client_2.12-1.14.5.jar:1.14.5]
    	... 8 more
    Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchConnectorOptions
    	at java.net.URLClassLoader.findClass(URLClassLoader.java:387) ~[?:1.8.0_333]
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:418) ~[?:1.8.0_333]
    	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355) ~[?:1.8.0_333]
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:351) ~[?:1.8.0_333]
    	at org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch7DynamicSinkFactory.<clinit>(Elasticsearch7DynamicSinkFactory.java:61) ~[flink-connector-elasticsearch7_2.12-1.14.5.jar:1.14.5]
    	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[?:1.8.0_333]
    Any other approaches would also be helpful. Thanks in advance.
    m
    • 2
    • 1
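    The missing class lives in flink-connector-elasticsearch-base, which the thin flink-connector-elasticsearch7 jar depends on but does not bundle. For dropping into flink/lib, the self-contained SQL connector jar is usually the one to use; the exact artifact names below are my assumption and worth checking against Maven Central for 1.14.5:
    # replace the thin connector jar in flink/lib with the bundled SQL connector jar
    flink/lib/flink-sql-connector-elasticsearch7_2.12-1.14.5.jar
    # or keep the thin jar and additionally add the base connector it needs
    flink/lib/flink-connector-elasticsearch-base_2.12-1.14.5.jar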
  • g

    Gintaras Matulas

    04/18/2023, 6:47 AM
    Hey. Do I need to add a keyBy before the sink if I want to ensure that records from a previous keyBy operation go to the same sink writer when parallelism is greater than 1?
  • n

    Narges

    04/18/2023, 7:51 AM
    Hi, I'm trying to use the DataStream API with Python. This is my code, but I got this error: TypeError: Could not found the Java class 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer'. The Java dependencies could be specified via command line argument '--jarfile' or the config option 'pipeline.jars'
    from pyflink.common.serialization import Encoder
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors import StreamingFileSink
    from pyflink.datastream.connectors import FlinkKafkaProducer
    from pyflink.common.serialization import SimpleStringSchema

    def tutorial():
        env = StreamExecutionEnvironment.get_execution_environment()
        env.set_parallelism(1)
        ds = env.from_collection(
            collection=[(1, 'aaa'), (2, 'bbb')],
            type_info=Types.ROW([Types.INT(), Types.STRING()]))
        producer_properties = {"bootstrap.servers": "localhost:9092"}
        ds.add_sink(FlinkKafkaProducer("my-topic", SimpleStringSchema(), producer_properties))
        env.execute("tutorial_job")

    if __name__ == '__main__':
        tutorial()
    d
    • 2
    • 1
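    The TypeError means the Kafka connector classes are simply not on the JVM classpath that PyFlink starts. A sketch of adding them from within the script; the jar name/version is illustrative and must match the Flink version in use:
    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    # must be a file:/// URL pointing at the bundled Kafka connector jar
    env.add_jars("file:///path/to/flink-sql-connector-kafka-1.16.0.jar")
    Equivalently, pass --jarfile to flink run or set pipeline.jars in the configuration, as the error message suggests.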
  • m

    Max Dubinin

    04/18/2023, 12:23 PM
    Hey guys! Does anyone have a good explanation for the use-case of the KeyBy operator? I have a Kinesis stream with 350+ event types, and each event is eventually streamed into a different sink (using side-outputs). Is it a good idea to use KeyBy before the process function that tags the events? Does it have any positive effect on the pipeline in terms of durability/performance?
    b
    • 2
    • 3
  • r

    Raghunadh Nittala

    04/18/2023, 12:30 PM
    Hey All, I'm sinking stream results to S3; the names of the files are in the format: part-14f0a382-e12f-4f96-b26e-670d18cd1168-0-1334, part-14f0a382-e12f-4f96-b26e-670d18cd1168-1-1484, part-14f0a382-e12f-4f96-b26e-670d18cd1168-2-1449, part-14f0a382-e12f-4f96-b26e-670d18cd1168-3-1486. We have the job parallelism as 4 and that's the reason 4 files are being saved to S3. What do the numbers 1334, 1484, 1449, 1486 indicate? Are they the checkpoint IDs?
  • r

    Raghunadh Nittala

    04/18/2023, 1:14 PM
    Hello All, I’m using the below SQL query to sink data to S3 in parquet format:
    INSERT INTO sink_table_s3
    SELECT event_id, event_type, event_name, DATE_FORMAT(TUMBLE_END(proc_time, INTERVAL '1' HOUR), 'yyyy-MM-dd') AS record_date, COUNT(*) results_count
    FROM source_table
    GROUP BY event_id, event_type, event_name, TUMBLE(proc_time, INTERVAL '1' HOUR);
    I am partitioning the table on the event_id, event_type and date columns. I observed that the parquet files are getting saved per event_id and event_type, but the date is not changing: the data being processed today is being saved to the 2023-04-14 folder. As I am using proc_time to derive the date, I expect the data to be saved to the 2023-04-18 folder. PS: the deployment was rolled back due to some errors in between, but it has been running fine for a day. I'm curious what would lead to this behavior?
  • f

    Felix Angell

    04/18/2023, 1:54 PM
    Hey, has anyone run into this problem before:
    Copy code
    java.lang.NoSuchMethodError: 'org.apache.flink.metrics.MetricGroup org.apache.flink.api.common.functions.RuntimeContext.getMetricGroup()'
    	at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:416)
    	at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:365)
    Essentially we've upgraded to Flink 1.15.2 (on KDA) and we're using flink-sql-connector-kinesis v1.15.3, which exposes a FlinkKinesisConsumer. We are using PyFlink, and so we mapped this FlinkKinesisConsumer class with the jvm_gateway. On using this we get the above exception since doing this upgrade. I can still resolve this getMetricGroup symbol manually if I add the dependency to this flink-sql-connector-kinesis pom in a typical Java project, but in the PyFlink environment it doesn't seem to pick it up. Any ideas what is up here? Let me know if you need more details.
    m
    • 2
    • 7
  • z

    Zachary Piazza

    04/18/2023, 2:33 PM
    Hi folks. I am currently trying to set up a CDC pipeline, from MongoDB (using Debezium) to Flink to a JDBC sink table in Postgres. I am able to see inserts and updates being processed successfully in the downstream Postgres table; however, delete handling is not working correctly. I have tried changing the 'drop.tombstones' property to 'false' and changing the 'delete.handling.mode' to 'rewrite', 'none', and 'drop', none of which seems to fix the problem. I have also tried using both 'kafka' and 'upsert-kafka' for the source Kafka table definition in Flink. I have read some blog posts on this topic, but unfortunately most of the Debezium examples I've seen use a source DB like MySQL instead of a document database such as MongoDB.
    r
    • 2
    • 4
  • s

    Sucheth Shivakumar

    04/18/2023, 6:44 PM
    We tried to upgrade the Flink version to 1.17 and the operator version to 1.17, but upgradeMode: stateless doesn't seem to work as expected (read from the earliest offset). It was working fine with Flink version 1.15 and operator version 1.15. Does anyone have any idea why it is not working and what has changed?
    g
    • 2
    • 1