# hamilton-help
s
Copy code
import pandas as pd

from hamilton.function_modifiers import extract_columns

# -- could be in a data_loaders.py module -- #
def source_data1_df(filename1: str) -> pd.DataFrame:
    # load source data1 DF
    df = ...
    return df

def source_data2_df(filename2: str) -> pd.DataFrame:
    # load source data2 DF
    df = ...
    return df

@extract_columns('COL1', 'COL2', ...)
def merged_data_fx_df(source_data1_df: pd.DataFrame, source_data2_df: pd.DataFrame) -> pd.DataFrame:
    # logic to merge
    return merged_df

# -- could be in a more general transforms.py module -- #
def transform_col1(COL1: pd.Series) -> pd.Series:
    # transform COL1 and return the result
    ...
...
In Hamilton you have a few options:
1. Create functions that load the data and return dataframes, then write functions that depend on those to merge/map the dataframes. Transform functions then work off that merged dataframe. This is what I’ve stubbed out in the code above. The driver would then only need to be passed configuration to know where to load the dataframes from…
2. Do the merging/mapping outside of Hamilton and pass the merged dataframe to the driver. The transform logic functions would operate on those inputs. In terms of what you actually pass to the driver, you can either write a function that extracts the columns from the dataframe, or pass in the individual columns directly.
Of the two, (1) is a superset of (2), since the transform functions should be the same in both. It just depends on how much of the workflow you want to model within Hamilton itself.
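For example, a driver for option (1) might look something like this (a minimal sketch; the module name, filenames, and requested output are just placeholders for illustration):
Copy code
from hamilton import driver

import data_loaders  # hypothetical module containing the functions above

# the config tells Hamilton where to load the dataframes from
config = {'filename1': 'data1.csv', 'filename2': 'data2.csv'}
dr = driver.Driver(config, data_loaders)

# request the transform outputs you care about
df = dr.execute(['transform_col1'])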
👍 1
e
Also, welcome to the community, glad you're giving hamilton a spin!
n
@Stefan Krawczyk In terms of data lineage, I’m thinking of passing an empty pd.DataFrame() to my driver first, and then using the functions it scours to load in both DFs that I want to combine. That way, in the DAG I can see which columns (i.e. data) come from which table. Do you think something like this is possible?
s
@Neb Jovanovic, just to clarify, you want to: (1) load two data frames (df1 and df2) comprised of M columns and N columns respectively; (2) create a single dataframe that is potentially <= M + N columns wide, using a common primary key (i.e. index); (3) and have lineage at column-level granularity to help identify which column came from which dataframe? If so, you could do something like this:
Copy code
import pandas as pd

from hamilton.function_modifiers import extract_columns

@extract_columns('a', 'b', 'c')
def source_table_df1(...) -> pd.DataFrame:
    # logic to load data -- and ensure common index
    df = ...
    return df

@extract_columns('x', 'y', 'z')
def source_table_df2(...) -> pd.DataFrame:
    # logic to load data -- and ensure common index
    df = ...
    return df

def joined_df(a: pd.Series, b: pd.Series, c: pd.Series, x: pd.Series, y: pd.Series, z: pd.Series) -> pd.DataFrame:
    # rather than doing a pandas join, construct the dataframe manually
    df = pd.DataFrame({...})
    return df

# you'd then request `joined_df` from execute().
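To actually see that column-level lineage, you can render the DAG (a sketch; assumes graphviz is installed, and the exact call may vary by Hamilton version):
Copy code
from hamilton import driver

import lineage_funcs  # hypothetical module containing the functions above

dr = driver.Driver({}, lineage_funcs)
# writes a graphviz rendering of the full DAG, one node per function/column,
# so a, b, c hang off source_table_df1 and x, y, z off source_table_df2
dr.display_all_functions('./my_full_dag.dot')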
Just to show what it would look like without expressing what columns were in the dataframes:
Copy code
def source_table_df1(...) -> pd.DataFrame:
    # logic to load data -- and ensure common index
    df = ...
    return df

def source_table_df2(...) -> pd.DataFrame:
    # logic to load data -- and ensure common index
    df = ...
    return df

def joined_df(source_table_df1: pd.DataFrame, source_table_df2: pd.DataFrame) -> pd.DataFrame:
    # do a pandas join
    df = source_table_df1.join(...)
    return df

# you'd then request `joined_df` from execute().
n
Amazing! Thanks!
👍 1