# hamilton-help
e
Hey! Can you explain what you mean more/show an example of something that doesn’t currently work but you’d like to?
w
this seems to be not allowed
```python
@load_from.json("A")
@load_from.json("B")
def function(A, B):
```
```
.venv/lib/python3.9/site-packages/hamilton/function_modifiers/base.py:113: in __call__
    raise ValueError(
E   ValueError: Got multiple decorators for decorator @<class 'hamilton.function_modifiers.adapters.LoadFromDecorator'>. Only one allowed.
```
from the doc it seems load_from is expected to inject one only.
e
Ahh! I see. Yes, it’s currently not allowed, mostly due to simplicity of implementation rather than any inherent reason.
It should theoretically be able to do it with the `inject` variable — mind opening up a ticket?
Otherwise there’s always the workaround to have them separate and join later, but it’s not as succinct.
w
sorry first time messaging in this channel
where do I go to create a ticket?
Github issues?
e
Yep — GitHub issues. Then I can look into it. At first glance though I don’t think there’s any reason it wouldn’t work
w
first time opening an issue too
🙌 1
sorry if it's too simple. I don't know the repo well enough to provide further suggestions
e
This is perfect, thank you! I’ll see if I can slot this in shortly — in the meanwhile, this is more a matter of convenience than a blocker, right?
w
well it is a blocker for my use case. I have multiple upstream nodes with expensive computation that I will have to save and load.
e
OK, so I think a valid workaround (however ugly) is:
```python
@load_from.json(...)
def A(data) -> ...:
    ...

@load_from.json(...)
def B(data) -> ...:
    return data

def function(A, B):
    ...
```
But yep, I’ll work on it soon 🙂.
w
haha yeah I wouldn't want that since I'd have to create so many util functions
I'm happy to contribute too if needed but I would probably need some pointers
e
Fair enough — kind of a pain 😕 OK, I’ll take a look. I’d happily give you pointers, but it might be a little complex to just go and do, given that the decorators have a decent amount of knowledge to them.
w
alrighty I will leave it to you then
thanks!
e
Will let you know as soon as I have an ETA — thanks for your patience! Wrote out my thoughts on how to do it on the issue 🙂 If you’re feeling brave feel free to jump in, but this is the most complex piece of the hamilton codebase. It’s a lot of DAG transformations (conceptually each decorator does some sort of transformation on the DAG/functions it runs), so it’s not the most intuitive… https://github.com/DAGWorks-Inc/hamilton/issues/219
Hey @Wentao Lu — I’ve implemented this 🙂 https://github.com/DAGWorks-Inc/hamilton/pull/221 Still need to merge/maybe touch up a bit, but if you want to try it out you can install this branch of hamilton. A little bit of extra documentation, but the trick is that you need multiple loaders, each with `inject_`, basically what you specced out.
`pip install git+https://github.com/dagworks-inc/hamilton.git@multiple-load-from`
Otherwise I’ll plan to get it out in a new version shortly.
👍 1
w
thanks for such a quick update!
🫡 1
@Elijah Ben Izzy Hey Elijah, I was trying to use this feature today but I ran into this
```
e.__call__(self, fn)
    111 if hasattr(fn, self.get_lifecycle_name()):
    112     if not self.allows_multiple():
--> 113         raise ValueError(
    114             f"Got multiple decorators for decorator @{self.__class__}. Only one allowed."
    115         )
    116     curr_value = getattr(fn, lifecycle_name)
    117     setattr(fn, lifecycle_name, curr_value + [self])

ValueError: Got multiple decorators for decorator @<class 'hamilton.function_modifiers.adapters.LoadFromDecorator'>. Only one allowed.
```
```
Requirement already satisfied: sf-hamilton==1.23.2 in /local_disk0/.ephemeral_nfs/envs/pythonEnv-799ca980-f267-48bf-85bf-5c3956a3d512/lib/python3.9/site-packages (from renaissance==88b88d3) (1.23.2)
```
e
Hmm — this is supposed to work now… Do you have some code you ran that will reproduce this?
w
let me cook up something
e
🙏 yeah I can debug pretty quickly
So, this pretty bare-bones example works (for me) — curious how yours differs/what edge-cases I might have missed
```python
from hamilton.function_modifiers import load_from, value
from hamilton import driver
from hamilton.ad_hoc_utils import create_temporary_module

@load_from.json(path=value("./test.json"), inject_="data_1")
@load_from.json(path=value("./test.json"), inject_="data_2")
@load_from.json(path=value("./test.json"), inject_="data_3")
def foo(data_1: dict, data_2: dict, data_3: dict) -> int:
    return 1


if __name__ == "__main__":
    mod = create_temporary_module(foo)
    dr = driver.Driver({}, mod)
    print(dr.execute(["foo"]))
```
w
can you parameterize on top?
e
What do you mean?
w
my use case is something like
```python
@parameterize(
 xxx
)
@load_from.json(path=value("./test.json"), inject_="data_2")
@load_from.json(path=value("./test.json"), inject_="data_3")
```
probably that's why?
e
Ahh — interesting
Let me see — I don’t think that’s possible although I can’t say for sure until I try it out
w
parameterize is a node expander
it might be doing something weird when applied on top
e
Yeah, so it’s not allowed yet. I actually got a different error…
w
what error did you get?
e
```
ValueError: Cannot call NodeExpander: <class 'hamilton.function_modifiers.expanders.parameterize'> on more than one node. This must be called first in the DAG. Called with [<foo {'module': 'temporary_module_57ee4391_b294_4c47_bc9c_fc388ace1822'}>, <load_data.foo.data_1 {'hamilton.data_loader': True, 'hamilton.data_loader.has_metadata': True, 'hamilton.data_loader.source': 'json', 'hamilton.data_loader.classname': 'JSONDataAdapter', 'hamilton.data_loader.node': 'data_1'}>, <data_1 {'hamilton.data_loader': True, 'hamilton.data_loader.has_metadata': False, 'hamilton.data_loader.source': 'json', 'hamilton.data_loader.classname': 'JSONDataAdapter', 'hamilton.data_loader.node': 'data_1'}>, <load_data.foo.data_2 {'hamilton.data_loader': True, 'hamilton.data_loader.has_metadata': True, 'hamilton.data_loader.source': 'json', 'hamilton.data_loader.classname': 'JSONDataAdapter', 'hamilton.data_loader.node': 'data_2'}>, <data_2 {'hamilton.data_loader': True, 'hamilton.data_loader.has_metadata': False, 'hamilton.data_loader.source': 'json', 'hamilton.data_loader.classname': 'JSONDataAdapter', 'hamilton.data_loader.node': 'data_2'}>]
```
w
that error makes more sense
e
A little more — your error is the one I quashed earlier which is doubly-confusing…
w
I'm doing a lot of wild stuff
might be hitting your edge cases
e
Ha!
Just so we’re on the same page — mind trying this?
`python3 -c "import hamilton; print(hamilton.version.VERSION)"`
Sometimes versions are weird
w
let me try that
e
It should print `(1,23,2)`
w
ah I think pyenv messed up
now I'm getting your errors
e
OK good
If you’re doing a lot of wild stuff, we actually have a ton of different power-user-features for shaping the DAG off of a config — I’d be surprised if there wasn’t something that made what you’re trying to do easy. That said, I’d be very curious to know what you’re trying to do at a high level (or something equivalent) — can give you a sense of how I might model it.
w
so I did two things I believe might not be expected from the lib's perspective
because I want to shield our data scientists from hamilton details, I created this decorator for them to use load_from
```python
from inspect import signature

from hamilton.function_modifiers import load_from, source


def load_input(inject_name: str):
    def decorator(func):
        wrapped = load_from.deltalake(
            dag_env=source("dag_env"),
            datestr=source("datestr"),
            db_name=source("db_name"),
            db_suffix=source("db_suffix"),
            spark=source("spark"),
            table=source("table"),
            inject_=inject_name,
        )(func)

        # Overriding the signature with the original, for inspection purposes.
        wrapped.__signature__ = signature(func)

        return wrapped

    return decorator
```
and you can tell the second thing from this code: I created a deltalake adapter
e
Ahh awesome — so, nothing you’re doing is crazy, although I’d recommend using `functools.wraps` instead of setting `wrapped.__signature__`, as it delegates to Python. That said, did both of these work?
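For illustration, a minimal sketch of the `functools.wraps` variant (untested, adapted from your snippet above):

```python
import functools

from hamilton.function_modifiers import load_from, source


def load_input(inject_name: str):
    def decorator(func):
        wrapped = load_from.deltalake(  # your custom deltalake adapter, as above
            dag_env=source("dag_env"),
            datestr=source("datestr"),
            db_name=source("db_name"),
            db_suffix=source("db_suffix"),
            spark=source("spark"),
            table=source("table"),
            inject_=inject_name,
        )(func)
        # functools.wraps copies __name__/__doc__/__module__ and sets __wrapped__,
        # so inspect.signature() resolves to the original function without
        # assigning __signature__ by hand.
        return functools.wraps(func)(wrapped)

    return decorator
```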
w
I just started testing today haha. I will let you know
e
Great 🙂
Then the parameterization — what specifically are you trying to parameterize? Is it which files get loaded? Or some other parameter?
w
other params that change the code behavior.
hmm
can I parameterize first and then load_from?
does injector work on multiple nodes?
e
So yeah, you can, but it would have to be two separate functions
w
I can certainly duplicate
that's easier
e
E.g. return all the data then parameterize over it
So just to be clear, it’s loading the same set of data, then doing different things with them?
w
yeah
e
So the interesting thing is that the parameterize decorator, even if it could work on multiple, wouldn’t actually be smart enough to know what to duplicate, so it might end up reloading the same thing
So yeah, breaking it into two makes sense?
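Something like this shape, for example (untested sketch with made-up names — `loaded_data`/`scaled` aren’t from your code):

```python
from hamilton.function_modifiers import load_from, parameterize, value


# Load once...
@load_from.json(path=value("./test.json"), inject_="data")
def loaded_data(data: dict) -> dict:
    return data


# ...then parameterize the downstream behavior over the already-loaded node.
@parameterize(
    doubled={"factor": value(2)},
    tripled={"factor": value(3)},
)
def scaled(loaded_data: dict, factor: int) -> dict:
    return {k: v * factor for k, v in loaded_data.items()}
```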
w
yeah
I have another thing that you might know a better solution for. I'm building a really wide table with 200+ features on it. I want to make these features independent, so I created functions for all of them. But then the function that creates the table would presumably need to take 200 args.
while that's fine, it's a bit verbose
the reason they need to be passed in is that they need to be joined on different keys
e
OK, so there’s an open issue that would make this really easy, although we haven’t decided to do it yet: https://github.com/DAGWorks-Inc/hamilton/issues/51. That said, I think its easier now — you can use
@inject
which can inject your nodes in, either as one function or as a group. https://hamilton.dagworks.io/en/latest/reference/decorators/inject/#hamilton.function_modifiers.inject You could then do that in a for-loop. and if you’re feeling fancy, use the names of the functions from the other module to form the parameters for
inject
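Roughly this shape, for instance (untested sketch — `feature_1`/`wide_table` etc. are placeholder names, and the list could just as well be built from your feature module’s function names):

```python
from typing import List

from pyspark.sql import DataFrame
from hamilton.function_modifiers import group, inject, source

# Placeholder constants — one entry per feature function/node name.
FEATURE_NAMES = ["feature_1", "feature_2", "feature_3"]


@inject(features=group(*[source(name) for name in FEATURE_NAMES]))
def wide_table(features: List[DataFrame]) -> DataFrame:
    # join logic over `features` goes here
    ...
```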
w
yeah I was thinking about tags too
e
So if they all have different join keys, you might want to group them by join keys, and have each one of those be a parameter to `inject`
w
read the code and realized it doesn't support it
e
Yeah, feel free to comment, that’s something we’d definitely think about
w
this is really cool
🙂 1
e
Yeah these decorators were very fun to build 🙂 But basically, injecting could be done dynamically by just looking through the module for all the functions and searching cleverly, or having a set of constants (easier)
w
🆗
I really appreciate what your team is doing
great work
e
Glad to hear!! Means a lot. Happy to keep answering questions and building features 🙂
w
Hey Elijah, when will the caching graph adapter be released 😄
found it in the repo and doc but it's not released yet
very excited to try it out
e
Was actually about to release in the next 20 mins 🙂 feel free to install that version though and try it out! If you just install from `main` you’re good to go.
w
👌
e
Otherwise caching is absolutely something we’re going to be improving so feedback is welcome
w
just saw
thanks!
👍 1
there's a typo in the doc though
```python
import nodes

adapter = h_cache.CachingAdapter(cache_path, base.PandasDataFrameResult())
```
it should be `CachingGraphAdapter`
e
Womp, fixing 🙏
Fixed! Docs are building, but repo is fixed.
w
👌 thanks!
I have a question about one thing that's not very clear from the doc
say if I have a node called `compute` that's cached with this adapter but it's also overridden via `execute(override=[...])` — which one takes precedence?
e
Override should take precedence — due to the nature of how this is implemented (a graph adapter can wrap `execute()`, which is bypassed when a node is overridden)
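For concreteness, an untested sketch of the setup (mirroring the doc snippet above — the `h_cache` import path may differ by version, and `nodes`/`compute` are placeholders):

```python
import pandas as pd

from hamilton import base, driver
from hamilton.experimental import h_cache  # adjust if your version ships it elsewhere

import nodes  # your module defining `compute`, etc.

adapter = h_cache.CachingGraphAdapter("./cache", base.PandasDataFrameResult())
dr = driver.Driver({}, nodes, adapter=adapter)

# The override short-circuits `compute` entirely, so neither the cached value
# nor the node's own code runs for it.
precomputed = pd.DataFrame({"value": [1, 2, 3]})
result = dr.execute(["compute"], overrides={"compute": precomputed})
```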
I can run a quick test to double-check soon — or if you find that it doesn’t work like that let me know and I can dig in
w
that makes sense
thanks!
e
Yep!
w
Hey Elijah, I'm seeing an interesting performance issue that you might have a clue about
👀 1
I have a graph that's defined to join 200 features together
with the same backbone index
they are read from several tables, with some light processing
the thing is, when I try to run this in spark, it kind of hangs and doesn't seem to do any work for a really long time before the job shows up in the spark UI
is it because of the complexity of the graph?
e
Possibly — would have to see some code and do a repro… happy to debug, do you have something simple that could help me get started? Possibly something that could go on an issue?
w
hmm
it's a bit hard to do that
e
Otherwise we can run some pretty basic profiling
w
is there any way to track how long it took to traverse/build the graph?
e
You’d have to use `cProfile` first — but it’ll give you the info you need
I’m about to be in a quick call then I can hop on and help — mind taking a stab at initial profiling, and I can help narrow down/identify?
w
yeah let me play around with it
🙌 1
e
@Wentao Lu any luck? Happy to hop on a call at some point today to help you look through and see if we can boil it down to a concrete bug.
w
I'm going to see if it's the spark query plan first
the migration split joins between tables into joins between features, which are selected from these tables
could've been an overly complicated query plan
I thought spark would optimize these
e
Yeah I assumed they would as well — would the complex query plan come from the sheer number of functions (200ish?)
w
let me put it this way
it was `df1.join(df2, xx).join(df3, xxx)`
now it's `feature.join(feature2, x).join(feature3, x) ... (x200)`, where each `def feature_x` is roughly `df.select(col, backbone)`
e
Ahh got it
w
it's more modular now
as the features are independent
but this query plan is a bit too complex I'm afraid
e
Hmm I wonder if we can get the best of both worlds
Just to be clear, how exactly are you utilizing spark with hamilton? E.g. what does the driver look like?
w
so the nodes are defined to return spark dataframes
and I'm using a `CachingGraphAdapter`
e
Ahh ok — so each node returns a spark dataframe, and then it just joins them all, right? In the final function? Mind writing a little pseudo code just to demonstrate (with two features and a join) so we’re all on the same page?
w
yeah
as the evaluation is lazy, all the computation starts at the final node from `final_vars`
sparkSession is passed in as a node
e
Got it. Two questions: 1. What does the caching adapter get you? 2. What does the join look like (IIRC you used `inject`?)
w
we can't run the whole thing as a single spark job, so we have to run it in multiple steps
the caching lets us write intermediate steps to a location that can be read by downstream steps later
e
Ahh nice, that makes sense. Glad it’s useful!
w
not sure what the second question is about
e
Ok, so you have one spark dataframe for each feature
Then at some point you do a join, no?
w
yeah
e
What does that join step look like? I’m just wondering if there’s an easy way to optimize.
w
```python
result: DataFrame = T.pipe(
        T.reduce(
            lambda df1, df2: df1.join(df2, on=[BONE_INDEX], how="left"),
            [
                feature_1,
                feature_2,
                feature_3,
...
```
e
Do you have time to hop on a call and walk through this? Otherwise happy to keep debugging.
w
sure
s
@Wentao Lu not sure what you covered with Elijah. But it seems like you could reuse https://github.com/DAGWorks-Inc/hamilton/tree/main/examples/spark/pyspark_udfs for the map-based operations you’re doing.
w
I do run filters and aggregations that need to be accommodated
I'm thinking whats the best way for me to unblock myself
We discussed, and short-term I could just make the graph smaller
s
Oh right. We just synced, and have a few ideas on how to incorporate those needs more ergonomically.
w
break a big one into multiple smaller ones
s
yup — so rather than one big DAG, create X smaller ones, and then “manually” join them?
e
Yeah so @Wentao Lu I’m hoping to make something that can really cleanly represent/efficiently run groups of UDFs as soon as possible — I think this would get the majority of your cases/narrow down the space significantly. Then when you’ve broken it into smaller pieces it’ll still work nicely, and you’ll be able to take those smaller ones and group them together.
Also I had another thought — I’m curious if the query plan difficulty comes from it coming from multiple sources, where you know the index is the same but spark doesn’t (as it has no idea about indices), so it’s not partitioned the same. Worth trying out the multiple tables approach to see if it can optimize?
w
yeah I'm going to limit to features from one table just to try it out
👍 1