# troubleshooting

    Shahid Chohan

    07/12/2023, 12:56 AM
    I've noticed that if I'm using ZooKeeper for HA and I kill a ZK node that's in the quorum, the JM will restart. If I've specified multiple ZK nodes in the quorum string (say 5, for example), is there a way to get the JM to connect to a different node without triggering a restart? I've seen the `high-availability.zookeeper.client.tolerate-suspended-connections` config option, but I'm not sure if it actually applies, since the node itself should be lost.
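    For reference, a minimal sketch of the options in question; the option names are real, the quorum hosts are placeholders, and whether the suspended-connections option avoids the restart for a fully lost node is exactly the open question here:
    Copy code
    // Minimal sketch of the ZooKeeper HA options discussed above (hosts are placeholders).
    Configuration conf = new Configuration();
    conf.setString("high-availability", "zookeeper");
    conf.setString("high-availability.zookeeper.quorum",
            "zk1:2181,zk2:2181,zk3:2181,zk4:2181,zk5:2181");
    // Tolerates SUSPENDED (connection-loss) states instead of failing over immediately:
    conf.setString("high-availability.zookeeper.client.tolerate-suspended-connections", "true");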

    Eugenio Gastelum

    07/12/2023, 1:20 AM
    Hello everyone, I am trying to run PyFlink with a UDF locally (what Flink apparently calls minicluster mode, by using the
    .wait()
    function on a Table), but I get several Java errors, and the one below seems to be the main problem. It seems to happen frequently to other people, since I've seen it across Stack Overflow:
    Copy code
    Caused by: java.lang.RuntimeException: Failed to create stage bundle factory! INFO:root:Initializing Python harness: C:\Users\eugen\anaconda3\lib\site-packages\pyflink\fn_execution\beam\beam_boot.py --id=6-1 --provision_endpoint=localhost:53973
    INFO:root:Starting up Python harness in loopback mode.
    I've seen a suggested solution, which was to set the variable
    _python_worker_execution_mode
    to `process`: https://stackoverflow.com/a/72037218/9588300 However, I've set the variable and it gives the same error. This is how I've set it up in my .py file:
    Copy code
    import os

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment

    CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))

    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///" + CURRENT_DIR + "/lib/flink-sql-connector-kinesis-1.15.2.jar")
    table_env = StreamTableEnvironment.create(stream_execution_environment=env)
    table_env.get_config().set("pipeline.jars", "file:///" + CURRENT_DIR + "/lib/flink-sql-connector-kinesis-1.15.2.jar")
    table_env.get_config().set("table.exec.sink.not-null-enforcer", "DROP")
    table_env.get_config().set("_python_worker_execution_mode", "process")
    Did I set it wrong?

    Mukesh Kumar

    07/12/2023, 4:57 AM
    Hello All, I am exploring Flink for ETL, to migrate data from a legacy database system to a modern one. The two databases have different schema designs. It has to be a historical data migration, and changes also have to be migrated in real time, as both systems will be online. Further, I need to think of a way to do bi-directional sync as well. Currently, I am capturing legacy DB CDC into Kafka: MSSQL -> Debezium -> Kafka -> Flink -> Postgres. I have also built a small POC, which uses Flink RocksDB, checkpoints, and Flink SQL tables with the Kafka connector as source and Postgres as sink connector, performing SQL-based joins in between, and things are working fine. I have a question regarding state: I don't think I need to maintain state in my use case, as it will be event-based. So can I avoid using state in Flink?

    Pappu Yadav

    07/12/2023, 5:03 AM
    Hi Team, I have set default parallelism to 10. I am using Flink batch mode with a Kafka sink, but the Kafka sink parallelism is always set to 1. Why is the sink parallelism not set to the application parallelism of 10?
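    A sketch of pinning the sink parallelism explicitly in a DataStream job, assuming stream, kafkaSink, and env already exist:
    Copy code
    // Sketch: override the sink operator's parallelism directly
    // (stream, kafkaSink, and env are assumed to exist).
    stream.sinkTo(kafkaSink).setParallelism(10);

    // Alternatively, set the job-wide default on the environment:
    env.setParallelism(10);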

    Marco Villalobos

    07/12/2023, 6:08 AM
    Question: if I create a 1 ms window using event-time processing, is that supported when there are multiple task managers? I am leveraging the map-reduce algorithm, and I am using a window to reduce and de-duplicate data. It works on a single-node cluster, but when I run on a multi-node cluster, that operator never receives data.
    Copy code
    TumblingEventTimeWindows.of(Time.milliseconds(1))
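    Worth noting: an event-time window only fires once the watermark has advanced in every parallel input, so on a multi-node cluster a single idle source subtask or partition can hold the window back indefinitely. A sketch of a watermark strategy that marks idle inputs, assuming a hypothetical Event type:
    Copy code
    // Sketch: keep idle source subtasks from holding the watermark back
    // (Event, getTimestamp, and getKey are hypothetical).
    WatermarkStrategy<Event> strategy = WatermarkStrategy
            .<Event>forBoundedOutOfOrderness(Duration.ofMillis(1))
            .withTimestampAssigner((event, ts) -> event.getTimestamp())
            .withIdleness(Duration.ofSeconds(10));

    stream.assignTimestampsAndWatermarks(strategy)
          .keyBy(Event::getKey)
          .window(TumblingEventTimeWindows.of(Time.milliseconds(1)))
          .reduce((a, b) -> a); // keep the first element per key/window to de-duplicate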

    Sameer Chandra

    07/12/2023, 8:08 AM
    We have an ELT use-case where we ingest data from Kafka, transform it, and write to S3 in Parquet format. Since we use a CheckpointRollingPolicy, files are written to S3 on checkpoint (15 min interval). We use RocksDB as the state backend. Until the checkpoint is triggered, are these records stored in the JVM heap?
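    For reference, a sketch of the kind of sink being described (bucket path and Avro schema are placeholders); with bulk formats the writer flushes rows into the in-progress part file (an S3 multipart upload) rather than into Flink state:
    Copy code
    // Sketch: Parquet FileSink that rolls part files on checkpoint
    // (the bucket path and schema are placeholders).
    FileSink<GenericRecord> sink = FileSink
            .forBulkFormat(new Path("s3://my-bucket/output"),
                    AvroParquetWriters.forGenericRecord(schema))
            .withRollingPolicy(OnCheckpointRollingPolicy.build())
            .build();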

    Shubham Saxena

    07/12/2023, 9:30 AM
    We have been trying to use the pluggable file system to integrate our custom file system implementation in Flink. However, we have been facing multiple ClassNotFound issues due to this limitation mentioned in the Flink docs: "_In practice, it means you should avoid using
    Thread.currentThread().getContextClassLoader()
    class loader in your implementation_." In our case, dependencies like the Jersey client and Hadoop use the thread's context class loader to initialize some classes, which we have no control over. Has anyone faced this issue? Isn't this limitation too restrictive, given that it would be very hard to reimplement your dependencies?

    kingsathurthi

    07/12/2023, 9:55 AM
    We are getting the below error when installing a FlinkDeployment. How do we resolve this?
    No dirty JobResults can be restored
    2023-07-12 09:29:47,171 INFO  org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Starting the resource manager.
    2023-07-12 09:29:47,171 INFO  org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - Stopping SessionDispatcherLeaderProcess.
    2023-07-12 09:29:47,173 INFO  org.apache.flink.runtime.jobmanager.DefaultJobGraphStore     [] - Stopping DefaultJobGraphStore.
    2023-07-12 09:29:47,174 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error occurred in the cluster entrypoint.
    java.util.concurrent.CompletionException: java.lang.IllegalStateException: The base directory of the JobResultStore isn't accessible. No dirty JobResults can be restored.
    at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) [?:?]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
    at java.lang.Thread.run(Unknown Source) [?:?]
    Caused by: java.lang.IllegalStateException: The base directory of the JobResultStore isn't accessible. No dirty JobResults can be restored.
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.runtime.highavailability.FileSystemJobResultStore.getDirtyResultsInternal(FileSystemJobResultStore.java:182) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.withReadLock(AbstractThreadsafeJobResultStore.java:118) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.runtime.highavailability.AbstractThreadsafeJobResultStore.getDirtyResults(AbstractThreadsafeJobResultStore.java:100) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResults(SessionDispatcherLeaderProcess.java:194) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.16.2.jar:1.16.2]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.getDirtyJobResultsIfRunning(SessionDispatcherLeaderProcess.java:188) ~[flink-dist-1.16.2.jar:1.16.2]
    ... 4 more
    2023-07-12 09:29:47,262 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting KubernetesApplicationClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
    2023-07-12 09:29:47,269 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint.
    2023-07-12 09:29:47,270 INFO  org.apache.flink.runtime.blob.BlobServer                     [] - Stopped BLOB server at 0.0.0.0:6124
    2023-07-12 09:29:48,074 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Shutting down remote daemon.
    2023-07-12 09:29:48,075 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Shutting down remote daemon.
    2023-07-12 09:29:48,076 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Remote daemon shut down; proceeding with flushing remote transports.
    2023-07-12 09:29:48,076 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Remote daemon shut down; proceeding with flushing remote transports.
    WARNING: An illegal reflective access operation has occurred
    WARNING: Illegal reflective access by org.jboss.netty.util.internal.ByteBufferUtil (file:/tmp/flink-rpc-akka_4b66f1f1-01f0-4003-843c-386e04719e19.jar) to method java.nio.DirectByteBuffer.cleaner()
    WARNING: Please consider reporting this to the maintainers of org.jboss.netty.util.internal.ByteBufferUtil
    WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
    WARNING: All illegal access operations will be denied in a future release
    2023-07-12 09:29:48,164 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Remoting shut down.
    2023-07-12 09:29:48,164 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Remoting shut down.
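    The fatal error is the JobResultStore failing a precondition on its base directory, which by default lives under the HA storage path. A sketch of the relevant options, assuming the cause is a storage path that doesn't exist or isn't accessible to the JobManager (paths are placeholders):
    Copy code
    // Sketch: the JobResultStore directory defaults to a path under the HA storage
    // dir; it must exist and be accessible to the JobManager (paths are placeholders).
    Configuration conf = new Configuration();
    conf.setString("high-availability.storageDir", "s3://my-bucket/flink-ha");
    conf.setString("job-result-store.storage-path", "s3://my-bucket/flink-ha/job-result-store");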

    Keyur Makwana

    07/12/2023, 10:19 AM
    Hello team, I am running a basic Flink job to try out the Table API, but I am getting an error about some dependency conflict. Can anyone help me?

    Tiphanie Dousset

    07/12/2023, 1:10 PM
    Hi everyone, I'm running Apache Flink with the Kubernetes operator. The image I'm using in my deployment contains the following jar "https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/flink-sql-connector-kafka-1.17.1.jar". In my job I'm using the connector "upsert-kafka".
    Copy code
    CREATE TABLE `MyTable` (
      `object_ID` INTEGER PRIMARY KEY NOT ENFORCED,
      `Path` VARCHAR,
      `Status` VARCHAR NOT NULL
    ) WITH (
      'connector' = 'upsert-kafka',
      'value.format' = 'json',
      'topic' = 'my-topic',
      'key.format' = 'json',
      'properties.security.protocol' = 'SASL_SSL',
      'properties.sasl.mechanism' = 'PLAIN',
      'properties.bootstrap.servers' = 'xxxx',
      'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxx" password="yyyy";'
    );
    My pod complains that it can't find the "PlainLoginModule". I found this issue: https://issues.apache.org/jira/browse/FLINK-31361 But using
    org.apache.flink.kafka.shaded.org.apache.kafka.common.security.plain.PlainLoginModule
    instead of
    org.apache.kafka.common.security.plain.PlainLoginModule
    doesn't solve the issue. What is the correct name? Am I missing something else? (I've tried the workaround suggested in the link above, but that leads to other errors 🙃)

    Neha

    07/12/2023, 1:52 PM
    Hello team, I am trying to implement a timestamp extractor that returns currentTimeMillis as the timestamp for each event (i.e., using ingestion time semantics).
    Copy code
    watermarkStrategy = WatermarkStrategy.forMonotonousTimestamps()
            .withIdleness(Duration.ofSeconds(60))
            .withTimestampAssigner((event, ts) -> System.currentTimeMillis());
    I am getting the following exception:
    Copy code
    Caused by: java.lang.NoSuchMethodException: <className>.$deserializeLambda$(java.lang.invoke.SerializedLambda)
    	at java.lang.Class.getDeclaredMethod(Class.java:2158)
    	at java.lang.invoke.SerializedLambda$1.run(SerializedLambda.java:224)
    	at java.lang.invoke.SerializedLambda$1.run(SerializedLambda.java:221)
    	at java.security.AccessController.doPrivileged(Native Method)
    	at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:221)
    	at sun.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1274)
    	... 30 more
    Can somebody please point me to the correct way to resolve this?
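    A common workaround for $deserializeLambda$ failures is to replace the serialized lambda with an anonymous class (or make sure the class that declares the lambda is on the cluster's classpath); a sketch, assuming a hypothetical Event element type:
    Copy code
    // Sketch: anonymous class instead of a lambda, so no SerializedLambda
    // round-trip is needed during deserialization (Event is hypothetical).
    WatermarkStrategy<Event> watermarkStrategy = WatermarkStrategy
            .<Event>forMonotonousTimestamps()
            .withIdleness(Duration.ofSeconds(60))
            .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                @Override
                public long extractTimestamp(Event event, long recordTimestamp) {
                    return System.currentTimeMillis(); // ingestion-time semantics
                }
            });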

    Tudor Pavel

    07/12/2023, 3:13 PM
    Hi! 👋 I ran into a pretty strange behavior today. I'm using Flink 1.17.1 and PyFlink, setting up something with the Kafka connector and Python UDTFs. My code worked fine locally, but when I tried running it in my Docker Flink setup I ran into an error like:
    Copy code
    Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: org.apache.flink.table.runtime.operators.python.table.PythonTableFunctionOperator
    ClassLoader info: URL ClassLoader:
        file: '/tmp/tm_172.25.0.5:44065-a2ef4a/blobStorage/job_c5da7b3563558ec66d5e773659c8abe1/blob_p-18b059e5f10b72a375b507c7f72c8ab9931306f9-ae41178318456ae52391027abd82d3de' (valid JAR)
    Class not resolvable through given classloader.
    Many debugging hours later I've narrowed it down to a simple reproducible example:
    Copy code
    import json
    import os

    from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
    from pyflink.table.udf import udtf

    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    
    # It fails if I uncomment this
    # kafka_connector_path = os.path.abspath('jars/flink-sql-connector-kafka-1.17.1.jar')
    # t_env.get_config().set("pipeline.jars", f"file://{kafka_connector_path}")
    
    # define the source
    table = t_env.from_elements(
        elements=[
            (1, '{"name": "Flink", "tel": 123, "addr": {"country": "Germany", "city": "Berlin"}}'),
            (2, '{"name": "hello", "tel": 135, "addr": {"country": "China", "city": "Shanghai"}}'),
            (3, '{"name": "world", "tel": 124, "addr": {"country": "USA", "city": "NewYork"}}'),
            (4, '{"name": "PyFlink", "tel": 32, "addr": {"country": "China", "city": "Hangzhou"}}')
        ],
        schema=['id', 'data'])
    
    # execute sql statement
    @udtf(result_types=[DataTypes.STRING(), DataTypes.INT(), DataTypes.STRING()])
    def parse_data(data: str):
        json_data = json.loads(data)
        yield json_data['name'], json_data['tel'], json_data['addr']['country']
    
    t_env.create_temporary_function('parse_data', parse_data)
    t_env.execute_sql(
        """
        SELECT *
        FROM %s, LATERAL TABLE(parse_data(`data`)) t(name, tel, country)
        """ % table
    ).print()
    If I uncomment the part with
    pipeline.jars
    it fails when I run it like:
    Copy code
    flink run -py basic.py
    However I discovered it works if I include the connector from the CLI instead of the code:
    Copy code
    flink run -py basic.py --jarfile jars/flink-sql-connector-kafka-1.17.1.jar
    After further digging I found the difference is there's 2 JARs on the taskmanager when it works:
    Copy code
    root@58c941f39b15:/opt/flink# ls -lah /tmp/tm_172.25.0.5\:33231-ecc45a/blobStorage/job_2de88f8af859fccdc956e62cc32c4a88/
    total 37M
    drwxr-xr-x  2 flink flink 4.0K Jul 12 14:24 .
    drwxr-xr-x 40 flink flink 4.0K Jul 12 14:24 ..
    -rw-r--r--  1 flink flink 5.4M Jul 12 14:24 blob_p-18b059e5f10b72a375b507c7f72c8ab9931306f9-d01b99c7749267191b1d6da2dfe3a8bc
    -rw-r--r--  1 flink flink  32M Jul 12 14:24 blob_p-275820cb9b5e36c9f3e1e5483e93d0b808fe257e-5f41622e51ea7098f251bcfc3285b1bb
    And just 1 JAR when it doesn't:
    Copy code
    root@58c941f39b15:/opt/flink# ls -lah /tmp/tm_172.25.0.5\:33231-ecc45a/blobStorage/job_af184fc448ee510ac4ebbe92c7e7d893/
    total 5.4M
    drwxr-xr-x  2 flink flink 4.0K Jul 12 14:52 .
    drwxr-xr-x 22 flink flink 4.0K Jul 12 14:52 ..
    -rw-r--r--  1 flink flink 5.4M Jul 12 14:52 blob_p-18b059e5f10b72a375b507c7f72c8ab9931306f9-1fc054bb0990f0378d86166b1edd63ea
    I figured out the 5.4M JAR is the Kafka connector from my project, while the 32M one is the
    flink-python-1.17.1.jar
    that I think should get auto-uploaded. So as a workaround I can copy the flink-python jar into my project subfolder and specify both dependencies in the code and it works:
    Copy code
    kafka_connector_path = os.path.abspath('jars/flink-sql-connector-kafka-1.17.1.jar')
    flink_python_path = os.path.abspath('jars/flink-python-1.17.1.jar')
    t_env.get_config().set("pipeline.jars", f"file://{kafka_connector_path};file://{flink_python_path}")
    But since I haven't seen this documented anywhere I wonder if it might be a bug. It's like specifying
    pipeline.jars
    in the code will override the auto-upload of the flink-python JAR to each task manager when doing
    flink run --python
    . Any clarity on this behavior would be much appreciated, thanks! 🙇

    Tsering

    07/12/2023, 3:23 PM
    Hi team! I am trying to use the Apache Kafka connector to read data from Kafka. I did as the docs said, but I keep getting this problem:
    No LoginModule found for org.apache.kafka.common.security.scram.ScramLoginModule
    Can someone please help me with this?
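    For reference, a sketch of SASL/SCRAM properties on a KafkaSource (servers, topic, and credentials are placeholders). One known wrinkle: if the fat flink-sql-connector-kafka jar is on the classpath, the login module class is relocated under org.apache.flink.kafka.shaded, so the unshaded name won't be found:
    Copy code
    // Sketch: SASL/SCRAM-configured KafkaSource (all values are placeholders).
    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("broker:9092")
            .setTopics("my-topic")
            .setGroupId("my-group")
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .setProperty("security.protocol", "SASL_SSL")
            .setProperty("sasl.mechanism", "SCRAM-SHA-512")
            .setProperty("sasl.jaas.config",
                    "org.apache.kafka.common.security.scram.ScramLoginModule required "
                            + "username=\"user\" password=\"pass\";")
            .build();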

    Kireet Agrawal

    07/12/2023, 5:40 PM
    Hi team, seeing a JobManager failing to restart with the following error; has anyone seen this before?
    Copy code
    2023-07-12 15:05:22,500 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: Kinesis Data Stream -> watermarking_strategy_v0 -> Flat Map (1/16) (10fa6513f35966426dfd36576bff2bae_2878b9ab6776d58d3ac8f74bba6fe037_0_3879) switched from DEPLOYING to FAILED on eventlogger-pipeline-v0-taskmanager-18-1 @ ip-10-1-10-12.us-east-2.compute.internal (dataPort=42471).
    java.lang.IllegalStateException: The library registration references a different set of library BLOBs than previous registrations for this job:
    old:[p-143a36af2df48da99d11f33fe47fff0418ca5511-f37b157bf869ea7e830900ee42ceb4dd]
    new:[p-143a36af2df48da99d11f33fe47fff0418ca5511-ba29eb7b159adeb0e4cba1e5b379399f]
    	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:419) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.access$500(BlobLibraryCacheManager.java:359) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:235) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1100(BlobLibraryCacheManager.java:202) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:336) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1024) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:612) ~[flink-dist-1.16.1.jar:1.16.1]
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.1.jar:1.16.1]
    	at java.lang.Thread.run(Unknown Source) ~[?:?]
    20 tasks will be restarted to recover the failed task 10fa6513f35966426dfd36576bff2bae_2878b9ab6776d58d3ac8f74bba6fe037_0_3879.
    The old hash
    old:[p-143a36af2df48da99d11f33fe47fff0418ca5511-f37b157bf869ea7e830900ee42ceb4dd]
    being referenced seems related to the KinesisDataFetcher, but I'm not sure where the new one would be coming from. Any ideas?

    Jalil Alchy

    07/12/2023, 6:02 PM
    Has anyone had any luck using the Flink k8s operator with an Istio Gateway for ingress?

    Kevin Lam

    07/12/2023, 6:34 PM
    👋 I'm implementing a Flink SQL job that has a data skew issue with a LEFT JOIN. The table on the left of the join has a large number of rows with NULL values for the join key, and this causes skew by routing a disproportionate amount of data to a single subtask. Does anyone have strategies or advice they've used to handle this kind of situation?
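    One common mitigation, sketched below with hypothetical table and column names: since NULL keys can never match in an equi-join anyway, route the NULL-key rows around the join and union the results back, so they no longer all hash to the same subtask:
    Copy code
    // Sketch: bypass the join for NULL keys, then union the two branches
    // (left_t, right_t, and the columns are hypothetical).
    tableEnv.executeSql(
            "SELECT l.id, l.k, r.v "
          + "FROM (SELECT * FROM left_t WHERE k IS NOT NULL) l "
          + "LEFT JOIN right_t r ON l.k = r.k "
          + "UNION ALL "
          + "SELECT id, k, CAST(NULL AS STRING) AS v FROM left_t WHERE k IS NULL");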

    Rajat Ahuja

    07/13/2023, 9:05 AM
    Hello community, I launched a cluster via the Flink k8s operator, and when I try to launch the job via the UI (upload a jar and submit any of the Flink examples from the UI) it runs the job, but when I log into the JobManager pod and try to run ./flink run examples/sample.jar, it throws the following exception. Any idea what might be wrong here?
    Copy code
    The program finished with the following exception:
    
    org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failure executing: GET at: https://10.43.0.1/api/v1/namespaces/apl-edm-streaming-svcs-l3-dev1/services/session-seven-deployment-only-example-rest. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. services "session-seven-deployment-only-example-rest" is forbidden: User "system:serviceaccount:apl-edm-streaming-svcs-l3-dev1:flink" cannot get resource "services" in API group "" in the namespace "apl-edm-streaming-svcs-l3-dev1".
    	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
    	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
    	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
    	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
    	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
    	at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
    	at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
    	at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
    	at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
    	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)
    Caused by: org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: GET at: https://10.43.0.1/api/v1/namespaces/apl-edm-streaming-svcs-l3-dev1/services/session-seven-deployment-only-example-rest. Message: Forbidden!Configured service account doesn't have access. Service account may have been revoked. services "session-seven-deployment-only-example-rest" is forbidden: User "system:serviceaccount:apl-edm-streaming-svcs-l3-dev1:flink" cannot get resource "services" in API group "" in the namespace "apl-edm-streaming-svcs-l3-dev1".
    Permissions:
    Copy code
    $ kubectl get serviceaccounts
    NAME             SECRETS   AGE
    flink            1         154d
    flink-operator   1         154d

    $ kubectl get roles
    NAME             CREATED AT
    flink            2023-02-08T18:52:29Z
    flink-operator   2023-07-11T20:51:24Z

    $ kubectl describe role flink
    Name:         flink
    Labels:       app.kubernetes.io/managed-by=Helm
                  app.kubernetes.io/name=flink-kubernetes-operator
                  app.kubernetes.io/version=1.5.0
                  helm.sh/chart=flink-kubernetes-operator-1.5.0
    Annotations:  helm.sh/resource-policy: keep
                  meta.helm.sh/release-name: flink-kubernetes-operator
                  meta.helm.sh/release-namespace: apl-edm-streaming-svcs-l3-dev1
    PolicyRule:
      Resources                    Non-Resource URLs  Resource Names  Verbs
      ---------                    -----------------  --------------  -----
      configmaps                   []                 []              [*]
      pods                         []                 []              [*]
      deployments.apps/finalizers  []                 []              [*]
      deployments.apps             []                 []              [*]

    $ kubectl describe role flink-operator
    Name:         flink-operator
    Labels:       app.kubernetes.io/managed-by=Helm
                  app.kubernetes.io/name=flink-kubernetes-operator
                  app.kubernetes.io/version=1.6-SNAPSHOT
                  helm.sh/chart=flink-kubernetes-operator-1.6-SNAPSHOT
    Annotations:  <none>
    PolicyRule:
      Resources                          Non-Resource URLs  Resource Names  Verbs
      ---------                          -----------------  --------------  -----
      configmaps                         []                 []              [*]
      deployments                        []                 []              [*]
      events                             []                 []              [*]
      flinkdeployments.flink.apache.org  []                 []              [*]
      pods                               []                 []              [*]
      replicasets                        []                 []              [*]
      services                           []                 []              [*]
      deployments.apps/finalizers        []                 []              [*]
      deployments.apps                   []                 []              [*]

    Mohammad Saif Malek

    07/13/2023, 10:37 AM
    Hello guys, can you help me with this question? https://stackoverflow.com/questions/76676862/not-able-to-recover-states-after-restart-apche-flink-with-spring-boot

    Jashwanth S J

    07/13/2023, 11:02 AM
    Hi Team, how can we set per-operator parallelism for each FlinkSessionJob? I see only job-level parallelism is available in the CRD: https://github.com/apache/flink-kubernetes-operator/blob/main/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
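    The session job CRD only exposes job-level parallelism; operator-level parallelism is normally set inside the job program itself. A sketch with hypothetical operators:
    Copy code
    // Sketch: per-operator parallelism set in the job code
    // (MyMapper, source, and mySink are hypothetical).
    DataStream<String> mapped = source
            .map(new MyMapper())
            .setParallelism(4)   // overrides the job-level default for this operator
            .name("my-mapper");
    mapped.sinkTo(mySink).setParallelism(2);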

    Alex Bryant

    07/13/2023, 12:20 PM
    Please can you help me troubleshoot this error message in our taskmanager logs?
    Copy code
    2023-07-13 22:07:27 2023-07-13 12:07:27,393 INFO  org.apache.flink.runtime.io.network.netty.NettyClient        [] - Transport type 'auto': using EPOLL.
    2023-07-13 22:07:27 2023-07-13 12:07:27,394 INFO  org.apache.flink.runtime.io.network.netty.NettyClient        [] - Successful initialization (took 34 ms).
    2023-07-13 22:07:27 2023-07-13 12:07:27,397 INFO  org.apache.flink.runtime.io.network.netty.NettyServer        [] - Transport type 'auto': using EPOLL.
    2023-07-13 22:07:27 2023-07-13 12:07:27,419 INFO  org.apache.flink.runtime.io.network.netty.NettyServer        [] - Successful initialization (took 23 ms). Listening on SocketAddress /0.0.0.0:42927.
    2023-07-13 22:07:27 2023-07-13 12:07:27,420 INFO  org.apache.flink.runtime.taskexecutor.KvStateService         [] - Starting the kvState service and its components.
    2023-07-13 22:07:27 2023-07-13 12:07:27,451 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService             [] - Starting RPC endpoint for org.apache.flink.runtime.taskexecutor.TaskExecutor at akka://flink/user/rpc/taskmanager_0 .
    2023-07-13 22:07:27 2023-07-13 12:07:27,461 INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Start job leader service.
    2023-07-13 22:07:27 2023-07-13 12:07:27,464 INFO  org.apache.flink.runtime.filecache.FileCache                 [] - User file cache uses directory /tmp/flink-dist-cache-6ae1b4c5-61be-4d22-a17e-7044a879417f
    2023-07-13 22:07:27 2023-07-13 12:07:27,465 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Connecting to ResourceManager akka.tcp://flink@localhost:6123/user/rpc/resourcemanager_*(00000000000000000000000000000000).
    2023-07-13 22:07:27 2023-07-13 12:07:27,510 WARN  akka.remote.ReliableDeliverySupervisor                       [] - Association with remote system [akka.tcp://flink@localhost:6123] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@localhost:6123]] Caused by: [java.net.ConnectException: Connection refused: localhost/127.0.0.1:6123]
    2023-07-13 22:07:27 2023-07-13 12:07:27,511 WARN  akka.remote.transport.netty.NettyTransport                   [] - Remote connection to [null] failed with java.net.ConnectException: Connection refused: localhost/127.0.0.1:6123
    2023-07-13 22:07:27 2023-07-13 12:07:27,516 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not resolve ResourceManager address akka.tcp://flink@localhost:6123/user/rpc/resourcemanager_*, retrying in 10000 ms: Could not connect to rpc endpoint under address akka.tcp://flink@localhost:6123/user/rpc/resourcemanager_*.
    2023-07-13 22:07:37 2023-07-13 12:07:37,541 WARN  akka.remote.transport.netty.NettyTransport                   [] - Remote connection to [null] failed with java.net.ConnectException: Connection refused: localhost/127.0.0.1:6123
    2023-07-13 22:07:37 2023-07-13 12:07:37,542 WARN  akka.remote.ReliableDeliverySupervisor                       [] - Association with remote system [akka.tcp://flink@localhost:6123] has failed, address is now gated for [50] ms. Reason: [Association failed with [akka.tcp://flink@localhost:6123]] Caused by: [java.net.ConnectException: Connection refused: localhost/127.0.0.1:6123]
    2023-07-13 22:07:37 2023-07-13 12:07:37,543 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Could not resolve ResourceManager address akka.tcp://flink@localhost:6123/user/rpc/resourcemanager_*, retrying in 10000 ms: Could not connect to rpc endpoint under address akka.tcp://flink@localhost:6123/user/rpc/resourcemanager_*.
    The jobmanager seems to start fine and does what we expect, listening to a Kafka topic:
    Copy code
    ...
    2023-07-13 22:17:40 2023-07-13 12:17:40,409 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@6d61af7a for Kafka Streaming Job for topic: dummy_topic (cddea62acc79eb3bfe533bb4e14642f2).
    2023-07-13 22:17:40 2023-07-13 12:17:40,416 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Starting execution of job 'Kafka Streaming Job for topic: dummy_topic' (cddea62acc79eb3bfe533bb4e14642f2) under job master id 00000000000000000000000000000000.
    2023-07-13 22:17:40 2023-07-13 12:17:40,418 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
    2023-07-13 22:17:40 2023-07-13 12:17:40,418 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job Kafka Streaming Job for topic: dummy_topic (cddea62acc79eb3bfe533bb4e14642f2) switched from state CREATED to RUNNING.
    2023-07-13 22:17:40 2023-07-13 12:17:40,420 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: Kafka Source -> Sink: Print to Std. Out (1/1) (8a7317ac5692db1fd2fcedd39324f25a_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from CREATED to SCHEDULED.
    2023-07-13 22:17:40 2023-07-13 12:17:40,428 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Connecting to ResourceManager akka.tcp://flink@localhost:6123/user/rpc/resourcemanager_*(00000000000000000000000000000000)
    2023-07-13 22:17:40 2023-07-13 12:17:40,431 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Resolved ResourceManager address, beginning registration
    2023-07-13 22:17:40 2023-07-13 12:17:40,432 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering job manager 00000000000000000000000000000000@akka.tcp://flink@localhost:6123/user/rpc/jobmanager_2 for job cddea62acc79eb3bfe533bb4e14642f2.
    2023-07-13 22:17:40 2023-07-13 12:17:40,434 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registered job manager 00000000000000000000000000000000@akka.tcp://flink@localhost:6123/user/rpc/jobmanager_2 for job cddea62acc79eb3bfe533bb4e14642f2.
    2023-07-13 22:17:40 2023-07-13 12:17:40,438 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - JobManager successfully registered at ResourceManager, leader id: 00000000000000000000000000000000.
    2023-07-13 22:17:40 2023-07-13 12:17:40,439 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Received resource requirements from job cddea62acc79eb3bfe533bb4e14642f2: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=1}]
    2023-07-13 22:17:40 2023-07-13 12:17:40,474 INFO  org.apache.flink.StreamingJob                                [] - Flink streaming job for Kafka topic 'dummy_topic' started
    Kafka is running fine, as I can see messages coming into that topic. Our docker-compose for Flink is as follows:
    Copy code
    version: "3.3"
    services:
      jobmanager:
        restart: always
    image: deeprecognition.azurecr.io/runtime-flink_jobmanager1:${BUILD_NUMBER}
        container_name: runtime-flink_jobmanager1
        expose:
          - "${JOB_MANAGER_PORT}"
        ports:
          - "${JOB_MANAGER_PORT}:${JOB_MANAGER_PORT}"
        command: jobmanager
        dns:
          - ${ENVIRONMENT_DNS}
        environment:
          - FQDN_SUFFIX=${FQDN_SUFFIX}
          - JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS}
          - JOB_MANAGER_RPC_PORT=${JOB_MANAGER_RPC_PORT}
          - KAFKA_HOST=${KAFKA_HOST}
          - KAFKA_PORT=${KAFKA_PORT}
          - KAFKA_TOPIC=${KAFKA_TOPIC}
          - VM_AGENT_NAME=${VM_AGENT_NAME}
    
      taskmanager:
        restart: always
    image: deeprecognition.azurecr.io/runtime-flink_taskmanager1:${BUILD_NUMBER}
        container_name: runtime-flink_taskmanager1
        depends_on:
          - jobmanager
        expose:
          - "${TASK_MANAGER_PORT}"
        ports:
          - "${TASK_MANAGER_PORT}:${TASK_MANAGER_PORT}"
        command: taskmanager
        dns:
          - ${ENVIRONMENT_DNS}
        environment:
          - JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS}
          - JOB_MANAGER_RPC_PORT=${JOB_MANAGER_RPC_PORT}
          - RESTART_STRATEGY_FAILURE_RATE_DELAY_BETWEEN_ATTEMPTS=30000
          - RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL=60000
          - RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL=3
          - RESTART_STRATEGY=failureRate
          - TASK_MANAGER_MEMORY_PROCESS_SIZE=4096m # TODO: adjust this value according to requirements per environment
          - TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}

    Chris Tabakakis

    07/13/2023, 2:45 PM
    Good evening! I've downgraded Flink to version 1.16.1 in order to use Gelly. I'm running a very simple app, which simply reads a large txt file (from HDFS) containing the edges of a graph, builds the graph, and then outputs the number of triangles in the graph (either by printing to the console, or by writing it to a new file in HDFS). But whenever I try to run the application on Flink, where I have 1 jobmanager and 7 taskmanagers, one of them always fails at the first task, as shown in the screenshot below. The screenshot shows that there was a failure in the third job, and a cancellation in the first, but in fact, after looking at the error log for the taskmanager that failed the third task, the error states that the connection was unexpectedly closed by another taskmanager, which is always the one that didn't complete the first task.
    Copy code
    (7c47ca8b3c53e4646c5f02c6e329a650_6b68f1bbaa75cb9b23c5012ad11acdd0_1_0) switched from RUNNING to FAILED with failure cause: java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at run(TriangleEnumerator.java:72)) -> Combine (Reduce at run(TriangleEnumerator.java:74))' , caused an error: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Connection unexpectedly closed by remote task manager '10.0.1.72/10.0.1.72:45235 [ 10.0.1.72:35991-49e4c4 ] '. This might indicate that the remote task manager was lost.
    I've been struggling to figure out what the cause of this might be. At first I thought it might be a problem with Hadoop, but I tried a simpler program that reads the same data and just writes it to another file in HDFS, and it worked fine. I have the log from the taskmanager that didn't complete the first task, set to loglevel=debug, because I couldn't find anything useful at info level. I'm appending it here. I still think this might be related to Hadoop, because I'm noticing some weird things, such as:
    • At the beginning, it states hadoop version = 2.8.3, which is not the case; my system is running Hadoop 3.3.5, and I don't think I've set the version as an option somewhere. I've also placed the respective jar files in the lib folder, with Hadoop version 3.3.5.
    • At various points, a WARN level message states Could not find Hadoop configuration via any of the supported methods (Flink configuration, environment variables). However, by looking at the source code, I can see that it should only display this message after displaying various debug messages, looking for the Hadoop configuration in my system. This indicates that it can't even find the environment variables that I've set, which is concerning.
    I appreciate any help I can get, and will gladly provide additional info if prompted.

    Ravi Nishant

    07/13/2023, 10:24 PM
    Hey guys, I am trying to move from a standalone deployment to the Flink operator. I am using a custom Flink image where we copy the job jar from
    /opt/lib
    to
    /opt/flink/lib
    . In the
    flinkdeployment
    custom resource I then refer to the job like below:
    Copy code
    job:
        jarURI: local:///opt/flink/lib/myjob-all.jar
        parallelism: 4
        upgradeMode: 'savepoint'
        state: 'running'
    In this case, I see the following exception and the JobManager pod goes to a failed state:
    Copy code
    flink-main-container Exception in thread "main" java.lang.VerifyError: Instruction type does not match stack map
     flink-main-container Exception Details:
     flink-main-container   Location:
     flink-main-container     org/apache/flink/util/FlinkUserCodeClassLoaders.wrapWithSafetyNet(Lorg/apache/flink/util/FlinkUserCodeClassLoader;Z)Lorg/apache/flink/util/MutableURLClassLoader; @20: areturn
     flink-main-container   Reason:
     flink-main-container     Type 'org/apache/flink/util/FlinkUserCodeClassLoader' (current frame, stack[0]) is not assignable to 'org/apache/flink/util/MutableURLClassLoader' (stack map, stack[0])
     flink-main-container   Current Frame:
     flink-main-container     bci: @20
     flink-main-container     flags: { }
     flink-main-container     locals: { 'org/apache/flink/util/FlinkUserCodeClassLoader', integer }
     flink-main-container     stack: { 'org/apache/flink/util/FlinkUserCodeClassLoader' }
     flink-main-container   Stackmap Frame:
     flink-main-container     bci: @20
     flink-main-container     flags: { }
     flink-main-container     locals: { 'org/apache/flink/util/FlinkUserCodeClassLoader', integer }
     flink-main-container     stack: { 'org/apache/flink/util/MutableURLClassLoader' }
     flink-main-container   Bytecode:
     flink-main-container     0000000: 1b99 0012 bb00 1d59 2a2a b600 1eb7 001f
     flink-main-container     0000010: a700 042a b0                           
     flink-main-container   Stackmap Table:
     flink-main-container     same_frame(@19)
     flink-main-container     same_locals_1_stack_item_frame(@20,Object[#84])
     flink-main-container 
     flink-main-container    at org.apache.flink.client.ClientUtils.buildUserCodeClassLoader(ClientUtils.java:63)
     flink-main-container    at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:145)
     flink-main-container    at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:65)
     flink-main-container    at org.apache.flink.client.program.PackagedProgram$Builder.build(PackagedProgram.java:691)
     flink-main-container    at org.apache.flink.client.program.DefaultPackagedProgramRetriever.getPackagedProgram(DefaultPackagedProgramRetriever.java:213)
     flink-main-container    at org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.getPackagedProgram(KubernetesApplicationClusterEntrypoint.java:100)
     flink-main-container    at org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:70)
    However, if I refer to the job jar from
    /opt/flink
    , everything works fine.

    Keyur Makwana

    07/14/2023, 6:25 AM
    Hi team, I am converting a Table to a DataStream. My table has an inner nested object, but when I convert the table to a DataStream it gives me an error. Can anyone help me resolve it?
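    One thing that sometimes helps when a nested object trips up the implicit conversion is giving toDataStream an explicit target type; a sketch with a hypothetical schema:
    Copy code
    // Sketch: explicit target type for the Table-to-DataStream conversion
    // (the schema below is hypothetical).
    DataStream<Row> stream = tableEnv.toDataStream(table,
            DataTypes.ROW(
                    DataTypes.FIELD("id", DataTypes.INT()),
                    DataTypes.FIELD("addr", DataTypes.ROW(
                            DataTypes.FIELD("country", DataTypes.STRING()),
                            DataTypes.FIELD("city", DataTypes.STRING())))));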

    Raghunadh Nittala

    07/14/2023, 6:43 AM
    Hi Team, we're trying to upgrade the Flink version to 1.17.1 from 1.6.2. We have developed our Flink pipelines in Kotlin and have Koin modules to inject the required dependencies. Now we're facing an AssertionError in the Calcite metadata with table planner 1.17.1_2.12. We tried excluding Calcite from JaCoCo, but that didn't work. Can someone please help with this?
    java.lang.AssertionError
    at org.apache.calcite.rel.metadata.MetadataDef.<init>(MetadataDef.java:48)
    at org.apache.calcite.rel.metadata.MetadataDef.of(MetadataDef.java:64)
    at org.apache.calcite.rel.metadata.BuiltInMetadata$PercentageOriginalRows.<clinit>(BuiltInMetadata.java:345)
    at org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows$RelMdPercentageOriginalRowsHandler.getDef(RelMdPercentageOriginalRows.java:231)
    at org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider.reflectiveSource(ReflectiveRelMetadataProvider.java:134)
    at org.apache.calcite.rel.metadata.RelMdPercentageOriginalRows.<clinit>(RelMdPercentageOriginalRows.java:42)
    at org.apache.calcite.rel.metadata.DefaultRelMetadataProvider.<init>(DefaultRelMetadataProvider.java:42)
    at org.apache.calcite.rel.metadata.DefaultRelMetadataProvider.<clinit>(DefaultRelMetadataProvider.java:28)
    at org.apache.calcite.plan.RelOptCluster.<init>(RelOptCluster.java:97)
    at org.apache.calcite.plan.RelOptCluster.create(RelOptCluster.java:106)
    at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$.create(FlinkRelOptClusterFactory.scala:36)
    at org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory.create(FlinkRelOptClusterFactory.scala)
    at org.apache.flink.table.planner.delegation.PlannerContext.<init>(PlannerContext.java:132)
    at org.apache.flink.table.planner.delegation.PlannerBase.<init>(PlannerBase.scala:121)
    at org.apache.flink.table.planner.delegation.StreamPlanner.<init>(StreamPlanner.scala:65)
    at org.apache.flink.table.planner.delegation.DefaultPlannerFactory.create(DefaultPlannerFactory.java:65)
    at org.apache.flink.table.factories.PlannerFactoryUtil.createPlanner(PlannerFactoryUtil.java:58)
    at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.create(StreamTableEnvironmentImpl.java:127)
    at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:122)
    at org.apache.flink.table.api.bridge.java.StreamTableEnvironment.create(StreamTableEnvironment.java:94)

    jaiprasad

    07/14/2023, 11:57 AM
    Hello All, we have two Flink pipelines: 1) 1 jobmanager and 5 taskmanagers, and 2) 3 jobmanagers and 5 taskmanagers. We are using flink:1.14.4-scala_2.12-java11 and the Lyft flinkk8soperator. We have deployed Flink on a multi-host k8s cluster. In both of the above pipelines we see the taskmanagers restarting with the below exceptions:
    Copy code
    2023-07-14 10:56:56.204 [flink-akka.actor.default-dispatcher-17] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Source: Custom Source -> Map (7/15) (73095e8cac2266887f4b7a0fc109c199) switched from DEPLOYING to FAILED on 10.233.118.126:43710-2c7cb4 @ 10.233.118.126 (dataPort=34133).
    org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException: Could not send message [RemoteRpcInvocation(null.submitTask(TaskDeploymentDescriptor, JobMasterId, Time))] from sender [Actor[akka://flink/temp/taskmanager_0$+rod]] to recipient [Actor[akka.tcp://flink@10.233.118.126:43710/user/rpc/taskmanager_0#-1148004411]], because the recipient is unreachable. This can either mean that the recipient has been terminated or that the remote RpcService is currently not reachable.
            at org.apache.flink.runtime.rpc.akka.DeadLettersActor.handleDeadLetter(DeadLettersActor.java:61)
            at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
            at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
            at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
            at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
            at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
            at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
            at akka.actor.Actor.aroundReceive(Actor.scala:537)
            at akka.actor.Actor.aroundReceive$(Actor.scala:535)
            at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
            at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
            at akka.actor.ActorCell.invoke(ActorCell.scala:548)
            at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
            at akka.dispatch.Mailbox.run(Mailbox.scala:231)
            at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
            at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
            at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
            at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
            at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
            at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)

    Rajat Ahuja

    07/14/2023, 2:11 PM
    Hi Flink experts, I have been facing this issue for quite some days, as I am not able to submit/run SQL queries using the REST client or the Flink SQL client via the SQL gateway. Here's my setup: a session cluster set up via the k8s operator, plus SQL gateway service and ingress files. I am able to run `./flink run examples/abc.jar` from my local host, the JobManager pod, and the SQL gateway pod; however, when I try to use the REST client / Flink SQL client to submit SQL queries via the SQL gateway, it gives the following error. Not really sure how to solve this, as I have tried increasing the configurations mentioned in the error (rest.client.max-content-length, rest.server.max-content-length) everywhere, but no luck. I would really appreciate it if someone could give pointers to debug further, as I have run out of ideas.
    Copy code
    Caused by: org.apache.flink.runtime.rest.util.RestClientException: 413 Request Entity Too Large. Try to raise [rest.client.max-content-length]
    	at org.apache.flink.runtime.rest.RestClient$ClientHandler.channelRead0(RestClient.java:615) ~[flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) ~[flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286) ~[flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[flink-dist-1.17.0.jar:1.17.0]
    	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[flink-dist-1.17.0.jar:1.17.0]
    Copy code
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: session-seven-deployment-only-example
    spec:
      image: flink:1.17
      flinkVersion: v1_17
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: "10"
        rest.server.max-content-length: "1209715200"
        rest.client.max-content-length: "1209715200"
      serviceAccount: flink
      jobManager:
        resource:
          memory: "2048m"
          cpu: 1
      taskManager:
        resource:
          memory: "2048m"
          cpu: 1
    apiVersion: networking.k8s.io/v1
    kind: Ingress
    metadata:
      annotations:
        kubernetes.io/ingress.class: k8-cps-dev
        nginx.ingress.kubernetes.io/client-body-buffer-size: 500M
      name: my-docker-app-ingress-3
    spec:
      rules:
      - host: flink-sql-gateway.usb.cloud.bank-dns.com
        http:
          paths:
          - backend:
              service:
                name: flink-seven-sql-gateway-svc
                port:
                  number: 8086
            path: /
            pathType: Prefix
    Gateway
    Copy code
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-seven-sql-gateway
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: flink-seven-sql-gateway
      template:
        metadata:
          labels:
            app: flink-seven-sql-gateway
        spec:
          containers:
            - name: flink-seven-sql-gateway
              image: flink:1.17.0
              ports:
                - containerPort: 8086
              command: ["/bin/sh", "-c"]
              args:
                - "/opt/flink/bin/sql-gateway.sh start-foreground -Dsql-gateway.endpoint.rest.address=localhost -Dsql-gateway.endpoint.rest.port=8086"
              volumeMounts:
                - name: flink-conf
                  mountPath: /opt/flink/conf
          volumes:
            - name: flink-conf
              configMap:
                name: flink-conf
    Gateway svc
    Copy code
    apiVersion: v1
    kind: Service
    metadata:
      name: flink-seven-sql-gateway-svc
    spec:
      selector:
        app: flink-seven-sql-gateway
      ports:
        - name: flink-seven-sql-gateway
          port: 8086
          targetPort: 8086
      type: ClusterIP
    Ingresses
    Copy code
    apiVersion: networking.k8s.io/v1
    kind: Ingress
    metadata:
      annotations:
        kubernetes.io/ingress.class: k8-cps-dev
        nginx.ingress.kubernetes.io/client-body-buffer-size: 500M
      name: ingress-3
    spec:
      rules:
      - host: flink-sql-gateway.usb.cloud.bank-dns.com
        http:
          paths:
          - backend:
              service:
                name: flink-seven-sql-gateway-svc
                port:
                  number: 8086
            path: /
            pathType: Prefix
    apiVersion: networking.k8s.io/v1
    kind: Ingress
    metadata:
      annotations:
        kubernetes.io/ingress.class: k8-cps-dev
        nginx.ingress.kubernetes.io/client-body-buffer-size: 500M
      name: ingress-2
    spec:
      rules:
      - host: flink-prod-dns.usb.cloud.bank-dns.com
        http:
          paths:
          - backend:
              service:
                name: session-seven-deployment-only-example-rest
                port:
                  number: 8081
            path: /
            pathType: Prefix
    Updated flink conf used as configmap
    Copy code
    rest.address: flink-prod-dns.usb.cloud.bank-dns.com
    rest.port: 80

    jobmanager.bind-host: flink-prod-dns.usb.cloud.bank-dns.com

    rest.server.max-content-length: 1409715200
    rest.client.max-content-length: 1409715200

    Sylvia Lin

    07/14/2023, 5:28 PM
    Hi team, we observed there's a lot of network communication between task managers for a stateful streaming pipeline, especially with large state. For a stateless streaming service, do we expect a lot of network communication between task managers as well?
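    Network traffic between task managers comes from the data exchanges the topology requires (keyBy, rebalance, broadcast), not from statefulness as such, so a stateless job can still shuffle heavily; a sketch of the distinction:
    Copy code
    // Sketch: chained operators hand records over locally inside one task slot;
    // keyBy forces a network shuffle between task managers even in a stateless job.
    DataStream<String> upper = source.map(String::toUpperCase); // chains with the source
    upper.keyBy(String::length)                                 // network shuffle
         .map(s -> s)
         .sinkTo(sink);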

    Daniel Packard

    07/14/2023, 6:21 PM
    Hi all - I have a Flink job consuming messages from a Kafka source. The Kafka source is configured with a consumer group id, and the Flink job is actively consuming/processing messages. However, in kafka-ui and using the
    kafka-consumer-groups.sh
    CLI, I'm getting reports of 0 active consumers 🤔 Is this something I should expect in my metrics/reporting?
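    This is expected with the KafkaSource: it assigns partitions itself rather than joining Kafka's consumer-group protocol, so group tooling shows no active members. Offsets can still be committed on checkpoints so the group's lag stays visible; a sketch (servers and names are placeholders):
    Copy code
    // Sketch: commit offsets back to the group on checkpoints so external
    // tooling can track lag (values are placeholders).
    KafkaSource<String> source = KafkaSource.<String>builder()
            .setBootstrapServers("broker:9092")
            .setTopics("my-topic")
            .setGroupId("my-group")
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .setProperty("commit.offsets.on.checkpoint", "true")
            .build();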

    Dave Sugden

    07/14/2023, 6:34 PM
    Hi! Flink
    1.17.1
    , using SQL, we have issues completing checkpoints. We've tried enabling unaligned checkpoints and not, and it's not an issue with timing out; the checkpointing failure happens before the specified checkpoint timeout. The weird/interesting thing is, even with unaligned checkpoints, we have several operators (JOIN) that are back-pressured but never even acknowledge the checkpoint... Has anyone witnessed anything similar? Thanks!
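    For reference, a sketch of the checkpointing knobs being discussed, applied through the table configuration (all values are placeholders, not recommendations):
    Copy code
    // Sketch: checkpoint settings under discussion (values are placeholders).
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    tEnv.getConfig().set("execution.checkpointing.interval", "1min");
    tEnv.getConfig().set("execution.checkpointing.timeout", "10min");
    tEnv.getConfig().set("execution.checkpointing.unaligned", "true");
    tEnv.getConfig().set("execution.checkpointing.aligned-checkpoint-timeout", "30s");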