Slackbot — 04/25/2023, 12:18 PM

Elijah Ben Izzy
04/25/2023, 2:08 PM@parameterize_subdag
and resolve
to create one subdag per “partition”. Then you join them all. AFK now but I’ll put some pseudo code in here when I’m back.James Marvin
04/25/2023, 2:08 PMElijah Ben Izzy
04/25/2023, 2:08 PMElijah Ben Izzy
04/25/2023, 3:59 PM@parameterized_subdag
+ @resolve
. The only caveat is that you have to know the partitions prior to instantiating the DAG. So, it looks something like this:
@resolve(
when=ResolveAt.CONFIG_AVAILABLE,
lambda partitions: parameterize_subdag(my_funcs, **{f"partition_{i}": {"inputs" : {"partition":
value(i) for i in partitions) })
)
def partitioned_datasets(output_from_subdag: pd.DataFrame) -> pd.DataFrame:
return output_from_subdag(pd.DataFrame)
What this does is use the config value of partitions (a list of ints) to create one subdag per partition. You'll have to pass that in via the driver's config.
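To make the shape concrete, this is roughly what that dict comprehension expands to for partitions = [0, 1, 2]. It's a plain-Python sketch: a tuple stands in for Hamilton's value(i) wrapper so nothing needs to be imported.

```python
# Sketch of the parameterization mapping built for partitions = [0, 1, 2].
# ("value", i) is a stand-in for Hamilton's value(i) literal wrapper.
partitions = [0, 1, 2]
parameterization = {
    f"partition_{i}": {"inputs": {"partition": ("value", i)}}
    for i in partitions
}
```

Each key becomes the name of one subdag instance, and each subdag gets its own partition index as an input.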
Then, if you want to concat them, you can do the following:
@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda partitions: inject(
        dfs=group(**{str(i): source(f"partition_{i}") for i in partitions})
    ),
)
def concatted_data(dfs: Dict[str, pd.DataFrame]) -> pd.DataFrame:
    ...
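As a stand-in for what concatted_data ends up seeing once the group above is injected: a dict keyed by str(i), one DataFrame per partition (the tiny frames here are purely illustrative), and a plausible body that concatenates them.

```python
import pandas as pd

# Illustrative stand-in for the injected group: one DataFrame per
# partition, keyed by str(i) as in the group(...) call above.
dfs = {
    "0": pd.DataFrame({"value": [1, 2]}),
    "1": pd.DataFrame({"value": [3, 4]}),
}

# A plausible body for concatted_data: stack all partitions into one frame.
concatted = pd.concat(dfs.values(), ignore_index=True)
```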
concatted_data receives all of the partition frames as a single group (a dict of DataFrames).

Elijah Ben Izzy
04/25/2023, 4:16 PM
DFGenerator = Generator[pd.DataFrame, None, None]

def partitioned_data_generator(partitions: List[int], raw_data: pd.DataFrame) -> DFGenerator:
    for partition in partitions:
        yield _process(_extract_partition(partition))

def concatted_data(partitioned_data_generator: DFGenerator) -> pd.DataFrame:
    for df in partitioned_data_generator:
        _do_something_with(df)
I like this less because it leaks the implementation (generators/streaming) into the function, but it's the only way so far if memory concerns are your main constraint...
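A self-contained sketch of that streaming trade-off, with made-up helper bodies (_process and the one-row frames are illustrative, not the real extraction logic): each partition's DataFrame is produced lazily, so the consumer only ever holds one frame plus a running result.

```python
from typing import Generator, List
import pandas as pd

DFGenerator = Generator[pd.DataFrame, None, None]

def _process(df: pd.DataFrame) -> pd.DataFrame:
    # Stand-in transform for illustration.
    return df.assign(value=df["value"] * 2)

def partitioned_data_generator(partitions: List[int]) -> DFGenerator:
    # Each partition is built and processed lazily; nothing is retained
    # after the yield, which is the whole point of the generator version.
    for p in partitions:
        yield _process(pd.DataFrame({"value": [p]}))

# Consume the stream, keeping only a running total in memory.
total = sum(df["value"].iloc[0] for df in partitioned_data_generator([1, 2, 3]))
```

Contrast with the group/inject version above, which materializes every partition's DataFrame at once before concatenating.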