# troubleshoot
s
Hi all. I want to set up the Airflow integration using Kafka (DataHub Kafka Sink). So I set up an Airflow connection like this:
$ airflow connections add 'datahub_rest_default' --conn-type 'datahub_kafka' --conn-host 'broker:9092' --conn-extra '{}'
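(As an aside, not something shown in the thread: assuming a standard Airflow 2.x CLI, the stored connection can be inspected with:)
$ airflow connections get 'datahub_rest_default'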
Because I didn't modify datahub.datahub_conn_id in airflow.cfg, I am using datahub_rest_default as the connection name. When I trigger a DAG, I get this error log:
[2022-09-08, 11:49:28 ] {log.py:232} WARNING - 2022-09-08, 11:49:28  WARNING Retrying (Retry(total=2, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ProtocolError('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))': /aspects?action=ingestProposal
[2022-09-08, 11:49:28 ] {connectionpool.py:810} WARNING - Retrying (Retry(total=2, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ProtocolError('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))': /aspects?action=ingestProposal
[2022-09-08, 11:49:33 ] {log.py:232} WARNING - 2022-09-08, 11:49:33  WARNING Retrying (Retry(total=1, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ProtocolError('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))': /aspects?action=ingestProposal
[2022-09-08, 11:49:33 ] {connectionpool.py:810} WARNING - Retrying (Retry(total=1, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ProtocolError('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))': /aspects?action=ingestProposal
I don't know why Connection reset by peer happens. Has anyone had experience with the Kafka sink?
[2022-09-08, 13:31:34 ] {log.py:232} WARNING - 2022-09-08, 13:31:34  WARNING Retrying (Retry(total=2, connect=None, read=None, redirect=None, status=None)) after connection broken by 'ProtocolError('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer'))': /aspects?action=ingestProposal
...
datahub.configuration.common.OperationalError: ('Unable to emit metadata to DataHub GMS', {'message': "HTTPConnectionPool(host='broker', port=9092): Max retries exceeded with url: /aspects?action=ingestProposal (Caused by ProtocolError('Connection aborted.', ConnectionResetError(104, 'Connection reset by peer')))"})
I think the datahub_kafka connection is actually using the DataHub GMS REST API, not Kafka: the error shows an HTTPConnectionPool aimed at host='broker', port=9092, which means HTTP requests are being sent to the Kafka broker port.
d
@silly-oil-35180 Please, can you rename the connection from datahub_rest_default to something that doesn't contain the string rest? If rest is in the connection name, we assume it is a REST connection -> https://github.com/datahub-project/datahub/blob/f99d27fd8cb3bf1a2906b731ebe9e0fcbe[…]c69c47/metadata-ingestion/src/datahub_provider/hooks/datahub.py
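For example, a rename along these lines should work (a sketch reusing the host and connection type from the earlier command; datahub_kafka_default is the name used later in this thread):
$ airflow connections delete 'datahub_rest_default'
$ airflow connections add 'datahub_kafka_default' --conn-type 'datahub_kafka' --conn-host 'broker:9092' --conn-extra '{}'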
s
@dazzling-judge-80093 Thank you!! I have another question, about airflow.cfg. I added two lines to airflow.cfg to change the DataHub config:
[datahub]
cluster = PROD
datahub_conn_id = datahub_kafka_default
However, when I run an Airflow DAG, it doesn't pick up the datahub_conn_id I changed (datahub_kafka_default). It still looks for the datahub_rest_default conn_id. Am I wrong to add this config to airflow.cfg?
d
ahh, I think it should be:
[datahub]
cluster = PROD
conn_id = datahub_kafka_default
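(As an aside, standard Airflow behavior rather than something mentioned in the thread: any airflow.cfg option can also be supplied through an AIRFLOW__{SECTION}__{KEY} environment variable, so the same override should be expressible as:)
export AIRFLOW__DATAHUB__CONN_ID=datahub_kafka_default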
s
OMG! I followed the guide at https://datahubproject.io/docs/lineage/airflow/#using-datahubs-airflow-lineage-plugin-new. datahub_conn_id should be changed to conn_id.
d
ahh, sorry about that, we need to update the docs
s
@dazzling-judge-80093 Thank you for answering!! Have a good day!
d
I’m glad it works now
s
@dazzling-judge-80093 Sorry to bother you, but I have another problem 😞. Now Airflow finds the DataHub Kafka emitter, and it looks like it is trying to send data. However, I get this error:
[2022-09-14, 18:48:30 ] {logging_mixin.py:109} INFO - Exception: Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/datahub_airflow_plugin/datahub_plugin.py", line 337, in custom_on_success_callback
    datahub_on_success_callback(context)
  File "/home/airflow/.local/lib/python3.8/site-packages/datahub_airflow_plugin/datahub_plugin.py", line 204, in datahub_on_success_callback
    dataflow.emit(emitter)
  File "/home/airflow/.local/lib/python3.8/site-packages/datahub/api/entities/datajob/dataflow.py", line 150, in emit
    assert callback is not None
AssertionError
I guess that when emitting a datajob via Kafka, a callback must be passed (I don't know its exact role and meaning, but I found this code): https://github.com/datahub-project/datahub/blob/f99d27fd8cb3bf1a2906b731ebe9e0fcbe[…]metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py However, I cannot find any emit call that passes a callback in datahub_airflow_plugin (https://github.com/datahub-project/datahub/blob/master/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin.py). I think I cannot use Kafka for the Airflow integration. Is that right?
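For what it's worth, here is a minimal sketch (not from the thread) of what emitting a DataFlow over Kafka with an explicit callback might look like, assuming the DatahubKafkaEmitter and DataFlow APIs from the metadata-ingestion package around that time:

# Hypothetical sketch: emitting a DataFlow via Kafka with an explicit
# delivery callback. Class names and config keys are assumptions based on
# the metadata-ingestion package; verify against your installed version.
from datahub.api.entities.datajob import DataFlow
from datahub.emitter.kafka_emitter import DatahubKafkaEmitter, KafkaEmitterConfig

emitter = DatahubKafkaEmitter(
    KafkaEmitterConfig.parse_obj({"connection": {"bootstrap": "broker:9092"}})
)

dataflow = DataFlow(cluster="PROD", orchestrator="airflow", id="example_dag")

def report(err, msg):
    # Kafka produce is asynchronous, so delivery success or failure is only
    # observable through this callback. That is why dataflow.emit asserts
    # that callback is not None when the emitter is Kafka-based.
    if err:
        print(f"Failed to emit metadata: {err}")

dataflow.emit(emitter, callback=report)
emitter.flush()  # block until queued messages are delivered

The plugin's datahub_on_success_callback calls dataflow.emit(emitter) with no callback, which matches the AssertionError in the traceback above.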
d
I think you are right 😞