# getting-started
b
Not sure if this is the right place, but I did not find a better channel 🤷‍♂️ I am on an older Cloud Composer version and only want to use the DatahubEmitterOperator to ingest some metadata. Do you know if it's possible to register it as a plugin to use in my DAG, or do I need to change my Composer installation? The latter wouldn't be my preferred solution, because the environment is used by multiple teams and I don't want to mess with the dependencies.
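(For reference, the kind of DAG usage in question would look roughly like the sketch below. It assumes the datahub_provider package, shipped with acryl-datahub's Airflow plugin, could be installed in the environment; the connection id, platforms, and dataset names are placeholders.)

from datetime import datetime

from airflow import DAG
import datahub.emitter.mce_builder as builder
from datahub_provider.operators.datahub import DatahubEmitterOperator

with DAG("example_dag", start_date=datetime(2021, 1, 1), schedule_interval="@daily") as dag:
    emit_lineage = DatahubEmitterOperator(
        task_id="emit_lineage",
        datahub_conn_id="datahub_rest_default",  # Airflow connection pointing at the DataHub GMS
        mces=[
            builder.make_lineage_mce(
                [builder.make_dataset_urn("postgres", "shop.public.orders")],  # upstream dataset(s)
                builder.make_dataset_urn("s3", "datalake.orders_daily"),       # downstream dataset
            )
        ],
    )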
l
@bland-orange-95847 are you trying to use the Airflow lineage backend with Google Cloud composer?
If so, I think there are some issues with making it work. cc @breezy-glass-7892 who has tried to make it work and can comment more
b
No, I don’t want to use the lineage backend but just the emitter operator to emit some lineage information
m
I believe @square-activity-64562 might also be using the lineage emitter operator with Google Composer
s
I decided to make a separate Docker container so as not to bring in all the dependencies. I extracted some basic code from https://github.com/linkedin/datahub/blob/master/metadata-ingestion/src/datahub_provider/_lineage_core.py#L40 and made a simple CLI that takes the required parameters. For every Airflow task I added an option to create another Airflow task which sends the lineage using this Docker container. Note that there is a probable bug in the lineage code file: the URLs don't work with Airflow 1.x; they might be meant for Airflow 2.x. I didn't have the time to get an Airflow 2.x instance up to confirm whether it was a bug or I missed something.
Using it as a lineage backend pulls in a lot of dependencies, which can cause issues. Also, any change to Composer's PyPI packages takes ~30 minutes to apply. For every new release of DataHub I would have needed to update the PyPI dependency, which would take a lot of time, and if something went wrong we would be looking at ~1 hour of Airflow downtime. I did not want to introduce that risk.
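(The pattern described above, pairing each task with a lineage-emitting KubernetesPodOperator that runs the container, might look roughly like this; the image names and namespace are placeholders, and the CLI arguments are omitted because the CLI itself is shown later in the thread.)

from datetime import datetime

from airflow import DAG
# Airflow 1.x import path; on Airflow 2.x the operator lives under
# airflow.providers.cncf.kubernetes.operators.kubernetes_pod
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

with DAG("daily_sales", start_date=datetime(2021, 1, 1), schedule_interval="@daily") as dag:
    load_sales = KubernetesPodOperator(
        task_id="load_sales",
        name="load-sales",
        namespace="default",
        image="my-registry/sales-loader:latest",  # the "real" task
    )
    emit_lineage = KubernetesPodOperator(
        task_id="load_sales_emit_lineage",
        name="load-sales-emit-lineage",
        namespace="default",
        image="my-registry/datahub-lineage-cli:latest",  # hypothetical lineage image
        # arguments=[...]  # lineage CLI arguments; see the script and example invocation below
    )
    load_sales >> emit_lineage  # emit lineage right after the task it describes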
b
Thanks @square-activity-64562, that's exactly my issue. I also thought of creating a Docker image containing everything needed and submitting lineage from there, but that would require reimplementing a lot of what is already there (as you did). Using the existing code and the Airflow mechanism would be a lot better. Have you considered using the Airflow plugins mechanism? Would that be an option? I am not really familiar with it, but maybe it could be a trade-off between the two solutions.
m
Do people using Airflow normally use the Docker / K8s based executor model?
If there is interest in having a standard Docker-based operator that can be added to an Airflow DAG ... I think it might be useful for everyone?
b
I am not sure if I get your question right. We are using the Airflow Celery executor, but most of our tasks are KubernetesPodOperator tasks, so an approach based on a Docker image that does all the work would be a good fit.
m
Got it. What needs to be inside that Docker image? Just the Acryl DataHub Python package? Anything from Airflow?
b
Just the acryl-datahub package, to build the MCEs and push lineage to DataHub. It can easily be built to have everything in the image. The remaining work is to design a neat interface for the PodOperator to pass in the information via parameters/config and build the MCE out of it.
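(A minimal sketch of that idea, using only the acryl-datahub package and no Airflow imports; the GMS URL and dataset names are placeholders.)

import datahub.emitter.mce_builder as builder
from datahub.emitter.rest_emitter import DatahubRestEmitter

# Build a lineage MCE (one downstream dataset with its upstreams) and push it
# to the DataHub GMS over REST; this is all the container really has to do.
lineage_mce = builder.make_lineage_mce(
    [builder.make_dataset_urn("athena", "analytics.raw_events")],
    builder.make_dataset_urn("athena", "analytics.daily_events"),
)

emitter = DatahubRestEmitter("http://datahub-gms:8080")
emitter.emit_mce(lineage_mce)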
s
@mammoth-bear-12532 Here is a perspective: https://medium.com/bluecore-engineering/were-all-using-airflow-wrong-and-how-to-fix-it-a56f14cb0753 Our team discussed it and agreed to use KubernetesPodOperator exclusively.
👍 1
@mammoth-bear-12532 The DataHub metadata ingestion image works just fine. I added a small bit of code on top of it.
m
Got it. I guess the next step is to create a CLI version that can send metadata with only command-line args.
s
from utils import initialize  # project-local helper module (not shown in this snippet)
initialize()

import click
import datahub.emitter.mce_builder as builder
import datahub.metadata.schema_classes as models
import logging

from datahub.emitter.rest_emitter import DatahubRestEmitter
from typing import List, Optional

logger = logging.getLogger(__name__)

PLATFORMS = [
    'athena',
    'mysql',
    'postgres',
    's3'
]

TIMEOUT = 10

def _validate_platform(platform):
    if platform not in PLATFORMS:
        raise RuntimeError(f"Unknown platform {platform} passed in urn")


def _get_dataset_urn(string: str):
    # split only on the first ":" so dataset names containing ":" stay intact
    platform, dataset_id = string.split(":", 1)

    _validate_platform(platform)

    return builder.make_dataset_urn(platform, dataset_id)


def _get_tag_aspect(tags: List[str]):
    if len(tags) == 0:
        return []
    tags_class = models.GlobalTagsClass(
        tags=[
            models.TagAssociationClass(tag=builder.make_tag_urn(tag))
            for tag in (tags or [])
        ]
    )
    return [tags_class]

def get_dataset_snapshot(urn: str, removed: bool, browse_paths: Optional[List[str]] = None):
    aspects = [
        models.StatusClass(removed=removed),
    ]
    if browse_paths is not None:
        aspects.append(models.BrowsePathsClass(paths=browse_paths))
    return models.MetadataChangeEventClass(
        proposedSnapshot=models.DatasetSnapshotClass(
            urn=urn,
            aspects=aspects
        )
    )


def send_mces(gms_url, mces):
    emitter = DatahubRestEmitter(gms_url, None, TIMEOUT, TIMEOUT)
    for mce in mces:
        logger.info(f"mce is {mce}")
        emitter.emit_mce(mce)


def send_job_lineage_task(gms_url, base_url, dag_id, task_id, upstream_urns: List[str], downstream_urns: List[str], job_tags: List[str], task_tags: List[str]):

    input_datasets = [_get_dataset_urn(string) for string in upstream_urns]
    output_datasets = [_get_dataset_urn(string) for string in downstream_urns]

    flow_urn = builder.make_data_flow_urn("airflow", dag_id)
    job_urn = builder.make_data_job_urn_with_flow(flow_urn, task_id)

    flow_url = f"{base_url}/airflow/tree?dag_id={dag_id}"
    job_url = f"{base_url}/taskinstance/?flt0_dag_id_equals={dag_id}&flt3_task_id_equals={task_id}"

    # Flow is Pipeline (aka airflow Job)
    job_tags_aspect = _get_tag_aspect(job_tags)
    flow_mce = models.MetadataChangeEventClass(
        proposedSnapshot=models.DataFlowSnapshotClass(
            urn=flow_urn,
            aspects=[
                models.DataFlowInfoClass(
                    name=dag_id,
                    externalUrl=flow_url,
                ),
                *job_tags_aspect
            ],
        )
    )

    # Job is Task (aka airflow task)
    task_tags_aspect = _get_tag_aspect(job_tags + task_tags)
    job_mce = models.MetadataChangeEventClass(
        proposedSnapshot=models.DataJobSnapshotClass(
            urn=job_urn,
            aspects=[
                models.DataJobInfoClass(
                    name=task_id,
                    type=models.AzkabanJobTypeClass.COMMAND,
                    externalUrl=job_url,
                ),
                models.DataJobInputOutputClass(
                    inputDatasets=input_datasets,
                    outputDatasets=output_datasets,
                    inputDatajobs=[],
                ),
                *task_tags_aspect
            ],
        )
    )
    force_entity_materialization = [
        get_dataset_snapshot(iolet, False)
        for iolet in input_datasets + output_datasets
    ]
    mces = [
        flow_mce,
        job_mce,
        *force_entity_materialization
    ]
    send_mces(gms_url, mces)


def send_dataset_to_view_lineage_task(gms_url, upstream_urns: List[str], downstream_urn: str):
    mces = [
        builder.make_lineage_mce(
            [_get_dataset_urn(string) for string in upstream_urns],
            _get_dataset_urn(downstream_urn)
        )
    ]
    send_mces(gms_url, mces)


@click.group()
def cli():
    pass


@cli.command()
@click.option('--gms-url', required=True)
@click.option('--airflow-url', required=True)
@click.option('--dag-id', required=True)
@click.option('--task-id', required=True)
@click.option('--upstream-urn', required=False, multiple=True)
@click.option('--downstream-urn', required=False, multiple=True)
@click.option("--job-tag", required=False, multiple=True)
@click.option("--task-tag", required=False, multiple=True)
def send_job_lineage(gms_url, airflow_url, dag_id, task_id, upstream_urn, downstream_urn, job_tag, task_tag):
    send_job_lineage_task(
        gms_url,
        airflow_url,
        dag_id,
        task_id,
        list(upstream_urn),
        list(downstream_urn),
        list(job_tag),
        list(task_tag)
    )

@cli.command()
@click.option('--gms-url', required=True)
@click.option('--upstream-urn', required=False, multiple=True)
@click.option('--downstream-urn', required=False, multiple=False)
def send_dataset_to_view_lineage(gms_url, upstream_urn, downstream_urn):
    send_dataset_to_view_lineage_task(
        gms_url,
        list(upstream_urn),
        downstream_urn
    )


if __name__ == "__main__":
    # entry point when the script is run directly (e.g. as the container command)
    cli()
Here is the piece of code. I was going to send a PR to merge it into datahub but didn't get the time.
This needs some cleanup, but it should give people a starting point.
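(For reference, an illustrative set of arguments for the send-job-lineage command defined above, e.g. to pass to a KubernetesPodOperator; every value is a placeholder, and the URNs use the "<platform>:<dataset>" form that _get_dataset_urn expects.)

# Illustrative arguments for the send-job-lineage command defined above.
arguments = [
    "send-job-lineage",   # command name as generated by click >= 7 (underscores become dashes)
    "--gms-url", "http://datahub-gms:8080",
    "--airflow-url", "https://my-airflow.example.com",
    "--dag-id", "daily_sales",
    "--task-id", "load_sales",
    "--upstream-urn", "mysql:shop.orders",
    "--downstream-urn", "s3:datalake.sales_daily",
    "--job-tag", "analytics",
]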
@bland-orange-95847 I am not familiar with Airflow plugins, but I suspect they would also require bringing in dependencies. In the end they still run in the same Python environment, so they would need the required dependencies anyway.
b
@square-activity-64562 you used linkedin/datahub-ingestion as the base image and the Python script above as the entrypoint, right? And yeah, plugins don't seem to help much there; I found nothing that solves that issue.
s
Yes @bland-orange-95847, that image is the base image.
👍 1
b
By the way, I love this community. There are always people to connect with and share thoughts with ❤️👍
❤️ 1
s
It is very little work @bland-orange-95847, just a single file. I had to pass --gms-url because the current CLI approach to specifying the GMS location (https://datahubproject.io/docs/how/delete-metadata/#delete-by-urn) is a problem for automation. I have sent a PR https://github.com/linkedin/datahub/pull/3215 to make it easier to use in the CLI. After that PR is merged this can be cleaned up.
@mammoth-bear-12532 The only thing needed to make this work is to extract the code from https://github.com/linkedin/datahub/blob/master/metadata-ingestion/src/datahub_provider/_lineage_core.py#L40. I think the best place for it would be https://github.com/linkedin/datahub/blob/master/metadata-ingestion/src/datahub/metadata/schema_classes.py. Then add a CLI on top of it and make it possible to use the CLI in automation; https://github.com/linkedin/datahub/pull/3215 should help with that.
After this is done, people should be able to use the datahub ingestion container image for the CLI in automation or Airflow. No need for a DockerOperator or anything like that.
👍 1
m
This looks great. Thanks for the suggestion, @square-activity-64562!