Hugo Evers
05/25/2023, 2:10 PM
return pipeline(
    [
        node(
            func=rename_columns,
            inputs="pretraining_set",
            outputs="renamed_df",
            name="rename_columns",
        ),
        node(
            func=truncate_description,
            inputs="renamed_df",
            outputs="truncated_df",
            name="truncate_description",
        ),
        node(
            func=drop_duplicates,
            inputs="truncated_df",
            outputs="deduped_df",
            name="drop_duplicates",
        ),
        node(
            func=pad_zeros,
            inputs="deduped_df",
            outputs="padded_df",
            name="pad_zeros",
        ),
        node(
            func=filter_0000,
            inputs="padded_df",
            outputs="filtered_df",
            name="filter_0000",
        ),
        node(
            func=clean_description,
            inputs="filtered_df",
            outputs="cleaned_df",
            name="clean_description",
        ),
        node(
            func=concat_title_description,
            inputs="cleaned_df",
            outputs="concatenated_df",
            name="concat_title_description",
        ),
    ]
)
However, on AWS Batch these would run in separate containers. I currently use the cloudpickle dataset to facilitate this, but it is actually not necessary when I use something like Dask.
I could also instead run this pipeline like this:
return (
    df.pipe(rename_columns)
    .pipe(truncate_description)
    .pipe(drop_duplicates)
    .pipe(pad_zeros)
    .pipe(filter_0000)
    .pipe(clean_description)
    .pipe(concat_title_description)
)
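One way such "folding" could be implemented is by composing the node functions into a single callable before registering it. This is a minimal sketch; `fold` is a hypothetical helper, not a Kedro API, and the two lambdas stand in for the real cleaning steps:

```python
from functools import reduce

# Hypothetical helper (not a Kedro API): collapse a linear chain of
# single-input/single-output functions into one composite function,
# so the whole chain can run as a single node (and thus one container).
def fold(*funcs):
    def composite(df):
        # Apply each step in order, mirroring df.pipe(f1).pipe(f2)...
        return reduce(lambda acc, f: f(acc), funcs, df)
    return composite

# Toy stand-ins for the real cleaning steps above:
rename_columns = lambda d: {**d, "renamed": True}
drop_duplicates = lambda d: {**d, "deduped": True}

clean = fold(rename_columns, drop_duplicates)
result = clean({"rows": 3})
```

The composite could then be wrapped in a single `node(...)`, so no intermediate dataset ever leaves the process.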
The aforementioned pipeline has tags and filtering in a modular pipeline, depending on pre-training vs. tuning, which language, etc.
The flattened pipeline would be nice to use in a case like kedro run runner=… concat_pipeline=true, or something like that.
Is this idea worth exploring? It is really not essential, I can work around it, but the ability to have pipelines that can "fold" like this is quite appealing.

Deepyaman Datta
05/26/2023, 2:22 PM
.pipe() method, even though it's been around since pandas 0.16.2!
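For reference, `.pipe()` is just sugar for an ordinary function call on the whole frame. A minimal pure-Python model of the documented pandas contract (no pandas needed here; `Table` and `double` are illustrative stand-ins):

```python
# Minimal model of the pandas .pipe() contract:
# df.pipe(f, *args, **kwargs) is defined as f(df, *args, **kwargs),
# i.e. a single tablewise call, which makes method chains readable.
class Table:
    def __init__(self, rows):
        self.rows = rows

    def pipe(self, func, *args, **kwargs):
        # Same contract pandas documents for DataFrame.pipe
        return func(self, *args, **kwargs)

def double(t):
    return Table([r * 2 for r in t.rows])

t = Table([1, 2, 3])
# t.pipe(double) and double(t) are the same call, spelled two ways.
```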
Out of curiosity, how is the behavior of .pipe() different from if renamed_df, truncated_df, etc. were all MemoryDataSet instances?

Hugo Evers
05/26/2023, 2:29 PM
MemoryDataSet is not an option in a distributed environment like AWS Batch, so the default_dataset should be changed to, for example, a PickleDataSet, which requires serialisation and network IO.
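To make that cost concrete, here is a sketch of what a file-based, PickleDataSet-style handoff between containers implies (plain `pickle` and a temp file standing in for the dataset and object storage):

```python
import os
import pickle
import tempfile

# Each intermediate result is serialised, written out, and re-read by
# the next container, instead of staying in process memory.
data = {"title": "ML Engineer", "description": "builds pipelines"}

with tempfile.NamedTemporaryFile(delete=False, suffix=".pkl") as f:
    pickle.dump(data, f)          # node A: persist its output
    path = f.name

with open(path, "rb") as f:
    loaded = pickle.load(f)       # node B: load its input

os.remove(path)
# loaded is an equal but distinct copy: the handoff paid
# serialisation plus IO, which MemoryDataSet avoids entirely.
```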
Apart from that, I am not sure whether .pipe() allows one to hook into the vectorised optimisation procedures that pandas supplies; I would think not, because .pipe() is applied tablewise and is meant for sequential tablewise operations.

Deepyaman Datta
05/26/2023, 2:33 PMFor me the main difference is that theIn my opinion, this is an issue resulting from the way the deployment is done (and recommended in current Kedro docs), rather than an issue withis not an option in a distributed environment like AWS Batch.MemoryDataSet
MemoryDataSet
per se.
If I understand correctly, what you're proposing is to collapse multiple nodes into one that uses .pipe() automatically, because you're deploying each node as a container, whereas if you could just deploy a set of nodes (or a modular pipeline) to each container, that would also solve the issue (in, IMO, a cleaner way).
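That "set of nodes per container" mapping could be sketched like this (a hypothetical grouping scheme, not Kedro's actual deployment API; node names are taken from the pipeline above plus an illustrative `train_model`):

```python
from collections import defaultdict

# Group nodes by a tag so each container runs a whole set of nodes;
# intermediates then stay in memory within a group instead of being
# serialised between every pair of nodes.
nodes = [
    {"name": "rename_columns", "group": "preprocess"},
    {"name": "truncate_description", "group": "preprocess"},
    {"name": "drop_duplicates", "group": "preprocess"},
    {"name": "train_model", "group": "train"},
]

containers = defaultdict(list)
for n in nodes:
    containers[n["group"]].append(n["name"])

# Each key would map to one container / AWS Batch job.
grouped = dict(containers)
```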
But I understand your point.

Hugo Evers
05/26/2023, 2:42 PM

Deepyaman Datta
05/26/2023, 2:46 PM
> I think to generalize the use case: currently nodes in pipelines are run separately.
I think the general sentiment (and also my opinion) is moving towards a state where Kedro deployments aren't mapping nodes to tasks on workflow orchestrators, but rather mapping something like modular pipelines, because nodes usually represent a much smaller unit of work (like what you have), and it doesn't make sense to have them each be a container in most cases. But, while the sentiment is moving, the deployment guides haven't changed and the approach isn't formalized. :D
Hugo Evers
05/26/2023, 4:01 PM