# troubleshooting
  • m

    Maher Turifi

    10/25/2022, 11:35 AM
    Hi, we have a processing-time temporal join running in a Kinesis Data Analytics application with parallelism 24. It joins a main data stream with a lookup table (which has only one key for now). The issue is that it runs very slowly and uses only one subtask for the join, as you can see in the attached screenshot (the other 23 subtasks are doing nothing). How can we increase the parallelism and make use of the other subtasks to improve performance?

    https://apache-flink.slack.com/files/U045154GZEW/F048CKFN5R7/screenshot_from_2022-10-25_14-56-06.png
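    One possible explanation, sketched under assumptions: if the join is keyed on the lookup key, every record carrying the same key is routed to the same subtask, so a single key keeps a single subtask busy regardless of parallelism. The snippet below is a simplified, hypothetical model of key-group routing (CRC32 stands in for the real hash function, and `subtask_for_key` is an illustrative name, not a Flink API).

```python
import zlib


def subtask_for_key(key: str, parallelism: int, max_parallelism: int = 128) -> int:
    """Hypothetical stand-in for keyed routing: key -> key group -> subtask index."""
    # CRC32 is only a placeholder hash; the real runtime uses a different one.
    key_group = zlib.crc32(key.encode("utf-8")) % max_parallelism
    return key_group * parallelism // max_parallelism


def records_per_subtask(keys, parallelism):
    """Count how many records land on each subtask."""
    counts = [0] * parallelism
    for key in keys:
        counts[subtask_for_key(key, parallelism)] += 1
    return counts


# 1000 records that all share one join key vs. 1000 records with distinct keys.
single_key = records_per_subtask(["lookup-key"] * 1000, parallelism=24)
many_keys = records_per_subtask([f"key-{i}" for i in range(1000)], parallelism=24)

busy_single = sum(1 for c in single_key if c > 0)
busy_many = sum(1 for c in many_keys if c > 0)
print("busy subtasks with one key:", busy_single)
print("busy subtasks with many keys:", busy_many)
```

    Under this model, the work only spreads out once the join key takes more than one value.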

  • t

    Thiruvenkadesh Someswaran

    10/25/2022, 7:32 PM
    Has anybody ever seen this?
    o.a.f.k.o.r.ReconciliationUtils [ERROR][default/basic-example] Validation failed: Forbidden Flink config key: kubernetes.cluster-id
  • e

    Erwin Cabral

    10/25/2022, 11:10 PM
    Hi again. The Flink Kubernetes Operator documentation (https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#running-suspending[…]leting-applications) mentions setting the state from "running" to "suspended". When set to "suspended", does it use the "--drain" flag (similar to the CLI option)? Or would deleting the FlinkDeployment resource use the "--drain" flag by default?
  • j

    Jirawech Siwawut

    10/26/2022, 5:18 AM
    Hi. As far as I know, Flink SQL does not support assigning a UID to operators for some reason, and UIDs are quite essential for job recovery. I wonder whether Flink will support this in the future? Recently, I had a problem where I tried to update my Flink SQL and found that my job did not recover from the savepoint correctly. The job recovered, but it still used the previous SQL version.
  • a

    Aqib Mehmood

    10/26/2022, 7:24 AM
    Hi All, I am using this query to compare the difference between the last and second-to-last price of our order SKU:
    WITH CTE AS (
        SELECT a.sku, a.name, a.updatedAt, b.price FROM (    
            SELECT sku, name, max(updatedAt) AS updatedAt from (
                SELECT sku, name, updatedAt FROM wms.PurchaseOrderProduct
                WHERE CONCAT(sku, DATE_FORMAT(updatedAt, '%Y-%m-%d %H:%m:%s')) not in (
                    SELECT CONCAT(sku, DATE_FORMAT(updatedAt, '%Y-%m-%d %H:%m:%s')) FROM (
                        SELECT sku, max(updatedAt) as updatedAt from wms.PurchaseOrderProduct
                        GROUP BY sku
                    ) AS x
                )
            ) AS z
            GROUP BY sku, name
        ) AS a
        LEFT JOIN wms.PurchaseOrderProduct b
        ON a.sku=b.sku AND a.name=b.name and a.updatedAt=b.updatedAt
    )
    SELECT a.sku, a.name, a.updatedAt AS latestupdatedAt, a.price AS latestPrice, b.updatedAt AS lastUpdatedAt, b.price AS lastPrice
    FROM (
        SELECT a.sku, a.name, a.updatedAt, b.price from (
            SELECT sku, name, max(updatedAt) as updatedAt from wms.PurchaseOrderProduct
            GROUP BY sku, name
        ) AS a
        LEFT JOIN wms.PurchaseOrderProduct b
        ON a.sku=b.sku AND a.name=b.name and a.updatedAt=b.updatedAt
    ) AS a
    LEFT JOIN CTE AS b
    ON a.sku=b.sku AND a.name=b.name;
    The issue is that I'm getting NULLs for the columns lastUpdatedAt and lastPrice. But when I run the same query on our prod database, I get the desired results. I suspect that Flink is not processing the entire query before giving the results. I get the desired results for a couple of rows at the beginning of the table, where lastUpdatedAt and lastPrice are not NULL. But after that, both columns return only NULLs. I would like to know why Flink is not executing the above query properly. TIA
  • r

    Robin Cassan

    10/26/2022, 8:51 AM
    Hey all! Hope you're well 🙂 Is anyone familiar with high-availability (via Zookeeper)? In our setup, we have one single JobManager handled by k8s (which restarts automatically in case of failures) and a zookeeper cluster of 5 nodes for HA. What we think is strange is that, when one ZK node fails, connection is lost of course, but it triggers a leader election for job managers even though the ZK quorum is maintained. This in turn notifies the task manager of a leader change which fails the tasks, causing some downtime to re-download the state. I was wondering if we did something wrong for this to happen or if it's the intended behavior? It seems surprising that losing one ZK node causes a restart of the job even though both the JM and the quorum are still there
  • l

    Lorin Liu

    10/26/2022, 9:13 AM
    Hi all, I wrote a simple Flink application (kafka2kafka, moving data from topic A to topic B) with Flink 1.14.4 and Kafka 2.4.1. I enabled DeliveryGuarantee.EXACTLY_ONCE mode in this application. However, the Flink job keeps failing with the following exceptions:
    org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka etl-spp-out-0@-1 with FlinkKafkaInternalProducer{transactionalId='etl-spp-out-flink-spp-0-5', inTransaction=true, closed=false} 
    	at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:405)
    	at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:391)
    	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    	at java.lang.Thread.run(Thread.java:750)
    Caused by: org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker received an out of order sequence number.
    
    
    org.apache.flink.util.FlinkRuntimeException: Failed to send data to Kafka etl-spp-out-1@-1 with FlinkKafkaInternalProducer{transactionalId='etl-spp-out-flink-spp-0-5', inTransaction=true, closed=false} 
    because of a bug in the Kafka broker (KAFKA-9310). Please upgrade to Kafka 2.5+. If you are running with concurrent checkpoints, you also may want to try without them.
    To avoid data loss, the application will restart.
    	at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.throwException(KafkaWriter.java:405)
    	at org.apache.flink.connector.kafka.sink.KafkaWriter$WriterCallback.lambda$onCompletion$0(KafkaWriter.java:391)
    	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    	at java.lang.Thread.run(Thread.java:745)
    Caused by: org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. 
    This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. 
    Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception.
    Any idea about it? Thanks!
  • t

    Tiansu Yu

    10/26/2022, 9:33 AM
    flink-s3-fs-hadoop bundles a Guava version that conflicts with my other libs. Is there any way to exclude it from my classpath?
  • c

    Christos Hadjinikolis

    10/26/2022, 9:42 AM
    Is there a way to apply rate limiting on a Flink pipeline? It seems like this is not a desired feature. Can anyone explain why implementing something like this is undesirable?
  • b

    Burns Smith

    10/26/2022, 5:42 PM
    Hi everyone, I'm using a Beam pipeline to submit Flink jobs. The pipeline is processing sensitive information that must remain encrypted at all times. Flink is configured to store checkpoints/savepoints in Azure Blob Storage. When Flink creates a checkpoint or savepoint, can the checkpoint/savepoint include sensitive information from my source data? If yes:
    • How can I see the actual data contained in a checkpoint/savepoint?
    • Is there a way to encrypt the checkpoint/savepoint in Flink before it is written to Azure Blob Storage using client-side encryption?
  • i

    Iris Grace Endozo

    10/27/2022, 12:55 AM
    Hey everyone, has anyone ever encountered job manager crashes (causing a job restart) leading to a steady increase in network memory usage? After a few crashes, network memory usage reaches 100% and causes a job restart loop.
  • k

    Krish Narukulla

    10/27/2022, 3:18 AM
    Where can I find the Flink SQL syntax for inserting records into array and map fields?
    message PersonRecord {
      string name = 1;
      int32 age = 2;
    
      enum PhoneType {
        MOBILE = 0;
        HOME = 1;
        WORK = 2;
      }
    
      message PhoneNumber {
        string number = 1;
        PhoneType type = 2;
      }
    
      repeated PhoneNumber phones = 4;
    }
    
    // Our address book file is just one of these.
    message AddressBookRecord {
      repeated PersonRecord people = 1;
    }
  • m

    M Harsha

    10/27/2022, 6:47 AM
    Hi all, I have a question regarding the adaptive scheduler. Say that there are 2 task managers with 4 slots each. I submit a job with 2 tasks and a default parallelism of 3. When scheduled, 3 slots on each task manager are used. Say one task manager dies. Now there is 1 slot free. Assuming that slot sharing is enabled, I have observed that the killed tasks get scheduled on the running task slots. What happens if slot sharing is disabled? One of the running subtasks needs to be killed so that the tasks can have a parallelism of 2. How is the decision made as to which subtask gets killed, and what about the data it has processed? Also, is there a way to disable slot sharing via SQL (using the SQL client to submit the job)?
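    The slot arithmetic in this scenario can be written out as a small sketch. It assumes that with slot sharing all tasks sit in one sharing group (so the job needs as many slots as its highest task parallelism), and that without slot sharing every subtask needs its own slot. The function names are illustrative only and say nothing about how the scheduler picks which subtask to stop.

```python
def slots_required(task_parallelisms, slot_sharing):
    """Slots a job needs, under the assumptions stated above."""
    if slot_sharing:
        # One slot can host one subtask of every task in the sharing group.
        return max(task_parallelisms)
    # Every subtask occupies a slot of its own.
    return sum(task_parallelisms)


def max_uniform_parallelism(free_slots, num_tasks, slot_sharing):
    """Highest parallelism all tasks can run at with the given free slots."""
    return free_slots if slot_sharing else free_slots // num_tasks


tasks = [3, 3]  # 2 tasks, default parallelism 3

needed_with_sharing = slots_required(tasks, slot_sharing=True)
needed_without_sharing = slots_required(tasks, slot_sharing=False)

# One of the two task managers dies, so 4 slots remain.
remaining_slots = 4
parallelism_after_loss = max_uniform_parallelism(
    remaining_slots, num_tasks=len(tasks), slot_sharing=False
)

print(needed_with_sharing, needed_without_sharing, parallelism_after_loss)
```

    With these numbers the job needs 3 slots with sharing and 6 without, and 4 remaining slots allow a parallelism of 2 without sharing, which matches the figure in the question.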
  • f

    Felix Angell

    10/27/2022, 9:58 AM
    Hey there, we are writing an enricher that uses custom sessionisation logic with a lower-level KeyedProcessFunction (due to use cases around enriching all events within a window). We have two sources, Kinesis and Kafka, where Kafka is consumed much faster than Kinesis (by days). This raises a question: say we have to extend our bounded-out-of-orderness watermark strategy to allow multiple days of lateness. Would the state accumulate potentially thousands or hundreds of thousands of events until this lateness period is over, or does the watermark not advance, putting backpressure on the Kafka consumer while the Kinesis source catches up? I.e. we idle doing nothing for a long time.
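    For context, an operator with several inputs advances its event-time clock to the minimum of its input watermarks. The sketch below is a toy model of that rule with made-up timestamps in hours. It does not model backpressure or any source alignment feature, and `pending_events` is a hypothetical helper showing which events from the faster source would still be waiting in state.

```python
def combined_watermark(input_watermarks):
    """An operator's watermark is the minimum across its inputs."""
    return min(input_watermarks)


def pending_events(event_timestamps, watermark):
    """Events whose timestamp is still ahead of the watermark stay buffered."""
    return [ts for ts in event_timestamps if ts > watermark]


# Made-up numbers, in hours: Kafka has been read up to hour 120, Kinesis only to hour 48.
kafka_watermark = 120
kinesis_watermark = 48

watermark = combined_watermark([kafka_watermark, kinesis_watermark])

# Event times already consumed from the faster source.
kafka_event_times = [40, 50, 72, 100, 119]
still_buffered = pending_events(kafka_event_times, watermark)

print("operator watermark:", watermark)
print("events still held in state:", still_buffered)
```

    In this toy model the slower source pins the watermark, so everything the faster source has read beyond that point stays in state until the slower source catches up.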
  • m

    Mihaly Hajdu

    10/27/2022, 10:04 AM
    Hello community! We have a KeyedProcessFunction with around 500.000 keys. The logic is heavily time-based and runs the production workload in a streaming fashion on unbounded data sources. Before deployments, we want to run the same pipeline against a fixed, goldenish, bounded dataset for testing purposes, and this is where issues start to arise. We figured this is primarily because watermarks are handled per operator instance, while our use case demands per-key behaviour. We came up with a way to handle late/out-of-order events independently of the watermark, but we are still left with timers firing incorrectly, as they are driven by the watermarks. Having more keys per operator instance feels like a common use case, and I wonder how others approach this.
  • k

    Konstantin

    10/27/2022, 10:53 AM
    [flink-table-store] Does the Flink Table Store 0.2.1 support S3 as a data warehouse directory? Trying it out (with minio), there are no errors when instantiating the catalog, but no default database is created and manually creating a database also seems to fail silently.
  • s

    Sachin Saikrishna Manikandan

    10/27/2022, 3:42 PM
    Hello team, I need to make an external call to a database, so I am using the RichAsyncOperator. Is there a way to cache the DB results in a MapState so that I avoid calling the DB every time, and use a TTL to evict entries?
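    The caching idea itself (look up once, reuse until a TTL expires) can be sketched independently of Flink. The class below is a plain in-memory illustration, not MapState or Flink's state TTL. `TtlCache`, `get_or_load`, and `fake_db_lookup` are hypothetical names, and the clock is injectable so the expiry logic can be exercised without sleeping.

```python
import time


class TtlCache:
    """Minimal in-memory cache whose entries expire after ttl_seconds."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries = {}  # key -> (value, expiry time)

    def get_or_load(self, key, loader):
        """Return the cached value, calling loader(key) on a miss or after expiry."""
        now = self._clock()
        hit = self._entries.get(key)
        if hit is not None and hit[1] > now:
            return hit[0]
        value = loader(key)
        self._entries[key] = (value, now + self._ttl)
        return value


# Stand-in for the external database call; records every invocation.
db_calls = []


def fake_db_lookup(key):
    db_calls.append(key)
    return f"row-for-{key}"


fake_now = [0.0]  # controllable clock for the example
cache = TtlCache(ttl_seconds=10, clock=lambda: fake_now[0])

first = cache.get_or_load("user-1", fake_db_lookup)   # miss: calls the loader
second = cache.get_or_load("user-1", fake_db_lookup)  # hit: served from the cache
fake_now[0] = 11.0                                    # move past the TTL
third = cache.get_or_load("user-1", fake_db_lookup)   # expired: calls the loader again

print(len(db_calls))
```

    Only the first and the post-expiry lookups reach the loader. Whether this belongs in operator state or in a per-instance cache depends on the setup.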
  • v

    vignesh kumar kathiresan

    10/27/2022, 8:57 PM
    [Flink Kubernetes rocksdb backend volume type] Hello all, what is the recommended practice for using a volume for the RocksDB backend in a Flink Kubernetes standalone setup? We currently use a "local" volume type for this purpose. We have local persistent volumes and persistent volume claims for each node's local SSD disk, and use one such PVC in the task manager deployment as a volume mount. All my task manager replicas use the same local SSD disk and read from/write to it. But with that, all my task manager replicas go to the same node, obviously because of the node-local volume affinity. Can we use hostPath? Kubernetes recommends against it for security reasons. Also, does it work for the RocksDB backend, i.e. does a replacement pod need to know the existing RocksDB state? A better solution I can think of is to use StatefulSets for the task managers instead of a Deployment, with a PVC template backed by dynamic external volumes like AWS EBS. Any pointers would be appreciated. Thanks
  • s

    Sylvia Lin

    10/27/2022, 9:28 PM
    For Flink batch job on k8s managed by flink operator, do we support k8s cron job scheduler? https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/
  • m

    Matt Fysh

    10/27/2022, 10:34 PM
    Is there a repository of Flink examples? I’m looking for something that shows how to write to multiple outputs. I have the following code, but it pauses at the first statement, I assume because it runs suspended in streaming mode:
    table_env.execute_sql("INSERT...
    table_env.execute_sql("INSERT...
  • v

    Vaibhav Gubbi Narasimhan

    10/28/2022, 4:00 AM
    Hi all, Is the queryable state being phased out of flink?
  • s

    Sumit Nekar

    10/28/2022, 4:08 AM
    Hello folks, we are using RocksDB as the state backend and storing checkpoints in Azure storage, so checkpoints are flushed over the network. If I mount a PVC with a file system as the backend (EFS or any other file system), will it improve checkpointing latency and help avoid checkpoint failures?
  • m

    Matt Fysh

    10/28/2022, 4:25 AM
    Kind of a strange one here - the version of avro in my --pyFiles conflicts with the one wanted by the pyflink executor, is there a clean-ish solution to resolve?
  • m

    M Harsha

    10/28/2022, 6:58 AM
    Hi all, Is there a way to disable slot sharing while submitting a job via the SQL Client?
  • a

    Adesh Dsilva

    10/28/2022, 8:47 AM
    Protobuf format
    Flink now supports the Protocol Buffers (Protobuf) format. This allows you to use this format directly in your Table API or SQL applications.
    Couldn't find more information on this. Does this mean I no longer have to register my protobuf class using a 3rd party protobuf serializer with kryo? Will it now work seamlessly like Avro generated classes in DataStream code?
  • m

    Matt Fysh

    10/28/2022, 9:39 AM
    Does anyone know how I can work around this existing bug? https://issues.apache.org/jira/browse/FLINK-23860 “Conversion to relational algebra failed to preserve datatypes” In the error message output, when I compare “validated type:” and “converted type:”, the only difference is that the validated type has RecordType:peek_no_expand and the other has RecordType.
  • c

    chunilal kukreja

    10/28/2022, 11:38 AM
    Hi Team, while compiling version 1.15.2, I am getting this error:
    Failed to execute goal on project flink-connector-hive_2.12: Could not resolve dependencies for project org.apache.flink:flink-connector-hive_2.12:jar :1.15.2: Failed to collect dependencies at org.apache.hive:hive-exec:jar:2.3.9 -> org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde: Failed to read artifact descriptor for org.pentaho:pentaho-aggdesigner-algorithm:jar:5.1.5-jhyde
    Maven Central does not have this jar: https://mvnrepository.com/artifact/org.pentaho/pentaho-aggdesigner-algorithm/5.1.5-jhyde Is there any workaround other than pointing to the Spring repo to get this dependency?
  • c

    chunilal kukreja

    10/28/2022, 12:32 PM
    Hi Team, I'm getting the error below for version 1.15.2:
    Could not resolve dependencies for project org.apache.flink:flink-dist_2.12:jar:1.15.2: The following artifacts could not be resolved: org.apache.flink:flink-dist-scala_2.12:jar:1.15.2, org.apache.flink:flink-examples-streaming-state-machine_2.12:jar:1.15.2: Could not find artifact org.apache.flink:flink-dist-scala_2.12:jar:1.15.2 in central (https://repo.maven.apache.org/maven2)
    I tried changing to different repos (including my corp Artifactory), but nothing worked. I didn't face this with the earlier version 1.15.1.
  • j

    Joris Basiglio

    10/28/2022, 1:44 PM
    Hey Team, I'm trying to perform some analytics on my (pretty large) Flink cluster savepoint. I've used the State Processor API to create a stream of my keyed state operator, but every time I run the program I end up OOM because Flink first tries to load the entire keyed state into an in-memory state map before streaming it to my processor. Am I doing something wrong with this API, or is this the expected behavior? This is what my simple code looks like:
    def main(args: Array[String]): Unit = {
        val savepointLocation = "s3://my-bucket/savepoint/savepoint-f60fde-f0680e680d7e"
    
        val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
        streamEnv.setParallelism(1)
        val savepoint = SavepointReader.read(
          streamEnv,
          savepointLocation
        )
        savepoint
          .readKeyedState(
            "my-operator",
            new ReaderFunction(),
            scala.createTypeInformation[MyOperatorKey],
            BasicTypeInfo.STRING_TYPE_INFO
          )
          .filter(new TestFilterFunction())
          .addSink(new DiscardingSink[String]())
    
        streamEnv.execute()
      }
  • m

    M Harsha

    10/28/2022, 1:50 PM
    Hi all, I'm trying to sink records from a window process function to Postgres. The issue is that the process function returns List<Object>, so I cannot use the JDBC sink directly (it expects the input to be a single object, not a list). Is there any workaround for this apart from writing a custom sink function?
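    One general way to bridge a list-valued output and a per-record sink is a flattening step in between, so that each list element becomes its own record. The sketch below shows that idea in plain Python with made-up rows. `flatten_windows` is a hypothetical helper, not part of any Flink or JDBC API.

```python
from typing import Dict, Iterable, Iterator, List


def flatten_windows(window_results: Iterable[List[Dict]]) -> Iterator[Dict]:
    """Turn a stream of per-window lists into a stream of single records."""
    for rows in window_results:
        # Emit every element of the list on its own; empty lists emit nothing.
        yield from rows


# Made-up output of three windows, each producing a list of rows.
per_window_output = [
    [{"id": 1}, {"id": 2}],
    [],
    [{"id": 3}],
]

single_records = list(flatten_windows(per_window_output))
print(single_records)
```

    After such a step, a sink that expects one object per record receives exactly that.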