# troubleshooting
  • a

    Ari Huttunen

    02/03/2023, 1:42 PM
    Is there a way to calculate the first quartile, median, and third quartile? I have some Spark code using `pyspark.sql.functions.percentile_approx()` and, if possible, I'd like to do the same in Flink. It would be enough to be able to calculate this in specific time-based windows, such as 1 hour.
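The computation asked about here can be sketched in plain Python. This is not Flink or Spark API: it only illustrates grouping events into 1-hour tumbling windows and taking Q1, median, and Q3 per window. The `(timestamp_ms, value)` event shape and the function names are assumptions made for the example, and the quartiles are exact rather than approximate as in `percentile_approx()`.

```python
from collections import defaultdict
from statistics import quantiles

HOUR_MS = 60 * 60 * 1000  # tumbling window size: 1 hour in milliseconds


def window_start(timestamp_ms, size_ms=HOUR_MS):
    """Return the start of the tumbling window that contains timestamp_ms."""
    return timestamp_ms - (timestamp_ms % size_ms)


def quartiles_per_window(events):
    """Group (timestamp_ms, value) pairs by hourly window and compute Q1, median, Q3.

    Windows with fewer than two values are skipped, because interpolating
    quartiles needs at least two data points.
    """
    buckets = defaultdict(list)
    for timestamp_ms, value in events:
        buckets[window_start(timestamp_ms)].append(value)

    result = {}
    for start, values in sorted(buckets.items()):
        if len(values) >= 2:
            q1, median, q3 = quantiles(values, n=4, method="inclusive")
            result[start] = (q1, median, q3)
    return result
```

In a streaming job the same logic would typically sit inside a window function that sees all values of one window at once, which is why exact quartiles can get expensive for large windows.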
  • k

    kingsathurthi

    02/03/2023, 5:19 PM
    Hi all, I'm using Flink operator 1.3.0 with a FlinkDeployment, but the FlinkDeployment job status is showing as RECONCILING:
    [user@server ~]$ oc get flinkdeployments
    NAME      JOB STATUS    RECONCILIATION STATUS
    kmfcore   RECONCILING   DEPLOYED
    I'm also getting the error below in the TaskManager. What could be the issue?
    [user@server]$ oc logs kmfcore-taskmanager-1-1 | grep -i fail
    2023-02-03 13:31:45,443 INFO org.apache.flink.configuration.GlobalConfiguration          [] - Loading configuration property: execution.submit-failed-job-on-application-error, true
    2023-02-03 13:31:49,839 INFO org.apache.flink.runtime.net.ConnectionUtils                [] - Failed to connect to [/192.168.9.97:6123] from local address [localhost/127.0.0.1] with timeout [100] due to: Invalid argument (connect failed)
  • s

    Sumit Nekar

    02/03/2023, 5:40 PM
    Hello, I am getting the following error with a job deployed in application mode. It causes some tasks to restart.
    INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - JobManager for job 6f180ddd63b2de76c0148a53c092f02f with leader id a7b5b93e86521793b3e017e64695426c lost leadership.
    Please suggest whether we can tune any parameters to prevent this issue. The JobManager is configured with HA (single JobManager) and deployed using the Flink K8s operator (native K8s, Flink 1.13.6).
  • j

    Jirawech Siwawut

    02/04/2023, 7:21 AM
    Hi all. Does anyone know how to connect to Kerberized Hive from the Flink SQL Client? I am getting this error: `Caused by: org.ietf.jgss.GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)` I am able to connect with normal Flink in Java code, but I do not know how to set this up in the Flink SQL Client.
  • g

    Gerald Schmidt

    02/04/2023, 8:16 AM
    Hello, I have a question about persistence for a standalone jobmanager/taskmanager deployment. We have configured savepoints and checkpoints to be written to a dedicated bucket in S3. That being the case, is there any need to have persistent volumes attached to our jobmanager and taskmanagers?
  • s

    Siddhesh Kalgaonkar

    02/05/2023, 9:29 AM
    👋 Hello, team! I have a question about the readFile implementation. From the official docs: "IMPLEMENTATION: Under the hood, Flink splits the file reading process into two sub-tasks: directory monitoring and data reading. Each of these sub-tasks is implemented by a separate entity. Monitoring is implemented by a single, non-parallel (parallelism = 1) task, while reading is performed by multiple tasks running in parallel. The parallelism of the latter is equal to job parallelism. The role of the single monitoring task is to scan the directory (periodically or only once depending on the `watchType`), find the files to be processed, divide them in splits, and assign these splits to the downstream readers. The readers are the ones who will read the actual data. Each split is read by only one reader, while a reader can read multiple splits, one-by-one." So what do you mean by readers here? Is it some kind of interdependent task under TM or something else?
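The quoted description can be modelled as a toy in plain Python: one monitoring step cuts files into splits, and each split is handed to exactly one of several parallel readers. Going by the quoted docs, the "readers" appear to be the parallel instances of the reading task. The function names, the `(path, offset, length)` split shape, and the round-robin assignment are assumptions for illustration only, not Flink's actual implementation.

```python
def make_splits(file_sizes, split_size):
    """Divide each file into (path, offset, length) splits of at most split_size bytes."""
    splits = []
    for path, size in file_sizes.items():
        offset = 0
        while offset < size:
            length = min(split_size, size - offset)
            splits.append((path, offset, length))
            offset += length
    return splits


def assign_splits(splits, num_readers):
    """Hand splits to readers round-robin; each split goes to exactly one reader."""
    assignments = {reader: [] for reader in range(num_readers)}
    for index, split in enumerate(splits):
        assignments[index % num_readers].append(split)
    return assignments
```

With two readers and four splits, each reader ends up with two splits and processes them one after the other, matching "a reader can read multiple splits, one-by-one".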
  • s

    Siddhesh Kalgaonkar

    02/05/2023, 3:11 PM
    Why doesn't the flink-clients library support Scala 2.12 above version 1.14.6? If I don't use that library, I get an executor factory error. Am I missing anything?
  • a

    Ari Huttunen

    02/05/2023, 7:18 PM
    I'm looking forward to calculating a histogram with fixed bins from a streaming data column. Does someone have example code I could take a look at? I tried Google, but it provides too many false hits.. and yes, this is an alternative for calculating quantiles.
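A fixed-bin histogram can be updated one element at a time, which is what makes it a fit for streaming data. The sketch below is plain Python rather than Flink API; `make_histogram`, the half-open bin convention, and the "underflow"/"overflow" keys are assumptions made for the example.

```python
from bisect import bisect_right
from collections import Counter


def make_histogram(bin_edges):
    """Return an (add, counts) pair for a histogram with fixed bin edges.

    bin_edges must be sorted ascending. Bin i covers [bin_edges[i], bin_edges[i + 1]).
    Values outside the range are counted under "underflow" and "overflow".
    """
    counts = Counter()

    def add(value):
        if value < bin_edges[0]:
            counts["underflow"] += 1
        elif value >= bin_edges[-1]:
            counts["overflow"] += 1
        else:
            # bisect_right finds the first edge greater than value;
            # the bin index is one less than that position.
            counts[bisect_right(bin_edges, value) - 1] += 1

    return add, counts
```

Because the state is only a handful of counters, the same update rule could live in an incremental aggregate, and cumulative bin counts then give a rough estimate of quantiles.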
  • a

    Ari Huttunen

    02/05/2023, 7:49 PM
    Eh? Apache Beam can calculate approximate quantiles, and you can run Beam on Flink. Does this mean that that functionality only works on Spark, or something else?
  • j

    Joe Crowley

    02/06/2023, 4:38 AM
    Hi all, I'm having a problem with my session cluster: it attempts to fetch a non-existent file over and over, which causes my jobs to crash. It then reattempts the same file after restart. I'm using the FLIP-27 file system connector library with the Google Storage plugin included in Flink. Could someone give me some ideas on how best to fix this situation?
  • t

    Tony Yeung

    02/06/2023, 8:03 AM
    Hi all. I'm trying the Flink CDC MySQL connector and found two options, `scan.incremental.snapshot.chunk.size` and `scan.snapshot.fetch.size`, which look similar. Could someone explain the differences between these two options and their effects? https://github.com/ververica/flink-cdc-connectors/blob/release-2.3/docs/content/connectors/mysql-cdc.md#connector-options
  • t

    Tan Trinh

    02/06/2023, 9:18 AM
    Hi team! I am using flink-operator in application mode (FlinkDeployment CRD) for a metrics conversion application.
    taskmanager.numberOfTaskSlots: "1"
    taskmanager.memory.managed.fraction: "0.0"
    taskmanager.memory.managed.size: 100m
    taskmanager.network.memory.buffer-debloat.enabled: "true"
    execution.checkpointing.interval: 1m
    execution.checkpointing.tolerable-failed-checkpoints: "3"
    execution.checkpointing.unaligned: "true"
    state.checkpoints.dir: s3://<>
    state.savepoints.dir: s3://<>
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3://<>
    s3.endpoint: http://<>
    s3.access-key: <>
    s3.secret-key: <>
    s3.path.style.access: "true"
    kubernetes.operator.reconcile.interval: 1m
    metrics.reporters: prom
    metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prom.port: "9999"
    env.java.opts.taskmanager: "-XX:+UseParallelGC"
    I changed the taskmanager.memory.managed.fraction config to 0.0 (meaning managed memory size = 0) because I am not using managed memory (not using RocksDB state). However, the TaskManager is always deleted with exit code 137 (OOMKilled):
    2023-02-06 07:54:33,801 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Worker log2metric-prod-example-taskmanager-1-1 is terminated. Diagnostics: Pod terminated, container termination statuses: [flink-main-container(exitCode=137, reason=OOMKilled, message=null)], pod status: Failed(reason=null, message=null)
    2023-02-06 07:54:33,802 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Closing TaskExecutor connection <> because: Pod terminated, container termination statuses: [flink-main-container(exitCode=137, reason=OOMKilled, message=null)], pod status: Failed(reason=null, message=null)
    When I increase the taskmanager.memory.managed.fraction config to 0.4, the TaskManager pod runs normally. So, is Flink still using managed memory even though we are not using RocksDB state, causing the pod to use more memory than requested?
    👀 1
  • k

    Kristian Grimsby

    02/06/2023, 9:44 AM
    I'm exploring the Kubernetes Operator for Flink and am wondering how to deploy Python tasks to Kubernetes. In the python-example, the deployment spins up a JobManager and a TaskManager and creates a custom container running the Python code. When deploying multiple Python applications this way, wouldn't that result in a lot of TaskManagers and JobManagers? I was looking more for the SessionJob type of deployment. Is that not possible with Python?
  • k

    kingsathurthi

    02/06/2023, 12:40 PM
    Hi all, I'm using the Flink operator and deployed a deployment only. Then I tried submitting the session job below with the local filesystem as jarURI, and I'm getting this error:
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkSessionJob
    metadata:
      name: basic-session-job-only-example-2
    spec:
      deploymentName: basic-session-deployment-only-example
      job:
        jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
        parallelism: 4
        upgradeMode: stateless
    Error :
    Spec:
    Deployment Name:  basic-session-deployment-only-example
    Job:
    Args:
    Jar URI:       local:///opt/flink/examples/streaming/StateMachineExample.jar
    Parallelism:   4
    State:         running
    Upgrade Mode:  stateless
    Status:
    Error:  {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'local'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.","throwableList":[{"type":"org.apache.flink.core.fs.UnsupportedFileSystemSchemeException","message":"Could not find a file system implementation for scheme 'local'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded. For a full list of supported file systems, please see https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/."},{"type":"org.apache.flink.core.fs.UnsupportedFileSystemSchemeException","message":"Hadoop is not in the classpath/dependencies."}]}
  • k

    Kosta Sovaridis

    02/06/2023, 1:00 PM
    Hi, I have a single stream which I need to split into x streams based on key type, then apply different operators to each of them. Is it a valid solution to write a process function that creates x side outputs?
  • r

    Richard Noble

    02/06/2023, 3:14 PM
    Hi, I'm battling to crack the problem of running the S3 filesystem with the ORC format. ORC on the local filesystem works fine. JSON on S3 works fine. Using them together seems challenging. Current setup: hadoop shaded in /lib; orc format in /lib; s3-fs-hadoop s3 in plugins/s3-fs-hadoop
    HADOOP_OPTS="-Dfs.AbstractFileSystem.s3a.impl=org.apache.hadoop.fs.s3a.S3A -Dfs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem"
    I get this error:
    Class org.apache.hadoop.fs.s3a.S3AFileSystem not found.
    I've tried the plugins in lib, and that worked locally, but not on the cluster. Does anyone have an example setup?
  • t

    Tony Wang

    02/06/2023, 5:59 PM
    Hello! I am trying to process a CSV file source row by row and add watermarks to mimic real-time streaming use case. Unfortunately I can't get it to work properly, i.e. the watermark doesn't seem to be applied properly. Do people know if this is even possible? Here's my code:
    env.getConfig().setGlobalJobParameters(params);
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
    env.setParallelism(1);
    String path = "/home/ubuntu/flink/flink-examples/flink-examples-streaming/target/csv";
    final CsvReaderFormat<Pojo> format = CsvReaderFormat.forPojo(Pojo.class);
    File csvfile = new File(path);
    final FileSource<Pojo> csv_source =
            FileSource.forRecordStreamFormat(format, Path.fromLocalFile(csvfile))
                    .monitorContinuously(Duration.ofMillis(1))
                    .build();

    DataStream<Pojo> source = env.fromSource(
            csv_source,
            new WatermarkStrategy<Pojo>() {
                @Override
                public WatermarkGenerator<Pojo> createWatermarkGenerator(
                        WatermarkGeneratorSupplier.Context context) {
                    return new MyPerElementWatermarkGenerator();
                }
            }
            .withTimestampAssigner((event, timestamp) -> event.event_time),
            "csv-source");
  • k

    Krish Narukulla

    02/06/2023, 6:57 PM
    Is there a SerDe to convert Flink RowData to a byte array and back to RowData? Something like what is happening here: https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/s[…]he/flink/connector/dynamodb/util/DynamoDbSerializationUtil.java
  • n

    Nihar Rao

    02/06/2023, 10:37 PM
    Hello team, I am a beginner with Flink. I want to register some metrics and am reading this guide. I have written the following simple code in Java:
    public class FlinkMetric extends RichMapFunction<String, String> {

        public void playWithMetrics() {
            // Gauge
            Gauge<Integer> random = () -> 1;
            this.getRuntimeContext().getMetricGroup().gauge("myGauge", random);
        }

        @Override
        public String map(String value) throws Exception {
            return null;
        }

    }
    This class is simply called by the main class, and I am running my application as a jar by submitting it through the Flink web UI's submit job option. I am getting the following error:
    java.lang.IllegalStateException: The runtime context has not been initialized yet.
    I looked into it and I understand that the runtime context is null. Can you please tell me why the runtime context is not getting set automatically? Thanks
  • m

    Mikko Lehtimäki

    02/07/2023, 9:08 AM
    I'm working with the Python API and have a question about event timestamps. I'm trying to window based on event time. Events are coming from a Kafka source but I'd like to use a field from the JSON payload as event time. It's my understanding that I should provide an implementation of TimestampAssigner for this purpose and implement the extract_timestamp method. Question is, what data type or datetime format should that method return?
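Flink event timestamps are integer milliseconds since the Unix epoch, so a timestamp assigner is expected to return an `int` in that unit. A minimal sketch of the conversion, assuming the JSON payload carries an ISO-8601 string (the helper name and the payload format are hypothetical):

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(iso_timestamp):
    """Convert an ISO-8601 timestamp string to integer milliseconds since the Unix epoch.

    Timestamps without a UTC offset are assumed to be in UTC.
    """
    parsed = datetime.fromisoformat(iso_timestamp)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    delta = parsed - EPOCH
    # Integer arithmetic avoids float rounding on the millisecond part.
    return (delta.days * 86_400 + delta.seconds) * 1000 + delta.microseconds // 1000
```

Inside `extract_timestamp(self, value, record_timestamp)`, the return value would then be something like `to_epoch_millis(value["event_time"])`, where `event_time` stands in for whatever field the payload actually uses.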
  • m

    Mikko Lehtimäki

    02/07/2023, 9:45 AM
    Can I add the watermarks and timestamps after some steps of processing, like this:
    env.from_source(
        source,
        WatermarkStrategy.no_watermarks(),
        "Kafka Source",
    )
    # Read the json-strings into dictionaries
    .map(json.loads)
    .assign_timestamps_and_watermarks(
        WatermarkStrategy.for_monotonous_timestamps()
        .with_timestamp_assigner(SensorTimestampAssigner())
    )
    or must they be added to `from_source()`? I'd like to load the Kafka payload string into a dictionary before I extract the timestamp. `source` here is a `KafkaSource`.
  • m

    Mikko Lehtimäki

    02/07/2023, 11:47 AM
    I'm actually not seeing any output from TumblingEventTimeWindows, even if I set the watermark strategy on the Kafka source. Is there a complete example of this? I'm probably missing something basic.
  • y

    Yang LI

    02/07/2023, 12:53 PM
    Hello, I have a Scala unit function that uses the Scala `groupBy` API and runs in a Flink job. It gives different results when I run it in a unit test in a normal JVM context and when I run it in the Flink job (with parallelism 1, only 1 TaskManager, and 1 task slot). Does anyone know what we should be careful to configure in our Flink cluster so that the computation results are the same in the unit test and at Flink runtime?
    ✅ 1
  • y

    Yang LI

    02/07/2023, 2:06 PM
    Hello, another question, about the Scala version in the Flink Docker image: how can I find out the exact Scala version inside the Flink Docker image 1.16.1-scala_2.12-java11? Is it 2.12.12 or 2.12.13?
    ✅ 1
  • s

    Sami Airaksinen

    02/07/2023, 2:56 PM
    Hello, I'm following the Flink user guides to try to set up a local environment where I can test a very simple case of streaming data in Parquet format into S3 with the Python runtime. I use the Bitnami Helm charts for MinIO and Flink, and as a client I use the simple pyflink REPL from the apache-flink 1.16.1 PyPI package. My first attempt was to test with CSV, but I bumped into this issue: https://issues.apache.org/jira/browse/FLINK-28513 . My next idea was to try the Parquet format as output (as it is my ultimate goal as the data format in my solution), but I started to wonder where I should define the jar that includes the `parquet` formatFactory. So, I'm keen to know:
    1. Which jar should I use (I'm using TableDescriptor to define my sink table)? flink-parquet or flink-sql-parquet?
    2. Should I include it in the cluster JobManager's and TaskManagers' /opt/flink/lib/ folder, or try to add it with the `.add_jar(...)` method? pyflink-shell doesn't offer a --jars option during launch.
  • b

    Betim Bryma

    02/07/2023, 4:47 PM
    Hello guys. I am new to Flink and learning it right now as I write this, so I hope what I am saying makes sense. I would like to apply sliding windows to a DataStream, and then for each of those Windows to perform anomaly detection, using FlinkML or maybe FlinkCEP (in fact I want to use both). My question is, which function should I use after I have created the sliding windows. So far I am trying to achieve this using the apply method, but I am not sure if it makes sense. To my understanding when the apply function is performed, then I will have all the elements within the window available. I am just not sure if what I am trying to do is completely wrong in Flink or something like that...
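To make the "all the elements within the window" intuition concrete, here is a plain-Python sketch of sliding-window assignment: every element lands in each window whose range covers its timestamp, and a window function then sees the full list for one window. This is not Flink API; the function names are hypothetical and window starts are assumed to be aligned to multiples of the slide with no offset.

```python
def sliding_window_starts(timestamp_ms, size_ms, slide_ms):
    """Return the start times of every sliding window that contains timestamp_ms.

    A window covers [start, start + size_ms). Starts are multiples of slide_ms.
    """
    starts = []
    start = timestamp_ms - (timestamp_ms % slide_ms)  # latest window start <= timestamp
    while start > timestamp_ms - size_ms:
        starts.append(start)
        start -= slide_ms
    return starts


def assign_to_windows(events, size_ms, slide_ms):
    """Group (timestamp_ms, value) pairs into all sliding windows they fall in."""
    windows = {}
    for timestamp_ms, value in events:
        for start in sliding_window_starts(timestamp_ms, size_ms, slide_ms):
            windows.setdefault(start, []).append(value)
    return windows
```

With a size of 10 and a slide of 5, each element belongs to two windows, so any per-window anomaly check runs over overlapping sets of elements.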
  • k

    Kevin Lam

    02/07/2023, 5:05 PM
    👋 Hello! I was reading the Flink Kubernetes Operator documentation and noticed that if you want to redeploy a Flink job from a specific snapshot, you must follow these manual recovery steps. Are there plans to streamline this process? Deploying from a specific snapshot is a relatively common operation and it'd be nice to not need to delete the FlinkDeployment.
  • k

    Kyle Ahn

    02/07/2023, 6:31 PM
    Is there documentation on Flink operator compatibility with Kubernetes versions?
  • e

    Erwin Cabral

    02/07/2023, 7:21 PM
    Hi there. I am using the Flink Kubernetes Operator and used FlinkDeployment to configure checkpoints/savepoints and high availability with S3-compatible storage, MinIO. This is working fine. However, I am exploring alternatives to MinIO due to its license. The documentation mentions HDFS support for this, but I could not find any concrete examples. I have tried using `state.checkpoints.dir: hdfs://namenode:port/checkpoints` and I get the following error:
    Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
    Any ideas? Thanks in advance.