tpnam
05/09/2024, 1:33 AM
input = json.loads(input_parameters)
df1 = load_from_s3(path=input['path_one'])
df2 = load_from_s3(path=input['path_two'])
df3 = some_transformer(df1, df2)
tpnam
05/09/2024, 1:46 AM
Stefan Krawczyk
05/09/2024, 2:17 AM
load_from_s3
Stefan Krawczyk
05/09/2024, 6:16 AM
import json

import pandas as pd

from hamilton.htypes import Collect, Parallelizable


def input_path(input_parameters: str) -> Parallelizable[str]:
    input = json.loads(input_parameters)
    for path in input["paths"]:
        yield path


def load_from_s3(input_path: str) -> pd.DataFrame:
    df = ...  # load the dataframe from s3
    return df


def some_transformer(load_from_s3: Collect[pd.DataFrame]) -> pd.DataFrame:
    return pd.concat(load_from_s3)  # it's a list of dfs here
Anything between Parallelizable & Collect is dynamically processed based on runtime values (the broader section covers reuse/parameterization starting at about 28:27).
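For completeness, a minimal sketch of running that flow; the module name loaders and the example paths are assumptions, and dynamic execution has to be enabled on the driver for Parallelizable/Collect:

# hedged sketch: module name "loaders" and the example paths are made up
import json

from hamilton import driver

import loaders  # the module containing input_path / load_from_s3 / some_transformer above

dr = (
    driver.Builder()
    .with_modules(loaders)
    .enable_dynamic_execution(allow_experimental_mode=True)  # required for Parallelizable/Collect
    .build()
)
result = dr.execute(
    ["some_transformer"],
    inputs={"input_parameters": json.dumps({"paths": ["s3://bucket/a.parquet", "s3://bucket/b.parquet"]})},
)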
tpnam
05/09/2024, 12:59 PM
{
"loaders":[
{"type": "s3":, "path": "somepath"},
{"type": "snowflake":, "query": "somequery"}
]
}
It could then have a switch to delegate to the specific type it needs.
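For illustration, a rough sketch of how that switch could look combined with Parallelizable/Collect; the names loader_spec, loaded, combined and the _load_* helpers are made up, and the helper bodies are placeholders:

# hedged sketch: dispatch on the "type" field of the config above, one loader per entry
import json
from typing import Any, Dict

import pandas as pd

from hamilton.htypes import Collect, Parallelizable


def loader_spec(input_parameters: str) -> Parallelizable[Dict[str, Any]]:
    for spec in json.loads(input_parameters)["loaders"]:
        yield spec


def _load_s3(path: str) -> pd.DataFrame:
    ...  # placeholder: read from s3 however you like


def _load_snowflake(query: str) -> pd.DataFrame:
    ...  # placeholder: run the query against Snowflake


def loaded(loader_spec: Dict[str, Any]) -> pd.DataFrame:
    # the "switch" that delegates to the right loader type
    if loader_spec["type"] == "s3":
        return _load_s3(loader_spec["path"])
    if loader_spec["type"] == "snowflake":
        return _load_snowflake(loader_spec["query"])
    raise ValueError(f"unknown loader type: {loader_spec['type']}")


def combined(loaded: Collect[pd.DataFrame]) -> pd.DataFrame:
    return pd.concat(loaded)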
Elijah Ben Izzy
05/09/2024, 1:39 PM
Parallelizable or a bit of sophistication can help as well.
tpnam
05/09/2024, 2:10 PM
Elijah Ben Izzy
05/09/2024, 2:31 PM
n until you run, or each of those corresponds to a source with semantic meaning…).
A bit of code in the following, but the TL;DR:
• If your data sources are more static (e.g. you’re picking/choosing between jobs but have a limited set of files), use individual functions with @config.when
• If you have a very dynamic set of data sources, pass in a list and load them all in a node (or group them into sources, etc…)
• You can mix and match with “groups”
• You can use data loaders to make this more ergonomic + track provenance. If you build any custom ones, consider contributing them back so others can use them!
(option 1) If you’re selecting from a set of possible “sources”
Use data loaders, one for each input file. You can configure the inputs, but this requires writing (or copying) a function for each one. Data loaders can take in parameters at runtime.
# you'll want to define/maybe contribute back an s3 loader
# (imports below cover this and the following snippets; assuming ps is pyspark.sql)
from typing import List, Optional

import pyspark.sql as ps

from hamilton.function_modifiers import config, load_from, source


@load_from.s3(
    path=source("load_path_1")
)
def loaded_data_1(loaded_1: ps.DataFrame) -> ps.DataFrame:
    return loaded_1  # or transform it


@load_from.s3(
    path=source("load_path_2")
)
def loaded_data_2(loaded_2: ps.DataFrame) -> ps.DataFrame:
    return loaded_2  # or transform it


@load_from.s3(
    path=source("load_path_3")
)
def loaded_data_3(loaded_3: ps.DataFrame) -> ps.DataFrame:
    return loaded_3  # or transform it


def joined(loaded_data_1: ps.DataFrame, loaded_data_2: ps.DataFrame, loaded_data_3: ps.DataFrame) -> ps.DataFrame:
    ...  # do a join
Note that you can have a config.when statement to conditionally include them:
@load_from.s3(
    path=source("load_path_1")
)
@config.when(include_1=True)
def loaded_data_1(loaded_1: ps.DataFrame) -> ps.DataFrame:
    return loaded_1  # or transform it


@load_from.s3(
    path=source("load_path_2")
)
@config.when(include_2=True)
def loaded_data_2(loaded_2: ps.DataFrame) -> ps.DataFrame:
    return loaded_2  # or transform it


@load_from.s3(
    path=source("load_path_3")
)
@config.when(include_3=True)
def loaded_data_3(loaded_3: ps.DataFrame) -> ps.DataFrame:
    return loaded_3  # or transform it


def joined(
    loaded_data_1: Optional[ps.DataFrame] = None,
    loaded_data_2: Optional[ps.DataFrame] = None,
    loaded_data_3: Optional[ps.DataFrame] = None,
) -> ps.DataFrame:
    ...  # do a join, account for nulls
Then in the driver:
dr = driver.Builder()....with_config({"include_1" : True, "include_2" : True}).build()
dr.execute([...], inputs={"load_path_1" : ..., ...}) # you can also hardcode these above, or hardcode a portion of the path...
This is optimal if the sources have discrete semantic meaning and there's a fixed set of them. You can also load with a helper function inside regular functions if you don't want to use data loaders (although we recommend data loaders, since they abstract nicely): https://hamilton.dagworks.io/en/latest/concepts/best-practices/loading-data/. You'll likely want to define your own loaders that set the s3 namespace; there should be some examples/links in there. The advantage is full provenance.
(option 2) If it’s more dynamic
Pass in lists and load them all:
def all_data(paths: List[str]) -> List[ps.DataFrame]:
    ...  # load all of them


def joined_data(all_data: List[ps.DataFrame]) -> ps.DataFrame:
    ...  # join them
This doesn’t show up as a bunch of nodes in the DAG, but you get provenance. You can also use a custom loader to load a set of them. In this case you’d define a loader that accepts a list of files as the parameter (named paths) and outputs a list of dataframes:
@load_from.s3_multiple(
    paths=source("input_paths")
)
def joined_data(dfs: List[ps.DataFrame]) -> ps.DataFrame:
    return ...
# driver file
dr = driver.Builder(...)....build()
dr.execute(..., inputs={"input_paths": [...]}) # execute joined_data or something upstream, pass in all paths
You can also have multiple joined_data functions for the fixed set of “groups”, or however you want to do it.
IMO these are two very clean options — it really depends on how dynamic you want it to be.
I don’t have the code for custom data loaders here (you can follow the link earlier), but happy to jot down some notes if it would be helpful!
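For reference, a rough sketch of what such a custom loader behind @load_from.s3 could look like; this is not Hamilton's actual implementation, the class name and the pandas/CSV choice are assumptions (for the pyspark examples above you'd return a ps.DataFrame and list that in applicable_types), and the exact base-class interface may vary by version, so check the linked docs:

# hedged sketch of a custom DataLoader registered under the "s3" namespace
import dataclasses
from typing import Any, Collection, Dict, Tuple, Type

import pandas as pd

from hamilton import registry
from hamilton.io.data_adapters import DataLoader


@dataclasses.dataclass
class PandasS3CSVLoader(DataLoader):
    path: str  # e.g. "s3://bucket/key.csv"

    @classmethod
    def applicable_types(cls) -> Collection[Type]:
        return [pd.DataFrame]

    def load_data(self, type_: Type) -> Tuple[pd.DataFrame, Dict[str, Any]]:
        df = pd.read_csv(self.path)  # pandas reads s3:// paths when s3fs is installed
        return df, {"path": self.path}  # second element is metadata used for provenance/tracking

    @classmethod
    def name(cls) -> str:
        return "s3"  # this is what gives you @load_from.s3


registry.register_adapter(PandasS3CSVLoader)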
Stefan Krawczyk
05/09/2024, 4:35 PM
import json

import pyspark.sql as ps

from hamilton.function_modifiers import extract_fields


@extract_fields({"s3_sources": list[str], "snowflake_sources": list[str]})
def load_configs(config_source: str) -> dict:
    sources = json.loads(config_source)
    # split sources by type (assuming the {"loaders": [...]} shape posted above)
    list_of_s3_locations = [s["path"] for s in sources["loaders"] if s["type"] == "s3"]
    list_of_snowflake_sources = [s["query"] for s in sources["loaders"] if s["type"] == "snowflake"]
    return {
        "s3_sources": list_of_s3_locations,
        "snowflake_sources": list_of_snowflake_sources,
    }


def joined_s3(s3_sources: list[str]) -> ps.DataFrame:
    # return joined pyspark df / whatever object makes sense
    return ...


def joined_snowflake(snowflake_sources: list[str]) -> ps.DataFrame:
    # return joined pyspark df / whatever object makes sense
    return ...


def joined_ds(joined_s3: ps.DataFrame, joined_snowflake: ps.DataFrame) -> ps.DataFrame:
    # do joins etc
    return ...
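Putting that together, a minimal sketch of driving this flow; the module name loaders_module and the input values are assumptions, reusing the config shape from earlier in the thread:

# hedged sketch: "loaders_module" is a made-up module name containing the functions above
import json

from hamilton import driver

import loaders_module

config_source = json.dumps({
    "loaders": [
        {"type": "s3", "path": "somepath"},
        {"type": "snowflake", "query": "somequery"},
    ]
})

dr = driver.Builder().with_modules(loaders_module).build()
result = dr.execute(["joined_ds"], inputs={"config_source": config_source})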
Now the above functions load data without using the data loaders.
But you could do (implementing s3_multiple yourself):
@load_from.s3_multiple(
    paths=source("s3_sources"),  # this would use the runtime output
    ...
)
def joined_s3(dfs: List[ps.DataFrame]) -> ps.DataFrame:
    return ...
Just to mention: the value-add of the @load_from way is that if you were to track things with the Hamilton UI, the loaders expose metadata that you can then track there, e.g. what sources were loaded for a particular run.
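For context on that last point, a rough sketch of wiring up the Hamilton UI tracker; the project id, username, DAG name, and module name are placeholders, and it assumes the hamilton-sdk package plus a running Hamilton UI instance:

# hedged sketch: attach the Hamilton UI tracker so loader metadata shows up per run
from hamilton import driver
from hamilton_sdk import adapters

import loaders_module  # made-up module name with the functions above

tracker = adapters.HamiltonTracker(
    project_id=1,                  # placeholder
    username="you@example.com",    # placeholder
    dag_name="s3_snowflake_join",  # placeholder
)

dr = (
    driver.Builder()
    .with_modules(loaders_module)
    .with_adapters(tracker)
    .build()
)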