Slackbot [03/08/2023, 6:49 PM]

Stefan Krawczyk [03/08/2023, 6:56 PM]

David Foster [03/08/2023, 7:02 PM]
config = {
    "derive": [
        {
            "output_name": "derived_column_1",
            "function_name": "sum_columns",
            "function_parameters": {
                "source": {"columns_to_sum": ["column_1", "column_2"]},
                "value": {},
            },
        }
    ]
}
I think there's a possible issue around how we get the right things wrapped in source and value here too, for dynamic examples.
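(For context on source vs. value: in Hamilton's @parameterize decorator, source(...) binds a function argument to another node in the DAG, while value(...) binds it to a literal. A minimal sketch of that distinction, with illustrative names:)

import pandas as pd
from hamilton.function_modifiers import parameterize, source, value

@parameterize(
    # output node name -> how each function argument is bound
    derived_column_1={"base_column": source("column_1"), "offset": value(3)},
)
def offset_column(base_column: pd.Series, offset: int) -> pd.Series:
    """base_column is pulled from the DAG (source); offset is a literal (value)."""
    return base_column + offset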
David Foster [03/08/2023, 7:04 PM]

Stefan Krawczyk [03/08/2023, 7:14 PM]

Stefan Krawczyk [03/08/2023, 7:29 PM]
import pandas as pd

from hamilton import ad_hoc_utils, base, driver
from hamilton.function_modifiers import does


def _sum(**kwargs) -> pd.Series:
    return sum(kwargs.values())


@does(_sum)  # maps to `sum_columns` in config.
def derived_column_1(column_1: pd.Series, column_3: pd.Series) -> pd.Series:
    """Function name maps to output name; function arguments map to required column names."""
    pass


@does(_sum)
def derived_column_2(column_1: pd.Series, column_2: pd.Series) -> pd.Series:
    pass
if __name__ == "__main__":
    temp_module = ad_hoc_utils.create_temporary_module(derived_column_1, derived_column_2)
    input_df = pd.DataFrame({
        "column_1": [1, 2, 3, 4, 5],
        "column_2": [1, 1, 1, 1, 1],
        "column_3": [2, 2, 2, 2, 2],
        "unused_column_1": [0, 0, 0, 0, 0],
    })
    adapter = base.SimplePythonGraphAdapter(base.PandasDataFrameResult())
    pipeline_driver = driver.Driver(input_df.to_dict(orient="series"), temp_module, adapter=adapter)
    output_columns = ["column_1", "column_2", "derived_column_1", "derived_column_2"]
    result = pipeline_driver.execute(output_columns)
    print(result)
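(If run as written, this sketch should print a dataframe where derived_column_1 = column_1 + column_3 = [3, 4, 5, 6, 7] and derived_column_2 = column_1 + column_2 = [2, 3, 4, 5, 6], alongside the two requested input columns.)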
Stefan Krawczyk [03/08/2023, 7:35 PM]

Stefan Krawczyk [03/08/2023, 7:47 PM]
extract_columns: here we assume a single dataframe as input, and then perform some operation on it. I like this solution less now, because of the need to expose the columns passed in.
# some_module.py
# right now we don't have a way to inject this from configuration, but we could.
derived_column_names = ["derived_column_1", "derived_column_2"]
regular_column_names = ["column_1", "column_2", "column_3", "unused_column_1"]

# transforms.py
# assume a single input dataframe
from typing import Dict, List

import pandas as pd

from hamilton import ad_hoc_utils, base, driver
from hamilton.function_modifiers import extract_columns

import some_module


@extract_columns(*some_module.derived_column_names)
def summed_dfs(
        input_df: pd.DataFrame,
        sum_config: Dict[str, List[str]]) -> pd.DataFrame:
    df = pd.DataFrame()
    for key, values in sum_config.items():
        df[key] = sum(input_df[value] for value in values)
    return df


@extract_columns(*some_module.regular_column_names)
def columns_to_expose(input_df: pd.DataFrame) -> pd.DataFrame:
    return input_df
if __name__ == "__main__":
    temp_module = ad_hoc_utils.create_temporary_module(columns_to_expose, summed_dfs)
    _input_df = pd.DataFrame({
        "column_1": [1, 2, 3, 4, 5],
        "column_2": [1, 1, 1, 1, 1],
        "column_3": [2, 2, 2, 2, 2],
        "unused_column_1": [0, 0, 0, 0, 0],
    })
    adapter = base.SimplePythonGraphAdapter(base.PandasDataFrameResult())
    _sum_config = {
        "derived_column_1": ["column_1", "column_3"],
        "derived_column_2": ["column_1", "column_2"],
    }
    pipeline_driver = driver.Driver({"input_df": _input_df, "sum_config": _sum_config}, temp_module, adapter=adapter)
    output_columns = ["column_1", "column_2", "derived_column_1", "derived_column_2"]
    result = pipeline_driver.execute(output_columns)
    print(result)
Stefan Krawczyk [03/08/2023, 7:58 PM]

Stefan Krawczyk [03/08/2023, 9:41 PM]
Another idea: pass config to a decorator, and also create a new one that doesn't require you to specify the function input arguments ahead of time. This has downsides (e.g. you can't search for what it depends on, and we'd need to figure out typing), but yeah, just a thought:
# NOTE: @parameterized_input_group is a sketch of a decorator that doesn't exist yet;
# the Dict[str, source] annotation is the "typing" question mentioned above.
@parameterized_input_group(dynamic_nodes=config("sum_config"))
def dynamic_sum(dynamic_nodes: Dict[str, source]) -> pd.Series:
    """Create column based on sum of one or more columns"""
    return sum(dynamic_nodes.values())
if __name__ == "__main__":
    _dynamic_sum_config = {
        "derived_column_1": {"dynamic_nodes": {"column_1": source("column_1"), "column_3": source("column_3")}},
        "derived_column_2": {"dynamic_nodes": {"column_1": source("column_1"), "column_2": source("column_2")}},
    }
    temp_module = ad_hoc_utils.create_temporary_module(dynamic_sum)
    _input_df = pd.DataFrame({
        "column_1": [1, 2, 3, 4, 5],
        "column_2": [1, 1, 1, 1, 1],
        "column_3": [2, 2, 2, 2, 2],
        "unused_column_1": [0, 0, 0, 0, 0],
    })
    adapter = base.SimplePythonGraphAdapter(base.PandasDataFrameResult())
    pipeline_driver = driver.Driver({"input_df": _input_df, "sum_config": _dynamic_sum_config}, temp_module, adapter=adapter)
    output_columns = ["column_1", "column_2", "derived_column_1", "derived_column_2"]
    result = pipeline_driver.execute(output_columns)
    print(result)
Stefan Krawczyk [03/08/2023, 9:43 PM]

Elijah Ben Izzy [03/08/2023, 9:52 PM]

Stefan Krawczyk [03/08/2023, 11:19 PM]

David Foster [03/09/2023, 9:27 AM]
I hadn't spotted the @does example in the docs, which would have covered the initial part of the problem.
There's some great food for thought here. I think our team needs to discuss whether we need the DAG to be defined in a dedicated config, or whether using approach (2) is suitable and concise enough. But it would be great to chat after we've clarified what we're looking for a little more.

David Foster [03/09/2023, 11:45 AM]
from typing import List, Union

import pandas as pd

from hamilton import ad_hoc_utils, base, driver

input_df = pd.DataFrame({
"column_1": [1, 2, 3, 4, 5],
"column_2": [1, 1, 1, 1, 1],
"unused_column_1": [0, 0, 0, 0, 0],
})
config = {"derive": [
{
"output_name": "derived_column_1",
"function_name": "sum_columns",
"function_parameters": {
"columns_to_sum": ["column_1", "column_2"]
},
},
{
"output_name": "derived_column_2",
"function_name": "add_value",
"function_parameters": {
"reference_column": "column_1",
"value_to_add": 3,
},
}
]
}
def sum_columns(columns_to_sum: List[pd.Series]) -> pd.Series:
    """Create column based on sum of one or more columns"""
    return sum(columns_to_sum)


def add_value_to_column(reference_column: pd.Series, value_to_add: Union[int, float]) -> pd.Series:
    """Add a scalar value to a column."""
    return reference_column + value_to_add
adapter = base.SimplePythonGraphAdapter(base.PandasDataFrameResult())
pipeline_driver = driver.Driver(input_df, None, adapter=adapter)
temp_module = ad_hoc_utils.create_temporary_module(sum_columns, add_value_to_column)
for derivation in config["derive"]:
    derivation_function = getattr(temp_module, derivation["function_name"])
    # hypothetical API: today's Driver doesn't expose a mutable DAG like this
    pipeline_driver.DAG.add_node(derivation_function, derivation["function_parameters"])
output_columns = ["column_1", "column_2", "derived_column_1", "derived_column_2"]
result = pipeline_driver.execute(output_columns)
print(result)
This demonstrates a few desired features:
• value/source are determined by the typing on the function arguments
• an alternative API that allows the user to have more control over the node creation
• an option to not pass a module to the driver, which would otherwise try to generate the nodes itself
But I'm very aware that this is looking to avoid your nodes-as-functions API, so it might not be something that you want to design for!

Elijah Ben Izzy [03/09/2023, 2:27 PM]
config = {
"column_sources" : {
"derived_column_1":["column_1","column_2"],
"derived_column_2: :["column_3", "column_4"]}
}
dr = driver.Driver(config, modules)
Then in the function:
@parameterize.config(
    lambda column_sources: {
        key: {"cols_to_sum": group(*[source(item) for item in value])}
        for key, value in column_sources.items()
    }
)
def sum_columns(cols_to_sum: List[pd.Series]) -> pd.Series:
    return sum(cols_to_sum)
What this does is:
1. Evaluates this decorator after the config is available
2. Utilizes the new group feature we're building
Effectively this allows you to make fully configuration-driven pipelines but still have everything referenceable in code and somewhat readable. For really dynamic pipelines (e.g. ones you need to mess around with a bunch), this is a good out that shouldn't be too tricky to implement.
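(For concreteness, with the config above, the lambda would evaluate to roughly this hand-written @parameterize; a sketch assuming the group/source API from the PR:)

from typing import List

import pandas as pd
from hamilton.function_modifiers import parameterize, group, source

# what the lambda expands to for the config above (sketch only)
@parameterize(
    derived_column_1={"cols_to_sum": group(source("column_1"), source("column_2"))},
    derived_column_2={"cols_to_sum": group(source("column_3"), source("column_4"))},
)
def sum_columns(cols_to_sum: List[pd.Series]) -> pd.Series:
    return sum(cols_to_sum)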
That all said, I want to (in the most friendly way) push back on the notion of config in config files being the end goal here. IMO the use of it is specifically for things that should change a lot. We've often found that expressing things in Python instead of config actually brings the code together with the shape of the DAG, making it super easy for you to figure out what's going on! You only have to look at one file, and can track that easily. Once you have functions and config files, you're jumping in between them. While it might be initially expedient for development/alterations, I think it's where a lot of other orchestration systems have failed. While you get the power of delayed execution, the code doesn't become much more readable in my opinion, and you end up relying on the DAG representation.
Anyway, some food for thought! Curious to see what you come up with. I'm prototyping out the config piece because I think there are places where you really need to use the config as an out (e.g. summing some arbitrary set of columns, in your case), but I think that over-relying on it could end up with pretty messy code.
Really appreciate your feedback!

David Foster [03/13/2023, 10:12 AM]
[message truncated; it referenced @does]

Elijah Ben Izzy [03/13/2023, 4:11 PM]
Here's how you might see a pipeline written in plain pandas:
df = ...
df['foo'] = ...
df['foo_again'] = ...
df['bar'] = f(df['foo'], df['foo_again'])
df['baz'] = g(df['bar']) if not alt else h(df['bar'])
And how you might see it in a delayed execution framework:
foo = Step1()
foo_again = Step1('foo')
bar = Step2()
baz = Step3() if alt else Step3Alt(specific_alt_param)
pipeline = baz(bar(foo(), foo_again()))
We found this often got really messy, especially when config was involved. So the core design philosophy is about decoupling the definition from the execution, but not the shape from the code itself, as we consider those to be quite linked. While config files are pretty common, Hamilton tries to do without one (in the majority of cases); we find it much cleaner to put this in the code itself.
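(For contrast, the same shape expressed Hamilton-style might look roughly like this; a sketch reusing the names above, with f, g, and h standing in for real logic and the alt swap handled by @config.when:)

import pandas as pd
from hamilton.function_modifiers import config

def foo(df: pd.DataFrame) -> pd.Series:
    ...

def foo_again(df: pd.DataFrame) -> pd.Series:
    ...

def bar(foo: pd.Series, foo_again: pd.Series) -> pd.Series:
    # parameter names declare the dependencies on the foo/foo_again nodes
    return f(foo, foo_again)

@config.when(alt=False)
def baz__default(bar: pd.Series) -> pd.Series:
    return g(bar)

@config.when(alt=True)
def baz__alt(bar: pd.Series) -> pd.Series:
    return h(bar)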
That said, there are plenty of cases where this doesn't work perfectly, which is why we added decorators. The way I think of them is a trade-off: when the extra verbosity Hamilton brings doesn't help, decorators allow you to avoid it, but at a slight readability cost. We're trying to reduce the readability cost (and would love feedback!).
• @parameterize allows you to avoid repeating definitions
• @does allows you to avoid repeating logic (although I agree it's not the best; the cleaner way is just to use a helper function whose name starts with _, see the sketch after this list. I think we regret @does in the first place)
• @check_output allows you to add data quality/assertions in the pipeline and control how they're executed
• @inject(param=group(source('input_1'), source('input_2'), ...)) allows you to utilize a list of values and inject them into an argument in case you want a little more flexibility.
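(The helper-function alternative to @does mentioned above, sketched: Hamilton skips functions whose names start with an underscore when building the DAG, so shared logic can live in a plain private helper:)

import pandas as pd

def _sum_series(*series: pd.Series) -> pd.Series:
    # the leading underscore keeps this helper out of the DAG
    return sum(series)

def derived_column_1(column_1: pd.Series, column_3: pd.Series) -> pd.Series:
    return _sum_series(column_1, column_3)

def derived_column_2(column_1: pd.Series, column_2: pd.Series) -> pd.Series:
    return _sum_series(column_1, column_2)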
And furthermore, there are some places where the shape of the DAG isn't actually static:
• @config.when(key=value) allows you to swap out implementations/DAG shapes at compile time (see the sketch below)
• @resolve(when=CONFIG_AVAILABLE, lambda config_item: decorator) (in the new PR) allows you to utilize the config to determine how to decorate it. While it does decrease readability, in our experience it generally shows up once or twice in a codebase, allowing you to, for example, join/aggregate a dynamic number of columns.
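(A minimal sketch of the compile-time swap that @config.when enables, with illustrative node names; the __suffix is stripped from whichever implementation is chosen:)

import pandas as pd
from hamilton.function_modifiers import config

@config.when(region="US")
def tax_rate__us(income: pd.Series) -> pd.Series:
    # chosen when the Driver config contains {"region": "US"}
    return income * 0.30

@config.when(region="UK")
def tax_rate__uk(income: pd.Series) -> pd.Series:
    # chosen when the Driver config contains {"region": "UK"}
    return income * 0.20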
Anyway, the goal was to make it so that you could search the codebase for where a node is defined and automatically determine how it's implemented, without having to search around/click into its implementation in a different place. We've found fewer hops help make pipelines easier to read/grok. When one really needs extra flexibility, we have some outs that we try to optimize for readability (although there's only so much we can do).

Stefan Krawczyk [03/20/2023, 8:24 PM]

Stefan Krawczyk [03/20/2023, 8:25 PM]