Stefan Krawczyk
03/12/2024, 4:39 PM
Seth Stokes
03/12/2024, 5:12 PM
Stefan Krawczyk
03/12/2024, 6:10 PM
Tom Barber
03/15/2024, 4:31 PM
Tom Barber
03/15/2024, 4:34 PM
materializers = [
to.spark(dependencies=["generate_customers"], id="custs_to_df", table_name=f"input.customers_{random_string}", spark=spark, combine=base.PandasDataFrameResult()),
to.spark(dependencies=["generate_accounts"], id="accounts_to_df", table_name=f"input.accounts_{random_string}", spark=spark, combine=base.PandasDataFrameResult()),
to.spark(dependencies=["generate_transactions"], id="transactions_to_df", table_name=f"input.transactions_{random_string}", spark=spark, combine=base.PandasDataFrameResult()),
to.spark(dependencies=["generate_aml"], id="aml_to_df", table_name=f"input.aml_{random_string}", spark=spark, combine=base.PandasDataFrameResult()),
to.spark(dependencies=["generate_entity_link_table"], id="entity_links_to_df", table_name=f"input.entity_links_{random_string}", spark=spark, combine=base.PandasDataFrameResult()),
to.spark(dependencies=["generate_entity_table"], id="entities_to_df", table_name=f"input.entities_{random_string}", spark=spark, combine=base.PandasDataFrameResult()),
]
I'd like to do something like this. But then the generate_transactions dependency depends on the generate_accounts block, and the entity_link also depends on accounts and customers, for example.
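A hedged sketch of how those inter-node dependencies could be expressed, assuming the generate_* names above are ordinary Hamilton functions: in Hamilton the dependency comes from naming the upstream node as a parameter, so the materializer list itself only needs to point at the nodes to persist.
# Sketch only, not code from this thread: generate_transactions depends on
# generate_accounts simply by taking it as a parameter.
import pandas as pd

def generate_accounts() -> pd.DataFrame:
    return pd.DataFrame({"account_id": [1, 2]})

def generate_transactions(generate_accounts: pd.DataFrame) -> pd.DataFrame:
    # uses the accounts frame produced by the node above
    return generate_accounts.assign(amount=[10.0, 20.0])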
Tom Barber
03/15/2024, 4:35 PM
Tom Barber
03/15/2024, 4:42 PM
Tom Barber
03/18/2024, 5:46 PM
Pengyu Chen
03/19/2024, 7:28 PM
I'm trying to use @resolve with the code I will be attaching in this thread. I got this error however:
hamilton.function_modifiers.base.InvalidDecoratorException: Dependency: <generator object <lambda>.<locals>.<genexpr> at 0x7f90febd67b0> is not a valid dependency type for group(), must be a LiteralDependency or UpstreamDependency.
If I replace the group() call with group(source("cohort_month"), source("cohort_quarter")) the issue is resolved; however, I do not want to hardcode this. I'm still pretty new to Hamilton and wondering if I could please get some help? Thanks 🙏
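A hedged sketch of the non-hardcoded version (the cohort_columns list is invented here for illustration): group() wants individual LiteralDependency/UpstreamDependency arguments, so a generator has to be unpacked into positional arguments rather than passed as a single object.
from hamilton.function_modifiers import group, source

cohort_columns = ["cohort_month", "cohort_quarter"]  # hypothetical: however you derive these names

# Unpacking with * hands group() individual source() objects instead of a generator:
grouped_dependency = group(*[source(col) for col in cohort_columns])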
Dhruv Sahi
03/26/2024, 11:43 AM
# main.py
from typing import Optional, List
import polars as pl
from hamilton import base, driver, log_setup  # base is needed for the graph adapter below
import logging
from stages.raw.load_raw_data import RawDataStage
from stages.intermediate import transaction_clean, jurisdiction_clean
from config import get_config, PipelineConfig
class Pipeline:
def __init__(self, _cfg: PipelineConfig) -> None:
self._cfg = _cfg
self.raw_data_stage = RawDataStage(_cfg)
self.dr = driver.Driver(
{},
self.raw_data_stage,
adapter=base.SimplePythonGraphAdapter(base.DictResult)
)
def run(self) -> None:
result = self.dr.execute(final_vars=["table_1", "table_2"])  # final_vars takes node names (strings)
if __name__ == "__main__":
_cfg = get_config()
pipeline = Pipeline(_cfg=_cfg)
pipeline.run()
My load_raw_data.py looks like this:
from typing import List
import polars as pl
from hamilton.function_modifiers import tag
import logging
from config import PipelineConfig
logger = logging.getLogger(__name__)
INPUT_TABLES = [
"table_1",
"table_2",
]
class RawDataStage:
def __init__(self, config: PipelineConfig) -> None:
self._cfg = config
@staticmethod
def load_parquet(paths: List[str]) -> pl.LazyFrame:
return pl.scan_parquet(paths)
def run(self):
data = {}
for table in INPUT_TABLES:
file_paths = self._cfg.file_paths
data[table] = getattr(self, f"_read_{table}")(paths=file_paths)
return data
@tag(stage="load", input_type="table_1")
def table_1(self, paths: List[str]) -> pl.LazyFrame:
return self.load_parquet(paths=paths)
@tag(stage="load", input_type="table_2")
def table_2(self, paths: List[str]) -> pl.LazyFrame:
return self.load_parquet(paths=paths)
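For comparison, a hedged sketch of the more common module-level style for the same loaders (Hamilton discovers nodes from functions in a module; file_paths here is a hypothetical input node name, not something from this thread):
# load_raw_data.py, hypothetical module-level variant of the loaders above
from typing import List

import polars as pl
from hamilton.function_modifiers import tag

@tag(stage="load", input_type="table_1")
def table_1(file_paths: List[str]) -> pl.LazyFrame:
    return pl.scan_parquet(file_paths)

@tag(stage="load", input_type="table_2")
def table_2(file_paths: List[str]) -> pl.LazyFrame:
    return pl.scan_parquet(file_paths)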
Seth Stokes
03/26/2024, 4:02 PM
@extract_columns(
*COLS_TO_EXPOSE
)
@pipe(
step(_filter_to_src_trades),
step(_filter_to_unrealized)
)
@config.when(env="test")
def raw_data_src__test(
cob_date: datetime,
cache_dir: str = r"C:\codebase\data"
) -> pd.DataFrame:
return query(cob_date)
to
@extract_columns(
*COLS_TO_EXPOSE
)
@config.when(env="test")
def raw_data_src__test(
cob_date: datetime,
cache_dir: str = r"C:\codebase\data"
) -> pd.DataFrame:
raw = query(cob_date)
return raw.pipe(_filter_to_src_trades).pipe(_filter_to_unrealized)
Asking because of the following error.
ValueError: Error: raw_data_src is expecting raw_data_src.with_filter_to_unrealized:<class 'datetime.datetime'>, but found raw_data_src.with_filter_to_unrealized:<class 'pandas.core.frame.DataFrame'>.
Hamilton does not consider these types to be equivalent. If you believe they are equivalent, please reach out to the developers. Note that, if you have types that are equivalent for your purposes, you can create a graph adapter that checks the types against each other in a more lenient manner
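A hedged reading of that error (an assumption about @pipe, not something stated in this thread): @pipe applies its steps to the function's first input parameter, which in the first version is cob_date: datetime, so the DataFrame outputs of the steps clash with the expected datetime. A minimal sketch of one way to keep the decorator is to split the load into its own node and let @pipe act on a DataFrame-typed first parameter (raw_query and raw_data_src below are illustrative names; query, COLS_TO_EXPOSE and the _filter_* helpers are assumed from the snippet above):
from datetime import datetime

import pandas as pd
from hamilton.function_modifiers import config, extract_columns, pipe, step

@config.when(env="test")
def raw_query__test(cob_date: datetime, cache_dir: str = r"C:\codebase\data") -> pd.DataFrame:
    # just load; no chained filtering here
    return query(cob_date)

@extract_columns(*COLS_TO_EXPOSE)
@pipe(
    step(_filter_to_src_trades),
    step(_filter_to_unrealized),
)
def raw_data_src(raw_query: pd.DataFrame) -> pd.DataFrame:
    # @pipe now transforms a DataFrame input, so the step types line up
    return raw_query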
Stefan Krawczyk
03/26/2024, 4:32 PM
Roy Kid
03/27/2024, 12:44 PM
I'm trying to use to.pickle. I found the materializer is already added into the graph, but it cannot get node.version: the materializer's version is None, so it cannot be sorted. Here is a snapshot:
Seth Stokes
03/27/2024, 7:41 PM
saved_formatted_data_output is typed as a dict?
@save_to.excel(
path=source("path_to_save"),
output_name_="saved_formatted_data_output",
index=False
)
@config.when(data_product="gui")
@schema.output(
("Attribute_A", "str"),
("Attribute_B", "str"),
("Attribute_C", "str"),
)
def formatted_data_output__gui(data: pd.DataFrame) -> pd.DataFrame:
...
hamilton.function_modifiers.base.InvalidDecoratorException: Node saved_formatted_data_output has type typing.Dict[str, typing.Any] which is not a registered type for a dataset. Registered types are {'pandas': <class 'pandas.core.frame.DataFrame'>, 'polars': <class 'polars.dataframe.frame.DataFrame'>}. If you found this, either (a) ensure you have the right package installed, or (b) reach out to the team to figure out how to add yours.
Seth Stokes
03/28/2024, 10:05 PM
Why would a driver using .with_remote_executor(SynchronousLocalTaskExecutor()) execute more than once (~5/6 times), given len(accounts) == 1?
dr = (
driver.Builder()
.enable_dynamic_execution(allow_experimental_mode=True)
.with_config({
"cob_date": datetime(2023,3,25),
"portfolio_config": portfolio_config,
"execution": "parallel",
"accounts": ["account_1"] # the `Parallelizable` part
})
.with_remote_executor(SynchronousLocalTaskExecutor())
.with_modules(
data_loader, # accounts are broken up here to load each
transfrom, # performed after collect
)
.build()
)
Stefan Krawczyk
04/02/2024, 4:35 PM
Stefan Krawczyk
04/09/2024, 4:32 PM
Fefun
04/09/2024, 8:44 PM
I'm using the @pipe decorator to define optional transformations to run against my PySpark DataFrame.
I have one transformation that takes as an input parameter a variable that I wish would come from the inputs dictionary, i.e. the one I pass to the driver when I execute the DAG. E.g.,
datasets = dr.execute(datasets_to_request, inputs={"start": "2024-01-01"})
I would like to make the value of the start key available to the function run in the @pipe step I defined. Like so:
@pipe(
step(
do_something,
input1=[...], # This input can be defined in the source code here.
start=start, # However here I would like to use the value of start passed as input, so 2024-01-01
)
...
Is that possible? Or would I perhaps need to put that variable in the config passed to the Driver?
Disclaimer: Apologies, I haven't actually tried this yet. It might be that by trying I would have found the answer. But due to constraints I couldn't find time to try this out, yet I sort of need to know if it's possible, hence my post.
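A hedged sketch of what this could look like, assuming step keyword arguments accept source()/value() dependencies; do_something from the snippet above is stubbed here as _do_something, and my_node, the column name, and the literal are purely illustrative:
from pyspark.sql import DataFrame
from hamilton.function_modifiers import pipe, step, source, value

def _do_something(df: DataFrame, input1: str, start: str) -> DataFrame:
    # stand-in for the real transformation
    return df.filter(df["date"] >= start)

@pipe(
    step(
        _do_something,
        input1=value("some-literal"),   # fixed in the source code
        start=source("start"),          # resolved from inputs={"start": "2024-01-01"} at execution time
    )
)
def my_node(df: DataFrame) -> DataFrame:
    return df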
Volker Lorrmann
04/12/2024, 8:25 AM
from hamilton.function_modifiers import parameterize, value, source
from hamilton.htypes import Parallelizable
import glob
import os
PARAMS = {
"orders": {"path": value("src/orders")},
"products": {"path": value("src/products")},
"sap": {"path": value("src/sap")},
}
@parameterize(**PARAMS)
def src_path(
path: str,
) -> Parallelizable[str]:
src = glob.glob(os.path.join(path, "*"))
for new_file in src:
yield new_file
@parameterize(
filetype_orders=dict(src_path=source("orders")),
filetype_products=dict(src_path=source("products")),
filetype_sap=dict(src_path=source("sap")),
)
def filetype(src_path: str) -> str:
if src_path.endswith(".json"):
print(f"{src_path}: I am a json file")
else:
print(f"{src_path}: I am a csv file")
return src_path
if __name__ == "__main__":
from hamilton import driver
from hamilton.execution import executors
import __main__ as debug_parallel
remote_executor = executors.MultiThreadingExecutor(max_tasks=20)
dr = (
driver.Builder()
.with_modules(debug_parallel)
.enable_dynamic_execution(allow_experimental_mode=True)
.with_remote_executor(remote_executor)
.build()
)
# dr.display_all_functions()
final_vars = ["filetype_orders", "filetype_products", "filetype_sap"]
inputs = dict()
dr.execute(final_vars=final_vars, inputs=inputs)
Alan Morgan
04/16/2024, 4:15 PM
def get_range() -> tuple[int, int]: return (1, 3)
And when I ran my pipeline it duplicated the data into 2 rows:
other_column get_range
Some Text 1
Some Text 3
But I wanted just one row:
other_column get_range
Some Text (1, 3)
Is there any way to do that, or am I designing my pipeline wrong?
Alex Pavlides
04/18/2024, 9:02 AM
Artem
04/20/2024, 5:12 PM
# functions.py - declare and link your transformations as functions....
import pandas as pd
def a(input: pd.Series) -> pd.Series:
return input % 7
def b(a: pd.Series) -> pd.Series:
return a * 2
# And run them!
import functions
from hamilton import driver
dr = driver.Driver({}, functions)
result = dr.execute(
['a', 'b'],
inputs={'input': pd.Series([1, 2, 3, 4, 5])}
)
print(result)
I want to define my functions as
def a(input: float) -> float:
return input % 7
def b(a: float) -> float:
return a * 2
and apply them to the same input and get the same output as in the example above. My goal is to avoid using pandas in the functions, so that I can apply them in a real-time production environment (which does not use pandas) and also apply them to pandas DataFrames and Spark DataFrames in notebooks when developing those functions.
I would very much appreciate any help and recommendations. This is blocking my decision to use Hamilton for our feature development. Thanks.
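One hedged way to structure this, sketched as an idea rather than an official Hamilton feature: keep the scalar logic in plain private helpers and expose thin pandas wrappers as the Hamilton nodes, so production code can call the helpers directly (the _a/_b names are invented for illustration):
# functions.py, hypothetical variant: scalar logic lives in private helpers,
# the Hamilton nodes are thin pandas wrappers around them.
import pandas as pd

def _a(x: float) -> float:
    return x % 7

def _b(a: float) -> float:
    return a * 2

def a(input: pd.Series) -> pd.Series:
    return input.map(_a)

def b(a: pd.Series) -> pd.Series:
    return a.map(_b)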
Seth Stokes
04/23/2024, 2:41 PM
If I add a pipe filter step to a node, could this with_filter_criteria node be accessed from the driver? E.g.
def _filter_criteria(df: pd.DataFrame, col: str) -> pd.DataFrame:
return df[df[col] == "US"]
@pipe(step(_filter_criteria, col="region"))
def filtered_data(raw_data: pd.DataFrame) -> pd.DataFrame:
# some work with raw_data
Pardon... just had to change dr.execute(["with_filter_criteria"]) to dr.execute(["filtered_data.with_filter_criteria"])
Elijah Ben Izzy
04/23/2024, 4:34 PM
Roy Kid
04/29/2024, 12:45 PM
I'm trying to use parameterize_extract_columns, and I got an error about result building, I think. My code looks like this:
@resolve(
when=ResolveAt.CONFIG_AVAILABLE,
decorate_with=lambda: parameterize_extract_columns(
*[ParameterizedExtract(tuple(exp.name), {"exp": value(exp)}) for exp in self]
),
)
def mapper(exp: Experiment) -> dict:
os.chdir(exp["run_dir"])
dr = driver.Builder().with_modules(*modules).build()
result = dr.materialize(*materializers, inputs=exp.param)
# result = dr.execute(inputs=exp.param, final_vars=["load_sin"])
return result
and the driver is like this:
dr = (
driver.Builder()
.with_modules(tracker)
.enable_dynamic_execution(allow_experimental_mode=True)
.with_execution_manager(execution_manager)
.with_adapter(base.SimplePythonGraphAdapter(base.DictResult()))
.with_config({settings.ENABLE_POWER_USER_MODE: True})
.build()
)
os.chdir(root)
results = dr.execute(
final_vars=[name for name in parameters],
)
I don't know how to register types with the result builder, or does parameterize_extract_columns only support pd.DataFrame?
Fefun
04/29/2024, 1:17 PM
Is there a way to express an "or" condition with the .when() construct?
For instance, suppose I want to run _some_function1() when either param1=True or param2=True, and I have the following base code:
from hamilton.function_modifiers import step, pipe, source
from my_module import _some_function1, _some_function2
@pipe(
step(_some_function1, node1=source("node1")).named("add_1", namespace=None).when(param1=True),
step(_some_function2, node2=source("node2")).named("add_2", namespace=None).when(param2=True),
)
def some_node(df: int, param1: bool, param2: bool) -> int:
return df
Where:
- param1 controls when _some_function1() should run.
- param2 controls when _some_function2() should run.
Both functions add columns to the same dataframe.
However, _some_function2() has a dependency on the columns generated by _some_function1().
So, when param1 == True, then _some_function1() runs and that's it.
However, when param2 == True, then both _some_function1() and then _some_function2() need to run.
I found a way to make this work by doing this:
from hamilton.function_modifiers import step, pipe, source
from my_module import _some_function1, _some_function2
@pipe(
step(_some_function1, node1=source("node1")).named("add_1", namespace=None).when(param1=True),
step(_some_function1, node1=source("node1")).named("add_1_and_2", namespace=None).when(param2=True),
step(_some_function2, node2=source("node2")).named("add_2", namespace=None).when(param2=True),
)
def some_node(df: int, param1: bool, param2: bool) -> int:
return df
And then in _some_function1() I added a first check to see if it already ran (by looking at the columns available).
But I get the feeling that I'm stretching a bit too far with how the @pipe decorator should be used 😅
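A hedged alternative sketch, just an idea rather than something confirmed here: since .when() resolves against the driver config, the "or" can be precomputed into a single flag in the config dict passed to the Driver, and both steps gated on plain single-key conditions (run_fn1 and my_module are hypothetical names):
from hamilton import driver
import my_module  # hypothetical module containing some_node and the _some_function helpers

param1, param2 = False, True
config = {
    "param1": param1,
    "param2": param2,
    "run_fn1": param1 or param2,  # _some_function1 should run if either flag is set
}
dr = driver.Builder().with_modules(my_module).with_config(config).build()

# and in the decorator:
# step(_some_function1, node1=source("node1")).named("add_1", namespace=None).when(run_fn1=True),
# step(_some_function2, node2=source("node2")).named("add_2", namespace=None).when(param2=True),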
Stefan Krawczyk
04/30/2024, 4:32 PM
Seth Stokes
05/01/2024, 10:52 PM
If I have n excel files to process, each corresponding to a day's snapshot of data, I could load them with Parallelizable and Collect, yielding over filepaths.
But each file has m=3 sheets that I need to load as separate data sets.
The Parallelizable works on the n items but not the m sheets.
Is there a Hamiltonian idiom for that yet?
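One hedged workaround sketch, not necessarily an official idiom: flatten the two loops by yielding (filepath, sheet) pairs from a single Parallelizable, so each unit of work is one sheet of one file (SHEETS, file_sheet, loaded_sheet and all_sheets are invented names):
from typing import List, Tuple

import pandas as pd
from hamilton.htypes import Parallelizable, Collect

SHEETS = ["sheet_a", "sheet_b", "sheet_c"]  # hypothetical m=3 sheet names

def file_sheet(filepaths: List[str]) -> Parallelizable[Tuple[str, str]]:
    # one unit of work per (file, sheet) combination
    for path in filepaths:
        for sheet in SHEETS:
            yield path, sheet

def loaded_sheet(file_sheet: Tuple[str, str]) -> pd.DataFrame:
    path, sheet = file_sheet
    return pd.read_excel(path, sheet_name=sheet)

def all_sheets(loaded_sheet: Collect[pd.DataFrame]) -> List[pd.DataFrame]:
    return list(loaded_sheet)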
Seth Stokes
05/02/2024, 3:50 PM
Is the SynchronousLocalTaskExecutor (with Parallelizable/Collect) pretty hit or miss? Or do I just happen to use it on odd file types (xlsb, xlsm)?
For some reason it takes quite a long time (>15 minutes) for one file using this executor. As a sanity check I scaled it down to 1 file, since the full batch was taking alarmingly long.
Could the speed issue be:
1. The file type (xlsm, xlsb)?
   a. The same transforms ran in <1 min for a single file without the executor.
2. The SynchronousLocalTaskExecutor needs a bit more understanding/setup to perform properly?
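A hedged thing to try, assuming the slowdown comes from the per-file tasks running strictly serially: swap the synchronous local executor for one of the parallel executors (MultiThreadingExecutor shows up in an earlier snippet in this log; the module names below are assumptions, not from this message):
from hamilton import driver
from hamilton.execution import executors

# Sketch only: same builder shape as before, but with a threaded remote executor.
dr = (
    driver.Builder()
    .with_modules(data_loader, transform)  # module names assumed from the earlier snippet
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_remote_executor(executors.MultiThreadingExecutor(max_tasks=4))
    .build()
)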
Seth Stokes
05/02/2024, 9:32 PM
I am trying to use overrides, but the driver is saying I am missing input values from nodes higher up the DAG.
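For reference, a minimal sketch of the call shape for overrides (node and value names below are made up; this only illustrates how overrides is passed, it does not diagnose the error above):
import pandas as pd

# Hypothetical: short-circuit the upstream node "raw_data" with a precomputed frame.
precomputed = pd.DataFrame({"a": [1, 2, 3]})
result = dr.execute(
    final_vars=["downstream_node"],
    inputs={},                             # inputs the DAG still genuinely needs
    overrides={"raw_data": precomputed},   # skip computing raw_data, use this value instead
)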