This message was deleted.
# hamilton-help
s
This message was deleted.
đź‘€ 1
e
Hey! What’s the code you’re currently using to validate? And do you mean you want to drop those? Or not log an error/fail?
l
hi! thank you! we are currently using great expectations, adding a decorator to each feature, and we would like to be able to retrieve a list of all the invalid rows, and the features that are invalid, at any point in the DAG, so that we can remove those rows from the output we didn't find any way of extracting this info from the driver execution, is there a way of doing it? should we use a custom graph_adapter?
e
Ahh sorry I didn’t see the response until just now! So, just to clarify: 1. You want to gather a list of “bad rows” (keyed by some ID) 2. You want to remove them from the output? Do you want to remove them all at the end? Or along the way? And you’re using hamilton’s custom data validation feature, leveraging great expectations? If you have a code sample of what it looks like now, that would be helpful to make more concrete. A custom reslut builder would do what you want, you can have the data validator output metadata containing “bad” rows then remove it. You could also have the data validator modify it, or do a custom tool that does the modification.
l
> 1. You want to gather a list of “bad rows” (keyed by some ID) > 2. You want to remove them from the output? that's correct! > Do you want to remove them all at the end? Or along the way? it's the same to remove them at the end or during computation, maybe it would be better to do the latter so that we can skip useless steps later! > And you’re using hamilton’s custom data validation feature, leveraging great expectations? exactly, currently we raise an exception when expectations are not met we raise an error a simple code example would be something like:
Copy code
import importlib

from pathlib import Path

import pandas as pd

from hamilton.driver import Driver
from hamilton.function_modifiers import check_output


filename = Path(__file__).name.split(".")[0]
module = importlib.import_module(".", filename)


@check_output(range=(0, 100))
def ratio(a: pd.Series, b: pd.Series) -> pd.Series:
    """ratio"""
    return a / b


driver = Driver({}, module)

input_df = pd.DataFrame({"a": [1.0, 2.0, 3.0], "b": [2.0, 0, 4.0]})

results = driver.execute(final_vars=["ratio", "a"], inputs=input_df)
running this, we get a log (twice for some reason) telling us that one line has failed validation, but I have no idea on how to act on that since it's just a log and I cannot use it in my code (also, it does not tell me which line has failed. if I were to replace
@check_output
with a custom validator built using great expectations (or anything else like it), I still would have no idea how to extract any information from the validator since the result of the driver is only a dataframe
e
Hey! Will respond more in depth later. But the high-level is: 1. you can customize result builders, if you want to use the metadata from your validators to build the results 2. We’ve observed people using validators to filter as well — I think this is supported but need to dig in more So both should be possible! Will provide some links/examples/thoughts in a bit
Ok, I have a pretty complex POC that does what you’re looking at but I’m not convinced its the best approach. Some ideas ahead, realizing that this is something that we should improve in Hamilton — add the ability to make a post-hoc change to a node and have it managed through the framework. IMO this isn’t the best for pipelines — if you’re dropping rows you want it to be explicit, and not done after the fact. And data validation isn’t really meant to be a mutation operation (although it can be…). I’d suggest: 1. Centralizing/adding specific data validation/dropping nodes that do both 2. Then just letting it run and be smart about joining This could be as simple as:
Copy code
def ratio(a: pd.Series, b: pd.Series) -> pd.Series:
    return a/b

def ratio_cleaned(ratio: pd.Series) -> pd.Series:
    return ratio[(ratio < 100) & (ratio > 0)]
Or, define your own decorator that mutates, you can leverage pandera:
Copy code
@validate_and_drop(pandera_schema) # pandera allows mutation, this doesn't leverage hamilton to do this
def ratio(a: pd.Series, b: pd.Series) -> pd.Series:
    return a/b
In fact, in the case above, you could use something like
pipe
but its a little verbose and still requires a second one. Good if you have lots or want them to be Hamilton nodes.
Copy code
def ratio_unprocessed(a: pd.Series, b: pd.Series) -> pd.Series:
    return a/b

@pipe(
    step(_with_outliers_removed, range=(0,100)),
)
def processed_ratio(ratio_unprocessed: pd.series) -> pd.Series:
    return raw_data
Finally here’s a POC of using data validators: 1. Writing a custom data validator that I want to use 2. Separating out the two types of nodes (created data validation nodes versus requested ones) 3. Using the pandas result builder to build the core result 4. Using the other results to filter (I don’t implement this but its easy) Code at bottom. Note you can use a custom ResultBuilder, but the tags aren’t exposed yet (they will be soon) so you have to be a bit clever.
Copy code
from typing import Type

import pandas as pd

from hamilton import driver, base
from hamilton.base import PandasDataFrameResult
from hamilton.data_quality.base import DataValidator, ValidationResult
from hamilton.function_modifiers import check_output_custom


class FilteringRangeValidator(DataValidator):
    def __init__(self, range: tuple):
        super(FilteringRangeValidator, self).__init__(importance="warn")
        self.range = range

    def applies_to(self, datatype: Type[Type]) -> bool:
        return datatype == pd.Series

    def description(self) -> str:
        return "example data validator"

    @classmethod
    def name(cls) -> str:
        return "filtering_data_validator"

    def validate(self, dataset: pd.DataFrame) -> ValidationResult:
        return ValidationResult(
            passes=True,
            message="Data is valid, dropping some rows.",
            diagnostics={"rows_to_drop": dataset[(dataset < 0) | (dataset > 10)].index}  # drop the first 10 rows as an example
        )


@check_output_custom(FilteringRangeValidator(range=(0, 100)))
def ratio(a: pd.Series, b: pd.Series) -> pd.Series:
    """ratio"""
    return a / b


if __name__ == "__main__":
    import __main__

    dr = driver.Builder().with_modules(__main__).with_adapters(base.DictResult()).build()
    data_nodes = ["ratio"]
    validators = [item.name for item in dr.list_available_variables(tag_filter={"hamilton.data_quality.source_node": data_nodes})]
    raw_results = dr.execute(validators + data_nodes, inputs=pd.DataFrame({"a": [1.0, 2.0, 3.0], "b": [2.0, 0, 4.0]}))
    result_builder = PandasDataFrameResult()  # just reuse the result builder from hamilton
    results_to_merge = {k: v for k, v in raw_results.items() if k in data_nodes}
    merged_results = result_builder.build_result(**results_to_merge)
    rows_to_drop = {k: v.diagnostics["rows_to_drop"] for k, v in raw_results.items() if k in validators}
    for key, row in rows_to_drop.items():
        merged_results = merged_results.drop(row)
    print(merged_results)
l
thank you for the answer, and for being so thorough! I'll look into the POC to understand what exacly it does and try to adapt it to our problem, by a first look I think I got the gist of it though!
e
Yeah! I think there’s room for a better abstraction here (a modification after a node, rather than a data quality assertion), so let us know what you find!
Opened up https://github.com/DAGWorks-Inc/hamilton/issues/701 to track — feel free to comment if you think this would solve your problem (I think its much cleaner than what I presented above)
l
great! thank you! think we got your POC, we're currently trying to use the same logic with our situation (~80/100 features, and we'll try to use a single column for all validators, we only need to know if the row is valid and, if not, which feature is the culprit) will update as soon as possible (and try to hel on the issue as well!)
e
Yeah! Just let me know which one you choose/what you like, that way I can better know what works for people!