Seth Stokes
05/07/2025, 8:07 PM
# subdag.py
def mapping() -> pd.DataFrame:
    ...

@subdag(
    prepare_src1, prepare_src2, prepare_src3,
    inputs={
        "directory": source("directory"),
        "pattern_src1": source("pattern_src1"),
        "pattern_src2": source("pattern_src2"),
        "pattern_src3": source("pattern_src3"),
    },
    external_inputs=source("mapping")
)
def merge_and_stuff(
    prepared_src1: pd.DataFrame,
    prepared_src2: pd.DataFrame,
    prepared_src3: pd.DataFrame,
    mapping: pd.DataFrame
) -> pd.DataFrame:
    ...

Error:
TypeError: argument of type 'UpstreamDependency' is not iterable
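The TypeError suggests Hamilton iterates over whatever is passed as external_inputs, so a list of node-name strings (rather than a source() dependency) may be what that argument expects. A hedged sketch reusing the same function names as above, worth verifying against your Hamilton version:

# Hedged sketch: identical to the decorator above except that external_inputs
# is given as a list of external node names instead of source("mapping").
@subdag(
    prepare_src1, prepare_src2, prepare_src3,
    inputs={
        "directory": source("directory"),
        "pattern_src1": source("pattern_src1"),
        "pattern_src2": source("pattern_src2"),
        "pattern_src3": source("pattern_src3"),
    },
    external_inputs=["mapping"]
)
def merge_and_stuff(
    prepared_src1: pd.DataFrame,
    prepared_src2: pd.DataFrame,
    prepared_src3: pd.DataFrame,
    mapping: pd.DataFrame
) -> pd.DataFrame:
    ...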
Seth Stokes
05/15/2025, 11:02 PM
Is there a way to use overrides on the raw_df.raw node? I am currently coming across an issue where the step (_prepare) is failing because it is depending on the mapping done in raw_df. Since I am using pipe_output, I am thinking that the body of raw_df should have been executed (and executed before), since raw_df.raw is one step above.
@pipe_output(
    step(_prepare_data)
)
def raw_df(path: Path) -> pd.DataFrame:
    # some mapping
    ...

def prep_df(raw_prev_643: pd.DataFrame) -> pd.DataFrame:
    ...

results: dict = dr.execute(["prep_df"], inputs={}, overrides={
    "path": "path/to/file",
    "raw_df.raw": raw_df,  # loaded outside of dag
})
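To see which node names @pipe_output actually registered (and therefore which name can be overridden), a small sketch using the driver's node listing; the printed names are illustrative, not guaranteed:

# Hedged sketch: list the nodes derived from raw_df before choosing an override target.
for var in dr.list_available_variables():
    if var.name.startswith("raw_df"):
        print(var.name)  # e.g. raw_df.raw, the intermediate step node(s), and raw_df itself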
Mauricio
06/16/2025, 7:41 PM
Is there a way to depend on a to materialization to execute a function after it happens?
# run.py
materializers = [to.csv(id="save_file", dependencies=["file_modify"], path="./out.csv")]
...driver build and execution

# pipeline.py
def load_saved_file(save_file, remote_path):
    fs.put(save_file, remote_path)

When I try to reference it like this, I get "Duplicate node names found". Any advice?
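If referencing the saver node inside the DAG keeps colliding, one hedged workaround is to run the post-save step from run.py once materialization has finished. The sketch assumes dr.materialize returns the materialization metadata first and reuses the fs and remote_path names from the question:

# run.py -- hedged sketch: upload after materialization, outside the DAG
materializers = [to.csv(id="save_file", dependencies=["file_modify"], path="./out.csv")]
metadata, _ = dr.materialize(*materializers)
fs.put("./out.csv", remote_path)  # post-processing step, run once the CSV exists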
Nic Pottier
07/21/2025, 5:17 PM
Is it OK to run an asyncio.run() loop within a Hamilton node to parallelize some work in that node? It seems to work, but I'm curious if there are gotchas.
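For reference, the pattern being asked about looks roughly like this; fetched_records, _fetch_one, and urls are made-up placeholder names:

import asyncio
import pandas as pd

async def _fetch_one(url: str) -> dict:
    # placeholder coroutine: imagine an aiohttp call or similar here
    await asyncio.sleep(0)
    return {"url": url}

def fetched_records(urls: list[str]) -> pd.DataFrame:
    # synchronous Hamilton node that fans out with asyncio internally;
    # asyncio.run() starts and tears down its own event loop on each call
    async def _gather() -> list:
        return await asyncio.gather(*[_fetch_one(u) for u in urls])
    return pd.DataFrame(asyncio.run(_gather()))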
Nic Pottier
07/22/2025, 2:40 PM
I pass my whole configuration in as a single config node.. I then have sub config nodes that take config as an input which are then used downstream. That makes sure those downstream nodes only reevaluate if the parts of the config that matter to them change.
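A minimal sketch of that layering; the node and key names (config_path, model_cfg, io_cfg) are invented for illustration:

import json

def config(config_path: str) -> dict:
    # single entry point for all configuration
    with open(config_path) as f:
        return json.load(f)

def model_cfg(config: dict) -> dict:
    # downstream model nodes depend on this, so they only recompute
    # when the "model" portion of the config changes
    return config["model"]

def io_cfg(config: dict) -> dict:
    return config["io"]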
Jannik Buhr
07/25/2025, 12:58 PM
I'm using caching together with Parallelizable. This leads to unexpectedly long execute times, even though the data was in fact restored from the cache. I suspect this is due to the number of nodes that are being checked. Essentially, I have a parallel node that, for a list of directories, goes into each directory and processes some data to return a DataFrame. All DataFrames then get collected into one big DataFrame in the end. What are your recommendations? Is there a way for me to give Hamilton more information about the data so that it can speed up cache lookups? And is there the opposite of @cache(behavior="recompute") to tell Hamilton that something is assumed to be up-to-date?
Doing
id = dr.cache.last_run_id
data_version = dr.cache.data_versions[id]['all_frames']
result_store.get(data_version)
is an option, but it would still be nice to speed up dr.execute when the cache is up-to-date.
Dries Hugaerts
08/16/2025, 10:38 AM
COUNTRY_CODES = ['BE', 'NL', 'FR', 'DE', ...]

load_parameterization = {f"{cc.lower()}_load": dict(cc=value(cc)) for cc in COUNTRY_CODES}

@parameterize(**load_parameterization)
def country_load(cc: str) -> pd.Series:
    # Some specific way of getting the country's load
    return get_load(cc)

without resorting to this:
@tag(cc='BE')
def be_load():
    return get_load('BE')

@tag(cc='NL')
def nl_load():
    return get_load('NL')

...
Is something like this already possible?
I appreciate the help.
Cheers Dries
Vladyslav Bondarenko
08/19/2025, 10:43 AM
We split legacy_cashflow_computation into 3 sub-results using @extract_fields. It looks like all of them are being picked up from cache correctly; however, it seems like we still re-run legacy_cashflow_computation.
I understand that the reason we are re-running right now is that we have not handled pd.DataFrame and pydantic.BaseModel hashing properly using fingerprinting yet.
However, I would have expected it to understand that, given we picked up all 3 outputs of a node from cache, it should stop traversing the graph further. Am I understanding the intended behaviour incorrectly, or could this be a bug?
Appreciate any help on this!
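For concreteness, the setup being described looks roughly like this; only legacy_cashflow_computation and @extract_fields come from the question, while the field names and the raw_positions input are invented:

# Hedged sketch of the described setup -- field names and types are illustrative.
import pandas as pd
from hamilton.function_modifiers import extract_fields

@extract_fields({
    "cashflow_a": pd.DataFrame,
    "cashflow_b": pd.DataFrame,
    "cashflow_c": pd.DataFrame,
})
def legacy_cashflow_computation(raw_positions: pd.DataFrame) -> dict:
    # expensive legacy step whose three outputs are consumed separately downstream
    ...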
Zhenya Kovalyov
09/04/2025, 12:57 PM
I have @save_to.parquet(...) sprinkled around my pipelines, and in order to trigger the saving routine, I need to explicitly call out save.<name> in final_vars in driver.execute.
Question: is there a way I can modify my driver.execute call so that all vars that are decorated with save_to get persisted?
Thank you very much in advance!
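One hedged way to get that behaviour is to collect the saver nodes from the driver before executing. The sketch below filters on the save.<name> prefix mentioned above and, as a fallback, on the hamilton.data_saver tag (verify the tag name against your version); my_final_vars stands in for whatever outputs you already request:

# Hedged sketch: gather every saver node and append it to final_vars.
saver_nodes = [
    var.name
    for var in dr.list_available_variables()
    if var.name.startswith("save.") or var.tags.get("hamilton.data_saver")
]
results = dr.execute(final_vars=my_final_vars + saver_nodes)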
Gavin Kistner
09/15/2025, 7:21 PM
def foo(some_node: SomeNode, foo_cfg: FooConfig) -> int:
    …

# This is procedurally created with dynamic names at runtime
foo_cfgs = {"a": FooConfig(…), "b": FooConfig(…)}
dr = driver.Builder().with_config({"foo_cfgs": foo_cfgs}).build()
print(dr.execute(final_vars=[*foo_cfgs.keys()]))
#=> {"a": 42, "b": 17}

I have a working approach (details in thread) using @resolve and @parameterize, but I'm wondering if there's a better way to do this using pipelines or subdags or something. Basically: what's the Hamilton way (or ways) to accomplish the equivalent of this procedural code?
for name, setup in some_input.items():
    final_result[name] = some_node(…other_attrs_from_dag…, some_attr=setup)
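For readers landing here, the @resolve + @parameterize approach mentioned above looks roughly like the following hedged sketch; the exact signatures and the power-user-mode config flag are worth checking against your Hamilton version:

from hamilton.function_modifiers import ResolveAt, parameterize, resolve, value

@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda foo_cfgs: parameterize(
        # one node per key in foo_cfgs, each bound to its FooConfig instance
        **{name: {"foo_cfg": value(cfg)} for name, cfg in foo_cfgs.items()}
    ),
)
def foo(some_node: SomeNode, foo_cfg: FooConfig) -> int:
    ...

# @resolve needs the config at graph-build time; recent versions also expect
# power-user mode to be enabled in the config (verify for your version):
# dr = (
#     driver.Builder()
#     .with_config({"foo_cfgs": foo_cfgs, "hamilton.enable_power_user_mode": True})
#     .build()
# )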