Hi,
I am trying to ingest MySQL metadata into DataHub using Airflow.
Code:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator, PythonVirtualenvOperator
def ingest_from_mysql() -> None:
    """Run a DataHub ingestion pipeline that pulls MySQL metadata.

    The ``datahub`` import is kept inside the function so the DAG file can
    be parsed by the scheduler even when the package is only installed in
    the worker/task environment.

    Raises:
        Exception: re-raised by ``raise_from_status()`` if the pipeline
            reported any failures.
    """
    from datahub.ingestion.run.pipeline import Pipeline

    pipeline = Pipeline.create(
        # This configuration is analogous to a recipe configuration.
        {
            "source": {
                "type": "mysql",
                "config": {
                    # If you want to use Airflow connections, take a look
                    # at the snowflake_sample_dag.py example.
                    "username": "user",
                    "password": "pass",
                    "database": "db_name",
                    "host_port": "localhost:3306",
                },
            },
            "sink": {
                "type": "datahub-rest",
                # BUG FIX: the server URL string literal was split across
                # two lines, embedding a newline and leading whitespace in
                # the URL. It must be a single, unbroken string.
                "config": {"server": "http://localhost:8080"},
            },
        }
    )
    pipeline.run()
    pipeline.pretty_print_summary()
    # Fail the Airflow task if the pipeline reported errors.
    pipeline.raise_from_status()
with DAG(
    "datahub_mysql_ingest",
    default_args={
        "owner": "airflow",
    },
    description="An example DAG which ingests metadata from MySQL to DataHub",
    start_date=datetime(2022, 1, 1),
    schedule_interval=timedelta(days=1),
    catchup=False,
) as dag:
    # BUG FIX: the original referenced PythonOperator without importing it
    # (only PythonVirtualenvOperator was imported at the top of the file),
    # so the DAG file fails at parse time. PythonOperator is now imported.
    # NOTE: `dag=dag` was dropped — inside a `with DAG(...) as dag:` block
    # the operator is attached to the DAG automatically.
    task0 = PythonOperator(
        task_id="mysql_metadata",
        python_callable=ingest_from_mysql,
    )
When I run the DAG, I get:
AttributeError: 'function' object has no attribute 'run'
Can anyone please help me with this?