Hello i have a question on Flink CDC ```from pyfli...
# troubleshooting
s
Hello, I have a question about Flink CDC:
from pyflink.table import EnvironmentSettings, TableEnvironment
from pathlib import Path


# Create a streaming TableEnvironment (CDC sources require streaming mode;
# the original comment incorrectly said "batch").
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# Resolve the current working directory once; used for jar URLs and checkpoints.
CURRENT_DIR = Path.cwd()

# Connector/format jars this job needs on the classpath.
jar_files = [
    "flink-sql-connector-postgres-cdc-2.4.1.jar",
    "postgresql-42.6.0.jar",
    "flink-connector-jdbc-1.16.1.jar",
    "flink-s3-fs-hadoop-1.16.1.jar",
    "hudi-flink1.16-bundle-0.13.1.jar",
]

# Build well-formed file:// URIs. Path.as_uri() emits the scheme and escaping
# correctly on every platform; the original f"file:///{CURRENT_DIR}/..." produced
# a malformed 'file:////...' URL because os.getcwd() is already absolute.
jar_urls = [(CURRENT_DIR / jar_file).as_uri() for jar_file in jar_files]

config = table_env.get_config().get_configuration()

config.set_string("pipeline.jars", ";".join(jar_urls))

# Checkpoint every 5 seconds so CDC source offsets are persisted regularly.
config.set_string("execution.checkpointing.interval", "5000")

config.set_string("parallelism.default", "4")

# Exactly-once checkpointing semantics.
config.set_string("execution.checkpointing.mode", "EXACTLY_ONCE")

# BUGFIX: the valid Flink option for the checkpoint storage location is
# 'state.checkpoints.dir' — the original key
# 'execution.checkpointing.checkpoints-directory' is not recognized by Flink,
# so checkpoints were never written to the chosen directory. The value must
# also be a URI, not a bare filesystem path.
config.set_string("state.checkpoints.dir", CURRENT_DIR.as_uri())

# Retain checkpoints when the job is cancelled so a later submission can
# resume from the last one (e.g. `flink run -s <checkpoint-path> ...`).
# Without this, checkpoints are deleted on cancellation and the job replays
# from its configured startup position.
config.set_string(
    "execution.checkpointing.externalized-checkpoint-retention",
    "RETAIN_ON_CANCELLATION",
)

# CDC source for the Postgres 'orders' table. Plain string literal: there is
# no interpolation, so the original f-string prefix was unnecessary.
order_source = """
CREATE TABLE IF NOT EXISTS order_source (
    order_id INT,
    order_value DOUBLE PRECISION,
    customer_id INT,
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    table_name STRING METADATA  FROM 'table_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
   'connector' = 'postgres-cdc',
   'hostname' = 'localhost',
   'port' = '5432',
   'username' = 'postgres',
   'password' = 'postgres',
   'database-name' = 'postgres',
   'schema-name' = 'public',
   'slot.name' = 'order_slot',
   'decoding.plugin.name' = 'pgoutput',
    'scan.incremental.snapshot.enabled' = 'true',
    'scan.startup.mode' = 'latest-offset',
   'table-name' = 'orders'
 );
"""

# Register the CDC source table.
table_env.execute_sql(order_source)

# Continuously read the changelog and print it. NOTE: .print() blocks until
# the (unbounded) streaming job terminates, so the line below only runs on
# shutdown.
table_env.execute_sql("""
    SELECT *
     FROM  order_source
""").print()

print("Job started.")
The code works fine. How do I ensure that if the job stops, it resumes from the point where it last left off? Can someone guide me or point me in the right direction? References: https://github.com/ververica/flink-cdc-connectors/blob/master/docs/content/connectors/postgres-cdc.md#startup-reading-position
j
If the failure happens on the Debezium side, the connector should continue reading the WAL from where it last left off. See https://debezium.io/documentation/reference/stable/connectors/postgresql.html
s
Hi @Jaehyeon Kim, thank you very much for the reply. I am new to Flink. I ran a test where I stopped the Flink application and then ran my code again, but I got back the same items that had already been processed. What am I missing here?
j
Can you try to restore the previous job's state using savepoint? https://stackoverflow.com/questions/71899889/how-to-recover-flink-sql-jobs-from-checkpoint
s
Thank you — let me read the link thoroughly, and if I have further questions I will post them on this thread. Thank you for your time.