```SET 'execution.checkpointing.interval' = '10s';...
# troubleshooting
u
Copy code
SET 'execution.checkpointing.interval' = '10s';

CREATE TABLE flume (
    imsi String,
    time_stamp String,
    domain String,
    this_time_stamp AS CAST(CURRENT_TIMESTAMP AS timestamp(3)),
    WATERMARK FOR this_time_stamp AS this_time_stamp-INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'flume',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.zookeeper.connect' = 'localhost:2181',
    'properties.group.id' = 'flink-runtime-14124',
    -- 'scan.startup.mode' = 'latest-offset',
    'scan.startup.mode' = 'earliest-offset',
    --earliest
    'format' = 'csv',
    'csv.field-delimiter' = ','
);



CREATE TEMPORARY VIEW whitelist_view AS SELECT * FROM nd_wt_mysql;


SELECT * FROM flume WHERE domain NOT IN (select name from whitelist_view);
This "not in" unable to find data. Is it because of the data type mismatch?
j
I suggest you first perform some data exploration by printing the data from the
whitelist_view
and
flume
separately to see.
u
The data has indeed entered, and I can see the correct results using SELECT * FROM flume WHERE domain NOT IN ('a','b');
Copy code
SELECT * FROM flume WHERE domain NOT IN (select name from whitelist_view);
but change to this , I get no result
j
Do you mean that you cannot receive results from the query, or do you mean the "Records Sent" is zero? If the join operator is the last, it's by design that this metric is zero. Btw how is
nd_wt_mysql
defined?
u
Copy code
CREATE TABLE nd_wt_mysql (
    name VARCHAR(50)
) WITH (
    'connector' = 'doris',
    'sink.label-prefix' = 'doris-ndx-14192',
    'fenodes' = '127.0.0.1:8030',
    'table.identifier' = 'ndu.nd_wt_mysql',
    'username' = 'admin',
    'password' = ''
);
nd_wt_mysql is a doris table, with one filed
the flume table changed as follow:
Copy code
CREATE TABLE flume (
    imsi VARCHAR(50),
    time_stamp VARCHAR(50),
    domain VARCHAR(50)
) WITH (
    'connector' = 'kafka',
    -- 'topic' = 'flume',
    'topic' = 'ndu01',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.zookeeper.connect' = 'localhost:2181',
    'properties.group.id' = 'flink-runtime-14124121',
    'scan.startup.mode' = 'latest-offset',
    -- 'scan.startup.mode' = 'earliest-offset',
    'format' = 'csv',
    'csv.field-delimiter' = ','
);
domain and name both VARCHAR(50) , but still get 0 result
I switched nd_wt_mysql connector from doris to mysql, and the program ran normally. It seems flink do not totally support for doris storage.
If I remove
Copy code
SET 'execution.checkpointing.interval' = '10s';
this line, the doris connector also works, but without this line, records can not insert into doris.