# hamilton-help
r
And actually I would even want to be more flexible and have any number of transformations (that I would like to place in separate functions/nodes) that I can choose to apply to the world_of_warcraft node.
e
Yes! Two ways to do it. AFK now but we have:
• @config.when — this allows you to optionally include certain nodes. The optional one would be gated by a config, and the downstream would take in none (if it also takes that in) or choose what to take in via the first config.
• @pipe — the most ergonomic; you can conditionally apply transformations on a node.
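To sketch the first option (purely illustrative; the apply_transform config key and the raw_world_of_warcraft upstream node are made up here): with @config.when, two function variants resolve to the same world_of_warcraft node, and the driver config decides which one ends up in the DAG.
Copy code
import pandas as pd

from hamilton.function_modifiers import config


def raw_world_of_warcraft() -> pd.DataFrame:
    # stand-in for however the raw data actually gets loaded
    return pd.DataFrame({"gold": [1, 2, 3]})


@config.when(apply_transform=True)
def world_of_warcraft__transformed(raw_world_of_warcraft: pd.DataFrame) -> pd.DataFrame:
    # included when the driver config has apply_transform=True
    return raw_world_of_warcraft.assign(gold_doubled=lambda d: d["gold"] * 2)


@config.when(apply_transform=False)
def world_of_warcraft__passthrough(raw_world_of_warcraft: pd.DataFrame) -> pd.DataFrame:
    # included otherwise; both variants resolve to the "world_of_warcraft" node
    return raw_world_of_warcraft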
s
@Roel Bertens how dynamic is this need, and what drives it? That information to me seems important to determine what a good solution could be. What it sounds like is that you’re asking for a dynamic way to create edges? We do have that with @resolve combined with @inject — but that could make the code harder to read and follow. Versus, just being “verbose” and explicitly defining the alternate path(s) you’d like - i.e. with @config.when or using different modules, or using @pipe.
r
Thanks. My use case is a feature catalog. That means that sometimes some filters or transformations need to be applied and sometimes not. Most importantly, I don’t know all the use cases upfront, so the DAG needs to be somewhat future-proof and facilitate different options, without becoming too complex to understand what is possible.
I’m working with @pipe so far and I’m quite happy with it. I do have a question. Because of the pipe a new node is created dynamically. How can I add an @tag to that node? Tagging the function that is referenced in the @pipe does not work.
And another question. Because I use the @pipe + .when constructs, the output of the node can be different. In my case I optionally add a column to the resulting df. How do I make sure to also dynamically use @schema.output to include the additional column based on the config?
e
So, just to confirm my mental model:
1. You have fields you want to optionally include
2. These are represented as transforms
3. You may want to toggle them on/off conditionally
4. You want these to show up in the schema you have, right?
And more broadly, this is a feature catalog, right? Two things:
1. You’re going to face a configurability/legibility trade-off — if you have groups of features/configs (rather than toggling them individually) it’ll get much easier to manage.
2. With dataframes in/out and individual features it gets a little dicey to configure it well — this is why we have @with_columns for map transforms, although that won’t solve the config-driven thing.
So, I’ll show you the recipe, but I recommend that you think about the required dimension of the configuration, and it might be able to get easier. First, tagging intermediate results — here’s a full POC. The target_ parameter has a ton of capabilities as well, so there’s probably a tool to do what you want!
Copy code
import pandas as pd

from hamilton import driver
from hamilton.function_modifiers import pipe, step, schema, tag, tag_outputs


def df() -> pd.DataFrame:
    return pd.DataFrame({"col1": [1, 2], "col2": [4, 3]})


def _op_1(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df["col3"] = df["col1"] + 1
    return df


def _op_2(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df["col4"] = df["col2"] + 1
    return df


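# Note: the pipe steps are given top-level names via .named(..., namespace=None),
# which is what lets @tag_outputs below address intermediate_1/intermediate_2 by name.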
@tag_outputs(
    df_processed={"role" : "final_product"},
    intermediate_1={"role" : "intermediate"},
    intermediate_2={"role" : "intermediate"},
)
@schema.output(
    ("col1", "int"),
    ("col2", "int"),
    ("col3", "int"),
    ("col4", "int"),
    target_="df_processed",
)
@pipe(
    step(_op_1).named("intermediate_1", namespace=None),
    step(_op_2).named("intermediate_2", namespace=None),
)
def df_processed(df: pd.DataFrame) -> pd.DataFrame:
    return df


if __name__ == '__main__':
    import __main__

    dr = driver.Builder().with_modules(__main__).build()
    dr.display_all_functions("./dag", graphviz_kwargs=dict(format="png"))
    import pprint

    pprint.pprint([(n.name, n.tags) for n in dr.list_available_variables()])
Before we go too much into the dynamic schemas, I’d be curious to understand more — do you want to turn each one on and off? Or turn a group on/off? The most powerful way is to do something with @resolve. If it’s really necessary I’d suggest wrapping it with a decorator. The strategy would be:
1. Use @resolve to take the inputs you use in when and determine the schema
2. Stick the @schema.output behind resolve so it always includes the right schema
3. Proceed as you would normally
But yeah, first I’d be curious if it’s necessary 🙂
r
Thanks for the elaborate reply! I do want to be able to turn every transformation/function on or off. I’ll try to give a more elaborate example. I use this code now but it doesn’t do exactly what I want yet. The things that it doesn’t do yet:
• for show_schema, only include the columns that are actually in df_processed based on the config
• tag the with_col3 and with_col4 nodes with {"data_type" : "optional_data"}
Copy code
import pandas as pd

from hamilton import driver
from hamilton.function_modifiers import pipe, step, schema, tag, tag_outputs

@tag(data_type="raw_data")
def df() -> pd.DataFrame:
    return pd.DataFrame({"col1": [1, 2], "col2": [4, 3]})


def _op_1(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df["col3"] = df["col1"] + 1
    return df


def _op_2(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df["col4"] = df["col2"] + 1
    return df


@tag_outputs(
    df_processed={"data_type" : "intermediate_data"},
    with_col3={"data_type" : "optional_data"},
    with_col4={"data_type" : "optional_data"},
)
@schema.output(
    ("col1", "int"),
    ("col2", "int"),
    ("col3", "int"),
    ("col4", "int"),
    target_="df_processed",
)
@pipe(
    step(_op_1).named("with_col3", namespace=None).when(add_col3=True),
    step(_op_2).named("with_col4", namespace=None).when(add_col4=True),
)
def df_processed(df: pd.DataFrame) -> pd.DataFrame:
    return df
Oh and I create a driver like this, where module is the module with the code above: dr = driver.Driver({"add_col3": True, "add_col4": True}, module)
I have an additional question on the config. Is it possible to do some early warning or error when users specify a config value that is not supported? E.g. in my case I would like to warn when people make a typo or specify something that isn’t used anywhere
Copy code
dr = driver.Driver({
        "add__col3": True,
        "add_col4": True,
    }, module)
e
OK, makes sense. So, looking at your code, it seems that you can do anything you need (presuming it all works). That said, there might be cleaner ways. Qs:
1. Are you doing pandas or pyspark?
2. Are all your operations column-level? Wondering if 1 fn per column (that each outputs a series) is what you want, then join at the end? That way you could tag each column individually then select them on query…
Re: validation, yes and no:
1. You can validate that all config has a certain “shape” with a bit of supported customization using lifecycle methods. If you implement a lifecycle adapter class and pass it in as a list to the adapters= kwarg in the driver (or use the driver.Builder() API with with_adapters(…)), you can write custom logic that takes in config, etc… Note we don’t expose the config in the validator API (which is somewhat more ergonomic otherwise) but I’m happy to add that.
2. We don’t know which config values are used or not at that point / it isn’t exposed to the user. Might be worth having a “strict mode” that breaks if config is present that isn’t used, but for now that’s not exposed/easy to access. I think with a bit of convention you can build a good validator, however.
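As a sketch of the convention route (nothing Hamilton-specific here; ALLOWED_CONFIG_KEYS and the helper name are just made up): keep a hand-maintained set of known config keys and check the config against it before building the driver.
Copy code
import warnings

from hamilton import driver

# Hand-maintained set of config keys the DAG actually uses (an assumed convention).
ALLOWED_CONFIG_KEYS = {"add_col3", "add_col4"}


def build_driver(config: dict, *modules) -> driver.Driver:
    """Warn about unrecognized config keys (e.g. typos) before building the driver."""
    unknown = set(config) - ALLOWED_CONFIG_KEYS
    if unknown:
        warnings.warn(f"Unrecognized config keys (possible typos?): {sorted(unknown)}")
    return driver.Builder().with_config(config).with_modules(*modules).build()
Calling build_driver({"add__col3": True, "add_col4": True}, module) would then flag the typo up front.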
r
1. pyspark
2. operations can contain a join or any kind of logic. Could be adding a column, but could also be extending an existing column (with a union)
Could you explain why in my example the @tag_outputs is not working, i.e. it is not adding this tag to the node
e
Ahh I’m sorry, I misread the post initially. Not at my keyboard but I’ll dig in when I am. On first glance —
• tag_outputs should work, that may be a bug in how it interacts with schema. Will repro and test.
• conditional schema is trickier but I’ll show some code that gets it to work.
🙌 1
To make the code simpler, I’m wondering if the schema has to match it exactly, rather than being a superset. The pattern we see is that Hamilton DAGs represent a superset of the available columns, and, when realized, create just the ones you need. With pyspark’s laziness (depending on how it’s used downstream), you actually can specify everything then just select the ones you need and not waste any compute. So you do have a choice of how much to put in config versus how much not to. Up to you (and you know your needs better), but a different strategy.
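A rough illustration of that idea in plain pyspark (the column names just mirror the toy example; not Hamilton-specific):
Copy code
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 4), (2, 3)], ["col1", "col2"])

# Define the full superset of derived columns up front; nothing executes yet.
superset = df.withColumn("col3", F.col("col1") + 1).withColumn("col4", F.col("col2") + 1)

# Only select what this use case needs; for simple projections like col4,
# the optimizer can prune the unused work from the plan.
superset.select("col1", "col2", "col3").show()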
r
Yes you are right about the laziness but some of the operations I want to make optional include loading and joining additional tables which I think spark will not be able to ‘optimize away’.
With respect to the schema being a superset: you are right, but I also use these schemas for my tests, and I’m still figuring out how to make sure that still works.
e
So yeah, will give you some code that uses “power mode” to choose, then suggest how to wrap it up in your own customization/simplify it. Then you can decide which approach you want to take — either:
1. Include everything in the DAG as materialized
2. Include the schema superset, just compute the subset
3. Full superset and try to get spark to optimize it away
Then once you’re unblocked we can figure out a more ergonomic pattern — I think there’s a cleaner abstraction somewhere
r
Sounds good. Maybe we should do a short call at that point to discuss
e
Yep happy to! Let me write out some code — will be around for a call in about an hour and 20 mins if that’s not too late for you
Ok, will talk you through this on Friday (and determine if this is the right solution), but to make it more “config-driven”, you could do something like this:
Copy code
@tag_outputs(
    df_processed={"data_type" : "intermediate_data"},
    with_col3={"data_type" : "optional_data"},
    with_col4={"data_type" : "optional_data"},
)
@resolve(
    when=ResolveAt.CONFIG_AVAILABLE, 
    resolve=lambda add_col3, add_col4 : schema.output(
        [("col1", "int"),
        ("col2", "int")] +
        [("col3", "int")] if add_col3 else [],
        [("col4", "int")] if add_col4 else [],
    target_="df_processed",
)
@pipe(
    step(_op_1).named("with_col3", namespace=None).when(add_col3=True),
    step(_op_2).named("with_col4", namespace=None).when(add_col4=True),
)
def df_processed(df: pd.DataFrame) -> pd.DataFrame:
    return df
Note you’ll need to run the driver with the config:
Copy code
{"add_col3" : True, "add_col4" : True, "hamilton.enable_power_user_mode" : True}
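For example, building the driver might look roughly like this (using the Builder API; module is the module with your code, as in your earlier snippet):
Copy code
from hamilton import driver

dr = (
    driver.Builder()
    .with_config({
        "add_col3": True,
        "add_col4": True,
        "hamilton.enable_power_user_mode": True,
    })
    .with_modules(module)  # the module containing df_processed
    .build()
)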
As this is a power-user feature 🙂 Given that it’s not particularly ergonomic, you can probably wrap this in your own custom decorator, and I think this will be better if you could do something like this (not implemented):
Copy code
@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda cols: schema.output(
        *[(col, dtype) for col, dtype in cols.items()],
        target_="df_processed",
    ),
)
@pipe(
    step(_op_1).named("with_col3", namespace=None).tagged(data_type="optional_data").when_contains(cols="col3"),
    step(_op_2).named("with_col4", namespace=None).tagged(data_type="optional_data").when_contains(cols="col4"),
)
def df_processed(df: pd.DataFrame) -> pd.DataFrame:
    return df
Then you could even wrap resolve and pipe together in something like @optional_features(col_3=_op_1, col_4=_op_2, col_config_field="cols_to_include")
The missing pieces above are:
• .tagged(…) for the step
• .when_contains for @config and @step
But I’d like to add these. FYI have a fix out for the tag_outputs bug! Thanks for pointing out: https://github.com/DAGWorks-Inc/hamilton/pull/697
r
This is awesome! I needed to correct the code a bit but then it does what I want
Copy code
@tag_outputs(
    df_processed={"data_type" : "intermediate_data"},
    with_col3={"data_type" : "optional_data"},
    with_col4={"data_type" : "optional_data"},
)
@resolve(
    when=ResolveAt.CONFIG_AVAILABLE, 
    decorate_with=lambda add_col3, add_col4 : schema.output(
        *[("col1", "int"), ("col2", "int")] + 
        ([("col3", "int")] if add_col3 else []) + 
        ([("col4", "int")] if add_col4 else []),
    target_="df_processed",
    )
)
@pipe(
    step(_op_1).named("with_col3", namespace=None).when(add_col3=True),
    step(_op_2).named("with_col4", namespace=None).when(add_col4=True),
)
def df_processed(df: pd.DataFrame) -> pd.DataFrame:
    return df
Would be nice to make it more ergonomic indeed 👌
e
As a summary — met with @Roel Bertens — came up with a way to make it more ergonomic, then talked about the overall problem and a few ways Hamilton can help