Jannik Buhr
06/05/2024, 3:47 PM
Re: the targets R package (https://books.ropensci.org/targets/walkthrough.html#change-code).
Is there a way of querying the driver for differences between its cache and the workflow? Something like a dry run that checks the nodes but doesn't execute them.
Roy Kid
06/05/2024, 7:22 PM
Can @resolve expand a (template) function into n functions (n not fixed, determined when CONFIG_AVAILABLE) with different inputs, and collect them in the next function?
from hamilton.function_modifiers import resolve, ResolveAt, parameterize_sources
from hamilton.function_modifiers import source, value
@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda names: parameterize_sources(
        **{f"parallel_{name}": dict(name=name) for name in names}
    ),
)
def serial(name: str) -> str:
    # expand the serial func into n functions; n is not fixed
    return name


@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda names: parameterize_sources(
        **{"collect": {f"parallel_{name}": value(name) for name in names}}
    ),
)
def collect(all_name: tuple[str]) -> str:
    # expected input nodes: parallel_a, parallel_b, parallel_c
    # each input value is computed by the serial function
    # serial acts like a template
    return " ".join(all_name)
# the above is equivalent to:
def parallel_a(): ...
def parallel_b(): ...
def collect(parallel_a, parallel_b, ...): ...
# but the number of inputs to collect is not fixed; it is determined when the config is available
I think it could be achieved with Parallelizable/Collect, but this "static & compile" style looks fancy, and it may be friendlier to the graph/UI.
In case you want to know why: a molecule is constructed from different segments, and each segment needs to be processed (and calculated) in advance. So a general workflow doesn't know how many segments (inputs) it will take before CONFIG_AVAILABLE. I hope I can define a template and specialize it when the run starts.
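For reference, a minimal sketch of the Parallelizable/Collect alternative mentioned above (the function names and the segments input are illustrative, not from this thread, and the Builder needs dynamic execution enabled):

from hamilton.htypes import Collect, Parallelizable

def segment(segments: list[str]) -> Parallelizable[str]:
    # fan out: one branch per segment; the count is only known at runtime
    for s in segments:
        yield s

def processed_segment(segment: str) -> str:
    # per-branch work: this plays the role of the "serial" template above
    return segment.upper()

def collected(processed_segment: Collect[str]) -> str:
    # fan in: gather every branch's result
    return " ".join(processed_segment)

Running this needs something like Builder().enable_dynamic_execution(allow_experimental_mode=True) on the driver; whether it fits better than the @resolve route depends on how static you want the graph to be.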
Sundeep Amrute
06/08/2024, 2:16 PM
Stefan Krawczyk
06/11/2024, 4:32 PM
Seth Stokes
06/13/2024, 3:52 PM
My understanding of hamilton is that you cannot redefine a node with the same name.
However today, I noticed that this is possible in the contrived example below.
It was accidentally accomplished via load_from / inject_.
Is this intended behavior, or should an error have been raised?
import pandas as pd
from typing import Dict

from hamilton.function_modifiers import extract_fields, load_from, source, subdag

# scraper_a is assumed to be the module (or functions) defining the scraping nodes
@subdag(
scraper_a,
inputs={"data_location": source("a_data_location")}
)
def a_raw(
a_scraped_data: pd.DataFrame,
) -> pd.DataFrame:
return a_scraped_data
@extract_fields({
"a_processed": pd.DataFrame,
"b_processed": pd.DataFrame,
"c_processed": pd.DataFrame
})
@load_from.excel(path=..., inject_="a_raw")
def do_some_work(
a_raw: pd.DataFrame,
b_raw: pd.DataFrame,
c_raw: pd.DataFrame,
) -> Dict[str, pd.DataFrame]:
# some work
return {
"a_processed": pd.DataFrame(...),
"b_processed": pd.DataFrame(...),
"c_processed": pd.DataFrame(...),
}
Kyle Pounder
06/13/2024, 5:41 PM
David
06/14/2024, 10:28 AM
Alexander Cai
06/14/2024, 2:00 PM
What order does what_is_upstream_of return the nodes in? Is there any way to get the list of nodes required to compute a target node in topological order?
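A generic sketch (plain Python, not a Hamilton API): once you know each node's upstream dependencies, however you obtain them, the standard-library graphlib gives you a topological order. The dependency map below is purely illustrative:

from graphlib import TopologicalSorter

# hypothetical map of node -> set of direct upstream dependencies
deps = {
    "a": set(),
    "b": {"a"},
    "c": {"a", "b"},
}

order = list(TopologicalSorter(deps).static_order())
# ['a', 'b', 'c']: every node appears after all of its upstream dependencies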
Noppadon Kemmanee
06/14/2024, 3:28 PM
Rohith Kumar
06/19/2024, 6:21 AM
Julian
06/24/2024, 9:06 AM
I want to load an Excel file with PandasExcelReader, preprocess the dataframe, and save it again to a destination of my choice. I have attached sample code without the actual URLs:
# -*- coding: utf-8 -*-
import io
import pathlib
import zipfile
import pandas as pd
import requests
from hamilton import driver
from hamilton.plugins.pandas_extensions import PandasExcelReader
def load_data(url: str, destination_folder: str | pathlib.Path) -> None:
# Send a GET request to the URL and get the response
response = requests.get(url)
response.raise_for_status()
if isinstance(destination_folder, str):
destination_folder = pathlib.Path(destination_folder)
with zipfile.ZipFile(io.BytesIO(response.content)) as zip_ref:
# Extract the contents of the zip file to the temporary folder
zip_ref.extractall(destination_folder)
def read_excel(path: pathlib.Path) -> pd.DataFrame:
xlsx_reader = PandasExcelReader(path)
raw_df, metadata = xlsx_reader.load_data(pd.DataFrame)
return raw_df
def preprocess_df(raw_df: pd.DataFrame) -> pd.DataFrame:
df = raw_df
return df
def mean(df: pd.DataFrame) -> float:
    return df.loc[:, "Column1"].mean()
if __name__ == "__main__":
import importlib
from hamilton import driver
functions = importlib.import_module(__name__)
# define the path variables
CWD = pathlib.Path().cwd()
DATA_PATH = CWD / "data"
FIGURES_PATH = CWD / "figs"
raw_data_path = DATA_PATH / "raw"
procurement_data_path = raw_data_path / "data.xlsx"
VARS = ["load_data", "read_excel", "preprocess_df", "mean"]
dr = driver.Driver(
{
"path": procurement_data_path,
"url": "<https://some.url>",
"destination_folder": DATA_PATH,
},
functions,
)
# a = dr.execute(VARS)
dr.display_all_functions(
FIGURES_PATH / "graph.png",
show_legend=True,
orient="LR",
)
When I uncomment the execution, I get the following error:
Traceback (most recent call last):
File "/home//software/data-analysis/pipeline.py", line 64, in <module>
a = dr.execute(VARS)
^^^^^^^^^^^^^^^^
File "/home//.cache/pypoetry/virtualenvs/data-analysis-GDaM7nkF-py3.12/lib/python3.12/site-packages/hamilton/driver.py", line 564, in execute
raise e
File "/home//.cache/pypoetry/virtualenvs/data-analysis-GDaM7nkF-py3.12/lib/python3.12/site-packages/hamilton/driver.py", line 554, in execute
outputs = self.raw_execute(_final_vars, overrides, display_graph, inputs=inputs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home//.cache/pypoetry/virtualenvs/data-analysis-GDaM7nkF-py3.12/lib/python3.12/site-packages/hamilton/driver.py", line 644, in raw_execute
Driver.validate_inputs(
File "/home//.cache/pypoetry/virtualenvs/data-analysis-GDaM7nkF-py3.12/lib/python3.12/site-packages/hamilton/driver.py", line 525, in validate_inputs
raise ValueError(error_str)
ValueError: 2 errors encountered:
Error: Required input df not provided for nodes: ['mean'].
Error: Required input raw_df not provided for nodes: ['preprocess_df'].
Also, the DAG doesn't look the way I expected. I find PandasExcelReader really hard to use since there is no proper documentation on how to use it. I am really thankful for any help!
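A possible fix, sketched here as an assumption rather than a confirmed answer: Hamilton wires dependencies by matching parameter names to function names, so the functions have to produce the node names that the downstream parameters ask for (raw_df and df in the code above). Something like:

import pathlib

import pandas as pd

from hamilton.plugins.pandas_extensions import PandasExcelReader


def raw_df(path: pathlib.Path) -> pd.DataFrame:
    # node "raw_df": what preprocess_df's raw_df parameter binds to
    reader = PandasExcelReader(path)
    frame, _metadata = reader.load_data(pd.DataFrame)
    return frame


def df(raw_df: pd.DataFrame) -> pd.DataFrame:
    # node "df": what mean's df parameter binds to
    return raw_df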
Elias Willemse
06/25/2024, 1:19 PM
First, I was going through the hamilton_ui example (https://github.com/DAGWorks-Inc/hamilton/tree/main/examples/hamilton_ui), and a minor error popped up (I wouldn't even call it that). When running the example, the python run.py bit, it gave a ValueError: The 'dtype_backend' argument is not supported for the fastparquet engine error. I fixed this by installing pyarrow, which is not currently listed in the requirements.txt file.
Second, the README.md file is a bit outdated.
Current:
python run.py --email <email> --project_id <project_id>
should be
python run.py --username <email> --project_id <project_id>
and
python run.py --email <email> --project_id <project_id> --load-from-parquet True
should be
python run.py --username <email> --project_id <project_id> --load-from-parquet
without the True flag.
What's easiest for you folks? Should I create a bug report via GitHub?
Alexander Cai
06/26/2024, 7:57 AM
Are the config options available as metadata anywhere on the function? Specifically, I'd like to check: for each function in a module, get the list of @config.when_in(...) values as well as the keys.
Carl Trachte
06/26/2024, 1:04 PM
Michal Siedlaczek
06/26/2024, 1:16 PM
Seth Stokes
07/02/2024, 3:36 PM
Does dr.visualize_execution() reveal what possible @config.when arguments exist, with their paths?
Elijah Ben Izzy
07/02/2024, 4:33 PM
Carl Trachte
07/02/2024, 11:33 PM
Elijah Ben Izzy
07/09/2024, 4:30 PM
Ryan Whitten
07/11/2024, 5:22 PM
I'm trying to set a result_builder on the async driver. With the regular Builder I would normally set it using with_adapters(), e.g.:
result_builder = base.PandasDataFrameResult()
await AsyncBuilder().with_config(self.config).with_modules(*self.included_modules).with_adapters(result_builder).build()
but this raises ValueError("You cannot pass more than one result builder to the async driver. Please pass in a single result builder")
here: https://github.com/DAGWorks-Inc/hamilton/blob/main/hamilton/async_driver.py#L225-L229
Looks like that is due to a default DictResult getting set if the AsyncBuilder.result_builder
is None: https://github.com/DAGWorks-Inc/hamilton/blob/main/hamilton/async_driver.py#L478-L480
I'm able to work around it by setting the builder's result_builder attr before calling build(), but not sure if that is the intended usage:
result_builder = base.PandasDataFrameResult()
builder = AsyncBuilder().with_config(self.config).with_modules(*self.included_modules)
builder.result_builder = result_builder
await builder.build()
Gilad Rubin
07/15/2024, 9:34 AM
from langchain_community.document_loaders.pdf import BasePDFLoader
def pdf_pages(pdf_path: Union[str, Path],
pdf_loader: BasePDFLoader) -> List[Document]:
return pdf_loader(pdf_path).load()
Driver:
from langchain_community.document_loaders import PyMuPDFLoader
results = dr.execute(final_vars=outputs, inputs={"pdf_loader": PyMuPDFLoader})
PyMuPDFLoader inherits from BasePDFLoader.
I'm getting the following error:
Error: Type requirement mismatch. Expected pdf_loader:typing.Type[langchain_community.document_loaders.pdf.BasePDFLoader] got class 'langchain_community.document_loaders.pdf.PyMuPDFLoader':class 'abc.ABCMeta' instead.
My motivation comes from wanting to check the DAG with different Langchain PDF loaders.
Thanks!
Jernej Frank
07/15/2024, 2:35 PM
Is there a way to use the to.mlflow() materializer since our models are not in their named flavors?
Elijah Ben Izzy
07/16/2024, 4:30 PM
Arthur Andres
07/17/2024, 9:41 AM
I'm seeing this deprecation warning:
venv/lib/python3.11/site-packages/hamilton/plugins/polars_post_1_0_0_extension.py:597: DeprecationWarning: The `polars.type_aliases` module is deprecated. The type aliases have moved to the `polars._typing` module to explicitly mark them as private. Please define your own type aliases, or temporarily import from the `polars._typing` module. A public `polars.typing` module will be added in the future.
from polars.type_aliases import ColumnTotalsDefinition, RowTotalsDefinition
Thanks.
Roy Kid
07/17/2024, 10:05 AM
A question about the cache adapter. In CachingGraphAdapter.execute_node, the docstring says that condition 3 for recomputing a node is that an upstream node was recomputed. When I rerun the program with a different input, the cached node doesn't seem to be recomputed.
"""Executes nodes conditionally according to caching rules.
This node is executed if at least one of these is true:
* no cache is present,
* it is explicitly forced by passing it to the adapter in ``force_compute``,
* at least one of its upstream nodes that had a @cache annotation was computed,
either due to lack of cache or being explicitly forced.
"""
What if we write the input of this node to the cache file? Then, when checking whether a node needs to be recomputed, we can compare the input from the upstream node with the input stored in the cache file. I think the advantage is that even if we rerun the program, the caching system still works.
PS: maybe I don't fully understand the source code...
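A generic sketch of that idea in plain Python (not the Hamilton adapter; the names and the pickle-based fingerprint are assumptions for illustration):

import hashlib
import pickle


def _fingerprint(kwargs: dict) -> str:
    # hash the pickled inputs; assumes the inputs are picklable and cheap to hash
    return hashlib.sha256(pickle.dumps(sorted(kwargs.items()))).hexdigest()


def compute_with_cache(node_name: str, fn, kwargs: dict, cache: dict):
    key = _fingerprint(kwargs)
    entry = cache.get(node_name)
    if entry is not None and entry["key"] == key:
        # inputs unchanged across runs: reuse the cached result
        return entry["value"]
    value = fn(**kwargs)
    cache[node_name] = {"key": key, "value": value}
    return value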
Roy Kid
07/17/2024, 10:14 AM
I'm trying to use spy in pytest_mock to check the call_count of a function, but it fails. I think it's because when it wraps the object, it loses __module__ information or something else, and inspect.getmodule cannot find its module. I ask because I want to know how many times a node is executed when writing unit tests for caching. Maybe there is another way to do that?
Gilad Rubin
07/22/2024, 1:59 PM
from hamilton.plugins import h_diskcache
from hamilton.driver import Builder
cache_adapter = h_diskcache.DiskCacheAdapter()
builder = (Builder()
.with_adapters(cache_adapter)
)
It runs for about a minute and then outputs:
---------------------------------------------------------------------------
OperationalError Traceback (most recent call last)
Cell In[9], line 4
      1 from hamilton.plugins import h_diskcache
      2 from hamilton.driver import Builder
----> 4 cache_adapter = h_diskcache.DiskCacheAdapter()
      6 builder = (Builder()
      7 .with_adapters(cache_adapter)
      8 )
File /anaconda/envs/pdf-env/lib/python3.10/site-packages/hamilton/plugins/h_diskcache.py:84, in DiskCacheAdapter.__init__(self, cache_vars, cache_path, **cache_settings)
82 self.cache_vars = cache_vars if cache_vars else []
83 self.cache_path = cache_path
---> 84 self.cache = diskcache.Cache(directory=cache_path, **cache_settings)
85 self.nodes_history: Dict[str, List[str]] = self.cache.get(
86 key=DiskCacheAdapter.nodes_history_key, default=dict()
87 ) # type: ignore
88 self.used_nodes_hash: Dict[str, str] = dict()
File /anaconda/envs/pdf-env/lib/python3.10/site-packages/diskcache/core.py:478, in Cache.__init__(self, directory, timeout, disk, **settings)
476 for key, value in sorted(sets.items()):
477 if key.startswith('sqlite_'):
--> 478 self.reset(key, value, update=False)
480 sql(
481 'CREATE TABLE IF NOT EXISTS Settings ('
482 ' key TEXT NOT NULL UNIQUE,'
483 ' value)'
484 )
486 # Setup Disk object (must happen after settings initialized).
File /anaconda/envs/pdf-env/lib/python3.10/site-packages/diskcache/core.py:2438, in Cache.reset(self, key, value, update)
2436 update = True
2437 if update:
-> 2438 sql('PRAGMA %s = %s' % (pragma, value)).fetchall()
2439 break
2440 except sqlite3.OperationalError as exc:
OperationalError: database is locked
===
Any ideas?
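One hedged guess, based only on the traceback above (DiskCacheAdapter.__init__ accepts a cache_path): the default SQLite cache file may still be locked by another process or notebook kernel, so pointing the adapter at an explicit, unused directory is worth trying. The path below is illustrative:

from hamilton.driver import Builder
from hamilton.plugins import h_diskcache

# use a dedicated cache directory instead of the default location
cache_adapter = h_diskcache.DiskCacheAdapter(cache_path="./.hamilton_cache")
builder = Builder().with_adapters(cache_adapter)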
Yijun Tang
07/23/2024, 1:40 AM
@subdag? Thank you!
Gilad Rubin
07/23/2024, 8:38 AM
Stefan Krawczyk
07/23/2024, 4:32 PM