Aditya Pratap Singh
11/02/2023, 11:50 AM
RewriteDataFilesActionResult result = Actions.forTable(table)
.rewriteDataFiles()
.execute();
Also, can I delete orphan files using Flink?
Perfect Stranger
11/02/2023, 12:18 PM
.option
RewriteDataFilesActionResult result = Actions.forTable(table)
.option("strategy", "sort")
.option("sort_order", "id DESC NULLS LAST,name ASC NULLS FIRST")
.rewriteDataFiles()
.execute();
Read this guide about which options can be passed into this action:
https://iceberg.apache.org/docs/latest/spark-procedures/#rewrite_data_files
It's written for Spark, but I think Flink should support the same options; the Iceberg docs lag behind what's implemented.
Regarding deleting orphan files using Flink: it's not implemented, so you can't do it from Flink directly. You will need to calculate the difference between what is in your table folder and what the table references, like so:
1. Get all the files from all your snapshots (https://iceberg.apache.org/docs/latest/flink-queries/#all-data-files)
sql> SELECT * FROM prod.db.table$all_data_files;
2. List the files from S3/HDFS in your table's folder.
3. Calculate the difference.
The difference is the set of files you need to delete. Iterate over them one by one (I guess you can parallelize this as well) and remove them using the S3/HDFS API.
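A minimal sketch of step 3 in plain Java, assuming the `file_path` values from `all_data_files` and the storage listing have already been collected and use the same path form (for example full `s3://bucket/...` URIs). `OrphanFileFinder`, `findOrphans`, and the `olderThan` cutoff are hypothetical names for illustration, not Iceberg APIs. The age cutoff is an added safety assumption: a file written by an in-flight commit is not in any snapshot yet, so only files older than the cutoff are treated as orphans. Since `all_data_files` only covers data files, the listing should be limited to the table's data folder, otherwise metadata files would look orphaned.

```java
import java.time.Instant;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of Iceberg or Flink.
final class OrphanFileFinder {

    /**
     * Returns the listed files that no snapshot references and that were
     * last modified before the cutoff.
     *
     * @param listedFiles     path -> last-modified time, from the S3/HDFS listing (step 2)
     * @param referencedFiles file_path values from all_data_files (step 1)
     * @param olderThan       only files modified before this instant are candidates
     */
    static Set<String> findOrphans(Map<String, Instant> listedFiles,
                                   Set<String> referencedFiles,
                                   Instant olderThan) {
        Set<String> orphans = new TreeSet<>(); // sorted for stable, reviewable output
        for (Map.Entry<String, Instant> entry : listedFiles.entrySet()) {
            boolean referenced = referencedFiles.contains(entry.getKey());
            boolean oldEnough = entry.getValue().isBefore(olderThan);
            if (!referenced && oldEnough) {
                orphans.add(entry.getKey());
            }
        }
        return orphans;
    }
}
```

It is probably worth printing and reviewing the returned set before passing it to the S3/HDFS delete call, since a mismatch in path form (say `s3a://` versus `s3://`) would make every file look unreferenced.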
I think you may get more help if you ask in the Iceberg workspace instead of the Flink one.
Aditya Pratap Singh
11/03/2023, 4:57 PM
Perfect Stranger
11/05/2023, 8:53 AM
Table table = ...
SparkActions
.get()
.rewriteDataFiles(table)
.filter(Expressions.equal("date", "2020-08-18"))
.option("target-file-size-bytes", Long.toString(500 * 1024 * 1024)) // 500 MB
.execute();
See if Flink has the same thing.