Slackbot
10/25/2023, 3:47 PM
Thierry Jean
10/25/2023, 3:53 PM
def cob_date(raw_df: pd.DataFrame) -> datetime:
    return ...
since the name raw_df is what will be available in parallel.
Otherwise, to collect the parallel nodes, I suggest writing it this way:
def norm_df(cob_date: datetime, other_transform: pd.Series) -> pd.DataFrame:
    return ...
def norm_df_collection(norm_df: Collect[pd.DataFrame]) -> pd.DataFrame:
    return ...
This way norm_df_collection() only depends on one Collect[] parameter. This could allow you to inspect the norm_df of each branch if you wanted to!
Seth Stokes
10/25/2023, 4:17 PM
def raw_df(files: List[str]) -> Parallelizable[pd.DataFrame]:
    for file in files:
        yield pd.read_csv(file)
def cob_date(files: List[str]) -> Parallelizable[datetime]:
    for file in files:
        yield pd.to_datetime(re.findall(r'20\d{6}', file)[0])
So to access the individual file name, could I redefine it as above?
Seth Stokes
10/25/2023, 4:32 PM
Parallelizable[pd.DataFrame] in an individual downstream transform appears to be interpreted as a list BEFORE I use Collect.
AttributeError: 'list' object has no attribute 'loc'
Thierry Jean
10/25/2023, 4:39 PM
cob_date() was applied on the dataframe. If you want to iterate over the files, you could do:
def file_path(files: List[str]) -> Parallelizable[str]:
    for file in files:
        yield file
def raw_df(file_path: str) -> pd.DataFrame:
    return pd.read_csv(file_path)
def cob_date(file_path: str) -> datetime:
    return pd.to_datetime(re.findall(r'20\d{6}', file_path)[0])
# ... other funcs
def norm_df(cob_date: datetime, other_transform: pd.Series) -> pd.DataFrame:
    return ...
def norm_df_collection(norm_df: Collect[pd.DataFrame]) -> pd.DataFrame:
    return ...
The parallel branches will be over files. For each file, you will create a norm_df and collect them as a list in norm_df_collection.
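To make the fan-out and collect steps concrete, here is a minimal plain-Python sketch of the same dataflow with Hamilton left out entirely. It is not how Hamilton schedules the work; it only mimics "one branch per file, then one collection step". The file names and the cob_date_collection name are made up for illustration, and datetime.strptime stands in for pd.to_datetime so the snippet has no dependencies.

```python
import re
from datetime import datetime
from typing import Iterator, List


def file_path(files: List[str]) -> Iterator[str]:
    """Fan out: yield one item per branch (the role Parallelizable[str] plays above)."""
    for file in files:
        yield file


def cob_date(file_path: str) -> datetime:
    """Per-branch step: parse the first 20YYMMDD-style date found in the file name."""
    match = re.search(r"20\d{6}", file_path)
    if match is None:
        raise ValueError(f"no date found in {file_path!r}")
    return datetime.strptime(match.group(0), "%Y%m%d")


def cob_date_collection(cob_dates: List[datetime]) -> List[datetime]:
    """Collect step (hypothetical name): receives all per-branch results as one list."""
    return sorted(cob_dates)


# Hypothetical file names, for illustration only.
files = ["fees_20231024.csv", "fees_20231023.csv"]
branches = [cob_date(path) for path in file_path(files)]  # each branch sees a single str
collected = cob_date_collection(branches)                 # the collector sees the list
```

The point of the sketch is that every per-branch function takes a single item (a str here), and only the collecting function ever sees a list.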
If you encounter another error, sharing the full error trace could help me debug with you!
Seth Stokes
10/25/2023, 4:41 PM
Seth Stokes
10/25/2023, 5:02 PM
Traceback (most recent call last):
  File "C:\codebase\data-project\etl\src\run.py", line 99, in <module>
    get_sg_fees_parallel(['norm_fees_parallel'], visualize=True)
  File "C:\codebase\data-project\etl\src\run.py", line 85, in get_sg_fees_parallel
    df = dr.execute(output_nodes)
  File "C:\Users\AppData\Roaming\Python\Python39\site-packages\hamilton\driver.py", line 410, in execute
    raise e
  File "C:\Users\AppData\Roaming\Python\Python39\site-packages\hamilton\driver.py", line 403, in execute
    outputs = self.raw_execute(_final_vars, overrides, display_graph, inputs=inputs)
  File "C:\Users\AppData\Roaming\Python\Python39\site-packages\hamilton\driver.py", line 498, in raw_execute
    return self.graph_executor.execute(
  File "C:\Users\AppData\Roaming\Python\Python39\site-packages\hamilton\driver.py", line 230, in execute
    executors.run_graph_to_completion(execution_state, self.execution_manager)
  File "C:\Users\AppData\Roaming\Python\Python39\site-packages\hamilton\execution\executors.py", line 342, in run_graph_to_completion
    next_task = execution_state.release_next_task()
  File "C:\Users\AppData\Roaming\Python\Python39\site-packages\hamilton\execution\state.py", line 498, in release_next_task
    return self.bind_task(self.task_queue.popleft())
  File "C:\Users\AppData\Roaming\Python\Python39\site-packages\hamilton\execution\state.py", line 474, in bind_task
    dynamic_inputs = {**dynamic_inputs, **self.result_cache.read(input_vars_to_read)}
  File "C:\Users\AppData\Roaming\Python\Python39\site-packages\hamilton\execution\state.py", line 113, in read
    raise KeyError(f"Key {formatted_key} not found in cache")
KeyError: 'Key norm_fees not found in cache'
Seth Stokes
10/25/2023, 5:44 PM
__config to a function name? Is it always necessary even with @config.when?
Elijah Ben Izzy
10/25/2023, 5:58 PM
@config are on the same named functions in the same file…
Seth Stokes
10/25/2023, 6:00 PM
@config.when(cond="this")
def function(): pass
@config.when_not(cond="this")
def function(): pass
Elijah Ben Izzy
10/25/2023, 7:08 PM
Elijah Ben Izzy
10/25/2023, 7:08 PM
def function__this(...)
def function__not_this(...)
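A quick plain-Python illustration of why the suffix matters when both variants live in one file: a second def function simply rebinds the name, so the first definition is gone from the module before any framework can look at it. The resolve helper below is a hypothetical stand-in, not Hamilton's code; it assumes that everything from the double underscore onward is dropped to get the node name.

```python
from typing import Callable, Dict


def function() -> str:
    return "this"


def function() -> str:  # same name: rebinds it, the first definition is lost
    return "not this"


def function__this() -> str:
    return "this"


def function__not_this() -> str:  # distinct names: both variants survive
    return "not this"


def resolve(
    candidates: Dict[str, Callable[[], str]], use_this: bool
) -> Dict[str, Callable[[], str]]:
    """Hypothetical helper: pick one variant and strip the '__suffix' from its name."""
    chosen = "function__this" if use_this else "function__not_this"
    node_name = chosen.split("__")[0]
    return {node_name: candidates[chosen]}


variants = {f.__name__: f for f in (function__this, function__not_this)}
nodes = resolve(variants, use_this=True)
```

With unique suffixed names, both variants stay importable and whichever one is selected can still be exposed under the shared name function.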