Fefun 04/29/2024, 1:17 PM
Is there a way to express an "or" condition with the .when() construct?
For instance, suppose I want to run _some_function1() when either param1=True or param2=True, and I have the following base code:
import pyspark.sql as ps
from hamilton.function_modifiers import step, pipe, source
from my_module import _some_function1, _some_function2

@pipe(
    step(_some_function1, node1=source("node1")).named("add_1", namespace=None).when(param1=True),
    step(_some_function2, node2=source("node2")).named("add_2", namespace=None).when(param2=True),
)
def some_node(df: ps.DataFrame, param1: bool, param2: bool) -> ps.DataFrame:
    return df
Where:
- param1 controls when _some_function1() should run.
- param2 controls when _some_function2() should run.
Both functions add columns to the same dataframe.
However, _some_function2() has a dependency on the columns generated by _some_function1().
So, when param1 == True, then _some_function1() runs and that's it.
However, when param2 == True, then both _some_function1() and then _some_function2() need to run.
I found a way to make this work by doing this:
import pyspark.sql as ps
from hamilton.function_modifiers import step, pipe, source
from my_module import _some_function1, _some_function2

@pipe(
    step(_some_function1, node1=source("node1")).named("add_1", namespace=None).when(param1=True),
    step(_some_function1, node1=source("node1")).named("add_1_and_2", namespace=None).when(param2=True),
    step(_some_function2, node2=source("node2")).named("add_2", namespace=None).when(param2=True),
)
def some_node(df: ps.DataFrame, param1: bool, param2: bool) -> ps.DataFrame:
    return df
And then in _some_function1() I added a first check to see if it already ran (by looking at the columns available).
But I get the feeling that I'm stretching a bit too far with how the @pipe decorator should be used 😅
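
A rough sketch of what that guard could look like, assuming a pyspark dataframe (as the rest of the thread suggests); the column name col_from_fn1 and the node1 type are illustrative, not taken from the thread:

import pyspark.sql as ps
from pyspark.sql import functions as F

def _some_function1(df: ps.DataFrame, node1: int) -> ps.DataFrame:
    # If the column is already there, an earlier step in the pipe added it, so skip.
    if "col_from_fn1" in df.columns:
        return df
    return df.withColumn("col_from_fn1", F.lit(node1))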

Elijah Ben Izzy 04/29/2024, 2:17 PM
Have you looked at @config.when and building it with pipe?
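
One way to read that suggestion, as a sketch only: use Hamilton's double-underscore naming so @config.when picks one of two @pipe variants of the same node. The variant names are made up here, and whether the two decorators compose exactly like this is an assumption to verify against the docs:

import pyspark.sql as ps
from hamilton.function_modifiers import config, pipe, source, step
from my_module import _some_function1, _some_function2

# Only param1 is on: just add_1 runs.
@config.when(param1=True, param2=False)
@pipe(
    step(_some_function1, node1=source("node1")).named("add_1", namespace=None),
)
def some_node__just_1(df: ps.DataFrame) -> ps.DataFrame:
    return df

# param2 is on: add_1 runs first, then add_2.
@config.when(param2=True)
@pipe(
    step(_some_function1, node1=source("node1")).named("add_1", namespace=None),
    step(_some_function2, node2=source("node2")).named("add_2", namespace=None),
)
def some_node__1_then_2(df: ps.DataFrame) -> ps.DataFrame:
    return df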

Fefun 04/29/2024, 2:25 PM

Elijah Ben Izzy 04/29/2024, 2:32 PM

Fefun 04/29/2024, 2:54 PM

Elijah Ben Izzy 04/29/2024, 4:19 PM
@step makes a lot of sense as pyspark dataframes are linear. That said, you may want to look into @with_columns. You can have it be a submodule that runs column-level operations defined in a module: https://hamilton.dagworks.io/en/latest/reference/decorators/with_columns/. Then you can have @config.when on those and the config will get passed through…

Fefun 04/29/2024, 6:30 PM
I haven't tried @with_columns yet. I'll play with it a bit. It sounds like that might be the answer.

Elijah Ben Izzy 04/29/2024, 6:38 PM
with_columns might just solve it: it abstracts away the set of transformations, letting you specify them in a module, and then it topologically sorts them. You can use @config.when to handle that.
The nice thing is that you can then take that module and run it on a smaller dataset/test it.

Elijah Ben Izzy 04/29/2024, 6:47 PM
# transforms.py
import pandas as pd
from hamilton.function_modifiers import config

@config.when(param1=True)
def add_1(node1: pd.Series) -> pd.Series:
    return ...

@config.when(param2=True)
def add_1_and_2(node1: pd.Series, node2: pd.Series) -> pd.Series:
    return ...

@config.when(param2=True)
def add_2(node2: pd.Series) -> pd.Series:
    return ...
Then, in another module:
@with_columns(
    load_from=[transforms],
)
def processed_dataset(input_df: ps.DataFrame) -> ps.DataFrame:
    # input_df has add_1, add_2, and add_1_and_2 optionally applied
    return input_df  # or process it
Basically what happens is that the transforms in the module get applied as a subdag to the dataframe, then those get run, and you get the resulting dataframe. They're often done as pandas series, but they can be dataframe-to-dataframe transforms as well. See this blog post for more + background: https://blog.dagworks.io/p/expressing-pyspark-transformations.
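
For completeness, a minimal sketch of how the config flags would reach @config.when at driver-build time; the module name pipeline_module and the input dataframe are placeholders, not names from this thread:

from hamilton import driver
import pipeline_module  # placeholder: the module that defines processed_dataset

dr = (
    driver.Builder()
    .with_modules(pipeline_module)
    .with_config({"param1": False, "param2": True})  # selects add_1_and_2 and add_2
    .build()
)
# input_df stands in for whatever pyspark dataframe you already have
result = dr.execute(["processed_dataset"], inputs={"input_df": input_df})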

Thierry Jean 04/30/2024, 12:21 AM