Hi! I want to extract data from an URL, save it lo...
# hamilton-help
j
Hi! I want to extract data from an URL, save it locally, load the data using the
PandasExcelReader
, preprocess the dataframe, and save it again to a destination of my choice. I have attached a sample code without the actual URLs
Copy code
# -*- coding: utf-8 -*-

import io
import pathlib
import zipfile

import pandas as pd
import requests
from hamilton import driver
from hamilton.plugins.pandas_extensions import PandasExcelReader


def load_data(url: str, destination_folder: str | pathlib.Path) -> None:
    # Send a GET request to the URL and get the response
    response = requests.get(url)
    response.raise_for_status()
    if isinstance(destination_folder, str):
        destination_folder = pathlib.Path(destination_folder)

    with zipfile.ZipFile(io.BytesIO(response.content)) as zip_ref:
        # Extract the contents of the zip file to the temporary folder
        zip_ref.extractall(destination_folder)


def read_excel(path: pathlib.Path) -> pd.DataFrame:
    xlsx_reader = PandasExcelReader(path)
    raw_df, metadata = xlsx_reader.load_data(pd.DataFrame)
    return raw_df


def preprocess_df(raw_df: pd.DataFrame) -> pd.DataFrame:
    df = raw_df
    return df


def mean(df: pd.DataFrame) -> pd.Series:
    return df.loc[:, "Column1"].mean()


if __name__ == "__main__":
    import importlib

    from hamilton import driver

    functions = importlib.import_module(__name__)
    # define the path variables
    CWD = pathlib.Path().cwd()
    DATA_PATH = CWD / "data"
    FIGURES_PATH = CWD / "figs"

    raw_data_path = DATA_PATH / "raw"
    procurement_data_path = raw_data_path / "data.xlsx"

    VARS = ["load_data", "read_excel", "preprocess_df", "mean"]

    dr = driver.Driver(
        {
            "path": procurement_data_path,
            "url": "<https://some.url>",
            "destination_folder": DATA_PATH,
        },
        functions,
    )
    # a = dr.execute(VARS)
    dr.display_all_functions(
        FIGURES_PATH / "graph.png",
        show_legend=True,
        orient="LR",
    )
