Hangyu Wang
06/05/2023, 2:53 AM
tableEnv.executeSql("CREATE TABLE metric (\n" +
    " metric_name STRING,\n" +
    " function_name STRING,\n" +
    " line_number INT\n" +
    ") WITH (\n" +
    " 'connector' = 'filesystem',\n" +
    " 'path' = '" + parameter.get("input_file") + "', \n" +
    " 'format' = 'protobuf',\n" +
    " 'protobuf.message-class-name' = 'MetricTest',\n" +
    " 'protobuf.ignore-parse-errors' = 'true'\n" +
    ")");
Table t = tableEnv.from("metric");
t.execute().print();
Here is the result:
~/Downloads/flink-1.17.0/bin/flink run target/import-metric-event-1.0-SNAPSHOT.jar --input_file metric_test
Job has been submitted with JobID 9128383c4d60cccaa252145478f31ee5
Empty set
And here is the protobuf input file:
metric_name: "test_metric_name"
function_name: "test_function_name"
line_number: 111
Protobuf schema file:
syntax = "proto3";
package metric;
message MetricTest {
  string metric_name = 1;
  string function_name = 2;
  uint32 line_number = 3;
}
Martijn Visser
06/05/2023, 8:55 AM