Hi guys, can you help me with the following problem?
# hamilton-help
v
Hi guys, can you help me with the following problem? I have a parallelized node that yields strings (source file paths), and a downstream node that takes each string as its input. However, inside the downstream node the argument turns out to be a list instead of a string. I guess this has something to do with the parallelization. Here is a simple toy example:
Copy code
from hamilton.function_modifiers import parameterize, value, source

from hamilton.htypes import Parallelizable
import glob
import os

# One parameterization per data source: the key becomes the node name and the
# value binds that node's ``path`` argument to a fixed source directory.
PARAMS = {
    node_name: {"path": value(directory)}
    for node_name, directory in (
        ("orders", "src/orders"),
        ("products", "src/products"),
        ("sap", "src/sap"),
    )
}


@parameterize(**PARAMS)
def src_path(path: str) -> Parallelizable[str]:
    """Yield every entry directly under ``path``, one item per parallel task.

    Expanded by ``PARAMS`` into the nodes ``orders``, ``products`` and ``sap``.
    Entries come out in whatever order ``glob.glob`` returns them (unsorted).
    """
    yield from glob.glob(os.path.join(path, "*"))


@parameterize(
    filetype_orders=dict(src_path=source("orders")),
    filetype_products=dict(src_path=source("products")),
    filetype_sap=dict(src_path=source("sap")),
)
def filetype(src_path: str) -> str:
    """Report whether ``src_path`` looks like a JSON or CSV file, then return it.

    Expanded into ``filetype_orders`` / ``filetype_products`` / ``filetype_sap``,
    each reading the matching ``orders`` / ``products`` / ``sap`` node.
    Anything that does not end in ``.json`` is reported as CSV.

    NOTE(review): as reported in the thread, with no downstream ``Collect[...]``
    node closing the parallel block, ``src_path`` arrived here as a list
    rather than a single string.
    """
    looks_like_json = src_path.endswith(".json")
    if not looks_like_json:
        print(f"{src_path}: I am a csv file")
    else:
        print(f"{src_path}: I am a json file")
    return src_path


if __name__ == "__main__":
    # Script entry point. The original `_name_` / "_main_" / `_main_` spellings
    # were missing the double underscores: `_name_` is undefined (NameError) and
    # there is no module called `_main_`.
    from hamilton import driver
    from hamilton.execution import executors

    # When run as a script, `__main__` is this very file, so the functions
    # defined above become the DAG's nodes.
    import __main__ as debug_parallel

    # Executor used for the parallel (Parallelizable) tasks; presumably a thread
    # pool capped at 20 concurrent tasks — confirm against Hamilton docs.
    remote_executor = executors.MultiThreadingExecutor(max_tasks=20)

    dr = (
        driver.Builder()
        .with_modules(debug_parallel)
        # Parallelizable nodes need Hamilton's dynamic execution, which is
        # gated behind the experimental-mode flag.
        .enable_dynamic_execution(allow_experimental_mode=True)
        .with_remote_executor(remote_executor)
        .build()
    )

    # dr.display_all_functions()  # uncomment to visualize the DAG

    final_vars = ["filetype_orders", "filetype_products", "filetype_sap"]
    inputs = {}

    dr.execute(final_vars=final_vars, inputs=inputs)
OK, I've figured it out myself. I had to split the "filetype" logic into two nodes: one that processes each individual item, and a second that collects the results of the first (using `Collect`). Here is the solution for the toy example:
Copy code
from hamilton.function_modifiers import parameterize, value, source

from hamilton.htypes import Parallelizable, Collect
import glob
import os

# One parameterization per data source: the key becomes the node name and the
# value binds that node's ``path`` argument to a fixed source directory.
PARAMS = {
    node_name: {"path": value(directory)}
    for node_name, directory in (
        ("orders", "src/orders"),
        ("products", "src/products"),
        ("sap", "src/sap"),
    )
}


@parameterize(**PARAMS)
def src_path(path: str) -> Parallelizable[str]:
    """Yield the entries directly under ``path`` in sorted order.

    Expanded by ``PARAMS`` into the nodes ``orders``, ``products`` and ``sap``.
    Sorting fixes the fan-out order (``glob.glob`` gives no ordering guarantee);
    each yielded path is handled as a separate item by the ``filetype_*`` nodes.
    """
    yield from sorted(glob.glob(os.path.join(path, "*")))


@parameterize(
    filetype_orders=dict(src_path=source("orders")),
    filetype_products=dict(src_path=source("products")),
    filetype_sap=dict(src_path=source("sap")),
)
def filetype(src_path: str) -> str:
    """Classify one source file as ``"json"`` or ``"csv"`` by its extension.

    Expanded into ``filetype_orders`` / ``filetype_products`` / ``filetype_sap``,
    each running once per path yielded by the matching ``src_path`` node.
    Anything that does not end in ``.json`` is labelled ``"csv"``.

    Returns:
        ``"json"`` or ``"csv"``.
    """
    is_json = src_path.endswith(".json")
    message = (
        f"{src_path}: I am a json file" if is_json else f"{src_path}: I am a csv file"
    )
    print(message)
    return "json" if is_json else "csv"


@parameterize(
    filetypes_orders=dict(filetype=source("filetype_orders")),
    filetypes_products=dict(filetype=source("filetype_products")),
    filetypes_sap=dict(filetype=source("filetype_sap")),
)
def all_filetype(filetype: Collect[str]) -> list[str]:
    """Gather the per-file labels of one parallel branch into a single list.

    Expanded into ``filetypes_orders`` / ``filetypes_products`` /
    ``filetypes_sap``. The ``Collect[str]`` annotation closes the parallel
    block opened by ``src_path``: every value produced by the matching
    ``filetype_*`` node is fed into this one call.

    Args:
        filetype: Iterable of the ``"json"`` / ``"csv"`` labels from the
            parallel ``filetype_*`` node.

    Returns:
        The labels as a list, in the order they are iterated.
    """
    # A plain copy of the collected iterable; no manual append loop needed.
    return list(filetype)


if __name__ == "__main__":
    from hamilton import driver
    from hamilton.execution import executors

    # When run as a script, `__main__` is this very file, so the functions
    # defined above become the DAG's nodes.
    import __main__ as debug_parallel

    # Executor used for the parallel (Parallelizable) tasks; presumably a thread
    # pool capped at 20 concurrent tasks — confirm against Hamilton docs.
    remote_executor = executors.MultiThreadingExecutor(max_tasks=20)

    dr = (
        driver.Builder()
        .with_modules(debug_parallel)
        # Parallelizable/Collect need Hamilton's dynamic execution, which is
        # gated behind the experimental-mode flag.
        .enable_dynamic_execution(allow_experimental_mode=True)
        .with_remote_executor(remote_executor)
        .build()
    )

    # dr.display_all_functions()  # uncomment to visualize the DAG

    # Request the collected lists. Each one pulls its whole branch
    # (src_path -> filetype_* -> filetypes_*) through the executor.
    final_vars = [
        "filetypes_orders",
        "filetypes_products",
        "filetypes_sap",
    ]
    inputs = {}

    dr.execute(final_vars=final_vars, inputs=inputs)
e
Hey! Yep, you got it. Without a `Collect` node, Hamilton ends up treating the value like a list (we could have a better guard here). This should be an error case — we'll look into it!
👍 2