Cooper Snyder
10/07/2024, 9:48 PM

from pydantic import BaseModel

class OrchestratableTask(BaseModel):
    def setup(self, *args, **kwargs):
        # environment-, application-, and runtime-specific setup
        ...

    def extract(self, *args, **kwargs):
        # pull external state and external data from the target system
        ...

    def run_pure_transform(self, *args, **kwargs):
        # pure, deterministic (enough) function of its inputs
        ...

    def load(self, *args, **kwargs):
        # load results to an external database
        ...

    def run_transform_w_io_side_effects(self, *args, **kwargs):
        extracted_data = self.extract()
        transformed_data = self.run_pure_transform(extracted_data)
        self.load(transformed_data)

if __name__ == "__main__":
    # add arg parser here
    task = OrchestratableTask()
    task.setup(*args, **kwargs)
    task.run_transform_w_io_side_effects(*args, **kwargs)
The idea is a command/strategy pattern, with args and kwargs controlling how the functions flow (I know that would end up in config.when decorators), and whatever business logic lives right there in the transform step. But I'm running into the code smell of mixing object-oriented with functional, e.g. building a Hamilton DAG for each step and then another Hamilton DAG over those DAGs (I don't think that works well...).
I'm feeling a bit of analysis paralysis; has anyone run into this idea or anything like it? Any criticism of the design? From reading the docs, the idiomatic approach would be a single Hamilton DAG with data loaders, data savers, and config.when decorators, but I REALLY wanted to make it obvious to developers that these four are the main abstractions required for a single OrchestratableTask, and to let someone pip install a package that houses all of the subclass tasks and run the pure function however they like in a discovery environment like a notebook (roughly as sketched below).
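For illustration, a minimal sketch of what one packaged subclass and its notebook usage might look like under this design; it builds on the OrchestratableTask class above, and the task name, columns, and test data are all hypothetical:

import pandas as pd

# hypothetical subclass, shipped in a pip-installable package of tasks
class DailySalesRollup(OrchestratableTask):
    def extract(self, *args, **kwargs):
        # pull the raw sales table from the warehouse
        ...

    def run_pure_transform(self, sales_df, *args, **kwargs):
        # pure function: aggregate sales by day; trivially callable in a notebook
        return sales_df.groupby("date").sum()

    def load(self, rollup_df, *args, **kwargs):
        # write the rollup back out
        ...

# discovery usage in a notebook: run just the pure piece on test data
task = DailySalesRollup()
test_df = pd.DataFrame({"date": ["2024-10-07", "2024-10-07"], "amount": [1.0, 2.0]})
result = task.run_pure_transform(test_df)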
Is this overcomplicating it with the Task class?
Thank you!

Elijah Ben Izzy
10/07/2024, 10:02 PM

One way to structure this is as four modules (sketched below):
• setup.py - any env stuff (I'd recommend considering whether you want this in the driver or in the Hamilton DAG; both are feasible, but you could also make a pre-graph-execute hook)
• extract.py - all tooling for loading data
• transform.py - tooling for transforming data (pure functions)
• load.py - tooling for saving data
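For concreteness, here's a minimal sketch of what extract.py and transform.py could hold, using the dependent_dataset / my_artifact / path names from the driver snippet below; the actual loading and transform logic is made up:

import pandas as pd

# --- extract.py: declares how external data comes in ---
def dependent_dataset(path: str) -> pd.DataFrame:
    # load the raw data from wherever it lives
    return pd.read_csv(path)

# --- transform.py: pure functions only ---
def my_artifact(dependent_dataset: pd.DataFrame) -> pd.DataFrame:
    # purely functional transform of the extracted data
    return dependent_dataset.dropna()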
Then your driver can choose which to run:
import pandas as pd
from hamilton import driver
import extract, transform

# run with test data, no side effects
dr = driver.Builder().with_modules(transform).build()
# "dependent_dataset" is declared as an input in the transform module; it would be produced by the extract module
dr.execute(["my_artifact"], inputs={"dependent_dataset": pd.DataFrame(...)})

# run with extract + transform:
dr = driver.Builder().with_modules(extract, transform).build()
# now the required input is declared upstream, e.g. you provide a path
res = dr.execute(["my_artifact"], inputs={"path": ...})
You can imagine making this more complex, but the nice thing is that you have a very simple structure. IMO this is generally a good approach, but it requires you to be OK structuring your project in a certain way.
I’d also recommend doing some higher-level work to wrap the above in an API like the one you had, so you can make modifications as you want (roughly like the sketch below).
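A minimal sketch of that wrapping, assuming the extract/transform/load module layout and the my_artifact output from the snippet above; the class itself is hypothetical, not part of Hamilton:

from hamilton import driver
import extract, load, transform

class HamiltonBackedTask:
    # hypothetical wrapper: same four-step surface as OrchestratableTask, Hamilton underneath
    def run_pure_transform(self, inputs: dict):
        # pure step only: just the transform module, no side effects
        dr = driver.Builder().with_modules(transform).build()
        return dr.execute(["my_artifact"], inputs=inputs)

    def run_transform_w_io_side_effects(self, inputs: dict):
        # full pipeline: extract, transform, and load together
        dr = driver.Builder().with_modules(extract, transform, load).build()
        return dr.execute(["my_artifact"], inputs=inputs)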
There are a few other solutions as well that you might want to explore:
1. @config.when allows you to use the config to toggle things on/off. This could allow you to specify a config var stage = extract, load, or transform, but you'll have to write a custom config decorator (e.g. create one called @stage(...)) and pass a list of stages to the driver. A sketch follows this list.
2. You could use overrides as well, e.g. pass in the overrides from the prior stage. You can use tags / list out variables, but there's a bit of driver algebra to do. We could expose a "subset" functionality, which wouldn't be crazy (allowing you to subset based on tags).
3. Materializers can help decouple IO, but they move a little flexibility out of the user's hands. This is good if you want a common set of loaders/savers.
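To make option 1 concrete, here's a rough sketch of @config.when gating an extract node on a stage config variable; the function bodies and stage values are illustrative:

import pandas as pd
from hamilton import driver
from hamilton.function_modifiers import config

@config.when(stage="extract")
def dependent_dataset__extract(path: str) -> pd.DataFrame:
    # only becomes part of the DAG when the driver config has stage="extract"
    return pd.read_csv(path)

def my_artifact(dependent_dataset: pd.DataFrame) -> pd.DataFrame:
    # pure transform; always present
    return dependent_dataset.dropna()

# the driver picks the variant via config, e.g. (with this module imported as my_module):
# dr = driver.Builder().with_modules(my_module).with_config({"stage": "extract"}).build()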
Stefan Krawczyk
10/08/2024, 12:51 AM

Cooper Snyder
10/09/2024, 10:55 AM