thousands-intern-95970
03/14/2022, 4:10 PMloud-island-88694
green-football-43791
03/14/2022, 5:17 PMthousands-intern-95970
03/15/2022, 9:19 AM
import datahub.emitter.mce_builder as builder
import json
import os
import pandas as pd
import numpy as np
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
DatasetLineageType,
FineGrainedLineage,
FineGrainedLineageDownstreamType,
FineGrainedLineageUpstreamType,
Upstream,
UpstreamLineage
)
from datahub.metadata.schema_classes import ChangeTypeClass, DataJobInputOutputClass
# Load the mapping spreadsheet into a DataFrame.
data = pd.read_excel('/file.xlsx')

# Extract the four mapping columns as plain Python lists; entries at the same
# index come from the same spreadsheet row.
source_t, target_t, source_c, target_c = (
    data[column_name].tolist()
    for column_name in (
        'SOURCE_TABLE',
        'TARGET_TABLE',
        'SOURCE_COLUMN',
        'TARGET_COLUMN',
    )
)
def datasetUrn(tbl, platform="postgres", env="DEV"):
    """Return the DataHub dataset URN for table *tbl*.

    Args:
        tbl: Name of the table/dataset.
        platform: Data platform name passed to ``make_dataset_urn``;
            defaults to ``"postgres"``.
        env: Environment passed to ``make_dataset_urn``; defaults to
            ``"DEV"``.

    Returns:
        Whatever ``builder.make_dataset_urn(platform, tbl, env)`` returns.
    """
    return builder.make_dataset_urn(platform, tbl, env)
def fldUrn(tbl, fld):
    """Return the DataHub schema-field URN for column *fld* of table *tbl*.

    Args:
        tbl: Name of the table that owns the field; resolved to a dataset
            URN via ``datasetUrn``.
        fld: Field (column) name/path within that table.

    Returns:
        Whatever ``builder.make_schema_field_urn`` returns for the dataset
        URN and field.
    """
    return builder.make_schema_field_urn(datasetUrn(tbl), fld)
# Lineage of fields in a dataset
# c1 <-- unknownFunc(bar2.c1, bar4.c1)
# c2 <-- myfunc(bar3.c2)
# {c3,c4} <-- unknownFunc(bar2.c2, bar2.c3, bar3.c1)
# c5 <-- unknownFunc(bar3)
# {c6,c7} <-- unknownFunc(bar4)
# note that the semantic of the "transformOperation" value is contextual.
# In above example, it is regarded as some kind of UDF; but it could also be an expression etc.
# bar1, bar2, ... are the datasets --> they refer to the source table and the target table
# c1, c2, ... are the fields of those datasets --> they refer to the source and target columns on the tables
# Build column-level (fine-grained) lineage from the spreadsheet rows and emit
# it to DataHub as a single "upstreamLineage" aspect.

# The original snippet looked at the first 10 rows only and derived a
# descending confidence score (1.0, 0.9, ...) from the row index; both are kept.
MAX_ROWS = 10

# Dataset that receives the lineage aspect.
# NOTE(review): the same dataset is also used as the dataset-level upstream
# below, i.e. it is declared upstream of itself -- confirm this is intended.
LINEAGE_DATASET = "JPR_D_SCHADEN"


def _confidence(row_index):
    """Return the confidence score for a row index: 1 - 0.1 * index, never below 0."""
    return max(0.0, 1 - (row_index * 0.1))


# zip() stops at the shortest list, so a sheet with fewer than MAX_ROWS rows no
# longer raises IndexError.
mapping_rows = list(zip(source_t, source_c, target_t, target_c))[:MAX_ROWS]

# One entry per usable spreadsheet row. The list is accumulated across rows so
# that every row ends up in the emitted aspect; rebuilding it per row (or
# upserting once per row) leaves only the last row's lineage in place.
fineGrainedLineages = []
for i, (src_table, src_column, dst_table, dst_column) in enumerate(mapping_rows):
    # pandas reads empty cells as NaN; a URN built from such a cell would
    # carry the text "nan" instead of a real name, so skip incomplete rows.
    if any(pd.isna(cell) for cell in (src_table, src_column, dst_table, dst_column)):
        continue
    fineGrainedLineages.append(
        FineGrainedLineage(
            upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
            # The original listed this same URN twice; once is enough.
            upstreams=[fldUrn(src_table, src_column)],
            downstreamType=FineGrainedLineageDownstreamType.FIELD,
            downstreams=[fldUrn(dst_table, dst_column)],
            confidenceScore=_confidence(i),
            transformOperation="myfunc",
        )
    )

# Hand-written entry for SR_SCH_COP.BEARB_DAT. The original snippet passed
# (None, None) as the upstream table/column, which would put the text "None"
# into the URNs. The placeholders are kept here; replace them with real
# (table, column) pairs to enable the entry.
manual_upstream_fields = [(None, None), (None, None)]
manual_upstream_urns = [
    fldUrn(table, column)
    for table, column in manual_upstream_fields
    if table is not None and column is not None
]
if manual_upstream_urns:
    FineGrainedLineage0 = FineGrainedLineage(
        upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
        upstreams=manual_upstream_urns,
        downstreamType=FineGrainedLineageDownstreamType.FIELD,
        downstreams=[fldUrn("SR_SCH_COP", "BEARB_DAT")],
        # Same score the original computed from the last loop index.
        confidenceScore=_confidence(max(len(mapping_rows) - 1, 0)),
        transformOperation="myfunc",
    )
    fineGrainedLineages.insert(0, FineGrainedLineage0)

# this is just to check if any conflicts with existing Upstream, particularly the DownstreamOf relationship
upstream = Upstream(
    dataset=datasetUrn(LINEAGE_DATASET),
    type=DatasetLineageType.TRANSFORMED,
)

fieldLineages = UpstreamLineage(
    upstreams=[upstream],
    fineGrainedLineages=fineGrainedLineages,
)

lineageMcp = MetadataChangeProposalWrapper(
    entityType="dataset",
    changeType=ChangeTypeClass.UPSERT,
    entityUrn=datasetUrn(LINEAGE_DATASET),
    aspectName="upstreamLineage",
    aspect=fieldLineages,
)
print(lineageMcp)

# Create an emitter to the GMS REST API. The original had the URL replaced by
# a comment that also swallowed the closing parenthesis; the server address is
# now read from DATAHUB_GMS_URL, falling back to a local default.
emitter = DatahubRestEmitter(os.environ.get("DATAHUB_GMS_URL", "http://localhost:8080"))

# Emit metadata!
emitter.emit_mcp(lineageMcp)
brave-businessperson-3969
03/16/2022, 8:27 PMgreen-football-43791
03/16/2022, 8:34 PMgreen-football-43791
03/16/2022, 8:35 PM