Slackbot (12/04/2023, 10:28 PM)
Ryan Whitten (12/04/2023, 10:30 PM)
Stefan Krawczyk (12/04/2023, 10:35 PM)
Ryan Whitten (12/04/2023, 10:36 PM)
Stefan Krawczyk (12/04/2023, 10:38 PM)
Ryan Whitten (12/04/2023, 10:41 PM)
Stefan Krawczyk (12/04/2023, 10:42 PM)
Ryan Whitten (12/04/2023, 10:43 PM)
Stefan Krawczyk (12/04/2023, 10:55 PM)
Stefan Krawczyk (12/04/2023, 11:00 PM)
Ryan Whitten (12/04/2023, 11:40 PM):
# features.py
import logging
import time

import pandas as pd

logger = logging.getLogger(__name__)


def parent_id() -> pd.Series:
    return pd.Series(["abc123"])


def child_a(parent_id: pd.Series) -> pd.Series:
    logger.info("Calculating child_a...")
    # some_data = requests.get(...)
    time.sleep(2)  # blocking call
    logger.info("child_a calculated!")
    return pd.Series(["def456"])


def child_b(parent_id: pd.Series) -> pd.Series:
    logger.info("Calculating child_b...")
    logger.info("child_b calculated!")
    return pd.Series(["ghi789"])
Driver:
# driver.py
import logging

from hamilton.driver import Driver
from src import features

logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)


def main():
    driver = Driver({}, features)
    driver.execute(["parent_id", "child_a", "child_b"])


if __name__ == "__main__":
    main()
Example log output where child_a holds up child_b from calculating for the duration of the 2-second sleep:
2023-12-04 18:42:56,107 Calculating child_a...
2023-12-04 18:42:58,111 child_a calculated!
2023-12-04 18:42:58,112 Calculating child_b...
2023-12-04 18:42:58,112 child_b calculated!
Stefan Krawczyk (12/04/2023, 11:49 PM)
Ryan Whitten (12/04/2023, 11:56 PM)
Stefan Krawczyk (12/05/2023, 1:24 AM):
In this example, foo is set up to be async, while bar is not async.
# async_module.py
import asyncio
import json
import logging

import aiohttp
import fastapi

logger = logging.getLogger(__name__)


async def request_raw(request: fastapi.Request) -> dict:
    try:
        return await request.json()
    except json.JSONDecodeError as e:
        logger.warning(f"Unable to get JSON from request. Error is:\n{e}")
        return {}


async def foo(request_raw: dict) -> str:
    logger.info("foo")
    # time.sleep(2)
    await asyncio.sleep(2)
    logger.info("foo done")
    return request_raw.get("foo", "far")


def bar(request_raw: dict) -> str:
    logger.info("bar")
    logger.info("bar done")
    return request_raw.get("bar", "baz")


async def computation1(foo: str, some_data: dict) -> bool:
    logger.info("computation1")
    await asyncio.sleep(1)
    logger.info("computation1 done")
    return False


async def some_data() -> dict:
    logger.info("some_data")
    async with aiohttp.ClientSession() as session:
        async with session.get("http://httpbin.org/get") as resp:
            resp = await resp.json()
    logger.info("some_data done")
    return resp


async def computation2(bar: str) -> bool:
    logger.info("computation2")
    await asyncio.sleep(1)
    logger.info("computation2 done")
    return True


async def pipeline(computation1: bool, computation2: bool) -> dict:
    await asyncio.sleep(1)
    return {"computation1": computation1, "computation2": computation2}
The driver is then:
import async_module
import fastapi
from hamilton import base
from hamilton.experimental import h_async

app = fastapi.FastAPI()

# can instantiate a driver once for the life of the app:
dr = h_async.AsyncDriver({}, async_module, result_builder=base.DictResult())


@app.post("/execute")
async def call(request: fastapi.Request) -> dict:
    """Handler for pipeline call"""
    input_data = {"request": request}
    # Can instantiate a driver within a request as well:
    # dr = h_async.AsyncDriver({}, async_module, result_builder=base.DictResult())
    result = await dr.execute(["pipeline"], inputs=input_data)
    # dr.visualize_execution(["pipeline"], "./pipeline.dot", {"format": "png"}, inputs=input_data)
    return result
and it'll log:
2023-12-04 17:23:33,036 some_data
2023-12-04 17:23:33,038 bar
2023-12-04 17:23:33,038 bar done
2023-12-04 17:23:33,038 foo
2023-12-04 17:23:33,038 computation2
2023-12-04 17:23:33,183 some_data done
2023-12-04 17:23:34,039 computation2 done
2023-12-04 17:23:35,039 foo done
2023-12-04 17:23:35,039 computation1
2023-12-04 17:23:36,039 computation1 done
Showing that a few things are done concurrently! Let me know if you have more questions / needs.
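For reference, here is a minimal sketch of one way to exercise the endpoint above locally with FastAPI's test client; saving the driver snippet as app.py and the sample payload are assumptions for illustration.

from fastapi.testclient import TestClient

from app import app  # assumes the FastAPI driver snippet above is saved as app.py

client = TestClient(app)

# POST an arbitrary JSON body; foo/bar fall back to "far"/"baz" if the keys are absent
response = client.post("/execute", json={"foo": "hello", "bar": "world"})
print(response.json())  # e.g. {"pipeline": {"computation1": False, "computation2": True}}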
Ryan Whitten (12/05/2023, 11:19 AM):
I looked at the execute_node implementation and saw that it would be a small change to use asyncio.to_thread to run any non-async functions. This seems to work and has the intended result of letting the blocking call run on a separate thread. Any thoughts on adding this as an option for the AsyncDriver so we could flip on this functionality when needed?
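A minimal sketch of the to_thread idea in plain asyncio (not Hamilton's execute_node itself); blocking_feature and async_feature are made-up names for illustration.

import asyncio
import time


def blocking_feature() -> str:
    # a sync function with a blocking call, like child_a earlier
    time.sleep(2)
    return "def456"


async def async_feature() -> str:
    # an async function that can make progress while the thread runs
    await asyncio.sleep(1)
    return "ghi789"


async def main() -> None:
    # asyncio.to_thread runs the sync function in a worker thread,
    # so the event loop stays free to run async_feature concurrently
    results = await asyncio.gather(
        asyncio.to_thread(blocking_feature),
        async_feature(),
    )
    print(results)  # finishes in ~2s rather than ~3s


asyncio.run(main())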
Ryan Whitten (12/05/2023, 11:32 AM)
Elijah Ben Izzy (12/05/2023, 3:03 PM):
Another option would be to tag the function, e.g. @tag(launch_thread_async=True), then only do that if the node has that specific tag. My guess is that explicitly making it async would likely be cleanest, but we could definitely take PRs for extra options!
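For context, tags are declared with Hamilton's @tag decorator (tag values are strings) and show up on the driver's variables; the launch_thread_async tag below is purely a hypothetical marker from this discussion, nothing in Hamilton acts on it by itself.

import time

import pandas as pd
from hamilton import ad_hoc_utils, driver
from hamilton.function_modifiers import tag


@tag(launch_thread_async="true")  # hypothetical marker; stored as node metadata only
def child_a(parent_id: pd.Series) -> pd.Series:
    time.sleep(2)  # the blocking call we'd like pushed onto a thread
    return pd.Series(["def456"])


# wrap the function into a throwaway module so the Driver can load it
temp_module = ad_hoc_utils.create_temporary_module(child_a)
dr = driver.Driver({}, temp_module)

for var in dr.list_available_variables():
    if (var.tags or {}).get("launch_thread_async") == "true":
        print(f"{var.name} is marked for thread offloading")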
Ryan Whitten (12/05/2023, 3:05 PM)
Elijah Ben Izzy (12/05/2023, 3:15 PM)
Ryan Whitten (12/05/2023, 3:32 PM)
Stefan Krawczyk (12/05/2023, 5:36 PM):
from hamilton.function_modifiers import config


@config.when(mode="online")
async def foo__async(...) -> ...:
    return await ...


@config.when(mode="batch")
def foo(...) -> ...:
    return  # load from parquet, or do a SQL call.

Then you wouldn't need async in the batch side of things.
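A slightly more filled-in sketch of that pattern; the function bodies, the feature_path/feature_url inputs, and the module name are illustrative assumptions. The mode key in the driver config decides which implementation of foo exists in the DAG.

# features_foo.py
import aiohttp
import pandas as pd
from hamilton.function_modifiers import config


@config.when(mode="online")
async def foo__async(feature_url: str) -> str:
    # online: fetch the value from a service
    async with aiohttp.ClientSession() as session:
        async with session.get(feature_url) as resp:
            payload = await resp.json()
    return payload.get("foo", "far")


@config.when(mode="batch")
def foo(feature_path: str) -> str:
    # batch: read a precomputed value, e.g. from parquet
    df = pd.read_parquet(feature_path)
    return str(df["foo"].iloc[0])


# picking the implementation via config:
#   batch:  driver.Driver({"mode": "batch"}, features_foo)
#   online: h_async.AsyncDriver({"mode": "online"}, features_foo)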
Ryan Whitten (12/05/2023, 6:02 PM):
I like the config.when option and think it will perfectly solve the cases where I do need to change it between batch and online.
Ryan Whitten (12/05/2023, 6:05 PM):
Another thought: a custom decorator around config.when that wraps a function like foo in your example, which is set up for batch. Then the custom decorator can automatically fetch it from a feature store when in online mode, since the function name would == feature name in the store. In that scenario it should be easy enough to auto-build the async version of the function that makes a standard feature store call.
Stefan Krawczyk (12/05/2023, 6:20 PM):
> execute_node implementation and saw that it would be a small change to use asyncio.to_thread to run any non-async functions.
That sounds like a good feature to add, with a "mode" option to the AsyncDriver?
> custom decorator can automatically fetch it
@Ryan Whitten yep, that's one way. Custom decorators and adapters are pretty close in functionality, but an adapter has more scope and doesn't have to leak an import into the transform code.
Another way/idea to get at what you want is to do a little work outside the Hamilton Driver. E.g. instantiate the driver, ask questions of it (dr.list_available_variables()) to determine what inputs are required / which nodes have certain tags, use that to perform a single request to the feature store for the features required, and then use overrides= to inject them into the DAG; you could wrap this in your own custom driver.
(In short: we have a lot of platform hooks in Hamilton 🙂)
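A rough sketch of that outside-the-driver approach; the feature_store tag, the fetch_features stub, and the entity id are placeholders for whatever store and tagging scheme is actually in use.

from hamilton import driver
from src import features  # the transform module from earlier


def fetch_features(entity_id: str, feature_names: list[str]) -> dict:
    # placeholder for a single batched call to a real feature-store client
    return {name: f"{name}-value-for-{entity_id}" for name in feature_names}


dr = driver.Driver({}, features)
outputs = ["child_a", "child_b"]

# inspect the DAG: which nodes are tagged as living in the feature store?
store_features = [
    var.name
    for var in dr.list_available_variables()
    if (var.tags or {}).get("source") == "feature_store"
]

# one request for everything needed, then inject the values so Hamilton
# doesn't recompute those nodes
feature_values = fetch_features("abc123", store_features)
result = dr.execute(outputs, overrides=feature_values)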
Ryan Whitten (12/05/2023, 6:29 PM):
I can use dr.list_available_variables() to cross-verify the features list with what's available in the DAG and read tags. The next step would be integrating it with the feature store to fetch the values. In reality I will probably forego the async function route for online, and go with a single request to inject the values from the store like you said.
With all your help I am more comfortable understanding everything Hamilton provides, and I don't think there is anything else I'd need Hamilton to support right now 🙂 You guys did a great job thinking of all these options.
Stefan Krawczyk (12/05/2023, 7:01 PM)