https://flink.apache.org/ logo
Join Slack
Powered by
# troubleshooting
  • p

    Pankaj Singh

    06/20/2023, 10:08 AM
    Hi team, Language: Python (pyflink) Is it possible to write to different files (StreamingFileSink) based on some meta data like table_name or event_type in incoming data stream? As per this doc: https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/datastream/filesystem/#bucket-assignment pyflink only supports
    DateTimeBucketAssigner
    and
    BasePathBucketAssigner
    Is it not possible to write custom bucket assigner in python? or is there any better way to do this?
    • 1
    • 2
  • r

    Raghunadh Nittala

    06/20/2023, 11:44 AM
    Hi All, I’m consuming data from a compacted kafka topic using
    upsert-kafka
    connector and converting the same to a DataStream using
    .toChangelogStream
    . I have another DataStream from source kafka created using
    kafka
    connector. Now, I’m doing a keyBy on both of these and connecting them using a
    KeyedCoProcessFunction
    implementation. I’m facing an issue where I see huge number of records in the source kafka operator in the flink UI, even though I do not have those many records being published to that topic. When I avoid
    .connect
    I could see less records in the source operator. Any idea what could be wrong here?
  • n

    Nishant Goenka

    06/20/2023, 12:32 PM
    HI All, I am storing value state in rocksdb but after job restart value state is not getting picked up from persistence volume storage. Do I have to specify TTL, even if we don't have to clean the state ever. P.S. we are using Kueberntes PVC for RocksDB storage with persistentVolumeReclaimPolicy as "Retain"
    👀 1
    a
    • 2
    • 2
  • j

    Jirawech Siwawut

    06/20/2023, 1:26 PM
    Can anyone from community help review this PR? It’s been quite a while and there is no progress on this
  • t

    Theodore Curtil

    06/20/2023, 1:31 PM
    Hi guys, We are trying to consume data from Kafka using protobuf format, with the SQL Client. Code looks like this:
    SET 'state.checkpoints.dir' = '<s3://state/checkpoints>';
    SET 'state.backend.incremental' = 'true';
    SET 'execution.checkpointing.unaligned' = 'true';
    SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
    SET 'execution.checkpointing.interval' = '10s';
    SET 'execution.checkpointing.min-pause' = '10s';
    SET 'sql-client.execution.result-mode'='TABLEAU';
    SET 'parallelism.default' = '1';
    ADD JAR '/opt/sql-client/lib/flink-sql-connector-kafka-1.16.0.jar';
    ADD JAR '/opt/sql-client/lib/flink-protobuf-1.16.0.jar';
    ADD JAR '/opt/sql-client/lib/original-kafka-send-proto-0.1.0.jar';
    -- SYNTHETIC EVENTS GEO
    DROP TABLE IF EXISTS TEST;
    CREATE TABLE TEST (
    name STRING
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'protos_topic_cards',
    'properties.bootstrap.servers' = 'kafka_broker:29092',
    'format' = 'protobuf',
    'protobuf.message-class-name' = 'com.example.CardData',
    'protobuf.ignore-parse-errors' = 'true'
    );
    SELECT * FROM TEST;
    And the
    original-kafka-send-proto-0.1.0.jar
    does contain the protobuf Java class com.example.CardData. Still, the SQL script fails in the SQL Client with the error: Flink SQL> CREATE TABLE TEST (
    name STRING
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'protos_topic_cards',
    'properties.bootstrap.servers' = 'kafka_broker:29092',
    'format' = 'protobuf',
    'protobuf.message-class-name' = 'com.example.CardData',
    'protobuf.ignore-parse-errors' = 'true'
    )[INFO] Execute statement succeed.
    Flink SQL> [ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: com.example.CardData Shutting down the session... done. Anyone ever faced this issue with protobuf?
    • 1
    • 1
  • c

    Carlos Santos

    06/20/2023, 2:43 PM
    Hi, I need some help with understanding a detail with savepoints. Let's assume that I've started my app from a savepoint, let it run for a while it took some check points then something happen and it restarted. For some reason it fail to start from the latest checkpoint. Because of that it restarted from the savepoint that I passed in the start up. My question is, how can I stop this behaviour? I don't want it to start from the savepoint because of the kafka offsets + other operators states. Thank you
  • b

    Bruno Filippone

    06/20/2023, 4:34 PM
    Hello! I’m using the Flink Kubernetes Operator to define a session cluster and a FlinkSessionJob. I am able to use s3 URIs for high availability and checkpoints/savepoints but not for the
    job.jarURI
    part of a session job, where I get the error:
    Could not find a file system implementation for scheme 's3'
    . What am I missing? Here are my Kubernetes resources for context (shortened versions):
    Copy code
    apiVersion: <http://flink.apache.org/v1beta1|flink.apache.org/v1beta1>
    kind: FlinkDeployment
    metadata:
      name: session-cluster
      namespace: flink-jobs
    spec:
      image: flink:1.17
      flinkVersion: v1_17
      jobManager:
        [...]
        podTemplate:
          spec:
            containers:
              - name: flink-main-container
                env:
                  - name: ENABLE_BUILT_IN_PLUGINS
                    value: flink-s3-fs-presto-1.17.1.jar;flink-s3-fs-hadoop-1.17.1.jar
      taskManager:
        [...]
        podTemplate:
          spec:
            containers:
              - name: flink-main-container
                env:
                  - name: ENABLE_BUILT_IN_PLUGINS
                    value: flink-s3-fs-presto-1.17.1.jar;flink-s3-fs-hadoop-1.17.1.jar
      serviceAccount: flink
    ---
    apiVersion: <http://flink.apache.org/v1beta1|flink.apache.org/v1beta1>
    kind: FlinkSessionJob
    metadata:
      name: session-cluster-job
      namespace: flink-jobs
    spec:
      deploymentName: session-cluster
      job:
        jarURI: <s3://flink-test-bucket/TopSpeedWindowing.jar>
        parallelism: 4
        upgradeMode: stateless
    z
    • 2
    • 11
  • i

    Ilya Sterin

    06/20/2023, 5:35 PM
    How would one achieve better performance for Flink SQL joins to ensure data isn't being sent around from one instance to another for each join? Like, is there a way to ensure data affinity, meaning all the tables are consistently keyed and thus end up on same instance? Am I thinking about this wrong?
    m
    • 2
    • 4
  • k

    kiran kumar

    06/20/2023, 5:55 PM
    Hi All, I am trying to start flink application with the below standalone-job command in the local system. But I am getting "org.apache.flink.util.FlinkException: Could not load the provided entrypoint class.". The class name is correct and even the jar path is correct. Can someone please help here what is going wrong.
    Copy code
    sh standalone-job.sh start-foreground -jar ~/Documents/Repository/sherlocksvc/sherlock/target/sherlock-0.0.1.jar --job-classname com.cashfree.sherlock.Application
    • 1
    • 1
  • z

    Zhong Chen

    06/20/2023, 11:03 PM
    The Flink k8s operator failed to upgrade one of my applications due to the below error. However, when I checked the logs of that application, I could see the app was creating checkpoints without any issues.
    Copy code
    >>> Status | Error   | STABLE          | {"type":"org.apache.flink.kubernetes.operator.exception.ReconciliationException","message":"java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.","throwableList":[{"type":"java.util.concurrent.ExecutionException","message":"java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed."},{"type":"org.apache.flink.util.SerializedThrowable","message":"java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed."}]}
    • 1
    • 4
  • r

    Ravi Nishant

    06/20/2023, 11:11 PM
    Hi All, re-posting this from a few days back.
  • v

    Vishal bharatbhai Vanpariya

    06/21/2023, 5:48 AM
    Hi Team, i am using 1.14.3 version. i am using SinkTo method for kafkasink. i was working properly in prod but recently we have build new docker image and deployed again with the same version but we are getting SinkTo method is not found in prod.
    k
    • 2
    • 1
  • d

    Dr Ravi Tomar

    06/21/2023, 7:52 AM
    I want to use GEOLOCATION function in flink sql. I am having lat, long data in table and want to calculate distance. any help is appreciated.
    g
    • 2
    • 2
  • d

    Diogo Santos

    06/21/2023, 10:45 AM
    Hello 👋 I’m using Flink 1.14.5 and I’m trying to understand what is affecting the trigger time of the checkpoints In my scenario: • Checkpoint interval: 1s • Minimum Pause Between Checkpoints: 500ms • Unaligned Checkpoints are enabled • Delivery semantic is exactly-once What is happening is that the checkpoints aren’t predictable, i.e, they can be triggered 5seconds after the latest ack checkpoint as can be triggered 10 seconds, 2 seconds and it is very rare to be 1 second Does someone have some insight of what I should pursue? 🙏
    g
    • 2
    • 11
  • k

    Keyur Makwana

    06/21/2023, 1:07 PM
    Hello I am using flink for large data process, But when i pass around 50k logs per second, My flunk job goes to 100% busy. I have tried by increasing parallelism and flink memory but it did not work for me. Can anyone help me?
    m
    j
    • 3
    • 29
  • a

    Akshat

    06/21/2023, 1:25 PM
    Hello We are using apache beam over flink and we were wondering if we can have a cache for parallel pipelines which are running which have the same source url ?
  • m

    Mikhail Spirin

    06/21/2023, 1:28 PM
    UPD: this is not an iossue with kinesis sink, its temporal join question - and actually exactly this problem https://stackoverflow.com/questions/70688670/flink-temporal-join-not-showing-data Hi! If anyone has an experience with aws kinesis sink cinnector - i will be really happy to hear your thoughts! Question is the following: i cant understand how bundling of outcoming records works. I found to settings which affects this: • ‘sink.batch.max-size’ seems to define max number of outcoming records, and • ‘sink.requests.max-buffered’ seems to define maximum difference between already sent and what is now in buffer. As soon as new records come and increase this difference more than this setting, records are sent to AWS. Question: if there are no new records in kinesis sink, this leftover seems to stuck there forever, waiting until new records will come. I see no timeouts which affects this behaviour. Basically i need “enrichment” pattern - when some timeout comes, send record even if it’s 1 record. How to achieve this?
    j
    a
    • 3
    • 22
  • d

    Dr Ravi Tomar

    06/21/2023, 4:35 PM
    Hi all, question: Flink SQL: i am having the data containing id,temperature,humidity, carbonmonooxide level in flink table from a kafka topic. I want to produce an alert to alert topic when temp>50;humidity<10;CO>100 for 5 minutes. I understand this needs to be done thru tumbling window. Can anyone help me with approach/SQL syntax? 🙏
  • g

    Giannis Polyzos

    06/21/2023, 4:48 PM
    I have some examples here for sensor data https://medium.com/@ipolyzos_/streaming-sql-with-apache-flink-a-gentle-introduction-8a3af4fa3194 You can also leverage udfs for more flexibility if needed
    🙏 1
    d
    • 2
    • 2
  • d

    dp api

    06/21/2023, 4:49 PM
    jdbc_url = os.environ.get('MYSQL_JDBC_URL')
    jdbc_table_name = os.environ.get('MYSQL_TABLE_NAME')
    jdbc_username = os.environ.get('MYSQL_USERNAME')
    jdbc_password = os.environ.get('MYSQL_PASSWORD')
    sink_mysql = f"""
    CREATE TABLE pending_orders_table (
    rest_id VARCHAR,
    pending_count INT,
    PRIMARY KEY (rest_id) NOT ENFORCED
    ) WITH (
    'connector' = 'jdbc',
    'url' = '{jdbc_url}',
    'table-name' = '{jdbc_table_name}',
    'username' = '{jdbc_username}',
    'password' = '{jdbc_password}'
    )
    """
    t_env.execute_sql(sink_mysql)
    I am getting an error when I am running this script in a docker container with environment variables set accordingly. The error is something like - Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "jdbc" at line 8, column 19. Was expecting one of: "UESCAPE" ... <QUOTED_STRING> ... ")" ... "," ... What needs to be changed syntactically for this ? Thanks in advance!!
  • r

    Razin Bouzar

    06/21/2023, 6:17 PM
    Looking for some guidance on overriding Kafka admin client configs. The partition discovery feature sometimes fails to connect with kafka and we'd like to implement retries. There is no clear documentation on which settings need to be changed. admin client configs of note: • default.api.timeout.ms -- applies for client ops which do not specify a timeout param. • reconnect.backoff.max • request.timeout.ms • retries • retry.backoff.ms
    👍 1
  • o

    Oscar Perez

    06/21/2023, 6:42 PM
    hei! one question regarding taskmanagers and task slots. I have a flink cluster with task num slots = 6 which is defined because the job with maximum parallelism is 6 as well. Is that correct? I also understand that task managers spawn as long as new slots are needed. Is there a limit for that? A task manager is a JVM process but has some memory configured per task manager. Is this the bottleneck for spanning more task managers? Thanks!
    d
    • 2
    • 7
  • k

    Kyle Ahn

    06/21/2023, 7:33 PM
    In k8s flink operator, is there a way to submit a flink session job using cmd, not Rest API? I would like to have 1 job manager with multiple flink batch job submissions (to be specific i am running iceberg compaction). When submitting a batch job using
    flinksessionjob
    CRD, I am seeing [FLINK-24883] Use flink web ui to submit the jar throw Job client must be a CoordinationRequestGateway. This is a bug exception. Exception occurs here, but TL;DR is that the batch job needs a long-lived client, but flink jobs submitted via Rest API doesn’t have that.
    Copy code
    Collecting results require a client to continuously fetching the result till the end or explicitly closes the iterator (causing the job to be cancelled). If client goes away without fetching all the results or terminating the job it will hang forever. Submitting by web UI does not fulfill this requirement (as it does not have a long existing client) so we're not supporting collect in web UI. I'll give out a more proper exception message and refine the document of collect.
    g
    • 2
    • 2
  • f

    Faisal A. Siddiqui

    06/21/2023, 10:20 PM
    Hi All, I am using flink 1.16.0 version and trying to set managed memory fraction here is snippet from my taskmanager flink-conf.yaml
    Copy code
    state.backend: rocksdb
    state.backend.incremental: true
    state.backend.rocksdb.metrics.enable: true
    state.backend.rocksdb.write-buffer-size:128mb
    state.backend.rocksdb.max-write-buffer-number:10
    state.backend.rocksdb.checkpoint.transfer.thread.num: 4
    taskmanager.memory.network.fraction:0.25
    taskmanager.memory.managed.fraction:0.5
    Since physical memory for this manager is 8GB shouldn't i get 4GB to managed memory on Flink UI or am i missing something ??
    j
    • 2
    • 3
  • r

    Rion Williams

    06/22/2023, 1:34 AM
    Hey fellow Flinkers, I’m currently working on an addition to an existing job that handles enriching events as they flow through from multiple streams. My need is to use some tenant-specific configuration to do so, which currently exists in a Kafka topic that’s populated via a CDC connector (and contains essentially the state/configuration for that tenant). In an attempt to avoid keying everything by tenant, I thought I could use Broadcast State to accomplish it through the following: • Create a new broadcast stream using a KafkaSource to read the CDC records and map them to a serializable object containing the enrichment that I need. • After performing a
    union
    of all of my other source streams (all mapped to a single common schema), connect that with the previous broadcast stream. • Apply the enrichments in an appropriate BroadcastProcessFunction where: ◦ Broadcast elements are simply stored in broadcast state (something like
    context.getBroadcastState(descriptor).put(x.key, x.value)
    ); Not sure if there’s anything more needed. ◦ Non-Broadcast elements would simply check broadcast state for the given tenant configuration and apply the enrichment. Since I haven’t used Broadcast State previously, my questions are: • Would the addition of a new stream/join/broadcast like this be a breaking change? I.e. would the job be able to be restored from the previous state/savepoint prior to the change? • Is Broadcast State safe for a scenario like this? The cardinality of the data is fairly small, possibly a thousand or so, with the objects only containing two fields. Enough to store in memory pretty easily. • Does parallelism increase any issues here? My assumption is that for parallelism >1, performing the
    broadcastState.put(…)
    operation would rebroadcast that new/updated state to all of the task managers/tasks to ensure that the same data is eventually available. • Since Broadcast state is checkpointed, how does that factor in when the job is restarted or recovers from a failure? Will it be restored/available when data begins flowing again? I’m happy to provide any more context/detail/etc. if it helps!
    • 1
    • 2
  • d

    Dr Ravi Tomar

    06/22/2023, 1:45 AM
    Why do flink kafka connector SQL query doesn’t show as a consumer group on kafka topic? Do it use some low level method to fetch data from kafka?
  • d

    Dheeraj Panangat

    06/22/2023, 7:52 AM
    Hi Team, Need help to understand why the Flink state keeps growing for any Table Sinks or Joins we do on Tables. We are using Hudi as table source and performing a streaming read. In below screenshot, the Join has grown to 1.39GB Can we use table API to just lookup and not have it store in the state and also join based on lookup ?
    g
    • 2
    • 5
  • d

    Dheeraj Panangat

    06/22/2023, 7:53 AM
  • i

    Ilia Kapanadze

    06/22/2023, 9:23 AM
    Hi everyone! I just arrived, and I'm wondering if something like this is possible using CEP (sadly, could not find any similar example in docs): example #1: Pattern: a b c Input: a1 b1 a2 b2 c1 c2 Matches: (a1, b1, c1); (a2, b2, c2) <- I only want these two matches example #2: Pattern: a b c Input: a1 a2 b1 a3 b2 c1 c2 b3 c3 Matches: (a1, b1, c1); (a2, b2, c2); (a3, b3, c3) Numbers added for clarity, in fact, they could potentially be same: a1 = a2 = a3; b1 = b2 = b3; c1 = c2 = c3; In other words, something like 'first in, first out' and each input element should be in only one match.
  • h

    Hussain Abbas

    06/22/2023, 11:04 AM
    Hello Everyone, Today i have a very unique issue. We are dealing with issue with Flink Restart. We are using Kinesis, Rocksdb and dynmodb as our infrastructure. Here everything is working fine, we can see that records are fetch, processed and pushed to dynamodb, until there is a restart. Following thing we have observed when restart occurs. 1- There is constant back pressure 2- Nothing In/out buffers for like 20min before restart 3- no resource usage, CPU usage is close to nothing 4- I see checkpoints faling at 20 minutes before that 5- Batch write to dynamodb is constant in that window. Sharing your monitoring result as well. We are still not sure what can be the cause of issue, any suggestions would really help.
    g
    • 2
    • 10
1...899091...98Latest