# hamilton-help
m
Imagine I wanna do something that can be expressed in SQL like this:
```sql
SELECT
  <DIMS>
  , SUM(source_value) AS source_sum
FROM (
  SELECT
    source_name
    , source_value
    , <DIMS>
  FROM <some_table>
  WHERE
    source_name IN (<source_name_list>)
    AND <TIME_WINDOW_FILTER>
)
GROUP BY source_name, <DIMS>
ORDER BY <DIMS>
```
what is a good design pattern to do this with Hamilton? In particular, I explored `@resolve` to potentially design this, but my biggest issue is that I don't know upfront all the possible values for a given dimension. Let's say `<DIMS>` is stock_symbol: it could be hundreds, and it seems a bit off to me to create hundreds of Hamilton nodes dynamically.
alternatively, I could create a Hamilton node that does a `df.groupby(<DIMS>).agg(['sum'])` — is that a recommended way?
for context: SUM() is just an example here, it could be something compute-heavy, and I'm looking at this to potentially create aggregates based on Hamilton nodes in Python
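For concreteness, a minimal pandas sketch of that single-node groupby alternative (all names here are hypothetical, just to ground the idea):

```python
import pandas as pd


def filtered_data(
    stock_data: pd.DataFrame, source_names: list, start_date: str, end_date: str
) -> pd.DataFrame:
    """Mirrors the inner SELECT: filter to the sources and the time window."""
    mask = stock_data["source_name"].isin(source_names) & stock_data["date"].between(
        start_date, end_date
    )
    return stock_data[mask]


def source_sums(filtered_data: pd.DataFrame, dims: list) -> pd.DataFrame:
    """Mirrors the outer SELECT: group by <DIMS> and sum."""
    return (
        filtered_data.groupby(dims, as_index=False)["source_value"]
        .sum()
        .rename(columns={"source_value": "source_sum"})
    )
```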
s
Interesting question! To clarify: 1. you want to pull some data, with M known columns (i.e. the code makes assumptions on what these are) and N unknown columns (filtering to the right subset somehow) — N is provided at runtime. 2. then, grouped by the values in (1), do some aggregations using the M known column values?
m
let's make it concrete: say I want to compute trading volume by symbol, by date. My dimensions are `<DIMS> = date, stock_ticker` and `source_name = 'volume'` (the column in `<some_table>` that represents trading volume). The inner SQL then becomes this (simplifying the IN () clause):
```sql
SELECT
  source_name
  , source_value
  , date
  , symbol
FROM <some_table>
WHERE
  source_name = 'volume'
  AND date BETWEEN start_date AND end_date
```
what I know: the source_names, in this case `'volume'`. what I do NOT know: how many symbols there will be on a given date
@Stefan Krawczyk that is, the list of symbols will NOT be provided at run-time. I just wanna aggregate whatever exists in the upstream table
👍 1
in the end, I wanna solve this btw:
1. I have a bunch of upstream tables that have data in the format of date, symbol, some columns (either a wide table, or, as in the example above, a tall skinny table)
2. do a bunch of group-by-like calcs by various dimensions — in the above case, by date, symbol
3. potentially add additional filters into the mix (specified at run-time)
4. and even add sampling for daily, weekly, monthly or rolling window stats (we can solve this once we solve 1-3)
s
and to confirm, the aggregations can be applied over all symbols & dates? (nothing special based on value?)
m
correct
s
at runtime, what would you like to change/modify/specify?
m
for instance, imagine you want to compute daily/weekly open/high/low/close/volume candles from tick data
at run-time, I would specify things like:
• from/to date range
• potentially lookback windows, if I wanted to compute rolling averages of price or volume (staying with the stock data example)
and ideally make this generic, so I could specify `source_name` and even `aggregation` types (SUM or whatever) at run-time as well
(tbd, still exploring what's best) main motivation: I want to move this to Hamilton so I can unit test the aggregations (as they get more complex) and move things out of SQL
👍 1
s
This is great context, thanks. I have to run in a bit so won't be able to provide some ideas for code (that, and I'd need to write some pandas code to ground things). @Elijah Ben Izzy is a little more familiar with the domain and might have thoughts too. In terms of guidance at a high level, you'd want to draw a DAG (flowchart) of the steps that you want to happen. To keep it simple, start by fixing values (like you did above). Then slowly make it more parameterizable. Basically, if we can draw it, we should be able to map it to a Hamilton construct. Possibilities:
• parameterize* — "create lots of functions based on a parameterization"
• "DAG-ception" — have a Hamilton DAG within a function of a Hamilton DAG.
• subdag — this is ^ but enables you to make one big Hamilton DAG.
• parameterized_subdag — ^ parameterize the above
• Parallelizable/Collect — basically isomorphic to subdag, but where the parameterization is dynamic, and the flow is fixed.
• modify the final step of how a result is returned, e.g. write a custom result_builder.
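As a quick, hedged illustration of the first bullet (untested, reusing the hypothetical `filtered_data` node sketched above): `parameterize` stamps out one node per fixed parameter set from a single function definition.

```python
import pandas as pd

from hamilton.function_modifiers import parameterize, value


# One output node per (source_name, dims) combination; unbound parameters
# (here filtered_data) remain regular upstream dependencies.
@parameterize(
    volume_by_symbol_date=dict(source_name=value("volume"), dims=value(["date", "symbol"])),
    volume_by_date=dict(source_name=value("volume"), dims=value(["date"])),
)
def aggregated(filtered_data: pd.DataFrame, source_name: str, dims: list) -> pd.DataFrame:
    subset = filtered_data[filtered_data["source_name"] == source_name]
    return subset.groupby(dims, as_index=False)["source_value"].sum()
```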
m
thanks, let me study these a bit more. I had discussed a related topic with @Elijah Ben Izzy here: https://hamilton-opensource.slack.com/archives/C03M33QB4M8/p1699488418445009?thread_ts=1699132686.896269&cid=C03M33QB4M8 and am pretty familiar with how `@resolve` works at this point. parameterized_subdag and Parallelizable/Collect are a good tip — this would make this easily parallelizable on ray.io later on
e
Will take a look a little bit later this afternoon and come back with some pointers, been AFK :)
🙏 1
s
👍 and if there’s a gap in what we have — then we can create an issue for it :)
m
thanks - appreciate all these quick replies even on weekends - you guys rock!
👌 1
big picture:
• create something like a metrics cube, taking raw data and aggregating it to various stats, for a given set of dimensions
• dimensions can be different, depending on which upstream data is used
• make it dynamic / easily extendable with as little code duplication as possible
  ◦ so dimensions can be set at run-time at will
for prototyping, happy to go with your example data here: https://github.com/DAGWorks-Inc/hamilton/blob/90bf57dd2ba3968358df538ac143db3c24a9290b/examples/reusing_functions/reusable_subdags.py#L12 and imagine aggregating this to a count of `user_id` by region/date (but assuming you don't know the # of regions at runtime)
e
Heh, we get our best thinking done on weekends 😆 So, quite a few ways to do this. As always, it depends on your requirements. The big question is how much you want to represent in the DAG. E.g. do you want every step/every computation to be represented as small units in the DAG? This is nice because you can visualize it, test tiny pieces, and do more things like that, but the code could be somewhat complicated and you're leaning heavily on decorator constructs. OTOH, do you want a few bigger steps in the DAG and have the logic in the steps? This is nice because the code is going to be easier/more natural to write, but complex in that you're offloading the design to your own logical pieces, foregoing the benefits of Hamilton. So, moving from bite-sized Hamilton functions, each representing a logical datapoint (aggregation + filter/groupby key) -> bigger Hamilton functions (each one representing a few stages), the high-level approaches are (happy to dig in more and show how they would apply to your use-case):
(1) `parameterize_subdag`/`parameterize` + `resolve` — do a quick query of the dimensions to group by beforehand, and pass those in via config. So, one node per aggregation/groupby. Small pieces, expressive DAG, static.
(2) Dynamic version of (1) — if you really can't determine this prior to runtime, then use `Parallelizable` + `Collect` for each subset that you don't know. Small pieces, expressive DAG, dynamic.
(3) A little bit in the middle. The trick here is pretty simple — anything that can be known beforehand should be represented in the DAG (aggregation types, windows, etc…). Anything known after the fact will be represented as columns inside your dataframe. So, the result will be a bunch of dataframes (a subset of which you can choose to query), with, say, regions as columns. Then you have the choice as to how static you want this to be. Do you write a new function for each aggregation type with some parameterization? Or pass it all in? Some options here. But there is a node (or a few, perhaps) for each aggregation/operation, and you'll probably use `@resolve` + `@parameterize` or `@parameterize_subdag` to determine that. Medium pieces, somewhat expressive DAG, static.
(4) Big pieces — e.g. dataframes, then apply a series of operations to them, passing in the set of aggregations. Doesn't naturally solve your problems, but is an easy way to model it if you don't want to rely exclusively on Hamilton constructs. Big pieces, inexpressive DAG, static.
Presuming I have a good enough view of what you're trying to do, I'd highly recommend (3). It will:
1. Form a nice readable DAG (caveat — this depends on how you leverage the Hamilton constructs — note on that in a bit)
2. Likely be more performant (a little subtle, but pandas/other libraries are optimized for groupby/apply operations)
3. Allow for some custom data quality checks to ensure something matches your expectations
4. Allow for the computations done to be expressed in the DAG, nicely self-chunked and parallelizable (see point (2))
5. Allow for further grouping (say, perhaps, if you want to aggregate by region + industry or whatever down the road).
Re: point (1), I would highly recommend first doing this with the vanilla Python decorators, then building a domain-specific expressive decorator that is just a thin wrapper on top of them. This way you can use the power of the Hamilton decorators but tailor them towards your use-case. A little nuanced on ways to design this right, but I'll happily point you in the right direction once you're at this point.
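To make (3) a bit more concrete, a rough, untested sketch using your region/user_id example, where the unknown dimension values end up as columns (so the DAG stays static no matter how many regions exist):

```python
import pandas as pd


# Hypothetical node: the unknown dimension (region) is pivoted into columns
# rather than into separate DAG nodes.
def user_counts_by_region(filtered_data: pd.DataFrame) -> pd.DataFrame:
    return filtered_data.pivot_table(
        index="date", columns="region", values="user_id", aggfunc="count"
    )
```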
Hope this helps!
m
@Elijah Ben Izzy this is super helpful — thank you! I'm hacking along the `Parallelizable` + `Collect` route right now, and I believe it could work well. Though your (3) is also cool. Let me digest all of the above a bit and then I'll circle back soon with what I come up with for additional feedback. thanks again!
e
Great! Let us know what you find. It's still a bit less mature (which is why it requires explicit enabling in the driver builder). Also, worth noting that if you want dynamically structured DAGs within a parallel group it can be tricky (e.g. you can't change the behavior of each parallel step with a different config, it has to be global — just the inputs). If you need this type of thing, the standard pattern is to split into groups of behavior, then apply different paths through different parallel/collect statements; see the sketch below. Not sure this is relevant to you, but the usual way this shows up is if you're splitting up to process multiple sets of files, you'd have a different path for PDFs, pngs, etc…
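A tiny hedged sketch of that split (file types and names are hypothetical):

```python
from typing import List

from hamilton.htypes import Parallelizable


# Each group of behavior gets its own expander; the nodes downstream of each
# expander form an independent parallel path through the DAG.
def pdf_file(all_files: List[str]) -> Parallelizable[str]:
    for f in all_files:
        if f.endswith(".pdf"):
            yield f


def png_file(all_files: List[str]) -> Parallelizable[str]:
    for f in all_files:
        if f.endswith(".png"):
            yield f
```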
👍 1
m
@Elijah Ben Izzy qq: I'm trying this example snippet (from here)
```python
# my_hamilton_nodes.py
from hamilton.htypes import Parallelizable, Collect


def url() -> Parallelizable[str]:
    for url_ in ['url_a', 'url_b']:
        yield url_


def url_loaded(url: str) -> str:
    return url


def counts(url_loaded: str) -> int:
    print(type(url_loaded))  # url_loaded seems to be a list, not a string???
    return len(url_loaded.split("_"))


def total_words(counts: Collect[int]) -> int:
    return sum(counts)
```
is there a particular flag I need to set in the driver such that the input to `counts()` is actually a string? It seems to be coming in as a list. I'm using this snippet:
```python
import my_hamilton_nodes
from hamilton import driver, telemetry
from hamilton.execution import executors

telemetry.disable_telemetry()

config = {}

dr = (
    driver.Builder()
    .with_modules(my_hamilton_nodes)
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_local_executor(executors.SynchronousLocalTaskExecutor())
    .with_config(config)
    .build()
)

output_columns = [
    'counts'
]

out = dr.execute(output_columns)
print(out)
```
but it fails with:
```text
File "my_path/hamilton-playground/my_hamilton_nodes.py", line 64, in counts
    return len(url_loaded.split("_"))
AttributeError: 'list' object has no attribute 'split'
```
e
This looks correct at first glance
Reproducing…
OK, so what's happening is that you're executing something within the `Parallelizable` block. `counts` gets run n times (one for each url). Asking for it as an output is technically "undefined behavior" (we should error out, that is), and I think because there's no `Collect` it's not treating it as such. If you run it with `total_words` as the output it'll work.
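i.e., adjusting the driver snippet above, only the requested output changes:

```python
# Request the Collect node rather than a node inside the Parallelizable block:
out = dr.execute(["total_words"])
print(out)  # -> {'total_words': 4}
```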
m
oh, wow, glad you pointed me to this, otherwise I'd have spent a few more hours scratching my head. 'total_words' does the trick and my console output shows:
```text
<class 'str'>
<class 'str'>
{'total_words': 4}
```
and everything works!
e
Great! Yeah this is a case of “there should be an error message”
m
ok, so it's not all in vain, happy to provide this user feedback 🙂
e
Yep! Thanks for this. Tracked it here — the umbrella issue for guard-rails — there are a few edge cases that have strange behavior: https://github.com/DAGWorks-Inc/hamilton/issues/301
🙏 1
m
@Elijah Ben Izzy if you're curious, I got a decent 1st working draft that seems quite flexible: https://github.com/kzk2000/hamilton-playground next step would be to move some of the aggregations into separate Hamilton nodes and also use `@resolve` — that's my next goal...
🔥 1
s
@miek nice! The example you come up with could be a good candidate to add to hub.dagworks.io, since I’m sure you won’t be the last person to think/ask for something similar.
m
certainly happy to, eventually. for now, I'll stay in my repo as I wanna do a few more things. I'm really hoping I can leverage `@resolve` to create a generic version of this. The idea is: one func to split by groups,
```python
def ticker_df(stock_data: pd.DataFrame) -> Parallelizable[pd.DataFrame]:
    ...
```
then many nodes, all using `ticker_df` as input, that do the various aggregations, and one func to merge everything back together, likely a `@resolve` version of:
```python
def final_stats(ticker_aggs: Collect[pd.DataFrame]) -> pd.DataFrame:
    df = pd.concat(ticker_aggs, ignore_index=True)
    return df
```
👍 1
s
Just in case it's useful, `@inject` could also come in handy…
> I'll stay in my repo as I wanna do a few more things.
yep, yep. adding to the hub wouldn't be the first priority 🙂
e
Yep, that sounds quite reasonable. One thing you'll want to think through is how to pass the data to the intermediate nodes (between the Parallelizable and the Collect). Just to be clear: you're parallelizing over the unknowns (each region or ticker?). And then you have the aggregations defined as separate Hamilton DAGs? And you'll have multiple instances of the Parallelizable/Collect pairs?
m
@Elijah Ben Izzy yes, I parallelize over the unknowns, in this case the # of tickers. However, I don't seem to be able to re-use the spawned `Parallelizable` multiple times: the 1st time it gets processed by a `Collect`, it disappears from some cache. Getting this error:
```text
File "[...]/anaconda3/envs/py310/lib/python3.10/site-packages/hamilton/execution/state.py", line 113, in read
    raise KeyError(f"Key {formatted_key} not found in cache")
KeyError: 'Key total_words2 not found in cache'
```
To reproduce, run this toy example: https://github.com/kzk2000/hamilton-playground/blob/main/toy_run.py — it has two outputs, 'total_words' and 'total_words2', which both try to `Collect` from the same upstream. Is this not allowed?
```python
# toy_nodes.py
from hamilton.htypes import Parallelizable, Collect


def urls() -> Parallelizable[str]:
    for url_ in ['url_a', 'url_b']:
        yield url_


def counts(urls: str) -> int:
    return len(urls.split("_"))


def total_words(counts: Collect[int]) -> int:
    return sum(counts)


def total_words2(counts: Collect[int]) -> int:
    return sum(counts)
```
basically trying to do this:
1. create one Parallelizable for each ticker
2. then have many different nodes use it
3. then collect them all
btw, in my non-toy version, my Parallelizable is one pandas dataframe per ticker:
```python
import pandas as pd

from hamilton.htypes import Parallelizable


def ticker_df(stock_data: pd.DataFrame) -> Parallelizable[pd.DataFrame]:
    for ticker, df in stock_data.groupby('ticker'):
        yield df
```
e
Yep, so currently the one -> many link of Parallelizable/Collect is not supported. That said, I think you'll really want to break it out TBH (this limitation or not): multiple parallel/collect pairs. It can give a sense of intention for the data flowing through and allow you to inject specified parameters. E.g.:
1. For each computation/aggregation type
2. Do a Parallelizable over ticker
3. Do a Collect over the Parallelizable
4. Then do a merge/join at the end
One thing you'll want to do this way is share information from the Parallelizable function to the consuming nodes — this is commonly done with a dataclass:
```python
import dataclasses

import pandas as pd


@dataclasses.dataclass
class AggregationInput:
    data: pd.DataFrame
    agg_param_1: ...
    agg_param_2: ...
```
Which allows you to reuse subdags cleanly. Then you have a set of nodes (or a single node if it's simple) for each Parallelizable that listens to it, and join them all after the fact.
The cool thing about doing this on a ticker basis is that the data sizes will (likely, making an assumption here) be roughly the same, so your parallelization will make a ton of sense. You'll want to use Ray or create a custom joblib executor to do it (this is quite easy, and we need to add it into Hamilton)
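For later reference, swapping in Ray would look roughly like this (a sketch, assuming the `h_ray` plugin and that `ray` is installed — untested here):

```python
import my_hamilton_nodes
from hamilton import driver
from hamilton.execution import executors
from hamilton.plugins import h_ray

# Same Builder pattern as before, but parallel blocks run as Ray tasks.
dr = (
    driver.Builder()
    .with_modules(my_hamilton_nodes)
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_local_executor(executors.SynchronousLocalTaskExecutor())
    .with_remote_executor(h_ray.RayTaskExecutor())
    .build()
)
```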
m
I'm happy to follow your guidance here. Happy to break it out into multiple parallel/collect pairs, one for each aggregation node. Do you happen to have a toy example of what you described above? I think I get it but need to experiment a bit. Are you suggesting to use a class
```python
class AggregationInput:
    data: pd.DataFrame
    agg_param_1: ...
    agg_param_2: ...
```
but then for each subdag, I only set exactly one of the agg_params? Say, agg_param_1 is set when I wanna do aggregation 1, and agg_param_2 is set when I do aggregation 2. Is that what you meant?
@Elijah Ben Izzy and yes, using Ray is the plan eventually, though I probably have to think about serializing pd.DataFrame for distributed compute -- let's cross that bridge when we get there, wanna get the local version ironed out first
e
Yeah, so Ray should be able to handle serialization nicely — I wouldn't worry about it until (and if) it becomes a problem; I don't know about the size of your data. You can design it the way you want, but the idea is that you need to figure out:
• which of the aggregation parameters are hardcoded (e.g. the aggregation type)
• which of them are parameterized by the user/passed in
Then you'd have to pass in the args you want. Something simple, for instance:
```python
import dataclasses
from typing import Any, Dict

import pandas as pd

from hamilton.htypes import Collect, Parallelizable


@dataclasses.dataclass
class AggregationArguments:
    df: pd.DataFrame
    group_id: int  # Or str?
    group_name: str
    aggregation_arguments: Dict[str, Any]  # unstructured


def ticker_df_for_rolling_close_bid_computation(stock_data: pd.DataFrame) -> Parallelizable[AggregationArguments]:
    for ticker, df in stock_data.groupby('ticker'):
        yield AggregationArguments(
            df=df,
            group_id=ticker,
            group_name="ticker",
            aggregation_arguments={'type': 'avg', 'window': '7d'})


def windowed_aggregation(ticker_df_for_rolling_close_bid_computation: AggregationArguments) -> pd.DataFrame:
    return ...  # the apply part of the groupby


def all_windowed_aggregations(windowed_aggregation: Collect[pd.DataFrame]) -> pd.DataFrame:
    return pd.concat(...)  # the concat you had above
```
Note that the top can easily be parameterized in a few ways:
1. Parameterize over the Parallelizable/Collect — this allows you to create multiple
2. Parameterize over the interior piece (the windowed aggregation) — you'd have to figure out how to pass the window numbers, but you could do one node per window size (see the sketch below)
3. Parameterize within the split (e.g. do one output per aggregation window/ticker combo…)
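A hedged sketch of (2), reusing the `AggregationArguments` class above (untested; the rolling call is just a placeholder):

```python
import pandas as pd

from hamilton.function_modifiers import parameterize, value


# One interior node per window size, stamped out from one function definition.
@parameterize(
    windowed_aggregation_7d=dict(window=value("7d")),
    windowed_aggregation_30d=dict(window=value("30d")),
)
def windowed_aggregation(
    ticker_df_for_rolling_close_bid_computation: AggregationArguments, window: str
) -> pd.DataFrame:
    df = ticker_df_for_rolling_close_bid_computation.df
    # Placeholder aggregation -- assumes a datetime index for time-based windows.
    return df.rolling(window).mean()
```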
m
oh nice, this is powerful! and clean indeed, liking it. now this makes me wonder, though: should I be shipping around full dataframes at all, or just the tickers themselves?
e
Also, you can use `subdag` in the middle if you want to reuse nodes / express it using Hamilton.
Up to you — I don't know the data size, but from my experience working at a hedge fund, it's never huge?
m
and within each aggregation node I'd do an initial `stock_data.query("ticker=={ticker}")`
if it's daily data, no issue; if it's tick trades & quotes data, it can get big (though I'd probably do the initial heavy lifting server-side in a time-series database like ClickHouse or similar)
all right, looks like I have a few things to ponder — thanks for all this!
e
I would pass the subset, personally. One option you can do if you want is to have a single function output all the groups (as a list), then have the subsequent Parallelizable ones just loop over them:
```python
from typing import List, Tuple

import pandas as pd

from hamilton.htypes import Parallelizable


def grouped_by_ticker(data: pd.DataFrame) -> List[Tuple[int, pd.DataFrame]]:
    return list(data.groupby('ticker'))


def ticker_df_for_rolling_close_bid_computation(
    grouped_by_ticker: List[Tuple[int, pd.DataFrame]],
) -> Parallelizable[AggregationArguments]:
    for ticker, df in grouped_by_ticker:
        yield ...
```
Yep! Fun stuff. I used to work at a hedge fund (Two Sigma) from 2014 -> 2019, so this is all extremely familiar to me 🙂 Hamilton was designed a lot around the experience there with time-series aggregations (which tend to be highly parameterized/specific in certain ways, and involve a ton of columns…)
m
good to know, similar background here. I'm not 100% clear on the benefit of
```python
def grouped_by_ticker(data: pd.DataFrame) -> List[Tuple[int, pd.DataFrame]]:
    return list(data.groupby('ticker'))
```
it would only be computed once in memory?
i.e. each Parallelizable just uses the same grouped_by_ticker() list
e
More just code-clarity TBH. You'll want to ensure that all the places that output a set of groups use the same groups — Hamilton can compute the groups once, and then it's cached. Makes adding a new aggregation section less expensive/less to think about. But it's minor — up to you/what you want to work with. It's conceptually closer to the way you initially thought about it (one `Parallelizable` function).
One thing to think about — anything you want to show up in the DAG should be a different `Parallelizable`/`Collect` piece — then you just go over all tickers, and use `resolve`/inputs/other stuff to ensure you pass in all the arguments appropriately
m
makes sense, thanks. another question: per the design above, for each new aggregation, I need to write 3 functions (the Parallelizable / the actual calc / the Collect). This is ok, but new users need to be aware of it (say I'm working with a team and not all of them are Hamilton pros). Is there any way to make this even more compact, such that each aggregation becomes just one function?
the above is ok for now, just asking to learn
e
Yeah, so vanilla Hamilton requires that, but this is where some customizations on top of what we have could take you pretty far. One basic option is:
1. A list with all the parameterizations
2. A `parameterize` call with those — each one outputs a specific name
3. Write a function/subdag/something that processes it
4. A `Collect` that mirrors (2)
Then it's two steps:
1. Implement the aggregation subdag
2. Add your aggregation name + fixed parameters to the global list.
Dynamic parameters can also be declared, although you'll have to be clever (this is where you can actually pass in all params/config and filter, or just filter the ones you want inside the parameterization function — lots of ways to do this). And, what's cool is that the user can request just the ones they need; see the sketch below.
I'd recommend doing a few manually at first, then thinking about how to unify them/make the pattern easy/extensible
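For instance, a rough, untested sketch of steps (1) and (2), reusing `AggregationArguments` from earlier (all other names invented):

```python
from typing import Any, Dict

import pandas as pd

from hamilton.function_modifiers import parameterize, value
from hamilton.htypes import Parallelizable

# (1) the single global registry a teammate extends with name + fixed parameters:
AGGREGATIONS: Dict[str, Dict[str, Any]] = {
    "daily_volume": {"type": "sum", "window": "1d"},
    "rolling_avg_close": {"type": "avg", "window": "7d"},
}


# (2) one expander per registry entry, generated via a dict comprehension:
@parameterize(
    **{f"{name}_inputs": dict(agg_args=value(args)) for name, args in AGGREGATIONS.items()}
)
def aggregation_inputs(
    stock_data: pd.DataFrame, agg_args: Dict[str, Any]
) -> Parallelizable[AggregationArguments]:
    for ticker, df in stock_data.groupby("ticker"):
        yield AggregationArguments(
            df=df, group_id=ticker, group_name="ticker", aggregation_arguments=agg_args
        )
```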
m
yes, agree, still early Hamilton days for me but so far I really like it... have to hop for now, will ponder the above, to be continued soon
thanks again!
e
Cool! Of course, hope this all helps.
m
it's very helpful for sure!
🙏 1
@Elijah Ben Izzy just tried a very basic toy2 example: https://github.com/kzk2000/hamilton-playground/blob/main/toy2_run.py with nodes in https://github.com/kzk2000/hamilton-playground/blob/main/toy2_nodes.py However, I'm running into a similar issue as earlier.
```python
# toy2_nodes.py
from typing import List

from hamilton.function_modifiers import parameterize, source, value
from hamilton.htypes import Parallelizable, Collect


@parameterize(
    data_agg1=dict(input_values=value(['url10_a', 'url10_b'])),
    data_agg2=dict(input_values=value(['url20_a', 'url20_b'])),
)
def data_node(input_values: List) -> Parallelizable[str]:
    for url_ in input_values:
        yield url_


def agg_1(data_agg1: str) -> int:
    print('**********\nagg_1')
    print(type(data_agg1))  # comes in as a generator?
    print(list(data_agg1))
    return len(data_agg1.split("_"))  # type(data_agg1) == generator -> throws

[...]

@parameterize(
    output_agg1=dict(upstream_source=source('agg_1')),
    output_agg2=dict(upstream_source=source('agg_2')),
)
def output_node(upstream_source: Collect[int]) -> int:
    return upstream_source  # for testing, should be concat() ...
```
When asking for `output_agg1`, my `agg_1` node doesn't receive strings but the whole generator from my `data_agg1` node. Without `@parameterize` everything works, but when I add it (per our discussion above) things seem to behave differently, and I'm not receiving the actual string items from the Parallelizable `data_agg1` node. Any ideas what I'm missing here?
cc: @Stefan Krawczyk who might also be interested
e
Hmm… will try out in a bit — on first glance it looks correct.
🙏 1
m
digging through the code a bit — there's a section in grouping.py:
```python
for collector in collectors:
    expander, nodes_in_block = self.nodes_after_last_expand_block(collector)
    # TODO -- add error message for conflicting groups...
    # if expander in visited:
    #     raise ValueError(f"Multiple collect nodes cannot trace "
    #                      f"back to the same expander. ")
    expander_name = f"expand-{expander.name}"
```
where `expander` becomes None once you add the @parameterize. but frankly, this low-level stuff is a bit beyond my comfort zone, i.e. I don't have enough context on what's happening under the hood.
so my hunch is `@parameterize` + `Parallelizable` are somehow stepping on each other
e
Hmm… That could be the case? Reproing real quick
🙌 1
So your hunch is looking good so far — it's not registered as an expander… (they show up differently on the DAG)
m
oh, cool idea to plot the visual - didn't think of that
e
Yeah — it's an easy way to debug
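For reference, a sketch of the visualization call (assuming graphviz is installed; the output path is arbitrary):

```python
# Renders the full DAG to a file; expand/collect nodes get their own markers.
dr.display_all_functions("./my_dag.png")
```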
m
ok, just to learn, I did the same on a working toy example. so in the broken one, the "expand" (green) and "collect" (red) pieces are missing, and hence it breaks
easy fix on the backend?
e
Yep! Also the crow-feet connector
Looking in — need to figure out where it's not figuring out that it's a parallel/collect — I think the type is getting messed with somehow…
m
ok, sounds like this will require some careful thinking through... I'll leave you to it. Will experiment a bit more with creating subdags for now
e
Ok, one line change 🙂
m
oh, nice!
e
A few things I'll want to clean up longer-term, but now that I've fixed it I'm going to get you a test Hamilton version to play with
m
appreciate it
if it's really just a 1-line change, happy to overwrite it locally on my end
e
Oh yeah, if you’re checking it out that’s easy
Here’s the branch — you can just check this out or copy/paste: https://github.com/DAGWorks-Inc/hamilton/pull/545
Easier for me to make some quick fixes to that one rather than push a new RC version
FWIW this might come up with a decorator or two. Should be easy for me to go through and do this type of thing the right way for everything
m
it worked! I overwrote that line in my local package
🔥 1
e
Great!
m
thanks for being so responsive while I'm doing my weekend hacking 🙂
really appreciate it
e
Yeah! Nice find. Thanks for finding bugs when I have the time to make quick fixes 😂
m
as mentioned before, I really wanna create a POC of how Hamilton could replace my company's SQL-only home-grown metrics cube. happy to share more details via private DM