Andres MM
10/29/2024, 10:08 AM
def bar_union(x: pd.Series) -> t.Union[int, pd.Series]:
    try:
        return x
def foo_bar(bar_union: int) -> int:
    return bar + 1
Thierry Jean
10/29/2024, 12:00 PM
foo_bar should handle either a series or an exception. The error should disappear if you annotate foo_bar as follows:
def foo_bar(bar_union: t.Union[pd.Series, Exception]) -> None:
    return _do_stuff_with_e(e)
Is this the behavior you expected?
Andres MM
10/29/2024, 1:37 PM
Thierry Jean
10/29/2024, 1:45 PM
bar_union(). You currently have this DAG
Thierry Jean
10/29/2024, 1:46 PM
Thierry Jean
10/29/2024, 1:46 PM
Andres MM
10/29/2024, 1:48 PM
Thierry Jean
10/29/2024, 1:59 PM
Driver, the DAG is static. When you call Driver.execute(["baz"]), we know exactly the path from input to the requested baz node before executing anything. This can have many benefits for safety and validation.
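To make the static-DAG point concrete, here is a minimal plain-Python sketch. It is a toy illustration, not Hamilton's implementation: the node functions and the helpers `upstream_path` and `execute` are hypothetical, and edges are simply inferred from parameter names. The path to baz is computed from signatures alone, before any node runs.

```python
import inspect

# Toy nodes: each parameter name refers to another node or to an external input.
def foo(x: int) -> int:
    return x + 1

def bar(foo: int) -> int:
    return foo * 2

def baz(bar: int) -> int:
    return bar - 3

def unrelated(y: int) -> int:
    return y

NODES = {f.__name__: f for f in (foo, bar, baz, unrelated)}

def upstream_path(target: str) -> list[str]:
    """Return the nodes needed for `target`, in execution order, without running them."""
    order: list[str] = []

    def visit(name: str) -> None:
        if name not in NODES or name in order:
            return  # external input, or already scheduled
        for dep in inspect.signature(NODES[name]).parameters:
            visit(dep)
        order.append(name)

    visit(target)
    return order

def execute(target: str, inputs: dict) -> int:
    """Run only the nodes on the precomputed path."""
    values = dict(inputs)
    for name in upstream_path(target):
        fn = NODES[name]
        kwargs = {p: values[p] for p in inspect.signature(fn).parameters}
        values[name] = fn(**kwargs)
    return values[target]

path = upstream_path("baz")        # known before anything executes
result = execute("baz", {"x": 1})
```

Because the path is resolved up front, the `unrelated` node is never scheduled, and a missing input or a type mismatch along the path could be flagged before execution starts.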
## Solutions
Now, we have other tools to handle exceptions and such. What's your end goal with Exception handling here? For example:
• give a default value to foo_bar
• stop execution
• log something / send an alert
Andres MM
10/29/2024, 2:34 PM
Thierry Jean
10/29/2024, 2:43 PM
Driver and do not directly modify the values passed to the DAG.
For example, the following code adds a Slack notifier that will send a message on exception:
from hamilton import driver
from hamilton.plugins import h_slack

notifier = h_slack.SlackNotifier(
    api_key="YOUR_API_KEY",
    channel="YOUR_CHANNEL"
)
dr = (
    driver.Builder()
    .with_modules(some_modules)
    .with_adapters(notifier)
    .build()
)
This provides a healthy separation between the actual logic of your DAG and side-effects. Examples of other adapters: execution progress bar, log artifacts to MLFlow, log runs with the Hamilton UI, send OpenLineage telemetry.
Thierry Jean
10/29/2024, 2:48 PM
from typing import Any, Optional

from hamilton import driver
from hamilton.lifecycle import NodeExecutionHook

class ExceptionHandlerHook(NodeExecutionHook):
    def __init__(self):
        ...

    def run_before_node_execution(self, **kwargs):
        # no-op; included in case the base class requires both hooks
        pass

    def run_after_node_execution(
        self,
        node_name: str,
        result: Any,
        error: Optional[Exception],
        success: bool,
        **kwargs,  # you can get more info
    ):
        if (node_name == "foo_bar") and (success is False):
            send_me_an_alert(...)  # placeholder for your own alerting function

# then, where you execute your code
dr = (
    driver.Builder()
    .with_modules(some_modules)
    .with_adapters(ExceptionHandlerHook())
    .build()
)
dr.execute(...)
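For intuition about what such a hook receives, here is a self-contained sketch that does not depend on Hamilton. The `run_node` function is a hypothetical stand-in for a framework executing one node, and `AlertHook` collects messages in a list instead of sending them; neither reflects Hamilton's internals.

```python
from typing import Any, Callable, Optional

class AlertHook:
    """Collects alerts in memory so the example has no side effects."""

    def __init__(self) -> None:
        self.alerts: list[str] = []

    def run_after_node_execution(
        self,
        *,
        node_name: str,
        result: Any,
        error: Optional[Exception],
        success: bool,
        **kwargs: Any,
    ) -> None:
        if node_name == "foo_bar" and not success:
            self.alerts.append(f"{node_name} failed: {error!r}")

def run_node(name: str, fn: Callable[[], Any], hook: AlertHook) -> Any:
    """Toy stand-in for a framework running one node and notifying the hook."""
    result, error = None, None
    try:
        result = fn()
    except Exception as e:
        error = e
        raise  # the hook observes the failure; it does not swallow it
    finally:
        hook.run_after_node_execution(
            node_name=name, result=result, error=error, success=error is None
        )
    return result

def foo_bar() -> int:
    raise ValueError("bad input")

hook = AlertHook()
try:
    run_node("foo_bar", foo_bar, hook)
except ValueError:
    pass  # execution still stops at the failing node
```

In this sketch the hook only reacts to the failure. The exception still propagates, so the node logic stays free of alerting code.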
Let me know if that's helpful!
Andres MM
10/29/2024, 2:59 PM
Justin Donaldson
11/05/2024, 4:54 PM