Elton Cardoso Do Nascimento
01/16/2025, 9:23 PM
Elton Cardoso Do Nascimento
01/16/2025, 9:23 PM
Hadi Gharibi
01/18/2025, 12:46 PM
Stefan Krawczyk
01/21/2025, 5:38 PM
Ethan Kim
01/23/2025, 6:17 PM
Gilad Rubin
01/27/2025, 9:03 AM
Stefan Krawczyk
01/28/2025, 5:31 PM
Matthew K
01/29/2025, 3:37 PM
process_data_from_special_api function, so they could find that a coworker's DAG brings in data from another source to enrich the data in their pipeline that would also be useful in their own, or that so many analytics use the same follow-on steps that perhaps they should be modularized in a component library to increase consistency across analytics.
The thing is, this doesn't actually seem that easy to do in Hamilton, so I first have to ask, does the above setup I have in mind seem reasonable? From reading the documentation and doing a little experimenting, function reuse is tricky in general, or at least not intuitive.
Imagine I made a module of reusable components that I import into my_analytic like Builder().with_modules([component_library, my_analytic]), but I really only want one function from component_library, say process_special_data. My understanding is, to hook it into the my_analytic dag, I would need to make sure I have upstream nodes that are the expected inputs to process_special_data and output nodes that will consume process_special_data as input.
What happens to any extra nodes/subgraphs that I don't want from the module? Is there a way to just import the process_special_data function to make it explicit in the my_analytic calling code exactly what the analytic depends on? Is there any good way to further decorate an imported function like this, if I wanted to tag further metadata or change parameterizations specific to the analytic? If another analytic uses a function I want in mine, is there a better way to bring it into my own graph/model outside of copy-pasting it?
I have a separate, unrelated question about materialization and external APIs in general. Is materialization a concept that should only be used for initial and final I/O? If I grab data from a local parquet file using the built-in parquet loader, process it, then grab data from an external API based on that processing to further enrich it, should I treat building a client, making the request, and processing the response as nodes in my graph, or should I write and register a data loader for the API that does this under the hood, so I can just decorate a node in my graph with @load_from.my_api(request_params=source("parquet_processing"))? Static materialization, if I understand correctly, is out of the picture for this case, since I don't know how to construct the request until I do the initial parquet processing? I don't really see any examples in Hamilton docs on data loading/materialization outside of fairly simple static examples.
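For reference, the "everything is a node" version I'm weighing would be roughly this (the endpoint, request params, and column names are made up):

# a rough sketch of the nodes-only option, assuming a simple HTTP API
import pandas as pd
import requests

def api_client() -> requests.Session:
    """The client is just another node, so it can be overridden or mocked."""
    return requests.Session()

def api_response(api_client: requests.Session, parquet_processing: pd.DataFrame) -> dict:
    """Build the request from the processed parquet data and call the API."""
    params = {"ids": parquet_processing["id"].tolist()}
    return api_client.get("https://example.com/enrich", params=params).json()

def enriched_data(parquet_processing: pd.DataFrame, api_response: dict) -> pd.DataFrame:
    """Join the API payload back onto the processed data."""
    extra = pd.DataFrame(api_response["records"])
    return parquet_processing.merge(extra, on="id", how="left")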
I appreciate any help on these questions.
Seth Stokes
01/30/2025, 9:47 PM
Stefan Krawczyk
02/04/2025, 5:43 PM
Zoltan Eisler
02/04/2025, 10:38 PM
Joao Castro
02/05/2025, 1:44 PM
João Paulo Vieira
02/06/2025, 1:49 PM
sahil-shetty
02/09/2025, 6:21 PM
Seth Stokes
02/10/2025, 9:12 PM
I would like to use .with_materializers() to save out an intermediate step in a dag.
I am currently trying the following but am encountering some issues.
Details in thread.
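Roughly what I'm attempting (module, node names, the parquet saver, and paths are placeholders, and I'm not sure this is the intended usage):

from hamilton import driver
from hamilton.io.materialization import to
import my_dataflow  # hypothetical module with an `intermediate_df` node

dr = (
    driver.Builder()
    .with_modules(my_dataflow)
    .with_materializers(
        to.parquet(
            id="intermediate_df__parquet",     # name of the saver node
            dependencies=["intermediate_df"],  # the intermediate step to persist
            path="./intermediate_df.parquet",
        )
    )
    .build()
)
# requesting the saver node alongside the final output should trigger the save
results = dr.execute(["intermediate_df__parquet", "final_output"])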
Volker Lorrmann
02/11/2025, 1:33 PM
Stefan Krawczyk
02/11/2025, 5:35 PM
Zoltan Eisler
02/12/2025, 8:48 AM
First, with the HamiltonTracker, up to 20% of my DAG runs can be spent calculating dataframe stats for display in the UI. I have short-circuited the calculations by overriding hamilton_sdk.tracking.polars_stats._compute_stats_ with an empty function, but any less hacky suggestions are welcome.
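The override is essentially this (attribute name copied from above and may differ by SDK version; returning an empty dict is my guess at an acceptable no-op payload):

# short-circuit the per-dataframe stats the tracker computes for the UI
from hamilton_sdk.tracking import polars_stats

def _no_stats(*args, **kwargs):
    return {}  # assuming an empty stats payload is acceptable downstream

polars_stats._compute_stats_ = _no_stats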
Second, and more crucially, I hit a very steep serialization penalty when moving data between nodes. If I understood correctly, we can eliminate serialization (at the cost of parallelism, possibly) by putting all the nodes in the same group, but I can’t seem to pull this off. Here’s my code:
from typing import List
from hamilton.node import Node
from hamilton.execution.grouping import GroupingStrategy, NodeGroup, NodeGroupPurpose

class MyGS(GroupingStrategy):
    def group_nodes(self, nodes: List[Node]) -> List[NodeGroup]:
        group = NodeGroup(base_id="whatever", spawning_task_base_id=None, nodes=nodes, purpose=NodeGroupPurpose.EXECUTE_BLOCK)
        return [group]
Then I just attach this to my driver by calling
driver.Builder().with_grouping_strategy(MyGS())
However, this doesn't seem to have the desired effect, stuff still gets serialized. Any suggestion what I am doing wrong? Or another possible workaround?
Elton Cardoso Do Nascimento
02/12/2025, 7:42 PM
Is it possible to use parameterize to define a new node in a file different from the original function?
For example, I have file a.py:
def a(input1:int) -> int:
...
And a file b.py:
b = parameterize(b={"input1": value(1)})(a)
When I try this and import both a and b in the module __init__.py, I get "Cannot define function a more than once." when building the driver.
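The workaround I'm considering if this isn't supported, keeping the shared logic in an underscore-prefixed helper (which Hamilton skips) and defining a plain wrapper node in each file, would look roughly like this:

# _helpers.py -- underscore-prefixed functions are not turned into nodes
def _base(input1: int) -> int:
    ...

# a.py
from _helpers import _base

def a(input1: int) -> int:
    return _base(input1)

# b.py
from _helpers import _base

def b() -> int:
    return _base(1)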
Stefan Krawczyk
02/18/2025, 5:47 PM
Seth Stokes
02/18/2025, 10:33 PM
In final_result, will all the step functions be applied to upstream_int when I get it in the function body? What if it were pipe_output?
@pipe_input(
    step(_add_one),
    step(_multiply, y=2),
    step(_sum, y=value(3)),
    step(_multiply, y=source("upstream_node_to_multiply")),
)
def final_result(upstream_int: int) -> int:
    pdb.set_trace()
    return upstream_int
Seth Stokes
02/20/2025, 9:04 PM
last_run is being used, and it's reflected by `_filter_run_date` being the function that gets used.
However, for documentation purposes, how would you suggest amending so that all last_run config options are visible in the DAG? Is this possible?
@extract_fields({
    "child_process_run_id": int,
    "child_process_completed_at": pd.Timestamp,
})
@pipe_input(
    step(_filter_run_bom).when(last_run="bom"),
    step(_filter_run_latest).when(last_run="last"),
    step(_filter_run_date, completion_date=source("completion_date")).when(last_run="completion_date"),
)
def child_process_run_info(
    child_process_runs: pd.DataFrame,
    child_rec_process_code: str,
    completion_date: Optional[str] = None,
) -> Dict[int, pd.Timestamp]:
    """Return run id for process from `last_run` logic."""
Jonas Meyer-Ohle
02/21/2025, 4:36 PM
Emeka Anyanwu
02/21/2025, 7:37 PM
Seth Stokes
02/25/2025, 4:40 PM
I am moving from iterating over config outside of the dag and executing the driver for each config, to using Parallelizable/Collect to handle this and get some speedup.
The issue I am having is that previously, the dag depended on the `config` to know which downstream functions to call via config.when.
Now this config variable is the iterated list; is there a way to still expose each of these so that the dag can know which downstream functions to call?
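For context, the shape I have now is roughly this (node and parameter names are made up):

from typing import Dict, List
from hamilton.htypes import Collect, Parallelizable

def config_item(configs: List[Dict]) -> Parallelizable[Dict]:
    # each config that previously drove a separate driver execution
    for cfg in configs:
        yield cfg

def processed(config_item: Dict) -> Dict:
    # previously this branching was handled at build time via config.when;
    # now it would have to happen at runtime based on the item itself
    ...

def all_processed(processed: Collect[Dict]) -> List[Dict]:
    return list(processed)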
Elijah Ben Izzy
02/25/2025, 5:37 PM
Benoit Aubuchon
03/03/2025, 6:07 PM
Seth Stokes
03/03/2025, 6:19 PM
With @check_output, does hamilton have the ability to split the offending rows into two datasets?
@check_output(positions_schema)
def final_dataframe_before_api_call(prepped_df: pd.DataFrame) -> pd.DataFrame:
    return prepped_df

def final_dataframe_validated(prepped_df: pd.DataFrame) -> pd.DataFrame:
    return prepped_df

def final_dataframe_violated(prepped_df: pd.DataFrame) -> pd.DataFrame:
    return prepped_df

def api_request_object(final_df_validated: pd.DataFrame) -> dict:
    return final_df_validated.to_dict(orient="records")

def post_request_to_api(api_request_object: dict) -> Any:
    ...
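If not, the manual split I'd write instead (assuming positions_schema is a pandera schema) would be something like:

import pandas as pd
import pandera as pa

def _split_on_schema(prepped_df: pd.DataFrame) -> tuple:
    """Return (validated, violated) rows using lazy pandera validation."""
    try:
        positions_schema.validate(prepped_df, lazy=True)
        return prepped_df, prepped_df.iloc[0:0]  # nothing violated
    except pa.errors.SchemaErrors as e:
        bad_idx = e.failure_cases["index"].dropna().unique()
        violated = prepped_df[prepped_df.index.isin(bad_idx)]
        return prepped_df.drop(index=violated.index), violated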
Seth Stokes
03/04/2025, 5:10 PM
Can I use extract_columns on a `Collect`'ed node?
I am getting a seemingly unrelated error, but I just added that, so that's what I am suspecting.
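What I'm attempting, roughly (names are placeholders):

import pandas as pd
from hamilton.function_modifiers import extract_columns
from hamilton.htypes import Collect

@extract_columns("col_a", "col_b")
def combined(partial_frames: Collect[pd.DataFrame]) -> pd.DataFrame:
    # concatenate the collected partial results, then expose columns as nodes
    return pd.concat(list(partial_frames), ignore_index=True)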
Stefan Krawczyk
03/04/2025, 5:46 PM