# hamilton-help
f
Hey everyone, is it possible to specify the equivalent of an `or` condition with the `.when()` construct? For instance, suppose I want to run `_some_function1()` when either `param1=True` or `param2=True`, and I have the following base code:
```python
import pandas as pd

from hamilton.function_modifiers import step, pipe, source
from my_module import _some_function1, _some_function2


@pipe(
    step(_some_function1, node1=source("node1")).named("add_1", namespace=None).when(param1=True),
    step(_some_function2, node2=source("node2")).named("add_2", namespace=None).when(param2=True),
)
def some_node(df: pd.DataFrame, param1: bool, param2: bool) -> pd.DataFrame:
    return df
```
Where:
- `param1` controls when `_some_function1()` should run.
- `param2` controls when `_some_function2()` should run.

Both functions add columns to the same dataframe. However, `_some_function2()` has a dependency on the columns generated by `_some_function1()`. So, when `param1 == True`, then `_some_function1()` runs and that's it. However, when `param2 == True`, then both `_some_function1()` and then `_some_function2()` need to run. I found a way to make this work by doing this:
```python
import pandas as pd

from hamilton.function_modifiers import step, pipe, source
from my_module import _some_function1, _some_function2


@pipe(
    step(_some_function1, node1=source("node1")).named("add_1", namespace=None).when(param1=True),
    step(_some_function1, node1=source("node1")).named("add_1_and_2", namespace=None).when(param2=True),
    step(_some_function2, node2=source("node2")).named("add_2", namespace=None).when(param2=True),
)
def some_node(df: pd.DataFrame, param1: bool, param2: bool) -> pd.DataFrame:
    return df
```
And then in `_some_function1()` I added a check at the start to see if it has already run (by looking at the columns available). But I get the feeling that I'm stretching a bit too far with how the `@pipe` decorator should be used 😅
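Roughly, that check looks something like this (a simplified sketch, not the actual code; `feature_1` is a placeholder for whatever column `_some_function1()` produces):

```python
import pandas as pd


def _some_function1(df: pd.DataFrame, node1: int) -> pd.DataFrame:
    # Placeholder guard: skip if the column this step produces already exists,
    # e.g. because the "add_1" step already ran under param1=True.
    if "feature_1" in df.columns:
        return df
    # Illustrative transform only.
    return df.assign(feature_1=node1)
```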
e
This is very clever but I think we can make it easier. To be clear, are you using this just with `pipe`, or attempting to use `@config.when` and building it with `pipe`?
f
Let me build a more representative example of what I'm actually using, and I'll share it here!
👍 1
e
Thanks!
f
It's quite tricky to create a dummy example haha. There might be mistakes/typos in there, but you can assume that the real code on my side works as expected. I'm just curious to know if there is a better way to achieve what I want. For some context: I would like `some_node` to be a PySpark DataFrame that users on the other side can request, with the ability to generate (or not) certain feature columns just from the config. Ideally they shouldn't need to worry about which function needs to run when; I would prefer to handle that in the code.
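To make that concrete, here is a rough sketch of how a user would request it with the standard driver API (module name, inputs, and values are placeholders):

```python
from hamilton import driver

import my_pipeline_module  # placeholder: the module defining some_node and its @pipe steps

# Users only pick the config; which steps run is resolved inside the DAG.
dr = (
    driver.Builder()
    .with_modules(my_pipeline_module)
    .with_config({"param1": False, "param2": True})  # param2=True means both functions run
    .build()
)
# Placeholder inputs; the real DAG would need whatever upstream nodes it depends on.
result = dr.execute(["some_node"], inputs={"node1": 1, "node2": 2})
```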
e
This is exactly @Roel Bertens’s case (is it based off of it?). To me this is a question of how much you want to fit inside the DAG versus inside the functions. `@step` makes a lot of sense, as PySpark dataframes are linear. That said, you may want to look into `@with_columns`: you can have it be a submodule that runs column-level operations defined in a module: https://hamilton.dagworks.io/en/latest/reference/decorators/with_columns/. Then you can have `@config.when` on those and the config will get passed through…
f
Yes indeed, this is based off of Roel's case. In fact, we were colleagues! He introduced me to Hamilton, I liked it, and now I'm introducing it to my team. The basic functionalities of Hamilton answer a lot of the challenges we're facing. Though the above is as far as we're going to go in terms of "complexity", and I think it's manageable with proper documentation, I was hoping there might be a neater way to achieve what I wanted. The idea for these functions is that they are general enough to run on multiple nodes. The team I'm part of does a lot of experimentation/exploration, and what we need per use case varies greatly, which makes it quite challenging to define which parts of the DAG should be static and which shouldn't. Either way, thanks for the reference! I haven't had a look at `@with_columns` yet. I'll play with it a bit. It sounds like that might be the answer.
🙌 1
e
Nice! Cool to hear 🙂 I think there’s a pattern we could do with custom decorators to make this simpler. That said, `with_columns` might just solve it: it abstracts away the set of transformations, allowing you to specify them in a module, and then it topologically sorts them. You can use `@config.when` to handle the conditional part. The nice thing is that you can then take that module and run it on a smaller dataset/test it.
Here’s a quick example:
```python
# transforms.py
import pandas as pd

from hamilton.function_modifiers import config


@config.when(param1=True)
def add_1(node1: pd.Series) -> pd.Series:
    return ...


@config.when(param2=True)
def add_1_and_2(node1: pd.Series, node2: pd.Series) -> pd.Series:
    return ...


@config.when(param2=True)
def add_2(node2: pd.Series) -> pd.Series:
    return ...
```
Then, in another module:
```python
import pyspark.sql as ps

# Assuming the PySpark variant of with_columns from the Hamilton Spark plugin:
from hamilton.plugins.h_spark import with_columns

import transforms


@with_columns(
    load_from=[transforms],
)
def processed_dataset(input_df: ps.DataFrame) -> ps.DataFrame:
    # input_df has add_1, add_2, and add_1_and_2 optionally applied
    return input_df  # or process it
```
Basically what happens is that the transforms in the module get applied as a subdag to the dataframe, then those get run, and you get the resulting dataframe. They’re often done as pandas series, but they can be dataframe-dataframe transforms as well. See this blog post for more + background: https://blog.dagworks.io/p/expressing-pyspark-transformations.
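For instance, here is a rough sketch of testing the transforms module on its own with plain pandas; the config keys match the snippet above, and the Series values are placeholders:

```python
import pandas as pd

from hamilton import driver

import transforms  # the module sketched above

# Run the column-level transforms by themselves on small pandas Series,
# passing the same config keys that @config.when reads.
dr = (
    driver.Builder()
    .with_modules(transforms)
    .with_config({"param1": True, "param2": True})
    .build()
)
out = dr.execute(
    ["add_1", "add_1_and_2", "add_2"],
    inputs={"node1": pd.Series([1, 2, 3]), "node2": pd.Series([4, 5, 6])},
)
```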
🙌 2
t
@Elijah Ben Izzy I think that would solve a very common pattern in a cleaner way (easier to make sense of at least) than previous approaches