Julian
06/24/2024, 9:06 AM
I want to use the PandasExcelReader, preprocess the dataframe, and save it again to a destination of my choice. I have attached a sample code without the actual URLs.
# -*- coding: utf-8 -*-
import io
import pathlib
import zipfile
import pandas as pd
import requests
from hamilton import driver
from hamilton.plugins.pandas_extensions import PandasExcelReader
def load_data(url: str, destination_folder: str | pathlib.Path) -> None:
    """Download a zip archive from `url` and extract it into `destination_folder`."""
    destination = pathlib.Path(destination_folder)
    # Fetch the archive; abort early on HTTP errors (4xx/5xx).
    response = requests.get(url)
    response.raise_for_status()
    # Unpack the in-memory zip payload into the destination directory.
    with zipfile.ZipFile(io.BytesIO(response.content)) as archive:
        archive.extractall(destination)
def read_excel(path: pathlib.Path) -> pd.DataFrame:
    """Load the Excel file at `path` into a DataFrame via Hamilton's reader plugin."""
    reader = PandasExcelReader(path)
    # load_data() returns a (data, metadata) tuple; only the frame is needed here.
    frame, _metadata = reader.load_data(pd.DataFrame)
    return frame
def preprocess_df(raw_df: pd.DataFrame) -> pd.DataFrame:
    """Placeholder preprocessing step: currently returns the raw frame unchanged."""
    return raw_df
def mean(df: pd.DataFrame) -> float:
    """Return the arithmetic mean of the "Column1" column of `df`.

    Bug fix: `Series.mean()` yields a scalar, so the return type is
    `float`, not `pd.Series` as originally annotated.
    """
    return df.loc[:, "Column1"].mean()
if __name__ == "__main__":
    import importlib
    from hamilton import driver

    # Resolve this very module so Hamilton can collect the DAG functions
    # (load_data, read_excel, preprocess_df, mean) defined above.
    functions = importlib.import_module(__name__)

    # define the path variables
    CWD = pathlib.Path().cwd()
    DATA_PATH = CWD / "data"
    FIGURES_PATH = CWD / "figs"
    raw_data_path = DATA_PATH / "raw"
    procurement_data_path = raw_data_path / "data.xlsx"

    # Nodes requested from the driver when executing the DAG.
    VARS = ["load_data", "read_excel", "preprocess_df", "mean"]

    dr = driver.Driver(
        {
            # Driver config: each key satisfies a same-named function parameter.
            "path": procurement_data_path,
            # NOTE(review): the angle brackets look like Slack auto-linking;
            # presumably the real URL string has no "<" / ">" — confirm.
            "url": "<https://some.url>",
            "destination_folder": DATA_PATH,
        },
        functions,
    )

    # NOTE(review): executing fails because preprocess_df/mean take parameters
    # (raw_df, df) that don't match any upstream node name — see error below.
    # a = dr.execute(VARS)

    # Render the DAG to a PNG, left-to-right, with a legend.
    dr.display_all_functions(
        FIGURES_PATH / "graph.png",
        show_legend=True,
        orient="LR",
    )
