# hamilton-help
Stefan Krawczyk:
Thanks for the question @David Foster. I have to jump to a meeting in a few minutes, so I'll respond later. NVM, it's in an hour. A question around the “dynamism”: do you expect the configuration file to also be checked into git, or is it something that's created dynamically?
David Foster:
Cheers for the quick response @Stefan Krawczyk! We're looking to replace an existing process, so we might be able to generate it from documentation of that process. But it would probably be manually curated as the requirements change, and versioned in a separate git repo. We'd be flexible on the structure, but we were thinking of something like this in a config-file format:
```python
config = {"derive": [
    {
        "output_name": "derived_column_1",
        "function_name": "sum_columns",
        "function_parameters": {
            "source": {"columns_to_sum": ["column_1", "column_2"]},
            "value": {},
        },
    }
]}
```
I think there's a possible issue around how we get the right things wrapped in `source` and `value` here too, for dynamic examples.
But at its root, we're looking to use Hamilton for the DAGs but retain our standards of functions being:
• informatively named (not using the column name)
• documented based on the logic
• unit tested
Quite a lot of our logic is reused, which is why there's an emphasis on having the logic in one place and the DAG definition somewhere else.
Stefan Krawczyk:
Cool. Yeah, so I think there are a few ways, and maybe a feature or two to add to Hamilton to make it easier. Let me sketch some code.
Option 1: Create functions that mirror the config — e.g. if it's being checked in, instead define Hamilton functions like so. This is more verbose, but easy to modify/understand. You can then organize the functions into Python modules and easily switch them out at Driver creation time.
```python
import pandas as pd

from hamilton import ad_hoc_utils, base, driver
from hamilton.function_modifiers import does

def _sum(**kwargs) -> pd.Series:
    return sum(kwargs.values())


@does(_sum) # maps to `sum_columns` in config.
def derived_column_1(column_1: pd.Series, column_3: pd.Series) -> pd.Series:
    """function name maps to output name. function arguments map to required column names"""
    pass


@does(_sum)
def derived_column_2(column_1: pd.Series, column_2: pd.Series) -> pd.Series:
    pass


if __name__ == "__main__":
    temp_module = ad_hoc_utils.create_temporary_module(derived_column_1, derived_column_2)
    input_df = pd.DataFrame({
                        "column_1": [1, 2, 3, 4, 5],
                        "column_2": [1, 1, 1, 1, 1],
                        "column_3": [2, 2, 2, 2, 2],
                        "unused_column_1": [0, 0, 0, 0, 0],
                        })
    adapter = base.SimplePythonGraphAdapter(base.PandasDataFrameResult())
    pipeline_driver = driver.Driver(input_df.to_dict(orient="series"), temp_module, adapter=adapter)

    output_columns = ['column_1', 'column_2', 'derived_column_1', 'derived_column_2']
    result = pipeline_driver.execute(output_columns)
    print(result)
```
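As a side note, since these stay plain Python functions, the logic remains unit-testable in the usual way; a minimal pytest-style sketch (the test values here are made up):
```python
import pandas as pd
import pandas.testing as pdt


def test_sum_helper():
    # _sum is an ordinary function, so it can be tested without building a Driver.
    result = _sum(column_1=pd.Series([1, 2, 3]), column_2=pd.Series([1, 1, 1]))
    pdt.assert_series_equal(result, pd.Series([2, 3, 4]))
```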
(working on option 2)
Option 2: add a feature to Hamilton to enable passing values from configuration into the decorators (or deal with updating code, which could be imported from another module, to get the Python values from) — in this example, `extract_columns`. Here we assume a single dataframe as input and then perform some operation on it. I like this solution less, because of the need to expose the columns passed in.
```python
# some_module.py
# right now we don't have a way to inject this from configuration, but we could.
derived_column_names = ["derived_column_1", "derived_column_2"]
regular_column_names = ["column_1", "column_2", "column_3", "unused_column_1"]

# transforms.py
from typing import Dict, List

import pandas as pd

from hamilton import ad_hoc_utils, base, driver
from hamilton.function_modifiers import extract_columns

import some_module  # the config module sketched above

# assume a single input dataframe
@extract_columns(*some_module.derived_column_names)
def summed_dfs(
        input_df: pd.DataFrame,
        sum_config: Dict[str, List[str]]) -> pd.DataFrame:
    df = pd.DataFrame()
    for key, values in sum_config.items():
        df[key] = sum(input_df[value] for value in values)
    return df


@extract_columns(*some_module.regular_column_names)
def columns_to_expose(input_df: pd.DataFrame) -> pd.DataFrame:
    return input_df

if __name__ == "__main__":
    temp_module = ad_hoc_utils.create_temporary_module(columns_to_expose, summed_dfs)
    _input_df = pd.DataFrame({
        "column_1": [1, 2, 3, 4, 5],
        "column_2": [1, 1, 1, 1, 1],
        "column_3": [2, 2, 2, 2, 2],
        "unused_column_1": [0, 0, 0, 0, 0],
    })
    adapter = base.SimplePythonGraphAdapter(base.PandasDataFrameResult())
    _sum_config = {
        "derived_column_1": ["column_1", "column_3"],
        "derived_column_2": ["column_1", "column_2"],
    }
    pipeline_driver = driver.Driver({"input_df": _input_df, "sum_config": _sum_config}, temp_module, adapter=adapter)

    output_columns = ['column_1', 'column_2', 'derived_column_1', 'derived_column_2']
    result = pipeline_driver.execute(output_columns)
    print(result)
```
(working on option 3) — will have that in a bit. Have a meeting now.
With option 3: we could try to implement the following API with Hamilton — I haven't spec'ed out the changes required. But we'd need the ability to pass `config` in to a decorator, and also create a new decorator that doesn't require you to specify the function input arguments ahead of time. This has downsides (e.g. you can't search for what it depends on, and we'd need to figure out “typing”) — but yeah, just a thought:
```python
@parameterized_input_group(dynamic_nodes=config("sum_config"))
def dynamic_sum(dynamic_nodes: Dict[str, source]) -> pd.Series:
    """Create column based on sum of one or more columns"""
    return sum(dynamic_nodes.values())

if __name__ == "__main__":
    _dynamic_sum_config = {
        "derived_column_1": {"dynamic_nodes": {"column_1": source("column_1"), "column_3": source("column_3")}},
        "derived_column_2": {"dynamic_nodes": {"column_1": source("column_1"), "column_2": source("column_2")}},
    }
    temp_module = ad_hoc_utils.create_temporary_module(dynamic_sum)
    _input_df = pd.DataFrame({
        "column_1": [1, 2, 3, 4, 5],
        "column_2": [1, 1, 1, 1, 1],
        "column_3": [2, 2, 2, 2, 2],
        "unused_column_1": [0, 0, 0, 0, 0],
    })
    adapter = base.SimplePythonGraphAdapter(base.PandasDataFrameResult())
    pipeline_driver = driver.Driver({"input_df": _input_df, "sum_config": _dynamic_sum_config}, temp_module, adapter=adapter)
    output_columns = ['column_1', 'column_2', 'derived_column_1', 'derived_column_2']
    result = pipeline_driver.execute(output_columns)
    print(result)
```
I’m sure @Elijah Ben Izzy might have another way to do this
Elijah Ben Izzy:
Have some thoughts, will report back, but these are quite clean! IMO it depends on how you want to configure this. If it's just a static file that you load up, you can get everything out of the box with (2). (1) is meant to allow easy reuse of logic while keeping things in function names (might not be your style). (3) is probably possible (need to scope it out), but makes the pipeline less readable (in my humble opinion — if you prefer configuration-driven shaping then it's kind of perfect).
Stefan Krawczyk:
@David Foster happy to jump on a call to walkthrough these options/ideas.
David Foster:
Thanks very much for putting these together @Stefan Krawczyk! Sorry that I missed the `@does` example in the docs, which would have covered the initial part of the problem. There's some great food for thought here. I think our team needs to discuss whether we need the DAG to be defined in a dedicated config, or whether using approach (2) is suitable and concise enough. But it would be great to chat after we've clarified what we're looking for a little more.
I've put together an example of how I'd like to interface with it:
```python
from typing import List, Union

import pandas as pd

from hamilton import ad_hoc_utils, base, driver

input_df = pd.DataFrame({
    "column_1": [1, 2, 3, 4, 5],
    "column_2": [1, 1, 1, 1, 1],
    "unused_column_1": [0, 0, 0, 0, 0],
})

config = {"derive": [
    {
        "output_name": "derived_column_1",
        "function_name": "sum_columns",
        "function_parameters": {
            "columns_to_sum": ["column_1", "column_2"]
            },
        },
    {
        "output_name": "derived_column_2",
        "function_name": "add_value",
        "function_parameters": {
            "reference_column": "column_1",
            "value_to_add": 3,
            },
        }
        ]
        }


def sum_columns(columns_to_sum: List[pd.Series]) -> pd.Series:
    """Create column based on sum of one or more columns"""
    return sum(columns_to_sum)

def add_value_to_column(reference_column: pd.Series, value_to_add: Union[int, float]) -> pd.Series:
    """Add a scalar value to a column."""
    return reference_column + value_to_add


adapter = base.SimplePythonGraphAdapter(base.PandasDataFrameResult())
pipeline_driver = driver.Driver(input_df, None, adapter=adapter)

temp_module = ad_hoc_utils.create_temporary_module(sum_columns, add_value_to_column)

for derivation in config["derive"]:
    derivation_function = getattr(temp_module, derivation["function_name"])
    pipeline_driver.DAG.add_node(derivation_function, derivation["function_parameters"])

output_columns = ['column_1', 'column_2', 'derived_column_1', 'derived_column_2']
result = pipeline_driver.execute(output_columns)

print(result)
```
This demonstrates a few desired features:
• value/source are determined by the typing on the function arguments
• an alternative API that allows the user to have more control over node creation
• an option to not pass a module to the driver, which would otherwise try to generate the nodes itself
But I'm very aware that this is looking to avoid your nodes-as-functions API, so it might not be something you want to design for!
Elijah Ben Izzy:
OK, so yeah, definitely a different approach, but happy to talk it over. That said, I think we can have the best of both worlds. @Stefan Krawczyk and I were discussing an alternative for more “configuration-driven” pipelines that would both keep the pipeline defined in functions and give you full config power:
```python
config = {
    "column_sources": {
        "derived_column_1": ["column_1", "column_2"],
        "derived_column_2": ["column_3", "column_4"],
    }
}
dr = driver.Driver(config, modules)
```
Then in the function:
```python
@parameterize.config(
    lambda column_sources: {
        key: {'cols_to_sum': group(*[source(item) for item in value])}
        for key, value in column_sources.items()
    }
)
def sum_columns(cols_to_sum: List[pd.Series]) -> pd.Series:
    return sum(cols_to_sum)
```
What this does is:
1. Evaluates this decorator after the config is available
2. Utilizes the new `group` feature we're building
Effectively, it allows you to make fully configuration-driven pipelines but still have everything referenceable in code and somewhat readable. For really dynamic pipelines (e.g. ones you need to mess around with a bunch), this is a good out that shouldn't be too tricky to implement.
That all said, I want to (in the most friendly way) push back on the notion of config in config files being the end goal here. IMO its use is specifically for things that should change a lot. We've often found that expressing things in Python instead of config actually brings the code together with the shape of the DAG, making it super easy to figure out what's going on! You only have to look at one file, and can track that easily. Once you have functions and config files, you're jumping in between them. While it might be initially expedient for development/alterations, I think it's where a lot of other orchestration systems have failed. While you get the power of delayed execution, the code doesn't become much more readable in my opinion, and you end up relying on the DAG representation.
Anyway, some food for thought! Curious to see what you come up with. I'm prototyping out the config piece because I think there are places where you really need to use the `config` as an out (e.g. summing some arbitrary set of columns, in your case), but I think that over-relying on it could end up with pretty messy code. Really appreciate your feedback!
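For reference, roughly what that config would resolve to if it were written statically with `@parameterize` — a minimal sketch, assuming the `group`/`source` API described above; the column names are just the ones from the example config:
```python
from typing import List

import pandas as pd

from hamilton.function_modifiers import group, parameterize, source


# Static equivalent of the config-driven decorator above: each output column
# gets its own list of source columns injected as `cols_to_sum`.
@parameterize(
    derived_column_1={"cols_to_sum": group(source("column_1"), source("column_2"))},
    derived_column_2={"cols_to_sum": group(source("column_3"), source("column_4"))},
)
def sum_columns(cols_to_sum: List[pd.Series]) -> pd.Series:
    """Sums the grouped input columns into a single series."""
    return sum(cols_to_sum)
```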
David Foster:
Thanks again for looking into this - I'll keep an eye on the PR 😄 I think it does come down to the design principles - I'm looking for weaker coupling between the data definition and the logic that's used to calculate it, whereas the aim of your design is to make these very tightly coupled. I don't wanna change your mind (I'm not even sure where mine is at!) and I can totally see the benefit of having the DAG nodes defined as objects that can be referenced in the code. This does make it easier to follow the data, but IMO the more general readability of the code is impacted by using decorators that act entirely through side effects, or by defining functions that aren't actually functions (in the case of `@does`).
Elijah Ben Izzy:
Yeah! So, this is all enabled (or will be shortly) with these two PRs: config-driven and grouped-inputs. Basically the first one makes it so you can use any sort of config to define DAG shape, and the second means you can inject a list in. This is something we've been meaning to do for a while (and have some OS users asking for it), so I'm pretty excited. That said, we buried the first one (config-driven) behind a “power-user” mode 🙂 The goal being that we want people to try to represent their DAG as logically/statically as possible before bailing out and writing it all in config. Happy to walk you through how it works! Would love to catch up and get your thoughts on readability.
The TL;DR of our design is that we want readability to be the primary goal but occasionally need an out — data pipelines are complicated and can't always be expressed, in all the configurations one wants, in a way that anyone looking at the code can understand. Feedback on APIs/readability is super valuable — would love to hear more about what you like and don't!
Long-winded explanation ahead 🙂 Our design philosophy comes from having dealt with a lot of super procedurally-specified codebases. E.g. stuff like this:
```python
df = ...
df['foo'] = ...
df['foo_again'] = ...
df['bar'] = f(df['foo'], df['foo_again'])
df['baz'] = g(df['bar']) if not alt else h(df['bar'])
```
And how you might see it in a delayed execution framework:
```python
foo = Step1()
foo_again = Step1('foo')
bar = Step2()
baz = Step3() if alt else Step3Alt(specific_alt_param)
pipeline = baz(bar(foo(), foo_again()))
```
We found this often got really messy, especially when config was involved. So the core design philosophy is about decoupling the definition from the execution, but not the shape from the code itself, as we consider those to be quite linked. While config files are pretty common, Hamilton tries to do without one in the majority of cases — we find it much cleaner to put this in the code itself. That said, there are plenty of cases where this doesn't work perfectly, which is why we added decorators. The way I think of them is a trade-off — when the extra verbosity Hamilton brings doesn't help, decorators allow you to avoid it, but at a slight readability cost. We're trying to reduce the readability cost (and would love feedback!).
• `@parameterize` allows you to avoid repeating definitions
• `@does` allows you to avoid repeating logic (although I agree it's not the best — the cleaner way to do this is to just use a helper function that starts with `_`; I think we regret `@does` in the first place 😆)
• `@check_output` allows you to add data quality/assertions to the pipeline and control how it's executed
• `@inject(param=group(source('input_1'), source('input_2'), …))` allows you to utilize a list of values and inject them into an argument, in case you want a little more flexibility.
And furthermore, there are some places where the shape of the DAG isn't actually static:
• `@config.when(key=value)` allows you to swap out implementations/DAG shapes at compile time (a small sketch of this is below)
• `@resolve(when=CONFIG_AVAILABLE, lambda config_item: decorator)` (in the new PR) allows you to utilize the config to determine how to decorate a function. While it does decrease readability, in our experience it generally shows up once or twice in a codebase — allowing you to, for example, join/aggregate a dynamic number of columns.
Anyway, the goal was to make it so that you could search the codebase for where a node is defined and immediately determine how it's implemented, without having to search around/click into its implementation in a different place — we've found fewer hops helps make pipelines easier to read/grok. When one really needs extra flexibility, we have some outs that we try to optimize for readability (although there's only so much we can do).
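To make the `@config.when` piece concrete, a minimal sketch (node and parameter names are borrowed from the procedural example above; the implementations themselves are made up):
```python
import pandas as pd

from hamilton.function_modifiers import config


# Two alternative implementations of the same node, `baz`. Which one becomes
# part of the DAG depends on the `alt` key in the config passed to the Driver.
@config.when(alt=False)
def baz__default(bar: pd.Series) -> pd.Series:
    """Default implementation of baz."""
    return bar * 2


@config.when(alt=True)
def baz__alt(bar: pd.Series, specific_alt_param: int) -> pd.Series:
    """Alternative implementation, chosen when alt=True."""
    return bar + specific_alt_param
```
Passing e.g. `{"alt": True}` as the Driver config would select the second implementation when the DAG is built.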
Stefan Krawczyk:
@David Foster we just pushed 1.20.0, see #C03AJNGDGQL. It gives you more flexibility to be configuration-driven.
Happy to find time to walk you through it 🙂