# hamilton-help
s
Would it be possible to have Hamilton write all the nodes upstream of what needs to be included to a user-specified path?
What do you mean by this specifically? How would you want to use this?
Assume a DAG a -> b -> c and I call dr.execute(['c']) or dr.execute(['a', 'b', 'c']). I want Hamilton to compute a, store its result, and read it back to continue the computation for b, and so on. Does that make sense?
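The requested behavior can be sketched in plain Python, with no Hamilton or Spark involved (run_with_checkpoints and the JSON files are purely illustrative stand-ins, not Hamilton APIs):

```python
import json
import os

def run_with_checkpoints(funcs, outdir):
    """funcs: ordered mapping {name: fn(prev_value)} standing in for a
    linear DAG a -> b -> c. Each node's result is written to disk and
    then re-read, and the re-read copy is what flows to the next node."""
    value = None
    for name, fn in funcs.items():
        value = fn(value)
        path = os.path.join(outdir, f"{name}.json")
        with open(path, "w") as f:
            json.dump(value, f)   # store this node's result
        with open(path) as f:
            value = json.load(f)  # continue from the stored copy
    return value
```

E.g. running it over {"a": lambda _: 1, "b": lambda x: x + 1, "c": lambda x: x * 10} returns 20 and leaves a.json, b.json, c.json behind, which is the "store and read back at every step" workflow described above.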
Yes, this is similar to the current CachingGraphAdapter functionality if you annotate the function to be cached; you/we would just need to write a serializer for pyspark. Would you specifically annotate the nodes to cache? Or would you want it done automatically?
But yeah, the challenge with spark is figuring out when to materialize things, so this is an interesting problem — and how to best frame it in terms of the desired workflow. E.g. is it more like “checkpointing”, where you can resume computation from some spot? or?
r
Would you specifically annotate the nodes to cache? Or would you want it done automatically?
I think automatically caching everything upstream of (and including) the final_vars would be useful
But yeah, the challenge with spark is figuring out when to materialize things, so this is an interesting problem — and how to best frame it in terms of the desired workflow. E.g. is it more like “checkpointing”, where you can resume computation from some spot? or?
When caching is turned on I would just materialize/cache every node (in my case each node is a df). This could slow things down a bit when you are only interested in the final_vars, but for debugging or rerunning things you get the benefits
s
yep, I was going to say: this would come at the cost of speed. E.g. you’d want to persist/cache (using pyspark), then write, then continue, for each “function”. Without persisting/caching, spark would recompute everything IIRC.
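That persist-then-write ordering can be sketched with duck typing (persist_and_write is a hypothetical helper; any object exposing pyspark-style .persist() and .write.parquet(...) methods works, so the sketch itself doesn't require Spark):

```python
def persist_and_write(df, filepath: str):
    """Persist first, then write, and return the persisted dataframe so
    downstream nodes reuse the in-memory copy instead of recomputing the
    whole lineage on the next action (pyspark semantics)."""
    df.persist()                                  # cache the partitions
    df.write.parquet(filepath, mode="overwrite")  # action: computes once
    return df
```

Because .persist() runs before the write action, the write is what triggers computation, and subsequent actions reuse the cached partitions rather than replaying the lineage.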
I think we could quickly prototype what you want with https://github.com/DAGWorks-Inc/hamilton/blob/main/hamilton/experimental/h_cache.py i.e. implement two functions:
import pyspark.sql as ps  # assumed alias: ps.DataFrame is pyspark.sql.DataFrame

@write_parquet.register(ps.DataFrame)
def write_parquet_ps(data: ps.DataFrame, filepath: str, name: str) -> None:
    """Writes a pyspark dataframe to a parquet file."""
    data.write.parquet(filepath, mode="overwrite")

@read_parquet.register(ps.DataFrame)
def read_parquet_ps(data: ps.DataFrame, filepath: str, spark) -> ps.DataFrame:
    """Reads a pyspark dataframe from a parquet file."""
    return spark.read.parquet(filepath)
we’d need to modify https://github.com/DAGWorks-Inc/hamilton/blob/main/hamilton/experimental/h_cache.py#L190 to have the spark context too, and pass that into the function via _read_cache as extra kwargs or something. And then we’d need to tag nodes to start.
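One hedged way to do the “extra kwargs” part, independent of h_cache’s actual internals (call_reader is hypothetical, not existing Hamilton code): inspect the registered reader’s signature and forward only the kwargs it declares, so readers that don’t need a SparkSession are unaffected:

```python
import inspect

def call_reader(reader, data, filepath, **extra):
    """Invoke a cache-reader function, forwarding only the extra kwargs
    (e.g. spark=...) that its signature actually declares."""
    accepted = inspect.signature(reader).parameters
    kwargs = {k: v for k, v in extra.items() if k in accepted}
    return reader(data, filepath, **kwargs)
```

This keeps the existing pandas/json readers untouched while letting a pyspark reader opt in to a spark parameter.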
r
Nice! Would it be possible to load the node directly after writing, such that it will be read instead of recomputed for downstream usage? E.g. for a -> b: after caching a you directly read it in again, so that a is not recomputed when computing b but instead read from disk.
s
Well this is where I think you could just do df.persist(), or I guess modify the way the caching works, by reading from it right away?
as in, in the write function, stick in a df.persist() before writing?
or we have a read_after_write flag to read it right after writing…
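The read_after_write idea can be sketched generically (the flag name comes from the discussion above; cache_node and the JSON serialization are illustrative, not the PR’s actual code): after writing a node’s result, immediately read it back and hand the re-loaded value downstream, so later nodes consume exactly what’s on disk.

```python
import json

def cache_node(value, filepath, read_after_write=False):
    """Write a node's result to filepath; with read_after_write=True,
    immediately read it back so downstream nodes get the on-disk copy."""
    with open(filepath, "w") as f:
        json.dump(value, f)
    if read_after_write:
        with open(filepath) as f:
            return json.load(f)  # deserialized copy replaces the original
    return value
```

With the flag off, behavior is unchanged; with it on, any serialization round-trip issues surface immediately at the node that wrote the file rather than in some downstream consumer.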
Okay I updated the PR with the flag