When I uncomment the execution, I get the following error:
Traceback (most recent call last):
File "/home//software/data-analysis/pipeline.py", line 64, in <module>
a = dr.execute(VARS)
^^^^^^^^^^^^^^^^
File "/home//.cache/pypoetry/virtualenvs/data-analysis-GDaM7nkF-py3.12/lib/python3.12/site-packages/hamilton/driver.py", line 564, in execute
raise e
File "/home//.cache/pypoetry/virtualenvs/data-analysis-GDaM7nkF-py3.12/lib/python3.12/site-packages/hamilton/driver.py", line 554, in execute
outputs = self.raw_execute(_final_vars, overrides, display_graph, inputs=inputs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home//.cache/pypoetry/virtualenvs/data-analysis-GDaM7nkF-py3.12/lib/python3.12/site-packages/hamilton/driver.py", line 644, in raw_execute
Driver.validate_inputs(
File "/home//.cache/pypoetry/virtualenvs/data-analysis-GDaM7nkF-py3.12/lib/python3.12/site-packages/hamilton/driver.py", line 525, in validate_inputs
raise ValueError(error_str)
ValueError: 2 errors encountered:
Error: Required input df not provided for nodes: ['mean'].
Error: Required input raw_df not provided for nodes: ['preprocess_df'].
Also, the DAG does not look as I expected. I find it really hard to use the PandasExcelReader since there is no proper documentation on how to use it. I am really thankful for any help!

Julian
06/24/2024, 9:12 AM

Thierry Jean
06/24/2024, 12:10 PMdef read_excel(path: pathlib.Path) -> pd.DataFrame:
xlsx_reader = PandasExcelReader(path)
raw_df, metadata = xlsx_reader.load_data(pd.DataFrame)
return raw_df
def preprocess_df(read_excel: pd.DataFrame) -> pd.DataFrame:
    """Preprocess the frame produced by the `read_excel` node (currently a no-op).

    Bug fix: the body referenced the old name `raw_df`, which no longer
    exists after the parameter was renamed to `read_excel`; that would
    raise NameError at runtime.
    """
    df = read_excel
    return df
def mean(preprocess_df: pd.DataFrame) -> float:
    """Return the mean of "Column1" from the preprocessed frame.

    Bug fixes: the body referenced an undefined name `df` (the parameter
    is `preprocess_df`), and `Series.mean()` returns a scalar, so the
    return annotation is `float` rather than `pd.Series`.
    """
    return preprocess_df.loc[:, "Column1"].mean()
In Hamilton, it's also useful to think of functions in terms of "nouns" rather than "verbs". I would personally rename the functions:
• read_excel -> raw_df
• preprocess_df -> preprocessed_df
## 2
Files under hamilton.plugins with names FOO_extensions.py aren't meant to be directly imported. The PandasExcelReader is an internal class (hence no extensive documentation) that powers from_.excel() or the decorator @load_from.excel(). I suggest reading the section on materialization and seeing if it unblocks you!
Depending on your use case, it might be simpler / more transparent to use pandas.read_excel(). Using the PandasExcelReader with the methods I mentioned above is just a matter of convenience.

Julian
06/25/2024, 7:02 AM@parameterize(
load_data_xlsx={"fileytpe": value("xlsx")},
load_data_csv={"fileytpe": value("csv")},
load_data_json={"fileytpe": value("json")},
)
def load_data(
path: str | pathlib.Path, filetype: str
) -> tuple[pd.DataFrame, dict]:
f = filetype
match filetype:
case "xlsx":
reader = PandasExcelReader(path)
_type = pd.DataFrame
case "csv":
reader = PandasCSVReader(path)
_type = pd.DataFrame
case "json":
reader = JSONDataLoader(path)
_type = dict
data, metadata = reader.load_data(_type)
return data, metadata
I see that I should not access hamilton.plugins functions directly. Can I use parameterize in combination with load_from or materialization?
The code above gives me the following error:
hamilton.function_modifiers.base.InvalidDecoratorException: Parametrization is invalid: the following parameters don't appear in the function itself: fileytpe
Julian
06/25/2024, 10:17 AM
# -*- coding: utf-8 -*-
# --- START IMPORT SECTION
import logging
from hamilton import contrib
logger = logging.getLogger(__name__)
with contrib.catch_import_errors(__name__, __file__, logger):
import pathlib
import pandas as pd
def raw_df(input: pd.DataFrame) -> pd.DataFrame:
    """Identity node: exposes the externally-loaded frame under the name `raw_df`.

    NOTE: `input` shadows the builtin, but renaming it would change the
    Hamilton DAG's input-node name, so it is kept as-is.
    """
    return input
def processed_data(raw_df: pd.DataFrame) -> pd.Series:
    """Select column "A" from the raw frame."""
    return raw_df["A"]
# def data_processing(raw_df):
# pass
if __name__ == "__main__":
    import importlib
    from hamilton import base, driver
    from hamilton.io.materialization import from_, to

    # Path components of the Excel file to load.
    C = "W"
    P = "0"
    F = "B"
    CWD = pathlib.Path().cwd()
    DATA_PATH = CWD / "data"
    excel_data_path = DATA_PATH / f"{C}/{P}/{F}.xlsx"

    # define hamilton adapter
    dict_builder = base.DictResult()
    adapter = base.SimplePythonGraphAdapter(dict_builder)

    # define hamilton materializers
    materializers = [
        # Bug fix: `target` must name a DAG *input*, not the existing
        # function "raw_df" — the loaded frame feeds raw_df(input=...).
        from_.excel(target="input", path=f"{excel_data_path.resolve()}")
    ]

    # define hamilton functions
    functions = importlib.import_module(__name__)
    dr = (
        driver.Builder()
        .with_adapter(adapter)
        # Bug fix: the module must be registered, otherwise the driver has
        # no nodes and raises 'Unknown nodes [processed_data] requested'.
        .with_modules(functions)
        # Bug fix: with_materializers() takes materializers as positional
        # arguments, so the list must be unpacked with `*`.
        .with_materializers(*materializers)
        .build()
    )
    additional_vars = ["processed_data"]
    mat_results = dr.materialize(
        additional_vars=additional_vars,
    )
But I get the following error:
raise ValueError(f"Unknown nodes [{missing_vars_str}] requested. Check for typos?")
ValueError: Unknown nodes [processed_data] requested. Check for typos?
My hamilton version is 1.67.0, my Python is 3.12, and I am working on WSL Ubuntu 22.04.

Thierry Jean
06/25/2024, 3:22 PM
import pandas as pd
# functions defined before `if __name__ == "__main__":`
# are part of the DAG
def raw_df(input: pd.DataFrame) -> pd.DataFrame:
    """Pass-through node: the frame arrives via the DAG input named `input`."""
    return input
def processed_data(raw_df: pd.DataFrame) -> pd.Series:
    """Extract the "A" column of the raw frame as a Series."""
    return raw_df["A"]
if __name__ == "__main__":
    import pathlib
    from hamilton import driver
    # we will need `from_` to load data
    from hamilton.io.materialization import from_
    # here, `__main__` refers to the current Python module.
    # It allows us to load the functions raw_df() and processed_data()
    import __main__

    # set your excel Path
    C = "W"
    P = "0"
    F = "B"
    CWD = pathlib.Path().cwd()
    DATA_PATH = CWD / "data"
    excel_data_path = DATA_PATH / f"{C}/{P}/{F}.xlsx"

    # define a list of materializers (with only 1 item)
    materializers = [
        # define a materializer that loads from an excel file
        from_.excel(
            # `target` is the name of a node, which needs to be a `pd.DataFrame`
            # that will receive the loaded value.
            # `raw_df` would be incorrect because there's already a function with that name.
            # `input` is correct because it's an "input" of the DAG. In other words, there's
            # no function with that name / defining that value.
            target="input",
            path=str(excel_data_path.resolve())  # `path` needs to be a string
        )
    ]

    # create the `Driver` using the `Builder`
    dr = (
        driver.Builder()
        # pass the module `__main__` (the current one) which contains raw_df() and processed_data()
        .with_modules(__main__)
        # pass the materializers to the driver; the `*` is required to unpack the list
        # reference: <https://www.pythontutorial.net/python-basics/python-unpack-list/>
        .with_materializers(*materializers)
        .build()  # this builds the Driver
    )

    # use `dr.execute()` with the name of the nodes we want returned;
    # we don't need to use `.materialize()` since we already use `.with_materializers()`
    results = dr.execute(["processed_data"])
    # this will be the computed pd.Series
    results["processed_data"]
Thierry Jean
06/25/2024, 3:23 PM

Julian
06/26/2024, 5:23 AM@parameterize(
load_data_xlsx={"fileytpe": value("xlsx")},
load_data_csv={"fileytpe": value("csv")},
load_data_json={"fileytpe": value("json")},
)
def load_data(
path: str | pathlib.Path, filetype: str
) -> tuple[pd.DataFrame, dict]:
f = filetype
match filetype:
case "xlsx":
reader = PandasExcelReader(path)
_type = pd.DataFrame
case "csv":
reader = PandasCSVReader(path)
_type = pd.DataFrame
case "json":
reader = JSONDataLoader(path)
_type = dict
data, metadata = reader.load_data(_type)
return data, metadata
Is it possible to create a "single" load_data function that I can reuse throughout my code? I understand now that the modules from hamilton.plugins should not be accessed directly.