# hamilton-help
  •

    Mattias Fornander US

    04/24/2025, 10:22 PM
    Any way to control input or output types such that I can use ABCs such as Mapping? I'd like to not collapse my ChainMap into a dict, but right now the Builder can't link up one node that returns a Mapping (on top of ChainMap) into another node that requests a Mapping. I realize the automatic discovery may be intractable or ambiguous. Any ideas in this direction?
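    For context, a minimal sketch of the shape being described (node and input names here are illustrative): one node returns a ChainMap annotated as Mapping, and a downstream node asks for a Mapping.

    from collections import ChainMap
    from typing import Mapping

    def combined_settings(defaults: dict, overrides: dict) -> Mapping:
        # a ChainMap satisfies Mapping without collapsing into a plain dict
        return ChainMap(overrides, defaults)

    def resolved_value(combined_settings: Mapping) -> str:
        # downstream node that only needs the Mapping interface
        return str(combined_settings.get("key", ""))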
  •

    Seth Stokes

    05/07/2025, 8:07 PM
    Hello, should I expect something like this to be allowed?
    Copy code
    # subdag.py
    
    def mapping() -> pd.DataFrame:
        ...
    
    
    @subdag(
        prepare_src1, prepare_src2, prepare_src3,
        inputs={
            "directory": source("directory"),
            "pattern_src1": source("pattern_src1"),
            "pattern_src2": source("pattern_src2"),
            "pattern_src3": source("pattern_src3"),
        },
        external_inputs=source("mapping")
    )
    def merge_and_stuff(
        prepared_src1: pd.DataFrame,
        prepared_src2: pd.DataFrame,
        prepared_src3: pd.DataFrame,
        mapping: pd.DataFrame
    ) -> pd.DataFrame:
        ...
    Error: TypeError: argument of type 'UpstreamDependency' is not iterable
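    For reference, a sketch of the form external_inputs typically takes, a list of upstream node names rather than a source() reference; this is an assumption about the fix, not a confirmed answer from the thread:

    @subdag(
        prepare_src1, prepare_src2, prepare_src3,
        inputs={
            "directory": source("directory"),
            "pattern_src1": source("pattern_src1"),
            "pattern_src2": source("pattern_src2"),
            "pattern_src3": source("pattern_src3"),
        },
        external_inputs=["mapping"],  # assumed: list of node names, not source("mapping")
    )
    def merge_and_stuff(
        prepared_src1: pd.DataFrame,
        prepared_src2: pd.DataFrame,
        prepared_src3: pd.DataFrame,
        mapping: pd.DataFrame,
    ) -> pd.DataFrame:
        ...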
  •

    Seth Stokes

    05/15/2025, 11:02 PM
    Am I allowed to use `overrides` on the `raw_df.raw` node? I am currently coming across an issue where the `step(_prepare)` is failing because it is depending on the mapping done in `raw_df`. Since I am using `pipe_output` I am thinking that the body of `raw_df` should have been executed (and executed before) since `raw_df.raw` is one step above.
    Copy code
    @pipe_output(
        step(_prepare_data)
    )
    def raw_df(path: Path) -> pd.DataFrame:
        # some mapping
        ...
        
    
    def prep_df(raw_prev_643: pd.DataFrame) -> pd.DataFrame:
        ...
    
    
    results: dict = dr.execute(["prep_df"], inputs={}, overrides={
        "path": "path/to/file",
        "raw_df.raw": raw_df, # loaded outside of dag
        }
    )
  •

    Mark Wotton

    05/30/2025, 9:19 AM
    is there a way to return a lazy generator of values from a node? i have a process that is going to need some indeterminate amount of data, but i won't know how much until it either succeeds or fails. i can sort of see how i could just set a static number of elements to generate as part of the hamilton input variables, and just increment it upwards every time i throw NotEnoughData or something similar, but is there an idiomatic way to do this?
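    A rough sketch of the "increment it upwards" workaround described above, with a driver-side retry loop (all names here are illustrative, and dr is an already-built driver):

    class NotEnoughData(Exception):
        pass

    def samples(n_samples: int) -> list[int]:
        # produce exactly n_samples values; how many is controlled from outside the DAG
        return list(range(n_samples))

    def processed(samples: list[int]) -> int:
        if len(samples) < 100:
            raise NotEnoughData("need more samples")
        return sum(samples)

    # driver-side loop: grow the requested amount until the node stops complaining
    n = 10
    while True:
        try:
            result = dr.execute(["processed"], inputs={"n_samples": n})
            break
        except NotEnoughData:
            n *= 2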
  •

    Mark Wotton

    05/30/2025, 9:21 AM
    (i can see why it is unreasonable to return a generator from a node, it's not very cacheable)
  •

    Mauricio

    06/16/2025, 7:41 PM
    hello, is it possible to reference a `to` materialization to execute a function after it happens?
    Copy code
    # run.py
    materializers = [to.csv(id="save_file", dependencies=["file_modify"], path="./out.csv")]
    # ...driver build and execution
    
    # pipeline.py
    def load_saved_file(save_file, remote_path):
        fs.put(save_file, remote_path)
    When I try to reference it like this I get `Duplicate node names found`. Any advice?
  •

    Louwrens

    06/20/2025, 1:42 PM
    Hello 👋 I was wondering if we can make caching work together with graceful error adapter?
  •

    SAURABH SINGH

    07/19/2025, 8:18 AM
    Hi team, I'm trying to use Wren AI, and instead of an OpenAI key, I was wondering if I can use Ollama with a local model. However, I keep getting this error when wren-ai-service is starting; can anyone help me out? AttributeError: 'NoneType' object has no attribute 'get_generator' ERROR: Application startup failed. Exiting.
  •

    Nic Pottier

    07/21/2025, 5:17 PM
    Is it acceptable to start an `asyncio.run()` loop within a Hamilton node to parallelize some work in that node? It seems to work, but I'm curious if there are gotchas.
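    For illustration, a minimal sketch of that pattern (names are illustrative): a synchronous node that starts its own event loop to fan out async work.

    import asyncio

    async def _fetch_one(url: str) -> str:
        await asyncio.sleep(0.1)  # stand-in for real async I/O
        return url

    async def _fetch_all(urls: list[str]) -> list[str]:
        return await asyncio.gather(*[_fetch_one(u) for u in urls])

    def fetched_pages(urls: list[str]) -> list[str]:
        # asyncio.run() inside a regular (sync) Hamilton node
        return asyncio.run(_fetch_all(urls))

    One gotcha that applies generally: asyncio.run() raises if an event loop is already running in that thread, so this only works from a synchronous execution context.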
  •

    Hassan S

    07/22/2025, 7:37 AM
    how can i manage and parse my configuration file into my dag builder, especially if I have a nested config file?
  •

    Hassan S

    07/22/2025, 7:40 AM
    Is it a good practice to have the config dictionary as a parameter in my function code? I was thinking of keeping my functions unaware of the configuration.
  •

    Hassan S

    07/22/2025, 7:50 AM
    do I have to flatten my config? what do you usually do?
  •

    Nic Pottier

    07/22/2025, 2:40 PM
    I use OmegaConf for config and parse it in my main, then pass that as the root node value for a node named `config`. I then have sub-config nodes that take `config` as an input, which are then used downstream. That makes sure those downstream nodes only re-evaluate if the parts of the config that matter to them change.
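    A rough sketch of that layout (module and node names are illustrative; assumes the OmegaConf object is passed in as an input named `config`):

    from omegaconf import DictConfig, OmegaConf

    # main.py: cfg = OmegaConf.load("settings.yaml"); dr.execute([...], inputs={"config": cfg})

    def training_config(config: DictConfig) -> dict:
        # narrow sub-config node; downstream nodes depend on this, not on the whole config
        return OmegaConf.to_container(config.training, resolve=True)

    def learning_rate(training_config: dict) -> float:
        return float(training_config["lr"])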
  •

    Hassan S

    07/23/2025, 2:15 PM
    I have a question: let's say I am using pandas to organize the data wrangling in a Hamilton DAG, and let's say I have a group of features that share the same theme: time features or financial features. What I want to know is how to organize these features in the DAG in a way that lets me turn some of them on or off and then combine them into the main dataframe later on. How should I handle this situation? What's the best practice here? Please let me know if you need more details, thanks.
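    One common shape for this (a sketch, not an answer from the thread; all names are illustrative) is to keep each themed feature as its own node and let a combining node take the selection as an input or config value:

    import pandas as pd

    def hour_of_day(timestamps: pd.Series) -> pd.Series:
        return timestamps.dt.hour

    def amount_zscore(amount: pd.Series) -> pd.Series:
        return (amount - amount.mean()) / amount.std()

    def feature_frame(
        hour_of_day: pd.Series, amount_zscore: pd.Series, selected_features: list[str]
    ) -> pd.DataFrame:
        # only keep the features that were switched on via an input/config value
        available = {"hour_of_day": hour_of_day, "amount_zscore": amount_zscore}
        return pd.DataFrame({name: available[name] for name in selected_features})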
  •

    Jannik Buhr

    07/25/2025, 12:58 PM
    Hi there, I'm using Hamilton's caching in conjunction with `Parallelizable`. This leads to unexpectedly long execute times, even though the data was in fact restored from the cache. I suspect this is due to the number of nodes that are being checked. Essentially, I have a parallel node that, for a list of directories, goes into the directory and processes some data to return a DataFrame. All DataFrames then get collected into one big DataFrame in the end. What are your recommendations? Is there a way for me to give Hamilton more information about the data such that it can speed up cache lookups? And is there the opposite of `@cache(behavior="recompute")` to tell Hamilton that something is assumed to be up-to-date? Doing
    Copy code
    id = dr.cache.last_run_id
    data_version = dr.cache.data_versions[id]['all_frames']
    result_store.get(data_version)
    is an option, but it would still be nice to speed up `dr.execute` when the cache is up-to-date.
  •

    Hassan S

    07/28/2025, 12:14 PM
    Hi everyone, how can I manage reading and writing parts of my DAG from/to a database? Let me give you the context here: I am building a DAG that has two main abstract functionalities: training a model and updating this model. I am saving the model's weights, unstructured, into a database; now I want my DAG to read the model weights from the database and update them using the incoming new data. How can I architect my DAG in a good way to accomplish that?
  •

    Hassan S

    07/31/2025, 9:24 AM
    Hello there, what's the best practice to collect a bunch of pd.Series to construct a pandas dataframe? Shall I explicitly specify the input pd.Series nodes in the parameter list and use concat to collect them, or can I use **kwargs instead?
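    A sketch of the explicit-parameter version being asked about (column and node names are illustrative):

    import pandas as pd

    def feature_a(raw: pd.DataFrame) -> pd.Series:
        return raw["a"] * 2

    def feature_b(raw: pd.DataFrame) -> pd.Series:
        return raw["b"].fillna(0)

    def features(feature_a: pd.Series, feature_b: pd.Series) -> pd.DataFrame:
        # each parameter names an upstream Series node; concat stitches them into one frame
        return pd.concat({"feature_a": feature_a, "feature_b": feature_b}, axis=1)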
  •

    Hassan S

    07/31/2025, 9:37 AM
    If I prepared my machine learning features separately and I want to combine them now for modelling, is it a good practice (or might it be needed) to configure the selected features from a YAML config so I can control turning the features on and off?
  •

    Fabian Preiß

    08/04/2025, 12:33 PM
    Hello, I was looking for a data loader that allows loading a variable number of datasets based on a string template, similar to the kedro dataset factories, but couldn't find a direct equivalent in the documentation. Is there a suggested way to achieve this in Hamilton, apart from defining the search path as one of the inputs of the dataflow or adjusting the examples given in the loading-data documentation? I have seen that you can reuse a Kedro DataCatalog, but it doesn't look like the right approach to me to first set up some Kedro code just to pass it to Hamilton.
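    For what it's worth, a sketch of the "search path as a dataflow input" variant mentioned above (the path template and node names are illustrative):

    import glob
    import pandas as pd

    def dataset_paths(path_template: str) -> list[str]:
        # e.g. path_template = "data/raw/*_sales.csv"
        return sorted(glob.glob(path_template))

    def raw_datasets(dataset_paths: list[str]) -> dict[str, pd.DataFrame]:
        # a variable number of datasets, keyed by file path
        return {p: pd.read_csv(p) for p in dataset_paths}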
  •

    Hassan S

    08/07/2025, 10:49 PM
    Hello everyone, I want to use the same DAG for both training/updating the model and for inference. During inference, the DAG should load all learned parameters and artifacts from the database, do the prediction, and output it. During training, the DAG should also load the artifacts if they exist, do the training or updating, and then save the result to the database. What's the best practice to architect the DAG in order to use it for these two purposes? Thanks a lot. Note: I know that the two purposes are going to share some nodes and will differ in the downstream nodes, but at least I am looking for a clean way to set up the starting DAG, because as you know the training and inference pipelines share the same nodes at the beginning and then branch out.
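    One way this is often laid out (a sketch under the assumption of illustrative node names and a `mode` config value; not a confirmed recommendation from the thread) is to share the upstream nodes and branch the mode-specific ones with `@config.when`:

    import pandas as pd
    from hamilton.function_modifiers import config

    def prepared_data(raw_data: pd.DataFrame) -> pd.DataFrame:
        # shared upstream node used by both training and inference
        return raw_data.dropna()

    @config.when(mode="train")
    def model__train(prepared_data: pd.DataFrame) -> dict:
        # fit/update parameters; a downstream node can persist them to the database
        return {"mean": float(prepared_data["y"].mean())}

    @config.when(mode="inference")
    def model__inference(stored_parameters: dict) -> dict:
        # load previously saved parameters instead of fitting
        return stored_parameters

    def predictions(model: dict, prepared_data: pd.DataFrame) -> pd.Series:
        return prepared_data["y"] * 0 + model["mean"]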
  •

    Dries Hugaerts

    08/16/2025, 10:38 AM
    Hello, I'm looking into using tags in my workflows. I use parameterize quite a lot, and I was wondering if there is a way of combining them (with different tags for the different nodes). For example, I'd like to do something like this:
    Copy code
    COUNTRY_CODES = ['BE', 'NL', 'FR', 'DE', ...]
    load_parameterization = { f"{cc.lower()}_load": dict(cc=value(cc)) for cc in COUNTRY_CODES }
    
    @parameterize(**load_parameterization) 
    def country_load(cc: str) -> pd.Series: 
        # Some specific way of getting the countries load
        return get_load(cc)
    without resorting to this:
    Copy code
    @tag(cc='BE')
    def be_load() -> pd.Series:
        return get_load('BE')
    
    @tag(cc='NL')
    def nl_load() -> pd.Series:
        return get_load('NL')
    
    ...
    Is something like this already possible? I appreciate the help. Cheers Dries
  •

    Vladyslav Bondarenko

    08/19/2025, 10:43 AM
    Hello, trying out the caching functionality with a simple setup we have so far and noticing that it might not be working as I would expect. We split out the result of `legacy_cashflow_computation` into 3 sub-results using `@extract_fields`; it looks like all of them are being picked up from cache correctly, however it seems like we still re-run `legacy_cashflow_computation`. I understand that the reason we are re-running right now is that we have not handled `pd.DataFrame` and `pydantic.BaseModel` hashing properly using fingerprinting yet. However, I would have expected it to understand that, given we picked up all 3 outputs of a node from cache, it should stop traversing the graph further. Am I understanding the intended behaviour incorrectly, or could this be a bug? Appreciate any help on this!
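    For context, a minimal sketch of the shape being described (the field names are illustrative):

    import pandas as pd
    from hamilton.function_modifiers import extract_fields

    @extract_fields({"cashflows": pd.DataFrame, "summary": dict, "metadata": dict})
    def legacy_cashflow_computation(inputs: pd.DataFrame) -> dict:
        # expensive computation whose three outputs are consumed separately downstream
        return {"cashflows": inputs, "summary": {}, "metadata": {}}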
  •

    Zhenya Kovalyov

    09/04/2025, 12:57 PM
    hi - quick question - i have `@save_to.parquet(...)` sprinkled around my pipelines, and in order to trigger the saving routine, i need to explicitly call out `save.<name>` in `final_vars` in `driver.execute`. question - is there a way i can modify my `driver.execute` call, so that all vars that are decorated with `save_to` get persisted? thank you very much in advance!
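    One possible direction (a sketch only; the exact tag key on saver nodes is an assumption here and worth confirming by inspecting `dr.list_available_variables()`) is to collect the saver node names from the graph and pass them all as final vars:

    # gather every node produced by a @save_to decorator and request them all
    saver_vars = [
        var.name
        for var in dr.list_available_variables()
        if var.tags.get("hamilton.data_saver")  # assumed tag key -- verify on your graph
    ]
    results = dr.execute(final_vars=saver_vars)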
  •

    Erin Gunn

    09/07/2025, 9:40 PM
    The results of executing a dataflow are returning one of my dataframes keyed in the driver results as 'function_name.column1', 'function_name.column2', etc. I expect the dataframe to be in the results as simply 'function_name'. When I manually execute the function, I do in fact retrieve a single dataframe. The first screenshot shows the expected result of the function, which is a single dataframe with the columns intact. The second screenshot shows the dataframe has been broken out in the results and each column is now a series. Any ideas on what I'm doing wrong?
  •

    Gavin Kistner

    09/15/2025, 7:21 PM
    Hello! I have a final_var node that I want to include multiple times with different aliases for the output based on different input values, determined at config time. Roughly:
    Copy code
    def foo(some_node: SomeNode, foo_cfg: FooConfig) -> int:
       …
    
    # This is procedurally created with dynamic names at runtime
    foo_cfgs = {"a": FooConfig(…), "b": FooConfig(…)}
    dr = driver.Builder().with_config({"foo_cfgs": foo_cfgs}).build()
    print(dr.execute(final_vars=[*foo_cfgs.keys()]))
    #=> {"a":42, "b":17}
    I have a working approach (details in thread) using `@resolve` and `@parameterize`, but I'm wondering if there's a better way to do this using pipelines or subdags or something. Basically: What's the Hamilton way (or ways) to accomplish the equivalent of this procedural code:
    Copy code
    for name, setup in some_input.items():
        final_result[name] = some_node(…other_attrs_from_dag…, some_attr=setup)
  •

    Eric Riddoch

    10/03/2025, 7:38 PM
    Hello my friends! Just joined slack. I’m working on writing an AWS SageMaker course. Is there an example anywhere of Hamilton with SageMaker pipelines?
  •

    Roi Jacob Olfindo

    10/08/2025, 7:10 AM
    Hi, sorry for the stupid question but I keep encountering this error:
    Copy code
    >>> from hamilton.function_modifiers import unpack_fields
    ImportError: cannot import name 'unpack_fields' from 'hamilton.function_modifiers' (/root/venv/lib/python3.10/site-packages/hamilton/function_modifiers/__init__.py)
    What would be the alternatives to this so that I could extract something from a tuple?
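    If upgrading Hamilton isn't an option (the ImportError suggests the installed version predates unpack_fields), one workaround, sketched here with illustrative names, is to expose the tuple elements through small indexing nodes:

    def model_and_score(data: list[int]) -> tuple[str, float]:
        return "model", float(sum(data))

    def model(model_and_score: tuple[str, float]) -> str:
        # small passthrough node that indexes into the upstream tuple
        return model_and_score[0]

    def score(model_and_score: tuple[str, float]) -> float:
        return model_and_score[1]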
  •

    Abhilash Singhal

    10/09/2025, 10:41 AM
    Is there a way to integrate hamilton-ui directly as a Django app into an existing Django application? Has anyone integrated it successfully? Context: We are building a document ingestion product for one of our clients. The core product is a Django application. We want to build out data pipelines in Hamilton and want to use hamilton-ui to leverage the observability. Adding multiple containers just to run Hamilton UI will increase the compute cost for us, which is a constraint at this point in time. If hamilton-ui can be run in an integrated manner directly within the existing Django application, then it will make a lot of sense to use it in this project.
  •

    Seth Stokes

    10/13/2025, 4:13 PM
    Morning, could you please advise if I could be defining this while loop for `Parallelizable` incorrectly?
    Copy code
    def raw_paginated_results(api: API, query: str) -> Parallelizable[dict]:
        """Paginate the results from the API.
        A `nextPageToken` indicates more pages must be fetched.
        Previously, in v2, split into batches from `total` `startAt` and `maxResults`.
        """
        next_page, init_result = api.search(query, max_results=50)
        yield init_result
    
        while next_page:
            next_page, result = api.search(query, max_results=50, params={"nextPageToken": next_page})
            if result:
                yield result
    When I collect this I am getting a `List[List[dict]]` (where the outer list is just `len` of 1) instead of a `List[dict]`. Could this have something to do with the two `yield` statements?
  •

    Paul D

    10/14/2025, 2:32 PM
    Hello, I have a simple question but cannot easily find the clean way to do it in the docs: how do I rename a node so it doesn't use the function name? In particular, I'd like to have a space in the node name. The code would be something like
    Copy code
    @rename("My Feature")
    def my_feature(...) -> ...:
        return this_feature
    And then the node is named My Feature in the DAG, and I can further access it with `source("My Feature")` if needed.