# hamilton-help
и
But all of this works for one particular table
logs
. What if I want to create another feature table based on a different table, e.g.
logs_download
or
log_share
?😵‍💫
Okay. Here I see a split into two types of dataloaders then: log and meta. We build our feature tables based on the logs: we take features from the logs and enrich them based on the meta tables. So we might not expose the meta features outside with @extract_columns, since we only want to use them internally.
But it is not yet clear how to handle the situation where there are multiple log files that share columns/features. How do I choose which one to work with, and which one to generate the features from?
Example:
Copy code
# dataloaders.py
import pandas as pd
from hamilton.function_modifiers import extract_columns


@extract_columns('user_id', 'video_id', 'seconds')
def log_views_df(path: str) -> pd.DataFrame:
    return pd.read_csv(path)


@extract_columns('user_id', 'video_id')
def log_favourite_df(path: str) -> pd.DataFrame:
    return pd.read_csv(path)


def scenes_df(path: str) -> pd.DataFrame:
    return pd.read_csv(path)
Copy code
# features.py

def title(log_file: pd.DataFrame, scenes_df: pd.DataFrame) -> pd.Series:
    # some join based on the two dataframes, e.g. on a shared video id column
    return pd.merge(log_file, scenes_df, on='video_id', how='left')['title']
Here, into
title
, I would like to be able to pass either
log_views_df
or
log_favourite_df
, depending on what I want to work with at the moment.
Maybe I could use a separate module for each
log
file, and the same for the
meta
dataloaders. Then from the CLI I would specify which one I want to work with:
Copy code
module_names = [
        args.log_file,     # "log_views" or "log_favourite" or some other...
        "data_loaders",    # functions to help load dataloaders
        "user_features",   # logic for creating user features
        "video_features",  # logic for creating video features
    ]
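A minimal sketch of how that CLI choice could be wired up (assuming each entry in module_names is an importable module; initial_config and requested_features are illustrative names):
Copy code
import importlib

from hamilton import driver

# turn the module names chosen on the CLI into actual module objects
modules = [importlib.import_module(name) for name in module_names]

# build the DAG only from the modules we selected
dr = driver.Driver(initial_config, *modules)
df = dr.execute(requested_features)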
Any thoughts about that?😄
e
Ok — so the current best practice is to rename inside the function, as you did. However, you do have one other option — if you like relying on indices, it should help with the
id
column — e.g. load the data and, instead of exposing the id column, make it the index. That way you can just join on the index and you’ll be good to go.
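For illustration, a minimal sketch of that index approach (the loaders and column names just build on the earlier example and are not from real code):
Copy code
import pandas as pd
from hamilton.function_modifiers import extract_columns


@extract_columns('title')  # the id column is not exposed; it becomes the index
def scenes_df(path: str) -> pd.DataFrame:
    return pd.read_csv(path).set_index('video_id')


@extract_columns('seconds')  # same idea for the log table
def log_views_df(path: str) -> pd.DataFrame:
    return pd.read_csv(path).set_index('video_id')


def seconds_with_title(seconds: pd.Series, title: pd.Series) -> pd.DataFrame:
    # both series carry the video id as their index, so this is a join on index
    return pd.concat([seconds, title], axis=1)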
However, I think that we could expose something clearer in
extract_columns
— a dictionary of name assignments. Think:
Copy code
@extract_columns(id='video_id', title='video_title', ...)
def load_data(...) -> pd.DataFrame:
   ...
🔥 1
This allows you to name it whatever you want. It’s basically a shortcut for renaming inside the function.
(to be clear, the example above doesn’t work but it wouldn’t be too hard to implement…)
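For reference, a minimal sketch of the rename-inside-the-function approach that works today (the column names are illustrative):
Copy code
import pandas as pd
from hamilton.function_modifiers import extract_columns


@extract_columns('video_id', 'video_title')
def scenes_df(path: str) -> pd.DataFrame:
    df = pd.read_csv(path)
    # rename inside the function so the extracted columns get unambiguous names
    return df.rename(columns={'id': 'video_id', 'title': 'video_title'})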
s
@Игорь Хохолко to clarify for my understanding:
1. Do you expect to load those DFs all at the same time? If so, then you either need to rename columns (as mentioned), or do any merging etc. before doing extract_columns (see Code Example (A) below).
2. If you don’t expect to load them all at the same time, then using
@config.when
, or putting the functions into separate Python modules and constructing the DAG appropriately, is what I would recommend (a sketch of the
@config.when
option follows Code Example (A)). Perhaps this is what you mean by
log
vs
metadata
?
Code Example (A):
Copy code
def log_views_df(path: str) -> pd.DataFrame:
    return pd.read_csv(path)


def log_favourite_df(path: str) -> pd.DataFrame:
    return pd.read_csv(path)


def scenes_df(path: str) -> pd.DataFrame:
    return pd.read_csv(path)


@extract_columns(...)  # <-- only do extract_columns here
def log_views_favourite_df(log_views_df: pd.DataFrame, log_favourite_df: pd.DataFrame) -> pd.DataFrame:
    _df = pd.merge(...)
    # rename anything as needed
    return _df
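And for option 2, a minimal sketch of the @config.when approach (the log_source key and function names are illustrative):
Copy code
import pandas as pd

from hamilton.function_modifiers import config


@config.when(log_source='views')
def log_df__views(path: str) -> pd.DataFrame:
    return pd.read_csv(path)


@config.when(log_source='favourite')
def log_df__favourite(path: str) -> pd.DataFrame:
    return pd.read_csv(path)


# Passing {'log_source': 'views'} (or 'favourite') in the driver config picks
# which implementation provides the `log_df` node.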
@Игорь Хохолко just checking in — did you get to a satisfactory place? Otherwise, some ideas we’ve been thinking about that you might have input on:
• https://github.com/stitchfix/hamilton/issues/290
• https://github.com/stitchfix/hamilton/issues/86
и
Hello @Stefan Krawczyk! Thank you, I got something working 😁 I used separate files for my "log" loaders. For the other loaders, which I call "meta" loaders, I do not expose features outside of them. My code:
Copy code
# dataloaders.py
def scenes_df(engine: psycopg2.extensions.connection,
              path_cache: Union[str, Path] = None,
              cache_format: TableDataFormat = None,
              cache_reload_last: bool = False) -> pd.DataFrame:

    table_name = 'slr.scenes'

    sql =   """

            SELECT * from {}
            WHERE date >= '{}' AND date < '{}'

            """
    sql = sql.format(table_name, '{}', '{}')

    ...
            
    df = CacheDumper.execute_sql(sql, 
                                 engine, 
                                 ...
                                 )
    df.columns =  sanitize_columns(df.columns)
    return df
Copy code
# log_loaders/views_log.py

@extract_columns('application_hmd', 'application_type',  ... , 'seconds_separately_str', 'source',
       'user__id')
def log_df(engine: psycopg2.extensions.connection, 
           start_date: str, end_date: str,
           path_cache: Union[str, Path] = None, 
           cache_format: TableDataFormat = None,
           cache_reload_last: bool = False) -> pd.DataFrame:

    table_name = 'application."logs.views.videos.perseconds"'

    sql =   """

            SELECT * from {}
            WHERE date >= '{}' AND date < '{}'

            """
    sql = sql.format(table_name, '{}', '{}')


    df = CacheDumper.execute_sql(sql, 
                                engine, 
                                table_name,
                                ....
                                )
    df.columns =  sanitize_columns(df.columns)
    return df
Copy code
# video_features.py


def title(scene__id: pd.Series, scenes_df: pd.DataFrame) -> pd.Series:

    scene__id_log_df = pd.DataFrame({'scene__id': scene__id})

    # Merge the two dataframes on the scene id ('scene__id' in the log, '_id' in scenes_df)
    merged_df = pd.merge(scene__id_log_df, scenes_df[['_id', 'title']], left_on='scene__id', right_on='_id', how='left')

    return merged_df.title
Copy code
module_names = [
        f"log_loaders.{application}",   # logic for loading the logs on which the feature table is built
        "data_loaders",                 # functions to help load the dataloaders
        "user_features",                # logic for creating user features
        "video_features",               # logic for creating video features
        "user_statistics",              # logic to calculate user statistics
    ]

initial_config_and_data = {
    "engine": engine,
    "start_date": start_date,
    "end_date": end_date,
    "path_cache": DATA_DIR_RAW,
    "cache_format": cache_format,
    "cache_reload_last": cache_reload_last,
}

# `modules` here are the imported module objects corresponding to `module_names`
dr = driver.Driver(initial_config_and_data, *modules)

logging.log(logging.INFO, f"Features to be loaded: {features}")
logging.log(logging.INFO, f"Dates Interval: {start_date}  --  {end_date}")
df: pd.DataFrame = dr.execute(features)
s
great — will digest this in a bit!
Nice! looks good.
@Игорь Хохолко just so that you’re aware, Hamilton also integrates with pandera — examples here — so if you wanted a runtime data quality check, it should be easy to add them (feedback appreciated here on what we could add to make it more useful).
🔥 1
и
@Stefan Krawczyk I don't see an example for this situation: (Fig) Is it handled in Hamilton? Doing this I am getting some errors. The behaviour I would like to get: validate only the features I need at the moment. That means if out of all the
@extract_columns
columns I only need
spend
, I would like to validate only
spend
. It would also be great if I could drop rows with invalid values.
Code to reproduce:
Copy code
import logging
import sys

import numpy as np
import pandas as pd
import pandera as pa

from hamilton import ad_hoc_utils, driver
from hamilton.function_modifiers import extract_columns, check_output



schema = pa.DataFrameSchema({
    "signups": pa.Column(int, checks=pa.Check.le(10)),
    "spend": pa.Column(float, checks=<http://pa.Check.lt|pa.Check.lt>(-1.2)),
    "other": pa.Column(str, checks=[
        pa.Check.str_startswith("value_"),
        # define custom checks as functions that take a series as input and
        # outputs a boolean or boolean Series
        pa.Check(lambda s: s.str.split("_", expand=True).shape[1] == 2)
    ]),
})



@extract_columns('signups', 'spend', 'other')
@check_output(schema = schema)
def log() -> pd.DataFrame:
    # data to validate
    df = pd.DataFrame({
        "signups": [1, 4, 0, 10, 12],
        "spend": [-1.3, -1.4, -2.9, -10.1, -20.4],
        "other": ["value_1", "value_2", "value_3", "value_2", "value1"],
    })
    return df




# # Look at `my_functions` to see how these functions connect.
# def signups() -> pd.Series:
#     """Returns sign up values"""
#     return pd.Series([1, 10, 50, 100, 200, 400])


# def spend() -> pd.Series:
#     """Returns the spend values"""
#     return pd.Series([10, 10, 20, 40, 40, 50])


def log_spend_per_signup(spend_per_signup: pd.Series) -> pd.Series:
    """Simple function taking the logarithm of spend over signups."""
    return np.log(spend_per_signup)


# Place the functions into a temporary module -- the idea is that this should house a curated set of functions.
# Don't be afraid to make multiple of them -- however we'd advise you to not use this method for production.
# Also note, that using a temporary function module does not work for scaling onto Ray, Dask, or Pandas on Spark.
temp_module = ad_hoc_utils.create_temporary_module(
    log, log_spend_per_signup, module_name="function_example"
)







import pandas as pd

"""
Notes:
  1. This file is used for all the [ray|dask|spark]/hello_world examples.
  2. It therefore show cases how you can write something once and not only scale it, but port it
     to different frameworks with ease!
"""


def avg_3wk_spend(spend: pd.Series) -> pd.Series:
    """Rolling 3 week average spend."""
    return spend.rolling(3).mean()


def spend_per_signup(spend: pd.Series, signups: pd.Series) -> pd.Series:
    """The cost per signup in relation to spend."""
    return spend / signups


def spend_mean(spend: pd.Series) -> float:
    """Shows function creating a scalar. In this case it computes the mean of the entire column."""
    return spend.mean()


def spend_zero_mean(spend: pd.Series, spend_mean: float) -> pd.Series:
    """Shows function that takes a scalar. In this case to zero mean spend."""
    return spend - spend_mean


def spend_std_dev(spend: pd.Series) -> float:
    """Function that computes the standard deviation of the spend column."""
    return spend.std()


def spend_zero_mean_unit_variance(spend_zero_mean: pd.Series, spend_std_dev: float) -> pd.Series:
    """Function showing one way to make spend have zero mean and unit variance."""
    return spend_zero_mean / spend_std_dev


my_functions = ad_hoc_utils.create_temporary_module(
    avg_3wk_spend, spend_per_signup, spend_mean, spend_zero_mean, 
    spend_std_dev, spend_zero_mean_unit_variance, module_name="my_functions"
)







# Cell 4 - Instantiate the Hamilton driver and pass it the right things in.

initial_config = {}
# we need to tell hamilton where to load function definitions from
dr = driver.Driver(initial_config, my_functions, temp_module)  # can pass in multiple modules
# we need to specify what we want in the final dataframe.
output_columns = [
    "spend",
    "signups",
    "avg_3wk_spend",
    "spend_per_signup",
    "spend_zero_mean_unit_variance",
    'other'
]
# Cell 5 - visualize execution
# To visualize do `pip install "sf-hamilton[visualization]"` if you want these to work





dr.execute(output_columns)
e
Ooh, nice finds. Some thoughts:
1. Currently data_quality acts on all the outputs — this is not optimal, but we’d have to be smart about how it works with dataframes…
2. If you break it into two steps it should work, although it’ll validate everything — see the code below.
3. Currently it doesn’t allow us to drop things, although I think that shouldn’t be too hard, and it would be reasonable to do. It could be a param to pandera/check_output, which allows a pretty natural approach — need to look into it. It would involve a slight re-wiring but nothing crazy…
Copy code
@extract_columns('signups', 'spend', 'other')
def log(log_raw: pd.DataFrame) -> pd.DataFrame:
    return log_raw


@check_output(schema = schema)
def log_raw() -> pd.DataFrame:
    # data to validate
    df = pd.DataFrame({
        "signups": [1, 4, 0, 10, 12],
        "spend": [-1.3, -1.4, -2.9, -10.1, -20.4],
        "other": ["value_1", "value_2", "value_3", "value_2", "value1"],
    })
    return df
Hopefully this unblocks you for now — note that if you just want it to run on signups, you can apply the check to a signups column function directly, although it’s suboptimal. Otherwise, would you open up an issue or two on GitHub?
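For illustration, a minimal sketch of putting a check on just the signups column (assuming the pandera integration accepts a SeriesSchema here), plus a manual pandera workaround for dropping invalid rows in a plain function, outside of check_output:
Copy code
import pandas as pd
import pandera as pa

from hamilton.function_modifiers import check_output


@check_output(schema=pa.SeriesSchema(int, checks=pa.Check.le(10)))
def signups_validated(signups: pd.Series) -> pd.Series:
    # validate only the column we actually care about right now
    return signups


def log_clean(log_raw: pd.DataFrame) -> pd.DataFrame:
    # manual workaround: drop rows that fail validation instead of raising,
    # using the `schema` defined earlier in this thread
    try:
        return schema.validate(log_raw, lazy=True)
    except pa.errors.SchemaErrors as e:
        bad_rows = e.failure_cases["index"].dropna().unique()
        return log_raw.drop(index=bad_rows)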
👍 1
и
I will, thank you
🙏 2