# troubleshooting
  • Rajat Ahuja
    07/02/2023, 11:45 PM
    Hi team, I deployed a session cluster on top of k8s, but when I try to run a job against that session cluster (referencing it by name) it never reaches the RUNNING state. Any idea what I might be doing wrong? Any help would be appreciated. Session cluster:
    Copy code
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: session-deployment-only-example
    spec:
      image: flink:1.16
      flinkVersion: v1_16
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: "10"
      serviceAccount: flink
      jobManager:
        resource:
          memory: "2048m"
          cpu: 1
      taskManager:
        resource:
          memory: "2048m"
          cpu: 1
    Job:
    Copy code
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkSessionJob
    metadata:
      name: session-job-only-example
    spec:
      deploymentName: session-deployment-only-example
      job:
        jarURI: file:///Users/rxahuja/Downloads/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
        parallelism: 4
        upgradeMode: stateless
    Status:
    Copy code
    kubectl get flinksessionjobs | grep session-job-only-example
    basic-session-job-only-example                UPGRADING
    session-job-only-example                      UPGRADING
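    A detail worth checking (an assumption, not a confirmed diagnosis): the jarURI above points to a path on the submitter's laptop, but the operator fetches the jar from inside the cluster, so a file:// URI must be readable from the pods. A remote URI sidesteps that; for example, the operator quickstart's example jar:
    Copy code
    job:
      jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.16.1/flink-examples-streaming_2.12-1.16.1-TopSpeedWindowing.jar
      parallelism: 4
      upgradeMode: stateless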
  • kiran kumar
    07/03/2023, 8:23 AM
    Hi All, does FlinkDeployment support rolling updates? We are getting the error below when trying to do "kubectl rollout":
    Copy code
    + kubectl rollout status -f flink-deployment.yaml
    error: unable to decode "flink-deployment.yaml": no kind "FlinkDeployment" is registered for version "flink.apache.org/v1beta1" in scheme "k8s.io/kubernetes/pkg/kubectl/scheme/scheme.go:28"
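    For context, a sketch (not an authoritative answer): kubectl rollout only understands built-in workload kinds such as Deployment and StatefulSet, so it cannot decode the FlinkDeployment custom resource. With the operator, an upgrade is driven by applying a changed spec and letting the operator reconcile it:
    Copy code
    kubectl apply -f flink-deployment.yaml
    # watch the operator-reported lifecycle state instead of `kubectl rollout status`
    kubectl get flinkdeployment my-deployment -w   # "my-deployment" is illustrative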
  • Dheeraj Panangat
    07/03/2023, 10:08 AM
    Hi Team, with reference to this, I have another query: when checkpointing, Flink creates a folder named after the JobID. When we restart the Flink application (restart meaning either deleting the deployment and re-deploying, or terminating the Kubernetes pods), a new JobID is created, which means a new checkpointing directory. How would it be able to read the state from the previous checkpoints? I'd appreciate any inputs, or if anyone can share their understanding of this. Thanks.
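    For reference, a sketch of the usual operator-side answers (field values are illustrative): either let upgradeMode carry state across restarts, or point a fresh job at an old savepoint/checkpoint explicitly:
    Copy code
    job:
      jarURI: local:///opt/flink/usrlib/app.jar                      # illustrative
      upgradeMode: savepoint                                         # or last-state
      initialSavepointPath: s3://bucket/savepoints/savepoint-xxxx    # restore into the new JobID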
  • Rashmin Patel
    07/03/2023, 10:46 AM
    Hi All, upon restarting our Flink pipeline we have suddenly started facing this weird error:
    Copy code
    com.esotericsoftware.kryo.KryoException: Unable to find class: O_STATE
    Serialization trace:
    customerRecord (com.navi.ndp.pipelines.customerfunnel.models.CustomerMasterState)
    	at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
    	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
    	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    	at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:99)
    NO_STATE is the default value of one enum field in the case class. Has anyone ever faced such an issue?
  • Oscar Perez
    07/03/2023, 10:46 AM
    Hey, we want to use a HybridSource with a KafkaSource that returns protobuf objects, and also a CSV source of a different nature. How can we achieve that? Is there a way to implement mappers so that both sources end up returning the same object?
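    A HybridSource requires all child sources to emit the same element type, so the "mapping" has to live inside each source's deserialization. A minimal sketch (the Event POJO, the generated EventProto class, broker, topic, and path are assumptions; in real code the anonymous classes should be top-level serializable classes):
    Copy code
    // common element type both sources produce (assumption)
    public class Event {
        public String id;
        public long ts;
    }

    // Kafka side: protobuf bytes -> Event
    KafkaSource<Event> kafka = KafkaSource.<Event>builder()
        .setBootstrapServers("broker:9092")   // illustrative
        .setTopics("events")                  // illustrative
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new AbstractDeserializationSchema<Event>() {
            @Override
            public Event deserialize(byte[] bytes) throws IOException {
                EventProto p = EventProto.parseFrom(bytes);   // generated protobuf class (assumption)
                Event e = new Event();
                e.id = p.getId();
                e.ts = p.getTs();
                return e;
            }
        })
        .build();

    // CSV side: parse each line into the same Event type
    SimpleStreamFormat<Event> csvFormat = new SimpleStreamFormat<Event>() {
        @Override
        public Reader<Event> createReader(Configuration config, FSDataInputStream in) {
            BufferedReader br = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
            return new Reader<Event>() {
                @Override
                public Event read() throws IOException {
                    String line = br.readLine();
                    if (line == null) return null;   // end of split
                    String[] parts = line.split(",");
                    Event e = new Event();
                    e.id = parts[0];
                    e.ts = Long.parseLong(parts[1]);
                    return e;
                }
                @Override
                public void close() throws IOException {
                    br.close();
                }
            };
        }

        @Override
        public TypeInformation<Event> getProducedType() {
            return TypeInformation.of(Event.class);
        }
    };
    FileSource<Event> csv =
        FileSource.forRecordStreamFormat(csvFormat, new Path("s3://bucket/history/")).build();

    // bounded CSV history first, then the unbounded Kafka topic
    HybridSource<Event> hybrid = HybridSource.builder(csv).addSource(kafka).build();
    DataStream<Event> events =
        env.fromSource(hybrid, WatermarkStrategy.noWatermarks(), "csv-then-kafka");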
  • Chris Tabakakis
    07/03/2023, 1:04 PM
    Hi everyone, I asked this question 3 days ago as well: I'm getting a weird error that I can't find anywhere online and have no idea how to counter. The full error log is here, and I reckon something's wrong with the BlobServer on my system. Any help would be appreciated, because I am entirely stuck.
  • Shengbo
    07/03/2023, 3:03 PM
    Hi everyone, I submitted a request for a Jira account about three days ago, but there hasn't been any reply (maybe I didn't supply enough information). I can't submit another one because the former is still under processing. Can anyone tell me what I can do about this? My email address is c1789036325@gmail.com.
  • Eugenio Gastelum
    07/03/2023, 3:58 PM
    Hi everyone, I have some issues trying to set up a local k8s Flink deployment for testing. I've posted it on Stack Overflow to make the question public too; it's here. I also might be doing something wrong when trying to use Flink's CLI in the Docker container; I've posted that as a separate question here. Any ideas on what I might have gotten wrong?
  • Yaroslav Bezruchenko
    07/03/2023, 6:21 PM
    I have a Flink 1.16 job with 1.37 GB of full checkpoint data, and this config:
    Copy code
    state.backend.incremental: "true"
    But when I check my checkpoint dir I see the following: savepoints weigh 38 GB and checkpoints 49 GB. I had a couple of restarts with restores from savepoints, so I see 4 folders with checkpoints. Can you please suggest what to do with the extra data? Can I detect what I can delete safely? Is there a way for automatic clean-up of the checkpoint dir by the Flink Operator? Thanks in advance.
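    For reference, a sketch of the retention knobs involved (values are illustrative, and the last two assume the Flink Kubernetes Operator):
    Copy code
    # keep only the most recent completed checkpoints
    state.checkpoints.num-retained: "3"
    # drop externalized checkpoints when a job is cancelled
    execution.checkpointing.externalized-checkpoint-retention: DELETE_ON_CANCELLATION
    # operator-side pruning of the savepoint history
    kubernetes.operator.savepoint.history.max.count: "5"
    kubernetes.operator.savepoint.history.max.age: 72h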
  • Alex Bryant
    07/04/2023, 6:23 AM
    Hi Flink devs. We are trying to run a basic Flink job that subscribes to a RabbitMQ stream. Please note, this is the relatively recent "stream" data structure, as opposed to "queue", for RabbitMQ. We are setting up a basic job and have managed to get the connector working with the "queue" type, but not "stream".
    Copy code
    version: '3.3'
    services:
      jobmanager:
        image: flink:1.17.0
        expose:
          - '8081'
        ports:
          - '8081:8081'
        command: jobmanager
        environment:
          - JOB_MANAGER_RPC_ADDRESS=jobmanager
          - JOB_MANAGER_RPC_PORT=6123
          - MQ_IP=${MQ_IP}
          - MQ_PORT=${MQ_PORT}
          - MQ_Q=${MQ_Q}
          - MQ_UN=${MQ_UN}
          - MQ_PW=${MQ_PW}
    
      taskmanager:
        image: flink:1.17.0
        depends_on:
          - jobmanager
        expose:
          - '6121'
        ports:
          - '6121:6121'
        command: taskmanager
        environment:
          - JOB_MANAGER_RPC_ADDRESS=jobmanager
          - JOB_MANAGER_RPC_PORT=6123
          - TASK_MANAGER_NUMBER_OF_TASK_SLOTS=4
    
    networks:
      dev-network:
        driver: bridge
    Connector version:
    Copy code
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-rabbitmq</artifactId>
      <version>3.0.1-1.17</version>
    </dependency>
    Connecting to RabbitMQ:
    Copy code
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    System.out.println("Connecting....................");
    
    String MQ_IP = System.getenv("MQ_IP");
    int MQ_PORT = Integer.parseInt(System.getenv("MQ_PORT"));
    String MQ_UN = System.getenv("MQ_UN");
    String MQ_PW = System.getenv("MQ_PW");
    String MQ_Q = System.getenv("MQ_Q");
    
    // RabbitMQ connection configuration
    RMQConnectionConfig connectionConfig =
      new RMQConnectionConfig.Builder()
        .setHost(MQ_IP)
        .setPort(MQ_PORT)
        .setUserName(MQ_UN)
        .setPassword(MQ_PW)
        .setVirtualHost("/")
        .build();
    
    // Create RabbitMQ source
    RMQSource<String> source = new RMQSource<>(
      connectionConfig, MQ_Q, true, new SimpleStringSchema());
    
    DataStream<String> stream = env
      .addSource(source)
      .name("RabbitMQ Source");
    
    
    stream.print();
    Does anyone know if this is supported? If so, how can this be accomplished? We have tried using ("x-queue-type", "stream") in ClientProperties but were unsuccessful. We'd appreciate some support, even if the advice is to take a different approach. Thanks!
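    One possible angle, offered as an unverified sketch: RMQSource declares the queue itself, so the stream arguments have to go into that declaration, which can be done by overriding setupQueue; stream consumers additionally require a prefetch, e.g. setPrefetchCount on the RMQConnectionConfig builder.
    Copy code
    // assumption-laden sketch: subclass RMQSource so the queue is declared as a stream
    public class RMQStreamSource extends RMQSource<String> {
        public RMQStreamSource(RMQConnectionConfig config, String queueName) {
            super(config, queueName, true, new SimpleStringSchema());
        }

        @Override
        protected void setupQueue() throws IOException {
            Map<String, Object> args = new HashMap<>();
            args.put("x-queue-type", "stream");   // declare as a stream
            // streams must be durable, non-exclusive, and non-auto-delete
            channel.queueDeclare(queueName, true, false, false, args);
        }
    }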
  • Jirawech Siwawut
    07/04/2023, 6:30 AM
    I opened this PR long ago. Is there any way to move the review forward?
  • Sergey Postument
    07/04/2023, 1:33 PM
    Hi, we use flink-cdc connector in our apps
    Copy code
    "com.ververica" % "flink-sql-connector-postgres-cdc" % cdcConnectorVer % "provided",
          "org.postgresql" % "postgresql" % "42.4.0" % "provided",
    and we noticed that if the psql source table is empty (no data from the source), Flink cannot take a checkpoint, which leads to a job restart with the exception:
    Copy code
    org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
    	at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:205)
    	at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:168)
    	at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:122)
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1939)
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:1919)
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:97)
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2000)
    	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    	at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    	at java.base/java.lang.Thread.run(Unknown Source)
    here is the checkpoint configuration
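    For reference, the threshold named in that exception is a single Flink option; an illustrative snippet (the value is an assumption, not the poster's actual configuration):
    Copy code
    # consecutive checkpoint failures tolerated before the job is failed/restarted
    execution.checkpointing.tolerable-failed-checkpoints: 5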
  • amarjeet pasrija
    07/04/2023, 2:32 PM
    Hi, while creating a table with a watermark via the Python Table API, the watermark strategy is not working, but the same table query works fine through sql-client. Can anyone please help me figure out the potential issue?
  • Chris Tabakakis
    07/04/2023, 3:17 PM
    I'm trying to write a sample job that reads data from HDFS, so all I'm doing is using the env.readTextFile("hdfs://....") function. For the time being I'm running this through the IDE to test it, and it used to work; however, in the process of fixing something else I seem to have messed something up, because now I get an error:
    Copy code
    Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
    	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:543)
    	at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409)
    	at org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction.run(ContinuousFileMonitoringFunction.java:214)
    	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
    	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333)
    Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Cannot support file system for 'hdfs' via Hadoop, because Hadoop is not in the classpath, or some classes are missing from the classpath.
    Here is the full pastebin in case more information is needed.
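    For reference, the usual fix when this happens in an IDE run (a sketch; the version is an assumption and should match the cluster): Flink loads HDFS support through Hadoop classes on the classpath, so the project needs a Hadoop client dependency, e.g.:
    Copy code
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>3.3.4</version> <!-- assumption: match your HDFS version -->
    </dependency>
    On a real cluster the same is typically achieved by exporting HADOOP_CLASSPATH=$(hadoop classpath) rather than bundling Hadoop.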
  • James Watkins
    07/04/2023, 6:32 PM
    Hello! I’m trying to create an Iceberg sink in my Flink application and I’m able to successfully insert some dummy data into the Iceberg table with this simple line of code:
    Copy code
    tableEnv.executeSql("INSERT INTO flink_catalogue.flink_demo.flink_iceberg_sink VALUES (1, 1, 'dummy_merchant1', 1, '2022-04-01 00:00:00'), (2, 2, 'dummy_merchant2', 2, '2022-04-02 00:00:00');")
    but when I try to use the data stream from my Kafka source, it's not writing any data, and there don't seem to be any useful error messages in the stacktrace. Here is the INSERT INTO statement (note: flink_transaction is a Kafka-backed Flink source table):
    Copy code
    tableEnv.executeSql("""
        INSERT INTO flink_catalogue.flink_demo.flink_iceberg_sink
        SELECT
            transaction_id
            , transaction_amount
            , merchant_name
            , user_id
            , created_at
        FROM flink_transaction
        """)
    I also checked whether the data stream is producing the data I need in the right format by removing the INSERT INTO statement, and it did print the results I was expecting. Can anyone help me understand what I'm doing wrong here, please? (I'll add the full code in-thread.)
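    One thing worth ruling out, as a sketch (an assumption about the symptom, not a confirmed diagnosis): executeSql submits the INSERT job asynchronously and returns immediately, so a main() that exits right away can hide failures. Blocking on the result surfaces them:
    Copy code
    TableResult result = tableEnv.executeSql(
        "INSERT INTO flink_catalogue.flink_demo.flink_iceberg_sink " +
        "SELECT transaction_id, transaction_amount, merchant_name, user_id, created_at " +
        "FROM flink_transaction");
    result.await();   // blocks until the job finishes or rethrows its failure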
  • Eugenio Gastelum
    07/04/2023, 7:16 PM
    Hello everyone, does anyone know if there's a way to run PyFlink .py files located on a Windows machine against a Flink deployment on a local Kubernetes cluster using minikube?
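    One workflow sketch (assumptions: a session cluster whose REST service is named my-session-rest, and the Flink CLI available, e.g. inside a Flink Docker container):
    Copy code
    # expose the JobManager REST endpoint on the local machine
    kubectl port-forward svc/my-session-rest 8081:8081
    # submit the Python job against it
    ./bin/flink run -m localhost:8081 -py /path/to/my_job.py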
  • Alex Bryant
    07/05/2023, 12:33 AM
    Here are the latest docs for Flink and the Apache Kafka connector. It looks like the connector's version number aligns with the version of Flink: https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/kafka/ Flink: 1.17.1, Kafka connector: 1.17.1, Kafka: 3.5.0? Does the above combo of Flink and connector align with the latest stable version of Kafka, i.e. 3.5.0?
  • Vitor Leal
    07/05/2023, 8:02 AM
    Hi all. Is there any way I can DESCRIBE a table that's already part of a job (not in the execution environment)?
  • Or Keren
    07/05/2023, 9:21 AM
    Hey all, for some reason when using the KafkaSink with EXACTLY_ONCE delivery and restarting the application without a checkpoint (it had a last-state configuration, but we needed a fresh state, so we upgraded it with the stateless configuration), it takes a really long time to initialize the KafkaSink operator; it can take more than an hour. From the logs, it seems that it tries to send InitProducerId for each previous checkpoint id it had, incrementally from 0. Does anyone know why this happens when trying to start with a clean state? How can we solve this?
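    A sketch of one mitigation (the cause is an assumption: with exactly-once delivery, KafkaSink probes and aborts transactions left over under the same transactional-id prefix, which after a state reset means scanning ids from 0). Giving the fresh deployment a new prefix sidesteps the old ids; names are illustrative:
    Copy code
    KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092")                // illustrative
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("my-app-v2")             // new prefix for the fresh state
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic("out")                               // illustrative
            .setValueSerializationSchema(new SimpleStringSchema())
            .build())
        .build();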
  • Amenreet Singh Sodhi
    07/05/2023, 11:27 AM
    Hi Team, I am using the async I/O operator, where I have embedded function futures defined. Just before this async I/O operator we have a global window that creates batches, which are passed on to the async I/O operator. I am seeing many of my batches getting timed out without even being processed. Any configuration or code change you folks can suggest would be really helpful.
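    For reference, a sketch of the knobs on the async operator itself (function and values are illustrative; whether they help depends on where the time goes):
    Copy code
    // timeout: how long an element may stay in flight before it times out;
    // capacity: max concurrent in-flight elements. A small capacity can queue
    // batches behind slow calls until their timeout expires.
    DataStream<Result> results = AsyncDataStream.unorderedWait(
        batches,                        // stream of batches from the global window
        new MyAsyncFunction(),          // illustrative AsyncFunction
        60_000, TimeUnit.MILLISECONDS,  // raise if downstream calls are slow
        100);                           // in-flight capacity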
  • Bruno Filippone
    07/05/2023, 12:01 PM
    Hello, I'm trying to figure out whether there's a way to set up a SQL Gateway on a Kubernetes cluster using the Kubernetes Operator. Would it be possible to run a SQL Gateway Deployment in Kubernetes and configure it to talk to one of the clusters managed by a FlinkDeployment resource? I can't seem to find a way to tell the SQL Gateway to interact with a specific JobManager. What am I missing?
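    A sketch of one possible wiring (unverified assumptions: the operator exposes the JobManager REST service as <deployment-name>-rest, and the gateway submits through the standard client options; the service name below is illustrative):
    Copy code
    ./bin/sql-gateway.sh start-foreground \
      -Dsql-gateway.endpoint.rest.address=0.0.0.0 \
      -Drest.address=my-session-cluster-rest \
      -Drest.port=8081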
  • Mikhail Spirin
    07/05/2023, 12:29 PM
    Hi All! Using Flink SQL, I need to sink this structure from my source data to Kinesis:
    Copy code
    {
      "str1": 970089079,
      "str2": 1688400374,
      "str3": {
        "str4": {
          "column1": {
            "grouped_column2": 1,
            "grouped_column2": 1,
            ....
          }
        }
      }
    }
    my approaches are in thread below…
  • sagar shinde
    07/05/2023, 3:57 PM
    Can anyone help me set up Apache Flink on Kubernetes in HA mode? Any documentation and steps would be highly appreciated. Thanks.
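    A minimal sketch with the Flink Kubernetes Operator (storage path, image, and sizes are illustrative):
    Copy code
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: ha-example
    spec:
      image: flink:1.17
      flinkVersion: v1_17
      flinkConfiguration:
        high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
        high-availability.storageDir: s3://my-bucket/flink-ha
      serviceAccount: flink
      jobManager:
        replicas: 2          # standby JobManager for faster failover
        resource:
          memory: "2048m"
          cpu: 1
      taskManager:
        resource:
          memory: "2048m"
          cpu: 1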
  • André Midea Jasiskis
    07/05/2023, 8:13 PM
    Say that I have a Flink application with 3 stateful operators, each of which has a uid explicitly set. I have operator chaining enabled, and those 3 stateful operators got merged into one operator (looking at the Flink UI). For savepoint/checkpoint matters, are those operators still 3 individual "states", or are they merged into one with a new uidhash given to them? If so, is this new uidhash deterministic based on my other 3 explicitly set uids?
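    For context, a sketch of the commonly documented behavior (hedged, not verified in this thread): chaining fuses the operators into a single task at runtime, but savepoints keep addressing state per operator by each explicit uid. Function names below are illustrative:
    Copy code
    stream
        .map(new EnrichFn()).uid("enrich")        // state tracked under "enrich"
        .filter(new DedupFn()).uid("dedup")       // state tracked under "dedup"
        .process(new AggregateFn()).uid("agg");   // state tracked under "agg"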
  • Pedro H S Teixeira
    07/05/2023, 10:33 PM
    Any recipes for how to add a jar (e.g. connectors) to the Flink lib when using the Kubernetes operator?
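    A common recipe, as a sketch (image tag and connector jar are illustrative): bake the jar into a custom image and point the FlinkDeployment at that image.
    Copy code
    FROM flink:1.17
    # jars in /opt/flink/lib are on the classpath for every job
    COPY flink-sql-connector-kafka-1.17.1.jar /opt/flink/lib/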
  • dev Jiang
    07/06/2023, 9:35 AM
    Hello all, when I use the Flink DataStream Pulsar connector, I found an error when consuming data from Pulsar: as long as the number of consumers is less than the number of Pulsar topic partitions, only one consumer processes data, and the other consumers cannot process data properly. So I filed an issue: https://issues.apache.org/jira/browse/FLINK-32550
  • Krutik Gadhiya
    07/06/2023, 10:14 AM
    Hello all, I am using Redpanda with Flink for streaming and processing messages. I am setting a group id, but it does not seem to be taken into account. Does anyone know what I am doing wrong?
    Copy code
    KafkaSource<Map<String, Object>> logSource = KafkaSource.<Map<String, Object>>builder()
                    .setBootstrapServers(BOOTSTRAP_SERVER)
                    .setTopics(SOURCE_TOPIC)
                    .setGroupId("group2")
                    .setStartingOffsets(OffsetsInitializer.latest())
                    .setValueOnlyDeserializer(new LogDeserializer())
                    .build();
    I also tried to list groups using rpk group list, but that didn't show any groups.
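    One sketch of an explanation (an assumption about the setup, not a confirmed diagnosis): KafkaSource assigns partitions itself rather than joining the consumer-group protocol, so the group id only becomes visible once offsets are committed, which by default happens on checkpoints:
    Copy code
    // without checkpointing, KafkaSource never commits offsets, so tools like
    // `rpk group list` have nothing to show for "group2"
    env.enableCheckpointing(60_000);   // commit offsets roughly every minute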