# ask-anything
e
yes, you can use the same `source` many times, but you have to turn automatic upstream extraction off:
```yaml
# pipeline.yaml
meta:
  extract_upstream: false

tasks:
  - source: task.py
    name: task-a
    product: ...
    upstream: [another]

  - source: task.py
    name: task-b
    product: ...
    upstream: [a-different-one]
```
in your case, since `drop_columns` will have different upstreams, you can refer to them without knowing their name, like this:
```python
import pandas as pd

def drop_columns(upstream, product):
    train = pd.read_csv(upstream.first["train"])
    holdout = pd.read_csv(upstream.first["holdout"])
    # cleaning....
```
does this solve your issue?
(note: if you turn `extract_upstream` off, all tasks must have an `upstream` key)
f
Ahm, not so sure that solves my problem though… I still have to hardcode `"train"` and `"holdout"` in my Python code… I was thinking of something to the tune of:
```yaml
tasks:
  - source: task.py
    name: task-a
    product: ...
    upstream: [another.train]

  - source: task.py
    name: task-b
    product: ...
    upstream: [another.holdout]

  - source: task.py
    name: task-c
    product: ...
    upstream: [another.some_other]
```
And then in Python:
```python
import pandas as pd

def drop_columns(upstream, product):
    generic_df = pd.read_csv(upstream)
    # code...
```
e
yeah, so in `drop_columns` you can do `upstream.first` and that's going to return the products from the upstream task (whatever that is, since you're not referring to the upstream by name). what's missing with this approach?
f
So I could be specific in the `upstream` key in the pipeline:
```yaml
tasks:
  - source: task.drop_columns
    name: task-a
    product: ...
    upstream: [another.train]
```
And then in `task.py`:
```python
import pandas as pd

def drop_columns(upstream, product):
    generic_df = pd.read_csv(upstream.first)
```
`upstream.first` would contain the product `train` of the `another` task?
e
ah ok, I think I see where the confusion is coming from. in `upstream` you put the name of the upstream task. then `upstream.first` returns the product of that upstream. example:
```yaml
tasks:
  - source: tasks.train
    name: train
    product:
      a: ...
      b: ...

  - source: tasks.drop_columns
    product:
      c: ...
    upstream: [train]
```
then `upstream.first` inside `tasks.drop_columns` is the same as doing `upstream['train']`; hence, it returns a dictionary with keys `a` and `b`, since those are the products of `train`
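so with that layout, the body of `drop_columns` would look something like this (just a sketch; I'm assuming `a` and `b` are CSV files, `unwanted` is a placeholder column name, and concatenating the two frames is only for illustration):
```python
import pandas as pd

def drop_columns(upstream, product):
    # upstream.first is the same as upstream['train']: a dict with keys 'a' and 'b'
    df_a = pd.read_csv(upstream.first['a'])
    df_b = pd.read_csv(upstream.first['b'])
    # drop the placeholder column and write the declared product 'c'
    cleaned = pd.concat([df_a, df_b]).drop(columns=['unwanted'])
    cleaned.to_csv(str(product['c']), index=False)
```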
f
Oh, still not quite my use case. If I understand correctly, I would still need to reference `a` and `b` in my Python code, as in `upstream.first["a"]`, right? I want to avoid having to reference a specific upstream product within my Python code. The way I was thinking about it is to be able to directly specify where the value of `upstream` comes from for a given task. Kind of:
• for task a with source drop_columns, its upstream is split.holdout
• for task b with source drop_columns, its upstream is split.train
• for task c with source drop_columns, its upstream is fetch_new_data.data
and then have the function `drop_columns` be as generic as possible:
```python
import pandas as pd

def drop_columns(upstream, product):
    df = pd.read_csv(upstream)
    # ...
```
e
ah ok, got it. interesting use case, never thought about it. right now, ploomber pushes all products downstream, so what you're suggesting isn't supported. I'll open an issue since this is an interesting use case. I think for now, what I suggested can help you. then, you have two options: 1) either standardize your products (e.g. tasks produce a single file or produce a dictionary with the same keys) or 2) add a bit of logic so you don't need to hardcode the product key. e.g. if they generate a dictionary with a single key:
```python
import pandas as pd

def drop_columns(upstream, product):
    key = list(upstream.first)[0]
    df = pd.read_csv(upstream.first[key])
```
if the upstream generates a single product (instead of a dictionary), then it's simpler:
```python
import pandas as pd

def drop_columns(upstream, product):
    df = pd.read_csv(upstream.first)
```
g
interesting request. So if I understand correctly, @feregrino, your drop task still has to have explicit knowledge of the two upstream products, since you are producing a respective cleaned output for each. how would you be able to generate differently named outputs without this understanding?
f
@Eduardo cool, I’ll keep an eye on the issues then. @Gaurav the task, as an abstraction (in the pipeline definition), yes, has to know about the upstream. However, at the code level (or, using Ploomber terms, the source level) I see no need for it to hold this knowledge.
g
Just wondering: instead of generating named outputs, can you define the product as a directory instead of a file? In the downstream task you then just have to get all the files from this folder and apply the drop task to them.
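something like this, roughly (a sketch; it assumes both the upstream product and this task's product are declared as directories of CSV files):
```python
import pathlib
import pandas as pd

def drop_columns(upstream, product):
    # the upstream task's product is a directory full of CSV files
    in_dir = pathlib.Path(str(upstream.first))
    # this task's product is a directory too; create it before writing
    out_dir = pathlib.Path(str(product))
    out_dir.mkdir(parents=True, exist_ok=True)

    for path in in_dir.glob('*.csv'):
        df = pd.read_csv(path)
        # drop columns here, then write one cleaned file per input file
        df.to_csv(out_dir / path.name, index=False)
```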
f
Sure, I could work around this in many ways, however they all sound hacky. To me, it would make more sense to allow true task reusability by letting users specify connections amongst tasks in the pipeline file, rather than hardcoding them within the source code itself.
The more I think about this, the more it makes sense to me... For example, I was thinking of having three pipelines: one for initial training, another for subsequent retraining, and one for batch inference. Reusing the same feature transformation code across the three pipelines would be great, and reusing the training/evaluation/validation code for initial training and subsequent retraining would be great too. At the moment I see this only being possible by creating thin wrapper methods.
e
for training <> serving, we have an `import_tasks_from` feature that allows you to put all the feature engineering code in a file and then have a `pipeline.train.yaml` and a `pipeline.serve.yaml` (see here) - but I agree, there's more we can do to enable this modularization
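roughly, it looks like this (a sketch; the file and task names are just examples):
```yaml
# pipeline.serve.yaml (pipeline.train.yaml is structured the same way)
meta:
  # pipeline.features.yaml holds the shared feature engineering task definitions
  import_tasks_from: pipeline.features.yaml

tasks:
  # serving-only tasks go here
  - source: tasks.predict
    product: output/predictions.csv
```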
hi, I created an issue. I think this should fix your problem @feregrino, feel free to comment with any feedback. @Gaurav, please also let me know what you think. I'd like to open the discussion so we can add a new example showing how to re-use tasks: https://github.com/ploomber/ploomber/issues/682