# troubleshooting

    Chris Ro

    12/02/2022, 11:25 PM
What causes some subtasks to not send/process events? My incoming records are randomly partitioned and eventually go to a keyBy, but I'm seeing subtasks with 0 records sent (unused?) in the Flink dashboard. Is there a common reason for this?
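For context on what can cause this: after a keyBy, each record is routed by the hash of its key to a key group and then to a subtask, so if there are fewer distinct keys than subtasks, or the keys happen to hash into key groups owned by only a few subtasks, the remaining subtasks will show 0 records sent. A minimal sketch (not from this thread; the keys, parallelism, and max parallelism values are made up) that uses Flink's KeyGroupRangeAssignment to check where a sample of keys would land:

```java
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyDistributionCheck {
    public static void main(String[] args) {
        int maxParallelism = 128; // Flink's default lower bound for max parallelism
        int parallelism = 8;      // the operator parallelism shown in the dashboard

        // Hypothetical keys; replace with a sample of real keys from the stream.
        String[] keys = {"user-1", "user-2", "user-3", "user-4", "user-5"};

        for (String key : keys) {
            int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                    key, maxParallelism, parallelism);
            System.out.printf("key %s -> subtask %d%n", key, subtask);
        }
    }
}
```

If a sample of the real keys only ever maps to a few subtask indices, the unused subtasks in the dashboard are expected.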

    Emmanuel Leroy

    12/03/2022, 2:00 AM
What happens when using a reduce function after a window when there is only 1 value in the window? Does the single value get passed to the next task as is? I'm trying to mutate an object on reduce to merge/move fields, but sometimes my object seems to be passed as is, and it seems to happen when there is only 1 item in the window. Does that make sense? If so, what would be the way to edit fields in a reduce and ensure all items get processed?
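One relevant detail: a ReduceFunction is only invoked when a window contains at least two elements; a single-element window is emitted as is, which matches the behaviour described above. A minimal Java sketch (the Event type, key field, and normalizeFields() mutation are made up) of the reduce variant that also takes a ProcessWindowFunction, so the field mutation runs once per window regardless of how many items it held:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class ReduceWithPostProcessing {
    static DataStream<Event> apply(DataStream<Event> input) {
        return input
            .keyBy(e -> e.key)
            .window(TumblingEventTimeWindows.of(Time.minutes(1)))
            .reduce(
                // Only called when the window holds two or more elements.
                (a, b) -> { a.total += b.total; return a; },
                // Called exactly once per window, even for single-element windows.
                new ProcessWindowFunction<Event, Event, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context ctx,
                                        Iterable<Event> elements, Collector<Event> out) {
                        Event e = elements.iterator().next(); // the reduced (or single) element
                        e.normalizeFields();                  // hypothetical merge/move of fields
                        out.collect(e);
                    }
                });
    }

    // Hypothetical event type standing in for the real one.
    public static class Event {
        public String key;
        public long total;
        public void normalizeFields() { /* merge / move fields here */ }
    }
}
```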

    Sandeep Kongathi

    12/03/2022, 5:24 PM
Hi All, I am trying to experiment with a few things in Flink SQL. Source:
    Copy code
    CREATE TABLE orders (
        order_uid  BIGINT,
        product_id BIGINT,
        price      DECIMAL(32, 2),
        order_time TIMESTAMP(3)
    ) WITH (
        'connector' = 'datagen'
    );
Sink:
    Copy code
    CREATE TABLE orders_kafka (
        order_uid  BIGINT,
        product_id BIGINT,
        price      DECIMAL(32, 2),
        order_time TIMESTAMP(3),
        PRIMARY KEY (`order_uid`) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',
      'properties.bootstrap.servers' = 'redpanda:29092',
      'topic' = 'orders',
      'sink.parallelism' = '2',
      'key.format' = 'json',
      'value.format' = 'json',
      'key.json.ignore-parse-errors' = 'true',
      'value.json.fail-on-missing-field' = 'false',
      'value.fields-include' = 'EXCEPT_KEY'
    );
    Finally when I insert the data with
    Copy code
    INSERT INTO orders_kafka
    SELECT *
    FROM orders;
I am getting the error below.

    raghav tandon

    12/03/2022, 8:33 PM
I want to change the sink parallelism so that the Kafka push rate can be higher… But there doesn't seem to be any way other than setting `pipeline.max-parallelism`, but this increases the parallelism of other operators as well… And I am not able to set parallelism at the sink operator level:
    Copy code
    Operator org.apache.flink.streaming.api.datastream.KeyedStream@76828577 cannot set the maximumparalllelism
The pipeline writes to 3 different sinks using `SideOutput`… Please suggest if there is a way out…
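In case it helps: `pipeline.max-parallelism` controls the maximum parallelism (number of key groups) for the whole job, not an individual operator's parallelism. In the DataStream API, the parallelism of just a sink can usually be set on the `DataStreamSink` returned by `sinkTo(...)`, independently of the rest of the pipeline. A minimal sketch (stream and sink variable names are made up), with one side-output stream per sink:

```java
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;

public class PerSinkParallelism {
    // Each sink gets its own parallelism; the upstream operators keep theirs.
    static void attachSinks(DataStream<String> sideOutputA,
                            DataStream<String> sideOutputB,
                            DataStream<String> sideOutputC,
                            KafkaSink<String> sinkA,
                            KafkaSink<String> sinkB,
                            KafkaSink<String> sinkC) {
        sideOutputA.sinkTo(sinkA).setParallelism(8).name("kafka-sink-a");
        sideOutputB.sinkTo(sinkB).setParallelism(8).name("kafka-sink-b");
        sideOutputC.sinkTo(sinkC).setParallelism(4).name("kafka-sink-c");
    }
}
```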

    Jirawech Siwawut

    12/04/2022, 1:41 AM
Hi. I am trying to use a temporal join to enrich two datastreams, perform hop window aggregation, and eventually join these two streams. Please refer to the picture. Let's assume that I use Flink SQL in the window join step:
    Copy code
    select
          agg1.id
         ,agg2.id
         ,agg1.window_start
         ,agg2.window_start
         ,agg1.window_end
         ,agg2.window_end
    from agg1
    left join agg2
    ON agg1.window_start = agg2.window_start
    AND agg1.window_end = agg2.window_end
    AND agg1.id=agg2.id
The output is weird: there is always a `null` value from agg2. It only works for some windows at the beginning and starts to produce null afterwards. I already tried separating `agg1` and `agg2`, and found that they both produce output for the same window and key. Does anyone here experience the same behavior?

    sharad mishra

    12/04/2022, 2:29 AM
Hello 👋, I am using Flink (1.16) on YARN with Kafka (3.2.3) as the source for reading data. The flow of the application is Kafka (source topic) -> Flink keyed stream -> Flink Kafka sink -> Kafka (target topic). I noticed a huge difference in Flink operator initialization time when I change my DeliveryGuarantee from AT_LEAST_ONCE to EXACTLY_ONCE in the Flink Kafka sink operator: initialization time with AT_LEAST_ONCE -> 1 min; initialization time with EXACTLY_ONCE -> 15 mins. Is that expected? Is there a way to reduce initialization time with EXACTLY_ONCE for the Flink Kafka sink operator? This is how my Kafka sink looks:
    Copy code
    val serializer = AvroSerializationSchema.forSpecific(classOf[DCNPOJORecord])
        val kafkaRecordSerializationSchema = KafkaRecordSerializationSchema.builder()
          .setTopic(targetTopic)
          .setValueSerializationSchema(serializer)
          .build()
    
        val sink: KafkaSink[DCNPOJORecord] = KafkaSink.builder()
          .setBootstrapServers(brokerURL)
          .setProperty("<http://transaction.max.timeout.ms|transaction.max.timeout.ms>", transactionMaxTimeoutMs)
          .setProperty("<http://transaction.timeout.ms|transaction.timeout.ms>", transactionTimeoutMs)
          .setTransactionalIdPrefix(transactionIdPrefix)
          .setRecordSerializer(kafkaRecordSerializationSchema)
          .setDeliveryGuarantee(deliveryGuarantee)
          .build()
        sink

    Marco Villalobos

    12/04/2022, 4:07 AM
Is there a way, within a keyed process function, to detect idleness? Kafka is my source. I use event time processing. I have hundreds of thousands of unique keys. And if an element with a given key does not arrive within a window, I need to collect its previous value. That works very well when there is traffic on Kafka. But if the source receives no data, I then use processing time to detect idleness. That works very well if the Flink job is always running. However, if the Flink job goes down for a few days, there are race conditions between the processing-time timers and event-time timers creating havoc upon the sink. Is there another way to check that the watermark is not advancing, like when was the last time the watermark changed, in a global manner, shared across hundreds of thousands of keys?
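One building block that might help: `ctx.timerService().currentWatermark()` inside a KeyedProcessFunction returns the subtask-wide watermark (it is not per key), so a processing-time timer can check whether event time has actually stalled before falling back to processing-time behaviour. A rough sketch, with made-up types, probe interval, and emission logic:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Long stands in for the real input/output types.
public class WatermarkAwareIdlenessFunction extends KeyedProcessFunction<String, Long, Long> {

    private static final long PROBE_INTERVAL_MS = 60_000L; // made-up idleness probe interval

    private transient ValueState<Long> watermarkAtLastProbe;

    @Override
    public void open(Configuration parameters) {
        watermarkAtLastProbe = getRuntimeContext().getState(
                new ValueStateDescriptor<>("watermarkAtLastProbe", Long.class));
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
        // Remember the watermark as of now and arm a processing-time probe.
        watermarkAtLastProbe.update(ctx.timerService().currentWatermark());
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + PROBE_INTERVAL_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        long watermarkNow = ctx.timerService().currentWatermark();
        Long watermarkBefore = watermarkAtLastProbe.value();
        if (watermarkBefore != null && watermarkNow == watermarkBefore) {
            // The watermark has not advanced since the probe was armed, so the source
            // is likely idle; this is where the previous value could be emitted.
        }
        watermarkAtLastProbe.update(watermarkNow);
    }
}
```

After a restart following downtime, the backlog should move the watermark by the time the old processing-time timers fire, so the check would simply re-arm instead of emitting, which may avoid the race described above.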

    Sumit Nekar

    12/04/2022, 7:49 AM
Hello, I am trying to tune the memory configuration for my Flink job deployed using FlinkOperator. Following are the memory settings I am using. I am configuring only total memory as mentioned in this doc: https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/memory/mem_setup_tm/
    taskmanager.memory.process.size: "8000m"
    taskmanager.memory.task.off-heap.size: "500m"
    taskmanager.memory.jvm-metaspace.size: "250m"
When the job starts processing, the metrics show that flink_taskmanager_Status_Flink_Memory_Managed_Used is always ZERO, whereas flink_taskmanager_Status_Flink_Memory_Managed_Total is set to 5G. Is this configuration fine, or should I configure `taskmanager.memory.managed.size` explicitly?
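For what it's worth: managed memory is only used by the RocksDB state backend, batch operators, and PyFlink processes, so a streaming job on the heap/hashmap state backend will always report Managed_Used = 0. If that applies here, the managed pool can be shrunk so the memory goes elsewhere. A sketch of the relevant flink-conf keys (values are illustrative, not recommendations):

```yaml
taskmanager.memory.process.size: "8000m"
taskmanager.memory.task.off-heap.size: "500m"
taskmanager.memory.jvm-metaspace.size: "250m"
# Either lower the managed fraction (default 0.4) ...
taskmanager.memory.managed.fraction: "0.1"
# ... or pin an explicit size (an explicit size takes precedence over the fraction):
# taskmanager.memory.managed.size: "512m"
```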

    Amenreet Singh Sodhi

    12/05/2022, 6:02 AM
    Hi Everyone! I am trying to build flink by running the script create_binary_release.sh available at
    Copy code
    flink-1.16.0/tools/releasing
using Maven version 3.2.5 and Java 11, but I get the following error while doing so:
    Copy code
    [ERROR] Failed to execute goal org.apache.maven.plugins:maven-javadoc-plugin:2.9.1:jar (attach-javadocs) on project flink-core: MavenReportException: Error while creating archive:
    [ERROR] Exit code: 1 - /Users/amensodhi/flink-1.16.0/flink-core/src/main/java/org/apache/flink/management/jmx/JMXServer.java:123: error: package sun.rmi.registry is not visible
    [ERROR] private static class JmxRegistry extends sun.rmi.registry.RegistryImpl {
    [ERROR] ^
    [ERROR] (package sun.rmi.registry is declared in module java.rmi, which does not export it to the unnamed module)
    [ERROR] 
    [ERROR] Command line was: /Library/Java/JavaVirtualMachines/jdk-11.0.16.1.jdk/Contents/Home/bin/javadoc -Xdoclint:none --add-exports=java.base/sun.net.util=ALL-UNNAMED @options @packages
    [ERROR] 
    [ERROR] Refer to the generated Javadoc files in '/Users/amensodhi/flink-1.16.0/flink-core/target/apidocs' dir.
    [ERROR] -> [Help 1]
But when I build it simply using mvn clean install, I don't face any such issues. How can I fix this issue? Thanks

    Abdelhakim Bendjabeur

    12/05/2022, 10:35 AM
Hello 👋 I am trying to find a way to leverage Flink and Flink SQL to write a data analytics pipeline (join multiple Kafka topics -> target topic -> OLAP store). I'd like to know what the impact of an ever-growing state will be when joining two or more streams with Flink SQL. Does anybody have experience with this?
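On the ever-growing state question: a regular (non-windowed, non-temporal) join in Flink SQL keeps both input sides in state indefinitely, so state grows with the topics unless a state TTL is configured or the join is rewritten as an interval/temporal/window join. A minimal sketch of setting the TTL (the 24 h value is just an example; expired state means late matches are dropped):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class JoinStateTtl {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Idle state (e.g. join state) is cleared after the TTL; pick it larger than the
        // maximum expected gap between matching events.
        tEnv.getConfig().getConfiguration().setString("table.exec.state.ttl", "24 h");
    }
}
```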

    Nick Pocock

    12/05/2022, 10:42 AM
    Has anyone used PubSub with Flink StateFun? I see Flink has a connector for PubSub but looks like StateFun only allows Kinesis or Kafka for ingress / egress. We are using the Go SDK for StateFun, would we need to fork the Go SDK to allow the use of PubSub? Thanks!

    Giannis Polyzos

    12/05/2022, 12:49 PM
I have a Flink SQL query that needs to do a GROUP BY window and aggregate a column as a collection. I found two functions in Flink SQL - COLLECT() and LISTAGG() - but after I aggregate the events I need to check whether a particular value is present in a collection of strings. It seems COLLECT returns a MULTISET, LISTAGG returns a STRING, and the only available function to search whether a string is within a collection is ARRAY_CONTAINS, which needs an array as input. It seems all the types are incompatible, so my question is: what would be the best way to achieve this? Do I need to create a UDF?
    Copy code
    SELECT
      TUMBLE_START(eventTime_ltz, INTERVAL '30' SECONDS) AS startT,
      TUMBLE_END(eventTime_ltz, INTERVAL '30' SECONDS) AS endT,
      userSession,
      COLLECT(eventType) AS userSessionEventTypesCount, 
      LISTAGG(eventType) AS events
    FROM click_events
    GROUP BY TUMBLE(eventTime_ltz, INTERVAL '30' SECONDS), userSession
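A UDF is one straightforward way to bridge the types: COLLECT produces a MULTISET<STRING>, which is passed to a scalar function as a java.util.Map<String, Integer> (element -> count). A minimal sketch (function and alias names are made up):

```java
import java.util.Map;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.functions.ScalarFunction;

// Returns true if `value` occurs at least once in the multiset produced by COLLECT(...).
public class MultisetContains extends ScalarFunction {
    public Boolean eval(
            @DataTypeHint("MULTISET<STRING>") Map<String, Integer> multiset, String value) {
        return multiset != null && multiset.containsKey(value);
    }
}
```

After registering it with something like `tableEnv.createTemporarySystemFunction("MULTISET_CONTAINS", MultisetContains.class)`, the query above could check membership with `MULTISET_CONTAINS(COLLECT(eventType), 'someEventType')`.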

    Felix Angell

    12/05/2022, 6:31 PM
Hey 👋 I'm seeing this error about cleaning up leaked objects since upgrading to 1.15 locally (see stacktrace in thread)

    René

    12/05/2022, 8:25 PM
    Hi, creating a connector with ojdbc.jar on the ververica platform leads to the following message:
    Copy code
    The JAR contains multiple connectors. Please choose which one you want to register.
    But I can't choose any connector. Does anyone know how to deal with that or how to install the Oracle driver?

    Michael Parrott

    12/05/2022, 10:09 PM
👋 Hey all, I'm using Flink's TestHarness (specifically, a KeyedTwoInputStreamOperatorTestHarness) to do some unit testing on my stateful KeyedCoProcessFunction. As a part of my unit tests, I want to verify that certain pieces of state in my operator get set to certain values. However, I can find only `getOutput` and `numKeyedStateEntries` as ways of verifying that my function does what I expect. Is there a way to inspect the state inside my KeyedCoProcessFunction when unit testing it with a test harness?

    Matyas Orhidi

    12/06/2022, 2:17 AM
Hi folks, what is the recommended setting for these properties on a busier K8s cluster?
    Copy code
    high-availability.kubernetes.leader-election.lease-duration
    high-availability.kubernetes.leader-election.renew-deadline

    Krish Narukulla

    12/06/2022, 5:39 AM
How can I create Cassandra/ScyllaDB source lookup functionality? I have followed a similar approach as HBase, but ScyllaDB can have more than one column in the primary key. How can I get those columns and values in the same order?
    Copy code
    public abstract class LookupFunction extends TableFunction<RowData> {
      public LookupFunction() {
      }
    
      public abstract Collection<RowData> lookup(RowData var1) throws IOException;
    
      public final void eval(Object... keys) {
        GenericRowData keyRow = GenericRowData.of(keys);
    
        try {
          Collection<RowData> lookup = this.lookup(keyRow);
          if (lookup != null) {
            lookup.forEach(this::collect);
          }
        } catch (IOException var4) {
          throw new RuntimeException(String.format("Failed to lookup values with given key row '%s'", keyRow), var4);
        }
  }
}

    Tudor Plugaru

    12/06/2022, 8:43 AM
Hey team 👋 I have a Flink pipeline with a Kafka source and I want to set up some monitoring for it. I am really interested in something like consumer lag, but since Flink does not keep an active consumer registered on Kafka, it's hard to get those metrics. So now I am looking at the `records_lag_max` metric, but the values aren't really reflecting the real numbers of the lag; for example, I have around 400mil records in the topic, but the metric shows a value of around 4mil. Just trying to understand how best to monitor the backlog the job has based only on the metrics Flink exports. Thanks in advance.
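One approach that may help: the KafkaSource commits its offsets back to Kafka on checkpoint completion when a group id is configured, and once committed offsets exist, ordinary consumer-group lag tooling (kafka-consumer-groups.sh, lag exporters, etc.) reflects the job's backlog reasonably well, checkpoint-interval granularity aside. A sketch with made-up broker, topic, and group names:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

public class LagFriendlyKafkaSource {
    public static KafkaSource<String> build() {
        return KafkaSource.<String>builder()
                .setBootstrapServers("broker:9092")   // placeholder broker address
                .setTopics("my-topic")                // placeholder topic
                .setGroupId("my-flink-job")           // required for offsets to be committed
                // Default is already true: offsets are committed when a checkpoint completes.
                .setProperty("commit.offsets.on.checkpoint", "true")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
    }
}
```

If I remember correctly, the newer KafkaSource also exposes a pendingRecords metric, which may track the real backlog more closely than records_lag_max.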

    Gaurav Miglani

    12/06/2022, 12:20 PM
We want to write Parquet from Flink to S3 using the Table API. We are able to do it, but there doesn't seem to be any option to specify snappy compression. Is it even possible?
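It should be doable with the filesystem connector's Parquet format: options prefixed with `parquet.` are forwarded to the Parquet writer, including the compression codec. A sketch via the Table API (table name, schema, and S3 path are placeholders):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ParquetSnappySink {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        tEnv.executeSql(
                "CREATE TABLE parquet_sink (" +
                "  id BIGINT," +
                "  payload STRING" +
                ") WITH (" +
                "  'connector' = 'filesystem'," +
                "  'path' = 's3://my-bucket/output/'," +
                "  'format' = 'parquet'," +
                // 'parquet.*' options are passed through to the Parquet writer.
                "  'parquet.compression' = 'SNAPPY'" +
                ")");
    }
}
```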

    Tamas Kiss

    12/06/2022, 1:29 PM
Hi Team, in the Flink global dashboard we can see a few jobs in the running section which are actually in SUSPENDED state, and we would like to get rid of them. My understanding is that these jobs are retrieved from the history server's archive dir. Based on this we plan to remove the files from that folder and apply `historyserver.archive.clean-expired-jobs = true` to do the cleanup. Is this a safe solution, or is something else recommended instead?

    Nick Pocock

    12/06/2022, 2:01 PM
Hey, I've managed to plug the GCP PubSub dependency into the Flink StateFun JAR and then build the Docker image. It's not too clear how I now specify my PubSub ingress and egress in the `module.yaml` that we use with the StateFun image https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/overview/ For example, for Kafka you use:
    Copy code
    kind: io.statefun.kafka.v1/egress
    spec:
      id: com.example/my-egress
      address: kafka-broker:9092
      deliverySemantic:
        type: exactly-once
        transactionTimeout: 15min
    Has anyone used a custom egress with the StateFun stuff? Thanks

    Emmanuel Leroy

    12/06/2022, 3:31 PM
I'm a bit confused about tasks, the number of tasks, and task manager slots. I thought it was 1 task = 1 slot, but I have a pipeline that has parallelism 2 and 6 tasks, so 12 total, yet it runs on a single task manager that has 2 task slots?!? Another pipeline with a single task, parallelism 2, also runs on a single TM with 2 slots. Even if I consider CPU cores, it doesn't add up.

    Thiruvenkadesh Someswaran

    12/06/2022, 4:59 PM
Not sure if this is the correct channel. I would like to hook up my CI/CD (which pushes out a jar to S3) so that the Flink deployment then uses an S3 location to launch. Is that the normal way to do it?

    Marco Villalobos

    12/06/2022, 5:36 PM
    Are there any steps needed to switch a job from incremental checkpointing to non-incremental when using Flink 1.14.2?
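For what it's worth: incremental checkpointing is a property of the RocksDB state backend and, as far as I know, it does not change the state format, so flipping the flag and restarting from the latest savepoint (or retained checkpoint) is typically all that's needed. A sketch of disabling it programmatically (the same effect as `state.backend.incremental: false` in flink-conf.yaml):

```java
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NonIncrementalCheckpoints {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 'false' disables incremental checkpoints for the RocksDB backend.
        env.setStateBackend(new EmbeddedRocksDBStateBackend(false));
    }
}
```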

    Rommel

    12/06/2022, 6:24 PM
I want to ask about reading batch input and outputting to streaming. Is there a way for Flink to read a Hive table (slowly changing, mostly adding new data into the table/partition), read those records out, and put them into Kafka? Has anyone done that? Hive doesn't have CDC, so that rules out using Flink CDC.
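The Hive connector can do this without CDC: it supports streaming reads that monitor a Hive table (or its partitions) for newly added files and emit them continuously, driven by the `streaming-source.*` table options. A sketch (catalog, database, table, topic, and schema names are all placeholders), assuming a HiveCatalog is already registered:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HiveToKafka {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        tEnv.executeSql(
                "CREATE TABLE kafka_out (" +
                "  id BIGINT," +
                "  payload STRING" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'hive-changes'," +
                "  'properties.bootstrap.servers' = 'broker:9092'," +
                "  'format' = 'json'" +
                ")");

        // Continuously scan the Hive table for new files/partitions and push them to Kafka.
        tEnv.executeSql(
                "INSERT INTO kafka_out " +
                "SELECT id, payload FROM hive_catalog.db.my_hive_table " +
                "/*+ OPTIONS('streaming-source.enable'='true', " +
                "            'streaming-source.monitor-interval'='1 min') */");
    }
}
```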

    Sami Badawi

    12/06/2022, 9:43 PM
Trying to run PyFlink 1.16.0 on Mac ARM M1. I tried with both Python 3.8 and Python 3.9; using variations of the following instructions I was able to install PyFlink:
    Copy code
    export FLINK_CONDA_HOME=$(dirname $(dirname $CONDA_EXE))
    export GRPC_PYTHON_BUILD_SYSTEM_OPENSSL=1
    export GRPC_PYTHON_BUILD_SYSTEM_ZLIB=1
    
    ~/miniconda3/bin/conda create -n pyflink_38 python=3.8
    conda activate pyflink_38
    pip install -r ./dev/dev-requirements.txt
    conda install -c conda-forge grpcio
    pip install -r ./dev/dev-requirements.txt
    pip install --upgrade pip setuptools wheel
    pip install -r ./dev/dev-requirements.txt
    
    pip install apache-flink
    But I was still not able to run the basic_operations.py example from GitHub:
    Copy code
    python /Users/sami/Documents/GitHub/flink/flink-python/pyflink/examples/datastream/basic_operations.py
    
      File "/Users/sami/miniconda3/envs/pyflink_39/lib/python3.9/site-packages/pyflink/fn_execution/flink_fn_execution_pb2.py", line 38, in <module>
        _INPUT = DESCRIPTOR.message_types_by_name['Input']
    AttributeError: 'NoneType' object has no attribute 'message_types_by_name'
    I read that the problem has been fixed, and this will go out with Flink 1.17, but I was wondering if anybody found a workaround, so I could experiment with PyFlink now?
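Not verified on an M1 specifically, but that AttributeError in a generated *_pb2.py file is the usual symptom of a protobuf runtime that is newer than (or built differently from) the one the file was generated for. Two things that have helped with this class of error elsewhere:

```bash
# 1) Force the pure-Python protobuf implementation for the current shell:
export PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION=python

# 2) Or reinstall protobuf from source instead of the prebuilt wheel:
pip install --force-reinstall --no-binary :all: protobuf
```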

    Marco Villalobos

    12/06/2022, 10:20 PM
    Our system just crashed in production again. We are using incremental checkpoints, and there are two errors in the logs. The task manager has too many open files:
    Copy code
    2022-12-06 16:32:42,550 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - aggregate tags (3/4) (f72fc395431b2f2f8cd17b75f27a040e) switched from RUNNING to FAILED on container_1664952361217_0019_01_000003 @ ip-10-45-1-55.us-west-2.compute.internal (dataPort=42231).
    org.apache.flink.util.SerializedThrowable: Could not perform checkpoint 240017 for operator aggregate tags (3/4)#0.
    ...
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
            at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_352]
    Caused by: org.apache.flink.util.SerializedThrowable: Could not complete snapshot 240017 for operator aggregate tags (3/4)#0. Failure reason: Checkpoint was declined.
    ....
    Caused by: org.apache.flink.util.SerializedThrowable: While open a file for appending: /mnt/yarn/usercache/root/appcache/application_1664952361217_0019/flink-io-a8866c5d-1d84-43f9-9a3e-a6046505da2a/job_ee369b360e11b92b4ddf86ae4d1bc692_op_WindowOperator_6c048ec3ba20a654976042820b488880__3_4__uuid_1110507e-f1b2-4ac0-9443-9db3ac2ebdf0/chk-240017.tmp/MANIFEST-1145131: Too many open files
    ....
    2022-12-06 16:32:42,585 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task 6c048ec3ba20a654976042820b488880_2.
    2022-12-06 16:32:42,586 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 44 tasks should be restarted to recover the failed task 6c048ec3ba20a654976042820b488880_2.
Then thirty minutes later the job manager ran out of memory:
    Copy code
    2022-12-06 17:01:30,040 ERROR org.apache.flink.util.FatalExitExceptionHandler              [] - FATAL: Thread 'flink-akka.remote.default-remote-dispatcher-5' produced an uncaught exception. Stopping the process...
    java.lang.OutOfMemoryError: Java heap space
I believe that incremental checkpoints require too many files and too much memory somehow. Additionally, for a long time, our akka frame size has required enlargement due to errors that it was too small. It is currently akka.framesize: 85298139b. Any advice or strategies for tuning this?
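Two knobs that are often relevant for the "Too many open files" part when RocksDB incremental checkpoints are involved: the OS-level `ulimit -n` for the YARN containers, and RocksDB's own cap on open files. A sketch of flink-conf entries (values are illustrative, not recommendations):

```yaml
# Cap how many files each RocksDB instance keeps open (default -1 = unlimited).
state.backend.rocksdb.files.open: 5000
# Extra JobManager headroom; large incremental checkpoints mean large checkpoint
# metadata, which also tends to be what pushes akka.framesize up.
jobmanager.memory.process.size: "4096m"
```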

    Bhupendra Yadav

    12/07/2022, 8:00 AM
Hi everyone, we are using Flink SQL with the JDBC connector to fetch data from a PostgreSQL table. We noticed that if we execute a query with a WHERE clause via Flink, the query logged on the DB server doesn't have the WHERE clause anymore. Does Flink do a full table scan and filter data (based on the WHERE clause) on the fly? If our tables are very large, a few hundred GB in size (partitioned), would it still do a full table scan? If yes, wouldn't that be problematic, with a lot of IO and network bandwidth usage?
    Copy code
    -- SELECT c1, c2 FROM table t LIMIT 5
    DB LOG:  execute <unnamed>: SELECT c1, c2 FROM table LIMIT 5
    
    -- SELECT c1, c2 FROM table t WHERE c1=123 LIMIT 5
    DB LOG:  execute <unnamed>: SELECT c1,c2 FROM table
    Any references that can help me understand the query execution better?
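Judging by the logs above, the filter is applied inside Flink after the scan rather than pushed down to PostgreSQL. One mitigation for very large tables is the connector's partitioned scan options, which at least split the read into bounded range queries. A sketch (column, bounds, URL, and table names are placeholders):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PartitionedJdbcScan {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        // scan.partition.* makes the connector issue several bounded range queries
        // on the partition column instead of one unbounded scan.
        tEnv.executeSql(
                "CREATE TABLE pg_table (" +
                "  c1 BIGINT," +
                "  c2 STRING" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:postgresql://db-host:5432/mydb'," +
                "  'table-name' = 'my_table'," +
                "  'scan.partition.column' = 'c1'," +
                "  'scan.partition.num' = '16'," +
                "  'scan.partition.lower-bound' = '0'," +
                "  'scan.partition.upper-bound' = '1000000'" +
                ")");
    }
}
```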

    Tiansu Yu

    12/07/2022, 9:26 AM
Problem: `currentInputWatermark` stays at `Long.MIN_VALUE`. This is the setup I use for watermarks of all sources:
    Copy code
    WatermarkStrategy.<T>forBoundedOutOfOrderness(windowConf.getWatermarkBoundedness())
    				.withIdleness(windowConf.getWatermarkIdleness())
    				.withTimestampAssigner((SerializableTimestampAssigner<T>) (value, recordTimestamp) -> timestampExtractor.apply(value));
Somehow, only the currentOutputWatermark advances, but the input watermark stays at `Long.MIN_VALUE`.

    Suparn Lele

    12/07/2022, 1:24 PM
Hi Team, I am creating a table in the following manner:
    Copy code
    val table = streamTableEnvironment.fromValues(
      DataTypes.ROW(
        DataTypes.FIELD("timestamp", DataTypes.STRING()),
        DataTypes.FIELD("organization_id", DataTypes.STRING()),
        DataTypes.FIELD("cluster_id", DataTypes.STRING()),
        DataTypes.FIELD("total", <http://DataTypes.INT|DataTypes.INT>())
      ),
      row("2022-01-01 00:02:00", "A", "B", Int.box(2)),
      row("2022-01-01 00:03:00", "A", "C", Int.box(1))
    )
After this I am converting it to a temporary view:
    Copy code
    streamTableEnvironment.createTemporaryView("table1", table)
    After this I am running the following query
    Copy code
    val table2 = streamTableEnvironment.sqlQuery("select cast(`timestamp` as TIMESTAMP(3)) as trunc_time, organization_id, cluster_id, total FROM table1")
Up to this point everything works fine, but when I run the following query
    Copy code
    streamTableEnvironment.createTemporaryView("temp", table2)
    val aggregatedTable = streamTableEnvironment.sqlQuery("select TUMBLE_END(trunc_time, INTERVAL '10' MINUTE) `timestamp`, cluster_id, sum(total) as total from temp group by TUMBLE(trunc_time, INTERVAL '10' MINUTE), organization_id, cluster_id")
It throws the following exception:
    Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.
    My settings are as follows
    Copy code
    val streamExecutionEnvironment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val streamTableEnvironment: StreamTableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment)
    val unmodifiableCollection: Class[_] = Class.forName("java.util.Collections$UnmodifiableCollection")
    streamExecutionEnvironment.getConfig.addDefaultKryoSerializer(unmodifiableCollection, classOf[UnmodifiableCollectionsSerializer])
    streamExecutionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH)
Please help. So my general question is: how can I apply windowing operations on a table which I have loaded from a DB? The table has timestamp columns, and I am running the job in batch mode. P.S. Flink 1.14, Scala 2.11.