# troubleshooting
j
Hi whatever is wrong with my sql insert statement?
INSERT INTO etdr_kafka
SELECT *
from etdr_mongo
DISTRIBUTE BY `pi`
SORT BY ts ASC;
I get this error:
Flink SQL> [ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "BY" at line 5, column 12.
Was expecting one of:
    <EOF>
    "EXCEPT" ...
    "FETCH" ...
    "GROUP" ...
    "HAVING" ...
    "INTERSECT" ...
    "LIMIT" ...
    "OFFSET" ...
    "ORDER" ...
    "MINUS" ...
    "TABLESAMPLE" ...
    "UNION" ...
    "WHERE" ...
    "WINDOW" ...
    "(" ...
    ";" ...
    "," ...
    "NATURAL" ...
    "JOIN" ...
    "INNER" ...
    "LEFT" ...
    "RIGHT" ...
    "FULL" ...
    "CROSS" ...
    "OUTER" ...
d
It’s because DISTRIBUTE BY is Hive syntax, not standard SQL, and it is not supported by Flink SQL. Without the distribution and per-partition sort, the equivalent standard SQL would be something like this:
INSERT INTO etdr_kafka
SELECT * FROM etdr_mongo;
So I believe DISTRIBUTE BY does not exist in Flink SQL at all.
d
So you could do something similar in the Flink Table API.
Although you can’t use DISTRIBUTE BY directly, you can influence data distribution when defining the sink, especially if the sink connector supports partitioning keys. Here’s an example where you might choose a field to partition by when defining the sink table:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkTableIndirectDistributionExample {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Assume you've defined and registered the source table "etdr_mongo"

        // Define the sink table with partitioning keys implicitly influencing distribution
        // This is hypothetical and depends on the sink connector supporting such configuration
        tableEnv.executeSql(
            "CREATE TABLE etdr_kafka ("
            + "column1 STRING, "
            + "column2 INT, "
            + "pi STRING, "
            + "ts TIMESTAMP"
            + ") WITH ("
            + "'connector' = 'kafka', "
            + "'topic' = 'your-topic', "
            + "'properties.bootstrap.servers' = 'localhost:9092', "
            + "'key.format' = '...', " // Key format for partitioning if applicable
            + "'key.fields' = 'pi', " // Implicitly distributes data by 'pi'
            + "'format' = 'json'"
            + ")"
        );

        // Insert into the sink table. executeSql() submits the job on its
        // own, so a separate env.execute() call is not needed here (it
        // would fail because no DataStream operators were defined).
        tableEnv.executeSql(
            "INSERT INTO etdr_kafka SELECT * FROM etdr_mongo"
        ).await();
    }
}
This would need to be customized to your specific use case. The same thing can also be done with the DataStream API, which sits at a lower level.
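Relatedly, the Kafka SQL connector also exposes a 'sink.partitioner' option, so instead of relying on key hashing you could plug in your own FlinkKafkaPartitioner subclass. A sketch reusing the columns from the table above (the partitioner class name is hypothetical):

```sql
CREATE TABLE etdr_kafka (
  column1 STRING,
  column2 INT,
  pi STRING,
  ts TIMESTAMP
) WITH (
  'connector' = 'kafka',
  'topic' = 'your-topic',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'json',
  -- accepts 'fixed', 'round-robin', or the class name of a custom
  -- FlinkKafkaPartitioner subclass on the job classpath
  'sink.partitioner' = 'com.example.PiPartitioner'
);
```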
j
I am loading data from mongo to kafka. I want to sort it by timestamp. The bottleneck is sorting if I use normal order by, because it sorts globally and cannot be parallel. I only need the data to be sorted per partition in kafka. So I was hoping to achieve that with the sql statement I tried initially. I guess I have to use datastream API and even then I have to derive the partition index myself, and already keyBy the partition index in flink.
d
Yes, I think it’s either that or an intermediate Kafka topic during processing, but that would not be as efficient.
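For the keyBy-on-partition-index idea, the derivation step itself is small. Here is a minimal plain-Java sketch (no Flink dependencies); note the `hashCode()`-based hashing is just an illustrative assumption — Kafka’s default partitioner hashes the serialized key with murmur2, so to guarantee Flink’s key groups line up with the actual Kafka partitions you would reuse this same function in a custom partitioner on the sink:

```java
public class PartitionIndexExample {

    // Derive a partition index from a key. Illustrative only: uses
    // String.hashCode(), whereas Kafka's default partitioner applies
    // murmur2 to the serialized key bytes. Whatever function you pick,
    // use it both for keyBy() and for the sink's custom partitioner.
    static int derivePartition(String key, int numPartitions) {
        // floorMod keeps the result non-negative even for negative hashes
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        String[] keys = {"pi-a", "pi-b", "pi-c"};
        for (String key : keys) {
            int p = derivePartition(key, numPartitions);
            // Records with the same key always map to the same index,
            // so sorting by ts within each key group gives you
            // per-partition ordering in Kafka.
            System.out.println(key + " -> partition " + p);
        }
    }
}
```

In the DataStream job you would then keyBy the derived index, buffer/sort each group by ts, and hand the same function to a custom FlinkKafkaPartitioner so the writes land on the matching partition.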