# hamilton-help
i
Hi guys, I'm a beginner with Hamilton (and like it very much so far!) looking for suggestions on how to implement/structure the following: I want to be able to load data from multiple (~15) CSVs and apply some enrichments to their contents. Specifically, each table contains either one or two (consistently named) columns with unix timestamps, which I want to convert to datetime objects. The variable number of columns is easy to process branchlessly in pandas via `pd.DataFrame.filter(like="_nanos")`, but I'm not sure how to set this up in a DRY way, as a stage applied "transparently" to this whole family of tables. Currently, my solution is a non-Hamilton class that does the loading from CSV (including the enrichments), plus Hamilton functions decorated with `@dataloader` that call said class, e.g.
```python
@dataloader
def raw_performance_data(dl: FancyDataLoader) -> tuple[pd.DataFrame, dict]:
    # return the loaded data plus a metadata dict, per the annotation
    return dl.load_enriched("perf"), {"loader": str(dl)}
```
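For context, a minimal sketch of the branchless enrichment described above (the `_nanos` suffix comes from the original message; `convert_nanos` is a hypothetical helper name):

```python
import pandas as pd

def convert_nanos(df: pd.DataFrame) -> pd.DataFrame:
    """Convert every unix-nanosecond column (suffix `_nanos`) to datetimes."""
    for col in df.filter(like="_nanos").columns:
        df[col] = pd.to_datetime(df[col], unit="ns")
    return df
```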
s
@Iliya R what's the inside of your custom dataloader? Is it just a for loop over those files? To clarify, do those files all have the same structure? One thing to note: `@dataloader` should be `@dataloader()`. Otherwise, ideas:
1. Put the dataloader logic in the function itself. You can use `@config.when` to switch implementations.
2. Do what you're doing now: take in a dataloader and call it within the function.
3. For N CSVs, create N loader functions. This would use something like `@parameterize` (see the tutorial, and the sketch after this list). If it's a fixed list, you can hard-code it. If not, you can combine it with `@resolve` (docs).
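A minimal sketch of idea 3, assuming a fixed table list (`TABLES` and the node names are made up; `FancyDataLoader` is the custom class from the first message, expected as a driver input):

```python
import pandas as pd
from hamilton.function_modifiers import parameterize, value

TABLES = ["perf", "events", "sessions"]  # hypothetical fixed list

# one Hamilton node per table, all sharing the same loading logic
@parameterize(**{f"raw_{t}_data": {"table_name": value(t)} for t in TABLES})
def raw_table(dl: FancyDataLoader, table_name: str) -> pd.DataFrame:
    """Load one enriched table via the custom loader."""
    return dl.load_enriched(table_name)
```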
i
My custom dataloader is a wrapper around CSV, parquet, and other file-type readers. It's initialized with some business parameters (team number, date, etc.); internally it constructs the paths to the data files and creates the right Strategy for loading the available files. It then has individual public methods to load each of the tables. Re `@dataloader()`: thanks, I figured that out too eventually. 1. I'll give that a try. 2. What's the benefit of that compared to the current approach? 3. I have something like that. The list is fixed, but some files might occasionally be missing. Thanks for the suggestions! The most important takeaway for me is that my attempts weren't too far off from the best practices.
s
@Iliya R the key design decisions are really around what you want to make easier or harder to change. E.g., should a change require a pull request, or should it be completely driven by inputs/configuration? Then there's how you want it displayed and captured in the Hamilton UI, since you can surface metadata there: do you want N different nodes, or just one? Oh, forgot there's a 4th way: write your own custom Hamilton DataSaver & DataLoader (https://hamilton.dagworks.io/en/latest/concepts/materialization/#dataloader-and-datasaver), which you can then use at driver time, or as a decorator… see the sketch below.
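A minimal sketch of the 4th option, based on the extension API in the linked docs (the class and registered name are made up; the `_nanos` convention is from the original question):

```python
import dataclasses
from typing import Any, Collection, Dict, Tuple, Type

import pandas as pd

from hamilton import registry
from hamilton.io.data_adapters import DataLoader


@dataclasses.dataclass
class EnrichedCSVLoader(DataLoader):
    """Loads a CSV and converts unix-nanosecond columns to datetimes."""

    path: str

    @classmethod
    def applicable_types(cls) -> Collection[Type]:
        return [pd.DataFrame]

    @classmethod
    def name(cls) -> str:
        return "enriched_csv"  # usable as @load_from.enriched_csv(...)

    def load_data(self, type_: Type) -> Tuple[pd.DataFrame, Dict[str, Any]]:
        df = pd.read_csv(self.path)
        for col in df.filter(like="_nanos").columns:
            df[col] = pd.to_datetime(df[col], unit="ns")
        return df, {"path": self.path}  # (data, metadata) pair


registry.register_adapter(EnrichedCSVLoader)
```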
i
Is it possible to `@parameterize` a `@load_from`? Assuming my custom dataloader (`raw_pta_data`) accepts a "loader" object (a constant user input) as well as one of multiple table names, I want to do something like:

```python
@parameterize_values(parameter="table_name", assigned_output=PTA_TABLES)
@load_from.raw_pta_data(loader=source("pta_loader"), table_name=source("table_name"))  # `table_name` is supposedly parametrized
def pta_table_base(input_data: pd.DataFrame) -> pd.DataFrame:
    """Retrieve a parametrized PTA table."""
    return input_data
```

If something like this can work, (how) should I mention the `table_name` input in the `@load_from`?
s
Yeah, so naively it won't work; we have some decorator precedence. But I think if you used `@parameterized_subdag` it could just work, i.e. treat one part as a mini "DAG" and then parameterize it:

```python
@load_from.raw_pta_data(loader=source("pta_loader"), table_name=source("table_name"))
def pta_table_base(input_data: pd.DataFrame) -> pd.DataFrame:
    """Retrieve a parametrized PTA table."""
    return input_data


@parameterized_subdag(
    pta_table_base,
    output_1={"inputs": {"pta_loader": source("pta_loader"), "table_name": ...}},  # <-- you can construct this dict of dicts however you want...
    output_2={"inputs": {"pta_loader": source("pta_loader"), "table_name": ...}},
)
def test_subdag(pta_table_base: pd.DataFrame) -> pd.DataFrame:
    return pta_table_base
```
e.g. it should then create DAGs that look something like this: [screenshot of the resulting DAGs]
i
Followup question: is there any solution that combines `@load_from` and `Parallelizable`/`Collect`, much like Airflow's dynamic task mapping? Suppose one of my nodes generates a list of codes; I then want to go to an external resource and fetch a table for each code, then reduce all the individual tables to a single table. I tried achieving this with a combination of `@parameterized_subdag`, `@parameterize_sources`, `Parallelizable`, and `Collect`, but that seems like a very convoluted way to do something that should be simple/straightforward.
s
`@inject` is another decorator that could be useful here. Dissecting the Airflow approach:

```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task


@task
def make_list():
    # this creates an unknown number (N) of values
    return [1, 2, 3, 4]


@task
def consumer(arg):
    print(arg)


@task
def reduce_step(values):
    total = sum(values)
    print(total)


with DAG(dag_id="dynamic-map", start_date=datetime(2022, 4, 2)) as dag:
    v = consumer.expand(arg=make_list())  # <-- at DAG construction time this says "there could be a list here"
    reduce_step(values=v)  # the reduce step consumes all the mapped results
```
The DAG created here would only have three nodes. I think this is the closest equivalent to `Parallelizable` & `Collect`; I think of those two constructs as "book ends" around some part of the DAG. So for me it could look something like:
```python
import pandas as pd

from hamilton.function_modifiers import load_from, source
from hamilton.htypes import Collect, Parallelizable


def code(inputs: list) -> Parallelizable[str]:
    # "expand": emit one task per code
    for _code in inputs:
        yield _code


@load_from.csv(path=source("code"))
def data_set(df: pd.DataFrame) -> pd.DataFrame:
    # some processing
    return df


def reduced_df(data_set: Collect[pd.DataFrame]) -> pd.DataFrame:
    # "collect": gather the per-code tables back into one
    return pd.concat(data_set)


def rest_of_my_computation(reduced_df: pd.DataFrame) -> object:
    # do something
    return None
```
So it would look something like this: [screenshot of the resulting DAG]
`inputs` here could be another function, or could be passed in at runtime, etc. Does that map to what you were thinking @Iliya R?
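For completeness, a minimal sketch of the driver setup this would need (`Parallelizable`/`Collect` require dynamic execution to be enabled; the module name and input paths are made up):

```python
from hamilton import driver
from hamilton.execution import executors

import my_dataflow  # hypothetical module containing the functions above

dr = (
    driver.Builder()
    .enable_dynamic_execution(allow_experimental_mode=True)  # needed for Parallelizable/Collect
    .with_modules(my_dataflow)
    .with_local_executor(executors.SynchronousLocalTaskExecutor())
    .build()
)
# each element of `inputs` becomes one parallel branch
result = dr.execute(["reduced_df"], inputs={"inputs": ["f1.csv", "f2.csv"]})
```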
I also realize I probably didn’t understand your original intent and probably sent you down an unnecessary path… sorry for that.
i
Yeah, that looks about right, thanks a lot! When I was playing around with it, I somehow only got a green node and no red node. Also, what's up with the arrows in this graph? They looked strange to me too.
s
The "crow's feet" try to denote multiplicity: `code` "expands", and then it "contracts" into `reduced_df`.
i
Oh, I see... imho it's unnecessary given the green & red rectangles, but ok. If I want to contract the book ends, including everything in between, into a single node with a custom name, is the only way to do that through Graphviz customizations?
s
So you get the Graphviz object back, and you can manually adjust it (see the sketch below). Otherwise I think you'd need to read https://github.com/DAGWorks-Inc/hamilton/blob/main/hamilton/graph.py#L199 and understand the order, since I don't think we can collapse nodes IIRC.
The UI has some functionality for collapsing, but we haven't added support for collapsing `Parallelizable` & `Collect` yet.
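A minimal sketch of adjusting the returned Graphviz object by hand (assuming `display_all_functions()` returns a `graphviz.Digraph` when no output path is given; the module and file names are made up):

```python
from hamilton import driver

import my_dataflow  # hypothetical module

dr = driver.Builder().with_modules(my_dataflow).build()
dot = dr.display_all_functions()  # a graphviz.Digraph you can mutate

# e.g. tweak attributes, or add/remove nodes and edges, before rendering
dot.attr(rankdir="LR")
dot.render("my_dag", format="png", cleanup=True)
```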
i
Is it straightforward to implement? I could give that a try
s
Graphviz is a little painful. They really want you to construct their graph properly from the beginning. So we'd need a traversal algorithm to create a "collapsed" node and ensure the right edges are connected. Then there's wiring it all through the call chains.
For the UI, it's React Flow, and @Elijah Ben Izzy would have a better idea. But I assume you were thinking of the static image rather than the UI?
i
That's right
e
Yeah, so we need to have groupings for React Flow as well, but I think a bit of clever Graphviz stuff will help. We can probably annotate it with taskgroups and automatically put it in a box…