Slackbot
11/22/2023, 9:10 PM
Stefan Krawczyk
11/22/2023, 10:37 PM
Parallelizable and Collect, are all nodes in between parallel?
Yes. To clarify: parallel in the sense that the chain of functions from Parallelizable until Collect could be run in parallel over the "items" that the Parallelizable function returns.
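To make the answer above concrete, here is a plain-Python analogy of the fan-out/fan-in pattern. It is only a sketch and does not use Hamilton's API: the names `items`, `per_item_chain`, `collected`, and `run` are hypothetical, and a thread pool stands in for whatever executor the framework would use.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Iterator, List


def items() -> Iterator[int]:
    """Stand-in for a Parallelizable node: yields one item per branch."""
    yield from range(4)


def doubled(item: int) -> int:
    """First step of the per-item chain."""
    return item * 2


def plus_one(doubled_value: int) -> int:
    """Second step of the per-item chain."""
    return doubled_value + 1


def per_item_chain(item: int) -> int:
    """Everything between the fan-out and the fan-in, applied to one item."""
    return plus_one(doubled(item))


def collected(results: List[int]) -> int:
    """Stand-in for a Collect node: runs once over all per-item results."""
    return sum(results)


def run() -> int:
    # Each item flows through the whole chain independently, so the chain
    # can be mapped over the items in parallel before collecting.
    with ThreadPoolExecutor(max_workers=4) as pool:
        results = list(pool.map(per_item_chain, items()))
    return collected(results)
```

Only the chain between the two stand-ins is repeated per item; `items` and `collected` each run once.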
> 1. Does @config support chaining?
> a. If not, can you do multiple on one node?
What do you mean? You can stack a few decorators one on top of the other, but not always. Do you have an example of what you're thinking of?
Seth Stokes
11/22/2023, 10:46 PM
backfilled_df node before Parallelizable and I use it as an input to where the Collect occurs.
# imports assumed by this snippet
import pandas as pd
from hamilton.function_modifiers import config
from hamilton.htypes import Collect

# before Parallelizable
def backfilled_df() -> pd.DataFrame:
    backfilled_df = pd.read_csv()  # path omitted in the original message
    print(len(backfilled_df))
    return backfilled_df

# some parallelized nodes here
def transformed_df(a: pd.Series, b: pd.Series) -> pd.DataFrame:
    ...

@config.when(src="src_1", first_run=True)
def collected_df__first_run(
    transformed_df: Collect[pd.DataFrame],
    backfilled_df: pd.DataFrame
) -> pd.DataFrame:
    # stack the per-file results, then append the backfilled rows
    new_df = pd.concat(transformed_df, axis="rows")
    return pd.concat([new_df, backfilled_df], axis="rows")
What appears to be happening is that the backfilled_df node is printing for each file getting parallelized, but only returning backfilled_df once.
I only want it once, so the behavior is correct, but idk why it's printing.
Stefan Krawczyk
11/22/2023, 10:48 PM
Seth Stokes
11/22/2023, 10:50 PM
@config.when(run="parallel")
def file_path__parallel(files: List[str]) -> Parallelizable[str]:
    for file in files:
        print(file)
        yield file
Seth Stokes
11/22/2023, 10:56 PM
@extract_columns(*["a","b"])
Stefan Krawczyk
11/22/2023, 11:08 PM
Stefan Krawczyk
11/22/2023, 11:15 PM
backfilled_df?
E.g.
def backfilled_df(dummy_input: str) -> pd.DataFrame:
    backfilled_df = pd.read_csv()
    print(len(backfilled_df))
    return backfilled_df
Stefan Krawczyk
11/22/2023, 11:15 PM
Stefan Krawczyk
11/22/2023, 11:15 PM
Seth Stokes
11/22/2023, 11:16 PM
Stefan Krawczyk
11/22/2023, 11:17 PM
collected_df__first_run is only run once, so yep, it'll only be passed in once.
Seth Stokes
11/22/2023, 11:19 PM
Seth Stokes
11/22/2023, 11:19 PM
Stefan Krawczyk
11/22/2023, 11:20 PM
Stefan Krawczyk
11/22/2023, 11:20 PM
Stefan Krawczyk
11/22/2023, 11:20 PM
Seth Stokes
11/22/2023, 11:21 PM
from hamilton import base, driver  # imports assumed by this snippet

df_builder = base.PandasDataFrameResult()
adapter = base.SimplePythonGraphAdapter(df_builder)
dr = (
    driver.Builder()
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_config(config)
    .with_modules(loaders, transforms)
    .with_adapter(adapter)
    .build()
)
df = dr.execute(output_nodes)
Stefan Krawczyk
11/22/2023, 11:25 PM
def back_filleddf() -> str:
    print("back_filleddf was executed")
    return "this is a string"

def some_input(path: str) -> str:
    print("some_input was executed")
    return "this is another string"

# and then modified
def parsed_html_collection(parsed_html: Collect[ParsingResult], back_filleddf: str, some_input: str) -> List[ParsingResult]:
    ...
Then when I run it I get:
1. The image: there shouldn't be crow's feet on the two inputs to the collect function.
2. back_filleddf is somehow called twice (not 1 or 3 times...)...
Seth Stokes
11/22/2023, 11:33 PM
Elijah Ben Izzy
11/22/2023, 11:38 PM
Elijah Ben Izzy
11/22/2023, 11:38 PM
Elijah Ben Izzy
11/24/2023, 9:20 PM
Elijah Ben Izzy
11/24/2023, 9:25 PM
# module: parallel_collect_multiple_arguments
import collections
import functools

from hamilton.htypes import Parallelizable, Collect

_fn_call_counter = collections.Counter()

def _track_fn_call(fn) -> callable:
    # count every call to fn under its function name
    @functools.wraps(fn)
    def wrapped(*args, **kwargs):
        _fn_call_counter[fn.__name__] += 1
        return fn(*args, **kwargs)
    return wrapped

def _reset_counter():
    _fn_call_counter.clear()

@_track_fn_call
def not_to_repeat() -> int:
    print("not_to_repeat")
    return -1

@_track_fn_call
def number_to_repeat(iterations: int) -> Parallelizable[int]:
    for i in range(iterations):
        yield i

@_track_fn_call
def something_else_not_to_repeat() -> int:
    print("something_else_not_to_repeat")
    return -2

@_track_fn_call
def double(number_to_repeat: int) -> int:
    print(number_to_repeat)
    return number_to_repeat * 2

@_track_fn_call
def summed(double: Collect[int], not_to_repeat: int, something_else_not_to_repeat: int) -> int:
    return sum(double) + not_to_repeat + something_else_not_to_repeat

# test code; the imports below are not in the original message and are assumed
from hamilton import driver
from hamilton.execution.executors import SynchronousLocalTaskExecutor
from hamilton.execution.grouping import GroupByRepeatableBlocks
import parallel_collect_multiple_arguments  # the module above; import path is an assumption

dr = (
    driver.Builder()
    .with_modules(parallel_collect_multiple_arguments)
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_remote_executor(SynchronousLocalTaskExecutor())
    .with_grouping_strategy(GroupByRepeatableBlocks())
    .build()
)
parallel_collect_multiple_arguments._reset_counter()
res = dr.execute(["summed"], inputs={"iterations": 10})
assert res["summed"] == 45*2-2-1
# only double sits inside the parallel block, so only it runs once per item
counts = parallel_collect_multiple_arguments._fn_call_counter
assert counts == {
    "not_to_repeat": 1,
    "number_to_repeat": 1,
    "something_else_not_to_repeat": 1,
    "double": 10,
    "summed": 1,
}
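The call-counting check above can be tried without a Hamilton driver. The sketch below reuses the same decorator idea in plain Python and wires the functions together by hand, so it only illustrates the expected call counts and does not reproduce how Hamilton schedules nodes. The names `track_calls`, `call_counter`, and `run` are hypothetical.

```python
import collections
import functools
from typing import Callable, Iterator, List

call_counter: collections.Counter = collections.Counter()


def track_calls(fn: Callable) -> Callable:
    """Wrap fn so every call increments a counter keyed by its name."""
    @functools.wraps(fn)
    def wrapped(*args, **kwargs):
        call_counter[fn.__name__] += 1
        return fn(*args, **kwargs)
    return wrapped


@track_calls
def not_to_repeat() -> int:
    return -1


@track_calls
def number_to_repeat(iterations: int) -> Iterator[int]:
    # Calling a generator function counts as one call, however many items it yields.
    yield from range(iterations)


@track_calls
def double(number: int) -> int:
    return number * 2


@track_calls
def summed(doubles: List[int], extra: int) -> int:
    return sum(doubles) + extra


def run(iterations: int) -> int:
    """Hand-wired stand-in for the graph: fan out, double each item, collect."""
    call_counter.clear()
    extra = not_to_repeat()  # outside the per-item block, so called once
    doubles = [double(n) for n in number_to_repeat(iterations)]
    return summed(doubles, extra)
```

With 10 iterations, only `double` should show a count of 10; every other function should be counted exactly once.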
Elijah Ben Izzy
11/24/2023, 10:45 PM
Elijah Ben Izzy
11/24/2023, 11:22 PM