lemon-engine-23512
09/23/2022, 11:48 AM
dazzling-judge-80093
09/23/2022, 11:49 AM
lemon-engine-23512
09/23/2022, 11:53 AM
generic_recipi_sample_dag.py
just update the config path with the S3 location @dazzling-judge-80093
lemon-engine-23512
09/23/2022, 11:57 AM
"""Generic DataHub Ingest via Recipe

This example demonstrates how to load any configuration file and run a
DataHub ingestion pipeline within an Airflow DAG.
"""
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from datahub.configuration.config_loader import load_config_file
from datahub.ingestion.run.pipeline import Pipeline

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email": ["airflow@airflow.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(minutes=120),
}


def datahub_recipe():
    # Load the recipe config and run it as a DataHub ingestion pipeline.
    config = load_config_file("s3://***/csv_recipi.yaml")
    pipeline = Pipeline.create(config)
    pipeline.run()
    pipeline.raise_from_status()


with DAG(
    "datahub_ingest_using_recipe",
    default_args=default_args,
    description="An example DAG which runs a DataHub ingestion recipe",
    schedule_interval=timedelta(days=1),
    start_date=datetime(2022, 9, 23),
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="ingest_using_recipe",
        python_callable=datahub_recipe,
    )
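Side note: depending on the DataHub version, load_config_file may not resolve s3:// URIs on its own. A minimal workaround sketch, assuming an aws_default Airflow connection and hypothetical bucket/key names, that reads the recipe with S3Hook and loads it from a temporary local file:

import tempfile

from airflow.hooks.S3_hook import S3Hook
from datahub.configuration.config_loader import load_config_file
from datahub.ingestion.run.pipeline import Pipeline


def datahub_recipe_from_s3():
    # Hypothetical bucket/key placeholders; point these at the real recipe.
    hook = S3Hook(aws_conn_id="aws_default")
    recipe_text = hook.read_key(key="csv_recipi.yaml", bucket_name="my-bucket")

    # Keep the .yaml suffix: load_config_file infers the format from the extension.
    with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml") as f:
        f.write(recipe_text)
        f.flush()
        config = load_config_file(f.name)

    pipeline = Pipeline.create(config)
    pipeline.run()
    pipeline.raise_from_status()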
lemon-engine-23512
09/23/2022, 12:14 PM
dazzling-judge-80093
09/23/2022, 12:15 PM
lemon-engine-23512
09/23/2022, 12:18 PM
dazzling-judge-80093
09/23/2022, 1:01 PM
lemon-engine-23512
09/23/2022, 1:02 PM
dazzling-judge-80093
09/23/2022, 1:03 PM
dazzling-judge-80093
09/23/2022, 1:04 PM
Amazon MWAA automatically syncs new and changed objects from your Amazon S3 bucket to the scheduler and worker containers' /usr/local/airflow/dags folder every 30 seconds, preserving the S3 source's file hierarchy regardless of file type. How long new DAGs take to appear in your Apache Airflow UI is controlled by scheduler.dag_dir_list_interval; changes to existing DAGs are picked up on the next DAG processing loop.
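For reference, scheduler.dag_dir_list_interval can be overridden per MWAA environment. A minimal sketch using boto3's MWAA client, with a hypothetical environment name (the same key/value pair can also be set in the MWAA console under "Airflow configuration options"):

import boto3

client = boto3.client("mwaa")
client.update_environment(
    Name="MyAirflowEnvironment",  # hypothetical environment name
    AirflowConfigurationOptions={
        # Seconds between scheduler scans of the DAGs folder for new files.
        "scheduler.dag_dir_list_interval": "30",
    },
)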
dazzling-judge-80093
09/23/2022, 1:04 PM
lemon-engine-23512
09/23/2022, 1:08 PM
lemon-engine-23512
09/23/2022, 2:06 PM
dazzling-judge-80093
09/23/2022, 2:20 PM
lemon-engine-23512
09/23/2022, 2:29 PM
dazzling-judge-80093
09/23/2022, 2:29 PM