# troubleshooting
p
Hey! I want to store streaming data as Parquet files on S3. I am testing it with the following code, but it is not working. I have double-checked the IAM role and permissions, and there are no S3 access issues.
from pyflink.table import EnvironmentSettings, TableEnvironment


t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
source_ddl = f"""
            CREATE TABLE source_table (
                  user_id STRING,
                  order_amount DOUBLE,
                  ts TIMESTAMP(3),
                  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            ) WITH (
              'connector' = 'datagen',
              'rows-per-second' = '10'
            )
            """
            


sink_ddl = f"""
        CREATE TABLE sink_table (
              user_id STRING,
              order_amount DOUBLE,
              ts TIMESTAMP(3),
              dt STRING
        ) PARTITIONED BY (dt) WITH (
          'connector' = 'filesystem',
          'path' = 's3a://<bucket-name>/flink/data',
          'format' = 'parquet',
          'auto-compaction' = 'true',
          'partition.time-extractor.timestamp-pattern' = '$dt',
          'sink.rolling-policy.file-size'='1MB',
          'sink.rolling-policy.rollover-interval'='60s',
          'sink.partition-commit.delay'='0s',
          'sink.partition-commit.trigger'='partition-time',
          'sink.partition-commit.policy.kind'='success-file'

        );
"""

t_env.execute_sql(source_ddl)
t_env.execute_sql(sink_ddl)

statement_set = t_env.create_statement_set()


statement_set.add_insert_sql("INSERT INTO sink_table SELECT user_id, order_amount, ts, DATE_FORMAT(ts, 'yyyy/MM/dd/HH/mm') as dt FROM source_table")
statement_set.execute().wait()
The job runs indefinitely, but no files appear on S3. Here are the logs:
2023-04-10 11:20:45,212 INFO  org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new compressor [.snappy]
2023-04-10 11:20:45,211 INFO  org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new compressor [.snappy]
2023-04-10 11:21:01,046 INFO  org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new compressor [.snappy]
2023-04-10 11:21:01,047 INFO  org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new compressor [.snappy]
2023-04-10 11:22:00,964 INFO  org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new compressor [.snappy]
2023-04-10 11:22:00,999 INFO  org.apache.hadoop.io.compress.CodecPool                      [] - Got brand-new compressor [.snappy]
d
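Per the documentation [1], you need to enable checkpointing. For bulk formats such as Parquet, the filesystem sink only finalizes its in-progress part files when a checkpoint completes, so without checkpoints no finished files become visible on S3. [1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/filesystem/#rolling-policy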
p
It worked after enabling checkpoints. Thank you!
🎉 1
r
How did you enable checkpoints?
p
# set the checkpoint mode to EXACTLY_ONCE
t_env.get_config().set("execution.checkpointing.mode", "EXACTLY_ONCE")
t_env.get_config().set("execution.checkpointing.interval", "1min")

# set the statebackend type to "rocksdb", other available options are "filesystem" and "jobmanager"
# you can also set the full qualified Java class name of the StateBackendFactory to this option
# e.g. org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
t_env.get_config().set("state.backend.type", "rocksdb")

# set the checkpoint directory, which is required by the RocksDB statebackend
t_env.get_config().set("state.checkpoints.dir", "file:///tmp/checkpoints/")
πŸ‘ 1
r
Okay
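For anyone landing on this thread later: the checkpoint settings above can be collected in one place and applied before the tables are created. The following is only a minimal sketch under stated assumptions. The helper names `checkpoint_options` and `apply_options` and the `RecordingConfig` stand-in are hypothetical, not part of PyFlink. The option keys are copied from the snippet in this thread and may need adjusting for your Flink version.

```python
from typing import Dict, Protocol


class SupportsSet(Protocol):
    """Anything exposing set(key, value), e.g. the object returned by t_env.get_config()."""

    def set(self, key: str, value: str) -> object: ...


def checkpoint_options(interval: str = "1min",
                       checkpoint_dir: str = "file:///tmp/checkpoints/") -> Dict[str, str]:
    """Collect the checkpoint-related options quoted in this thread (hypothetical helper)."""
    return {
        "execution.checkpointing.mode": "EXACTLY_ONCE",
        "execution.checkpointing.interval": interval,
        "state.backend.type": "rocksdb",
        "state.checkpoints.dir": checkpoint_dir,
    }


def apply_options(config: SupportsSet, options: Dict[str, str]) -> None:
    """Apply every option through the config object's set(key, value) method."""
    for key, value in options.items():
        config.set(key, value)


class RecordingConfig:
    """Stand-in used only to demonstrate the helpers without a Flink installation."""

    def __init__(self) -> None:
        self.values: Dict[str, str] = {}

    def set(self, key: str, value: str) -> "RecordingConfig":
        self.values[key] = value
        return self


demo = RecordingConfig()
apply_options(demo, checkpoint_options(interval="30s"))
print(demo.values["execution.checkpointing.interval"])
```

In the actual job, the assumed usage would be `apply_options(t_env.get_config(), checkpoint_options())` right after creating `t_env` and before the `execute_sql` calls. A shorter checkpoint interval should make finished Parquet files show up sooner, at the cost of more, smaller files.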