# advice-metadata-modeling
w
Dear DataHub experts, my use case is to track the datasets and the Docker images (/image tags) used in an Airflow data transformation + ML train/test pipeline. Each stage of my pipeline is a DockerOperator or BashOperator. I ran the demos of DataHub + Airflow, went through the codebase that generates the lineage information, and noticed the following for make_dataset_urn() in mce_builder.py: `return f"urn:li:dataset:({make_data_platform_urn(platform)},{name},{env})"`. I need to send the values passed for the image argument (to the DockerOperator), along with a reference to the datasets, for lineage tracking. I understand I can construct a dict and pass it as inlets so that lineage information can be sent, but I don't think there is a schema in DataHub that would allow this. Please let me know how I can go about my task. Thanks,
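P.S. For context, here is a minimal sketch of one stage of my pipeline (the image tag, dataset names, and inlet/outlet wiring are made up, just to illustrate what I want captured):

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
from datahub_provider.entities import Dataset

with DAG(
    dag_id="ml_train_pipeline",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
) as dag:
    transform = DockerOperator(
        task_id="transform",
        image="myregistry/transform:1.4.2",  # <- the image tag I want tracked in DataHub
        command="python transform.py",
        # upstream/downstream datasets for lineage tracking
        inlets=[Dataset("s3", "raw_events")],
        outlets=[Dataset("s3", "feature_events")],
    )
```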
o
Hi! Have you checked out the DataJob model?: https://demo.datahubproject.io/dataset/urn:li:dataset:(urn:li:dataPlatform:datahub,DataJob,PROD)/Schema?is_lineage_mode=false From the sound of it you have an Airflow DAG with some processes that execute along it (either in a Docker container or through some bash process). Airflow DAGs can be modeled using the DataFlow model, with the processes occurring along the pipeline modeled as DataJobs that link out to input & output Datasets. For the values passed to the image argument, you can utilize the customProperties map on DataJobInfo if the model doesn't contain fields directly relevant to what those values represent. Please let me know if I'm misunderstanding your set-up or missing something! 🙂
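For illustration, emitting a DataJob with the image stashed in customProperties could look roughly like this (a sketch, not the Airflow backend itself; the URNs, names, image tag, and GMS address are all made up):

```python
import datahub.emitter.mce_builder as builder
import datahub.metadata.schema_classes as models
from datahub.emitter.rest_emitter import DatahubRestEmitter

# URNs for the DAG (DataFlow) and the task within it (DataJob)
flow_urn = builder.make_data_flow_urn(orchestrator="airflow", flow_id="ml_train_pipeline")
job_urn = builder.make_data_job_urn(
    orchestrator="airflow", flow_id="ml_train_pipeline", job_id="transform"
)

job_mce = models.MetadataChangeEventClass(
    proposedSnapshot=models.DataJobSnapshotClass(
        urn=job_urn,
        aspects=[
            models.DataJobInfoClass(
                name="transform",
                type="COMMAND",
                # free-form string map: stash the DockerOperator image tag here
                customProperties={"image": "myregistry/transform:1.4.2"},
            ),
            models.DataJobInputOutputClass(
                inputDatasets=[builder.make_dataset_urn("s3", "raw_events")],
                outputDatasets=[builder.make_dataset_urn("s3", "feature_events")],
            ),
        ],
    )
)

emitter = DatahubRestEmitter("http://localhost:8080")  # your GMS endpoint
emitter.emit_mce(job_mce)
```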
w
Thanks a lot for the pointer there! I went through the following: https://github.com/linkedin/datahub/blob/master/metadata-ingestion/src/datahub_provider/_lineage_core.py and noticed that the set of fields the DataHub lineage backend for Airflow sends as customProperties is restricted (not everything is sent to DataHub), per lines 151-176, pasted below. I suppose I need to change this in order for the other args & values to be sent.
```python
for key in task.get_serialized_fields():
    if key not in job_property_bag:
        job_property_bag[key] = repr(getattr(task, key))
# operator.log.info(f"{flow_property_bag=}")
# operator.log.info(f"{job_property_bag=}")
allowed_task_keys = [
    "_downstream_task_ids",
    "_inlets",
    "_outlets",
    "_task_type",
    "_task_module",
    "depends_on_past",
    "email",
    "label",
    "execution_timeout",
    "end_date",
    "start_date",
    "sla",
    "sql",
    "task_id",
    "trigger_rule",
    "wait_for_downstream",
]
job_property_bag = {
    k: v for (k, v) in job_property_bag.items() if k in allowed_task_keys
}

# ...

flow_mce = models.MetadataChangeEventClass(
    proposedSnapshot=models.DataFlowSnapshotClass(
        urn=flow_urn,
        aspects=[
            models.DataFlowInfoClass(
                name=dag.dag_id,
                description=f"{dag.description}\n\n{dag.doc_md or ''}",
                customProperties=flow_property_bag,
                externalUrl=flow_url,
            ),
            *ownership_aspect,
            *tags_aspect,
        ],
    )
)
```
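So my plan would be something like the following patch to that allow-list (a sketch; "image", "command", and "environment" are the DockerOperator fields I'd want, and whether they all appear in task.get_serialized_fields() is my assumption):

```python
# Sketch: extend the backend's allow-list rather than replacing it.
allowed_task_keys += [
    "image",        # DockerOperator image tag
    "command",      # command run inside the container
    "environment",  # env vars passed to the container
]
job_property_bag = {
    k: v for (k, v) in job_property_bag.items() if k in allowed_task_keys
}
```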