Seth Stokes
05/02/2024, 9:32 PM
[...] overrides.
I am trying to use overrides, but the driver says I am missing input values from nodes higher up the DAG. The missing input value has a default of None in the function definition.
Seth Stokes
05/02/2024, 9:34 PM
from datetime import datetime

import pandas as pd


def open_positions_charged_delivery_fees_w_trades(
    positions_charged_delivery_fees: pd.DataFrame,
    filtered_open_deliverable_positions: pd.DataFrame,
) -> pd.DataFrame:
    # Left-join each open position to the fee row for its contract.
    return filtered_open_deliverable_positions.merge(
        positions_charged_delivery_fees,
        left_on=["Exch Contract"],
        right_on=["KEY"],
        how="left",
    )


def open_positions_w_prorata_fee_by_trade(
    open_positions_charged_delivery_fees_w_trades: pd.DataFrame,
    cob_date: datetime = None,
) -> pd.DataFrame:
    # Split each KEY's delivery fee across its trades in proportion to
    # quantity, with the sign flipped.
    tbl = (
        open_positions_charged_delivery_fees_w_trades
        .assign(total_qty_by_key=lambda _df: _df.groupby(["KEY"])["EXTENDED_QTY"].transform("sum"))
        .assign(factor=lambda _df: _df["EXTENDED_QTY"] / _df["total_qty_by_key"])
        .assign(prorata_fee=lambda _df: _df["delivery_fee"] * _df["factor"] * -1)
    )
    # Only stamp the close-of-business date when one is supplied.
    if cob_date is not None:
        tbl.insert(0, "cob_date", cob_date)
    return tbl
driver
dr = (
    driver.Builder()
    # .with_config({
    #     "files_to_process": derived_files_to_load(),
    #     "report_filepath": ...
    # })
    # .enable_dynamic_execution(allow_experimental_mode=True)
    .with_modules(dataflow_delivery_fee_allocation)
    .with_adapter(base.PandasDataFrameResult())
    # .with_remote_executor(SynchronousLocalTaskExecutor())
    .build()
)
cached_results = pd.read_excel(...)
df = dr.execute(["delivery_fee_pivot"], overrides={"open_positions_charged_delivery_fees_w_trades": cached_results})
Seth Stokes
05/02/2024, 9:35 PM
Traceback (most recent call last):
  File "C:\codebase\rec-delivery-fee\run.py", line 99, in <module>
    df = dr.execute(["delivery_fee_pivot"], overrides={"open_positions_charged_delivery_fees_w_trades": cached_results})
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\codebase\rec-delivery-fee\dfee-venv\Lib\site-packages\hamilton\driver.py", line 552, in execute
    raise e
  File "C:\codebase\rec-delivery-fee\dfee-venv\Lib\site-packages\hamilton\driver.py", line 542, in execute
    outputs = self.raw_execute(_final_vars, overrides, display_graph, inputs=inputs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\codebase\rec-delivery-fee\dfee-venv\Lib\site-packages\hamilton\driver.py", line 632, in raw_execute
    Driver.validate_inputs(
  File "C:\codebase\rec-delivery-fee\dfee-venv\Lib\site-packages\hamilton\driver.py", line 513, in validate_inputs
    raise ValueError(error_str)
ValueError: 1 errors encountered:
  Error: Required input file_name not provided for nodes: ['cob_date', 'table_1', 'table_2', 'table_3'].
Elijah Ben Izzy
05/02/2024, 9:37 PM
[...] table_1, table_2, table_3 and another one: cob_date that all need the file_name field. If you do visualize_execution instead of execute, will it show that there are missing upstream dependencies?
Seth Stokes
05/02/2024, 9:38 PM
Elijah Ben Izzy
05/02/2024, 9:40 PM
[...] Parallelizable block?
Seth Stokes
05/02/2024, 9:41 PM
Seth Stokes
05/02/2024, 9:42 PM
[...] is_parallel to the config
Elijah Ben Izzy
05/02/2024, 9:43 PM
[...] table_{1,2,3} and cob_date should be made redundant by the override? There’s no way they’re in the path?
Seth Stokes
05/02/2024, 9:43 PM
Seth Stokes
05/02/2024, 9:49 PM
def open_positions_w_prorata_fee_by_trade(
    open_positions_charged_delivery_fees_w_trades: pd.DataFrame,
    cob_date: datetime = None,
) -> pd.DataFrame:
    ...
cob_date is derived from file_name, which is not passed to the config.
Could cob_date: datetime = None make the DAG think that those nodes are needed?
Elijah Ben Izzy
05/02/2024, 9:50 PM
[...] = None does it work?
Elijah Ben Izzy
05/02/2024, 9:50 PM
Elijah Ben Izzy
05/02/2024, 9:51 PM
[...] cob_date
Seth Stokes
05/02/2024, 9:52 PM
[...] file_name -> cob_date -> open_positions_w_prorata_fee_by_trade (not dependent on either node, but set to None because the cob_date column is already in my table from the overrides)
Elijah Ben Izzy
05/02/2024, 9:54 PM
[...] open_positions_w_prorata_fee_by_trade depends on cob_date
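Editor's sketch: the dependency described above can be modeled without Hamilton. The code below is a toy, not Hamilton's implementation. It assumes only what the thread reports: an overridden node is not computed, so its upstream is pruned, while a parameter with a default is still wired to a node of the same name when one exists in the graph. The helper missing_inputs and the simplified node bodies are hypothetical.

```python
import inspect
from typing import Any, Callable, Dict, Iterable, Mapping, Set


# Toy stand-ins for the nodes in the thread; bodies are placeholders.
def cob_date(file_name: str) -> str:
    return file_name


def open_positions_charged_delivery_fees_w_trades(file_name: str) -> list:
    # Simplification: the real node takes two DataFrames, not file_name.
    return []


def open_positions_w_prorata_fee_by_trade(
    open_positions_charged_delivery_fees_w_trades: list,
    cob_date: str = None,
) -> list:
    return open_positions_charged_delivery_fees_w_trades


NODES: Dict[str, Callable] = {
    fn.__name__: fn
    for fn in (
        cob_date,
        open_positions_charged_delivery_fees_w_trades,
        open_positions_w_prorata_fee_by_trade,
    )
}


def missing_inputs(targets: Iterable[str], overrides: Mapping[str, Any]) -> Set[str]:
    """Return the external inputs needed to compute `targets` (hypothetical helper)."""
    missing: Set[str] = set()
    seen: Set[str] = set()
    stack = list(targets)
    while stack:
        name = stack.pop()
        if name in seen or name in overrides:
            continue  # overridden nodes are not computed, so their upstream is skipped
        seen.add(name)
        if name not in NODES:
            missing.add(name)  # no function produces it, so the caller must supply it
            continue
        for param in inspect.signature(NODES[name]).parameters.values():
            has_default = param.default is not inspect.Parameter.empty
            if has_default and param.name not in NODES:
                continue  # the default is used only when no node supplies the value
            stack.append(param.name)
    return missing


target = ["open_positions_w_prorata_fee_by_trade"]
only_table = missing_inputs(target, {"open_positions_charged_delivery_fees_w_trades": []})
with_cob_date = missing_inputs(
    target,
    {"open_positions_charged_delivery_fees_w_trades": [], "cob_date": None},
)
print(only_table, with_cob_date)
```

In this toy, overriding only the cached table still leaves file_name required through cob_date; overriding cob_date as well leaves nothing required.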
Seth Stokes
05/02/2024, 10:04 PM
[...] cob_date to None in the function to avoid using it when using overrides on a precomputed historical file, this still indicated to the DAG that it was an upstream dependency.
The FIX: pass cob_date to the overrides as well.
Elijah Ben Izzy
05/02/2024, 10:06 PM
Seth Stokes
05/16/2024, 1:43 PM
[...] overrides work when asking for a node in a subdag? I'm erroring out on an input when trying to use overrides on a downstream node.
Elijah Ben Izzy
05/16/2024, 2:23 PM
[...] subdag_name.node_name, meaning that the standard overrides won’t work (if I understand what you’re asking)
Seth Stokes
05/16/2024, 4:39 PM
# transform_cme_raw_fee_schedule.py
import pandas as pd

def raw_cme_fee_schedule(data_location: str) -> pd.DataFrame:
    return pd.read_excel(data_location, header=[1, 2, 3])

def execution_types() -> pd.DataFrame: ...

# fee_schedules.py
import pandas as pd
from hamilton.function_modifiers import source, subdag
import cme

@subdag(
    cme,
    inputs={"data_location": source("data_location")},
)
def cme_fee_schedule(execution_types: pd.DataFrame) -> pd.DataFrame:
    ...

# run.py
import pandas as pd
from hamilton import driver, base
import fee_schedules

dr = (
    driver.Builder()
    .with_modules(fee_schedules)
    .with_adapter(base.PandasDataFrameResult())
    .build()
)
raw_cme_fee_schedule_ = pd.DataFrame()
dr.execute(["cme_fee_schedule"], overrides={"raw_cme_fee_schedule": raw_cme_fee_schedule_})
Elijah Ben Izzy
05/16/2024, 4:59 PM
[...] subdag_name.node_name = cme_fee_schedule.raw_cme_fee_schedule
2. It’s thus ambiguous as to which it refers to
So, try it with the new name, cme_fee_schedule.raw_cme_fee_schedule, as the key?
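Editor's sketch: the suggestion above amounts to changing the overrides key from the bare node name to the namespaced one. The snippet assumes the subdag_name.node_name scheme described in the thread; namespaced_key is a hypothetical helper, the DataFrame is replaced by a placeholder object, and the thread does not confirm that the override resolved the error.

```python
from typing import Any, Dict


def namespaced_key(subdag_name: str, node_name: str) -> str:
    """Build a 'subdag_name.node_name' key (hypothetical helper)."""
    return f"{subdag_name}.{node_name}"


# Placeholder for the cached DataFrame from run.py.
raw_cme_fee_schedule_: Any = object()

overrides: Dict[str, Any] = {
    namespaced_key("cme_fee_schedule", "raw_cme_fee_schedule"): raw_cme_fee_schedule_,
}
print(list(overrides))

# With the driver built as in run.py, the call would then be:
# dr.execute(["cme_fee_schedule"], overrides=overrides)
```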