Apollo Elon
09/24/2024, 7:20 AMD. Draco O'Brien
09/24/2024, 3:29 PMD. Draco O'Brien
09/24/2024, 3:30 PM
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.Table;
/**
 * Example Flink job that reads sales records from a Kafka-backed table, registers a
 * temporary view holding the total {@code amount} per {@code id} for each one-hour
 * processing-time tumbling window, and prints the rows of that view.
 *
 * <p>The view created here is a logical (non-materialized) view: it is only a named
 * query that is planned and executed when it is read.
 */
public class DataViewExample {

    /**
     * Builds and runs the streaming pipeline.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be planned or executed
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register the Kafka source table.
        // "proctime" is a processing-time attribute, so no WATERMARK clause is declared:
        // watermarks only apply to event-time columns.
        // TODO: the connector option values below are placeholders; replace them with the
        // topic, brokers, consumer group and format of your environment.
        tableEnv.executeSql(
            "CREATE TABLE kafkaSource (\n" +
            "  id INT,\n" +
            "  amount DOUBLE,\n" +
            "  proctime AS PROCTIME()\n" +
            ") WITH (\n" +
            "  'connector' = 'kafka',\n" +
            "  'topic' = 'sales',\n" +
            "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
            "  'properties.group.id' = 'data-view-example',\n" +
            "  'scan.startup.mode' = 'latest-offset',\n" +
            "  'format' = 'json'\n" +
            ")"
        );

        // Total sales per id for every one-hour tumbling window on processing time.
        // Expressed in SQL so the window and the aggregate are validated by the planner.
        final Table totalSalesPerHour = tableEnv.sqlQuery(
            "SELECT id, SUM(amount) AS totalAmount\n" +
            "FROM kafkaSource\n" +
            "GROUP BY TUMBLE(proctime, INTERVAL '1' HOUR), id"
        );

        // Register the aggregation under a name so later queries can refer to it.
        tableEnv.createTemporaryView("totalSalesPerHour", totalSalesPerHour);

        // Query the view.
        final Table result = tableEnv.sqlQuery("SELECT * FROM totalSalesPerHour");

        // A group-window aggregate only emits final (insert-only) rows, so the table can be
        // converted to a plain DataStream of rows and printed.
        tableEnv.toDataStream(result).print();

        env.execute("Flink DataView Example");
    }
}
D. Draco O'Brien
09/24/2024, 3:32 PMD. Draco O'Brien
09/24/2024, 3:34 PMD. Draco O'Brien
09/24/2024, 3:35 PMD. Draco O'Brien
09/24/2024, 3:40 PM