Slackbot
02/13/2024, 11:22 AM
Elijah Ben Izzy
02/13/2024, 1:29 PM
Luca Mattiazzi
02/13/2024, 2:04 PM
Elijah Ben Izzy
02/13/2024, 4:20 PM
Luca Mattiazzi
02/14/2024, 10:28 AM
import importlib
from pathlib import Path
import pandas as pd
from hamilton.driver import Driver
from hamilton.function_modifiers import check_output
filename = Path(__file__).name.split(".")[0]
module = importlib.import_module(".", filename)
@check_output(range=(0, 100))
def ratio(a: pd.Series, b: pd.Series) -> pd.Series:
    """ratio"""
    return a / b
driver = Driver({}, module)
input_df = pd.DataFrame({"a": [1.0, 2.0, 3.0], "b": [2.0, 0, 4.0]})
results = driver.execute(final_vars=["ratio", "a"], inputs=input_df)
Running this, we get a log (twice, for some reason) telling us that one line has failed validation, but I have no idea how to act on that, since it's just a log and I cannot use it in my code (also, it does not tell me which line has failed).
If I were to replace @check_output with a custom validator built using Great Expectations (or anything else like it), I still would have no idea how to extract any information from the validator, since the result of the driver is only a dataframe.
Elijah Ben Izzy
02/14/2024, 5:12 PM
Elijah Ben Izzy
02/14/2024, 10:22 PM
def ratio(a: pd.Series, b: pd.Series) -> pd.Series:
    return a / b

def ratio_cleaned(ratio: pd.Series) -> pd.Series:
    return ratio[(ratio < 100) & (ratio > 0)]
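To make the two-function approach concrete, here is a small standalone sketch that calls the same two functions directly on the sample inputs from earlier in the thread, without Hamilton in the loop. Dividing by zero gives inf in pandas, so the range filter removes that row, and comparing the two indexes shows which row went away. The variable names (raw, cleaned, dropped_rows) are just for illustration.

```python
import pandas as pd


def ratio(a: pd.Series, b: pd.Series) -> pd.Series:
    return a / b


def ratio_cleaned(ratio: pd.Series) -> pd.Series:
    # keep only values strictly between 0 and 100
    return ratio[(ratio < 100) & (ratio > 0)]


a = pd.Series([1.0, 2.0, 3.0])
b = pd.Series([2.0, 0, 4.0])

raw = ratio(a, b)             # the middle row is 2.0 / 0, which is inf
cleaned = ratio_cleaned(raw)  # the inf row is filtered out

# index labels present in the raw series but missing after cleaning
dropped_rows = raw.index.difference(cleaned.index)
print(list(dropped_rows))
```

Because both ratio and ratio_cleaned are ordinary functions, you can request either one (or both) as outputs and compare them downstream.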
Or define your own decorator that mutates the output. You can leverage pandera for this:
@validate_and_drop(pandera_schema)  # your own decorator; pandera allows mutation, so this doesn't leverage Hamilton to do the dropping
def ratio(a: pd.Series, b: pd.Series) -> pd.Series:
    return a / b
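For a rough idea of what such a decorator could look like, here is a minimal sketch. Note the swap: instead of a pandera schema it takes a plain function returning a boolean mask, so it runs with pandas alone; a pandera-based check could take its place. The name validate_and_drop and the is_valid parameter are hypothetical, not part of Hamilton or pandera.

```python
import functools
from typing import Callable

import pandas as pd


def validate_and_drop(is_valid: Callable[[pd.Series], pd.Series]):
    """Hypothetical decorator: drop rows for which `is_valid` returns False."""

    def decorator(fn):
        @functools.wraps(fn)  # keeps the wrapped function's name and signature visible
        def wrapper(*args, **kwargs):
            result = fn(*args, **kwargs)
            # boolean-mask the output so only valid rows remain
            return result[is_valid(result)]

        return wrapper

    return decorator


@validate_and_drop(lambda s: s.between(0, 100))
def ratio(a: pd.Series, b: pd.Series) -> pd.Series:
    return a / b


out = ratio(pd.Series([1.0, 2.0, 3.0]), pd.Series([2.0, 0, 4.0]))
print(out)
```

functools.wraps matters here: inspect.signature still reports a and b for the decorated function, which is what anything that builds a graph from parameter names would need.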
In fact, in the case above, you could use something like pipe, but it's a little verbose and still requires a second function. It's a good fit if you have lots of these steps or want them to be Hamilton nodes.
from hamilton.function_modifiers import pipe, step

# _with_outliers_removed is a helper you define yourself (not shown here)
def ratio_unprocessed(a: pd.Series, b: pd.Series) -> pd.Series:
    return a / b

@pipe(
    step(_with_outliers_removed, range=(0, 100)),
)
def processed_ratio(ratio_unprocessed: pd.Series) -> pd.Series:
    return ratio_unprocessed
Finally, here's a POC of using data validators:
1. Writing a custom data validator that I want to use
2. Separating out the two types of nodes (created data validation nodes versus requested ones)
3. Using the pandas result builder to build the core result
4. Using the other results to filter (I don't implement this fully, but it's easy)
Code below. Note you can use a custom ResultBuilder, but the tags aren't exposed yet (they will be soon), so you have to be a bit clever.
from typing import Type

import pandas as pd

from hamilton import driver, base
from hamilton.base import PandasDataFrameResult
from hamilton.data_quality.base import DataValidator, ValidationResult
from hamilton.function_modifiers import check_output_custom


class FilteringRangeValidator(DataValidator):
    def __init__(self, range: tuple):
        super(FilteringRangeValidator, self).__init__(importance="warn")
        self.range = range

    def applies_to(self, datatype: Type[Type]) -> bool:
        return datatype == pd.Series

    def description(self) -> str:
        return "example data validator"

    @classmethod
    def name(cls) -> str:
        return "filtering_data_validator"

    def validate(self, dataset: pd.Series) -> ValidationResult:
        low, high = self.range
        return ValidationResult(
            passes=True,
            message="Data is valid, dropping some rows.",
            # index labels of the rows that fall outside the configured range
            diagnostics={"rows_to_drop": dataset[(dataset < low) | (dataset > high)].index},
        )


@check_output_custom(FilteringRangeValidator(range=(0, 100)))
def ratio(a: pd.Series, b: pd.Series) -> pd.Series:
    """ratio"""
    return a / b


if __name__ == "__main__":
    import __main__

    dr = driver.Builder().with_modules(__main__).with_adapters(base.DictResult()).build()
    data_nodes = ["ratio"]
    # the validator nodes Hamilton created for the requested data nodes
    validators = [item.name for item in dr.list_available_variables(tag_filter={"hamilton.data_quality.source_node": data_nodes})]
    raw_results = dr.execute(validators + data_nodes, inputs=pd.DataFrame({"a": [1.0, 2.0, 3.0], "b": [2.0, 0, 4.0]}))
    result_builder = PandasDataFrameResult()  # just reuse the result builder from hamilton
    results_to_merge = {k: v for k, v in raw_results.items() if k in data_nodes}
    merged_results = result_builder.build_result(**results_to_merge)
    # each validator result carries the index of the rows it wants dropped
    rows_to_drop = {k: v.diagnostics["rows_to_drop"] for k, v in raw_results.items() if k in validators}
    for key, row in rows_to_drop.items():
        merged_results = merged_results.drop(row)
    print(merged_results)
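One thing to watch in the final loop once you have more than one validator: DataFrame.drop raises a KeyError by default when a label is no longer present, so two validators flagging the same row would fail on the second drop. A small sketch of one way around that, using plain pandas and made-up diagnostics (the validator names and flagged rows below are hypothetical):

```python
import pandas as pd

merged_results = pd.DataFrame({"ratio": [0.5, float("inf"), 0.75]})

# hypothetical diagnostics from two validators that both flag row 1
rows_to_drop = {
    "validator_a": pd.Index([1]),
    "validator_b": pd.Index([1, 2]),
}

# collect every flagged label once, then drop them in a single call
to_drop = set()
for index in rows_to_drop.values():
    to_drop.update(index)

filtered = merged_results.drop(index=sorted(to_drop))
print(filtered)
```

Passing errors="ignore" to drop inside the original loop would be another option; collecting the labels first just makes it easy to log exactly which rows were removed.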
Luca Mattiazzi
02/15/2024, 10:40 AM
Elijah Ben Izzy
02/15/2024, 4:06 PM
Elijah Ben Izzy
02/15/2024, 6:58 PM
Luca Mattiazzi
02/16/2024, 3:40 PM
Elijah Ben Izzy
02/16/2024, 3:53 PM