Pengyu Chen
03/19/2024, 7:28 PM
I'm trying to use @resolve with the code I will be attaching in this thread. I got this error however:

hamilton.function_modifiers.base.InvalidDecoratorException: Dependency: <generator object <lambda>.<locals>.<genexpr> at 0x7f90febd67b0> is not a valid dependency type for group(), must be a LiteralDependency or UpstreamDependency.

If I replace the group() call with group(source("cohort_month"), source("cohort_quarter")), the issue is resolved; however, I do not want to hardcode this. I'm still pretty new to Hamilton and wondering if I could please get some help? Thanks 🙏

Pengyu Chen
03/19/2024, 7:28 PM
import pandas as pd
from typing import List

from hamilton import driver, settings
from hamilton.function_modifiers import ResolveAt, group, inject, resolve, source

def cohort_month(df: pd.DataFrame) -> pd.Series:
    return pd.Series(pd.PeriodIndex(df["date"].dt.date, freq="M").astype(str))

def cohort_quarter(df: pd.DataFrame) -> pd.Series:
    return pd.Series(pd.PeriodIndex(df["date"].dt.date, freq="Q").astype(str))

def cohort_year(df: pd.DataFrame) -> pd.Series:
    return pd.Series(pd.PeriodIndex(df["date"].dt.date, freq="Y").astype(str))

@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda cohorts: inject(
        cohort_series=group(
            source(cohort) for cohort in cohorts  # this generator is what triggers the error
        )
    ),
)
def df_with_cohorts(df: pd.DataFrame, cohort_series: List[pd.Series]) -> pd.DataFrame:
    return pd.concat([df] + cohort_series, axis=1)

config = {
    settings.ENABLE_POWER_USER_MODE: True,
    "cohorts": ["cohort_month", "cohort_quarter"],
}
dr = (
    driver.Builder()
    .with_modules(module)
    .with_config(config)
    .build()
)
Elijah Ben Izzy
03/19/2024, 7:36 PM
@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda cohorts: inject(
        cohort_series=group(
            *[source(cohort) for cohort in cohorts]  # unpack a list rather than passing a generator
        )
    ),
)
...
Pengyu Chen
03/19/2024, 7:37 PM

Elijah Ben Izzy
03/19/2024, 7:38 PM
Yeah, the problem is the generator expression (foo for foo in bar). group() is expecting *args, so it just thinks it has one argument, and it's of type generator.
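The distinction is plain Python rather than anything Hamilton-specific; a minimal illustration:

def takes_args(*args):
    return [type(a).__name__ for a in args]

items = ["cohort_month", "cohort_quarter"]
print(takes_args(x for x in items))     # ['generator'] -- a single argument, the generator itself
print(takes_args(*[x for x in items]))  # ['str', 'str'] -- two unpacked arguments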
Pengyu Chen
03/19/2024, 7:39 PM

Pengyu Chen
03/19/2024, 7:39 PM

Elijah Ben Izzy
03/19/2024, 7:39 PM

Elijah Ben Izzy
03/19/2024, 7:45 PM
Why resolve? I'm curious if doing something like this might be simpler:
Request the cohorts you want — tag them with some tag and select all of the ones that get tagged from the driver. This works well if it's the final step — if it's intermediate it gets a little trickier.
@tag(data_product="base")
def df(...) -> pd.DataFrame:
    ...

@tag(data_product="cohorts")
def cohort_month(df: pd.DataFrame) -> pd.Series:
    return pd.Series(pd.PeriodIndex(df["date"].dt.date, freq="M").astype(str))

@tag(data_product="cohorts")
def cohort_quarter(df: pd.DataFrame) -> pd.Series:
    return pd.Series(pd.PeriodIndex(df["date"].dt.date, freq="Q").astype(str))

@tag(data_product="cohorts")
def cohort_year(df: pd.DataFrame) -> pd.Series:
    return pd.Series(pd.PeriodIndex(df["date"].dt.date, freq="Y").astype(str))

vars = dr.list_available_variables({"data_product": ["base", "cohorts"]})
# or
vars = dr.list_available_variables({"data_product": ["base"]})
dr.execute(vars)
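If the positional tag-filter call above doesn't match your Hamilton version, the variables can also be filtered manually; a hedged sketch, assuming each returned Variable exposes .name and .tags:

wanted = [
    var.name
    for var in dr.list_available_variables()
    if var.tags.get("data_product") in {"base", "cohorts"}
]
results = dr.execute(wanted)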
Pengyu Chen
03/19/2024, 8:31 PM
For more context, the flow is: table config + dependent dataframe => assign cohort(s) to each row => group by cohort(s) => metric calculations at a cohort level => compile metrics into final table.
Because the requirements of new tables change from time to time, what I'm trying to build is somewhat like a generic template such that, given the specifications (e.g., a yaml config) of a target table, the pipeline can figure out the data processing and calculations required, including cohort assignment. That's why I thought resolve could be helpful here -- for example, table A may be the aggregation of [cohort1, cohort2] and table B may be the aggregation of ["cohort3"], which should be something configurable at run time. In this case would it make sense to use resolve?

Elijah Ben Izzy
03/19/2024, 8:39 PM
Yeah, resolve does work. I'd have to think more, but I have a sense that you could probably get away with tags, though you'd have to be smart about parameters/how you pass them in. There's a spectrum between configuring at runtime and at compile-time, and resolve is all the way at the "compile-time" side of it.
I think the defining aspect of your use-case is that you have splitting prior to aggregations, and those cohorts are then aggregated. The way to avoid resolve is generally to push it more into the runtime side (e.g. select "cohorts"), but there's a trade-off: the functions become less granular.
So yeah, what you're doing makes sense, especially if it works for you! Will see if others have ideas on how to model it in case I missed something 🙂

Elijah Ben Izzy
03/19/2024, 8:40 PM

Pengyu Chen
03/19/2024, 8:52 PM

Pengyu Chen
03/19/2024, 8:52 PM
The way to avoid resolve is generally to push it more into the runtime side (e.g. select "cohorts")

Pengyu Chen
03/19/2024, 8:52 PM

Elijah Ben Izzy
03/19/2024, 8:59 PM
You could also have a single node that takes df and is aggregated by a list of cohorts. It would have one column per cohort. It would be less granular, but you could just pass it in and choose how to aggregate at runtime.

def cohort_aggregated(
    df: pd.DataFrame,
    frequencies: List[str],
) -> pd.DataFrame:
    ...

All this said, I think the easiest solution might be to have everything defined as series (each key/granularity combined), and then just have a mapping of columns to tables. Then you can use the original solution… @parameterize might help make it a little more reuse-friendly.
At a high-level:
• for every metric/granularity combination, there exists a function (which obviously might have dependencies)
• you have a mapping of tables -> functions
• you query just the ones you want, and those are the only ones that are run. They get joined in the way you want.

Stefan Krawczyk
03/19/2024, 9:09 PM

Elijah Ben Izzy
03/19/2024, 9:15 PM
def cohort_month(df: pd.DataFrame) -> pd.Series:
    return pd.Series(pd.PeriodIndex(df["date"].dt.date, freq="M").astype(str))

def cohort_quarter(df: pd.DataFrame) -> pd.Series:
    return pd.Series(pd.PeriodIndex(df["date"].dt.date, freq="Q").astype(str))

def something_else_month(...) -> pd.Series:
    ...

def something_else_year(...) -> pd.Series:
    ...

data_tables = load_from_yaml(...)  # mapping of data table names to columns
for data_table, columns in data_tables.items():
    output = dr.execute(columns)
    save(output, ...)  # or use materializers

Note that you can use @parameterize if you have commonalities between them. This basically lets you define all the available ones, then you just choose which go where.
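For reference, a minimal sketch of what @parameterize could look like for the repeated granularities (the node names and the freq parameter are illustrative, not from the thread):

from hamilton.function_modifiers import parameterize, value

@parameterize(
    cohort_month=dict(freq=value("M")),
    cohort_quarter=dict(freq=value("Q")),
    cohort_year=dict(freq=value("Y")),
)
def cohort(df: pd.DataFrame, freq: str) -> pd.Series:
    # one node per entry above, each bucketing dates at a different granularity
    return pd.Series(pd.PeriodIndex(df["date"].dt.date, freq=freq).astype(str))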
Pengyu Chen
03/19/2024, 9:31 PM
For the columns, I guess they would be something like ["metric_a_acq_quarter", "metric_b_acq_quarter", ...] in the code snippet below (say the table pertains to some metrics per acquisition quarter)? Is this what you had in mind wrt "have everything defined as series (each key/granularity combined)"?

@extract_columns("metric_a_before_agg", "metric_b_before_agg", ...)
def extract_df(df: pd.DataFrame) -> pd.DataFrame:
    return df

def metric_a_resale_month(metric_a_before_agg: pd.Series, resale_month: pd.Series) -> pd.Series:
    return pd.concat([metric_a_before_agg, resale_month], axis=1).groupby("resale_month")["metric_a_before_agg"].sum()

def metric_b_acq_quarter(metric_b_before_agg: pd.Series, acq_quarter: pd.Series) -> pd.Series:
    return pd.concat([metric_b_before_agg, acq_quarter], axis=1).groupby("acq_quarter")["metric_b_before_agg"].sum()
Elijah Ben Izzy
03/19/2024, 9:37 PM
@parameterize can help you if you are commonly repeating, say, granularity: https://hamilton.dagworks.io/en/latest/reference/decorators/parameterize/.

Pengyu Chen
03/19/2024, 9:39 PM

Elijah Ben Izzy
03/19/2024, 9:39 PM

Pengyu Chen
03/19/2024, 9:40 PM

Elijah Ben Izzy
03/19/2024, 9:40 PM

Stefan Krawczyk
03/26/2024, 12:21 AM

ThreadScribe
03/26/2024, 12:21 AM

ThreadScribe
03/26/2024, 12:21 AM
The user encountered an error while using @resolve. The error was due to passing a generator object as a dependency to group(), resulting in an invalid type exception. Updating the code to unpack a list into *args instead of passing a generator object resolved the issue.
The user, new to Hamilton, sought to understand whether resolve was the appropriate approach for their use case: creating various business metric tables against a dataframe, where each table might aggregate data along different dimensions (cohorts). They iterated on the specific requirements of their task and the flexibility needed at runtime for configuring the aggregation of different cohorts, contemplating the use of tags for selecting cohorts.
A discussion ensued around the use of tags and the trade-offs between configuring at runtime and at compile-time. The conversation also covered alternative approaches, including defining all granularities as series, mapping tables to functions, and using @parameterize for reusability. This pattern was recommended because it can function as a metric catalog and enable documentation generation.
After considering the suggestions, the user expressed their appreciation and planned to build upon the insights gathered from the discussion.

ThreadScribe
03/26/2024, 12:21 AM

Pengyu Chen
04/16/2024, 4:56 PM
A follow-up question: since I have some raw metrics (pd.Series) directly coming from the input and wanted to group them by different dimensions (another Series) and get some aggregations, I figured I could do something like this:

@parameterize(
    **{
        f"{metric}_{operation}_by_{cohort}": dict(metric=source(metric), operation=value(operation), cohort=source(cohort))
        for metric, operation, cohort in cartesian_product(metric=RAW_METRICS, operation=OPERATIONS, cohort=COHORTS)
    }
)
def second_level_metrics(metric: pd.Series, cohort: pd.Series, operation: str) -> pd.Series:
    # group "metric" by "cohort" and apply "operation" (e.g. via .agg)
    df = pd.concat([metric, cohort], axis=1)
    return df.groupby(cohort.name)[metric.name].agg(operation)

However, I find this approach does not scale well and the nodes become harder and harder to manage if I want to create more nodes based off of the aggregations and further derivations (please see attached). Wondering if you have any thoughts on this? Thank you!

Elijah Ben Izzy
04/16/2024, 6:41 PM

Pengyu Chen
04/16/2024, 7:33 PM
• "second-level metrics" are the cartesian product of first_level_metrics x cohorts x operations
• "third-level metrics" are the metrics derived from second-level metrics. However, different types of them may be the cartesian product of different subsets of the second-level metrics
  ◦ for instance:
    ▪︎ metrics1 is a collection of subset1 x subset3 x cohorts
    ▪︎ metrics2 is a collection of subset2 x subset3 x cohorts
    ▪︎ metrics3 is a collection of subset4 x subset5 x cohorts
  ◦ this means that to define them, I have to either
    ▪︎ (1) enumerate every possible combination (like in my code, where I had to repeat revenue_sum, underwriting_sum, price_1, price_2), or
    ▪︎ (2) define subset1, subset2 etc. as separate constant variables and use dict comprehensions (e.g., subset1 = ["revenue", "underwriting_revenue", "cost", "underwriting_cost"]; subset2 = ["price_1", "price_2"], ...)
  ◦ But I find neither solution satisfactory enough because, in the latter, it would still be nontrivial to manage those different variables.

Pengyu Chen
04/16/2024, 7:36 PM

Pengyu Chen
04/16/2024, 7:38 PM
If there were a good way to manage subset1, subset2 etc., then this would not really be a problem

Elijah Ben Izzy
04/16/2024, 7:43 PM
This might be a good case for @subdag. Having a hard time fully rationalizing how yours might look, but I'm imagining something like this:

@subdag(
    metrics_module,
    inputs={"cohort": value("x"), "operation": value("y")},
    config=...,  # usually empty but you can reconfigure the subdags
)
def cohort_{x}_operation_{y}_metrics(
    metric_1_from_subdag: pd.Series,
    metric_2_from_subdag: pd.Series,
    ...
) -> pd.DataFrame:
    return join(**all_parameters_above)  # or however you want to do this

# This can also return a dict if you want to expose them individually -- IMO a dataframe is
# nice cause you can use the dataframe node name to disambiguate
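A concrete instance of the sketch above with a valid function name (the module and metric names are illustrative):

from hamilton.function_modifiers import subdag, value
import metrics_module  # hypothetical module containing the metric nodes

@subdag(
    metrics_module,
    inputs={"cohort": value("acq_quarter"), "operation": value("sum")},
)
def acq_quarter_sum_metrics(
    metric_1_from_subdag: pd.Series,
    metric_2_from_subdag: pd.Series,
) -> pd.DataFrame:
    # join the subdag's outputs into one frame, one column per metric
    return pd.concat([metric_1_from_subdag, metric_2_from_subdag], axis=1)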
Elijah Ben Izzy
04/16/2024, 7:45 PM
There's also @parameterized_subdag, which is @subdag on power-mode. Basically you run a ton of those over a set of parameters, e.g. for each cohort/operation: https://hamilton.dagworks.io/en/latest/reference/decorators/parameterize_subdag/
I can draw up some code, but want to see if this makes sense first.
High-level summary of everything I just said:
1. You have a module that's "repeated" for each cohort/operation combination
2. You rerun that using @subdag for each combination
3. You expose the results for each combination as you see fit
4. You can use @parameterized_subdag to do (2) in a loop/list comprehension (although it can get a bit messy)
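A hedged sketch of @parameterized_subdag built with a comprehension (the combinations and node names are illustrative):

from hamilton.function_modifiers import parameterized_subdag, value

COMBINATIONS = [("acq_quarter", "sum"), ("resale_month", "mean")]

@parameterized_subdag(
    metrics_module,
    **{
        f"{cohort}_{operation}_metrics": {
            "inputs": {"cohort": value(cohort), "operation": value(operation)},
        }
        for cohort, operation in COMBINATIONS
    },
)
def per_combination_metrics(metric_1_from_subdag: pd.Series) -> pd.DataFrame:
    # one subdag (and one output node) is created per cohort/operation pair
    return metric_1_from_subdag.to_frame()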
Pengyu Chen
04/16/2024, 7:52 PM

Pengyu Chen
04/16/2024, 7:52 PM

Elijah Ben Izzy
04/16/2024, 7:53 PM

Stefan Krawczyk
04/16/2024, 10:08 PM
You also don't have to define everything inline in the decorator. You can abstract those out into a function(s) to help make the code more readable — there is also the possibility of creating a custom decorator, which would amount to the same thing.

Pengyu Chen
04/16/2024, 10:50 PM

Stefan Krawczyk
04/16/2024, 10:53 PM

Pengyu Chen
04/23/2024, 5:25 PM
Regarding this snippet from above:

@subdag(
    metrics_module,
    inputs={"cohort": value("x"), "operation": value("y")},
    config=...,  # usually empty but you can reconfigure the subdags
)
def cohort_{x}_operation_{y}_metrics(
    metric_1_from_subdag: pd.Series,
    metric_2_from_subdag: pd.Series,
    ...
) -> pd.DataFrame:

The metrics_module subdag is probably going to have 100+ nodes. Most likely what I want to do with the function decorated with @subdag is to combine all of them. Is there a way to get access to all the nodes in the subdag without enumerating them in function parameters?

Elijah Ben Izzy
04/23/2024, 6:24 PM
You can use inject, which will be a little manual, but very explicit. I might suggest something like this:
1. Have an intermediate node that is all of them "joined" — this lives inside the subdag
2. Have the subdag parameter declare that as a dataframe
3. Use inject to inject all the nodes inside the subdag into that and then join them.

# metrics_module.py
@inject(data=group(**{key: source(key) for key in METRICS_YOU_CARE_ABOUT}))  # or just all of them
def joined_final_data(data: Dict[str, pd.Series]) -> pd.DataFrame:
    ...  # join

# other_module_with_subdag.py
@subdag(
    metrics_module,
    inputs={"cohort": value("x"), "operation": value("y")},
    config=...,  # usually empty but you can reconfigure the subdags
)
def cohort_{x}_operation_{y}_metrics(
    joined_data: pd.DataFrame,
    ...
) -> pd.DataFrame:
    ...  # just return it or do something smarter

This has the following advantages:
1. You explicitly list it out, avoiding intermediate ones
2. You can avoid a long list of parameters
Note that if you want, you can be clever…

NODES_IN_MODULE = crawl(module)  # crawl for function names, but be a bit careful here

See https://hamilton.dagworks.io/en/latest/reference/decorators/inject/#hamilton.function_modifiers.inject for more details.
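For the "crawl" idea, a hedged sketch using only the standard library (crawl() is not a Hamilton API; this inspects the module directly and will pick up every public function, including non-node helpers):

import inspect
import metrics_module  # hypothetical module to crawl

NODES_IN_MODULE = [
    name
    for name, fn in inspect.getmembers(metrics_module, inspect.isfunction)
    if not name.startswith("_")  # skip private helpers by convention
]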
Pengyu Chen
04/23/2024, 7:26 PM

Elijah Ben Izzy
04/23/2024, 7:28 PM

Pengyu Chen
04/23/2024, 9:54 PM
@subdag(
    metrics_module,
    inputs={"cohort": group(source("acquisition_month"), source("resale_quarter"))},
    config=None,
    namespace=None,
    external_inputs=[],
)
def acquisition_month_resale_quarter_metrics(joined_metrics: pd.DataFrame) -> pd.DataFrame:
    return joined_metrics

gives this error:

InvalidDecoratorException: Input cohort must be either an UpstreamDependency or a LiteralDependency, not <class 'hamilton.function_modifiers.dependencies.GroupedListDependency'>.
Elijah Ben Izzy
04/23/2024, 9:57 PM

Pengyu Chen
04/23/2024, 10:02 PM
"but I suggested that you put it inside the module so you didn't come up with this."
Sorry, but I'm not sure I totally understand this, could you please explain?
Elijah Ben Izzy
04/23/2024, 10:03 PM
I meant doing the joining (using inject) in a separate function adjacent to the metrics, e.g. so you only had to declare that in the @subdag function. Otherwise you'd run into exactly what you found.

[fn_1, fn_2, …] -> join_fn (using @inject) -> subdag

Rather than

[fn_1, fn_2, …] -> subdag (using @inject)

Pengyu Chen
04/23/2024, 10:14 PM
I see, and that is what I'm doing currently: [fn_1, fn_2, …] -> join_fn (using @inject) -> subdag. The problem with grouped dependencies comes up because instead of one cohort, we may want to analyze multi-level cohorts, so I would need to inject a list of cohorts into the subdag (metrics module), like group(source("acquisition_month"), source("resale_quarter")). To resolve this I think it would require a code change in the Hamilton framework, or am I missing something?

Pengyu Chen
04/23/2024, 10:16 PM

Pengyu Chen
04/23/2024, 10:21 PM
# metrics_module.py
def metric1(col1: pd.Series, cohorts: List[pd.Series]) -> pd.Series:
    ...

def metric2(col2: pd.Series, cohorts: List[pd.Series]) -> pd.Series:
    ...

@inject(data=group(source("metric1"), source("metric2")))
def joined_metrics(data: List[pd.Series]) -> pd.DataFrame:
    return pd.concat(data, axis=1)

# module with subdag
@subdag(
    metrics_module,
    # notice a list of nodes is passed in
    inputs={"cohorts": group(source("acquisition_month"), source("resale_quarter"))},
)
def subdag_metrics(joined_metrics: pd.DataFrame) -> pd.DataFrame:
    return joined_metrics
Elijah Ben Izzy
04/23/2024, 10:22 PM

Pengyu Chen
04/23/2024, 10:22 PM

Elijah Ben Izzy
04/23/2024, 10:22 PM

Pengyu Chen
04/23/2024, 10:24 PM
cohorts is the changing input I'd like to inject into the subdag, and it is a list:

@subdag(
    metrics_module,
    inputs={"cohorts": group(source("acquisition_month"), source("resale_quarter"))},
)
def subdag1(joined_metrics: pd.DataFrame) -> pd.DataFrame:
    ...

@subdag(
    metrics_module,
    inputs={"cohorts": group(source("acq_year"), source("resale_year"))},
)
def subdag2(joined_metrics: pd.DataFrame) -> pd.DataFrame:
    return joined_metrics
Elijah Ben Izzy
04/23/2024, 10:26 PM
To clarify, do you want metric_1 calculated for:
1. Each of the input series (acq_year and resale_year), or
2. The join of the input series (acq_year + resale_year combined)?

Pengyu Chen
04/23/2024, 10:30 PM
"The join of the input series (acq_year + resale_year combined)"
This. For instance:

def metric1(col1: pd.Series, cohorts: List[pd.Series]) -> pd.Series:
    ...

would concat col1 and cohorts, and return df.groupby(cohorts)[col1].sum()
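For what it's worth, pandas can group a Series by a list of Series directly, so the concat may be avoidable; a minimal sketch, assuming the indexes align:

def metric1(col1: pd.Series, cohorts: List[pd.Series]) -> pd.Series:
    # group col1 by every cohort series at once -- no intermediate dataframe needed
    return col1.groupby(cohorts).sum()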
Elijah Ben Izzy
04/23/2024, 10:33 PM

Pengyu Chen
04/23/2024, 10:33 PM

Pengyu Chen
04/23/2024, 10:34 PM

Elijah Ben Izzy
04/23/2024, 10:35 PM

Pengyu Chen
04/23/2024, 10:38 PM
So would I still join col1 with the df_cohorts in order to do the grouping? (since you mentioned "no need to concat")

Elijah Ben Izzy
04/23/2024, 10:41 PM
cohort_group_1 (using @inject) -> metrics -> subdag
cohort_group_2 (using @inject) -> metrics -> subdag
…
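A hedged sketch of that scheme, with one join node per multi-level cohort combination (all names are illustrative):

from hamilton.function_modifiers import group, inject, source, subdag

@inject(cohorts=group(source("acquisition_month"), source("resale_quarter")))
def cohort_group_1(cohorts: List[pd.Series]) -> pd.DataFrame:
    # join the cohort levels once, outside the subdag
    return pd.concat(cohorts, axis=1)

@subdag(metrics_module, inputs={"df_cohorts": source("cohort_group_1")})
def subdag1(joined_metrics: pd.DataFrame) -> pd.DataFrame:
    return joined_metrics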
Pengyu Chen
04/23/2024, 10:42 PM

Pengyu Chen
04/23/2024, 10:42 PM

Pengyu Chen
04/23/2024, 10:43 PM

Pengyu Chen
04/23/2024, 10:44 PM
I would need to define multiple cohort_groups, but I don't think that's a big issue

Elijah Ben Izzy
04/23/2024, 10:45 PM

Elijah Ben Izzy
04/23/2024, 10:46 PM
Yep, one per cohort_groups, which makes tracking/debugging simpler (same code, different input group, but it's more explicit)

Pengyu Chen
04/23/2024, 10:46 PM

Pengyu Chen
04/23/2024, 10:47 PM

Elijah Ben Izzy
04/23/2024, 10:47 PM