# ingestion
l
Hi team, I am trying to schedule ingestion in Airflow (managed Apache Airflow on AWS, MWAA). Here we upload the DAG and YAML files to an S3 location. But when I run the schedule in Airflow I get the error datahub.configuration.common.ConfigurationError: Cannot open config file <s3 path to yaml>. Any way to resolve this? Thank you
d
Which source did you try to use, and what does your DAG look like?
l
Trying with CSV. The DAG is exactly the same as generic_recipe_sample_dag.py, just with the config path updated to the S3 location @dazzling-judge-80093
Copy code
"""Generic DataHub Ingest via Recipe
This example demonstrates how to load any configuration file and run a
DataHub ingestion pipeline within an Airflow DAG.
"""

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from datahub.configuration.config_loader import load_config_file
from datahub.ingestion.run.pipeline import Pipeline

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email": ["<mailto:airflow@airflow.com|airflow@airflow.com>"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(minutes=120),
}


def datahub_recipe():
    config = load_config_file("<s3://***/csv_recipi.yaml|s3://***/csv_recipi.yaml>")

    pipeline = Pipeline.create(config)
    pipeline.run()
    pipeline.raise_from_status()


with DAG(
    "datahub_ingest_using_recipe",
    default_args=default_args,
    description="An example DAG which runs a DataHub ingestion recipe",
    schedule_interval=timedelta(days=1),
    start_date=datetime(2022, 9, 23),
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="ingest_using_recipe",
        python_callable=datahub_recipe,
    )
Not sure if the function load_config_file can read an S3 path, or if there is any function for that @dazzling-judge-80093
d
it can’t load files from s3
l
So is there any solution for this? For our MWAA, the files are in S3. How do we schedule?
d
You store DAGs on S3, but I think MWAA loads them from the local disk. Try adding the config next to your DAG and specifying a relative location.
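For example, a rough sketch, assuming csv_recipi.yaml is uploaded under the same S3 prefix as the DAG so MWAA syncs it into the local dags folder next to the DAG file:
Copy code
import os

from datahub.configuration.config_loader import load_config_file

# The recipe uploaded next to the DAG gets synced to the local dags folder,
# so it can be resolved relative to this DAG file instead of an s3:// URL.
RECIPE_PATH = os.path.join(os.path.dirname(__file__), "csv_recipi.yaml")
config = load_config_file(RECIPE_PATH)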
l
We are actually loading it to S3 itself and not onto disk. Once we add it in S3, the DAG is created automatically.
d
I know
Copy code
Amazon MWAA automatically syncs new and changed objects from your Amazon S3 bucket to Amazon MWAA scheduler and worker containers' /usr/local/airflow/dags folder every 30 seconds, preserving the Amazon S3 source's file hierarchy, regardless of file type. The time that new DAGs take to appear in your Apache Airflow UI is controlled by scheduler.dag_dir_list_interval. Changes to existing DAGs will be picked up on the next DAG processing loop.
l
Can you please explain what you mean by adding the config next to the DAG?
Ahhh, OK, so the files are copied to /usr/local/airflow/dags. I can find the path that our instance is configured to and use it. Guess that makes sense. Thank you @dazzling-judge-80093 One more question: when I run the recipe in Airflow, the rest sink should point to the IP address of the DataHub instance, right? Can't use localhost anymore.
d
You can't use localhost; you have to point to the DataHub instance and make sure the MWAA worker node can access it.
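Putting it together, a rough sketch of the updated task callable. The hostname datahub-gms.example.internal is just a placeholder for wherever your DataHub instance is reachable from the MWAA workers, and overriding the sink from the DAG is only one option; you can also just set the server in csv_recipi.yaml.
Copy code
import os

from datahub.configuration.config_loader import load_config_file
from datahub.ingestion.run.pipeline import Pipeline


def datahub_recipe():
    # Resolve the recipe from the local dags folder that MWAA syncs from S3.
    config = load_config_file(
        os.path.join(os.path.dirname(__file__), "csv_recipi.yaml")
    )

    # Point the rest sink at an address the MWAA workers can reach, not
    # localhost. The hostname below is a placeholder, not a real endpoint.
    config["sink"] = {
        "type": "datahub-rest",
        "config": {"server": "http://datahub-gms.example.internal:8080"},
    }

    pipeline = Pipeline.create(config)
    pipeline.run()
    pipeline.raise_from_status()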
l
Thank you, the path worked :))👍🏻👍🏻 @dazzling-judge-80093
d
awesome 🙂