# troubleshooting
  • s

    Sree

    07/08/2022, 10:19 PM
    Hi everyone, Can Flink SQL or Table API load views from the JDBC catalog? I have been trying to load a view from Postgres database into Flink table environment, but I'm getting an error message "object not found". Does Flink's JDBC connector support loading views from external systems such as Postgres?
  • h

    Haim Ari

    07/10/2022, 11:56 AM
    Hello, when using kind: FlinkDeployment, is it possible to specify imagePullSecrets? I'd like to use a private custom Flink image (using the Operator).
  • u

    仰望星空

    07/11/2022, 2:22 AM
    Are there plans for the PostgreSQL CDC connector to support versions up to 14.4? According to the description, it currently supports up to 12.
  • b

    Bastien DINE

    07/11/2022, 7:31 AM
    Hello, I am struggling with the DataSet API. I need to chain two database accesses, so I could simply do source1 -> map2. The problem is that map should not (and cannot) be used for database access: when the request takes too long, we hit a timeout on the Akka connection between the TM and the JM. I know we cannot chain sources (source1 -> source2), and using a single source for both requests is very complicated, as we need to make multiple requests (not just 2). Running 2 jobs with 2 .execute() calls is not possible either, as I need those requests to be done in the same Flink job (source1 -> execute -> source2 -> execute). Can someone help me?
  • o

    Owen Lee

    07/11/2022, 9:16 AM
    I've upgraded our Flink app from 1.14.4 to 1.15.1, but TaskManager metrics are not sent to the Prometheus receiver. It seems related to this issue, which is resolved, but I am using neither KafkaSource nor KafkaSink. Is anyone experiencing a similar problem? https://issues.apache.org/jira/browse/FLINK-27487
  • e

    Enrique Lacal

    07/11/2022, 10:20 AM
    Hey all! Great that this slack has been created for this community! Wanted to raise awareness on https://issues.apache.org/jira/browse/FLINK-28265 and on what the process is for raising related tickets on the JIRA board? 🙂 We also raised it on https://issues.apache.org/jira/browse/FLINK-25098 but I believe Till is no longer working on Flink?
  • a

    Akhlaq Malik

    07/11/2022, 12:20 PM
    Hi there, a general question about savepoints vs. checkpoints. When would one trigger/use a savepoint? If I understood correctly, it's doing the same thing behind the curtain as checkpoints, which are already triggered by Flink automatically.
  • j

    Jaya Ananthram

    07/11/2022, 1:08 PM
    Hello 👋 Is there a way to use the S3 file systems (Hadoop for the streaming file sink and Presto for checkpointing) with the Table API in the IntelliJ IDE? For a cluster, I see that we have to maintain the structure in the plugin folder (as stated here). But what about the IDE? If I add the jars to pom.xml (Hadoop and Presto), I get the exception: Could not find a file system implementation for scheme 's3p'. Is there a preferred way to do this in the IDE?
  • a

    anusca

    07/11/2022, 2:03 PM
    Hi, I'm using Flink for the first time and have some problems. First of all, my goal is to connect Kafka with Flink. I want to consume from Kafka, process the data, and produce to a new Kafka topic. My architecture is something like this (see below). The problem starts with the consumer. I don't get any errors, but I can't consume data. Can someone help me?
  • b

    Bhanu Vattikuti

    07/11/2022, 4:43 PM
    Hi, we have been trying to use a custom image, for which we have placed imagePullSecrets inside the taskManager podTemplate spec section. But we see a 403 image pull error after deployment. Please guide us.
  • b

    Bhanu Vattikuti

    07/12/2022, 12:10 AM
    Hi, we see the error "java.util.concurrent.CompletionException: java.lang.IllegalStateException: The base directory of the JobResultStore isn't accessible. No dirty JobResults can be restored." when we try to enable file-based HA.
  • m

    Márk Bartos

    07/12/2022, 6:53 AM
    Hey. As far as I can see from the docs and the GitHub code, queryable state is not supported for Python.
    • Any idea when it will be?
    • Is there a common, not-too-ugly hack to get around this?
  • j

    Jaya Ananthram

    07/12/2022, 7:09 AM
    Hello 👋, Any idea about this issue? Looks like a bug (although not 100% sure) and the exception is not helping much to debug from end user point of view. I also tried with a simple hello world S3 CSV streaming: datagen as source and S3 CSV file streaming (s3a) as sink which produces the same exception 🤔
  • m

    Márk Bartos

    07/12/2022, 7:20 AM
    What parts of Flink cannot be replaced by going with the Stateful Functions SDK? How would it impact performance?
  • b

    Bhupendra Yadav

    07/12/2022, 10:31 AM
    Hey everyone. tl;dr: We want to use a single standalone Flink session cluster on Kubernetes, shared by multiple applications (~15). In our current setup we have one Flink session cluster on k8s (using the GCP Flink operator) per app, plus a shared EFS mounted to the task/job managers that stores the jars uploaded via the /jars/upload REST API, and we trigger jobs via the REST API /jars/:jarid/run. Our number of applications will only increase over time, so we want to move to a single standalone Flink session cluster. The challenge is that these applications require different env variables (e.g. JDBC URL, AWS access keys, FLINK_ENV_JAVA_OPTS) to be present on the task/job manager pods while they run. Two applications can have different JDBC URLs or other env vars, so we can't put them as env vars in our standalone Flink session cluster deployment. Is there a way to achieve this, such as passing the env vars, Flink properties, etc. in the submit-job REST API, or some better, cleaner approach? Any suggestion or help will be appreciated. Please let me know if something is unclear or if I need to share more details.
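One direction that is sometimes used for per-job settings, sketched here in plain Java: pass them as program arguments in the body of the /jars/:jarid/run call and let each job's main() parse them. The sketch assumes the run endpoint accepts a programArgsList field in its JSON body (worth checking against the REST API docs for the Flink version in use); the class name, the "--key value" convention, and the example JDBC URL are hypothetical. It only builds the request and does not send it. JVM-level settings such as FLINK_ENV_JAVA_OPTS apply to the shared TaskManager process, so this approach would not cover them.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: builds (but does not send) a POST /jars/:jarid/run request.
class JarRunRequestSketch {

    /** Minimal JSON string escaping: backslashes and double quotes only (no control characters). */
    static String jsonString(String value) {
        return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /** Turns per-application settings into a flat "--key", "value" argument list. */
    static List<String> toProgramArgs(Map<String, String> settings) {
        List<String> args = new ArrayList<>();
        // A TreeMap copy gives a stable, sorted argument order.
        for (Map.Entry<String, String> e : new TreeMap<>(settings).entrySet()) {
            args.add("--" + e.getKey());
            args.add(e.getValue());
        }
        return args;
    }

    /** Builds the JSON body; "programArgsList" is assumed to be the field the endpoint reads. */
    static String buildBody(Map<String, String> settings) {
        StringBuilder sb = new StringBuilder("{\"programArgsList\":[");
        List<String> args = toProgramArgs(settings);
        for (int i = 0; i < args.size(); i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append(jsonString(args.get(i)));
        }
        return sb.append("]}").toString();
    }

    /** Assembles the HTTP request for the given REST base URL and uploaded jar ID. */
    static HttpRequest buildRequest(String restBaseUrl, String jarId, Map<String, String> settings) {
        return HttpRequest.newBuilder(URI.create(restBaseUrl + "/jars/" + jarId + "/run"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(buildBody(settings)))
                .build();
    }
}
```

Inside the job, the arguments arrive in main(String[] args) and would have to be handed on to the operators explicitly. Secrets passed this way may end up visible wherever program arguments are shown or logged, which is a trade-off to weigh.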
  • j

    Jaya Ananthram

    07/12/2022, 11:40 AM
    Hello 👋, is there a way to use a custom format in the Table API, especially when Kafka is the source? I am referring to these docs, which say the following:
    But there are some configurations that do not support to set, because Flink will override them, e.g. 'key.deserializer' and 'value.deserializer'.
    If I am fine with converting my custom format to the Row format using my own custom deserialiser logic, is there a way to apply it somehow? Or is there any workaround for using a custom format in the Table API?
  • j

    Jeesmon Jacob

    07/12/2022, 12:23 PM
    Hi team, does flink kubernetes operator support stateful functions or plan to support it soon?
  • s

    salvalcantara

    07/12/2022, 3:15 PM
    Hi team! I have a question regarding Flink SQL, which I've been getting into lately. So far, my experience is mostly with the DataStream API. In that context, it was easy for me to generate metrics for my operators. However, I'm wondering what level of control there is over monitoring & metrics when working with Flink SQL. Is it possible to define "metrics for your queries"? Whatever that means, and assuming that it makes sense 😆. At the very least, I should be able to generate typical metrics for common connectors, e.g., messages read/produced and things like accumulated lag in the case of the Kafka connector, to give an example.
  • c

    czchen

    07/12/2022, 3:23 PM
    Is it possible to add ownerReferences in the flink-operator jobmanager deployment? We use Argo CD to manage FlinkDeployment, and Argo CD needs ownerReferences to display child resources properly.
  • u

    仰望星空

    07/13/2022, 1:29 AM
    Does CDC have plans to support dynamically added new tables for the PostgreSQL database, just like for MySQL?
  • a

    Aqib Mehmood

    07/13/2022, 4:59 AM
    Hi everyone, I'm using checkpointing with Pub/Sub as a source and I keep getting this error.
    org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold
    and
    org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing.
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2139) [flink-dist-1.15.0.jar:1.15.0]
    	at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
    	at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [?:?]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
    	at java.lang.Thread.run(Unknown Source) [?:?]
    The job runs fine for a while and then this happens. I've tried increasing the checkpoint timeout, but that does not seem to help. I'm not sure what other steps we can take to address this, as it is causing many discontinuities in our job. This is our current configuration for this job.
    StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    // Trigger a checkpoint every 1000 ms, with at least 1000 ms between checkpoints.
    streamExecEnv.enableCheckpointing(1000);
    streamExecEnv.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
    streamExecEnv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    // A checkpoint expires if it has not completed within 180000 ms (3 minutes).
    streamExecEnv.getCheckpointConfig().setCheckpointTimeout(180000);
    streamExecEnv.getCheckpointConfig().setTolerableCheckpointFailureNumber(10);
    streamExecEnv.getCheckpointConfig().setCheckpointStorage(
        new FileSystemCheckpointStorage("file:///opt/flink/checkpoints/"));
    streamExecEnv.getCheckpointConfig().setExternalizedCheckpointCleanup(
        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    streamExecEnv.disableOperatorChaining();
    streamExecEnv.setParallelism(1);
    streamExecEnv.setRestartStrategy(RestartStrategies.fixedDelayRestart(
        10, // number of restart attempts
        Time.of(15, TimeUnit.SECONDS) // delay between attempts
    ));
    Looking forward to your responses. TIA
  • l

    laxmi narayan

    07/13/2022, 5:42 AM
    Hi folks, I want to understand how checkpointing works internally for the state store. Let's say I have a counter service and I checkpoint every minute, so a new checkpoint file is created every minute. What will happen in the following cases?
    • If I increment the counter every minute, I believe this will create a new checkpoint file rather than updating the previous file/state store.
    • What happens if I clear/delete the state? Will it remove all the previously checkpointed data?
    • Is StateTtlConfig the same as clearing the state, except that it clears the state after a configured duration?
    • Does the checkpoint file act as a rolling update on certain keys while maintaining all the previous changelogs?
    Basically, I want to minimise the state store size, and I expect that deleting a key deletes all its previous states, which should result in a smaller checkpoint file than before in my checkpoint location. If not, how do we keep the checkpoint size small for short-lived events? Any documentation would be helpful, thank you.
  • m

    Márk Bartos

    07/13/2022, 6:47 AM
    Hey. Does anyone have any experience choosing Kotlin instead of Java? How did it go?
  • h

    HAJI

    07/13/2022, 7:18 AM
    I have a Kafka source and a file sink in Flink. I can read data from the source, but the sink doesn't work. I tried print, which works, but no file is created.
  • s

    salvalcantara

    07/13/2022, 8:49 AM
    Hey folks, I'm working on a SQL runner/dispatcher along the lines of the one in the examples here: https://github.com/apache/flink-kubernetes-operator/tree/main/examples/flink-sql-runner-example (although I'm currently using the VVP, not the native Flink K8s operator). Roughly speaking, I want to make it very easy for my users to deploy SQL-based jobs, but at the same time I want to monitor those SQL jobs so that I can disable them if they start to misbehave and/or simply exceed some thresholds for consumed/produced data records. I was wondering what the best way to handle this would be, based on the available out-of-the-box metrics. Although I could start by disabling the jobs manually based on some monitoring/alerting, ideally that should be done automatically in the end. If I go with the VVP, I guess I will need to rely on their REST API (as I do now for submitting/deploying the SQL scripts). If, however, I go with the Flink K8s operator, would it be possible to extend it somehow to introduce the aforementioned control loop for potentially disabling running jobs? I'm fairly new to the Flink K8s operator (well, to K8s operators in general, for what it's worth).
  • a

    anusca

    07/13/2022, 9:26 AM
    Hi folks! I'm trying to process data in real-time using Flink (in Python), and I have some questions:
  • y

    Yingxin Tsao

    07/14/2022, 1:56 AM
    How can I use imagePullSecrets in spec.podTemplate? I want to pull image from a private docker repo using my secret.
  • h

    HAJI

    07/14/2022, 2:24 AM
    Hello China 🙂 I want to debug PyFlink and a jar at the same time. I found this reference: https://stackoverflow.com/questions/12493870/debug-python-and-java-at-the-same-time I want to use VS Code. Has nobody tried this yet?
  • l

    laxmi narayan

    07/14/2022, 6:27 AM
    A few more questions:
    • What happens if the state grows beyond the size of the container? Is there any optimisation in RocksDB to swap stale data out and load it from the state store checkpoint location on demand?
  • a

    Adesh Dsilva

    07/14/2022, 9:09 AM
    Hello team, I just started with Flink and I am stuck on this issue. It would be great if someone could help me here. I have a folder structure like this in my local filesystem:
    input/
       202207130945/
         data.avro
         data.done
       202207130950/
         data.avro
         data.done 
       ...
       202207140945/
         data.avro
         data.done
    I want to read all data.avro files for 2022071309**. I see that AvroInputFormat takes only a single path, and if I try setFilePaths() it throws an exception: FileInputFormats with multiple paths are not supported yet.
    So I tried the Table API with partitionedBy:
    tableEnv.createTable("AvroSourceTable", TableDescriptor.forConnector("filesystem")
            .schema(schema)
            .option("path", "input")
            .partitionedBy("pt") // defined as .column("pt", DataTypes.CHAR(12))
            .format(FormatDescriptor.forFormat("avro").build())
            .build());
    and then
    tableEnv.from("AvroSourceTable").where($("pt").isEqual("202207130945"))
    But it throws this exception while writing to the table: Partition keys are: [pt], incomplete partition spec: {}
    Am I doing this wrong? Is there an easy way to read multiple directories in Flink?
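Independently of which Flink API ends up reading the files, the set of matching directories can be enumerated up front. The following plain-Java sketch (no Flink classes) lists the data.avro file of every sub-directory whose name starts with a prefix such as 2022071309. The class and method names are hypothetical, and treating data.done as a "directory is complete" marker is an assumption about the layout above. Each returned path could then be given to its own single-path input and the results unioned; that part is not shown.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: finds data.avro files under input/<timestamp>/ directories.
class PartitionDirLister {

    /**
     * Returns the data.avro file of every sub-directory of inputDir whose name
     * starts with prefix and that also contains a data.done marker file.
     */
    static List<Path> completedAvroFiles(Path inputDir, String prefix) throws IOException {
        List<Path> result = new ArrayList<>();
        // The glob "2022071309*" matches 202207130945, 202207130950, and so on.
        try (DirectoryStream<Path> dirs = Files.newDirectoryStream(inputDir, prefix + "*")) {
            for (Path dir : dirs) {
                Path avro = dir.resolve("data.avro");
                Path done = dir.resolve("data.done");
                if (Files.isDirectory(dir) && Files.isRegularFile(avro) && Files.exists(done)) {
                    result.add(avro);
                }
            }
        }
        // Directory streams have no guaranteed iteration order, so sort explicitly.
        Collections.sort(result);
        return result;
    }

    public static void main(String[] args) throws IOException {
        Path inputDir = Path.of(args.length > 0 ? args[0] : "input");
        String prefix = args.length > 1 ? args[1] : "2022071309";
        if (Files.isDirectory(inputDir)) {
            for (Path p : completedAvroFiles(inputDir, prefix)) {
                System.out.println(p);
            }
        }
    }
}
```

On the Table API attempt: as far as I know, the filesystem connector discovers partitions from Hive-style directory names (for example pt=202207130945), so plain directory names like the ones above may not be picked up as values of pt.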