陈博
09/12/2023, 11:24 AM
SET 'execution.checkpointing.interval' = '10s';
CREATE TABLE flume (
imsi String,
time_stamp String,
domain String,
this_time_stamp AS CAST(CURRENT_TIMESTAMP AS timestamp(3)),
WATERMARK FOR this_time_stamp AS this_time_stamp-INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'flume',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.zookeeper.connect' = 'localhost:2181',
'properties.group.id' = 'flink-runtime-14124',
-- 'scan.startup.mode' = 'latest-offset',
'scan.startup.mode' = 'earliest-offset',
--earliest
'format' = 'csv',
'csv.field-delimiter' = ','
);
CREATE TEMPORARY VIEW whitelist_view AS SELECT * FROM nd_wt_mysql;
SELECT * FROM flume WHERE domain NOT IN (select name from whitelist_view);
This NOT IN query returns no data. Is it because of a data type mismatch?
Jane Chan
09/12/2023, 12:35 PM
Try querying whitelist_view and flume separately to see.
陈博
09/12/2023, 12:59 PM
陈博
09/12/2023, 1:00 PM
SELECT * FROM flume WHERE domain NOT IN (select name from whitelist_view);
But when I change it to this, I get no result.
Jane Chan
09/12/2023, 1:09 PM
How is nd_wt_mysql defined?
陈博
09/13/2023, 1:13 AM
CREATE TABLE nd_wt_mysql (
name VARCHAR(50)
) WITH (
'connector' = 'doris',
'sink.label-prefix' = 'doris-ndx-14192',
'fenodes' = '127.0.0.1:8030',
'table.identifier' = 'ndu.nd_wt_mysql',
'username' = 'admin',
'password' = ''
);
nd_wt_mysql is a Doris table with one field.
陈博
09/13/2023, 1:21 AM
陈博
09/13/2023, 1:38 AM
CREATE TABLE flume (
imsi VARCHAR(50),
time_stamp VARCHAR(50),
domain VARCHAR(50)
) WITH (
'connector' = 'kafka',
-- 'topic' = 'flume',
'topic' = 'ndu01',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.zookeeper.connect' = 'localhost:2181',
'properties.group.id' = 'flink-runtime-14124121',
'scan.startup.mode' = 'latest-offset',
-- 'scan.startup.mode' = 'earliest-offset',
'format' = 'csv',
'csv.field-delimiter' = ','
);
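
Editor's note: the definition above switches the columns to VARCHAR(50) to rule out a type mismatch. Another common reason a NOT IN subquery returns nothing is a NULL in the subquery result, because under SQL three-valued logic `x NOT IN (..., NULL)` can never evaluate to TRUE. Whether whitelist_view actually contains a NULL name is not known from this thread. The following is only a minimal sketch of the effect, using hypothetical inline views (`events`, `whitelist`) in place of flume and whitelist_view:

```sql
-- Hypothetical stand-ins for flume and whitelist_view (not from the thread).
CREATE TEMPORARY VIEW events AS
SELECT * FROM (VALUES ('a.com'), ('b.com')) AS t(domain);

CREATE TEMPORARY VIEW whitelist AS
SELECT * FROM (VALUES ('a.com'), (CAST(NULL AS STRING))) AS t(name);

-- Yields no rows: 'b.com' <> NULL is UNKNOWN, so NOT IN is never TRUE.
SELECT domain
FROM events
WHERE domain NOT IN (SELECT name FROM whitelist);

-- Null-safe alternative: a NULL name simply never matches, so 'b.com' is kept.
SELECT e.domain
FROM events e
WHERE NOT EXISTS (
  SELECT 1 FROM whitelist w WHERE w.name = e.domain
);
```

If this turns out to be the cause, adding `WHERE name IS NOT NULL` to the subquery or switching to NOT EXISTS are the usual workarounds. If whitelist_view has no NULLs, the cause lies elsewhere, and querying each table separately remains the next diagnostic step.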
陈博
09/13/2023, 1:39 AM
陈博
09/13/2023, 3:06 AM
陈博
09/13/2023, 3:19 AM
SET 'execution.checkpointing.interval' = '10s';
With this line, the Doris connector also works; without it, records are not inserted into Doris.
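
Editor's note: this observation is consistent with how the Doris Flink connector is documented to behave. As far as I understand it, the sink writes through Stream Load and commits the pending data when a Flink checkpoint completes (two-phase commit, controlled by the `sink.enable-2pc` option), so a job with checkpointing disabled never reaches the commit step. A minimal sketch, assuming the nd_wt_mysql and flume tables defined earlier in the thread; the INSERT statement itself is hypothetical, since the thread does not show the actual job:

```sql
-- Enable periodic checkpoints; the Doris sink commits pending writes
-- each time a checkpoint completes.
SET 'execution.checkpointing.interval' = '10s';

-- Hypothetical job (not shown in the thread): copy domains into the Doris table.
INSERT INTO nd_wt_mysql
SELECT domain FROM flume;
```

With a 10s interval, newly written rows should become visible in Doris roughly every 10 seconds rather than immediately.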