Slackbot
03/01/2024, 3:46 PM

Stefan Krawczyk
03/01/2024, 4:33 PM
> Would it be possible to have Hamilton write all the nodes upstream of what needs to be included to a user-specified path?
What do you mean by this specifically? How would you want to use this?

Roel Bertens
Assume a DAG a -> b -> c and I call dr.execute(["c"]) or dr.execute(["a", "b", "c"]). I want Hamilton to compute `a`, store its result, and read it back to continue the computation for `b`, and so on. Does that make sense?

Stefan Krawczyk
03/01/2024, 5:01 PM
Yes, this is similar to the current CachingGraphAdapter functionality if you annotate the function to be cached; you/we would just need to write a serializer for pyspark. Would you specifically annotate the nodes to cache? Or would you want it done automatically?

Roel Bertens
03/01/2024, 5:21 PM
> Would you specifically annotate the nodes to cache? Or would you want it done automatically?
I think automatically caching everything upstream of (and including) the final_vars would be useful.
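The execute-store-reload loop described above can be sketched with a toy DAG runner. This is pure Python for illustration; the `DAG` dict, `execute` function, and JSON-file cache layout are all hypothetical, not Hamilton's API:

```python
# Hypothetical sketch (not Hamilton's API): a tiny executor for a DAG
# a -> b -> c that persists every node's result to disk and, on a rerun,
# reloads cached values instead of recomputing them.
import json
import os
import tempfile

# Toy DAG: each node maps to (upstream dependencies, compute function).
DAG = {
    "a": ([], lambda: 1),
    "b": (["a"], lambda a: a + 1),
    "c": (["b"], lambda b: b * 10),
}

def execute(final_vars, cache_dir):
    """Compute final_vars, caching every intermediate node to disk."""
    results = {}

    def compute(node):
        if node in results:
            return results[node]
        path = os.path.join(cache_dir, f"{node}.json")
        if os.path.exists(path):  # cache hit: read back instead of recomputing
            with open(path) as f:
                value = json.load(f)
        else:
            deps, fn = DAG[node]
            value = fn(*[compute(d) for d in deps])
            with open(path, "w") as f:  # persist for later reruns
                json.dump(value, f)
        results[node] = value
        return value

    return {v: compute(v) for v in final_vars}

cache = tempfile.mkdtemp()
print(execute(["c"], cache))            # → {'c': 20}; computes and caches a, b, c
print(execute(["a", "b", "c"], cache))  # → {'a': 1, 'b': 2, 'c': 20}; all cache hits
```

On the second call every node is read back from disk, which is the debugging/rerun benefit being discussed.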
Roel Bertens
03/01/2024, 5:23 PM
When cache is turned on I would just materialize/cache every node (in my case that is a df). This could slow things down a bit when you are only interested in the final_vars, but for debugging or rerunning things you get the benefits.

Stefan Krawczyk
03/01/2024, 5:27 PM
But yeah, the challenge with Spark is figuring out when to materialize things, so this is an interesting problem: how best to frame it in terms of the desired workflow. E.g. is it more like "checkpointing", where you can resume computation from some spot? Or?

Stefan Krawczyk
03/01/2024, 5:36 PM
```python
import pyspark.sql as ps

# These would register pyspark implementations against h_cache's
# write_parquet/read_parquet singledispatch functions.
@write_parquet.register(ps.DataFrame)
def write_parquet_pyspark(data: ps.DataFrame, filepath: str, name: str) -> None:
    """Writes a PySpark dataframe to a parquet file."""
    data.write.parquet(filepath, mode="overwrite")


@read_parquet.register(ps.DataFrame)
def read_parquet_pyspark(data: ps.DataFrame, filepath: str, spark: ps.SparkSession) -> ps.DataFrame:
    """Reads a PySpark dataframe from a parquet file."""
    return spark.read.parquet(filepath)
```
We'd need to modify https://github.com/DAGWorks-Inc/hamilton/blob/main/hamilton/experimental/h_cache.py#L190 to have the Spark context too, and pass that into the function via `_read_cache` as extra kwargs or something. And then we'd need to tag nodes to start.

Stefan Krawczyk
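For illustration, the dispatch-plus-extra-kwargs mechanism being discussed here can be sketched with `functools.singledispatch`: the reader is selected by the type of the first argument, and a context object (such as a Spark session) can be forwarded through untouched. All names below are illustrative, not Hamilton's actual internals:

```python
# Hedged sketch of the mechanism: singledispatch picks the reader by the
# type of the first argument; extra kwargs (e.g. spark=...) pass through.
from functools import singledispatch
from typing import Any

@singledispatch
def read_cache(data: Any, filepath: str, **kwargs: Any) -> Any:
    raise NotImplementedError(f"no reader for {type(data)}")

@read_cache.register(dict)
def _read_dict(data: dict, filepath: str, **kwargs: Any) -> dict:
    # A real implementation would deserialize from filepath; here we just
    # show that extra kwargs (like spark=...) arrive intact.
    return {"filepath": filepath, **kwargs}

# The caller (e.g. the adapter) can thread context objects in as extra kwargs:
result = read_cache({}, "/tmp/node.parquet", spark="fake-session")
# result == {"filepath": "/tmp/node.parquet", "spark": "fake-session"}
```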
03/01/2024, 5:46 PM

Roel Bertens
03/01/2024, 7:15 PM

Stefan Krawczyk
03/01/2024, 7:17 PM
`df.persist()`? Or I guess modify the way the caching works, by reading from it right away?

Stefan Krawczyk
03/01/2024, 7:18 PM
`df.persist()` before writing?

Stefan Krawczyk
03/01/2024, 7:23 PM
A `read_after_write` flag to read it right after writing…

Stefan Krawczyk
03/01/2024, 7:27 PM
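A minimal sketch of the `read_after_write` idea floated above (the flag name comes from the chat; the file-based caching below is illustrative, not Hamilton code): after writing a node's result, immediately read it back and hand the re-read value downstream, so later nodes consume the materialized data rather than the in-memory lineage. For Spark this would play the role `df.persist()` otherwise would:

```python
# Illustrative helper: write a node's value, then optionally return the
# freshly re-read copy so downstream nodes use the materialized version.
import json
import os
import tempfile

def cache_node(value, filepath, read_after_write=False):
    """Write value to filepath; optionally return the re-read copy."""
    with open(filepath, "w") as f:
        json.dump(value, f)
    if read_after_write:
        with open(filepath) as f:
            return json.load(f)  # downstream sees the materialized copy
    return value  # downstream keeps using the in-memory value

path = os.path.join(tempfile.mkdtemp(), "node_b.json")
out = cache_node({"rows": 3}, path, read_after_write=True)
# out == {"rows": 3}, read back from disk rather than passed through
```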