# hamilton-help
s
I'm not sure about `dask.array`, but definitely `dask.Series`. Did you try it? Or are you getting some error?
h
yea, an error for `dask.Series`:
```
Actual error: No registered subclass of BaseDefaultValidator is available for arg: data_type and type <class 'dask.dataframe.core.Series'>. This either means (a) this arg-type contribution isn't supported or (b) this has not been added yet (but should be). In the case of (b), we welcome contributions. Get started at github.com/dagworks-inc/hamilton.
```
also tried `check_output_custom` and `target_="function_name"`. So I guess I need to create my own validator using `check_output_custom`. Could you provide some guidance for `dask.Series`?
e
Hmm, AFK now but we should get this to work without too much trouble. It should just work, but I don't think we've extended it. Will take a look today to determine! To do it yourself you'd want to:
1. Implement this: https://github.com/DAGWorks-Inc/hamilton/blob/main/data_quality.md#custom-validators
2. Pass it to your decorator: `@check_output_custom(DaskSeriesValidator(…))`

But IIRC pandera works with dask, so we should be able to get it to work out of the box as well.
👀 1
h
I'll try it out, thx
👍 1
e
Yep will get back to you when I look at doing it using the default path
👍 1
s
@Hvuj can you provide the signature of the function you're trying to annotate with `@check_output`, please? So I can write a test to ensure it passes.
👀 1
@Hvuj you could try installing hamilton from my branch - https://github.com/DAGWorks-Inc/hamilton/pull/596 - and see if that gets the Series going for you. For supporting `dask.array` we could create something equivalent to https://github.com/DAGWorks-Inc/hamilton/blob/main/hamilton/data_quality/default_validators.py so that `dask.array` could be handled. Or figure out how to get pandera to validate them 🙂 Or you just do your own thing with `@check_output_custom`.
👍 1
h
trying now
findings on `check_output` (for testing, the parquet file size is 30 MiB):
With pandera:
1. `dask.Series` doesn't work - hangs infinitely (`@check_output(schema=availability_difference_schema)`)
2. `dask.DataFrame` - works great

Just hamilton:
1. `dask.Series` doesn't work - (`Could not resolve validators for @check_output for function [availability_is_valid]. Please check that target_ is set correctly if you're using that argument.`) (`@check_output(data_type=dd.Series, importance="warn", data_in_range=(0, 1))`)
2. `dask.DataFrame` doesn't work - (`type object 'DataFrame' has no attribute 'validate'`) - (`@check_output(schema=dd.DataFrame, target_="check_sum_availability", importance="warn")` on `def check_sum_availability(agg_od_data: dd.DataFrame, grouped_ddf: dd.DataFrame) -> dd.DataFrame:`)
code:

main.py
```python
dga = h_dask.DaskGraphAdapter(
    dd_client,
    h_dask.DaskDataFrameResult(),
    use_delayed=False,
    compute_at_end=True,
)
initial_config_and_data = {'raw_ddf': dd.from_delayed(raw_ddf), "grouped_ddf": dd.from_delayed(grouped)}

# dr = driver.Driver({'availability': grouped["availability"]}, *modules, adapter=dga)
dr = driver.Driver(initial_config_and_data, my_function, adapter=dga)

# determine what we want in the end
output_columns = ['agg_od_data', 'check_sum_availability', 'availability_is_valid']

df = dr.execute(output_columns)
```
test_ham.py
```python
def check_sum_availability(agg_od_data: dd.DataFrame, grouped_ddf: dd.DataFrame) -> dd.DataFrame:
    grouped_ddf = grouped_ddf[["od", "availability"]].groupby(by=["od"]).agg({"availability": "sum"}).reset_index()
    merged_ddf = grouped_ddf.merge(agg_od_data, how='inner', on=["od"])
    merged_ddf['availability_difference'] = merged_ddf['availability'] - merged_ddf['agg_availability']
    comparison_ddf = merged_ddf[['od', 'availability', 'agg_availability', 'availability_difference']]
    comparison_ddf = comparison_ddf.groupby(by=['od']).agg({
        "availability": "sum",
        "agg_availability": "sum",
        "availability_difference": "sum"
    }).reset_index()
    comparison_ddf = comparison_ddf.rename(columns={'index': 'od'})
    return comparison_ddf


def availability_is_valid(check_sum_availability: dd.DataFrame) -> dd.Series:
    return check_sum_availability.availability_difference == 0


