Mattias Fornander US
04/24/2025, 10:22 PM
Seth Stokes
05/07/2025, 8:07 PM
# subdag.py
def mapping() -> pd.DataFrame:
    ...

@subdag(
    prepare_src1, prepare_src2, prepare_src3,
    inputs={
        "directory": source("directory"),
        "pattern_src1": source("pattern_src1"),
        "pattern_src2": source("pattern_src2"),
        "pattern_src3": source("pattern_src3"),
    },
    external_inputs=source("mapping")
)
def merge_and_stuff(
    prepared_src1: pd.DataFrame,
    prepared_src2: pd.DataFrame,
    prepared_src3: pd.DataFrame,
    mapping: pd.DataFrame
) -> pd.DataFrame:
    ...
Error:
TypeError: argument of type 'UpstreamDependency' is not iterable
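A hedged guess at the error: source("mapping") returns an UpstreamDependency, while external_inputs appears to expect an iterable of node names, which would explain the TypeError. Under that assumption (worth checking against the @subdag docs), something like this may work:
@subdag(
    prepare_src1, prepare_src2, prepare_src3,
    inputs={
        "directory": source("directory"),
        "pattern_src1": source("pattern_src1"),
        "pattern_src2": source("pattern_src2"),
        "pattern_src3": source("pattern_src3"),
    },
    external_inputs=["mapping"],  # assumption: a list of plain node names
)
def merge_and_stuff(
    prepared_src1: pd.DataFrame,
    prepared_src2: pd.DataFrame,
    prepared_src3: pd.DataFrame,
    mapping: pd.DataFrame,
) -> pd.DataFrame:
    ...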
Seth Stokes
05/15/2025, 11:02 PM
Can I use overrides on the raw_df.raw node? I am currently running into an issue where the step (_prepare) is failing because it depends on the mapping done in raw_df. Since I am using pipe_output, I would have thought the body of raw_df should have been executed (and executed first), since raw_df.raw is one step above.
@pipe_output(
    step(_prepare_data)
)
def raw_df(path: Path) -> pd.DataFrame:
    # some mapping
    ...

def prep_df(raw_prev_643: pd.DataFrame) -> pd.DataFrame:
    ...

results: dict = dr.execute(
    ["prep_df"],
    inputs={},
    overrides={
        "path": "path/to/file",
        "raw_df.raw": raw_df,  # loaded outside of dag
    },
)
Slack Conversation
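A hedged reading of the snippet above: overriding "raw_df.raw" replaces the output of raw_df's body entirely, so the body (and its mapping) never runs and _prepare_data receives the override as-is. Two sketches, depending on the intent:
# 1) Let raw_df's body (and its mapping) run by supplying "path" normally:
results = dr.execute(["prep_df"], inputs={"path": Path("path/to/file")})

# 2) Keep overriding "raw_df.raw", but then the override must already contain the
#    mapping, because the body is skipped entirely (externally_loaded_df is a
#    stand-in for a frame prepared outside the DAG):
results = dr.execute(["prep_df"], overrides={"raw_df.raw": externally_loaded_df})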
Mark Wotton
05/30/2025, 9:19 AM
Mark Wotton
05/30/2025, 9:21 AM
Mauricio
06/16/2025, 7:41 PM
Is there a way to hook into materialization to execute a function after it happens?
# run.py
materializers = [to.csv(id="save_file", dependencies=["file_modify"], path="./out.csv")]
...driver build and execution
# pipeline.py
def load_saved_file(save_file, remote_path):
    fs.put(save_file, remote_path)
When I try to reference it like this, I get "Duplicate node names found".
Any advice?
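The "Duplicate node names found" error hints that two nodes ended up with the same name, possibly the save_file materializer id colliding with the save_file parameter of load_saved_file; that is only a guess from the snippet. One hedged workaround that avoids referencing the materializer's node at all:
# pipeline.py -- a hedged workaround sketch, not necessarily the recommended
# pattern: do the save and the follow-up action in one regular node so nothing
# needs to reference the materializer's node name. Assumes file_modify is a
# pandas DataFrame; out_path / remote_path / fs follow the naming above.
def saved_and_uploaded_file(file_modify: pd.DataFrame, out_path: str, remote_path: str) -> str:
    file_modify.to_csv(out_path, index=False)  # local save, mirroring to.csv(...)
    fs.put(out_path, remote_path)              # the "after materialization" step
    return remote_path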
Louwrens
06/20/2025, 1:42 PM
SAURABH SINGH
07/19/2025, 8:18 AM
Nic Pottier
07/21/2025, 5:17 PM
Is it OK to run an asyncio.run() loop within a Hamilton node to parallelize some work in that node? It seems to work, but I'm curious if there are gotchas.
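For reference, a minimal sketch of that pattern, assuming a synchronous driver (aiohttp here is just an illustrative async client):
import asyncio

import aiohttp  # illustrative async client; any async library works the same way

async def _fetch_all(urls: list[str]) -> list[str]:
    async with aiohttp.ClientSession() as session:
        async def fetch(url: str) -> str:
            async with session.get(url) as resp:
                return await resp.text()
        return await asyncio.gather(*(fetch(u) for u in urls))

def page_bodies(urls: list[str]) -> list[str]:
    # asyncio.run() creates and tears down its own event loop, which is fine in a
    # synchronous node; it raises if a loop is already running in this thread
    # (e.g. with an async driver), so keep such nodes on the sync driver.
    return asyncio.run(_fetch_all(urls))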
Hassan S
07/22/2025, 7:37 AM
Hassan S
07/22/2025, 7:40 AM
Hassan S
07/22/2025, 7:50 AM
Nic Pottier
07/22/2025, 2:40 PM
I have a single config node. I then have sub-config nodes that take config as an input, which are then used downstream. That makes sure those downstream nodes only re-evaluate if the parts of the config that matter to them change.
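A minimal sketch of that pattern (node and key names are illustrative):
import json

def config(config_path: str) -> dict:
    # single entry point for all configuration
    with open(config_path) as f:
        return json.load(f)

def model_config(config: dict) -> dict:
    # downstream model nodes depend on this slice only, so (with caching) they
    # re-evaluate only when the "model" section changes
    return config["model"]

def io_config(config: dict) -> dict:
    return config["io"]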
Hassan S
07/23/2025, 2:15 PM
Jannik Buhr
07/25/2025, 12:58 PM
I'm using caching together with Parallelizable. This leads to unexpectedly long execute times, even though the data was in fact restored from the cache. I suspect this is due to the number of nodes that are being checked. Essentially, I have a parallel node that, for a list of directories, goes into each directory and processes some data to return a DataFrame. All DataFrames then get collected into one big DataFrame at the end. What are your recommendations? Is there a way for me to give Hamilton more information about the data so that it can speed up cache lookups? And is there an opposite of @cache(behavior="recompute") to tell Hamilton that something is assumed to be up to date?
Doing
id = dr.cache.last_run_id
data_version = dr.cache.data_versions[id]['all_frames']
result_store.get(data_version)
is an option, but it would still be nice to speed up dr.execute when the cache is up to date.
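A slightly fuller sketch of that manual lookup, assuming the result store is reachable as dr.cache.result_store and that all_frames is the collected node:
run_id = dr.cache.last_run_id
data_version = dr.cache.data_versions[run_id]["all_frames"]
all_frames = dr.cache.result_store.get(data_version)
if all_frames is None:
    # fall back to a normal run if the cached result is missing or stale
    all_frames = dr.execute(["all_frames"])["all_frames"]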
Hassan S
07/28/2025, 12:14 PM
Hassan S
07/31/2025, 9:24 AM
Hassan S
07/31/2025, 9:37 AM
Fabian Preiß
08/04/2025, 12:33 PM
Hassan S
08/07/2025, 10:49 PM
Dries Hugaerts
08/16/2025, 10:38 AMCOUNTRY_CODES = ['BE', 'NL', 'FR', 'DE', ...]
load_parameterization = { f"{cc.lower()}_load": dict(cc=value(cc)) for cc in COUNTRY_CODES }
@parameterize(**load_parameterization)
def country_load(cc: str) -> pd.Series:
# Some specific way of getting the countries load
return get_load(cc)
without resorting to this:
@tag(cc='BE')
def be_load():
return get_load('BE')
@tag(cc='NL')
def nl_load():
return get_load('NL')
...
Is something like this already possible?
I appreciate the help.
Cheers Dries
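For combining the generated nodes downstream, a sketch using @inject and group, assuming the @parameterize block above produces be_load, nl_load, ... and that COUNTRY_CODES is in scope:
import pandas as pd
from hamilton.function_modifiers import group, inject, source

@inject(loads=group(*[source(f"{cc.lower()}_load") for cc in COUNTRY_CODES]))
def total_load(loads: list[pd.Series]) -> pd.Series:
    # stack each country's load side by side, then sum across countries
    return pd.concat(loads, axis=1).sum(axis=1)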
Vladyslav Bondarenko
08/19/2025, 10:43 AM
We split legacy_cashflow_computation into 3 sub-results using @extract_fields. It looks like all of them are being picked up from the cache correctly; however, it seems like we still re-run legacy_cashflow_computation.
I understand that the reason we are re-running right now is that we have not handled pd.DataFrame and pydantic.BaseModel hashing properly using fingerprinting yet.
However, I would have expected it to understand that, given we picked up all 3 outputs of a node from the cache, it should stop traversing the graph further. Am I understanding the intended behaviour incorrectly, or could this be a bug?
Appreciate any help on this!
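A heavily hedged sketch of registering a custom fingerprint for a pydantic model; the module path and registration API below are assumptions to verify against the caching docs for the installed version, and MyCashflowModel is a stand-in:
from hamilton.caching import fingerprinting  # assumption: module path per caching docs

@fingerprinting.hash_value.register(MyCashflowModel)  # MyCashflowModel is a stand-in
def _hash_cashflow_model(obj, *args, **kwargs) -> str:
    # hash a stable representation so upstream nodes don't look "changed" on
    # every run (pydantic v2 shown; v1 would use obj.dict())
    return fingerprinting.hash_value(obj.model_dump())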
Zhenya Kovalyov
09/04/2025, 12:57 PM
I have @save_to.parquet(...) sprinkled around my pipelines, and in order to trigger the saving routine, I need to explicitly call out save.<name> in final_vars in driver.execute.
Question: is there a way I can modify my driver.execute call so that all vars that are decorated with save_to get persisted?
Thank you very much in advance!
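One hedged option is to collect every saver node from the driver and append it to final_vars; the tag key below is an assumption worth verifying, and my_final_vars is a stand-in for the usual outputs:
# assumption: data savers are tagged with "hamilton.data_saver"
saver_vars = [
    var.name
    for var in dr.list_available_variables()
    if var.tags.get("hamilton.data_saver")
]
results = dr.execute(final_vars=list(my_final_vars) + saver_vars)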
Erin Gunn
09/07/2025, 9:40 PM
Gavin Kistner
09/15/2025, 7:21 PM
def foo(some_node: SomeNode, foo_cfg: FooConfig) -> int:
    …

# This is procedurally created with dynamic names at runtime
foo_cfgs = {"a": FooConfig(…), "b": FooConfig(…)}
dr = driver.Builder().with_config({"foo_cfgs": foo_cfgs}).build()
print(dr.execute(final_vars=[*foo_cfgs.keys()]))
#=> {"a": 42, "b": 17}
I have a working approach (details in thread) using @resolve and @parameterize, but I'm wondering if there's a better way to do this using pipelines or subdags or something. Basically:
What's the Hamilton way (or ways) to accomplish the equivalent of this procedural code:
for name, setup in some_input.items():
    final_result[name] = some_node(…other_attrs_from_dag…, some_attr=setup)
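For completeness, a sketch of the @resolve + @parameterize route mentioned above, assuming foo_cfgs arrives via with_config as in the snippet (depending on the version, power-user mode may need to be enabled in the config for @resolve):
from hamilton.function_modifiers import ResolveAt, parameterize, resolve, value

@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda foo_cfgs: parameterize(
        **{name: {"foo_cfg": value(cfg)} for name, cfg in foo_cfgs.items()}
    ),
)
def foo(some_node: SomeNode, foo_cfg: FooConfig) -> int:
    ...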
Eric Riddoch
10/03/2025, 7:38 PM
Roi Jacob Olfindo
10/08/2025, 7:10 AM
>>> from hamilton.function_modifiers import unpack_fields
ImportError: cannot import name 'unpack_fields' from 'hamilton.function_modifiers' (/root/venv/lib/python3.10/site-packages/hamilton/function_modifiers/__init__.py)
What would be the alternatives to this so that I could extract something from a tuple?
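If unpack_fields isn't available in the installed version, one workaround is to expose the tuple elements as small downstream nodes by hand (names below are hypothetical); returning a dict and using @extract_fields is another option:
def model_and_score(training_data: pd.DataFrame) -> tuple[object, float]:
    ...  # hypothetical producer returning a (model, score) tuple

def fitted_model(model_and_score: tuple[object, float]) -> object:
    return model_and_score[0]

def model_score(model_and_score: tuple[object, float]) -> float:
    return model_and_score[1]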
Abhilash Singhal
10/09/2025, 10:41 AM
Seth Stokes
10/13/2025, 4:13 PM
Am I using Parallelizable incorrectly?
def raw_paginated_results(api: API, query: str) -> Parallelizable[dict]:
    """Paginate the results from the API.

    A `nextPageToken` indicates more pages must be fetched.
    Previously, in v2, this was split into batches from `total`, `startAt`, and `maxResults`.
    """
    next_page, init_result = api.search(query, max_results=50)
    yield init_result
    while next_page:
        next_page, result = api.search(query, max_results=50, params={"nextPageToken": next_page})
        if result:
            yield result
When I collect this I am getting a List[List[dict]] (where the outer list has a length of just 1) instead of a List[dict].
Could this have something to do with the two yield statements?
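If api.search returns each page as a list of dicts, then every yield hands a whole page to the parallel block, which would explain the List[List[dict]]; a hedged tweak under that assumption is to yield items individually:
def raw_paginated_results(api: API, query: str) -> Parallelizable[dict]:
    """Yield one result dict at a time so Collect returns a flat List[dict]."""
    next_page, results = api.search(query, max_results=50)
    yield from results  # assumption: each page is a list of dicts
    while next_page:
        next_page, results = api.search(
            query, max_results=50, params={"nextPageToken": next_page}
        )
        if results:
            yield from results
The two yield statements themselves are fine; each yield just becomes one parallel item. An outer length of 1 could also mean next_page was falsy after the first call, which is worth checking.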
Paul D
10/14/2025, 2:32 PM
@rename("My Feature")
def my_feature(...) -> ...:
    return this_feature
And then the node is named My Feature in the DAG, and I can further access it with source("My Feature") if needed.
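If a rename decorator isn't available, a hedged fallback is to keep the identifier-safe function name and carry the display label as tag metadata; the key name and the example signature below are just conventions, not Hamilton-defined:
from hamilton.function_modifiers import tag

@tag(display_name="My Feature")  # downstream tooling can read this label from node tags
def my_feature(raw_data: pd.DataFrame) -> pd.Series:
    return raw_data["my_feature"]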