Slackbot  01/29/2023, 12:42 PM

Игорь Хохолко  01/29/2023, 12:58 PM
logs. What if I want to create another feature table based on some other table, logs_download or log_share? 😵💫

Игорь Хохолко  01/29/2023, 1:22 PM

Игорь Хохолко  01/29/2023, 1:25 PM

Игорь Хохолко  01/29/2023, 1:33 PM
# dataloaders.py
import pandas as pd
from hamilton.function_modifiers import extract_columns

@extract_columns('user_id', 'video_id', 'seconds')
def log_views_df(path: str) -> pd.DataFrame:
    return pd.read_csv(path)

@extract_columns('user_id', 'video_id')
def log_favourite_df(path: str) -> pd.DataFrame:
    return pd.read_csv(path)

def scenes_df(path: str) -> pd.DataFrame:
    return pd.read_csv(path)
# features.py
def title(log_file: pd.DataFrame, scenes_df: pd.DataFrame) -> pd.Series:
    ...  # some join based on the two dataframes

Here, into title, I would like to be able to pass both log_views_df and the extract_columns outputs, based on what I want to work with now.

Игорь Хохолко  01/29/2023, 1:37 PM
log file, and the same for meta dataloaders. And then from the CLI I would specify what I want to work with:
module_names = [
    args.log_file,       # "log_views" or "log_favourite" or some other...
    "data_loaders",      # functions to help load dataloaders
    "user_features",     # logic for creating user features
    "video_features",    # logic for creating video features
]
Any thoughts about that? 😄
Elijah Ben Izzy  01/29/2023, 6:54 PM
id column, e.g. load it up and, rather than releasing the ID column, make that into the index. That way you can just join on index and you'll be good to go.
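
A minimal pandas sketch of that index-join idea (function and column names here are illustrative, not from the thread):

import pandas as pd

def log_views_df(path: str) -> pd.DataFrame:
    # Keep the ID as the index instead of exposing it as a column.
    return pd.read_csv(path).set_index("video_id")

def scenes_df(path: str) -> pd.DataFrame:
    return pd.read_csv(path).set_index("video_id")

def title(log_views_df: pd.DataFrame, scenes_df: pd.DataFrame) -> pd.Series:
    # Both frames share the video_id index, so a plain index join lines rows up.
    return log_views_df.join(scenes_df["title"])["title"]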
Elijah Ben Izzy  01/29/2023, 6:55 PM
extract_columns: a dictionary of name assignments. Think:
@extract_columns(id='video_id', title='video_title', ...)
def load_data(...) -> pd.DataFrame:
...
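
In the meantime, a workaround sketch that gets renamed columns with the current string-based extract_columns API (the node and column names here are illustrative):

@extract_columns('video_id', 'video_title')
def video_data(load_data: pd.DataFrame) -> pd.DataFrame:
    # Rename in a pass-through node, then extract under the new names.
    return load_data.rename(columns={'id': 'video_id', 'title': 'video_title'})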
Elijah Ben Izzy  01/29/2023, 6:56 PM

Elijah Ben Izzy  01/29/2023, 6:58 PM

Stefan Krawczyk  01/29/2023, 10:16 PM
@config.when, or putting the functions into separate Python modules and constructing the DAG appropriately, is what I would recommend. (Perhaps this is what you mean by log vs metadata?)
Code Example (A):

def log_views_df(path: str) -> pd.DataFrame:
    return pd.read_csv(path)

def log_favourite_df(path: str) -> pd.DataFrame:
    return pd.read_csv(path)

def scenes_df(path: str) -> pd.DataFrame:
    return pd.read_csv(path)

@extract_columns(...)  # <-- only do extract_columns here
def log_views_favourite_df(log_views_df: pd.DataFrame, log_favourite_df: pd.DataFrame) -> pd.DataFrame:
    _df = pd.merge(...)
    # rename anything as needed
    return _df
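
A minimal sketch of the @config.when route Stefan mentions (the log_source config key and the function names are illustrative, not from the thread):

import pandas as pd
from hamilton.function_modifiers import config

@config.when(log_source="views")
def log_df__views(path: str) -> pd.DataFrame:
    # Chosen when the driver config contains {"log_source": "views"}.
    return pd.read_csv(path)

@config.when(log_source="favourite")
def log_df__favourite(path: str) -> pd.DataFrame:
    # Chosen when the driver config contains {"log_source": "favourite"}.
    return pd.read_csv(path)

Downstream functions then just declare log_df: pd.DataFrame as an input, and the driver config decides which implementation provides it.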
Stefan Krawczyk  01/30/2023, 6:40 PM

Игорь Хохолко  01/30/2023, 6:53 PM
# dataloaders.py
def scenes_df(engine: psycopg2.extensions.connection,
              path_cache: Union[str, Path] = None,
              cache_format: TableDataFormat = None,
              cache_reload_last: bool = False) -> pd.DataFrame:
    table_name = 'slr.scenes'
    sql = """
        SELECT * FROM {}
        WHERE date >= '{}' AND date < '{}'
    """
    sql = sql.format(table_name, '{}', '{}')  # fill the table name; keep the date placeholders
    ...
    df = CacheDumper.execute_sql(sql,
                                 engine,
                                 ...)
    df.columns = sanitize_columns(df.columns)
    return df
# log_loaders/views_log.py
@extract_columns('application_hmd', 'application_type', ..., 'seconds_separately_str', 'source',
                 'user__id')
def log_df(engine: psycopg2.extensions.connection,
           start_date: str, end_date: str,
           path_cache: Union[str, Path] = None,
           cache_format: TableDataFormat = None,
           cache_reload_last: bool = False) -> pd.DataFrame:
    table_name = 'application."logs.views.videos.perseconds"'
    sql = """
        SELECT * FROM {}
        WHERE date >= '{}' AND date < '{}'
    """
    sql = sql.format(table_name, '{}', '{}')
    df = CacheDumper.execute_sql(sql,
                                 engine,
                                 table_name,
                                 ...)
    df.columns = sanitize_columns(df.columns)
    return df
# video_features.py
def title(scene__id: pd.Series, scenes_df: pd.DataFrame) -> pd.Series:
    scene__id_log_df = pd.DataFrame({'scene__id': scene__id})
    # Merge the log scene IDs with the scenes metadata on the scene ID column
    merged_df = pd.merge(scene__id_log_df, scenes_df[['_id', 'title']],
                         left_on='scene__id', right_on='_id', how='left')
    return merged_df.title
module_names = [
    f"log_loaders.{application}",  # logic for loading logs based on which we build the features table
    "data_loaders",                # functions to help load datatools
    "user_features",               # logic for creating user features
    "video_features",              # logic for creating video features
    "user_statistics",             # logic to calculate user statistics
]

initial_config_and_data = {
    "engine": engine,
    "start_date": start_date,
    "end_date": end_date,
    "path_cache": DATA_DIR_RAW,
    "cache_format": cache_format,
    "cache_reload_last": cache_reload_last,
}

dr = driver.Driver(initial_config_and_data, *modules)
logging.log(logging.INFO, f"Features to be loaded: {features}")
logging.log(logging.INFO, f"Dates Interval: {start_date} -- {end_date}")
df: pd.DataFrame = dr.execute(features)
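
A sketch of how the CLI choice could be wired into the Driver (the importlib step is an assumption about the surrounding script; only the Driver call itself appears above):

import importlib

# Hypothetical: resolve the configurable module names into imported modules.
modules = [importlib.import_module(name) for name in module_names]
dr = driver.Driver(initial_config_and_data, *modules)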
Stefan Krawczyk  01/30/2023, 7:18 PM

Stefan Krawczyk  01/30/2023, 9:02 PM

Stefan Krawczyk  01/30/2023, 9:06 PM

Игорь Хохолко  01/31/2023, 8:08 AM
From @extract_columns I need only spend. I would like to validate only spend. Would be great if I could drop rows with invalid values.
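
One way to approximate "validate only spend and drop the bad rows" is a small downstream node that filters rather than fails; a sketch (the threshold mirrors the pa.Check.lt(-1.2) check in the next message):

import pandas as pd

def spend_clean(spend: pd.Series) -> pd.Series:
    # Keep only rows that satisfy the spend check, instead of raising on the whole frame.
    return spend[spend < -1.2]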
Игорь Хохолко  01/31/2023, 8:12 AM
import logging
import sys
import numpy as np
import pandas as pd
import pandera as pa
from hamilton import ad_hoc_utils, driver
from hamilton.function_modifiers import extract_columns, check_output
schema = pa.DataFrameSchema({
"signups": pa.Column(int, checks=pa.Check.le(10)),
"spend": pa.Column(float, checks=<http://pa.Check.lt|pa.Check.lt>(-1.2)),
"other": pa.Column(str, checks=[
pa.Check.str_startswith("value_"),
# define custom checks as functions that take a series as input and
# outputs a boolean or boolean Series
pa.Check(lambda s: s.str.split("_", expand=True).shape[1] == 2)
]),
})
@extract_columns('signups', 'spend', 'other')
@check_output(schema=schema)
def log() -> pd.DataFrame:
# data to validate
df = pd.DataFrame({
"signups": [1, 4, 0, 10, 12],
"spend": [-1.3, -1.4, -2.9, -10.1, -20.4],
"other": ["value_1", "value_2", "value_3", "value_2", "value1"],
})
return df
# # Look at `my_functions` to see how these functions connect.
# def signups() -> pd.Series:
# """Returns sign up values"""
# return pd.Series([1, 10, 50, 100, 200, 400])
# def spend() -> pd.Series:
# """Returns the spend values"""
# return pd.Series([10, 10, 20, 40, 40, 50])
def log_spend_per_signup(spend_per_signup: pd.Series) -> pd.Series:
"""Simple function taking the logarithm of spend over signups."""
return np.log(spend_per_signup)
# Place the functions into a temporary module -- the idea is that this should house a curated set of functions.
# Don't be afraid to make multiple of them -- however we'd advise you to not use this method for production.
# Also note, that using a temporary function module does not work for scaling onto Ray, Dask, or Pandas on Spark.
temp_module = ad_hoc_utils.create_temporary_module(
log, log_spend_per_signup, module_name="function_example"
)
import pandas as pd
"""
Notes:
1. This file is used for all the [ray|dask|spark]/hello_world examples.
2. It therefore showcases how you can write something once and not only scale it, but port it
   to different frameworks with ease!
"""
def avg_3wk_spend(spend: pd.Series) -> pd.Series:
"""Rolling 3 week average spend."""
return spend.rolling(3).mean()
def spend_per_signup(spend: pd.Series, signups: pd.Series) -> pd.Series:
"""The cost per signup in relation to spend."""
return spend / signups
def spend_mean(spend: pd.Series) -> float:
"""Shows function creating a scalar. In this case it computes the mean of the entire column."""
return spend.mean()
def spend_zero_mean(spend: pd.Series, spend_mean: float) -> pd.Series:
"""Shows function that takes a scalar. In this case to zero mean spend."""
return spend - spend_mean
def spend_std_dev(spend: pd.Series) -> float:
"""Function that computes the standard deviation of the spend column."""
return spend.std()
def spend_zero_mean_unit_variance(spend_zero_mean: pd.Series, spend_std_dev: float) -> pd.Series:
"""Function showing one way to make spend have zero mean and unit variance."""
return spend_zero_mean / spend_std_dev
my_functions = ad_hoc_utils.create_temporary_module(
avg_3wk_spend, spend_per_signup, spend_mean, spend_zero_mean,
spend_std_dev, spend_zero_mean_unit_variance, module_name="my_functions"
)
# Cell 4 - Instantiate the Hamilton driver and pass it the right things in.
initial_config = {}
# we need to tell hamilton where to load function definitions from
dr = driver.Driver(initial_config, my_functions, temp_module) # can pass in multiple modules
# we need to specify what we want in the final dataframe.
output_columns = [
"spend",
"signups",
"avg_3wk_spend",
"spend_per_signup",
"spend_zero_mean_unit_variance",
    "other",
]
# Cell 5 - execute the DAG
# (to visualize it instead, do `pip install "sf-hamilton[visualization]"` and use dr.visualize_execution)
dr.execute(output_columns)
Elijah Ben Izzy  01/31/2023, 5:24 PM
@extract_columns('signups', 'spend', 'other')
def log(log_raw: pd.DataFrame) -> pd.DataFrame:
return log_raw
@check_output(schema=schema)
def log_raw() -> pd.DataFrame:
# data to validate
df = pd.DataFrame({
"signups": [1, 4, 0, 10, 12],
"spend": [-1.3, -1.4, -2.9, -10.1, -20.4],
"other": ["value_1", "value_2", "value_3", "value_2", "value1"],
})
return df
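
With this split, check_output validates the raw frame once on log_raw, and extract_columns then fans out the columns from log. A hypothetical driver call against a module holding these two functions:

# Hypothetical wiring; 'validated_module' stands in for wherever log/log_raw live.
dr = driver.Driver({}, validated_module)
spend_df = dr.execute(["spend"])  # log_raw's schema check runs on the way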
Elijah Ben Izzy  01/31/2023, 5:27 PM

Игорь Хохолко  01/31/2023, 5:28 PM