Quick question Error: Hamilton does not consider ...
# general
a
Quick question Error: Hamilton does not consider these types to be equivalent. If you believe they are equivalent, please reach out to the developers. Note that, if you have types that are equivalent for your purposes, you can create a graph adapter that checks the types against each other in a more lenient manner. What I want would be
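import typing as t

import pandas as pd

def bar_union(x: pd.Series) -> t.Union[int, pd.Series]:
    try:
        return x
    except Exception:
        ...  # handler not shown in the original message

def foo_bar(bar_union: int) -> int:
    return bar_union + 1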
t
The body of foo_bar should handle either a series or an exception. The error should disappear if you annotate foo_bar as follows:
def foo_bar(bar_union: t.Union[pd.Series, Exception]) -> None:
    return _do_stuff_with_e(bar_union)
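For instance, the body could branch on the runtime type with isinstance. This is only a minimal sketch: a plain list stands in for pd.Series so it runs without pandas, and _alert is a hypothetical helper, not part of Hamilton.

```python
import typing as t


def _alert(e: Exception) -> None:
    # Hypothetical side effect; swap in whatever alerting you actually use.
    print(f"bar_union failed: {e!r}")


def foo_bar(
    bar_union: t.Union[t.List[float], Exception],
) -> t.Optional[t.List[float]]:
    """Handle both cases inside the node body instead of dispatching by type."""
    if isinstance(bar_union, Exception):
        _alert(bar_union)
        return None
    # Normal path: the upstream node produced data (a list stands in for a Series).
    return [v + 1 for v in bar_union]
```

The branching lives inside the function, so the shape of the DAG itself does not change.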
Is this the behavior you expected?
a
Not fully, because I would like to propagate the series to a different function.
t
My understanding is that you would like to dispatch the implementation based on the return type of bar_union(). You currently have this DAG
But would like to have this one instead
(just trying to get the requirements and your end goals right)
a
yes the second one is exactly what I'd like
t
In short, Hamilton doesn't allow dispatching based on type. More generally, Hamilton doesn't have conditional branches at runtime.
## Why
Once you create your Driver, the DAG is static. When you call Driver.execute(["baz"]), we know exactly the path from input to the requested baz node before executing anything. This can have many benefits for safety and validation.
## Solutions
Now, we have other tools to handle exceptions and such. What's your end goal with Exception handling here? For example:
• give a default value to foo_bar
• stop execution
• log something / send an alert
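To make the "static DAG" point above concrete, here is a toy sketch of how an execution path can be worked out from function signatures alone, before any node runs. It is not Hamilton's implementation; resolve_path and the three example nodes are made up for illustration, and there is no cycle detection.

```python
import inspect
from typing import Callable, Dict, List


def bar(x: int) -> int:
    return x * 2


def foo_bar(bar: int) -> int:
    return bar + 1


def baz(foo_bar: int) -> int:
    return foo_bar * 10


def resolve_path(target: str, nodes: Dict[str, Callable]) -> List[str]:
    """Return the nodes needed for `target`, in execution order, without running them."""
    order: List[str] = []

    def visit(name: str) -> None:
        # Names that are not nodes (like "x") are treated as external inputs.
        if name in order or name not in nodes:
            return
        # Parameter names point at upstream nodes, as in Hamilton-style functions.
        for dep in inspect.signature(nodes[name]).parameters:
            visit(dep)
        order.append(name)

    visit(target)
    return order


nodes = {f.__name__: f for f in (bar, foo_bar, baz)}
path = resolve_path("baz", nodes)
```

Because the path comes only from names and signatures, a value's type at runtime has no way to reroute it.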
a
say send an alert, that way I can handle it within that dag
t
When it comes to side effects, like sending an alert, Hamilton has lifecycle adapters, which allow you to add behaviors before/after a node or a graph execution. Adapters are added to the Driver and do not directly modify the values passed to the DAG. For example, the following code adds a Slack notifier (reference) that will send a message on exception:
from hamilton import driver
from hamilton.plugins import h_slack

notifier = h_slack.SlackNotifier(
  api_key="YOUR_API_KEY",
  channel="YOUR_CHANNEL"
)

dr = (
    driver.Builder()
    .with_modules(some_modules)
    .with_adapters(notifier)
    .build()
)
This provides a healthy separation between the actual logic of your DAG and side effects. Examples of other adapters: execution progress bar, log artifacts to MLflow, log runs with the Hamilton UI, send OpenLineage telemetry.
Implementing your own adapter is pretty straightforward. It should look something like this
from typing import Any, Optional

from hamilton import driver
from hamilton.lifecycle import NodeExecutionHook

class ExceptionHandlerHook(NodeExecutionHook):
    def __init__(self):
        ...

    def run_before_node_execution(self, **kwargs):
        # no-op; added on the assumption that NodeExecutionHook requires both hooks
        pass

    def run_after_node_execution(
        self,
        node_name: str,
        result: Any,
        error: Optional[Exception],
        success: bool,
        **kwargs,  # you can get more info
    ):
        # only react when the node we care about has failed
        if (node_name == "foo_bar") and (success is False):
            send_me_an_alert(...)

# then, where you execute your code
dr = (
    driver.Builder()
    .with_modules(some_modules)
    .with_adapters(ExceptionHandlerHook())
    .build()
)

dr.execute(...)
Let me know if that's helpful!
💡 1
🙏 2
a
Thanks! that was super helpful
😁 1
j
Well explained, glad you all have this covered.