# ingestion
h
Hi Everyone, quick question regarding lineages - how do we add lineage between an existing Data Task (in a data pipeline) and a dataset? I see this example - https://github.com/linkedin/datahub/blob/master/metadata-ingestion/examples/library/lineage_dataset_job_dataset.py - but it seems to be adding lineage to a data_job entity -
Copy code
entityUrn=builder.make_data_job_urn(
    orchestrator="airflow", flow_id="flow1", job_id="job1", cluster="PROD"
),
s
We refer to pipelines as flows and tasks as jobs.
h
@square-activity-64562 - Noted. Are we not able to add lineage between an existing datajob/task and datasets?
Copy code
#entityUrn=builder.make_data_job_urn(
#    orchestrator="airflow", flow_id="testdag", job_id="templated", cluster="PROD"
#),
@square-activity-64562 - The above code doesn't seem to be working. When I replaced it with the actual datajob URN, it was able to capture the lineage.
c
@handsome-football-66174 I just tried the above scenario and things are working for me. Can you check if there are any differences between the fields you set in the URN builder vs the actual URN you are using? You can also print and compare the value of entityUrn generated by the builder vs your URN.
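For example, a quick way to run that check (a sketch using the values from this thread; the expected output is an assumption based on how the builder concatenates its arguments):
Copy code
import datahub.emitter.mce_builder as builder

built = builder.make_data_job_urn(
    orchestrator="airflow", flow_id="testdag", job_id="templated", cluster="PROD"
)
literal = "urn:li:dataJob:(urn:li:dataFlow:(airflow,testdag,prod),templated)"

print(built)             # the URN the builder actually generates
print(built == literal)  # False if any segment differs, e.g. "PROD" vs "prod"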
h
Copy code
#entityUrn=builder.make_data_job_urn(
#    orchestrator="airflow", flow_id="testdag", job_id="templated", cluster="PROD"
#),

entityUrn="urn:li:dataJob:(urn:li:dataFlow:(airflow,testdag,prod),templated)",
@careful-pilot-86309 - Of the above two options, builder.make_data_job_urn doesn't seem to be working.
@square-activity-64562 @careful-pilot-86309 - How do we capture the lineage information for data pipelines (tasks etc. in the data pipeline)?
c
You are using "PROD" in the builder vs "prod" in the plain URN, so the two generate different dataJob URNs.
Lineage will get captured when you set the DataJobInputOutput aspect on the task (datajob).
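A minimal sketch of that (assuming the DataHub Python emitter library from the linked examples; the URNs and GMS address here are illustrative): attach a DataJobInputOutput aspect to the task, listing its input/output datasets and, for task-to-task lineage within a pipeline, its upstream datajobs via inputDatajobs.
Copy code
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import ChangeTypeClass, DataJobInputOutputClass

# The existing task (datajob) that should own the lineage edges.
datajob_urn = builder.make_data_job_urn(
    orchestrator="airflow", flow_id="testdag", job_id="templated", cluster="prod"
)

io_aspect = DataJobInputOutputClass(
    inputDatasets=[builder.make_dataset_urn("glue", "testdb.dataset1")],
    outputDatasets=[builder.make_dataset_urn("glue", "testdb.dataset2")],
    # Upstream tasks; this is what links task to task inside a pipeline.
    inputDatajobs=[
        builder.make_data_job_urn(
            orchestrator="airflow", flow_id="testdag", job_id="upstream_task", cluster="prod"
        )
    ],
)

mcp = MetadataChangeProposalWrapper(
    entityType="dataJob",
    changeType=ChangeTypeClass.UPSERT,
    entityUrn=datajob_urn,
    aspectName="dataJobInputOutput",
    aspect=io_aspect,
)
DatahubRestEmitter("http://localhost:8080").emit_mcp(mcp)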
h
@careful-pilot-86309 - But according to the samples shared, I see that only datasets can be provided as input and output:
Copy code
datajob_input_output = DataJobInputOutputClass(
    inputDatasets=input_datasets,
    outputDatasets=output_datasets,
    inputDatajobs=input_data_jobs,
)
Sample - https://github.com/linkedin/datahub/blob/master/metadata-ingestion/examples/library/lineage_dataset_job_dataset.py
c
Lineage will be created automatically from these input/output datasets. You can view it in the UI on the task/datajob.
h
@careful-pilot-86309 - Looking at creating lineage between tasks (in a data pipeline).
Quick question - I noticed that if the dataset is not present in the application when adding lineage, a dummy one is created. How can we configure it to not create one?
c
Can you please provide the exact steps you are executing?
h
@careful-pilot-86309
Copy code
# Construct a lineage object.
lineage_mce = builder.make_lineage_mce(
    [
        builder.make_dataset_urn("glue", "testdb.dataset1"),
    ],
    builder.make_dataset_urn("glue", "testdb.unknowndataset"),
)

# Create an emitter to the GMS REST API.
emitter = DatahubRestEmitter("https://cobalt-dev-metadata-mgmt-gms.dev.ihdp.awsnonprod.healthcareit.net")

# Emit metadata!
emitter.emit_mce(lineage_mce)
c
The code seems good to me. It will create the dataset under glue and you can view the lineage. What do you mean by creating a dummy dataset?
h
If the dataset is present it works fine - the lineage gets added. If it is not present, a dummy dataset is created and the lineage is added to it.
c
Yes, if the dataset is not there it will create it. But it's just a link; it won't be visible in your Datasets tab, because the entity for that dataset is not created.
h
Ideally it should not - why create lineage for a dataset that isn't even present?
c
We need some kind of placeholder for the lineage endpoint. One might not have all their data linked to DataHub yet, and this keeps the possibility open to link it later.
👍 1
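If you want to avoid even the placeholder, one client-side option (a sketch, assuming a DataHub Python SDK version that ships DataHubGraph with an exists() helper; there is no server-side switch mentioned in this thread) is to verify the dataset URNs before emitting the lineage:
Copy code
import datahub.emitter.mce_builder as builder
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig

graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

upstream = builder.make_dataset_urn("glue", "testdb.dataset1")
downstream = builder.make_dataset_urn("glue", "testdb.unknowndataset")

# Only emit the edge when both endpoints are already registered entities.
if graph.exists(upstream) and graph.exists(downstream):
    lineage_mce = builder.make_lineage_mce([upstream], downstream)
    graph.emit_mce(lineage_mce)  # DataHubGraph extends the REST emitter
else:
    print("Skipping lineage: one or both datasets are not in DataHub yet")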