# troubleshooting
  • a

    Aly Ayman

    08/19/2024, 10:28 AM
    I found that it is because helper is null
  • a

    Andrew Sims

    08/20/2024, 1:52 AM
    Hey everyone, I have a use case I’m considering Flink for but I have a feeling that it might be slightly unusual, can anyone help me get a feel for whether it’s a good idea or not? What we would like to do is use Flink to join 4 to 5 topics on a key and emit to a single topic which is easy for application developers to consume and project in local storage. The data in question puts these constraints on our solution:
    - we’ll be joining probably about 4 or 5 different Kafka topics on a single key attribute
    - we will have records which are “arbitrarily late arriving”, which means that if we get a record we need to keep it in state forever so that the emitted join reflects the current state without missing attributes (e.g. the record from the last month).
  • n

    Nafer Sanabria

    08/20/2024, 10:17 AM
    Hi there! [FIXED] I have coded a regular Flink app (v1.18) that reads files from a given directory. By default it works fine because it reads all the files. The next step is to get it working in streaming mode, but I'm unable to set the runtime mode as described in https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/execution_mode/ I tried:
    $FLINK_HOME/bin/flink run -Dexecution.runtime-mode=STREAMING -c com.example.DataStreamJob target/flink-poc-1.0.jar
    I also tried explicitly setting the runtime mode on `env`:
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
    but it did not work: the job finishes in a couple of seconds. The test I want to perform is to copy a new file into the directory and check that the file gets processed. What am I missing? Any hints?
  • a

    Andre Luiz

    08/20/2024, 6:06 PM
    Hello everyone! Is it possible to enable normalization when using the Confluent Avro format? I see that the schema registry client already supports this option, but I do not see how to enable it.
  • є

    Євген Шепелюк

    08/21/2024, 8:36 AM
    Hello, is it possible to set up a Flink job that uses JdbcSink to just ignore/drop failed items when a SQL error happens, without restarting or failing the job? I've read this, but I can't tell whether such functionality exists at all; all the existing restart strategies eventually fail the job: https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/ops/state/task_failure_recovery/
  • a

    Avinash Upadhyaya

    08/21/2024, 10:52 AM
    Hello, is there a guide to using regular expressions with the SIMILAR TO function in Flink SQL (Table API)? Regular expressions that work on regex101.com do not work with SIMILAR TO in Flink SQL. I want to use `[a-zA-Z0-9~\-_.]+` as my regular expression and my SQL query has the same. It throws the following error:
    Invalid regular expression '[A-Za-z0-9~\-_.]+'
    	at org.apache.flink.table.functions.SqlLikeUtils.invalidRegularExpression(SqlLikeUtils.java:178)
    	at org.apache.flink.table.functions.SqlLikeUtils.sqlSimilarRewriteCharEnumeration(SqlLikeUtils.java:221)
    	at org.apache.flink.table.functions.SqlLikeUtils.sqlToRegexSimilar(SqlLikeUtils.java:285)
    	at org.apache.flink.table.functions.SqlLikeUtils.sqlToRegexSimilar(SqlLikeUtils.java:240)
    	at org.apache.flink.table.functions.SqlLikeUtils.similar(SqlLikeUtils.java:80)
    However, `[A-Za-z0-9~\-.]+` works. The problem is with `_`, and it happens in https://github.com/apache/flink/blob/56c81995d3b34ed9066b6771755407b93438f5ab/flin[…]rc/main/java/org/apache/flink/table/functions/SqlLikeUtils.java If the regex contains one of the special characters defined in SQL_SIMILAR_SPECIALS, it throws an invalidRegularExpression exception.
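For comparison, the character class from the question is a valid pattern for `java.util.regex`, while in SQL `SIMILAR TO` the `_` character is a wildcard for any single character. A minimal plain-Java sketch of the intended check; the class and method names are hypothetical and nothing here calls Flink:

```java
import java.util.regex.Pattern;

class UnderscoreClassCheck {
    // The character class from the question, written as a Java string literal.
    static final Pattern ALLOWED = Pattern.compile("[a-zA-Z0-9~\\-_.]+");

    // True only when the whole input consists of allowed characters.
    static boolean isAllowed(String s) {
        return ALLOWED.matcher(s).matches();
    }

    public static void main(String[] args) {
        System.out.println(isAllowed("my_file-1.0~rc"));
        System.out.println(isAllowed("bad value!"));
    }
}
```

Flink SQL also documents a `REGEXP(string, regex)` built-in that takes a Java regular expression and matches substrings, so the pattern would need `^` and `$` anchors to behave like a full match. Whether it is a suitable replacement for `SIMILAR TO` in this query is an assumption to verify.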
  • y

    Yaroslav Bezruchenko

    08/21/2024, 12:05 PM
    Has anyone faced this issue?
    org.rocksdb.RocksDBException: WriteBatch has wrong count
    	at org.rocksdb.RocksDB.put(Native Method)
    	at org.rocksdb.RocksDB.put(RocksDB.java:955)
    	at org.apache.flink.contrib.streaming.state.RocksDBMapState.put(RocksDBMapState.java:139)
    	at org.apache.flink.runtime.state.UserFacingMapState.put(UserFacingMapState.java:52)
    .........
    	at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
    	at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:60)
    	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
    	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
    	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
    	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
    	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    	at java.base/java.lang.Thread.run(Unknown Source)
    We are using:
    • Flink 1.19.0
    • Flink Kubernetes Operator 1.9.0
    • Java 17.0.12+7, temurin
    As for dependencies for RocksDB:
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb</artifactId>
        <version>1.19.0</version>
    </dependency>
    This is not reproducible locally for me. Any ideas what could cause this?
  • a

    Arthur Catrisse

    08/21/2024, 12:33 PM
    Hello, we're using `flink-kubernetes-operator`. We are occasionally encountering `org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has already been submitted` errors on our podmanagers. This completely blocks the podmanager from restarting correctly and launching new tasks. It seems to happen sometimes when a podmanager pod is rotated by the Kubernetes cluster (via Karpenter). Sometimes we see the `SIGTERM` in the pod logs when they are rotated out, but sometimes not (could some logs not be getting through?). We have tried setting up high availability (although we are not sure it would solve the issue), but the config does not seem to recognise the `kubernetes.jobmanager.replicas` value (we tried setting it to 2, but it is still interpreted as 1). We have defined `high-availability.type: kubernetes` and `high-availability.storageDir` (we can't explicitly set `kubernetes.cluster-id`; it seems to be handled by the deployment, and when we set it the app does not start). Do you have any ideas what could be causing these instabilities? Thanks. Our Flink confs ⬇️
  • g

    Guruguha Marur Sreenivasa

    08/21/2024, 3:30 PM
    Team, we have enabled the autoscaler on application clusters with Flink 1.18. Based on how it works, it looks like the cluster restarts every time it scales the job up or down. Can we apply this to session clusters as well? We have clusters that run about 50 jobs per cluster. If the cluster restarts in the name of autoscaling, this is going to cause a huge restart chain across all those jobs, right? Or does it work differently in a session cluster?
  • j

    Jonas Brami

    08/21/2024, 5:12 PM
    Hey guys, is anybody using the Flink Kubernetes Operator in session mode with multiple jobs defined by CRD (PyFlink)? I think there is some kind of race condition causing errors when 2 FlinkDeployment instances are applied for the same session at the same time. One of the FlinkDeployments gets submitted successfully, but the other one always gets a DuplicateJobSubmissionException, although it eventually succeeds (a few seconds later, after the first retry). I am running Flink 1.19 on both the operator and the Flink runtime.
  • s

    Srivatsav Gorti

    08/22/2024, 3:30 PM
    Hello community, we have a single cluster (session mode) which executes close to 250 Flink session jobs per day, and at times we run into the following issue. I have attached the stack trace as well. This is happening in our production environment. Could someone please help? Gist:
    Caused by: java.io.FileNotFoundException: /tmp/flink-web-b7c8bf9d-504f-4c17-bf1e-b10ce5f8b242/flink-web-upload/1e56ab19-b842-41f0-9ca4-00cf4936ae81_release-1.0-jar-with-dependencies.jar (No such file or directory)
    Caused by: org.apache.flink.util.FlinkException: Failed to execute job
    The JAR size is barely 1 MB; all dependencies are baked into the classpath of the cluster.
    StackTrace.txt
  • d

    D. Draco O'Brien

    08/22/2024, 5:20 PM
    Given the context and the error details, it appears that the root cause of your issue is that the JAR file needed to execute one of the Flink jobs cannot be located on your cluster. Here are some targeted steps to troubleshoot and potentially resolve this issue in your production environment:
    Immediate Steps
    1. Check the Temporary Directory: Given the path `/tmp/flink-web-b7c8bf9d-504f-4c17-bf1e-b10ce5f8b242/flink-web-upload/`, verify that the directory exists and whether any recent changes in your system’s cleanup policies might be deleting files from `/tmp`. Some systems automatically clean up `/tmp` directories, which could be causing the issue.
    2. Job Submission Process: Investigate how jobs are being submitted to the Flink cluster. Ensure that the JAR file is correctly uploaded and referenced during job submission. If you’re using a web interface or REST API, check the logs for any hints about failed uploads or misconfigurations.
    3. Disk Space: Confirm that there is enough disk space on the nodes where Flink is running. A full disk can prevent file uploads and cause similar errors.
    Long-term Resolutions & Best Practices
    1. Customize Flink’s Storage Paths: Instead of using the default `/tmp` directory, consider configuring a more persistent storage location. The path in the error is the web upload directory, which is controlled by `web.upload.dir` (created under `web.tmpdir` when unset); the blob server's local directory is set with `blob.storage.directory`. These go in your Flink configuration file (typically `flink-conf.yaml`). Choose a directory that is less likely to be cleaned up automatically and has ample space.
    2. Job Management: Implement checks before job submission to ensure that the required JAR files are present and accessible. This can be done as a pre-flight check in your job submission scripts or application logic.
    3. Monitoring & Logging: Enhance monitoring around the job submission process and the blob server activity. Use Flink’s built-in metrics and consider integrating with your monitoring system to get alerts when disk space is running low or when job submissions fail due to missing files.
    4. Resource Management: Review your resource allocation, particularly for the JobManager and TaskManagers. Insufficient resources can lead to failures in managing or executing jobs.
    5. Version Compatibility: Ensure that the version of Flink used for compiling the JAR matches the version running in the cluster. Mismatched versions can sometimes cause unexpected behavior.
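The pre-flight check mentioned under Job Management could look roughly like the following. This is a minimal plain-Java sketch, not part of Flink: the class `JarPreflight` and its method are hypothetical, and it only verifies that a local file exists, is readable, and has a `.jar` suffix before a submission script proceeds.

```java
import java.nio.file.Files;
import java.nio.file.Path;

class JarPreflight {
    // Returns true when the path points at a readable regular file named *.jar.
    static boolean isSubmittable(Path jar) {
        Path name = jar.getFileName();
        return name != null
                && name.toString().endsWith(".jar")
                && Files.isRegularFile(jar)
                && Files.isReadable(jar);
    }

    public static void main(String[] args) {
        // Hypothetical usage: pass the JAR path as the first argument.
        Path jar = Path.of(args.length > 0 ? args[0] : "job.jar");
        System.out.println(jar + " submittable: " + isSubmittable(jar));
    }
}
```

A check like this only covers the submitting side; it would not detect files removed later from the cluster's own upload directory.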
  • d

    D. Draco O'Brien

    08/22/2024, 5:22 PM
    If you need further instructions on these steps, let us know! Be sure to back up your data before making any adjustments.
  • o

    Odin Wang

    08/23/2024, 1:08 AM
    I'm running into an issue where sometimes the state value I get from the RocksDB state backend is just wrong. For example:
    • I store an integer into a value state, but can randomly get back a completely different integer.
    • Occasionally running into java.io.EOFException when reading a String value
    Does this sound like a possible issue when using the RocksDB state backend? Or am I crazy?
  • t

    Tudor Plugaru

    08/23/2024, 9:16 AM
    A release of the Elasticsearch connector has not been cut for a very long time. Any chance we could get a release so that we can update our jobs to the latest Flink versions?
  • a

    amarjeet pasrija

    08/23/2024, 1:57 PM
    Hi everyone, I am using Flink Kubernetes Operator 1.7. It is not able to create automatic savepoints for a stateful job. I am using a PyFlink JAR in the job YAML. I have tried the savepoint mechanism mentioned in the docs, but it is not working. Has anyone faced this issue? Also, there are cases where a Kubernetes node goes down and causes the job to restart; the savepoint is not created in that case either.
  • s

    Sameer alwosaby

    08/23/2024, 7:41 PM
    When we update the IP address in the Config.yaml file from localhost to the server IP, the SQL client command stops working. When we try it, the issues below come up.
  • s

    Swaraj

    08/24/2024, 7:35 AM
    Hi everyone, we are currently using a POJO for storing data in Flink state. I understand that Flink utilizes the `PojoSerializer` for serializing POJOs. However, I have a question regarding how Flink handles serialization when one of the fields in the POJO is a generic type. Specifically, does the presence of a generic field cause the entire POJO to fall back to Kryo serialization, or will only that specific generic field fall back to Kryo serialization while the rest of the POJO continues to use `PojoSerializer`? Any insights would be greatly appreciated!
  • d

    David Vittori

    08/24/2024, 3:50 PM
    Hi everyone, I'm working on a Flink application that processes streams from a Kinesis source, where we have multiple event types. I've implemented a deserializer for the Kinesis consumer that correctly maps each event to its respective POJO class based on the event key.
    public BaseEvent deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId)
            throws IOException {
        JsonNode jsonNode = objectMapper.readTree(recordValue);
        // Determine the event type from the JSON and deserialize accordingly
        if (!jsonNode.has("key")) {return null;}
    
        String key = jsonNode.get("key").asText();
        try {
            switch (partitionKey) {
                case EventConstants.KEY_EVENT_TRIP_ENDED:
                    return objectMapper.treeToValue(jsonNode, TripEndedEvent.class);
                case EventConstants.KEY_EVENT_TRIP_REPORTED:
                    return objectMapper.treeToValue(jsonNode, TripReportedEvent.class);
                // Add more cases for other event types
                default:
                    return null;
            }
        } catch (Exception e) {
            Log.error("Failed to deserialize event", e);
            return null;
        }
    }
    However, I'm running into a `ClassCastException`:
    java.lang.RuntimeException: Could not extract key from com.codistica.services.etl.events.TripReportedEvent@6169c6c5
    	at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:61)
    	at org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.selectChannel(KeyGroupStreamPartitioner.java:35)
    	at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:55)
    	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:140)
    	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collectAndCheckIfChained(RecordWriterOutput.java:120)
    	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:101)
    when I try to filter the stream by event type. It seems that Flink encounters an issue when it tries to extract the key, possibly due to how I've set up the deserialization or the filtering process.
    return eventsStream
            .filter(event -> event instanceof TripEndedEvent)
            .map(event -> (TripEndedEvent) event)
            .returns(TypeInformation.of(TripEndedEvent.class));
    My question is: Is this the best approach for handling multiple event types in Flink, or is there a better pattern or configuration I should follow? Any insights or suggestions would be greatly appreciated!
  • a

    Aly Ayman

    08/25/2024, 10:05 AM
    I have a small problem: I read data from Kafka and deserialize it using a POJO class, and after that I want to parse a field in the POJO to a Long. I get the error below. Why is the value read with the extra "" "" quotes, and what should I do if I want to skip them?
    java.lang.NumberFormatException: For input string: ""201275004137""
    	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    	at java.lang.Long.parseLong(Long.java:578)
    	at java.lang.Long.parseLong(Long.java:631)
    	at org.orangeFlinkDS.Main$2.filter(Main.java:111)
    	at org.orangeFlinkDS.Main$2.filter(Main.java:108)
    	at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
    	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
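`NumberFormatException` wraps the offending input in one pair of quotes of its own, so the doubled quotes in the message suggest the field value itself carries one literal pair, e.g. `"201275004137"`. A minimal sketch of stripping them before parsing, in plain Java; the class and method names are hypothetical, and it assumes the quotes only appear at the ends of the value:

```java
class QuotedLongParser {
    // Trims whitespace, removes leading and trailing double quotes, then parses the rest.
    static long parseQuotedLong(String raw) {
        String cleaned = raw.trim().replaceAll("^\"+|\"+$", "");
        return Long.parseLong(cleaned);
    }

    public static void main(String[] args) {
        // The field value as it appears to arrive, with literal quotes around the digits.
        System.out.println(parseQuotedLong("\"201275004137\""));
    }
}
```

If the quotes come from the upstream producer double-encoding the value as a JSON string, fixing the serialization there may be the cleaner option; that is a guess about the cause, not something the trace confirms.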
  • a

    Aly Ayman

    08/26/2024, 5:49 AM
    I have 4 task managers, each on a separate machine, and one job manager. All task managers are reporting metrics to the job manager, as I can see in the web UI, except one. So if I have custom metrics, like a metric event, they are shown in the graph for all running tasks except the ones on this task manager. All the configs are the same; we usually copy and paste the config.
  • s

    Sandrine Bédard

    08/26/2024, 4:34 PM
    Hi all! I have a problem at work with an important service, and we're considering using Flink (right now we use Cadence). Here's a bit more about my use case:
    1. Our service handles 12B inventory updates/day. Right now, we have an API (CreateOrUpdateInventory) that saves the requests into S3 and triggers the InventoryUpdate workflow (Cadence).
    2. The InventoryUpdate workflow essentially does some decoration and saves the items into a table (`trackable_job`). It then triggers another workflow, called Sync (also in Cadence), which sends data to downstream services.
    3. The Sync workflow receives a store ID as input (among other things) and waits 5 minutes for an aggregation window. After 5 minutes, it reads all items from `trackable_job` with the given store ID and calls external APIs to sync data.
    The issues with the current architecture are:
    1. The 5-minute aggregation window results in a lot of items being read at once from `trackable_job` (up to 170K), making the DB query to fetch items very long and inefficient.
    2. Multiple Cadence workflows read from `trackable_job`, making things worse from a DB perspective.
    3. We track successful/failed syncs in the `trackable_job` table, but we don't do anything with that information (e.g., publishing events to clients), so this table is used as a queue only.
    In my design, I'm considering 2 options:
    • Option 1: Kafka + Cadence
      ◦ Replace `trackable_job` with a proper Kafka queue (partitioned by store ID)
      ◦ Modify the Sync workflow to pull from that queue. I've read Cadence isn't super easy to set up with Kafka. For example, if queues are partitioned, we need to define which Cadence worker reads from which queue. Is that true?
      ◦ We must have grouping logic to group items by store ID, and then make the API call downstream (requirement from external dependencies)
    • Option 2: Kafka + Flink
      ◦ Replace `trackable_job` with a proper Kafka queue (partitioned by store ID)
      ◦ Move the business logic of the Sync workflow into Flink. Set up Flink tasks to maximize parallelism with Kafka
      ◦ Add a time/count window through Flink to control the aggregation window
      ◦ Group events in Flink by store ID
    Do you think my problem is a good use case for Flink, and what do you think of option 2? Thanks a lot!
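The grouping and time/count window in option 2 can be pictured with a small plain-Java sketch. It is only an illustration of the batching logic, not Flink code: the class `StoreBatcher`, its methods, and the use of strings for items are all hypothetical, and in Flink the same roles would presumably be played by keying the stream on store ID and a window or trigger.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class StoreBatcher {
    private final int maxBatch;
    // Items buffered per store ID until a count or time boundary is reached.
    private final Map<String, List<String>> pending = new HashMap<>();

    StoreBatcher(int maxBatch) {
        this.maxBatch = maxBatch;
    }

    // Buffers an item under its store ID; returns the full batch once the
    // count threshold is reached, or null while the batch is still filling.
    List<String> add(String storeId, String item) {
        List<String> batch = pending.computeIfAbsent(storeId, k -> new ArrayList<>());
        batch.add(item);
        if (batch.size() >= maxBatch) {
            pending.remove(storeId);
            return batch;
        }
        return null;
    }

    // Called when the time window for a store closes: returns and clears
    // whatever is buffered, which may be an empty list.
    List<String> flush(String storeId) {
        List<String> batch = pending.remove(storeId);
        return batch == null ? new ArrayList<>() : batch;
    }
}
```

Capping the batch by count as well as time is what would bound the size of each downstream sync call, compared with the current read of up to 170K rows at once.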
  • r

    Rion Williams

    08/26/2024, 6:11 PM
    Has anyone encountered the following error on the initial deployment of a Flink job via the operator? Again, this is the initial deployment for the job; no JobManager pods ever came up and all of the jobs are in RECONCILING states:
    JobManager deployment is missing and HA data is not available to make stateful upgrades.
    It is possible that the job has finished or terminally failed, or the configmaps have been deleted. Manual restore required.
    The jobs themselves target Flink 1.18, and the operator itself is version 1.8. The only job that ran successfully was a batch-based job; all non-batch jobs failed to start and required manual intervention. Any ideas, or possibly a known issue in JIRA related to this?
  • r

    Ritesh Singh

    08/27/2024, 4:14 AM
    Hi All, Is there a way to determine the size of the state created by each operator? I just want to understand the total state size in RocksDB and how much each operator is contributing to the total size of the state.
  • h

    Hammad Hassan

    08/27/2024, 9:03 AM
    Hi all, I am looking to use AWS Managed Apache Flink with RabbitMQ Streams. However, apparently the connector for RabbitMQ (source/sink) only supports queues and not the RabbitMQ Streams plugin. Is that correct, or am I missing something?
  • a

    Aly Ayman

    08/27/2024, 10:41 AM
    If I have a task manager with 16 slots and 104 cores, will every slot take one core? Or can it do better?
  • a

    Abrar Sher

    09/06/2024, 8:23 PM
    Hello, does anyone know how to get flink-cdc or Flink SQL to propagate schema changes from MySQL -> Iceberg? I'm using the mysql-cdc connector at the source and the iceberg connector at the sink. It's working great, except I can't figure out how to get schema changes propagated.
  • m

    Michael LeGore

    09/09/2024, 4:09 PM
    Hi all, I am trying to implement a low-latency window system using keyed process functions, like the one described here: https://flink.apache.org/2020/07/30/advanced-flink-application-patterns-vol.3-custom-window-processing
    I am using it for a counting process function. The function keeps track of a count state for every window, and I am using timers to decrement the count after the window has passed, but the timers are not behaving the way I would like. Specifically, timers don’t fire until the watermark passes their time. Some of the keys of the keyed process function don’t receive values that often, so the watermark might not advance for a while, and the timer fires AFTER the event that advances the watermark, even if the timer was set for a time much earlier than that event. E.g. I have a timer set to trigger today at 5pm, and no events happen until today at 6pm, so the count seen when the 6pm event comes in is too high: it should have been decremented at 5pm, but instead it is decremented after the 6pm event.
    Is there a way to get timers to fire before the event that advances the watermark, provided the event is AFTER the timer (if the timer time and the event time are equal, the event should go first)? I am seeing this behavior in a unit test, so I’m also wondering whether it only happens in unit tests, because I’m creating a stream from a list and no actual time is involved, and whether a job running in real time would not have this issue.
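One possible workaround for the ordering described above is to expire old entries lazily when an element arrives, instead of relying only on timers. A minimal plain-Java sketch of that idea, with no Flink API involved: the class `LazyExpiryCounter` and its method are hypothetical, and it assumes events for a key arrive in timestamp order. In a keyed process function the same check would presumably run at the start of `processElement` against keyed state, with timers kept only for cleanup.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class LazyExpiryCounter {
    private final long windowMillis;
    // Expiry time (event timestamp + window) of each event still counted, oldest first.
    private final Deque<Long> expiries = new ArrayDeque<>();

    LazyExpiryCounter(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // Drops every entry whose expiry is strictly before the incoming event,
    // then counts the event and returns the current window count.
    long onEvent(long eventTime) {
        while (!expiries.isEmpty() && expiries.peekFirst() < eventTime) {
            expiries.pollFirst();
        }
        expiries.addLast(eventTime + windowMillis);
        return expiries.size();
    }
}
```

The strict `<` comparison keeps an entry whose expiry equals the event time, so the event is counted before the decrement, matching the tie-breaking rule in the question.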
  • a

    Aly Ayman

    09/09/2024, 9:58 AM
    Has anyone faced this problem?
    Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Too many open files
        at org.apache.kafka.common.network.Selector.<init>(Selector.java:161)
        at org.apache.kafka.common.network.Selector.<init>(Selector.java:212)
        at org.apache.kafka.common.network.Selector.<init>(Selector.java:224)
        at org.apache.kafka.common.network.Selector.<init>(Selector.java:228)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:745)
        ... 26 more
    Caused by: java.io.IOException: Too many open files
        at java.base/sun.nio.ch.EPoll.create(Native Method)
        at java.base/sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:79)
        at java.base/sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
        at java.base/java.nio.channels.Selector.open(Selector.java:295)
        at org.apache.kafka.common.network.Selector.<init>(Selector.java:159)
        ... 30 more
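The trace shows the process hitting its open-file limit while the Kafka consumer opens a selector. A minimal sketch for checking how close a JVM is to that limit, assuming a Unix-like system where the JDK exposes `com.sun.management.UnixOperatingSystemMXBean`; the class name `FdUsage` is hypothetical, and the code only reports numbers, it does not raise any limit:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

class FdUsage {
    // Returns {open, max} file descriptor counts for this JVM,
    // or null when the platform bean does not expose them.
    static long[] fdUsage() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            com.sun.management.UnixOperatingSystemMXBean unix =
                    (com.sun.management.UnixOperatingSystemMXBean) os;
            return new long[] {
                unix.getOpenFileDescriptorCount(),
                unix.getMaxFileDescriptorCount()
            };
        }
        return null;
    }

    public static void main(String[] args) {
        long[] usage = fdUsage();
        if (usage == null) {
            System.out.println("File descriptor counts are not available on this platform");
        } else {
            System.out.println("open=" + usage[0] + " max=" + usage[1]);
        }
    }
}
```

A count that keeps climbing toward the maximum would point to descriptors not being released (for example, clients that are created repeatedly and never closed), whereas a low maximum would point to the process limit itself; which applies here cannot be told from the trace alone.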