# troubleshooting
  • k

    Krish Narukulla

    09/29/2022, 5:01 AM
    Can we make Flink SQL work with the s3 scheme? https://github.com/apache/hudi/issues/4297
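    A minimal sketch of what this usually looks like once one of the bundled S3 filesystem plugins (flink-s3-fs-hadoop or flink-s3-fs-presto) is enabled and credentials are configured; the table name, bucket, path and format below are placeholders:
    -- Flink SQL filesystem table backed by S3 (sketch)
    CREATE TABLE my_s3_table (
      id STRING,
      amount INT
    ) WITH (
      'connector' = 'filesystem',
      'path' = 's3://my-bucket/my-table/',
      'format' = 'parquet'
    );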
    m
    • 2
    • 9
  • d

    Dan Dubois

    09/29/2022, 8:44 AM
    Hi all. I'm asking for some advice with my issue of the Task Manager quitting unexpectedly when running a job. It's related to my Stack Overflow question here. I have a Docker Compose cluster set up as in the documentation example in session mode. I have a SourceFunction -> DiscardingSink job where the source function produces a never-ending stream of short random strings. After a variable period of time my Task Manager quits with a 137 error and no stack trace. I can consistently reproduce this on my Linux machine but not on my Mac. I thought it might have been because of the unrelenting busy loop in SourceFunction continually spitting out data, but I'm not sure. I tried using the Source API with a SingleThreadMultiplexSourceReaderBase, which I thought worked, but eventually after several hours I got the same issue. Any advice on what is going on here and how I can mitigate it would be greatly appreciated.
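    For context, exit code 137 is SIGKILL, and with Docker it usually means the container hit its memory limit and was OOM-killed, which would also explain the missing stack trace. A sketch of keeping the container limit comfortably above Flink's configured process memory in Docker Compose; the service name and sizes are illustrative only:
    # docker-compose.yml (sketch)
    taskmanager:
      image: flink:1.15
      command: taskmanager
      environment:
        - |
          FLINK_PROPERTIES=
          taskmanager.memory.process.size: 1728m
      mem_limit: 2g   # keep the cgroup limit above process.size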
    c
    t
    • 3
    • 34
  • a

    Abhinav sharma

    09/29/2022, 12:39 PM
    Is it possible for me to create an API in Java which will post the Flink aggregated streams when the URL is hit? If yes, what can I use to create the API?
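    One possible shape, offered as a sketch rather than an official Flink feature: have the job sink each aggregate into a queryable store and expose that store behind a small HTTP endpoint. The class, path and port below are made up, and the in-memory map only works when everything runs in one JVM (local testing); in a real cluster the sink would write to Redis/Postgres/etc. and the HTTP server would read from there.
    import com.sun.net.httpserver.HttpServer;
    import java.net.InetSocketAddress;
    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class AggregateApi {
        // populated by the Flink job's sink in this single-JVM sketch;
        // in production this would be an external store the sink writes to
        static final Map<String, String> latestAggregates = new ConcurrentHashMap<>();

        public static void main(String[] args) throws Exception {
            HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
            server.createContext("/aggregates", exchange -> {
                byte[] body = latestAggregates.toString().getBytes(StandardCharsets.UTF_8);
                exchange.sendResponseHeaders(200, body.length);
                exchange.getResponseBody().write(body);
                exchange.close();
            });
            server.start();
        }
    }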
  • v

    Vincent canuel

    09/29/2022, 2:18 PM
    Hi, I would like to add some custom env vars to my Flink job running on k8s via the k8s operator. I have tried to overload the `podTemplate`, but this does not work (when I describe the running pod the env is missing). Here is my deployment:
    Copy code
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: flink01
    spec:
      image: flink01:1
      flinkVersion: v1_15
      podTemplate:
        spec:
          containers:
            - name: flink-main-container
              volumeMounts:
                - mountPath: /opt/flink/log
                  name: flink-logs
              env:
                - name: MY_CUSTOM_ENV
                  value: "SUPER"
          volumes:
            - name: flink-logs
              emptyDir: { }
      flinkConfiguration:
        taskmanager.numberOfTaskSlots: "2"
      serviceAccount: flink
      jobManager:
        resource:
          memory: "2048m"
          cpu: 1
      taskManager:
        resource:
          memory: "2048m"
          cpu: 1
      job:
        jarURI: local:///opt/flink/usrlib/my.jar
        parallelism: 2
        upgradeMode: stateless
    Could you help me with that?
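    In case it helps while debugging the podTemplate merge, the operator also forwards env variables set through the containerized.* options in flinkConfiguration; a sketch reusing MY_CUSTOM_ENV from above:
    flinkConfiguration:
      taskmanager.numberOfTaskSlots: "2"
      containerized.master.env.MY_CUSTOM_ENV: "SUPER"
      containerized.taskmanager.env.MY_CUSTOM_ENV: "SUPER"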
    m
    • 2
    • 13
  • h

    Hartmut

    09/29/2022, 4:56 PM
    Hi, quick question about the Flink Table API / SQL: if I create a table for the kafka / upsert-kafka connector, is the table always both a source and a sink at the same time?
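    For what it's worth, the table definition itself is neutral; whether it acts as a source or a sink depends on how a query uses it. A small sketch with made-up table and column names:
    -- read from the table (acts as a source)
    SELECT user_id, COUNT(*) FROM my_kafka_table GROUP BY user_id;

    -- write into the same table (acts as a sink)
    INSERT INTO my_kafka_table SELECT user_id, name FROM some_other_table;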
    m
    r
    +2
    • 5
    • 17
  • r

    Rashmin Patel

    09/29/2022, 5:01 PM
    Hi folks. When restoring state from a checkpoint for a Flink SQL program, I am facing a weird issue: the watermark is getting reset to Long.MIN_VALUE, and this is causing some of the past windows to fire again. Below is the runtime topology of the Flink job: Kafka Source (upsert mode) >> ChangeLogNormalize >> GroupWindowAggregate >> PostgresSink. I went into the Flink code and added log statements to see what is happening internally.
    Copy code
    [CL] [WindowOperator] Initializing internalTimerService with wm -9223372036854775808
    [CL] [InternalWindowProcessFunction] this.ctx.currentWatermark() at open(): -9223372036854775808
    [CL] Opening ProcTimeDeduplicateKeepLastRowFunction
    [CL] [ProcTimeDedupeFn] cw -9223372036854775808 for input org.apache.flink.table.data.binary.BinaryRowData@f17bddd6
    Can someone help me understand why this could be happening?
    d
    • 2
    • 16
  • s

    Sigh

    09/29/2022, 6:21 PM
    Hi, I'm on Flink 1.15.2. I usually disable generic types on all my Flink jobs, and most of the time I use the DataStream API. I noticed that when I use the SQL API with a filesystem sink (S3 or file) it complains about a List serialization problem; it works with other table sinks.
  • p

    Prasaanth Neelakandan

    09/29/2022, 7:19 PM
    hi folks, we are on Flink 1.13.5 and are running into checkpoint failures in our TMs intermittently. Our checkpoint storage is S3. These generally occur across 2-3 TMs at a time and when we checked the logs we see these WARN messages with the exception:
    2022-09-23 08:36:15,992 WARN  org.apache.hadoop.fs.s3a.S3AInstrumentation                  [] - Closing output stream statistics while data is still marked as pending upload in OutputStreamStatistics{blocksSubmitted=1, blocksInQueue=1, blocksActive=0, blockUploadsCompleted=0, blockUploadsFailed=0, bytesPendingUpload=9759410, bytesUploaded=0, blocksAllocated=1, blocksReleased=1, blocksActivelyAllocated=0, exceptionsInMultipartFinalize=0, transferDuration=0 ms, queueDuration=0 ms, averageQueueTime=0 ms, totalUploadDuration=0 ms, effectiveBandwidth=0.0 bytes/s}
    These have also often been accompanied with another exception:
    java.util.concurrent.CancellationException: null
    The root causes seem to be:
    org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - VideoQos Stream Reporter (14/40)#0 - asynchronous part of checkpoint 5149 could not be completed.
    org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Checkpoint 5149 of job 22c1b49deb67c5388647ac9a79bf87ca expired before completing.
    Could anyone help us understand these exceptions, and how to solve these checkpoint failures? Will using the S3 Presto plugin help? I am attaching the detailed stack traces for these exceptions as well as our Flink config settings. Any guidance would be appreciated 🙂
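    Since the root cause says the checkpoint expired before completing, the usual first steps are raising the checkpoint timeout and enabling unaligned checkpoints so backpressure does not hold the barriers back; a sketch of the relevant options (values illustrative, bucket a placeholder). The presto-based filesystem also does not go through the Hadoop S3A output stream shown in the WARN above:
    execution.checkpointing.timeout: 30 min
    execution.checkpointing.unaligned: true
    execution.checkpointing.max-concurrent-checkpoints: 1
    # with the flink-s3-fs-presto plugin on the classpath, s3p:// selects it explicitly
    state.checkpoints.dir: s3p://my-bucket/checkpoints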
    d
    • 2
    • 7
  • d

    ding bei

    09/30/2022, 5:39 AM
    Hi, I would like to change the REST service type (`kubernetes.rest-service.exposed.type`) to Loadbalancer, but it won't work. Then I tried NodePort, which still does not work. Is this a bug or did I do something wrong? Can somebody please take a look?
    Copy code
    ################################################################################
    #  Licensed to the Apache Software Foundation (ASF) under one
    #  or more contributor license agreements.  See the NOTICE file
    #  distributed with this work for additional information
    #  regarding copyright ownership.  The ASF licenses this file
    #  to you under the Apache License, Version 2.0 (the
    #  "License"); you may not use this file except in compliance
    #  with the License.  You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    #  Unless required by applicable law or agreed to in writing, software
    #  distributed under the License is distributed on an "AS IS" BASIS,
    #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    #  See the License for the specific language governing permissions and
    # limitations under the License.
    ################################################################################
    
    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: statemachineflink
    spec:
      image: flink:1.15
      flinkVersion: v1_15
      ingress:
        template: "xxx"
        className: "alb"
      flinkConfiguration:
        kubernetes.rest-service.exposed.type: "Loadbalancer"
        containerized.master.env.ENABLE_BUILT_IN_PLUGINS: "flink-s3-fs-presto-1.15.2.jar"
        containerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS: "flink-s3-fs-presto-1.15.2.jar"
        taskmanager.numberOfTaskSlots: "2"
        state.savepoints.dir: "s3://xxx"
        state.checkpoints.dir: "s3://xxx"
        high-availability: "org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory"
        high-availability.storageDir: "xxx"
        execution.checkpointing.interval: 10s
        state.backend: rocksdb
      jobManager:
        resource:
          memory: "2048m"
          cpu: 1
      taskManager:
        resource:
          memory: "2048m"
          cpu: 1
      podTemplate:
        metadata:
          labels:
            app: statemachineflink
        spec:
          containers:
            - name: flink-main-container
              volumeMounts:
              - mountPath: /flink-data
                name: flink-volume
          volumes:
            - name: flink-volume
              emptyDir: {}
      job:
        jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
        parallelism: 2
        upgradeMode: savepoint
        state: running
        savepointTriggerNonce: 0
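    One detail worth double-checking: the documented values for kubernetes.rest-service.exposed.type are ClusterIP, NodePort and LoadBalancer (capital B), and depending on the Flink version the parsing may not accept the "Loadbalancer" spelling used above. A sketch with the documented casing:
    flinkConfiguration:
      kubernetes.rest-service.exposed.type: "LoadBalancer"   # or "NodePort"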
    g
    m
    • 3
    • 24
  • a

    Aqib Mehmood

    09/30/2022, 8:21 AM
    Hi All, we are running Flink on K8s. We are using Flink to build triggers based upon changes in the product prices in our prod database. The hurdles that we're facing right now are:
    • We need to store pricing data locally in Flink to be able to build a stateful processor on top of it
    • We need to persist this data in case of job failure
    Currently we're looking into the Hive metastore for storage of our pricing data. Since we're just using the metastore and not the complete Hive architecture, this seems like a lengthy approach. Is there any simpler way to do this? Can we store and persist our data somewhere else? TIA
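    A minimal sketch of the simpler route: keep the latest price per product in Flink keyed state, let checkpoints to a durable state backend handle persistence across failures, and skip the metastore entirely. PriceUpdate and PriceAlert below are made-up POJOs.
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // keyed by productId; emits a trigger whenever the stored price changes
    public class PriceChangeTrigger
            extends KeyedProcessFunction<String, PriceUpdate, PriceAlert> {

        private transient ValueState<Double> lastPrice;

        @Override
        public void open(Configuration parameters) {
            lastPrice = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("last-price", Double.class));
        }

        @Override
        public void processElement(PriceUpdate update, Context ctx, Collector<PriceAlert> out)
                throws Exception {
            Double previous = lastPrice.value();
            if (previous != null && previous != update.price) {
                out.collect(new PriceAlert(update.productId, previous, update.price));
            }
            lastPrice.update(update.price);
        }
    }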
  • n

    Nick Pocock

    09/30/2022, 10:54 AM
    Hey, does anyone know if it's possible to use SQS as a connector to Flink?
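    There is no official SQS connector that I know of, but a simple polling source is easy to sketch with the AWS SDK v2; this is at-least-once at best (messages are deleted outside any checkpoint), and the queue URL is a placeholder.
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import software.amazon.awssdk.services.sqs.SqsClient;
    import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
    import software.amazon.awssdk.services.sqs.model.Message;
    import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;

    public class SqsSource extends RichSourceFunction<String> {
        private final String queueUrl;
        private volatile boolean running = true;
        private transient SqsClient sqs;

        public SqsSource(String queueUrl) { this.queueUrl = queueUrl; }

        @Override
        public void open(Configuration parameters) { sqs = SqsClient.create(); }

        @Override
        public void run(SourceFunction.SourceContext<String> ctx) {
            while (running) {
                // long-poll SQS, emit each message body, then delete it
                for (Message m : sqs.receiveMessage(ReceiveMessageRequest.builder()
                                .queueUrl(queueUrl).waitTimeSeconds(20).maxNumberOfMessages(10).build())
                        .messages()) {
                    synchronized (ctx.getCheckpointLock()) { ctx.collect(m.body()); }
                    sqs.deleteMessage(DeleteMessageRequest.builder()
                            .queueUrl(queueUrl).receiptHandle(m.receiptHandle()).build());
                }
            }
        }

        @Override
        public void cancel() { running = false; }
    }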
    d
    s
    • 3
    • 7
  • b

    Balazs Varga

    09/30/2022, 11:31 AM
    Hi, we have a job running on Flink 1.13.2 in `yarn-per-job` mode with Zookeeper HA. The job has gone into a `SUSPENDED` state because of a Zookeeper timeout: `Client session timed out, have not heard from server in 40019ms for sessionid 0x20a20d1641b67b4`, and the cluster is shut down:
    Copy code
    org.apache.flink.runtime.dispatcher.MiniDispatcher           [] - Shutting down cluster with state SUSPENDED, jobCancelled: false, executionMode: DETACHED
    org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting YarnJobClusterEntrypoint down with application status UNKNOWN. Diagnostics null.
    After it starts up again, it does not find the checkpoint handles:
    Copy code
    org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Recovering checkpoints from ZooKeeperStateHandleStore{namespace='flink/application_1661494717285_98085/checkpoints/a636bd043052b707aadb1a7b98fe99d5'}.
    org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Found 0 checkpoints in ZooKeeperStateHandleStore{namespace='flink/application_1661494717285_98085/checkpoints/a636bd043052b707aadb1a7b98fe99d5'}.
    I see in the logs of the shut down application that these were called, which seems to have cleared these handles:
    Copy code
    org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices [] - Close and clean up all data for ZooKeeperHaServices.
    org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices [] - Finished cleaning up the high availability data.
    As I understood from browsing jiras (e.g. this one) and the docs, HA data, such as checkpoints, should only be cleaned up on a globally terminal state, which would make a lot of sense. Can someone help understand why the state cleanup could have happened in my case?
    ✅ 1
    m
    c
    • 3
    • 11
  • t

    Tommy Gunnarsson

    09/30/2022, 11:40 AM
    Hi! We are seeing high latency on `Sync duration` during checkpointing. In the docs it says that sync duration is "The duration of the synchronous part of the checkpoint. This includes snapshotting state of the operators and blocks all other activity on the subtask (processing records, firing timers, etc).", which, by our assumption, is the time it takes to checkpoint all our state for the operators. What could be taking time here? We are running our job on k8s on nodes with RocksDB backed by local NVMe drives. For starters, we are trying it out with a total state of 100GB. Here are some numbers and our config:
    d
    s
    • 3
    • 3
  • s

    Shen Zhu

    09/30/2022, 11:21 PM
    Hi team, happy Friday! We are trying to write an aggregate user defined function and use that in Flink SQL. According to the docs:
    Copy code
    Subsequently, the accumulate(...) method of the function is called for each input row to update the accumulator. Once all rows have been processed, the getValue(...) method of the function is called to compute and return the final result.
    It seems hard to define "all rows have been processed" in a streaming application. Is it possible to have the `getValue()` method called right after `accumulate()`? Basically, generate aggregated results as soon as a new record comes in, thanks!
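    For reference: with an unbounded (non-windowed) GROUP BY aggregation, the planner already calls getValue() after each accumulate() and emits an updated result per input row on the changelog stream, so there is usually nothing extra to implement. A minimal sketch of such a UDAF, with made-up names:
    import org.apache.flink.table.functions.AggregateFunction;

    public class RunningSum extends AggregateFunction<Long, RunningSum.SumAcc> {

        public static class SumAcc {
            public long sum = 0L;
        }

        @Override
        public SumAcc createAccumulator() {
            return new SumAcc();
        }

        // called for every input row
        public void accumulate(SumAcc acc, Long value) {
            if (value != null) {
                acc.sum += value;
            }
        }

        // in unbounded streaming aggregation this is invoked after each accumulate,
        // so downstream sees an updated result per incoming record
        @Override
        public Long getValue(SumAcc acc) {
            return acc.sum;
        }
    }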
    m
    • 2
    • 28
  • j

    Jirawech Siwawut

    10/01/2022, 3:45 AM
    Hi Team. I am using the Hive sink on Flink and found that checkpointing is quite slow compared to the Kafka sink. The data size is less than 100 KB, but it takes more than 5 to 10 seconds to finish each checkpoint. Can someone point me to where to check further, e.g. the code where the Kafka sink performs its checkpoint vs. where the Hive sink performs its checkpoint? BTW, I am using local file storage for checkpoints.
    d
    • 2
    • 3
  • k

    Krish Narukulla

    10/01/2022, 6:32 AM
    Is there a format for confluent protobuf?
    m
    • 2
    • 3
  • h

    Hartmut

    10/01/2022, 11:17 AM
    Hi, question: when I set up my env for checkpointing, I define a path. What about the RocksDB state backend? Where does it live? How do I configure it (both for local and e.g. k8s -> PV, …)? (Note: I'm currently testing/running from my local machine / IDE.)
    Copy code
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    env.enableCheckpointing(5000)
    env.checkpointConfig.setCheckpointStorage("file:///Users/xyz/dev/my-flink-app/checkpoints")
    env.stateBackend = EmbeddedRocksDBStateBackend(true)

    // ...

    env.execute("Stream Join Demo")
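    In case it helps: with EmbeddedRocksDBStateBackend the working state lives in RocksDB instances on each TaskManager's local disk (a temp directory by default), and only checkpoints go to the path configured above. The local location can be set explicitly, e.g. via flink-conf; on k8s this is typically an emptyDir or local-SSD volume mounted at that path (the directory below is a placeholder):
    state.backend: rocksdb
    # where the live RocksDB instances keep their files (local, not durable)
    state.backend.rocksdb.localdir: /data/flink/rocksdb
    # durable checkpoints stay on the storage you already configured
    state.checkpoints.dir: file:///Users/xyz/dev/my-flink-app/checkpoints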
    k
    • 2
    • 3
  • k

    Krish Narukulla

    10/02/2022, 4:25 AM
    I set up the Flink GCS integration using the documentation https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/gcs/, but found there is no file system for scheme `gs`. I later added the below to hive-site.yaml:
    Copy code
    <property>
        <name>fs.gs.impl</name>
        <value>org.apache.flink.fs.gs.GSFileSystem</value>
      </property>
    That later resulted in a class-not-found error for `org.apache.flink.fs.gs.GSFileSystem`. I looked at the jars on the Flink cluster and found the below:
    Copy code
    root@airstream-f69f865f8-xcqfh:/opt/flink# ls plugins/gs-fs-hadoop/
    flink-gs-fs-hadoop-1.15.2.jar
    root@airstream-f69f865f8-xcqfh:/opt/flink# ls opt/flink-gs-fs-hadoop-1.15.2.jar
    opt/flink-gs-fs-hadoop-1.15.2.jar
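    One thing that stands out: the fs.gs.impl entry in hive-site points Hadoop at a Flink-internal class, which Hadoop itself cannot load. Flink's own gs:// support comes from the plugin directory shown above, while Hadoop/Hive code paths would need the separate GCS connector class instead. A hive-site sketch under that assumption:
    <property>
      <name>fs.gs.impl</name>
      <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
    </property>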
    • 1
    • 1
  • r

    Ravi-Dev

    10/02/2022, 8:28 AM
    Hi Team, a question. I am new to Flink. I have created a Kafka source stream and use a JDBC sink to persist to the DB. I have one parent table and two child tables. The JDBC sink works fine and I am able to insert data into the main table; my stream has a list of child objects which need to be ingested into the sub/child tables. Please suggest how to implement this: can this be done using the JDBC sink, or do I need to use some different API to ingest data into the child tables?
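    One possible shape, sketched with invented type, column and connection names: keep the existing JdbcSink for the parent row and add a second JdbcSink fed by a flatMap that explodes the list of child objects.
    import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
    import org.apache.flink.connector.jdbc.JdbcSink;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.util.Collector;

    DataStream<ParentEvent> parents = ...;   // the existing Kafka-sourced stream

    // explode each parent's child list into its own stream (ParentEvent/ChildRow are hypothetical POJOs)
    DataStream<ChildRow> children = parents
            .flatMap((ParentEvent p, Collector<ChildRow> out) ->
                    p.getChildren().forEach(c -> out.collect(new ChildRow(p.getId(), c.toString()))))
            .returns(ChildRow.class);

    children.addSink(JdbcSink.sink(
            "INSERT INTO child_table (parent_id, payload) VALUES (?, ?)",
            (stmt, row) -> {
                stmt.setString(1, row.parentId);
                stmt.setString(2, row.payload);
            },
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withUrl("jdbc:postgresql://db:5432/app")   // placeholder
                    .withDriverName("org.postgresql.Driver")
                    .build()));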
    👀 2
    h
    • 2
    • 3
  • h

    Hartmut

    10/03/2022, 12:13 PM
    Question: with the Table API, when emitting data, e.g. to Kafka in Avro format, it's currently not possible to define the namespace or apply a pre-defined schema / Avro artifact (as the documentation says: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/avro/#data-type-mapping). Is there any good example / known pattern? 🤔 (I'm using the Table API.) There's always the manual option to resolve to a DataStream, then .map into the target custom Avro type, then sinkTo Kafka..?
  • m

    Marti H

    10/03/2022, 12:48 PM
    Question: I am currently facing this problem, and I checked and there is a bugfix created and a PR ready. Do you know how long it usually takes to merge, or who I can ping to ask? Thanks mates 😄
  • r

    Rommel

    10/03/2022, 5:04 PM
    When I try to install `flink-kubernetes-operator` following the link here https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.2/docs/operations/helm/, it says:
    Copy code
    Error: failed to download "helm/flink-kubernetes-operator" (hint: running `helm repo update` may help)
    So which helm repo should I add first?
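    From the same operator docs page, the repo needs to be added first; a sketch, with the version in the URL being whichever operator release you are installing:
    helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.2.0/
    helm repo update
    helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator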
    m
    g
    t
    • 4
    • 28
  • z

    Zsombor Chikan

    10/04/2022, 1:10 AM
    Hey guys! Do you know whether Flink has an equivalent to Spark's --conf spark.dynamicAllocation.enabled=false? Thank you in advance!
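    The closest Flink equivalents I'm aware of are the adaptive scheduler and reactive mode, which rescale a job to the TaskManagers that are available; a sketch of the flink-conf keys (reactive mode only applies to standalone application clusters):
    jobmanager.scheduler: adaptive
    # reactive mode: the job always scales to use every available TaskManager
    scheduler-mode: reactive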
    m
    • 2
    • 2
  • s

    Sumit Nekar

    10/04/2022, 8:23 AM
    Hi, is this the correct way of overriding the watchNamespaces value for the flink-kubernetes-operator chart?
    Copy code
    helm upgrade --install --wait flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator --set watchNamespaces={ns1,ns2}
    
    I see that the flink operator pod is not getting restarted after the helm upgrade.
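    One thing to check: without quotes, most shells brace-expand {ns1,ns2} before Helm ever sees it, so only part of the value arrives; quoting the --set argument (or using a values file) avoids that. A sketch:
    helm upgrade --install --wait flink-kubernetes-operator \
      flink-operator-repo/flink-kubernetes-operator \
      --set "watchNamespaces={ns1,ns2}"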
    m
    • 2
    • 17
  • s

    Sevvy Yusuf

    10/04/2022, 10:52 AM
    Hi guys, I'm quite new to Flink and have some questions regarding the source in batch mode. Is it possible to parallelise the source, particularly the FileEnumerator so the splits are created in parallel?
    k
    • 2
    • 1
  • p

    Palani

    10/04/2022, 1:10 PM
    Copy code
    Hi All,
    
    We are trying to enrich streaming data with lookup data. The lookup data will be mostly static (updated once in a while).
    For our streaming application we are using the Flink Table API. To handle this scenario we are trying to use a Flink temporal join.
    We have created two tables for our sources as below (the example below is a mocked one):
    
    
    1. Look up data
    
    	CREATE TABLE versioned_rates (
    		currency STRING,
    		rate DECIMAL(38, 10),
    		currency_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
    		WATERMARK FOR currency_time AS currency_time,
    		PRIMARY KEY(currency) NOT ENFORCED 
    	) WITH (
    	    'connector' = 'upsert-kafka',
    	    ...)
    
    2. Streaming data (source streaming data)
    	
    	CREATE TABLE orders (
    	    order_id STRING,
    	    currency STRING,
    	    amount INT,
    	    order_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' VIRTUAL,
    	    WATERMARK FOR order_time AS order_time
    	) WITH (
    	    'connector' = 'kafka',
    	    ...)
    
    
    The join on these tables is like below
    
    	SELECT 
    	  o.order_id,
    	  o.order_time,
    	  o.amount * r.rate AS amount,
    	  r.rate rate,
    	  r.currency
    	FROM orders AS o JOIN versioned_rates FOR SYSTEM_TIME AS OF o.order_time r
    	on o.currency = r.currency;
    
    The issue we are facing is that we are not getting output from this join if there is no update in the lookup data.

    We need the behaviour where a result is written for every record we receive in the streaming data, even if there is no update in the lookup data.

    After doing some reading we understand that this may be related to the watermarks not matching up between the streams,
    but we couldn't clearly understand why. If we can get any help on this that would be appreciated.
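    Not a definitive answer, but the usual explanation is that the temporal join only emits once the watermark of the versioned (rates) side has passed the order's event time, and an idle rates topic never advances its watermark. A commonly suggested knob, sketched here with an illustrative value, is the source idle-timeout so an idle lookup stream stops holding the join back:
    table.exec.source.idle-timeout: 10 s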
    d
    h
    +2
    • 5
    • 7
  • m

    Martijn Visser

    10/04/2022, 1:21 PM
    Hi everyone. I've opened up a discussion on the Flink Dev mailing list with a proposal to deprecate and remove Scala API support from Flink via a FLIP. In case you're not familiar with this, a FLIP is a Flink Improvement Proposal, which is used to propose major changes to Flink. You can find the FLIP at https://cwiki.apache.org/confluence/display/FLINK/FLIP-265+Deprecate+and+remove+Scala+API+support. This includes the motivation, the proposed changes and other relevant information. I'm posting this in #C03G7LJTS2G to get the most feedback from Flink users (similar to how I've reached out to the User mailing list). As part of the discussion, it would be really helpful to get feedback from Scala users on this specific proposal. Preferably you give your feedback by replying in the discussion thread on the Dev / User mailing list, which you can find at https://lists.apache.org/thread/d3borhdzj496nnggohq42fyb6zkwob3h. In case you're not subscribed to the mailing lists, feel free to also reply here. I will then relay your feedback from here as a reply (including your name from Slack) on this mailing list, because per our community rules
    All important decisions and conclusions *must be reflected back to the mailing lists.* "If it didn't happen on a mailing list, it didn't happen." - The Apache Mottos
    .
    😲 2
    🙁 3
    👍 1
    ✔️ 1
    n
    • 2
    • 2
  • i

    Ildar Almakaev

    10/04/2022, 2:05 PM
    Hi, community. I've got a question about Event-Time Temporal Join and watermarks... Let me elaborate on the issue. I am implementing an app to enrich users' audit logs (`logs`) with `users` info using an Event-Time Temporal Join with the Table API. AFAIK, to be able to join those two streams I need to:
    1. Define a watermark strategy `.withTimestampAssigner(...)` on each source operator (`FlinkKafkaConsumer`)
    2. When converting a `DataStream` to a `Table`, define a timestamp column and a watermark based on it in the table's schema
    3. Use the `logs` timestamp column in the Event-Time Temporal Join query
    There is a code snippet below for clarity. Also I attached some screenshots from Flink's UI...
    Issue:
    • When I'm running the app and looking at the Flink UI dashboard, I can't see any watermark info for `users` at the task level. It says `No Watermark (Watermarks are only available if EventTime is used)`. I wonder why there is no watermark info for the `users` pipeline even though I define the watermark strategy.
    • However, I am seeing watermarks for `logs` 🤔
    • I think not having watermarks for `users` is what causes no join between the tables.
    Could you please help me figure out what might be wrong? I would appreciate any help, comments and recommendations 🙂
    Copy code
    // Initialize Flink job environment. Some properties are omitted for readability ...
    StreamExecutionEnvironment env = ...
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    tableEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
    ...
    
    /* 1.1 Read 'users' Kafka topic*/
    DataStream<UserDto> usersStream = env
            .addSource(new CustomFlinkKafkaConsumer<>(...))
            .uid("source-users").name("Read Users")
            .map(UserRecordMapper::mapToUserDto)
            .uid("map-to-user-model")
            .keyBy(UserDto::getUId);
    
    // 1.2 Datastream to Table
    Table tUsers = tableEnv.fromDataStream(usersStream, Schema.newBuilder()
            .primaryKey("u_id")
            .columnByExpression("lastModifiedTs2", "TO_TIMESTAMP_LTZ(lastModifiedTs, 3)")
            .watermark("lastModifiedTs2", "lastModifiedTs2 - interval '1' day")
            .build());
    tUsers.printSchema();
    tableEnv.createTemporaryView("users", tUsers);
    Consume `logs` records from the Kafka topic:
    Copy code
    /* 2.1) Read 'logs' Kafka topic*/
    DataStream<LoggingAudit> logAuditStream = env
            .addSource(new CustomFlinkKafkaConsumer<>(...))
            .uid("source-logs").name("Read Logs")
            .filter(l -> Objects.nonNull(l.getUserId()))
            .keyBy(LoggingAudit::getUserId);
    
    DataStream<LoggingAuditDto> transformedLogAudit = logAuditStream
            .map(new LogAuditTransformer())
            .uid("transform-log-audit").name("Transform Log Audit");
    
    // 2.2) Datastream to Table
    Schema logAuditTableSchema = Schema.newBuilder()
            .columnByExpression("insertDateTs2", "TO_TIMESTAMP_LTZ(insertDateTs, 3)")
            .watermark("insertDateTs2", "insertDateTs2 - interval '1' day")
            .build();
    Table logAuditTable = tableEnv.fromDataStream(transformedLogAudit, logAuditTableSchema);
    logAuditTable.printSchema();
    tableEnv.createTemporaryView("logs", logAuditTable);
    3. Enrich logs with users data using Event Time Temporal Join
    Copy code
    tableEnv.sqlQuery("select logs.id, logs.action, users.u_id as userId " +
                    "from logs " +
                    "join users FOR SYSTEM_TIME AS OF logs.insertDateTs2 " +
                    "on logs.userId = users.u_id")
            .execute()
            .print();
    UPD: I updated the code (removed `assignTimestampsAndWatermarks` from the source operators).
    p
    d
    • 3
    • 10
  • s

    Sumit Nekar

    10/04/2022, 2:41 PM
    Hello, I am trying to tune the memory config for the TM and JM for my Flink job deployed using the flink-kubernetes-operator. These are my configurations:
    jobmanager.memory.process.size: "1500m"
    taskmanager.memory.process.size: "1500m"
    taskmanager.memory.jvm-metaspace.size: "256m"
    But I see the Flink operator is requesting the following TM:
    Received new TaskManager pod: pipeline-event-dedup-taskmanager-1-10
    2022-10-04 14:36:02,610 INFO org.apache.flink.runtime.resourcemanager.active.ActiveResourceManager [] - Requested worker pipeline-event-dedup-taskmanager-1-10 with resource spec WorkerResourceSpec {cpuCores=1.0, taskHeapSize=270.000mb (283115513 bytes), taskOffHeapSize=0 bytes, networkMemSize=105.200mb (110310196 bytes), managedMemSize=420.800mb (441240787 bytes), numSlots=2}.
    I don't understand why taskOffHeapSize is being set to 0 bytes here? Appreciate your help.
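    For what it's worth, 0 bytes is simply the default: task off-heap memory comes from taskmanager.memory.task.off-heap.size, which defaults to 0 and only needs to be raised if your code or its libraries allocate direct/native memory. A sketch if you do want to reserve some (size illustrative):
    taskmanager.memory.process.size: "1500m"
    taskmanager.memory.jvm-metaspace.size: "256m"
    # explicit task off-heap budget (default is 0 bytes)
    taskmanager.memory.task.off-heap.size: "128m"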
    c
    • 2
    • 2
  • n

    Nithin kharvi

    10/04/2022, 3:19 PM
    Hi, we are getting the below error when the Flink job (sink operator) tries to produce to Kafka; the job tries to init the producer and goes into an infinite loop. This issue started recently when we moved to Strimzi Kafka. Any thoughts on why this error is coming?
    [Producer clientId=producer-kafka-sink-0-1, transactionalId=kafka-sink-0-1] Transiting to fatal error state due to org.apache.kafka.common.errors.ProducerFencedException: There is a newer producer with the same transactionalId which fences the current one.
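    Two common causes behind this with exactly-once Kafka sinks, offered as an assumption rather than a diagnosis: another job or a restarted instance reusing the same transactional id prefix, or the transaction timeout exceeding the broker's transaction.max.timeout.ms. Giving each job a unique prefix looks roughly like this (topic, bootstrap servers and prefix are placeholders):
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.base.DeliveryGuarantee;
    import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    import org.apache.flink.connector.kafka.sink.KafkaSink;

    KafkaSink<String> sink = KafkaSink.<String>builder()
            .setBootstrapServers("my-strimzi-bootstrap:9092")
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic("events")
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .build())
            .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
            .setTransactionalIdPrefix("my-unique-job-prefix")   // unique per job
            .build();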
    m
    • 2
    • 6