Trần Hoàng Nguyên (02/17/2025, 6:40 AM)

Stefan Krawczyk (02/17/2025, 7:18 AM)

Trần Hoàng Nguyên (02/17/2025, 7:24 AM)
fetch_doc_from_source
-> classification
-> case when type a,b,c,d
-> ocr + LLM structured output type a,b,c,d
-> load_to_sink
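The steps above might look roughly like the following as plain Python functions (a minimal sketch; the names, the validation rule, and the stub bodies are illustrative, not Hamilton API — in Hamilton these would live in a module and get wired into a DAG by matching function and parameter names):

```python
class LLMOutputInvalid(Exception):
    """Raised when the structured output fails pre-defined validation rules."""

def fetch_doc_from_source(source_id: str) -> str:
    return f"raw document for {source_id}"  # stand-in for a real fetch

def classification(fetch_doc_from_source: str) -> str:
    return "a"  # stand-in: one of the document types a, b, c, d

def structured_output(fetch_doc_from_source: str, classification: str) -> dict:
    result = {"type": classification, "fields": {}}  # stand-in for OCR + LLM
    # Hypothetical validation rule: reject anything outside the known types
    if result.get("type") not in {"a", "b", "c", "d"} or "fields" not in result:
        raise LLMOutputInvalid("LLM result failed validation")
    return result

def load_to_sink(structured_output: dict) -> bool:
    return True  # stand-in for writing to the sink
```

The question is what happens when `structured_output` raises, and who is responsible for retrying.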
In case an error is thrown in one of these steps (it could be because the result coming back is LLM nonsense that my validation rules consider no good), how does Hamilton handle retries? Or how do I write retry code at the orchestration level so that I can specify either rerunning the entire DAG or rerunning a particular step, based on some pre-defined errors?

Stefan Krawczyk (02/17/2025, 6:00 PM)
If using async, you could implement your own adapter to do (1) too.
Throw some error and catch it / do something with it.
dr = driver.Builder().with_config(...).with_modules(...).with_adapters(...).build()
overrides = {}
outputs = [...]
retry_count = 0
results = None
while retry_count < 3:
    try:
        results = dr.execute(outputs, inputs={...}, overrides=overrides)
    except SomeException as e:
        # introspect the error and set overrides / outputs for the next attempt
        overrides = ...  # use values from 'e' somehow / or results, or an adapter to build this...
        retry_count += 1
    else:
        break
# do something with results
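To make the control flow concrete, here is a runnable instantiation of that loop with a stub standing in for `dr.execute` (the failure mode, the override shape, and the node name are made up purely for illustration):

```python
class SomeException(Exception):
    def __init__(self, bad_node: str):
        super().__init__(f"validation failed at {bad_node}")
        self.bad_node = bad_node

attempts = {"n": 0}

def fake_execute(outputs, inputs, overrides):
    # Stub for dr.execute: fails until an override is supplied for the
    # offending node (illustrative behavior, not Hamilton's).
    attempts["n"] += 1
    if "structured_output" not in overrides:
        raise SomeException("structured_output")
    return {name: "ok" for name in outputs}

overrides = {}
outputs = ["load_to_sink"]
retry_count = 0
results = None
while retry_count < 3:
    try:
        results = fake_execute(outputs, inputs={}, overrides=overrides)
    except SomeException as e:
        # introspect the error and patch in an override for the next run
        overrides[e.bad_node] = {"type": "a", "fields": {}}
        retry_count += 1
    else:
        break
```

Setting an override for the failed node means the next `execute` call skips recomputing it, which is how you get "rerun from a particular step" rather than rerunning the whole DAG.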
Then in your Hamilton logic you could:
1. In your code, check and throw specific errors that you catch on the outside.
2. Write custom validators (e.g. using the data quality angle) that run on the output of a function and throw specific errors that you can catch — useful if you don't want to put that logic into the function itself.
3. If not using async, you could use the above in conjunction with the graceful error adapter in case you wanted to run as much of the graph as possible. Note: you can also easily write your own adapter to capture the things you need to help with retrying.
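As a rough illustration of option 2 (the decorator name and the rule are hypothetical — Hamilton's actual data-quality validators have their own API), a validator can be a small decorator that inspects a function's output and raises a specific error for the orchestration-level loop to catch:

```python
import functools

class ValidationFailure(Exception):
    """Specific error the orchestration-level retry loop can catch."""

def validate_with(rule):
    # Hypothetical decorator: runs 'rule' on the wrapped function's output
    # and raises ValidationFailure if the rule doesn't pass.
    def wrap(fn):
        @functools.wraps(fn)
        def inner(*args, **kwargs):
            out = fn(*args, **kwargs)
            if not rule(out):
                raise ValidationFailure(f"{fn.__name__} failed validation: {out!r}")
            return out
        return inner
    return wrap

@validate_with(lambda d: isinstance(d, dict) and d.get("type") in {"a", "b", "c", "d"})
def structured_output(doc: str) -> dict:
    # Stand-in for the OCR + LLM step; returns an invalid type on purpose
    return {"type": "z"}
```

This keeps the validation rules separate from the function body, while still surfacing a well-typed error to the `try`/`except` around `dr.execute`.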
Does that help?