# ask-anything
o
That looks pretty useful. Can/do functions that don't depend on each other run asynchronously?
e
if the functions are defined in a different file, then it works. if they're defined inline, there's some issue with the Python multiprocessing module. I need to investigate this more
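For readers following along: the parallel behavior comes from the DAG's executor. A minimal sketch of two independent tasks running in parallel, assuming the functions live in an importable module (a hypothetical `my_tasks.py` here) so multiprocessing can pickle them:
```python
from ploomber import DAG
from ploomber.executors import Parallel
from ploomber.products import File
from ploomber.tasks import PythonCallable

# hypothetical module with two functions that don't depend on each other;
# each one must accept `product` and write its output to that path
from my_tasks import load_a, load_b

dag = DAG(executor=Parallel())
PythonCallable(load_a, File('a.csv'), dag, name='load_a')
PythonCallable(load_b, File('b.csv'), dag, name='load_b')

# no dependency is declared between the two tasks, so the parallel
# executor is free to run them in separate processes
dag.build()
```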
o
Sounds ok!
l
Looks good! I've spent a little time attempting to do this with more generic functions. E.g., something like
```python
import pandas as pd


def ones() -> pd.Series:
    ...  # stub: produce a pd.Series of ones


def plusone(ds: pd.Series) -> pd.Series:
    return (
        ds
        .add(1)
        .rename(f"{ds.name}_plusone")
    )


def join(ds1: pd.Series, ds2: pd.Series) -> pd.DataFrame:
    return ds1.to_frame().join(ds2)


def make_dag():
    return join(ones(), plusone(ones()))


dag = make_dag()
```
I figure there's a decorator design that would allow such arbitrary Python callables to be converted to Ploomber tasks. I've played around with `PythonCallable` some, but haven't locked in a working pattern.
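As a point of reference, one way to wire those plain functions into Ploomber today is to wrap each one in a small adapter that reads pickled upstream outputs and pickles its own return value. `DAG`, `PythonCallable`, and `File` are real Ploomber objects, but the adapters, the `my_transforms` module, and the file names below are made up for illustration, so treat this as a sketch rather than the decorator pattern itself:
```python
import pickle
from pathlib import Path

from ploomber import DAG
from ploomber.products import File
from ploomber.tasks import PythonCallable

# assume ones, plusone, join (as defined above) live in an importable module
from my_transforms import ones, plusone, join


def _ones(product):
    # task functions receive `product`; persist the return value there
    Path(str(product)).write_bytes(pickle.dumps(ones()))


def _plusone(product, upstream):
    ds = pickle.loads(Path(str(upstream['ones'])).read_bytes())
    Path(str(product)).write_bytes(pickle.dumps(plusone(ds)))


def _join(product, upstream):
    ds1 = pickle.loads(Path(str(upstream['ones'])).read_bytes())
    ds2 = pickle.loads(Path(str(upstream['plusone'])).read_bytes())
    Path(str(product)).write_bytes(pickle.dumps(join(ds1, ds2)))


dag = DAG()
t_ones = PythonCallable(_ones, File('ones.pkl'), dag, name='ones')
t_plusone = PythonCallable(_plusone, File('plusone.pkl'), dag, name='plusone')
t_join = PythonCallable(_join, File('join.pkl'), dag, name='join')

t_ones >> t_plusone
t_ones >> t_join
t_plusone >> t_join

dag.build()
```
A decorator could plausibly generate these adapters automatically by inspecting each function's signature and mapping parameter names to upstream task names.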
Additionally, it'd be useful to create dags that can accept data from memory. `input_data_parser` seems to allow this, but, again, I haven't landed on a working pattern. This way, common micro-pipelines could be imported from installable packages and used to transform data in memory, aiding rapid prototyping. As the project matures, the dag could be converted from an `InMemoryDAG` to a `DAG` and the results could be saved to disk. Do `InMemoryDAG`s allow caching and incremental loading? Ephemeral storage of intermediate results, perhaps via an explicit checkpoint flag, could be useful when maturing from prototyping to prod.
e
> Looks good! I've spent a little time attempting to do this with more generic functions. E.g., something like
are the arguments of these functions the upstream dependencies? for example `def join(ds1, ds2)` - are `ds1` and `ds2` expected to be tasks in the pipeline?
> Additionally, it'd be useful to create dags that can accept data from memory. `input_data_parser` seems to allow this, but, again, I haven't landed on a working pattern.
yes, `input_data_parser` is the way to go.
> This way, common micro-pipelines could be imported from installable packages and used to transform data in memory, aiding rapid prototyping.
i like this idea
> the dag could be converted from an `InMemoryDAG` to a `DAG` and the results could be saved to disk.
currently, you first need to build a `DAG` and then convert it into an `InMemoryDAG`; the other way around is not possible. but that's just an implementation detail; from the user's perspective, we can make this transparent
> Do `InMemoryDAG`s allow caching and incremental loading? Ephemeral storage of intermediate results, perhaps via an explicit checkpoint flag, could be useful when maturing from prototyping to prod.
it doesn't; since it passes outputs in memory, it doesn't store anything. but you can get that with `DAG`.
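To make the `DAG` route concrete: in the hypothetical file-based sketch earlier in this thread, every task writes a `File` product, so repeated builds are incremental and unchanged tasks are skipped:
```python
# continuing the hypothetical file-based `dag` sketched earlier
dag.build()           # first run: executes ones, plusone, join
report = dag.build()  # second run: unchanged, up-to-date tasks are skipped
print(report)         # the build report shows which tasks actually ran
```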
I'll work on the example I shared earlier to add a few more features, like the data parser, dag -> inmemorydag conversion etc. I'll share my updates here 🙂
l
> are the arguments of these functions the upstream dependencies? for example `def join(ds1, ds2)` - are `ds1` and `ds2` expected to be tasks in the pipeline?
I see them as naive functions. They could be used entirely without Ploomber. One could pass any two data series or dataframes (or any `.join`able objects) to `join` and get a valid result. `ds1` and `ds2` could be outputs of upstream operations, or they could be sideloaded from memory / disk. In the sideloading case, these objects would probably be passed through `input_data_parser`, so they'd be outputs of an upstream task.
Thanks for all the other notes! This clarifies a few things. Looking forward to riffing on this more and seeing what you come up with.
e
I got something working last night 🙂 now the parallel execution works even when defining functions in the notebook, and there's no need to use `product` in the function signature. this requires some changes in Ploomber's internals, but I can get something working next week. let me know what you think! @Luke Smith @Orell Garten