# hamilton-help
p
Hello folks, I'm trying to dynamically resolve the upstream function dependencies using
@resolve
with the code I will be attaching in this thread. I got this error however:
hamilton.function_modifiers.base.InvalidDecoratorException: Dependency: <generator object <lambda>.<locals>.<genexpr> at 0x7f90febd67b0> is not a valid dependency type for group(), must be a LiteralDependency or UpstreamDependency.
. If I replace the
group()
call with
group(source("cohort_month"), source("cohort_quarter"))
the issue would be resolved; however I do not want to hardcode this. I'm still pretty new to Hamilton and wondering if I could please get some help? Thanks 🙏
code:
import pandas as pd
from typing import List

from hamilton import driver, settings
from hamilton.function_modifiers import ResolveAt, group, inject, resolve, source

def cohort_month(df: pd.DataFrame) -> pd.Series:
    return pd.Series(pd.PeriodIndex(df["date"].dt.date, freq="M").astype(str))

def cohort_quarter(df: pd.DataFrame) -> pd.Series:
    return pd.Series(pd.PeriodIndex(df["date"].dt.date, freq="Q").astype(str))

def cohort_year(df: pd.DataFrame) -> pd.Series:
    return pd.Series(pd.PeriodIndex(df["date"].dt.date, freq="Y").astype(str))

@resolve( 
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda cohorts: inject(
        cohort_series=group(
            source(cohort) for cohort in cohorts
        )
    ),
)
def df_with_cohorts(df: pd.DataFrame, cohort_series: List[pd.Series]) -> pd.DataFrame:
    return pd.concat([df] + cohort_series, axis=1)


config = {
        settings.ENABLE_POWER_USER_MODE: True,
        "cohorts": ["cohort_month", "cohort_quarter"],
}

dr = (
  driver.Builder()
  .with_modules(module)
  .with_config(config)
  .build()
)
e
OK, haven’t tested it out but I think there’s a 3-char change that will fix it.
@resolve( 
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda cohorts: inject(
        cohort_series=group(
            *[source(cohort) for cohort in cohorts]
        )
    ),
)
...
p
Yes it works!
e
What’s happening — you’re passing in a generator object, specified in Python with
(foo for foo in bar)
. It’s expecting
*args
, so it just thinks it has one argument, and it’s of type generator.
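To make that concrete, here's a tiny plain-Python illustration (not Hamilton-specific, and not from the thread) of why the generator only counts as one argument:

def takes_args(*args):
    return len(args)

items = ["cohort_month", "cohort_quarter"]

takes_args(item for item in items)     # -> 1: the whole generator is a single argument
takes_args(*[item for item in items])  # -> 2: unpacking passes two separate arguments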
p
understood, yes that makes sense!
Thanks so much!
e
Yeah! Glad I saw that 🙂 It’s very subtle — could have definitely been a bit of a rabbit hole 😆
Out of curiosity — why are you using
resolve
? I’m curious if doing something like this might be simpler: Request the cohorts you want — tag them with some tag and select all of the ones that get tagged from the driver. This works well if it’s the final step — if it’s intermediate it gets a little trickier.
@tag(data_product="base")
def df(...) -> pd.DataFrame:
    ...

@tag(data_product="cohorts")
def cohort_month(df: pd.DataFrame) -> pd.Series:
    return pd.Series(pd.PeriodIndex(df["date"].dt.date, freq="M").astype(str))

@tag(data_product="cohorts")
def cohort_quarter(df: pd.DataFrame) -> pd.Series:
    return pd.Series(pd.PeriodIndex(df["date"].dt.date, freq="Q").astype(str))

@tag(data_product="cohorts")
def cohort_year(df: pd.DataFrame) -> pd.Series:
    return pd.Series(pd.PeriodIndex(df["date"].dt.date, freq="Y").astype(str))

vars = dr.list_available_variables({"data_product" : ["base", "cohorts"]})
# or
vars = dr.list_available_variables({"data_product" : ["base"]})

dr.execute(vars)
p
Yes I do see the merit of using tags, though I find it a little tricky to port the idea directly to that specific part of my project. Basically we are looking to create various business metrics tables against a dataframe, where each table may be the aggregation from different dimensions (cohorts in my example). The workflow would be something like:
table config + dependent dataframe => assign cohort(s) to each row => group by cohort(s) => metric calculations at a cohort level => compile metrics into final table
Because the requirements of new tables are changing from time to time, what I'm trying to build is somewhat like a generic template such that given the specifications (e.g., a yaml config) of a target table, the pipeline can figure out the data processing and calculations required including cohort assignment. That's why I thought
resolve
could be helpful here -- for example, table A may be the aggregation of
[cohort1, cohort2]
and table B may be the aggregation of
["cohort3"]
, which should be something configurable at run time. In this case would it make sense to use
resolve
?
e
Ahh yes, that makes sense. Funny, you’re not the only person with almost that exact use-case (metrics aggregations across different dimensions). So
resolve
does work. I’d have to think more, but I have a sense that you could probably get away with tags, but you’d have to be smart about parameters/how you pass it in. There’s a spectrum between configuring at runtime and at compile-time, and
resolve
is all the way at the “compile-time” side of it. I think the defining aspect of your use-case is that you have splitting prior to aggregations, and those
cohorts
are then aggregated. The way to avoid resolve is generally to push it more into the runtime side (E.G. select “cohorts”), but there’s a trade-off. The functions become less granular. So yeah, what you’re doing makes sense, especially if it works for you! Will see if others have ideas on how to model it in case I missed something 🙂
Thanks for walking me through your use-case!
p
Thank you for your help and your insights! Just getting started with Hamilton and I really like it -- feel like it's something I've been looking for 😆
The way to avoid resolve is generally to push it more into the runtime side (E.G. select “cohorts”)
could you please elaborate a little more on this line?
e
Yeah, so, say you had a function that took in
df
and aggregated by a list of cohorts. It would have one column per-cohort. Would be less granular, but you could just pass it in and choose how to aggregate at runtime.
def cohort_aggregated(
    df: pd.DataFrame,
    frequencies: List[str]
) -> pd.DataFrame:
    ...
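For what it's worth, here's a hedged guess at how that body might look — the "date" column and the pandas frequency strings are assumptions carried over from the earlier snippets:

# A hedged completion of the sketch above -- my own guess at the body, assuming a
# "date" column as in the earlier snippets and pandas frequency strings like "M"/"Q".
from typing import List

import pandas as pd

def cohort_aggregated(df: pd.DataFrame, frequencies: List[str]) -> pd.DataFrame:
    out = df.copy()
    for freq in frequencies:  # chosen at runtime, e.g. ["M", "Q"]
        out[f"cohort_{freq}"] = pd.PeriodIndex(df["date"].dt.date, freq=freq).astype(str)
    return out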
All this said, I think the easiest solution might be to have everything defined as series (each key/granularity combined), and then just have a mapping of columns to tables. Then you can use the original solution…
@parameterize
might help make it a little more reuse-friendly. At a high-level:
• for every metric/granularity, there exists a function (which obviously might have dependencies)
• you have a mapping of tables -> functions
• you query just the ones you want, and those are the only ones that are run. They get joined in the way you want.
s
@Elijah Ben Izzy can you sketch the code? I’m finding it hard to follow too 🙂
e
Sure:
def cohort_quarter(df: pd.DataFrame) -> pd.Series:
    return pd.Series(pd.PeriodIndex(df["date"].dt.date, freq="Q").astype(str))

def cohort_month(df: pd.DataFrame) -> pd.Series:
    return pd.Series(pd.PeriodIndex(df["date"].dt.date, freq="M").astype(str))

def cohort_year(df: pd.DataFrame) -> pd.Series:
    return pd.Series(pd.PeriodIndex(df["date"].dt.date, freq="Y").astype(str))

def something_else_month(...) -> pd.Series:
    ...
def something_else_year(...) -> pd.Series:
    ...

data_tables = load_from_yaml(...) # mapping of data names to columns

for data_table, columns in data_tables.items():
    output = dr.execute(columns) 
    save(output, ...) # or use materializers
Note that you can use
@parameterize
if you have commonalities between them. This basically lets you define all the available ones, then you just choose which go where.
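For example, here's a hedged sketch (not from the thread) of collapsing the three cohort functions above with @parameterize, assuming the frequency string is the only thing that varies:

# A hedged sketch: one @parameterize'd function generating the cohort_* nodes,
# assuming the pandas frequency string is the only difference between them.
import pandas as pd
from hamilton.function_modifiers import parameterize, value

@parameterize(
    cohort_month={"freq": value("M")},
    cohort_quarter={"freq": value("Q")},
    cohort_year={"freq": value("Y")},
)
def cohort(df: pd.DataFrame, freq: str) -> pd.Series:
    # df comes from upstream; freq is bound per generated node
    return pd.Series(pd.PeriodIndex(df["date"].dt.date, freq=freq).astype(str))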
p
I think I see your point. As for the
columns
, I guess they would be something like
["metric_a_acq_quarter, "metric_b_acq_quarter", ...]
in the code snippet below (say the table pertains to some metrics per acquisition quarter)? Is this what you had in mind wrt "have everything defined as series (each key/granularity combined)"?
@extract_columns("metric_a_before_agg", "metric_b_before_agg", ...)
def extract_df(df: pd.DataFrame) -> pd.DataFrame:
    return df

def metric_a_resale_month(metric_a_before_agg: pd.Series, resale_month: pd.Series) -> pd.Series:
    return pd.concat([metric_a_before_agg, resale_month], axis=1).groupby("resale_month").sum()

def metric_b_acq_quarter(metric_b_before_agg: pd.Series, acq_quarter: pd.Series) -> pd.Series:
    return pd.concat([metric_b_before_agg, acq_quarter], axis=1).groupby("acq_quarter").sum()
e
Yep! Exactly. Note that
@parameterize
can help you if you are commonly repeating, say, granularity: https://hamilton.dagworks.io/en/latest/reference/decorators/parameterize/.
p
Amazing! I do think this is the way to go -- much cleaner and understandable than what I was originally thinking of doing
e
Great! Yeah this is a pattern another power-user I’ve met does. It’s fairly simple — the nice thing is that it doubles up as a metric catalog. E.G. you can generate documentation from all of your functions…
p
Appreciate your help! Let me see what I can build out based off of this convo 🙂
e
Of course! Thanks for bringing your question + use-case 🙏
s
@ThreadScribe can you help summarize this thread so that we can make some documentation for the approach that @Elijah Ben Izzy suggested?
t
🪄 Summoning the summary elves... one moment!
Hey @Stefan Krawczyk, your summary is ready!
Dynamic Function Dependency Resolution in Hamilton
A user encountered an error while trying to dynamically resolve upstream function dependencies using
@resolve
. The error was due to passing a generator object as a dependency to
group()
, resulting in an invalid type exception. By updating the code to use
*args
instead of a generator object, the issue was resolved. The user, new to Hamilton, sought to understand if
resolve
was the appropriate approach for their use case. They were creating various business metric tables against a dataframe, where each table might aggregate data from different dimensions (cohorts). They iterated on the specific requirements of their task and the flexibility needed at runtime for configuring the aggregation of different cohorts, contemplating the use of tags for selecting cohorts. A discussion ensued around the use of tags and the trade-offs between configuring at runtime and compile-time. The conversation also delved into alternative approaches, including defining all granularities as series, mapping tables to functions, and using
@parameterize
for reusability. This pattern was recommended because it doubles as a metric catalog and enables documentation generation. After considering the suggestions, the user expressed their appreciation and planned to build on the insights gathered from the discussion.
p
Hello folks, I wonder what would be the best way to "parameterize" @parameterize to avoid combinatorial explosion? Say I had a set of nodes/metrics (
pd.Series
) directly coming from the input and wanted to group them by different dimensions (another Series) and get some aggregations, I figured I could do something like this:
import itertools

@parameterize(
    **{
        f"{metric}_{operation}_by_{cohort}": dict(metric=source(metric), operation=value(operation), cohort=source(cohort))
        for metric, operation, cohort in itertools.product(RAW_METRICS, OPERATIONS, COHORTS)
    }
)
def second_level_metrics(metric: pd.Series, cohort: pd.Series, operation: str) -> pd.Series:
    # group "metric" by "cohort" and apply "operation"
    df = pd.concat([metric, cohort], axis=1)
    return df.groupby(cohort.name)[metric.name].agg(operation)
However, I find this approach does not scale well and the nodes become harder and harder to manage if I want to create more nodes based off of the aggregations, and so on (please see attached). Wondering if you have any thoughts on this? Thank you!
e
So, just to summarize, your problem is:
1. You have a cartesian product of metrics
2. It seems like you want to run more metrics on top of those (E.G. computations on top of other metric outputs)
3. You’re facing an issue where the DAG just gets complicated and parameterize is hard to understand, right?
By scalability, do you mean complexity or performance?
p
That summarizes it well. By scalability I mean complexity. I think (1) is not a problem because the pattern to capture the cartesian product is consistent, but as we run more metrics on top of those, I start to see more patterns and it becomes complicated to define them in a maintainable way (i.e., not repeating everything):
• Say we have many "first-level metrics" which are the input to the dataflow.
• "Second-level metrics" are the cartesian product of first_level_metrics x cohorts x operations.
• "Third-level metrics" are the metrics derived from second-level metrics. However, different types of them may be the cartesian product of different subsets of the second-level metrics.
  ◦ For instance:
    ▪︎ metrics1 is a collection of subset1 x subset3 x cohorts
    ▪︎ metrics2 is a collection of subset2 x subset3 x cohorts
    ▪︎ metrics3 is a collection of subset4 x subset5 x cohorts
  ◦ This means that to define them, I have to either
    ▪︎ (1) enumerate every possible combination (like in my code, I had to repeat revenue_sum, underwriting_sum, price_1, price_2), or
    ▪︎ (2) define subset1, subset2, etc. as separate constant variables and use dict comprehension (e.g., subset1 = ["revenue", "underwriting_revenue", "cost", "underwriting_cost"]; subset2 = ["price_1", "price_2"], ...)
    ▪︎ But I find neither solution satisfactory, because in the latter it would still be nontrivial to manage those different variables.
This would not be a problem if we just had a handful of potential metrics we'd like to get, as I could just enumerate all of them. But we have a lot more metrics that depend on each other and this set is growing.
But as I abstract out my problem, I kind of feel there might not be a direct solution within the Hamilton framework? I think if we could have a trusted source of definition for those
subset1
,
subset2
etc, then this would not really be a problem
e
So yeah, this is complicated, but I think we have the right tool to do this. See @subdag. The idea is that you can repeat a DAG as many times as you want. It would look something like this:
1. You’d define one per each cohort/operation (more on how you could consolidate this later)
2. You’d define the specific set of operations that each one uses
3. You wire inputs in for each cohort/operation combination
4. You get a series of metrics out from that subdag and pass it into the function decorated with
@subdag
Having a hard time fully rationalizing how yours might look, but I’m imagining something like this:
@subdag(
    metrics_module,
    inputs={"cohort" : value("x"), "operation": value("y"),
    config=... # usually empty but you can reconfigure the subdags
)
def cohort_{x}_operation_{y}_metrics(
    metric_1_from_subdag: pd.Series,
    metric_2_from_subdag: pd.Series,
    ...
) -> pd.DataFrame:
    return join(**all_parameters_above) # or however you want to do this
    # This can also return a dict if you want to expose them individually -- IMO a dataframe is nice cause you can use the dataframe node name to disambiguate
So, the above has one per each — we also have
@parameterized_subdag
, which is
@subdag
on power-mode. Basically run a ton of those over a set of parameters. https://hamilton.dagworks.io/en/latest/reference/decorators/parameterize_subdag/. E.G. for each cohort/operation. I can draw up some code, but want to see if this makes sense first. High-level overview of everything I just said:
1. You have a module that’s “repeated” for each cohort/operation combination
2. You rerun that using
@subdag
for each combination
3. You expose the results for each combination as you see fit
4. You can use
@parameterize_subdag
to do (2) in a loop/list comprehension (although it can get a bit messy)
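To make step 4 concrete, here's a rough, hedged sketch — the keyword shape ({"inputs": {...}}) is my reading of the linked docs and should be double-checked, metrics_module is the "repeated" module discussed here, and the cohort/operation pairs plus metric_*_from_subdag names are placeholders following the @subdag sketch above:

# A hedged sketch of step 4: run metrics_module once per cohort/operation combination.
# Double-check the exact parameterization format against the linked docs.
import pandas as pd
from hamilton.function_modifiers import parameterized_subdag, value

import metrics_module  # the module that's "repeated" per combination

COMBINATIONS = [("cohort_month", "sum"), ("cohort_quarter", "mean")]  # placeholder pairs

@parameterized_subdag(
    metrics_module,
    **{
        f"cohort_{cohort}_operation_{operation}_metrics": {
            "inputs": {"cohort": value(cohort), "operation": value(operation)},
        }
        for cohort, operation in COMBINATIONS
    },
)
def combination_metrics(metric_1_from_subdag: pd.Series, metric_2_from_subdag: pd.Series) -> pd.DataFrame:
    # joins the subdag's outputs, as in the @subdag sketch above
    return pd.concat([metric_1_from_subdag, metric_2_from_subdag], axis=1)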
p
I'll give some thoughts into this and let you know!
Thank you! @Elijah Ben Izzy
e
Yeah! I think it matches your case, but your case is complex and takes a bit to internalize 🙂
s
@Pengyu Chen also just to mention from a code organization standpoint — that the parameterize code needn’t all live
inline
in the decorator. You can abstract those out into a function(s) to help make the code more readable — there is also the possibility of creating a custom decorator which would amount to the same thing.
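To illustrate the point, here's a hedged sketch of pulling the parameterization out of the decorator into a helper function, reusing the second_level_metrics example from earlier in the thread (RAW_METRICS / OPERATIONS / COHORTS are placeholders standing in for the real project constants):

# A hedged sketch: build the @parameterize mapping in a helper function instead of inline.
from itertools import product

import pandas as pd
from hamilton.function_modifiers import parameterize, source, value

RAW_METRICS = ["revenue", "cost"]          # placeholder constants
OPERATIONS = ["sum", "mean"]
COHORTS = ["cohort_month", "cohort_quarter"]

def second_level_parameterization() -> dict:
    """Returns the {node_name: dependency mapping} dict that @parameterize expects."""
    return {
        f"{metric}_{operation}_by_{cohort}": dict(
            metric=source(metric), operation=value(operation), cohort=source(cohort)
        )
        for metric, operation, cohort in product(RAW_METRICS, OPERATIONS, COHORTS)
    }

@parameterize(**second_level_parameterization())
def second_level_metrics(metric: pd.Series, cohort: pd.Series, operation: str) -> pd.Series:
    df = pd.concat([metric, cohort], axis=1)
    return df.groupby(cohort.name)[metric.name].agg(operation)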
p
Thanks @Stefan Krawczyk! Makes sense to me!
s
also @Pengyu Chen to frame it another way — what is the API that you’d like here? (what would be ideal?) 🙂
p
Hey @Elijah Ben Izzy I just thought through your approach and think it's a great idea! One follow-up question:
@subdag(
    metrics_module,
    inputs={"cohort" : value("x"), "operation": value("y"),
    config=... # usually empty but you can reconfigure the subdags
)
def cohort_{x}_operation_{y}_metrics(
    metric_1_from_subdag: pd.Series,
    metric_2_from_subdag: pd.Series,
    ...
) -> pd.DataFrame:
metrics_module
subdag is probably going to have 100+ nodes. Most likely what I want to do with the function decorated with
@subdag
is to combine all of them. Is there a way to get access to all the nodes in the subdag without enumerating them in function parameters?
e
Hey! So yes, that’s a good point. Do you have a list of the nodes? Or is it static? Right now we don’t have the ability to just inject all of them, although that’s reasonable — would love an issue if you want! In the meanwhile, we have a tool called
inject
, which will be a little manual, but very explicit. I might suggest something like this:
1. Have an intermediate node that is all of them “joined” — this lives inside the subdag
2. Have the subdag parameter declare that as a dataframe
3. Use
inject
to inject all the nodes inside the subdag into that and then join them.
# metrics_module.py
@inject(data=group(**{key: source(key) for key in METRICS_YOU_CARE_ABOUT}))  # or just all of them
def joined_final_data(data: Dict[str, pd.Series]) -> pd.DataFrame:
    ... # join

# other_module_with_subdag.py
@subdag(
    metrics_module,
    inputs={"cohort" : value("x"), "operation": value("y"),
    config=... # usually empty but you can reconfigure the subdags
)
def cohort_{x}_operation_{y}_metrics(
    joined_final_data: pd.DataFrame,
    ...
) -> pd.DataFrame:
    ... # just return it or do something smarter
This has the following advantages:
1. You explicitly list it out, avoiding intermediate ones
2. You can avoid a long list of parameters
Note that if you want, you can be clever…
NODES_IN_MODULE = crawl(module) # crawl for function names, but be a bit careful here
See https://hamilton.dagworks.io/en/latest/reference/decorators/inject/#hamilton.function_modifiers.inject for more details.
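As a hedged sketch of the "be clever" idea (the crawl(module) line above is pseudocode), you could build METRICS_YOU_CARE_ABOUT by crawling the module with the standard inspect module — the filtering rules here are assumptions:

# A hedged sketch: collect node names by crawling metrics_module with inspect.
import inspect

import metrics_module

METRICS_YOU_CARE_ABOUT = [
    name
    for name, fn in inspect.getmembers(metrics_module, inspect.isfunction)
    if fn.__module__ == metrics_module.__name__  # skip imported helpers
    and not name.startswith("_")                 # skip private functions
    and name != "joined_final_data"              # don't inject the join into itself
]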
p
Thank you @Elijah Ben Izzy for the detailed explanation! I might submit an issue regarding this feature when I get a chance, but yes, this workaround should work for me 🙂
e
Great! And yeah I think it’s a pretty clean solution and has some preferable properties :)
p
sorry another question – is it possible to inject "grouped" dependencies into a subdag?
@subdag(
    metrics_module,
    inputs={"cohort": group(source("acquisition_month"), source("resale_quarter"))},
    config=None,
    namespace=None,
    external_inputs=[]
)
def acquisition_month_resale_quarter_metrics(joined_metrics: pd.DataFrame) -> pd.DataFrame:
    return joined_metrics
gives error
InvalidDecoratorException: Input cohort must be either an UpstreamDependency or a LiteralDependency , not <class 'hamilton.function_modifiers.dependencies.GroupedListDependency'>.
e
So, it’s not (currently) feasible, but not for any specific reason — it just hasn’t been implemented (the `source` stuff is a little manual). That was my first thought for the approach (extremely concise), but I suggested that you put it inside the module so you wouldn’t run into this. That said, it shouldn’t be too hard.
p
but I suggested that you put it inside the module so you wouldn’t run into this.
Sorry but not sure I totally understand this, could you please explain?
e
Yeah — so the suggestion/code above had you put the join (with
inject
) in a separate function adjacent to the metrics. E.G. so you only had to declare that in the
@subdag
function. Otherwise you’d run into exactly what you found.
[fn_1, fn_2, …] -> join_fn (using @inject) -> subdag
Rather than
[fn_1, fn_2, …] -> subdag (using @inject)
p
Thanks! I'm already following
[fn_1, fn_2, …] -> join_fn (using @inject)-> subdag
. The problem with grouped dependencies is that instead of one cohort, we may want to analyze multi-level cohorts, so I would need to inject a list of cohorts into the subdag (metrics module) like
group(source("acquisition_month"), source("resale_quarter")
. To resolve this problem I think it would require a code change in the Hamilton framework, or am I missing something?
That said, I do see some other workarounds for this problem. Alternatively, I'm also happy to add grouped dependencies support for subdag to the source code if it's something straightforward.
To clarify this is what I'm trying to do
# metrics_module.py
def metric1(col1: pd.Series, cohorts: List[pd.Series]) -> pd.Series:
    ...

def metric2(col2: pd.Series, cohorts: List[pd.Series]) -> pd.Series:
    ...

@inject(data=group(metric1=source("metric1"), metric2=source("metric2")))
def joined_metrics(data: Dict[str, pd.Series]) -> pd.DataFrame:
    return pd.concat(data, axis=1)


# module with subdag
@subdag(
    metrics_module,
    # notice a list of nodes are passed in
    inputs={"cohorts": group(source("acquisition_month"), source("resale_quarter"))},
)
def subdag_metrics(joined_metrics: pd.DataFrame) -> pd.DataFrame:
    return joined_metrics
e
Oh, I see, so the inputs are the thing you’re going to be varying, right?
p
Yes exactly
e
E.G. you want to pass a list in
p
here
cohorts
is the changing input I'd like to inject into the subdag and it is a list
@subdag(
    metrics_module,
    inputs={"cohorts": group(source("acquisition_month"), source("resale_quarter"))},
)
def subdag1(joined_metrics: pd.DataFrame) -> pd.DataFrame:
    ...

@subdag(
    metrics_module,
    inputs={"cohorts": group(source("acq_year"), source("resale_year"))},
)
def subdag2(joined_metrics: pd.DataFrame) -> pd.DataFrame:
    return joined_metrics
e
OK, cool — I’m not sure whether that’s possible, but my guess is not. One workaround (until it is) is to:
1. Group before passing it in (in a list)
2. Pass in that single one
3. Group at the end
That said, there are a few more ways you could do it, but just to clarify — these both get combined, correct? E.G. the list gets added together in some way? Another way of asking: is
metric_1
calculated for:
1. Each of the input series (acq_year and resale_year)
2. The join of the input series (acq_year + resale_year combined)
p
The join of the input series (acq_year + resale_year combined)
This. For instance:
def metric1(col1: pd.Series, cohorts: List[pd.Series]) -> pd.Series:
    ...
would concat
col1
and
cohorts
, and return
df.groupby(cohorts)[col1].sum()
e
Are all metrics computed over the same two sets? E.G. do they all take in the same cohorts? Or do different metrics take in different cohorts?
p
Yes all metrics are supposed to take in the same list of cohorts
FWIW we have different tables, and each table would only be showing metrics for a particular set of cohorts (1 or more)
e
Ok, got it. Cool. So yeah, I like the idea of grouping it in the input, but I really think that if you group it into cohort_data as a dataframe you’re going to have a very easy time. Then you pass those in for each subdag, compute on that dataframe (no need to concat), then output the final result. The advantage of this is you’re not concatting the same thing twice.
p
> group it into cohort_data as a dataframe you’re going to have a very easy time. Then you pass those in for each subdag, compute on that dataframe (no need to concat), then output the final result. To make sure I understand, are you suggesting we pass in "cohorts" as a dataframe rather than a list of Series? I think this is a great idea, but I think we would still need to concat the
col1
with the
df_cohorts
in order to do the grouping? (since you mentioned "no need to concat")
e
Yep! One node for each cohort. To be clear, no need to concat inside the metric functions — you’ll do it outside. So:
cohort_group_1 (using @inject) -> metrics -> subdag
cohort_group_2 (using @inject) -> metrics -> subdag
…
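A hedged sketch of how this could be wired up — the node and module names are the ones used in this thread, and the exact join/column choices are assumptions (metrics_module is assumed to expose a joined_metrics node that takes cohort_data: pd.DataFrame):

# A hedged sketch of the "cohort group as a dataframe" wiring.
from typing import Dict

import pandas as pd
from hamilton.function_modifiers import group, inject, source, subdag

import metrics_module

@inject(cohorts=group(acquisition_month=source("acquisition_month"),
                      resale_quarter=source("resale_quarter")))
def cohort_group_1(cohorts: Dict[str, pd.Series]) -> pd.DataFrame:
    # one column per cohort series
    return pd.concat(cohorts, axis=1)

@subdag(
    metrics_module,
    inputs={"cohort_data": source("cohort_group_1")},
)
def acq_month_resale_quarter_metrics(joined_metrics: pd.DataFrame) -> pd.DataFrame:
    return joined_metrics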
p
Now I see what you are talking about!
This is brilliant, I think this is the way to go!
I feel there are a lot of tools provided by the framework, like building blocks, but I always wonder what the best way is to put them together
The tradeoff here is now we have to maintain additional
cohort_group
s but I don't think that's a big issue
e
Awesome! Hope it’s as straightforward as it seems. High-level principle is to avoid making complex parameterization by just creating the dataset explicitly. And yeah, it’s a good point. It’s a lot of different capabilities that add together to form some really powerful stuff, but sometimes a little hard to find the best way. Also plenty of ways to get around the framework’s best practices 😆
Re: the trade-off, I think it’s an improvement. You get to have that as a specific dataset (
cohort_groups
), which makes tracking/debugging simpler (same code, different input group, but it’s more explicit)
p
Makes sense!
Thanks so much for your time and insights, appreciate it 🙂
e
Yeah! Interesting use-case, and I think a pretty simple, self-documenting solution