# hamilton-help
s
t
You could rewrite it as
def cob_date(raw_df: pd.DataFrame) -> datetime:
  return ...
since the name `raw_df` is what will be available in parallel. Otherwise, to collect the parallel nodes, I suggest writing it this way:
def norm_df(cob_date: datetime, other_transform: pd.Series) -> pd.DataFrame:
  return ...

def norm_df_collection(norm_df: Collect[pd.DataFrame]) -> pd.DataFrame:
  return ...
This way `norm_df_collection()` only depends on one `Collect[]` parameter. This could allow you to inspect the `norm_df` of each branch if you wanted to!
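To make the fan-out/fan-in shape concrete, here is a plain-Python toy model of it. It does not use Hamilton: `run_branches` is a hypothetical stand-in for the executor that runs branches sequentially, and dicts stand in for DataFrames. It only illustrates the rule that per-branch functions receive one item while the single collecting function receives the list of all branch results.

```python
from typing import Dict, Iterator, List, Tuple

Row = Dict[str, float]  # hypothetical stand-in for a DataFrame


def raw_df(rows: List[Row]) -> Iterator[Row]:
    """Plays the role of a Parallelizable[...] node: one yielded item per branch."""
    for row in rows:
        yield row


def norm_df(raw: Row) -> Row:
    """Per-branch node: receives ONE item, never the whole list."""
    total = sum(raw.values())
    return {col: value / total for col, value in raw.items()}


def norm_df_collection(norm_dfs: List[Row]) -> List[Row]:
    """Plays the role of the single Collect[...] node: receives every branch result."""
    return norm_dfs


def run_branches(rows: List[Row]) -> Tuple[List[Row], List[Row]]:
    """Hypothetical sequential executor: fan out, run each branch, then fan in."""
    per_branch = [norm_df(item) for item in raw_df(rows)]  # fan out
    return norm_df_collection(per_branch), per_branch  # fan in


collected, branches = run_branches([{"a": 1.0, "b": 3.0}, {"a": 2.0, "b": 2.0}])
print(collected)  # [{'a': 0.25, 'b': 0.75}, {'a': 0.5, 'b': 0.5}]
```

Because only one function collects, the per-branch `norm_df` results (here, `branches`) stay available to inspect on their own.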
s
def raw_df(files: List[str]) -> Parallelizable[pd.DataFrame]:
    for file in files:
        yield pd.read_csv(file)

def cob_date(files: List[str]) -> Parallelizable[datetime]:
    for file in files:
        yield pd.to_datetime(re.findall(r'20\d{6}', file)[0])
So to access the individual file name, could I redefine it as above?
I have it set up as you advised, but now the `Parallelizable[pd.DataFrame]` in an individual downstream transform appears to be interpreted as a list BEFORE I use `Collect`.
AttributeError: 'list' object has no attribute 'loc'
t
Ooh, I thought `cob_date()` was applied on the dataframe. If you want to iterate over the files, you could do:
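import re
from datetime import datetime
from typing import List

import pandas as pd
from hamilton.htypes import Collect, Parallelizable

def file_path(files: List[str]) -> Parallelizable[str]:
  # Each yielded path becomes its own parallel branch
  for file in files:
    yield file

def raw_df(file_path: str) -> pd.DataFrame:
  # Runs once per branch, so it receives a single path, not the list
  return pd.read_csv(file_path)

def cob_date(file_path: str) -> datetime:
  # Take the first "20" + six-digit date (YYYYMMDD) found in the path
  return pd.to_datetime(re.findall(r'20\d{6}', file_path)[0])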

# ... other funcs

def norm_df(cob_date: datetime, other_transform: pd.Series) -> pd.DataFrame:
  return ...

def norm_df_collection(norm_df: Collect[pd.DataFrame]) -> pd.DataFrame:
  return ...
The parallel branches will be over files. For each file, you will create a `norm_df`, and those are collected as a list in `norm_df_collection`.
If you encounter another error, sharing the full error trace could help me debug with you!
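As a quick runnable check of the per-file logic, here is a stdlib-only sketch of the branch functions. It swaps `pd.to_datetime` for `datetime.strptime`, assumes the dates in file names are `YYYYMMDD` (the `20\d{6}` pattern above), and raises a clearer error than `IndexError` when no date is found. `cob_dates_for` is a hypothetical helper that loops over the files sequentially in place of Hamilton's parallel branches and collect step.

```python
import re
from datetime import datetime
from typing import Iterator, List

# Same pattern as above: "20" followed by six digits, read as YYYYMMDD.
DATE_PATTERN = re.compile(r"20\d{6}")


def file_path(files: List[str]) -> Iterator[str]:
    """Fan-out step: each yielded path would be one parallel branch."""
    for file in files:
        yield file


def cob_date(file_path: str) -> datetime:
    """Per-branch step: receives a single path and parses its date."""
    match = DATE_PATTERN.search(file_path)
    if match is None:
        raise ValueError(f"no YYYYMMDD date found in {file_path!r}")
    return datetime.strptime(match.group(), "%Y%m%d")


def cob_dates_for(files: List[str]) -> List[datetime]:
    """Hypothetical sequential stand-in for the branches plus a collect step."""
    return [cob_date(path) for path in file_path(files)]


print(cob_dates_for(["fees_20230131.csv", "data/fees_20230228.csv"]))
```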
s
Ok, this makes more sense. Thank you! Will send the stack trace shortly.
Traceback (most recent call last):
  File "C:\codebase\data-project\etl\src\run.py", line 99, in <module>
    get_sg_fees_parallel(['norm_fees_parallel'], visualize=True)
  File "C:\codebase\data-project\etl\src\run.py", line 85, in get_sg_fees_parallel
    df = dr.execute(output_nodes)
  File "C:\Users\AppData\Roaming\Python\Python39\site-packages\hamilton\driver.py", line 410, in execute
    raise e
  File "C:\Users\AppData\Roaming\Python\Python39\site-packages\hamilton\driver.py", line 403, in execute
    outputs = self.raw_execute(_final_vars, overrides, display_graph, inputs=inputs)
  File "C:\Users\AppData\Roaming\Python\Python39\site-packages\hamilton\driver.py", line 498, in raw_execute
    return self.graph_executor.execute(
  File "C:\Users\AppData\Roaming\Python\Python39\site-packages\hamilton\driver.py", line 230, in execute
    executors.run_graph_to_completion(execution_state, self.execution_manager)
  File "C:\Users\AppData\Roaming\Python\Python39\site-packages\hamilton\execution\executors.py", line 342, in run_graph_to_completion
    next_task = execution_state.release_next_task()
  File "C:\Users\AppData\Roaming\Python\Python39\site-packages\hamilton\execution\state.py", line 498, in release_next_task
    return self.bind_task(self.task_queue.popleft())
  File "C:\Users\AppData\Roaming\Python\Python39\site-packages\hamilton\execution\state.py", line 474, in bind_task
    dynamic_inputs = {**dynamic_inputs, **self.result_cache.read(input_vars_to_read)}
  File "C:\Users\AppData\Roaming\Python\Python39\site-packages\hamilton\execution\state.py", line 113, in read
    raise KeyError(f"Key {formatted_key} not found in cache")
KeyError: 'Key norm_fees not found in cache'
Can you remind me when it is necessary to add a `__config` suffix to a function name? Is it always necessary, even with `@config.when`?
e
It's only necessary if the Python function names would collide, e.g. if two separate `@config` decorators are on functions with the same name in the same file…
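A toy model of why the suffix helps: distinct Python names let both functions exist, and the node name is recovered by dropping the suffix. This is not Hamilton's code; `Candidate`, `node_name`, and `resolve` are hypothetical, and stripping everything after the last `__` is an assumption made for the sketch.

```python
from typing import Dict, List, NamedTuple


class Candidate(NamedTuple):
    """Hypothetical record for one @config.when / @config.when_not function."""
    fn_name: str
    key: str
    value: str
    negate: bool  # True models when_not


def node_name(fn_name: str) -> str:
    """Assumption for this sketch: drop everything after the last '__'."""
    base, sep, _suffix = fn_name.rpartition("__")
    return base if sep else fn_name


def resolve(candidates: List[Candidate], config: Dict[str, str]) -> Dict[str, str]:
    """Pick, per node name, the one function whose condition matches the config."""
    chosen: Dict[str, str] = {}
    for cand in candidates:
        matches = config.get(cand.key) == cand.value
        if cand.negate:
            matches = not matches
        if not matches:
            continue
        name = node_name(cand.fn_name)
        if name in chosen:
            raise ValueError(f"{chosen[name]} and {cand.fn_name} both define {name!r}")
        chosen[name] = cand.fn_name
    return chosen


CANDIDATES = [
    Candidate("function__this", "cond", "this", negate=False),
    Candidate("function__not_this", "cond", "this", negate=True),
]
print(resolve(CANDIDATES, {"cond": "this"}))   # {'function': 'function__this'}
print(resolve(CANDIDATES, {"cond": "other"}))  # {'function': 'function__not_this'}
```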
s
Would this be valid?
@config.when(cond="this")
def function(): pass


@config.when_not(cond="this")
def function(): pass
e
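So, no: the second `def function` rebinds the same name in the module, so they'll overwrite each other and Hamilton won't be able to read them both.
Instead, give each function a distinct `__` suffix: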
@config.when(cond="this")
def function__this(): pass

@config.when_not(cond="this")
def function__not_this(): pass
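The overwrite is plain Python behavior, independent of Hamilton: a second `def` with the same name rebinds that name in the module, so only the last definition is left for anything that later inspects the module's functions. A minimal sketch using `exec` on a hypothetical module source; the decorators are omitted because whatever a decorator returns is still bound to the same name.

```python
import inspect
from typing import Callable, Dict

# Hypothetical module source mirroring the question above (decorators omitted).
MODULE_SOURCE = '''
def function():
    return "this"

def function():
    return "not_this"

def function__this():
    return "this"

def function__not_this():
    return "not_this"
'''


def module_functions(source: str) -> Dict[str, Callable]:
    """Execute source as a throwaway module and return its top-level functions."""
    namespace: Dict[str, object] = {}
    exec(source, namespace)
    return {
        name: obj for name, obj in namespace.items() if inspect.isfunction(obj)
    }


funcs = module_functions(MODULE_SOURCE)
print(sorted(funcs))        # ['function', 'function__not_this', 'function__this']
print(funcs["function"]())  # not_this
```

Only one `function` survives, while both suffixed names remain distinct.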