silly-oil-35180
09/08/2022, 4:27 AM
I'm trying to emit metadata from Airflow using Kafka (DataHub Kafka Sink). So I set up an Airflow connection like this:
$ airflow connections add --conn-type 'datahub_kafka' 'datahub_rest_default' --conn-host 'broker:9092' --conn-extra '{}'
Because I didn't modify datahub.datahub_conn_id in airflow.cfg, I am using datahub_rest_default as the connection name.
When I trigger the DAG, I get this error log:
[2022-09-08, 11:49:28 ] {log.py:232} WARNING - 2022-09-08, 11:49:28 WARNING Retrying (Retry(total=2, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ProtocolError('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))': /aspects?action=ingestProposal
[2022-09-08, 11:49:28 ] {connectionpool.py:810} WARNING - Retrying (Retry(total=2, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ProtocolError('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))': /aspects?action=ingestProposal
[2022-09-08, 11:49:33 ] {log.py:232} WARNING - 2022-09-08, 11:49:33 WARNING Retrying (Retry(total=1, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ProtocolError('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))': /aspects?action=ingestProposal
[2022-09-08, 11:49:33 ] {connectionpool.py:810} WARNING - Retrying (Retry(total=1, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ProtocolError('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))': /aspects?action=ingestProposal
I don't know why Connection reset by peer happens. Does anyone have experience with the Kafka sink?

silly-oil-35180
09/08/2022, 4:43 AM
[2022-09-08, 13:31:34 ] {log.py:232} WARNING - 2022-09-08, 13:31:34 WARNING Retrying (Retry(total=2, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ProtocolError('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))': /aspects?action=ingestProposal
...
datahub.configuration.common.OperationalError: ('Unable to emit metadata to DataHub GMS', {'message': "HTTPConnectionPool(host='broker', port=9092): Max retries exceeded with url: /aspects?action=ingestProposal (Caused by ProtocolError('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer')))"})
I think the datahub_kafka connection is using the DataHub GMS REST API, not Kafka.

dazzling-judge-80093
09/08/2022, 9:34 AM
Can you try renaming datahub_rest_default to something which doesn't contain the string rest? If rest is in the connection name, then we assume it is a REST connection ->
https://github.com/datahub-project/datahub/blob/f99d27fd8cb3bf1a2906b731ebe9e0fcbe[…]c69c47/metadata-ingestion/src/datahub_provider/hooks/datahub.py
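(For reference, a sketch of recreating the connection under a name that avoids the string rest; the name datahub_kafka_default is only an assumed example here, and the broker host is taken from the command above:)

$ airflow connections delete 'datahub_rest_default'
$ airflow connections add --conn-type 'datahub_kafka' 'datahub_kafka_default' --conn-host 'broker:9092' --conn-extra '{}'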
silly-oil-35180
09/13/2022, 1:19 AM
I added the following lines to airflow.cfg to change the datahub config:
[datahub]
cluster = PROD
datahub_conn_id = datahub_kafka_default
However, when I run the Airflow DAG, Airflow doesn't pick up the datahub_conn_id I changed (datahub_kafka_default). It still looks for the datahub_rest_default conn_id.
Am I wrong to add this config to airflow.cfg?

dazzling-judge-80093
09/13/2022, 10:55 AM
[datahub]
cluster = PROD
conn_id = datahub_kafka_default
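(Side note, a hedged sketch: since the plugin reads this section through Airflow's configuration, the same values can presumably also be set with Airflow's AIRFLOW__{SECTION}__{KEY} environment-variable convention instead of editing airflow.cfg; the connection name below is just the one assumed in this thread:)

$ export AIRFLOW__DATAHUB__CLUSTER='PROD'
$ export AIRFLOW__DATAHUB__CONN_ID='datahub_kafka_default'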
silly-oil-35180
09/13/2022, 11:39 AM
datahub_conn_id should be changed to conn_id.

dazzling-judge-80093
09/13/2022, 11:41 AM

silly-oil-35180
09/13/2022, 11:44 AM

dazzling-judge-80093
09/13/2022, 11:44 AM

silly-oil-35180
09/15/2022, 1:30 AM
[2022-09-14, 18:48:30 ] {logging_mixin.py:109} INFO - Exception: Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/datahub_airflow_plugin/datahub_plugin.py", line 337, in custom_on_success_callback
datahub_on_success_callback(context)
File "/home/airflow/.local/lib/python3.8/site-packages/datahub_airflow_plugin/datahub_plugin.py", line 204, in datahub_on_success_callback
dataflow.emit(emitter)
File "/home/airflow/.local/lib/python3.8/site-packages/datahub/api/entities/datajob/dataflow.py", line 150, in emit
assert callback is not None
AssertionError
I guess that when using Kafka to emit the datajob, it is necessary to pass a callback. (I don't know its exact role and meaning, but I found this code:)
https://github.com/datahub-project/datahub/blob/f99d27fd8cb3bf1a2906b731ebe9e0fcbe[…]metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py
However, I cannot find any emit call that passes a callback in datahub_airflow_plugin (https://github.com/datahub-project/datahub/blob/master/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin.py).
I think I cannot use Kafka for the Airflow integration. Is that right?
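(For context: the assertion in dataflow.py sits in the Kafka branch of DataFlow.emit, so the Kafka emitter path only succeeds when the caller supplies a delivery callback, and the plugin calls dataflow.emit(emitter) without one. Below is a minimal sketch of emitting a DataFlow to Kafka with an explicit callback outside the plugin; the broker and schema-registry addresses, the DAG id, and the (err, msg) callback signature are assumptions, not taken from this thread:)

# Hedged sketch: emit a DataFlow through the Kafka emitter with an explicit
# delivery callback, which is what the assertion in dataflow.py requires.
# Connection details and the DAG id below are illustrative assumptions.
from datahub.api.entities.datajob.dataflow import DataFlow
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig

config = KafkaEmitterConfig.parse_obj(
    {
        "connection": {
            "bootstrap": "broker:9092",                             # Kafka broker (assumed)
            "schema_registry_url": "http://schema-registry:8081",   # schema registry (assumed)
        }
    }
)
emitter = DatahubKafkaEmitter(config)

def report(err, msg):
    # Assumed (err, msg) delivery-report style callback: log failures instead
    # of dropping them silently.
    if err is not None:
        print(f"Failed to emit metadata to Kafka: {err}")

dataflow = DataFlow(orchestrator="airflow", cluster="PROD", id="example_dag")
dataflow.emit(emitter, callback=report)  # an explicit callback satisfies the assertion
emitter.flush()  # block until the producer has delivered the messages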
dazzling-judge-80093
09/15/2022, 9:08 AM