Eduardo  03/24/2022, 8:05 PM
You can use the same source many times, but you have to turn automatic upstream extraction off:
    # pipeline.yaml
    meta:
      extract_upstream: false

    tasks:
      - source: task.py
        name: task-a
        product: ...
        upstream: [another]
      - source: task.py
        name: task-b
        product: ...
        upstream: [a-different-one]
In your case, since drop_columns will have different upstreams, you can refer to them without knowing their name like this:

    def drop_columns(upstream, product):
        train = pd.read_csv(upstream.first["train"])
        holdout = pd.read_csv(upstream.first["holdout"])
        # cleaning...
Does this solve your issue?

feregrino  03/24/2022, 8:25 PM
But then I'd still have to reference "train" and "holdout" in my Python code… I was thinking of something to the tune of:
    tasks:
      - source: task.py
        name: task-a
        product: ...
        upstream: [another.train]
      - source: task.py
        name: task-b
        product: ...
        upstream: [another.holdout]
      - source: task.py
        name: task-c
        product: ...
        upstream: [another.some_other]
And then in Python:

    def drop_columns(upstream, product):
        generic_df = pd.read_csv(upstream)
        # code...
Eduardo
With drop_columns you can do upstream.first, and that's going to return the products from the upstream task (whatever that is, since you're not referring to the upstream by name). What's missing with this approach?

feregrino  03/24/2022, 8:44 PM
Say I use the upstream key in the pipeline:
    tasks:
      - source: task.drop_columns
        name: task-a
        product: ...
        upstream: [another.train]
And then in `task.py`:

    def drop_columns(upstream, product):
        generic_df = pd.read_csv(upstream.first)

Then upstream.first would contain the product train of the another task?

Eduardo
In upstream you put the name of the upstream task. Then upstream.first returns the product of that upstream. Example:
    tasks:
      - source: tasks.train
        name: train
        product:
          a: ...
          b: ...
      - source: tasks.drop_columns
        product:
          c: ...
        upstream: [train]
Eduardo
upstream.first inside tasks.drop_columns is the same as doing upstream['train']; hence, it returns a dictionary with keys a and b, since those are the products of train.
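To make that concrete, here is a tiny plain-Python sketch. The `Upstream` class below is a stand-in I wrote to mimic the behavior described above, not Ploomber's actual implementation, and the product paths are made up:

```python
# Stand-in for Ploomber's upstream object: a dict mapping upstream task
# names to their products, plus a .first shortcut.
class Upstream(dict):
    @property
    def first(self):
        # with exactly one upstream task, .first is that task's products
        assert len(self) == 1, "first only makes sense with one upstream"
        return next(iter(self.values()))


# products of the "train" task, keyed by product name (paths are made up)
upstream = Upstream({"train": {"a": "output/a.csv", "b": "output/b.csv"}})

print(upstream.first == upstream["train"])  # True
print(sorted(upstream.first))               # ['a', 'b']
```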
feregrino  03/24/2022, 9:37 PM
But then I'd still have to reference a and b in my Python code, as in upstream.first["a"], right?
I want to avoid having to reference a specific upstream product within my Python code. The way I was thinking is to be able to directly specify where the value of upstream comes from for a given task.
Kind of:
• for task a with source drop_columns, its upstream is split.holdout
• for task b with source drop_columns, its upstream is split.train
• for task c with source drop_columns, its upstream is fetch_new_data.data
and then have the function drop_columns be as generic as possible:

    def drop_columns(upstream, product):
        df = pd.read_csv(upstream)
        # ...
Eduardo
You could do:

    def drop_columns(upstream, product):
        key = list(upstream.first)[0]
        df = pd.read_csv(upstream.first[key])

If the upstream generates a single product (instead of a dictionary), then it's simpler:

    def drop_columns(upstream, product):
        df = pd.read_csv(upstream.first)
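A quick plain-Python check of that key-extraction trick. Plain dicts stand in for what upstream.first returns, and the file names are made up; note that `list(products)[0]` only works reliably when the upstream produces exactly one product:

```python
# plain dicts stand in for upstream.first; paths are illustrative
products_a = {"train": "output/train.csv"}
products_b = {"holdout": "output/holdout.csv"}


def first_product(products):
    # grab the single product without hardcoding its name;
    # assumes the upstream task produces exactly one product
    key = list(products)[0]
    return products[key]


print(first_product(products_a))  # output/train.csv
print(first_product(products_b))  # output/holdout.csv
```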
Gaurav  03/25/2022, 2:49 AM
feregrino  03/25/2022, 1:22 PM
Gaurav  03/25/2022, 1:25 PM
feregrino  03/25/2022, 1:32 PM
feregrino  03/25/2022, 4:05 PM
Eduardo
We have an import_tasks_from feature that allows you to put all feature engineering code in a file and then have pipeline.train.yaml and pipeline.serve.yaml files (see here) - but I agree, there's more we can do to enable this modularization.
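For reference, a rough sketch of what that split might look like. The file names and the task entry are illustrative assumptions, not copied from a real project; check the Ploomber docs for the exact spec:

```yaml
# pipeline.train.yaml (illustrative sketch)
meta:
  # pull the shared feature-engineering task list in from another file
  import_tasks_from: features.yaml

tasks:
  # training-only task; pipeline.serve.yaml would list serving tasks instead
  - source: tasks.train_model
    product: output/model.pickle
```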