# troubleshooting
over window aggregation:
```python
parallelism = self.param.get('parallelism', 1)

result = t_env.execute_sql(""" CREATE TABLE clickhouse_sample_verfired (  
                                    node_id BIGINT, 
                                    uuid STRING, 
                                    batch_id STRING,  
                                    device_time TIMESTAMP(3),  
                                    device_timestamp BIGINT,   
                                    key_id STRING,   
                                    str_v STRING,   
                                    is_delete INTEGER,  
                                    create_time TIMESTAMP(3),  
                                PRIMARY KEY (uuid) NOT ENFORCED 
                            ) WITH (  
                                'connector' = 'clickhouse',   
                                'url' = 'clickhouse://192.168.30.74:30123?compress_algorithm=gzip',
                                'username' = 'admin', 
                                'password' = 'Cljslrl0620.', 
                                'database-name' = 'ddps',  
                                'table-name' = 'ts_kv_iot_main', 
                                'scan.partition.column' = '{}',
                                'scan.partition.num' = '{}',  
                                'sink.batch-size' = '500',   
                                'sink.flush-interval' = '1000',  
                                'sink.max-retries' = '3' ); """.format("device_time", parallelism))
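# the two {} placeholders above are filled by .format():
# 'scan.partition.column' -> "device_time" and 'scan.partition.num' -> parallelism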

# Merge the incoming time intervals and collect the list of node_ids that need filtering

node_list = []
time_list = []
for tmp_node in node_sharding:
    if tmp_node:
        node_list.append(tmp_node['node_id'])
        time_list.append(
            [tmp_node['clickhouse_start_time'], tmp_node['clickhouse_end_time'], tmp_node['node_id']])

node_list_len = len(node_list)
node_list = str(node_list).strip("[]")
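# node_list is now a comma-separated string, e.g. "101, 102, 103" (values illustrative),
# ready to be spliced into the "node_id in ( ... )" clause of the query below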

combine_time_list = merge_interval(time_list)

time_list = transfer_format(combine_time_list)

time_combine_str = ""
for time_range in time_list:
    # named time_range to avoid shadowing the `time` module used by the UDFs
    time_condition = "device_time >= '" + time_range[0] + "' AND device_time <= '" + time_range[1] + "' AND "
    time_combine_str = time_combine_str + time_condition
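# time_combine_str is now a chain of range predicates, e.g. (times illustrative):
#   "device_time >= '2023-01-01 00:00:00' AND device_time <= '2023-01-02 00:00:00' AND "
# the trailing " AND " is kept on purpose so the WHERE clause below can append the node_id filter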

# Register the Python UDFs (defined in the udf section below) used by the query

t_env.create_temporary_system_function("format_time", stream_format_time_refactor)

t_env.create_temporary_system_function("pivot_multi_column", group_device_data_agg)

t_env.execute_sql("""
create table print_table (
        nodeId BIGINT,
        uuid STRING,
        pageBatchID STRING,
        device_time TIMESTAMP(3),
        device_timestamp BIGINT,
        uniq_array STRING
        ) WITH (
            'connector' = 'print'
        )""")
# Row-to-column (pivot) logic for the device data
# table = t_env.from_path("clickhouse_sample_verfired").group_by(col('node_id'), col('uuid'), col('batch_id'),
#                                                                col('device_time'), col('device_timestamp')) \
#     .flat_aggregate(call("pivot", col('key_id'), col('str_v')).alias("uniq_array")) \
#     .select(col('node_id'), col('uuid'), col('batch_id'))

sql = """ SELECT node_id AS nodeId,  
                         uuid,
                         batch_id AS pageBatchID,  
                         device_time, 
                         device_timestamp, 
                         format_time(device_timestamp, device_time) as new_time,
                         pivot_multi_column(key_id, str_v) 
                         over (PARTITION BY 
                         node_id, uuid, batch_id, device_time, device_timestamp
                          ORDER BY device_time
                             )  AS uniq_array
                   FROM clickhouse_sample_verfired  
                   WHERE  {}
                     node_id in ( {} )  """.format(time_combine_str, node_list)

table = t_env.sql_query(sql)
stream = t_env.to_append_stream(table=table, type_info=Types.ROW([Types.LONG(), Types.STRING(), Types.STRING(),
                                      Types.SQL_TIMESTAMP(), Types.LONG(), Types.SQL_TIMESTAMP(), Types.STRING()]))
stream = stream.flat_map(FeatureTuple())
# stream.execute_and_collect("source_test")
return stream
```
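`merge_interval` and `transfer_format` are not shown in this snippet. For reference, a minimal sketch of the interval-merging step, assuming every entry is a `[start, end, node_id]` triple with comparable start/end values (the helper name matches the call above, but the body is an assumption, not the original implementation):

```python
def merge_interval(time_list):
    """Collapse overlapping [start, end, node_id] ranges into a minimal set of
    [start, end] ranges (illustrative sketch only)."""
    if not time_list:
        return []
    # sort by start time so overlapping ranges become adjacent
    sorted_ranges = sorted(time_list, key=lambda r: r[0])
    merged = [[sorted_ranges[0][0], sorted_ranges[0][1]]]
    for start, end, _node_id in sorted_ranges[1:]:
        last = merged[-1]
        if start <= last[1]:
            # overlapping (or touching) range: extend the previous one
            last[1] = max(last[1], end)
        else:
            merged.append([start, end])
    return merged
```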
udf:
```python
import time

import pandas as pd

from pyflink.table import DataTypes
from pyflink.table.udf import udf, udaf

@udf(result_type=DataTypes.TIMESTAMP(3), func_type="pandas")
def stream_format_time_refactor(deviceTimeStamp: pd.Series, deviceTime: pd.Series) -> pd.Series:
    time_len = len(deviceTime)

    result_series_index = []
    result_series_data = []
    for i in range(time_len):
        result_series_index.append(i)
        current_device_time = str(deviceTime[i])
        timearray = time.strptime(current_device_time, '%Y-%m-%d %H:%M:%S')
        timestamp = int(time.mktime(timearray))
        current_time = timestamp * 1000
        result_series_data.append(current_time)

    result = pd.Series(data=result_series_data, index=result_series_index)
    return result


@udaf(result_type=DataTypes.STRING(), func_type="pandas")
def group_device_data_agg(keyId: pd.Series, strV: pd.Series) -> str:
    df = pd.DataFrame({"key_id": keyId, "str_v": strV})

    item_len = len(keyId)

    dict_list = []
    for i in range(item_len):
        item = keyId[i] + '@' + batch_replace_string(strV[i], ',', ';')
        dict_list.append(item)
    # concat(key_id, '@', replaceAll(str_v,',',';'))
    result_str = str(dict_list)
    return result_str
```
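`batch_replace_string` is also not included; judging from the ClickHouse-style comment (`concat(key_id, '@', replaceAll(str_v,',',';'))`) it just swaps one substring for another. A minimal sketch under that assumption:

```python
def batch_replace_string(value: str, old: str, new: str) -> str:
    # assumed equivalent of ClickHouse's replaceAll; not the original implementation
    return str(value).replace(old, new)
```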