# troubleshooting
  • s

    Sumit Khaitan

    05/02/2023, 6:14 AM
    Hi, we're seeing this issue in our Flink pipeline as well: https://issues.apache.org/jira/browse/FLINK-23886. Using Flink version 1.13.6. I see that the status of the issue is still open. Any solution for this?
  • f

    Felix Angell

    05/02/2023, 9:24 AM
    Heyo, I've enabled watermark alignment on my Flink apps (1.15.2) running on KDA. We noticed that it seems to pause all processing pretty much straight away, and that the watermarkAlignmentDrift metric is very low. Can anyone help me understand what is going on here?
  • g

    Gianluca Cacace

    05/02/2023, 9:30 AM
    Hello, I'm flat-mapping a record into multiple "jobs" which I then want to process in parallel. Once all the processing has completed, I'd like to wait for all the results of those "jobs" and generate a single result record at the end; I'm basically trying to implement a simple scatter-gather. Given that the number of jobs depends on the original record, I was thinking about passing the total number of jobs as part of the payload of each "job", such that a keyed window operator could count in state how many completed "jobs" it has seen so far, and if the count equals the expected total the window could be considered "complete". Which keyed window operator would you recommend, assuming I want to consider the window complete either when the total number of "jobs" is reached or when a timeout of 10 seconds expires (whichever comes first)?
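    A minimal sketch of one common alternative to a window operator for this pattern: a KeyedProcessFunction keyed by the original record's id, buffering jobs in state and emitting either when the expected total is reached or when a 10-second processing-time timer fires. The Job type (with a totalJobs field) and the Result.from(...) aggregation are hypothetical placeholders, not from the thread.
    Copy code
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Key: the id of the original record that was flat-mapped into N "jobs".
    public class GatherFunction extends KeyedProcessFunction<String, Job, Result> {

        private transient ListState<Job> buffered;   // jobs seen so far for this key
        private transient ValueState<Long> timerTs;  // timestamp of the registered timeout timer

        @Override
        public void open(Configuration parameters) {
            buffered = getRuntimeContext().getListState(new ListStateDescriptor<>("buffered", Job.class));
            timerTs = getRuntimeContext().getState(new ValueStateDescriptor<>("timerTs", Long.class));
        }

        @Override
        public void processElement(Job job, Context ctx, Collector<Result> out) throws Exception {
            buffered.add(job);
            if (timerTs.value() == null) {
                // register the 10 s timeout once per gather
                long ts = ctx.timerService().currentProcessingTime() + 10_000L;
                ctx.timerService().registerProcessingTimeTimer(ts);
                timerTs.update(ts);
            }
            long seen = 0;
            for (Job ignored : buffered.get()) {
                seen++;
            }
            if (seen >= job.totalJobs) {
                emitAndClear(ctx, out); // all expected jobs arrived: emit early
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) throws Exception {
            emitAndClear(ctx, out);     // timeout: emit whatever has been gathered
        }

        private void emitAndClear(Context ctx, Collector<Result> out) throws Exception {
            Long ts = timerTs.value();
            if (ts != null) {
                ctx.timerService().deleteProcessingTimeTimer(ts);
            }
            out.collect(Result.from(buffered.get())); // hypothetical aggregation of the gathered jobs
            buffered.clear();
            timerTs.clear();
        }
    }
    The same reached-count-or-timeout semantics can also be expressed with a custom window Trigger, but the process-function version tends to be easier to reason about.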
  • c

    Christophe Bornet

    05/02/2023, 11:16 AM
    Hi, I need to put the results of dynamic SELECT queries into a single JSON. E.g. if I have SELECT a, b, c FROM mytable, I want to transform it into a single payload {"a": xxx, "b": xxx, "c": xxx}. As I don't know the query in advance, I can't create a Table to insert into. I'm using the Table API (tableEnv.executeSql(selectQuery)). Should I use the internal API as is done in the SQL gateway?
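    One possible sketch that stays on the public Table API rather than the SQL-gateway internals: run the query, read the column names from the result's resolved schema, and build one JSON object per row with Jackson (assumed to be on the classpath). The type handling here is deliberately naive.
    Copy code
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.TableResult;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.CloseableIterator;
    import java.util.List;

    public class QueryToJson {

        public static void printAsJson(TableEnvironment tableEnv, String selectQuery) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            TableResult result = tableEnv.executeSql(selectQuery);
            // column names come from the result itself, so the query does not need to be known in advance
            List<String> columns = result.getResolvedSchema().getColumnNames();
            try (CloseableIterator<Row> rows = result.collect()) {
                while (rows.hasNext()) {
                    Row row = rows.next();
                    ObjectNode json = mapper.createObjectNode();
                    for (int i = 0; i < columns.size(); i++) {
                        json.putPOJO(columns.get(i), row.getField(i)); // naive: lets Jackson serialize the raw value
                    }
                    System.out.println(mapper.writeValueAsString(json));
                }
            }
        }
    }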
  • a

    Andrew Otto

    05/02/2023, 3:13 PM
    Can anyone point me to some docs on how (py)flink manages memory in a local (mini?) cluster? I'm troubleshooting some OOM issues and I can see memory increasing, but I don't get an OOM, even though the RSS is going way above taskmanager.memory.process.size. I guess in mini-cluster mode those options don't apply, because the TM and JM are in the same process? Do I need to set e.g. -Xmx manually?
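    For what it's worth, on the Java side a mini-cluster run lives inside the application's own JVM, so the heap is governed by that process's -Xmx rather than by taskmanager.memory.process.size. A hedged sketch of passing explicit memory options to a local environment (the values are illustrative only):
    Copy code
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class LocalMemoryExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // only the off-heap pools are sized here; the heap itself still comes from the -Xmx
            // of the JVM that runs this main() (or of the launcher process, in the PyFlink case)
            conf.setString("taskmanager.memory.managed.size", "256m");
            conf.setString("taskmanager.memory.network.max", "128m");

            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(conf);
            env.fromElements(1, 2, 3).print();
            env.execute("local-memory-example");
        }
    }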
  • о

    Олег Спица

    05/02/2023, 3:38 PM
    Does anyone know how to use (or what to use instead of) OneInputStreamOperatorTestHarness with the new FileSink instead of the deprecated StreamingFileSink? StreamingFileSink implements SinkFunction and can be used as a parameter for OneInputStreamOperator, but the new FileSink isn't suitable here.
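    Possibly useful context: FileSink is a unified Sink rather than a SinkFunction, so it cannot be wrapped in OneInputStreamOperatorTestHarness; one common workaround is to exercise it with a tiny end-to-end job against a temporary directory and assert on the produced part files. A sketch, with the path and element type purely illustrative:
    Copy code
    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.connector.file.sink.FileSink;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class FileSinkSmokeTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            FileSink<String> sink = FileSink
                    .forRowFormat(new Path("/tmp/file-sink-test"), new SimpleStringEncoder<String>())
                    .build();

            env.fromElements("a", "b", "c")
               .sinkTo(sink);                       // FileSink goes through sinkTo(), not addSink()

            env.execute("file-sink-smoke-test");
            // in a real test: list /tmp/file-sink-test afterwards and assert on the part files
        }
    }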
  • f

    Felix Angell

    05/02/2023, 4:02 PM
    Heyo! The watermark alignment feature seems to pause all of our source operators after some point is reached. Any idea how I can debug this one? We have two sources consuming from Kafka, and after the first 3 mins or so of running they just pause and everything goes idle. Normally we would be processing a few million messages a minute, but this seems to break at around 500k messages each time.
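    For reference, a minimal sketch of how watermark alignment is typically wired up on 1.15, since a max drift that is small relative to the actual skew between the sources can keep the faster source paused. The group name, durations, and MyEvent type below are illustrative only, and the timestamp assigner is omitted:
    Copy code
    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;

    public class AlignmentExample {
        // MyEvent is a placeholder; a timestamp assigner would normally be chained on as well
        static WatermarkStrategy<MyEvent> alignedStrategy() {
            return WatermarkStrategy
                    .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                    // sources in "alignment-group" may drift at most 20 s apart; the ones that are
                    // ahead pause until the slowest catches up, re-checked every second
                    .withWatermarkAlignment("alignment-group", Duration.ofSeconds(20), Duration.ofSeconds(1));
        }
    }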
  • n

    Nathanael England

    05/02/2023, 4:49 PM
    Does this error mean anything to anyone? This is on Flink 1.17.0, after running a message through a custom process function:
    Copy code
    2023-05-02 16:31:49,152 ERROR /usr/local/lib/python3.8/dist-packages/pyflink/fn_execution/beam/beam_sdk_worker_main.py:35 [] - Traceback (most recent call last):
      File "/usr/local/lib/python3.8/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 287, in _execute
        response = task()
      File "/usr/local/lib/python3.8/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 360, in <lambda>
        lambda: self.create_worker().do_instruction(request), request)
      File "/usr/local/lib/python3.8/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 596, in do_instruction
        return getattr(self, request_type)(
      File "/usr/local/lib/python3.8/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 634, in process_bundle
        bundle_processor.process_bundle(instruction_id))
      File "/usr/local/lib/python3.8/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 1003, in process_bundle
        input_op_by_transform_id[element.transform_id].process_encoded(
      File "/usr/local/lib/python3.8/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 227, in process_encoded
        self.output(decoded_value)
      File "apache_beam/runners/worker/operations.py", line 526, in apache_beam.runners.worker.operations.Operation.output
      File "apache_beam/runners/worker/operations.py", line 528, in apache_beam.runners.worker.operations.Operation.output
      File "apache_beam/runners/worker/operations.py", line 237, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
      File "apache_beam/runners/worker/operations.py", line 240, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
      File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 169, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
      File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 194, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
      File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 104, in pyflink.fn_execution.beam.beam_operations_fast.IntermediateOutputProcessor.process_outputs
      File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 169, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
      File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 194, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
      File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 92, in pyflink.fn_execution.beam.beam_operations_fast.NetworkOutputProcessor.process_outputs
      File "pyflink/fn_execution/beam/beam_coder_impl_fast.pyx", line 101, in pyflink.fn_execution.beam.beam_coder_impl_fast.FlinkLengthPrefixCoderBeamWrapper.encode_to_stream
      File "pyflink/fn_execution/coder_impl_fast.pyx", line 273, in pyflink.fn_execution.coder_impl_fast.IterableCoderImpl.encode_to_stream
      File "pyflink/fn_execution/coder_impl_fast.pyx", line 401, in pyflink.fn_execution.coder_impl_fast.RowCoderImpl.encode_to_stream
      File "pyflink/fn_execution/coder_impl_fast.pyx", line 391, in pyflink.fn_execution.coder_impl_fast.RowCoderImpl.encode_to_stream
    AttributeError: 'dict' object has no attribute 'get_fields_by_names'
  • z

    Zhiyu Tian

    05/03/2023, 3:46 AM
    Hi team, I am trying to read a parquet file on HDFS. But I got the following error
    *org.apache.parquet.format.PageHeader: Required field 'uncompressed_page_size' was not found in serialized data*
    . ## Run environment: Hadoop 2.9.1 Flink Operator: 1.16.1 Flink version: 1.16 ## Investigation I searched the error https://github.com/trinodb/trino/issues/2256, but it did not help. ## Full stack: 51120139836 at 10485760; previously tried [DatanodeInfoWithStorage[/BN2/10/308/09/[260310b0515206a00645be620]10010,DS-07f646f6-dcbf-4b98-8580-1adc1911da0b,DISK]]. 2023-05-03 032615,221 INFO org.apache.hadoop.hdfs.DFSClient [] - Spawning 1 hedged read to DatanodeInfoWithStorage[/BN2/10/307/08/[260310b0515205900645bedcd]10010,DS-59d240da-da8d-47a0-a61b-cb12af056f81,DISK] for BP-1520273214-10.27.194.109-1568248216274:blk_52162250811_51120139836 at 61865984; previously tried [DatanodeInfoWithStorage[/BN2/10/308/09/[260310b0515206a00645be620]10010,DS-07f646f6-dcbf-4b98-8580-1adc1911da0b,DISK], DatanodeInfoWithStorage[/BN2/10/306/14/[260310b0515204f00645beb6e]10010,DS-1c533525-e37e-4671-88e5-ca79ad3f28a3,DISK]]. 2023-05-03 032615,223 ERROR org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Received uncaught exception. java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150) ~[flink-connector-files-1.16.0.jar:1.16.0] at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) [flink-connector-files-1.16.0.jar:1.16.0] at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?] at java.util.concurrent.FutureTask.run(Unknown Source) [?:?] at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?] at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?] at java.lang.Thread.run(Unknown Source) [?:?] Caused by: java.io.IOException: can not read class org.apache.parquet.format.PageHeader: Required field 'uncompressed_page_size' was not found in serialized data! Struct: org.apache.parquet.format.PageHeader$PageHeaderStandardScheme@112523e7 at org.apache.parquet.format.Util.read(Util.java:365) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?] at org.apache.parquet.format.Util.readPageHeader(Util.java:132) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?] at org.apache.parquet.hadoop.ParquetFileReader$Chunk.readPageHeader(ParquetFileReader.java:1382) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?] at org.apache.parquet.hadoop.ParquetFileReader$Chunk.readAllPages(ParquetFileReader.java:1429) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?] at org.apache.parquet.hadoop.ParquetFileReader$Chunk.readAllPages(ParquetFileReader.java:1402) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?] at org.apache.parquet.hadoop.ParquetFileReader.readChunkPages(ParquetFileReader.java:1023) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?] at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:928) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?] at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.readNextRowGroup(ParquetVectorizedInputFormat.java:396) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?] 
at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.nextBatch(ParquetVectorizedInputFormat.java:378) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?] at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.readBatch(ParquetVectorizedInputFormat.java:355) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?] at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67) ~[flink-connector-files-1.16.0.jar:1.16.0] at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-1.16.0.jar:1.16.0] at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) ~[flink-connector-files-1.16.0.jar:1.16.0] ... 6 more Caused by: shaded.parquet.org.apache.thrift.protocol.TProtocolException: Required field 'uncompressed_page_size' was not found in serialized data! Struct: org.apache.parquet.format.PageHeader$PageHeaderStandardScheme@112523e7 at org.apache.parquet.format.PageHeader$PageHeaderStandardScheme.read(PageHeader.java:1108) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?] at org.apache.parquet.format.PageHeader$PageHeaderStandardScheme.read(PageHeader.java:1019) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?] at org.apache.parquet.format.PageHeader.read(PageHeader.java:896) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?] at org.apache.parquet.format.Util.read(Util.java:362) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?] at org.apache.parquet.format.Util.readPageHeader(Util.java:132) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?] at org.apache.parquet.hadoop.ParquetFileReader$Chunk.readPageHeader(ParquetFileReader.java:1382) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?] at org.apache.parquet.hadoop.ParquetFileReader$Chunk.readAllPages(ParquetFileReader.java:1429) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?] at org.apache.parquet.hadoop.ParquetFileReader$Chunk.readAllPages(ParquetFileReader.java:1402) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?] at org.apache.parquet.hadoop.ParquetFileReader.readChunkPages(ParquetFileReader.java:1023) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?] at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:928) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?] at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.readNextRowGroup(ParquetVectorizedInputFormat.java:396) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?] at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.nextBatch(ParquetVectorizedInputFormat.java:378) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?] at org.apache.flink.formats.parquet.ParquetVectorizedInputFormat$ParquetReader.readBatch(ParquetVectorizedInputFormat.java:355) ~[blob_p-876fb9c1decabe8fc7463988e0d4e13b165e8c61-1aceb131dcac9755d058c2b83d5cfd1a:?] 
at org.apache.flink.connector.file.src.impl.FileSourceSplitReader.fetch(FileSourceSplitReader.java:67) ~[flink-connector-files-1.16.0.jar:1.16.0] at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-1.16.0.jar:1.16.0] at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) ~[flink-connector-files-1.16.0.jar:1.16.0] ... 6 more 2023-05-03 032615,225 DEBUG org.apache.flink.streaming.runtime.tasks.StreamTask [] - Cleanup StreamTask (operators closed: false, cancelled: false)
  • s

    Sumit Khaitan

    05/03/2023, 4:20 AM
    Hi, we're seeing this issue in our Flink pipeline as well: https://issues.apache.org/jira/browse/FLINK-23886. Using Flink version 1.13.6. I see that the status of the issue is still open. Any solution for this?
  • b

    Bhupendra Yadav

    05/03/2023, 5:41 AM
    Hi everyone. We are using the official Flink K8s operator and have a session cluster with 2 JobManagers for HA. When we submit a FlinkSessionJob with jarURI pointing to a remote location, we intermittently see an error in the Flink session job status:
    Copy code
    org.apache.flink.runtime.rest.handler.RestHandlerException: Jar file /<jar-dir>/ce239a09-463f-4667-8f81-9cd6a76f5454_my-jar.jar
        does not exist\n\ta ...
    Our understanding is that this happens because the jar is uploaded to one JM's local filesystem by the Flink operator (our web.upload.dir is a k8s emptyDir atm), and if the job submit request goes to the other JM it fails because the jar doesn't exist there. One way to fix this is to use a Persistent Volume mounted at both JMs' jar directory (web.upload.dir). I have a few questions: 1. Is there any easier way to fix it? 2. Does the operator download the jar and upload it to the JM, or does it just pass the jarURI to the JM and let the JM download the jar itself?
  • a

    Amenreet Singh Sodhi

    05/03/2023, 6:46 AM
    Hey team, I am using Flink-1.16.0, and have been receiving the following error very frequently:
    Copy code
    org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
    Caused by: org.apache.kafka.common.errors.CoordinatorNotAvailableException: The coordinator is not available.
    The same issue is discussed in this ticket: https://issues.apache.org/jira/browse/FLINK-27962?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel but it's an old ticket. Can anyone share the latest ticket for this issue, or, if it has been resolved for 1.16.0, share the details? Thanks.
  • r

    Rashmin Patel

    05/03/2023, 9:03 AM
    Hi all, I have a query related to the state-processor-api, which has been ported to the DataStream API since 1.16.0. My bootstrap job was written with the DataSet API and used to complete in 30 minutes, but after migrating it to the DataStream API and running it in BATCH execution mode, I am seeing a significant performance degradation: the same program now takes ~4x the time, with the same resources given to both versions. Has anyone faced such an issue?
  • p

    Pedro Mázala

    05/03/2023, 12:15 PM
    Hey there folks 👋 I was using Flink with my own Helm templating and decided to migrate to the Flink K8s Operator. I'm building my own Docker image and publishing it, and I override the image following this example. I'm using standalone mode (mode: standalone) so that I can run Flink in reactive mode (scheduler-mode: reactive). My task manager pod has my modifications (the /opt/flink/usrlib/ jars), but my job manager pod seems to have the same content as the default Flink image. Do you know what could lead to this? My deployment is:
    Copy code
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: {{ template "flink-application-mode.name" . }}
    spec:
      image: "{{ .Values.imageRepository }}/{{ .Values.imageName }}:{{ .Values.imageTag }}"
      imagePullPolicy: IfNotPresent
      serviceAccount: {{ .Values.serviceAccountName }}
      flinkVersion: v1_16
      podTemplate:
        spec:
          containers:
            - name: flink-main-container
              env:
                - name: APPLICATION_CREDENTIALS
                  value: /secrets/credentials.json
              volumeMounts:
                ...
          volumes:
            ...
      flinkConfiguration:
        scheduler-mode: reactive
      jobManager:
        replicas: 1
        resource:
          {{- toYaml .Values.resources.jobmanager.requests | nindent 6 }}
      taskManager:
        replicas: {{ .Values.replicaCount }}
        resource:
          {{- toYaml .Values.resources.taskmanager.requests | nindent 6 }}
      mode: standalone
      job:
        jarURI: local:///opt/flink/usrlib/custom.jar
        entryClass: {{ required "A valid .Values.jobmanager.entryClass entry required!" .Values.jobmanager.entryClass }}
        parallelism: 2
        upgradeMode: savepoint # last-state OR stateless
  • f

    Felix Angell

    05/03/2023, 4:54 PM
    Recently upgraded a PyFlink application to 1.15 and it's in a crash loop with 'could not acquire the minimum resources' exceptions being thrown. I'll attach the exception logs in the thread (some are related to a WriteBatch mismatch; I figure this is from restoring the checkpoint?). Nothing has changed in terms of configuration, and we run with a 0.5 memory fraction setting for the task managers. Any ideas why my app is suddenly crashing after this upgrade?
  • a

    Adam Augusta

    05/03/2023, 8:48 PM
    Before I bend over backwards doing something stupid, can anyone tell me why the Flink web extension header and parameter types, which otherwise seem designed to work with the RestClusterClient, are only usable with package-private access?
  • m

    Matthew Kerian

    05/04/2023, 5:37 AM
    Hi team, qq. I have a stream that we want to reduce over a sliding window by event time. The problem is that we want every single time window to be accounted for, even if there are no events. Currently the plan is to use a keyed process function. In the process function we:
    • Maintain a list of "windows": basically a list of timestamped lists (where each list belongs to a time window).
    • Any event that comes in gets added to all the windows it belongs to.
    • If the event has a new key, then our context gets another timer linked to this key.
    • Realistically we'd want every single key to always be there; however, we may want to delete keys:
      ◦ either add some manual intervention, perhaps through a broadcast,
      ◦ or just have a TTL for each key's state (likely what I'd do).
    • Late arrivals can also be allowed under this model.
    I believe this will work for our use case, and it shouldn't be overly daunting to set up. But is there a more ergonomic way to do this? I've looked at how I could override some of the existing window functionality to pull this off, but I don't see a way. It feels like it should be very simple: I want a sliding window where window creation is decoupled from element input, but there doesn't seem to be a cleaner way to implement it than the above. It'd be nice to use the existing window features.
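    A minimal sketch of the timer-driven approach described above, assuming processing-time firing, a 5-minute window with a 1-minute slide, and hypothetical Event (with a timestamp field) and WindowResult types; cleanup of old slide buckets and key TTL are omitted:
    Copy code
    import java.util.Map;
    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class AlwaysFiringSlidingWindow extends KeyedProcessFunction<String, Event, WindowResult> {

        private static final long SLIDE_MS = 60_000L;        // 1-minute slide
        private static final long SIZE_MS  = 5 * 60_000L;    // 5-minute window

        private transient MapState<Long, Long> countsPerSlide; // slide start -> event count

        @Override
        public void open(Configuration parameters) {
            countsPerSlide = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("countsPerSlide", Long.class, Long.class));
        }

        @Override
        public void processElement(Event e, Context ctx, Collector<WindowResult> out) throws Exception {
            long slideStart = e.timestamp - (e.timestamp % SLIDE_MS);
            Long current = countsPerSlide.get(slideStart);
            countsPerSlide.put(slideStart, current == null ? 1L : current + 1L);

            // make sure a timer chain is running for this key (duplicate registrations are deduplicated)
            long now = ctx.timerService().currentProcessingTime();
            long nextFire = now - (now % SLIDE_MS) + SLIDE_MS;
            ctx.timerService().registerProcessingTimeTimer(nextFire);
        }

        @Override
        public void onTimer(long ts, OnTimerContext ctx, Collector<WindowResult> out) throws Exception {
            long windowEnd = ts;
            long windowStart = windowEnd - SIZE_MS;
            long total = 0;
            for (Map.Entry<Long, Long> slide : countsPerSlide.entries()) {
                if (slide.getKey() >= windowStart && slide.getKey() < windowEnd) {
                    total += slide.getValue();
                }
            }
            // emits even when no events arrived for this key in the window
            out.collect(new WindowResult(ctx.getCurrentKey(), windowStart, windowEnd, total));

            // keep firing every slide; dropping old slide buckets / key TTL is left out of this sketch
            ctx.timerService().registerProcessingTimeTimer(ts + SLIDE_MS);
        }
    }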
  • o

    Otto Remse

    05/04/2023, 8:27 AM
    Hi! I'm trying to export metrics to Prometheus but can't seem to get the correct value of numRecordsOut for our sources. In the picture I can see that there are records produced, but when querying flink_taskmanager_job_task_operator_numRecordsOut{task_name=~"Source:.*"} or flink_taskmanager_job_task_numRecordsOut{task_name=~"Source:.*"} they're always zero. What metric name is the Flink dashboard using for the value that is visible in the picture?
  • d

    Dylan Fontana

    05/04/2023, 2:51 PM
    Hi folks 👋 When it comes to partition discovery in Flink, the KafkaSourceEnumerator is able to find new partitions and assign them to task managers. But is there a way to have Flink rebalance the partition assignment if we later increase the parallelism? For example, if I have 6 Kafka partitions assigned to 5 task managers, one of the TMs will have 2 partitions. If I redeploy my app from a snapshot with an increased parallelism of 6, the partitions are not rebalanced, so one of the TMs has no partitions assigned and halts watermarks. While I could set the idleness to prevent the watermark halting, that's not really the problem I'm chasing: I want the partitions rebalanced so that each TM gets 1 partition. Is there a way to achieve this? Is it not considered a safe operation for processing guarantees?
  • s

    shubham gupta

    05/04/2023, 4:18 PM
    Hi all, we are running an Apache Flink application in production where we serialize JSON messages to Avro. Source (JSON messages): Kafka topic (64 partitions). Sink (Avro messages): Kafka topic (64 partitions). Flink version: 1.7.1. We are running the job with a parallelism of 84. The issue is that we are observing high backpressure on the Kafka source vertex, because of which the Kafka lag is increasing. Initially we suspected the Kafka sink was causing the backpressure on the source, so we tried a null discarding sink, but the backpressure persisted. So now we suspect Avro serialization, since we are not doing any other processing in the job apart from some filter and map operators. Can Avro serialization cause backpressure, or are we missing something here? Need help from the community on this. If Avro serialization is the problem, how do we mitigate it?
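    Avro serialization can certainly be CPU-heavy enough to back-pressure a source at that volume, especially if an ObjectMapper/DatumWriter/encoder is created per record. A hedged sketch of the usual mitigation, reusing those objects inside a RichMapFunction; the schema string and field mapping are illustrative only:
    Copy code
    import java.io.ByteArrayOutputStream;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    public class JsonToAvroMapper extends RichMapFunction<String, byte[]> {

        private transient ObjectMapper jsonMapper;
        private transient Schema schema;
        private transient GenericDatumWriter<GenericRecord> writer;
        private transient BinaryEncoder reusedEncoder;

        @Override
        public void open(Configuration parameters) {
            // created once per task, not once per record
            jsonMapper = new ObjectMapper();
            schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"Event\","
                    + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");
            writer = new GenericDatumWriter<>(schema);
        }

        @Override
        public byte[] map(String json) throws Exception {
            JsonNode node = jsonMapper.readTree(json);
            GenericRecord record = new GenericData.Record(schema);
            record.put("id", node.get("id").asText());            // illustrative field mapping

            ByteArrayOutputStream out = new ByteArrayOutputStream();
            reusedEncoder = EncoderFactory.get().binaryEncoder(out, reusedEncoder); // reuse the encoder
            writer.write(record, reusedEncoder);
            reusedEncoder.flush();
            return out.toByteArray();
        }
    }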
  • v

    Viswa Teja Kuncham

    05/04/2023, 6:08 PM
    Hi, we are using the Kubernetes operator for deployment, and we want to restore from a savepoint on a new deployment (one that recreates the deployment). We are using custom sources and snapshotting the state; the checkpoints and savepoints are getting created in S3, but for the restore we tried using upgradeMode: "savepoint" and it's not working. Using Flink 1.16.0.
  • m

    Maroš Mamrák

    05/04/2023, 6:09 PM
    Hello everyone, I am encountering an issue while using my JsonDeserializationSchema. I am attempting to use a generic approach for correlating data from two streams, but I'm having trouble understanding why KafkaSource always requires a String type for setValueOnlyDeserializer(new JsonDeserializationSchema<T>(messageType)). My IDE suggests casting to String, but in my case I am working with data from two POJO classes, UserA and UserB, and I need to send objects of these classes as JSON strings to the Kafka topics. My code snippet:
    Copy code
    public static <T> KafkaSource<T> create_source(String server, String sourceTopic, Class<T> messageType) {
        KafkaSource<T> source = (KafkaSource<T>) KafkaSource.<String>builder()
                .setBootstrapServers(server)
                .setTopics(sourceTopic)
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new JsonDeserializationSchema<T>(messageType))
                .build();
        return source;
    }

    private static class JsonDeserializationSchema<T> extends AbstractDeserializationSchema<T> {

        private final ObjectMapper objectMapper = new ObjectMapper();
        private final Class<T> targetType;

        public JsonDeserializationSchema(Class<T> targetType) {
            this.targetType = targetType;
        }

        @Override
        public T deserialize(byte[] message) throws IOException {
            return objectMapper.readValue(message, targetType);
        }

        @Override
        public TypeInformation<T> getProducedType() {
            return TypeInformation.of(targetType);
        }
    }
    But maybe this is already too complicated, and using Avro and a schema registry would be the right approach for serialization...
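    One hedged suggestion on the cast: KafkaSource.builder() is itself generic, so calling KafkaSource.<T>builder() lets the deserializer's type parameter flow through the builder, and the (KafkaSource<T>) cast (and the IDE's "cast to String" hint) should no longer be needed. A sketch using the same JsonDeserializationSchema from above:
    Copy code
    public static <T> KafkaSource<T> create_source(String server, String sourceTopic, Class<T> messageType) {
        return KafkaSource.<T>builder()                          // <T> instead of <String>: no cast required
                .setBootstrapServers(server)
                .setTopics(sourceTopic)
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new JsonDeserializationSchema<>(messageType))
                .build();
    }
    Avro with a schema registry is still a reasonable longer-term option, but it isn't required just to make the generic JSON schema work.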
  • d

    David Christle

    05/05/2023, 12:11 AM
    Hi team, I’m wondering if I found a bug in Window TVF not correctly propagating rowtime in the toDataStream conversion, at least in batch execution mode. The core logic of what I’m doing is in the attached snippet. At a high level, the code extracts a field from a POJO in a DataStream, converts to the Table API, performs a Window TVF, and converts back to a DataStream<Tuple<>>. Then, that DataStream is converted back into a Table, where another Window TVF is applied, and the output is converted back to a different Tuple DataStream. I realize there’s no need to convert the first table back to DataStream, but the point is the toDataStream conversion isn’t, apparently, propagating the rowtime on the first Table. When this logic runs in batch execution mode, the error is: org.apache.flink.util.FlinkRuntimeException: Could not find timestamp in DataStream API record. Make sure that timestamps have been assigned before and the event-time characteristic is enabled. From inserting some .shuffle calls to split the operators and narrow down the error source, it looks like the first phase of the 2nd window TVF is where it happens, i.e. this stage fails:
    Copy code
    [40]:TableSourceScan(table=[[*anonymous_datastream_source$6*]], fields=[f0, f1, rowtime])
    +- [41]:Calc(select=[f0, f1, CAST(rowtime AS TIMESTAMP_LTZ(3) *ROWTIME*) AS rowtime])
       +- [42]:WindowTableFunction(window=[HOP(time_col=[rowtime], size=[86400000 ms], slide=[1 h])])
          +- [43]:Calc(select=[f0, window_start, window_end, window_time AS rowtime, f1])
             +- [44]:LocalHashAggregate(groupBy=[f0, window_start, window_end, rowtime], select=[f0, window_start, window_end, rowtime, Partial_SUM(f1) AS sum$0])
    The earlier stage looks like this:
    Copy code
    [37]:HashAggregate(isMerge=[true], groupBy=[f0, window_start, window_end, rowtime], select=[f0, window_start, window_end, rowtime, Final_COUNT(count1$0) AS f3])
    +- [38]:Calc(select=[f0, f3, rowtime])
       +- [39]:ConstraintEnforcer[NotNullEnforcer(fields=[f3, rowtime])]
          +- TableToDataSteam(type=ROW<`f0` INT, `f3` BIGINT NOT NULL, `rowtime` TIMESTAMP_LTZ(3) NOT NULL> NOT NULL, rowtime=false)
    Not sure if it’s relevant, but I thought the rowtime=false was unusual. I expected the first Window TVF should propagate a time attribute. The documentation says window_time is a time attribute, and shows how it’s used in a subsequent TVF as the rowtime. So, I’d expect it to propagate via toDataStream, too.
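    Not sure whether it addresses the batch-mode case here, but one workaround that is sometimes suggested when the time attribute is lost across toDataStream/fromDataStream is to re-declare it explicitly when converting the intermediate stream back into a Table, so the second window TVF sees a rowtime again. The field names (f0/f1/f2) and types below are illustrative only:
    Copy code
    import java.time.Instant;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class RowtimeReattach {
        // re-expose the tuple's timestamp field as a proper event-time attribute before the second TVF
        static Table withRowtime(StreamTableEnvironment tableEnv, DataStream<Tuple3<Integer, Long, Instant>> stream) {
            return tableEnv.fromDataStream(
                    stream,
                    Schema.newBuilder()
                            .columnByExpression("rowtime", "CAST(f2 AS TIMESTAMP_LTZ(3))")
                            .watermark("rowtime", "SOURCE_WATERMARK()")
                            .build());
        }
    }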
  • z

    Zhang Zhao

    05/05/2023, 1:48 AM
    Hi team, I hit an error when using TiDB CDC. My Flink version: 1.14.5; TiDB CDC version: 2.3.0
    Copy code
    Exception in thread "main" java.lang.RuntimeException: Failed to fetch next result at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109) at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80) at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:370) at org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:152) at org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:160) at com.zjlab.shuzihu.MysqlToEs_online.main(MysqlToEs_online.java:204) Caused by: java.io.IOException: Failed to fetch job execution result at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:177) at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:120) at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) ... 5 more Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928) at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175) ... 7 more Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628) at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996) at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:134) at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:174) ... 
7 more Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:252) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:242) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:233) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:684) at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:444) at sun.reflect.GeneratedMethodAccessor16.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316) at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at akka.actor.Actor.aroundReceive(Actor.scala:537) at akka.actor.Actor.aroundReceive$(Actor.scala:535) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) at akka.actor.ActorCell.invoke(ActorCell.scala:548) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) at akka.dispatch.Mailbox.run(Mailbox.scala:231) at akka.dispatch.Mailbox.exec(Mailbox.scala:243) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172) Caused by: java.lang.IllegalArgumentException: Invalid range: [+INF..{116,128,0,0,0,0,0,2,66,95,114,223,255,255,255,255,255,255,249}) at org.apache.flink.shaded.guava30.com.google.common.collect.Range.<init>(Range.java:358) at org.apache.flink.shaded.guava30.com.google.common.collect.Range.create(Range.java:156) at org.apache.flink.shaded.guava30.com.google.common.collect.Range.intersection(Range.java:558) at org.tikv.cdc.CDCClient.overlapWithRegion(CDCClient.java:235) at 
org.tikv.cdc.CDCClient.addRegions(CDCClient.java:185) at org.tikv.cdc.CDCClient.applyKeyRange(CDCClient.java:178) at org.tikv.cdc.CDCClient.start(CDCClient.java:98) at com.ververica.cdc.connectors.tidb.TiKVRichParallelSourceFunction.run(TiKVRichParallelSourceFunction.java:165) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
  • d

    Dheeraj Panangat

    05/05/2023, 5:26 AM
    Hi Team, Re-posting this as it was on an old thread. Appreciate any help with the same. Thanks
  • n

    Nitin Bansal

    05/05/2023, 11:30 AM
    Hi team, I am trying to run PyFlink 1.17 on a Mac. When using StreamEnvironment it throws an error:
    Copy code
    TypeError: Argument 'input_stream' has incorrect type (expected pyflink.fn_execution.stream_fast.LengthPrefixInputStream, got BeamInputStream)
    It happens when I add a streaming source to the environment. Any workaround for this error?
  • v

    Vladimir Tiukhtin

    05/05/2023, 11:57 AM
    Dear Flink team, what is the real use of the JOB_MANAGER_RPC_ADDRESS environment variable? Without setting it to the Kubernetes pod IP, Flink fails to start. I did not find it anywhere in the documentation or the code.
  • t

    Thijs van de Poll

    05/05/2023, 4:02 PM
    Hi all! We are testing PyFlink jobs in Kubernetes and are observing that the Python jobs are ONLY executing on the jobmanager and not on the taskmanager. Is it correct that Python jobs can only execute on the jobmanager, or are we missing something?
  • h

    Hussain Abbas

    05/05/2023, 4:18 PM
    Hello guys, we have deployed the Apache Flink Kubernetes operator in our production environment using the reactive scheduler and standalone mode. We are using HPA to scale the task managers up and down based on metrics, while Flink takes care of vertical scaling. Our pipeline consists of Kinesis as the source and Dynamo as the sink. There are a few issues we are seeing right now: 1. We are getting failing checkpoints with unaligned checkpoints, which causes task manager restarts. 2. Task managers restart under high load; not sure what is causing it though, as we cannot see any exceptions. We have a very high-load environment, 10-12k events/s. Do you think there is any room for improvement? Are we handling it correctly?
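    For the failing unaligned checkpoints, a hedged sketch of the checkpoint settings that are usually reviewed first in this situation; the concrete values are illustrative, not recommendations:
    Copy code
    import java.time.Duration;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointTuning {
        public static void configure(StreamExecutionEnvironment env) {
            env.enableCheckpointing(60_000L);                         // checkpoint every 60 s

            CheckpointConfig cc = env.getCheckpointConfig();
            cc.enableUnalignedCheckpoints(true);
            cc.setAlignedCheckpointTimeout(Duration.ofSeconds(30));   // stay aligned first, switch after 30 s
            cc.setCheckpointTimeout(10 * 60_000L);                    // give slow checkpoints time before failing
            cc.setMinPauseBetweenCheckpoints(30_000L);                // breathing room between checkpoints
            cc.setTolerableCheckpointFailureNumber(3);                // don't restart the job on a single failure
        }
    }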