modern-laptop-12942
06/14/2022, 4:32 PMTraceback (most recent call last):
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1164, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1282, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1307, in _execute_task
result = task_copy.execute(context=context)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py", line 150, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py", line 161, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/opt/airflow/dags/Test_ingestion_dag.py", line 34, in datahub_recipe
pipeline = Pipeline.create(config)
File "/home/airflow/.local/lib/python3.9/site-packages/datahub/ingestion/run/pipeline.py", line 150, in create
return cls(config, dry_run=dry_run, preview_mode=preview_mode)
File "/home/airflow/.local/lib/python3.9/site-packages/datahub/ingestion/run/pipeline.py", line 116, in __init__
self.source: Source = source_class.create(
File "/home/airflow/.local/lib/python3.9/site-packages/datahub/ingestion/source/sql/snowflake.py", line 182, in create
config = SnowflakeConfig.parse_obj(config_dict)
File "pydantic/main.py", line 511, in pydantic.main.BaseModel.parse_obj
File "pydantic/main.py", line 331, in pydantic.main.BaseModel.__init__
pydantic.error_wrappers.ValidationError: 4 validation errors for SnowflakeConfig
host_port
field required (type=value_error.missing)
account_id
extra fields not permitted (type=value_error.extra)
include_view_lineage
extra fields not permitted (type=value_error.extra)
upstream_lineage_in_report
extra fields not permitted (type=value_error.extra)
I use source.type: snowflake. And I can successfully ingest using CLI for this recipe.bulky-soccer-26729
06/14/2022, 4:49 PM It looks like your recipe is missing the required `host_port` field and is including fields the connector rejects: `account_id`, `include_view_lineage`, and `upstream_lineage_in_report`. This could certainly be a formatting issue as well, since these fields are supported — maybe they're in the wrong place (e.g. nested under the wrong key)?modern-laptop-12942
06/14/2022, 4:54 PMmodern-laptop-12942
06/14/2022, 4:57 PMbulky-soccer-26729
06/14/2022, 4:57 PMmodern-laptop-12942
06/14/2022, 5:16 PMbulky-soccer-26729
06/14/2022, 5:29 PMmodern-laptop-12942
06/15/2022, 3:19 AM[2022-06-15 03:12:17,383] {taskinstance.py:1462} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1164, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1282, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1307, in _execute_task
result = task_copy.execute(context=context)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py", line 150, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py", line 161, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/opt/airflow/dags/Test_ingestion_dag.py", line 34, in datahub_recipe
pipeline = Pipeline.create(config)
File "/home/airflow/.local/lib/python3.9/site-packages/datahub/ingestion/run/pipeline.py", line 150, in create
return cls(config, dry_run=dry_run, preview_mode=preview_mode)
File "/home/airflow/.local/lib/python3.9/site-packages/datahub/ingestion/run/pipeline.py", line 116, in __init__
self.source: Source = source_class.create(
File "/home/airflow/.local/lib/python3.9/site-packages/datahub/ingestion/source/sql/snowflake.py", line 182, in create
config = SnowflakeConfig.parse_obj(config_dict)
File "pydantic/main.py", line 511, in pydantic.main.BaseModel.parse_obj
File "pydantic/main.py", line 331, in pydantic.main.BaseModel.__init__
pydantic.error_wrappers.ValidationError: 2 validation errors for SnowflakeConfig
host_port
field required (type=value_error.missing)
dazzling-judge-80093
06/15/2022, 11:59 AM Can you share the output of `pip freeze`
on your airflow instance?
How do you run this ingestion?modern-laptop-12942
06/15/2022, 1:06 PMmodern-laptop-12942
06/15/2022, 1:06 PM
from datetime import timedelta
from airflow import DAG

try:
    # Airflow 2.x location of the operator.
    from airflow.operators.python import PythonOperator
except ModuleNotFoundError:
    # Fall back to the Airflow 1.x module path.
    from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

from datahub.configuration.config_loader import load_config_file
from datahub.ingestion.run.pipeline import Pipeline

# Default arguments applied to every task in this DAG.
# Change the owner to match your team.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=2),
    "execution_timeout": timedelta(minutes=120),
}


def datahub_recipe():
    """Load a DataHub ingestion recipe from disk and execute it as a pipeline."""
    # Adjust this file path to point at your recipe YAML.
    recipe_config = load_config_file("/opt/airflow/recipes/TEST_recipe.yml")

    ingestion_pipeline = Pipeline.create(recipe_config)
    ingestion_pipeline.run()
    # Propagate any ingestion failures so the Airflow task is marked failed.
    ingestion_pipeline.raise_from_status()


with DAG(
    "datahub_ingest_using_recipe",
    default_args=default_args,
    description="An example DAG which runs a DataHub ingestion recipe",
    # Weekly schedule; set the interval to whatever cadence you need.
    schedule_interval=timedelta(days=7),
    start_date=days_ago(2),
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="ingest_using_recipe",
        python_callable=datahub_recipe,
    )
I create a DAG to ingest metadata.