square-activity-64562
07/30/2021, 7:36 AM
Installed acryl-datahub[airflow]==0.8.6.4 as a package but it is not showing connection type in airflow UI
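[Editor's note: one possible cause is that the connection-type dropdown only picks up the plugin when acryl-datahub is installed in the environment the webserver itself runs in. Either way, the DataHub docs of that era create the hook's connection from the Airflow 2.x CLI rather than through the UI; the conn id and host below are the documented example values:

    airflow connections add --conn-type 'datahub_rest' 'datahub_rest_default' --conn-host 'http://localhost:8080'
]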
square-activity-64562
08/03/2021, 5:36 AM
The configuration properties are stored in a file called airflow.cfg the first time you run Apache Airflow. You can choose to change these properties and override the default values. Sections include: core, cli, api, operators, hive, webserver, email, smtp, celery, celery_broker_transport_options, dask, scheduler, ldap, mesos, kerberos, github_enterprise, admin.
If yes, how did you add indentation? https://datahubproject.io/docs/metadata-ingestion#using-datahubs-airflow-lineage-backend-recommended. This is a simple textbox. Not really sure how to add indentation in here.
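[Editor's note: the snippet in the linked docs at the time goes into airflow.cfg under a [lineage] section, roughly as below. datahub_kwargs is parsed as JSON, so exact indentation in a plain textbox should not matter as long as the JSON stays valid; the values shown are the documented defaults and datahub_rest_default is the documented connection id:

    [lineage]
    backend = datahub_provider.lineage.datahub.DatahubLineageBackend
    datahub_kwargs = {
        "datahub_conn_id": "datahub_rest_default",
        "capture_ownership_info": true,
        "capture_tags_info": true,
        "graceful_exceptions": true }
]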
breezy-glass-7892
08/03/2021, 5:53 AM
airflow.cfg file
square-activity-64562
08/03/2021, 5:58 AM
datahub_kwargs?
square-activity-64562
08/03/2021, 6:06 AM
Will use linkedin/datahub-ingestion and https://github.com/linkedin/datahub/blob/master/metadata-ingestion/examples/library/lineage_emitter_rest.py to ingest it. Will have to wrap it up to add airflow-specific info. This will be better for our use case.
square-activity-64562
08/03/2021, 9:17 AM
import click
import datahub.emitter.mce_builder as builder
import datahub.metadata.schema_classes as models
from datahub.emitter.rest_emitter import DatahubRestEmitter
from typing import List

PLATFORMS = [
    'athena',
    'postgres'
]


def _validate_platform(platform):
    if platform not in PLATFORMS:
        raise RuntimeError(f"Unknown platform {platform} passed in urn")


def _get_dataset_urn(string: str):
    # Expects "platform:dataset_id", e.g. "postgres:mydb.public.mytable"
    platform, dataset_id = string.split(":")
    _validate_platform(platform)
    return builder.make_dataset_urn(platform, dataset_id)


def send_lineage(gms_url, base_url, dag_id, task_id, upstream_urns: List[str], downstream_urns: List[str]):
    input_datasets = [_get_dataset_urn(string) for string in upstream_urns]
    output_datasets = [_get_dataset_urn(string) for string in downstream_urns]
    flow_urn = builder.make_data_flow_urn("airflow", dag_id)
    job_urn = builder.make_data_job_urn_with_flow(flow_urn, task_id)
    # Deep links back into the Airflow UI for the DAG and the task instance
    flow_url = f"{base_url}/airflow/tree?dag_id={dag_id}"
    job_url = f"{base_url}/taskinstance/?flt0_dag_id_equals={dag_id}&flt3_task_id_equals={task_id}"
    flow_mce = models.MetadataChangeEventClass(
        proposedSnapshot=models.DataFlowSnapshotClass(
            urn=flow_urn,
            aspects=[
                models.DataFlowInfoClass(
                    name=dag_id,
                    externalUrl=flow_url,
                )
            ],
        )
    )
    job_mce = models.MetadataChangeEventClass(
        proposedSnapshot=models.DataJobSnapshotClass(
            urn=job_urn,
            aspects=[
                models.DataJobInfoClass(
                    name=task_id,
                    type=models.AzkabanJobTypeClass.COMMAND,
                    externalUrl=job_url,
                ),
                models.DataJobInputOutputClass(
                    inputDatasets=input_datasets,
                    outputDatasets=output_datasets,
                    inputDatajobs=[],
                )
            ],
        )
    )
    # Emit a bare Status aspect for each input/output dataset so the
    # entities exist in DataHub even if nothing else has ingested them yet.
    force_entity_materialization = [
        models.MetadataChangeEventClass(
            proposedSnapshot=models.DatasetSnapshotClass(
                urn=iolet,
                aspects=[
                    models.StatusClass(removed=False),
                ],
            )
        )
        for iolet in input_datasets + output_datasets
    ]
    mces = [
        flow_mce,
        job_mce,
        *force_entity_materialization
    ]
    emitter = DatahubRestEmitter(gms_url)
    for mce in mces:
        emitter.emit_mce(mce)


@click.command()
@click.option('--gms-url', required=True)
@click.option('--airflow-url', required=True)
@click.option('--dag-id', required=True)
@click.option('--task-id', required=True)
@click.option('--upstream-urn', required=True, multiple=True)
@click.option('--downstream-urn', required=True, multiple=True)
def send_job_lineage(gms_url, airflow_url, dag_id, task_id, upstream_urn, downstream_urn):
    send_lineage(
        gms_url,
        airflow_url,
        dag_id,
        task_id,
        list(upstream_urn),
        list(downstream_urn)
    )


if __name__ == '__main__':
    send_job_lineage()
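[Editor's note: since the options use click's multiple=True, the script would be invoked roughly as below; the filename and all URLs/ids are hypothetical, and note that --upstream-urn/--downstream-urn take platform:dataset_id pairs that the script converts to URNs, not full URNs:

    python send_lineage.py \
        --gms-url http://localhost:8080 \
        --airflow-url http://localhost:8081 \
        --dag-id my_dag \
        --task-id my_task \
        --upstream-urn postgres:mydb.public.source_table \
        --downstream-urn athena:analytics.target_table
]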