silly-ice-4153
07/15/2022, 3:13 PM
File "/home/airflow/.local/lib/python3.8/site-packages/requests/sessions.py", line 742, in get_adapter
raise InvalidSchema("No connection adapters were found for {!r}".format(url))
requests.exceptions.InvalidSchema: No connection adapters were found for 'xxx:8080/entities?action=ingest'
I'm using the following code for the connection. In the Connection UI I set Host to my hostname plus :8080, and the port is open.
from datetime import timedelta

from airflow import DAG

try:
    from airflow.operators.bash import BashOperator
except ModuleNotFoundError:
    from airflow.operators.bash_operator import BashOperator

from airflow.utils.dates import days_ago

import datahub.emitter.mce_builder as builder
from datahub_provider.operators.datahub import DatahubEmitterOperator

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email": ["jdoe@example.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(minutes=120),
}

with DAG(
    "datahub_lineage_emission_example",
    default_args=default_args,
    description="An example DAG demonstrating lineage emission within an Airflow DAG.",
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    catchup=False,
) as dag:
    # This example runs a BashOperator followed by a lineage emission. However, the
    # same DatahubEmitterOperator can be used to emit lineage in any context.
    transformation_task = BashOperator(
        task_id="bash_test",
        dag=dag,
        bash_command="echo 'This is where you might run your data tooling.'",
    )

    emit_lineage_task = DatahubEmitterOperator(
        task_id="emit_lineage",
        datahub_conn_id="datahub_rest_default",
        mces=[
            builder.make_lineage_mce(
                upstream_urns=[
                    builder.make_dataset_urn("postgres", "postgres.zoom.events"),
                ],
                downstream_urn=builder.make_dataset_urn(
                    "postgres", "postgres.zoom.events"
                ),
            )
        ],
    )

    transformation_task >> emit_lineage_task
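For reference, the InvalidSchema error above is what requests raises whenever a URL has no http:// or https:// scheme, which is what happens when the connection host is stored as just hostname:8080. A minimal sketch, not from the thread and using the redacted "xxx" host as a placeholder, that reproduces it:

import requests

# Placeholder host for illustration; the real GMS address is redacted as "xxx" above.
no_scheme = "xxx:8080/entities?action=ingest"
with_scheme = "http://xxx:8080/entities?action=ingest"

try:
    requests.post(no_scheme, timeout=5)
except requests.exceptions.InvalidSchema as err:
    # Matches the traceback: "No connection adapters were found for ..."
    print(err)

# With an explicit scheme, requests can pick a connection adapter; the request may
# still fail for network reasons, but not with InvalidSchema.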
silly-ice-4153
07/15/2022, 3:51 PM
*** Reading local file: /opt/airflow/logs/datahub_lineage_emission_example/emit_lineage/2022-07-15T15:06:43.883765+00:00/1.log
[2022-07-15 15:06:47,162] {taskinstance.py:896} INFO - Dependencies all met for <TaskInstance: datahub_lineage_emission_example.emit_lineage 2022-07-15T15:06:43.883765+00:00 [queued]>
[2022-07-15 15:06:47,184] {taskinstance.py:896} INFO - Dependencies all met for <TaskInstance: datahub_lineage_emission_example.emit_lineage 2022-07-15T15:06:43.883765+00:00 [queued]>
[2022-07-15 15:06:47,185] {taskinstance.py:1087} INFO -
--------------------------------------------------------------------------------
[2022-07-15 15:06:47,185] {taskinstance.py:1088} INFO - Starting attempt 1 of 2
[2022-07-15 15:06:47,185] {taskinstance.py:1089} INFO -
--------------------------------------------------------------------------------
[2022-07-15 15:06:47,195] {taskinstance.py:1107} INFO - Executing <Task(DatahubEmitterOperator): emit_lineage> on 2022-07-15T15:06:43.883765+00:00
[2022-07-15 15:06:47,220] {standard_task_runner.py:52} INFO - Started process 1834 to run task
[2022-07-15 15:06:47,233] {standard_task_runner.py:76} INFO - Running: ['***', 'tasks', 'run', 'datahub_lineage_emission_example', 'emit_lineage', '2022-07-15T15:06:43.883765+00:00', '--job-id', '6388', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/lineage_test3.py', '--cfg-path', '/tmp/tmpnq4frg8x', '--error-file', '/tmp/tmpyvyw_rnd']
[2022-07-15 15:06:47,237] {standard_task_runner.py:77} INFO - Job 6388: Subtask emit_lineage
[2022-07-15 15:06:47,337] {logging_mixin.py:104} INFO - Running <TaskInstance: datahub_lineage_emission_example.emit_lineage 2022-07-15T15:06:43.883765+00:00 [running]> on host f3822c1d5fd6
[2022-07-15 15:06:47,445] {taskinstance.py:1300} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=jdoe@example.com
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=datahub_lineage_emission_example
AIRFLOW_CTX_TASK_ID=emit_lineage
AIRFLOW_CTX_EXECUTION_DATE=2022-07-15T15:06:43.883765+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-07-15T15:06:43.883765+00:00
[2022-07-15 15:06:47,469] {base.py:69} INFO - Using connection to: id: ***_rest_default. Host: ***xxx:8080, Port: None, Schema: , Login: , Password: ***, extra: {}
[2022-07-15 15:06:47,485] {base.py:69} INFO - Using connection to: id: ***_rest_default. Host: ***xxx:8080, Port: None, Schema: , Login: , Password: ***, extra: {}
[2022-07-15 15:06:47,491] {taskinstance.py:1501} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/datahub/emitter/rest_emitter.py", line 229, in _emit_generic
response = self._session.post(
File "/home/airflow/.local/lib/python3.8/site-packages/requests/sessions.py", line 590, in post
return self.request('POST', url, data=data, json=json, **kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/requests/sessions.py", line 542, in request
resp = self.send(prep, **send_kwargs)
File "/home/airflow/.local/lib/python3.8/site-packages/requests/sessions.py", line 649, in send
adapter = self.get_adapter(url=request.url)
File "/home/airflow/.local/lib/python3.8/site-packages/requests/sessions.py", line 742, in get_adapter
raise InvalidSchema("No connection adapters were found for {!r}".format(url))
requests.exceptions.InvalidSchema: No connection adapters were found for 'xxx:8080/entities?action=ingest'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1157, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1331, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1356, in _execute_task
result = task_copy.execute(context=context)
File "/home/airflow/.local/lib/python3.8/site-packages/datahub_provider/operators/datahub.py", line 63, in execute
self.generic_hook.get_underlying_hook().emit_mces(self.mces)
File "/home/airflow/.local/lib/python3.8/site-packages/datahub_provider/hooks/datahub.py", line 83, in emit_mces
emitter.emit_mce(mce)
File "/home/airflow/.local/lib/python3.8/site-packages/datahub/emitter/rest_emitter.py", line 196, in emit_mce
self._emit_generic(url, payload)
File "/home/airflow/.local/lib/python3.8/site-packages/datahub/emitter/rest_emitter.py", line 248, in _emit_generic
raise OperationalError(
datahub.configuration.common.OperationalError: ('Unable to emit metadata to DataHub GMS', {'message': "No connection adapters were found for '***xxxx:8080/entities?action=ingest'"})
[2022-07-15 15:06:47,494] {taskinstance.py:1544} INFO - Marking task as UP_FOR_RETRY. dag_id=***_lineage_emission_example, task_id=emit_lineage, execution_date=20220715T150643, start_date=20220715T150647, end_date=20220715T150647
[2022-07-15 15:06:47,528] {local_task_job.py:149} INFO - Task exited with return code 1
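As a cross-check, the emitter can be exercised outside Airflow with a fully-qualified GMS URL. A hedged sketch only, with the host and dataset names taken from the code above and the scheme added:

# Sketch: "http://xxx:8080" stands in for the real GMS address. The point is that the
# host stored in the Airflow connection (datahub_rest_default) needs the http:// or
# https:// scheme, otherwise requests fails with InvalidSchema as in the log above.
import datahub.emitter.mce_builder as builder
from datahub.emitter.rest_emitter import DatahubRestEmitter

emitter = DatahubRestEmitter("http://xxx:8080")  # scheme included
emitter.emit_mce(
    builder.make_lineage_mce(
        upstream_urns=[builder.make_dataset_urn("postgres", "postgres.zoom.events")],
        downstream_urn=builder.make_dataset_urn("postgres", "postgres.zoom.events"),
    )
)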
strong-author-11562
06/06/2023, 4:40 AM