piby 180
04/10/2023, 2:00 PM
I get the following error when I use 'sink.partition-commit.trigger'='partition-time':
`Caused by: java.time.format.DateTimeParseException: Text '2023/04/10/13/49' could not be parsed at index 4`
It looks like it is not able to commit the partition. When I use 'sink.partition-commit.trigger'='process-time', it works and I can see empty _SUCCESS files being committed.
4. Is it better to use the DataStream API for the S3 sink? From the documentation, it looks like the DataStream API has the ability to write a file prefix and suffix:
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/
Here is my code:
from pyflink.table import EnvironmentSettings, TableEnvironment
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().set("restart-strategy.type", "fixed-delay")
t_env.get_config().set("restart-strategy.fixed-delay.attempts", "3")
t_env.get_config().set("restart-strategy.fixed-delay.delay", "30s")
jar_list = """
file:///home/ubuntu/environment/flink/lib/flink-sql-connector-kafka-1.17.0.jar;
file:///home/ubuntu/environment/flink/lib/flink-sql-parquet-1.17.0.jar;
file:///home/ubuntu/environment/flink/lib/flink-connector-files-1.17.0.jar;
file:///home/ubuntu/environment/flink/lib/flink-s3-fs-hadoop-1.17.0.jar;
file:///home/ubuntu/environment/flink/lib/flink-s3-fs-presto-1.17.0.jar;
file:///home/ubuntu/environment/flink/lib/hadoop-mapreduce-client-core-3.3.5.jar
"""
t_env.get_config().set("pipeline.jars", jar_list)
t_env.get_config().set("pipeline.classpaths", jar_list)
# set the checkpoint mode to EXACTLY_ONCE
t_env.get_config().set("execution.checkpointing.mode", "EXACTLY_ONCE")
t_env.get_config().set("execution.checkpointing.interval", "1min")
# set the statebackend type to "rocksdb", other available options are "filesystem" and "jobmanager"
# you can also set the full qualified Java class name of the StateBackendFactory to this option
# e.g. org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
t_env.get_config().set("state.backend.type", "rocksdb")
# set the checkpoint directory, which is required by the RocksDB statebackend
t_env.get_config().set("state.checkpoints.dir", "file:///tmp/checkpoints/")
source_ddl = f"""
CREATE TABLE source_table (
user_id STRING,
order_amount DOUBLE,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10'
)
"""
sink_ddl = f"""
CREATE TABLE sink_table (
user_id STRING,
order_amount DOUBLE,
ts TIMESTAMP(3),
dt VARCHAR
) PARTITIONED BY (dt) WITH (
'connector' = 'filesystem',
'path' = 's3a://<bucket_name>/flink/data',
'format' = 'parquet',
'auto-compaction' = 'true',
'sink.rolling-policy.file-size'='1MB',
'sink.rolling-policy.rollover-interval'='60s',
'sink.partition-commit.delay'='0s',
'partition.time-extractor.timestamp-pattern' = '$dt',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.policy.kind'='success-file'
);
"""
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
statement_set = t_env.create_statement_set()
statement_set.add_insert_sql("INSERT INTO sink_table SELECT user_id, order_amount, ts, DATE_FORMAT(ts, 'yyyy/MM/dd/HH/mm') as dt FROM source_table")
statement_set.execute().wait()
Dian Fu
04/11/2023, 7:44 AM
> How can I customize the filename stored in S3? I couldn’t find any setting for it.
It currently doesn’t provide config options to customize the filename.
Dian Fu
04/11/2023, 7:46 AM
> I get the following error when I use 'sink.partition-commit.trigger'='partition-time'. `Caused by: java.time.format.DateTimeParseException: Text '2023/04/10/13/49' could not be parsed at index 4` Looks like it is not able to commit the partition. When I use 'sink.partition-commit.trigger'='process-time', it works and I can see empty _SUCCESS files being committed.
By default the format of the partition should be yyyy-MM-dd HH:mm:ss; you could customize it via the config partition.time-extractor.timestamp-formatter, see https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/filesystem/#partition-time-extractor-timestamp-formatter for more details.
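(For reference, a minimal sketch, not from the thread, of how the original sink DDL could be adjusted along this suggestion: keep the single dt partition column and tell the extractor how to parse the 'yyyy/MM/dd/HH/mm' string. The bucket name and the other options are taken from the code above; whether this exact formatter value works for minute-level partitions is an assumption.)
sink_ddl = """
CREATE TABLE sink_table (
    user_id STRING,
    order_amount DOUBLE,
    ts TIMESTAMP(3),
    dt VARCHAR
) PARTITIONED BY (dt) WITH (
    'connector' = 'filesystem',
    'path' = 's3a://<bucket_name>/flink/data',
    'format' = 'parquet',
    'partition.time-extractor.timestamp-pattern' = '$dt',
    'partition.time-extractor.timestamp-formatter' = 'yyyy/MM/dd/HH/mm',
    'sink.partition-commit.trigger' = 'partition-time',
    'sink.partition-commit.delay' = '0s',
    'sink.partition-commit.policy.kind' = 'success-file'
)
"""
t_env.execute_sql(sink_ddl)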
Dian Fu
04/11/2023, 7:47 AM
> Is it better to use DataStream API for S3 Sink? From the documentation, it looks like the DataStream API has the ability to write a file prefix and suffix.
If you want to define the prefix and suffix yourself, I guess you have to use the DataStream API, as this ability is not provided in the Table API & SQL.
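(A minimal, hedged sketch of that DataStream approach for PyFlink 1.17; the prefix/suffix values, source, and path are placeholders, and a Parquet bulk format would need a different writer factory than the simple string encoder shown here.)
from pyflink.common.serialization import Encoder
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig, RollingPolicy

env = StreamExecutionEnvironment.get_execution_environment()
ds = env.from_collection(["user_1,12.5", "user_2,7.0"])  # placeholder source

file_sink = FileSink \
    .for_row_format("s3a://<bucket_name>/flink/data", Encoder.simple_string_encoder()) \
    .with_output_file_config(
        OutputFileConfig.builder()
            .with_part_prefix("orders-")   # custom part-file name prefix
            .with_part_suffix(".txt")      # custom part-file name suffix
            .build()) \
    .with_rolling_policy(RollingPolicy.default_rolling_policy()) \
    .build()

ds.sink_to(file_sink)
env.execute("file-sink-prefix-suffix-sketch")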
piby 180
04/11/2023, 9:51 AM
Thanks for your comments. Do you know how I can fix this "%2F" problem (screenshot)? S3 is converting / to "%2F". There seems to be some encoding problem.
Raghunadh Nittala
04/11/2023, 9:56 AM
As the discussion is around sinking parquet files to S3, I just want to add a point here - when I set 'auto-compaction' to true, I observed that the files got deleted from S3. I'm wondering why the existing files got deleted; I expected the new files to be compacted.
Dian Fu
04/11/2023, 11:45 AM
> Thanks for your comments. Do you know how I can fix this "%2F" problem (screenshot)? S3 is converting / to "%2F". There seems to be some encoding problem.
Can I know how you set the S3 key? From the sample code, I have not found that part of the code.
Dian Fu
04/11/2023, 11:47 AM
> As the discussion is around sinking parquet files to S3, I just want to add a point here - when I set 'auto-compaction' to true, I observed that the files got deleted from S3. I'm wondering why the existing files got deleted; I expected the new files to be compacted.
I'm sorry, I'm also not quite familiar with the FileSystem connector. It would be better to send this question to the main thread to see if anybody else knows this.
piby 180
04/11/2023, 12:14 PM
> Can I know how you set the S3 key? From the sample code, I have not found that part of the code.
This is the entire code. I create a dt column where I inject a datetime string in the format yyyy/MM/dd/HH/mm using DATE_FORMAT, and I am trying to partition the data at the minute level. I have managed to make it work by creating separate columns for year, month, day, hour and minute like this (not an elegant solution, but it works):
import urllib
from pyflink.table.udf import udtf
from pyflink.table import DataTypes
from pyflink.table import EnvironmentSettings, TableEnvironment
t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().set("restart-strategy.type", "fixed-delay")
t_env.get_config().set("restart-strategy.fixed-delay.attempts", "3")
t_env.get_config().set("restart-strategy.fixed-delay.delay", "30s")
jar_list = """
file:///home/ubuntu/environment/flink/lib/flink-sql-connector-kafka-1.17.0.jar;
file:///home/ubuntu/environment/flink/lib/flink-sql-parquet-1.17.0.jar;
file:///home/ubuntu/environment/flink/lib/flink-connector-files-1.17.0.jar;
file:///home/ubuntu/environment/flink/lib/flink-s3-fs-hadoop-1.17.0.jar;
file:///home/ubuntu/environment/flink/lib/flink-s3-fs-presto-1.17.0.jar;
file:///home/ubuntu/environment/flink/lib/hadoop-mapreduce-client-core-3.3.5.jar
"""
t_env.get_config().set("pipeline.jars", jar_list)
t_env.get_config().set("pipeline.classpaths", jar_list)
# set the checkpoint mode to EXACTLY_ONCE
t_env.get_config().set("execution.checkpointing.mode", "EXACTLY_ONCE")
t_env.get_config().set("execution.checkpointing.interval", "1min")
# set the statebackend type to "rocksdb", other available options are "filesystem" and "jobmanager"
# you can also set the full qualified Java class name of the StateBackendFactory to this option
# e.g. org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
t_env.get_config().set("state.backend.type", "rocksdb")
# set the checkpoint directory, which is required by the RocksDB statebackend
t_env.get_config().set("state.checkpoints.dir", "file:///tmp/checkpoints/")
source_ddl = f"""
CREATE TABLE source_table (
user_id STRING,
order_amount DOUBLE,
ts TIMESTAMP(3),
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '100'
)
"""
sink_ddl = f"""
CREATE TABLE sink_table (
user_id STRING,
order_amount DOUBLE,
ts TIMESTAMP(3),
`year` STRING,
`month` STRING,
`day` STRING,
`hour` STRING,
`minute` STRING
) PARTITIONED BY (`year`, `month`, `day`, `hour`, `minute`) WITH (
'connector' = 'filesystem',
'path' = 's3a://<bucket-name>/flink/data',
'format' = 'parquet',
'partition.time-extractor.timestamp-pattern' = '$year/$month/$day/$hour/$minute',
'partition.time-extractor.timestamp-formatter' = 'yyyy/MM/dd/HH/mm',
'auto-compaction' = 'true',
'compaction.file-size' = '128MB',
'sink.rolling-policy.file-size'='128MB',
'sink.rolling-policy.rollover-interval'='5min',
'sink.partition-commit.delay'='0s',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.policy.kind'='success-file'
);
"""
t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)
# option 2: Python function (a URL-encoding helper; defined here but not used in the INSERT below)
@udtf(result_types=[DataTypes.STRING()])
def quote_plus(s):
    # emit one row containing the URL-encoded string, e.g. "2023/04/10" -> "2023%2F04%2F10"
    yield urllib.parse.quote_plus(s)
statement_set = t_env.create_statement_set()
statement_set.add_insert_sql(f"""
INSERT INTO sink_table
SELECT user_id,
order_amount,
ts,
DATE_FORMAT(ts, 'yyyy'),
DATE_FORMAT(ts, 'MM'),
DATE_FORMAT(ts, 'dd'),
DATE_FORMAT(ts, 'HH'),
DATE_FORMAT(ts, 'mm')
FROM source_table""")
statement_set.execute().wait()
Dian Fu
04/11/2023, 12:19 PM
Dian Fu
04/11/2023, 12:21 PM
> S3 key is not getting parsed properly. "/" is replaced by "%2F"
piby 180
04/11/2023, 12:24 PM
PARTITIONED BY (dt)
or
'partition.time-extractor.timestamp-pattern' = '$dt',
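(A side note, and an assumption rather than something confirmed in the thread: when the partition value itself contains '/', as the single dt column in the first version did, the sink URL-encodes the value when building the object key, which is where the '%2F' would come from. A tiny illustration:)
import urllib.parse

dt = "2023/04/10/13/49"
# URL-encoding a partition value that contains '/' produces the '%2F' seen in the S3 keys
print(urllib.parse.quote_plus(dt))  # 2023%2F04%2F10%2F13%2F49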