https://github.com/stitchfix/hamilton
# hamilton-help

    Matías Fuentes

    04/19/2025, 1:57 AM
    Hi, I have built a DAG in Hamilton that takes about 10 minutes to run. It involves typical synchronous pandas data transformations, as well as a bunch of async LLM API requests. It runs fine end to end using Hamilton's async_driver. I was wondering if there is a straightforward way to use something like FastAPI StreamingResponse to notify a client in near-realtime about the progress of the process. For instance, inside one of the nodes I create a bunch of asyncio tasks to process each row of the dataset concurrently with an LLM via API requests, and I would like the client to update a progress bar each time a response arrives. I have multiple nodes that implement this kind of logic, and I would like the client to know which node the process is currently in and the progress of that node. I am aware that Burr provides streaming support, but I am kind of clueless about how to implement this feature in a simple way.
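
    A minimal sketch of one way to approach this, independent of Hamilton's own APIs: run the DAG in a background task, have nodes push progress events onto an asyncio.Queue that is passed in as a DAG input, and stream the queue to the client as Server-Sent Events. Names like run_dag and progress are illustrative, not Hamilton or Burr APIs.

    import asyncio
    import json

    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse

    app = FastAPI()


    async def run_dag(progress: asyncio.Queue) -> None:
        # Build and execute the hamilton async_driver here, passing `progress` as an
        # input so nodes can `await progress.put({"node": "enrich", "done": i, "total": n})`.
        ...
        await progress.put(None)  # sentinel: the DAG run is finished


    @app.get("/run")
    async def run() -> StreamingResponse:
        progress: asyncio.Queue = asyncio.Queue()
        task = asyncio.create_task(run_dag(progress))

        async def event_stream():
            while (event := await progress.get()) is not None:
                yield f"data: {json.dumps(event)}\n\n"
            await task  # re-raise any exception from the DAG run

        return StreamingResponse(event_stream(), media_type="text/event-stream")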

    Volker Lorrmann

    04/24/2025, 7:43 AM
    Hi guys, I have the following scenario, for which I don't yet know a good practice to handle it. I am subscribed to an MQTT topic. On a new message, a Hamilton pipeline is executed that gets the message payload and topic as input. Every payload has a "config" and a "data" part. The config holds metadata and control flags. The data in the payload is finally written into a Delta table. Unfortunately, the data is not identical across all messages, so the data processing in the pipeline might not be exactly identical. Is it somehow possible to use only one pipeline for several slightly different data processing flows? Can I control the flow based on the inputs or on any node output?
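
    One Hamilton feature that maps onto this is @config.when, which selects a node implementation based on driver config. A sketch (the schema flag and node names are made up); note that config is fixed at driver build time, so per-message variation means either building a driver per message variant or branching on the payload inside a node.

    import pandas as pd

    from hamilton.function_modifiers import config


    @config.when(schema="v1")
    def parsed_data__v1(payload: dict) -> pd.DataFrame:
        # processing for the first payload variant
        return pd.DataFrame(payload["data"])


    @config.when(schema="v2")
    def parsed_data__v2(payload: dict) -> pd.DataFrame:
        # slightly different processing for the second variant
        return pd.json_normalize(payload["data"])


    # dr = driver.Builder().with_modules(pipeline).with_config({"schema": "v1"}).build()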

    Volker Lorrmann

    04/24/2025, 9:54 AM
    Hi guys, I am thinking about retrying and/or repeating the execution of a Hamilton dataflow (graph execution). There are a lot of Python modules (tenacity, retry, retry2, retrying, ...) available for retrying failed functions, and even creating one yourself is straightforward. Most of them work by adding a decorator to the function, so they can be used in Hamilton for retrying node executions. However, they cannot be used for the graph execution, and the same is true for repeating the graph execution. Is there an adapter available for this? If not, is there some documentation on how to build an adapter?
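
    Hamilton's lifecycle adapters are the extension point for hooking into graph execution, but absent a ready-made retry adapter, one simple option is to retry the whole graph execution at the call site with one of the libraries above, e.g. tenacity. A sketch:

    from tenacity import retry, stop_after_attempt, wait_exponential


    @retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=30))
    def run_pipeline(dr, final_vars: list[str], inputs: dict) -> dict:
        # retries the entire graph execution, not individual nodes
        return dr.execute(final_vars, inputs=inputs)


    # results = run_pipeline(dr, ["output_table"], {"payload": payload})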

    Mattias Fornander US

    04/24/2025, 4:48 PM
    Question 1: What's a good way to dynamically spawn subgraphs? Specifically, I'm trying to move away from DataFrames (the logic code doesn't need column access, and I need types beyond NumPy) towards lists of dataclasses as both input and output. Each "row" needs to be transformed using the same DAG, but independently. I'm looking at subdag, but I don't think it fits. Right now I'm driving each list item independently through execute and collecting final_vars back into a list. Ideas?
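
    Hamilton's dynamic Parallelizable/Collect types are built for the "same subgraph per item" case. A sketch with illustrative dataclasses:

    from dataclasses import dataclass

    from hamilton.htypes import Collect, Parallelizable


    @dataclass
    class Row:
        value: int


    @dataclass
    class Result:
        value: int


    def row(rows: list[Row]) -> Parallelizable[Row]:
        for r in rows:  # each yielded item flows through the nodes below independently
            yield r


    def transformed(row: Row) -> Result:
        return Result(value=row.value * 2)


    def all_results(transformed: Collect[Result]) -> list[Result]:
        return list(transformed)


    # Requires: driver.Builder().enable_dynamic_execution(allow_experimental_mode=True)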

    Mattias Fornander US

    04/24/2025, 4:55 PM
    Question 2: What's a good way to represent database access that is visible and clean in the Hamilton UI? The initial part of my current data pipeline is the lookup and assembly of the initial configuration. It requires database access to resolve string references into real Python objects, plus some minor logic. I can do this outside of execute, but I also tried feeding execute a plain config (YAML) file and a DuckDB connection and having the graph resolve them into a typed instance of a class. I'd like to have as much of the code inside the graph as possible, but I'm wondering if this is a good pattern. It seems like it, given your examples, but I'm also wondering if I'm missing something with materializers and adapters. The graph looks a bit odd since a lot of nodes access the db provided as input. Still a good path?
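
    For what it's worth, a small sketch of that pattern: keep the DB handle as a plain DAG input and resolve the raw config into a typed object inside a node (all names are illustrative):

    from dataclasses import dataclass

    import duckdb


    @dataclass
    class ResolvedConfig:
        source_table: str
        row_count: int


    def resolved_config(raw_config: dict, db: duckdb.DuckDBPyConnection) -> ResolvedConfig:
        # resolve string references from the raw config using the database
        table = raw_config["source_table"]
        n = db.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
        return ResolvedConfig(source_table=table, row_count=n)


    # dr.execute(["resolved_config"], inputs={"raw_config": cfg, "db": duckdb.connect("my.db")})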

    Mattias Fornander US

    04/24/2025, 10:22 PM
    Any way to control input or output types such that I can use ABCs such as Mapping? I'd like to not collapse my ChainMap into a dict, but right now the Builder can't link up one node that returns a Mapping (on top of ChainMap) into another node that requests a Mapping. I realize the automatic discovery may be intractable or ambiguous. Any ideas in this direction?

    Seth Stokes

    05/07/2025, 8:07 PM
    Hello, should I expect something like this to be allowed?
    Copy code
    # subdag.py
    import pandas as pd
    from hamilton.function_modifiers import source, subdag


    def mapping() -> pd.DataFrame:
        ...
    
    
    @subdag(
        prepare_src1, prepare_src2, prepare_src3,
        inputs={
            "directory": source("directory"),
            "pattern_src1": source("pattern_src1"),
            "pattern_src2": source("pattern_src2"),
            "pattern_src3": source("pattern_src3"),
        },
        external_inputs=source("mapping")
    )
    def merge_and_stuff(
        prepared_src1: pd.DataFrame,
        prepared_src2: pd.DataFrame,
        prepared_src3: pd.DataFrame,
        mapping: pd.DataFrame
    ) -> pd.DataFrame:
        ...
    Error: TypeError: argument of type 'UpstreamDependency' is not iterable
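
    If I read the @subdag API correctly, external_inputs expects a list of node names (strings) rather than a source(...) object, which would explain the "'UpstreamDependency' is not iterable" error. A sketch of the adjusted decorator call:

    @subdag(
        prepare_src1, prepare_src2, prepare_src3,
        inputs={
            "directory": source("directory"),
            "pattern_src1": source("pattern_src1"),
            "pattern_src2": source("pattern_src2"),
            "pattern_src3": source("pattern_src3"),
        },
        external_inputs=["mapping"],  # node names, not source(...)
    )
    def merge_and_stuff(
        prepared_src1: pd.DataFrame,
        prepared_src2: pd.DataFrame,
        prepared_src3: pd.DataFrame,
        mapping: pd.DataFrame,
    ) -> pd.DataFrame:
        ...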

    Seth Stokes

    05/15/2025, 11:02 PM
    Am I allowed to use overrides on the raw_df.raw node? I am currently coming across an issue where the step(_prepare) is failing because it depends on the mapping done in raw_df. Since I am using pipe_output, I am thinking that the body of raw_df should have been executed (and executed before), since raw_df.raw is one step above.
    Copy code
    @pipe_output(
        step(_prepare_data)
    )
    def raw_df(path: Path) -> pd.DataFrame:
        # some mapping
        ...
        
    
    def prep_df(raw_prev_643: pd.DataFrame) -> pd.DataFrame:
        ...
    
    
    results: dict = dr.execute(["prep_df"], inputs={}, overrides={
        "path": "path/to/file",
        "raw_df.raw": raw_df, # loaded outside of dag
        }
    )
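
    If the intent is to skip raw_df's body but still run the _prepare_data step, something along these lines is what I would expect to work, assuming the raw_df.raw naming above and that path belongs in inputs rather than overrides (a sketch, not verified):

    preloaded_raw = pd.read_parquet("path/to/file.parquet")  # loaded outside of the DAG

    results: dict = dr.execute(
        ["prep_df"],
        inputs={"path": Path("path/to/file")},    # regular input
        overrides={"raw_df.raw": preloaded_raw},  # short-circuits raw_df's body;
                                                  # step(_prepare_data) still runs on it
    )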

    Mark Wotton

    05/30/2025, 9:19 AM
    Is there a way to return a lazy generator of values from a node? I have a process that is going to need some indeterminate amount of data, but I won't know how much until it either succeeds or fails. I can sort of see how I could just set a static number of elements to generate as part of the Hamilton input variables, and just increment it upwards every time I throw NotEnoughData or something similar, but is there an idiomatic way to do this?

    Mark Wotton

    05/30/2025, 9:21 AM
    (I can see why it is unreasonable to return a generator from a node; it's not very cacheable.)

    Mauricio

    06/16/2025, 7:41 PM
    Hello, is it possible to reference a to materialization, to execute a function after it happens?
    Copy code
    # run.py
    materializers = [to.csv(id="save_file", dependencies=["file_modify"], path="./out.csv")]
    ...driver build and execution
    
    # pipeline.py
    def load_saved_file(save_file, remote_path):
        fs.put(save_file, remote_path)
    When I try to reference it like this I get Duplicate node names found. Any advice?

    Louwrens

    06/20/2025, 1:42 PM
    Hello 👋 I was wondering if we can make caching work together with the graceful error adapter?

    SAURABH SINGH

    07/19/2025, 8:18 AM
    Hi team, I'm trying to use Wren AI, and instead of an OpenAI key I was wondering if I can use Ollama with a local model. However, I keep getting this error when wren-ai-service is starting; can anyone help me out? AttributeError: 'NoneType' object has no attribute 'get_generator' ERROR: Application startup failed. Exiting.

    Nic Pottier

    07/21/2025, 5:17 PM
    Is it acceptable to start an asyncio.run() loop within a Hamilton node to parallelize some work in that node? It seems to work, but I'm curious if there are gotchas.
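
    For reference, the shape that generally works with the standard (synchronous) driver is sketched below. With the async_driver there is already a running event loop, so asyncio.run() would raise, and async def nodes are the better fit there.

    import asyncio


    async def _fetch_one(item: str) -> str:
        await asyncio.sleep(0.1)  # stand-in for an API call
        return item.upper()


    def enriched_items(items: list[str]) -> list[str]:
        async def _gather() -> list[str]:
            return list(await asyncio.gather(*(_fetch_one(i) for i in items)))

        return asyncio.run(_gather())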

    Hassan S

    07/22/2025, 7:37 AM
    How can I manage and parse my configuration file into my DAG builder, especially if I have a nested config file?

    Hassan S

    07/22/2025, 7:40 AM
    Is it good practice to have the config dictionary as a parameter in my function code? I was thinking of keeping my functions unaware of the configuration.

    Hassan S

    07/22/2025, 7:50 AM
    Do I have to flatten my config? What do you usually do?

    Nic Pottier

    07/22/2025, 2:40 PM
    I use OmegaConf for config and parse it in my main, then pass that as the root node value for a node named config. I then have sub-config nodes that take config as an input, which are then used downstream. That makes sure those downstream nodes only re-evaluate if the parts of the config that matter to them change.
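
    A sketch of that pattern (node and key names are illustrative):

    from omegaconf import DictConfig, OmegaConf


    def features_config(config: DictConfig) -> DictConfig:
        return config.features  # downstream feature nodes depend only on this slice


    def model_config(config: DictConfig) -> DictConfig:
        return config.model


    # in main:
    # cfg = OmegaConf.load("config.yaml")
    # dr.execute(["trained_model"], inputs={"config": cfg})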

    Hassan S

    07/23/2025, 2:15 PM
    I have a question: let's say I am using pandas to organize the data wrangling in a Hamilton DAG, and let's say I have a group of features that share the same theme, e.g. time features or financial features. What I want to know is how to organize these features in the DAG in a way that lets me turn some of them on or off and then combine them into the main dataframe later on. How should I handle this situation? What's the best practice here? Please let me know if you need more details, thanks.

    Jannik Buhr

    07/25/2025, 12:58 PM
    Hi there, I'm using Hamilton's caching in conjunction with Parallelizable. This leads to unexpectedly long execute times, even though the data was in fact restored from the cache. I suspect this is due to the number of nodes being checked. Essentially, I have a parallel node that, for a list of directories, goes into each directory and processes some data to return a DataFrame. All DataFrames then get collected into one big DataFrame at the end. What are your recommendations? Is there a way for me to give Hamilton more information about the data so that it can speed up cache lookups? And is there an opposite of @cache(behavior="recompute") to tell Hamilton that something is assumed to be up-to-date? Doing
    Copy code
    id = dr.cache.last_run_id
    data_version = dr.cache.data_versions[id]['all_frames']
    result_store.get(data_version)
    is an option, but it would still be nice to speed up dr.execute when the cache is up-to-date.

    Hassan S

    07/28/2025, 12:14 PM
    Hi everyone, how can I manage reading from and writing to a database in parts of my DAG? Let me give you the context: I am building a DAG that has two main abstract functionalities, training a model and updating this model, and I am saving the model's weights unstructured into a database. Now I want my DAG to read the model weights from the database and update them using the incoming new data. How can I architect my DAG in a good way to accomplish that?

    Hassan S

    07/31/2025, 9:24 AM
    Hello there, what's the best practice for collecting a bunch of pd.Series to construct a pandas dataframe? Shall I explicitly specify the input pd.Series nodes in the parameter list and use concat to collect them, or can I use **kwargs instead?
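
    Listing the pd.Series nodes explicitly keeps the lineage visible in the DAG, and as far as I know plain **kwargs parameters aren't supported for regular nodes. A sketch of the concat pattern (feature names are made up):

    import pandas as pd


    def feature_matrix(
        age: pd.Series,
        income: pd.Series,
        tenure_days: pd.Series,
    ) -> pd.DataFrame:
        # column names come from the dict keys
        return pd.concat({"age": age, "income": income, "tenure_days": tenure_days}, axis=1)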

    Hassan S

    07/31/2025, 9:37 AM
    If I prepared my machine learning features separately and I want to combine them now for modelling, is it good practice (or might it be needed) to configure the selected features from a YAML config so I can control turning the features on and off?

    Fabian Preiß

    08/04/2025, 12:33 PM
    Hello, I was looking for a data loader that allows loading a variable number of datasets based on a string template, similar to the Kedro dataset factories, but I couldn't find a direct equivalent in the documentation. Is there a suggested way to achieve this in Hamilton, apart from defining the search path as one of the inputs of the dataflow or adjusting the examples given in the loading-data documentation? I have seen that you can reuse a Kedro DataCatalog, but it doesn't seem right to first set up some Kedro code just to pass it to Hamilton.
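
    I'm not aware of a direct dataset-factory equivalent; one workaround is to let a single loader node expand the template itself, so the number of datasets doesn't have to be known up front (a sketch; Parallelizable could then fan out per file if each needs its own subgraph):

    from pathlib import Path

    import pandas as pd


    def raw_partitions(data_dir: Path, filename_pattern: str = "sales_*.csv") -> dict[str, pd.DataFrame]:
        # expands the template at runtime and loads every matching file
        return {p.stem: pd.read_csv(p) for p in sorted(data_dir.glob(filename_pattern))}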

    Hassan S

    08/07/2025, 10:49 PM
    Hello everyone, I want to use the same DAG for both training/updating the model and for inference. During inference, the DAG should load all learned parameters and artifacts from the database, do the prediction, and output it. During training, the DAG should also load the artifacts if they exist, train or update them, and then save them to the database. What's the best practice for architecting the DAG so it can serve these two purposes? Thanks a lot. Note: I know that the two purposes will share some nodes and will differ in the downstream nodes, but I am looking for a clean way to reuse the starting DAG, because the training and inference pipelines share the same nodes at the beginning and then branch out.
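
    One common shape for this is to keep the shared prefix as ordinary nodes and gate the branch point with @config.when on a mode flag, building the driver with {"mode": "train"} or {"mode": "infer"}. A rough sketch (names are illustrative):

    import pandas as pd

    from hamilton.function_modifiers import config


    def features(raw_data: pd.DataFrame) -> pd.DataFrame:
        ...  # shared by both pipelines


    @config.when(mode="train")
    def updated_weights(features: pd.DataFrame, stored_weights: dict | None) -> dict:
        ...  # fit or update the model, then persist downstream


    @config.when(mode="infer")
    def predictions(features: pd.DataFrame, stored_weights: dict) -> pd.Series:
        ...  # load-only path: predict with the stored weights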

    Dries Hugaerts

    08/16/2025, 10:38 AM
    Hello, I'm looking into using tags in my workflows. I use parameterize quite a lot, and I was wondering if there is a way of combining them (with different tags for the different nodes). For example, I'd like to do something like this:
    Copy code
    COUNTRY_CODES = ['BE', 'NL', 'FR', 'DE', ...]
    load_parameterization = { f"{cc.lower()}_load": dict(cc=value(cc)) for cc in COUNTRY_CODES }
    
    @parameterize(**load_parameterization) 
    def country_load(cc: str) -> pd.Series: 
        # Some specific way of getting the countries load
        return get_load(cc)
    without resorting to this:
    Copy code
    @tag(cc='BE')
    def be_load() -> pd.Series:
        return get_load('BE')

    @tag(cc='NL')
    def nl_load() -> pd.Series:
        return get_load('NL')
    
    ...
    Is something like this already possible? I appreciate the help. Cheers Dries
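
    If I'm not mistaken, @tag_outputs lets you attach different tags to individual outputs of a decorator like @parameterize, keyed by the generated node names. A sketch (worth verifying against your Hamilton version):

    import pandas as pd

    from hamilton.function_modifiers import parameterize, tag_outputs, value

    COUNTRY_CODES = ['BE', 'NL', 'FR', 'DE']
    load_parameterization = {f"{cc.lower()}_load": dict(cc=value(cc)) for cc in COUNTRY_CODES}
    load_tags = {f"{cc.lower()}_load": {"cc": cc} for cc in COUNTRY_CODES}


    @tag_outputs(**load_tags)
    @parameterize(**load_parameterization)
    def country_load(cc: str) -> pd.Series:
        return get_load(cc)  # get_load as in the example above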

    Vladyslav Bondarenko

    08/19/2025, 10:43 AM
    Hello, trying out the caching functionality with a simple setup we have so far, and noticing that it might not be working as I would expect. We split the result of legacy_cashflow_computation into 3 sub-results using @extract_fields. It looks like all of them are being picked up from the cache correctly, however it seems like we still re-run legacy_cashflow_computation. I understand that the reason we are re-running right now is that we have not handled pd.DataFrame and pydantic.BaseModel hashing properly using fingerprinting yet. However, I would have expected it to understand that, given we picked up all 3 outputs of a node from the cache, it should stop traversing the graph further. Am I understanding the intended behaviour incorrectly, or could this be a bug? Appreciate any help on this!

    Zhenya Kovalyov

    09/04/2025, 12:57 PM
    Hi, quick question: I have @save_to.parquet(...) sprinkled around my pipelines, and in order to trigger the saving routine I need to explicitly call out save.<name> in final_vars in driver.execute. Question: is there a way I can modify my driver.execute call so that all vars that are decorated with save_to get persisted? Thank you very much in advance!
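
    One workaround, assuming the savers show up with the save. prefix described above: discover them from the graph and append them to final_vars (a sketch; my_final_vars and my_inputs are placeholders, and the filter could also key off node tags):

    save_nodes = [
        var.name
        for var in dr.list_available_variables()
        if var.name.startswith("save.")
    ]
    results = dr.execute(final_vars=my_final_vars + save_nodes, inputs=my_inputs)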

    Erin Gunn

    09/07/2025, 9:40 PM
    Executing a dataflow is returning one of my dataframes keyed in the driver's results as 'function_name.column1', 'function_name.column2', etc. I expect the dataframe to be in the results simply as 'function_name'. When I execute the function manually, I do in fact get a single dataframe. The first screenshot shows the expected result of the function, which is a single dataframe with the columns intact. The second screenshot shows that the dataframe has been broken out in the results and each column is now a series. Any ideas on what I'm doing wrong?

    Gavin Kistner

    09/15/2025, 7:21 PM
    Hello! I have a final_var node that I want to include multiple times with different aliases for the output based on different input values, determined at config time. Roughly:
    Copy code
    def foo(some_node: SomeNode, foo_cfg: FooConfig) -> int:
       …
    
    # This is procedurally created with dynamic names at runtime
    foo_cfgs = {"a": FooConfig(…), "b": FooConfig(…)}
    dr = driver.Builder().with_config({"foo_cfgs": foo_cfgs}).build()
    print(dr.execute(final_vars=[*foo_cfgs.keys()]))
    #=> {"a":42, "b":17}
    I have a working approach (details in thread) using @resolve and @parameterize, but I'm wondering if there's a better way to do this using pipelines or subdags or something. Basically: what's the Hamilton way (or ways) to accomplish the equivalent of this procedural code:
    Copy code
    for name, setup in some_input.items():
        final_result[name] = some_node(…other_attrs_from_dag…, some_attr=setup)
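
    For reference, the rough shape of the @resolve + @parameterize approach mentioned above, which generates one aliased node per config entry (a sketch; recent Hamilton versions may also require enabling the power-user mode flag in the config for @resolve):

    from hamilton.function_modifiers import ResolveAt, parameterize, resolve, value


    @resolve(
        when=ResolveAt.CONFIG_AVAILABLE,
        decorate_with=lambda foo_cfgs: parameterize(
            **{name: {"foo_cfg": value(cfg)} for name, cfg in foo_cfgs.items()}
        ),
    )
    def foo(some_node: SomeNode, foo_cfg: FooConfig) -> int:
        ...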