soumilshah1995
09/27/2023, 12:47 AM

from pyflink.table import EnvironmentSettings, TableEnvironment
import os
# Create a streaming TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
# Get the current working directory
CURRENT_DIR = os.getcwd()
# Define a list of JAR file names you want to add
jar_files = [
    "flink-sql-connector-postgres-cdc-2.4.1.jar",
    "postgresql-42.6.0.jar",
    "flink-connector-jdbc-1.16.1.jar",
    "flink-s3-fs-hadoop-1.16.1.jar",
    "hudi-flink1.16-bundle-0.13.1.jar"
]
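# NOTE (assumption): these jars are expected to already sit in the working
# directory; 'pipeline.jars' only registers local files with the job, it
# does not download them.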
# Build the list of JAR URLs as file:// URIs
# (CURRENT_DIR is already an absolute path, so 'file://' + path
# yields a well-formed 'file:///...' URI)
jar_urls = [f"file://{CURRENT_DIR}/{jar_file}" for jar_file in jar_files]
table_env.get_config().get_configuration().set_string(
    "pipeline.jars",
    ";".join(jar_urls)
)
table_env.get_config().get_configuration().set_string(
    "execution.checkpointing.interval",
    "5000"
)
table_env.get_config().get_configuration().set_string(
    "parallelism.default",
    "4"
)
# Configure exactly-once checkpointing
table_env.get_config().get_configuration().set_string(
    "execution.checkpointing.mode",
    "EXACTLY_ONCE"
)
# Set the checkpoint directory to the current directory
# ('state.checkpoints.dir' is the documented key for this and expects a URI;
# 'execution.checkpointing.checkpoints-directory' is not a valid Flink option)
table_env.get_config().get_configuration().set_string(
    "state.checkpoints.dir",
    f"file://{CURRENT_DIR}"
)
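# NOTE (assumption): to restart from a checkpoint after the job is cancelled
# or fails, retained (externalized) checkpoints must also be enabled, e.g.:
#
# table_env.get_config().get_configuration().set_string(
#     "execution.checkpointing.externalized-checkpoint-retention",
#     "RETAIN_ON_CANCELLATION"
# )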
# Create a source for the "order_source" table
order_source = """
CREATE TABLE IF NOT EXISTS order_source (
    order_id INT,
    order_value DOUBLE PRECISION,
    customer_id INT,
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    table_name STRING METADATA FROM 'table_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'postgres-cdc',
    'hostname' = 'localhost',
    'port' = '5432',
    'username' = 'postgres',
    'password' = 'postgres',
    'database-name' = 'postgres',
    'schema-name' = 'public',
    'slot.name' = 'order_slot',
    'decoding.plugin.name' = 'pgoutput',
    'scan.incremental.snapshot.enabled' = 'true',
    'scan.startup.mode' = 'latest-offset',
    'table-name' = 'orders'
);
"""
# Execute the SQL to create the sources
table_env.execute_sql(order_source)
table_env.execute_sql("""
SELECT *
FROM order_source
""").print()
print("Job started.")
The code works fine. How do I ensure that if the job stops, it resumes from the point where it last left off? Can someone guide me or point me in the right direction?
References
https://github.com/ververica/flink-cdc-connectors/blob/master/docs/content/connectors/postgres-cdc.md#startup-reading-position
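A minimal sketch of one way to resume from the last position, assuming retained checkpoints are enabled as noted above (the /tmp/flink-checkpoints directory and the <job-id>/chk-42 restore path below are placeholders): the postgres-cdc source stores its WAL offset in Flink state, so restoring that state from the last retained checkpoint continues reading where the job left off. A checkpoint can be restored the same way as a savepoint, via execution.savepoint.path:

from pyflink.table import EnvironmentSettings, TableEnvironment

# Recreate the environment with the same checkpoint settings as the
# original job, plus retention so checkpoints survive cancellation
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
config = table_env.get_config().get_configuration()

config.set_string("execution.checkpointing.interval", "5000")
config.set_string("execution.checkpointing.mode", "EXACTLY_ONCE")
config.set_string("state.checkpoints.dir", "file:///tmp/flink-checkpoints")
config.set_string(
    "execution.checkpointing.externalized-checkpoint-retention",
    "RETAIN_ON_CANCELLATION"
)

# On restart, point the job at the newest retained checkpoint; the
# <job-id>/chk-42 path is a placeholder for the latest chk-* directory
# Flink wrote under state.checkpoints.dir
config.set_string(
    "execution.savepoint.path",
    "file:///tmp/flink-checkpoints/<job-id>/chk-42"
)

# ...then re-register the same order_source DDL and restart the query.

Note that with 'scan.startup.mode' = 'latest-offset' and no restored state, a restarted job silently skips whatever changed while it was down; restoring from a retained checkpoint (or from a savepoint taken before stopping) is what makes the resume pick up at the last committed offset.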
Jaehyeon Kim
09/27/2023, 2:21 AM

soumilshah1995
09/27/2023, 1:06 PM

Jaehyeon Kim
09/27/2023, 7:50 PM

soumilshah1995
09/27/2023, 8:17 PM