square-activity-64562 (09/10/2021, 7:42 AM):

from utils import initialize

initialize()  # project-local setup helper defined in utils.py

import logging
from typing import List, Optional

import click
import datahub.emitter.mce_builder as builder
import datahub.metadata.schema_classes as models
from datahub.emitter.rest_emitter import DatahubRestEmitter

logger = logging.getLogger(__name__)

PLATFORMS = [
    'athena',
    'mysql',
    'postgres',
    's3',
]
TIMEOUT = 10  # seconds, used for both the connect and read timeouts below

def _validate_platform(platform: str):
    if platform not in PLATFORMS:
        raise RuntimeError(f"Unknown platform {platform} passed in urn")

def _get_dataset_urn(string: str):
    # expects "<platform>:<dataset_id>"; split on the first colon only,
    # so dataset ids may themselves contain colons
    platform, dataset_id = string.split(":", 1)
    _validate_platform(platform)
    return builder.make_dataset_urn(platform, dataset_id)
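
# For reference (with make_dataset_urn's default PROD environment),
# "postgres:my_db.my_table" maps to
# urn:li:dataset:(urn:li:dataPlatform:postgres,my_db.my_table,PROD)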

def _get_tag_aspect(tags: List[str]):
    if len(tags) == 0:
        return []
    tags_class = models.GlobalTagsClass(
        tags=[
            models.TagAssociationClass(tag=builder.make_tag_urn(tag))
            for tag in tags
        ]
    )
    return [tags_class]
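
# For reference, builder.make_tag_urn("pii") returns "urn:li:tag:pii";
# each tag becomes a TagAssociation inside a single GlobalTags aspect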

def get_dataset_snapshot(urn: str, removed: bool, browse_paths: Optional[List[str]] = None):
    aspects = [
        models.StatusClass(removed=removed),
    ]
    if browse_paths is not None:
        aspects.append(models.BrowsePathsClass(paths=browse_paths))
    return models.MetadataChangeEventClass(
        proposedSnapshot=models.DatasetSnapshotClass(
            urn=urn,
            aspects=aspects,
        )
    )

def send_mces(gms_url, mces):
    emitter = DatahubRestEmitter(gms_url, None, TIMEOUT, TIMEOUT)
    for mce in mces:
        logger.info(f"mce is {mce}")
        emitter.emit_mce(mce)
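
# For reference, the positional arguments to DatahubRestEmitter above are
# (gms_server, token, connect_timeout_sec, read_timeout_sec); no token is
# passed, which assumes a GMS without authentication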

def send_job_lineage_task(
    gms_url,
    base_url,
    dag_id,
    task_id,
    upstream_urns: List[str],
    downstream_urns: List[str],
    job_tags: List[str],
    task_tags: List[str],
):
    input_datasets = [_get_dataset_urn(string) for string in upstream_urns]
    output_datasets = [_get_dataset_urn(string) for string in downstream_urns]
    flow_urn = builder.make_data_flow_urn("airflow", dag_id)
    job_urn = builder.make_data_job_urn_with_flow(flow_urn, task_id)
    flow_url = f"{base_url}/airflow/tree?dag_id={dag_id}"
    job_url = f"{base_url}/taskinstance/?flt0_dag_id_equals={dag_id}&flt3_task_id_equals={task_id}"
    # A DataFlow is the pipeline (i.e. the Airflow DAG)
    job_tags_aspect = _get_tag_aspect(job_tags)
    flow_mce = models.MetadataChangeEventClass(
        proposedSnapshot=models.DataFlowSnapshotClass(
            urn=flow_urn,
            aspects=[
                models.DataFlowInfoClass(
                    name=dag_id,
                    externalUrl=flow_url,
                ),
                *job_tags_aspect,
            ],
        )
    )
    # A DataJob is a step in the pipeline (i.e. an Airflow task);
    # it carries the job-level tags in addition to its own
    task_tags_aspect = _get_tag_aspect(job_tags + task_tags)
    job_mce = models.MetadataChangeEventClass(
        proposedSnapshot=models.DataJobSnapshotClass(
            urn=job_urn,
            aspects=[
                models.DataJobInfoClass(
                    name=task_id,
                    type=models.AzkabanJobTypeClass.COMMAND,
                    externalUrl=job_url,
                ),
                models.DataJobInputOutputClass(
                    inputDatasets=input_datasets,
                    outputDatasets=output_datasets,
                    inputDatajobs=[],
                ),
                *task_tags_aspect,
            ],
        )
    )
    # emit a minimal snapshot for every input/output dataset so the
    # entities exist even if nothing else has ingested them yet
    force_entity_materialization = [
        get_dataset_snapshot(iolet, False)
        for iolet in input_datasets + output_datasets
    ]
    mces = [
        flow_mce,
        job_mce,
        *force_entity_materialization,
    ]
    send_mces(gms_url, mces)

def send_dataset_to_view_lineage_task(gms_url, upstream_urns: List[str], downstream_urn: str):
    mces = [
        builder.make_lineage_mce(
            [_get_dataset_urn(string) for string in upstream_urns],
            _get_dataset_urn(downstream_urn),
        )
    ]
    send_mces(gms_url, mces)

@click.group()
def cli():
    pass

@cli.command()
@click.option('--gms-url', required=True)
@click.option('--airflow-url', required=True)
@click.option('--dag-id', required=True)
@click.option('--task-id', required=True)
@click.option('--upstream-urn', required=False, multiple=True)
@click.option('--downstream-urn', required=False, multiple=True)
@click.option('--job-tag', required=False, multiple=True)
@click.option('--task-tag', required=False, multiple=True)
def send_job_lineage(gms_url, airflow_url, dag_id, task_id, upstream_urn, downstream_urn, job_tag, task_tag):
    send_job_lineage_task(
        gms_url,
        airflow_url,
        dag_id,
        task_id,
        list(upstream_urn),
        list(downstream_urn),
        list(job_tag),
        list(task_tag),
    )

@cli.command()
@click.option('--gms-url', required=True)
@click.option('--upstream-urn', required=False, multiple=True)
@click.option('--downstream-urn', required=False)
def send_dataset_to_view_lineage(gms_url, upstream_urn, downstream_urn):
    send_dataset_to_view_lineage_task(
        gms_url,
        list(upstream_urn),
        downstream_urn,
    )

if __name__ == '__main__':
    cli()
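
For illustration only (not from the thread; all values are hypothetical), the underlying helpers can also be called directly instead of going through click:

# hypothetical values; assumes GMS at localhost:8080 and the Airflow
# webserver at localhost:8081
send_job_lineage_task(
    gms_url="http://localhost:8080",
    base_url="http://localhost:8081",
    dag_id="example_dag",
    task_id="example_task",
    upstream_urns=["postgres:my_db.source_table"],
    downstream_urns=["s3:my-bucket/output"],
    job_tags=["nightly"],
    task_tags=["adhoc"],
)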

bland-orange-95847 (09/10/2021, 7:45 AM):
linkedin/datahub-ingestion as the base image and the Python script above as the entrypoint, right?
And yeah, plugins do not seem to help much there; I found nothing to solve that issue.

square-activity-64562 (09/10/2021, 7:49 AM):
--gms-url, because the current CLI approach of specifying the GMS location (https://datahubproject.io/docs/how/delete-metadata/#delete-by-urn) is a problem when using automation. I have sent a PR (https://github.com/linkedin/datahub/pull/3215) to make it easier to use from the CLI. After that PR is merged, this can be cleaned up.
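
A hedged sketch (not from the thread, and the linked PR may take a different approach): click can already fall back to an environment variable when a flag is omitted, which avoids hard-coding the GMS location in automation. The variable name DATAHUB_GMS_URL is a hypothetical choice:

import click

# sketch only: click fills --gms-url from DATAHUB_GMS_URL (hypothetical
# variable name) when the flag is omitted on the command line
@click.command()
@click.option('--gms-url', envvar='DATAHUB_GMS_URL', required=True)
def ping(gms_url):
    click.echo(f"would connect to {gms_url}")

if __name__ == '__main__':
    ping()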