# hamilton-help
tpnam:
Hello, is there a convenient way to execute the same function multiple times with different parameters if they are only known at runtime? For example, if running without Hamilton:
```python
input = json.loads(input_parameters)
df1 = load_from_s3(path=input['path_one'])
df2 = load_from_s3(path=input['path_two'])
df3 = some_transformer(df1, df2)
```
And as an extension: what if I have dataframes 1 through N, and the transformer needs to wait on all of them? Note: N is also determined at runtime, not known at compile time.
s:
Thanks & welcome @tpnam! There are a couple of ways. Clarifying question: are the dataframes all going through some common processing? For example, in the example above, all dataframes would be processed by `load_from_s3`. So assuming yes: one way is with Parallelizable + Collect:
```python
import json

import pandas as pd
from hamilton.htypes import Collect, Parallelizable

def input_path(input_parameters: str) -> Parallelizable[str]:
    params = json.loads(input_parameters)
    for path in params["paths"]:
        yield path

def load_from_s3(input_path: str) -> pd.DataFrame:
    ...  # read the file at `input_path` into a dataframe
    return df

def some_transformer(load_from_s3: Collect[pd.DataFrame]) -> pd.DataFrame:
    return pd.concat(load_from_s3)  # it's a list of dfs here
```
Anything between Parallelizable & Collect is dynamically processed based on runtime values.

You can see me explain it in the video here (the broader section covers reuse/parameterization starting at about 28:27).
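One note on running it: Parallelizable/Collect needs dynamic execution enabled on the driver. A minimal sketch, assuming the functions above live in a module called `my_module` (the module name and the input values here are just illustrative):
```python
from hamilton import driver

import my_module  # hypothetical module containing input_path / load_from_s3 / some_transformer

dr = (
    driver.Builder()
    .with_modules(my_module)
    .enable_dynamic_execution(allow_experimental_mode=True)  # required for Parallelizable/Collect
    .build()
)
result = dr.execute(
    ["some_transformer"],
    inputs={"input_parameters": '{"paths": ["s3://bucket/a.parquet", "s3://bucket/b.parquet"]}'},
)
```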
tpnam:
Ok, so if I have arbitrary sources (e.g. load_from_snowflake, load_from_postgres, etc.), would I need a generic load_from function to use with the collector? Example input:
```json
{
  "loaders": [
    {"type": "s3", "path": "somepath"},
    {"type": "snowflake", "query": "somequery"}
  ]
}
```
It could then have a switch to delegate to the specific loader type it needs.
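Roughly what I have in mind, building on the Parallelizable example above (a sketch; the `_load_from_*` helpers are hypothetical and just stand in for the real source-specific loading code):
```python
import json

from pyspark.sql import DataFrame

from hamilton.htypes import Collect, Parallelizable

def loader_spec(input_parameters: str) -> Parallelizable[dict]:
    # one entry per item in the "loaders" list above
    for spec in json.loads(input_parameters)["loaders"]:
        yield spec

def loaded_df(loader_spec: dict) -> DataFrame:
    # switch on the loader type and delegate to the right source
    if loader_spec["type"] == "s3":
        return _load_from_s3(loader_spec["path"])  # hypothetical helper
    if loader_spec["type"] == "snowflake":
        return _load_from_snowflake(loader_spec["query"])  # hypothetical helper
    raise ValueError(f"unknown loader type: {loader_spec['type']}")

def loaded_dfs(loaded_df: Collect[DataFrame]) -> list[DataFrame]:
    return list(loaded_df)
```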
Elijah Ben Izzy:
OK, so it looks like you have a set of sources that you want to bring in from a config, right? Some more quick questions:
1. Are these going to change every time? Or is there a fixed set (with different URLs)?
2. Are there multiple instances of loading from the same source?
3. Are there any other operations you want to perform on top of it?
If all you want to do is select from a pre-configured set of loaders, you can use functions easily. Otherwise `Parallelizable` or a bit of sophistication can help as well.
tpnam:
1. There are a bunch of different jobs with a structure similar to the example input above.
2. Same source (e.g. s3), but different paths.
3. Yes, loaders just need to get data from the source and save it as a temp stage (Spark). Transformers just execute Spark SQL statements and don't need output from the loaders at this point in time.
Use case: looking to refactor some Spark jobs (no pandas/polars), and I wanted DAG-based micro-level orchestration so I can chain various steps more easily to address changing requirements and handle new hooks/integrations. Was looking to see if I could use Hamilton instead of implementing the DAGs myself.
Elijah Ben Izzy:
Ok, I think I get it. The big question is whether it's a flexible number (e.g. you'll need to run n loads from s3 where you don't know n until you run, or each of those corresponds to a source with semantic meaning…). A bit of code in the following, but the TL;DR:
• If your data sources are more static (e.g. you're picking/choosing between jobs but have a limited set of files), use individual functions with `@config.when`
• If you have a very dynamic set of data sources, pass in a list and load them all in a node (or group them into sources, etc…)
• You can mix and match with "groups"
• You can use data loaders to make this more ergonomic + track provenance. If you build any custom ones then consider contributing back so others can use them!
(option 1) If you're selecting from a set of possible "sources"
Use data loaders, one for each input file. You can configure the inputs, but this requires writing/copying a function for each one. Data loaders can take in parameters at runtime.
```python
import pyspark.sql as ps  # or however you refer to your Spark DataFrame type
from hamilton.function_modifiers import config, load_from, source

# you'll want to define an s3 loader / maybe contribute it back
@load_from.s3(
    path=source("load_path_1")
)
def loaded_data_1(loaded_1: ps.DataFrame) -> ps.DataFrame:
    return loaded_1  # or transform it

@load_from.s3(
    path=source("load_path_2")
)
def loaded_data_2(loaded_2: ps.DataFrame) -> ps.DataFrame:
    return loaded_2  # or transform it

@load_from.s3(
    path=source("load_path_3")
)
def loaded_data_3(loaded_3: ps.DataFrame) -> ps.DataFrame:
    return loaded_3  # or transform it

def joined(loaded_data_1: ps.DataFrame, loaded_data_2: ps.DataFrame, loaded_data_3: ps.DataFrame) -> ps.DataFrame:
    ...  # do a join
```
Note that you can have a `config.when` statement to conditionally include them:
```python
@load_from.s3(
    path=source("load_path_1")
)
@config.when(include_1=True)
def loaded_data_1(loaded_1: ps.DataFrame) -> ps.DataFrame:
    return loaded_1  # or transform it

@load_from.s3(
    path=source("load_path_2")
)
@config.when(include_2=True)
def loaded_data_2(loaded_2: ps.DataFrame) -> ps.DataFrame:
    return loaded_2  # or transform it

@load_from.s3(
    path=source("load_path_3")
)
@config.when(include_3=True)
def loaded_data_3(loaded_3: ps.DataFrame) -> ps.DataFrame:
    return loaded_3  # or transform it

def joined(loaded_data_1: ps.DataFrame = None, loaded_data_2: ps.DataFrame = None, loaded_data_3: ps.DataFrame = None) -> ps.DataFrame:
    ...  # do a join, accounting for the ones that are None
```
Then in the driver:
```python
dr = driver.Builder()....with_config({"include_1": True, "include_2": True}).build()
dr.execute([...], inputs={"load_path_1": ..., ...})  # you can also hardcode these above, or hardcode a portion of the path...
```
This is optimal if they have a discrete semantic meaning and there's a fixed set of them. You can also load with a helper function if you don't want to use data loaders (although we recommend them, as they abstract nicely): https://hamilton.dagworks.io/en/latest/concepts/best-practices/loading-data/. You'll likely want to define your own loaders that set the s3 namespace; there should be some examples/links in there. The advantage is full provenance.
(option 2) If it's more dynamic
Pass in lists and load them all:
```python
from typing import List

def all_data(paths: List[str]) -> List[ps.DataFrame]:
    ...  # load each path into a dataframe

def joined_data(all_data: List[ps.DataFrame]) -> ps.DataFrame:
    ...
```
This doesn't show up as a bunch of nodes in the DAG, but you get provenance. You can also use a custom loader to load a set of them. In this case you'd define a loader that accepts a list of files as the parameter (named `paths`) and outputs a list of dataframes:
```python
@load_from.s3_multiple(
    paths=source("input_paths")
)
def joined_data(dfs: List[ps.DataFrame]) -> ps.DataFrame:
    return ...

# driver file
dr = driver.Builder(...)....build()
dr.execute(..., inputs={"input_paths": [...]})  # execute joined_data or something upstream, pass in all paths
```
You can also have multiple `joined_data`-style functions for the fixed set of "groups", or however you want to do it. IMO these are two very clean options; it really depends on how dynamic you want it to be. I don't have the code for custom data loaders here (you can follow the link earlier), but happy to jot down some notes if it would be helpful!
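To give a rough idea of those notes: a custom loader is roughly a dataclass subclassing DataLoader that you register. A hedged sketch below (check the extending-data-loaders section of the docs linked above for the exact interface; `_read_from_s3` is a hypothetical helper):
```python
import dataclasses
from typing import Any, Collection, Dict, List, Tuple, Type

from hamilton import registry
from hamilton.io.data_adapters import DataLoader

@dataclasses.dataclass
class S3MultipleLoader(DataLoader):
    paths: List[str]  # becomes the `paths=...` argument of the decorator

    @classmethod
    def name(cls) -> str:
        return "s3_multiple"  # gives you @load_from.s3_multiple

    @classmethod
    def applicable_types(cls) -> Collection[Type]:
        return [list]  # the type of the parameter it injects into

    def load_data(self, type_: Type) -> Tuple[Any, Dict[str, Any]]:
        dfs = [_read_from_s3(path) for path in self.paths]  # hypothetical helper
        metadata = {"paths": self.paths}  # surfaced for provenance/tracking
        return dfs, metadata

registry.register_adapter(S3MultipleLoader)  # makes load_from.s3_multiple available
```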
s:
@tpnam as I mentioned, there are a few ways! Hopefully we haven't overwhelmed you with options. To sketch some more code out: if the loading of data is going to be mixed and you don't know the sources ahead of time, you could segment them at runtime. So you could do something like this at the beginning and connect it with what @Elijah Ben Izzy mentioned:
```python
import json

import pyspark.sql as ps
from hamilton.function_modifiers import extract_fields

@extract_fields({"s3_sources": list[str], "snowflake_sources": list[str]})
def load_configs(config_source: str) -> dict:
    sources = json.loads(config_source)
    # split sources by type, assuming the {"loaders": [...]} shape from the example input above
    return {
        "s3_sources": [s["path"] for s in sources["loaders"] if s["type"] == "s3"],
        "snowflake_sources": [s["query"] for s in sources["loaders"] if s["type"] == "snowflake"],
    }

def joined_s3(s3_sources: list[str]) -> ps.DataFrame:
    # return joined pyspark df / whatever object makes sense
    return ...

def joined_snowflake(snowflake_sources: list[str]) -> ps.DataFrame:
    # return joined pyspark df / whatever object makes sense
    return ...

def joined_ds(joined_s3: ps.DataFrame, joined_snowflake: ps.DataFrame) -> ps.DataFrame:
    # do joins etc
    return ...
```
Now, the above functions load data without using data loaders. But you could do the following (implementing s3_multiple yourself):
```python
@load_from.s3_multiple(
    paths=source("s3_sources"),  # this would use the runtime output
    ...
)
def joined_s3(dfs: List[ps.DataFrame]) -> ps.DataFrame:
    return ...
```
Just to mention: the value-add of the @load_from way is that if you were to track things with the Hamilton UI, the loaders expose metadata that you can then track there, e.g. what sources were loaded for a particular run.
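To make that concrete, attaching the Hamilton UI tracker to the driver looks roughly like this (a sketch: the project_id/username/dag_name values are placeholders, and `my_loaders` is a hypothetical module containing the functions above):
```python
from hamilton import driver
from hamilton_sdk import adapters

import my_loaders  # hypothetical module with the loader/transform functions

tracker = adapters.HamiltonTracker(
    project_id=1,                # placeholder: your Hamilton UI project id
    username="you@example.com",  # placeholder
    dag_name="spark_loaders",    # placeholder
)

dr = (
    driver.Builder()
    .with_modules(my_loaders)
    .with_adapters(tracker)  # run + loader metadata then shows up in the Hamilton UI
    .build()
)
```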