Hi, i am new to flink
i am using pyflink 1.17.0, Working on use case, where i need to read data from kafka and upsert to postgresql database but i am getting error
below is my code and error
CODE:
from pyflink.common import Encoder
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSink
from pyflink.table import StreamTableEnvironment, TableDescriptor, Schema, DataTypes
from pyflink.datastream.connectors.kafka import KafkaSource
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.formats.json import JsonRowSerializationSchema, JsonRowDeserializationSchema
from pyflink.common import Types
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer, FlinkKafkaConsumer
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
from pyflink.table.catalog import JdbcCatalog
from pyflink.table import EnvironmentSettings, TableEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
deserialization_schema = JsonRowDeserializationSchema.Builder() \
.type_info(Types.ROW([
Types.INT(), Types.STRING(),
Types.INT()])) \
.build()
kafka_consumer = FlinkKafkaConsumer(
topics='flinkevents',
deserialization_schema=deserialization_schema,
properties={'bootstrap.servers': 'localhost:9092'}
)
kafka_consumer.set_start_from_earliest()
name = "my_table"
default_database = "postgres" # Replace with your PostgreSQL database name
username = "postgres"
password = "S@iling0844"
base_url = "jdbc:
postgresql://localhost:5432" # Replace with your PostgreSQL host and port
catalog = JdbcCatalog(name, default_database, username, password, base_url)
t_env.register_catalog("my_table", catalog)
t_env.use_catalog("my_table")
env.add_source(kafka_consumer).print()
#t_env.create_temporary_table('kafka_source', kafka_consumer)
t_env.execute_sql('''
CREATE TABLE sink_table (
id INT,
name STRING,
age INT
) WITH (
'connector' = 'jdbc',
'table-name' = 'postgres_sink',
'sink.buffer-flush.max-rows' = '1'
)
''')
t_env.execute_sql('''
INSERT INTO jdbc_sink
SELECT id, name, age FROM kafka_source
''')
env.execute('Kafka to POSTGRESQL')
ERROR:
File "/Users/Documents/Flink app/discriptors.py", line 44, in <module>
t_env.execute_sql('''
File "/Users/Documents/Flink app/env/lib/python3.9/site-packages/pyflink/table/table_environment.py", line 837, in execute_sql
return TableResult(self._j_tenv.executeSql(stmt))
File "/Users/Documents/Flink app/env/lib/python3.9/site-packages/py4j/java_gateway.py", line 1322, in
call
return_value = get_return_value(
File "/Users/Documents/Flink app/env/lib/python3.9/site-packages/pyflink/util/exceptions.py", line 158, in deco
raise java_exception
pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Could not execute CreateTable in path
my_table
.
postgres
.
sink_table
at org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:915)
at org.apache.flink.table.catalog.CatalogManager.createTable(CatalogManager.java:652)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1000)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:765)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.UnsupportedOperationException
at org.apache.flink.connector.jdbc.catalog.AbstractJdbcCatalog.createTable(AbstractJdbcCatalog.java:208)
at org.apache.flink.table.catalog.CatalogManager.lambda$createTable$11(CatalogManager.java:663)
at org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:909)
... 14 more
Appreciate any help here
Thanks