Hi, I am new to Flink. I am using PyFlink 1.17.0, ...
# Troubleshooting
m
Hi, i am new to flink i am using pyflink 1.17.0, Working on use case, where i need to read data from kafka and upsert to postgresql database but i am getting error below is my code and error CODE: from pyflink.common import Encoder from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.connectors.file_system import FileSink from pyflink.table import StreamTableEnvironment, TableDescriptor, Schema, DataTypes from pyflink.datastream.connectors.kafka import KafkaSource from pyflink.common.serialization import SimpleStringSchema from pyflink.datastream.formats.json import JsonRowSerializationSchema, JsonRowDeserializationSchema from pyflink.common import Types from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import EnvironmentSettings, StreamTableEnvironment from pyflink.table.catalog import JdbcCatalog from pyflink.table import EnvironmentSettings, TableEnvironment env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(env) deserialization_schema = JsonRowDeserializationSchema.Builder() \ .type_info(Types.ROW([Types.INT(), Types.STRING(),Types.INT()])) \ .build() kafka_consumer = FlinkKafkaConsumer( topics='flinkevents', deserialization_schema=deserialization_schema, properties={'bootstrap.servers': 'localhost:9092'} ) kafka_consumer.set_start_from_earliest() name = "my_table" default_database = "postgres" # Replace with your PostgreSQL database name username = "postgres" password = "S@iling0844" base_url = "jdbc:postgresql://localhost:5432" # Replace with your PostgreSQL host and port catalog = JdbcCatalog(name, default_database, username, password, base_url) t_env.register_catalog("my_table", catalog) t_env.use_catalog("my_table") env.add_source(kafka_consumer).print() #t_env.create_temporary_table('kafka_source', kafka_consumer) t_env.execute_sql(''' CREATE TABLE sink_table ( id 
INT, name STRING, age INT ) WITH ( 'connector' = 'jdbc', 'table-name' = 'postgres_sink', 'sink.buffer-flush.max-rows' = '1' ) ''') t_env.execute_sql(''' INSERT INTO jdbc_sink SELECT id, name, age FROM kafka_source ''') env.execute('Kafka to POSTGRESQL') ERROR: File "/Users/Documents/Flink app/discriptors.py", line 44, in <module> t_env.execute_sql(''' File "/Users/Documents/Flink app/env/lib/python3.9/site-packages/pyflink/table/table_environment.py", line 837, in execute_sql return TableResult(self._j_tenv.executeSql(stmt)) File "/Users/Documents/Flink app/env/lib/python3.9/site-packages/py4j/java_gateway.py", line 1322, in call return_value = get_return_value( File "/Users/Documents/Flink app/env/lib/python3.9/site-packages/pyflink/util/exceptions.py", line 158, in deco raise java_exception pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Could not execute CreateTable in path
`my_table`.`postgres`.`sink_table`
at org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:915) at org.apache.flink.table.catalog.CatalogManager.createTable(CatalogManager.java:652) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1000) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:765) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374) at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: java.lang.UnsupportedOperationException at org.apache.flink.connector.jdbc.catalog.AbstractJdbcCatalog.createTable(AbstractJdbcCatalog.java:208) at org.apache.flink.table.catalog.CatalogManager.lambda$createTable$11(CatalogManager.java:663) at org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:909) ... 14 more Appreciate any help here Thanks
m
Looking at your stack trace, the root cause is the `UnsupportedOperationException` thrown from `AbstractJdbcCatalog.createTable`: the Postgres `JdbcCatalog` is read-only, so you cannot run `CREATE TABLE` while it is the active catalog. Create the sink table in the default in-memory catalog instead (i.e. do not call `use_catalog` before executing the DDL). In addition, your CREATE TABLE statement for the sink is missing required connector options — at minimum `'url'` (and typically `'username'` and `'password'`) — and there are two name mismatches: the `INSERT INTO jdbc_sink` statement refers to a table named `jdbc_sink`, but the DDL creates `sink_table`, and `kafka_source` is never registered as a table in the table environment. You can find the full list of required JDBC connector options at https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/jdbc/