# troubleshooting
  • Sergio Sainz

    03/29/2023, 5:52 PM
    Hello everyone 👋, when Flink jobs are being checkpointed, and after a job is cancelled, the checkpoint is indeed deleted (as per
    execution.checkpointing.externalized-checkpoint-retention: DELETE_ON_CANCELLATION
    ), but the job-id folder still remains:
    ```
    [sergio@flink-cluster-54f7fc7c6-k6km8 JobCheckpoints]$ ls
    01eff17aa2910484b5aeb644bc531172  3a59309ef018541fc0c20856d0d89855  78ff2344dd7ef89f9fbcc9789fc0cd79  a6fd7cec89c0af78c3353d4a46a7d273  dbc957868c08ebeb100d708bbd057593
    04ff0abb9e860fc85f0e39d722367c3c  3e09166341615b1b4786efd6745a05d6  79efc000aa29522f0a9598661f485f67  a8c42bfe158abd78ebcb4adb135de61f  dc8e04b02c9d8a1bc04b21d2c8f21f74
    05f48019475de40230900230c63cfe89  3f9fb467c9af91ef41d527fe92f9b590  7a6ad7407d7120eda635d71cd843916a  a8db748c1d329407405387ac82040be4  dfb2df1c25056e920d41c94b659dcdab
    09d30bc0aaaaff786994a6a3bb06abd3  455525b76a1c6826b6eaebd5649c5b6b  7b1458424496baaf3d020e9fece525a4  aa2ef9587b2e9c123744e8940a66a287
    ```
    All folders in the above list, like
    01eff17aa2910484b5aeb644bc531172
    , are empty. Do you know if there is a setting to delete the empty folders?
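    If no such setting exists, one workaround is an external sweep over the checkpoint root. A minimal sketch, assuming the root is a plain local directory (the path below is illustrative):
    ```
    import java.io.IOException;
    import java.nio.file.DirectoryStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class EmptyCheckpointDirSweep {
        public static void main(String[] args) throws IOException {
            // Illustrative path; point this at the checkpoint root.
            Path root = Paths.get("/data/JobCheckpoints");
            try (DirectoryStream<Path> jobDirs = Files.newDirectoryStream(root)) {
                for (Path jobDir : jobDirs) {
                    if (!Files.isDirectory(jobDir)) {
                        continue;
                    }
                    try (DirectoryStream<Path> entries = Files.newDirectoryStream(jobDir)) {
                        if (!entries.iterator().hasNext()) {
                            // Deletes only while the directory is still empty;
                            // a non-empty directory would throw instead.
                            Files.delete(jobDir);
                        }
                    }
                }
            }
        }
    }
    ```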
  • Ananth V

    03/29/2023, 5:53 PM
    👋 Team! I have a Pub/Sub topic from which I want to consume messages into Flink for further processing. Is there a connector available for Pub/Sub in the Flink SQL Client? Thanks!!
  • Artun Duman

    03/29/2023, 6:40 PM
    Hello folks, I am experimenting with Flink and have been trying to get a setup for SQL Client -> SQL Gateway -> Flink. I got it to work before, but somehow I managed to break it and I'm a bit lost now. This is what I do (I use minikube):
    ```
    kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=default:default
    $FLINK_PATH/bin/kubernetes-session.sh -Dkubernetes.cluster-id=flink-session-cluster -Dkubernetes.rest-service.exposed.type=LoadBalancer -Dkubernetes.namespace=default -Drest.address=0.0.0.0
    ```
    This works fine and creates the cluster on minikube; I can access the UI if I run
    minikube tunnel
    . At this point my services look like this:
    ```
    ✗ kg svc
    NAME                         TYPE           CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
    flink-session-cluster        ClusterIP      None            <none>        6123/TCP,6124/TCP   5s
    flink-session-cluster-rest   LoadBalancer   10.100.202.72   127.0.0.1     8081:32409/TCP      5s
    kubernetes                   ClusterIP      10.96.0.1       <none>        443/TCP             4d18h
    ```
    I then create the SQL Gateway with the YAML file below:
    ```
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: sql-gateway
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: sql-gateway
      template:
        metadata:
          labels:
            app: sql-gateway
        spec:
          containers:
            - name: sql-gateway
              image: flink:1.17.0
              ports:
                - containerPort: 8083
              command: ["/bin/sh", "-c"]
              args:
                - "/opt/flink/bin/sql-gateway.sh start-foreground"
              volumeMounts:
                - name: flink-conf
                  mountPath: /opt/flink/conf/flink-conf.yaml
          volumes:
            - name: flink-conf
              hostPath:
                path: /host/flink-refinery/k8s/conf/flink-conf.yaml  # minikube stuff
                type: File
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: sql-gateway
    spec:
      selector:
        app: sql-gateway
      ports:
        - name: sql-gateway
          port: 8083
          targetPort: 8083
      type: ClusterIP
    ```
    And my
    flink-conf.yaml
    is
    ```
    jobmanager.rpc.address: flink-session-cluster
    jobmanager.rpc.port: 6123
    rest.address: flink-session-cluster-rest
    rest.port: 8081
    sql-gateway.endpoint.rest.address: 0.0.0.0
    sql-gateway.endpoint.rest.port: 8083
    sql-gateway.endpoint.type: rest
    ```
    I can confirm from the logs that this config is picked up:
    ```
    2023-03-29 18:33:38,431 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: rest.port, 8081
    2023-03-29 18:33:38,431 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: sql-gateway.endpoint.rest.address, sql-gateway
    2023-03-29 18:33:38,431 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: sql-gateway.endpoint.type, rest
    2023-03-29 18:33:38,431 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.rpc.address, flink-session-cluster
    2023-03-29 18:33:38,431 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: sql-gateway.endpoint.rest.port, 8083
    2023-03-29 18:33:38,432 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: rest.address, flink-session-cluster-rest
    2023-03-29 18:33:38,432 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: jobmanager.rpc.port, 6123
    ```
    When I create a new Flink pod and run the SQL client, however, I get an error from the SQL Gateway. These are my pods and services at this point:
    ```
    ✗ kg po
    NAME                                     READY   STATUS    RESTARTS   AGE
    flink-session-cluster-758bc469cc-lgpps   1/1     Running   0          10m
    sql-client-74f44556cf-ls24w              1/1     Running   0          2m29s
    sql-gateway-7758c8fdf9-nldvt             1/1     Running   0          2m29s
    ✗ kg svc
    NAME                         TYPE           CLUSTER-IP       EXTERNAL-IP   PORT(S)             AGE
    flink-session-cluster        ClusterIP      None             <none>        6123/TCP,6124/TCP   10m
    flink-session-cluster-rest   LoadBalancer   10.100.202.72    127.0.0.1     8081:32409/TCP      10m
    kubernetes                   ClusterIP      10.96.0.1        <none>        443/TCP             4d18h
    sql-gateway                  ClusterIP      10.103.121.161   <none>        8083/TCP            2m30s
    ```
    And I use the sql-client pod to connect to the gateway like this:
    ```
    ✗ kubectl exec -i -t sql-client-74f44556cf-ls24w -- /bin/bash
    root@sql-client-74f44556cf-5tv5d:/opt/flink# ./bin/sql-client.sh gateway -e sql-gateway:8083
    # At this point I get "Session <id> is opened, and the number..." from SQL Gateway logs, so I can connect to the gateway
    Flink SQL> SELECT
    >   name,
    >   COUNT(*) AS cnt
    > FROM
    >   (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name)
    > GROUP BY name;
    As soon as I run that test SQL command, I get this error from the gateway's logs:
    ```
    2023-03-29 18:32:28,748 WARN  org.apache.flink.client.program.rest.RestClusterClient       [] - Attempt to submit job 'collect' (ccaafce00ee8c95e4b5c037774d885e1) to 'http://0.0.0.0:8081' has failed.
    java.util.concurrent.CompletionException: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /0.0.0.0:8081
    	at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
    	at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
    	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(Unknown Source) ~[?:?]
    	at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) ~[?:?]
    	at java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source) ~[?:?]
    	at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:480) ~[flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578) [flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571) [flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550) [flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491) [flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616) [flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609) [flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117) [flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321) [flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337) [flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:776) [flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) [flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) [flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) [flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) [flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [flink-dist-1.17.0.jar:1.17.0]
    	at java.lang.Thread.run(Unknown Source) [?:?]
    Caused by: org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /0.0.0.0:8081
    Caused by: java.net.ConnectException: Connection refused
    	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[?:?]
    	at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source) ~[?:?]
    	at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:337) ~[flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334) ~[flink-dist-1.17.0.jar:1.17.0]
    ```
    Now I am extremely confused: why is it trying to connect to
    0.0.0.0:8081
    ? I remember I got this to work at some point, but I'm not sure what was different since I was just quickly experimenting. If anyone has any pointers, I'd really appreciate it! Thanks!
  • Radu Stoian

    03/29/2023, 6:58 PM
    Hi all, I have a Flink error and was hoping someone could help. I am encountering an error in my Flink app where calling
    myValueState.value()
    , inside a
    KeyedProcessFunction
    , sometimes returns
    null
    despite the fact that the logic in the code should guarantee that the object returned by
    .value()
    is not null. These nulls are returned rarely, and do not occur again when the app is restarted and run on the same data that it previously failed on. Note:
    myValueState
    is of type
    ValueState<java.time.LocalDateTime>
    . More context:
    • I am using Flink 1.15.2, hosted on AWS Kinesis Data Analytics; this is where the error occurs
    • The error does not occur locally
    • RocksDB is used as the state store backend on AWS Kinesis Data Analytics
    Code:
    • Near the top of my process function, I run the function below for every single event to update my minTimestamp value state
    • This function, to my mind, should ensure that the value of this value state is never null
    • Later on in the code I call
    minTimestamp.value()
    , which will return null once in a while
    ```
    void updateMinTimestamp(LocalDateTime newTimestamp) {
        try {
            final LocalDateTime currentMinTimestamp = minTimestamp.value();
            if (currentMinTimestamp == null || newTimestamp.isBefore(currentMinTimestamp)) {
                minTimestamp.update(newTimestamp);
            }
        } catch (Exception e) {
            throw new FlinkStateAccessException("failed", e);
        }
    }
    ```
    Any information regarding why this error is occurring, and how to prevent it, is much appreciated.
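    Not a root cause, but worth ruling out serializer behavior: with RocksDB every value() call deserializes the stored bytes, and java.time.LocalDateTime otherwise tends to fall back to generic (Kryo) serialization. A sketch that pins Flink's dedicated LocalDateTime type information on the descriptor (class and state names are illustrative):
    ```
    import java.time.LocalDateTime;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class MinTimestampFunction
            extends KeyedProcessFunction<String, LocalDateTime, LocalDateTime> {

        private transient ValueState<LocalDateTime> minTimestamp;

        @Override
        public void open(Configuration parameters) {
            // Types.LOCAL_DATE_TIME gives LocalDateTime a dedicated serializer
            // instead of the generic (Kryo) fallback.
            minTimestamp = getRuntimeContext().getState(
                new ValueStateDescriptor<>("minTimestamp", Types.LOCAL_DATE_TIME));
        }

        @Override
        public void processElement(LocalDateTime value, Context ctx,
                Collector<LocalDateTime> out) throws Exception {
            LocalDateTime current = minTimestamp.value();
            if (current == null || value.isBefore(current)) {
                minTimestamp.update(value);
            }
            out.collect(minTimestamp.value());
        }
    }
    ```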
  • Gaurav Miglani

    03/29/2023, 7:09 PM
    I have an Avro schema string and am using
    AvroSchemaConverter.convertToTypeInfo[Row]
    to convert it into Flink type info. Somehow the table DDL below gets generated, which fails at the Flink SQL validation step:
    ```
    CREATE TABLE t1 (
      `pp` MAP<STRING, RAW('java.lang.Object', '...')>
    ) with (
    'connector' = 'kafka',
    'topic' = 'test',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'test',
    'properties.auto.offset.reset' = 'latest',
    'value.format' = 'avro-confluent',
    'scan.topic-partition-discovery.interval' = '60000',
    'value.avro-confluent.url' = 'localhost:9091',
    'scan.startup.mode' = 'group-offsets'
    )
    ```
    The error:
    ```
    Caused by: org.apache.flink.table.api.ValidationException: Unable to restore the RAW type of class 'java.lang.Object' with serializer snapshot '...'.
    	at org.apache.flink.table.types.logical.RawType.restore(RawType.java:157)
    	at org.apache.flink.table.planner.calcite.FlinkTypeFactory.createRawType(FlinkTypeFactory.scala:371)
    	at org.apache.flink.sql.parser.type.SqlRawTypeNameSpec.deriveType(SqlRawTypeNameSpec.java:60)
    	at org.apache.calcite.sql.SqlDataTypeSpec.deriveType(SqlDataTypeSpec.java:231)
    	at org.apache.calcite.sql.SqlDataTypeSpec.deriveType(SqlDataTypeSpec.java:218)
    	at org.apache.flink.sql.parser.type.SqlMapTypeNameSpec.deriveType(SqlMapTypeNameSpec.java:67)
    	at org.apache.calcite.sql.SqlDataTypeSpec.deriveType(SqlDataTypeSpec.java:231)
    	at org.apache.flink.table.planner.operations.MergeTableLikeUtil$SchemaBuilder.collectPhysicalFieldsTypes(MergeTableLikeUtil.java:509)
    	at org.apache.flink.table.planner.operations.MergeTableLikeUtil$SchemaBuilder.appendDerivedColumns(MergeTableLikeUtil.java:397)
    	at org.apache.flink.table.planner.operations.MergeTableLikeUtil$SchemaBuilder.access$000(MergeTableLikeUtil.java:209)
    	at org.apache.flink.table.planner.operations.MergeTableLikeUtil.mergeTables(MergeTableLikeUtil.java:150)
    	at org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:162)
    	at org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:77)
    	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:309)
    	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:262)
    	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
    	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:723)
    ```
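    For context, a column like RAW('java.lang.Object', '...') usually means the converter fell back to a generic type for that Avro field, and a RAW type's serializer snapshot generally cannot be restored when the DDL string is parsed again. One workaround is to declare a concrete type for that column yourself instead of echoing the generated DDL; a sketch with the programmatic schema builder (MAP<STRING, STRING> is an assumption about what pp actually holds):
    ```
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.Schema;

    public class PpSchema {
        public static Schema build() {
            // Declare `pp` with a concrete type rather than the generated RAW type.
            // MAP<STRING, STRING> is an assumption about the actual payload.
            return Schema.newBuilder()
                    .column("pp", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
                    .build();
        }
    }
    ```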
  • Senad Hadžikić

    03/30/2023, 12:02 PM
    Hello guys, I'm using KafkaSource to define the Kafka consumer:
    ```
    KafkaSource
            .<...>builder()
            .setProperty("<http://partition.discovery.interval.ms|partition.discovery.interval.ms>", "10000")
            .setProperty("enable.auto.commit", "true")
            .setProperty("<http://auto.commit.interval.ms|auto.commit.interval.ms>", "50")
            .setProperty("auto.offset.reset", "latest")
            .setProperty("auto.offset.reset.strategy", "latest")
            .setProperty("commit.offsets.on.checkpoint", "false")
            .setProperty("client.id", "clientFlink6661")
            .setPartitions(
                    new HashSet<>(
                            Arrays.asList(
                                    new TopicPartition("myTopic", 0),
                                    new TopicPartition("myTopic", 1)
                            )
                    )
            )
            .setBootstrapServers(kafkaAddress)
            .setStartingOffsets(OffsetsInitializer.latest())
            .setGroupId("FlinkDestroyer6661")
            .setValueOnlyDeserializer(new MyClassDeserializationSchema())
            .build();
    ```
    The issue is that when I used
    DeliveryGuarantee.EXACTLY_ONCE
    in my KafkaSink, everything worked perfectly, but when I switched to
    DeliveryGuarantee.AT_LEAST_ONCE
    now every time I start the Flink app, it starts from the first message in the topic, as if the OffsetsInitializer.latest() doesn't matter at all. I've tried everything, as you can see in the provided code. Any ideas what could cause the issue? Also, even though the Flink job consumes messages from the Kafka topic, it shows 0 active consumers and the state is marked as EMPTY.
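    One thing to double-check: OffsetsInitializer.latest() always jumps to the latest offset on a fresh start without Flink state, regardless of the group's committed position. If the intent is to resume from the group's committed offsets, with latest only as a fallback, a sketch reusing the names from the snippet above:
    ```
    import java.util.Arrays;
    import java.util.HashSet;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;
    import org.apache.kafka.common.TopicPartition;

    public class CommittedOffsetsSource {
        // MyClass and MyClassDeserializationSchema are the types from the snippet above.
        public static KafkaSource<MyClass> build(String kafkaAddress) {
            return KafkaSource.<MyClass>builder()
                    .setBootstrapServers(kafkaAddress)
                    .setGroupId("FlinkDestroyer6661")
                    .setPartitions(new HashSet<>(Arrays.asList(
                            new TopicPartition("myTopic", 0),
                            new TopicPartition("myTopic", 1))))
                    // Committed group offsets first; LATEST only as the fallback
                    // when the group has no committed offset for a partition.
                    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
                    .setValueOnlyDeserializer(new MyClassDeserializationSchema())
                    .build();
        }
    }
    ```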
  • Senad Hadžikić

    03/30/2023, 12:04 PM
    The version of Flink I'm using is 1.15.2
  • Otto Remse

    03/30/2023, 12:26 PM
    Hi! I'm trying to exclude some metrics that get pushed to our Prometheus push gateway. What is the format of the
    metrics.reporter.prompushgateway.filter.excludes
    setting? Is it the same as
    metrics.reporter.prompushgateway.scope.variables.excludes: task_attempt_id;task_attempt_num
    i.e. a semicolon-separated list? The documentation just states that it should be a List<String>, whereas scope.variables.excludes is a String?
  • Jashwanth S J

    03/30/2023, 12:51 PM
    Hi Team, we're using the Flink Kubernetes operator to run session jobs in a session cluster. The session job and task manager pods didn't come up when nodes went down. Once a node is back, the operator should take care of bringing all the jobs back, right? But only the session jobs didn't come up. How can we handle jobs during node failures and make sure all the old jobs start running again? Can you please throw some light here?
    ```
    ❯ kubectl get FlinkSessionJob -n flink
    NAME                         JOB STATUS    LIFECYCLE STATE
    basic-session-job-example    RECONCILING   STABLE
    basic-session-job-example2   RECONCILING   STABLE
    ```
  • Jashwanth S J

    03/30/2023, 12:51 PM
    ```
    status:
        error: '{"type":"org.apache.flink.kubernetes.operator.exception.MissingSessionJobException","message":"Missing
      Session Job","throwableList":[]}'
    ```
  • Yufei Chen

    03/30/2023, 3:57 PM
    Hello: we are trying to test out the Autoscaler in the Flink Kubernetes Operator, and found that the Autoscaler only seems to work with application deployments ✅, not with session cluster deployments ❌. Can anyone help confirm whether this is expected behavior? Thanks!
  • Leon Xu

    03/30/2023, 4:48 PM
    Hi Flink users, has anyone tried hooking up the flink-cdc-connector with Postgres 14 or 15? The GitHub page mentions it only works up to Postgres 12, so I am wondering if there are any issues with higher Postgres versions. If so, are there any alternative solutions, or what's the timeline for getting support for Postgres 14/15?
  • Ivan Webber

    03/30/2023, 7:28 PM
    I was waiting for 1.17.0 to have [FLINK-18568] Add Support for Azure Data Lake Store Gen 2 in File Sink - ASF JIRA (apache.org), but due to [FLINK-31612] ClassNotFoundException when using GCS path as HA directory - ASF JIRA (apache.org) I still can’t write to Azure. Is FLINK-18568 incorporated into any of the other releases, or am I out of luck until 1.17.1 comes out? Thanks, Ivan
  • Guruguha Marur Sreenivasa

    03/30/2023, 9:18 PM
    Hi All, how do I use
    CheckpointListener
    to listen to completed checkpoints? I'm unable to register this with my stream environment. Can someone please help here? Below is my listener code:
    ```
    public class CheckpointCoordinator implements CheckpointListener {
      @Override
      public void notifyCheckpointComplete(long checkpointId) throws Exception {
        <http://log.info|log.info>("Checkpoint {} completed successfully.", checkpointId);
        // add custom logic here
      }
    
      @Override
      public void notifyCheckpointAborted(long checkpointId) throws Exception {
        CheckpointListener.super.notifyCheckpointAborted(checkpointId);
      }
    }
    ```
    How do I register this?
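    For what it's worth, CheckpointListener is not registered on the stream environment; it is a mix-in interface implemented by a function or operator that is already part of the job graph, and Flink invokes the notifications on each parallel instance. A sketch (class name and logging are illustrative):
    ```
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.common.state.CheckpointListener;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Implement CheckpointListener on a function that is part of the job
    // graph; no separate registration call is needed.
    public class CheckpointAwareMap extends RichMapFunction<String, String>
            implements CheckpointListener {

        private static final Logger LOG = LoggerFactory.getLogger(CheckpointAwareMap.class);

        @Override
        public String map(String value) {
            return value;
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) {
            LOG.info("Checkpoint {} completed successfully.", checkpointId);
            // add custom logic here
        }

        @Override
        public void notifyCheckpointAborted(long checkpointId) {
            LOG.info("Checkpoint {} aborted.", checkpointId);
        }
    }
    ```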
  • Gil Kirkpatrick

    03/30/2023, 9:29 PM
    I'm trying to build a Docker container image containing the bits needed to run the Flink SQL client to issue SQL queries on Kafka topics, and I'm struggling to work out which JAR files to include in the container. I'm starting with the 1.17.0 Docker image, and based on https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/connectors/table/kafka/, https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/configuration/connector/, and https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/configuration/overview/, I've added:
    • flink-json/1.17.0/flink-json-1.17.0.jar
    • flink-table-api-java-bridge-1.17.0.jar
    • flink-sql-connector-kafka-1.17.0.jar
    But when I run a query on the following table:
    ```
    CREATE TABLE FOO (id STRING, name STRING)
    WITH (
    'connector'='kafka',
    'topic'='foo',
    'format'='json',
    'properties.bootstrap.servers'='flinktests-kafka-1:29091',
    'properties.group.id'='flink',
    'scan.startup.mode'='earliest-offset'
    );
    ```
    I get the following error from the SQL Client:
    ```
    [ERROR] Could not execute SQL statement. Reason:
    2023-03-30 15:23:43 java.lang.ClassCastException: class org.codehaus.janino.CompilerFactory cannot be cast to class org.codehaus.commons.compiler.ICompilerFactory (org.codehaus.janino.CompilerFactory is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @4c013fb6; org.codehaus.commons.compiler.ICompilerFactory is in unnamed module of loader 'app')
    ```
    Any ideas where I should look?
  • Adam Augusta

    03/30/2023, 10:01 PM
    I was hoping to create a custom table connector that would allow me to stream field updates. But it seems like Flink requires the full row, even if only a single field has changed. Is that correct?
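    That matches the changelog contract as I understand it: sources and sinks exchange full RowData rows tagged with a RowKind (INSERT, UPDATE_BEFORE/UPDATE_AFTER, DELETE); there is no per-field delta encoding. A sketch of how a source declares the row kinds it emits (the chosen kinds are illustrative):
    ```
    import org.apache.flink.table.connector.ChangelogMode;
    import org.apache.flink.types.RowKind;

    public class UpdatingSourceModes {
        // The changelog a source declares is expressed in whole-row RowKinds;
        // an "update" is a full UPDATE_AFTER row (optionally preceded by
        // UPDATE_BEFORE), not a field-level patch.
        public static ChangelogMode upsertLike() {
            return ChangelogMode.newBuilder()
                    .addContainedKind(RowKind.INSERT)
                    .addContainedKind(RowKind.UPDATE_AFTER)
                    .addContainedKind(RowKind.DELETE)
                    .build();
        }
    }
    ```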
  • Jalil Alchy

    03/30/2023, 10:08 PM
    Has anyone ever seen an error like this using Ververica CDC when trying to stop an application with a savepoint?
    ```
    org.apache.flink.runtime.checkpoint.CheckpointException: Task has failed.
    	at org.apache.flink.runtime.messages.checkpoint.SerializedCheckpointException.unwrap(SerializedCheckpointException.java:51)
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveDeclineMessage(CheckpointCoordinator.java:1013)
    	at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$declineCheckpoint$2(ExecutionGraphHandler.java:103)
    	at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
    	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    	at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Task name with subtask : Source: postgres_cdc (1/1)#19 Failure reason: Task has failed.
    	at org.apache.flink.runtime.taskmanager.Task.declineCheckpoint(Task.java:1395)
    	at org.apache.flink.runtime.taskmanager.Task.lambda$triggerCheckpointBarrier$3(Task.java:1338)
    	at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
    	at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
    	at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    	at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$11(StreamTask.java:1094)
    	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
    	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
    	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
    	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
    	... 1 more
    Caused by: java.util.concurrent.CompletionException: java.lang.Exception: Could not perform checkpoint 38 for operator Source: postgres_cdc (1/1)#19.
    	at java.base/java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:367)
    	at java.base/java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:376)
    	at java.base/java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:1019)
    	... 15 more
    Caused by: java.lang.Exception: Could not perform checkpoint 38 for operator Source: postgres_cdc (1/1)#19.
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1138)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$11(StreamTask.java:1085)
    	... 12 more
    Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 38 for operator Source: postgres_cdc (1/1)#19. Failure reason: Checkpoint was declined.
    	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:269)
    	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:173)
    	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
    	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:227)
    	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:212)
    	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:192)
    	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:647)
    	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:320)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$12(StreamTask.java:1253)
    	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1241)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointAsyncInMailbox(StreamTask.java:1126)
    	... 13 more
    Caused by: org.apache.flink.util.FlinkRuntimeException: Call snapshotState() on closed source, checkpoint failed.
    	at com.ververica.cdc.debezium.DebeziumSourceFunction.snapshotState(DebeziumSourceFunction.java:307)
    	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
    	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
    	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:87)
    ```
  • Ivan Webber

    03/30/2023, 11:36 PM
    Are there instructions for how to build a Docker container for an unreleased version of Apache Flink (e.g. 1.17.1)? I'm trying to build and containerize 1.17.1 from source and would appreciate being pointed in the right direction.
  • Echo Lee

    03/31/2023, 2:58 AM
    Hi all, I saw that the official website https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/joins/#outer-equi-join describes Regular Joins as supporting only equi-joins, but local verification shows that non-equi joins also work, for example
    on a.value > b.value
    Is there a mistake in the description on the official website, or is something wrong with my understanding?
  • Evaldas Buinauskas

    03/31/2023, 6:16 AM
    Hey, what would be the correct approach to track async call duration in a Flink application? We're using Prometheus to observe cluster metrics. I have found that Flink exposes https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/metrics/ and it's possible to register my own metrics, but I'm not entirely sure how histograms work. Normally for histograms I'd declare latency buckets, but here it suggests using Codahale/DropWizard histograms, which seem to work slightly differently. I'm looking for something similar to what I'm doing in Rust, e.g.
    ```
    /// HTTP request latency histogram buckets
    const HTTP_REQUESTS_HISTOGRAM_BUCKETS_SECONDS: &[f64; 18] = &[
        0.010, 0.020, 0.030, 0.050, 0.060, 0.070, 0.080, 0.090, //
        0.100, 0.150, 0.200, 0.250, 0.300, 0.350, 0.400, 0.450, //
        0.500, // Timeout limit
        0.600, // Catch all
    ];
    
    lazy_static::lazy_static! {
        /// Tracks request histogram
        pub static ref HTTP_REQ_HISTOGRAM: HistogramVec = register_histogram_vec!(
            "http_request_duration_seconds",
            "The HTTP request latencies in seconds.",
            &[HANDLER_LABEL, PORTAL_LABEL, QUERY_LABEL],
            HTTP_REQUESTS_HISTOGRAM_BUCKETS_SECONDS.to_vec()
        )
        .expect("valid histogram vec metric");
    }
    ```
    🙏
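    On the Flink side, a common pattern is wrapping a DropWizard histogram via flink-metrics-dropwizard; note that Flink histograms expose quantile statistics, which the Prometheus reporter exports as a summary rather than fixed buckets like the Rust example. A sketch inside a rich async function (metric name and reservoir size are illustrative):
    ```
    import com.codahale.metrics.SlidingWindowReservoir;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
    import org.apache.flink.metrics.Histogram;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

    // Registers a Flink histogram backed by a DropWizard reservoir and records
    // async call durations in it. Requires the flink-metrics-dropwizard dependency.
    public abstract class TimedAsyncFunction<IN, OUT> extends RichAsyncFunction<IN, OUT> {

        protected transient Histogram asyncLatencyMs;

        @Override
        public void open(Configuration parameters) {
            asyncLatencyMs = getRuntimeContext()
                    .getMetricGroup()
                    .histogram("asyncCallLatencyMs", new DropwizardHistogramWrapper(
                            new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))));
        }

        /** Call with System.nanoTime() captured before the async request started. */
        protected void recordLatency(long startNanos) {
            asyncLatencyMs.update((System.nanoTime() - startNanos) / 1_000_000);
        }
    }
    ```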
  • Raghunadh Nittala

    03/31/2023, 11:03 AM
    Hi Team, In a class called
    ResultProcessor
    , I’m using Table API and executing few SQL queries on a Kafka stream using
    executeSql()
    method. In another class called
    ResultExecutor
    , I’m using DataStream to process another Kafka stream, using
    env.execute()
    method. Now, in the Main class, I’m calling
    ResultExecutor
    first and then
    ResultProcessor
    . While submitting the job, I’m getting an exception
    org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Cannot have more than one execute() or executeAsync() call in a single environment
    . When I reverse the order, I don't see the exception, but only one stream gets executed. When I comment out the first one, the second one works fine and I'm able to see the operators in the job overview (and vice versa). Is there any restriction that an environment can only execute a Table API or DataStream pipeline alone? I'm using Flink 1.16. 🙏
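    For background: each executeSql() INSERT and each env.execute() submits its own job, and a packaged main() can only drive one execution per environment, which is why the order changes the symptom. If both pipelines must run as a single job, a sketch along these lines, assuming Flink 1.16's StreamStatementSet.attachAsDataStream() (table and pipeline names are illustrative):
    ```
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamStatementSet;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class SingleJobMain {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

            // DataStream pipeline (stand-in for the ResultExecutor logic).
            env.fromElements("a", "b", "c").print();

            // Table pipeline (stand-in for the ResultProcessor queries); the
            // table names are illustrative and assumed registered elsewhere.
            StreamStatementSet statements = tableEnv.createStatementSet();
            statements.addInsertSql("INSERT INTO sink_table SELECT * FROM source_table");

            // Fold the SQL inserts into the same job graph instead of letting
            // executeSql() submit a separate job, then execute once.
            statements.attachAsDataStream();
            env.execute("combined-pipeline");
        }
    }
    ```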
  • Ricardo Correia

    03/31/2023, 5:00 PM
    Hello! 👋 I’m trying to deploy my Flink application in a Kubernetes cluster via the Flink Kubernetes operator. The problem I’m facing now is getting the JAR file from an S3 bucket. I’ve extended the image of the Kubernetes operator to add the S3 plugins:
    ```
    FROM apache/flink-kubernetes-operator:1.4.0
    
    ENV FLINK_PLUGINS_DIR=/opt/flink/plugins
    
    COPY flink-s3-fs-hadoop-1.15.4.jar $FLINK_PLUGINS_DIR/s3-fs-hadoop/
    COPY flink-s3-fs-presto-1.15.4.jar $FLINK_PLUGINS_DIR/s3-fs-presto/
    ```
    And I can confirm that the pods of the operator contain these plugins. The problem is that when I try to reference the JAR file that is in the S3 bucket:
    ```
    job:
        jarURI: 's3://mybucket/flink-application-15.jar'
    ```
    In a standalone deployment I get the following error in the JM:
    ```
    Caused by: java.net.MalformedURLException: unknown protocol: s3
    ```
    In a session job deployment I get the following error in the deployment:
    ```
    The LocalStreamEnvironment cannot be used when submitting a program through a client, or running in a TestEnvironment context.
    ```
    Has anyone faced a similar issue or knows why I’m getting this error? 🙏
  • Varun Sayal

    03/31/2023, 5:48 PM
    Hi guys, I am looking into using the Flink queryable state API.
  • Varun Sayal

    03/31/2023, 5:49 PM
    I noticed that
    Queryable state is currently not supported with TTL
  • Varun Sayal

    03/31/2023, 5:49 PM
    Does anyone know the underlying reason why the queryable state APIs don't allow queries on state with TTL?
  • Joris Basiglio

    03/31/2023, 8:24 PM
    Anybody know if we can tune the RocksDB global compaction period to make it happen more often than the 30 days? I tried changing the TTL property to hourly (via
    RocksDBOptionsFactory
    ) to test things out, but I don't really see anything happening to my state after multiple days of it running.
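    In case the underlying goal is state expiry rather than tuning RocksDB's own periodic compaction: Flink's state TTL is configured per state descriptor and, on RocksDB, enforced by a TTL compaction filter, not via RocksDBOptionsFactory. A sketch with illustrative values:
    ```
    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;

    public class TtlConfiguredState {
        public static ValueStateDescriptor<String> descriptor() {
            StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(1))
                    // Re-run the TTL check in RocksDB's compaction filter after
                    // this many state entries have been processed.
                    .cleanupInRocksdbCompactFilter(1000)
                    .build();
            ValueStateDescriptor<String> descriptor =
                    new ValueStateDescriptor<>("my-state", String.class);
            descriptor.enableTimeToLive(ttlConfig);
            return descriptor;
        }
    }
    ```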
  • Nathanael England

    03/31/2023, 8:31 PM
    I have a
    KeyedCoProcessFunction
    in pyflink 1.16.0. When I call
    ctx.get_current_key()
    like the docs show, I just get
    None
    back instead of my key. Does this work for anyone else?
  • Amir Hossein Sharifzadeh

    04/01/2023, 8:19 PM
    Hello guys. I need advice to fix an issue: I am trying to package all libraries (Flink, etc.) into a jar file. I am using IntelliJ IDEA to build artifacts. When I run the jar file it tells me:
    Caused by: java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/functions/ProcessFunction
    Any idea how to solve this issue? And yes, my
    pom.xml
    contains all required dependencies:
    ```
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.11.0</version>
        <scope>compile</scope>
    </dependency>
    
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge</artifactId>
        <version>${flink.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-loader</artifactId>
        <version>${flink.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>${flink.version}</version>
        <scope>compile</scope>
    </dependency>
    
    <!-- Adding jackson dependencies. They must be in the default scope (compile). -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>${jackson.databind.version}</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-annotations</artifactId>
        <version>${jackson.version}</version>
    </dependency>
    <!-- jackson needs jackson-datatype-jsr310 for Java 8 java.time.Instant support -->
    <dependency>
        <groupId>com.fasterxml.jackson.datatype</groupId>
        <artifactId>jackson-datatype-jsr310</artifactId>
        <version>${jackson.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-base</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
    </dependency>
    ```
    where
    ```
    <properties>
        <flink.version>1.17.0</flink.version>
        <jackson.databind.version>2.14.0</jackson.databind.version>
        <jackson.version>2.14.0</jackson.version>
        <junit.jupiter.version>5.9.1</junit.jupiter.version>
        <kafka.version>3.2.2</kafka.version>
        <log4j.version>2.19.0</log4j.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <target.java.version>11</target.java.version>
    </properties>
    ```
  • Nancy Yang

    04/02/2023, 1:11 AM
    Hey, I am having problems deploying my PyFlink app to a k3s cluster using the Flink Kubernetes operator. The Python deployment example from the flink-kubernetes-operator link here doesn't seem to work for me either. I have a local k3s/k3d running in Docker, and I deployed the flink-operator using helm with this chart file. I built the image using the example Dockerfile and python_demo.py, and pushed it to a local registry. The tricky thing is that it worked fine after being deployed. But if I change the python_demo.py file to a different name like word_count.py, with no code change, rebuild the image, push it to the registry and deploy the app again, it fails with this error message in the flink-operator/python-demo pod.
    ```
    Caused by: java.nio.file.NoSuchFileException: /tmp/pyflink/3e67a85b-296d-4586-aa5b-b654963e6464/7fc3a4b9-0c8f-4d6a-9178-8cff5e5c58e6/word_count.py
    ```
    I looked at the PythonDriver.java code; it seems it should either create a soft link to the original word_count.py file at /tmp/pyflink/xxx/yyy/word.py or copy the file over to the /tmp directory, so I don't understand why it complains. Also, the weird thing is that if I change the code to do different things in python_demo.py, the deployment still succeeds and runs the same stream job. It looks to me like the pod was running something from the pod template, not from my python-example.yaml. Has anybody met the same issue, or did I miss something? Thanks in advance! 🙏 My python-example.yaml:
    ```
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: python-demo
      namespace: flink-operator
    spec:
      image: registry.localhost:5000/flink-python-demo:latest
      flinkVersion: v1_16
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: "1"
      serviceAccount: flink
      jobManager:
        resource:
          memory: "2048m"
          cpu: 1
      taskManager:
        resource:
          memory: "2048m"
          cpu: 1
      job:
        jarURI: local:///opt/flink/opt/flink-python-1.16.1.jar # Note, this jarURI is actually a placeholder
        entryClass: "org.apache.flink.client.python.PythonDriver"
        args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/word_count.py"]
        parallelism: 1
        upgradeMode: stateless
    ```