# troubleshoot
s
anyone using datahub lineage with Airflow in Google Composer? I have installed
acryl-datahub[airflow]==0.8.6.4
as a package, but it is not showing the connection type in the Airflow UI
l
@breezy-glass-7892 has used datahub lineage with Google Composer if I'm not mistaken
b
Yeah, that’s right.
s
@breezy-glass-7892 did you use the CLI to create the connection? It is not showing in the UI. Also, Google Composer shows this in the configuration override. So does lineage work?
```
The configuration properties are stored in a file called airflow.cfg the first time you run Apache Airflow. You can choose to change these properties and override the default values. Sections include: core, cli, api, operators, hive, webserver, email, smtp, celery, celery_broker_transport_options, dask, scheduler, ldap, mesos, kerberos, github_enterprise, admin.
```
If yes, how did you add the indentation? https://datahubproject.io/docs/metadata-ingestion#using-datahubs-airflow-lineage-backend-recommended. This is a simple textbox; not really sure how to add indentation here.
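For reference, the airflow.cfg snippet from that docs page looks roughly like this (section name and keys taken from the docs; the exact options may differ by version). Since datahub_kwargs is parsed as JSON, it could presumably be collapsed onto a single line, with backend and datahub_kwargs set as two separate overrides under the lineage section in Composer:
```
[lineage]
backend = datahub_provider.lineage.datahub.DatahubLineageBackend
datahub_kwargs = {"datahub_conn_id": "datahub_rest_default", "capture_ownership_info": true, "capture_tags_info": true, "graceful_exceptions": true}
```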
b
Are you talking about establishing a connection with the DataHub service?
s
yes for lineage backend
b
You have to first create an ingress service on GCP
Once you do that you will have an IP address assigned to your service
s
I am not talking about the access to the UI of datahub. That I already have.
b
Under your airflow UI, just go to Admin -> Connections and add a new connection
The name of the connection
And the Host
The host in this case will be the same as your datahub-gms-ingress service
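If the datahub_rest connection type does not show up in the UI dropdown, the connection can also be created from the Airflow CLI; a rough sketch (connection id, type, and host are placeholders, and Airflow 1.x uses the older airflow connections --add syntax):
```
airflow connections add 'datahub_rest_default' \
    --conn-type 'datahub_rest' \
    --conn-host 'http://<datahub-gms-ingress-ip>:8080'
```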
s
Are you running airflow 1 or 2? The datahub connection types are not showing up for me
b
Yeah you can leave that blank
I am running an older version of Airflow
b
I have not updated the airflow.cfg file
s
Lineage is working for you even when you have not specified the lineage backend?
b
Yeah, I think that link is leveraging the Airflow lineage info
What I have is using BigQuery’s logs to derive lineage
So, maybe what you are looking to do is not 100% the same as what I implemented
s
Ok. Thanks @breezy-glass-7892
@mammoth-bear-12532 Any suggestions on how to set datahub_kwargs?
Maybe I can just do a simple example instead of using the lineage backend. I will try using linkedin/datahub-ingestion and https://github.com/linkedin/datahub/blob/master/metadata-ingestion/examples/library/lineage_emitter_rest.py to ingest it. Will have to wrap it up to add Airflow-specific info. This will be better for our use case.
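Roughly something like the following, adapted from that example and wrapped so it can run as an Airflow task; the URNs, GMS host, and the emit_lineage callable are placeholders, not from the linked file:
```python
import datahub.emitter.mce_builder as builder
from datahub.emitter.rest_emitter import DatahubRestEmitter


def emit_lineage():
    # Dataset-to-dataset lineage: several upstreams feeding one downstream
    lineage_mce = builder.make_lineage_mce(
        [
            builder.make_dataset_urn("athena", "source_db.source_table"),
            builder.make_dataset_urn("athena", "source_db.other_table"),
        ],
        builder.make_dataset_urn("postgres", "target_db.target_table"),
    )
    emitter = DatahubRestEmitter("http://<datahub-gms-host>:8080")
    emitter.emit_mce(lineage_mce)


# Inside a DAG this could be wired up as, for example:
# PythonOperator(task_id="emit_lineage", python_callable=emit_lineage)
```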
m
Hi @square-activity-64562: I think for Google Composer maybe go with the a la carte operator option.
Looks like that is what you are proposing here as well
s
I was actually talking about having a CLI. This allows us to create a Kubernetes Pod in a separate cluster. Using code from https://github.com/linkedin/datahub/blob/master/metadata-ingestion/src/datahub_provider/_lineage_core.py I came up with this as a draft.
```python
import click
import datahub.emitter.mce_builder as builder
import datahub.metadata.schema_classes as models
from datahub.emitter.rest_emitter import DatahubRestEmitter
from typing import List

PLATFORMS = [
    'athena', 
    'postgres'
]

def _validate_platform(platform):
    if platform not in PLATFORMS:
        raise RuntimeError(f"Unknown platform {platform} passed in urn")


def _get_dataset_urn(string: str):
    # Expect "<platform>:<dataset id>", e.g. "athena:my_db.my_table"
    platform, dataset_id = string.split(":", 1)

    _validate_platform(platform)

    return builder.make_dataset_urn(platform, dataset_id)


def send_lineage(gms_url, base_url, dag_id, task_id, upstream_urns: List[str], downstream_urns: List[str]):

    input_datasets = [_get_dataset_urn(string) for string in upstream_urns]
    output_datasets = [_get_dataset_urn(string) for string in downstream_urns]

    flow_urn = builder.make_data_flow_urn("airflow", dag_id)
    job_urn = builder.make_data_job_urn_with_flow(flow_urn, task_id)

    flow_url = f"{base_url}/airflow/tree?dag_id={dag_id}"
    job_url = f"{base_url}/taskinstance/?flt0_dag_id_equals={dag_id}&flt3_task_id_equals={task_id}"

    flow_mce = models.MetadataChangeEventClass(
        proposedSnapshot=models.DataFlowSnapshotClass(
            urn=flow_urn,
            aspects=[
                models.DataFlowInfoClass(
                    name=dag_id,
                    externalUrl=flow_url,
                )
            ],
        )
    )

    job_mce = models.MetadataChangeEventClass(
        proposedSnapshot=models.DataJobSnapshotClass(
            urn=job_urn,
            aspects=[
                models.DataJobInfoClass(
                    name=task_id,
                    type=models.AzkabanJobTypeClass.COMMAND,
                    externalUrl=job_url,
                ),
                models.DataJobInputOutputClass(
                    inputDatasets=input_datasets,
                    outputDatasets=output_datasets,
                    inputDatajobs=[],
                )
            ],
        )
    )
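    # Emit a bare snapshot for each dataset so the entities exist in DataHub
    # even if they have not been ingested from the source system yet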
    force_entity_materialization = [
        models.MetadataChangeEventClass(
            proposedSnapshot=models.DatasetSnapshotClass(
                urn=iolet,
                aspects=[
                    models.StatusClass(removed=False),
                ],
            )
        )
        for iolet in input_datasets + output_datasets
    ]
    mces = [
        flow_mce,
        job_mce,
        *force_entity_materialization
    ]

    emitter = DatahubRestEmitter(gms_url)

    for mce in mces:
        emitter.emit_mce(mce)


@click.command()
@click.option('--gms-url', required=True)
@click.option('--airflow-url', required=True)
@click.option('--dag-id', required=True)
@click.option('--task-id', required=True)
@click.option('--upstream-urn', required=True, multiple=True)
@click.option('--downstream-urn', required=True, multiple=True)
def send_job_lineage(gms_url, airflow_url, dag_id, task_id, upstream_urn, downstream_urn):
    send_lineage(
        gms_url,
        airflow_url,
        dag_id,
        task_id,
        list(upstream_urn),
        list(downstream_urn)
    )

if __name__ == "__main__":
    send_job_lineage()
```
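It would be invoked roughly like this (script name and URNs are made up for illustration):
```
python send_lineage_cli.py \
    --gms-url http://<datahub-gms-host>:8080 \
    --airflow-url https://<composer-airflow-webserver> \
    --dag-id my_dag \
    --task-id my_task \
    --upstream-urn athena:source_db.source_table \
    --downstream-urn postgres:target_db.target_table
```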
If I wanted to add it in, it would go in this folder https://github.com/linkedin/datahub/tree/master/metadata-ingestion/src/datahub/cli, correct @mammoth-bear-12532?
b
@shy-island-99768