Seth Stokes
06/13/2024, 3:52 PMhamilton
is that you cannot redefine a node with the same name.
However today, I noticed that this is possible in the contrived example below.
It was accidentally accomplished via load_from/inject_.
Is this intended behavior or should an error have been raised?
@subdag(
    scraper_a,
    inputs={"data_location": source("a_data_location")},
)
def a_raw(
    a_scraped_data: pd.DataFrame,
) -> pd.DataFrame:
    """Return ``a_scraped_data`` from the ``scraper_a`` subdag unchanged.

    Args:
        a_scraped_data: Frame computed inside the ``scraper_a`` subdag.

    Returns:
        The same frame, exposed under this function's name (``a_raw``).
    """
    return a_scraped_data
@extract_fields({
    "a_processed": pd.DataFrame,
    "b_processed": pd.DataFrame,
    "c_processed": pd.DataFrame,
})
@load_from.excel(path=..., inject_="a_raw")
def do_some_work(
    a_raw: pd.DataFrame,
    b_raw: pd.DataFrame,
    c_raw: pd.DataFrame,
) -> Dict[str, pd.DataFrame]:
    """Build the processed frames that ``extract_fields`` splits out.

    ``inject_="a_raw"`` makes the Excel loader supply the ``a_raw``
    parameter. That is the same name as the ``a_raw`` subdag function,
    which is the name clash this example is meant to show.

    Args:
        a_raw: Frame injected by ``load_from.excel``.
        b_raw: Raw frame "b".
        c_raw: Raw frame "c".

    Returns:
        A dict with one frame per key declared in ``extract_fields``.
    """
    # some work
    return {
        "a_processed": pd.DataFrame(...),
        "b_processed": pd.DataFrame(...),
        "c_processed": pd.DataFrame(...),
    }
Stefan Krawczyk
06/13/2024, 5:23 PMStefan Krawczyk
06/13/2024, 5:24 PMElijah Ben Izzy
06/13/2024, 6:01 PMload_from
or inject
is not creating a node called a_raw
). Can you print out the visualization of this? Will help debug.
Seth Stokes
06/13/2024, 7:22 PM
%%cell_to_module scraper_a -d
import pandas as pd
def a_scraped_data(a_data_location: str) -> pd.DataFrame:
    """Return a fixed one-column frame standing in for scraped data.

    Args:
        a_data_location: Accepted so the node declares this input; the
            value is not read by the body.

    Returns:
        A frame with a single column ``a`` holding ``"x"``, ``"y"``, ``"x"``.
    """
    return pd.DataFrame(dict(a=list("xyx")))
# Path of the sample CSV used by this example.
a_data_location = "a_sample_data_path.csv"
# Write a one-column frame (a = "a", "b", "c") to that path. These values differ
# from the "xyx" frame returned by a_scraped_data, so the two sources can be told apart.
pd.DataFrame(dict(a=list("abc"))).to_csv(a_data_location)
%%cell_to_module loaders -d
import pandas as pd
from typing import Dict
from hamilton.function_modifiers import subdag, source, extract_fields, load_from
import scraper_a
# NOTE(review): the mapping below provides ``data_location``, while
# ``scraper_a.a_scraped_data`` declares ``a_data_location`` - confirm which
# input name is intended.
@subdag(
    scraper_a,
    inputs={"data_location": source("a_data_location")},
)
def a_raw(
    a_scraped_data: pd.DataFrame,
) -> pd.DataFrame:
    """Print and return ``a_scraped_data`` from the ``scraper_a`` subdag.

    The print is a marker: if "FROM SUBDAG" never shows up in the output,
    this function was not executed.

    Args:
        a_scraped_data: Frame computed inside the ``scraper_a`` subdag.

    Returns:
        The same frame, unchanged.
    """
    print(f"FROM SUBDAG: {a_scraped_data}")
    return a_scraped_data
@extract_fields({
    "a_processed": pd.DataFrame,
    # "b_processed": pd.DataFrame,
    # "c_processed": pd.DataFrame
})
@load_from.csv(path=source("a_data_location"), inject_="a_raw")
def do_some_work(
    a_raw: pd.DataFrame,
    # b_raw: pd.DataFrame,
    # c_raw: pd.DataFrame,
) -> Dict[str, pd.DataFrame]:
    """Return the ``a_processed`` frame that ``extract_fields`` splits out.

    ``inject_="a_raw"`` makes the CSV loader supply the ``a_raw`` parameter.
    That is the same name as the ``a_raw`` subdag function, which is the
    clash this repro demonstrates. The commented-out "b"/"c" entries are
    kept to show how the example was reduced.

    Args:
        a_raw: Frame injected by ``load_from.csv``; the body does not read it.

    Returns:
        A dict with the single key ``"a_processed"``.
    """
    # some work
    return {
        "a_processed": pd.DataFrame(dict(a=[1,2,3])),
        # "b_processed": pd.DataFrame(dict(b=[3,4,5])),
        # "c_processed": pd.DataFrame(dict(c=[5,6,7])),
    }
import loaders
from hamilton import driver
# Build a driver from the ``loaders`` module only.
dr = driver.Builder().with_modules(loaders).build()
# Request ``a_processed``, passing the sample CSV path as the ``a_data_location`` input.
dr.execute(["a_processed"], inputs={"a_data_location": a_data_location})
>>> {'a_processed': a
0 1
1 2
2 3}
Seth Stokes
06/13/2024, 7:25 PMDataFrame
, that was "overwritten" never printed so I guess it was never executed at all.
pd.DataFrame(dict(a=list("xyx")))
Elijah Ben Izzy
06/13/2024, 8:31 PM