# ingestion
m
Hi everyone. I use Airflow in Docker to ingest metadata from Snowflake, but I'm getting the following error logs:
```
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1164, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1282, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1307, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py", line 150, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py", line 161, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/Test_ingestion_dag.py", line 34, in datahub_recipe
    pipeline = Pipeline.create(config)
  File "/home/airflow/.local/lib/python3.9/site-packages/datahub/ingestion/run/pipeline.py", line 150, in create
    return cls(config, dry_run=dry_run, preview_mode=preview_mode)
  File "/home/airflow/.local/lib/python3.9/site-packages/datahub/ingestion/run/pipeline.py", line 116, in __init__
    self.source: Source = source_class.create(
  File "/home/airflow/.local/lib/python3.9/site-packages/datahub/ingestion/source/sql/snowflake.py", line 182, in create
    config = SnowflakeConfig.parse_obj(config_dict)
  File "pydantic/main.py", line 511, in pydantic.main.BaseModel.parse_obj
  File "pydantic/main.py", line 331, in pydantic.main.BaseModel.__init__
pydantic.error_wrappers.ValidationError: 4 validation errors for SnowflakeConfig
host_port
  field required (type=value_error.missing)
account_id
  extra fields not permitted (type=value_error.extra)
include_view_lineage
  extra fields not permitted (type=value_error.extra)
upstream_lineage_in_report
  extra fields not permitted (type=value_error.extra)
```
I use `source.type: snowflake`, and I can successfully ingest this recipe using the CLI.
b
Hi Chuxuan! Would you mind posting the recipe here as well? Just from a look at the error, it seems like you may have some missing and extra fields: `host_port` is missing, and unsupported fields like `account_id`, `include_view_lineage`, and `upstream_lineage_in_report` are included. This could certainly be a formatting issue as well, since these fields are supported, but maybe they're in the wrong place?
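For reference, this combination of errors (`host_port` required, the other three rejected) is what an older Snowflake source would produce. A minimal sketch of the older field layout, assuming a pre-`account_id` acryl-datahub, with hypothetical placeholder values; the `dry_run` flag is visible in the traceback's `Pipeline.create` signature:

```python
# A minimal sketch, assuming a pre-account_id acryl-datahub where the
# Snowflake account identifier is passed as host_port. All values are
# hypothetical placeholders.
from datahub.ingestion.run.pipeline import Pipeline

config = {
    "source": {
        "type": "snowflake",
        "config": {
            "host_port": "my_account",  # older name; newer releases use account_id
            "warehouse": "MY_WH",
            "username": "my_user",
            "password": "my_password",
        },
    },
    "sink": {
        "type": "datahub-rest",
        "config": {"server": "http://datahub-gms:8080"},
    },
}

# dry_run appears in Pipeline.create's signature in the traceback above
pipeline = Pipeline.create(config, dry_run=True)
```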
m
```yaml
source:
  type: snowflake
  config:
    account_id: <id>
    warehouse: <WH>
    username: <username>
    password: <passwd>
    include_table_lineage: True
    include_view_lineage: True
    upstream_lineage_in_report: true
    database_pattern:
      allow:
        - ^TM_DB$
    profiling:
      turn_off_expensive_profiling_metrics: True
      include_field_sample_values: True
      enabled: True
sink:
  type: datahub-rest
  config:
    server: http://datahub-gms:8080
```
I could ingest metadata with this recipe using the CLI or the UI before. Recently I deployed the DataHub server to AWS and use docker-compose to run Airflow (on the same instance).
b
Yup, it is; just seeing that now. Very weird... What is your DataHub version?
m
DataHub CLI version: 0.8.36
b
Is it the same in your Docker image? I'm trying to figure out the differences between ingesting your data via the CLI vs. Docker.
m
Hi Chris. I upgraded acryl-datahub for Airflow and the version is now 0.8.38, but I still encounter the issue:
```
[2022-06-15 03:12:17,383] {taskinstance.py:1462} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1164, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1282, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1307, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py", line 150, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py", line 161, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/Test_ingestion_dag.py", line 34, in datahub_recipe
    pipeline = Pipeline.create(config)
  File "/home/airflow/.local/lib/python3.9/site-packages/datahub/ingestion/run/pipeline.py", line 150, in create
    return cls(config, dry_run=dry_run, preview_mode=preview_mode)
  File "/home/airflow/.local/lib/python3.9/site-packages/datahub/ingestion/run/pipeline.py", line 116, in __init__
    self.source: Source = source_class.create(
  File "/home/airflow/.local/lib/python3.9/site-packages/datahub/ingestion/source/sql/snowflake.py", line 182, in create
    config = SnowflakeConfig.parse_obj(config_dict)
  File "pydantic/main.py", line 511, in pydantic.main.BaseModel.parse_obj
  File "pydantic/main.py", line 331, in pydantic.main.BaseModel.__init__
pydantic.error_wrappers.ValidationError: 2 validation errors for SnowflakeConfig
host_port
  field required (type=value_error.missing)
```
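One quick check (a sketch, not something from the thread): confirm which datahub installation the Airflow worker actually imports, since the error still has the pre-`account_id` signature. This assumes the package exposes `__version__`, as recent releases do.

```python
# Run inside the Airflow worker container (e.g. via `docker exec` + python),
# or as a throwaway task, to see which install actually parses the recipe.
import importlib.metadata

import datahub

print(datahub.__version__)                          # version the interpreter imports
print(datahub.__file__)                             # which site-packages it lives in
print(importlib.metadata.version("acryl-datahub"))  # installed distribution version
```

If the imported version disagrees with `pip freeze`, two installations (e.g. user-site vs. system site-packages, or different containers) are shadowing each other.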
d
Can you run a `pip freeze` on your Airflow instance? How do you run this ingestion?
m
```
(airflow)pip freeze acryl-datahub==0.8.38 adal==1.2.7 alembic==1.6.5 altair==4.2.0 amqp==2.6.1 anyio==3.3.0 apache-airflow==2.1.3 apache-airflow-providers-amazon==2.1.0 apache-airflow-providers-celery==2.0.0 apache-airflow-providers-cncf-kubernetes==2.0.1 apache-airflow-providers-docker==2.1.0 apache-airflow-providers-elasticsearch==2.0.2 apache-airflow-providers-ftp==2.0.0 apache-airflow-providers-google==5.0.0 apache-airflow-providers-grpc==2.0.0 apache-airflow-providers-hashicorp==2.0.0 apache-airflow-providers-http==2.0.0 apache-airflow-providers-imap==2.0.0 apache-airflow-providers-microsoft-azure==3.1.0 apache-airflow-providers-mysql==2.1.0 apache-airflow-providers-odbc==2.0.0 apache-airflow-providers-postgres==2.0.0 apache-airflow-providers-redis==2.0.0 apache-airflow-providers-sendgrid==2.0.0 apache-airflow-providers-sftp==2.1.0 apache-airflow-providers-slack==4.0.0 apache-airflow-providers-snowflake==2.4.0 apache-airflow-providers-sqlite==2.0.0 apache-airflow-providers-ssh==2.1.0 apispec==3.3.2 argcomplete==1.12.3 argon2-cffi==21.3.0 argon2-cffi-bindings==21.2.0 asn1crypto==1.4.0 attrs==20.3.0 Authlib==0.15.4 avro==1.10.2 avro-gen3==0.7.4 azure-batch==11.0.0 azure-common==1.1.27 azure-core==1.17.0 azure-cosmos==3.2.0 azure-datalake-store==0.0.52 azure-identity==1.6.0 azure-keyvault==4.1.0 azure-keyvault-certificates==4.3.0 azure-keyvault-keys==4.4.0 azure-keyvault-secrets==4.3.0 azure-kusto-data==0.0.45 azure-mgmt-containerinstance==1.5.0 azure-mgmt-core==1.3.0 azure-mgmt-datafactory==1.1.0 azure-mgmt-datalake-nspkg==3.0.1 azure-mgmt-datalake-store==0.5.0 azure-mgmt-nspkg==3.0.2 azure-mgmt-resource==19.0.0 azure-nspkg==3.0.2 azure-storage-blob==12.8.1 azure-storage-common==2.1.0 azure-storage-file==2.1.0 Babel==2.9.1 backcall==0.2.0 backports.entry-points-selectable==1.1.0 bcrypt==3.2.0 billiard==3.6.4.0 bleach==4.1.0 blinker==1.4 boto3==1.17.112 botocore==1.20.112 cachetools==4.2.2 cattrs==1.1.2 celery==4.4.7 certifi==2020.12.5 cffi==1.14.6 charset-normalizer==2.0.4 click==7.1.2 click-default-group==1.2.2 clickclick==20.10.2 cloudpickle==1.4.1 colorama==0.4.4 colorlog==4.8.0 commonmark==0.9.1 confluent-kafka==1.8.2 croniter==1.0.15 cryptography==3.4.8 dask==2021.6.0 debugpy==1.5.1 decorator==5.1.1 defusedxml==0.7.1 Deprecated==1.2.13 dill==0.3.1.1 distlib==0.3.2 distributed==2.19.0 dnspython==1.16.0 docker==5.0.0 docutils==0.16 elasticsearch==7.14.0 elasticsearch-dbapi==0.2.4 elasticsearch-dsl==7.4.0 email-validator==1.1.3 entrypoints==0.4 eventlet==0.31.1 expandvars==0.7.0 fastavro==1.4.9 filelock==3.0.12 Flask==1.1.4 Flask-AppBuilder==3.3.2 Flask-Babel==1.0.0 Flask-Caching==1.10.1 Flask-Cors==3.0.10 Flask-JWT-Extended==3.25.1 Flask-Login==0.4.1 Flask-OpenID==1.3.0 Flask-SQLAlchemy==2.5.1 Flask-WTF==0.14.3 flower==0.9.7 fsspec==2021.7.0 future==0.18.2 gevent==21.8.0 google-ads==13.0.0 google-api-core==1.31.2 google-api-python-client==1.12.8 google-auth==1.35.0 google-auth-httplib2==0.1.0 google-auth-oauthlib==0.4.5 google-cloud-appengine-logging==0.1.4 google-cloud-audit-log==0.1.0 google-cloud-automl==2.4.2 google-cloud-bigquery==2.24.0 google-cloud-bigquery-datatransfer==3.3.1 google-cloud-bigquery-storage==2.6.3 google-cloud-bigtable==1.7.0 google-cloud-container==1.0.1 google-cloud-core==1.7.2 google-cloud-datacatalog==3.4.0 google-cloud-dataproc==2.5.0 google-cloud-dlp==1.0.0 google-cloud-kms==2.5.0 google-cloud-language==1.3.0 google-cloud-logging==2.6.0 google-cloud-memcache==1.0.0 google-cloud-monitoring==2.4.2 google-cloud-os-login==2.3.1 google-cloud-pubsub==2.7.0 
google-cloud-redis==2.2.2 google-cloud-secret-manager==1.0.0 google-cloud-spanner==1.19.1 google-cloud-speech==1.3.2 google-cloud-storage==1.42.0 google-cloud-tasks==2.5.1 google-cloud-texttospeech==1.0.1 google-cloud-translate==1.7.0 google-cloud-videointelligence==1.16.1 google-cloud-vision==1.0.0 google-cloud-workflows==1.2.1 google-crc32c==1.1.2 google-resumable-media==1.3.3 googleapis-common-protos==1.53.0 graphviz==0.17 great-expectations==0.14.4 greenlet==1.1.1 grpc-google-iam-v1==0.12.3 grpcio==1.39.0 grpcio-gcp==0.2.2 gunicorn==20.1.0 h11==0.12.0 HeapDict==1.0.1 httpcore==0.13.6 httplib2==0.19.1 httpx==0.18.2 humanfriendly==10.0 humanize==3.11.0 hvac==0.11.0 idna==3.2 importlib-metadata==4.10.1 importlib-resources==1.5.0 inflection==0.5.1 ipykernel==6.8.0 ipython==7.31.1 ipython-genutils==0.2.0 ipywidgets==7.6.5 iso8601==0.1.16 isodate==0.6.0 itsdangerous==1.1.0 jedi==0.18.1 Jinja2==2.11.3 jmespath==0.10.0 json-merge-patch==0.2 jsonpatch==1.32 jsonpointer==2.2 jsonschema==3.2.0 jupyter-client==7.1.2 jupyter-core==4.9.1 jupyterlab-pygments==0.1.2 jupyterlab-widgets==1.0.2 kombu==4.6.11 kubernetes==11.0.0 lazy-object-proxy==1.4.3 ldap3==2.9.1 libcst==0.3.20 lkml==1.1.2 locket==0.2.1 lockfile==0.12.2 looker-sdk==21.6.0 Mako==1.1.4 Markdown==3.3.4 MarkupSafe==1.1.1 marshmallow==3.13.0 marshmallow-enum==1.5.1 marshmallow-oneofschema==3.0.1 marshmallow-sqlalchemy==0.23.1 matplotlib-inline==0.1.3 mistune==0.8.4 mixpanel==4.9.0 more-itertools==8.12.0 msal==1.13.0 msal-extensions==0.3.0 msgpack==1.0.2 msrest==0.6.21 msrestazure==0.6.4 mypy-extensions==0.4.3 mysql-connector-python==8.0.22 mysqlclient==2.0.3 nbclient==0.5.10 nbconvert==6.4.1 nbformat==5.1.3 nest-asyncio==1.5.4 networkx==2.6.3 notebook==6.4.8 nox==2020.12.31 numpy==1.20.3 oauthlib==3.1.1 openapi-schema-validator==0.1.5 openapi-spec-validator==0.3.1 oscrypto==1.2.1 packaging==20.9 pandas==1.3.2 pandas-gbq==0.14.1 pandocfilters==1.5.0 paramiko==2.7.2 parso==0.8.3 partd==1.2.0 pendulum==2.1.2 pexpect==4.8.0 pickleshare==0.7.5 platformdirs==2.2.0 portalocker==1.7.1 prison==0.1.3 progressbar2==4.0.0 prometheus-client==0.8.0 prompt-toolkit==3.0.26 proto-plus==1.19.0 protobuf==3.17.3 psutil==5.8.0 psycopg2-binary==2.9.1 ptyprocess==0.7.0 py==1.10.0 pyarrow==3.0.0 pyasn1==0.4.8 pyasn1-modules==0.2.8 pybigquery==0.10.2 pycparser==2.20 pycryptodomex==3.14.0 pydantic==1.9.0 pydata-google-auth==1.2.0 Pygments==2.10.0 PyJWT==1.7.1 PyMySQL==1.0.2 PyNaCl==1.4.0 pyodbc==4.0.31 pyOpenSSL==20.0.1 pyparsing==2.4.7 pyrsistent==0.18.0 pysftp==0.2.9 python-daemon==2.3.0 python-dateutil==2.8.2 python-editor==1.0.4 python-http-client==3.3.2 python-ldap==3.3.1 python-nvd3==0.15.0 python-slugify==4.0.1 python-utils==3.1.0 python3-openid==3.2.0 pytz==2021.1 pytz-deprecation-shim==0.1.0.post0 pytzdata==2020.1 PyYAML==5.4.1 pyzmq==22.3.0 ratelimiter==1.2.0.post0 redis==3.5.3 requests==2.26.0 requests-oauthlib==1.3.0 rfc3986==1.5.0 rich==10.7.0 rsa==4.7.2 ruamel.yaml==0.17.17 ruamel.yaml.clib==0.2.6 s3transfer==0.4.2 scipy==1.7.3 Send2Trash==1.8.0 sendgrid==6.8.0 setproctitle==1.2.2 six==1.16.0 slack-sdk==3.9.1 sniffio==1.2.0 snowflake-connector-python==2.7.1 snowflake-sqlalchemy==1.2.4 sortedcontainers==2.4.0 sql-metadata==2.2.2 SQLAlchemy==1.3.24 SQLAlchemy-JSONField==1.0.0 SQLAlchemy-Utils==0.37.8 sqllineage==1.3.3 sqlparse==0.4.2 sshtunnel==0.1.5 stackprinter==0.2.5 starkbank-ecdsa==1.1.1 statsd==3.3.0 swagger-ui-bundle==0.0.8 tabulate==0.8.9 tblib==1.7.0 tenacity==6.2.0 termcolor==1.1.0 terminado==0.13.1 testpath==0.5.0 text-unidecode==1.3 
toml==0.10.2 toolz==0.11.1 tornado==6.1 tqdm==4.62.3 traitlets==5.1.1 types-Deprecated==1.2.8 types-termcolor==1.1.4 typing-extensions==3.10.0.2 typing-inspect==0.7.1 tzdata==2021.5 tzlocal==4.1 unicodecsv==0.14.1 uritemplate==3.0.1 urllib3==1.26.6 vine==1.3.0 virtualenv==20.7.2 watchtower==1.0.6 wcwidth==0.2.5 webencodings==0.5.1 websocket-client==1.2.1 Werkzeug==1.0.1 widgetsnbextension==3.5.2 wrapt==1.14.1 WTForms==2.3.3 zict==2.0.0 zipp==3.7.0 zope.event==4.5.0 zope.interface==5.4.0
```
```python
from datetime import timedelta

from airflow import DAG

try:
    from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
    from airflow.operators.python_operator import PythonOperator

from airflow.utils.dates import days_ago

from datahub.configuration.config_loader import load_config_file
from datahub.ingestion.run.pipeline import Pipeline

# Change the owner
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=2),
    "execution_timeout": timedelta(minutes=120),
}


def datahub_recipe():
    # add file path
    config = load_config_file("/opt/airflow/recipes/TEST_recipe.yml")

    pipeline = Pipeline.create(config)
    pipeline.run()
    pipeline.raise_from_status()


with DAG(
    "datahub_ingest_using_recipe",
    default_args=default_args,
    description="An example DAG which runs a DataHub ingestion recipe",
    # set schedule interval
    schedule_interval=timedelta(days=7),
    start_date=days_ago(2),
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="ingest_using_recipe",
        python_callable=datahub_recipe,
    )
```
I created this DAG to ingest the metadata.
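Since the CLI path is known to work for this recipe, one workaround (a sketch, not suggested in the thread) is to shell out to it from the same DAG via `BashOperator`; `datahub ingest -c <recipe>` is the standard CLI entry point.

```python
from airflow.operators.bash import BashOperator

# Drop-in alternative task; place it inside the same `with DAG(...)` block
# as the PythonOperator above. It uses whatever acryl-datahub the worker's
# PATH resolves to, matching the CLI behavior that already works.
ingest_via_cli = BashOperator(
    task_id="ingest_using_cli",
    bash_command="datahub ingest -c /opt/airflow/recipes/TEST_recipe.yml",
)
```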