curved-jordan-15657
10/24/2021, 8:49 PM
  File "/home/airflow/airflow_venv/lib/python3.8/site-packages/airflow/task/task_runner/standard_task_runner.py", line 85, in _start_by_fork
    args.func(args, dag=self.dag)
  File "/home/airflow/airflow_venv/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/home/airflow/airflow_venv/lib/python3.8/site-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/airflow_venv/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 292, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/home/airflow/airflow_venv/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 107, in _run_task_by_selected_method
    _run_raw_task(args, ti)
  File "/home/airflow/airflow_venv/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 180, in _run_raw_task
    ti._run_raw_task(
  File "/home/airflow/airflow_venv/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/airflow_venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1324, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/home/airflow/airflow_venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1443, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/home/airflow/airflow_venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1494, in _execute_task
    result = execute_callable(context=context)
  File "/home/airflow/airflow_venv/lib/python3.8/site-packages/airflow/operators/python.py", line 151, in execute
    return_value = self.execute_callable()
  File "/home/airflow/airflow_venv/lib/python3.8/site-packages/airflow/operators/python.py", line 162, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/airflow/airflow/dags/datahub_ingestion_dag.py", line 39, in datahub_redshift
    pipeline.run()
  File "/home/airflow/airflow_venv/lib/python3.8/site-packages/datahub/ingestion/run/pipeline.py", line 125, in run
    for wu in self.source.get_workunits():
  File "/home/airflow/airflow_venv/lib/python3.8/site-packages/datahub/ingestion/source/sql/redshift.py", line 253, in get_workunits
    lineage_mcp = self.get_lineage_mcp(wu.metadata.proposedSnapshot.urn)
  File "/home/airflow/airflow_venv/lib/python3.8/site-packages/datahub/ingestion/source/sql/redshift.py", line 272, in get_lineage_mcp
    tablename = dataset_params[2]
IndexError: list index out of range
After digging into the problem, I realized that in redshift.py the `dataset_params` list is built by `dataset_params = dataset_key.name.split(".")` and is expected to contain three parts. `dataset_key` doesn't pick up our database name from our recipe file, because we wrote `host: <endpoint>/<db-name>`. If I set the database name explicitly with `database: <db-name>` in recipe.yml, that resolves this problem, but then I hit another error:
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/datahub/cli/ingest_cli.py", line 58, in run
    pipeline.run()
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/datahub/ingestion/run/pipeline.py", line 125, in run
    for wu in self.source.get_workunits():
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/datahub/ingestion/source/sql/redshift.py", line 248, in get_workunits
    for wu in super().get_workunits():
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/datahub/ingestion/source/sql/sql_common.py", line 364, in get_workunits
    yield from self.loop_tables(inspector, schema, sql_config)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/datahub/ingestion/source/sql/sql_common.py", line 435, in loop_tables
    columns = inspector.get_columns(table, schema)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/sqlalchemy/engine/reflection.py", line 390, in get_columns
    col_defs = self.dialect.get_columns(
  File "<string>", line 2, in get_columns
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/sqlalchemy/engine/reflection.py", line 52, in cache
    ret = fn(self, con, *args, **kw)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/sqlalchemy_redshift/dialect.py", line 454, in get_columns
    cols = self._get_redshift_columns(connection, table_name, schema, **kw)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/sqlalchemy_redshift/dialect.py", line 705, in _get_redshift_columns
    return all_columns[key]
KeyError: '<our-schema-name>.<our-table-name>'
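For context, the first IndexError can be reproduced in isolation. This is a hedged sketch with made-up names, not the actual DataHub source; it only mirrors the `dataset_key.name.split(".")` / `dataset_params[2]` logic from redshift.py described above:

```python
# Hypothetical helper mirroring redshift.py's lineage logic, where
# dataset_params = dataset_key.name.split(".") and index 2 is read
# as the table name.
def table_from_dataset_name(dataset_name: str) -> str:
    dataset_params = dataset_name.split(".")
    # Assumes a three-part "<database>.<schema>.<table>" name.
    return dataset_params[2]

# With the database name present, the lookup works:
print(table_from_dataset_name("etl.public.orders"))  # -> orders

# When the db name is folded into the host instead, only
# "<schema>.<table>" remains, and the same IndexError as in the
# traceback above is raised:
try:
    table_from_dataset_name("public.orders")
except IndexError as exc:
    print(exc)  # -> list index out of range
```

This is why setting `database:` explicitly in the recipe makes the name three-part again and the error disappear.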
Everything was perfect until now. The DataHub version is 0.8.16. I think there is a bug somewhere; I need a solution for the first problem.
miniature-tiger-96062
10/25/2021, 12:45 AM
curved-jordan-15657
10/25/2021, 10:10 AM
curved-jordan-15657
10/25/2021, 10:11 AM
miniature-tiger-96062
10/25/2021, 2:56 PM
curved-jordan-15657
10/25/2021, 3:07 PM
source:
  type: redshift
  config:
    # Coordinates
    host_port: "<our-host>:5439"
    database: etl
    username: <username>
    password: <password>
    include_views: True  # whether to include views, defaults to True
    include_tables: True  # whether to include tables, defaults to True
sink:
  type: "datahub-kafka"
  config:
    connection:
      bootstrap: "<our-bootstrap>:9092"
      schema_registry_url: "http://<our-schema-registry>:8081"