Slackbot
11/18/2022, 9:17 PM
Stefan Krawczyk
11/18/2022, 9:35 PM
from hamilton.function_modifiers import extract_columns
import pandas as pd

# -- could be in a data_loaders.py module -- #
def source_data1_df(filename1: str) -> pd.DataFrame:
    # load source data1 DF
    df = ...
    return df

def source_data2_df(filename2: str) -> pd.DataFrame:
    # load source data2 DF
    df = ...
    return df

@extract_columns('COL1', 'COL2', ...)
def merged_data_fx_df(source_data1_df: pd.DataFrame, source_data2_df: pd.DataFrame) -> pd.DataFrame:
    # logic to merge
    merged_df = ...
    return merged_df

# -- could be in a more general transforms.py module -- #
def transform_col1(COL1: pd.Series) -> pd.Series:
    # transform COL1
    ...
In Hamilton you have a few options. You could:
1. Create functions that load the data and return dataframes, then write functions that depend on those to merge/map the dataframes, and then have transform functions that work off the merged dataframe. This is what I've stubbed out in the code above. The driver would then only need to be passed configuration telling it where to load the dataframes from.
2. Do the merging/mapping outside of Hamilton, and then pass the merged dataframe in to the driver. The transform functions would operate on those inputs. As for what you actually pass to the driver: you can either write a function that extracts the columns from the dataframe, or pass in the individual columns.
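For option (2), here is a minimal sketch of what "extract the columns and pass them in" could look like. The two frames, the `key` column, and the `columns_as_inputs` helper are all made up for illustration, and the commented-out driver call assumes a `transforms` module holding functions like `transform_col1`.

```python
import pandas as pd

# Hypothetical source data; in practice these would be loaded from files.
df1 = pd.DataFrame({"key": [1, 2, 3], "COL1": [10.0, 20.0, 30.0]})
df2 = pd.DataFrame({"key": [1, 2, 3], "COL2": ["a", "b", "c"]})

# Merge outside of Hamilton with plain pandas.
merged_df = df1.merge(df2, on="key").set_index("key")


def columns_as_inputs(df: pd.DataFrame) -> dict:
    """Hypothetical helper: split a dataframe into a {column name: Series} dict."""
    return {str(name): df[name] for name in df.columns}


inputs = columns_as_inputs(merged_df)

# The dict can then be handed to the driver as inputs, along the lines of:
#   from hamilton import driver
#   dr = driver.Driver({}, transforms)  # `transforms` is an assumed module name
#   result = dr.execute(["transform_col1"], inputs=inputs)
```

Each key in `inputs` lines up with a parameter name such as `COL1` in the transform functions, which is why the transforms stay the same under either option.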
In terms of the two options, (1) is a superset of (2), as the transform functions should be the same in both. It just depends how much of the workflow you want to model within Hamilton itself.
Elijah Ben Izzy
11/18/2022, 9:40 PM
Neb Jovanovic
12/02/2022, 9:45 AM
Stefan Krawczyk
12/02/2022, 6:14 PM
@extract_columns('a', 'b', 'c')
def source_table_df1(...) -> pd.DataFrame:
    # logic to load data -- and ensure common index
    df = ...
    return df

@extract_columns('x', 'y', 'z')
def source_table_df2(...) -> pd.DataFrame:
    # logic to load data -- and ensure common index
    df = ...
    return df

def joined_df(a: pd.Series, b: pd.Series, c: pd.Series, x: pd.Series, y: pd.Series, z: pd.Series) -> pd.DataFrame:
    # rather than doing pandas join, construct the dataframe manually
    df = pd.DataFrame({...})
    return df
# you'd then request `joined_df` from execute().
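As a rough illustration of why the "ensure common index" comments matter when constructing the dataframe manually: pandas aligns the Series on index labels, not on position. The Series below are made-up stand-ins for the extracted columns, not anything from a real dataset.

```python
import pandas as pd

# Hypothetical columns; the index values stand in for a shared key.
a = pd.Series([1, 2, 3], index=[10, 20, 30], name="a")
# Same labels as `a`, but in a different order.
x = pd.Series([0.1, 0.2, 0.3], index=[30, 20, 10], name="x")

# Building the frame from a dict aligns the Series on index labels.
joined = pd.DataFrame({"a": a, "x": x})

# A Series carrying a label the others lack adds a row, with NaN where
# the other columns have no value for that label.
y = pd.Series([7, 8], index=[10, 40], name="y")
misaligned = pd.DataFrame({"a": a, "y": y})
```

So if the loaders do not agree on the index, the manual construction still succeeds but quietly fills the gaps with NaN.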
Just to show what it would look like without expressing what columns were in the dataframes:
def source_table_df1(...) -> pd.DataFrame:
    # logic to load data -- and ensure common index
    df = ...
    return df

def source_table_df2(...) -> pd.DataFrame:
    # logic to load data -- and ensure common index
    df = ...
    return df

def joined_df(source_table_df1: pd.DataFrame, source_table_df2: pd.DataFrame) -> pd.DataFrame:
    # do a pandas join
    df = source_table_df1.join(...)
    return df
# you'd then request `joined_df` from execute().
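For the pandas join variant, a small sketch of how the `how` argument changes the result when the two indexes only partly overlap. The frames here are invented for illustration and reuse the function names above only as variable names.

```python
import pandas as pd

# Hypothetical frames whose indexes only partly overlap.
source_table_df1 = pd.DataFrame({"a": [1, 2, 3]}, index=[10, 20, 30])
source_table_df2 = pd.DataFrame({"x": [0.1, 0.2]}, index=[20, 40])

# DataFrame.join matches on the index and defaults to a left join,
# so only the rows of the calling frame are kept.
left_joined = source_table_df1.join(source_table_df2)

# An outer join keeps every label from both frames.
outer_joined = source_table_df1.join(source_table_df2, how="outer")

# An inner join keeps only the labels present in both frames.
inner_joined = source_table_df1.join(source_table_df2, how="inner")
```

Which one you want depends on the data; if the loaders already guarantee a common index, all three give the same rows.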
Neb Jovanovic
12/05/2022, 9:13 AM