piby 180
04/10/2023, 11:25 AM
from pyflink.table import EnvironmentSettings, TableEnvironment
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
source_ddl = """
CREATE TABLE source_table (
    user_id STRING,
    order_amount DOUBLE,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '10'
)
"""
sink_ddl = """
CREATE TABLE sink_table (
    user_id STRING,
    order_amount DOUBLE,
    ts TIMESTAMP(3),
    dt STRING
) PARTITIONED BY (dt) WITH (
    'connector' = 'filesystem',
    'path' = 's3a://<bucket-name>/flink/data',
    'format' = 'parquet',
    'auto-compaction' = 'true',
    'partition.time-extractor.timestamp-pattern' = '$dt',
    'sink.rolling-policy.file-size' = '1MB',
    'sink.rolling-policy.rollover-interval' = '60s',
    'sink.partition-commit.delay' = '0s',
    'sink.partition-commit.trigger' = 'partition-time',
    'sink.partition-commit.policy.kind' = 'success-file'
)
"""
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
statement_set = t_env.create_statement_set()
statement_set.add_insert_sql("INSERT INTO sink_table SELECT user_id, order_amount, ts, DATE_FORMAT(ts, 'yyyy/MM/dd/HH/mm') as dt FROM source_table")
statement_set.execute().wait()
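As a side note on the partitioning above: `DATE_FORMAT(ts, 'yyyy/MM/dd/HH/mm')` yields a minute-granular value such as `2023/04/10/11/25`, which becomes the partition directory under the sink path. A plain-Python sketch of the equivalent formatting (assuming the `strftime` directives below correspond one-to-one to the Java-style pattern Flink's `DATE_FORMAT` uses):

```python
from datetime import datetime

# Plain-Python stand-in for Flink SQL's DATE_FORMAT(ts, 'yyyy/MM/dd/HH/mm').
# Each row lands in a minute-granular partition directory under the sink path,
# e.g. s3a://<bucket-name>/flink/data/2023/04/10/11/25/.
ts = datetime(2023, 4, 10, 11, 25)
dt = ts.strftime("%Y/%m/%d/%H/%M")
print(dt)  # 2023/04/10/11/25
```

One caveat: with `'sink.partition-commit.trigger' = 'partition-time'`, Flink has to parse the partition value back into a timestamp, and if I recall the docs correctly the default time extractor expects a `yyyy-MM-dd HH:mm:ss`-style value, so a slash-formatted `dt` may need a custom extractor as well.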
The code runs indefinitely, but there are no files on S3. Here are the logs:
2023-04-10 11:20:45,212 INFO org.apache.hadoop.io.compress.CodecPool [] - Got brand-new compressor [.snappy]
2023-04-10 11:20:45,211 INFO org.apache.hadoop.io.compress.CodecPool [] - Got brand-new compressor [.snappy]
2023-04-10 11:21:01,046 INFO org.apache.hadoop.io.compress.CodecPool [] - Got brand-new compressor [.snappy]
2023-04-10 11:21:01,047 INFO org.apache.hadoop.io.compress.CodecPool [] - Got brand-new compressor [.snappy]
2023-04-10 11:22:00,964 INFO org.apache.hadoop.io.compress.CodecPool [] - Got brand-new compressor [.snappy]
2023-04-10 11:22:00,999 INFO org.apache.hadoop.io.compress.CodecPool [] - Got brand-new compressor [.snappy]
Dian Fu
04/10/2023, 11:47 AM
piby 180
04/10/2023, 12:12 PM
Raghunadh Nittala
04/10/2023, 1:34 PM
piby 180
04/10/2023, 1:37 PM
# set the checkpoint mode to EXACTLY_ONCE
t_env.get_config().set("execution.checkpointing.mode", "EXACTLY_ONCE")
t_env.get_config().set("execution.checkpointing.interval", "1min")
# set the statebackend type to "rocksdb", other available options are "filesystem" and "jobmanager"
# you can also set the full qualified Java class name of the StateBackendFactory to this option
# e.g. org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
t_env.get_config().set("state.backend.type", "rocksdb")
# set the checkpoint directory, which is required by the RocksDB statebackend
t_env.get_config().set("state.checkpoints.dir", "file:///tmp/checkpoints/")
Raghunadh Nittala
04/10/2023, 2:01 PM
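For reference, the per-job settings shared above can equivalently be set cluster-wide in flink-conf.yaml (a sketch; the checkpoint directory is a placeholder). The key point for the original problem is that Flink's streaming filesystem sink only moves part files from in-progress to finished when a checkpoint completes, so with no checkpointing interval configured, nothing ever becomes visible on S3:

```yaml
# Sketch of equivalent cluster-level settings in flink-conf.yaml.
# A checkpointing interval is required for the filesystem sink to
# finalize (commit) its part files.
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.interval: 1min
state.backend.type: rocksdb
state.checkpoints.dir: file:///tmp/checkpoints/
```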