Lucky
08/27/2024, 12:17 PM

Matt Camp
08/28/2024, 12:48 AM
Running sql-client.sh one way, my init file doesn't load, but I am able to manually run commands to register a Python UDF function and that all works. If I run it with the init file arg first, the init file loads, but then my UDF function doesn't get registered correctly; it seems like my Python path is off or the pyFiles arg is borked. This is how I am running things:
Init SQL File doesn't load
#!/nix/store/wckka8fxv4h5hp74cbkhaw3fw7kbvcs1-bash-5.2p26/bin/bash
set -o errexit
set -o nounset
set -o pipefail
export PATH="/nix/store/8p75w2cs6lgzakvz6q9xns7j876d0i8m-gnugrep-3.11/bin:/nix/store/h2gq8hngnsnwphzpq7992cip77lwrphm-gnused-4.9/bin:/nix/store/ddkcg6irdsn0w2q05gphaaw3cblkml69-gawk-5.2.2/bin:/nix/store/lhns6bwqlwfs4z6hd8jf08v4di08qqdy-glibc-2.39-52-bin/bin:/nix/store/p1jwaambbgk5wg963dpn7xq0v200c18v-su-exec-0.2/bin:/nix/store/8sgpjfbmlalr0xvybfj68540kz57rx6c-gosu-1.17/bin:/nix/store/r08ns1vm8vvvz996frwql0c52vwgqaw2-hostname-net-tools-2.10/bin:/nix/store/k0haaab80wycif0k8f5xm8ykdxpq21jy-jemalloc-5.3.0/bin:/nix/store/wckka8fxv4h5hp74cbkhaw3fw7kbvcs1-bash-5.2p26/bin:/nix/store/v8sjbyscx6r58xngbhf0rsdf5czfyf8q-findutils-4.9.0/bin:/nix/store/2wvb4326f069mz8zan43yx6nak6lsjqk-util-linux-2.39.4-bin/bin:/nix/store/xfm4mg874w5n39zbqx24yiw7hmka94n7-coreutils-9.5/bin:/nix/store/hsd3ppzdlkz31qdlnrc2dizylxp7wp9f-openjdk-11.0.23+9/bin:$PATH"
# Ensure FLINK_CONF_DIR is set
if [ -z "${FLINK_CONF_DIR:-}" ]; then
export FLINK_CONF_DIR="/nix/store/hv2biq5cpmy4iri7lph12q85lfav91h6-flink-conf-drv/conf"
echo "FLINK_CONF_DIR set to $FLINK_CONF_DIR"
else
echo "FLINK_CONF_DIR already set to $FLINK_CONF_DIR"
fi
# Additional script logic
export PYTHONPATH="/nix/store/x74v1acazbqcr1yhfidyyx452xnyvih4-python3-3.11.9-env/lib/python3.11/site-packages:/nix/store/dkj9ygpkkz2iw9sa7jrnb5khs2q3k4w0-example-flink-job"
PYFILES="$(echo "$PYTHONPATH" | tr ':' ',')"
export PATH="/nix/store/x74v1acazbqcr1yhfidyyx452xnyvih4-python3-3.11.9-env/bin/:$PATH"
export PYFLINK_PYTHON="/nix/store/x74v1acazbqcr1yhfidyyx452xnyvih4-python3-3.11.9-env/bin/python"
export JAVA_HOME="/nix/store/hsd3ppzdlkz31qdlnrc2dizylxp7wp9f-openjdk-11.0.23+9"
export FLINK_HOME="/nix/store/sn2s860l0dxhf8n5dl4sz46falnilwjg-flink-1.19.1/opt/flink"
echo "PYFILES: $PYFILES"
echo "PYTHONPATH: $PYTHONPATH"
/nix/store/jxpi70rg23rlppdn3i1qhg91bvrg792y-flink-1.19.1/opt/flink/bin/sql-client.sh -j=/nix/store/dj0w4a1m2lvx7w55bfagibn67qxglhkc-flink-sql-connector-kafka-3.2.0-1.19.jar -pyclientexec=/nix/store/x74v1acazbqcr1yhfidyyx452xnyvih4-python3-3.11.9-env/bin/python --pyFiles="$PYFILES" "$@"
SQL Init File loads but Python is messed up
#!/nix/store/wckka8fxv4h5hp74cbkhaw3fw7kbvcs1-bash-5.2p26/bin/bash
set -o errexit
set -o nounset
set -o pipefail
export PATH="/nix/store/8p75w2cs6lgzakvz6q9xns7j876d0i8m-gnugrep-3.11/bin:/nix/store/h2gq8hngnsnwphzpq7992cip77lwrphm-gnused-4.9/bin:/nix/store/ddkcg6irdsn0w2q05gphaaw3cblkml69-gawk-5.2.2/bin:/nix/store/lhns6bwqlwfs4z6hd8jf08v4di08qqdy-glibc-2.39-52-bin/bin:/nix/store/p1jwaambbgk5wg963dpn7xq0v200c18v-su-exec-0.2/bin:/nix/store/8sgpjfbmlalr0xvybfj68540kz57rx6c-gosu-1.17/bin:/nix/store/r08ns1vm8vvvz996frwql0c52vwgqaw2-hostname-net-tools-2.10/bin:/nix/store/k0haaab80wycif0k8f5xm8ykdxpq21jy-jemalloc-5.3.0/bin:/nix/store/wckka8fxv4h5hp74cbkhaw3fw7kbvcs1-bash-5.2p26/bin:/nix/store/v8sjbyscx6r58xngbhf0rsdf5czfyf8q-findutils-4.9.0/bin:/nix/store/2wvb4326f069mz8zan43yx6nak6lsjqk-util-linux-2.39.4-bin/bin:/nix/store/xfm4mg874w5n39zbqx24yiw7hmka94n7-coreutils-9.5/bin:/nix/store/hsd3ppzdlkz31qdlnrc2dizylxp7wp9f-openjdk-11.0.23+9/bin:$PATH"
# Ensure FLINK_CONF_DIR is set
if [ -z "${FLINK_CONF_DIR:-}" ]; then
export FLINK_CONF_DIR="/nix/store/hv2biq5cpmy4iri7lph12q85lfav91h6-flink-conf-drv/conf"
echo "FLINK_CONF_DIR set to $FLINK_CONF_DIR"
else
echo "FLINK_CONF_DIR already set to $FLINK_CONF_DIR"
fi
# Additional script logic
export PYTHONPATH="/nix/store/x74v1acazbqcr1yhfidyyx452xnyvih4-python3-3.11.9-env/lib/python3.11/site-packages:/nix/store/dkj9ygpkkz2iw9sa7jrnb5khs2q3k4w0-example-flink-job"
PYFILES="$(echo "$PYTHONPATH" | tr ':' ',')"
export PATH="/nix/store/x74v1acazbqcr1yhfidyyx452xnyvih4-python3-3.11.9-env/bin/:$PATH"
export PYFLINK_PYTHON="/nix/store/x74v1acazbqcr1yhfidyyx452xnyvih4-python3-3.11.9-env/bin/python"
export JAVA_HOME="/nix/store/hsd3ppzdlkz31qdlnrc2dizylxp7wp9f-openjdk-11.0.23+9"
export FLINK_HOME="/nix/store/sn2s860l0dxhf8n5dl4sz46falnilwjg-flink-1.19.1/opt/flink"
echo "PYFILES: $PYFILES"
echo "PYTHONPATH: $PYTHONPATH"
/nix/store/jxpi70rg23rlppdn3i1qhg91bvrg792y-flink-1.19.1/opt/flink/bin/sql-client.sh \
-i=/config/packages/examples/example-flink-job/flink.sql \
-j=/nix/store/dj0w4a1m2lvx7w55bfagibn67qxglhkc-flink-sql-connector-kafka-3.2.0-1.19.jar \
-pyclientexec=/nix/store/x74v1acazbqcr1yhfidyyx452xnyvih4-python3-3.11.9-env/bin/python \
-pyfs=/nix/store/dkj9ygpkkz2iw9sa7jrnb5khs2q3k4w0-example-flink-job
This is the error when Python isn't working
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.api.python.shaded.py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
File "/nix/store/jxpi70rg23rlppdn3i1qhg91bvrg792y-flink-1.19.1/opt/flink/opt/python/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 2466, in _call_proxy
return_value = getattr(self.pool[obj_id], method)(*params)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/jxpi70rg23rlppdn3i1qhg91bvrg792y-flink-1.19.1/opt/flink/opt/python/pyflink.zip/pyflink/java_gateway.py", line 179, in getPythonFunction
udf_wrapper = getattr(importlib.import_module(moduleName), objectName)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/nix/store/5w07wfs288qpmnvjywk24f3ak5k1np7r-python3-3.11.9/lib/python3.11/importlib/__init__.py", line 126, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "<frozen importlib._bootstrap>", line 1204, in _gcd_import
File "<frozen importlib._bootstrap>", line 1176, in _find_and_load
File "<frozen importlib._bootstrap>", line 1126, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
File "<frozen importlib._bootstrap>", line 1204, in _gcd_import
File "<frozen importlib._bootstrap>", line 1176, in _find_and_load
File "<frozen importlib._bootstrap>", line 1140, in _find_and_load_unlocked
ModuleNotFoundError: No module named '\n jobs'
This happens if I try to call the UDF function.
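For reference, a minimal sketch of the moving parts being described: a Python UDF module shipped via --pyFiles and registered from the init SQL file. All names here (jobs, udfs, normalize) are hypothetical placeholders, not taken from the actual setup.

# jobs/udfs.py -- example module living in the directory passed via --pyFiles
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.STRING())
def normalize(s: str) -> str:
    # Trivial example UDF: strip and lower-case the input string.
    return s.strip().lower() if s is not None else None

# The init SQL file would then register it along the lines of:
#   CREATE TEMPORARY FUNCTION normalize AS 'jobs.udfs.normalize' LANGUAGE PYTHON;
# The fully qualified name must resolve on the Python path and contain no stray
# whitespace or line breaks.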

Arthur Catrisse
08/28/2024, 8:39 AM
flink-kubernetes-operator: occasionally, when a JobManager gets rotated out (by Karpenter in our case), the next JobManager is incapable of getting into a stable state and is stuck in a crash loop with a DuplicateJobSubmissionException.
We did increase terminationGracePeriodSeconds, but it doesn't seem to help.
Is it expected that the operator isn't able to get JobManagers back into a stable state? Perhaps we configured something wrong?
Thanks
⬇️ our configurations in thread

Ashvin S
08/28/2024, 9:49 AM
TASK_MANAGER_CPU_LIMIT_FACTOR cannot be less than 1, as stated in the source code.

Vishva Mahadevan
08/28/2024, 10:02 AM

VISHAL B
08/28/2024, 2:10 PM

Matan Perelmuter
08/28/2024, 7:39 PM
tEnv.createTemporaryView("view", ds,
    Schema.newBuilder()
        .columnByExpression("proc_time", "PROCTIME()")
        .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
        .watermark("rowtime", "SOURCE_WATERMARK()")
        .build()
);
I'm trying to upgrade from Flink 1.18 to 1.19 and I get an error, but from the documentation it doesn't seem like anything has changed:
org.apache.flink.table.api.ValidationException: Invalid expression for watermark 'WATERMARK FOR `rowtime` AS [SOURCE_WATERMARK()]'.
Any idea?

Saketh
08/29/2024, 5:30 AM

Boris Zubov
08/29/2024, 7:36 AM
val myConsumer = new FlinkKafkaConsumer[MyData]("myTopic", new MySchema(kafkaConf.schemaRegistry), properties)
myConsumer.setStartFromTimestamp(someTimestamp)
val myStream = env
.addSource(myConsumer)
.name(s"${myConsumer.topic}_v1").uid(s"${myConsumer.topic}_v1")
.assignTimestampsAndWatermarks(
WatermarkStrategy.forMonotonousTimestamps[MyData]().withIdleness(Duration.ofSeconds(20))
)
.name("Data with watermarks").uid("Data with watermarks")
.map(Entry.fromInput(_))
.name("Data to entry").uid("Data to entry")
.unNone // custom syntax
.name("Entry not empty").uid("Entry not empty")
.keyBy(_.id)
What I'm doing: I am changing the UID at the head of the chain to .name(s"${myConsumer.topic}_v2").uid(s"${myConsumer.topic}_v2") and then restoring from a savepoint with the --allowNonRestoredState flag.
What I expect to happen: The myConsumer should read from the offsets starting from the provided timestamp.
What actually happens: The myConsumer tries to restore from the state and falls back to the earliest offsets. Why?
Will switching to the new DataSource API help?
I would appreciate any help with this issue.
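For reference, a minimal sketch of the newer KafkaSource API he is asking about, written here in PyFlink for consistency with the other snippets in this log (the Java/Scala builder is analogous). Broker address, group id, timestamp and deserializer are placeholders, not taken from the original job.

from pyflink.common import Duration, WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import KafkaOffsetsInitializer, KafkaSource

env = StreamExecutionEnvironment.get_execution_environment()

# Start from a timestamp (milliseconds) instead of committed/earliest offsets.
source = KafkaSource.builder() \
    .set_bootstrap_servers("broker:9092") \
    .set_topics("myTopic") \
    .set_group_id("my-group") \
    .set_starting_offsets(KafkaOffsetsInitializer.timestamp(1724900000000)) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()

stream = env.from_source(
    source,
    WatermarkStrategy.for_monotonous_timestamps().with_idleness(Duration.of_seconds(20)),
    "myTopic_v2",
)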

Aravind Musigumpula
08/29/2024, 9:11 AM

Akash Patel
08/29/2024, 4:30 PM

Aly Ayman
08/29/2024, 5:50 PM

windwheel
08/30/2024, 1:08 AM

windwheel
08/30/2024, 1:12 AM
I am using flink-connector-clickhouse. For historical reasons in the company's framework, PyFlink has to be used, and since in batch mode PyFlink 1.16.1 only supports writing UDFs as Pandas UDFs, I wrote a multi-column pivot function, which ran stably in SQL. I performed memory-based tuning twice.
Unfortunately, the parameters of my first tuning were lost, but I roughly remember that they were adjusted to:
taskmanager.memory.process.size: 4gb
taskmanager.memory.network.fraction: 0.1
taskmanager.memory.managed.fraction: 0.4
and that made it work.
But when I tuned for the second time, the PyFlink over-window aggregation operator stayed in the INITIALIZING state and no data flowed in. The parameters were as follows:
taskmanager.memory.process.size: 4gb
taskmanager.memory.network.fraction: 0.3
taskmanager.memory.managed.fraction: 0.45
taskmanager.memory.jvm-overhead.fraction: 0.1
taskmanager.memory.framework.off-heap.size: 128mb
taskmanager.memory.managed.consumer-weights: OPERATOR:60,STATE_BACKEND:60,PYTHON:40
Since there is very little information about PyFlink online, I read the source code and judged from the logs.
Log: Obtained shared Python process of size 536870920 bytes
It may be that the Python interpreter process size that PyFlink estimates based on managed memory (about 512 MB here) is too large; the machine does not have much memory and cannot start the Python interpreter process.
What makes me curious is that the total managed memory is configured as 4g. Why is the estimated memory so large?
What configuration do I need so that the operator receives data properly and sends it downstream?
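For reference, a few of the Python-worker knobs can also be set per job from the PyFlink table config; the sketch below only shows the mechanism, and the values are illustrative rather than a recommendation for this job.

# Assumes an existing TableEnvironment `t_env`.
config = t_env.get_config().get_configuration()
# Whether the Python worker draws its memory from managed memory (default: true).
config.set_string("python.fn-execution.memory.managed", "true")
# How many rows are buffered per bundle before being shipped to the Python worker,
# and the maximum time (ms) a bundle may stay open.
config.set_string("python.fn-execution.bundle.size", "1000")
config.set_string("python.fn-execution.bundle.time", "1000")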

windwheel
08/30/2024, 1:12 AM

windwheel
08/30/2024, 1:12 AM

windwheel
08/30/2024, 1:13 AM
parallelism = self.param.get('parallelism', 1)
result = t_env.execute_sql("""
    CREATE TABLE clickhouse_sample_verfired (
        node_id BIGINT,
        uuid STRING,
        batch_id STRING,
        device_time TIMESTAMP(3),
        device_timestamp BIGINT,
        key_id STRING,
        str_v STRING,
        is_delete INTEGER,
        create_time TIMESTAMP(3),
        PRIMARY KEY (uuid) NOT ENFORCED
    ) WITH (
        'connector' = 'clickhouse',
        'url' = 'clickhouse://192.168.30.74:30123?compress_algorithm=gzip',
        'username' = 'admin',
        'password' = 'Cljslrl0620.',
        'database-name' = 'ddps',
        'table-name' = 'ts_kv_iot_main',
        'scan.partition.column' = '{}',
        'scan.partition.num' = '{}',
        'sink.batch-size' = '500',
        'sink.flush-interval' = '1000',
        'sink.max-retries' = '3'
    ); """.format("device_time", parallelism))
# Merge the incoming time intervals and collect the list of node_ids to filter on
node_list = []
time_list = []
for i in range(0, len(node_sharding)):
    tmp_node = node_sharding[i]
    if len(tmp_node):
        node_list.append(tmp_node['node_id'])
        time_list.append(
            [tmp_node['clickhouse_start_time'], tmp_node['clickhouse_end_time'], tmp_node['node_id']])
node_list_len = len(node_list)
node_list = str(node_list).strip("[]")
combine_time_list = merge_interval(time_list)
time_list = transfer_format(combine_time_list)
time_combine_str = ""
for time in time_list:
    time_condition = ("device_time >= '" + time[0] + "' AND "
                      "device_time <= '" + time[1] + "' AND ")
    time_combine_str = time_combine_str + time_condition
# Merge the incoming time intervals and collect the node_id list
t_env.create_temporary_system_function("format_time", stream_format_time_refactor)
t_env.create_temporary_system_function("pivot_multi_column", group_device_data_agg)
t_env.execute_sql("""
    create table print_table (
        nodeId BIGINT,
        uuid STRING,
        pageBatchID STRING,
        device_time TIMESTAMP(3),
        device_timestamp BIGINT,
        uniq_array STRING
    ) WITH (
        'connector' = 'print'
    )""")
# Row-to-column (pivot) logic for the device data
# table = t_env.from_path("clickhouse_sample_verfired").group_by(col('node_id'), col('uuid'), col('batch_id'),
#                                                                col('device_time'), col('device_timestamp')) \
#     .flat_aggregate(call("pivot", col('key_id'), col('str_v')).alias("uniq_array")) \
#     .select(col('node_id'), col('uuid'), col('batch_id'))
sql = """ SELECT node_id AS nodeId,
                 uuid,
                 batch_id AS pageBatchID,
                 device_time,
                 device_timestamp,
                 format_time(device_timestamp, device_time) AS new_time,
                 pivot_multi_column(key_id, str_v)
                     OVER (PARTITION BY node_id, uuid, batch_id, device_time, device_timestamp
                           ORDER BY device_time) AS uniq_array
          FROM clickhouse_sample_verfired
          WHERE {}
                node_id IN ( {} ) """.format(time_combine_str, node_list)
table = t_env.sql_query(sql)
stream = t_env.to_append_stream(
    table=table,
    type_info=Types.ROW([Types.LONG(), Types.STRING(), Types.STRING(),
                         Types.SQL_TIMESTAMP(), Types.LONG(), Types.SQL_TIMESTAMP(), Types.STRING()]))
stream = stream.flat_map(FeatureTuple())
# stream.execute_and_collect("source_test")
return stream
udf:
@udf(result_type=DataTypes.TIMESTAMP(3), func_type="pandas")
def stream_format_time_refactor(deviceTimeStamp: pd.Series, deviceTime: pd.Series) -> pd.Series:
    time_len = len(deviceTime)
    result_series_index = []
    result_series_data = []
    for i in range(time_len):
        result_series_index.append(i)
        current_device_time = str(deviceTime[i])
        timearray = time.strptime(current_device_time, '%Y-%m-%d %H:%M:%S')
        timestamp = int(time.mktime(timearray))
        current_time = timestamp * 1000
        result_series_data.append(current_time)
    result = pd.Series(data=result_series_data, index=result_series_index)
    return result
@udaf(result_type=DataTypes.STRING(), func_type="pandas")
def group_device_data_agg(keyId: pd.Series, strV: pd.Series) -> str:
    df = pd.DataFrame({"key_id": keyId, "str_v": strV})
    item_len = len(keyId)
    dict_list = []
    for i in range(item_len):
        item = keyId[i] + '@' + batch_replace_string(strV[i], ',', ';')
        dict_list.append(item)
    # concat(key_id, '@', replaceAll(str_v,',',';'))
    result_str = str(dict_list)
    return result_str

windwheel
08/30/2024, 1:13 AM

Ihor
08/30/2024, 11:10 AM
Can the onTimer method in CoProcessFunction be executed concurrently? For example, if I scheduled a few timers for the next minute and after that I got an event with timestamp +X minutes. Or another case: the pipeline was stopped or failed and was restarted from a checkpoint with a lot of timers due. Will these scheduled timers all run simultaneously, or will they run one by one?

Ken Krugler
08/30/2024, 1:38 PM

Matt Braymer-Hayes
08/30/2024, 3:58 PM
When iterating over MapState (e.g., MapState.keys(), MapState.values(), MapState.entries()), are there any ordering guarantees (e.g., lexicographical order)?

Ken Krugler
08/30/2024, 4:22 PM

Aly Ayman
09/01/2024, 12:22 PM
java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy()'
at org.apache.hadoop.util.NativeCodeLoader.buildSupportsSnappy(Native Method)
at org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:63)
at org.apache.hadoop.io.compress.SnappyCodec.getCompressorType(SnappyCodec.java:136)
at org.apache.hadoop.io.compress.CodecPool.getCompressor(CodecPool.java:150)
at org.apache.hadoop.io.compress.CompressionCodec$Util.createOutputStreamWithCodecPool(CompressionCodec.java:131)
at org.apache.hadoop.io.compress.SnappyCodec.createOutputStream(SnappyCodec.java:101)
Yiorgos Panayiotakis
09/02/2024, 1:09 PM
"Caused by: org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment"
How can I avoid this issue and deploy the application without having to create different apps for insert and update?
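If the statements are Table API INSERTs, one common pattern for that error (sketched here in PyFlink; the Java/Scala TableEnvironment API is analogous) is to group them into a single StatementSet so both pipelines are submitted as one job with one call. Table and source names below are hypothetical placeholders.

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# ... register sources and sinks here ...

stmt_set = t_env.create_statement_set()
stmt_set.add_insert_sql("INSERT INTO sink_inserts SELECT * FROM source_a")
stmt_set.add_insert_sql("INSERT INTO sink_updates SELECT * FROM source_b")

# One submission for both statements instead of two execute() calls.
stmt_set.execute().wait()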

Shreeram Narayanan
09/03/2024, 3:31 AM

Aravind Musigumpula
09/03/2024, 5:47 AM
CREATE TABLE users_mysql (
    account_id BIGINT,
    id BIGINT,
    name STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://mysql:3306/freshservice',
    'table-name' = 'tdetails',
    'username' = 'root',
    'password' = 'root',
    'lookup.cache.max-rows' = '5000',
    'lookup.cache.ttl' = '10min'
);
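For reference, the lookup.cache.* options on a JDBC table take effect when it is used as the lookup (dimension) side of a processing-time temporal join; a minimal sketch, where the orders table and its proc_time attribute are hypothetical and not part of the original setup:

# Assumes an existing TableEnvironment `t_env` with users_mysql registered as
# above and an "orders" table that has a processing-time attribute proc_time.
t_env.execute_sql("""
    SELECT o.order_id, u.name
    FROM orders AS o
    JOIN users_mysql FOR SYSTEM_TIME AS OF o.proc_time AS u
        ON o.account_id = u.account_id
""").print()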
Marco Scalerandi
09/03/2024, 8:31 AM

Jaideep C
09/03/2024, 9:46 AM
services:
  jobmanager:
    image: flink:1.20.0-scala_2.12
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        rest.flamegraph.enabled: true
  taskmanager:
    image: flink:1.20.0-scala_2.12
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 2
    environment:
      FLINK_PROPERTIES: |
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
        rest.flamegraph.enabled: true
So that when I restart Docker, I can have both my RocksDB state and my job resumed. Currently my job and the state both disappear. I am uploading the job via the web UI.
Thanks

Alessio Bernesco Làvore
09/03/2024, 10:43 AM
If I re-create the catalog in a new session (CREATE CATALOG hive_catalog WITH ('type' = 'hive','hive-conf-dir' = '/opt/flink/conf');), then all the tables created during the previous sessions are available, so the catalog is persisted in Hive/Postgres, but it seems it's just not available on startup.
The catalog is configured as a file catalog and persisted on a mounted volume in docker:
From flink config.yaml
table:
  catalog-store:
    kind: file
    file:
      path: file:///opt/flink/catalogs/
From compose:
jobmanager:
  image: flink:1.19
  volumes:
    - ./jobmanager/:/tmp/
    - ./jobmanager/:/opt/flink/flink-web
    - ./conf/config.yaml:/opt/flink/conf/config.yaml
    - ./catalogs/:/opt/flink/catalogs/
    - ./conf/hive-site.xml:/opt/flink/conf/hive-site.xml
I also moved a hive_catalog.yaml file, retrieved from the non-dockerized setup, into the catalogs directory, but nothing changes:
type: "hive"
hive-conf-dir: "/opt/flink/conf"
Any hints?
Thanks!

Paul Annesley
09/03/2024, 11:21 AM
Is it possible to access two state descriptors within applyToKeyedState, e.g. to copy items from one state to another?
I'm writing a Flink job that uses Broadcast State to conditionally buffer data from the main keyed stream into state, and then trigger processing of that data via events on the broadcast stream. I see that in processBroadcastElement I can process all keys of a state descriptor using applyToKeyedState(descriptor, (key, state) -> { … }), but I need access to two descriptors at once to move items between two MapStates. Is this possible?