# troubleshooting
  • d

    Duc Anh Khu

    08/15/2022, 10:10 PM
    hi all, I'm running into a very odd issue with PyFlink and wondering if anyone has seen it before. This is running locally in session mode against a dockerised
    flink:1.13.2-scala_2.12-java11
    cluster. Thank you 🙏 .
    Copy code
    env.add_source(kafka_consumer).print()
    This works fine until I add a
    .map
    such as:
    Copy code
    env.add_source(kafka_consumer).map(lambda a: a).print()
    The error I'm getting is:
    Copy code
    Caused by: java.io.IOException: Cannot run program "python": error=2, No such file or directory
    	at java.base/java.lang.ProcessBuilder.start(Unknown Source)
    	at java.base/java.lang.ProcessBuilder.start(Unknown Source)
    	at org.apache.flink.python.util.PythonEnvironmentManagerUtils.execute(PythonEnvironmentManagerUtils.java:193)
    	at org.apache.flink.python.util.PythonEnvironmentManagerUtils.getPythonUdfRunnerScript(PythonEnvironmentManagerUtils.java:154)
    	at org.apache.flink.python.env.beam.ProcessPythonEnvironmentManager.createEnvironment(ProcessPythonEnvironmentManager.java:177)
    	at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createPythonExecutionEnvironment(BeamPythonFunctionRunner.java:353)
    	at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.lambda$open$0(BeamPythonFunctionRunner.java:261)
    	at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$5(MemoryManager.java:539)
    	at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:126)
    	at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
    	at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:555)
    	at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:264)
    	at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:121)
    	at org.apache.flink.streaming.api.operators.python.OneInputPythonFunctionOperator.open(OneInputPythonFunctionOperator.java:108)
    	at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
    	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    	at java.base/java.lang.Thread.run(Unknown Source)
    Caused by: java.io.IOException: error=2, No such file or directory
    	at java.base/java.lang.ProcessImpl.forkAndExec(Native Method)
    	at java.base/java.lang.ProcessImpl.<init>(Unknown Source)
    	at java.base/java.lang.ProcessImpl.start(Unknown Source)
    	... 23 more
    The command that I'm running is:
    Copy code
    flink run --python apps/app_name.py --jobmanager localhost:28081
    x
    • 2
    • 6
  • d

    Donatien Schmitz

    08/16/2022, 8:12 AM
    [Fine-Grained Resource Management] Hey everyone. When defining SlotSharingGroups at the user level, the fixed number of task slots per TM is incompatible with fine-grained resource management. E.g. I have a TM with two task slots and my job is composed of 3 operators, each in its own SSG. The scheduler won't find the required resources even though the total ResourceProfile specification fits on this TM. My question is: is there a way to gracefully ignore the number of task slots available per TM in this specific case, at the scheduler level?
  • b

    Bumblebee

    08/16/2022, 12:53 PM
    Has anybody tested the Flink Kubernetes operator 1.1.0 on k3s? I tried to install it on Rancher Desktop, which uses k3s. This is the error I get in the logs of flink-kubernetes:
    at io.javaoperatorsdk.operator.Operator.start(Operator.java:100)
    at org.apache.flink.kubernetes.operator.FlinkOperator.run(FlinkOperator.java:182)
    at org.apache.flink.kubernetes.operator.FlinkOperator.main(FlinkOperator.java:187)
    2022-08-16 12:50:22,257 i.j.o.Operator [INFO ] Operator SDK 3.0.3 is shutting down...
    2022-08-16 12:50:22,258 i.j.o.p.e.s.i.InformerManager [INFO ] Stopping informer io.javaoperatorsdk.operator.processing.event.source.informer.InformerManager@507b79f7 -> io.javaoperatorsdk.operator.processing.event.source.informer.InformerWrapper@64a9d48c
    2022-08-16 12:50:22,258 i.j.o.p.e.s.i.InformerManager [INFO ] Stopping informer io.javaoperatorsdk.operator.processing.event.source.informer.InformerManager@1616022c -> io.javaoperatorsdk.operator.processing.event.source.informer.InformerWrapper@77a171b6
    Exception in thread "main" io.javaoperatorsdk.operator.MissingCRDException: 'flinkdeployments.flink.apache.org' v1 CRD was not found on the cluster, controller 'flinkdeploymentcontroller' cannot be registered
    	at io.javaoperatorsdk.operator.processing.Controller.throwMissingCRDException(Controller.java:337)
    	at io.javaoperatorsdk.operator.processing.Controller.start(Controller.java:309)
    	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(Unknown Source)
    	at java.base/java.util.HashMap$ValueSpliterator.forEachRemaining(Unknown Source)
    	at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
    	at java.base/java.util.stream.ForEachOps$ForEachTask.compute(Unknown Source)
    	at java.base/java.util.concurrent.CountedCompleter.exec(Unknown Source)
    	at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
    	at java.base/java.util.concurrent.ForkJoinTask.doInvoke(Unknown Source)
    	at java.base/java.util.concurrent.ForkJoinTask.invoke(Unknown Source)
    	at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateParallel(Unknown Source)
    	at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(Unknown Source)
    	at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
    	at java.base/java.util.stream.ReferencePipeline.forEach(Unknown Source)
    	at java.base/java.util.stream.ReferencePipeline$Head.forEach(Unknown Source)
    	at io.javaoperatorsdk.operator.Operator$ControllerManager.start(Operator.java:219)
    	at io.javaoperatorsdk.operator.Operator.start(Operator.java:100)
    	at org.apache.flink.kubernetes.operator.FlinkOperator.run(FlinkOperator.java:182)
    	at org.apache.flink.kubernetes.operator.FlinkOperator.main(FlinkOperator.java:187)
    g
    • 2
    • 16
  • h

    Hunter Medney

    08/16/2022, 1:24 PM
    Hi all, I have several SQL jobs that I'm planning to run off the same Kafka topic - each performing queries and aggregations from messages in the topic. The number of jobs could get into the 100s - some will be simple, others complex. They will all sink into summary messages in another topic. What's the most efficient way to structure these in Kubernetes? Options I see:
    • Each SQL job is its own Flink Deployment / mini-cluster
    • Each SQL job runs in a large Flink cluster
    • One big job that branches for each query (1 source -> 100+ SQL operators)
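    If the third option (one big job) ends up being the right trade-off, one way to structure it is a single StatementSet, so that all of the SQL statements compile into one job graph and the planner can usually share the Kafka scan between them. A minimal sketch, assuming a JSON Kafka source and an upsert-kafka summary sink; all table names, topics, brokers and queries below are placeholders:
    Copy code
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.StatementSet;
    import org.apache.flink.table.api.TableEnvironment;

    public class MultiQueryJob {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

            // Shared Kafka source table (columns, topic and broker are made up).
            tEnv.executeSql(
                    "CREATE TABLE events (device_id STRING, payload STRING) WITH ("
                            + " 'connector' = 'kafka', 'topic' = 'events',"
                            + " 'properties.bootstrap.servers' = 'broker:9092',"
                            + " 'format' = 'json', 'scan.startup.mode' = 'latest-offset')");

            // One summary sink per query; upsert-kafka because the aggregations are updating.
            tEnv.executeSql(
                    "CREATE TABLE device_counts (device_id STRING, cnt BIGINT,"
                            + " PRIMARY KEY (device_id) NOT ENFORCED) WITH ("
                            + " 'connector' = 'upsert-kafka', 'topic' = 'device-counts',"
                            + " 'properties.bootstrap.servers' = 'broker:9092',"
                            + " 'key.format' = 'json', 'value.format' = 'json')");

            // Every SQL job becomes one INSERT statement in the same StatementSet.
            StatementSet set = tEnv.createStatementSet();
            set.addInsertSql(
                    "INSERT INTO device_counts SELECT device_id, COUNT(*) FROM events GROUP BY device_id");
            // set.addInsertSql(...)  -- repeat for each further query, each with its own sink table
            set.execute();
        }
    }
    The per-job Deployment option gives the cleanest isolation and independent upgrades at the cost of 100+ JobManagers; the StatementSet approach shares resources but couples the failure and upgrade domain of all queries.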
  • p

    Pedro Cunha

    08/16/2022, 4:32 PM
    Hello everyone. I have a Flink job running on VVP on EKS. I'm using RocksDB and saving state and savepoints in S3. I want to download one of the savepoints into my local environment and start my Flink job with that savepoint, but I've run into several issues. 1. Flink can't read savepoints from S3 out of the box; it needs the plugin
    s3-fs-presto
    copied to the
    plugins
    folder in order to read paths that have
    s3
    on them. 2. After adding the plugin, I was expecting Flink to read the savepoint without problems, but to my surprise, instead of using the data that's in the savepoint I downloaded, it still tries to connect to AWS… I don't want that; I want to be able to read the savepoint locally. Is there any way to work around point 2 other than configuring access to AWS?
    c
    • 2
    • 25
  • k

    karthik

    08/16/2022, 4:38 PM
    Hi everyone. I'm looking for ways to create a streaming pipeline from Kafka to S3 in Iceberg format. I'm trying to do this with Flink SQL, and was wondering if there is any way to automatically detect schema changes. All the examples I come across define the source schema statically, like this. Is there a simple way to achieve what I'm trying to do?
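    A rough sketch of the static (non schema-evolving) version of that pipeline, assuming the iceberg-flink runtime jar plus S3/Hadoop filesystem dependencies are on the classpath; the catalog name, warehouse path, topic and columns are all placeholders:
    Copy code
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class KafkaToIceberg {
        public static void main(String[] args) throws Exception {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

            // Statically declared Kafka source.
            tEnv.executeSql(
                    "CREATE TABLE kafka_events (id STRING, payload STRING) WITH ("
                            + " 'connector' = 'kafka', 'topic' = 'events',"
                            + " 'properties.bootstrap.servers' = 'broker:9092',"
                            + " 'format' = 'json', 'scan.startup.mode' = 'earliest-offset')");

            // Hadoop-style Iceberg catalog with an S3 warehouse.
            tEnv.executeSql(
                    "CREATE CATALOG iceberg WITH ("
                            + " 'type' = 'iceberg', 'catalog-type' = 'hadoop',"
                            + " 'warehouse' = 's3a://my-bucket/warehouse')");
            tEnv.executeSql("CREATE DATABASE IF NOT EXISTS iceberg.db");
            tEnv.executeSql(
                    "CREATE TABLE IF NOT EXISTS iceberg.db.events_sink (id STRING, payload STRING)");

            // Continuous INSERT from the Kafka table into the Iceberg table.
            tEnv.executeSql(
                    "INSERT INTO iceberg.db.events_sink"
                            + " SELECT id, payload FROM default_catalog.default_database.kafka_events")
                    .await();
        }
    }
    Two caveats: Iceberg only commits data files on Flink checkpoints, so checkpointing has to be enabled for writes to become visible, and as far as I know Flink SQL has no automatic schema-change detection, so evolving the schema still means updating both the DDL and the Iceberg table.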
    • 1
    • 1
  • i

    iLem0n

    08/16/2022, 9:41 PM
    Hello everyone, am I right that you can have only a side output or state, and not both, in a UDF? I need both, since I have a use case where two input streams generate two distinct output streams, and one of them can only be kept consistent with state. I was expecting something like a
    RichCoProcessFunction
    . Is there anything like that?
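    For reference, a minimal sketch of a rich two-input function that combines keyed state with a side output, using a KeyedCoProcessFunction (class, tag and state names below are made up):
    Copy code
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    public class TwoOutputFunction extends KeyedCoProcessFunction<String, String, String, String> {

        // Side output for the second result stream; the anonymous subclass keeps the type info.
        public static final OutputTag<String> SIDE = new OutputTag<String>("side-output") {};

        private transient ValueState<Long> counter;

        @Override
        public void open(Configuration parameters) {
            // Keyed state works here because KeyedCoProcessFunction is a rich function.
            counter = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("counter", Types.LONG));
        }

        @Override
        public void processElement1(String value, Context ctx, Collector<String> out) throws Exception {
            Long current = counter.value();
            counter.update(current == null ? 1L : current + 1);
            out.collect(value); // first (main) output stream
        }

        @Override
        public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
            ctx.output(SIDE, value); // second output stream via the side output
        }
    }
    It would be applied with stream1.connect(stream2).keyBy(...).process(new TwoOutputFunction()), and the second stream is then read from the returned operator with getSideOutput(TwoOutputFunction.SIDE).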
    d
    • 2
    • 2
  • k

    Krish Narukulla

    08/16/2022, 10:40 PM
    Is it possible to read
    avro
    message format dynamically without specifying a schema class? Something like below:
    Copy code
    CREATE TEMPORARY TABLE XXX (
              `total_time_playing` BIGINT,
              `device_id` STRING,
              `channel_id` STRING,
              `occurrence` BIGINT
            ) WITH (
              'connector' = 'kafka',
              'topic' = 'S000011',
              'properties.bootstrap.servers' = '<brokers>',
              'properties.group.id' = 'flinkapp-test',
              'scan.startup.mode' = 'latest-offset',
              'format' = 'avro'
            )
  • z

    Z Mario

    08/17/2022, 12:57 AM
    Hi team, I installed the Flink operator in an on-premise K8s cluster but I am continuously getting several lines of this error message from both the manager and kube-rbac-proxy containers: http: TLS handshake error from [IPv6]:<port>: remote error: tls: bad certificate (I've omitted the IPv6 address and port number for security reasons). I checked for the existence of webhook-server-cert and found that it is created where it should be, and all other resources were created as well. I also have cert-manager installed in the cluster. This error leads to failure when creating a jobmanager resource. What is the likely issue?
    g
    j
    • 3
    • 9
  • h

    Hilmi Al Fatih

    08/17/2022, 7:09 AM
    Hi, I am currently doing a simple Kafka-to-Kafka pipeline with Flink, where the requirement is the ability to add/delete consumed topics in the source by restarting from a savepoint. I am currently using Flink 1.14.4. From my test, I observed the following: 1. Adding a new topic works fine. 2. Removing a topic does not, i.e. a topic that has been deleted from the subscribed list is still consumed (I saw the message "Discovered removed partitions" though). Then I checked the source code, and I found this line in here.
    // TODO: Handle removed partitions.
    When I check the master branch, it seems that the TODO is still there, so I am wondering whether there is any plan to resolve this soon, or is it already implemented?
    • 1
    • 1
  • t

    Tiansu Yu

    08/17/2022, 12:04 PM
    Hi, I am quite new to Flink. I want to ask whether it is possible to configure logging for a Flink app runtime started from a Maven project. In particular, the Flink application will be started by
    mvn compile exec:java -D...
    . I noticed a naive standalone cluster started this way does not pick up
    src/main/resources/log4j.properties
    , which basically swallows all my `logger.info()` calls and whatnot. Is there any way I can configure logging for a cluster started this way?
    ✅ 1
    c
    • 2
    • 13
  • a

    Alexis Josephides (Contractor)

    08/17/2022, 1:52 PM
    👋 I'm looking for a bit of guidance here and hope the community can help. I am using Kinesis Data Analytics (KDA), which is Flink 1.13.2 under the hood. I have a simple pipeline with no transformations: 2 different Kafka topics as sources sinking to S3. They are set up as 2 different Kafka sources because the deserialisation is different for each. One of the topics has 4 partitions and the other has 40. I've noticed that when the parallelism of the job is above 4, the whole Flink job ceases to sink to S3. The reason, as far as I can tell, is that the Kafka topic with 4 partitions is assigned more than 4 tasks, which results in idle tasks that report as
    FINISHED
    , thus stopping checkpoints from working across the application. The error I see in the logs is:
    Copy code
    Failed to trigger checkpoint for job ba29c0e38410cd31b3a9870b7d384807 since some tasks of job ba29c0e38410cd31b3a9870b7d384807 has been finished, abort the checkpoint Failure reason: Not all required tasks are currently running."
    I (maybe rather naively) thought that I could do a
    rebalance()
    after the kafka source is added but the issue persists as it breaks the chaining. There is a single StreamExecutionEnvironment and so code is something like:
    Copy code
    var env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.fromSource(topic1.getKafkaSource(),etc...).rebalance()
    env.fromSource(topic2.getKafkaSource(),etc...).rebalance()
    env.addSink(s3Sink1)
    env.addSink(s3Sink2)
    I'm wondering whether I can set the parallelism on each of these steps per operator, or whether it will apply across the whole datastream. The load on the topic with 40 partitions cannot be handled if the parallelism is set to 4 across the whole stream. I hope this makes sense; thanks in advance for any insight.
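    A minimal sketch of per-operator parallelism, reusing the placeholder names from the snippet above: the parallelism can be pinned per operator, so the 4-partition source stays at 4 (no idle/FINISHED subtasks) while everything downstream runs wider. As far as I remember, checkpointing with finished tasks only arrived in Flink 1.14, so on 1.13/KDA matching the source parallelism to the partition count is the usual workaround:
    Copy code
    var env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Source parallelism pinned to the topic's partition count so no subtask goes idle.
    var smallTopic = env
            .fromSource(topic1.getKafkaSource(), WatermarkStrategy.noWatermarks(), "topic1-source")
            .setParallelism(4);

    var largeTopic = env
            .fromSource(topic2.getKafkaSource(), WatermarkStrategy.noWatermarks(), "topic2-source")
            .setParallelism(40);

    // Downstream operators and sinks can run wider; rebalance spreads the 4 source subtasks' output.
    smallTopic.rebalance().addSink(s3Sink1).setParallelism(16);
    largeTopic.addSink(s3Sink2).setParallelism(40);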
    c
    h
    • 3
    • 18
  • d

    David Wisecup

    08/17/2022, 2:24 PM
    Hi everyone. Quick question: can you use the Table API to do a
    GROUP BY CUBE (...)
    ? I haven't figured out how to do that so far, so I had to write the SQL as a string and then execute it via
    Table rows = tableEnv.sqlQuery(sqlStr);
    Thanks for any help.
  • a

    Aeden Jameson

    08/17/2022, 3:51 PM
    After upgrading from 1.13.5 to 1.15.1, while maintaining the use of the deprecated classes FlinkConsumer and FlinkProducer, I observed that task manager metrics collected by our Prometheus reporter stopped working. From reading other threads in this channel on this same issue, it appears this is now a known bug. My questions are:
    • What's the link to the issue?
    • Is there a workaround other than migrating to KafkaSource and KafkaSink?
    • Will this be addressed in 1.15.2, and when is that anticipated to be released?
    c
    • 2
    • 2
  • k

    Krish Narukulla

    08/17/2022, 6:10 PM
    How can I supply an Avro schema to the Kafka connector?
    Copy code
    CREATE TEMPORARY TABLE XXX (
              `total_time_playing` BIGINT,
              `device_id` STRING,
              `channel_id` STRING,
              `occurrence` BIGINT
            ) WITH (
              'connector' = 'kafka',
              'topic' = 'S000011',
              'properties.bootstrap.servers' = '<brokers>',
              'properties.group.id' = 'flinkapp-test',
              'scan.startup.mode' = 'latest-offset',
              'format' = 'avro'
            )
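    For what it's worth, with 'format' = 'avro' the Avro schema is derived from the declared columns, so no generated schema class is needed. If the records were written with Confluent Schema Registry, the 'avro-confluent' format can fetch the writer schema from the registry instead. A sketch, assuming the flink-sql-avro-confluent-registry jar is on the classpath; the registry URL is a placeholder, and the exact option key depends on the Flink version ('avro-confluent.url' in newer releases, 'avro-confluent.schema-registry.url' in older ones):
    Copy code
    // tableEnv is the TableEnvironment already in use; everything else mirrors the DDL above.
    tableEnv.executeSql(
            "CREATE TEMPORARY TABLE XXX ("
                    + " `total_time_playing` BIGINT,"
                    + " `device_id` STRING,"
                    + " `channel_id` STRING,"
                    + " `occurrence` BIGINT"
                    + ") WITH ("
                    + " 'connector' = 'kafka',"
                    + " 'topic' = 'S000011',"
                    + " 'properties.bootstrap.servers' = '<brokers>',"
                    + " 'properties.group.id' = 'flinkapp-test',"
                    + " 'scan.startup.mode' = 'latest-offset',"
                    + " 'format' = 'avro-confluent',"
                    // On older Flink versions the key is 'avro-confluent.schema-registry.url'.
                    + " 'avro-confluent.url' = 'http://schema-registry:8081')");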
    s
    j
    • 3
    • 8
  • d

    David Christle

    08/18/2022, 1:07 AM
    Are there any users here who consume events via the PubSub connector? I'm also curious whether anyone has gotten a PubSub Lite source working. Nominally, PubSubLite has some shims from Google designed to make the Kafka Java library work with PubSub Lite. I'm wondering if it's possible to get the shims working with the Flink Kafka connector.
  • s

    Sandeep Kathula

    08/18/2022, 1:08 AM
    In a Flink application, I am trying to read from Kafka, do some transformations, and write back to Kafka. I want to read the headers as well and forward them to the topic I write to. I am trying to use:
    Copy code
    KafkaSource.<ConsumerRecord>builder()
                    .setProperties(consumerProperties)
                    .setStartingOffsets(offsetStrategy(readConfig))
                    .setTopics(topic)
                    .setDeserializer(
                            new KafkaRecordDeserializationSchema<ConsumerRecord>() {
                                @Override
                                public TypeInformation<ConsumerRecord> getProducedType() {
                                    return TypeInformation.of(ConsumerRecord.class);
                                }
    
                                @Override
                                public void open(DeserializationSchema.InitializationContext context)
                                        throws Exception {}
    
                                @Override
                                public void deserialize(
                                        ConsumerRecord<byte[], byte[]> consumerRecord,
                                        Collector<ConsumerRecord> collector)
                                        throws IOException {
                                    collector.collect(consumerRecord);
                                }
                            })
                    .build();
        }
    but because the consumer record contains Headers, it's using Kryo as the serializer and I am seeing very low throughput. If I drop the headers during deserialization, I see almost 9x the throughput. Can someone help me figure out how to deserialize the headers efficiently without using Kryo?
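    A minimal sketch of one way around Kryo (the class name and field layout are made up): emit a plain Tuple3 with an explicit TypeInformation instead of the raw ConsumerRecord, copying the headers into a Map, so Flink stays on its tuple/map serializers:
    Copy code
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
    import org.apache.flink.util.Collector;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.header.Header;

    public class HeaderAwareDeserializer
            implements KafkaRecordDeserializationSchema<Tuple3<byte[], byte[], Map<String, byte[]>>> {

        @Override
        public void deserialize(
                ConsumerRecord<byte[], byte[]> consumerRecord,
                Collector<Tuple3<byte[], byte[], Map<String, byte[]>>> out) throws IOException {
            // Copy headers into a plain Map so no Kafka classes end up in the stream type.
            Map<String, byte[]> headers = new HashMap<>();
            for (Header header : consumerRecord.headers()) {
                headers.put(header.key(), header.value());
            }
            out.collect(Tuple3.of(consumerRecord.key(), consumerRecord.value(), headers));
        }

        @Override
        public TypeInformation<Tuple3<byte[], byte[], Map<String, byte[]>>> getProducedType() {
            // Explicit type info keeps Flink on the tuple/map serializers instead of Kryo.
            return Types.TUPLE(
                    Types.PRIMITIVE_ARRAY(Types.BYTE),
                    Types.PRIMITIVE_ARRAY(Types.BYTE),
                    Types.MAP(Types.STRING, Types.PRIMITIVE_ARRAY(Types.BYTE)));
        }
    }
    Whether this recovers the full 9x depends on where the time actually goes, but it at least keeps Kafka's Headers class out of the stream type, which is what appears to force the Kryo fallback.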
    d
    s
    • 3
    • 3
  • d

    Duc Anh Khu

    08/18/2022, 8:52 AM
    hi folks, I'm writing a
    CoFlatMapFunction
    with value states and trying to write a unit test for it. However, the
    open
    and
    flat_map
    methods are never called, only
    __init__
    is being called:
    Copy code
    class MyCoFlatMap(CoFlatMapFunction):
        def __init__(self):
            print("!! __init__ called")
            self.user_id_state = None
            self.events_state = None
    
        def open(self, runtime_context: RuntimeContext):
            print("!! open called")
            user_id_stt_desc = ValueStateDescriptor(
                "user_id",
                Types.PICKLED_BYTE_ARRAY()
            )
            events_stt_desc = ListStateDescriptor(
                "events",
                Types.PICKLED_BYTE_ARRAY()
            )
            self.user_id_state = runtime_context.get_state(user_id_stt_desc)
            self.events_state = runtime_context.get_list_state(events_stt_desc)
    
        def flat_map1(self, value):
            print("!! flat_map1 called")
            yield value
    
        def flat_map2(self, value):
            print("!! flat_map2 called")
            yield value
    
    def test_co_flat_map_with_states(self):
        self.env.add_python_file("../../../apps")
        self.env.set_state_backend(MemoryStateBackend())
        ds1 = self.env.from_collection(
            [signed_in_event],
            type_info=self.type_info
        )
    
        ds2 = self.env.from_collection([], type_info=self.type_info)
        ds1.connect(ds2).key_by(
            key_selector,
            key_selector,
            key_type=Types.STRING()
        ).flat_map(MyCoFlatMap(), output_type=self.type_info).add_sink(self.test_sink)
        self.env.execute()
        results = self.test_sink.get_results(False)
        expected = [j_signed_in_event]
        self.assert_equals_sorted(expected, results)
    I'm using test util from Dian Fu's repo. Any suggestions would be much appreciated. Cheers
    x
    • 2
    • 11
  • j

    Jaya Ananthram

    08/18/2022, 11:59 AM
    Hello 👋: Has anyone tried a BigQuery sink connector with Flink? I see an open JIRA for this, but I'm interested to know whether anyone has written something.
    👍 1
    s
    • 2
    • 2
  • j

    Jirawech Siwawut

    08/18/2022, 2:11 PM
    Hello 👋. Has anyone here tried implementing a Redis sink using Redis pipelining? I need to sink data to a different zone and it is quite slow.
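    A rough sketch of a pipelined Redis sink, assuming the Jedis client is on the classpath; the host, batch size, and Tuple2 key/value layout are placeholders. Commands are buffered locally and flushed in a single round trip, which mostly helps when latency to the remote zone dominates:
    Copy code
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.Pipeline;

    public class PipelinedRedisSink extends RichSinkFunction<Tuple2<String, String>> {

        private static final int BATCH_SIZE = 500;

        private transient Jedis jedis;
        private transient Pipeline pipeline;
        private transient int buffered;

        @Override
        public void open(Configuration parameters) {
            jedis = new Jedis("redis-host", 6379);
            pipeline = jedis.pipelined();
            buffered = 0;
        }

        @Override
        public void invoke(Tuple2<String, String> value, Context context) {
            pipeline.set(value.f0, value.f1); // queue the command locally
            if (++buffered >= BATCH_SIZE) {
                pipeline.sync();              // one round trip for the whole batch
                pipeline = jedis.pipelined(); // start a fresh pipeline for the next batch
                buffered = 0;
            }
        }

        @Override
        public void close() {
            if (pipeline != null && buffered > 0) {
                pipeline.sync();              // flush whatever is left
            }
            if (jedis != null) {
                jedis.close();
            }
        }
    }
    Note that anything still buffered when a task fails is lost, so for stronger guarantees the buffer should also be flushed on checkpoints (e.g. by additionally implementing CheckpointedFunction).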
    s
    • 2
    • 2
  • l

    Lee Wallen

    08/18/2022, 5:02 PM
    I’m hitting an issue while trying to deploy a Flink 1.15 job to a k3s cluster using Flink Kubernetes Operator 1.1.0. The job reads from a topic, does a simple map, and then sends output to a PrintSinkFunction. The job runs fine locally, and can consume messages from a topic in a kafka cluster. When I deploy the Flink job to the k3s cluster, I end up getting this exception:
    Copy code
    java.lang.ClassCastException: cannot assign instance of org.apache.kafka.clients.consumer.OffsetResetStrategy to field org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer.offsetResetStrategy of type org.apache.kafka.clients.consumer.OffsetResetStrategy in instance of org.apache.flink.connector.kafka.source.enumerator.initializer.ReaderHandledOffsetsInitializer
    I found a single entry on Stack Overflow suggesting that the dependencies used when running locally don't match what is running in the person's cluster. I verified that the image I made has the 1.15.0 versions of the Flink libraries, just like the Flink app's dependencies. Also, I get this exception whether I set the starting offsets or not, so it isn't related to explicitly setting an offset. One thing to note - I used the Strimzi Kafka operator to spin up a 3.2.0 Kafka cluster with the broker protocol version set to 3.2. I wouldn't expect an issue due to the Kafka version, but figured I would mention it in case there is a Kafka connector version issue. Here are my current settings:
    Copy code
    final KafkaSource<String> source = KafkaSource
            .<String>builder()
            .setBootstrapServers("my-cluster-kafka-bootstrap:9092")
            .setStartingOffsets(OffsetsInitializer.earliest())
            .setTopics("incoming-events")
            .setValueOnlyDeserializer(new SimpleStringSchema())
            .setProperties(config.consumer())
            .build();
    Does anyone recognize this issue, and/or does anyone have any suggestions on where I should look for the source of the issue?
    h
    • 2
    • 3
  • a

    Almark Cao

    08/19/2022, 7:03 AM
    Hi team, can we save a Flink identifier such as
    flink.version
    when saving metadata into the Hive metastore (TABLE_PARAMS table)? I think it's very useful for compatibility checks and management purposes. In my case, I can collect Hive metadata with the creating-engine info for a metadata management system (e.g. DataHub). I see Spark and Trino have a constant property (
    presto_version
    and
    spark.sql.create.version
    ) to identify the creating engine, but Flink does not.
  • s

    Sumit Nekar

    08/19/2022, 7:52 AM
    Hi Team, I am using the Flink operator with Flink 1.15.1. For some reason I deleted my Flink app, but I still see the operator trying to restart my Flink application using the HA data. I tried deleting the HA folder, but somehow the operator still has access to my previous job graph. This is my local k8s cluster and the HA directory is in Azure Blob Storage. Which directory do I need to clean up to start from a clean state again?
    g
    • 2
    • 12
  • m

    Mustafa Akur

    08/19/2022, 8:35 AM
    Hi all. In Flink SQL, the watermark strategy is lost after a join operation (even though the source has one). I want to be able to use ORDER BY (which requires a watermark strategy) in subsequent operations. Is there any way to not lose the watermark strategy, or to add it not at the source but on intermediate views? You can find more detailed info and the queries for reproduction at https://stackoverflow.com/questions/73413870/flink-sql-watermark-strategy-after-join-operation. Kind regards
  • n

    Nithin kharvi

    08/19/2022, 11:22 AM
    Hello guys, is version 1.14.1 of flink-connector-kafka compatible with version 2.8.1 of the kafka-clients library?
    c
    • 2
    • 2
  • r

    Roman Bohdan

    08/19/2022, 12:15 PM
    Hello guys, I have a question about
    state.backend.rocksdb.localdir
    If I set this property, will it save the RocksDB state to that directory, and will the jobmanager pick it up after starting/restarting?
  • a

    Aeden Jameson

    08/19/2022, 7:16 PM
    What's the fastest way to get a 1.16 Flink Docker image ahead of the release?
    g
    • 2
    • 1
  • c

    chunilal kukreja

    08/20/2022, 1:07 PM
    Hi Team, I am facing an issue where, if I enable "state.backend: rocksdb" and other related config, I don't see checkpointing enabled with RocksDB (the WebUI still shows HashMapStateBackend in use), while if I do the same from code, like:
    Copy code
    //        env.enableCheckpointing(1000);
    //        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    //        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(700);
    //
    //        env.setStateBackend(new EmbeddedRocksDBStateBackend());
    //        env.getCheckpointConfig().setCheckpointStorage("file:///Users/chunikukreja/checkpoint_storage");
    checkpointing with RocksDB gets enabled. Any pointers to help me understand whether I am missing something?
  • s

    Shen Zhu

    08/21/2022, 5:25 AM
    Hi team, I have a question about using Flink SQL DDL in version 1.12.1. Let's say I have the following DDL:
    Copy code
    CREATE TABLE SourceKafkaTable (
            userId STRING,
            sessionId STRING
          ) WITH (
             'connector' = 'kafka',
             'topic' = 'test_topic',
             'properties.group.id' = 'test_group_id'
    )
    And later apply the following query
    Copy code
    val table1 = tableEnv.sqlQuery("SELECT COUNT(*) FROM SourceKafkaTable GROUP BY userId")
    val table2 = tableEnv.sqlQuery("SELECT COUNT(*) FROM SourceKafkaTable GROUP BY sessionId")
    Then do both
    table1
    and
    table2
    get all the data from
    SourceKafkaTable
    ? Or does each table only get a part of it? Thanks for your help!
    l
    • 2
    • 6
  • l

    Leo Xiong

    08/22/2022, 8:23 AM
    Hey all. We're running a Flink cluster in native-Kubernetes session mode (with Apache Beam), where task manager pods are dynamically spun up by the job manager as jobs are submitted. I expected the task managers to terminate after each job completes, but they seem to wait around for some time afterwards in case new jobs come in. Is there a way to change this behavior? We want to guarantee isolation between jobs.
    l
    • 2
    • 2