# troubleshooting
  • k

    kingsathurthi

    03/09/2023, 8:42 AM
    I am able to submit the same jar multiple times with different ports, but when I try to send input, the Kubernetes service throws a connection refused error. Example jobs:
    Job 1
    Copy code
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkSessionJob
    metadata:
      name: test_job
    spec:
      deploymentName: job_1
      job:
        jarURI: https://repo.com/jar_1.jar
        parallelism: 1
        upgradeMode: savepoint
        state: running
        savepointTriggerNonce: 0
        entryClass: <entryClass>
        args:
          - "-false,localhost,8081"
          - "-1,<service_name>,50051,3,30,false"
    Job 2
    Copy code
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkSessionJob
    metadata:
      name: test_job
    spec:
      deploymentName: job_2
      job:
        jarURI: https://repo.com/jar_1.jar
        parallelism: 1
        upgradeMode: savepoint
        state: running
        savepointTriggerNonce: 0
        entryClass: <entryClass>
        args:
          - "-false,localhost,8082"
          - "-1,<service_name>,50051,3,30,false"
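A side note on the two manifests above, offered as a sketch and not as a diagnosis of the connection error: if the names shown are the real ones and not placeholders, both jobs share metadata.name test_job, and names such as test_job and job_1 contain underscores, which Kubernetes object names generally do not allow (most follow the RFC 1123 DNS subdomain rule). The check below is plain Python; check_manifests and its input shape are hypothetical helpers for illustration, not part of Flink or the operator.

```python
import re

# RFC 1123 DNS subdomain pattern that Kubernetes applies to most object names:
# lowercase alphanumerics, '-' and '.', starting and ending with an alphanumeric.
_LABEL = r"[a-z0-9]([-a-z0-9]*[a-z0-9])?"
DNS_SUBDOMAIN = re.compile(rf"{_LABEL}(\.{_LABEL})*")


def is_valid_k8s_name(name: str) -> bool:
    """Return True if `name` is a valid DNS subdomain of at most 253 characters."""
    return len(name) <= 253 and DNS_SUBDOMAIN.fullmatch(name) is not None


def check_manifests(manifests):
    """Hypothetical helper: `manifests` is a list of (metadata_name, deployment_name)."""
    problems = []
    seen = set()
    for metadata_name, deployment_name in manifests:
        for field, value in (("metadata.name", metadata_name),
                             ("spec.deploymentName", deployment_name)):
            if not is_valid_k8s_name(value):
                problems.append(f"{field} {value!r} is not a valid Kubernetes name")
        if metadata_name in seen:
            problems.append(f"metadata.name {metadata_name!r} is used more than once")
        seen.add(metadata_name)
    return problems


# Names copied from the two FlinkSessionJob manifests in the message above.
problems = check_manifests([("test_job", "job_1"), ("test_job", "job_2")])
for problem in problems:
    print(problem)
```

If the real names are already lowercase with hyphens and differ per job, this check passes and the cause of the refused connection lies elsewhere.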
  • r

    Raghunadh Nittala

    03/09/2023, 8:57 AM
    Hello folks, I’m running a simple query on top of a table that is created with an ‘upsert-kafka’ connector. Here is the query:
    Copy code
    SELECT event_id, event_type, data_source_type FROM event_changes
    Here event_id, event_type, data_source_type are the primary key fields in the table. When run in the SQL client shell, this query gives proper results. When run as part of the job, it throws the error below:
    Exception in thread "main" org.apache.flink.table.api.TableException: Table sink '*anonymous_datastream_sink$1*' doesn't support consuming update and delete changes which is produced by node ChangelogNormalize(key=[event_id, event_type, data_source_type])
    This is observed only when I use the tableEnv.sqlQuery() method. When I tried the tableEnv.executeSql() method, the query ran fine and I was able to see the TableResult type. I am trying to create a DataStream out of the result, which is why I am using the .sqlQuery() method. Any input on this is appreciated. Thanks in advance.
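A possible reading of the error above, as a sketch and not a confirmed diagnosis: a table backed by upsert-kafka is an updating table, so its changelog carries update and delete rows, and an insert-only conversion cannot consume them. In Flink, StreamTableEnvironment.toChangelogStream is the conversion meant for updating tables. The toy model below is plain Python, not Flink API; Change, upsert_changelog, insert_only_sink and changelog_sink are hypothetical names used only to show the difference between the two kinds of consumer.

```python
from dataclasses import dataclass

# Row kinds, written the way Flink prints them.
INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE = "+I", "-U", "+U", "-D"


@dataclass(frozen=True)
class Change:
    kind: str
    row: tuple


def upsert_changelog(events):
    """Turn (key, value) upserts into a changelog; value None is a delete."""
    state = {}
    for key, value in events:
        old = state.get(key)
        if value is None:
            if old is not None:
                del state[key]
                yield Change(DELETE, (key, old))
        elif old is None:
            state[key] = value
            yield Change(INSERT, (key, value))
        elif old != value:
            state[key] = value
            yield Change(UPDATE_BEFORE, (key, old))
            yield Change(UPDATE_AFTER, (key, value))


def insert_only_sink(changes):
    """Consumer that accepts only inserts, like an insert-only stream conversion."""
    rows = []
    for change in changes:
        if change.kind != INSERT:
            raise ValueError(f"insert-only consumer cannot accept {change.kind}")
        rows.append(change.row)
    return rows


def changelog_sink(changes):
    """Consumer that keeps the row kind next to each row."""
    return [(change.kind, change.row) for change in changes]


events = [("e1", "a"), ("e1", "b"), ("e1", None)]
print(changelog_sink(upsert_changelog(events)))
```

Passing the same events to insert_only_sink raises on the first "-U" row, which mirrors the shape of the TableException in the message.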
  • m

    Mali

    03/09/2023, 10:43 AM
    I am using PyFlink with the k8s operator and I want to create an application with multiple jobs. How can I do that with PyFlink?
  • l

    Luis Calado

    03/09/2023, 1:32 PM
    Hey folks! I hope you are all having a beautiful day. We had a few issues yesterday where a single task didn’t recover from a checkpoint and we couldn't even scale up the parallelism. Are you taking recurrent savepoints to avoid getting stuck, or do you expect that a savepoint can be taken later?
  • a

    Abhinav sharma

    03/09/2023, 1:39 PM
    I am running Flink v1.16 on my server and, using Java, I am trying to sink the Flink results to Hive. Going through the documentation, I see that Flink v1.16 does not provide a Hive connector. Can I achieve this some other way?
  • j

    Jeesmon Jacob

    03/09/2023, 3:58 PM
    Hi team, is it possible to provision standby taskmanagers with FlinkDeployment for quick recovery?
  • f

    Francisco Morillo

    03/09/2023, 4:58 PM
    Hi Team! I am trying to build a HybridSource that reads first from an Iceberg source and then switches to a Kinesis Data Stream once it finishes the Iceberg table. However, I am not finding any examples of how to create the hybrid source or how to use it as a DataStream from there.
  • a

    Andreas Kunze

    03/09/2023, 5:45 PM
    Hi, I'm currently implementing custom metrics and could use some help. What I am trying to achieve is having a simple metric reported to Prometheus that also has a custom label.
    Copy code
    class MyCustomMetric extends RichMapFunction[String, String] {
      @transient private var valueToExpose = 0
    
      override def open(parameters: Configuration): Unit = {
        getRuntimeContext()
          .getMetricGroup()
          .addGroup("MyMetricKey", "MyMetricValue")
          .gauge[Int, ScalaGauge[Int]]("MyGauge", ScalaGauge[Int]( () => valueToExpose ) )
      }
    
      override def map(value: String): String = {
        valueToExpose += 1
        return value
      }
    }
    First of all, from my understanding, addGroup(key, value) will define a user variable that is exported to Prometheus as a label, is that correct? I configured the necessary settings for exporting metrics to Prometheus and it's working for the default Flink system metrics. However, I have some problems with my custom metric:
    1. When using the code above as it is, the metric is not accessible at all, neither from the Flink Web UI nor from Prometheus.
    2. Without the addGroup(key, value) part, the metric is accessible from the Flink Web UI and Prometheus but, of course, without any additional custom label.
    Can anyone help me understand what I'm doing wrong or what's missing here?
  • r

    Reme Ajayi

    03/09/2023, 6:47 PM
    Hi everyone, I tried running my stream join job on Kinesis and I think I may be having some memory issues. Locally, my job runs fine with memory.size set to 4096. However, I can't set config properties on Kinesis because Kinesis does not allow flink-conf.yaml. The error stack trace is below. Does anyone have suggestions on how to resolve this?
    Copy code
    2023-03-09 12:40:24
    java.lang.Exception: Exception while creating StreamOperatorStateContext.
    	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:255)
    	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:268)
    	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
    	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
    	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
    	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
    	at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_7e145440e6a4bf9d4540b5d7293494cc_(1/1) from any of the 1 provided restore options.
    	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
    	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:346)
    	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:164)
    	... 11 more
    Caused by: java.io.IOException: Failed to acquire shared cache resource for RocksDB
    	at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:296)
    	at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:444)
    	at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:97)
    	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:329)
    	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
    	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    	... 13 more
    Caused by: java.lang.IllegalArgumentException: The fraction of memory to allocate should not be 0. Please make sure that all types of managed memory consumers contained in the job are configured with a non-negative weight via `taskmanager.memory.managed.consumer-weights`.
    	at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
    	at org.apache.flink.runtime.memory.MemoryManager.validateFraction(MemoryManager.java:672)
    	at org.apache.flink.runtime.memory.MemoryManager.computeMemorySize(MemoryManager.java:653)
    	at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:521)
    	at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:292)
    	... 18 more
  • r

    Ricco Førgaard

    03/10/2023, 7:52 AM
    Hi, has anyone successfully used the Flink Kubernetes Operator with Kustomize? I'm having trouble patching things under spec.taskManager.podTemplate.spec.containers. Values from the base get overwritten in the output. My setup is 3 levels of FlinkDeployment:
    • base (common to all jobs)
    • job base (common to all environments for a given job)
    • overlay (configuration for a particular job in a particular environment)
    In base I would for example mount a configmap to get a Hadoop config, in job base I would reference a common configmap, and in the overlay I would set resources for the pod. I added the openapi reference to the CRD definition, too. I did a quick test with a regular Kubernetes deployment and it seems to work as I expect. Any tips?
  • j

    Jiri Holusa

    03/10/2023, 12:30 PM
    Hi 🙂 I'm trying to run the Nexmark (https://github.com/nexmark/nexmark) benchmark with Flink (locally so far). I currently see that the CpuMetricReceiver is not started and thus my CpuMetricSenders are failing with java.net.ConnectException: Connection refused. That makes sense. I dug through the code and it seems that the script metric-server.sh is actually never called anywhere, and I also don't see it mentioned in the README. So I guess it's not a surprise that it fails. Questions:
    • Am I missing something?
    • Is this a bug in the Nexmark benchmark? How was it ever supposed to run?
    • If it is a bug, it seems that the Nexmark benchmark is actually not really used, is it? Are there better ways to test Flink from a performance perspective?
  • k

    Konstantinos Samaras-Tsakiris

    03/10/2023, 1:57 PM
    Hello! Thanks to any Flink expert here who can help me take my first baby steps with this system!! 🙇 🙏 I have a basic question about ROWTIME. I'm using the pyflink Table API and...
    1. define a streaming table that has a column deviceTimestamp as ROWTIME:
    Copy code
    streamTableEnv.create_table("source",
      .schema(Schema.new_builder()
        .column('deviceTimestamp' ...)
        ...
        .watermark('deviceTimestamp', "deviceTimestamp - INTERVAL '0.04' SECOND")
    2. then join it with the output of a UDTF that adds a new sampleTimestamp column
    3. then try to perform a window operation on the sampleTimestamp column
    I realize that I can't do this because windowing can only happen on the ROWTIME column. Do you know how I can transform the ROWTIME column to be sampleTimestamp instead of deviceTimestamp?
  • r

    Reme Ajayi

    03/10/2023, 2:51 PM
    Hello! I am trying to understand how Flink stores data for tumbling window joins. If I am joining data from two sources with 1000 messages each in every window, will all 2000 messages be stored in state for that window? If yes, does this mean shorter windows improve memory utilization?
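The buffering asked about above can be pictured with a small model. This is a sketch in plain Python under the assumption that a window join keeps every element of both inputs until the window fires, which is how the DataStream window join behaves as far as I understand it; tumbling_window_join is a hypothetical function, not Flink API, and it ignores watermarks, lateness and per-key state layout.

```python
from collections import defaultdict


def tumbling_window_join(left, right, window_size):
    """Join (timestamp, key, value) records that share a key and a tumbling window.

    Returns (results, peak_buffered), where peak_buffered is the largest number
    of elements held for a single window before it fires.
    """
    # window start -> (buffered left elements, buffered right elements)
    buffers = defaultdict(lambda: ([], []))
    for ts, key, value in left:
        buffers[ts - ts % window_size][0].append((key, value))
    for ts, key, value in right:
        buffers[ts - ts % window_size][1].append((key, value))

    peak_buffered = max(
        (len(lefts) + len(rights) for lefts, rights in buffers.values()), default=0
    )

    # "Fire" each window: emit every left/right pair with a matching key.
    results = []
    for start in sorted(buffers):
        lefts, rights = buffers[start]
        for left_key, left_value in lefts:
            for right_key, right_value in rights:
                if left_key == right_key:
                    results.append((start, left_key, left_value, right_value))
    return results, peak_buffered


# 1000 messages per source, as in the question, spread over 10 keys.
left = [(t, t % 10, f"L{t}") for t in range(1000)]
right = [(t, t % 10, f"R{t}") for t in range(1000)]

_, peak_full = tumbling_window_join(left, right, window_size=1000)
_, peak_half = tumbling_window_join(left, right, window_size=500)
print(peak_full, peak_half)
```

In this model halving the window halves the elements held per window, but it also changes the result, because only records that fall into the same window are joined.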
  • r

    Reme Ajayi

    03/10/2023, 5:57 PM
    Another question about joins: I am joining data from Kafka topics, and one of my topics is much larger than the other (10m records vs 100 000). Using event time does not work because all the data from the smaller topic is ingested very quickly, so there are no more records on the smaller topic to advance the timestamps. What is the best way to achieve this join, aside from using processing time?
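The stall described above follows from how a two-input operator derives its event time: its watermark is the minimum of its inputs' watermarks, so an input that stops receiving records holds the join back. The sketch below is plain Python with hypothetical names (combined_watermark, idle_inputs) and made-up numbers. In Flink itself, WatermarkStrategy.withIdleness is the setting that lets an input be marked idle and ignored in that minimum; whether it fits this join is an assumption to verify, since records that later arrive on an idle input may be treated as late.

```python
def combined_watermark(input_watermarks, idle_inputs=()):
    """Watermark of a multi-input operator: the minimum over inputs not marked idle.

    Returns None when every input is idle or there are no inputs.
    """
    active = [wm for name, wm in input_watermarks.items() if name not in idle_inputs]
    return min(active) if active else None


# Illustrative numbers: the small topic was read to its end early,
# while the large topic keeps advancing.
watermarks = {"large_topic": 5_000, "small_topic": 120}

stalled = combined_watermark(watermarks)
advancing = combined_watermark(watermarks, idle_inputs={"small_topic"})
print(stalled, advancing)
```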
  • g

    Gerald Schmidt

    03/10/2023, 6:26 PM
    I want to grant Flink API access only to services that send a recognised authorization header. Is that a configurable option or would I need to place a proxy in front of the jobmanager?
  • y

    Yufei Chen

    03/10/2023, 11:26 PM
    Hi guys, happy Friday~ We are testing out the Autoscaler (https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.4/docs/custom-resource/autoscaler/) with a very simple job that reads from Kafka and writes to a sink. Data is generated on the Kafka side at 1000 records per second, while the Flink job reads at 8 records per second. We would expect the Autoscaler to scale up to catch up, but nothing happens in our env. Could anyone help figure out why the backlog-based scaling (https://cwiki.apache.org/confluence/display/FLINK/FLIP-271%3A+Autoscaling) is not working in our case? Thank you! Here is the screenshot of the job metrics: [screenshot] And here is our flinkConfiguration:
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: myauto
    spec:
      image: ghcr.io/apache/flink-docker:1.17-snapshot-scala_2.12-java11-debian
      flinkVersion: v1_17
      flinkConfiguration:
        kubernetes.operator.job.autoscaler.enabled: "true"
        kubernetes.operator.job.autoscaler.scaling.enabled: "true"
        kubernetes.operator.job.autoscaler.stabilization.interval: "1m"
        kubernetes.operator.job.autoscaler.metrics.window: "1m"
        kubernetes.operator.job.autoscaler.vertex.max-parallelism: "16"
        kubernetes.operator.job.autoscaler.vertex.min-parallelism: "1"
        kubernetes.operator.job.autoscaler.catch-up.duration: "1m"
        pipeline.max-parallelism: "24"
        taskmanager.numberOfTaskSlots: "8"
        execution.checkpointing.interval: "1m"
      serviceAccount: flink
  • a

    Amir Hossein Sharifzadeh

    03/10/2023, 11:38 PM
    Quick question: What’s the highest version of Java that works with Flink?
  • k

    Kyle Ahn

    03/11/2023, 12:32 AM
    Has anyone tried using an Iceberg table as a sink, but writing to multiple Iceberg tables based on some properties of the RowData at runtime?
  • s

    Sumit Nekar

    03/11/2023, 5:16 AM
    Hello folks, we are frequently seeing the following error when the FlinkKafkaProducer is handling high throughput. We are using Flink 1.13.6. Any suggestions on tuning the Flink Kafka producer configs, or on how I should configure the producer to retry the failed request?
    Copy code
    Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: The server disconnected before a response was received.
    Caused by: org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.
  • y

    Yonatan Veksler

    03/11/2023, 3:08 PM
    Hi guys, I'm trying out Flink, and following the documentation to set up everything locally. After starting the cluster with ./bin/start-cluster.sh I can't open the web UI on localhost:8081. I'm using this version: flink-1.16.1-bin-scala_2.12.tgz.
  • a

    Amir Hossein Sharifzadeh

    03/12/2023, 1:27 AM
    I am following the https://github.com/immerok/recipes/ examples and found them very useful! I am trying to figure out the best approach for passing the DataStream into a processor or a function for further data analysis. Does the sinkTo method sound like a possible solution? For example, in TableDeduplicatedJoin.java:
    defineWorkflow(
        tableEnv,
        customerStream,
        transactionStream,
        workflow -> workflow.sinkTo(new PrintSink<>()));
    Here sinkTo passes the DataStream on for printing (PrintSink). Should I create a new class (i.e. DataProcessingSink implements Sink) and override the createWriter method?
  • j

    Junqi Xie

    03/13/2023, 5:49 AM
    Hi all, I'm trying to run basic-checkpoint-ha.yaml with flink-kubernetes-operator. I installed minikube with all default settings, and deployed the operator according to the quick start guide. However, I keep getting the following error:
    Copy code
    ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
    java.util.concurrent.CompletionException: java.lang.IllegalStateException: The base directory of the JobResultStore isn't accessible. No dirty JobResults can be restored.
    at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) [?:?]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
    at java.lang.Thread.run(Unknown Source) [?:?]
    Caused by: java.lang.IllegalStateException: The base directory of the JobResultStore isn't accessible. No dirty JobResults can be restored.
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:182) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[flink-dist-1.16.1.jar:1.16.1]
    ... 4 more
    From previous discussions I learned this might be related to permission issues, so I tried the following fix:
    Copy code
    securityContext:
      runAsUser: 9999
      runAsGroup: 9999
      fsGroup: 9999
    under podTemplate, but it's still not working and I get the same error. I'm wondering if I'm missing any configuration, or if anything non-default should be applied to minikube. Any help is appreciated and thanks in advance. I'm using minikube v1.25.2 and Flink v1.16.1, if it matters.
  • a

    Amenreet Singh Sodhi

    03/13/2023, 7:44 AM
    Hi team! I am deploying a Flink job in application mode on a k8s cluster, and have enabled checkpointing via external NFS storage. I noticed that whenever I deploy the job, it gets the same job ID. Is this the desired behaviour? I have HA enabled and a dynamic flink-cluster-name.
  • t

    Thijs van de Poll

    03/13/2023, 8:54 AM
    Hi all, I currently have a pipeline set up to read from Postgres (CDC) and use the print sink for testing purposes. I have set up automatic checkpointing to a bucket in Minio (s3). All works fine, and it is writing checkpoints as expected. However, when I want to stop the job and create a savepoint to later restart the job from that savepoint, I get the following error:
    Copy code
    The program finished with the following exception:
    
    org.apache.flink.util.FlinkException: Could not stop with a savepoint job "6a334909c5d48819afce473ded3fd599".
            at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:587)
            at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1033)
            at org.apache.flink.client.cli.CliFrontend.stop(CliFrontend.java:570)
            at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1102)
            at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1165)
            at java.base/java.security.AccessController.doPrivileged(Native Method)
            at java.base/javax.security.auth.Subject.doAs(Unknown Source)
            at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1878)
            at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
            at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1165)
    Caused by: java.util.concurrent.ExecutionException: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
            at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown Source)
            at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)
            at org.apache.flink.client.cli.CliFrontend.lambda$stop$5(CliFrontend.java:583)
            ... 9 more
    Caused by: java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
            at java.base/java.util.concurrent.CompletableFuture.encodeRelay(Unknown Source)
            at java.base/java.util.concurrent.CompletableFuture.completeRelay(Unknown Source)
            at java.base/java.util.concurrent.CompletableFuture$UniRelay.tryFire(Unknown Source)
            at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
            at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
            at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:261)
            at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
            at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
            at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
            at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
            at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1275)
            at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
            at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
            at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
            at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
            at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
            at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
            at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
            at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:45)
            at akka.dispatch.OnComplete.internal(Future.scala:299)
            at akka.dispatch.OnComplete.internal(Future.scala:297)
            at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
            at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
            at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
            at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
            at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
            at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
            at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
            at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
            at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
            at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:25)
            at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
            at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
            at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
            at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
            at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
            at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
            at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
            at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
            at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
            at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
            at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
            at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
            at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
            at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
            at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
            at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
            at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
    Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
            at org.apache.flink.runtime.checkpoint.PendingCheckpoint.abort(PendingCheckpoint.java:550)
            at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2096)
            at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1062)
            at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
            at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
            at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
            at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
            at java.base/java.lang.Thread.run(Unknown Source)
    Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: org.apache.flink.util.SerializedThrowable: org.apache.flink.runtime.checkpoint.CheckpointException: Task name with subtask : Source: pg_source[1] -> ConstraintEnforcer[2] -> Sink: print_sink[2] (1/1)#1 Failure reason: Task has failed.
            at org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1375)
            at org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1318)
            at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
            at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
            at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
            at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$12(StreamTask.java:1122)
            at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
            at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
            at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMail(MailboxProcessor.java:398)
            at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsNonBlocking(MailboxProcessor.java:383)
            at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:345)
            at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:861)
            at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:786)
            at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
            at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
            at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
            ... 1 more
    Note: I can create a savepoint manually, then cancel the job, and restart the job from the manually created savepoint. Any ideas why this happens and how to resolve it? Thanks!
  • v

    Vitalii

    03/13/2023, 9:41 AM
    Hello. I can't find information about the compatibility of Apache Flink and StateFun versions, please help. My StateFun function does not work with Flink version 1.16.1.
  • p

    piby 180

    03/13/2023, 10:54 AM
    Hey, I am new to Flink and would like to store messages from a Kafka topic in an S3 bucket partitioned by timestamp. I have created a Kafka source table and an S3 sink table and am currently facing issues with AWS credentials. The Flink app is deployed on Kubernetes with a service account attached to the pod, but the application is not picking up the IRSA role to connect to S3. I have tried flink-s3-fs-hadoop-1.61.1.jar, which raised an Access Denied error because it didn’t pick up the instance role and I have not provided S3 access keys. I have tried flink-s3-presto-1.61.1.jar, which gave me the error UnsupportedOperationException: This s3 file system implementation does not support recoverable writers. Could anyone give me a hint about what I should do to make it work? I don’t want to use AWS keys as it is against our company’s policy.
  • d

    Dheeraj Panangat

    03/13/2023, 1:26 PM
    Hi Team, when using toChangelogStream with JDBC Postgres, Flink completes execution after reading the table and does not wait for CDC events. What I understood from the documentation is that toChangelogStream should be able to capture the CDC events. Is my understanding incorrect? Also, do we have a sample for consuming CDC or a changelog from Postgres as a stream? Appreciate any help. Thanks
  • t

    Thijs van de Poll

    03/13/2023, 1:43 PM
    Hi, I am trying to understand how Window functions work in Flink. To give some context, I have this pipeline which streams changes from Postgres (CDC) to Iceberg. Each record is part of a group, identified by
    group_id
    . I would like to compute some statistic over the group and add it to the record that is being streamed to Iceberg. If I understand the documentation correctly (not sure?), there are different approaches to computing statistics on windows. They all, however, depend on a slice of data being available (defined either by a time interval or by a row count interval) to compute the aggregations. Would there be a way to ensure that the calculations are computed over the entire group? Thanks! 🙂
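One option that does not need a window is a plain GROUP BY aggregation. In streaming mode Flink keeps the aggregate for each key in state and emits an updated result whenever a row of that group changes. The plain-Python sketch below only mimics that behaviour to show the idea; it is not Flink API. The class name `GroupMean` and the choice of a mean as the statistic are assumptions for illustration.

```python
from collections import defaultdict


class GroupMean:
    """Keeps a running count and sum per group_id, updated by changelog events."""

    def __init__(self):
        self._count = defaultdict(int)
        self._total = defaultdict(float)

    def apply(self, op, group_id, value):
        """Apply one change ('+I' insert or '-D' delete) and return the new group mean.

        Returns None once a group has no rows left.
        """
        sign = {"+I": 1, "-D": -1}[op]
        self._count[group_id] += sign
        self._total[group_id] += sign * value
        n = self._count[group_id]
        return self._total[group_id] / n if n else None


stats = GroupMean()
events = [("+I", "a", 10.0), ("+I", "a", 20.0), ("+I", "b", 5.0), ("-D", "a", 10.0)]
results = [stats.apply(op, group_id, value) for op, group_id, value in events]
print(results)
```

Every event yields the mean over the whole group as known so far, so the list printed here is `[10.0, 15.0, 5.0, 20.0]`. In Flink the equivalent would be an unwindowed aggregate joined back to the records. The sink then has to accept updates, because earlier rows of a group become stale when a later change arrives.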
    m
    • 2
    • 5
  • a

    Andrew Otto

    03/13/2023, 1:57 PM
    Hi, does anyone have any tips on monitoring and troubleshooting PyFlink memory usage (in k8s)? We are running a simple datastream enrichment app. kafka input -> enrich via http api request -> kafka output. Over time our (single, for now) taskmanager is killed by k8s because it reaches its memory limit. All of the JVM metrics look normal. On the container, I can see the python process slowly using more and more RSS mem. We’re going to try to add instrumentation to get metrics about the python enrich ProcessFunctions, but we’re kind of at a loss for what to look into.
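For the instrumentation step, the standard library's `tracemalloc` can show which Python source lines hold more memory over time. Below is a minimal sketch. The class name `HeapGrowthTracker` is made up, and where to call it (for example from the `open` method of the ProcessFunction and then periodically) is an assumption about how the job is structured.

```python
import tracemalloc


class HeapGrowthTracker:
    """Compares Python heap snapshots to show which source lines changed most."""

    def __init__(self, frames=10):
        # Start tracing only once; `frames` is the stored traceback depth.
        if not tracemalloc.is_tracing():
            tracemalloc.start(frames)
        self._baseline = tracemalloc.take_snapshot()

    def top_growth(self, limit=5):
        """Return (location, size_diff_bytes) pairs for the largest changes since the baseline."""
        current = tracemalloc.take_snapshot()
        stats = current.compare_to(self._baseline, "lineno")
        return [(str(stat.traceback), stat.size_diff) for stat in stats[:limit]]


tracker = HeapGrowthTracker()
leak = [bytes(1024) for _ in range(1000)]  # simulated growth for the demo
for location, diff in tracker.top_growth(3):
    print(location, diff)
```

`tracemalloc` only sees memory allocated through Python's allocator. If the report stays flat while RSS keeps rising, the growth is more likely in native code, such as a C extension used by the HTTP client, or in connection objects that are never closed.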
    d
    g
    • 3
    • 17
  • m

    Mali

    03/13/2023, 2:27 PM
    Hello, I am getting an error while trying to use Flink with Iceberg. I am using flink-k8s-operator v1.4.0 and my Flink image version is 1.16.0. I am using these jars: flink-sql-connector-kafka-1.16.0.jar, flink-sql-connector-rabbitmq-1.16.0.jar, iceberg-flink-runtime-1.16-1.1.0.jar, flink-s3-fs-hadoop-1.16.0.jar. My code is:
    Copy code
    table_env.execute_sql("""CREATE CATALOG flink_catalog WITH (
                            'type'='iceberg',
                            'warehouse'='s3://test/test1/test2',
                            'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',
                            'io-impl'='org.apache.iceberg.aws.s3.S3FileIO')""")
    But I am getting this error:
    table_env.execute_sql("""CREATE CATALOG flink_catalog WITH (
    File "/opt/flink/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 836, in execute_sql
    File "/opt/flink/opt/python/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1321, in __call__
    File "/opt/flink/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 146, in deco
    File "/opt/flink/opt/python/py4j-0.10.9.3-src.zip/py4j/protocol.py", line 326, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling o54.executeSql.
    : java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
            at org.apache.iceberg.flink.FlinkCatalogFactory.clusterHadoopConf(FlinkCatalogFactory.java:207)
            at org.apache.iceberg.flink.FlinkCatalogFactory.createCatalog(FlinkCatalogFactory.java:135)
            at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:413)
            at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalog(TableEnvironmentImpl.java:1426)
            at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1172)
            at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:730)
            at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
            at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
            at java.base/java.lang.reflect.Method.invoke(Unknown Source)
            at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
            at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
            at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
            at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
            at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
            at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
            at java.base/java.lang.Thread.run(Unknown Source)
    Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration
            at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source)
            at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source)
            at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
            ... 17 more
    I think it is not seeing the jars. The Iceberg jars need to be inside /opt/flink/lib, right?
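The class missing in the trace, `org.apache.hadoop.conf.Configuration`, is a Hadoop class, not an Iceberg one. It may therefore be worth checking whether any jar on the classpath provides it under that exact package name. A small sketch using only the standard library follows. The function name `jars_containing` is made up, and the directory is taken from the question.

```python
import zipfile
from pathlib import Path


def jars_containing(class_name, lib_dir):
    """Return the names of jars in lib_dir that contain the given fully qualified class."""
    entry = class_name.replace(".", "/") + ".class"
    matches = []
    for jar in sorted(Path(lib_dir).glob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as archive:
                if entry in archive.namelist():
                    matches.append(jar.name)
        except zipfile.BadZipFile:
            continue  # skip files that are not valid archives
    return matches


if __name__ == "__main__":
    # Run inside the Flink container; an empty list means no jar provides the class.
    print(jars_containing("org.apache.hadoop.conf.Configuration", "/opt/flink/lib"))
```

If the list comes back empty, Hadoop client classes probably need to be added to the image. My understanding is that flink-s3-fs-hadoop bundles Hadoop under a relocated package name, so it would not satisfy this lookup, but that is worth confirming against the jar itself.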
    m
    • 2
    • 4