Slackbot (02/08/2024, 4:23 PM)
Roel Bertens (02/08/2024, 4:25 PM)
Elijah Ben Izzy (02/08/2024, 4:29 PM):
• @config.when allows you to optionally include certain nodes. The optional node would be gated by a config value, and the downstream node would either take in None (if it accepts that) or choose what to take in via that config.
• @pipe is the most ergonomic; you can conditionally apply transformations on a node.
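The idea behind the first option can be sketched in plain Python (this only illustrates the semantics, not the Hamilton API; all names below are made up for illustration):

```python
# Plain-Python sketch of what @config.when does conceptually: a node is added
# to the graph only when the config matches, and a downstream node copes with
# its absence by accepting None.
from typing import Callable, Optional


def build_graph(config: dict) -> dict:
    nodes: dict = {"base": lambda: 1}
    if config.get("add_feature"):  # what @config.when(add_feature=True) would gate
        nodes["feature"] = lambda base: base + 10
    return nodes


def downstream(base: int, feature: Optional[int] = None) -> int:
    # Uses the optional node's value if it was included, else falls back.
    return feature if feature is not None else base
```

So with an empty config the graph simply has no `feature` node, and `downstream` still resolves.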
Elijah Ben Izzy (02/08/2024, 4:30 PM)
Stefan Krawczyk (02/08/2024, 9:53 PM)
Roel Bertens (02/09/2024, 8:17 AM)
Roel Bertens (02/09/2024, 1:55 PM)
Roel Bertens (02/09/2024, 1:59 PM)
Elijah Ben Izzy (02/09/2024, 5:25 PM):
There's also @with_columns for map transforms, although that won't solve the config-driven thing…
So I'll show you the recipe, but I'd recommend thinking about the required dimensions of the configuration; it might be possible to make this simpler.
First, tagging intermediate results: here's a full POC. The target_ parameter has a ton of capabilities as well, so there's probably a tool to do what you want!
```python
import pandas as pd

from hamilton import driver
from hamilton.function_modifiers import pipe, step, schema, tag, tag_outputs


def df() -> pd.DataFrame:
    return pd.DataFrame({"col1": [1, 2], "col2": [4, 3]})


def _op_1(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df["col3"] = df["col1"] + 1
    return df


def _op_2(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df["col4"] = df["col2"] + 1
    return df


@tag_outputs(
    df_processed={"role": "final_product"},
    intermediate_1={"role": "intermediate"},
    intermediate_2={"role": "intermediate"},
)
@schema.output(
    ("col1", "int"),
    ("col2", "int"),
    ("col3", "int"),
    ("col4", "int"),
    target_="df_processed",
)
@pipe(
    step(_op_1).named("intermediate_1", namespace=None),
    step(_op_2).named("intermediate_2", namespace=None),
)
def df_processed(df: pd.DataFrame) -> pd.DataFrame:
    return df


if __name__ == "__main__":
    import __main__

    dr = driver.Builder().with_modules(__main__).build()
    dr.display_all_functions("./dag", graphviz_kwargs=dict(format="png"))
    import pprint

    pprint.pprint([(n.name, n.tags) for n in dr.list_available_variables()])
```
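With those tags in place, outputs can be selected by role. A plain-Python sketch of that filtering, operating on (name, tags) pairs shaped like what `dr.list_available_variables()` exposes (the helper name is an assumption, not a Hamilton API):

```python
# Hypothetical helper: pick node names whose tag matches a value, shaped like
# the (n.name, n.tags) pairs printed by the POC above.
def select_by_tag(variables, key, value):
    return [name for name, tags in variables if tags.get(key) == value]


variables = [
    ("df", {}),
    ("intermediate_1", {"role": "intermediate"}),
    ("intermediate_2", {"role": "intermediate"}),
    ("df_processed", {"role": "final_product"}),
]
select_by_tag(variables, "role", "final_product")  # -> ['df_processed']
```

You could then pass the resulting list to `dr.execute(...)` to request only the final products.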
Elijah Ben Izzy (02/09/2024, 5:29 PM):
For the config-driven schema, there's @resolve. If it's really necessary, I'd suggest wrapping it with a decorator. The strategy would be:
1. Use @resolve to take the inputs you use in when and determine the schema
2. Stick the @schema.output behind resolve so it always includes the right schema
3. Proceed as you would normally
But yeah, first I'd be curious whether it's necessary 🙂
Roel Bertens (02/13/2024, 10:35 AM):
What I'm trying to do:
• build df_processed based on the config
• tag the with_col3 and with_col4 nodes with {"data_type" : "optional_data"}
```python
import pandas as pd

from hamilton import driver
from hamilton.function_modifiers import pipe, step, schema, tag, tag_outputs


@tag(data_type="raw_data")
def df() -> pd.DataFrame:
    return pd.DataFrame({"col1": [1, 2], "col2": [4, 3]})


def _op_1(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df["col3"] = df["col1"] + 1
    return df


def _op_2(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df["col4"] = df["col2"] + 1
    return df


@tag_outputs(
    df_processed={"data_type": "intermediate_data"},
    with_col3={"data_type": "optional_data"},
    with_col4={"data_type": "optional_data"},
)
@schema.output(
    ("col1", "int"),
    ("col2", "int"),
    ("col3", "int"),
    ("col4", "int"),
    target_="df_processed",
)
@pipe(
    step(_op_1).named("with_col3", namespace=None).when(add_col3=True),
    step(_op_2).named("with_col4", namespace=None).when(add_col4=True),
)
def df_processed(df: pd.DataFrame) -> pd.DataFrame:
    return df
```
Roel Bertens (02/13/2024, 11:04 AM):
module is the module with the code above.
```python
dr = driver.Driver({
    "add_col3": True,
    "add_col4": True,
}, module)
```
Roel Bertens (02/13/2024, 11:06 AM):
```python
dr = driver.Driver({
    "add__col3": True,
    "add_col4": True,
}, module)
```
Elijah Ben Izzy (02/13/2024, 2:16 PM):
1. You can pass the adapters= kwarg in the driver (or use the driver.Builder() API with with_adapters(…)); you can write custom logic that takes in config, etc… Note we don't expose the config in the validator API (which is somewhat more ergonomic otherwise), but I'm happy to add that.
2. We don't know which are used or not at that point / it isn't exposed to the user.
Might be worth having a "strict mode" that breaks if config is present that isn't used, but for now that's not exposed/easy to access. I think with a bit of convention you can build a good validator, however.
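A convention-based validator along those lines might look like this plain-Python sketch (the add_<col> → with_<col> naming convention and the helper name are assumptions for illustration, not part of Hamilton):

```python
# Hypothetical convention: every config flag "add_<col>" must correspond to an
# optional node named "with_<col>". Unknown keys (e.g. typos) are reported.
def unknown_config_keys(config: dict, optional_nodes: set) -> list:
    expected = {"add_" + n.removeprefix("with_") for n in optional_nodes}
    return [key for key in config if key not in expected]


optional = {"with_col3", "with_col4"}
unknown_config_keys({"add_col3": True, "add_col4": True}, optional)   # -> []
unknown_config_keys({"add__col3": True, "add_col4": True}, optional)  # -> ['add__col3']
```

A check like this would catch the `add__col3` typo in the driver.Driver snippet above before anything runs.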
Roel Bertens (02/13/2024, 3:08 PM)
Roel Bertens (02/13/2024, 3:15 PM)
Elijah Ben Izzy (02/13/2024, 3:34 PM):
• tag_outputs should work; that may be a bug in how it interacts with schema. Will repro and test.
• conditional schema is trickier, but I'll show some code that gets it to work.
Elijah Ben Izzy (02/13/2024, 3:40 PM)
Roel Bertens (02/13/2024, 3:50 PM)
Roel Bertens (02/13/2024, 3:51 PM)
Elijah Ben Izzy (02/13/2024, 3:57 PM)
Roel Bertens (02/13/2024, 4:07 PM)
Elijah Ben Izzy (02/13/2024, 4:08 PM)
Elijah Ben Izzy (02/13/2024, 4:57 PM):
```python
from hamilton.function_modifiers import ResolveAt, resolve


@tag_outputs(
    df_processed={"data_type": "optional_data"} if False else {"data_type": "intermediate_data"},
    with_col3={"data_type": "optional_data"},
    with_col4={"data_type": "optional_data"},
)
@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda add_col3, add_col4: schema.output(
        *[("col1", "int"), ("col2", "int")]
        + ([("col3", "int")] if add_col3 else [])
        + ([("col4", "int")] if add_col4 else []),
        target_="df_processed",
    ),
)
@pipe(
    step(_op_1).named("with_col3", namespace=None).when(add_col3=True),
    step(_op_2).named("with_col4", namespace=None).when(add_col4=True),
)
def df_processed(df: pd.DataFrame) -> pd.DataFrame:
    return df
```
Note you'll need to run the driver with the config:
```python
{"add_col3": True, "add_col4": True, "hamilton.enable_power_user_mode": True}
```
as this is a power-user feature 🙂
Given that it's not particularly ergonomic, you can probably wrap this in your own custom decorator, and I think this would be better if you could do something like the following (not implemented):
```python
@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda cols: schema.output(
        *[(col, type_) for col, type_ in cols.items()],
        target_="df_processed",
    ),
)
@pipe(
    step(_op_1).named("with_col3", namespace=None).tagged(data_type="optional_data").when_contains(cols="col3"),
    step(_op_2).named("with_col4", namespace=None).tagged(data_type="optional_data").when_contains(cols="col4"),
)
def df_processed(df: pd.DataFrame) -> pd.DataFrame:
    return df
```
Then you could even wrap resolve and pipe together in something like @optional_features(col_3=_op_1, col_4=_op_2, col_config_field="cols_to_include").
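Such an @optional_features wrapper would essentially just stack the two decorators. A plain-Python sketch of that composition shape (the stand-in decorators below are assumptions that only record application order; they are not Hamilton's):

```python
# Stand-ins that record application order; in a real implementation these would
# be Hamilton's @pipe(...) and @resolve(...) built from **ops and the config field.
def fake_pipe(fn):
    fn.__dict__.setdefault("applied", []).append("pipe")
    return fn


def fake_resolve(fn):
    fn.__dict__.setdefault("applied", []).append("resolve")
    return fn


def optional_features(col_config_field: str, **ops):
    def decorator(fn):
        # Decorators stack bottom-up: @pipe runs first, @resolve wraps it.
        return fake_resolve(fake_pipe(fn))
    return decorator


@optional_features("cols_to_include", col_3=None, col_4=None)
def df_processed(df):
    return df
```

The design point is simply that a custom decorator can apply several Hamilton decorators in a fixed order, hiding the power-user boilerplate from pipeline authors.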
Elijah Ben Izzy (02/13/2024, 5:06 PM):
To be clear, these don't exist yet:
• .tagged(…) for the step
• .when_contains for @config and @step
But I'd like to add these.
FYI, have a fix out for the tag_outputs bug! Thanks for pointing it out: https://github.com/DAGWorks-Inc/hamilton/pull/697
Roel Bertens (02/16/2024, 8:57 AM):
```python
@tag_outputs(
    df_processed={"data_type": "intermediate_data"},
    with_col3={"data_type": "optional_data"},
    with_col4={"data_type": "optional_data"},
)
@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda add_col3, add_col4: schema.output(
        *[("col1", "int"), ("col2", "int")]
        + ([("col3", "int")] if add_col3 else [])
        + ([("col4", "int")] if add_col4 else []),
        target_="df_processed",
    ),
)
@pipe(
    step(_op_1).named("with_col3", namespace=None).when(add_col3=True),
    step(_op_2).named("with_col4", namespace=None).when(add_col4=True),
)
def df_processed(df: pd.DataFrame) -> pd.DataFrame:
    return df
```
Roel Bertens (02/16/2024, 8:58 AM)
Elijah Ben Izzy (02/16/2024, 5:26 PM)