# troubleshooting

    Nishant Goenka

    06/27/2023, 1:06 PM
Hi team, I am using Flink 1.15.2 with a Kafka source and Kafka sink. I am getting the exception below, and Flink keeps retrying the same events; I am not able to find the cause.
```
    WARN org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher [] - Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
```
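A note, hedged: this commit is best-effort by design; on failure/restart Flink restores from its checkpointed offsets, so the warning alone does not cause reprocessing (repeated events usually mean the job itself is restarting). The CoordinatorNotAvailableException points at the Kafka group coordinator on the broker side rather than at Flink. A minimal sketch, assuming the legacy FlinkKafkaConsumer named in the stack trace, of where that best-effort commit hangs off checkpoints (broker address, topic, and group id are made up):

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class OffsetCommitSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // offset commits to Kafka piggyback on each checkpoint

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker:9092"); // hypothetical address
        props.setProperty("group.id", "my-group");             // hypothetical group id

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props);
        // Default is true: this is the best-effort commit path that logs the warning.
        // Recovery never depends on it; Flink restores from checkpointed offsets.
        consumer.setCommitOffsetsOnCheckpoints(true);

        env.addSource(consumer).print();
        env.execute("offset-commit-sketch");
    }
}
```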

    Ishan

    06/27/2023, 5:16 PM
Hi Team, can HiveCatalog talk to HMS using `jdbc`, or is the thrift service the only path? Also, in general, when running the Flink SQL client on a local machine, is it common practice to have a metastore thrift service running locally as well? How do people get around it if the HMS is not accessible from the local instance of the Flink SQL client?
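For reference, hedged: HiveCatalog talks to the Hive Metastore over its thrift endpoint only; there is no JDBC path in the catalog itself. A minimal sketch, assuming a hive-site.xml whose hive.metastore.uris points at a reachable thrift service (the conf dir path is made up):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class HiveCatalogSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // /path/to/hive-conf must contain a hive-site.xml with e.g.
        //   hive.metastore.uris = thrift://hms-host:9083
        HiveCatalog hive = new HiveCatalog("myhive", "default", "/path/to/hive-conf");

        tEnv.registerCatalog("myhive", hive);
        tEnv.useCatalog("myhive");
    }
}
```

If the HMS is not reachable from a laptop, the usual workarounds are an SSH tunnel to the thrift port or running the SQL client on a host inside the network.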

    Ilya Sterin

    06/27/2023, 5:25 PM
    We are constantly getting:
```
    java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
    	at org.apache.flink.runtime.scheduler.DefaultExecutionDeployer.lambda$assignResource$4(DefaultExecutionDeployer.java:227)
    	at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
    	at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
    	at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
    	at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source)
    	at org.apache.flink.runtime.jobmaster.slotpool.PendingRequest.failRequest(PendingRequest.java:88)
    	at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.cancelPendingRequests(DeclarativeSlotPoolBridge.java:185)
    	at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.failPendingRequests(DeclarativeSlotPoolBridge.java:408)
    	at org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPoolBridge.notifyNotEnoughResourcesAvailable(DeclarativeSlotPoolBridge.java:396)
    	at org.apache.flink.runtime.jobmaster.JobMaster.notifyNotEnoughResourcesAvailable(JobMaster.java:887)
    	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$0(AkkaRpcActor.java:301)
    	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:300)
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
    	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
    	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
    	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
    	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    	at akka.actor.Actor.aroundReceive(Actor.scala:537)
    	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:579)
    	at akka.actor.ActorCell.invoke(ActorCell.scala:547)
    	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    	at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
    	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
    	at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
    	at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
    	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
    Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
    	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
    	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
    	at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source)
    	... 38 more
Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
```
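Hedged, since it depends on the deployment: this exception usually means the requested parallelism exceeds the task slots the cluster could offer before the slot-request timeout, or the TaskManagers never registered at all (worth checking the TM logs first). The two usual knobs, with illustrative values for a standalone/session cluster:

```
# flink-conf.yaml -- illustrative values
taskmanager.numberOfTaskSlots: 4   # slots offered per TaskManager
parallelism.default: 4             # keep <= (number of TMs x slots per TM)
```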

    Liza M

    06/27/2023, 6:04 PM
    Is there a way to set parallelism for operators in Flink SQL?
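As far as I know there is no per-operator knob in pure Flink SQL; you set one default parallelism for the whole pipeline, and some connectors accept a per-sink override. A hedged sketch:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class SqlParallelismSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // One default parallelism for all SQL operators in this pipeline:
        tEnv.getConfig().getConfiguration()
                .setString("table.exec.resource.default-parallelism", "8");

        // Some sink connectors additionally accept 'sink.parallelism' = '...'
        // in the CREATE TABLE ... WITH (...) clause as a per-sink override.
    }
}
```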

    Bharathkrishna G M

    06/27/2023, 7:04 PM
Hi, I upgraded the Flink version from 1.12 to 1.16.1, and the table.connect method is no longer present. I want to write Avro records to a Kafka sink. Can anyone point me to examples that do this in newer versions of Flink?
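The descriptor-style connect() API was removed in favor of SQL DDL via executeSql. A hedged sketch (topic, fields, and the source table are invented; it needs the Kafka connector and flink-avro/flink-sql-avro jars on the classpath):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AvroKafkaSinkSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // DDL replaces the old table.connect(...) descriptor chain:
        tEnv.executeSql(
                "CREATE TABLE avro_sink (" +
                "  id BIGINT," +
                "  name STRING" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'out-topic'," +
                "  'properties.bootstrap.servers' = 'broker:9092'," +
                "  'format' = 'avro'" +
                ")");

        tEnv.executeSql("INSERT INTO avro_sink SELECT id, name FROM some_source");
    }
}
```

On the DataStream side, KafkaSink with an Avro serialization schema is the equivalent path.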

    Control the Guh

    06/28/2023, 12:47 AM
Is it possible for Flink to receive data from a broker that is running in another Docker container? I want to connect my Redpanda instance with Flink, but I can't reach it on localhost or otherwise. Right now I need the distribution in the same Docker image, which is frustrating, as I need to connect multiple programs to it.
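This should work without sharing an image, hedged on the Docker setup: put both containers on the same user-defined network (e.g. one docker-compose file) and have Redpanda advertise its service name, since an address like localhost resolves to the Flink container itself. A sketch with invented names:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RedpandaSourceSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> source = KafkaSource.<String>builder()
                // "redpanda" = the compose service name on the shared network; Redpanda
                // must advertise the same host:port (--advertise-kafka-addr redpanda:9092).
                .setBootstrapServers("redpanda:9092")
                .setTopics("events")          // hypothetical topic
                .setGroupId("flink-demo")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "redpanda").print();
        env.execute("redpanda-source-sketch");
    }
}
```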

    Neha

    06/28/2023, 10:18 AM
Has anybody tried setting the "Profiling with Java Flight Recorder" option on YARN-deployed Flink containers? https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/application_profiling/#profiling-with-java-flight-recorder I tried multiple options but no luck. I am getting this in the jobmanager.out logs:
```
-XX:FlightRecorderOptions=defaultrecording=... has been removed. Use -XX:StartFlightRecording=disk=false to create an in-memory recording.
Error occurred during initialization of VM
Failure when starting JFR on_vm_start
```
What is the issue with JFR in a YARN container, and can we get more logs somewhere?
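The message itself is the hint, hedged: the flags in the docs example are the JDK 8 ones; on JDK 11+ the -XX:FlightRecorderOptions=defaultrecording flag was removed, so the JVM refuses to start. Something like this in flink-conf.yaml (file paths are illustrative) should get JFR going on a newer JDK:

```
# flink-conf.yaml -- hedged sketch for JDK 11+
env.java.opts.jobmanager: -XX:StartFlightRecording=duration=10m,filename=/tmp/jobmanager.jfr
env.java.opts.taskmanager: -XX:StartFlightRecording=duration=10m,filename=/tmp/taskmanager.jfr
```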

    Jirawech Siwawut

    06/28/2023, 12:23 PM
Hi. I have a question about increasing Kafka partitions. I have a Flink application running on the Flink Table API with parallelism set to 8, and I then increase the Kafka partition count from 8 to 16. I know that Flink can discover new partitions, but will there be any impact on the running application, e.g. on Flink state etc., given that I have a window group in my application?
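Hedged summary of the usual behavior: keyed/window state lives with the group-by key, not the Kafka partition, so existing state is not redistributed when partitions grow; the main operational points are enabling partition discovery and expecting watermarks to lag briefly until the new partitions carry data. Discovery is a source property, sketched here for a DataStream-style KafkaSource (the Table connector has the analogous 'scan.topic-partition-discovery.interval' option):

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class PartitionDiscoverySketch {
    public static void main(String[] args) {
        // Hedged sketch: with periodic discovery the 8 new partitions are picked
        // up without a restart and spread over the existing source subtasks.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")   // hypothetical
                .setTopics("orders")                  // hypothetical
                .setGroupId("table-api-job")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperty("partition.discovery.interval.ms", "60000")
                .build();
    }
}
```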

    Nathanael England

    06/28/2023, 4:43 PM
What setting should I use in the k8s operator to make the rest job service a `NodePort` so I can look at the UI in a web browser?
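If I read the operator docs right, this is plain Flink configuration rather than an operator-specific setting; a hedged sketch of the FlinkDeployment fragment:

```yaml
# FlinkDeployment fragment -- exposes the rest service as a NodePort
spec:
  flinkConfiguration:
    kubernetes.rest-service.exposed.type: NodePort
```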

    dp api

    06/29/2023, 7:50 AM
Hi all, just wondering whether there is a Pattern API for Python (https://nightlies.apache.org/flink/flink-docs-master/docs/libs/cep/). Is it possible to implement CEP in Python? It would be lovely if someone could share a walkthrough/tutorial/example of CEP in Python.
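As far as I know the CEP Pattern API has no PyFlink port; the usual workaround is SQL's MATCH_RECOGNIZE, which runs fine from a PyFlink TableEnvironment. A hedged sketch with an invented schema:

```sql
-- Detect a price dip followed by a recovery, per symbol (illustrative schema)
SELECT *
FROM Ticker
    MATCH_RECOGNIZE (
        PARTITION BY symbol
        ORDER BY rowtime
        MEASURES
            A.price AS start_price,
            C.price AS end_price
        ONE ROW PER MATCH
        AFTER MATCH SKIP PAST LAST ROW
        PATTERN (A B+ C)
        DEFINE
            B AS B.price < A.price,
            C AS C.price >= A.price
    );
```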

    Hussain Abbas

    06/29/2023, 9:30 AM
Hello, I have one question. I have implemented the Flink autoscaler, and I am confused about how it manages pod usage. I understand that it works per operator, but if an operator requires more CPU/memory, how is that managed? Do we have to use an external HPA alongside it?

    Иван Борисов

    06/29/2023, 11:02 AM
Hello, I can't understand how to do the following (if there is another, easier way, I could merge everything into one topic or make other modifications): I have a few streams from a few Kafka topics carrying sensor measurements as JSON messages:
```
    topic1: {'data': {'temp':25.2, 'sensore_name': 'T1', 'timestamp': 123123131}, 'compare_with': 'T2'}
    topic2: {'data': {'temp':28, 'sensore_name': 'T2', 'timestamp': 53543543}, 'compare_with': 'T1'}
    topic3: {'data': {'temp':32, 'sensore_name': 'T3', 'timestamp': 6757575}, 'compare_with': 'T2'}
    topic4: {'data': {'temp':12, 'sensore_name': 'T3', 'timestamp': 67856222}, 'compare_with': 'T1'}
```
I need to compare T1.data.temp - T2.data.temp, where each reading must be compared with EXACTLY the last measurement of the other sensor (named in compare_with), because measurements can arrive at different frequencies: T1 at 1 message per second, T2 at 1 message per 5 seconds, T3 at 3 messages per second. Then I need to calculate the AVG of this difference over a 1-hour window, and if the difference is more than the AVG, raise an alarm somewhere. I don't understand how to do this. Flink 1.17.1, Java 17.
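One hedged shape for this: union the topics into a single stream of readings, key it by the unordered sensor pair (e.g. "T1|T2"), and keep each sensor's latest value in state so every arriving reading is compared with exactly the partner's last measurement; the emitted differences can then feed a 1-hour window that computes the average and the alarm condition. A sketch with a hypothetical Reading POJO:

```java
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class PairDiffSketch {

    /** Hypothetical POJO parsed from the JSON messages. */
    public static class Reading {
        public String sensor;      // data.sensore_name
        public String compareWith; // compare_with
        public double temp;        // data.temp
    }

    // Key the unioned stream by the unordered pair, e.g.
    //   keyBy(r -> r.sensor.compareTo(r.compareWith) < 0
    //              ? r.sensor + "|" + r.compareWith
    //              : r.compareWith + "|" + r.sensor)
    public static class PairDiffFunction
            extends KeyedProcessFunction<String, Reading, Double> {

        private transient MapState<String, Double> lastTemp; // sensor -> latest temperature

        @Override
        public void open(Configuration parameters) {
            lastTemp = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("lastTemp", String.class, Double.class));
        }

        @Override
        public void processElement(Reading r, Context ctx, Collector<Double> out)
                throws Exception {
            lastTemp.put(r.sensor, r.temp);               // remember this sensor's newest value
            Double partner = lastTemp.get(r.compareWith); // partner's most recent value, if any
            if (partner != null) {
                out.collect(r.temp - partner);            // diff against exactly the last reading
            }
        }
    }
}
```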

    Iat Chong Chan

    06/29/2023, 11:24 AM
Hi all, is there any way to set watermark alignment for Flink tables with the Python API?
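If I remember FLIP-296 correctly (Flink 1.17+), watermark alignment for tables is driven by dynamic table options, so from PyFlink it is just SQL; a hedged sketch with invented names and values:

```sql
-- Hedged sketch: alignment options attached via a dynamic-table-options hint
SELECT *
FROM my_kafka_table /*+ OPTIONS(
    'scan.watermark.alignment.group' = 'alignment-group-1',
    'scan.watermark.alignment.max-drift' = '1min',
    'scan.watermark.alignment.update-interval' = '1s'
) */;
```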

    Chase Diem

    06/29/2023, 12:28 PM
Hey all - we have a scenario where we were given a Kinesis stream and credentials to use in another account, but we would like to publish the metrics for those producers to our account. Looking at KinesisConfigUtil.java for setting up the producer, it looks like KinesisProducerConfiguration.java contains a configuration that allows you to set separate credentials for CloudWatch metrics via `setMetricsCredentialsProvider`, but KinesisConfigUtil doesn't utilize it. Is it possible to set them anywhere, or would this require some sort of code change or PR?

    Hussain Abbas

    06/29/2023, 3:03 PM
Hello, I am getting this error when using the Flink autoscaler: "No busyTimeMsPerSecond metric available". The source is Kinesis.

    Faisal A. Siddiqui

    06/29/2023, 5:25 PM
Hi, I have a Flink SQL job that makes 21 left joins and 14 inner joins, and it is currently facing some performance issues. Just wondering whether the join order between tables makes a difference in the DAG that Flink generates? Example:
```
table1 inner join table2 ... inner join table10
```
vs
```
table10 inner join table9 inner join table8 ... inner join table1
```
where the volume of data increases from table1 through table10?
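For what it's worth, hedged: yes, order matters — the streaming planner builds a chain of binary joins, each materializing state, so putting the smallest or most-filtering tables first generally shrinks intermediate state. There is also an optimizer flag to let the planner reorder joins itself, plus state TTL to bound join state where the semantics allow it:

```sql
-- Hedged sketch: both are off/unset by default
SET 'table.optimizer.join-reorder-enabled' = 'true';
SET 'table.exec.state.ttl' = '1h';  -- only if dropping stale join inputs is acceptable
```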

    Lukasz Krawiec

    06/29/2023, 7:57 PM
hi all! I am trying to understand why the watermark in my Flink job is falling very far behind the current processing time (on average it seems to oscillate between 2 and 6 minutes behind).
• I'm performing a stream-to-stream join via a KeyedCoProcessFunction and firing event-time timers.
• Both stream sources are Kafka topics with plenty of data in them:
  ◦ in the test environment, 36 and 12 partitions;
  ◦ in the prod environment, 480 and 360 partitions.
• The watermark strategy for both topics is configured as `forBoundedOutOfOrderness(Duration.ofSeconds(5))`.
• There is no backpressure, and the Kafka consumer lag for both topics is consistently close to 0.
• Checkpoints are stored in S3 and taken every 60s; checkpoint duration is < 1s.
• Running Flink 1.14.
I'm looking for some pointers on how to debug the problem, and in general what could explain this behavior.
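One hedged pointer: the job-wide watermark is the minimum across all source subtasks/partitions, so with 480/360 partitions a handful of idle or slow partitions will pin it minutes behind even with zero consumer lag; the per-subtask watermark metrics in the UI usually confirm this. Marking idle sources helps:

```java
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Hedged sketch: MyEvent and the timeout are illustrative.
WatermarkStrategy<MyEvent> strategy =
        WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withIdleness(Duration.ofMinutes(1)); // idle partitions stop holding back the watermark
```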

    Antonio Varela

    06/30/2023, 6:31 AM
Hi all, I'm writing a data-ingestion POC that processes a complex POJO. It contains Lists and Maps of other POJOs, and I've stumbled upon the problem of serializing generic types. I'm having a hard time trying to understand how to add TypeHints, or how to implement a TypeInfoFactory to annotate the class, i.e.:
```
    @TypeInfo(MyPojoTypeInfoFactory.class)
    public class MyPojo {
      private List<OtherPojo> listOfOtherPojos;
      ...
    }
```
Can someone please point me to an example of how to accomplish this? I'm thinking of switching to Avro serialization because of this, but I'm aware that Kryo must be avoided. Any help will be appreciated. Thank you in advance.
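A hedged sketch of the factory the annotation points at, assuming MyPojo/OtherPojo as above (the map keys must match MyPojo's field names exactly):

```java
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

public class MyPojoTypeInfoFactory extends TypeInfoFactory<MyPojo> {

    @Override
    public TypeInformation<MyPojo> createTypeInfo(
            Type t, Map<String, TypeInformation<?>> genericParameters) {

        Map<String, TypeInformation<?>> fields = new HashMap<>();
        // Explicit info for the generic field, so Flink does not fall back to Kryo:
        fields.put("listOfOtherPojos", Types.LIST(Types.POJO(OtherPojo.class)));
        // fields.put("someMap", Types.MAP(Types.STRING, Types.POJO(OtherPojo.class)));

        return Types.POJO(MyPojo.class, fields);
    }
}
```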

    Dheeraj Panangat

    06/30/2023, 11:18 AM
Hi Team, I am setting the RocksDB backend path to persistent storage (EFS). When new state is stored, the folders created are in the format `job_<jobid>_op_KeyedProcessOperator_<operatorid>__<taskIndex>_<totalParallelism>__uuid_<randomuuid>/`. The folder name changes every time the job restarts, and the folders seem to be deleted after restarts. How do I make sure the state is retained, so that when I restart the job it is able to query the state? Appreciate any inputs on this. Thanks.
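Hedged, but this matches the documented behavior: the RocksDB local/working directory is per-attempt scratch space and is deliberately recreated and cleaned up on restarts; durability comes from checkpoints/savepoints, which is what a restarted job should restore from. Illustrative config:

```
# flink-conf.yaml -- illustrative values
state.backend: rocksdb
state.checkpoints.dir: file:///efs/flink/checkpoints   # or s3://..., the durable copy
execution.checkpointing.interval: 60s
# Restart from the retained checkpoint/savepoint, e.g. via execution.savepoint.path
# (or job.initialSavepointPath in the k8s operator), to get the state back.
```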

    Oscar Perez

    06/30/2023, 12:46 PM
Hi, one question regarding KafkaSource and the .setStartingOffsets method. What would happen if I use .setStartingOffsets(OffsetsInitializer.earliest())? Does this mean that every new redeployment of the job will read from the beginning of the topic, or will the offset be saved in the state and picked up the next time we redeploy the job from a savepoint?
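Hedged restatement of the documented behavior: the OffsetsInitializer is consulted only when the job starts without restored state; when redeploying from a savepoint/checkpoint, the offsets in the source's state win, so earliest() does not re-read the topic. Sketch (addresses and names are made up):

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

// The initializer below applies to FRESH starts only; a restore from a
// savepoint/checkpoint resumes from the offsets stored in source state.
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")   // hypothetical
        .setTopics("events")                  // hypothetical
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
```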

    Marquis C

    06/30/2023, 1:46 PM
hey all 👋 has anybody connected the Flink SQL gateway (in "hive" mode) to an AWS Glue Catalog (to find tables and whatnot)? I was able to connect in "rest" mode easily by just using the normal Flink SQL configure-catalog -> Glue flags.

    Sumit Nekar

    06/30/2023, 2:05 PM
Hello, I am getting the following exception, after which the job manager restarted:
```
This is usually caused by: 1) Akka failed sending the message silently, due to problems like oversized payload or serialization failures. In that case, you should find detailed error information in the logs. 2) The recipient needs more time for responding, due to problems like slow machines or network jitters. In that case, you can try to increase akka.ask.timeout.
```
I tried increasing the akka.framesize and akka.ask.timeout values. This resulted in the following exception:
```
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException: Partition c188f40cdaf8401fef36bf7e6122cd32#3@d7e4ba19d49c4286666eb145f9cc4295_6c80f82e38c6db0aca2cd65eb9ddef48_3_0 not found.
```
How do I recover the job in such cases? One way was to cancel the job and redeploy; this worked, but for a stateful job it might not be helpful. Please suggest if there are any better options. Flink version: 1.17.1, using the Flink k8s operator.
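One hedged knob for the PartitionNotFoundException specifically: it often just means a consumer task asked for an upstream result partition before its producer was (re)deployed, and raising the network request backoff gives the producer more time before the consumer gives up:

```
# flink-conf.yaml -- illustrative values
taskmanager.network.request-backoff.initial: 100    # ms (default)
taskmanager.network.request-backoff.max: 30000      # ms, raised from the 10000 default
```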

    Chris Tabakakis

    06/30/2023, 3:42 PM
I'm trying to use Flink on a distributed system made up of 1 master VM and 7 worker VMs. I think I've set it up correctly, and I could swear it used to work, but since my jars weren't working, I went back and tried the examples again and got an error stating "Could not connect to BlobServer at address localhost/127.0.0.1:37101", among others. I'm very much stuck and would appreciate any insight one might have, and I am willing to provide the whole error log if needed. Thanks in advance.
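The "localhost" in the error suggests the JobManager address never got set; a hedged sketch of the flink-conf.yaml entries worth checking on every node (master-vm is a placeholder hostname):

```
# flink-conf.yaml -- placeholder hostname
jobmanager.rpc.address: master-vm
rest.address: master-vm
# On Flink 1.15+, also make sure the processes bind externally:
jobmanager.bind-host: 0.0.0.0
taskmanager.bind-host: 0.0.0.0
```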

    Caroline McKee

    06/30/2023, 6:09 PM
Hi all! I am using PyFlink and have a DataStream that I am converting to a Table so that I can use Flink SQL syntax on it. My Flink SQL query includes a user-defined table function, meaning each input row results in multiple output rows, using a generator, as specified in the documentation. Here is my UDTF:
```python
    class CalculateValueDiff(TableFunction):
        def eval(self, ids, grouped_values, is_t1s):
            t1_players = [id for id, is_t1 in zip(ids, is_t1s) if is_t1]
            t2_players = [id for id, is_t1 in zip(ids, is_t1s) if not is_t1]
    
            for t1_player in t1_players:
                for t2_player in t2_players:
                    value_diff = abs(grouped_values[ids.index(t1_player)] - grouped_values[ids.index(t2_player)])
                    yield Row(value_diff, t1_player, t2_player)
```
Once I register this UDTF as `calculate_value_diff` (along with custom `collect_list_<T>` aggregators for different data types; those are working fine), I run this SQL query:
```python
    res_table = t_env.sql_query("""
                                SELECT 
                                    message_time,
                                    calculate_value_diff(
                                        collect_list_int(id),
                                        collect_list_float(value),
                                        collect_list_bool(is_t1)
                                    ) as value_diffs
                                 FROM InputTable
                                 GROUP BY message_time
                                 """)
    
    res_stream = t_env.to_changelog_stream(res_table)
```
The issue I'm having is that the output of the UDTF is technically a generator, not a row. So when I call `t_env.to_changelog_stream(res_table)`, I get the error:
```
    File "pyflink/fn_execution/beam/beam_coder_impl_fast.pyx", line 101, in pyflink.fn_execution.beam.beam_coder_impl_fast.FlinkLengthPrefixCoderBeamWrapper.encode_to_stream
      File "pyflink/fn_execution/coder_impl_fast.pyx", line 296, in pyflink.fn_execution.coder_impl_fast.ValueCoderImpl.encode_to_stream
      File "pyflink/fn_execution/coder_impl_fast.pyx", line 341, in pyflink.fn_execution.coder_impl_fast.FlattenRowCoderImpl.encode_to_stream
      File "pyflink/fn_execution/coder_impl_fast.pyx", line 391, in pyflink.fn_execution.coder_impl_fast.RowCoderImpl.encode_to_stream
    AttributeError: 'generator' object has no attribute 'get_fields_by_names'
```
It seems that the stream converter expects a row (it works fine if I replace the `yield` with `return` in the UDTF, but then I'm obviously only getting the first output value). Not sure how to proceed from here... What really needs to happen is that the `pyflink.fn_execution.coder_impl_fast.RowCoderImpl.encode_to_stream` function needs to iterate through the generator when the value is a generator (related: it seems this exact issue was fixed in `commit 62f3c9c`, https://github.com/apache/flink/commit/62f3c9c9dbacf27e297dd2a64788a4d26efccf27, and appears in `release-1.13`, but the fix was lost in subsequent versions). Is there an easy way around this, or would it be possible to get a fix pushed?
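One hedged thing to try before any fix lands: in Flink SQL a table function is normally applied with a join against LATERAL TABLE(...) in the FROM clause rather than called in the SELECT list, and that path expands each yielded Row into its own output row. A sketch that aggregates first, then correlates (column aliases are invented):

```sql
-- Hedged sketch: aggregate per message_time, then expand via LATERAL TABLE
SELECT g.message_time, d.value_diff, d.t1_player, d.t2_player
FROM (
    SELECT
        message_time,
        collect_list_int(id)        AS ids,
        collect_list_float(`value`) AS vals,
        collect_list_bool(is_t1)    AS flags
    FROM InputTable
    GROUP BY message_time
) AS g,
LATERAL TABLE(calculate_value_diff(g.ids, g.vals, g.flags))
    AS d(value_diff, t1_player, t2_player);
```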

    kiran kumar

    07/02/2023, 7:04 AM
I am hosting my Flink application using the Flink Kubernetes operator. My application has 1 JobManager and 2 TaskManagers, and uses the S3 filesystem for checkpointing and savepointing. After deployment the application works fine for 30m-1h; after that it goes into a restarting state with the error below. Can anyone help me?
```
    INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for SlidingEventTimeWindows -> Process (2/2)#38618 (59bbe8ba8d5d7045ee7d2aacec66e7c8_98a01397958c5196eb88cd4117e1b3ad_1_38618).
    INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FAILED to JobManager for task SlidingEventTimeWindows -> Process (2/2)#38618 59bbe8ba8d5d7045ee7d2aacec66e7c8_98a01397958c5196eb88cd4117e1b3ad_1_38618.
    2023-07-02 06:31:07,254 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 9780cdcf42351473e6f636930c2c4d94.
    2023-07-02 06:31:07,256 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received task Source: Kafka CFTransactions -> Map -> Filter -> Timestamps/Watermarks -> Map -> Process (2/2)#38619 (59bbe8ba8d5d7045ee7d2aacec66e7c8_b5aa0fa462c5d45eec0c56cc48b37e5e_1_38619), deploy into slot with allocation id 9780cdcf42351473e6f636930c2c4d94.
    2023-07-02 06:31:07,257 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 9780cdcf42351473e6f636930c2c4d94.
    2023-07-02 06:31:07,257 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Source: Kafka CFTransactions -> Map -> Filter -> Timestamps/Watermarks -> Map -> Process (2/2)#38619 (59bbe8ba8d5d7045ee7d2aacec66e7c8_b5aa0fa462c5d45eec0c56cc48b37e5e_1_38619) switched from CREATED to DEPLOYING.
    2023-07-02 06:31:07,257 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Loading JAR files for task Source: Kafka CFTransactions -> Map -> Filter -> Timestamps/Watermarks -> Map -> Process (2/2)#38619 (59bbe8ba8d5d7045ee7d2aacec66e7c8_b5aa0fa462c5d45eec0c56cc48b37e5e_1_38619) [DEPLOYING].
2023-07-02 06:31:07,257 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: Kafka CFTransactions -> Map -> Filter -> Timestamps/Watermarks -> Map -> Process (2/2)#38619 (59bbe8ba8d5d7045ee7d2aacec66e7c8_b5aa0fa462c5d45eec0c56cc48b37e5e_1_38619) switched from DEPLOYING to FAILED with failure cause: java.lang.IllegalStateException: The library registration references a different set of library BLOBs than previous registrations for this job: old:[p-954843a2fbeba5331142de719765e1b5e15d2a52-d769e111f8329e90b17813ba49062540] new:[p-954843a2fbeba5331142de719765e1b5e15d2a52-d8de14d66657f4955f18011970565412]
    at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.verifyClassLoader(BlobLibraryCacheManager.java:419)
     at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$ResolvedClassLoader.access$500(BlobLibraryCacheManager.java:359)
     at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.getOrResolveClassLoader(BlobLibraryCacheManager.java:235)
     at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$LibraryCacheEntry.access$1100(BlobLibraryCacheManager.java:202)
     at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager$DefaultClassLoaderLease.getOrResolveClassLoader(BlobLibraryCacheManager.java:336)
     at org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:1024)
     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:612)
     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
 at java.base/java.lang.Thread.run(Unknown Source)
```

    dp api

    07/02/2023, 10:49 AM
I am using the Table API in Python with the environment settings below:
```python
env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)
t_env.get_config().set("pipeline.jars", "directory_path_to_jar_file")
t_env.get_config().set("table.exec.source.idle-timeout", "1000")
```
When I write a SQL SELECT query and order by a timestamp (converted using TO_TIMESTAMP_LTZ), I get the error "Sort on a non-time-attribute field is not supported". I searched online and found FLINK-18401, which suggested enabling table.exec.non-temporal-sort.enabled=true (false by default). However, I cannot find this in the docs and do not know how to configure it. Any help would be greatly appreciated. 🙏
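The option is experimental, which is probably why it is absent from the docs, but hedged: it should be settable like any other config key, mirroring the set() calls above:

```python
# Hedged sketch: same pattern as the pipeline.jars setting above
t_env.get_config().set("table.exec.non-temporal-sort.enabled", "true")
```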

    Keyur Makwana

    07/02/2023, 1:38 PM
Hi Team, I am configuring a Flink job that should handle almost 1 million records per second. I have started with the configuration below (CPU: 4 cores, Memory: 2 GB, Task Slots: 4) at only 30k logs per second, but my job still gets far too busy and has a lot of backpressure. As far as I have read, Flink can handle very large amounts of data, so there is some contradiction here; I may have missed some configuration. Can anybody help me figure it out? It would be highly appreciated. Thank you in advance 🙏
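A hedged starting point rather than an answer: 2 GB split across 4 slots leaves very little managed memory and network-buffer headroom per slot, so scaling memory with the slot count and finding the busy operator in the backpressure/flame-graph view usually comes before adding raw CPU. Illustrative baseline:

```
# flink-conf.yaml -- illustrative starting values, not a recommendation
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.process.size: 8g   # scale memory with slots, then re-measure
```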