soumilshah1995
09/30/2023, 1:34 PM
# start a checkpoint every 1000 ms
env.enable_checkpointing(1000)
When I set the variable on table_env, it throws an error.
What's the right way to set and enable checkpointing in Flink Python (PyFlink)?
Here is the code I have:
import os
from pathlib import Path

from faker import Faker
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment, TableEnvironment
# Create the DataStream execution environment first, then build the table
# environment on top of it. Previously the two were created independently,
# so `env` was not the environment backing `table_env`.
env = StreamExecutionEnvironment.get_execution_environment()
# Streaming (not batch) mode settings for the table environment.
env_settings = EnvironmentSettings.in_streaming_mode()
# StreamTableEnvironment is a TableEnvironment, so every later
# `table_env.get_config()` / `execute_sql()` call keeps working unchanged.
table_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
# JARs are looked up in the directory the script is started from
CURRENT_DIR = os.getcwd()
# File names of the connector / filesystem / Hudi JARs to ship with the job
jar_files = [
    "flink-sql-connector-postgres-cdc-2.4.1.jar",
    "postgresql-42.6.0.jar",
    "flink-connector-jdbc-1.16.1.jar",
    "flink-s3-fs-hadoop-1.16.1.jar",
    "hudi-flink1.16-bundle-0.13.1.jar",
]
# Turn each file name into a 'file:///<cwd>/<name>' URL
jar_urls = ["file:///{}/{}".format(CURRENT_DIR, name) for name in jar_files]
# "pipeline.jars" takes all URLs as a single ';'-separated string
table_env.get_config().get_configuration().set_string("pipeline.jars", ";".join(jar_urls))
# Checkpointing for a Table API job is enabled purely through configuration.
# TableEnvironment has no enableCheckpointing() method (calling it raised
# AttributeError), so the interval is set via the option below instead.
# A value without a unit is read as milliseconds, i.e. one checkpoint every 5 s.
table_env.get_config().get_configuration().set_string(
    "execution.checkpointing.interval",
    "5000"
)
table_env.get_config().get_configuration().set_string(
    "parallelism.default",
    "4"
)
# Configure the checkpointing guarantee
table_env.get_config().get_configuration().set_string(
    "execution.checkpointing.mode",
    "EXACTLY_ONCE"
)
# Store checkpoints in the current directory. The option name is
# "state.checkpoints.dir" (the previous key is not a Flink option) and the
# value must be a URI with a scheme, hence the file:// URI built from the path.
table_env.get_config().get_configuration().set_string(
    "state.checkpoints.dir",
    Path(CURRENT_DIR).as_uri()
)