Slackbot [12/19/2023, 8:49 AM]

Stefan Krawczyk [12/19/2023, 3:32 PM]
`dask.array`, but definitely `dask.series`. Did you try it? Or are you getting some error?

Hvuj [12/19/2023, 3:52 PM]
`dask.series` actual error:
`No registered subclass of BaseDefaultValidator is available for arg: data_type and type <class 'dask.dataframe.core.Series'>. This either means (a) this arg-type contribution isn't supported or (b) this has not been added yet (but should be). In the case of (b), we welcome contributions. Get started at github.com/dagworks-inc/hamilton.`
Also tried: `check_output_custom` and `target_="function_name"`.
Hvuj [12/19/2023, 4:04 PM]
For `check_output_custom`, could you provide some guidance for `dask.series`?

Elijah Ben Izzy [12/19/2023, 4:13 PM]
`@check_output_custom(DaskSeriesValidator(…))`

Elijah Ben Izzy [12/19/2023, 4:13 PM]

Hvuj [12/19/2023, 4:16 PM]

Elijah Ben Izzy [12/19/2023, 4:17 PM]

Stefan Krawczyk [12/19/2023, 6:43 PM]
Stefan Krawczyk [12/19/2023, 7:35 PM]
For `dask.array` we could create something equivalent to https://github.com/DAGWorks-Inc/hamilton/blob/main/hamilton/data_quality/default_validators.py so that `dask.array` could be handled. Or figure out how to get pandera to validate them 🙂 Or you just do your own thing with `@check_output_custom`.

Hvuj [12/21/2023, 12:16 PM]
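A custom validator along the lines suggested above might be sketched as follows. This is an illustrative stand-in, not code from the thread: the class name `DaskSeriesInRangeValidator`, its method set, and the plain-`dict` return value are invented here so the sketch runs without hamilton or dask installed. A real implementation would subclass `hamilton.data_quality.base.BaseDefaultValidator`, return its `ValidationResult`, and be passed to `@check_output_custom`.

```python
# Hypothetical sketch of a custom range validator in the spirit of
# @check_output_custom / BaseDefaultValidator. Written as a plain class so
# it runs standalone; in real code you would subclass
# hamilton.data_quality.base.BaseDefaultValidator and return its
# ValidationResult instead of a dict.
import pandas as pd


class DaskSeriesInRangeValidator:  # name invented for illustration
    def __init__(self, min_value: float, max_value: float, importance: str = "warn"):
        self.min_value = min_value
        self.max_value = max_value
        self.importance = importance

    @classmethod
    def applies_to(cls, datatype: type) -> bool:
        # A real version would check issubclass(datatype, dask.dataframe.Series);
        # pandas is used as a stand-in here.
        return datatype is pd.Series

    def validate(self, data: pd.Series) -> dict:
        # A real dask series would need .compute() (or a partition-wise
        # check) before evaluating; pandas is used as a stand-in.
        in_range = data.between(self.min_value, self.max_value)
        return {
            "passes": bool(in_range.all()),
            "message": f"{int((~in_range).sum())} value(s) outside "
                       f"[{self.min_value}, {self.max_value}]",
        }


# Usage sketch:
validator = DaskSeriesInRangeValidator(0, 1)
result = validator.validate(pd.Series([0.0, 0.5, 1.0]))
print(result["passes"])  # True
```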
Hvuj [12/21/2023, 3:02 PM]
With pandera:
1. `dask.Series` doesn't work - hangs indefinitely (`@check_output(schema=availability_difference_schema)`)
2. `dask.DataFrame` - works great

With just hamilton:
1. `dask.Series` doesn't work - (`Could not resolve validators for @check_output for function [availability_is_valid]. Please check that target_ is set correctly if you're using that argument.`) (`@check_output(data_type=dd.Series, importance="warn", data_in_range=(0,1))`)
2. `dask.DataFrame` doesn't work - (`type object 'DataFrame' has no attribute 'validate'`) (`@check_output(schema=dd.DataFrame, target_="check_sum_availability", importance="warn")` on `def check_sum_availability(agg_od_data: dd.DataFrame, grouped_ddf: dd.DataFrame) -> dd.DataFrame:`)
Code:

`main.py`:
```python
dga = h_dask.DaskGraphAdapter(
    dd_client,
    h_dask.DaskDataFrameResult(),
    use_delayed=False,
    compute_at_end=True,
)
initial_config_and_data = {
    'raw_ddf': dd.from_delayed(raw_ddf),
    'grouped_ddf': dd.from_delayed(grouped),
}
# dr = driver.Driver({'availability': grouped["availability"]}, *modules, adapter=dga)
dr = driver.Driver(initial_config_and_data, my_function, adapter=dga)
# determine what we want in the end
output_columns = ['agg_od_data', 'check_sum_availability', 'availability_is_valid']
df = dr.execute(output_columns)
```
`test_ham.py`:
```python
def check_sum_availability(agg_od_data: dd.DataFrame, grouped_ddf: dd.DataFrame) -> dd.DataFrame:
    grouped_ddf = grouped_ddf[["od", "availability"]].groupby(by=["od"]).agg({"availability": "sum"}).reset_index()
    merged_ddf = grouped_ddf.merge(agg_od_data, how='inner', on=["od"])
    merged_ddf['availability_difference'] = merged_ddf['availability'] - merged_ddf['agg_availability']
    comparison_ddf = merged_ddf[['od', 'availability', 'agg_availability', 'availability_difference']]
    comparison_ddf = comparison_ddf.groupby(by=['od']).agg({
        "availability": "sum",
        "agg_availability": "sum",
        "availability_difference": "sum",
    }).reset_index()
    comparison_ddf = comparison_ddf.rename(columns={'index': 'od'})
    return comparison_ddf


def availability_is_valid(check_sum_availability: dd.DataFrame) -> dd.Series:
    return check_sum_availability.availability_difference == 0
```
```python
# pandera dask.DataFrame schema example
class Schema(pa.DataFrameModel):
    flight__operating_number: Series[np.int8] = pa.Field(nullable=False, coerce=False)
    operating_carrier: Series[str] = pa.Field(nullable=False, coerce=False)
    od: Series[str] = pa.Field(nullable=False, coerce=False)
    commodity__departure_date: Series[str] = pa.Field(nullable=True, coerce=False)
    fares__rbd: Series[str] = pa.Field(nullable=False, coerce=False)
    flight__issue_date: Series[str] = pa.Field(nullable=False, coerce=False)
    commodity__cabin: Series[str] = pa.Field(nullable=True, coerce=False)
    fares__pos: Series[str] = pa.Field(nullable=True, coerce=False)
    availability: Series[int] = pa.Field(in_range={"min_value": 5, "max_value": 10000000}, nullable=False, coerce=False)
    agg_availability: Series[int] = pa.Field(in_range={"min_value": 5, "max_value": 10000000}, nullable=False, coerce=False)
    availability_difference: Series[int] = pa.Field(in_range={"min_value": 0, "max_value": 1}, nullable=False, coerce=False)


# pandera dd.Series schema example
availability_difference_schema = pa.SeriesSchema(
    bool,
    checks=[
        pa.Check(lambda x: x.all(), element_wise=False)
    ],
    nullable=False,
    unique=False,
    name="availability_difference",
)
```
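For reference, the all-`True` condition that this `SeriesSchema` encodes (every `availability_difference` must be exactly 0) can be exercised with plain pandas; pandas is used here as a stand-in for the lazy dask series in the thread.

```python
# Plain-pandas stand-in for availability_is_valid: the check passes only
# when every availability_difference is exactly 0, i.e. the boolean
# comparison series is all True, which is what the
# pa.Check(lambda x: x.all()) above asserts.
import pandas as pd

check_sum_availability = pd.DataFrame({"availability_difference": [0, 0, 0]})
is_valid = check_sum_availability.availability_difference == 0
print(bool(is_valid.all()))  # True

bad = pd.DataFrame({"availability_difference": [1, 0, 0]})
print(bool((bad.availability_difference == 0).all()))  # False
```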
Stefan Krawczyk [12/21/2023, 6:12 PM]
> With pandera:
> 1. `dask.Series` doesn't work - hangs indefinitely (`@check_output(schema=availability_difference_schema)`)
Really, the series hangs? hmm 🤔. Will try to recreate it.

> 2. `dask.DataFrame` - works great
> With just hamilton:
> 1. `dask.Series` doesn't work - (`Could not resolve validators for @check_output for function [availability_is_valid]. Please check that target_ is set correctly if you're using that argument.`) (`@check_output(data_type=dd.Series, importance="warn", data_in_range=(0,1))`)
> 2. `dask.DataFrame` doesn't work - (`type object 'DataFrame' has no attribute 'validate'`) (`@check_output(schema=dd.DataFrame, target_="check_sum_availability", importance="warn")` on `def check_sum_availability(agg_od_data: dd.DataFrame, grouped_ddf: dd.DataFrame) -> dd.DataFrame:`)
Yep — so dask type support for `@check_output` with hamilton isn't implemented.

Hvuj [12/21/2023, 6:13 PM]

Stefan Krawczyk [12/21/2023, 6:18 PM]

Stefan Krawczyk
[12/21/2023, 6:26 PM]
```python
# functions.py
availability_difference_schema = pa.SeriesSchema(
    bool,
    checks=[
        pa.Check(lambda x: x.all(), element_wise=False)
    ],
    nullable=False,
    unique=False,
    name="availability_difference",
)


@check_output(schema=availability_difference_schema, importance="fail")
def availability_is_valid(check_sum_availability: dd.DataFrame) -> dd.Series:
    return check_sum_availability.availability_difference == 0
```
with `main.py`:
```python
import logging
import sys
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
import pandas as pd
from hamilton.plugins import h_dask
from hamilton import driver
import functions

if __name__ == '__main__':
    logger = logging.getLogger(__name__)
    cluster = LocalCluster()
    dd_client = Client(cluster)
    logger.info(dd_client.cluster)
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    dga = h_dask.DaskGraphAdapter(
        dd_client,
        h_dask.DaskDataFrameResult(),
        use_delayed=False,
        compute_at_end=True,
    )
    dr = driver.Driver({}, functions, adapter=dga)
    output_columns = ['availability_is_valid']
    csa = dd.from_pandas(pd.DataFrame({
        "availability_difference": [1, 0, 0, 0, 0, 0, 0, 0, 0, 0]
    }), npartitions=1)
    df = dr.execute(output_columns, overrides={"check_sum_availability": csa})
    print(df)
```
Stefan Krawczyk [12/21/2023, 6:31 PM]
For `@check_output` you should be using the `DataFrameSchema` class, and not the `DataFrameModel` one. Unless you're using the specific `h_pandera.check_output` (docs), which you'd then annotate the function with.
Hvuj [12/21/2023, 6:36 PM]

Stefan Krawczyk [12/21/2023, 6:53 PM]
> could you provide an example?
If you modify the code you gave me to have
`@check_output(schema=availability_difference_schema, importance="fail")`
on the function, then run my main.py.