Iliya R
08/06/2024, 8:53 AM
pd.DataFrame.filter(like="_nanos"), but I'm not sure how to set this up in a DRY way, as a stage applied "transparently" to this family of tables.
Currently, my solution is a non-Hamilton class that does the loading from CSV, including the enrichments, plus Hamilton functions decorated with @dataloader that call said class, e.g.
@dataloader
def raw_performance_data(dl: FancyDataLoader) -> tuple[pd.DataFrame, dict]:
    return dl.load_enriched("perf"), str(dl)
Stefan Krawczyk
08/06/2024, 4:24 PM
@dataloader should be @dataloader().
Otherwise ideas:
1. Put the dataloader logic in the function itself. You can use @config.when to switch implementations.
2. Do what you’re doing now by taking in a dataloader and calling it within the function.
3. For N CSVs, create N loader functions. This would use something like @parameterize (see tutorial). If it’s a fixed list, you can hard-code it. If not, then you can combine it with @resolve (docs).
Iliya R
08/06/2024, 5:54 PM
dataloader() - thanks, I figured that out too eventually.
1. I'll give that a try.
2. What's the benefit in that compared to the current approach?
3. I have something like that. The list is fixed but some files might occasionally be missing.
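One way to cope with the occasionally missing files from point 3 is to filter the fixed table list against what is actually on disk before any per-table loaders are built. A minimal sketch, assuming one CSV per table named `<table>.csv`; `TABLE_NAMES`, `available_tables`, and every table name except "perf" are hypothetical and not taken from this thread:

```python
import tempfile
from pathlib import Path

# Hypothetical fixed list of table names; only "perf" appears in the thread.
TABLE_NAMES = ["perf", "trades", "quotes"]


def available_tables(data_dir: Path, tables: list[str]) -> dict[str, Path]:
    """Map each table name to its CSV path, skipping files that are absent."""
    found = {}
    for name in tables:
        path = data_dir / f"{name}.csv"
        if path.is_file():
            found[name] = path
    return found


# Demo: a temporary directory in which one of the three files is missing.
with tempfile.TemporaryDirectory() as tmp:
    data_dir = Path(tmp)
    (data_dir / "perf.csv").write_text("a,b\n1,2\n")
    (data_dir / "quotes.csv").write_text("a,b\n3,4\n")
    present = available_tables(data_dir, TABLE_NAMES)
    present_names = sorted(present)
```

The resulting mapping could then feed whatever generates the per-table nodes, so an absent file yields no node rather than a failure at load time. Whether silently skipping is preferable to failing loudly depends on the pipeline.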
Thanks for the suggestions! The most important takeaway for me is that my attempts weren't too far off from best practices.
Stefan Krawczyk
08/06/2024, 6:51 PM
Iliya R
08/08/2024, 10:17 AM
@parameterize a @load_from? Assuming my custom dataloader (raw_pta_data) accepts a "loader" object (a constant user input) as well as one of multiple table names, I want to do something like:
@parameterize_values(parameter="table_name", assigned_output=PTA_TABLES)
@load_from.raw_pta_data(loader=source("pta_loader"), table_name=source("table_name")) # `table_name` is supposedly parametrized
def pta_table_base(input_data: pd.DataFrame) -> pd.DataFrame:
    """Retrieve a parametrized PTA table."""
    return input_data
If something like this can work - (how) should I mention the table_name input in the @load_from?
Stefan Krawczyk
08/08/2024, 4:57 PM
@load_from.raw_pta_data(loader=source("pta_loader"), table_name=source("table_name"))  # `table_name` is supposedly parametrized
def pta_table_base(input_data: pd.DataFrame) -> pd.DataFrame:
    """Retrieve a parametrized PTA table."""
    return input_data

@parameterized_subdag(
    pta_table_base,
    output_1={"inputs": {"pta_loader": source("pta_loader"), "table_name": ...}},  # <-- you can construct this dict of dicts however you want...
    output_2={"inputs": {"pta_loader": source("pta_loader"), "table_name": ...}},
)
def test_subdag(pta_table_base: pd.DataFrame) -> pd.DataFrame:
    return pta_table_base
Stefan Krawczyk
08/08/2024, 4:58 PM
Iliya R
08/11/2024, 1:18 PM
@parameterized_subdag, @parameterize_sources, Parallelizable and Collect - but this seems to be a very convoluted way to do something that should be simple/straightforward.
Stefan Krawczyk
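08/11/2024, 4:41 PM
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

@task
def make_list():
    # This creates an unknown N values
    return [1, 2, 3, 4]

@task
def consumer(arg):
    print(arg)
    return arg  # return the value so reduce_step has numbers to sum

@task
def reduce_step(values):
    total = sum(values)
    print(total)

with DAG(dag_id="dynamic-map", start_date=datetime(2022, 4, 2)) as dag:
    v = consumer.expand(arg=make_list())  # <-- at DAG construction time this only declares that there could be a list
    reduce_step(values=v)  # called once on all mapped results; .expand() here would map again instead of reducing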
The DAG created here would only have three nodes.
Stefan Krawczyk
08/11/2024, 4:43 PM
Stefan Krawczyk
08/11/2024, 4:55 PM
import pandas as pd
from hamilton.function_modifiers import load_from, source
from hamilton.htypes import Parallelizable, Collect
def code(inputs: list) -> Parallelizable[str]:
    for _code in inputs:
        yield _code

@load_from.csv(path=source("code"))
def data_set(df: pd.DataFrame) -> pd.DataFrame:
    # some processing
    return df

def reduced_df(data_set: Collect[pd.DataFrame]) -> pd.DataFrame:
    return pd.concat(data_set)

def rest_of_my_computation(reduced_df: pd.DataFrame) -> object:
    # do something
    return None
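Conceptually, the fan-out/fan-in in the snippet above behaves like a generator followed by a per-item step and a final concatenation. The following is a plain-Python analogy only, not Hamilton code: it uses the standard-library `csv` module instead of pandas, and `reduced_rows`, `run`, and the demo files are hypothetical names introduced for illustration.

```python
import csv
import tempfile
from pathlib import Path
from typing import Iterator


def code(inputs: list[str]) -> Iterator[str]:
    """Fan out: yield one path per item, in the role of the Parallelizable node."""
    for _code in inputs:
        yield _code


def data_set(path: str) -> list[dict]:
    """Per-item step: load one CSV file into a list of row dicts."""
    with open(path, newline="") as f:
        return list(csv.DictReader(f))


def reduced_rows(data_sets: list[list[dict]]) -> list[dict]:
    """Fan in: concatenate all per-item results, in the role of the Collect node."""
    return [row for rows in data_sets for row in rows]


def run(inputs: list[str]) -> list[dict]:
    # The same three functions run regardless of how many paths arrive at runtime.
    return reduced_rows([data_set(p) for p in code(inputs)])


# Demo: two small CSV files written to a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    paths = []
    for i, body in enumerate(["x,y\n1,2\n", "x,y\n3,4\n5,6\n"]):
        p = Path(tmp) / f"part_{i}.csv"
        p.write_text(body)
        paths.append(str(p))
    rows = run(paths)
```

The number of functions stays fixed while the number of items is decided only when `inputs` is known, which is the same property the Parallelizable/Collect pair provides inside a DAG.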
So it would look something like this:
Stefan Krawczyk
08/11/2024, 4:56 PM
inputs here could be another function, or could be passed in at runtime, etc.
Does that map to what you were thinking @Iliya R?
Stefan Krawczyk
08/11/2024, 4:58 PM
Iliya R
08/11/2024, 5:19 PM
Stefan Krawczyk
08/11/2024, 5:56 PM
Stefan Krawczyk
08/11/2024, 5:56 PM
Stefan Krawczyk
08/11/2024, 5:56 PM
reduced_df.
Iliya R
08/11/2024, 6:02 PM
Stefan Krawczyk
08/11/2024, 6:06 PM
Stefan Krawczyk
08/11/2024, 6:09 PM
Iliya R
08/11/2024, 7:11 PM
Stefan Krawczyk
08/11/2024, 8:13 PM
Stefan Krawczyk
08/11/2024, 8:13 PM
Iliya R
08/11/2024, 8:19 PM
Elijah Ben Izzy
08/12/2024, 4:08 AM