# hamilton-help
Fefun: Hi everyone! First of all, great product! I'm just starting to use it and I'm liking it so far. Quick question: I'm using the `@pipe` decorator to define optional transformations to run against my PySpark DataFrame. One transformation takes as an input parameter a variable that I wish would come from the `inputs` dictionary, the one I pass to the driver when I execute the DAG. E.g.,
```python
datasets = dr.execute(datasets_to_request, inputs={"start": "2024-01-01"})
```
I would like to make the value of the `start` key available to the function run in the `@pipe` step I defined. Like so:
```python
@pipe(
    step(
        do_something,
        input1=[...],  # This input can be defined in the source code here.
        start=start,   # However, here I would like to use the value of start passed as input, so "2024-01-01".
    )
    ...
)
```
Is that possible? Or would I perhaps need to put that variable in the `config` passed to the Driver? Disclaimer: apologies, I haven't actually tried this yet. It might be that by trying I would have found the answer, but due to annoying constraints I couldn't find time to try it out. Yet I sort of need to know whether it's possible, hence my post.
s: Thanks for the question @Fefun
Thierry: Hi @Fefun! What you're trying to achieve is possible, you're almost there! What you need is a way to tell the `step()` within `@pipe` to read a value from the dataflow. This is possible through the `source()` construct (more details here).
```python
from hamilton.function_modifiers import step, pipe, source

@pipe(
    step(
        _parse_something,
        input1=[...],
        start=source("start"),
    )
)
```
This will read the value of the node named `start` at execution time, which you can provide via `driver.execute(inputs={"start": "2024-01-01"})`.
šŸ‘ 2
The counterpart to `source()` is `value()` (essentially a constant). For example, the snippet below would mean `{"start": "start"}`:
```python
from hamilton.function_modifiers import step, pipe, value

@pipe(
    step(
        _parse_something,
        start=value("start"),  # using `value()`: start receives the literal string "start"
    )
)
```
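To make the distinction concrete, here's a tiny plain-Python sketch of the lookup rule (this is NOT Hamilton's implementation; `resolve`, `kind`, and `payload` are made-up names just to illustrate the semantics):

```python
# Hypothetical mini-resolver illustrating source() vs value() semantics.
# Not Hamilton's actual code; it only mimics the lookup rule.

def resolve(dependency: tuple, dag_values: dict):
    kind, payload = dependency
    if kind == "source":
        # source("start"): look up the node/input named "start" at execution time
        return dag_values[payload]
    # value("start"): pass the literal "start" through as a constant
    return payload

inputs = {"start": "2024-01-01"}
print(resolve(("source", "start"), inputs))  # -> 2024-01-01
print(resolve(("value", "start"), inputs))   # -> start
```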
Fefun: Oh wow, thanks! It's actually much easier than I thought! So, to be sure I understand the concept correctly: what I add inside my `inputs` dictionary when I execute the DAG can be considered nodes, just like any function-as-node I defined?
šŸ‘ 1
Thierry: Correct! It takes a bit of time to grasp the difference between functions, `inputs`, and `overrides`, so here's a brief explanation. You have the dataflow:
```python
def A(external: int) -> int:
    return external * 2

def B(A: int) -> int:
    return A * 7
```
## inputs
• `external` is an input because it's a dependency that isn't defined by a function.
• To compute `A` or `B`, you need to pass a value with `inputs={"external": ...}`.
• Once you set `inputs={"external": ...}`, all other nodes have access to `external`.
## overrides
• Passing `inputs={"A": ...}` would be invalid because `A` is already defined by a function.
• However, it's possible to set `driver.execute(overrides={"A": 7})` to override the value of `A` (and therefore `external` is no longer required in the DAG).
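A quick way to sanity-check these semantics is to hand-roll the resolution order in plain Python (`run_dag` is a hypothetical helper for illustration, not a Hamilton API):

```python
# Toy resolution of the A/B dataflow above, illustrating inputs vs overrides.
# `run_dag` is a made-up helper, not part of Hamilton.

def A(external: int) -> int:
    return external * 2

def B(A: int) -> int:
    return A * 7

def run_dag(inputs=None, overrides=None):
    inputs = inputs or {}
    overrides = overrides or {}
    # An override short-circuits a node: its function never runs,
    # so its upstream dependencies (here, "external") are not required.
    a = overrides["A"] if "A" in overrides else A(inputs["external"])
    b = B(a)
    return {"A": a, "B": b}

print(run_dag(inputs={"external": 3}))  # {'A': 6, 'B': 42}
print(run_dag(overrides={"A": 7}))      # {'A': 7, 'B': 49}, no "external" needed
```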
Fefun: Very clear, thanks Thierry!