# troubleshooting
  • Fariz Hajiyev (01/18/2023, 7:29 AM)
    Hello, I am getting the following exception while running Stateful Functions locally on Windows. The app starts normally and I can see numerous Flink logs from the dispatcher, ingress, etc., but then it suddenly crashes, I guess upon receiving the first protobuf message from Kinesis:
    Caused by: java.lang.NoSuchMethodError: org.apache.flink.statefun.sdk.reqreply.generated.TypedValue$Builder.setValue(Lcom/google/protobuf/ByteString;)Lorg/apache/flink/statefun/sdk/reqreply/generated/TypedValue$Builder;
    	at org.apache.flink.statefun.flink.io.common.AutoRoutableProtobufRouter.typedValuePayload(AutoRoutableProtobufRouter.java:72)
    	at org.apache.flink.statefun.flink.io.common.AutoRoutableProtobufRouter.route(AutoRoutableProtobufRouter.java:51)
    	at org.apache.flink.statefun.flink.io.common.AutoRoutableProtobufRouter.route(AutoRoutableProtobufRouter.java:36)
    	at org.apache.flink.statefun.flink.core.translation.IngressRouterOperator.processElement(IngressRouterOperator.java:81)
    	at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:112)
    I am using Statefun dependencies of version 3.1.0 and Flink dependencies of version 1.13.1. I have this protobuf dependency in my pom:
    <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
       <version>3.21.12</version>
    </dependency>
    I tried all protobuf-java versions from 3.8.0 to 3.21.12, with the same error. I can't go lower than 3.8.0 because the Java proto files in the project won't compile. "statefun-flink-io" transitively brings in protobuf 3.7.1, so I tried excluding it, but that didn't help either: same error. I also tried Statefun 3.1.1 with Flink 1.13.1, and Statefun 3.2.0 with Flink 1.14.0, with the same error. I opened the TypedValue proto class (from "External Libraries" in IDEA), and it has a Builder inner class with a public setValue(ByteString value) method. Any idea what I am doing wrong?
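A NoSuchMethodError usually means the class found at runtime is not the one the calling code was compiled against, so one way to narrow this down is to check which jar each involved class (for example com.google.protobuf.ByteString and org.apache.flink.statefun.sdk.reqreply.generated.TypedValue$Builder) is actually loaded from, and whether more than one copy is on the classpath. A minimal JDK-only diagnostic sketch, assuming it is run with the same classpath as the failing application; the class name WhichJar is hypothetical:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.security.CodeSource;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

final class WhichJar {
    /** Returns the jar or directory URL the class was loaded from, or a marker for JDK or missing classes. */
    public static String locate(String className) {
        try {
            // initialize=false: only load the class, do not run its static initializers
            Class<?> cls = Class.forName(className, false, WhichJar.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            return (src == null || src.getLocation() == null) ? "<bootstrap/JDK>" : src.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "<not found>";
        }
    }

    /** Lists every copy of the .class file visible to the class loader; more than one suggests a version clash. */
    public static List<String> copies(String className) {
        String resource = className.replace('.', '/') + ".class";
        List<String> found = new ArrayList<>();
        try {
            Enumeration<URL> urls = WhichJar.class.getClassLoader().getResources(resource);
            while (urls.hasMoreElements()) {
                found.add(urls.nextElement().toString());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return found;
    }

    public static void main(String[] args) {
        // Pass fully qualified (binary) class names as arguments.
        for (String name : args) {
            System.out.println(name + " loaded from " + locate(name) + ", copies on classpath: " + copies(name));
        }
    }
}
```

If ByteString resolves to a different protobuf jar than the one the Statefun classes were built with, or appears several times, that would be the first thing to look at.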
  • Amenreet Singh Sodhi (01/18/2023, 9:19 AM)
    Hi Team, is there any way to set the name of the log file for the Flink JM and TM? I was able to find the setting to change the default logging directory (env.log.dir), but couldn't find the corresponding setting for the log file name.
  • Dheeraj Panangat (01/18/2023, 1:33 PM)
    Hi Team, while checkpointing to Azure Blob Storage using Flink, we get the following error:
    Caused by: Configuration property <accountname>.dfs.core.windows.net not found.
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AbfsConfiguration.getStorageAccountKey(AbfsConfiguration.java:372)
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.initializeClient(AzureBlobFileSystemStore.java:1133)
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.<init>(AzureBlobFileSystemStore.java:174)
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.initialize(AzureBlobFileSystem.java:110)
    We have also set the following properties in core-site.xml:
    fs.hdfs.impl
    fs.abfs.impl -> org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem
    fs.file.impl
    fs.azure.account.auth.type
    fs.azure.account.oauth.provider.type
    fs.azure.account.oauth2.client.id
    fs.azure.account.oauth2.client.secret
    fs.azure.account.oauth2.client.endpoint
    fs.azure.createRemoteFileSystemDuringInitialization -> true
    While debugging, we found that Flink reads from core-default-shaded.xml, but even when the properties are specified there, the defaults are not loaded and we get a different exception:
    Caused by: Unable to load key provider class.
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AbfsConfiguration.getTokenProvider(AbfsConfiguration.java:540)
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.initializeClient(AzureBlobFileSystemStore.java:1136)
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.<init>(AzureBlobFileSystemStore.java:174)
    	at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.initialize(AzureBlobFileSystem.java:110)
    Is anyone familiar with this issue? We'd appreciate any help. Thanks!
  • jaiprasad (01/18/2023, 2:18 PM)
    How do I scale down the number of TaskManagers? Context: we have the Flink Kubernetes Operator and a Flink deployment with 1 JobManager and 2 TaskManagers. I want to scale the TaskManager replicas down from 2 to 1. How can we do this? I couldn't find an option to scale down TaskManagers in the FlinkDeployment kind.
  • Prashant Bhardwaj (01/18/2023, 2:45 PM)
    I have a few questions:
    1. If I have not enabled checkpointing in my config, can I still manually trigger a savepoint to upgrade my job?
    2. Does the fixed delay restart strategy work without checkpointing enabled? Basically, can I define any restart strategy other than none without enabling checkpointing?
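For reference on question 2: per the Flink documentation for the 1.x restart-strategy settings, the default is the no-restart strategy when checkpointing is disabled, but a strategy such as fixed delay can still be configured explicitly; a job restarted without checkpoints simply has no checkpointed state to restore. The fixed-delay policy itself retries a failed run up to a fixed number of times with a constant pause in between. A plain-Java sketch of that policy only (hypothetical names, not Flink's RestartStrategy classes):

```java
import java.util.function.IntPredicate;

final class FixedDelayRestart {
    /**
     * Runs attempt.test(n) for n = 0, 1, 2, ... until it returns true.
     * Allows at most maxRestarts restarts, sleeping delayMs between attempts.
     * Returns the number of restarts that were needed, or -1 if every attempt failed.
     */
    public static int run(IntPredicate attempt, int maxRestarts, long delayMs) {
        for (int restarts = 0; ; restarts++) {
            if (attempt.test(restarts)) {
                return restarts;
            }
            if (restarts == maxRestarts) {
                return -1; // restart budget exhausted: the job would fail permanently
            }
            try {
                Thread.sleep(delayMs); // the fixed delay between restarts
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return -1;
            }
        }
    }
}
```

With maxRestarts = 3, a job that keeps failing is attempted 4 times in total (the first run plus 3 restarts) before giving up.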
  • Sergio Morales (01/18/2023, 3:52 PM)
    Hi all, I'm trying to run the native Kubernetes example, but it fails with:
    io.fabric8.kubernetes.client.KubernetesClientException: JcaPEMKeyConverter is provided by BouncyCastle
    I added version 1.70 of both libraries: https://mvnrepository.com/artifact/org.bouncycastle/bcprov-jdk15on and https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-jdk15on. But now, when I run the command to start Flink on Kubernetes (./bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster), the JobManager does not start up. Any ideas?
  • Matt Weiss (01/18/2023, 4:01 PM)
    Union operator best practices: We have 3 streams we want to union. We use keyBy after the union and pass the keyed stream through a tumbling window. Should we key all 3 streams before the union? If so, is the result of the union still keyed? Or do we need to keyBy again?
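Conceptually, the pipeline described (union of three inputs, then keyBy, then a tumbling window) groups every element by key and window, regardless of which input it came from. In the DataStream API, union(...) is declared to return a plain DataStream, so the keyBy after the union is the step that defines the keyed partitioning for the window. A plain-Java sketch of the resulting grouping, with an illustrative Event type and window size (not Flink API):

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class UnionThenKeyedWindow {
    /** Illustrative event: a key, an event timestamp in ms, and a value. */
    static final class Event {
        final String key;
        final long ts;
        final int value;

        Event(String key, long ts, int value) {
            this.key = key;
            this.ts = ts;
            this.value = value;
        }
    }

    /** Unions the inputs, then sums values per (key, tumbling window start), formatted as "key@windowStart". */
    @SafeVarargs
    public static Map<String, Integer> sumPerKeyAndWindow(long windowMs, List<Event>... inputs) {
        return Stream.of(inputs)
                .flatMap(List::stream) // union: all inputs merged into one stream
                .collect(Collectors.groupingBy(
                        // keyBy + tumbling window: window start = ts rounded down to a multiple of windowMs
                        e -> e.key + "@" + (e.ts - Math.floorMod(e.ts, windowMs)),
                        TreeMap::new,
                        Collectors.summingInt(e -> e.value)));
    }
}
```

Elements with the same key from different inputs (for example "k1" at t=1 from the first stream and t=5 from the second) end up in the same window group, which is what the post-union keyBy provides.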
  • PRASHANT GUPTA (01/18/2023, 4:36 PM)
    How can I avoid checkpoint failures? Context: I have enabled checkpointing with the following configs:
    checkpointDurationMs: 60000
    checkpointTimeoutMs: 180000
    maxCheckpointRetained: 10
    tolerableFailedCheckpoint: 100
    I am using the Flink Kubernetes Operator. Whenever I uninstall and reinstall the Flink deployments, I see these exceptions in my JobManager and my checkpoints fail:
    Caused by: java.io.IOException: Target file file:/data/datas-template/checkpoints/00000000067179110000000000000000/chk-1/_metadata already exists.
  • Aeden Jameson (01/18/2023, 5:02 PM)
    I'd like to get clarification on my understanding of state schema evolution for Flink SQL jobs and its implications. From reading https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/, I'd conclude that Flink SQL currently doesn't support state schema evolution. If that's correct, does it mean that if I take a savepoint of a job with windowing, add a new field, and then try to restore the job from that savepoint, the restore would fail?
  • Bhupendra Yadav (01/18/2023, 5:47 PM)
    Hi everyone, I have a straightforward use case but cannot make it work. I am using BATCH mode. I have 2 DataStreams; assume they look like this:
    DataStream<Integer> input = env.fromElements(1, 2, 3);
    DataStream<Integer> input2 = env.fromElements(1, 2, 3, 4, 5);
    I want to take the intersection of these 2 streams. I tried different things, but they fail with this error (https://ideone.com/LdtTmE). Is there an easy way to take the intersection?
    // env: StreamExecutionEnvironment running in BATCH mode (set up elsewhere)
    Set<Integer> st = new HashSet<>();
    DataStream<Integer> input = env.fromElements(1, 2, 3);
    CloseableIterator<Integer> integerCloseableIterator = input.executeAndCollect();
    while (integerCloseableIterator.hasNext()) {
      st.add(integerCloseableIterator.next());
    }
    integerCloseableIterator.close();
    DataStream<Integer> input2 = env.fromElements(1, 2, 3, 4, 5)
      .filter(new FilterFunction<Integer>() {
        @Override
        public boolean filter(Integer value) throws Exception {
          return st.contains(value);
        }
      });
    input2.print();
    env.execute();
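One common pattern for intersecting two streams is to key both inputs by value, connect them, and keep a little per-key state, emitting a value once it has been seen on both sides (in Flink this would typically be a KeyedCoProcessFunction holding the flags in ValueState, or a join on the value). The per-key logic is sketched here in plain Java, with a HashMap standing in for keyed state; the class and method names are illustrative, not Flink API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class KeyedIntersection {
    /** Per-key flags; in a KeyedCoProcessFunction these would live in ValueState. */
    private static final class Seen {
        boolean left;
        boolean right;
        boolean emitted;
    }

    private final Map<Integer, Seen> state = new HashMap<>();
    private final List<Integer> output = new ArrayList<>();

    void onLeft(int value) { update(value, true); }   // like processElement1

    void onRight(int value) { update(value, false); } // like processElement2

    private void update(int value, boolean fromLeft) {
        Seen s = state.computeIfAbsent(value, k -> new Seen());
        if (fromLeft) {
            s.left = true;
        } else {
            s.right = true;
        }
        if (s.left && s.right && !s.emitted) {
            s.emitted = true; // emit each common value only once
            output.add(value);
        }
    }

    /** Feeds both bounded inputs through the per-key logic and returns the emitted values. */
    public static List<Integer> intersect(List<Integer> left, List<Integer> right) {
        KeyedIntersection op = new KeyedIntersection();
        left.forEach(op::onLeft);
        right.forEach(op::onRight);
        return op.output;
    }

    public static void main(String[] args) {
        System.out.println(intersect(List.of(1, 2, 3), List.of(1, 2, 3, 4, 5)));
    }
}
```

Because each side only sets a flag, the result does not depend on the order in which elements from the two inputs arrive, which matches how elements of connected streams interleave in practice.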
  • Mustafa Akın (01/18/2023, 7:56 PM)
    Hello, I want to run batch (bounded) SQL queries from users on demand. Most of them take under a second to run, so I want to optimize the time it takes to start the mini local cluster. I already use a local execution environment. What should I keep in mind? I know that's not the intended use case, but leveraging Flink like this would benefit us a lot.
  • Mustafa Akın (01/18/2023, 8:21 PM)
    I just found out about CollectionEnvironment, but I'm not sure if it can be used with a TableEnvironment.
  • Yaroslav Bezruchenko (01/18/2023, 10:19 PM)
    Hey, can someone please help? How should I handle this?
    org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing.
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2143) [flink-dist-1.15.2.jar:1.15.2]
    	at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
    	at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [?:?]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
    	at java.lang.Thread.run(Unknown Source) [?:?]
    2023-01-18 22:15:47,640 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Trying to recover from a global failure.
    org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
    	at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:206) ~[flink-dist-1.15.2.jar:1.15.2]
    	at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:169) ~[flink-dist-1.15.2.jar:1.15.2]
    	at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:122) ~[flink-dist-1.15.2.jar:1.15.2]
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2082) ~[flink-dist-1.15.2.jar:1.15.2]
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2061) ~[flink-dist-1.15.2.jar:1.15.2]
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:98) ~[flink-dist-1.15.2.jar:1.15.2]
    	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2143) ~[flink-dist-1.15.2.jar:1.15.2]
    	at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
    	at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
    	at java.lang.Thread.run(Unknown Source) ~[?:?]
  • Guruguha Marur Sreenivasa (01/19/2023, 2:09 AM)
    Hi all, I have a few doubts about the Schema Registry URL argument of the ConfluentRegistryAvroDeserializationSchema.forGeneric(...) method in the flink-avro-confluent-registry package.
    1. What is the purpose of passing this argument? As I understand it, it's only used to set the writer schema and nothing else.
    2. Say I have a topic with 2 records: r1 with schema s1 and r2 with schema s2 (s1 and s2 are slightly different: s2 has one more column than s1). Both schemas are registered in the schema registry. When I run a FlinkKafkaConsumer and configure the deserialization schema as ConfluentRegistryAvroDeserializationSchema.forGeneric(s1, URL), I see that the new column in r2 is completely ignored. I was under the impression that if the latest schema s2 is compatible with the record that was read, it would be used to deserialize the record, but that is not happening. What is the point of passing the URL? How do I write my consumer so that it doesn't need to know the schema beforehand but uses the writer schema from the registry?
    Additionally, I tried to follow this post to write my own deserialization logic, but it fails with a Kryo exception. 😞 Can someone please elaborate on this?
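Some background that may help with point 1: in the Confluent wire format, each record value starts with a magic byte (0) followed by a 4-byte big-endian schema ID, and, as I understand it, the deserializer uses the registry URL to look up the writer schema for that ID and then resolves it against the reader schema passed to forGeneric (s1 here). Under Avro's schema resolution rules, writer fields that the reader schema does not declare are ignored, which would be consistent with the new column in r2 disappearing. A JDK-only sketch of reading that header; the class and method names are hypothetical:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

final class ConfluentWireFormat {
    static final byte MAGIC_BYTE = 0;
    static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema ID

    /** Returns the registry ID of the writer schema, read from the record value's header. */
    public static int writerSchemaId(byte[] value) {
        if (value == null || value.length < HEADER_LENGTH || value[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("value is not in Confluent wire format");
        }
        return ByteBuffer.wrap(value, 1, 4).getInt(); // ByteBuffer is big-endian by default
    }

    /** Returns the Avro-encoded payload that follows the header. */
    public static byte[] avroPayload(byte[] value) {
        writerSchemaId(value); // validates the header
        return Arrays.copyOfRange(value, HEADER_LENGTH, value.length);
    }
}
```

The ID tells the consumer which writer schema each record was produced with; the reader schema is what decides which fields end up in the output record.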
  • Sumit Nekar (01/19/2023, 5:04 AM)
    Hello, sometimes I see the JobManager getting restarted because of the following exceptions in our Flink cluster (managed by the Flink Kubernetes Operator). Is this an intermittent issue, or something we need to tune the JobManager for?
    2023-01-19 02:04:13,975 ERROR io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector [] - Exception occurred while acquiring lock 'ConfigMapLock: streaming-application streaming-application-resourcemanager-leader (dc2d523f-ffba-493c-9192-e0e4ddc8d299)'
    io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get]  for kind: [ConfigMap]  with name: [streaming-application-resourcemanager-leader]  in namespace: [streaming-application]  failed.
    at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64) ~[flink-dist_2.12-1.13.6.jar:1.13.6]
  • Sumit Aich (01/19/2023, 7:24 AM)
    Hi team, is Flink Kubernetes Operator version 1.3.1 compatible with Kubernetes v1.22?
  • nick christidis (01/19/2023, 10:24 AM)
    Hello, I would like to get some opinions, insights, or solutions for the following problem, which is related to the Flink Kubernetes Operator: https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/
    ---- before the introduction of the Flink Kubernetes Operator ----
    A component in the platform I am working on provides framework code so that a user can instruct all jobs of a JobManager to shut down gracefully with a savepoint. Behind the scenes, it uses the Flink REST API in the following order:
    1. It calls the GET /jobs endpoint of the JobManager to get all job IDs [see attached screenshot].
    2. For every job ID, it stops the job with POST /jobs/:jobid/stop [see attached screenshot].
    So everything was working as expected. But...
    ---- after the introduction of the Flink Kubernetes Operator ----
    Because the job state is passed in the YAML (e.g. state: running) and managed by the Kubernetes control plane, the above no longer works: the operator seems to ignore it, so the mechanism described above does not work in a Flink Kubernetes Operator deployment.
    So my question is:
    Is there a similar API provided by the Kubernetes operator so that we can instruct the JobManager to do what we want (e.g. stop jobs) and solve the above issue?
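For reference, the pre-operator mechanism described in the question (list jobs with GET /jobs, then stop each one with POST /jobs/:jobid/stop) can be sketched with the JDK's java.net.http client. The JSON body fields targetDirectory and drain are an assumption based on the REST API's stop-with-savepoint request and should be checked against the REST API reference for your Flink version; the class and method names are hypothetical. As the question notes, jobs stopped this way are not honored by the operator.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.List;
import java.util.stream.Collectors;

final class StopAllJobs {
    /** Builds the GET /jobs request used to list job IDs. */
    public static HttpRequest listJobs(String restBase) {
        return HttpRequest.newBuilder(URI.create(restBase + "/jobs")).GET().build();
    }

    /** Builds one POST /jobs/:jobid/stop request per job ID (stop with savepoint). */
    public static List<HttpRequest> stopRequests(String restBase, List<String> jobIds, String targetDirectory) {
        // Assumed request body; verify field names for your Flink version.
        String body = "{\"targetDirectory\": \"" + targetDirectory + "\", \"drain\": false}";
        return jobIds.stream()
                .map(id -> HttpRequest.newBuilder(URI.create(restBase + "/jobs/" + id + "/stop"))
                        .header("Content-Type", "application/json")
                        .POST(HttpRequest.BodyPublishers.ofString(body))
                        .build())
                .collect(Collectors.toList());
    }
}
```

Each request would then be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()); parsing the job IDs out of the GET /jobs response is left out to keep the sketch dependency-free.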
  • Sami Badawi (01/19/2023, 11:45 AM)
    Postgres source written in Python: I have run into problems with Postgres using both JDBC and CDC. The JDBC connector did not support the Postgres JSONB type, and CDC does not support Postgres 13; I could not get the replication slots to work. Has anybody had any luck making a Postgres source written in Python?
  • Yaroslav Bezruchenko (01/19/2023, 11:48 AM)
    Hey, I have a question. With the Flink Operator for Kubernetes, if a TaskManager fails it gets restarted, but the readiness check is ignored and it starts processing data, which in my case leads to a failure loop. Any idea how to force the Flink Operator to respect readiness checks?
  • Aman Sharma (01/19/2023, 3:32 PM)
    Hello, I am learning the Flink CEP library. Is there a way to remove a pattern from a group of patterns in a Flink streaming job?
  • Matyas Orhidi (01/19/2023, 6:01 PM)
    Hey folks, one of our jobs (1.15.2) got stuck in a resource allocation loop after a lost heartbeat. Has anyone seen this?
    Heartbeat of TaskManager with id <app-name>-2-1 timed out.
    Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
    Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
    Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
    Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
  • Yang LI (01/19/2023, 7:31 PM)
    Hello guys, I am trying out my Flink job with the Flink Kubernetes Operator. I have tried application upgrades in both application mode and session mode. Each time I update the FlinkDeployment or FlinkSessionJob to trigger an upgrade, the Flink job fails to create a savepoint with the following exception:
    org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException
    Do you know what could be the cause of this? 🙏
  • Krish Narukulla (01/19/2023, 10:08 PM)
    Can I insert null values into a Flink table?
    insert into table1(col1) select null;
    This fails with:
    Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 159 to line 1, column 162: Illegal use of 'NULL'
            at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186)
            at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113)
            at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261)
            at org.apache.flin
  • Nathanael England (01/19/2023, 10:31 PM)
    Is queryable state supported in the python API?
  • Ikvir Singh (01/20/2023, 12:52 AM)
    Hello 👋🏽. I have a strange issue that I'm only seeing in Flink 1.15+. I have an exactly-once Kafka sink set up. Once the checkpointing interval has elapsed, I see the following error repeatedly in my logs:
    ERROR org.apache.flink.connector.kafka.sink.KafkaCommitter - Unable to commit transaction (org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequestImpl@f2f21e9) because it's in an invalid state. Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.
    org.apache.kafka.common.errors.InvalidTxnStateException: The producer attempted a transactional operation in an invalid state
    I did not have this error in Flink 1.14. Is this a known bug?
  • abhishek sidana (01/20/2023, 5:36 AM)
    Hi, to secure the Flink container in Kubernetes, I tried adding a security context such as a read-only file system, but it gives the following errors:
    sed: couldn't open temporary file /opt/flink/conf/sedKOSkzJ: Read-only file system
    sed: couldn't open temporary file /opt/flink/conf/sedO9Qit7: Read-only file system
    /docker-entrypoint.sh: line 73: /opt/flink/conf/flink-conf.yaml: Permission denied
    /docker-entrypoint.sh: line 89: /opt/flink/conf/flink-conf.yaml.tmp: Read-only file system
  • Abdul Rafehi (01/20/2023, 8:37 AM)
    Hi all, we're running into issues with the JobManager running out of JVM metaspace. As per the docs, we're adding dependencies to /opt/flink/lib. There are no issues with the Postgres driver and the Flink JDBC connector, but we're having issues with flink-connector-kinesis:4.0.0-1.16 (running on the Flink image flink:1.16.0-scala_2.12-java11). Adding this jar to the lib folder gives the following error:
    jobmanager_1   | java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfiguration
    jobmanager_1   |        at org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfigurationFactory.getDefaultConfig(ClientConfigurationFactory.java:46) ~[flink-connector-kinesis-4.0.0-1.16.jar:4.0.0-1.16]
    jobmanager_1   |        at org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfigurationFactory.getConfig(ClientConfigurationFactory.java:36) ~[flink-connector-kinesis-4.0.0-1.16.jar:4.0.0-1.16]
    jobmanager_1   |        at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.createKinesisClient(KinesisProxy.java:268) ~[flink-connector-kinesis-4.0.0-1.16.jar:4.0.0-1.16]
    jobmanager_1   |        at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.<init>(KinesisProxy.java:152) ~[flink-connector-kinesis-4.0.0-1.16.jar:4.0.0-1.16]
    jobmanager_1   |        at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.create(KinesisProxy.java:280) ~[flink-connector-kinesis-4.0.0-1.16.jar:4.0.0-1.16]
    jobmanager_1   |        at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.<init>(KinesisDataFetcher.java:412) ~[flink-connector-kinesis-4.0.0-1.16.jar:4.0.0-1.16]
    Any thoughts on what might be causing this?
  • Sami Badawi (01/20/2023, 8:54 AM)
    I wrote a Postgres database source in Python, but I cannot change it to be unbounded (streaming). Can you write a database source in Scala or Java and use it from PyFlink? I have not found any examples of mixed PyFlink and Java/Scala Flink projects.
  • Sudhan Madhavan (01/20/2023, 9:53 AM)
    Hello everyone, I am using the Ververica Platform for Flink job deployments. I want to do rolling upgrades with zero downtime, but none of the upgrade strategies provided by Ververica achieves zero downtime. Can anyone help me with this?
  • chunilal kukreja (01/20/2023, 3:33 PM)
    Hi Team, is there a way to find out the current retry count in Async I/O (the asyncInvoke() API) when an AsyncRetryStrategy is defined?
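If the attempt number must be visible inside each call, one option is to drive the retries yourself instead of through AsyncRetryStrategy. A plain-Java sketch with CompletableFuture that passes the 1-based attempt number into each call; the names are hypothetical and this is not Flink's retry API. Inside asyncInvoke, the attempt function would wrap whatever async client call the job already makes, and the final future would complete the ResultFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;

final class CountingRetry {
    /**
     * Calls attempt.apply(n) with n = 1, 2, ... until a returned future succeeds
     * or maxAttempts calls have failed; the last failure is then propagated.
     */
    public static <T> CompletableFuture<T> withRetries(IntFunction<CompletableFuture<T>> attempt, int maxAttempts) {
        return run(attempt, 1, maxAttempts);
    }

    private static <T> CompletableFuture<T> run(IntFunction<CompletableFuture<T>> attempt, int n, int maxAttempts) {
        return attempt.apply(n)
                .<CompletableFuture<T>>handle((value, error) -> {
                    if (error == null) {
                        return CompletableFuture.completedFuture(value); // success on attempt n
                    }
                    if (n >= maxAttempts) {
                        return CompletableFuture.failedFuture(error); // give up, keep the last error
                    }
                    return run(attempt, n + 1, maxAttempts); // retry, with the attempt count increased
                })
                .thenCompose(next -> next); // flatten the nested future
    }
}
```

Without a delay between attempts this retries immediately; a backoff could be added with CompletableFuture.delayedExecutor if needed.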