# hamilton-help
s
Interesting. What's the use case? We haven't exposed that functionality in a public-facing API.
But happy to consider doing something more formal; otherwise, if you're just prototyping to prove something out, I can show you what you'd need to augment.
It's just one function that you'd need to implement.
a
what’s the use case?
1. We have existing code that we want to run in a DAG, but we can't rename the functions or the arguments. I guess we could just write a Python module that calls this code directly for a POC (sketched below).
2. We would like to iterate over different dimensions in our DAG. For example, add several similar sources (say A, B, C), apply the same pipeline to these sources, but with different arguments (say `{"A": 1.0, "B": -1.0, "C": 2.0}`), and then concatenate these 3 tables. Writing that in a module is possible again, but hard to maintain and evolve dynamically.
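For the POC route in point 1, a minimal sketch of what such a thin wrapper module could look like; `legacy` and its `load_source` function are hypothetical stand-ins for the existing code that can't be renamed:
Copy code
import datetime

import pandas as pd

import legacy  # hypothetical: the existing code whose names we can't change


def source_a(dates: list[datetime.date]) -> pd.DataFrame:
    # Thin wrapper: Hamilton sees a well-named, annotated function,
    # while the legacy code stays untouched.
    return legacy.load_source("A", dates=dates, scale=1.0)


def source_b(dates: list[datetime.date]) -> pd.DataFrame:
    return legacy.load_source("B", dates=dates, scale=-1.0)


def source_c(dates: list[datetime.date]) -> pd.DataFrame:
    return legacy.load_source("C", dates=dates, scale=2.0)


def combined_sources(
    source_a: pd.DataFrame, source_b: pd.DataFrame, source_c: pd.DataFrame
) -> pd.DataFrame:
    # Concatenate the three per-source tables, as described in point 2.
    return pd.concat([source_a, source_b, source_c])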
s
Cool. I have to run to drop off my kid (morning here in California) but will be back later (in 30 mins). Happy to jump on a call if you're still around; otherwise, if you have some code you could share/sketch out, that would help ground the conversation.
a
let me get back to you
So basically, I'm building a feature store. When loading features I have a lot of pre-canned operations that I use over and over again:
• load a table
• do a merge-as-of
Each operation needs inputs that are either derived from a previous operation or just provided when loading the features (e.g. which dates we load the data for, which entities we're interested in). Because I reuse these operations a lot (e.g. `MergeAsOf`), I need to be able to build the DAG dynamically; otherwise putting it in a module would require a lot of repetitive work.
Copy code
import dataclasses
import datetime
from typing import Optional

import pandas as pd
import pyarrow.parquet as pq

@dataclasses.dataclass(frozen=True)
class LoadTable:
    table_name: str
    columns: Optional[list[str]]

    def __call__(self, dates: list[datetime.date], filters: list) -> pd.DataFrame:
        files = [
            f"{self.table_name}/{date:%Y-%m-%d}.parquet"
            for date in dates
        ]
        return pq.ParquetDataset(files, filters=filters).read(columns=self.columns).to_pandas()


@dataclasses.dataclass(frozen=True)
class MergeAsOf:
    on: str
    by: str
    tolerance: Optional[pd.Timedelta]

    def __call__(self, left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
        return pd.merge_asof(
            left, right,
            on=self.on,
            by=self.by,
            tolerance=self.tolerance,
            allow_exact_matches=True,
        )


dag = Dag()  # hypothetical API; nothing like this is exposed publicly today
dates = dag.add_input("dates")
pokemon_names = dag.add_input("pokemon_names")
feature_1 = dag.add_node(LoadTable("pokemon-colour-features", ['red', "green", "blue"])).map(
    dates, pokemon_names
)
feature_2 = dag.add_node(LoadTable("pokemon-power-features", ['water_power', "fire_power"])).map(
    dates, pokemon_names
)
both_features = dag.add_node(MergeAsOf(on="timestamp", by="pokemon_name"))(
    feature_1, feature_2
)

dag.set_output(both_features)


features = dag.execute(
    dates=pd.date_range(pd.to_datetime("2023-01-01"), pd.to_datetime("2023-07-24")),
    pokemon_names=(["fluffington", "pikapkia"])
)
s
Interesting. Thanks for the code sample. I think I understand. Either @Elijah Ben Izzy or myself will take a quick stab to see what we can do with Hamilton today. This could work with `@resolve`, but I'd need to try to write the code to be sure.
… otherwise putting it in a module would require a lot of repetitive work.
To ask the question: how dynamic are things really going to be? Do you have 10s, 100s, or 1000s of features? And how often would they be changing? We find that optimizing for code readability actually makes iteration faster long term. Does that make sense? Without a code snippet for the above I don't have a strong case for what to do, but I thought I would mention the "verbosity is good" perspective.
e
So yeah, to echo @Stefan Krawczyk, I think the big question is how dynamic it is — e.g. what do you want to expose the user to? A few possibilities I see:
1. If the user is always building different joins/exploring features, and nothing is static, you're right that the Hamilton API is wrong for you — it's not dynamic enough (and isn't meant to be).
2. If you have a set of features that you want to expose (and maybe want to parameterize the join dates), or the shapes of the transformation DAGs fit a few common patterns, you could pretty easily model this statically using Hamilton. Even if you have loads of them, `@parameterize` could help build out the set of parameterizations you need.
Looking at (2), just to be clear, you could model the above as follows:
Copy code
from typing import List

import pandas as pd

from hamilton import driver


def pokemon_power_features(power_columns: List[str] = ("water_power", "fire_power")) -> pd.DataFrame:
    # Call out to your helper class above/define a custom data loader
    ...


def pokemon_color_features(color_columns: List[str] = ("red", "green", "blue")) -> pd.DataFrame:
    # Call out to your helper class above/define a custom data loader
    ...


def pokemon_features(
    pokemon_power_features: pd.DataFrame,
    pokemon_color_features: pd.DataFrame,
    dates: pd.DatetimeIndex,
    pokemon_names: List[str],
) -> pd.DataFrame:
    return ...

# ... in your driver script

dr = driver.Driver({}, module)
inputs = {
    "dates" : pd.date_range(pd.to_datetime("2023-01-01"), pd.to_datetime("2023-07-24")),
    "pokemon_names": (["fluffington", "pikapkia"])
}
df = dr.execute(["pokemon_features"], inputs=inputs)
Note you could do a few things to make this more dynamic/clever:
1. Join more feature tables together — allow them to be empty, which means you'll just skip them (letting you inject only the features you need); see the sketch at the end of this message.
2. Use `@inject` with `@parameterize` to make this more configuration/constant-driven.
3. Use `@resolve` to make it more configuration-driven, but I think that hits a readability trade-off.
What you might want is to have your base operations in Hamilton, then have your more complex operations downstream in an API that lets you do the last few things dynamically… All this said, you could also use the internal Hamilton `Node` API to build exactly what you had above — happy to demo how you could do that pending whether what I just showed works or not 🙂 Hamilton is separated very cleanly between DAG definition and DAG execution, which means that you could take advantage of visualization, lineage, execution, etc. if you injected it in. This is not currently a first-class/supported feature, but definitely something we could think about.
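A quick sketch of idea (1), as a variant of `pokemon_features` above; the convention that an unneeded feature table resolves to an empty DataFrame is an assumption, not a tested pattern:
Copy code
import pandas as pd


def pokemon_features(
    pokemon_power_features: pd.DataFrame,
    pokemon_color_features: pd.DataFrame,
) -> pd.DataFrame:
    # Drop empty tables so only the features you actually injected get joined.
    tables = [
        t for t in (pokemon_power_features, pokemon_color_features)
        if not t.empty
    ]
    return pd.concat(tables, axis=1)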
👍 1
a
OK, thanks for getting back to me.
Regarding `parameterize`/`inject`: I think it would be helpful to do something where I fork the DAG across a dimension and merge it back; see the example below. I had a look at the docs but could not find a simple example that uses both `parameterize` and `inject` to do that.
Copy code
import pandas as pd


def get_mean(data: pd.DataFrame, frequency: str) -> pd.DataFrame:
    return (
        data
        # assumes a DatetimeIndex; ceil each timestamp to the given frequency
        .assign(date=lambda x: x.index.ceil(pd.to_timedelta(frequency)))
        .groupby("date")
        [["price"]].mean()
        .assign(frequency=frequency)
    )


# `dag`, `load_data`, `const`, and `map` are the same hypothetical API as above
source_data = dag.add_node(load_data)
means = []
look_backs = ["1d", "1w", "30d"]
for look_back in look_backs:
    look_back_node = dag.const(look_back)
    mean_by_date = dag.add_node(get_mean).map(source_data, look_back_node)
    means.append(mean_by_date)

dag.add_node(lambda *x: pd.concat(x)).map(*means)
Regarding having a dynamic API, I guess one option is to use the low-level/internal `Node` API. Another option would possibly be to assign callables to global variables; would they behave the same as functions defined with `def`?
Copy code
@dataclasses.dataclass(frozen=True)
class DataLoader:
    table_name: str

    def __call__(self, dates):
        """do your thing..."""


color_loader = DataLoader("color-features")
power_loader = DataLoader("power-features")
Alternatively, is there a way to add nodes, as functions, one by one to the DAG (rather than loading them from a module)? So, for example:
Copy code
driver = Driver({}, custom_functions={"color_features": DataLoader("color-features") })
I'm happy to go on a call; I'm UK-based, so my afternoon / your morning should work.
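One existing utility worth checking here: `hamilton.ad_hoc_utils.create_temporary_module` builds a throwaway module from individual functions, which gets close to adding nodes one by one. A sketch (the function body is a placeholder; double-check the exact API against the docs):
Copy code
import pandas as pd

from hamilton import ad_hoc_utils, driver


def color_features(dates: list) -> pd.DataFrame:
    """Placeholder body; could delegate to DataLoader("color-features")."""
    return pd.DataFrame()


# Bundle standalone functions into a module object the Driver can load.
temp_module = ad_hoc_utils.create_temporary_module(color_features, module_name="features")
dr = driver.Driver({}, temp_module)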
e
Hey! Need to read it over again to fully grok what you're suggesting, but agreed — might be easiest to hop on a call. Around now if it's not too late; otherwise I'll be up and about pretty early tomorrow morning.
a
I can have a call now, but it will have to be quick.
available tomorrow otherwise.
e
I have a few minutes too — we can always pick it up later again too
a
ok
e
Some demos of `parameterize`/`inject` — there are a few ways to do this, but this is how I’d represent the task above:
Copy code
def source_data(url: str) -> pd.Series:
    """You can also use load_from... and define a custom data loader for extra readability/metadata. Or parameterize."""
    return _load_data(url)[COLUMN] # this could also return a dataframe

LOOKBACKS = ["1d", "1w", "30d"]

@parameterize(
    {f"mean_{lookback}" : {"lookback" : value(lookback)} for lookback in LOOKBACKS}
)
def mean_lookback_n_days(data: pd.Series, lookback: str) -> pd.Series:
    return ...

@inject(lookbacks=group(*[f"mean_{lookback}" for lookback in LOOKBACKS]))
def data_with_mean_lookbacks(lookbacks: List[pd.Series]) -> pd.DataFrame:
    return _join_logic(lookbacks)
Note that you also have alternatives:
1. You could pass around dataframes and have `lookbacks` be a runtime parameter — just run the operation over the entire dataframe. No `@inject` or `@parameterize`; you can even take in default arguments/send in overrides.
2. If you have highly complex operations for each `lookback`, you could wrap it in a subdag.
3. If you want to make the above configuration-driven, we have `@resolve`:
Copy code
def source_data(url: str) -> pd.Series:
    """You can also use load_from... and define a custom data loader for extra readability/metadata. Or parameterize."""
    return _load_data(url)[COLUMN] # this could also return a dataframe


@resolve(
  when=ResolveAt.CONFIG_AVAILABLE,
  # config now expects a lookbacks parameter that's a list of ints
  decorate_with=lambda lookbacks: parameterize(
    {f"mean_{lookback}" : {"lookback" : value(lookback)} for lookback in lookbacks}
    )
)
def mean_lookback_n_days(data: pd.Series, lookback: str) -> pd.Series:
    return ...

@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda lookbacks: inject(lookbacks=group(*[f"mean_{lookback}" for lookback in LOOKBACKS]))
)
def data_with_mean_lookbacks(lookbacks: List[pd.Series]) -> pd.DataFrame:
    return _join_logic(lookbacks)
The API is a little ugly, but it's easy to wrap this in something for your use case, specifically:
Copy code
@repeat_for_lookbacks(pattern="mean_{lookback}")
@inject_lookbacks(pattern="mean_{lookback}")
These could just call out to the above if you use them a lot. Implementation left as an exercise to the reader 🙂 IMO you want to avoid this as much as possible — the overall philosophy is that modeling in code is easier than configuration-driven code, with a library like Hamilton keeping it structured. Config-driven stuff can be injected as you go along to improve flexibility.
a
Thanks, I got the example to work with a couple of modifications:
Copy code
import sys

from hamilton.driver import Driver
from hamilton.function_modifiers import parameterize, value, inject, group, source
import pandas as pd

LOOKBACKS = ["1d", "1w", "30d"]

def source_data() -> pd.Series:
    return pd.Series(
        pd.to_datetime([
            "2021-02-21T20:26:00Z",
            "2021-02-22T20:26:00Z",
            "2021-02-23T20:26:00Z",
            "2021-02-24T20:26:00Z",
        ])
    )


@parameterize(
    **{f"mean_{lookback}" : {"lookback" : value(lookback)} for lookback in LOOKBACKS}
)
def mean_lookback_n_days(source_data: pd.Series, lookback: str) -> pd.Series:
    return (source_data - pd.to_timedelta(lookback)).rename(lookback)

@inject(lookbacks=group(*[source(f"mean_{lookback}") for lookback in LOOKBACKS]))
def data_with_mean_lookbacks(lookbacks: list[pd.Series]) -> pd.DataFrame:
    return pd.concat(lookbacks, axis=1)


driver = Driver({}, sys.modules[__name__])
data = driver.execute(["data_with_mean_lookbacks"])
print(data.to_markdown(index=False))
and:
Copy code
import sys

import pandas as pd
from hamilton.driver import Driver
from hamilton.function_modifiers import (
    ResolveAt,
    group,
    inject,
    parameterize,
    resolve,
    source,
    value,
)
from hamilton.settings import ENABLE_POWER_USER_MODE


def source_data() -> pd.Series:
    return pd.Series(
        pd.to_datetime(
            [
                "2021-02-21T20:26:00Z",
                "2021-02-22T20:26:00Z",
                "2021-02-23T20:26:00Z",
                "2021-02-24T20:26:00Z",
            ]
        )
    )


@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda lookbacks: parameterize(
        **{f"mean_{lookback}": {"lookback": value(lookback)} for lookback in lookbacks}
    ),
)
def mean_lookback_n_days(source_data: pd.Series, lookback: str) -> pd.Series:
    return (source_data - pd.to_timedelta(lookback)).rename(lookback)


@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda lookbacks: inject(
        lookbacks=group(*[source(f"mean_{lookback}") for lookback in lookbacks])
    ),
)
def data_with_mean_lookbacks(lookbacks: list[pd.Series]) -> pd.DataFrame:
    return pd.concat(lookbacks, axis=1)


driver = Driver(
    {"lookbacks": ["1d", "1m"], ENABLE_POWER_USER_MODE: True}, sys.modules[__name__]
)
data = driver.execute(["data_with_mean_lookbacks"])
print(data.to_markdown(index=False))
I guess these decorators (in particular `resolve`) take a bit of time to get used to.
You could pass around dataframes and have `lookbacks` be a runtime parameter — just run the operation over the entire dataframe. No `@inject` or `@parameterize` — you can even take in default arguments/send in overrides.
It is much simpler this way…
Copy code
import sys

import pandas as pd
from hamilton.driver import Driver


def source_data() -> pd.Series:
    return pd.Series(
        pd.to_datetime(
            [
                "2021-02-21T20:26:00Z",
                "2021-02-22T20:26:00Z",
                "2021-02-23T20:26:00Z",
                "2021-02-24T20:26:00Z",
            ]
        )
    )


def data_with_lookbacks(source_data: pd.Series, lookbacks: list[str]) -> pd.DataFrame:
    return pd.concat(
        [
            (source_data - pd.to_timedelta(lookback)).rename(lookback)
            for lookback in lookbacks
        ],
        axis=1,
    )


driver = Driver({}, sys.modules[__name__])
data = driver.execute(
    ["data_with_lookbacks"], inputs={"lookbacks": ["1d", "1w", "30d"]}
)
print(data.to_markdown(index=False))
e
Nice! Seems like you have a few good ways to do it 🙂 Not going to prescribe which one is best as it all depends on your workflow, but there are obviously trade-offs to each. You could also use `resolve` + `extract_columns` if you wanted it to show up in the DAG, but I think the simplicity of the second one is hard to beat. One other thing — you could wrap the decorators to build syntactic sugar/a macro for the lookbacks. E.g.:
Copy code
@repeat_for_lookbacks(pattern="mean_{lookback}")
@inject_lookbacks(pattern="mean_{lookback}")
Up to you!
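As a closing illustration, a minimal sketch of what that syntactic sugar could look like, built on the `resolve`/`parameterize`/`inject` pattern from the working examples above. The names `repeat_for_lookbacks` and `inject_lookbacks` are hypothetical, and the config is assumed to supply a `lookbacks` list (with power-user mode enabled, as before):
Copy code
from hamilton.function_modifiers import (
    ResolveAt,
    group,
    inject,
    parameterize,
    resolve,
    source,
    value,
)


def repeat_for_lookbacks(pattern: str):
    """Parameterizes the decorated function once per configured lookback."""
    return resolve(
        when=ResolveAt.CONFIG_AVAILABLE,
        decorate_with=lambda lookbacks: parameterize(
            **{pattern.format(lookback=lb): {"lookback": value(lb)} for lb in lookbacks}
        ),
    )


def inject_lookbacks(pattern: str, param: str = "lookbacks"):
    """Injects the parameterized nodes as one grouped list parameter."""
    return resolve(
        when=ResolveAt.CONFIG_AVAILABLE,
        decorate_with=lambda lookbacks: inject(
            **{param: group(*[source(pattern.format(lookback=lb)) for lb in lookbacks])}
        ),
    )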