Slackbot
03/01/2022, 7:34 PMEduardo
Alexander Saltzman
03/01/2022, 7:39 PMAlexander Saltzman
03/01/2022, 7:40 PMif product.exists(): continue
Ido (Ploomber)
Eduardo
Eduardo
Eduardo
Alexander Saltzman
03/01/2022, 7:58 PMAlexander Saltzman
03/01/2022, 7:59 PM# we build a new DAG to accumulate all masic tasks
dag = DAG(executor=Parallel(processes=8, print_progress=True))
# ============================================================
def add_task(raw_file):
final_env = set_env(orig_env, raw_file)
expected_output = calculate_masic_output(raw_file)
_masic_output_dir = orig_env["masic"]["outputdir"]
product = File(Path(_masic_output_dir) / Path(expected_output))
if product.exists():
print(f"{product} already exists")
return
print(f"adding env: {final_env}")
ShellScript(
Path("./scripts/run_masic.sh"),
dag=dag,
product=product,
name=raw_file,
params=final_env,
)
# ============================================================
# walk
[x for x in map(add_task, raw_files)]
dag.build(force=True)
Eduardo
force=True
to dag.build
Eduardo
Alexander Saltzman
03/02/2022, 3:23 AMEduardo