# troubleshooting
  • l

    Lucky

    08/27/2024, 12:17 PM
    Hi all, I am trying to sink data to Azure Blob Storage (abfss) but I am getting this exception: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "file". My output file path looks like abfss://concon@accountname.dfs.windows.net/files/processed.csv. Can anybody help me? I have been stuck on this for 5 days.
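    A minimal sketch of such a sink, assuming the flink-azure-fs-hadoop plugin is installed under plugins/ and the storage account credentials are configured; the container, account, and path below are placeholders rather than the reporter's actual values:
    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.connector.file.sink.FileSink;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class AbfsSinkSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Placeholder stream; in the real job this comes from the actual source.
            DataStream<String> lines = env.fromElements("a,b,c", "d,e,f");

            // Row-encoded output; the abfss:// scheme is served by the Azure filesystem plugin.
            FileSink<String> sink = FileSink
                    .forRowFormat(new Path("abfss://container@account.dfs.core.windows.net/files/"),
                            new SimpleStringEncoder<String>("UTF-8"))
                    .build();

            lines.sinkTo(sink);
            env.execute("abfs-sink-sketch");
        }
    }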
  • m

    Matt Camp

    08/28/2024, 12:48 AM
    I am having a hell of a time getting the SQL Client to load my args correctly. If I run sql-client.sh one way, my init file doesn't load, but I am able to manually run commands to register a Python UDF and that all works. If I run it with the init file arg first, then the init file loads but my UDF doesn't get registered correctly; it seems like my Python path is off or the pyFiles arg is borked. This is how I am running things. Init SQL file doesn't load:
    #!/nix/store/wckka8fxv4h5hp74cbkhaw3fw7kbvcs1-bash-5.2p26/bin/bash
    set -o errexit
    set -o nounset
    set -o pipefail
    
    export PATH="/nix/store/8p75w2cs6lgzakvz6q9xns7j876d0i8m-gnugrep-3.11/bin:/nix/store/h2gq8hngnsnwphzpq7992cip77lwrphm-gnused-4.9/bin:/nix/store/ddkcg6irdsn0w2q05gphaaw3cblkml69-gawk-5.2.2/bin:/nix/store/lhns6bwqlwfs4z6hd8jf08v4di08qqdy-glibc-2.39-52-bin/bin:/nix/store/p1jwaambbgk5wg963dpn7xq0v200c18v-su-exec-0.2/bin:/nix/store/8sgpjfbmlalr0xvybfj68540kz57rx6c-gosu-1.17/bin:/nix/store/r08ns1vm8vvvz996frwql0c52vwgqaw2-hostname-net-tools-2.10/bin:/nix/store/k0haaab80wycif0k8f5xm8ykdxpq21jy-jemalloc-5.3.0/bin:/nix/store/wckka8fxv4h5hp74cbkhaw3fw7kbvcs1-bash-5.2p26/bin:/nix/store/v8sjbyscx6r58xngbhf0rsdf5czfyf8q-findutils-4.9.0/bin:/nix/store/2wvb4326f069mz8zan43yx6nak6lsjqk-util-linux-2.39.4-bin/bin:/nix/store/xfm4mg874w5n39zbqx24yiw7hmka94n7-coreutils-9.5/bin:/nix/store/hsd3ppzdlkz31qdlnrc2dizylxp7wp9f-openjdk-11.0.23+9/bin:$PATH"
    
    # Ensure FLINK_CONF_DIR is set
    if [ -z "${FLINK_CONF_DIR:-}" ]; then
      export FLINK_CONF_DIR="/nix/store/hv2biq5cpmy4iri7lph12q85lfav91h6-flink-conf-drv/conf"
      echo "FLINK_CONF_DIR set to $FLINK_CONF_DIR"
    else
      echo "FLINK_CONF_DIR already set to $FLINK_CONF_DIR"
    fi
    
    # Additional script logic
    export PYTHONPATH="/nix/store/x74v1acazbqcr1yhfidyyx452xnyvih4-python3-3.11.9-env/lib/python3.11/site-packages:/nix/store/dkj9ygpkkz2iw9sa7jrnb5khs2q3k4w0-example-flink-job"
    PYFILES="$(echo "$PYTHONPATH" | tr ':' ',')"
    export PATH="/nix/store/x74v1acazbqcr1yhfidyyx452xnyvih4-python3-3.11.9-env/bin/:$PATH"
    export PYFLINK_PYTHON="/nix/store/x74v1acazbqcr1yhfidyyx452xnyvih4-python3-3.11.9-env/bin/python"
    export JAVA_HOME="/nix/store/hsd3ppzdlkz31qdlnrc2dizylxp7wp9f-openjdk-11.0.23+9"
    export FLINK_HOME="/nix/store/sn2s860l0dxhf8n5dl4sz46falnilwjg-flink-1.19.1/opt/flink"
    
    echo "PYFILES: $PYFILES"
    echo "PYTHONPATH: $PYTHONPATH"
    
    /nix/store/jxpi70rg23rlppdn3i1qhg91bvrg792y-flink-1.19.1/opt/flink/bin/sql-client.sh -j=/nix/store/dj0w4a1m2lvx7w55bfagibn67qxglhkc-flink-sql-connector-kafka-3.2.0-1.19.jar -pyclientexec=/nix/store/x74v1acazbqcr1yhfidyyx452xnyvih4-python3-3.11.9-env/bin/python --pyFiles="$PYFILES" "$@"
    SQL Init File loads but Python is messed up
    #!/nix/store/wckka8fxv4h5hp74cbkhaw3fw7kbvcs1-bash-5.2p26/bin/bash
    set -o errexit
    set -o nounset
    set -o pipefail
    
    export PATH="/nix/store/8p75w2cs6lgzakvz6q9xns7j876d0i8m-gnugrep-3.11/bin:/nix/store/h2gq8hngnsnwphzpq7992cip77lwrphm-gnused-4.9/bin:/nix/store/ddkcg6irdsn0w2q05gphaaw3cblkml69-gawk-5.2.2/bin:/nix/store/lhns6bwqlwfs4z6hd8jf08v4di08qqdy-glibc-2.39-52-bin/bin:/nix/store/p1jwaambbgk5wg963dpn7xq0v200c18v-su-exec-0.2/bin:/nix/store/8sgpjfbmlalr0xvybfj68540kz57rx6c-gosu-1.17/bin:/nix/store/r08ns1vm8vvvz996frwql0c52vwgqaw2-hostname-net-tools-2.10/bin:/nix/store/k0haaab80wycif0k8f5xm8ykdxpq21jy-jemalloc-5.3.0/bin:/nix/store/wckka8fxv4h5hp74cbkhaw3fw7kbvcs1-bash-5.2p26/bin:/nix/store/v8sjbyscx6r58xngbhf0rsdf5czfyf8q-findutils-4.9.0/bin:/nix/store/2wvb4326f069mz8zan43yx6nak6lsjqk-util-linux-2.39.4-bin/bin:/nix/store/xfm4mg874w5n39zbqx24yiw7hmka94n7-coreutils-9.5/bin:/nix/store/hsd3ppzdlkz31qdlnrc2dizylxp7wp9f-openjdk-11.0.23+9/bin:$PATH"
    
    # Ensure FLINK_CONF_DIR is set
    if [ -z "${FLINK_CONF_DIR:-}" ]; then
      export FLINK_CONF_DIR="/nix/store/hv2biq5cpmy4iri7lph12q85lfav91h6-flink-conf-drv/conf"
      echo "FLINK_CONF_DIR set to $FLINK_CONF_DIR"
    else
      echo "FLINK_CONF_DIR already set to $FLINK_CONF_DIR"
    fi
    
    # Additional script logic
    export PYTHONPATH="/nix/store/x74v1acazbqcr1yhfidyyx452xnyvih4-python3-3.11.9-env/lib/python3.11/site-packages:/nix/store/dkj9ygpkkz2iw9sa7jrnb5khs2q3k4w0-example-flink-job"
    PYFILES="$(echo "$PYTHONPATH" | tr ':' ',')"
    export PATH="/nix/store/x74v1acazbqcr1yhfidyyx452xnyvih4-python3-3.11.9-env/bin/:$PATH"
    export PYFLINK_PYTHON="/nix/store/x74v1acazbqcr1yhfidyyx452xnyvih4-python3-3.11.9-env/bin/python"
    export JAVA_HOME="/nix/store/hsd3ppzdlkz31qdlnrc2dizylxp7wp9f-openjdk-11.0.23+9"
    export FLINK_HOME="/nix/store/sn2s860l0dxhf8n5dl4sz46falnilwjg-flink-1.19.1/opt/flink"
    
    echo "PYFILES: $PYFILES"
    echo "PYTHONPATH: $PYTHONPATH"
    
    /nix/store/jxpi70rg23rlppdn3i1qhg91bvrg792y-flink-1.19.1/opt/flink/bin/sql-client.sh \
      -i=/config/packages/examples/example-flink-job/flink.sql \
      -j=/nix/store/dj0w4a1m2lvx7w55bfagibn67qxglhkc-flink-sql-connector-kafka-3.2.0-1.19.jar \
      -pyclientexec=/nix/store/x74v1acazbqcr1yhfidyyx452xnyvih4-python3-3.11.9-env/bin/python \
      -pyfs=/nix/store/dkj9ygpkkz2iw9sa7jrnb5khs2q3k4w0-example-flink-job
    This is the error when Python isn't working
    [ERROR] Could not execute SQL statement. Reason:
    org.apache.flink.api.python.shaded.py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
      File "/nix/store/jxpi70rg23rlppdn3i1qhg91bvrg792y-flink-1.19.1/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 2466, in _call_proxy
        return_value = getattr(self.pool[obj_id], method)(*params)
                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/nix/store/jxpi70rg23rlppdn3i1qhg91bvrg792y-flink-1.19.1/opt/flink/opt/python/pyflink.zip/pyflink/java_gateway.py", line 179, in getPythonFunction
        udf_wrapper = getattr(importlib.import_module(moduleName), objectName)
                              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "/nix/store/5w07wfs288qpmnvjywk24f3ak5k1np7r-python3-3.11.9/lib/python3.11/importlib/__init__.py", line 126, in import_module
        return _bootstrap._gcd_import(name[level:], package, level)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
      File "<frozen importlib._bootstrap>", line 1204, in _gcd_import
      File "<frozen importlib._bootstrap>", line 1176, in _find_and_load
      File "<frozen importlib._bootstrap>", line 1126, in _find_and_load_unlocked
      File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
      File "<frozen importlib._bootstrap>", line 1204, in _gcd_import
      File "<frozen importlib._bootstrap>", line 1176, in _find_and_load
      File "<frozen importlib._bootstrap>", line 1140, in _find_and_load_unlocked
    ModuleNotFoundError: No module named '\n    jobs'
    This happens if I try to call the udf function
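    The ModuleNotFoundError: No module named '\n    jobs' above hints that the module path in the UDF registration carries a leading newline and spaces, which can happen when the quoted name in the init SQL file is wrapped across lines. A minimal, hypothetical sketch of registering the Python UDF with the module path kept on one line, shown here through the Table API (the equivalent single-line CREATE statement would go in the init file); jobs.my_udfs.parse is a placeholder name:
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class RegisterPythonUdfSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Keep the fully qualified module path on a single line; a quoted name that
            // wraps onto the next line is imported as "\n    jobs..." and then fails.
            tEnv.executeSql(
                    "CREATE TEMPORARY FUNCTION my_parse AS 'jobs.my_udfs.parse' LANGUAGE PYTHON");
        }
    }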
  • a

    Arthur Catrisse

    08/28/2024, 8:39 AM
    Hello! We're running into an issue when deploying Flink on Kubernetes using the flink-kubernetes-operator. Occasionally, when a JobManager gets rotated out (by Karpenter in our case), the next JobManager is incapable of getting into a stable state and is stuck in a crash loop with a DuplicateJobSubmissionException. We did increase terminationGracePeriodSeconds, but it doesn't seem to help. Is it expected that the operator isn't able to get JobManagers back into a stable state? Perhaps we configured something wrong? Thanks ⬇️ our configurations in thread
  • a

    Ashvin S

    08/28/2024, 9:49 AM
    Hi, we are using the flink-kubernetes-operator. Is it possible to remove the CPU limit from the taskmanager and jobmanager templates? I see this, but TASK_MANAGER_CPU_LIMIT_FACTOR cannot be less than 1, as stated in the source code.
  • v

    Vishva Mahadevan

    08/28/2024, 10:02 AM
    Hi everyone, currently in an operator we are keying by pimkey (UUID), and sometimes the key can be null from the source. The volume we are processing is 2 TB, about 40% of the keys are null, and one task manager can't take that up, which causes our batch job to fail. How can I distribute the data evenly for the null-key cases? Can anyone help here?
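    A minimal sketch of one common workaround: spread the null keys over several artificial buckets instead of one hot key. MyRecord, getPimKey, getSecondaryId, and NUM_BUCKETS are hypothetical names; the bucket must be derived deterministically from the record, since Flink requires key selectors to be deterministic.
    import org.apache.flink.api.java.functions.KeySelector;

    public class SaltedNullKeySelector implements KeySelector<MyRecord, String> {
        private static final int NUM_BUCKETS = 128; // placeholder fan-out for null keys

        @Override
        public String getKey(MyRecord record) {
            String pimKey = record.getPimKey();
            if (pimKey != null) {
                return pimKey;
            }
            // Derive the bucket from another field so the same record always maps
            // to the same artificial key instead of funnelling into one subtask.
            int bucket = Math.floorMod(record.getSecondaryId().hashCode(), NUM_BUCKETS);
            return "null-bucket-" + bucket;
        }
    }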
  • v

    VISHAL B

    08/28/2024, 2:10 PM
    Hi everyone, I'm Vishal, and I work as a Data Engineer. I have been working with Flink for the past year, and I would like to become a Committer. To achieve this, I need to understand the Flink codebase better. However, I'm not sure where to start. Can anyone guide me on which features or parts of the Flink streaming codebase I should focus on first, and how to approach other features step by step? Thank you!
  • m

    Matan Perelmuter

    08/28/2024, 7:39 PM
    Hi, I have this code which creates a view from a DataStream:
    tEnv.createTemporaryView("view", ds,
         Schema.newBuilder()
           .columnByExpression("proc_time", "PROCTIME()")
           .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
           .watermark("rowtime", "SOURCE_WATERMARK()")
         .build()
    );
    I'm trying to upgrade from Flink 1.18 to 1.19 and I get an error, but from the documentation it doesn't seem like anything has changed:
    org.apache.flink.table.api.ValidationException: Invalid expression for watermark 'WATERMARK FOR `rowtime` AS [SOURCE_WATERMARK()]'.
    any idea?
  • s

    Saketh

    08/29/2024, 5:30 AM
    I'm encountering an issue with a job that's repeatedly restarting due to an "Unrecognized Property Exception." I've modified the code to address this and updated the old table schema. Now, I need to stop the job in Kubernetes from a specific savepoint so it can resume from that point without losing any data. However, I'm facing a "Checkpoint Exception" error stating that "not all required tasks are running." Could someone help me resolve this?
  • b

    Boris Zubov

    08/29/2024, 7:36 AM
    Hi, I am using Flink 1.15 with the old Kafka source function. I have a Kafka source chained with two transformations, displayed as one chained operator in the web UI. Here's my code:
    val myConsumer = new FlinkKafkaConsumer<>("myTopic", new MySchema(kafkaConf.schemaRegistry), properties)
    myConsumer.setStartFromTimestamp(someTimestamp)
    
    val myStream = env
      .addSource(myConsumer)
      .name(s"${myConsumer.topic}_v1").uid(s"${myConsumer.topic}_v1")
      .assignTimestampsAndWatermarks(
        WatermarkStrategy.forMonotonousTimestamps[MyData]().withIdleness(Duration.ofSeconds(20))
      )
      .name("Data with watermarks").uid("Data with watermarks")
      .map(Entry.fromInput(_))
      .name("Data to entry").uid("Data to entry")
      .unNone // custom syntax
      .name("Entry not empty").uid("Entry not empty")
      .keyBy(_.id)
    What I'm doing: I am changing the UID at the head of the chain to .name(s"${myConsumer.topic}_v2").uid(s"${myConsumer.topic}_v2") and then restoring from a savepoint with the --allowNonRestoredState flag. What I expect to happen: myConsumer should read from the offsets starting from the provided timestamp. What actually happens: myConsumer tries to restore from the state and falls back to the earliest offsets. Why? Will switching to the new DataSource API help? I would appreciate any help with this issue.
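    If you do try the new KafkaSource, a minimal sketch of starting from a timestamp looks like the following (topic, brokers, and deserializer are placeholders); whether restored state is ignored still depends on how the savepoint and operator UIDs are handled:
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class KafkaTimestampStartSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            long someTimestamp = 1725000000000L; // placeholder epoch millis

            KafkaSource<String> source = KafkaSource.<String>builder()
                    .setBootstrapServers("broker-1:9092")                 // placeholder
                    .setTopics("myTopic")                                 // placeholder
                    .setStartingOffsets(OffsetsInitializer.timestamp(someTimestamp))
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();

            env.fromSource(source, WatermarkStrategy.noWatermarks(), "myTopic_v2")
                    .uid("myTopic_v2")
                    .print();

            env.execute("kafka-timestamp-start-sketch");
        }
    }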
  • a

    Aravind Musigumpula

    08/29/2024, 9:11 AM
    Hi, I am trying out Flink SQL to sink to OpenSearch, which I am able to do. In our use case we also want to query the data in OpenSearch; can we use Flink SQL to do so? Here, https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/overview/ , it is mentioned that OpenSearch has only a sink connector. Is there a reason why there is no source connector for OpenSearch? Could I look into building a source connector for OpenSearch? Please advise.
  • a

    Akash Patel

    08/29/2024, 4:30 PM
    Hi, I am using the Flink Kubernetes operator and running a Flink session cluster in HA mode. Config: high-availability.type: "KUBERNETES", high-availability.storageDir: "/some/aks/pvc/path", and replicas set to 3. I see all 3 pods coming up: one is the leader and two are on standby. The issue is that when I restart the leader pod, leadership does not switch to one of the standby pods; instead they wait for the old leader to restart and take the leader tag again, causing all task managers to restart. Does anyone know if I am missing any configuration? Flink 1.18.
  • a

    Aly Ayman

    08/29/2024, 5:50 PM
    I have a big problem. We have a simple app that reads from Kafka and writes to a filesystem; the throughput is 70 Mb/s and the message count is 400k per second. I tried to develop a new app that parses each CSV message into a POJO class and reduces the number of columns from 250 to 70, and this parsing is very processing-intensive (I don't know why). Flink reads from 16 partitions on 5 brokers. I tried setting the parallelism of the Flink app to 16 but got backpressure, so I modified the code to do the parsing in a map function rather than in the consumer operator and set that map operator's parallelism to 108 (I have a machine with 104 cores), but I still can't reach the first app's throughput, and the busy metric of the mapping cores is 90%! 😂 It is really disappointing: it is a simple app, so where does this degraded performance come from?
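    For reference, a minimal sketch of the topology described above (source kept at the partition count, the CPU-heavy parsing isolated in its own operator with higher parallelism); the topic, brokers, and the split-based "parsing" are placeholders for the real CSV-to-POJO logic:
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CsvParseTopologySketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            KafkaSource<String> source = KafkaSource.<String>builder()
                    .setBootstrapServers("broker-1:9092")             // placeholder
                    .setTopics("input-topic")                         // placeholder
                    .setStartingOffsets(OffsetsInitializer.latest())
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .build();

            // Keep the source at the partition count so each consumer reads one partition.
            DataStream<String> raw = env
                    .fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-source")
                    .setParallelism(16);

            // Isolate the CPU-heavy parsing in its own operator with higher parallelism;
            // disableChaining() forces a separate task so it can scale independently.
            DataStream<String[]> parsed = raw
                    .map(line -> line.split(","))
                    .returns(String[].class)
                    .setParallelism(108)
                    .disableChaining();

            parsed.print(); // stand-in for the real filesystem sink
            env.execute("csv-parse-sketch");
        }
    }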
  • w

    windwheel

    08/30/2024, 1:08 AM
    Has anyone used pyflink to run batch tasks? I would like to ask a question.
  • w

    windwheel

    08/30/2024, 1:12 AM
    Currently we are using PyFlink 1.16.1 to read data from flink-connector-clickhouse. Due to historical reasons in the company's framework, PyFlink has to be used. Since in batch mode PyFlink 1.16.1 only supports writing UDFs as pandas UDFs, I wrote a multi-column pivot (row-to-column) function, which ran stably in SQL. I performed memory-based tuning twice. Unfortunately, the parameters of my first tuning were lost, but I roughly remember that they were adjusted to roughly the following, which made it work:
    taskmanager.memory.process.size: 4gb
    taskmanager.memory.network.fraction: 0.1
    taskmanager.memory.managed.fraction: 0.4
    But when I tuned for the second time, when PyFlink executed the over window aggregation operator, the operator stayed in the INITIALIZING state and no data flowed in. The parameters are as follows:
    taskmanager.memory.process.size: 4gb
    taskmanager.memory.network.fraction: 0.3
    taskmanager.memory.managed.fraction: 0.45
    taskmanager.memory.jvm-overhead.fraction: 0.1
    taskmanager.memory.framework.off-heap.size: 128mb
    taskmanager.memory.managed.consumer-weights: OPERATOR:60,STATE_BACKEND:60,PYTHON:40
    Since there is so little information about PyFlink on the Internet, after reading the source code I judged based on the logs. Log: Obtained shared Python process of size 536870920 bytes. It may be that the Python interpreter process size that PyFlink estimates from managed memory is too large; the machine does not have that much memory and cannot start the Python interpreter process. What makes me curious is that the total process memory is configured as 4 GB, so why is the estimated memory so large? What configuration do I need so that the operator receives data properly and sends it downstream?
  • w

    windwheel

    08/30/2024, 1:12 AM
    before: Slack Conversation
  • w

    windwheel

    08/30/2024, 1:12 AM
    after: Slack Conversation
  • w

    windwheel

    08/30/2024, 1:13 AM
    over window aggregation:
    parallelism = self.param.get('parallelism', 1)
    
    result = t_env.execute_sql(""" CREATE TABLE clickhouse_sample_verfired (  
                                        node_id BIGINT, 
                                        uuid STRING, 
                                        batch_id STRING,  
                                        device_time TIMESTAMP(3),  
                                        device_timestamp BIGINT,   
                                        key_id STRING,   
                                        str_v STRING,   
                                        is_delete INTEGER,  
                                        create_time TIMESTAMP(3),  
                                    PRIMARY KEY (uuid) NOT ENFORCED 
                                ) WITH (  
                                    'connector' = 'clickhouse',   
                                    'url' = 'clickhouse://192.168.30.74:30123?compress_algorithm=gzip',   
                                    'username' = 'admin', 
                                    'password' = 'Cljslrl0620.', 
                                    'database-name' = 'ddps',  
                                    'table-name' = 'ts_kv_iot_main', 
                                    'scan.partition.column' = '{}',
                                    'scan.partition.num' = '{}',  
                                    'sink.batch-size' = '500',   
                                    'sink.flush-interval' = '1000',  
                                    'sink.max-retries' = '3' ); """.format("device_time", parallelism))
    
    # Merge the incoming time intervals and collect the list of node_ids to filter
    
    node_list = []
    time_list = []
    for i in range(0, len(node_sharding)):
        tmp_node = node_sharding[i]
        if len(tmp_node):
            node_list.append(tmp_node['node_id'])
            time_list.append(
                [tmp_node['clickhouse_start_time'], tmp_node['clickhouse_end_time'], tmp_node['node_id']])
    
    node_list_len = len(node_list)
    node_list = str(node_list).strip("[]")
    
    combine_time_list = merge_interval(time_list)
    
    time_list = transfer_format(combine_time_list)
    
    time_combine_str = ""
    for time in time_list:
        time_condition = "device_time >= " + "'" + time[0] + "'" + " AND " + "device_time <= " + "'" + time[
            1] + "'" + " AND "
        time_combine_str = time_combine_str + time_condition
    
    # Merge the incoming time intervals and collect the node_id list
    
    t_env.create_temporary_system_function("format_time", stream_format_time_refactor)
    
    t_env.create_temporary_system_function("pivot_multi_column", group_device_data_agg)
    
    t_env.execute_sql("""
    create table print_table (
            nodeId BIGINT,
            uuid STRING,
            pageBatchID STRING,
            device_time TIMESTAMP(3),
            device_timestamp BIGINT,
            uniq_array STRING
            ) WITH (
                'connector' = 'print'
            )""")
    # Pivot the device rows into columns
    # table = t_env.from_path("clickhouse_sample_verfired").group_by(col('node_id'), col('uuid'), col('batch_id'),
    #                                                                col('device_time'), col('device_timestamp')) \
    #     .flat_aggregate(call("pivot", col('key_id'), col('str_v')).alias("uniq_array")) \
    #     .select(col('node_id'), col('uuid'), col('batch_id'))
    
    sql = """ SELECT node_id AS nodeId,  
                             uuid,
                             batch_id AS pageBatchID,  
                             device_time, 
                             device_timestamp, 
                             format_time(device_timestamp, device_time) as new_time,
                             pivot_multi_column(key_id, str_v) 
                             over (PARTITION BY 
                             node_id, uuid, batch_id, device_time, device_timestamp
                              ORDER BY device_time
                                 )  AS uniq_array
                       FROM clickhouse_sample_verfired  
                       WHERE  {}
                         node_id in ( {} )  """.format(time_combine_str, node_list)
    
    table = t_env.sql_query(sql)
    stream = t_env.to_append_stream(table=table, type_info=Types.ROW([Types.LONG(), Types.STRING(), Types.STRING(),
                                          Types.SQL_TIMESTAMP(), Types.LONG(), Types.SQL_TIMESTAMP(), Types.STRING()]))
    stream = stream.flat_map(FeatureTuple())
    # stream.execute_and_collect("source_test")
    return stream
    udf:
    import time
    
    import pandas as pd
    
    from pyflink.table import DataTypes
    from pyflink.table.udf import udf, udaf
    
    
    @udf(result_type=DataTypes.TIMESTAMP(3), func_type="pandas")
    def stream_format_time_refactor(deviceTimeStamp: pd.Series, deviceTime: pd.Series) -> pd.Series:
        time_len = len(deviceTime)
    
        result_series_index = []
        result_series_data = []
        for i in range(time_len):
            result_series_index.append(i)
            current_device_time = str(deviceTime[i])
            timearray = time.strptime(current_device_time, '%Y-%m-%d %H:%M:%S')
            timestamp = int(time.mktime(timearray))
            current_time = timestamp * 1000
            result_series_data.append(current_time)
    
        result = pd.Series(data=result_series_data, index=result_series_index)
        return result
    
    
    @udaf(result_type=DataTypes.STRING(), func_type="pandas")
    def group_device_data_agg(keyId: pd.Series, strV: pd.Series) -> str:
        df = pd.DataFrame({"key_id": keyId, "str_v": strV})
    
        item_len = len(keyId)
    
        dict_list = []
        for i in range(item_len):
            item = keyId[i] + '@' + batch_replace_string(strV[i], ',', ';')
            dict_list.append(item)
        # concat(key_id, '@', replaceAll(str_v,',',';'))
        result_str = str(dict_list)
        return result_str
  • w

    windwheel

    08/30/2024, 1:13 AM
    Are there any community friends or experts who use PyFlink heavily who could give some answers? Due to historical reasons we hope to use PyFlink extensively in the next version, and we hope to get some suggestions or configuration strategies. After forming a plan, we will do our best to give back to the community.
  • i

    Ihor

    08/30/2024, 11:10 AM
    Hello! I have a question: is it possible that the onTimer method in a CoProcessFunction can be executed concurrently? For example, if I scheduled a few timers for the next minute, and after that I got an event with a timestamp X minutes ahead. Or another case: the pipeline was stopped/failed and was restarted from a checkpoint with a lot of timers due. Will these scheduled timers all run simultaneously, or will they run one by one?
  • k

    Ken Krugler

    08/30/2024, 1:38 PM
    The onTimer method will be called sequentially, not in parallel.
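    A minimal sketch illustrating that, assuming the function runs on a keyed stream (here as a KeyedCoProcessFunction): element processing and timer callbacks for one operator instance are invoked by the same task thread, so they never overlap, and due timers restored from a checkpoint are drained one at a time. Types and the one-minute delay are placeholders.
    import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
    import org.apache.flink.util.Collector;

    public class TimerSketch extends KeyedCoProcessFunction<String, String, String, String> {

        @Override
        public void processElement1(String value, Context ctx, Collector<String> out) {
            // Register an event-time timer one minute past this element's timestamp
            // (assumes event-time timestamps have been assigned).
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 60_000L);
        }

        @Override
        public void processElement2(String value, Context ctx, Collector<String> out) {
            // Second input; no timers registered here in this sketch.
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
            // Called sequentially by the task thread, never concurrently with
            // processElement1/processElement2 or with another onTimer call.
            out.collect("timer fired for key " + ctx.getCurrentKey() + " at " + timestamp);
        }
    }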
  • m

    Matt Braymer-Hayes

    08/30/2024, 3:58 PM
    🌄 👋🏻 Hi folks, question: when iterating over MapState (e.g., MapState.keys(), MapState.values(), MapState.entries()), are there any ordering guarantees (e.g., lexicographical order)?
  • k

    Ken Krugler

    08/30/2024, 4:22 PM
    I don’t believe so. Even if one state backend did guarantee this, another might not (e.g. the HashMap-based in-memory backend provides no such guarantee).
  • a

    Aly Ayman

    09/01/2024, 12:22 PM
    I want to compress the records using Snappy in the filesystem sink, but I get this error:
    java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()'
    at org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
    at org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
    at org.apache.hadoop.io.compress.SnappyCodec.getCompressorType(SnappyCodec.java:136)
    at org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:150)
    at org.apache.hadoop.io.compress.CompressionCodec$Util.createOutputStreamWithCodecPool(CompressionCodec.java:131)
    at org.apache.hadoop.io.compress.SnappyCodec.createOutputStream(SnappyCodec.java:101)
  • y

    Yiorgos Panayiotakis

    09/02/2024, 1:09 PM
    Hello, I am trying to deploy a Flink app on a Kubernetes cluster (application mode) using the Flink Kubernetes operator. The issue is that my app executes a SQL INSERT statement and later an UPDATE statement using the Flink SQL API in batch mode, with back-to-back tableEnv.executeSql() calls. When deploying the app, this leads to an error like the following:
    "Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment"
    How can I avoid this issue and deploy the application without having to create different apps for the insert and the update?
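    One commonly used pattern for application mode is to bundle several INSERT statements into a single job with a StatementSet, so only one submission happens; a minimal sketch is below (table names are placeholders). Note this groups the statements into one job rather than running them strictly one after the other, so it may not fit if the UPDATE must observe the INSERT's results first.
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.StatementSet;
    import org.apache.flink.table.api.TableEnvironment;

    public class StatementSetSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

            // Hypothetical tables; the real DDL for sources and sinks is omitted here.
            StatementSet statements = tEnv.createStatementSet();
            statements.addInsertSql("INSERT INTO target_table SELECT * FROM staging_table");
            statements.addInsertSql("INSERT INTO audit_table SELECT id, name FROM staging_table");

            // A single execute() submits everything above as one Flink job.
            statements.execute();
        }
    }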
  • s

    Shreeram Narayanan

    09/03/2024, 3:31 AM
    Hi Flinkers, I am trying to read files (in byte-array format) from a legacy mainframe system. These will be files with millions to hundreds of millions of records, and I want to read them in my Flink job. I am contemplating which API to use for this: Table API or DataStream API? Could you please help me with which approach to go with for this kind of use case?
  • a

    Aravind Musigumpula

    09/03/2024, 5:47 AM
    Hi, I have created a Flink table using the JDBC MySQL connector. When I run a SELECT query on that table, it returns only the existing records; when a new row is added or updated in MySQL, it is not reflected in the SELECT results. I am new to Flink and trying to understand how to get real-time results from a Flink SELECT query. The following is the CREATE TABLE statement I used:
    CREATE TABLE users_mysql (
        account_id BIGINT,
        id BIGINT,
        name STRING
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://mysql:3306/freshservice',
        'table-name' = 'tdetails',
        'username' = 'root',
        'password' = 'root',
        'lookup.cache.max-rows' = '5000',
        'lookup.cache.ttl' = '10min'
    );
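    For context, the JDBC connector does a bounded scan of the table (plus an optional lookup cache), so it won't pick up later changes by itself. One option for continuously updated results is a change-data-capture source such as the MySQL CDC connector from the Flink CDC project; a rough sketch, assuming that connector's jar is on the classpath and with placeholder host/credentials:
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class MySqlCdcSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

            // Changelog source: INSERT/UPDATE/DELETE events read from the MySQL binlog.
            tEnv.executeSql(
                "CREATE TABLE users_cdc (" +
                "  account_id BIGINT," +
                "  id BIGINT," +
                "  name STRING," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'hostname' = 'mysql'," +
                "  'port' = '3306'," +
                "  'username' = 'root'," +
                "  'password' = 'root'," +
                "  'database-name' = 'freshservice'," +
                "  'table-name' = 'tdetails'" +
                ")");

            // A continuous query over the changelog keeps reflecting new rows and updates.
            tEnv.executeSql("SELECT * FROM users_cdc").print();
        }
    }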
  • m

    Marco Scalerandi

    09/03/2024, 8:31 AM
    I'm using Apache Flink 1.15 with RocksDB state backend. Is the Compaction filter strategy enabled by default?
  • j

    Jaideep C

    09/03/2024, 9:46 AM
    Hi all, how can I volume-mount Flink?
    services:
      jobmanager:
        image: flink:1.20.0-scala_2.12
        ports:
          - "8081:8081"
        command: jobmanager
        environment:
          - |
            FLINK_PROPERTIES=
            jobmanager.rpc.address: jobmanager
            rest.flamegraph.enabled: true
    
      taskmanager:
        image: flink:1.20.0-scala_2.12
        depends_on:
          - jobmanager
        command: taskmanager
        scale: 2
        environment:
          FLINK_PROPERTIES: |
            jobmanager.rpc.address: jobmanager
            taskmanager.numberOfTaskSlots: 2
            rest.flamegraph.enabled: true
    So that when I restart Docker I can have my RocksDB state and my job both resumed. Currently both the job and the state disappear. I am uploading the job via the web UI. Thanks
  • a

    Alessio Bernesco Làvore

    09/03/2024, 10:43 AM
    Hello, I have a local (non-dockerized) test setup working nicely with Flink 1.19, high availability through ZooKeeper, a Hive catalog on PostgreSQL for the SQL Client, Kafka integrations, etc. Now I'm moving to Docker Compose and everything works fine except one thing: on startup the SQL Client doesn't find any custom catalog, but if I search for them in Hive, or recreate them from the SQL Client (CREATE CATALOG hive_catalog WITH ('type' = 'hive','hive-conf-dir' = '/opt/flink/conf');), then all the tables created during previous sessions are available. So the catalog is persisted in Hive/Postgres, but it seems it's just not available on startup. The catalog is configured as a file catalog store and persisted on a mounted volume in Docker. From the Flink config.yaml:
    table:
      catalog-store:
        kind: file
        file:
          path: file:///opt/flink/catalogs/
    From compose:
    jobmanager:
        image: flink:1.19
        volumes:
          - ./jobmanager/:/tmp/
          - ./jobmanager/:/opt/flink/flink-web
          - ./conf/config.yaml:/opt/flink/conf/config.yaml
          - ./catalogs/:/opt/flink/catalogs/
          - ./conf/hive-site.xml:/opt/flink/conf/hive-site.xml
    I also moved a hive_catalog.yaml file retrieved from the non-dockerized setup into the catalogs directory, but nothing changes:
    type: "hive"
    hive-conf-dir: "/opt/flink/conf"
    Any hints? Thanks!
  • p

    Paul Annesley

    09/03/2024, 11:21 AM
    Hi! Can two state descriptors be accessed at the same time from applyToKeyedState, e.g. to copy items from one state to another? I'm writing a Flink job that uses Broadcast State to conditionally buffer data from the main keyed stream into state, and then trigger processing of that data via events on the broadcast stream. I see that in processBroadcastElement I can process all keys of a state descriptor using applyToKeyedState(descriptor, (key, state) -> { … }), but I need access to two descriptors at once to move items between two MapStates. Is this possible?
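    Each applyToKeyedState call exposes only the one descriptor it was given, so one possible workaround is two passes: collect the first MapState's entries into a local map, then apply a second pass that writes them into the other MapState. A rough sketch under that assumption; key/value types, descriptor names, and the trigger handling are placeholders:
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    public class MoveBetweenMapStates
            extends KeyedBroadcastProcessFunction<String, String, String, String> {

        private final MapStateDescriptor<String, String> bufferDesc =
                new MapStateDescriptor<>("buffer", String.class, String.class);
        private final MapStateDescriptor<String, String> targetDesc =
                new MapStateDescriptor<>("target", String.class, String.class);

        @Override
        public void processElement(String value, ReadOnlyContext ctx, Collector<String> out)
                throws Exception {
            // Buffer incoming data in the per-key "buffer" MapState.
            getRuntimeContext().getMapState(bufferDesc).put(value, value);
        }

        @Override
        public void processBroadcastElement(String trigger, Context ctx, Collector<String> out)
                throws Exception {
            // Pass 1: read every key's buffered entries into a local map and clear the buffer.
            Map<String, Map<String, String>> collected = new HashMap<>();
            ctx.applyToKeyedState(bufferDesc, (String key, MapState<String, String> buffer) -> {
                Map<String, String> entries = new HashMap<>();
                for (Map.Entry<String, String> e : buffer.entries()) {
                    entries.put(e.getKey(), e.getValue());
                }
                collected.put(key, entries);
                buffer.clear();
            });

            // Pass 2: write the collected entries into the "target" MapState of the same keys.
            ctx.applyToKeyedState(targetDesc, (String key, MapState<String, String> target) -> {
                Map<String, String> entries = collected.get(key);
                if (entries != null) {
                    target.putAll(entries);
                }
            });
        }
    }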