https://github.com/stitchfix/hamilton
# hamilton-help
  • s

    Shoaib

    03/26/2025, 3:26 PM
    Hi All. I am new to Hamilton. My use case is not machine learning but rather converting formulas from spreadsheets to Python code. Hamilton is an option because a formula in one cell depends on the output of a formula in another cell, and so on, so there is a DAG effect. It seems I will need to use one function per rule and pass in Series as opposed to dataframes, but then I will have to reassemble the dataframe at the end somehow... Is it okay to just pass the whole dataframe to the function, which then appends a column with its transformation, and the next function does the same, etc.? At the end I would then have my dataframe with the transformations.
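    (A minimal sketch of the one-function-per-formula pattern described above, assuming pandas and the standard Hamilton driver API; the module name formulas.py, the column names, and the cell_* node names are all hypothetical.)

    # formulas.py -- one Hamilton function per spreadsheet formula
    import pandas as pd

    def cell_b(input_df: pd.DataFrame) -> pd.Series:
        # e.g. spreadsheet formula "=A1 * 2"
        return input_df["a"] * 2

    def cell_c(cell_b: pd.Series, input_df: pd.DataFrame) -> pd.Series:
        # e.g. "=B1 + D1" -- depends on the previous formula's output via the parameter name
        return cell_b + input_df["d"]

    def result_df(input_df: pd.DataFrame, cell_b: pd.Series, cell_c: pd.Series) -> pd.DataFrame:
        # reassemble the "spreadsheet": original columns plus the computed ones
        return input_df.assign(cell_b=cell_b, cell_c=cell_c)

    # run.py
    # from hamilton import driver
    # import formulas
    # dr = driver.Builder().with_modules(formulas).build()
    # out = dr.execute(["result_df"], inputs={"input_df": my_df})["result_df"]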
  • m

    Mark Goodhead

    03/29/2025, 11:07 PM
    Does Hamilton play well with code that is structured more around classes, or would we need to refactor everything into static functions? I'm wondering what the expected practice is for managing state in a purely functional design?
  • p

    Piotr Bieszczad

    03/31/2025, 9:56 AM
    Hello, I want to validate the output of a function that I later use as a `step` in `pipe_output`. I wanted to use `@check_output_custom`, but the validation is not propagated. Is there a way to do this? Example:
    Copy code
    from hamilton.function_modifiers import check_output_custom, pipe_output, step

    @check_output_custom(CustomValidator())
    def foo(...) -> ...:
        ...
        return result

    @pipe_output(
        step(foo).named('foo_applied'),
        ...,
    )
    def bar(...) -> ...:
        return result
    Validation is not applied to `foo_applied`. Thank you in advance for your reply. ---------------------------------------------------------- Also, is there a way of acting upon failure? E.g. I know that if validation fails in one of the previous steps, then I want to return an `inference failed` flag at the end of the flow, on top of exiting it. Basically, some functionality for early termination of a flow.
  • l

    Luke

    04/06/2025, 1:34 AM
    Hi all, I’m using Hamilton for some file processing. The process involves splitting a PDF into images, then, for each image, preprocessing, running Tesseract for OCR, and producing a summary. Requesting some feedback re:
    • Is my implementation reasonable or are there obvious anti-patterns?
    • Is this a good application for a subdag for each image (vs. the collapsed `processed_image` node)?
    • I don’t love some of my node names and abstractions — `each_image`, `processed_image` -> `processed_images` 😐. Any recommendations?
      ◦ This expansion is especially weird and points to poor naming: `list(_processed_image["preprocessed_image"] for _processed_image in processed_image)`
    I’ve collapsed the image processing steps into one node, `processed_image`. I’d rather explode this into `preprocessed_image`, `tesseract_output`, `tesseract_summary`, etc. Ideally, the results of each of these stages could be accessed without further processing before all results are Collected. With that in mind, I realize the current implementation is probably fine for this level of the problem. However, I’ll need a different solution for the “meta-scale” problem — processing thousands of PDFs. In that context, I’ll need to queue, monitor, and save artifacts for each PDF individually, and I won’t be collapsing all processing into a single node as I’ve done here. I suspect this is where Hamilton’s scope ends and a full-featured orchestrator is preferred. Is this correct? DAG graph is attached. DAG definition below.
    Copy code
    # dag.py
    from typing import Union
    from pathlib import Path
    from PIL import Image
    import pandas as pd
    from hamilton.function_modifiers import source, extract_fields
    # These are excluded from the DAG
    from utils import (
        pdf_to_images,
        preprocess_image,
        image_to_tesseract_data,
        TesseractDocumentSummary,
    )
    from hamilton.htypes import Parallelizable, Collect
    import numpy as np
    
    
    @source  # note: `source` is normally used inside decorators (e.g. @parameterize/@subdag inputs), not as a standalone decorator; `pdf_path` could simply be a required runtime input
    def pdf_path() -> Union[str, Path]:
        pass
    
    
    def pdf_images(pdf_path: Union[str, Path], kwargs: dict = {}) -> list[Image.Image]:
        default_kwargs = {"dpi": 300, "thread_count": 4}
        kwargs = {**default_kwargs, **kwargs}
        return pdf_to_images(pdf_path, **kwargs)
    
    
    def each_image(pdf_images: list[Image.Image]) -> Parallelizable[Image.Image]:
        for image in pdf_images:
            yield image
    
    
    def processed_image(each_image: Image.Image) -> dict:
        preprocessed_image = preprocess_image(each_image)
        tesseract_output = image_to_tesseract_data(preprocessed_image)
        difficulty_summary = TesseractDocumentSummary.from_tesseract_data(tesseract_output)
    
        return {
            "preprocessed_image": preprocessed_image,
            "tesseract_output": tesseract_output,
            "difficulty_summary": difficulty_summary,
        }
    
    
    @extract_fields(
        {
            "preprocessed_images": list[np.ndarray],
            "tesseract_outputs": pd.DataFrame,
            "difficulty_table": pd.DataFrame,
        }
    )
    def processed_images(
        processed_image: Collect[dict],
    ) -> dict:
        return {
            "preprocessed_images": list(
                _processed_image["preprocessed_image"]
                for _processed_image in processed_image
            ),
            "tesseract_outputs": pd.concat(
                [
                    _processed_image["tesseract_output"]
                    for _processed_image in processed_image
                ],
                ignore_index=True,
            ),
            "difficulty_table": pd.DataFrame(
                [
                    _processed_image["difficulty_summary"]
                    for _processed_image in processed_image
                ]
            ),
        }
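    (A sketch of how the collapsed node could be exploded into separate per-image nodes, reusing the utils helpers from the module above; it assumes each Parallelizable branch ends in a single Collect, so only the summaries are collected here.)

    from PIL import Image
    import numpy as np
    import pandas as pd
    from hamilton.htypes import Collect
    from utils import preprocess_image, image_to_tesseract_data, TesseractDocumentSummary

    # every node between each_image (Parallelizable) and the Collect below runs once per image
    def preprocessed_image(each_image: Image.Image) -> np.ndarray:
        return preprocess_image(each_image)

    def tesseract_output(preprocessed_image: np.ndarray) -> pd.DataFrame:
        return image_to_tesseract_data(preprocessed_image)

    def difficulty_summary(tesseract_output: pd.DataFrame) -> TesseractDocumentSummary:
        return TesseractDocumentSummary.from_tesseract_data(tesseract_output)

    def difficulty_table(difficulty_summary: Collect[TesseractDocumentSummary]) -> pd.DataFrame:
        # one summary row per image
        return pd.DataFrame(list(difficulty_summary))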
  • m

    Matías Fuentes

    04/19/2025, 1:57 AM
    Hi, I have built a DAG in Hamilton that takes about 10 minutes to run. It involves typical synchronous pandas data transformations, as well as a bunch of async LLM API requests. It runs fine from end to end using Hamilton's async_driver. I was wondering if there is a straightforward way to use something like FastAPI StreamingResponses to notify a client in near-realtime about the progress of the process. For instance, inside one of the nodes I create a bunch of asyncio tasks to process each row of the dataset concurrently with an LLM via API requests, and I would like the client to update a progress bar each time a response arrives. I have multiple nodes that implement this kind of logic, and I would like the client to know which node the process is currently in and the progress of that node. I am aware that Burr provides streaming support, but I am kinda clueless about how to implement this feature in a simple way.
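    (A plain-asyncio sketch, not a Hamilton or Burr feature: pass a queue into the DAG as an input, have each LLM node push progress events, and drain the queue from the FastAPI endpoint into a StreamingResponse. The node name enriched_rows and the coroutine call_llm are hypothetical.)

    import asyncio

    async def enriched_rows(rows: list[dict], progress_queue: asyncio.Queue) -> list[dict]:
        # one concurrent LLM call per row, reporting progress as each response arrives
        async def one(row: dict) -> dict:
            out = await call_llm(row)  # hypothetical async LLM request
            await progress_queue.put({"node": "enriched_rows", "done": 1, "total": len(rows)})
            return out
        return list(await asyncio.gather(*(one(r) for r in rows)))

    # FastAPI side (sketch): kick off the async driver's execute() as a background task, and return
    # fastapi.responses.StreamingResponse over an async generator that reads from progress_queue.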
  • v

    Volker Lorrmann

    04/24/2025, 7:43 AM
    Hi guys, I have the following scenario, and I don't yet know what a good practice is for handling it. I am subscribed to an MQTT topic. On a new message, a Hamilton pipeline is executed that gets the message payload and topic as input. Every payload has a "config" and a "data" part. The config holds metadata and control flags. The data in the payload is finally written into a Delta table. Unfortunately, the data is not identical across all messages, so the data processing in the pipeline might not be exactly identical. Is it somehow possible to use only one pipeline for several slightly different data processing flows? Can I control the flow based on the inputs or on any node output somehow?
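    (A sketch of Hamilton's @config.when mechanism, which picks a node implementation based on the config dict passed to the Builder -- so in this scenario the driver would be built per message from the payload's "config" part. The schema key and payload field names are hypothetical.)

    import pandas as pd
    from hamilton.function_modifiers import config

    @config.when(schema="flat")
    def records__flat(payload_data: dict) -> pd.DataFrame:
        return pd.DataFrame(payload_data["rows"])        # hypothetical field

    @config.when(schema="nested")
    def records__nested(payload_data: dict) -> pd.DataFrame:
        return pd.json_normalize(payload_data["rows"])   # hypothetical field

    # per MQTT message:
    # dr = driver.Builder().with_modules(pipeline).with_config({"schema": payload["config"]["schema"]}).build()
    # dr.execute(["records"], inputs={"payload_data": payload["data"]})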
  • v

    Volker Lorrmann

    04/24/2025, 9:54 AM
    Hi guys, I am thinking about retrying and/or repeating the execution of a Hamilton dataflow (graph execution). There are a lot of Python modules (tenacity, retry, retry2, retrying, ...) available for retrying failed functions, and even creating one yourself is straightforward. Most of them can be used by adding a decorator to the function, so they can be used in Hamilton for retrying node executions. However, they cannot be used for the graph execution. The same is true for repeating the graph execution. Is there an adapter available for this? If not, is there some documentation on how to build an adapter?
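    (Since dr.execute() is an ordinary Python call, one sketch is simply to wrap it with tenacity -- or any retry decorator -- outside of Hamilton; this retries the whole graph execution rather than a single node and assumes nothing about Hamilton adapters. The node name final_table and the example inputs are hypothetical.)

    from tenacity import retry, stop_after_attempt, wait_exponential

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=30))
    def run_dataflow(dr, final_vars: list[str], inputs: dict) -> dict:
        # any exception raised by the graph execution triggers a retry of the whole run
        return dr.execute(final_vars, inputs=inputs)

    # results = run_dataflow(dr, ["final_table"], {"topic": topic, "payload": payload})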
  • m

    Mattias Fornander US

    04/24/2025, 4:48 PM
    Question 1: What's a good way to dynamically spawn subgraphs? Specifically, I'm trying to move away from dataframes (the logic code doesn't need column access, and I need types beyond NumPy) towards lists of dataclasses as both input and output. Each "row" needs to be transformed using the same DAG, but independently. I'm looking at subdag, but I don't think it fits. Right now I'm driving each list item independently through execute and collecting final_vars back into a list. Ideas?
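    (A sketch using Hamilton's Parallelizable/Collect for the per-row fan-out described above -- dynamic execution has to be enabled on the Builder. The Record dataclass and transform names are hypothetical.)

    from dataclasses import dataclass
    from hamilton.htypes import Parallelizable, Collect

    @dataclass
    class Record:
        name: str
        score: float

    def record(records: list[Record]) -> Parallelizable[Record]:
        # fan out: everything downstream until the Collect runs once per record
        yield from records

    def transformed_record(record: Record) -> Record:
        return Record(name=record.name.upper(), score=record.score * 2)

    def transformed_records(transformed_record: Collect[Record]) -> list[Record]:
        # fan in: gather the per-record results back into a list
        return list(transformed_record)

    # dr = (driver.Builder().with_modules(module)
    #       .enable_dynamic_execution(allow_experimental_mode=True).build())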
  • m

    Mattias Fornander US

    04/24/2025, 4:55 PM
    Question 2: What's a good way to represent database access that is visible and clean in the Hamilton UI? The initial part of my current data pipeline is the lookup and assembly of the initial configuration. It requires database access to resolve string references into real Python objects, plus some minor logic. I can do this outside of execute, but I have also tried feeding execute a plain config (YAML) file and a DuckDB connection and having it resolve these into a typed instance of a class. I'd like to have as much of the code inside the graph as possible, but I'm wondering if this is a good pattern. It seems like it, given your examples, but I'm also wondering if I'm missing something with materializers and adapters. The graph looks a bit odd since a lot of nodes access the db provided as input. Still a good path?
  • m

    Mattias Fornander US

    04/24/2025, 10:22 PM
    Any way to control input or output types such that I can use ABCs such as Mapping? I'd like to not collapse my ChainMap into a dict, but right now the Builder can't link up one node that returns a Mapping (on top of ChainMap) into another node that requests a Mapping. I realize the automatic discovery may be intractable or ambiguous. Any ideas in this direction?
  • s

    Seth Stokes

    05/07/2025, 8:07 PM
    Hello, should I expect something like this to be allowed?
    Copy code
    # subdag.py
    
    def mapping() -> pd.DataFrame:
        ...
    
    
    @subdag(
        prepare_src1, prepare_src2, prepare_src3,
        inputs={
            "directory": source("directory"),
            "pattern_src1": source("pattern_src1"),
            "pattern_src2": source("pattern_src2"),
            "pattern_src3": source("pattern_src3"),
        },
        external_inputs=source("mapping")
    )
    def merge_and_stuff(
        prepared_src1: pd.DataFrame,
        prepared_src2: pd.DataFrame,
        prepared_src3: pd.DataFrame,
        mapping: pd.DataFrame
    ) -> pd.DataFrame:
        ...
    Error: TypeError: argument of type 'UpstreamDependency' is not iterable
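    (For reference, the error above is consistent with external_inputs expecting a list of node-name strings rather than a source() object -- a sketch under that assumption, worth verifying against the installed version:)

    @subdag(
        prepare_src1, prepare_src2, prepare_src3,
        inputs={
            "directory": source("directory"),
            "pattern_src1": source("pattern_src1"),
            "pattern_src2": source("pattern_src2"),
            "pattern_src3": source("pattern_src3"),
        },
        external_inputs=["mapping"],  # plain node names, not source(...)
    )
    def merge_and_stuff(
        prepared_src1: pd.DataFrame,
        prepared_src2: pd.DataFrame,
        prepared_src3: pd.DataFrame,
        mapping: pd.DataFrame,
    ) -> pd.DataFrame:
        ...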
  • s

    Seth Stokes

    05/15/2025, 11:02 PM
    Am I allowed to use `overrides` on the `raw_df.raw` node? I am currently coming across an issue where the `step(_prepare)` is failing because it depends on the mapping done in `raw_df`. Since I am using `pipe_output`, I am thinking that the body of `raw_df` should have been executed (and executed before), since `raw_df.raw` is one step above.
    Copy code
    @pipe_output(
        step(_prepare_data)
    )
    def raw_df(path: Path) -> pd.DataFrame:
        # some mapping
        ...
        
    
    def prep_df(raw_prev_643: pd.DataFrame) -> pd.DataFrame:
        ...
    
    
    results: dict = dr.execute(["prep_df"], inputs={}, overrides={
        "path": "path/to/file",
        "raw_df.raw": raw_df, # loaded outside of dag
        }
    )
  • m

    Mark Wotton

    05/30/2025, 9:19 AM
    is there a way to return a lazy generator of values from a node? i have a process that is going to need some indeterminate amount of data, but i won't know how much until it either succeeds or fails. i can sort of see how i could just set a static number of elements to generate as part of the hamilton input variables, and just increment it upwards every time i throw NotEnoughData or something similar, but is there an idiomatic way to do this?
  • m

    Mark Wotton

    05/30/2025, 9:21 AM
    (i can see why it is unreasonable to return a generator from a node, it's not very cacheable)
  • m

    Mauricio

    06/16/2025, 7:41 PM
    hello, is it possible to reference a `to` materialization to execute a function after it happens?
    Copy code
    # run.py
    materializers = [to.csv(id="save_file", dependencies=["file_modify"], path="./out.csv")]
    ...driver build and execution
    
    # pipeline.py
    def load_saved_file(save_file, remote_path):
        fs.put(save_file, remote_path)
    When I try to reference it like this I get `Duplicate node names found`. Any advice?
  • l

    Louwrens

    06/20/2025, 1:42 PM
    Hello 👋 I was wondering if we can make caching work together with graceful error adapter?
  • s

    SAURABH SINGH

    07/19/2025, 8:18 AM
    Hi team, I'm trying to use Wren AI, and instead of an OpenAI key, I was wondering if I can use Ollama with a local model. However, I keep getting this error when wren-ai-service is starting; can anyone help me out? AttributeError: 'NoneType' object has no attribute 'get_generator' ERROR: Application startup failed. Exiting.
  • n

    Nic Pottier

    07/21/2025, 5:17 PM
    Is it acceptable to start an `asyncio.run()` loop within a Hamilton node to parallelize some work in that node? It seems to work, but I'm curious if there are gotchas.
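    (For context, the pattern being asked about is roughly this sketch; fetch_one and the node name are hypothetical. Two known gotchas with plain asyncio: asyncio.run() raises if an event loop is already running -- so this does not mix with the async driver -- and the concurrency inside the node is invisible to Hamilton.)

    import asyncio

    def page_contents(urls: list[str]) -> list[str]:
        # spin up a private event loop inside a regular (sync) node
        async def gather_all() -> list[str]:
            return list(await asyncio.gather(*(fetch_one(u) for u in urls)))  # hypothetical coroutine
        return asyncio.run(gather_all())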
  • h

    Hassan S

    07/22/2025, 7:37 AM
    How can I manage and parse my configuration file into my DAG builder, especially if I have a nested config file?
  • h

    Hassan S

    07/22/2025, 7:40 AM
    Is it good practice to have the config dictionary as a parameter to my function code? I was thinking of keeping my functions unaware of the configuration.
    🙌 1
  • h

    Hassan S

    07/22/2025, 7:50 AM
    Do I have to flatten my config? What do you usually do?
  • n

    Nic Pottier

    07/22/2025, 2:40 PM
    I use OmegaConf for config and parse it in my main, then pass that as the root node value for a node named `config`. I then have sub-config nodes that take `config` as an input, which are then used downstream. That makes sure those downstream nodes only re-evaluate if the parts of the config that matter to them change.
    👍 4
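    (A sketch of the pattern described above, assuming OmegaConf; the sub-config section names and the downstream node are hypothetical.)

    # main.py
    # from omegaconf import OmegaConf
    # cfg = OmegaConf.load("config.yaml")
    # results = dr.execute(["training_frame"], inputs={"config": cfg})

    # nodes.py -- sub-config nodes fan out from the root "config" node
    import pandas as pd
    from omegaconf import DictConfig

    def model_config(config: DictConfig) -> DictConfig:
        return config.model          # hypothetical section of the YAML

    def data_config(config: DictConfig) -> DictConfig:
        return config.data           # hypothetical section of the YAML

    def training_frame(data_config: DictConfig) -> pd.DataFrame:
        return pd.read_parquet(data_config.path)   # hypothetical path field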
  • h

    Hassan S

    07/23/2025, 2:15 PM
    I have a question: let's say I am using pandas to organize the data wrangling in a Hamilton DAG, and let's say I have a group of features that share the same theme, e.g. time features or financial features. What I want to know is how to organize these features in the DAG in a way that lets me turn some of them on or off and then combine them into the main dataframe later on. How should I handle this situation? What's the best practice here? Please let me know if you need more details, thanks.
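    (One common sketch: keep each feature theme in its own module of pd.Series nodes, and let a runtime input decide which ones get joined onto the main dataframe; the feature names here are hypothetical. @config.when is an alternative if whole groups should disappear from the DAG entirely.)

    import pandas as pd

    # time_features.py -- one node per feature
    def hour_of_day(timestamp: pd.Series) -> pd.Series:
        return timestamp.dt.hour

    # combine.py -- selection driven by a list of feature names passed as an input (or config)
    def feature_frame(base_df: pd.DataFrame, hour_of_day: pd.Series, rolling_spend: pd.Series,
                      selected_features: list[str]) -> pd.DataFrame:
        candidates = {"hour_of_day": hour_of_day, "rolling_spend": rolling_spend}
        chosen = {name: s for name, s in candidates.items() if name in selected_features}
        return base_df.assign(**chosen)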
  • j

    Jannik Buhr

    07/25/2025, 12:58 PM
    Hi there, I'm using Hamilton's caching in conjunction with `Parallelizable`. This leads to unexpectedly long execute times, even though the data was in fact restored from the cache. I suspect this is due to the number of nodes being checked. Essentially, I have a parallel node that, for a list of directories, goes into each directory and processes some data to return a DataFrame. All DataFrames then get collected into one big DataFrame at the end. What are your recommendations? Is there a way for me to give Hamilton more information about the data so that it can speed up cache lookups? And is there an opposite of `@cache(behavior="recompute")` to tell Hamilton that something is assumed to be up-to-date? Doing
    Copy code
    id = dr.cache.last_run_id
    data_version = dr.cache.data_versions[id]['all_frames']
    result_store.get(data_version)
    is an option, but it would still be nice to speed up `dr.execute` when the cache is up-to-date.
  • h

    Hassan S

    07/28/2025, 12:14 PM
    Hi everyone, how can I manage reading and writing parts of my DAG from/to a database? Let me give you the context: I am building a DAG that has two main abstract functionalities: training a model and updating that model, and I am saving the model's weights, unstructured, into a database. Now I want my DAG to read the model weights from the database and update them using the incoming new data. How can I architect my DAG in a good way to accomplish that?
  • h

    Hassan S

    07/31/2025, 9:24 AM
    Hello there, what's the best practice for collecting a bunch of pd.Series to construct a pandas dataframe? Shall I explicitly specify the input pd.Series nodes in the parameter list and use concat to collect them, or can I use **kwargs instead?
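    (A sketch of the explicit-parameter option: Hamilton wires the Series nodes in by parameter name, and pd.concat with axis=1 aligns them on their shared index, so **kwargs isn't needed. The feature names are hypothetical.)

    import pandas as pd

    def model_input(age: pd.Series, income: pd.Series, tenure_days: pd.Series) -> pd.DataFrame:
        # dict keys become the column names; Series are aligned on their index
        return pd.concat({"age": age, "income": income, "tenure_days": tenure_days}, axis=1)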
  • h

    Hassan S

    07/31/2025, 9:37 AM
    If I prepared my machine learning features separately and I want to combine them now for modelling, is it good practice (or might it be needed) to configure the selected features in a YAML config so I can control turning the features on and off?
  • f

    Fabian Preiß

    08/04/2025, 12:33 PM
    Hello, I was looking for a data loader that allows loading a variable number of datasets based on a string template, similar to the Kedro dataset factories, but I couldn't find a direct equivalent in the documentation. Is there a suggested way to achieve this in Hamilton, apart from defining the search path as one of the inputs of the dataflow or adapting the examples given in the loading-data documentation? I have seen that you can reuse a Kedro DataCatalog, but it doesn't look like the right approach to me to first set up some Kedro code just to pass it to Hamilton.
  • h

    Hassan S

    08/07/2025, 10:49 PM
    Hello everyone, I want to use the same DAG for both training/updating the model and for inference. During inference, the DAG should load all learned parameters and artifacts from the database, do the prediction, and output it. During training, the DAG should also load the artifacts if they exist, do the training or updating, and then save the results to the database. What's the best practice for architecting the DAG so it can be used for these two purposes? Thanks a lot. Note: I know that the two purposes will share some nodes and will differ in the downstream nodes, but I am looking for a clean way to reuse the starting DAG, because as you know the training and inference pipelines share the same nodes at the beginning and then branch out.
  • d

    Dries Hugaerts

    08/16/2025, 10:38 AM
    Hello, I'm looking into using tags in my workflows. I use parameterize quite a lot, and I was wondering if there is a way of combining them (with different tags for the different nodes). For example, I'd like to do something like this:
    Copy code
    from hamilton.function_modifiers import parameterize, value
    import pandas as pd

    COUNTRY_CODES = ['BE', 'NL', 'FR', 'DE', ...]
    load_parameterization = { f"{cc.lower()}_load": dict(cc=value(cc)) for cc in COUNTRY_CODES }

    @parameterize(**load_parameterization)
    def country_load(cc: str) -> pd.Series:
        # Some specific way of getting the country's load
        return get_load(cc)
    without resorting to this:
    Copy code
    @tag(cc='BE')
    def be_load():
        return get_load('BE')

    @tag(cc='NL')
    def nl_load():
        return get_load('NL')
    
    ...
    Is something like this already possible? I appreciate the help. Cheers Dries
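    (One possible sketch, assuming @tag_outputs -- which tags individual generated nodes by name -- composes with @parameterize in the installed version; worth verifying. get_load comes from the example above.)

    import pandas as pd
    from hamilton.function_modifiers import parameterize, tag_outputs, value

    COUNTRY_CODES = ['BE', 'NL', 'FR', 'DE']
    load_parameterization = {f"{cc.lower()}_load": dict(cc=value(cc)) for cc in COUNTRY_CODES}
    load_tags = {f"{cc.lower()}_load": {"cc": cc} for cc in COUNTRY_CODES}

    @tag_outputs(**load_tags)              # one tag dict per generated node
    @parameterize(**load_parameterization)
    def country_load(cc: str) -> pd.Series:
        return get_load(cc)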