# troubleshooting

    Tiansu Yu

    11/16/2022, 10:21 AM
Flink 1.13.2 finishes the job without reading from an S3 file. I have a pipeline with
    Copy code
    stream = env.createInput(parquetInputFormat)
as the source, and I simply print it out for testing. The pipeline finishes early without reading a single byte from the S3 file. There is no exception. In the taskmanager log, I see that Flink could not find the hadoop classpath. Is this somehow related to Flink not reading the S3 file?
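For a quick check that S3 access and the Parquet format are wired up at all, a Table API probe against the same file can help; this is a minimal PyFlink sketch, assuming the flink-parquet format and an S3 filesystem plugin (e.g. flink-s3-fs-hadoop) are installed, with a hypothetical bucket and schema:
Copy code
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_batch_mode().build()
)

# Hypothetical schema and path: replace with the file's real columns and S3 URI.
t_env.execute_sql("""
    CREATE TABLE probe (
        id BIGINT,
        payload STRING
    ) WITH (
        'connector' = 'filesystem',
        'path' = 's3://my-bucket/my-file.parquet',
        'format' = 'parquet'
    )
""")

t_env.from_path("probe").limit(10).execute().print()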

    Maher Turifi

    11/16/2022, 12:42 PM
Hi, a question regarding the Flink application dashboard. I have a Flink application that does some operations (such as transformations or a lookup join), reading a stream from a Kafka topic and then, after processing, sinking the output into another Kafka topic. Why can't I access all the information on the Flink dashboard? For example, I can't see the number of records received or sent for each task or subtask, I can't view backpressure, and many of the metrics are not shown. I'm using the PyFlink Table API for my application, so is it the way I've implemented my code? How can I access the metrics? Is there anything Flink provides to visualise metrics in a verbose mode when the source and sink are not the filesystem connector (for example S3)? Thanks
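One common reason for the blank numbers: when the source, transformations, and sink are chained into a single task, no records cross a task boundary, so records sent/received and backpressure have nothing to report. A minimal sketch of disabling operator chaining from PyFlink so each operator reports its own metrics (a debugging aid, since chaining exists for performance); pipeline.operator-chaining is the standard Flink option:
Copy code
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Break operator chains so the web UI shows per-operator records in/out
# and backpressure; expect some runtime overhead.
t_env.get_config().get_configuration().set_string(
    "pipeline.operator-chaining", "false"
)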

    Adrian Chang

    11/16/2022, 2:38 PM
Hi, I am generating custom metrics in a FlatMapFunction using Python.
    Copy code
    class OccupancyEventFlatMap(FlatMapFunction):
        def open(self, runtime_context: RuntimeContext):
            mg = runtime_context.get_metrics_group()
            self.counter_sum = mg.counter("my_counter_sum")
            self.counter_total = mg.counter("my_counter_total")
        def flat_map(self, value):
            self.counter_sum.inc(10)
            self.counter_total.inc()
    I am able to query the metric using the REST API http://localhost:43491/jobs/9a376e28a1bb022b45c127d75fb1b447/vertices/5239a5f0e3e9cdca6a88500e58b5759e/metrics?get=0.FlatMap.my_counter_sum
    Copy code
    [{"id":"0.FlatMap.my_counter_sum","value":"28201"}]
But I don't see any of my custom metrics in Datadog; however, I do see all the standard Flink metrics there. This is my configuration in Flink for the Datadog reporter:
    Copy code
    # Datadog
    metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
    metrics.reporter.dghttp.dataCenter: US
    metrics.reporter.dghttp.apikey: ${datadog_api_key}
    metrics.reporter.dghttp.tags: env:development
    
# https://docs.datadoghq.com/integrations/flink/#configuration
metrics.scope.jm: flink.jobmanager
    metrics.scope.jm.job: flink.jobmanager.job
metrics.scope.tm: flink.taskmanager
    metrics.scope.tm.job: flink.taskmanager.job
    metrics.scope.task: flink.task
    metrics.scope.operator: flink.operator
It's the first time I am trying to send custom metrics from Flink to Datadog. Am I doing something wrong? Thanks

    Rommel

    11/16/2022, 6:09 PM
I am trying to use the flink-kubernetes-operator, and this is what I put into flinkConfiguration in my YAML file.
    Copy code
    flinkConfiguration:
        taskmanager.numberOfTaskSlots: {{ .Values.numOfTaskSlots | quote }}
        jobmanager.rpc.port: "6123"
        blob.server.port: "6124"
        taskmanager.rpc.port: "6122"
        queryable-state.proxy.ports: "6125"
        kubernetes.cluster-id: flink-cluster-{{ .Release.Name }}
        high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
        high-availability.storageDir: {{ .Values.haStorageDir }}
        restart-strategy: fixed-delay
        restart-strategy.fixed-delay.attempts: "10"
        kubernetes.namespace: flink
        metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
        metrics.reporter.prom.port: "9250"
        state.savepoints.dir: {{ .Values.savepointDir }}-{{ .Release.Name }}
When I try to deploy it, I run into this error:
admission webhook "flinkoperator.flink.apache.org" denied the request: Forbidden Flink config key: kubernetes.namespace
and also:
admission webhook "flinkoperator.flink.apache.org" denied the request: Forbidden Flink config key: kubernetes.cluster-id
I can remove kubernetes.namespace, but I thought that in order to use high availability I need to have the cluster-id set up. Can anyone help me with this?

    Dimitris Kalouris

    11/16/2022, 6:55 PM
Hey everyone! I am trying to convert between different formats in Flink SQL, currently from CSV to Avro, and I'm running into a ClassCastException. The second picture (attached in Slack) also shows some sample rows from the source (.csv) table. (Temporary tables are used because I use Table Store.)

    Adrian Chang

    11/16/2022, 7:53 PM
Hello, I have a question about how metrics reporters work in Flink. I am using PyFlink and I want to measure how long my UDF (or part of it) takes. Let's say the UDF is executed 5 times per second, so I measure the time 5 times per second and set a gauge metric. Does the reporter (the Datadog reporter in my case) report all the samples measured, or does it report only the current value of my gauge metric every X seconds? Is this good practice, or is it better to have two counter metrics like time_sum and time_count and then compute time_sum / time_count in Datadog to get the average time my UDF takes? Thanks
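For reference: reporters are invoked on a fixed interval and ship the gauge's value at that instant, so samples taken between report intervals are not sent. A minimal sketch of the two-counter pattern in a PyFlink scalar UDF (do_work and the metric names are hypothetical):
Copy code
import time

from pyflink.table.udf import ScalarFunction

class TimedUdf(ScalarFunction):
    def open(self, function_context):
        mg = function_context.get_metric_group()
        self.time_sum = mg.counter("my_udf_time_sum_ms")   # total elapsed ms
        self.time_count = mg.counter("my_udf_time_count")  # number of calls

    def eval(self, value):
        start = time.monotonic()
        result = do_work(value)  # hypothetical: the real UDF body goes here
        self.time_sum.inc(int((time.monotonic() - start) * 1000))
        self.time_count.inc()
        return result
In Datadog, my_udf_time_sum_ms / my_udf_time_count then gives the average duration without losing the samples taken between report intervals.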

    Raghunadh Nittala

    11/16/2022, 11:45 PM
Hey everyone, I am trying to sink results to S3 from Table API SQL queries, using an INSERT INTO statement. The job is failing with a java.lang.OutOfMemoryError: Java heap space exception, even though we increased the total memory and the task manager memory. My question is: does the default parquet partitioning store its state in RocksDB (the RocksDB state backend is enabled)?
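One note on the question: the in-progress Parquet files are not held in RocksDB; each open Parquet writer buffers its current row group on the JVM heap, and only bookkeeping (pending files/committables) goes into state, so many open partitions mean many concurrent writer buffers. A hedged sketch of sink DDL options that are commonly tuned for this (table name, columns, and values are hypothetical; the sink.rolling-policy.* keys are the filesystem connector's):
Copy code
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Roll part files earlier so each open Parquet writer holds less data in memory.
t_env.execute_sql("""
    CREATE TABLE s3_sink (
        user_id STRING,
        amount DOUBLE,
        dt STRING
    ) PARTITIONED BY (dt) WITH (
        'connector' = 'filesystem',
        'path' = 's3://my-bucket/output/',
        'format' = 'parquet',
        'sink.rolling-policy.file-size' = '64MB',
        'sink.rolling-policy.rollover-interval' = '15 min'
    )
""")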

    James McGuire

    11/17/2022, 12:44 AM
👋 Hi everyone, I am trying to write a query in Flink SQL to deserialize messages encoded with protobuf, but I consistently run into an org.codehaus.commons.compiler.CompileException: Line 23, Column 5: Cannot determine simple type name "com" error. The issue seems to be related to deserializing protobuf schemas that use google.protobuf.Timestamp fields. More details in thread.

    Aishwarya Raimule

    11/17/2022, 7:50 AM
Hi everyone, I am trying to publish custom metrics in Flink. I have a Kafka stream and I am using a filter operation to drop invalid events. I want to publish the number of invalid events using a counter.
    Copy code
    class ValidKafkaEventFilter(FilterFunction):
        def __init__(self):
            self.invalid_counter = None
    
        def open(self, runtime_context: RuntimeContext):
            self.invalid_counter = (
                runtime_context.get_metrics_group()
                .add_group("kinesisanalytics")
                .counter("InvalidKafkaEventCount")
            )
    
        def filter(self, row):
            try:
                is_valid = _has_valid_values(row)
                if not is_valid:
                    self.invalid_counter.inc(1)
                return is_valid
            except ValueError:
                return False
Now, the error I get when the filter runs is:
    Copy code
    if not is_valid:
    self.invalid_counter.inc(1)
    AttributeError: 'NoneType' object has no attribute 'inc'
This means that invalid_counter hasn't been initialised, i.e. open() apparently was never called. Can someone help me understand the reason for this? Am I missing something?
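If open() really is not being invoked for a FilterFunction in this PyFlink version, one workaround is to express the same logic as a ProcessFunction, whose open() is part of the normal operator lifecycle, and drop invalid rows there; a minimal sketch reusing _has_valid_values from the snippet above:
Copy code
from pyflink.datastream import ProcessFunction, RuntimeContext

class ValidKafkaEventProcess(ProcessFunction):
    def open(self, runtime_context: RuntimeContext):
        self.invalid_counter = (
            runtime_context.get_metrics_group()
            .add_group("kinesisanalytics")
            .counter("InvalidKafkaEventCount")
        )

    def process_element(self, row, ctx):
        try:
            if _has_valid_values(row):
                yield row  # keep valid rows
            else:
                self.invalid_counter.inc(1)  # count and drop invalid rows
        except ValueError:
            pass  # drop rows that cannot be validated
Used as stream.process(ValidKafkaEventProcess()) in place of the filter.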

    Tan Trinh

    11/17/2022, 8:51 AM
Hi Team, I am using the flink-kubernetes-operator in a k8s environment. When I apply the FlinkDeployment CRD, the Flink cluster works normally. When I use the HPA feature (https://github.com/apache/flink-kubernetes-operator/tree/main/examples/hpa), the cluster restarts and shows this error:
    Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: There is no operator for the state 7d9b7588bc2ff89baed50d7a4558caa4
    at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
    Caused by: java.lang.IllegalStateException: There is no operator for the state 7d9b7588bc2ff89baed50d7a4558caa4
    at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:733) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:98) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1670) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1598) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:177) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:363) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:208) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:191) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:139) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:135) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:115) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:345) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:322) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:106) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:94) ~[flink-dist-1.15.2.jar:1.15.2]
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ~[flink-dist-1.15.2.jar:1.15.2]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
I think it is stuck at the stage of restoring the latest checkpoint, but the message "There is no operator for the state 7d9b7588bc2ff89baed50d7a4558caa4" always appears, no matter which Flink cluster I apply. Thanks in advance!

    Haim Ari

    11/17/2022, 8:54 AM
Hello, I'm facing this issue when deploying a session cluster:
Internal error occurred: failed calling webhook "flinkoperator.flink.apache.org": Post "https://flink-operator-webhook-service.flink.svc:443/validate?timeout=10s": x509: certificate signed by unknown authority

    Volodymyr Meshchanynets

    11/17/2022, 4:36 PM
Hi, is it possible to use Flink's state mechanism with an AsyncFunction? We are trying to cache request results to avoid putting excessive load on another microservice. Currently we rely on an external proxy for additional rate limiting, plus a map function that has access to state, and we want to remove the proxy.

    Ari Huttunen

    11/17/2022, 4:48 PM
Hi! Where can I find example code that uses the Table API and FileSource to read Parquet data from S3? I'm able to find bits and pieces, but I haven't been able to put them together, as I'm new to Flink.
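For the Table API route, the filesystem connector can do the reading without a hand-built FileSource; a minimal PyFlink sketch (bucket, path, and columns are hypothetical; it assumes the flink-parquet format jar and an S3 filesystem plugin such as flink-s3-fs-hadoop are installed):
Copy code
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())

t_env.execute_sql("""
    CREATE TABLE parquet_in (
        user_id STRING,
        amount DOUBLE
    ) WITH (
        'connector' = 'filesystem',
        'path' = 's3://my-bucket/my-prefix/',
        'format' = 'parquet'
    )
""")

t_env.from_path("parquet_in").execute().print()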

    Abel Lamjiri

    11/17/2022, 5:44 PM
Hi Team, with the RocksDB state backend, during recovery from node failure:
• Is the copy of state data from S3 / HDFS done lazily, or do all files need to be fetched to local disk before Flink can resume processing?
• For large SSD volumes, we're trying to understand the time it takes to recover from node failure: e.g. 750 GB of state data on a 10 Gigabit network takes about 11 minutes.
• So far, I have found documents on:
  ◦ lazy deserialization
  ◦ lazy TTL data cleanup

    Max Gurewitz

    11/17/2022, 5:58 PM
Hi all! I'm trying to evaluate whether Flink would be a good tool for my use case. I need to maintain materialized views that are not bounded by a window, e.g. for a given event type, how many of those events have been received since the beginning of our dataset, updated in real time. Flink's documentation suggests this might be problematic:
    For example, it is impossible to count all elements in a stream, because streams are in general infinite (unbounded). Instead, aggregates on streams (counts, sums, etc), are scoped by windows, such as “count over the last 5 minutes”, or “sum of the last 100 elements”.
    https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/time/
    Thank you!
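Worth noting: that passage describes the DataStream windowing model; Flink SQL and the Table API also support unbounded ("global") group-by aggregations that keep running state and continuously emit updates, which matches the materialized-view use case (at the cost of state that grows with the key space). A minimal sketch, with a hypothetical events table registered elsewhere, e.g. over Kafka:
Copy code
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Unbounded aggregate: counts everything since the start of the stream and
# is continuously updated (an updating result backed by Flink state).
result = t_env.sql_query("""
    SELECT event_type, COUNT(*) AS events_so_far
    FROM events
    GROUP BY event_type
""")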

    Padraic McAtee

    11/17/2022, 7:14 PM
Hello all! I am working on a Flink job that continuously reads JSON from some S3 prefix. I have been able to do this using a file source with continuous monitoring. However, I'd like to cover a couple of additional use cases:
1. For ad-hoc reads, I would like to somehow filter out files that were created/modified before some timestamp. I would like to do this at startup time, effectively limiting the enumerated files based on that create/modify timestamp (to avoid reading all files under the prefix).
2. For continuous processing, I would like to save/checkpoint which files have already been read. In the event of a job cancellation, I would like the job to pick up where it left off once it is submitted again. Conceptually, Flink would just need to register which files it has already processed and filter those out of each subsequent read, but I can imagine that file splitting complicates this.
I've had difficulty finding documentation for these specific use cases. Could anyone point me in the right direction?
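On use case 2, the FileSource largely does this out of the box: in continuous monitoring mode the split enumerator tracks already-processed paths as part of checkpoint/savepoint state, so a job resumed from a savepoint picks up where it left off. For use case 1, as far as I know there is no built-in modified-after filter at startup (in Java a custom FileEnumerator can be plugged in; this is not exposed in Python). A minimal PyFlink sketch of the continuous source (bucket/prefix hypothetical; import paths per Flink 1.16):
Copy code
from pyflink.common import Duration, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSource, StreamFormat

env = StreamExecutionEnvironment.get_execution_environment()
# Checkpointing must be on: the enumerator's record of processed files
# lives in checkpoint/savepoint state.
env.enable_checkpointing(60_000)

source = (
    FileSource.for_record_stream_format(
        StreamFormat.text_line_format(),  # read JSON as text lines, parse downstream
        "s3://my-bucket/my-prefix/",
    )
    .monitor_continuously(Duration.of_seconds(60))
    .build()
)

lines = env.from_source(source, WatermarkStrategy.no_watermarks(), "json-files")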

    Stephan Weinwurm

    11/18/2022, 12:44 AM
Hey all, we're using Flink Statefun and have observed that Flink restarts all tasks if a single task fails, either because a call to our statefun endpoint timed out or because a Task Manager was suddenly terminated. Is there a reason Flink restarts all the other Statefun tasks if only one of them fails? The problem for us is that during the Flink restart it stops processing messages for a while (up to a few minutes sometimes), then comes back and sends a flood of requests towards the statefun endpoint, which may take some time to scale up. We've verified that we are using region as the restart strategy, but the logs reveal this:
    Copy code
    Nov 17 10:20:30 org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task 31284d56d1e2112b0f20099ee448a6a9_11.
    Nov 17 10:20:30 org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 5650 tasks should be restarted to recover the failed task 31284d56d1e2112b0f20099ee448a6a9_11.
Is there a way to have Flink restart only the one failed task instead of all of them, or to handle these scenarios more gracefully?

    Nithin Kumar Vokkarla

    11/18/2022, 6:31 AM
Hi all, we are trying to deploy a Flink application with a FlinkDeployment (using the Apache Flink Kubernetes operator) via ArgoCD. We are getting this error in Argo:
    Copy code
    ClusterDeploymentException
    Could not create Kubernetes cluster "flink-caldel".
The last time I faced this issue, the task manager and job manager pods were only created after a long time (about an hour). This is the Helm template that I used:
    Copy code
apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: {{ include "flinkapplication-calculatedelta.fullname" . }}
      namespace: {{ .Release.Namespace }}
      labels:
        environment: {{ .Values.environment }}
    spec:
      image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}"
      flinkVersion: v1_15
      {{- with .Values.flinkConfig }}
      flinkConfiguration:
        {{- toYaml . | nindent 4 }}
      {{- end }}
# taskmanager.numberOfTaskSlots = 1 is set in values.yaml under flinkConfig
      podTemplate:
        apiVersion: v1
        kind: Pod
        metadata:
          name: pod-template
          labels:
            environment: {{ .Values.environment }}
        spec:
          serviceAccount: {{ include "flinkapplication-calculatedelta.serviceAccountName" . }}
          nodeSelector:
            ............
          containers:
            # Do not change the main container name
            - name: flink-main-container
              volumeMounts:
                - mountPath: {{.Values.configFiles.mountPath}}
                  name: ......
          volumes:
            - name: ......
              configMap:
                name: {{ .Release.Name }}
                  
      serviceAccount: {{ include "flinkapplication-calculatedelta.serviceAccountName" . }}
      
      jobManager:
        resource:
          {{- toYaml .Values.jobManagerConfig.resources | nindent 6 }}
        replicas: {{ .Values.jobReplicas }}
      taskManager:
        resource:
          {{- toYaml .Values.taskManagerConfig.resources | nindent 6 }}
    
      job:
        jarURI: {{ .Values.jarName }} # Note, this jarURI is actually a placeholder
        parallelism: {{ .Values.parallelism }}
        entryClass: {{ .Values.entryClass }}
Can someone help me with this issue? Thank you.

    Emily Morgan

    11/18/2022, 7:41 AM
Hello 👋 I have a question about state recovery when resuming from a savepoint. Do the operators retrieve their latest state from the savepoint in parallel or one after the other? For example, if we have an operator with 2 parallel instances, both resuming their KeyedState after a failure, and instance 1 takes 10ms and instance 2 takes 10ms, is the total restore time 20ms?

    Makas Tzavellas

    11/18/2022, 7:53 AM
Hi all, does anyone happen to know whether POJO types are supported in PyFlink when using a custom source? I am getting the following error, but I'm not sure if there's a way to convert the Java POJO to a Python class:
    Copy code
    java.lang.UnsupportedOperationException: Could not find type serializer for current type [PojoType<my.org.demo.CustomResult, fields = [col: String, value: String]>].
    	at org.apache.flink.streaming.api.utils.PythonTypeUtils$TypeInfoToSerializerConverter.typeInfoSerializerConverter(PythonTypeUtils.java:831) ~[?:?]
    	at org.apache.flink.streaming.api.utils.PythonTypeUtils$TypeInfoToSerializerConverter.lambda$typeInfoSerializerConverter$0(PythonTypeUtils.java:747) ~[?:?]
    	at java.util.stream.ReferencePipeline$3$1.accept(Unknown Source) ~[?:?]
    	at java.util.Spliterators$ArraySpliterator.forEachRemaining(Unknown Source) ~[?:?]
    	at java.util.stream.AbstractPipeline.copyInto(Unknown Source) ~[?:?]
    	at java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source) ~[?:?]
    	at java.util.stream.AbstractPipeline.evaluate(Unknown Source) ~[?:?]
    	at java.util.stream.AbstractPipeline.evaluateToArrayNode(Unknown Source) ~[?:?]
    	at java.util.stream.ReferencePipeline.toArray(Unknown Source) ~[?:?]
    	at org.apache.flink.streaming.api.utils.PythonTypeUtils$TypeInfoToSerializerConverter.typeInfoSerializerConverter(PythonTypeUtils.java:748) ~[?:?]
    	at org.apache.flink.streaming.api.operators.python.process.AbstractExternalOneInputPythonFunctionOperator.open(AbstractExternalOneInputPythonFunctionOperator.java:108) ~[?:?]
    	at org.apache.flink.streaming.api.operators.python.process.ExternalPythonProcessOperator.open(ExternalPythonProcessOperator.java:64) ~[?:?]
    	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107) ~[flink-dist-1.16.0.jar:1.16.0]
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:726) ~[flink-dist-1.16.0.jar:1.16.0]
    	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55) ~[flink-dist-1.16.0.jar:1.16.0]
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:702) ~[flink-dist-1.16.0.jar:1.16.0]
    	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:669) ~[flink-dist-1.16.0.jar:1.16.0]
    	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) ~[flink-dist-1.16.0.jar:1.16.0]
    	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:904) ~[flink-dist-1.16.0.jar:1.16.0]
    	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) ~[flink-dist-1.16.0.jar:1.16.0]
    	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.0.jar:1.16.0]
    	at java.lang.Thread.run(Unknown Source) ~[?:?]
I'd appreciate any advice/pointers for resolving this issue.

    Dan Hill

    11/18/2022, 7:53 AM
Hi. I have a small job that uses Flink SQL to output CSV files (a customer wants CSV files). I expected the CSV format to have an option for writing a header row, but the connector doesn't seem to support it. How have other people handled this? Update: I created a separate custom table format. I copied the csv format but switched it to use a BulkWriter.

    chunilal kukreja

    11/18/2022, 11:46 AM
Hi Team, when the async I/O operator is waiting for its future to complete (basically letting the embedded function finish its processing), checkpointing halts at this operator and only proceeds once the call finishes (screenshot attached). I would like to test the scenario where async I/O is waiting for a response and the job is restarted. Ideally, the last checkpoint would be restored and the async I/O processing would be retried with the same set of data. But because the checkpoint is stuck, if I restart the job, the checkpoint that gets restored doesn't have the correct set of data. Please advise how I should tackle this scenario.

    chunilal kukreja

    11/18/2022, 11:51 AM
I noticed the same with savepointing as well: if I try to stop the job so that the default savepoint is captured (I have enabled it in the configuration), a timeout exception occurs. (This is with Flink 1.15.2.)

    clen.moras

    11/18/2022, 12:25 PM
Hello team. Current setup: operator version 1.2.0, global parallelism of 12, 4 task slots (3 TMs), 2 JMs in HA, native mode. Task managers register with the resource manager within a few seconds. When I convert this same setup to standalone mode with the exact same configuration, I face some odd issues:
1. One TaskManager is not able to reach the ResourceManager, while the other 2 TaskManagers connect fine with the exact same setup. I see errors like this:
    Copy code
Could not resolve ResourceManager address akka.tcp://flink@service-name.ns:6123/user/rpc/resourcemanager_0,
Here the service name is the headless service created when the cluster is set up in standalone mode. Any pointers as to what I'm doing wrong would be greatly helpful, since it does not always reproduce the same way: the number of affected TMs varies, and sometimes all of them fail.

    Felix Angell

    11/18/2022, 4:10 PM
Hey there, if I have a sink that is constantly busy (~90% CPU utilisation) and causing backpressure, are there common things to look out for when optimising? Judging by the flame graphs, it seems to spend a lot of time in initiateMultiPartUpload. Is the best bet to increase the batch size of the data being sunk?
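Each rolled part file starts its own S3 multipart upload, so many small files mean many initiateMultiPartUpload calls. Two levers that usually help: larger part files / longer rollover for row-encoded sinks, and, since bulk formats like Parquet roll on every checkpoint, a longer checkpoint interval. A hedged PyFlink sketch of the first lever (path and sizes hypothetical; for bulk formats only the checkpoint-interval lever applies):
Copy code
from pyflink.common.serialization import Encoder
from pyflink.datastream.connectors.file_system import FileSink, RollingPolicy

sink = (
    FileSink.for_row_format("s3://my-bucket/out/", Encoder.simple_string_encoder())
    .with_rolling_policy(
        RollingPolicy.default_rolling_policy(
            part_size=512 * 1024 * 1024,        # roll at 512 MB
            rollover_interval=10 * 60 * 1000,   # or after 10 minutes (ms)
            inactivity_interval=5 * 60 * 1000,  # or after 5 minutes idle (ms)
        )
    )
    .build()
)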

    Kyle Ahn

    11/18/2022, 8:29 PM
    Has anyone experienced this error before?
    Copy code
    Failed to commit after recovery s3://.../file.parquet with MPU ID j_nxH7JJKegFvFiOEsNcNjKuSuZ2UhnEO4.rkTCXBu.DJ9qx4cgp8Ux87gDnY_vHjEOMpVgApBOzqfqTp8uI20uLITzIabGrHWrWQDyTdZ2EKqzQoFf0ichfi5W7TUgzLed8QCqLqY0NCDJDmEYraCz41mN6wjaQv01WWgxl11Q-. Checking if file was committed before...
This occurred while trying to update a FlinkDeployment in HA mode using the last-state upgradeMode. It seems that validating the last successful checkpoint commit failed, and the job does not start but gets stuck there. Is there a way to skip the failed checkpoint validation after n attempts?

    Tawfik Yasser

    11/18/2022, 9:30 PM
Can I ask what WatermarkGeneratorSupplier actually does? I didn't understand the attached comment in Flink:
    Copy code
    /**
     * A supplier for {@link WatermarkGenerator WatermarkGenerators}. The supplier pattern is used to
     * avoid having to make {@link WatermarkGenerator} {@link Serializable} for use in API methods.
     *
     * <p>This interface is {@link Serializable} because the supplier may be shipped to workers during
     * distributed execution.
     */
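In short, per that comment: Flink serializes and ships the supplier (a small factory) to each parallel worker, and each worker calls it to build its own WatermarkGenerator locally; the generator itself, which may hold mutable state, therefore never has to be Serializable. A rough Python analogy of the indirection (names hypothetical, purely illustrative):
Copy code
import pickle

class MaxTimestampGenerator:
    """Per-subtask generator: mutable state, never serialized itself."""
    def __init__(self):
        self.max_ts = float("-inf")

    def on_event(self, event_ts):
        self.max_ts = max(self.max_ts, event_ts)

class GeneratorSupplier:
    """Only this tiny factory crosses the wire to the workers."""
    def create(self):
        return MaxTimestampGenerator()

# What distributed execution effectively does:
shipped = pickle.loads(pickle.dumps(GeneratorSupplier()))  # supplier is shipped
generator = shipped.create()   # each subtask builds its generator locally
generator.on_event(42)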

    Rommel

    11/18/2022, 11:46 PM
Hi, I use the flink-kubernetes-operator for deployments, and I would like to set the names of my job manager and task manager pods. I set the name under podTemplate -> metadata -> name, but it doesn't work. My current pods show up as:
    Copy code
    xxx-flink-78954c766b-hprhj   1/1     Running   0          2m12s
    xxx-flink-taskmanager-1-1    1/1     Running   0          93s
    xxx-flink-taskmanager-1-2    1/1     Running   0          93s
    xxx-flink-taskmanager-1-3    1/1     Running   0          93s
    xxx-flink-taskmanager-1-4    1/1     Running   0          93s

    chunilal kukreja

    11/19/2022, 9:13 AM
Hi Team, is it expected that while the async I/O operator is awaiting a response from an embedded function, checkpointing on this operator is also stuck waiting? That is what happens for me, and because of it I am unable to complete a checkpoint. The same thing happens for savepointing. This is something of a blocker for me; any leads would help me get through it. Thanks in advance!