windwheel
08/30/2024, 1:13 AM

parallelism = self.param.get('parallelism', 1)
result = t_env.execute_sql("""
    CREATE TABLE clickhouse_sample_verfired (
        node_id BIGINT,
        uuid STRING,
        batch_id STRING,
        device_time TIMESTAMP(3),
        device_timestamp BIGINT,
        key_id STRING,
        str_v STRING,
        is_delete INTEGER,
        create_time TIMESTAMP(3),
        PRIMARY KEY (uuid) NOT ENFORCED
    ) WITH (
        'connector' = 'clickhouse',
        'url' = 'clickhouse://192.168.30.74:30123?compress_algorithm=gzip',
        'username' = 'admin',
        'password' = 'Cljslrl0620.',
        'database-name' = 'ddps',
        'table-name' = 'ts_kv_iot_main',
        'scan.partition.column' = '{}',
        'scan.partition.num' = '{}',
        'sink.batch-size' = '500',
        'sink.flush-interval' = '1000',
        'sink.max-retries' = '3'
    )
""".format("device_time", parallelism))
# Merge the incoming time intervals and collect the node_id list to filter on
node_list = []
time_list = []
for i in range(0, len(node_sharding)):
    tmp_node = node_sharding[i]
    if len(tmp_node):
        node_list.append(tmp_node['node_id'])
        time_list.append(
            [tmp_node['clickhouse_start_time'], tmp_node['clickhouse_end_time'], tmp_node['node_id']])
node_list_len = len(node_list)
node_list = str(node_list).strip("[]")
combine_time_list = merge_interval(time_list)
time_list = transfer_format(combine_time_list)
# Build the device_time predicate: each merged interval contributes a
# "device_time >= ... AND device_time <= ... AND " fragment
time_combine_str = ""
for time in time_list:
    time_condition = "device_time >= '" + time[0] + "' AND device_time <= '" + time[1] + "' AND "
    time_combine_str = time_combine_str + time_condition
# Register the UDF/UDAF used in the pivot query below
t_env.create_temporary_system_function("format_time", stream_format_time_refactor)
t_env.create_temporary_system_function("pivot_multi_column", group_device_data_agg)
t_env.execute_sql("""
    CREATE TABLE print_table (
        nodeId BIGINT,
        uuid STRING,
        pageBatchID STRING,
        device_time TIMESTAMP(3),
        device_timestamp BIGINT,
        uniq_array STRING
    ) WITH (
        'connector' = 'print'
    )""")
# Pivot the per-device rows into columns
# table = t_env.from_path("clickhouse_sample_verfired") \
#     .group_by(col('node_id'), col('uuid'), col('batch_id'),
#               col('device_time'), col('device_timestamp')) \
#     .flat_aggregate(call("pivot", col('key_id'), col('str_v')).alias("uniq_array")) \
#     .select(col('node_id'), col('uuid'), col('batch_id'))
sql = """
    SELECT node_id AS nodeId,
           uuid,
           batch_id AS pageBatchID,
           device_time,
           device_timestamp,
           format_time(device_timestamp, device_time) AS new_time,
           pivot_multi_column(key_id, str_v)
               OVER (PARTITION BY node_id, uuid, batch_id, device_time, device_timestamp
                     ORDER BY device_time) AS uniq_array
    FROM clickhouse_sample_verfired
    WHERE {} node_id IN ( {} )
""".format(time_combine_str, node_list)
table = t_env.sql_query(sql)
stream = t_env.to_append_stream(
    table=table,
    type_info=Types.ROW([Types.LONG(), Types.STRING(), Types.STRING(), Types.SQL_TIMESTAMP(),
                         Types.LONG(), Types.SQL_TIMESTAMP(), Types.STRING()]))
stream = stream.flat_map(FeatureTuple())
# stream.execute_and_collect("source_test")
return stream
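
merge_interval and transfer_format aren't shown in the snippet. A minimal sketch of what they might look like, assuming the sharding entries carry epoch-second start/end values and the WHERE clause above needs 'YYYY-MM-DD HH:MM:SS' strings (the names come from the call sites, the bodies are guesses, not the poster's actual helpers):

from datetime import datetime

def merge_interval(time_list):
    # Hypothetical: merge overlapping [start, end, node_id] intervals into
    # plain [start, end] pairs, assuming start/end are mutually comparable.
    intervals = sorted(time_list, key=lambda t: t[0])
    merged = []
    for start, end, node_id in intervals:
        if merged and start <= merged[-1][1]:
            merged[-1][1] = max(merged[-1][1], end)
        else:
            merged.append([start, end])
    return merged

def transfer_format(combine_time_list):
    # Hypothetical: render each merged interval as the
    # ['YYYY-MM-DD HH:MM:SS', 'YYYY-MM-DD HH:MM:SS'] string pair the
    # device_time predicate above is built from (epoch-second inputs assumed).
    fmt = '%Y-%m-%d %H:%M:%S'
    return [[datetime.fromtimestamp(start).strftime(fmt),
             datetime.fromtimestamp(end).strftime(fmt)]
            for start, end in combine_time_list]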
udf:
@udf(result_type=DataTypes.TIMESTAMP(3), func_type="pandas")
def stream_format_time_refactor(deviceTimeStamp: pd.Series, deviceTime: pd.Series) -> pd.Series:
    time_len = len(deviceTime)
    result_series_index = []
    result_series_data = []
    for i in range(time_len):
        result_series_index.append(i)
        # Parse the device_time string and convert it to epoch milliseconds
        current_device_time = str(deviceTime[i])
        timearray = time.strptime(current_device_time, '%Y-%m-%d %H:%M:%S')
        timestamp = int(time.mktime(timearray))
        current_time = timestamp * 1000
        result_series_data.append(current_time)
    result = pd.Series(data=result_series_data, index=result_series_index)
    return result
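
The per-row loop above could also be written as a vectorized pandas conversion. A sketch under the same assumptions (device_time strings in '%Y-%m-%d %H:%M:%S' format, epoch-millisecond output; the result type is declared BIGINT here because the value produced is an integer, whereas the original declares TIMESTAMP(3)):

import pandas as pd
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.BIGINT(), func_type="pandas")
def stream_format_time_vectorized(deviceTimeStamp: pd.Series, deviceTime: pd.Series) -> pd.Series:
    # Hypothetical vectorized variant: parse the whole Series at once instead of looping.
    # Note: naive datetimes are treated as UTC here, while time.mktime above uses the
    # local timezone, so the two can differ by the local UTC offset.
    parsed = pd.to_datetime(deviceTime.astype(str), format='%Y-%m-%d %H:%M:%S')
    return parsed.astype('int64') // 10**6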
@udaf(result_type=DataTypes.STRING(), func_type="pandas")
def group_device_data_agg(keyId: pd.Series, strV: pd.Series) -> str:
    df = pd.DataFrame({"key_id": keyId, "str_v": strV})
    item_len = len(keyId)
    dict_list = []
    for i in range(item_len):
        # Equivalent of ClickHouse concat(key_id, '@', replaceAll(str_v, ',', ';'))
        item = keyId[i] + '@' + batch_replace_string(strV[i], ',', ';')
        dict_list.append(item)
    result_str = str(dict_list)
    return result_str
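
batch_replace_string isn't defined in the thread; going by the commented ClickHouse expression concat(key_id, '@', replaceAll(str_v, ',', ';')), it presumably replaces every occurrence of one substring with another. A minimal sketch (name and arguments taken from the call site, body is an assumption):

def batch_replace_string(value, old, new):
    # Hypothetical helper mirroring ClickHouse replaceAll(str_v, ',', ';'):
    # replace every occurrence of `old` with `new`, tolerating non-string input.
    return str(value).replace(old, new)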