# hamilton-help
r
Use case is feature engineering, where I'd want to scale the same code that runs online inference up to large batch processing, either by swapping pandas for Modin, or by swapping the graph adapter depending on the run context
s
To clarify, you’re trying to use both modin & ray?
r
Yes I’m not necessarily tied to Modin but thinking either Modin + Ray, or just Ray via Hamilton
s
For the batch case, will data still fit in memory? or?
r
No, I'd like to handle larger-than-memory data. I know Modin can help do that on the Ray cluster. I assume Hamilton's Ray adapter is just parallelizing the function calls, but the machine running the driver would need to fit the initial dataframe in memory before it gets passed to the child functions?
s
Yep, cool, I’m just making sure I have the right understanding. I assume you have aggregate calculations as part of your features (i.e. require the whole dataset to be taken into account)?
r
Yep we will definitely have those
s
Cool. Options in my view are:
• dask dataframes on dask
• pandas on spark / pyspark
• modin dataframes (on ray, or on dask)
To integrate with Hamilton code that is using pandas:
• with modin it should be just an import swap — no adapter available.
    ◦ To enable sharing code, you'll likely want to try to import modin and use it if it exists, else default to pandas (see the sketch below).
• with dask & pandas on spark, it would be changing how data is loaded + using an adapter — and as long as you stick to the pandas API they implement, nothing much else would have to change.
Otherwise Ray could power modin, but it wouldn't work well for things that don't fit into memory just by itself, and I don't think you'd see a benefit from combining modin with the Hamilton Ray adapter (there are cases where it might, but without knowing more it probably won't).
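e.g. the import swap could look roughly like this (illustrative only; the `pd` alias and fallback shape are just a common convention, not something specific to Hamilton):
```python
try:
    # use modin's pandas-compatible API if it's installed
    import modin.pandas as pd
except ImportError:
    # otherwise fall back to vanilla pandas
    import pandas as pd
```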
Happy to jump on a quick call if that’s simpler
r
Ok great, thanks! Using Modin on Ray seems like the simplest way for now. As far as having Hamilton run DAG nodes in parallel without the subdag/Parallelize+Collect workflow, is there a way to allow it to use threads/processes if it identifies nodes or trees that can be run independently? An example would be if I have a slow function (e.g. network request). It seems like the DAG may run functions at the same level in the graph alphabetically, so the 2nd child has to wait for the first one to finish. Since they don't depend on each other it would be nice to allow them to compute in parallel. Example function code:
Copy code
# features.py 

import logging
import time

import pandas as pd

logger = logging.getLogger(__name__)


def parent_id() -> pd.Series:
    return pd.Series(["abc123"])


def child_a(parent_id: pd.Series) -> pd.Series:
    logger.info("Calculating child_a...")
    # some_data = requests.get(...)
    time.sleep(2) # blocking call
    logger.info("child_a calculated!")
    return pd.Series(["def456"])


def child_b(parent_id: pd.Series) -> pd.Series:
    logger.info("Calculating child_b...")
    logger.info("child_b calculated!")
    return pd.Series(["ghi789"])
Driver:
Copy code
# driver.py
import logging

from hamilton.driver import Driver

from src import features

logging.basicConfig(format='%(asctime)s %(message)s', level=logging.INFO)


def main():
    driver = Driver({}, features)
    driver.execute(["parent_id", "child_a", "child_b"])


if __name__ == "__main__":
    main()
Example log output where `child_a` is holding up `child_b` from calculating for the 2-second sleep:
Copy code
2023-12-04 18:42:56,107 Calculating child_a...
2023-12-04 18:42:58,111 child_a calculated!
2023-12-04 18:42:58,112 Calculating child_b...
2023-12-04 18:42:58,112 child_b calculated!
s
Good question. Naively with Hamilton that’s how it works. But yeah we can augment how the graph is walked, so there’s ways to get what you want; we might even need to add something to Hamilton depending on where you want to run this. I have a call to jump on, but I’ll get back to you later tonight my time. Question - would this be running in a web-server or some offline process?
r
Awesome, thank you! For this case it would be more important in the web server where we want to lower latency while calculating features on demand from a request. So most likely using vanilla pandas instead of Modin.
s
@Ryan Whitten yep so in a web-service we would rely on async. You can modify https://github.com/DAGWorks-Inc/hamilton/tree/main/examples/async and it should do what you want, i.e. anything that you want to run concurrently, you need to mark as async. I modified the code in the example to log, changed `foo` to be async, and left `bar` synchronous.
Copy code
# async_module.py
import asyncio
import json
import logging

import aiohttp
import fastapi

logger = logging.getLogger(__name__)


async def request_raw(request: fastapi.Request) -> dict:
    try:
        return await request.json()
    except json.JSONDecodeError as e:
        logger.warning(f"Unable to get JSON from request. Error is:\n{e}")
        return {}


async def foo(request_raw: dict) -> str:
    logger.info("foo")
    # time.sleep(2)
    await asyncio.sleep(2)
    logger.info("foo done")
    return request_raw.get("foo", "far")


def bar(request_raw: dict) -> str:
    logger.info("bar")
    logger.info("bar done")
    return request_raw.get("bar", "baz")


async def computation1(foo: str, some_data: dict) -> bool:
    logger.info("computation1")
    await asyncio.sleep(1)
    logger.info("computation1 done")
    return False


async def some_data() -> dict:
    logger.info("some_data")
    async with aiohttp.ClientSession() as session:
        async with session.get("http://httpbin.org/get") as resp:
            resp = await resp.json()
            logger.info("some_data done")
            return resp


async def computation2(bar: str) -> bool:
    logger.info("computation2")
    await asyncio.sleep(1)
    logger.info("computation2 done")
    return True


async def pipeline(computation1: bool, computation2: bool) -> dict:
    await asyncio.sleep(1)
    return {"computation1": computation1, "computation2": computation2}
The driver is then:
Copy code
import async_module
import fastapi

from hamilton import base
from hamilton.experimental import h_async

app = fastapi.FastAPI()

# can instantiate a driver once for the life of the app:
dr = h_async.AsyncDriver({}, async_module, result_builder=base.DictResult())

@app.post("/execute")
async def call(request: fastapi.Request) -> dict:
    """Handler for pipeline call"""
    input_data = {"request": request}
    # Can instantiate a driver within a request as well:
    # dr = h_async.AsyncDriver({}, async_module, result_builder=base.DictResult())
    result = await dr.execute(["pipeline"], inputs=input_data)
    # dr.visualize_execution(["pipeline"], "./pipeline.dot", {"format": "png"}, inputs=input_data)
    return result
and it’ll log:
Copy code
2023-12-04 17:23:33,036 some_data
2023-12-04 17:23:33,038 bar
2023-12-04 17:23:33,038 bar done
2023-12-04 17:23:33,038 foo
2023-12-04 17:23:33,038 computation2
2023-12-04 17:23:33,183 some_data done
2023-12-04 17:23:34,039 computation2 done
2023-12-04 17:23:35,039 foo done
2023-12-04 17:23:35,039 computation1
2023-12-04 17:23:36,039 computation1 done
Showing that a few things are done concurrently! Let me know if you have more questions / needs.
r
Thanks @Stefan Krawczyk, I think I'm getting closer. The one downside with async here is it requires rewriting the Hamilton functions to be async. I did some looking through the AsyncGraphAdapter's `execute_node` implementation and saw that it would be a small change to use `asyncio.to_thread` to run any non-async functions. This seems to work and has the intended result of letting the blocking call run on a separate thread. Any thoughts on adding this as an option for the Async driver so we could flip on this functionality when needed?
Though perhaps it is better to be explicit and make any blocking functions async. I could then technically run the same code in batch pipelines but use the Async driver in both scenarios 🤔
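Rough sketch of what I mean (not the actual AsyncGraphAdapter code, just the shape of the change; `run_node_function` is a made-up helper name):
```python
import asyncio
import inspect


async def run_node_function(fn, kwargs: dict):
    # await async node functions directly; push plain (blocking) functions
    # onto a worker thread so they don't block the event loop
    if inspect.iscoroutinefunction(fn):
        return await fn(**kwargs)
    return await asyncio.to_thread(fn, **kwargs)
```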
e
Interesting — yeah, I think that's quite a reasonable option to have. That said, the explicitness of asynchronous functions (given the overhead of threads) may end up being preferable. Another option is to tag it with `@tag(launch_thread_async=True)`, and then only do that if the node has that specific tag. My guess is that explicitly making it async would likely be cleanest, but we could definitely take PRs for extra options!
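Something like this, for example (sketch only; `launch_thread_async` is a made-up tag name, and the value is written as a string since Hamilton tag values are strings):
```python
import pandas as pd

from hamilton.function_modifiers import tag


@tag(launch_thread_async="true")  # hypothetical tag an adapter could check before using a thread
def child_a(parent_id: pd.Series) -> pd.Series:
    # blocking work (e.g. a network request) that could be pushed to a thread
    return pd.Series(["def456"])
```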
r
Yeah for now I think having it explicit per function would give the most control and help minimize any extra overhead from running every single function in a thread. I'm still working through some use cases and requirements but hoping to keep it very simple for data scientists so we can reuse the same functions in most contexts
e
Great! Yeah, I think that:
1. reusing should be easy (potentially a good reason to use threads, although my guess is that the operations will likely be quite different between the contexts in those cases)
2. Using them between contexts makes a lot of sense — you just need the right organizational scheme to ensure that the right one gets run in the right context.
So keep us in the loop! Happy to help out/answer any questions.
gratitude thank you 1
r
Thank you guys, appreciate the help!
🙌 1
s
@Ryan Whitten would batch also make external requests? Usually that’s not a good pattern due to the “batch” ness of things; most APIs are designed for “row-level” requests. Commonly you’d have two implementations in this case:
Copy code
from hamilton.function_modifiers import config


@config.when(mode="online")
async def foo__async(...) -> ...:
    return await ...

@config.when(mode="batch")
def foo(...) -> ...:
    return # load from parquet, or do a SQL call.
Then you wouldn't need async on the batch side of things.
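For reference, the run context would then just come from the driver config, e.g. (a sketch; the `features` module name is just a placeholder for wherever `foo__async` and `foo` live):
```python
from hamilton import base
from hamilton.driver import Driver
from hamilton.experimental import h_async

import features  # hypothetical module containing foo__async and foo

# batch: plain driver + mode="batch" resolves foo to the sync implementation
batch_dr = Driver({"mode": "batch"}, features)

# online: async driver + mode="online" resolves foo to the async implementation
online_dr = h_async.AsyncDriver(
    {"mode": "online"}, features, result_builder=base.DictResult()
)
```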
r
Probably not batch, but maybe streaming to enrich certain events. Async could also work there too. I was planning to play around with the `config.when` option and think it will perfectly solve the cases where I do need to change it between batch and online.
Overall I'm looking to build out a feature platform, so one idea here would be a custom decorator like `config.when` that wraps a function like `foo` in your example, which is set up for batch. Then the custom decorator can automatically fetch it from a feature store when in online mode, since the function name would == feature name in the store. In that scenario it should be easy enough to auto-build the async version of the function that makes a standard feature store call
s
> AsyncGraphAdapter's `execute_node` implementation and saw that it would be a small change to use `asyncio.to_thread` to run any non-async functions.
That sounds like a good feature to add, with a "mode" option to the AsyncDriver?
> custom decorator can automatically fetch it
@Ryan Whitten yep that's one way. Custom decorators and adapters are pretty close in functionality — but an adapter has more scope and doesn't have to leak an import into the transform code. Another way/idea to get at what you want is to do a little work outside the Hamilton Driver. E.g. instantiate the driver, ask questions of it (`dr.list_available_variables()`) — e.g. determine what inputs are required/what nodes have certain tags — and then perform a single request to the feature store for the features required, and use `overrides=` to inject them into the DAG; you could wrap this in your own custom driver. (In short: we have a lot of platform hooks in Hamilton 😉)
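Rough sketch of that last idea (the `feature_store` tag, the `fetch_from_feature_store` helper, and the final vars are all made up for illustration):
```python
from hamilton.driver import Driver

import features  # hypothetical module of feature functions

dr = Driver({}, features)

# ask the driver what's in the DAG; pick out nodes carrying the (made-up) tag
store_backed = [
    var.name
    for var in dr.list_available_variables()
    if var.tags.get("feature_store") == "true"
]


def fetch_from_feature_store(feature_names: list, entity_id: str) -> dict:
    # placeholder: in reality this would be one batched feature-store read
    return {name: None for name in feature_names}


# one request for all store-backed features, then inject them so those
# nodes never have to execute
precomputed = fetch_from_feature_store(store_backed, entity_id="abc123")
result = dr.execute(["child_a", "child_b"], overrides=precomputed)
```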
r
Ha, I am already ahead of you there. Right now I have a "FeatureSet" class that users can set a list of features on, and it uses the driver under the hood, utilizing `dr.list_available_variables()` to cross-verify the features list with what's available in the DAG and read tags. The next step would be integrating it with the feature store to fetch the values. In reality I will probably forgo the async function route for online, and go with a single request to inject the values from the store like you said. With all your help I am more comfortable understanding everything Hamilton provides, and I don't think there is anything else I'd need Hamilton to support right now 🙂 You guys did a great job thinking of all these options
🙌 1
s
@Ryan Whitten you’re welcome! Blog posts appreciated 😉
😀 1