# troubleshoot
c
Hello team, today we had an issue while ingesting Redshift. Until today, we had been using the "host" field in the recipe.yml file as `host: <endpoint>/<db-name>`. But now we get an error like:
```
File "/home/airflow/airflow_venv/lib/python3.8/site-packages/airflow/task/task_runner/standard_task_runner.py", line 85, in _start_by_fork
    args.func(args, dag=self.dag)
  File "/home/airflow/airflow_venv/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/home/airflow/airflow_venv/lib/python3.8/site-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/airflow_venv/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 292, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/home/airflow/airflow_venv/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 107, in _run_task_by_selected_method
    _run_raw_task(args, ti)
  File "/home/airflow/airflow_venv/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 180, in _run_raw_task
    ti._run_raw_task(
  File "/home/airflow/airflow_venv/lib/python3.8/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/airflow_venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1324, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/home/airflow/airflow_venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1443, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/home/airflow/airflow_venv/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1494, in _execute_task
    result = execute_callable(context=context)
  File "/home/airflow/airflow_venv/lib/python3.8/site-packages/airflow/operators/python.py", line 151, in execute
    return_value = self.execute_callable()
  File "/home/airflow/airflow_venv/lib/python3.8/site-packages/airflow/operators/python.py", line 162, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/airflow/airflow/dags/datahub_ingestion_dag.py", line 39, in datahub_redshift
    pipeline.run()
  File "/home/airflow/airflow_venv/lib/python3.8/site-packages/datahub/ingestion/run/pipeline.py", line 125, in run
    for wu in self.source.get_workunits():
  File "/home/airflow/airflow_venv/lib/python3.8/site-packages/datahub/ingestion/source/sql/redshift.py", line 253, in get_workunits
    lineage_mcp = self.get_lineage_mcp(wu.metadata.proposedSnapshot.urn)
  File "/home/airflow/airflow_venv/lib/python3.8/site-packages/datahub/ingestion/source/sql/redshift.py", line 272, in get_lineage_mcp
    tablename = dataset_params[2]
IndexError: list index out of range
```
After digging into the problem, I realized that in redshift.py the `dataset_params` list is built by `dataset_params = dataset_key.name.split(".")` and is assumed to hold three parts (database, schema, table). But `dataset_key` doesn't pick up our db-name from our recipe file, since we wrote `host: <endpoint>/<db-name>` (see the sketch below).
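Here is a minimal, self-contained sketch of the failure (not the actual DataHub source; the names `etl`, `public`, and `orders` are made up for illustration):

```python
# Sketch of the split in redshift.py's get_lineage_mcp.
# "etl.public.orders" is a hypothetical three-part name (db.schema.table);
# "public.orders" is what we effectively get when the database name is missing.
for name in ("etl.public.orders", "public.orders"):
    dataset_params = name.split(".")
    try:
        tablename = dataset_params[2]  # assumes db.schema.table
        print(f"{name!r} -> table {tablename!r}")
    except IndexError:
        # Only two parts (schema.table), so index 2 does not exist.
        print(f"{name!r} -> IndexError: list index out of range")
```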
If I give the database name explicitly, like `database: <db-name>` in the recipe.yml file, it resolves this problem, but then I get another error:
```
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/datahub/cli/ingest_cli.py", line 58, in run
  pipeline.run()
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/datahub/ingestion/run/pipeline.py", line 125, in run
  for wu in self.source.get_workunits():
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/datahub/ingestion/source/sql/redshift.py", line 248, in get_workunits
  for wu in super().get_workunits():
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/datahub/ingestion/source/sql/sql_common.py", line 364, in get_workunits
  yield from self.loop_tables(inspector, schema, sql_config)
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/datahub/ingestion/source/sql/sql_common.py", line 435, in loop_tables
  columns = inspector.get_columns(table, schema)
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/sqlalchemy/engine/reflection.py", line 390, in get_columns
  col_defs = self.dialect.get_columns(
File "<string>", line 2, in get_columns
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/sqlalchemy/engine/reflection.py", line 52, in cache
  ret = fn(self, con, *args, **kw)
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/sqlalchemy_redshift/dialect.py", line 454, in get_columns
  cols = self._get_redshift_columns(connection, table_name, schema, **kw)
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/sqlalchemy_redshift/dialect.py", line 705, in _get_redshift_columns
  return all_columns[key]

KeyError: '<our-schema-name>.<our-table-name>'
```
Everything was working perfectly until now. Our DataHub version is 0.8.16. I think there is a bug here; in any case, I need a solution for the first problem.
m
Hi @curved-jordan-15657, is there a reason why you cannot use the "database" field in the YAML recipe? In the above scenarios, what are you setting it to? Did you try setting the properties to something like the following: 1. `host_port: <jdbc_url>:5439` 2. `database: dev`?
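In recipe terms, roughly like this (placeholders, not a tested recipe):

```yaml
# Keep the endpoint free of any trailing /<db-name>
# and name the database explicitly.
source:
  type: redshift
  config:
    host_port: "<jdbc_url>:5439"
    database: dev
```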
c
Hi @miniature-tiger-96062, since the documentation has not listed `database` as a required field until now, we took the endpoint directly from the Redshift cluster and defined it as `host_port: <host>:5439/<db-name>`; the database after the slash was already picked up, so we did not specify an extra `database` field.
But if I specify the `database` field, I get the second error I mentioned above, the KeyError. The slash behaviour itself is standard connection-URL parsing, as sketched below.
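A quick sketch of why the slash form appeared to work: SQLAlchemy treats the path segment of the connection URL as the database. (The dialect, host, credentials, and database below are placeholders.)

```python
# How SQLAlchemy reads the part after the slash.
from sqlalchemy.engine.url import make_url

url = make_url("redshift+psycopg2://user:secret@my-cluster.example.com:5439/etl")
print(url.host)      # my-cluster.example.com
print(url.port)      # 5439
print(url.database)  # etl  <- the path segment becomes the database
```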
m
Hi @curved-jordan-15657, I will try to reproduce it. If you can share your recipe file, that would be helpful.
c
Sure @miniature-tiger-96062, we have decided to use the "database" field, as written below.
```yaml
source:
  type: redshift
  config:
    # Coordinates
    host_port: "<our-host>:5439"
    database: etl
    username: <username>
    password: <password>

    include_views: True # whether to include views, defaults to True
    include_tables: True # whether to include tables, defaults to True
        

sink:
  type: "datahub-kafka"
  config:
    connection:
      bootstrap: "<our-bootstrap>:9092"
      schema_registry_url: "http://<our-schema-registry>:8081"
```
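For reference, we run this recipe with the DataHub CLI (the same `ingest_cli.py` entry point that shows up in the second stack trace):

```
datahub ingest -c recipe.yml
```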