Seth Stokes
03/28/2024, 10:05 PM
Why would a driver built with `.with_remote_executor(SynchronousLocalTaskExecutor())` execute more than once (~5/6 times), given len(accounts) == 1?
.
# Build the driver: dynamic execution enabled, config supplied inline,
# SynchronousLocalTaskExecutor as the remote executor, two modules loaded.
dr = (
    driver.Builder()
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_config(
        {
            "cob_date": datetime(2023, 3, 25),
            "portfolio_config": portfolio_config,
            "execution": "parallel",
            "accounts": ["account_1"],  # the `Parallelizable` part
        }
    )
    .with_remote_executor(SynchronousLocalTaskExecutor())
    .with_modules(
        data_loader,  # accounts are broken up here to load each
        transfrom,  # performed after collect
    )
    .build()
)
Stefan Krawczyk
03/28/2024, 11:46 PM
# debug_parallel.py
from hamilton import driver
import datetime
from hamilton.execution.executors import SynchronousLocalTaskExecutor
from hamilton.htypes import Parallelizable, Collect
def account_loop(accounts: list[str]) -> Parallelizable[str]:
    """Yield each entry of ``accounts`` one at a time.

    This is a generator, so the "account loop" trace line is printed when
    iteration starts, not when the function is called.

    Args:
        accounts: Account identifiers to iterate over.

    Yields:
        Each element of ``accounts``, in order.
    """
    print("account loop")
    yield from accounts
def account_step(account_loop: str) -> str:
    """Return the given account unchanged, printing a trace line first.

    Args:
        account_loop: A single account value; the parameter name matches
            the ``account_loop`` function defined in this script.

    Returns:
        The same value that was passed in.
    """
    print("in account step")
    return account_loop
def reduce(account_step: Collect[str]) -> list[str]:
    """Materialize the collected ``account_step`` results into a list.

    Args:
        account_step: An iterable of results; the parameter name matches
            the ``account_step`` function defined in this script.

    Returns:
        A new list with the items of ``account_step`` in iteration order.
    """
    print("reducing")
    return list(account_step)
if __name__ == '__main__':
    # Import the running script as a module object so it can be handed to
    # `with_modules` below.
    import __main__ as debug_parallel

    # Build a driver with dynamic execution enabled and
    # SynchronousLocalTaskExecutor as the remote executor.
    dr = (
        driver.Builder()
        .enable_dynamic_execution(allow_experimental_mode=True)
        .with_config({
            "cob_date": datetime.datetime(2023, 3, 25),
            "portfolio_config": {},
            "execution": "parallel",
            "accounts": ["account_1"],  # the `Parallelizable` part
        })
        .with_remote_executor(SynchronousLocalTaskExecutor())
        .with_modules(debug_parallel)
        .build()
    )

    # Request only the final `reduce` output; the print statements in the
    # functions above show how many times each one runs.
    dr.execute(["reduce"])
Seth Stokes
03/28/2024, 11:52 PMSeth Stokes
03/28/2024, 11:52 PMStefan Krawczyk
03/28/2024, 11:57 PMSeth Stokes
03/28/2024, 11:57 PMStefan Krawczyk
03/29/2024, 12:23 AMStefan Krawczyk
03/29/2024, 12:23 AMSeth Stokes
03/29/2024, 12:24 AMStefan Krawczyk
03/29/2024, 12:25 AMSeth Stokes
03/29/2024, 12:31 AMStefan Krawczyk
03/29/2024, 1:43 AM
If `account_step` throws an error, it is only run once.
Stefan Krawczyk
04/02/2024, 8:59 PM