Slackbot (07/25/2023, 2:40 PM):

Stefan Krawczyk (07/25/2023, 2:57 PM):

Stefan Krawczyk (07/25/2023, 2:58 PM):

Stefan Krawczyk (07/25/2023, 3:32 PM):

Arthur Andres (07/25/2023, 3:50 PM):
> what's the use case?
1. We have existing code that we want to run in a DAG, but we can't rename the functions or the arguments. I guess we could just write a Python module that calls this code directly for a POC.
2. We would like to iterate over different dimensions in our DAG. For example, add different but similar sources (say A, B, C), apply the same pipeline to these sources, but with different arguments (say
{"A": 1.0, "B": -1.0, "C": 2.0}
) and then concatenate these 3 tables. Writing that in a module is possible again, but hard to maintain and evolve dynamically.

Stefan Krawczyk (07/25/2023, 3:54 PM):

Arthur Andres (07/25/2023, 4:25 PM):

Arthur Andres (07/25/2023, 5:18 PM):

import dataclasses
import datetime
from typing import Optional

import pandas as pd
import pyarrow.parquet as pq


@dataclasses.dataclass(frozen=True)
class LoadTable:
    table_name: str
    columns: Optional[list[str]]

    def __call__(self, dates: list[datetime.date], filters: list) -> pd.DataFrame:
        files = [f"{self.table_name}/{date:%Y-%m-%d}.parquet" for date in dates]
        return pq.ParquetDataset(files, filters=filters).read(columns=self.columns).to_pandas()


@dataclasses.dataclass(frozen=True)
class MergeAsOf:
    on: str
    by: str
    tolerance: Optional[pd.Timedelta]

    def __call__(self, left, right):
        return pd.merge_asof(
            left,
            right,
            on=self.on,
            by=self.by,
            tolerance=self.tolerance,
            allow_exact_matches=True,
        )


dag = Dag()
dates = dag.add_input("dates")
pokemon_names = dag.add_input("pokemon_names")
feature_1 = dag.add_node(LoadTable("pokemon-colour-features", ["red", "green", "blue"])).map(
    dates, pokemon_names
)
feature_2 = dag.add_node(LoadTable("pokemon-power-features", ["water_power", "fire_power"])).map(
    dates, pokemon_names
)
both_features = dag.add_node(MergeAsOf(on="timestamp", by="pokemon_name"))(
    feature_1, feature_2
)
dag.set_output(both_features)
features = dag.execute(
    dates=pd.date_range(pd.to_datetime("2023-01-01"), pd.to_datetime("2023-07-24")),
    pokemon_names=["fluffington", "pikapkia"],
)
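As a plain-pandas illustration of point (2) above (the same pipeline applied to sources A, B, C with different arguments, then concatenated), here is a minimal sketch; the pipeline body and column names are hypothetical stand-ins:

```python
import pandas as pd

# Per-source arguments, taken from the example above.
WEIGHTS = {"A": 1.0, "B": -1.0, "C": 2.0}


def run_pipeline(source: str, weight: float) -> pd.DataFrame:
    # Hypothetical stand-in for the real per-source pipeline.
    return pd.DataFrame({"source": [source], "value": [weight * 10.0]})


# Apply the same pipeline to each source, then concatenate the tables.
tables = [run_pipeline(name, weight) for name, weight in WEIGHTS.items()]
combined = pd.concat(tables, ignore_index=True)
```

This is exactly the shape that is tedious to hand-write in a static module once the set of sources grows.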
Stefan Krawczyk (07/25/2023, 7:38 PM):
@resolve, but I'd need to try to write the code to be sure.

> … otherwise putting it in a module would require a lot of repetitive work.

To ask the question: how dynamic are things really going to be? Do you have 10s, 100s, or 1000s of features? And how often would they be changing? We find that optimizing for code readability actually makes iteration faster long term. Does that make sense? Otherwise, without writing a code snippet for the above, I don't have a strong case for what to do; I just thought I would mention the "verbosity is good" perspective.
Elijah Ben Izzy (07/25/2023, 8:26 PM):
@parameterize could help out to build the set of parameterizations you need.
Looking at (2), just to be clear, you could model the above as follows:

def pokemon_power_features(power_columns: List[str] = ("water_power", "fire_power")) -> pd.DataFrame:
    # Call out to your helper class above/define a custom data loader
    ...

def pokemon_colour_features(colour_columns: List[str] = ("red", "green", "blue")) -> pd.DataFrame:
    # Call out to your helper class above/define a custom data loader
    ...

def pokemon_features(
    pokemon_power_features: pd.DataFrame,
    pokemon_colour_features: pd.DataFrame,
    dates: pd.DatetimeIndex,
    pokemon_names: List[str],
) -> pd.DataFrame:
    return ...

# ... in your driver script
dr = driver.Driver({}, module)
inputs = {
    "dates": pd.date_range(pd.to_datetime("2023-01-01"), pd.to_datetime("2023-07-24")),
    "pokemon_names": ["fluffington", "pikapkia"],
}
df = dr.execute(["pokemon_features"], inputs=inputs)

Note you could do a few things to make this more dynamic/clever:
1. Join more feature tables together, with the possibility for them to be empty, which means you'll just skip them (allowing you to inject only the features you need).
2. Use @inject with @parameterize to make this more configuration/constant-driven.
3. Use @resolve to make it more configuration-driven, but I think that hits a readability trade-off.
What you might want is to have your base operations in Hamilton, then have your more complex operations downstream in an API that allows you to do the last few things dynamically…
All this said, you could also use the internal Hamilton Node API to build exactly what you had above; happy to demo how you could do that, pending whether what I just showed works or not 🙂 Hamilton is separated very cleanly between DAG definition and DAG execution, which means that you could take advantage of visualization, lineage, execution, etc. if you injected it in. This is not currently a first-class/supported feature, but definitely something we could think about.
Arthur Andres (07/26/2023, 8:59 AM):

Arthur Andres (07/26/2023, 9:00 AM):

def get_mean(data, frequency: str):
    # assumes timestamps live in a "timestamp" column
    return (
        data
        .assign(date=lambda x: x["timestamp"].dt.ceil(pd.to_timedelta(frequency)))
        .groupby("date")
        [["price"]].mean()
        .assign(frequency=frequency)
    )

source_data = dag.add_node(load_data)
means = []
look_backs = ["1d", "1w", "30d"]
for look_back in look_backs:
    look_back_node = dag.const(look_back)
    mean_by_date = dag.add_node(get_mean).map(source_data, look_back_node)
    means.append(mean_by_date)
dag.add_node(lambda *x: pd.concat(x)).map(*means)
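For reference, here is a self-contained, runnable version of the get_mean idea with toy data, assuming (as a guess, since the snippet above is a sketch) that the timestamps live in a "timestamp" column:

```python
import pandas as pd


def get_mean(data: pd.DataFrame, frequency: str) -> pd.DataFrame:
    # Bucket each row's timestamp up to the next multiple of `frequency`,
    # then average `price` within each bucket.
    return (
        data.assign(date=lambda x: x["timestamp"].dt.ceil(pd.to_timedelta(frequency)))
        .groupby("date")[["price"]]
        .mean()
        .assign(frequency=frequency)
    )


df = pd.DataFrame(
    {
        "timestamp": pd.to_datetime(
            ["2023-01-01 01:00", "2023-01-01 02:00", "2023-01-02 01:00"]
        ),
        "price": [1.0, 3.0, 5.0],
    }
)
out = get_mean(df, "1d")  # two daily buckets: means 2.0 and 5.0
```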
Arthur Andres (07/26/2023, 9:05 AM):
Re: the internal Node API. Another option would possibly be to assign callables to global variables. Would they behave the same as def?

@dataclasses.dataclass(frozen=True)
class DataLoader:
    table_name: str

    def __call__(self, dates):
        """do your thing..."""

color_features = DataLoader("color-features")
power_features = DataLoader("power-features")

Alternatively, is there a way to add nodes, as functions, one by one to the DAG (rather than loading them from a module)? So for example:

driver = Driver({}, custom_functions={"color_features": DataLoader("color-features")})
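One plain-Python data point on the "callable assigned to a global vs. def" question: a dataclass instance is callable, but it does not carry the __name__ or __annotations__ that a def does, and that metadata is typically what a framework inspects when building nodes from a module. A quick sketch (not Hamilton-specific):

```python
import dataclasses


@dataclasses.dataclass(frozen=True)
class DataLoader:
    table_name: str

    def __call__(self, dates: list) -> str:
        return f"{self.table_name}: {len(dates)} dates"


color_features = DataLoader("color-features")


def power_features(dates: list) -> str:
    return f"power-features: {len(dates)} dates"


# Both are callable...
assert callable(color_features) and callable(power_features)
# ...but only the def carries its own name; the instance does not.
print(hasattr(color_features, "__name__"))  # False
print(power_features.__name__)              # power_features
```

So a framework that discovers nodes by inspecting module-level functions would need extra handling to treat such instances the same way.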
Arthur Andres (07/26/2023, 9:07 AM):

Elijah Ben Izzy (07/26/2023, 4:06 PM):

Arthur Andres (07/26/2023, 5:04 PM):

Arthur Andres (07/26/2023, 5:04 PM):

Elijah Ben Izzy (07/26/2023, 5:04 PM):

Arthur Andres (07/26/2023, 5:04 PM):

Elijah Ben Izzy (07/26/2023, 5:04 PM):
Elijah Ben Izzy (07/26/2023, 5:49 PM):

def source_data(url: str) -> pd.Series:
    """You can also use load_from... and define a custom data loader for extra readability/metadata. Or parameterize."""
    return _load_data(url)[COLUMN]  # this could also return a dataframe

LOOKBACKS = ["1d", "1w", "30d"]

@parameterize(
    **{f"mean_{lookback}": {"lookback": value(lookback)} for lookback in LOOKBACKS}
)
def mean_lookback_n_days(data: pd.Series, lookback: str) -> pd.Series:
    return ...

@inject(lookbacks=group(*[source(f"mean_{lookback}") for lookback in LOOKBACKS]))
def data_with_mean_lookbacks(lookbacks: List[pd.Series]) -> pd.DataFrame:
    return _join_logic(lookbacks)

Note that you also have alternatives:
1. You could pass around dataframes and have lookbacks be a runtime parameter: just run the operation over the entire dataframe. No @inject or @parameterize needed; you can even take in default arguments/send in overrides.
2. If you have highly complex operations for each lookback, you could wrap it in a subdag.
3. If you want to make the above configuration-driven, we have resolve:

def source_data(url: str) -> pd.Series:
    """You can also use load_from... and define a custom data loader for extra readability/metadata. Or parameterize."""
    return _load_data(url)[COLUMN]  # this could also return a dataframe

@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    # config now expects a lookbacks parameter that's a list of strings
    decorate_with=lambda lookbacks: parameterize(
        **{f"mean_{lookback}": {"lookback": value(lookback)} for lookback in lookbacks}
    ),
)
def mean_lookback_n_days(data: pd.Series, lookback: str) -> pd.Series:
    return ...

@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda lookbacks: inject(
        lookbacks=group(*[source(f"mean_{lookback}") for lookback in lookbacks])
    ),
)
def data_with_mean_lookbacks(lookbacks: List[pd.Series]) -> pd.DataFrame:
    return _join_logic(lookbacks)

The API is a little ugly, but it's easy to wrap this in something for your use case, specifically:

@repeat_for_lookbacks(pattern="mean_{lookback}")
@inject_lookbacks(pattern="mean_{lookback}")

These could just call out to the above if you use them a lot. Implementation left as an exercise to the reader 🙂 IMO you want to avoid this as much as possible; the overall philosophy is that modeling in code is easier than configuration-driven code, and a library like Hamilton allows it to be structured. Config-driven stuff can be injected as you go along to improve flexibility.
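As an aside for readers puzzling over the decorators: the fan-out that @parameterize performs is, mechanically, plain Python. A name pattern plus a list of values expands into one named callable per value. A hypothetical (non-Hamilton) sketch of that idea:

```python
from typing import Callable


def expand(pattern: str, fn: Callable, lookbacks: list[str]) -> dict[str, Callable]:
    # Produce one bound callable per lookback, keyed by the formatted node name.
    nodes = {}
    for lookback in lookbacks:
        def bound(data, _lookback=lookback):  # default arg pins the loop variable
            return fn(data, _lookback)

        name = pattern.format(lookback=lookback)
        bound.__name__ = name
        nodes[name] = bound
    return nodes


nodes = expand("mean_{lookback}", lambda data, lb: f"mean({data}, {lb})", ["1d", "1w"])
nodes["mean_1d"]("prices")  # -> "mean(prices, 1d)"
```

Hamilton layers node typing, dependency wiring, and config resolution on top, but the underlying expansion is this simple.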
Arthur Andres (07/28/2023, 7:23 AM):

import sys

import pandas as pd

from hamilton.driver import Driver
from hamilton.function_modifiers import parameterize, value, inject, group, source

LOOKBACKS = ["1d", "1w", "30d"]

def source_data() -> pd.Series:
    return pd.Series(
        pd.to_datetime([
            "2021-02-21T20:26:00Z",
            "2021-02-22T20:26:00Z",
            "2021-02-23T20:26:00Z",
            "2021-02-24T20:26:00Z",
        ])
    )

@parameterize(
    **{f"mean_{lookback}": {"lookback": value(lookback)} for lookback in LOOKBACKS}
)
def mean_lookback_n_days(source_data: pd.Series, lookback: str) -> pd.Series:
    return (source_data - pd.to_timedelta(lookback)).rename(lookback)

@inject(lookbacks=group(*[source(f"mean_{lookback}") for lookback in LOOKBACKS]))
def data_with_mean_lookbacks(lookbacks: list[pd.Series]) -> pd.DataFrame:
    return pd.concat(lookbacks, axis=1)

driver = Driver({}, sys.modules[__name__])
data = driver.execute(["data_with_mean_lookbacks"])
print(data.to_markdown(index=False))
and:
import sys

import pandas as pd

from hamilton.driver import Driver
from hamilton.function_modifiers import (
    ResolveAt,
    group,
    inject,
    parameterize,
    resolve,
    source,
    value,
)
from hamilton.settings import ENABLE_POWER_USER_MODE

def source_data() -> pd.Series:
    return pd.Series(
        pd.to_datetime(
            [
                "2021-02-21T20:26:00Z",
                "2021-02-22T20:26:00Z",
                "2021-02-23T20:26:00Z",
                "2021-02-24T20:26:00Z",
            ]
        )
    )

@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda lookbacks: parameterize(
        **{f"mean_{lookback}": {"lookback": value(lookback)} for lookback in lookbacks}
    ),
)
def mean_lookback_n_days(source_data: pd.Series, lookback: str) -> pd.Series:
    return (source_data - pd.to_timedelta(lookback)).rename(lookback)

@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda lookbacks: inject(
        lookbacks=group(*[source(f"mean_{lookback}") for lookback in lookbacks])
    ),
)
def data_with_mean_lookbacks(lookbacks: list[pd.Series]) -> pd.DataFrame:
    return pd.concat(lookbacks, axis=1)

driver = Driver(
    {"lookbacks": ["1d", "1m"], ENABLE_POWER_USER_MODE: True}, sys.modules[__name__]
)
data = driver.execute(["data_with_mean_lookbacks"])
print(data.to_markdown(index=False))

I guess these decorators (in particular resolve) take a bit of time to get used to.
07/28/2023, 7:32 AMYou could pass around dataframes and haveIt is much simpler this way…be a runtime parameter — just run the operation over the entire dataframe. Nolookbacks
orinject
— you can even take in default arguments/send in overrides.@parameterize
import sys
import pandas as pd
from hamilton.driver import Driver
def source_data() -> pd.Series:
return pd.Series(
pd.to_datetime(
[
"2021-02-21T20:26:00Z",
"2021-02-22T20:26:00Z",
"2021-02-23T20:26:00Z",
"2021-02-24T20:26:00Z",
]
)
)
def data_with_lookbacks(source_data: pd.Series, lookbacks: list[str]) -> pd.DataFrame:
return pd.concat(
[
(source_data - pd.to_timedelta(lookback)).rename(lookback)
for lookback in lookbacks
],
axis=1,
)
driver = Driver({}, sys.modules[__name__])
data = driver.execute(
["data_with_lookbacks"], inputs={"lookbacks": ["1d", "1w", "30d"]}
)
print(data.to_markdown(index=False))
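The core transform in data_with_lookbacks is also easy to sanity-check without Hamilton in the loop (toy single-row input):

```python
import pandas as pd


def data_with_lookbacks(source_data: pd.Series, lookbacks: list[str]) -> pd.DataFrame:
    # One column per lookback: each timestamp shifted back by that lookback.
    return pd.concat(
        [(source_data - pd.to_timedelta(lookback)).rename(lookback) for lookback in lookbacks],
        axis=1,
    )


ts = pd.Series(pd.to_datetime(["2021-02-22T00:00:00Z"]))
out = data_with_lookbacks(ts, ["1d", "1w"])
# "1d" column holds 2021-02-21, "1w" column holds 2021-02-15
```

Because lookbacks is just a runtime input here, adding or removing a lookback never touches the module.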
Elijah Ben Izzy (07/28/2023, 3:35 PM):
You could use resolve + extract_columns if you wanted it to show up in the DAG, but I think the simplicity of the second one is pretty appealing.
One other thing: you could wrap the decorators to build syntactic sugar/a macro for the lookbacks. E.g.:

@repeat_for_lookbacks(pattern="mean_{lookback}")
@inject_lookbacks(pattern="mean_{lookback}")

Up to you!