# hamilton-help
e
Ok, so this is certainly doable, but it’s not very natural yet. That said, we’re thinking about exactly this, and your input would be 🔥 https://github.com/DAGWorks-Inc/hamilton/issues/19. The trick here is to use `@parameterized_subdag` and `@resolve` to create one subdag per “partition”. Then you join them all. AFK now but I’ll put some pseudocode in here when I’m back.
j
Thanks
e
Re: incrementally populating it — curious what you need/why. This gets trickier, but with the generator/iterator pattern it could be really easy.
OK, so a few ways to do this:
1. You can orchestrate it outside of Hamilton: run each DAG in series with a different driver, combine the results, then pass them into another driver (rough sketch of this at the end of the thread).
2. You can do this all within Hamilton.
For (2) the tool you’ll want to use is `@parameterized_subdag` + `@resolve`. The only caveat is that you have to know the partitions prior to instantiating the DAG. So, it looks something like this:
```python
import pandas as pd
from hamilton.function_modifiers import ResolveAt, parameterized_subdag, resolve, value
import my_funcs  # the module containing the transforms that make up the subdag

@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda partitions: parameterized_subdag(
        my_funcs,
        **{f"partition_{i}": {"inputs": {"partition": value(i)}} for i in partitions},
    ),
)
def partitioned_datasets(output_from_subdag: pd.DataFrame) -> pd.DataFrame:
    return output_from_subdag
```
What this does is use the config value of `partitions`, which is a list of ints, to create one subdag per partition. You’ll have to pass that in via the driver (sketch of that below). Then, if you want to concat them, you can do the following:
```python
from typing import Dict
import pandas as pd
from hamilton.function_modifiers import ResolveAt, group, inject, resolve, source

@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda partitions: inject(
        dfs=group(**{str(i): source(f"partition_{i}") for i in partitions})
    ),
)
def concatted_data(dfs: Dict[str, pd.DataFrame]) -> pd.DataFrame:
    return pd.concat(dfs.values())  # e.g. just concatenate the per-partition frames
```
This declares a function that accepts all of the partitioned dataframes, grouped into a single dict.
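On the driver side, you just pass `partitions` in through the config. Rough sketch (the module name is made up here, and IIRC you also need to flip on power-user mode for `@resolve` to work):
```python
from hamilton import driver, settings

import my_pipeline  # hypothetical module holding the functions above

config = {
    "partitions": [0, 1, 2],                # one subdag gets created per entry
    settings.ENABLE_POWER_USER_MODE: True,  # @resolve is gated behind this flag
}
dr = driver.Driver(config, my_pipeline)
result = dr.execute(["concatted_data"])
```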
So, if you want to do it incrementally it’s a little tougher. I imagine you could actually do this manually, by having an upstream function return a generator (although a subdag would be tougher):
```python
from typing import Generator, List
import pandas as pd

DFGenerator = Generator[pd.DataFrame, None, None]

def partitioned_data_generator(partitions: List[int], raw_data: pd.DataFrame) -> DFGenerator:
    # _extract_partition / _process are stand-ins for your own logic
    for partition in partitions:
        yield _process(_extract_partition(raw_data, partition))

def concatted_data(partitioned_data_generator: DFGenerator) -> pd.DataFrame:
    # pull the partitions through one at a time, so only one is in memory at once
    for df in partitioned_data_generator:
        _do_something_with(df)
```
I like this less because it leaks the implementation (generators/streaming) into the function, but it’s the only way so far if memory concerns are your main constraint...
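And to round out option (1) from above: orchestrating outside Hamilton is basically just a loop over drivers. Very rough sketch (all the module/node names here are made up for illustration):
```python
import pandas as pd
from hamilton import driver

import my_funcs  # hypothetical module with the per-partition transforms

# Run the same DAG once per partition, then combine the results yourself.
dr = driver.Driver({}, my_funcs)
frames = []
for partition in [0, 1, 2]:
    frames.append(dr.execute(["processed_data"], inputs={"partition": partition}))

combined = pd.concat(frames)
# ...then hand `combined` to a second driver (or plain code) as an input for the final step.
```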