# troubleshooting
  • chunilal kukreja
    08/01/2022, 12:07 PM
    Hi Team, is it possible to invoke a serverless function through a “sink”? Or, to put it another way, can we have a sink that is a serverless function (e.g. an OCI Function)?
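    A minimal sketch of one possible approach: as far as I know there is no built-in OCI Functions connector, but a custom sink can POST each record to the function's HTTP(S) invoke endpoint. The endpoint URL and missing authentication below are placeholder assumptions, and delivery is at-least-once at best:
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class HttpFunctionSink extends RichSinkFunction<String> {
        private final String endpoint; // hypothetical invoke URL of the serverless function
        private transient HttpClient client;

        public HttpFunctionSink(String endpoint) {
            this.endpoint = endpoint;
        }

        @Override
        public void open(Configuration parameters) {
            client = HttpClient.newHttpClient(); // one client per parallel sink instance
        }

        @Override
        public void invoke(String value, Context context) throws Exception {
            // Synchronously POST each record; a production sink would batch and retry.
            HttpRequest request = HttpRequest.newBuilder(URI.create(endpoint))
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(value))
                    .build();
            client.send(request, HttpResponse.BodyHandlers.discarding());
        }
    }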
  • Pedro Cunha
    08/01/2022, 3:12 PM
    Hello guys. Is there a limit to how many fields a POJO can have? I have a POJO that is working fine, but as soon as I add a new String field (the POJO is being stored as state), I start getting IndexOutOfBounds exceptions in the KryoSerializer, and it looks like one of the fields' names is being leaked and treated as a class name. Again, this happens just by adding a new String field. Any ideas?
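    One way to surface this class of problem early: disabling the Kryo fallback makes any type that no longer qualifies as a POJO fail at job submission instead of with serializer errors at runtime. A minimal sketch:
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Fail fast if any type would fall back to Kryo. POJO rules: public class,
    // public no-arg constructor, fields public or accessible via getters/setters.
    env.getConfig().disableGenericTypes();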
  • Jaya Ananthram
    08/01/2022, 3:48 PM
    Hello, I am having some trouble with savepoints with Kinesis as a source and MSK as a sink. I am trying to upgrade an application through the Flink K8s operator (helm update) and one of the task managers is failing with the following exception (🧵). Any idea?
  • Jaya Ananthram
    08/01/2022, 6:01 PM
    Hello, is there any config in the Flink K8s operator to specify the savepoint retry delay during deployment? I referred to this and had no luck. For example, when a savepoint operation fails, I want the operator to trigger the savepoint again after a few seconds' delay (to deploy the new version). Currently it triggers the savepoint immediately after the failure; in my case the task manager is not ready to take a snapshot, so all the retries fail and the job is not updated with the latest artifacts. Any idea how to handle this situation?
  • Sylvia Lin
    08/01/2022, 6:57 PM
    Hi folks, I'm testing job deployment with HPA on the K8s operator, using the basic HPA example here: https://github.com/apache/flink-kubernetes-operator/blob/main/examples/hpa/basic-hpa.yaml. But I'm seeing the HPA errors below:
    $ kubectl describe hpa -n dataeng-admin
    Name:                                                  basic-hpa
    Namespace:                                             dataeng-admin
    Labels:                                                kustomize.toolkit.fluxcd.io/name=flux-system
                                                           kustomize.toolkit.fluxcd.io/namespace=flux-system
    Annotations:                                           <none>
    CreationTimestamp:                                     Mon, 01 Aug 2022 11:02:50 -0700
    Reference:                                             FlinkDeployment/basic-example
    Metrics:                                               ( current / target )
      resource cpu on pods  (as a percentage of request):  <unknown> / 30%
    Min replicas:                                          1
    Max replicas:                                          8
    FlinkDeployment pods:                                  5 current / 0 desired
    Conditions:
      Type           Status  Reason             Message
      ----           ------  ------             -------
      AbleToScale    True    SucceededGetScale  the HPA controller was able to get the target's current scale
      ScalingActive  False   InvalidSelector    the HPA target's scale is missing a selector
    Events:
      Type     Reason                        Age                    From                       Message
      ----     ------                        ----                   ----                       -------
      Warning  FailedComputeMetricsReplicas  44m (x12 over 47m)     horizontal-pod-autoscaler  selector is required
      Warning  SelectorRequired              2m14s (x181 over 47m)  horizontal-pod-autoscaler  selector is required
    Did I miss anything here? The manifest is attached.
    hpa.yaml
  • Isaac
    08/01/2022, 7:18 PM
    Hi Folks, I have a streaming job that reads from 32 ingress partitions of a Kafka source, does some transformations and enrichments, and finally loads the result into 32 partitions of a Kafka sink. I have set the parallelism of every operator in my program to 32.
    1. Is it better to set parallelism at the operator level or the job level?
    2. How do I make my transformation and enrichment operators auto-scale, while keeping the ingress and egress at 32 partitions at all times? (See the sketch below.)
    3. I was only able to run my job on a 48-core VM in an Azure Kubernetes cluster. I have configured 32 slots and a default parallelism of 1. Why can't I run my job on a 32-core VM? My Azure Kubernetes pod scheduler says insufficient CPU.
    4. I noticed that there is only one TaskManager. Is it better to have multiple TaskManagers vs. just 1 that runs on that 48-core VM?
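    On questions 1 and 2, a common pattern (sketched below with placeholder names like kafkaSource, kafkaSink and EnrichFn) is to pin parallelism only on the source and sink and let the intermediate operators inherit the job default, which can then be changed independently:
    // Pin source and sink to the 32 Kafka partitions; the transform/enrich
    // operators inherit the job default and can be rescaled on their own.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(8); // job default for the middle operators (example value)

    DataStream<String> events = env
            .fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-in")
            .setParallelism(32); // match the 32 ingress partitions

    events.map(new EnrichFn())   // inherits the job default
          .sinkTo(kafkaSink)
          .setParallelism(32);   // match the 32 egress partitions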
  • Hunter Medney
    08/01/2022, 7:39 PM
    For a DataStream job, I'm writing JUnit tests around a RichFlatMapFunction operator and am able to successfully send test inputs and verify expected outputs using Flink's test harness utility classes. I'm looking for a way to gain additional visibility at the harness level into what's happening inside the operator - to not only assert a correct set of output records for a given set of input records, but also the reason why. Was a record not emitted because of an invalid input, or because a side effect / lookup wasn't successful? Custom metrics would seem like a good way to capture this - each relevant code path would have a counter associated with it, and the JUnit test could assert against the counter values in addition to the output. Does this seem like a good approach to test the operator beyond just inputs and outputs? Another approach, I suppose, could be capturing log output from the operator, but that seems messier. Thank you!
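    For reference, the counter-per-code-path idea might look like the sketch below (Event, Enriched and the lookup are placeholders); the test can then hold a reference to the function or its metric group and assert on the counters after driving records through the harness:
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Counter;
    import org.apache.flink.util.Collector;

    public class EnrichFlatMap extends RichFlatMapFunction<Event, Enriched> {
        private transient Counter droppedInvalidInput;
        private transient Counter droppedLookupFailed;

        @Override
        public void open(Configuration parameters) {
            // One counter per "reason a record was not emitted".
            droppedInvalidInput = getRuntimeContext().getMetricGroup().counter("droppedInvalidInput");
            droppedLookupFailed = getRuntimeContext().getMetricGroup().counter("droppedLookupFailed");
        }

        @Override
        public void flatMap(Event event, Collector<Enriched> out) {
            if (!event.isValid()) {
                droppedInvalidInput.inc(); // dropped: invalid input
                return;
            }
            Enriched enriched = lookup(event); // external lookup / side effect
            if (enriched == null) {
                droppedLookupFailed.inc(); // dropped: lookup failed
                return;
            }
            out.collect(enriched);
        }

        private Enriched lookup(Event event) { /* placeholder */ return null; }
    }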
  • Stephan Weinwurm
    08/01/2022, 11:24 PM
    Hi all, we have encountered strange and unexpected behaviour of Flink Statefun in production which has impacted us a lot. We are debugging it, but we need some help from the community. At this point I'm fairly certain it's a bug in Stateful Functions. We have opened an issue as well: https://issues.apache.org/jira/browse/FLINK-28747 This is the exception we're seeing in our HTTP Statefun service:
    Traceback (most recent call last):
      File "/src/.venv/lib/python3.9/site-packages/uvicorn/protocols/http/h11_impl.py", line 403, in run_asgi
        result = await app(self.scope, self.receive, self.send)
      File "/src/.venv/lib/python3.9/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
    return await self.app(scope, receive, send)
      File "/src/worker/baseplate_asgi/asgi/baseplate_asgi_middleware.py", line 37, in __call__
        await span_processor.execute()
      File "/src/worker/baseplate_asgi/asgi/asgi_http_span_processor.py", line 61, in execute
        raise e
      File "/src/worker/baseplate_asgi/asgi/asgi_http_span_processor.py", line 57, in execute
    await self.app(self.scope, self.receive, self.send)
      File "/src/.venv/lib/python3.9/site-packages/starlette/applications.py", line 124, in __call__
        await self.middleware_stack(scope, receive, send)
      File "/src/.venv/lib/python3.9/site-packages/starlette/middleware/errors.py", line 184, in __call__
        raise exc
      File "/src/.venv/lib/python3.9/site-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
      File "/src/.venv/lib/python3.9/site-packages/starlette/middleware/exceptions.py", line 75, in __call__
        raise exc
      File "/src/.venv/lib/python3.9/site-packages/starlette/middleware/exceptions.py", line 64, in __call__
    await self.app(scope, receive, sender)
      File "/src/.venv/lib/python3.9/site-packages/starlette/routing.py", line 680, in __call__
        await route.handle(scope, receive, send)
      File "/src/.venv/lib/python3.9/site-packages/starlette/routing.py", line 275, in handle
    await self.app(scope, receive, send)
      File "/src/.venv/lib/python3.9/site-packages/starlette/routing.py", line 65, in app
        response = await func(request)
      File "/src/worker/baseplate_statefun/server/asgi/make_statefun_handler.py", line 25, in statefun_handler
        result = await handler.handle_async(request_body)
      File "/src/.venv/lib/python3.9/site-packages/statefun/request_reply_v3.py", line 262, in handle_async
        msg = Message(target_typename=sdk_address.typename, target_id=sdk_address.id,
      File "/src/.venv/lib/python3.9/site-packages/statefun/messages.py", line 42, in __init__
        raise ValueError("target_id can not be missing")
    Any pointers would be greatly appreciated! Also, a quick explanation of what target_id is used for would be great - it sounds like it allows Flink to tie StateFun invocations to a specific instance of a function, but I don't fully understand how this is used in practice.
  • Jaya Ananthram
    08/02/2022, 11:02 AM
    Hello, what is the best way to pass secrets to the Flink configuration if I use the K8s operator? The usual way is to inject them into the environment from a K8s Secret (something like this). But in some cases we need to explicitly pass the secrets into the Flink configuration (e.g. a Graphite token, IAM keys, etc.). What is the best and most efficient way to handle this? I found some options, but they look hacky. Any idea?
  • laxmi narayan
    08/02/2022, 11:05 AM
    Kafka rebalancing of task managers with a timer service: I have a Flink Kafka streaming job with 4 partitions (p0, p1, p2, p3) and a timer service for keys. Assume I have 2 task managers (t0, t1) and I scale up (now t0, t1, t2), Kafka rebalancing occurs, and partition p0 is assigned to the new task manager (t2) container. Since the timer service for partition p0 was created in the t0 container but p0 has now moved to the t2 container, will both the t0 and t2 containers fire the timer service?
  • Roman Bohdan
    08/02/2022, 12:17 PM
    Hello, could you please help me?
    Logback 2022-08-02 15:14:25 WARN  o.a.f.r.t.Task:1097 -   - Sink: pipeline.errors (2/4)#0 (2bd6ddc871fb0b6bf0f90a2c723c51e7) switched from DEPLOYING to FAILED with failure cause: java.lang.UnsupportedOperationException: The configuration is unmodifiable; its contents cannot be changed.
    	at org.apache.flink.configuration.UnmodifiableConfiguration.error(UnmodifiableConfiguration.java:73)
    	at org.apache.flink.configuration.UnmodifiableConfiguration.setValueInternal(UnmodifiableConfiguration.java:63)
    	at org.apache.flink.configuration.Configuration.set(Configuration.java:730)
    	at org.apache.flink.runtime.state.CheckpointStorageLoader.load(CheckpointStorageLoader.java:177)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.createCheckpointStorage(StreamTask.java:1505)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:389)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:359)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:332)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:324)
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.<init>(StreamTask.java:314)
    	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.<init>(OneInputStreamTask.java:75)
    	at jdk.internal.reflect.GeneratedConstructorAccessor6.newInstance(Unknown Source)
    	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
    	at org.apache.flink.runtime.taskmanager.Task.loadAndInstantiateInvokable(Task.java:1582)
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:740)
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    	at java.base/java.lang.Thread.run(Thread.java:829)
    It should be related to:
    env.setDefaultSavepointDirectory(configuration.get(ConfigOptions
                    .key("state.savepoints.dir").stringType().noDefaultValue()));
    But then how can I configure the savepoints directory?
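    One possible direction (a sketch, not a confirmed fix for this exact trace): supply state.savepoints.dir in the Configuration the environment is created with, or in flink-conf.yaml, so nothing has to mutate the cluster configuration afterwards:
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    Configuration conf = new Configuration();
    // Set the savepoint directory before the environment exists, instead of
    // modifying the (unmodifiable) configuration at runtime. Path is a placeholder.
    conf.setString("state.savepoints.dir", "s3://my-bucket/savepoints");

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);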
  • Duc Anh Khu
    08/02/2022, 12:42 PM
    Hi, for PyFlink in session mode (local development), I have a Flink cluster running in Docker. Does anyone know how to get logging or print output to the client's stdout?
    import sys
    import logging

    logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
    My current code as above doesn't seem to work. 🤔
  • Parthiban PR
    08/02/2022, 1:20 PM
    Hello! QQ about Flink heap memory: I'm trying to process a CSV file of around 3GB in Flink locally. The CSV file is first pushed to Kafka, and then my Flink application reads from Kafka. My Flink heap memory is around 9GB, but it's still failing with an OOM error. I'm not able to figure out what went wrong. Is there something I'm missing in terms of Flink memory usage? Also, I'm sharing my code in this thread. Any help is much appreciated! Thanks!
  • Ildar Almakaev
    08/02/2022, 1:54 PM
    Hello, community. I've got an SSL certificate issue when connecting from a Flink app in AWS Kinesis Data Analytics to a Schema Registry service deployed in k8s. The URL for Schema Registry was set up using AWS Route53 with HTTPS, which is why the Schema Registry client v5.5.2 needs parameters for a truststore certificate. Since Kinesis Data Analytics is serverless and deploys only .jar files, I did the following trick based on this AWS example: they suggest storing a certificate inside the jar and then copying it at runtime to a desired location, e.g. /tmp/certificate.jks. FYI, in the example this is done inside Flink operators like the FlinkKafkaConsumer and FlinkKafkaProducer classes. Overall, I'm copying the SSL certificate for the Schema Registry client like in the example, but not in the context of Flink operators. I think that's why I'm getting a FileNotFoundException - the problem is that I don't copy it to the TaskManager nodes, isn't it? Or is there a way to copy the same file to all TaskManager nodes so that tasks could read it using FileInputStream(...)? Additional info:
    Flink version: 1.13.2
    Java version: 11
    Scala version: 2.12
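    For reference, the copy step from that AWS example boils down to something like the sketch below. The key point is that it has to run on the TaskManagers, e.g. from a rich function's open(), not only on the client, since /tmp is local to each node. Resource name and target path are placeholders:
    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.nio.file.StandardCopyOption;

    public final class TruststoreUtil {
        // Call from RichFunction#open() so every parallel instance on every
        // TaskManager materializes the truststore bundled in the jar.
        public static void dropTruststore() throws Exception {
            try (InputStream in = TruststoreUtil.class.getClassLoader()
                    .getResourceAsStream("certificate.jks")) { // packaged in the jar
                Files.copy(in, Paths.get("/tmp/certificate.jks"),
                        StandardCopyOption.REPLACE_EXISTING);
            }
        }
    }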
  • Kevin L
    08/02/2022, 2:28 PM
    Hi there, I am trying to debug an issue with the Flink sql_client not being able to read from a table that uses the 'kafka' connector. I am trying to run the following query:
    -- register the Kafka topic as a table. The topic has already been created in my dev environment
    CREATE TABLE clicks (
        id STRING,
        action STRING,
        `timestamp` TIMESTAMP(3)
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'events',
        'properties.bootstrap.servers' = 'kafka1:29092',
        'scan.startup.mode' = 'latest-offset',
        'format' = 'json'
    );
    Then, to test that this works correctly, I run the following to preview the Kafka topic contents:
    select * from clicks;
    I am submitting both the CREATE TABLE and SELECT statements from the interactive shell, which I access by starting a shell in the taskmanager and running the provided sql_client.sh script. The CREATE TABLE query submits without error, but when I submit the SELECT statement, I get the following error:
    [ERROR] Could not execute SQL statement. Reason:
    java.lang.NoSuchMethodError: org.apache.flink.table.factories.DynamicTableFactory$Context.getPrimaryKeyIndexes()[I
    Any ideas on what the above error means and how to fix it? Thanks! Additional information: I am using a custom Flink Docker image:
    FROM flink:latest
    RUN wget -P /opt/flink/lib https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.15.1/flink-sql-connector-kafka-1.15.1.jar
  • Dan Hill
    08/02/2022, 9:21 PM
    Hi. I'm seeing a weird issue around which offset is used after savepointing and recovering a job. We're using the deprecated FlinkKafkaConsumer on v1.14.4 (FlinkSource has a bug that stopped us from using it). I default the consumers to setStartFromEarliest. One of our jobs behaves correctly: during the initial run it starts from the earliest offset, and after savepointing and recovering it uses the offsets in the checkpoints/savepoints. However, one of our jobs does not behave correctly: whenever it restores, it always starts from earliest. We don't know why, and I don't see warnings or errors in our logs. Thoughts? Is this a bug in the older FlinkKafkaConsumer?
  • Pedro Lera
    08/03/2022, 2:26 PM
    Hello, I'm having an issue when trying to do a temporal join using a user-defined function:
    val resultTableJoin = tableEnv.sqlQuery(
          """
            |SELECT pt.*, poi.poi_id, poi.association_value, poi.owner_id
            |FROM packetsTable pt
            |LEFT OUTER JOIN PoisTable FOR SYSTEM_TIME AS OF pt.`ts_rowtime` AS poi
        |ON pt.pk = poi.pk OR IsInZone(poi.association_value, pt.lat, pt.lng)
            |""".stripMargin)
    But I keep getting this exception: Exception in thread "main" org.apache.flink.table.api.ValidationException: Currently the join key in Temporal Table Join can not be empty. IsInZone is just a simple user-defined function that checks whether a location is inside another location (with a predefined radius). I really appreciate any help, thank you all in advance 🙂
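    A hedged sketch of one workaround (in Java for illustration): the planner requires at least one equality condition in a temporal join's ON clause, so keep pt.pk = poi.pk as the join key and apply the UDF as a separate predicate; note this is not semantically identical to the original OR:
    // Temporal table joins need a non-empty equi-join key, so the UDF predicate
    // is moved out of the ON clause here.
    Table resultTableJoin = tableEnv.sqlQuery(
            "SELECT pt.*, poi.poi_id, poi.association_value, poi.owner_id " +
            "FROM packetsTable pt " +
            "LEFT OUTER JOIN PoisTable FOR SYSTEM_TIME AS OF pt.`ts_rowtime` AS poi " +
            "ON pt.pk = poi.pk");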
  • Don Li
    08/03/2022, 2:48 PM
    Hello, I'm reading an S3 path from a Kafka producer message. The current object I'm dealing with is a DataStream<String> (after grabbing the value from the nested JSON), and I want to loop through all the files in that S3 path and extract some data from each file. How would I go about doing that? I was thinking about using readTextFile or a FileSource, but that ends up being a DataStream<DataStream<String>>, which doesn't seem correct. How would I transform the initial DataStream<String> (or grab that S3 path) and get another DataStream<String> from either readTextFile or FileSource?
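    A minimal sketch of one approach: instead of nesting sources, read each S3 path inside a RichFlatMapFunction using Flink's FileSystem API, emitting one record per line. Applied as s3Paths.flatMap(new ReadS3Path()), this turns the DataStream<String> of paths into a DataStream<String> of lines (assumes the s3 filesystem plugin is configured; error handling omitted):
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.core.fs.FileStatus;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.util.Collector;

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;

    public class ReadS3Path extends RichFlatMapFunction<String, String> {
        @Override
        public void flatMap(String s3Path, Collector<String> out) throws Exception {
            Path dir = new Path(s3Path);
            FileSystem fs = dir.getFileSystem(); // resolves the s3:// filesystem
            for (FileStatus status : fs.listStatus(dir)) {
                try (BufferedReader reader = new BufferedReader(
                        new InputStreamReader(fs.open(status.getPath()), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        out.collect(line); // one record per line of each file
                    }
                }
            }
        }
    }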
  • Ali Zia
    08/03/2022, 3:24 PM
    Hi. When converting from a Table to a DataStream in PyFlink, how can I pass the watermark information? Without passing anything I'm getting an exception. Thanks!
  • Lee Wallen
    08/03/2022, 3:55 PM
    Hello! Does anyone know of a good example of how to set the HashMapStateBackend state value for a unit test?
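    Not a canonical example, but a sketch of how it might look with the operator test harness (MyKeyedFn, Event and Output are placeholders; the harness classes come from the Flink test utilities):
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
    import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;

    KeyedOneInputStreamOperatorTestHarness<String, Event, Output> harness =
            new KeyedOneInputStreamOperatorTestHarness<>(
                    new KeyedProcessOperator<>(new MyKeyedFn()),
                    event -> event.getKey(),
                    Types.STRING);

    // Use the same backend as production; set it before open() initializes state.
    harness.setStateBackend(new HashMapStateBackend());
    harness.open();

    // Seed state by processing elements, then assert on harness.getOutput().
    harness.processElement(new StreamRecord<>(new Event("k1", 42L), 0L));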
  • Dan Hill
    08/03/2022, 4:30 PM
    My recent jobs also started hitting the following ZooKeeper-related error. I'm running Flink 1.14.4 and ZooKeeper 3.7.0. Our ZooKeeper instances seem well provisioned and have been running for >50 days. There's very little info in the logs.
    2022-08-03 09:15:31
    java.lang.Exception: Job leader for job id 6178032ec3a9318aa06adf0e30ee291f lost leadership.
    	at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$null$2(TaskExecutor.java:2228)
    	at java.util.Optional.ifPresent(Optional.java:159)
    	at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$3(TaskExecutor.java:2226)
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455)
    	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455)
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
    	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
    	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    	at akka.actor.Actor.aroundReceive(Actor.scala:537)
    	at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    	at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    	at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    	at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
    I saw posts saying that TM->ZK connections have issues on AWS. They said timeouts can happen due to networking or TMs being unresponsive (e.g. GCing). I bumped up these settings:
    high-availability.zookeeper.client.connection-timeout: 60000
    high-availability.zookeeper.client.session-timeout: 240000
  • Dan Hill
    08/03/2022, 4:30 PM
    When this issue is hit, my job is hosed. Any recommendations for how to resolve? E.g. editing ZK directly?
  • Xi Cheng
    08/03/2022, 5:40 PM
    Hey, I might need some clarification on the behaviour of the metric group Gauge: https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/#gauge. It mentions that "In order to use a Gauge you must first create a class that implements the org.apache.flink.metrics.Gauge interface", but it does not specify when the getValue method gets invoked. Based on the example, I assume the supplied function (https://nightlies.apache.org/flink/flink-docs-release-1.9/api/java/org/apache/flink/metrics/Gauge.html#getValue--) is evaluated each time the RichMapFunction finishes its map function?
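    For what it's worth, as far as I can tell getValue() is not tied to map() at all; it is invoked whenever the metric is polled, e.g. by a configured reporter on its interval or when the web UI / REST API queries it. So the usual pattern is for map() to only update a field that the gauge reads:
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.metrics.Gauge;

    public class MyMapper extends RichMapFunction<String, String> {
        private transient int valueToExpose;

        @Override
        public void open(Configuration config) {
            // getValue() (the lambda) runs when the metric is reported/queried,
            // not once per processed record.
            getRuntimeContext().getMetricGroup()
                    .gauge("MyGauge", (Gauge<Integer>) () -> valueToExpose);
        }

        @Override
        public String map(String value) {
            valueToExpose++; // map() only updates the field the gauge reads
            return value;
        }
    }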
  • Adrian Chang
    08/03/2022, 6:38 PM
    Hello, I am using Flink with Python. Is it more efficient to run SQL queries than to use the DataStream API in Python? I am thinking of the data transfer between the Java process and the Python process. Does that happen for the Table/SQL API?
  • Rakesh V
    08/03/2022, 7:36 PM
    Anybody else facing issues with mvn clean package -DskipTests? I just checked out the master branch and am getting a build error for the flink-connector-hive component. I thought the master branch was stable.
  • Ivan M
    08/03/2022, 7:53 PM
    Hi all, I have a question about window functions. I have a timestamp field in my table; it's the timestamp metadata from the Kafka connector: ts TIMESTAMP(3) METADATA FROM 'timestamp'. But when I try to use it in an OVER window like this: ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts ASC) AS row_num, I get the error "OVER windows' ordering in stream mode must be defined on a time attribute." On the other hand, when I define a timestamp field as proctime (proctime AS PROCTIME()), it works fine. Could you clarify the difference between these two timestamps and which I can use in OVER windows? I thought the first one was also a time attribute.
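    For context, a hedged sketch (in Java, with the watermark delay chosen arbitrarily): a metadata timestamp only becomes an event-time attribute once a watermark is declared on it, which is what OVER windows in streaming mode require for ORDER BY:
    // Declaring a watermark on the metadata column turns ts into an event-time
    // attribute; remaining connector options are elided.
    tableEnv.executeSql(
            "CREATE TABLE events (" +
            "  id STRING," +
            "  ts TIMESTAMP(3) METADATA FROM 'timestamp'," +
            "  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
            ") WITH (" +
            "  'connector' = 'kafka'" +
            ")");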
  • Sucheth Shivakumar
    08/04/2022, 2:28 AM
    Hi All, I followed this post - https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html - to implement a broadcast rules stream. I have 2 questions: 1. Suppose I have a rule in state, say rule 1. If I restart the application, will that rule still exist after the restart? Currently I do not see the rule; I need to resend it every time I restart the application. 2. When I issue the EXPORT_RULES_CURRENT control command to export the current rules to a side topic, it sinks the same rule 16 times (same behaviour with parallelism 1). May I know what I'm doing wrong here?
  • Darin Lee
    08/04/2022, 4:48 AM
    Hi, what does task.cancellation.timeout mean? Is there any risk in setting it to zero?
  • Raghunadh Nittala
    08/04/2022, 9:11 AM
    Hi All, I have a Flink operator setup in Kubernetes with 6 task managers, and the Kafka topics are created with 6 partitions. I can confirm that when messages are published to the Kafka topic, all 6 partitions have a fair number of records distributed across them. Now, when I submit the Flink job that consumes from the Kafka topic, I always see 1 or 2 task managers take the processing load while the remaining 4 or 5 are idle. I have tested this with different messages, but the behavior is the same. On restart of the Flink operator, I can see a different task manager taking the load, but the other task managers are then idle. Can someone help me fix this behavior? - I have 1 task slot per task manager - I am using keyBy in the code. Best regards, Raghu
  • Ivan M
    08/04/2022, 10:18 AM
    Hi all! I have a complex query with several WITH clauses and a couple of joins. The query reads data from Kafka-connected tables and produces new events. Now I want to write these new events into a new Kafka topic, so I created a new table. But when I try to INSERT INTO it from the query above, I get the error: Table sink <sink table> doesn't support consuming update changes which is produced by node Join(joinType=[InnerJoin], where=..., select=..., leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]). Could you help me understand what I'm doing wrong here?
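    One commonly suggested direction, hedged since it depends on the result having a derivable key: a plain 'kafka' sink is append-only, while an 'upsert-kafka' sink with a PRIMARY KEY can consume the update changes a NoUniqueKey join produces. A sketch with placeholder columns and topic:
    // Sketch only: upsert-kafka materializes updates by primary key.
    tableEnv.executeSql(
            "CREATE TABLE enriched_events (" +
            "  event_id STRING," +
            "  payload STRING," +
            "  PRIMARY KEY (event_id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'upsert-kafka'," +
            "  'topic' = 'enriched-events'," +
            "  'properties.bootstrap.servers' = 'kafka1:29092'," +
            "  'key.format' = 'json'," +
            "  'value.format' = 'json'" +
            ")");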