# troubleshooting
p
Hey guys, I am testing how to store streaming data to Parquet files on S3 and have encountered the following issues:
1. The S3 key is not getting parsed properly. "/" is replaced by "%2F".
2. How can I customize the filename stored in S3? I couldn't find any setting for it.
3. I get the following error when I use 'sink.partition-commit.trigger'='partition-time':
`Caused by: java.time.format.DateTimeParseException: Text '2023/04/10/13/49' could not be parsed at index 4`
It looks like it is not able to commit the partition. When I use 'sink.partition-commit.trigger'='process-time', it works and I can see empty _SUCCESS files being committed.
4. Is it better to use the DataStream API for the S3 sink? From the documentation, it looks like the DataStream API has the ability to write a file prefix and suffix: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/filesystem/
Here is my code:
from pyflink.table import EnvironmentSettings, TableEnvironment


t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().set("restart-strategy.type", "fixed-delay")
t_env.get_config().set("restart-strategy.fixed-delay.attempts", "3")
t_env.get_config().set("restart-strategy.fixed-delay.delay", "30s")

jar_list = """
    file:///home/ubuntu/environment/flink/lib/flink-sql-connector-kafka-1.17.0.jar;
    file:///home/ubuntu/environment/flink/lib/flink-sql-parquet-1.17.0.jar;
    file:///home/ubuntu/environment/flink/lib/flink-connector-files-1.17.0.jar;
    file:///home/ubuntu/environment/flink/lib/flink-s3-fs-hadoop-1.17.0.jar;
    file:///home/ubuntu/environment/flink/lib/flink-s3-fs-presto-1.17.0.jar;
    file:///home/ubuntu/environment/flink/lib/hadoop-mapreduce-client-core-3.3.5.jar
"""
t_env.get_config().set("pipeline.jars", jar_list)
t_env.get_config().set("pipeline.classpaths", jar_list)
# set the checkpoint mode to EXACTLY_ONCE
t_env.get_config().set("execution.checkpointing.mode", "EXACTLY_ONCE")
t_env.get_config().set("execution.checkpointing.interval", "1min")

# set the statebackend type to "rocksdb", other available options are "filesystem" and "jobmanager"
# you can also set the full qualified Java class name of the StateBackendFactory to this option
# e.g. org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
t_env.get_config().set("state.backend.type", "rocksdb")

# set the checkpoint directory, which is required by the RocksDB statebackend
t_env.get_config().set("state.checkpoints.dir", "file:///tmp/checkpoints/")
source_ddl = f"""
            CREATE TABLE source_table (
                  user_id STRING,
                  order_amount DOUBLE,
                  ts TIMESTAMP(3),
                  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            ) WITH (
              'connector' = 'datagen',
              'rows-per-second' = '10'
            )
            """
            


sink_ddl = f"""
        CREATE TABLE sink_table (
              user_id STRING,
              order_amount DOUBLE,
              ts TIMESTAMP(3),
              dt VARCHAR
        ) PARTITIONED BY (dt) WITH (
          'connector' = 'filesystem',
          'path' = 's3a://<bucket_name>/flink/data',
          'format' = 'parquet',
          'auto-compaction' = 'true',
          'sink.rolling-policy.file-size'='1MB',
          'sink.rolling-policy.rollover-interval'='60s',
          'sink.partition-commit.delay'='0s',
          'partition.time-extractor.timestamp-pattern' = '$dt',
          'sink.partition-commit.trigger'='partition-time',
          'sink.partition-commit.policy.kind'='success-file'

        );
"""

t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)

statement_set = t_env.create_statement_set()


statement_set.add_insert_sql("INSERT INTO sink_table SELECT user_id, order_amount, ts, DATE_FORMAT(ts, 'yyyy/MM/dd/HH/mm') as dt FROM source_table")
statement_set.execute().wait()
d
How can I customize the filename stored in S3? I couldn’t find any setting for it.
The filesystem connector currently doesn't provide config options to customize the filename.
I get the following error when I use 'sink.partition-commit.trigger'='partition-time':
`Caused by: java.time.format.DateTimeParseException: Text '2023/04/10/13/49' could not be parsed at index 4`
It looks like it is not able to commit the partition. When I use 'sink.partition-commit.trigger'='process-time', it works and I can see empty _SUCCESS files being committed.
By default, the format of the partition value should be yyyy-MM-dd HH:mm:ss. You can customize it via the config option partition.time-extractor.timestamp-formatter; see https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/filesystem/#partition-time-extractor-timestamp-formatter for more details.
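For example, something like this in the sink DDL (just a sketch showing only the partition-related options; I'm assuming dt keeps the 'yyyy/MM/dd/HH/mm' string produced by the DATE_FORMAT call in your INSERT):
  'partition.time-extractor.timestamp-pattern' = '$dt',
  'partition.time-extractor.timestamp-formatter' = 'yyyy/MM/dd/HH/mm',
  'sink.partition-commit.trigger' = 'partition-time',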
Is it better to use the DataStream API for the S3 sink? From the documentation, it looks like the DataStream API has the ability to write a file prefix and suffix
If you want to define the prefix and suffix yourself, I guess you have to use the DataStream API, as this ability is not provided in the Table API & SQL.
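For reference, a rough PyFlink DataStream sketch (untested; the path, prefix and suffix values are placeholders) of how a part-file prefix/suffix can be set via OutputFileConfig:
from pyflink.common.serialization import Encoder
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig

env = StreamExecutionEnvironment.get_execution_environment()
env.enable_checkpointing(60000)  # part files are only finalized on checkpoints

ds = env.from_collection(["a", "b", "c"])  # placeholder source

# prefix/suffix of the part files written by the sink
output_config = OutputFileConfig.builder() \
    .with_part_prefix("orders") \
    .with_part_suffix(".txt") \
    .build()

# plain row format for brevity; for Parquet you would use FileSink.for_bulk_format(...)
sink = FileSink.for_row_format("s3a://<bucket_name>/flink/data", Encoder.simple_string_encoder()) \
    .with_output_file_config(output_config) \
    .build()

ds.sink_to(sink)
env.execute("filesink-prefix-suffix-demo")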
p
Thanks for your comments. Do you know how I can fix this "%2F" problem (screenshot)? S3 is converting "/" to "%2F". There seems to be some encoding problem.
r
As the discussion is around sinking Parquet files to S3, I just want to add a point here: when I set 'auto-compaction' to true, I observed that the existing files got deleted from S3. I'm wondering why they got deleted; I expected the new files to be compacted.
d
Thanks for your comments. Do you know how I can fix this "%2F" problem (screenshot)? S3 is converting "/" to "%2F". There seems to be some encoding problem.
Can I know how you set the S3 key? From the sample code, I have not found that part of the code.
As the discussion is around sinking Parquet files to S3, I just want to add a point here: when I set 'auto-compaction' to true, I observed that the existing files got deleted from S3. I'm wondering why they got deleted; I expected the new files to be compacted.
I'm sorry, I'm also not quite familiar with the FileSystem connector. It would be better to send this question to the main thread to see if anybody else knows.
p
Can I know how you set the S3 key? From the sample code, I have not found that part of the code.
This is the entire code. I create a dt column where I inject a datetime string in the format yyyy/MM/dd/HH/mm using DATE_FORMAT, and I am trying to partition the data at the minute level. I have managed to make it work by creating separate columns for year, month, day, hour, and minute like this (not an elegant solution, but it works):
import urllib
from pyflink.table.udf import udtf
from pyflink.table import DataTypes
from pyflink.table import EnvironmentSettings, TableEnvironment


t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().set("restart-strategy.type", "fixed-delay")
t_env.get_config().set("restart-strategy.fixed-delay.attempts", "3")
t_env.get_config().set("restart-strategy.fixed-delay.delay", "30s")


jar_list = """
    file:///home/ubuntu/environment/flink/lib/flink-sql-connector-kafka-1.17.0.jar;
    file:///home/ubuntu/environment/flink/lib/flink-sql-parquet-1.17.0.jar;
    file:///home/ubuntu/environment/flink/lib/flink-connector-files-1.17.0.jar;
    file:///home/ubuntu/environment/flink/lib/flink-s3-fs-hadoop-1.17.0.jar;
    file:///home/ubuntu/environment/flink/lib/flink-s3-fs-presto-1.17.0.jar;
    file:///home/ubuntu/environment/flink/lib/hadoop-mapreduce-client-core-3.3.5.jar
"""
t_env.get_config().set("pipeline.jars", jar_list)
t_env.get_config().set("pipeline.classpaths", jar_list)
# set the checkpoint mode to EXACTLY_ONCE
t_env.get_config().set("execution.checkpointing.mode", "EXACTLY_ONCE")
t_env.get_config().set("execution.checkpointing.interval", "1min")

# set the statebackend type to "rocksdb", other available options are "filesystem" and "jobmanager"
# you can also set the full qualified Java class name of the StateBackendFactory to this option
# e.g. org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
t_env.get_config().set("state.backend.type", "rocksdb")

# set the checkpoint directory, which is required by the RocksDB statebackend
t_env.get_config().set("state.checkpoints.dir", "file:///tmp/checkpoints/")
source_ddl = f"""
            CREATE TABLE source_table (
                  user_id STRING,
                  order_amount DOUBLE,
                  ts TIMESTAMP(3),
                  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            ) WITH (
              'connector' = 'datagen',
              'rows-per-second' = '100'
            )
            """
            


sink_ddl = f"""
        CREATE TABLE sink_table (
              user_id STRING,
              order_amount DOUBLE,
              ts TIMESTAMP(3),
              `year` STRING,
              `month` STRING,
              `day` STRING,
              `hour` STRING,
              `minute` STRING
        ) PARTITIONED BY (`year`, `month`, `day`, `hour`, `minute`) WITH (
          'connector' = 'filesystem',
          'path' = 's3a://<bucket-name>/flink/data',
          'format' = 'parquet',
          'partition.time-extractor.timestamp-pattern' = '$year/$month/$day/$hour/$minute',
          'partition.time-extractor.timestamp-formatter' = 'yyyy/MM/dd/HH/mm',
          'auto-compaction' = 'true',
          'compaction.file-size' = '128MB',
          'sink.rolling-policy.file-size'='128MB',
          'sink.rolling-policy.rollover-interval'='5min',
          'sink.partition-commit.delay'='0s',
          'sink.partition-commit.trigger'='partition-time',
          'sink.partition-commit.policy.kind'='success-file'

        );
"""

t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)

# option 2: a Python UDTF that URL-encodes a value (defined here but not used by the pipeline below)
@udtf(result_types=[DataTypes.STRING()])
def quote_plus(s):
  # a UDTF should yield its output rows rather than return a plain string
  yield urllib.parse.quote_plus(s)
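# (Note: to call this UDTF from SQL it would first have to be registered, e.g. via
#  t_env.create_temporary_function("quote_plus", quote_plus).)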

statement_set = t_env.create_statement_set()


statement_set.add_insert_sql(f"""
        INSERT INTO sink_table
        SELECT user_id,
            order_amount,
            ts,
            DATE_FORMAT(ts, 'yyyy'),
            DATE_FORMAT(ts, 'MM'),
            DATE_FORMAT(ts, 'dd'),
            DATE_FORMAT(ts, 'HH'),
            DATE_FORMAT(ts, 'mm')
            FROM source_table""")
statement_set.execute().wait()
d
@piby 180 Same question as above, I have not found where the S3 key is set in the above code.
More specifically, which part of the code does this question come from:
The S3 key is not getting parsed properly. "/" is replaced by "%2F".
p
It is coming from within the Flink S3 file sink connector. Most likely
PARTITIONED BY (dt)
or
'partition.time-extractor.timestamp-pattern' = '$dt',
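If that's the cause, one workaround I can think of (just a sketch, not verified; the '-' delimiter is an assumption) is to keep '/' out of the partition value itself and adjust the timestamp formatter to match:
# write dt without '/' so the partition value needs no percent-encoding in the S3 key
statement_set.add_insert_sql(
    "INSERT INTO sink_table "
    "SELECT user_id, order_amount, ts, "
    "DATE_FORMAT(ts, 'yyyy-MM-dd-HH-mm') AS dt "
    "FROM source_table"
)
# the sink DDL would then need 'partition.time-extractor.timestamp-formatter' = 'yyyy-MM-dd-HH-mm'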