Piotr Bieszczad
03/31/2025, 9:56 AM
I am using pipe_output with step.
I wanted to use @check_output_custom, but the validation is not propagated. Is there a way to do this?
Example:
@check_output_custom(CustomValidator())
def foo(...) -> ...:
    ...
    return result

@pipe_output(
    step(foo).named('foo_applied'),
    ...,
)
def bar(...) -> ...:
    return result
Validation is not applied to foo_applied.
Thank you in advance for your reply.
----------------------------------------------------------
Also, is there a way of acting upon failure? E.g., if I know that validation failed in one of the previous steps, then I want to return an "inference failed" flag at the end of the flow, on top of exiting it. Basically, some functionality for early termination of a flow.
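(A minimal sketch of one way to get an early-termination flag without extra Hamilton machinery: call the validator inside a node yourself and branch on the result downstream. foo_validated and inference_result are hypothetical node names, the pd.DataFrame types are assumptions, and the .validate()/.passes interface is assumed from Hamilton's DataValidator base class.)
import pandas as pd

def foo_validated(foo: pd.DataFrame) -> dict:
    # Hypothetical: run the validator directly instead of via @check_output_custom,
    # so the pass/fail outcome becomes data the rest of the DAG can see.
    result = CustomValidator().validate(foo)
    return {"data": foo, "valid": bool(result.passes)}

def inference_result(foo_validated: dict) -> dict:
    # Early termination: if validation failed upstream, skip the real work
    # and return an "inference failed" flag instead of raising.
    if not foo_validated["valid"]:
        return {"status": "inference_failed"}
    return {"status": "ok", "data": foo_validated["data"]}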
Luke
04/06/2025, 1:34 AM
• … the processed_image node)?
• I don't love some of my node names and abstractions — each_image, processed_image -> processed_images 😐. Any recommendations?
  ◦ This expansion is especially weird and points to poor naming: list(_processed_image["preprocessed_image"] for _processed_image in processed_image)
I've collapsed the image processing steps into one node, processed_image. I'd rather explode this into preprocessed_image, tesseract_output, tesseract_summary, etc. Ideally, the results of each of these stages could be accessed without further processing before all results are Collected. With that in mind, I realize the current implementation is probably fine for this level of the problem. However, I'll need a different solution for the "meta-scale" problem — processing thousands of PDFs. In that context, I'll need to queue, monitor, and save artifacts for each PDF individually, and I won't be collapsing all processing into a single node as I've done here. I suspect this is where Hamilton's scope ends and a full-featured orchestrator is preferred. Is this correct?
DAG graph is attached. DAG definition below.
# dag.py
from typing import Union
from pathlib import Path
from PIL import Image
import pandas as pd
from hamilton.function_modifiers import source, extract_fields
# These are excluded from the DAG
from utils import (
    pdf_to_images,
    preprocess_image,
    image_to_tesseract_data,
    TesseractDocumentSummary,
)
from hamilton.htypes import Parallelizable, Collect
import numpy as np


@source
def pdf_path() -> Union[str, Path]:
    pass


def pdf_images(pdf_path: Union[str, Path], kwargs: dict = {}) -> list[Image.Image]:
    default_kwargs = {"dpi": 300, "thread_count": 4}
    kwargs = {**default_kwargs, **kwargs}
    return pdf_to_images(pdf_path, **kwargs)


def each_image(pdf_images: list[Image.Image]) -> Parallelizable[Image.Image]:
    for image in pdf_images:
        yield image


def processed_image(each_image: Image.Image) -> dict:
    preprocessed_image = preprocess_image(each_image)
    tesseract_output = image_to_tesseract_data(preprocessed_image)
    difficulty_summary = TesseractDocumentSummary.from_tesseract_data(tesseract_output)
    return {
        "preprocessed_image": preprocessed_image,
        "tesseract_output": tesseract_output,
        "difficulty_summary": difficulty_summary,
    }


@extract_fields(
    {
        "preprocessed_images": list[np.ndarray],
        "tesseract_outputs": pd.DataFrame,
        "difficulty_table": pd.DataFrame,
    }
)
def processed_images(
    processed_image: Collect[dict],
) -> dict:
    return {
        "preprocessed_images": list(
            _processed_image["preprocessed_image"]
            for _processed_image in processed_image
        ),
        "tesseract_outputs": pd.concat(
            [
                _processed_image["tesseract_output"]
                for _processed_image in processed_image
            ],
            ignore_index=True,
        ),
        "difficulty_table": pd.DataFrame(
            [
                _processed_image["difficulty_summary"]
                for _processed_image in processed_image
            ]
        ),
    }
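(A hedged sketch of how the per-image stages could become separate nodes inside the Parallelizable/Collect block, replacing the processed_image function above while keeping a single Collect. The np.ndarray / pd.DataFrame / TesseractDocumentSummary annotations are assumptions inferred from the extract_fields block.)
def preprocessed_image(each_image: Image.Image) -> np.ndarray:
    return preprocess_image(each_image)

def tesseract_output(preprocessed_image: np.ndarray) -> pd.DataFrame:
    return image_to_tesseract_data(preprocessed_image)

def difficulty_summary(tesseract_output: pd.DataFrame) -> TesseractDocumentSummary:
    return TesseractDocumentSummary.from_tesseract_data(tesseract_output)

def processed_image(
    preprocessed_image: np.ndarray,
    tesseract_output: pd.DataFrame,
    difficulty_summary: TesseractDocumentSummary,
) -> dict:
    # Same dict shape as before, so processed_images and its extract_fields stay
    # unchanged; each stage is now its own node on the DAG.
    return {
        "preprocessed_image": preprocessed_image,
        "tesseract_output": tesseract_output,
        "difficulty_summary": difficulty_summary,
    }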
Seth Stokes
05/07/2025, 8:07 PM
# subdag.py
def mapping() -> pd.DataFrame:
    ...

@subdag(
    prepare_src1, prepare_src2, prepare_src3,
    inputs={
        "directory": source("directory"),
        "pattern_src1": source("pattern_src1"),
        "pattern_src2": source("pattern_src2"),
        "pattern_src3": source("pattern_src3"),
    },
    external_inputs=source("mapping")
)
def merge_and_stuff(
    prepared_src1: pd.DataFrame,
    prepared_src2: pd.DataFrame,
    prepared_src3: pd.DataFrame,
    mapping: pd.DataFrame
) -> pd.DataFrame:
    ...
Error:
TypeError: argument of type 'UpstreamDependency' is not iterable
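(If I recall correctly, @subdag's external_inputs expects a list of node-name strings rather than source(...) wrappers, which would explain the UpstreamDependency error; a hedged sketch of that variant:)
@subdag(
    prepare_src1, prepare_src2, prepare_src3,
    inputs={
        "directory": source("directory"),
        "pattern_src1": source("pattern_src1"),
        "pattern_src2": source("pattern_src2"),
        "pattern_src3": source("pattern_src3"),
    },
    external_inputs=["mapping"],  # plain node names, not source(...)
)
def merge_and_stuff(
    prepared_src1: pd.DataFrame,
    prepared_src2: pd.DataFrame,
    prepared_src3: pd.DataFrame,
    mapping: pd.DataFrame,
) -> pd.DataFrame:
    ...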
Seth Stokes
05/15/2025, 11:02 PM
Is there a way to use overrides on the raw_df.raw node? I am currently running into an issue where the step(_prepare) is failing because it depends on the mapping done in raw_df. Since I am using pipe_output, I would think that the body of raw_df should have been executed (and executed first), since raw_df.raw is one step above.
@pipe_output(
    step(_prepare_data)
)
def raw_df(path: Path) -> pd.DataFrame:
    # some mapping
    ...

def prep_df(raw_prev_643: pd.DataFrame) -> pd.DataFrame:
    ...

results: dict = dr.execute(
    ["prep_df"],
    inputs={},
    overrides={
        "path": "path/to/file",
        "raw_df.raw": raw_df,  # loaded outside of dag
    },
)
Mauricio
06/16/2025, 7:41 PM
Is there a way to reference a to materialization to execute a function after it happens?
# run.py
materializers = [to.csv(id="save_file", dependencies=["file_modify"], path="./out.csv")]
...driver build and execution
# pipeline.py
def load_saved_file(save_file, remote_path):
    fs.put(save_file, remote_path)
When I try to reference it like this, I get "Duplicate node names found". Any advice?
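(One hedged workaround, assuming fs is an fsspec-style filesystem object from pipeline.py and file_modify is a DataFrame — both assumptions: skip the to.csv materializer for this output and do the save plus upload in a single, hypothetical node.)
import pandas as pd

def saved_and_uploaded(file_modify: pd.DataFrame, remote_path: str) -> str:
    # Hypothetical node: write the CSV locally, then push it to remote storage.
    # Returning the remote path lets downstream nodes depend on the upload.
    local_path = "./out.csv"
    file_modify.to_csv(local_path, index=False)
    fs.put(local_path, remote_path)
    return remote_path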
Nic Pottier
07/21/2025, 5:17 PM
Is it OK to use an asyncio.run() loop within a Hamilton node to parallelize some work in that node? It seems to work, but I'm curious if there are gotchas.
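(The pattern in question, sketched with a hypothetical fetch_one coroutine and node name. The usual gotcha is that asyncio.run() raises if an event loop is already running in the calling thread, e.g. in a Jupyter kernel, so it only works from a plain synchronous node.)
import asyncio

def fetched_pages(urls: list[str]) -> list[str]:
    # Synchronous Hamilton node that spins up its own event loop
    # to fan out async work across the inputs.
    async def _gather() -> list[str]:
        return await asyncio.gather(*(fetch_one(url) for url in urls))

    return asyncio.run(_gather())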
Nic Pottier
07/22/2025, 2:40 PM
I have a single top-level config node. I then have sub-config nodes that take config as an input, which are then used downstream. That makes sure those downstream nodes only re-evaluate if the parts of the config that matter to them change.
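(A sketch of the pattern Nic describes, with hypothetical config fields: downstream nodes depend on narrow sub-config nodes, so with caching they only recompute when the slice of config they actually read changes.)
import json

def config(config_path: str) -> dict:
    # Single top-level config node.
    with open(config_path) as f:
        return json.load(f)

def model_config(config: dict) -> dict:
    # Sub-config node: model-related nodes depend on this, not on `config` itself.
    return config["model"]

def data_config(config: dict) -> dict:
    return config["data"]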
Jannik Buhr
07/25/2025, 12:58 PM
I'm using caching together with Parallelizable. This leads to unexpectedly long execute times, even though the data was in fact restored from the cache. I suspect this is due to the number of nodes that are being checked. Essentially, I have a parallel node that, for a list of directories, goes into each directory and processes some data to return a DataFrame. All DataFrames then get collected into one big DataFrame at the end. What are your recommendations? Is there a way for me to give Hamilton more information about the data so that it can speed up cache lookups? And is there the opposite of @cache(behavior="recompute") to tell Hamilton that something is assumed to be up-to-date?
Doing
id = dr.cache.last_run_id
data_version = dr.cache.data_versions[id]['all_frames']
result_store.get(data_version)
is an option, but it would still be nice to speed up dr.execute when the cache is up-to-date.
Dries Hugaerts
08/16/2025, 10:38 AM
COUNTRY_CODES = ['BE', 'NL', 'FR', 'DE', ...]
load_parameterization = {f"{cc.lower()}_load": dict(cc=value(cc)) for cc in COUNTRY_CODES}

@parameterize(**load_parameterization)
def country_load(cc: str) -> pd.Series:
    # Some specific way of getting the country's load
    return get_load(cc)
without resorting to this:
@tag(cc='BE')
def be_load() -> pd.Series:
    return get_load('BE')

@tag(cc='NL')
def nl_load() -> pd.Series:
    return get_load('NL')
...
Is something like this already possible?
I appreciate the help.
Cheers, Dries
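(For reference, a self-contained version of the @parameterize pattern above; get_load is assumed to exist, as in the question.)
import pandas as pd
from hamilton.function_modifiers import parameterize, value

COUNTRY_CODES = ['BE', 'NL', 'FR', 'DE']

@parameterize(
    **{f"{cc.lower()}_load": dict(cc=value(cc)) for cc in COUNTRY_CODES}
)
def country_load(cc: str) -> pd.Series:
    # Creates one node per country code: be_load, nl_load, fr_load, de_load.
    return get_load(cc)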