Elijah Ben Izzy
04/06/2023, 1:16 PM
@parameterized_subdag allows you to process each one individually. Something like:
dr = driver.Driver({}, *modules)  # config is the Driver's first positional argument
# the following is a *fixed* set of files
result = dr.execute(
    ...,
    inputs={
        'file_1_path': 'path_for_file_1',
        'file_2_path': 'path_for_file_2',
        'file_3_path': 'path_for_file_3',
        ...
    }
)
Then, in your parameterized_subdag, you can refer to each one specifically:
@parameterized_subdag(
    file_processing_module,
    parameterization={
        "file_1_processed": {"inputs": {"path": source("file_1_path")}},
        "file_2_processed": {"inputs": {"path": source("file_2_path")}},
        "file_3_processed": {"inputs": {"path": source("file_3_path")}},
        ...
    }
)
def file_n_processed(final_result_from_subdag: pd.DataFrame) -> pd.DataFrame:
    return final_result_from_subdag
...
def merged(
    file_1_processed: pd.DataFrame,
    file_2_processed: pd.DataFrame,
    file_3_processed: pd.DataFrame,
    ...
) -> pd.DataFrame:
    return ...
Elijah Ben Izzy
04/06/2023, 1:27 PM
You can also use @resolve to create the nodes after the configuration is available: https://hamilton.readthedocs.io/en/latest/reference/api-reference/decorators.html#resolve. Specifically, instead of having a static set of nodes, we use the config you pass the driver to build them. Something like this:
dr = driver.Driver(
    {
        'files': {'file_1': 'file_1_path.csv', 'file_2': 'file_2_path.csv', 'file_3': 'file_3_path.csv', ...},
        'hamilton.enable_power_user_mode': True,
    },
    *modules,
)
Then you can use @parameterized_subdag behind resolve to create a dynamic number of processed files, one from each config item passed in. Note the name of the parameter in the lambda is files, same as the key in the config.
@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda files: parameterized_subdag(
        file_processing_module,
        parameterization={
            input_name: {"inputs": {"file_name": value(input_value)}}
            for input_name, input_value in files.items()
        },
    )
)
def file_n_processed(final_result_from_subdag: pd.DataFrame) -> pd.DataFrame:
    return final_result_from_subdag
Then you can use resolve + inject to inject them all into a function:
@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda files: inject(
        processed_files=group(**{key: source(key) for key in files})
    ),
)
def joined_files(processed_files: Dict[str, pd.DataFrame]) -> pd.DataFrame:
    ...  # logic to join
Elijah Ben Izzy
04/06/2023, 1:35 PM
def processed_files(file_list: List[str]) -> pd.DataFrame:
    results = []
    for file in file_list:
        results.append(process_file(file))  # large amount of LOC that could be broken into functions
    return join_logic(results)
This also doesn’t have the possibility of parallelism/caching specific pieces, but could be nice in a pinch.
Stefan Krawczyk
04/06/2023, 11:01 PM
values_to_use = load_from_file(...)

@parameterized_subdag(..., parameterization={... from values_to_use})
def my_subdag(...) -> ...
isn’t unreasonable either.
The design choice is really about where you want the friction for maintaining things. What do you want a pull request to change, and where?
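For concreteness, a sketch of what that pattern could look like. The JSON manifest name and shape here are made up; file_processing_module, value, and parameterized_subdag are the names already used in this thread:
import json

import pandas as pd
from hamilton.function_modifiers import parameterized_subdag, value

import file_processing_module  # hypothetical module of per-file transforms

# Read the parameterization at module import time.
with open("files_to_process.json") as f:  # hypothetical manifest
    values_to_use = json.load(f)  # e.g. {"file_1": "a.csv", "file_2": "b.csv"}

@parameterized_subdag(
    file_processing_module,
    **{
        name + "_processed": {"inputs": {"path": value(path)}}
        for name, path in values_to_use.items()
    },
)
def file_n_processed(final_result_from_subdag: pd.DataFrame) -> pd.DataFrame:
    return final_result_from_subdag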
Amos
04/10/2023, 6:11 AM
My inputs mapping is simple (e.g. {'file_1': 'some_path.csv'}) and the loading_funcs module just contains a function def read_file(path: str) -> pd.DataFrame: ...
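For reference, a sketch of what that loading_funcs module might look like, given the signature above (the pd.read_csv body is an assumption):
# loading_funcs.py
import pandas as pd

def read_file(path: str) -> pd.DataFrame:
    """Load a single file from disk into a DataFrame."""
    return pd.read_csv(path)  # assumed: files are CSVs, per the mapping above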
Then I have:
@parameterized_subdag(
    loading_funcs,
    parameterization={
        "file_1_processed": {"inputs": {"path": source("file_1")}},
        "file_2_processed": {"inputs": {"path": source("file_2")}},
    },
)
def read_files(read_file: pd.DataFrame) -> pd.DataFrame:
    """Read files from a folder."""
    return read_file
def merged(file_1_processed: pd.DataFrame, file_2_processed: pd.DataFrame) -> pd.DataFrame:
    """Merge files."""
    return pd.concat([file_1_processed, file_2_processed], axis=1)
which I try to run by calling dr.execute(['merged'], inputs=my_mapping). But it doesn't generate the nodes I expect.
ValueError: 2 errors encountered:
Error: Required input file_1_processed not provided for nodes: ['merged'].
Error: Required input file_2_processed not provided for nodes: ['merged'].
Seems I have not wired the graph correctly, which I guess is to do with the parameterisation and function naming in the subdag. Any further tips? Thanks
Elijah Ben Izzy
04/10/2023, 4:30 PM
@parameterized_subdag(
    loading_funcs,
    file_1_processed={"inputs": {"path": source("file_1")}},
    file_2_processed={"inputs": {"path": source("file_2")}}
)
def read_files(read_file: pd.DataFrame) -> pd.DataFrame:
    """Read files from a folder."""
    return read_file
However, the two examples are different here: https://hamilton.readthedocs.io/en/latest/reference/api-reference/decorators.html#parameterized-subdag
Amos
04/11/2023, 11:15 AM
To adapt the @parameterized_subdag example above to accept a variable number of inputs using @resolve, would one write a helper function that essentially composes and returns the below (or equivalent) from a dictionary of inputs? In this context, connector_thinger is a DAG node that takes an input from the DAG's config dict. Would that still work?
parameterized_subdag(
    per_file_transforms,
    file_1_processed={
        "inputs": {
            "file_params": source("file_1"),
            "connector_thinger": source("connector_thinger"),
        },
    },
    file_2_processed={
        "inputs": {
            "file_params": source("file_2"),
            "connector_thinger": source("connector_thinger"),
        },
    },
    ...
)
Secondly, what is the interaction between config in a DAG and a subDAG? The API reference for @subdag says its config dict "takes precedence over the DAG's config". Does that mean it replaces it or updates it or …?
I'm fine loading the DAG's inputs and config from YAML but haven't quite mastered passing external parameterisation through to the subDAG. What's the easiest way of doing that?
Thanks!
Elijah Ben Izzy
04/11/2023, 3:44 PM
Re: resolve, the trick with resolve is that the parameter name corresponds to a field in the config dict. E.g.:
@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda file_names: parameterized_subdag(
        per_file_transforms,
        **{
            file_name + "_processed": {
                "inputs": {
                    "file_params": source(file_name),
                    "connector_thinger": source("connector_thinger"),
                }
            }
            for file_name in file_names
        }
    )
)
def ...
Then you’d instantiate a driver with driver.Driver({'file_names': ['file_1', 'file_2', 'file_3']}, *modules)
Elijah Ben Izzy
04/11/2023, 3:53 PM
1. config from the DAG gets passed to the subdag
2. If the subdag has its own config, it updates (not replaces) the original config
Same with inputs.
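In other words, a sketch of the update semantics described above (Hamilton's actual merge code isn't shown here; this just illustrates "updates, not replaces" as a dict merge):
# Illustration only: the subdag's keys win, the DAG's other keys survive.
dag_config = {"region": "us", "batch_size": 100}
subdag_config = {"batch_size": 500}

effective_config = {**dag_config, **subdag_config}
assert effective_config == {"region": "us", "batch_size": 500}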
Re: loading `inputs`/`config` from YAML: we recommend putting as much as you can in the code, but beyond that you can just load up your config/inputs before creating the driver and pass them in.
config = load_config_from_yaml(...)
inputs = load_inputs_from_yaml(...)
dr = driver.Driver(config, ...)
dr.execute(..., inputs=inputs)
Simple as that!
Stefan Krawczyk
04/11/2023, 6:38 PM
With resolve you needn’t define the function inline with a lambda; you can pass in a function that does it. Just remember to use _ in front of it if you’re defining it in the same Python module it’ll be used in.
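A sketch of that, reusing the per_file_transforms module and names from the example above. The helper name _parameterize_per_file is hypothetical; the leading underscore keeps Hamilton from turning the helper itself into a node:
import pandas as pd
from hamilton.function_modifiers import (
    ResolveAt,
    parameterized_subdag,
    resolve,
    source,
)

import per_file_transforms  # the per-file transform module from above

def _parameterize_per_file(file_names):
    # Builds the same decorator the inline lambda would, but as a named helper.
    return parameterized_subdag(
        per_file_transforms,
        **{
            name + "_processed": {"inputs": {"file_params": source(name)}}
            for name in file_names
        },
    )

@resolve(when=ResolveAt.CONFIG_AVAILABLE, decorate_with=_parameterize_per_file)
def file_n_processed(final_result_from_subdag: pd.DataFrame) -> pd.DataFrame:
    return final_result_from_subdag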
Amos
04/12/2023, 9:00 AM
# Define a YAML string
yaml_str = """
import:
type: csv
file_path: data.csv
transformations:
- method: dropna
- method: rename
args:
columns:
old_column_name: new_column_name
"""
...
import pandas as pd
import yaml
class BaseImporter:
    def __init__(self, config):
        self.config = config

    def read_data(self):
        raise NotImplementedError

class CsvImporter(BaseImporter):
    def read_data(self):
        return pd.read_csv(self.config["file_path"])

class Transformer:
    def __init__(self, config, custom_functions=None):
        self.config = config
        self.custom_functions = custom_functions or {}

    def apply_transformations(self, df):
        for transformation in self.config:
            method = transformation["method"]
            args = transformation.get("args", {})
            if method in self.custom_functions:
                func = self.custom_functions[method]
                df = func(df, **args)
            else:
                df = getattr(df, method)(**args)
        return df
def import_and_transform(yaml_config_file):
    with open(yaml_config_file, "r") as file:
        config = yaml.safe_load(file)
    # Create the appropriate importer based on the config
    importer_class = {
        "csv": CsvImporter,
    }[config["import"]["type"]]
    importer = importer_class(config["import"])
    # Read data using the importer
    data = importer.read_data()
    # Apply transformations
    transformer = Transformer(config["transformations"])
    transformed_data = transformer.apply_transformations(data)
    return transformed_data
The point is to dynamically cater for a minimal set of clean-up operations without writing extra code. I'm trying at the moment to see if I can prototype that in Hamilton. In something closer to a real-world example, the YAML might be marginally more complicated. At this stage I can change the structure.
yaml_str = """
config:
io_config:
project_key: SOMEPROJ
folder_name: SOMEFLDR
inputs:
file_1:
file_path: some_data.csv
file_type: csv
load_args:
usecols:
- MYCOLUMN
transformations:
etc.
...
It makes sense to run this first stage in a subDAG. At the moment I instantiate a data connector in the DAG and pass it in, along with the params from the YAML … something like this, as modified from your guidance.
@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda inputs: parameterized_subdag(
        per_file_transforms,
        **{
            file_id + "_processed": {
                "inputs": {
                    "file_params": file_params,
                    "folder_handle": source("folder_handle"),
                },
            }
            for file_id, file_params in inputs.items()
        },
    ),
)
def read_files(
...
But the file_params part is problematic because Hamilton wants to call methods on it that suggest it should be specified as an upstream dependency. I guess I need to break up the inputs dict somehow with a node before this one?
Ideally, I want three steps in the subDAG: one for loading, one for the clean-up transforms, and one for merging the result into something suitable for validation. At some future time, when the data comes from real storage, the subDAG stage is then easy to remove.
At the highest level, the overarching problem I'm looking to solve is working with really smart people who hide brilliant models behind goopy setup code, which means you pretty much have to understand the whole thing before you can apply it to anything else. That's probably too much info. But if I can make this work with Pandas, then the next question becomes: can it work with GeoPandas, and then with various other things? Better dash as my kids are screaming. Appreciate any tips. And if you think this is the wrong approach or not right for Hamilton, say so. I won't be upset. Cheers…
Amos
04/12/2023, 10:01 AM
@resolve may be excessive abstraction and I should either put up with a fixed set of inputs or wrap the driver in something that reduces the mess beforehand. As a user, though, it seems like it would be nice if it were possible to parameterise the inputs to .execute rather than config and have that propagate.
Stefan Krawczyk
Elijah Ben Izzy
04/12/2023, 5:42 PM
1. For file_params, I think you want a value on top of it. E.g. "file_params": value(file_params). Dependencies can be source or value: source means you get it from an upstream node, value means you get it from a literal value. E.g. source('foo') means there’s a node foo to depend on, and value(3) means it’s the literal value 3. I think that’ll solve your thing (see the sketch after this list).
2. Geopandas is fully supported on Hamilton (see https://github.com/DAGWorks-Inc/hamilton/blob/5c8e564d19ff2386b00a1ef71de43d11205c5273/hamilton/plugins/geopandas_extensions.py)
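A hedged sketch contrasting the two dependency types with @parameterize; the node and parameter names here are illustrative, not from the thread:
import pandas as pd
from hamilton.function_modifiers import parameterize, source, value

@parameterize(
    # "doubled" reads the upstream node input_series; factor is the literal 2.
    doubled={"series": source("input_series"), "factor": value(2)},
    # "tripled" reads the same upstream node; factor is the literal 3.
    tripled={"series": source("input_series"), "factor": value(3)},
)
def scaled(series: pd.Series, factor: int) -> pd.Series:
    """Multiply an upstream series by a literal factor."""
    return series * factor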
Amos
04/13/2023, 12:43 PM
value(…), of course. Why didn’t I remember that. That worked, and I was able to complete the subDAG with the inject(…) parts etc. You guys have been very generous with your time, and I'm hesitant to take up more of it. But if you're up for a chat, I'm on UTC+9:30 and can be pretty flexible outside child-chaos hours. I'm hoping this exercise becomes an example I can pitch to a wider array of users, so maybe there's something in it for your project in the medium term. I'd certainly gratefully receive any tips.
Elijah Ben Izzy
04/13/2023, 1:21 PM
We should add a docs example comparing source versus value. Will try to do that shortly.
Amos
04/14/2023, 5:58 AM
@inject(join_order=value(_JOIN_ORDER), merge_cols=value(_MERGE_COLS))
@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda inputs: inject(
        curated_inputs=group(**{key: source(key + _CURATED) for key in inputs}),
    ),
)
def joined_files(
    curated_inputs: Dict[str, pd.DataFrame],
    join_order: List[str],
    merge_cols: List[str],
) -> pd.DataFrame:
I get AttributeError: 'str' object has no attribute '__name__'
In this context, _JOIN_ORDER is just a list of strings defined in the same module.
Amos
04/14/2023, 5:58 AM
The AttributeError is on import, not on execution. Remove this line @inject(join_order=value(_JOIN_ORDER), merge_cols=value(_MERGE_COLS)) and it imports and runs fine (obviously I have to add defaults to the function).
Amos
04/14/2023, 6:33 AM# @inject(join_order=value(_JOIN_ORDER), merge_cols=value(_MERGE_COLS))
@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda inputs: inject(
        curated_inputs=group(**{key: source(key + _CURATED) for key in inputs}),
    ),
)
def joined_files(
    curated_inputs: Dict[str, pd.DataFrame],
    join_order: List[str] = _JOIN_ORDER,  # TODO: should be injected
    merge_cols: List[str] = _MERGE_COLS,  # TODO: should be injected
) -> pd.DataFrame:
    """Join files."""
Confused because if I can use value to supply a dict of strings extracted from config as a literal, why can't I use it to supply a list of strings? Must be missing something. Thanks.
Amos
04/14/2023, 7:53 AM
Any pointers on how @extract_fields works? I can't immediately find an example. In this context, params comes from the config dictionary supplied to the DAG.
@extract_fields(fields={"gvars": List[str]})
def parameters(params: Dict[str, Any]) -> Any:
    """Return a node for each pipeline parameter."""
    return params
Amos
04/14/2023, 8:00 AM
@extract_fields will accept `fields={'thing': list}` but not `fields={'thing': List[str]}`
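For reference, a minimal sketch of the form that does work, per the observation above (node and field names are from Amos's example):
from typing import Any, Dict

from hamilton.function_modifiers import extract_fields

@extract_fields(fields={"gvars": list})  # builtin list, not typing.List[str]
def parameters(params: Dict[str, Any]) -> Dict[str, Any]:
    """Expose each declared field of the params dict as its own node."""
    return params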
Elijah Ben Izzy
04/14/2023, 4:30 PM
I'm not sure inject outside of the resolve is actually what you want… Rather, hardcoding it in the function, or as a default, might be a lot cleaner (yes, defaults can't be mutable, but this could be a tuple or a sentinel None value…). That way you don't have to do two hops when reading it.
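A sketch of that hardcoded-default approach, assuming the module-level _JOIN_ORDER and _MERGE_COLS constants from Amos's example (the join body is elided):
from typing import Dict, List, Optional, Tuple

import pandas as pd

def joined_files(
    curated_inputs: Dict[str, pd.DataFrame],
    join_order: Tuple[str, ...] = tuple(_JOIN_ORDER),  # immutable default
    merge_cols: Optional[List[str]] = None,  # sentinel default
) -> pd.DataFrame:
    """Join files in a fixed order."""
    if merge_cols is None:
        merge_cols = list(_MERGE_COLS)
    ...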
Elijah Ben Izzy
04/14/2023, 4:32 PM
Playing with extract_fields now… Looks like you’ve found a bug!
>>> @extract_fields(fields={"gvars" : List[str]})
... def parameters(params: Dict[str, Any]) -> Any:
... return params
...
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/elijahbenizzy/dev/dagworks/os/hamilton/hamilton/function_modifiers/expanders.py", line 731, in __init__
raise base.InvalidDecoratorException(
hamilton.function_modifiers.base.InvalidDecoratorException: Error, found these ['gvars does not declare a type. Instead it passes typing.List[str].']. Please pass in a dict of string to types.
Scoping out a quick fix…