# pandera dask.DataFrame schema example
class Schema(pa.DataFrameModel):
    flight__operating_number: Series[np.int8] = pa.Field(nullable=False, coerce=False)
    operating_carrier: Series[str] = pa.Field(nullable=False, coerce=False)
    od: Series[str] = pa.Field(nullable=False, coerce=False)
    commodity__departure_date: Series[str] = pa.Field(nullable=True, coerce=False)
    fares__rbd: Series[str] = pa.Field(nullable=False, coerce=False)
    flight__issue_date: Series[str] = pa.Field(nullable=False, coerce=False)
    commodity__cabin: Series[str] = pa.Field(nullable=True, coerce=False)
    fares__pos: Series[str] = pa.Field(nullable=True, coerce=False)
    availability: Series[int] = pa.Field(in_range={"min_value": 5, "max_value": 10000000}, nullable=False, coerce=False)
    agg_availability: Series[int] = pa.Field(in_range={"min_value": 5, "max_value": 10000000}, nullable=False, coerce=False)
    availability_difference: Series[int] = pa.Field(in_range={"min_value": 0, "max_value": 1}, nullable=False, coerce=False)


# pandera dd.Series schema example
availability_difference_schema = pa.SeriesSchema(
    bool,
    checks=[
        pa.Check(lambda x: x.all(), element_wise=False)
    ],
    nullable=False,
    unique=False,
    name="availability_difference")
```
👀 1
s
> With pandera:
> 1. `dask.Series` doesn't work - hangs infinitely (`@check_output(schema=availability_difference_schema)`)
> 2. `dask.DataFrame` - works great

Really, the series hangs? hmm 🤔. Will try to recreate it.

> Just hamilton:
> 1. `dask.Series` doesn't work - (`Could not resolve validators for @check_output for function [availability_is_valid]. Please check that target_ is set correctly if you're using that argument.`) (`@check_output(data_type=dd.Series, importance="warn", data_in_range=(0, 1))`)
> 2. `dask.DataFrame` doesn't work - (`type object 'DataFrame' has no attribute 'validate'`) - (`@check_output(schema=dd.DataFrame, target_="check_sum_availability", importance="warn")` on `def check_sum_availability(agg_od_data: dd.DataFrame, grouped_ddf: dd.DataFrame) -> dd.DataFrame:`)

Yep, so dask type support for `@check_output` with hamilton isn't implemented.
h
awesome thx for the assistance
s
do you have some sample data I could use to make your code run?
This fails as expected for me when I just try to exercise the series function and pandera check.
```python
# functions.py

availability_difference_schema = pa.SeriesSchema(
    bool,
    checks=[
        pa.Check(lambda x: x.all(), element_wise=False)
    ],
    nullable=False,
    unique=False,
    name="availability_difference"
)


@check_output(schema=availability_difference_schema, importance="fail")
def availability_is_valid(check_sum_availability: dd.DataFrame) -> dd.Series:
    return check_sum_availability.availability_difference == 0
```
with main.py
```python
import logging
import sys

import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
import pandas as pd
from hamilton.plugins import h_dask
from hamilton import driver
import functions

if __name__ == '__main__':
    logger = logging.getLogger(__name__)
    cluster = LocalCluster()
    dd_client = Client(cluster)
    logger.info(dd_client.cluster)
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    dga = h_dask.DaskGraphAdapter(
        dd_client,
        h_dask.DaskDataFrameResult(),
        use_delayed=False,
        compute_at_end=True,
    )
    dr = driver.Driver({}, functions, adapter=dga)
    output_columns = ['availability_is_valid']
    csa = dd.from_pandas(pd.DataFrame({
        "availability_difference": [1, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    }), npartitions=1)
    df = dr.execute(output_columns, overrides={"check_sum_availability": csa})
    print(df)
```
Otherwise, for dataframes: with `@check_output` you should be using the DataFrameSchema class, and not the DataFrameModel one. Unless you're using the specific `h_pandera.check_output` (docs), which you'd then annotate the function with. It seems we had an error in the docs saying they're equivalent; the docs are fixed now. Happy to clarify if I'm confusing you.
h
u did hhh. Could you provide an example? Trying to get you a file to reproduce.
s
> could you provide an example?

If you modify the code you gave me to have `@check_output(schema=availability_difference_schema, importance="fail")` on the function, then run my `main.py`.