Jacob Jona Fahlenkamp
09/18/2024, 11:52 AM
INSERT INTO etdr_kafka
SELECT *
from etdr_mongo
DISTRIBUTE BY `pi`
SORT BY ts ASC;
I get this error:
Flink SQL> [ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "BY" at line 5, column 12.
Was expecting one of:
<EOF>
"EXCEPT" ...
"FETCH" ...
"GROUP" ...
"HAVING" ...
"INTERSECT" ...
"LIMIT" ...
"OFFSET" ...
"ORDER" ...
"MINUS" ...
"TABLESAMPLE" ...
"UNION" ...
"WHERE" ...
"WINDOW" ...
"(" ...
";" ...
"," ...
"NATURAL" ...
"JOIN" ...
"INNER" ...
"LEFT" ...
"RIGHT" ...
"FULL" ...
"CROSS" ...
"OUTER" ...
D. Draco O'Brien
09/18/2024, 12:59 PM
INSERT INTO etdr_kafka
SELECT * FROM etdr_mongo;
D. Draco O'Brien
09/18/2024, 1:06 PM
Jacob Jona Fahlenkamp
09/18/2024, 1:09 PM
D. Draco O'Brien
09/18/2024, 1:10 PM
D. Draco O'Brien
09/18/2024, 1:12 PM
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * Example: copy rows from the "etdr_mongo" source table into a Kafka-backed
 * sink table, using the Kafka message key (built from the {@code pi} column)
 * instead of a SQL-level DISTRIBUTE BY clause.
 *
 * <p>The source table "etdr_mongo" is not defined here and must already be
 * registered in the catalog before this program runs.
 */
public class FlinkTableIndirectDistributionExample {

    /**
     * Creates the Kafka sink table, submits the INSERT job and waits for it.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if a statement fails or the submitted job fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Assume you've defined and registered the source table "etdr_mongo"

        // Define the sink table with partitioning keys implicitly influencing distribution
        // This is hypothetical and depends on the sink connector supporting such configuration
        tableEnv.executeSql(
            "CREATE TABLE etdr_kafka ("
                + "column1 STRING, "
                + "column2 INT, "
                + "pi STRING, "
                + "ts TIMESTAMP"
                + ") WITH ("
                + "'connector' = 'kafka', "
                + "'topic' = 'your-topic', "
                + "'properties.bootstrap.servers' = 'localhost:9092', "
                + "'key.format' = '...', " // Key format for partitioning if applicable (placeholder)
                + "'key.fields' = 'pi', " // Implicitly distributes data by 'pi'
                + "'format' = 'json'"
                + ")"
        );

        // Insert into the sink table.
        // executeSql() submits the INSERT job itself, so env.execute() must NOT be
        // called afterwards: no DataStream operators are registered on env, and
        // env.execute() would fail with "No operators defined in streaming topology".
        // await() blocks until the job terminates (for an unbounded source this is
        // effectively "until cancelled"), which keeps main() alive in local runs.
        tableEnv.executeSql(
            "INSERT INTO etdr_kafka SELECT * FROM etdr_mongo"
        ).await();
    }
}
D. Draco O'Brien
09/18/2024, 1:12 PM
Jacob Jona Fahlenkamp
09/18/2024, 1:44 PM
D. Draco O'Brien
09/19/2024, 3:56 AM