# hamilton-help
e
You have a few options — it depends a bit on how many files there are. Specifically, either:
1. It's the same # of files every time (as you said, that set represents the same set of things)
2. It's a different # of files every time (same set of things, different number)
For (1), you’re exactly right —
@parameterized_subdag
allows you to process each one individually. Something like:
Copy code
dr = driver.Driver({}, *modules)
# the following is a *fixed* set of files
result = dr.execute(
    ..., 
    inputs={
        'file_1_path' : 'path_for_file_1', 
        'file_2_path' : 'path_for_file_2', 
        'file_3_path' : 'path_for_file_3', 
         ...
    }
)
Then, in your
parameterized_subdag
, you can refer to each one specifically:
Copy code
@parameterized_subdag(
    file_processing_module,
    parameterization={
        "file_1_processed" : {"inputs" : {"path" : source("file_1_path")}},
        "file_2_processed" : {"inputs" : {"path" : source("file_2_path")}},
        "file_3_processed" : {"inputs" : {"path" : source("file_3_path")}},
        ...
    }
)
def file_n_processed(final_result_from_subdag: pd.DataFrame) -> pd.DataFrame:
    return final_result_from_subdag
    ...

def merged(
    file_1_processed: pd.DataFrame, 
    file_2_processed: pd.DataFrame, 
    file_3_processed: pd.DataFrame, ...) -> pd.DataFrame:
    return ...
If you have a varying number and need it to be configurable, we also have a tool for that! It's buried under power-user mode for now though — you can use
@resolve
to create the nodes after the configuration is available: https://hamilton.readthedocs.io/en/latest/reference/api-reference/decorators.html#resolve. Specifically, instead of having a static set of nodes, we use the config you pass to the driver to do it. Something like this:
Copy code
dr = driver.Driver(
    {'files': {'file_1': 'file_1_path.csv', 'file_2': 'file_2_path.csv', 'file_3': 'file_3_path.csv', ...},
     'hamilton.enable_power_user_mode': True},
    *modules,
)
Then you can use
@parameterized_subdag
behind
resolve
to create a dynamic number of processed files, one from each config item passed in. Note the name of the parameter in the lambda is
files
, same as the key in the config.
Copy code
@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda files: parameterized_subdag(
        file_processing_module,
        parameterization={
            input_name: {"inputs": {"file_name": value(input_value)}}
            for input_name, input_value in files.items()
        },
    )
)
def file_n_processed(final_result_from_subdag: pd.DataFrame) -> pd.DataFrame:
    return final_result_from_subdag
Then you can use
resolve
+
inject
to inject them all into a function:
Copy code
@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda files: inject(
        processed_files=group(**{key: source(key) for key in files})
    ),
)
def joined_files(processed_files: Dict[str, pd.DataFrame]) -> pd.DataFrame:
    # logic to join
    ...
So this is extremely powerful, but tends to make the DAG less readable. It's a common pattern though — fan out (same thing for multiple files) and contract (combine them all).
Hope this helps — lmk if it confuses you at all, or if you think there's a cleaner way to go about it. Last (obvious) choice is to just shove it all in one function — this is easy but you get a less fine-grained/unit-testable DAG:
Copy code
def processed_files(file_list: List[str]) -> pd.DataFrame:
    results = []
    for file in file_list:
        results.append(process_file(file))  # large amount of LOC that could be broken into functions
    return join_logic(results)
Also doesn't have the possibility of parallelizing/caching specific pieces, but could be nice in a pinch.
s
@Amos just to chime in. Doing:
Copy code
values_to_use = load_from_file(...)

@parameterized_subdag(..., parameterization={... from values_to_use})
def my_subdag(...) -> ...
isn’t unreasonable either. The design choice is really about where do you want the friction for maintaining things? What do you want a pull request to change, and where?
a
Thanks, guys. This is helpful. I'll give it a go.
👍 1
Trying a toy example, but I seem to be missing a concept here. My DAG inputs are a simple mapping (e.g.
{'file_1': 'some_path.csv'}
) and the
loading_funcs
module just contains a function
def read_file(path: str) -> pd.DataFrame: ...
Then I have:
Copy code
@parameterized_subdag(
    loading_funcs,
    parameterization={
        "file_1_processed" : {"inputs" : {"path" : source("file_1")}},
        "file_2_processed" : {"inputs" : {"path" : source("file_2")}},
    },
)
def read_files(read_file: pd.DataFrame) -> pd.DataFrame:
    """Read files from a folder."""
    return read_file

def merged(file_1_processed: pd.DataFrame, file_2_processed: pd.DataFrame) -> pd.DataFrame:
    """Merge files."""
    return pd.concat([file_1_processed, file_2_processed], axis=1)
which I try to run by calling
dr.execute(['merged'], inputs=my_mapping)
. But it doesn't generate the nodes I expect.
Copy code
ValueError: 2 errors encountered:
  Error: Required input file_1_processed not provided for nodes: ['merged'].
  Error: Required input file_2_processed not provided for nodes: ['merged'].
Seems I have not wired the graph correctly, which I guess is to do with the parameterisation and function naming in the subdag. Any further tips? Thanks
e
Looking — let me see if I can repro
OK, I figured it out! This is an issue with the documentation — there’s one part that’s wrong. The actual way to call it is with `**kwargs`:
Copy code
@parameterized_subdag(
    read_file,
    file_1_processed={"inputs" : {"path" : source("file_1")}},
    file_2_processed={"inputs" : {"path" : source("file_2")}}
)
def read_files(read_file: pd.DataFrame) -> pd.DataFrame:
    """Read files from a folder."""
    return read_file
However, the two examples are different here: https://hamilton.readthedocs.io/en/latest/reference/api-reference/decorators.html#parameterized-subdag
Fixing, and thanks for helping find it!
Docs have been fixed!
a
It lives! Nice. Funnily enough, I was pondering that difference last night, but my kids came home to slobber on the keyboard before I got around to investigating further. I'll see if I can prototype the rest of the pipeline and add the parameterisation I want. But that's a good start, thanks.
Good progress but still one or two outstanding uncertainties if you'll humour me a touch longer? First, to modify the updated
@parameterized_subdag
example above to accept a variable number of inputs using
@resolve
, would one write a helper function essentially to compose and return the below (or equivalent) from a dictionary of inputs? In this context,
connector_thinger
is a DAG node that takes an input from the DAG's
config
dict. Would that still work?
Copy code
parameterized_subdag(
    per_file_transforms,
    file_1_processed={
        "inputs": {
            "file_params": source("file_1"),
            "connector_thinger": source("connector_thinger"),
        },
    },
    file_2_processed={
        "inputs": {
            "file_params": source("file_2"),
            "connector_thinger": source("connector_thinger"),
        },
    },
    ...
)
Secondly, what is the interaction between
config
in a DAG and a subDAG? The API reference for
@subdag
says its config dict "takes precedence over the DAG's config". Does that mean it replaces it or updates it or …? I'm fine loading the DAG's
inputs
and
config
from YAML but haven't quite mastered passing external parameterisation through to the subDAG. What's the easiest way of doing that? Thanks!
e
Of course! The trick with
resolve
is that the parameter name corresponds to a field in the config dict. E.G:
Copy code
@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda file_names: parameterized_subdag(
        per_file_transforms,
        **{
            file_name + "_processed": {"inputs": {
                "file_params": source(file_name),
                "connector_thinger": source("connector_thinger"),
            }}
            for file_name in file_names
        },
    )
)
def ...
Then you’d instantiate a driver with
driver.Driver({'file_names': ['file_1', 'file_2', 'file_3']})
Re: config/inputs in a subdag, it's pretty straightforward:
1. config from the DAG gets passed to the subdag
2. If the subdag has its own config, it updates (not replaces) the original config
Same with inputs.
Re: loading `inputs`/`config` from YAML: we recommend putting as much as you can in the code first, but then you can just load up your config/inputs before the driver and pass them in.
Copy code
config = load_config_from_yaml(...)
inputs = load_inputs_from_yaml(...)
dr = driver.Driver(config, ...)
dr.execute(..., inputs=inputs)
Simple as that!
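If it helps, here's a rough, untested sketch of that precedence with @subdag (the module name and config keys are made up):
Copy code
from hamilton.function_modifiers import subdag

# say the driver config is {'mode': 'default', 'other_setting': 1}
@subdag(
    my_subdag_module,          # hypothetical module containing the subdag's functions
    config={'mode': 'fast'},   # merged on top of the driver config, so this subdag
                               # sees {'mode': 'fast', 'other_setting': 1}
)
def fast_result(final_node: pd.DataFrame) -> pd.DataFrame:
    # final_node is whatever node the subdag module produces last (hypothetical name)
    return final_node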
s
@Amos you’re jumping right into advanced user power mode! To double down on one of Elijah’s questions: is the config checked in, or is that something you expect users to manipulate for input to the code? If it’s checked in — then my suggestion would be to just make it code and save the gymnastics of going from YAML to python. Otherwise, a note on
resolve
: you needn't define the function inline with a lambda - you can pass in a function that does that instead - just remember to use
_
in front of it if you're defining it in the same Python module it'll be used in.
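Rough, untested sketch of what I mean (the module and config key names are made up):
Copy code
import pandas as pd
from hamilton.function_modifiers import ResolveAt, parameterized_subdag, resolve, source

import per_file_transforms  # hypothetical module with the per-file functions


# prefixed with "_" so Hamilton doesn't try to turn this helper into a node
def _file_subdags(file_names):
    return parameterized_subdag(
        per_file_transforms,
        **{name + "_processed": {"inputs": {"file_params": source(name)}} for name in file_names},
    )


@resolve(when=ResolveAt.CONFIG_AVAILABLE, decorate_with=_file_subdags)
def file_n_processed(final_result_from_subdag: pd.DataFrame) -> pd.DataFrame:
    return final_result_from_subdag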
a
Really appreciate the input, guys. Maybe it's helpful if I take half a step back and explain what I'm trying to do. I want a generic pattern for preprocessing common scientific data formats, which include tabular data. Hamilton seems attractive because the trunks of pipelines could be written in its declarative style with Pandera-based validation at each end. The pipelines need to be conceptually segregated into things that happen to specific chunks of data (files/query results), things that happen to specific types of data and things that are required for specific models. The latter two can be relatively static, and Hamilton's decorator-based parameterisation seems more than adequate. It's the first stage that's a bit sticky. These pipelines are to be written for experimental applications by people who don't much care about software engineering; they may be run by people with almost no coding experience, and there isn't a large maintenance team. While the data should come from proper storage, in practice it often doesn't. For better or worse, the models end up being more useful when people who turn up with a heap of nasty files can add a simple YAML config that helps them meet the minimum data requirements. Again, in a toy example, this is the sort of thing:
Copy code
# Define a YAML string
yaml_str = """
import:
  type: csv
  file_path: data.csv
transformations:
  - method: dropna
  - method: rename
    args:
      columns:
        old_column_name: new_column_name
"""
...
import pandas as pd
import yaml


class BaseImporter:
    def __init__(self, config):
        self.config = config

    def read_data(self):
        raise NotImplementedError


class CsvImporter(BaseImporter):
    def read_data(self):
        return pd.read_csv(self.config["file_path"])


class Transformer:
    def __init__(self, config, custom_functions={}):
        self.config = config
        self.custom_functions = custom_functions

    def apply_transformations(self, df):
        for transformation in self.config:
            method = transformation["method"]
            args = transformation.get("args", {})

            if method in self.custom_functions:
                func = self.custom_functions[method]
                df = func(df, **args)
            else:
                df = getattr(df, method)(**args)
        return df


def import_and_transform(yaml_config_file):
    with open(yaml_config_file, "r") as file:
        config = yaml.safe_load(file)

    # Create the appropriate importer based on the config
    importer_class = {
        "csv": CsvImporter,
    }[config["import"]["type"]]
    importer = importer_class(config["import"])

    # Read data using the importer
    data = importer.read_data()

    # Apply transformations
    transformer = Transformer(config["transformations"])
    transformed_data = transformer.apply_transformations(data)

    return transformed_data
The point is to dynamically cater for a minimal set of clean-up operations without writing extra code. I'm trying at the moment to see if I can prototype that in Hamilton. In something closer to a real-world example, the YAML might be marginally more complicated. At this stage I can change the structure.
Copy code
yaml_str = """
config:
    io_config:
        project_key: SOMEPROJ
        folder_name: SOMEFLDR
inputs:
    file_1:
        file_path: some_data.csv
        file_type: csv
        load_args:
            usecols:
            - MYCOLUMN
        transformations:
            etc. 
...
"""
It makes sense to run this first stage in a subDAG. At the moment I instantiate a data connector in the DAG and pass it in, along with the params from the YAML … something like this, as modified from your guidance.
Copy code
@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda inputs: parameterized_subdag(
        per_file_transforms,
        **{
            file_id
            + "_processed": {
                "inputs": {
                    "file_params": file_params,
                    "folder_handle": source("folder_handle"),
                },
            }
            for file_id, file_params in inputs.items()
        },
    ),
)
def read_files(
...
But the
file_params
part is problematic because hamilton wants to call methods on it that suggest it should be specified as an upstream dependency. I guess I need to break up the inputs dict somehow with a node before this one? Ideally, I want three steps in the subDAG: one for loading, one for the clean-up transforms and one for merging the result to something suitable for validation. At some future time when the data comes from real storage, the subDAG stage is then easy to remove. At the highest level, the overarching problem I'm looking to solve is working with really smart people who hide brilliant models behind goopy setup code that means you pretty much have to understand the whole thing before you can apply it to anything else. That's probably too much info. But if I can make this work with Pandas then the next Q becomes can it work with GeoPandas and then with various other things. Better dash as my kids are screaming. Appreciate any tips. And if you think this is the wrong approach or not right for Hamilton, say so. I won't be upset. Cheers…
P.S. My intuition is that what I'm trying to do with
@resolve
may be excessive abstraction and I should either put up with a fixed set of inputs or wrap the driver in something that reduces the mess beforehand. As a user, though, it seems like it would be nice if it were possible to parameterise the inputs to
.execute
rather than
config
and have that propagate.
s
@Amos thanks for the context. Let's set up a call. I think I understand, but drawing a few things out would help me grok it and provide the right recommendations. 🙂
Let us know when you’re up and we can schedule something.
e
Pre-call — a few points: 1. re:
file_params
, I think you want a
value
on top of it. E.g.
"file_params" : value(file_params)
Dependencies can be
source
or
value
— source means you get it from an upstream node,
value
means you get it from a literal value. E.G.
source('foo')
means there’s a node
foo
to depend on and
value(3)
means it's the literal value
3
. I think that'll solve your thing. 2. GeoPandas is fully supported in Hamilton (see https://github.com/DAGWorks-Inc/hamilton/blob/5c8e564d19ff2386b00a1ef71de43d11205c5273/hamilton/plugins/geopandas_extensions.py)
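I.e. for point 1, applied to your snippet it'd look something like this (untested):
Copy code
@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda inputs: parameterized_subdag(
        per_file_transforms,
        **{
            file_id + "_processed": {
                "inputs": {
                    "file_params": value(file_params),         # literal dict pulled from the config
                    "folder_handle": source("folder_handle"),  # upstream node in the DAG
                },
            }
            for file_id, file_params in inputs.items()
        },
    ),
)
def read_files(final_result_from_subdag: pd.DataFrame) -> pd.DataFrame:
    # placeholder body, same shape as the earlier examples
    return final_result_from_subdag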
a
Ah
value(…)
, of course. Why didn't I remember that? That worked, and I was able to complete the subDAG with the
inject(…)
parts etc. You guys have been very generous with your time, and I'm hesitant to take up more of it. But if you're up for a chat, I'm on UTC+9:30 and can be pretty flexible outside child-chaos hours. I'm hoping this exercise becomes an example I can pitch to a wider array of users, so maybe there's something in it for your project in the medium term. I'd certainly gratefully receive any tips.
e
We have customers in Australia so happy to make do (also somehow we're on all hours of the night 😆 )
Also, yeah, I think we can add clearer error messages for
source
versus
value
. Will try to do that shortly.
a
That's kind of you. I have a few projects on the go, so perhaps if I keep forging ahead until I get truly stuck or have done the best I can then we could organise a call about how to move forward? In the interim, here's one thing that's confusing me. Why doesn't this work?
Copy code
@inject(join_order=value(_JOIN_ORDER), merge_cols=value(_MERGE_COLS))
@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda inputs: inject(
        curated_inputs=group(**{key: source(key + _CURATED) for key in inputs}),
    ),
)
def joined_files(
    curated_inputs: Dict[str, pd.DataFrame],
    join_order: List[str],
    merge_cols: List[str],
) -> pd.DataFrame:
I get
AttributeError: 'str' object has no attribute '__name__'
In this context,
_JOIN_ORDER
is just a list of strings defined in the same module.
I get the
AttributeError
on import not on execution. Remove this line
@inject(join_order=value(_JOIN_ORDER), merge_cols=value(_MERGE_COLS))
and it imports and runs fine (obviously I have to add defaults to the function).
For clarity, this works as expected:
Copy code
# @inject(join_order=value(_JOIN_ORDER), merge_cols=value(_MERGE_COLS))
@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda inputs: inject(
        curated_inputs=group(**{key: source(key + _CURATED) for key in inputs}),
    ),
)
def joined_files(
    curated_inputs: Dict[str, pd.DataFrame],
    join_order: List[str] = _JOIN_ORDER,  # TODO: should be injected
    merge_cols: List[str] = _MERGE_COLS,  # TODO: should be injected
) -> pd.DataFrame:
    """Join files."""
Confused because if I can use
value
to supply a dict of strings extracted from
config
as a literal, why can't I use it to supply a list of strings? Must be missing something. Thanks.
Is this how
@extract_fields
works? I can't immediately find an example. In this context,
params
comes from the config dictionary supplied to the DAG.
Copy code
@extract_fields(fields={"gvars": List[str]})
def parameters(params: Dict[str, Any]) -> Any:
    """Return a node for each pipeline parameter."""
    return params
It seems
@extract_fields
will accept `fields={'thing': list}` but not
fields={'thing': List[str]}
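For reference, this version imports fine:
Copy code
@extract_fields(fields={"gvars": list})
def parameters(params: Dict[str, Any]) -> Any:
    """Return a node for each pipeline parameter."""
    return params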
e
So, I haven't tested dual injects, but I think that's a current limitation — in this case it might not be catching it (there's usually a catch); I can do a bit of testing to figure out why. This isn't an inherent limitation, more an implementation detail. That said, I don't think
inject
outside of the
resolve
is actually what you want… Rather, hardcoding it in the function, or as a default, might be a lot cleaner (yes, defaults can’t be mutable, but this could be a tuple or a sentinel
None
value…). That way you don’t have to do two hops when reading it.
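E.g. something along these lines (rough sketch; the constant values and the fallback here are made up):
Copy code
_CURATED = "_curated"              # suffix used in your module (made up here)
_JOIN_ORDER = ("file_1", "file_2")  # a tuple, so it's safe as a default

@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda inputs: inject(
        curated_inputs=group(**{key: source(key + _CURATED) for key in inputs}),
    ),
)
def joined_files(
    curated_inputs: Dict[str, pd.DataFrame],
    join_order: Tuple[str, ...] = _JOIN_ORDER,
    merge_cols: Optional[List[str]] = None,  # sentinel; resolved in the body
) -> pd.DataFrame:
    merge_cols = list(merge_cols) if merge_cols is not None else ["id"]  # made-up fallback
    ...  # join logic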
Trying out
extract_fields
now… Looks like you’ve found a bug!
Copy code
>>> @extract_fields(fields={"gvars" : List[str]})
... def parameters(params: Dict[str, Any]) -> Any:
...     return params
...
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/elijahbenizzy/dev/dagworks/os/hamilton/hamilton/function_modifiers/expanders.py", line 731, in __init__
    raise base.InvalidDecoratorException(
hamilton.function_modifiers.base.InvalidDecoratorException: Error, found these ['gvars does not declare a type. Instead it passes typing.List[str].']. Please pass in a dict of string to types.
Scoping out a quick fix…