Seth Stokes
02/18/2025, 10:33 PM
For `final_result`, will all the step functions be applied to `upstream_int` when I get it in the function body? What if it were `pipe_output`?
import pdb

from hamilton.function_modifiers import pipe_input, source, step, value

@pipe_input(
    step(_add_one),
    step(_multiply, y=2),
    step(_sum, y=value(3)),
    step(_multiply, y=source("upstream_node_to_multiply")),
)
def final_result(upstream_int: int) -> int:
    pdb.set_trace()
    return upstream_int
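For reference, a self-contained sketch of the semantics in question (the implementations of _add_one, _multiply, and _sum are my guesses from their names): pipe_input chains the steps onto the first parameter before the body runs, so upstream_int arrives already transformed; pipe_output would instead apply the steps to the value the function returns.

from hamilton.function_modifiers import pipe_input, source, step, value

# hypothetical step implementations, inferred from their names
def _add_one(x: int) -> int:
    return x + 1

def _multiply(x: int, y: int) -> int:
    return x * y

def _sum(x: int, y: int) -> int:
    return x + y

def upstream_int() -> int:
    return 1

def upstream_node_to_multiply() -> int:
    return 5

@pipe_input(
    step(_add_one),
    step(_multiply, y=2),
    step(_sum, y=value(3)),
    step(_multiply, y=source("upstream_node_to_multiply")),
)
def final_result(upstream_int: int) -> int:
    # upstream_int is already ((1 + 1) * 2 + 3) * 5 == 35 by the time the body runs;
    # with pipe_output, the steps would run on the returned value instead
    return upstream_int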
Seth Stokes
02/20/2025, 9:04 PM
`last_run` is being used, and it's reflected by the `_filter_run_date` function getting used.
However, for documentation purposes, how would you suggest amending this so that all `last_run` config options are visible in the DAG? Is this possible?
from typing import Dict, Optional

import pandas as pd
from hamilton.function_modifiers import extract_fields, pipe_input, source, step

@extract_fields({
    "child_process_run_id": int,
    "child_process_completed_at": pd.Timestamp,
})
@pipe_input(
    step(_filter_run_bom).when(last_run="bom"),
    step(_filter_run_latest).when(last_run="last"),
    step(_filter_run_date, completion_date=source("completion_date")).when(last_run="completion_date"),
)
def child_process_run_info(
    child_process_runs: pd.DataFrame,
    child_rec_process_code: str,
    completion_date: Optional[str] = None,
) -> Dict[int, pd.Timestamp]:
    """Return run id for process from `last_run` logic."""
Jonas Meyer-Ohle
02/21/2025, 4:36 PM
Emeka Anyanwu
02/21/2025, 7:37 PM
Seth Stokes
02/25/2025, 4:40 PM
I am moving from defining `config` outside of the DAG and executing the driver for each config, to using Parallelizable/Collect to handle this and get some speedup.
The issue I am having is that previously, the DAG depended on the `config` to know which downstream functions to call via config.when.
Now that this config variable is the iterated list, is there a way to still expose each of these so that the DAG can know which downstream functions to call?
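For context on the mechanics: config.when branches are resolved once when the DAG is built, so a per-item config flowing through Parallelizable cannot re-toggle them; one workaround is to branch at runtime inside the per-item node. A hypothetical sketch (the configs list, the "mode" key, and _fast_path/_slow_path are all made up):

from hamilton.htypes import Collect, Parallelizable

def _fast_path(cfg: dict) -> int:  # made-up helper
    return 1

def _slow_path(cfg: dict) -> int:  # made-up helper
    return 2

def config_item(configs: list[dict]) -> Parallelizable[dict]:
    # one parallel branch per config dict
    for cfg in configs:
        yield cfg

def branch_result(config_item: dict) -> int:
    # config.when cannot see per-item values, so branch at runtime instead
    if config_item["mode"] == "fast":
        return _fast_path(config_item)
    return _slow_path(config_item)

def all_results(branch_result: Collect[int]) -> list[int]:
    return list(branch_result)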
Elijah Ben Izzy
02/25/2025, 5:37 PM
Benoit Aubuchon
03/03/2025, 6:07 PM
Seth Stokes
03/03/2025, 6:19 PM
With `@check_output`, does Hamilton have the ability to split the offending rows into two datasets?
from typing import Any

import pandas as pd
from hamilton.function_modifiers import check_output

@check_output(positions_schema)
def final_dataframe_before_api_call(prepped_df: pd.DataFrame) -> pd.DataFrame:
    return prepped_df

def final_dataframe_validated(prepped_df: pd.DataFrame) -> pd.DataFrame:
    # rows that pass the schema
    return prepped_df

def final_dataframe_violated(prepped_df: pd.DataFrame) -> pd.DataFrame:
    # rows that violate the schema
    return prepped_df

def api_request_object(final_df_validated: pd.DataFrame) -> dict:
    return final_df_validated.to_dict(orient="records")

def post_request_to_api(api_request_object: dict) -> Any:
    ...
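Not something @check_output does out of the box as far as I know, but if positions_schema is a pandera DataFrameSchema, the split can be done manually with pandera's lazy validation. A sketch (the schema below is a stand-in):

import pandas as pd
import pandera as pa

# stand-in for the positions_schema referenced above
positions_schema = pa.DataFrameSchema({"qty": pa.Column(int, pa.Check.ge(0))})

def final_dataframe_validated(prepped_df: pd.DataFrame) -> pd.DataFrame:
    # rows that pass the schema
    try:
        return positions_schema.validate(prepped_df, lazy=True)
    except pa.errors.SchemaErrors as e:
        bad_index = e.failure_cases["index"].dropna().unique()
        return prepped_df.drop(index=bad_index)

def final_dataframe_violated(prepped_df: pd.DataFrame) -> pd.DataFrame:
    # rows that violate the schema
    try:
        positions_schema.validate(prepped_df, lazy=True)
    except pa.errors.SchemaErrors as e:
        bad_index = e.failure_cases["index"].dropna().unique()
        return prepped_df[prepped_df.index.isin(bad_index)]
    return prepped_df.iloc[0:0]  # nothing violated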
Seth Stokes
03/04/2025, 5:10 PM
Can I use `extract_columns` on a `Collect`'ed node?
I am getting a seemingly unrelated error, but I just added that, so that's what I am suspecting.
Stefan Krawczyk
03/04/2025, 5:46 PM
Leonard Volarić Horvat
03/04/2025, 9:06 PM
mrt 04 16:24:00 myhost sudo[841]: cur.execute("INSERT INTO history (cache_key, run_id) VALUES (?, ?)", (cache_key, run_id))
mrt 04 16:24:00 myhost sudo[841]: sqlite3.OperationalError: attempt to write a readonly database
Unfortunately, I don't have access to this machine right now. This happened at a client, and a colleague lifted the logs for later inspection. I will have access to the machine later this week, but I would like some hypotheses on what went wrong. I would first look at simply the disk being clogged up, but we've put Hamilton's cache in /dev/shm/hamilton_cache, which does not reside on disk, AFAIK. And the RAM equivalent of this problem would likely result in the OOM killer killing... well, something noticeable. But since this worked perfectly fine for months, I can't help but suspect a system-wide issue. But I might be wrong. Any suggestions on where to look?
Hai Nguyen Hong
03/06/2025, 9:32 AM
Seth Stokes
03/07/2025, 8:18 PM
Error: Type requirement mismatch.
Expected irm:<class 'irm2.api.ice_risk_model_api.IceRiskModel'>
got <irm2.api.ice_risk_model_api.IceRiskModel object at 0x00000191729D64B0>:<class 'irm2.api.ice_risk_model_api.IceRiskModel'> instead.
Slackbot
03/11/2025, 5:52 PM
Jonas Meyer-Ohle
03/17/2025, 4:22 PM
Seth Stokes
03/17/2025, 9:33 PM
Can we stack GracefulErrorAdapters in this way?
I'd like to be able to catch multiple exceptions.
It is currently not working as expected.
import pandas as pd
from selenium.common.exceptions import WebDriverException, TimeoutException

from hamilton import base, driver
from hamilton.lifecycle.default import GracefulErrorAdapter

import dataflow  # module containing the DAG functions

dr = (
    driver.Builder()
    .with_modules(dataflow)
    .with_adapters(
        base.PandasDataFrameResult(),
        GracefulErrorAdapter(
            error_to_catch=WebDriverException,
            sentinel_value=pd.DataFrame(),
        ),
        GracefulErrorAdapter(
            error_to_catch=TimeoutException,
            sentinel_value=pd.DataFrame(),
        ),
    )
    .build()
)
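One detail worth verifying against your Selenium version (this is from memory): TimeoutException subclasses WebDriverException, so a single adapter catching the base class should cover both:

import pandas as pd
from selenium.common.exceptions import WebDriverException

from hamilton.lifecycle.default import GracefulErrorAdapter

# TimeoutException is a subclass of WebDriverException in Selenium,
# so catching the base class catches both exception types.
adapter = GracefulErrorAdapter(
    error_to_catch=WebDriverException,
    sentinel_value=pd.DataFrame(),
)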
Piotr Bieszczad
03/22/2025, 9:54 AM
I would like to use the `parameterize_extract_columns` decorator, or preferably `parameterize_extract_fields`.
From my understanding, those are not yet available?
Will they be available in the near future?
Thanks in advance for the answer!
And thank you for developing this great tool. I find it really clever.
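For what it's worth, parameterize_extract_columns does ship in hamilton.function_modifiers (I am not aware of a parameterize_extract_fields counterpart). A sketch patterned on the docs example, with made-up node names; worth double-checking the column naming semantics:

import pandas as pd
from hamilton.function_modifiers import (
    ParameterizedExtract,
    parameterize_extract_columns,
    source,
    value,
)

@parameterize_extract_columns(
    ParameterizedExtract(
        ("out1a", "out2a"),
        {"input1": source("in1a"), "input2": source("in2a"), "input3": value(10)},
    ),
    ParameterizedExtract(
        ("out1b", "out2b"),
        {"input1": source("in1b"), "input2": source("in2b"), "input3": value(100)},
    ),
)
def fn_with_multiple_outputs(input1: pd.Series, input2: pd.Series, input3: float) -> pd.DataFrame:
    # each parameterization yields a dataframe whose columns are extracted
    # under the names given in the ParameterizedExtract tuple
    return pd.concat([input1 * input3, input2 * input3], axis=1)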
Volker Lorrmann
03/24/2025, 6:04 PM
I am self-hosting the Hamilton UI and have set REACT_APP_HAMILTON_SUB_PATH.
https://hamilton.dagworks.io/en/latest/hamilton-ui/ui/#self-hosting
I wonder what my nginx conf should look like for the subpath /hamilton.
I've tried:
...
location /hamilton/ {
    proxy_pass http://frontend:8242;
    proxy_set_header Host $host;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    proxy_set_header X-Forwarded-Proto $scheme;
}
...
But this does not work. It fails because https://domain.com/static/js/main.77c4424c.js is not found.
Note: I have also asked for help here, as this might be of interest for others: https://github.com/DAGWorks-Inc/hamilton/pull/1284#issuecomment-2749049751
Seth Stokes
03/25/2025, 5:55 PM
Can I connect a `@subdag` to a `Parallelizable` node?
I am trying to do the following but am missing some of the plumbing.
Shoaib
03/26/2025, 3:26 PM
Mark Goodhead
03/29/2025, 11:07 PM
Piotr Bieszczad
03/31/2025, 9:56 AM
I would like to validate the output of a `step`.
I wanted to use `@check_output_custom`, but the validation is not propagated.
Is there a way to do this?
Example:
from hamilton.function_modifiers import check_output_custom, pipe_output, step

@check_output_custom(CustomValidator())
def foo(...) -> ...:
    ...
    return result

@pipe_output(
    step(foo).named('foo_applied'),
    ...,
)
def bar(...) -> ...:
    return result
Validation is not applied to foo_applied.
Thank you in advance for your reply.
----------------------------------------------------------
Also, is there a way of acting upon failure?
E.g., I know that if validation fails in one of the previous steps, then I want to return an "inference failed" flag at the end of the flow, on top of exiting it.
Basically, some functionality for early termination of a flow.
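One workaround sketch for the propagation issue (types assumed to be DataFrames): decorators attached to foo are not carried into the node that step(foo) generates, so the validator can instead be attached to a dedicated identity node downstream of bar:

import pandas as pd
from hamilton.function_modifiers import check_output_custom

@check_output_custom(CustomValidator())  # CustomValidator as in the example above
def bar_validated(bar: pd.DataFrame) -> pd.DataFrame:
    # identity node whose only purpose is to run the validator on bar's output
    return bar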
Luke
04/06/2025, 1:34 AM
Is there a way to access intermediate results from within a Parallelizable block (e.g., the processed_image node)?
• I don’t love some of my node names and abstractions: each_image, processed_image -> processed_images 😐. Any recommendations?
◦ This expansion is especially weird and points to poor naming: list(_processed_image["preprocessed_image"] for _processed_image in processed_image)
I’ve collapsed the image processing steps into one node, processed_image. I’d rather explode this into preprocessed_image, tesseract_output, tesseract_summary, etc. Ideally, the results of each of these stages could be accessed without further processing, before all results are Collect'ed. With that in mind, I realize the current implementation is probably fine for this level of the problem. However, I’ll need a different solution for the “meta-scale” problem: processing thousands of PDFs. Within this context, I’ll need to queue, monitor, and save artifacts for each PDF individually. I won’t be collapsing all processing into a single node as I’ve done here. I suspect this is where Hamilton’s scope ends and a full-featured orchestrator is preferred. Is this correct?
DAG graph is attached. DAG definition below.
# dag.py
from pathlib import Path
from typing import Union

import numpy as np
import pandas as pd
from PIL import Image

from hamilton.function_modifiers import extract_fields
from hamilton.htypes import Collect, Parallelizable

# These are excluded from the DAG
from utils import (
    pdf_to_images,
    preprocess_image,
    image_to_tesseract_data,
    TesseractDocumentSummary,
)

# `pdf_path` is provided as an input at execution time, so it needs no node here.
# (The original `@source`-decorated stub is not valid Hamilton usage; `source` is
# only meant for use inside decorators.)

def pdf_images(pdf_path: Union[str, Path], kwargs: dict = {}) -> list[Image.Image]:
    default_kwargs = {"dpi": 300, "thread_count": 4}
    kwargs = {**default_kwargs, **kwargs}
    return pdf_to_images(pdf_path, **kwargs)
def each_image(pdf_images: list[Image.Image]) -> Parallelizable[Image.Image]:
    for image in pdf_images:
        yield image

def processed_image(each_image: Image.Image) -> dict:
    preprocessed_image = preprocess_image(each_image)
    tesseract_output = image_to_tesseract_data(preprocessed_image)
    difficulty_summary = TesseractDocumentSummary.from_tesseract_data(tesseract_output)
    return {
        "preprocessed_image": preprocessed_image,
        "tesseract_output": tesseract_output,
        "difficulty_summary": difficulty_summary,
    }
@extract_fields(
    {
        "preprocessed_images": list[np.ndarray],
        "tesseract_outputs": pd.DataFrame,
        "difficulty_table": pd.DataFrame,
    }
)
def processed_images(processed_image: Collect[dict]) -> dict:
    return {
        "preprocessed_images": [
            _processed_image["preprocessed_image"] for _processed_image in processed_image
        ],
        "tesseract_outputs": pd.concat(
            [_processed_image["tesseract_output"] for _processed_image in processed_image],
            ignore_index=True,
        ),
        "difficulty_table": pd.DataFrame(
            [_processed_image["difficulty_summary"] for _processed_image in processed_image]
        ),
    }
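On exploding processed_image: nodes that sit between a Parallelizable and its Collect run once per item, so each stage can be its own node and remain individually addressable. A sketch under that assumption (return types for the utils helpers are guessed from the aggregation above):

import numpy as np
import pandas as pd
from PIL import Image

from hamilton.htypes import Collect
from utils import TesseractDocumentSummary, image_to_tesseract_data, preprocess_image

def preprocessed_image(each_image: Image.Image) -> np.ndarray:
    return preprocess_image(each_image)

def tesseract_output(preprocessed_image: np.ndarray) -> pd.DataFrame:
    return image_to_tesseract_data(preprocessed_image)

def difficulty_summary(tesseract_output: pd.DataFrame) -> TesseractDocumentSummary:
    return TesseractDocumentSummary.from_tesseract_data(tesseract_output)

def image_result(
    preprocessed_image: np.ndarray,
    tesseract_output: pd.DataFrame,
    difficulty_summary: TesseractDocumentSummary,
) -> dict:
    # per-image bundle; replaces the monolithic processed_image node
    return {
        "preprocessed_image": preprocessed_image,
        "tesseract_output": tesseract_output,
        "difficulty_summary": difficulty_summary,
    }

def processed_images(image_result: Collect[dict]) -> dict:
    ...  # same aggregation as above, keyed off image_result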
Matías Fuentes
04/19/2025, 1:57 AM
Volker Lorrmann
04/24/2025, 7:43 AM
Volker Lorrmann
04/24/2025, 9:54 AM
Mattias Fornander US
04/24/2025, 4:48 PM
Mattias Fornander US
04/24/2025, 4:55 PM
Mattias Fornander US
04/24/2025, 10:22 PM
Seth Stokes
05/07/2025, 8:07 PM
# subdag.py
import pandas as pd
from hamilton.function_modifiers import source, subdag

def mapping() -> pd.DataFrame:
    ...

@subdag(
    prepare_src1, prepare_src2, prepare_src3,
    inputs={
        "directory": source("directory"),
        "pattern_src1": source("pattern_src1"),
        "pattern_src2": source("pattern_src2"),
        "pattern_src3": source("pattern_src3"),
    },
    external_inputs=source("mapping"),
)
def merge_and_stuff(
    prepared_src1: pd.DataFrame,
    prepared_src2: pd.DataFrame,
    prepared_src3: pd.DataFrame,
    mapping: pd.DataFrame,
) -> pd.DataFrame:
    ...
Error:
TypeError: argument of type 'UpstreamDependency' is not iterable
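The traceback points at external_inputs: @subdag expects external_inputs to be a list of node-name strings, and source("mapping") returns an UpstreamDependency, which the decorator then fails to iterate over. A sketch of the likely fix:

@subdag(
    prepare_src1, prepare_src2, prepare_src3,
    inputs={
        "directory": source("directory"),
        "pattern_src1": source("pattern_src1"),
        "pattern_src2": source("pattern_src2"),
        "pattern_src3": source("pattern_src3"),
    },
    external_inputs=["mapping"],  # list of node names, not source("mapping")
)
def merge_and_stuff(
    prepared_src1: pd.DataFrame,
    prepared_src2: pd.DataFrame,
    prepared_src3: pd.DataFrame,
    mapping: pd.DataFrame,
) -> pd.DataFrame:
    ...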