# hamilton-help
  • Seth Stokes
    02/18/2025, 10:33 PM

    Hey, if I am trying to debug this function `final_result`, will all of the `step` functions have been applied to `upstream_int` by the time I get it in the function body? What if it were `pipe_output`?
    @pipe_input(
        step(_add_one),
        step(_multiply, y=2),
        step(_sum, y=value(3)),
        step(_multiply, y=source("upstream_node_to_multiply")),
    )
    def final_result(upstream_int: int) -> int:
        pdb.set_trace()
        return upstream_int
  • Seth Stokes
    02/20/2025, 9:04 PM

    Hey, is it possible to see all the configs available for a DAG? For example, I want to see all possible `last_run` config options. When I pass the config (in green), the DAG is nice enough to tell me which `last_run` is being used, reflected by the function getting used (`_with_filter_run_date`). However, for documentation purposes, how would you suggest amending this so that all `last_run` config options are visible in the DAG? Is this possible?
    @extract_fields({
        "child_process_run_id": int,
        "child_process_completed_at": pd.Timestamp,
    })
    @pipe_input(
        step(_filter_run_bom).when(last_run="bom"),
        step(_filter_run_latest).when(last_run="last"),
        step(_filter_run_date, completion_date=source("completion_date")).when(last_run="completion_date"),
    )
    def child_process_run_info(
        child_process_runs: pd.DataFrame, 
        child_rec_process_code: str, 
        completion_date: Optional[str] = None
    ) -> Dict[int, pd.Timestamp]:
        """Return run id for process from `last_run` logic."""
  • Jonas Meyer-Ohle
    02/21/2025, 4:36 PM

    Hiya, thanks for the Hamilton package! I'm currently setting up the hamilton-ui on a domain subpath, using nginx as a reverse proxy. I'm having some issues getting this to work, since the frontend expects the main.js and CSS files to be located under the root path. Has anyone had any success getting this to work? I've done something similar with Grafana, where the following environment variables exist to make it work: GF_SERVER_ROOT_URL=%(protocol)s://%(domain)s/grafana and GF_SERVER_SERVE_FROM_SUB_PATH=true. Thanks for any help :)
  • Emeka Anyanwu
    02/21/2025, 7:37 PM

    Hey all! Is there any way to access a node's metadata, like run_id, from within the node itself? I'd like to use it to add to some manual telemetry.
  • Seth Stokes
    02/25/2025, 4:40 PM

    Looking forward to office hours today to hopefully work through this. I recently migrated from looping over the `config` outside of the DAG and executing the `driver` for each config, to using `Parallelizable`/`Collect` to handle this and get some speedup. The issue I am having is that previously the DAG depended on the `config` to know which downstream functions to call via `config.when`. Now that this config variable is the iterated list, is there a way to still expose each of these values so that the DAG can know which downstream functions to call?
  • Elijah Ben Izzy
    02/25/2025, 5:37 PM

    <!here> office hours starting now - https://meet.google.com/enx-bhus-fae
  • Benoit Aubuchon
    03/03/2025, 6:07 PM

    I'm slowly porting an existing codebase to Hamilton, and I'm trying to access the result of one of the workflow steps. The result is a DataFrame. I made a function to return it (and called it from execute()), but when I access it from the execute function, each column is a key in the result dictionary. Is there a way to stop Hamilton from deconstructing the DataFrame in the result?
  • Seth Stokes
    03/03/2025, 6:19 PM

    When using `@check_output`, does Hamilton have the ability to split the offending rows into two datasets?
    @check_output(positions_schema)
    def final_dataframe_before_api_call(prepped_df: pd.DataFrame) -> pd.DataFrame:
        return prepped_df
    
    def final_dataframe_validated(prepped_df: pd.DataFrame) -> pd.DataFrame:
        return prepped_df
    
    def final_dataframe_violated(prepped_df: pd.DataFrame) -> pd.DataFrame:
        return prepped_df
    
    def api_request_object(final_df_validated: pd.DataFrame) -> dict:
        return final_df_validated.to_dict(orient="records")
    
    def post_request_to_api(api_request_object: dict) -> Any:
        ...
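    `@check_output` on its own flags failures rather than partitioning data. One plain-pandas sketch of the validated/violated split (the mask here is a hypothetical stand-in for whatever `positions_schema` checks; column names are invented):

    ```python
    import pandas as pd

    # Hypothetical row-level validity check standing in for positions_schema.
    def _valid_mask(df: pd.DataFrame) -> pd.Series:
        return df["quantity"].ge(0) & df["price"].notna()

    def final_dataframe_validated(prepped_df: pd.DataFrame) -> pd.DataFrame:
        # Rows passing every check continue on to the API call.
        return prepped_df[_valid_mask(prepped_df)]

    def final_dataframe_violated(prepped_df: pd.DataFrame) -> pd.DataFrame:
        # Offending rows are kept aside for inspection instead of raising.
        return prepped_df[~_valid_mask(prepped_df)]

    df = pd.DataFrame({"quantity": [1, -2, 3], "price": [10.0, 20.0, None]})
    valid = final_dataframe_validated(df)    # 1 row
    invalid = final_dataframe_violated(df)   # 2 rows
    ```

    Both functions depend on the same upstream node, so in a Hamilton module each split naturally becomes its own node.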
  • Seth Stokes
    03/04/2025, 5:10 PM

    Hey, should I be able to `extract_columns` on a `Collect`'ed node? I am getting a seemingly unrelated error, but I just added that, so that's what I am suspecting.
  • Stefan Krawczyk
    03/04/2025, 5:46 PM

    Sorry, no office hours today - but do ping here if you have Qs!
  • Leonard Volarić Horvat
    03/04/2025, 9:06 PM

    Hi! I have several Hamilton DAGs running in parallel; each DAG runs a distinct type of analysis on some incoming data. Each DAG uses caching for most (but not all) nodes, and each DAG shares the logic for ingesting the data. This has worked perfectly fine for months until today, when all of a sudden I got hit by this error in several DAGs (full traceback in thread, to reduce spam):
    mrt 04 16:24:00 myhost sudo[841]:     cur.execute("INSERT INTO history (cache_key, run_id) VALUES (?, ?)", (cache_key, run_id))
    mrt 04 16:24:00 myhost sudo[841]: sqlite3.OperationalError: attempt to write a readonly database
    Unfortunately, I don't have access to this machine right now. This happened at a client, and a colleague lifted the logs for later inspection. I will have access to the machine later this week, but I would like some hypotheses on what went wrong. I would first look at the disk simply being clogged up, but we've put Hamilton's cache in /dev/shm/hamilton_cache, which does not reside on disk, AFAIK. And the RAM equivalent of this problem would likely result in the OOM killer killing... well, something noticeable. But since this worked perfectly fine for months, I can't help but suspect a system-wide issue. But I might be wrong. Any suggestions on where to look?
  • Hai Nguyen Hong
    03/06/2025, 9:32 AM

    Hi Mr. Hamilton! I have trouble after deploying WrenAI, with the error: "Failed to prepare semantics: Cannot serialize token-based secret. Use an alternative secret type like environment variables." I am using Ollama; can you help me? I am in Vietnam, thanks!
  • Seth Stokes
    03/07/2025, 8:18 PM

    Hey, how can I fix this? I'm not really seeing how these are different types.
    Error: Type requirement mismatch. 
    Expected irm:<class 'irm2.api.ice_risk_model_api.IceRiskModel'> 
    got <irm2.api.ice_risk_model_api.IceRiskModel object at 0x00000191729D64B0>:<class 'irm2.api.ice_risk_model_api.IceRiskModel'> instead.
  • Jonas Meyer-Ohle
    03/17/2025, 4:22 PM

    Hi there! I'm wondering if there is any best-practice guidance on dynamically generating DAGs. I am trying to integrate Hamilton into a statistics engine that I have, where the following steps are undertaken:
    1. The user defines a configuration file with a list of metrics (a YAML file).
    2. The user calls the engine with this config file, creating a collection of metrics to be run.
    3. The engine runs through these metrics and returns a bunch of results to the end user.
    Hamilton would solve some pain points that I have around parallel runs, caching, and visualization of runs; however, I'm finding it a bit difficult to find any examples of this on GitHub (I might just not be able to read...). I have been using ModuleType and setattr to dynamically generate the module, but this feels a bit contrived to me. Thanks for any help! Below is an example DAG that I would like to generate depending on the config file inputs: load_data -> preprocess_data -> filter_data -> run_metric -> return_result. Currently each step has different configuration available and is not required for each metric, so a dynamic approach is best.
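    One alternative to ModuleType/setattr is Hamilton's `ad_hoc_utils.create_temporary_module`, which builds a loadable module from plain functions chosen at runtime. A minimal sketch (the step functions here are invented stand-ins for the load/preprocess/metric stages):

    ```python
    from hamilton import ad_hoc_utils, driver

    # Illustrative stand-ins for load_data -> preprocess_data -> run_metric.
    def loaded_data(raw: list) -> list:
        return list(raw)

    def preprocessed_data(loaded_data: list) -> list:
        return [x * 2 for x in loaded_data]

    def metric_result(preprocessed_data: list) -> float:
        return float(sum(preprocessed_data))

    # The list of functions could be assembled per metric from the YAML config.
    temp_module = ad_hoc_utils.create_temporary_module(
        loaded_data, preprocessed_data, metric_result, module_name="generated_flow"
    )
    dr = driver.Builder().with_modules(temp_module).build()
    result = dr.execute(["metric_result"], inputs={"raw": [1.0, 2.0]})
    ```

    Each metric's config then just selects which functions go into the module before building the driver.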
  • Seth Stokes
    03/17/2025, 9:33 PM

    Should I be able to use multiple `GracefulErrorAdapter`'s in this way? I'd like to be able to catch multiple exceptions. It is currently not working as expected.
    from selenium.common.exceptions import WebDriverException, TimeoutException
    
    dr = (
        driver.Builder()
        .with_modules(dataflow)
        .with_adapters([
            base.PandasDataFrameResult(),
            GracefulErrorAdapter(
                error_to_catch=WebDriverException,
                sentinel_value=pd.DataFrame() 
            ),
            GracefulErrorAdapter(
                error_to_catch=TimeoutException,
                sentinel_value=pd.DataFrame() 
            )
        ])
        .build()
    )
  • Piotr Bieszczad
    03/22/2025, 9:54 AM

    Hello, I'm interested in using the `parameterize_extract_columns` decorator, or preferably `parameterize_extract_fields`. From my understanding, those are not yet available? Will they be available in the near future? Thanks in advance for the answer! And thank you for developing this great tool. I find it really clever.
  • Volker Lorrmann
    03/24/2025, 6:04 PM

    Hi guys, I successfully deployed hamilton ui using docker-compose some time ago. I saw in the docs that hosting hamilton ui under a subpath is now possible, using the env variable REACT_APP_HAMILTON_SUB_PATH (https://hamilton.dagworks.io/en/latest/hamilton-ui/ui/#self-hosting). I wonder what my nginx conf should look like for the subpath /hamilton. I've tried:
    ...
            location /hamilton/ {
            proxy_pass http://frontend:8242;
                proxy_set_header Host $host;
                proxy_set_header X-Real-IP $remote_addr;
                proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
                proxy_set_header X-Forwarded-Proto $scheme;
            }
    ...
    But this does not work. It fails because https://domain.com/static/js/main.77c4424c.js is not found. Note: I have also asked for help here, as this might be of interest to others: https://github.com/DAGWorks-Inc/hamilton/pull/1284#issuecomment-2749049751
  • Seth Stokes
    03/25/2025, 5:55 PM

    Do you have an example of applying `@subdag` to a `Parallelizable` node? I am trying to do the following but am missing some of the plumbing.
  • Shoaib
    03/26/2025, 3:26 PM

    Hi all, I am new to Hamilton. My use case is not machine learning, but rather converting formulas from spreadsheets to Python code. Hamilton is an option because a formula in one cell depends on the output of a formula in another cell, and so on, so there is a DAG effect. It seems I will need to use one function per rule and pass in Series as opposed to DataFrames. But then I will have to reassemble the DataFrame at the end somehow... Is it okay to just pass the whole DataFrame to a function, which then appends a column with its transformation, and the next function does the same, etc.? At the end I will then have my DataFrame with all the transformations.
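    A minimal sketch of the one-function-per-formula style (the formulas and column names here are invented): each spreadsheet formula becomes a function over Series, with parameter names naming the upstream "cells", and reassembly is just one more function at the end.

    ```python
    import pandas as pd

    # Each spreadsheet formula is one function; with Hamilton, these become
    # nodes and the parameter names wire up the DAG automatically.
    def subtotal(price: pd.Series, quantity: pd.Series) -> pd.Series:
        return price * quantity

    def tax(subtotal: pd.Series) -> pd.Series:
        return subtotal * 0.2

    def total(subtotal: pd.Series, tax: pd.Series) -> pd.Series:
        return subtotal + tax

    # Reassembling the DataFrame at the end is itself a node.
    def result_table(subtotal: pd.Series, tax: pd.Series, total: pd.Series) -> pd.DataFrame:
        return pd.DataFrame({"subtotal": subtotal, "tax": tax, "total": total})
    ```

    This keeps each "cell" independently testable and cacheable, versus the append-a-column-to-one-DataFrame approach, where every function depends on the whole frame.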
  • Mark Goodhead
    03/29/2025, 11:07 PM

    Does Hamilton play well with code that is structured around classes, or would we need to refactor everything into static functions? I'm wondering what the expected practice is for managing state in a purely functional design?
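    One common pattern (a sketch, not the only answer): keep the stateful objects, but pass instances into the graph as inputs, so node functions stay pure while still delegating to class methods. The class and node names below are invented.

    ```python
    from dataclasses import dataclass

    # A hypothetical stateful dependency, constructed outside the DAG.
    @dataclass
    class ModelClient:
        weight: float

        def score(self, x: float) -> float:
            return self.weight * x

    # Node functions take the instance as a regular parameter; with Hamilton,
    # it would be injected via execute(..., inputs={"client": ModelClient(2.0)}).
    def raw_score(client: ModelClient, x: float) -> float:
        return client.score(x)

    def bounded_score(raw_score: float) -> float:
        # Pure follow-on transformation; no hidden state.
        return min(max(raw_score, 0.0), 1.0)
    ```

    State mutation stays at the edges (in the driver code), while the DAG itself remains functional.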
  • Piotr Bieszczad
    03/31/2025, 9:56 AM

    Hello, I want to validate the output of a function that I later use in pipe_output as a `step`. I wanted to use `@check_output_custom`, but the validation is not propagated. Is there a way to do this? Example:
    @check_output_custom(CustomValidator())
    def foo(...) -> ...:
        ...
        return result
    
    @pipe_output(
        step(foo).named('foo_applied'),
        ...,
    )
    def bar(...) -> ...:
        return result
    Validation is not applied to `foo_applied`. Thank you in advance for your reply. Also, is there a way of acting upon failure? E.g. I know that if validation fails in one of the previous steps, then I want to return a flag (`inference failed`) at the end of the flow, on top of exiting it. Basically, some functionality for early termination of a flow.
  • Luke
    04/06/2025, 1:34 AM

    Hi all, I'm using Hamilton for some file processing. The process involves splitting a PDF into images, then, for each image: preprocessing, running Tesseract for OCR, and producing a summary. Requesting some feedback:
    • Is my implementation reasonable, or are there obvious anti-patterns?
    • Is this a good application for a subdag for each image (vs. the collapsed `processed_image` node)?
    • I don't love some of my node names and abstractions: `each_image`, `processed_image` -> `processed_images` 😐. Any recommendations? This expansion is especially weird and points to poor naming: `list(_processed_image["preprocessed_image"] for _processed_image in processed_image)`
    I've collapsed the image processing steps into one node, `processed_image`. I'd rather explode this into `preprocessed_image`, `tesseract_output`, `tesseract_summary`, etc. Ideally, the results of each of these stages could be accessed without further processing before all results are Collected. With that in mind, I realize the current implementation is probably fine for this level of the problem. However, I'll need a different solution for the "meta-scale" problem: processing thousands of PDFs. Within that context, I'll need to queue, monitor, and save artifacts for each PDF individually; I won't be collapsing all processing into a single node as I've done here. I suspect this is where Hamilton's scope ends and a full-featured orchestrator is preferred. Is this correct? DAG graph is attached; DAG definition below.
    # dag.py
    from typing import Union
    from pathlib import Path
    from PIL import Image
    import pandas as pd
    from hamilton.function_modifiers import source, extract_fields
    # These are excluded from the DAG
    from utils import (
        pdf_to_images,
        preprocess_image,
        image_to_tesseract_data,
        TesseractDocumentSummary,
    )
    from hamilton.htypes import Parallelizable, Collect
    import numpy as np
    
    
    @source
    def pdf_path() -> Union[str, Path]:
        pass
    
    
    def pdf_images(pdf_path: Union[str, Path], kwargs: dict = {}) -> list[Image.Image]:
        default_kwargs = {"dpi": 300, "thread_count": 4}
        kwargs = {**default_kwargs, **kwargs}
        return pdf_to_images(pdf_path, **kwargs)
    
    
    def each_image(pdf_images: list[Image.Image]) -> Parallelizable[Image.Image]:
        for image in pdf_images:
            yield image
    
    
    def processed_image(each_image: Image.Image) -> dict:
        preprocessed_image = preprocess_image(each_image)
        tesseract_output = image_to_tesseract_data(preprocessed_image)
        difficulty_summary = TesseractDocumentSummary.from_tesseract_data(tesseract_output)
    
        return {
            "preprocessed_image": preprocessed_image,
            "tesseract_output": tesseract_output,
            "difficulty_summary": difficulty_summary,
        }
    
    
    @extract_fields(
        {
            "preprocessed_images": list[np.ndarray],
            "tesseract_outputs": pd.DataFrame,
            "difficulty_table": pd.DataFrame,
        }
    )
    def processed_images(
        processed_image: Collect[dict],
    ) -> dict:
        return {
            "preprocessed_images": list(
                _processed_image["preprocessed_image"]
                for _processed_image in processed_image
            ),
            "tesseract_outputs": pd.concat(
                [
                    _processed_image["tesseract_output"]
                    for _processed_image in processed_image
                ],
                ignore_index=True,
            ),
            "difficulty_table": pd.DataFrame(
                [
                    _processed_image["difficulty_summary"]
                    for _processed_image in processed_image
                ]
            ),
        }
  • Matías Fuentes
    04/19/2025, 1:57 AM

    Hi, I have built a DAG in Hamilton that takes about 10 minutes to run. It involves typical pandas sync data transformations, as well as a bunch of async LLM API requests. It runs fine end to end using Hamilton's async_driver. I was wondering if there is a straightforward way to use something like FastAPI StreamingResponses to notify a client in near-realtime about the progress of the process. For instance, inside one of the nodes I create a bunch of asyncio tasks to process each row of the dataset concurrently with an LLM via API requests, and I would like the client to update a progress bar each time a response arrives. I have multiple nodes that implement this kind of logic, and I would like the client to know which node the process is currently in and the progress of that node. I am aware that Burr provides streaming support, but I am kinda clueless about how to implement this feature in a simple way.
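    A framework-agnostic sketch of the underlying pattern (all names invented): nodes push progress events onto an asyncio.Queue, and the HTTP layer drains that queue, e.g. yielding each event from a FastAPI StreamingResponse generator.

    ```python
    import asyncio

    async def process_rows(rows: list, progress: asyncio.Queue) -> list:
        # Stand-in for a node firing many LLM requests; one event per row.
        results = []
        for i, row in enumerate(rows, start=1):
            await asyncio.sleep(0)  # placeholder for the real API call
            results.append(str(row).upper())
            await progress.put({"node": "process_rows", "done": i, "total": len(rows)})
        await progress.put(None)  # sentinel: this node is finished
        return results

    async def main() -> list:
        q: asyncio.Queue = asyncio.Queue()
        task = asyncio.create_task(process_rows(["a", "b"], q))
        events = []
        while (event := await q.get()) is not None:
            events.append(event)  # in FastAPI, yield this from the response
        await task
        return events

    events = asyncio.run(main())
    ```

    With several such nodes, tagging each event with the node name (as above) lets the client show both which node is running and its internal progress.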
  • Volker Lorrmann
    04/24/2025, 7:43 AM

    Hi guys, I have the following scenario, and I do not yet know a good practice for handling it. I am subscribed to an MQTT topic. On each new message, a Hamilton pipeline is executed that gets the message payload and topic as input. Every payload has a "config" and a "data" part. The config holds metadata and control flags. The data in the payload is finally written into a Delta table. Unfortunately, the data is not identical across messages, so the data processing in the pipeline might not be exactly identical either. Is it somehow possible to use only one pipeline for several slightly different data processing flows? Can I control the flow based on the inputs, or on any node output?
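    Flow selection based on inputs is what `@config.when` is for: several implementations of one node, chosen by a config flag at build time. A sketch (flag and node names invented; the flag would come from the payload's "config" part):

    ```python
    from hamilton import ad_hoc_utils, driver
    from hamilton.function_modifiers import config

    # Two variants of the same node; the suffix after the double underscore
    # is stripped, so both resolve to a node named "processed".
    @config.when(flavor="simple")
    def processed__simple(data: list) -> list:
        return data

    @config.when(flavor="scaled")
    def processed__scaled(data: list) -> list:
        return [x * 10 for x in data]

    module = ad_hoc_utils.create_temporary_module(
        processed__simple, processed__scaled, module_name="mqtt_flow"
    )
    # The config dict is fixed per driver build; rebuild per message type.
    dr = driver.Builder().with_modules(module).with_config({"flavor": "scaled"}).build()
    out = dr.execute(["processed"], inputs={"data": [1, 2]})
    ```

    Note that config is resolved when the driver is built, not per execute call, so branching on a node's runtime output needs a different mechanism (e.g. a node that dispatches internally).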
  • Volker Lorrmann
    04/24/2025, 9:54 AM

    Hi guys, I am thinking about retrying and/or repeating the execution of a Hamilton dataflow (graph execution). There are a lot of Python modules (tenacity, retry, retry2, retrying, ...) available for retrying failed functions, and even writing one yourself is straightforward. Most of them work by adding a decorator to a function, so they can be used in Hamilton for retrying node executions. However, they cannot be used for the graph execution. The same is true for repeating the graph execution. Is there an adapter available for this? If not, is there some documentation on how to build an adapter?
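    Short of a dedicated adapter, the whole graph execution can be wrapped from the outside, since `dr.execute(...)` is just a call. A minimal hand-rolled sketch (the `execute` callable stands in for `lambda: dr.execute([...], inputs=...)`):

    ```python
    import time

    def execute_with_retry(execute, max_attempts: int = 3, backoff_s: float = 0.0):
        """Retry a whole graph execution up to max_attempts times."""
        last_error = None
        for attempt in range(1, max_attempts + 1):
            try:
                return execute()
            except Exception as e:  # narrow this to your expected failure types
                last_error = e
                if attempt < max_attempts:
                    time.sleep(backoff_s)
        raise last_error

    # Usage sketch:
    #   result = execute_with_retry(lambda: dr.execute(["result"], inputs={...}))
    ```

    The decorator libraries mentioned above (tenacity etc.) can wrap such a zero-argument callable the same way, so they work at the graph level too, just applied outside the driver rather than on node functions.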
  • Mattias Fornander US
    04/24/2025, 4:48 PM

    Question 1: What's a good way to dynamically spawn subgraphs? Specifically, I'm trying to move away from DataFrames (the logic code doesn't need column access, and I need types beyond NumPy) toward lists of dataclasses as both input and output. Each "row" needs to be transformed using the same DAG, but independently. I'm looking at subdag, but I don't think it fits. Right now I'm driving each list item independently through execute and collecting final_vars back into a list. Ideas?
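    This fan-out/fan-in over arbitrary objects is what `Parallelizable`/`Collect` targets: the nodes between the yield and the collect run once per item, dataclasses included. A sketch under those assumptions (types and node names invented):

    ```python
    import dataclasses
    from hamilton import ad_hoc_utils, driver
    from hamilton.execution import executors
    from hamilton.htypes import Collect, Parallelizable

    @dataclasses.dataclass
    class Record:
        value: int

    def record(records: list) -> Parallelizable[Record]:
        for r in records:  # fan out: downstream nodes run once per Record
            yield r

    def transformed(record: Record) -> Record:
        return Record(value=record.value * 2)

    def gathered(transformed: Collect[Record]) -> list:
        return list(transformed)  # fan in: back to a single list

    module = ad_hoc_utils.create_temporary_module(
        record, transformed, gathered, module_name="rows"
    )
    dr = (
        driver.Builder()
        .enable_dynamic_execution(allow_experimental_mode=True)
        .with_remote_executor(executors.SynchronousLocalTaskExecutor())
        .with_modules(module)
        .build()
    )
    out = dr.execute(["gathered"], inputs={"records": [Record(1), Record(2)]})
    ```

    Swapping the executor then gives actual parallelism across items without changing the node code.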
  • Mattias Fornander US
    04/24/2025, 4:55 PM

    Question 2: What's a good way to represent database access that is visible and clean in the Hamilton UI? The initial part of my current data pipeline is the lookup and assembly of the initial configuration. It requires database access to resolve string references into real Python objects, plus some minor logic. I can do this outside of execute, but I have also tried feeding execute a plain config (YAML) file and a duckdb connection and having it resolve these into a typed instance of a class. I'd like to have as much of the code inside the graph as possible, but I'm wondering if this is a good pattern. It seems like it is, given your examples, but I'm also wondering if I'm missing something with materializers and adapters. The graph looks a bit odd, since a lot of nodes access the db provided as input. Is this still a good path?
  • Mattias Fornander US
    04/24/2025, 10:22 PM

    Is there any way to control input or output types such that I can use ABCs such as Mapping? I'd like to not collapse my ChainMap into a dict, but right now the Builder can't link up one node that returns a Mapping (on top of a ChainMap) with another node that requests a Mapping. I realize the automatic discovery may be intractable or ambiguous. Any ideas in this direction?
  • Seth Stokes
    05/07/2025, 8:07 PM

    Hello, should I expect something like this to be allowed?
    # subdag.py
    
    def mapping() -> pd.DataFrame:
        ...
    
    
    @subdag(
        prepare_src1, prepare_src2, prepare_src3,
        inputs={
            "directory": source("directory"),
            "pattern_src1": source("pattern_src1"),
            "pattern_src2": source("pattern_src2"),
            "pattern_src3": source("pattern_src3"),
        },
        external_inputs=source("mapping")
    )
    def merge_and_stuff(
        prepared_src1: pd.DataFrame,
        prepared_src2: pd.DataFrame,
        prepared_src3: pd.DataFrame,
        mapping: pd.DataFrame
    ) -> pd.DataFrame:
        ...
    Error: TypeError: argument of type 'UpstreamDependency' is not iterable