Volker Lorrmann
04/12/2024, 8:25 AM
from hamilton.function_modifiers import parameterize, value, source
from hamilton.htypes import Parallelizable
import glob
import os
# One parameterized src_path node per source directory; each node name maps
# to a literal path argument under "src/".
PARAMS = {
    name: {"path": value(f"src/{name}")}
    for name in ("orders", "products", "sap")
}
@parameterize(**PARAMS)
def src_path(
    path: str,
) -> Parallelizable[str]:
    """Fan out over every entry directly inside the directory ``path``.

    Yields one path per entry matched by the ``*`` glob. Sub-directories are
    included as well as files; names starting with a dot are not matched.
    Entries are yielded in sorted order so repeated runs dispatch the
    parallel branches deterministically (plain glob order is arbitrary).
    """
    # Escape the directory part so characters such as "[" or "*" in the
    # directory name are matched literally instead of as glob syntax.
    pattern = os.path.join(glob.escape(path), "*")
    yield from sorted(glob.glob(pattern))
@parameterize(
    filetype_orders={"src_path": source("orders")},
    filetype_products={"src_path": source("products")},
    filetype_sap={"src_path": source("sap")},
)
def filetype(src_path: str) -> str:
    """Report whether one fanned-out path looks like JSON or CSV.

    Only the ``.json`` suffix is checked; every other path is reported as a
    CSV file. Prints a one-line message and returns ``src_path`` unchanged.
    """
    message = (
        f"{src_path}: I am a json file"
        if src_path.endswith(".json")
        else f"{src_path}: I am a csv file"
    )
    print(message)
    return src_path
if __name__ == "__main__":
    # Run the DAG defined in this very file: importing __main__ hands the
    # running script's own module object to the driver builder.
    from hamilton import driver
    from hamilton.execution import executors
    import __main__ as debug_parallel

    # Thread-based executor for the parallel branches; max_tasks=20 bounds
    # how many tasks it runs at once.
    remote_executor = executors.MultiThreadingExecutor(max_tasks=20)
    dr = (
        driver.Builder()
        .with_modules(debug_parallel)
        # Parallelizable nodes are only supported under dynamic execution.
        .enable_dynamic_execution(allow_experimental_mode=True)
        .with_remote_executor(remote_executor)
        .build()
    )
    final_vars = ["filetype_orders", "filetype_products", "filetype_sap"]
    dr.execute(final_vars=final_vars, inputs={})
Volker Lorrmann
04/12/2024, 9:24 AM
from hamilton.function_modifiers import parameterize, value, source
from hamilton.htypes import Parallelizable, Collect
import glob
import os
# One parameterized src_path node per source directory; each node name maps
# to a literal path argument under "src/".
PARAMS = {
    name: {"path": value(f"src/{name}")}
    for name in ("orders", "products", "sap")
}
@parameterize(**PARAMS)
def src_path(
    path: str,
) -> Parallelizable[str]:
    """Fan out over every entry directly inside the directory ``path``.

    Yields one path per entry matched by the ``*`` glob, in sorted order so
    repeated runs dispatch the parallel branches deterministically.
    Sub-directories are included as well as files; names starting with a dot
    are not matched.
    """
    # Escape the directory part so characters such as "[" or "*" in the
    # directory name are matched literally instead of as glob syntax.
    pattern = os.path.join(glob.escape(path), "*")
    yield from sorted(glob.glob(pattern))
@parameterize(
    filetype_orders={"src_path": source("orders")},
    filetype_products={"src_path": source("products")},
    filetype_sap={"src_path": source("sap")},
)
def filetype(src_path: str) -> str:
    """Classify one fanned-out path by suffix and return its label.

    Returns "json" for paths ending in ``.json`` and "csv" for everything
    else, printing a one-line message either way.
    """
    if not src_path.endswith(".json"):
        print(f"{src_path}: I am a csv file")
        return "csv"
    print(f"{src_path}: I am a json file")
    return "json"
@parameterize(
    filetypes_orders=dict(filetype=source("filetype_orders")),
    filetypes_products=dict(filetype=source("filetype_products")),
    filetypes_sap=dict(filetype=source("filetype_sap")),
)
def all_filetype(filetype: Collect[str]) -> list[str]:
    """Gather the per-file labels of one parallel fan-out into a list.

    ``filetype`` is the Collect input and yields one label per branch.
    The returned list keeps the order in which that iterable yields them.
    """
    # list() drains the collected iterable in iteration order.
    return list(filetype)
if __name__ == "__main__":
    # Run the DAG defined in this very file: importing __main__ hands the
    # running script's own module object to the driver builder.
    from hamilton import driver
    from hamilton.execution import executors
    import __main__ as debug_parallel

    # Thread-based executor for the parallel branches; max_tasks=20 bounds
    # how many tasks it runs at once.
    pool = executors.MultiThreadingExecutor(max_tasks=20)

    builder = driver.Builder()
    builder = builder.with_modules(debug_parallel)
    # Parallelizable/Collect nodes are only supported under dynamic execution.
    builder = builder.enable_dynamic_execution(allow_experimental_mode=True)
    builder = builder.with_remote_executor(pool)
    dr = builder.build()

    # Request the collected per-source label lists.
    requested = ["filetypes_orders", "filetypes_products", "filetypes_sap"]
    dr.execute(final_vars=requested, inputs={})
Elijah Ben Izzy
04/12/2024, 1:37 PM