When I uncomment the execution, I get the following error:
Copy code
Traceback (most recent call last):
  File "/home//software/data-analysis/pipeline.py", line 64, in <module>
    a = dr.execute(VARS)
        ^^^^^^^^^^^^^^^^
  File "/home//.cache/pypoetry/virtualenvs/data-analysis-GDaM7nkF-py3.12/lib/python3.12/site-packages/hamilton/driver.py", line 564, in execute
    raise e
  File "/home//.cache/pypoetry/virtualenvs/data-analysis-GDaM7nkF-py3.12/lib/python3.12/site-packages/hamilton/driver.py", line 554, in execute
    outputs = self.raw_execute(_final_vars, overrides, display_graph, inputs=inputs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home//.cache/pypoetry/virtualenvs/data-analysis-GDaM7nkF-py3.12/lib/python3.12/site-packages/hamilton/driver.py", line 644, in raw_execute
    Driver.validate_inputs(
  File "/home//.cache/pypoetry/virtualenvs/data-analysis-GDaM7nkF-py3.12/lib/python3.12/site-packages/hamilton/driver.py", line 525, in validate_inputs
    raise ValueError(error_str)
ValueError: 2 errors encountered: 
  Error: Required input df not provided for nodes: ['mean'].
  Error: Required input raw_df not provided for nodes: ['preprocess_df'].
Also, the DAG looks not as I have expected. I find it really hard to use the PandasExcelReader since there is no proper documentation on how to use this. I am really thankful for any help!
This is what the graph looks like. However, I expected it to be a chain since I first need to load the data, read it, preprocess it, perform some actions, and then save it
t
Hi Julian! Here are two things to unblock you: writing the DAG and using Hamilton plugins. ## 1 In Hamilton, you use function names and parameters to connect nodes. Here's how I would rewrite the DAG. I only changed the function parameter names.
Copy code
def read_excel(path: pathlib.Path) -> pd.DataFrame:
    xlsx_reader = PandasExcelReader(path)
    raw_df, metadata = xlsx_reader.load_data(pd.DataFrame)
    return raw_df


def preprocess_df(read_excel: pd.DataFrame) -> pd.DataFrame:
    df = raw_df
    return df


def mean(preprocess_df: pd.DataFrame) -> pd.Series:
    return df.loc[:, "Column1"].mean()
In Hamilton, it's also useful to think of functions in terms of "nouns" rather than "verbs". I would personally rename the functions: •
read_excel -> raw_df
preprocess_df -> preprocessed_df
## 2 Files under
hamilton.plugins
with names
FOO_extensions.py
aren't meant to be directly imported. The
PandasExcelReader
is an internal class (hence no extensive documentation) that powers
from_.excel()
or the decorator
@load_from.excel()
. I suggest reading the section on materialization and see if it unblocks you! Depending on your use case, it might be simpler / more transparent to use pandas.read_excel(). Using the
PandasExcelReader
with the methods I mentioned above is just a matter of convenience
j
Thank you for the fast answer, that helped me a lot! I am wondering: If I want to create a function that is responsible for loading data from various sources, is a construct like this possible?
Copy code
@parameterize(
    load_data_xlsx={"fileytpe": value("xlsx")},
    load_data_csv={"fileytpe": value("csv")},
    load_data_json={"fileytpe": value("json")},
)
def load_data(
    path: str | pathlib.Path, filetype: str
) -> tuple[pd.DataFrame, dict]:
    f = filetype
    match filetype:
        case "xlsx":
            reader = PandasExcelReader(path)
            _type = pd.DataFrame
        case "csv":
            reader = PandasCSVReader(path)
            _type = pd.DataFrame
        case "json":
            reader = JSONDataLoader(path)
            _type = dict
    data, metadata = reader.load_data(_type)
    return data, metadata
I see that I should not access
hamilton.plugin
functions directly. Can I use parameterize in combination with load_from or materialization? The code above gives me the following error:
hamilton.function_modifiers.base.InvalidDecoratorException: Parametrization is invalid: the following parameters don't appear in the function itself: fileytpe
To add to this: I have attached an minimum working example following this guide: https://github.com/DAGWorks-Inc/hamilton/tree/main/examples/pandas/materialization
Copy code
# -*- coding: utf-8 -*-

# --- START IMPORT SECTION
import logging

from hamilton import contrib

logger = logging.getLogger(__name__)

with contrib.catch_import_errors(__name__, __file__, logger):
    import pathlib

    import pandas as pd


def raw_df(input: pd.DataFrame) -> pd.DataFrame:
    return input


def processed_data(raw_df: pd.DataFrame) -> pd.Series:
    return raw_df.loc[:, "A"]


# def data_processing(raw_df):
#     pass


if __name__ == "__main__":
    import importlib

    from hamilton import base, driver
    from hamilton.io.materialization import from_, to

    C = "W"
    P = "0"
    F = "B"

    CWD = pathlib.Path().cwd()
    DATA_PATH = CWD / "data"

    excel_data_path = DATA_PATH / f"{C}/{P}/{F}.xlsx"

    # define hamilton adapter
    dict_builder = base.DictResult()
    adapter = base.SimplePythonGraphAdapter(dict_builder)

    # define hamilton materializers
    materializers = [
        from_.excel(target="raw_df", path=f"{excel_data_path.resolve()}")
    ]

    # define hamilton functions
    functions = importlib.import_module(__name__)

    dr = (
        driver.Builder()
        .with_adapter(adapter)
        .with_materializers(materializers)
        # .with_modules(functions)
        .build()
    )

    additional_vars = ["processed_data"]

    mat_results = dr.materialize(
        additional_vars=additional_vars,
    )
But I get the following error:
raise ValueError(f"Unknown nodes [{missing_vars_str}] requested. Check for typos?")
ValueError: Unknown nodes [processed_data] requested. Check for typos?
My hamilton version is 1.67.0, my Python is 3.12, and I am working on WSL Ubuntu 22.04.
t
Hi Julian! I'll start with answering your latest message. I see a lot of Hamilton concepts and features used here, so I just want to validate the goal of the code first. From my understanding, the pipeline should 1. loads data as a dataframe 2. transforms the dataframe 3. do something with the results returned by the Driver Here's the "shortest" code to achieve this with some comments
Copy code
import pandas as pd

# functions defined before `if __name__ == "__main__":`
# are part of the DAG

def raw_df(input: pd.DataFrame) -> pd.DataFrame:
    return input


def processed_data(raw_df: pd.DataFrame) -> pd.Series:
    return raw_df.loc[:, "A"]


if __name__ == "__main__":
    import pathlib
    from hamilton import driver
    # we will need `from_` to load data
    from hamilton.io.materialization import from_

    # here, `__main__` refers to the current Python module.
    # It allows us to load the functions raw_df() and processed_data()
    import __main__ 

    # set your excel Path
    C = "W"
    P = "0"
    F = "B"
    CWD = pathlib.Path().cwd()
    DATA_PATH = CWD / "data"
    excel_data_path = DATA_PATH / f"{C}/{P}/{F}.xlsx"

    # define a list of materializers (with only 1 item)
    materializers = [
        # define a materalizer that loads from an excel file
        from_.excel(
            # `target` is the name of a node, which needs to be a `pd.DataFrame`
            # that will receive the loaded value.
            # `raw_df` would be incorrect because there's already a function with that name
            # `input` is correct because it's an "input" of the DAG. In other words, there's
            # no function with that name / defining that value
            target="input",  
            path=str(excel_data_path.resolve())  # `path` needs to be a string
        )
    ]

    # create the `Driver` using the `Builder`
    dr = (
        driver.Builder()
        # pass the module `__main__` (the current one) which contains raw_df() and processed_data()
        .with_modules(__main__)
        # pass the materializers to the driver; the `*` is required to unpack the list
        # reference: <https://www.pythontutorial.net/python-basics/python-unpack-list/>
        .with_materializers(*materializers)
        .build()  # this build the Driver
    )

    # use `dr.execute()` with the name of the nodes we want returned
    # we don't need to use `.materialize()` since we already use `.with_materializers()`
    results = dr.execute(["processed_data"])

    # this will be the computed pd.Series
    results["processed_data"]
Let me know if that's helpful and I'm happy to continue the conversation if you have more questions 🙂
j
Hi Thierry, thank you for the answers, that helped me out a lot! It works perfectly fine. Regarding this concept:
Copy code
@parameterize(
    load_data_xlsx={"fileytpe": value("xlsx")},
    load_data_csv={"fileytpe": value("csv")},
    load_data_json={"fileytpe": value("json")},
)
def load_data(
    path: str | pathlib.Path, filetype: str
) -> tuple[pd.DataFrame, dict]:
    f = filetype
    match filetype:
        case "xlsx":
            reader = PandasExcelReader(path)
            _type = pd.DataFrame
        case "csv":
            reader = PandasCSVReader(path)
            _type = pd.DataFrame
        case "json":
            reader = JSONDataLoader(path)
            _type = dict
    data, metadata = reader.load_data(_type)
    return data, metadata
Is it possible to create a "single"
load_data
function that I can reuse throughout my code? I understand now that using the modules from
hamilton.plugins
should not be accessed directly.