Fefun
04/09/2024, 8:44 PM
I am using the @pipe decorator to define optional transformations to run against my PySpark DataFrame.
One transformation takes a parameter whose value I would like to come from the inputs dictionary, i.e. the one I pass to the driver when I execute the DAG.
E.g.,
datasets = dr.execute(datasets_to_request, inputs={"start": "2024-01-01"})
I would like to make the value of the start key available to the function run in the @pipe step I defined. Like so:
@pipe(
    step(
        do_something,
        input1=[...],  # This input can be defined in the source code here.
        start=start,  # However, here I would like to use the value of start passed as input, so 2024-01-01
    )
...
Is that possible? Or would I perhaps need to put that variable in the config passed to the Driver?
Disclaimer: Apologies, I haven't actually tried this yet. Trying it might have given me the answer, but due to stupid constraints I couldn't find the time. Yet I sort of need to know whether it's possible, hence my post.
Stefan Krawczyk
04/09/2024, 8:47 PM
Thierry Jean
04/09/2024, 8:50 PM
You want step() within @pipe to read a value from the dataflow. This is possible through the source() construct (more details here).
from hamilton.function_modifiers import step, pipe, source

@pipe(
    step(
        _parse_something,
        input1=[...],
        start=source("start"),  # resolved from the node named "start" at execution time
    )
)
This will read the value of the node named start at execution time, which you can provide via driver.execute(inputs={"start": "2024-01-01"}).
Thierry Jean
04/09/2024, 8:52 PM
The alternative to source() is value() (essentially a constant). For example, the following would pass the literal string, i.e. {"start": "start"}:
from hamilton.function_modifiers import step, pipe, value

@pipe(
    step(
        _parse_something,
        start=value("start"),  # using `value()`
    )
)
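To make the difference between the two concrete, here is a minimal toy sketch in plain Python. It does not use Hamilton and is not how Hamilton implements source() or value(); ToySource, ToyValue, and resolve_step_kwargs are hypothetical names invented for illustration. It only mimics the behaviour described above: one argument is looked up by node name at execution time, the other is passed through as a constant.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass(frozen=True)
class ToySource:
    """Hypothetical stand-in for source(): names a node to look up at execution time."""
    node_name: str


@dataclass(frozen=True)
class ToyValue:
    """Hypothetical stand-in for value(): wraps a literal that is passed through as-is."""
    literal: Any


def resolve_step_kwargs(step_kwargs: Dict[str, Any], inputs: Dict[str, Any]) -> Dict[str, Any]:
    """Resolve each keyword argument of a step against the execution-time inputs."""
    resolved = {}
    for name, spec in step_kwargs.items():
        if isinstance(spec, ToySource):
            # Looked up when the DAG runs, so it follows whatever was passed as input.
            resolved[name] = inputs[spec.node_name]
        elif isinstance(spec, ToyValue):
            # Fixed when the code is written; the inputs are never consulted.
            resolved[name] = spec.literal
        else:
            resolved[name] = spec
    return resolved


inputs = {"start": "2024-01-01"}
from_source = resolve_step_kwargs({"start": ToySource("start")}, inputs)
from_value = resolve_step_kwargs({"start": ToyValue("start")}, inputs)
print(from_source)  # {'start': '2024-01-01'}
print(from_value)   # {'start': 'start'}
```

So for the original question, the source-style argument is the one that picks up the date passed at execution time.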
Fefun
04/10/2024, 7:22 AM
So the entries of the inputs dictionary I pass when I execute the DAG can be considered nodes, just like any function I defined as a node?
Thierry Jean
04/10/2024, 11:49 AM
On the distinction between inputs and overrides, here's a brief explanation.
You have the dataflow:
def A(external: int) -> int:
    return external * 2

def B(A: int) -> int:
    return A * 7
## inputs
• external is an input because it's a dependency but not defined by a function.
• To compute A or B, you need to pass a value with inputs={"external": ...}.
• Once you set inputs={"external": ...}, all other nodes have access to external.
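The rule for inputs can be sketched with a toy resolver in plain Python. This is not Hamilton's driver; toy_execute and FUNCTIONS are hypothetical names for illustration, and the sketch assumes A returns external * 2. A name that is defined by a function is computed from its parameters, and a name that is not defined by any function must come from the inputs dictionary.

```python
import inspect
from typing import Any, Dict


def A(external: int) -> int:
    return external * 2


def B(A: int) -> int:
    return A * 7


# Nodes defined by functions, keyed by function name.
FUNCTIONS = {"A": A, "B": B}


def toy_execute(target: str, inputs: Dict[str, Any]) -> Any:
    """Compute `target`, resolving each dependency by parameter name."""
    if target in FUNCTIONS:
        fn = FUNCTIONS[target]
        # Each parameter name is itself a node: resolve it recursively.
        kwargs = {
            param: toy_execute(param, inputs)
            for param in inspect.signature(fn).parameters
        }
        return fn(**kwargs)
    if target in inputs:
        # Not defined by a function, so it has to be supplied as an input.
        return inputs[target]
    raise KeyError(f"missing required input: {target!r}")


print(toy_execute("B", {"external": 3}))  # 42
```

Calling toy_execute("B", {}) raises a KeyError for external, which mirrors the point that external must be supplied to compute A or B.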
## overrides
• Passing inputs={"A": ...} would be invalid because A is already defined by a function.
• However, it's possible to set driver.execute(overrides={"A": 7}) to override the value of A (and therefore, external is no longer required in the DAG).
Fefun
04/10/2024, 7:48 PM