# hamilton-help
s
> When using `Parallelizable` and `Collect`, are all nodes in between parallel?
Yes. To clarify, parallel in the sense that the chain of functions from `Parallelizable` until `Collect` could be run in parallel over the “items” that the `Parallelizable` function returns.
> 1. Does `@config` support chaining?
> a. If not, can you do multiple on one node?
What do you mean? You can stack a few decorators one on top of the other, but not always. Do you have an example of what you’re thinking of?
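E.g., a minimal sketch of the fan-out/fan-in (made-up function names, just to illustrate):
Copy code
from hamilton.htypes import Collect, Parallelizable

def number(n: int) -> Parallelizable[int]:
    # fan out: one task per yielded item
    for i in range(n):
        yield i

def doubled(number: int) -> int:
    # everything between Parallelizable and Collect runs once per item
    return number * 2

def total(doubled: Collect[int]) -> int:
    # fan in: receives all the per-item results
    return sum(doubled)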
s
RE: Parallelizable. OK, I have a `backfilled_df` node before the `Parallelizable` and I use it as an input where the `Collect` occurs.
Copy code
# before the Parallelizable (imports added for completeness)
import pandas as pd

from hamilton.function_modifiers import config
from hamilton.htypes import Collect


def backfilled_df() -> pd.DataFrame:
    backfilled_df = pd.read_csv(...)  # path elided in the original snippet
    print(len(backfilled_df))
    return backfilled_df


# some parallelized nodes here
def transformed_df(a: pd.Series, b: pd.Series) -> pd.DataFrame:
    ...


@config.when(src="src_1", first_run=True)
def collected_df__first_run(
    transformed_df: Collect[pd.DataFrame],
    backfilled_df: pd.DataFrame,
) -> pd.DataFrame:
    # concatenate the collected frames row-wise, then append the backfilled frame
    new_df = pd.concat(transformed_df, axis="index")
    return pd.concat([new_df, backfilled_df], axis="index")
What appears to be happening is that the `backfilled_df` node is printing for each file getting parallelized, but only returning `backfilled_df` once. I only want it once, so the behavior is correct, but I don’t know why it’s printing more than once.
s
what’s the parallelizable function?
s
Copy code
from typing import List

from hamilton.function_modifiers import config
from hamilton.htypes import Parallelizable

@config.when(run="parallel")
def file_path__parallel(files: List[str]) -> Parallelizable[str]:
    for file in files:
        print(file)
        yield file
then I read the file and apply `@extract_columns(*["a","b"])`
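i.e. something like this (a sketch — the real node name and columns differ):
Copy code
import pandas as pd

from hamilton.function_modifiers import extract_columns


@extract_columns("a", "b")
def loaded_df(file_path: str) -> pd.DataFrame:
    # runs once per path yielded by the file_path node (file_path__parallel above)
    return pd.read_csv(file_path)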
s
ok I’m trying to recreate an example on my side
@Seth Stokes I think you might have found a 🐛. Why don’t you add an input parameter to `backfilled_df`? E.g.
Copy code
def backfilled_df(dummy_input: str) -> pd.DataFrame:
    backfilled_df = pd.read_csv(...)  # path elided
    print(len(backfilled_df))
    return backfilled_df
since I think with an input parameter it behaves as intended
but without, you’re right, it seems to get re-executed… actually maybe not… struggling to recreate it now.
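(To be clear, the extra parameter would just get passed in at execution time — `dummy_input` here is a placeholder name:)
Copy code
# "dummy_input" is just a placeholder; any extra required input works
df = dr.execute(["collected_df__first_run"], inputs={"dummy_input": "anything"})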
s
When it gets re-executed, does it only return that df once for you as well, despite printing twice?
s
yep, so `collected_df__first_run` is only run once, so yep, it’ll only be passed in once.
s
I am using an input parameter
just didn’t show it in the pseudo-code, apologies
s
ah, hmm, well then
I got nothing
what executor are you using?
s
Copy code
from hamilton import base, driver

df_builder = base.PandasDataFrameResult()
adapter = base.SimplePythonGraphAdapter(df_builder)

dr = (
    driver.Builder()
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_config(config)
    .with_modules(loaders, transforms)
    .with_adapter(adapter)
    .build()
)

df = dr.execute(output_nodes)
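(i.e. I’m not setting an executor explicitly — if I were, I believe it would look something like this on the same Builder:)
Copy code
# sketch: same Builder as above, but pinning the executor for the parallel tasks
from hamilton.execution.executors import SynchronousLocalTaskExecutor

dr = (
    driver.Builder()
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_remote_executor(SynchronousLocalTaskExecutor())
    .with_config(config)
    .with_modules(loaders, transforms)
    .with_adapter(adapter)
    .build()
)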
👍 1
s
Okay I modified https://hub.dagworks.io/docs/Users/zilto/webscraper/ and added:
Copy code
def back_filleddf() -> str:
    print("back_filleddf was executed")
    return "this is a string"

def some_input(path: str) -> str:
    print("some_input was executed")
    return "this is another string"

# and then modified
def parsed_html_collection(
    parsed_html: Collect[ParsingResult], back_filleddf: str, some_input: str
) -> List[ParsingResult]:
    ...
Then when I run it I get: 🐛 1 is the image — there shouldn’t be crow’s feet on the two inputs to the collect function. 🐛 2 is that `back_filleddf` is somehow called twice (not 1 or 3 times…)…
s
@Elijah Ben Izzy Could this have to do with the slow parallelization I showed you yesterday?
e
Yep, that looks like a bug. And… quite possibly.
Should hopefully be an easyish fix.
OK, I’m debugging this. Weird behavior. Have it so that…
• half of the time it’s running twice
• half of the time it’s running 3 times
Here’s my repro with a bit of information/debugging stuff injected:
Copy code
import collections
import functools

from hamilton.htypes import Parallelizable, Collect

_fn_call_counter = collections.Counter()


def _track_fn_call(fn) -> callable:
    @functools.wraps(fn)
    def wrapped(*args, **kwargs):
        _fn_call_counter[fn.__name__] += 1
        return fn(*args, **kwargs)

    return wrapped


def _reset_counter():
    _fn_call_counter.clear()


@_track_fn_call
def not_to_repeat() -> int:
    print("not_to_repeat")
    return -1


@_track_fn_call
def number_to_repeat(iterations: int) -> Parallelizable[int]:
    for i in range(iterations):
        yield i


@_track_fn_call
def something_else_not_to_repeat() -> int:
    print("something_else_not_to_repeat")
    return -2


@_track_fn_call
def double(number_to_repeat: int) -> int:
    print(number_to_repeat)
    return number_to_repeat * 2


@_track_fn_call
def summed(double: Collect[int], not_to_repeat: int, something_else_not_to_repeat: int) -> int:
    return sum(double) + not_to_repeat + something_else_not_to_repeat
Copy code
# imports assumed — adjust to where these live in your Hamilton version
from hamilton import driver
from hamilton.execution.executors import SynchronousLocalTaskExecutor
from hamilton.execution.grouping import GroupByRepeatableBlocks

import parallel_collect_multiple_arguments  # the module above

dr = (
    driver.Builder()
    .with_modules(parallel_collect_multiple_arguments)
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_remote_executor(SynchronousLocalTaskExecutor())
    .with_grouping_strategy(GroupByRepeatableBlocks())
    .build()
)
parallel_collect_multiple_arguments._reset_counter()
res = dr.execute(["summed"], inputs={"iterations": 10})
assert res["summed"] == 45 * 2 - 2 - 1  # sum(2*i for i in range(10)) - 2 - 1 = 87
counts = parallel_collect_multiple_arguments._fn_call_counter
assert counts == {
    "not_to_repeat": 1,
    "number_to_repeat": 1,
    "something_else_not_to_repeat": 1,
    "double": 10,
    "summed": 1,
}
We have a fix! Grouped in with a few others as separate commits. It’s a one-line change (although it doesn’t look like it, as I’ve added tests and done a little bit of related cleanup): https://github.com/DAGWorks-Inc/hamilton/pull/545/commits/79127d44dc7dee9af75b32b2be11cf612df224a4. The TL;DR is that we track the state of each task. Tasks go from “INITIALIZED” when they’re created to “QUEUED” when all of their dependencies are completed. We’d put them on the queue initially but forgot to mark them as QUEUED, which made the state tracker decide to run them again. We could probably clean this up with a multi-queue system without external state tracking, but I think this fix is pretty simple and there are some architectural trade-offs that I haven’t scoped out yet. That said @Seth Stokes, I’d be surprised if this is responsible for the slowness you saw (due to the double-run), but it’s definitely possible.
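For intuition, the scheduling logic is roughly this shape (a simplified sketch, not the actual Hamilton code — names are made up):
Copy code
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, List


class TaskState(Enum):
    INITIALIZED = "initialized"
    QUEUED = "queued"
    DONE = "done"


@dataclass
class Task:
    name: str
    dependencies: List[str] = field(default_factory=list)


def schedule_ready_tasks(tasks: List[Task], state: Dict[str, TaskState], queue: List[Task]) -> None:
    """Enqueue every task whose dependencies have all completed."""
    for task in tasks:
        if state[task.name] == TaskState.INITIALIZED and all(
            state[dep] == TaskState.DONE for dep in task.dependencies
        ):
            queue.append(task)
            # This is the one-line fix: without it, the next scheduling pass still
            # sees the task as INITIALIZED and enqueues (and runs) it again.
            state[task.name] = TaskState.QUEUED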
🙌 1
@Stefan Krawczyk note this is not related to the issue you saw — that’s separate and I documented it here: https://github.com/DAGWorks-Inc/hamilton/issues/556