# ingestion
d
Hi everyone! I'm trying to configure the Airflow lineage backend to use the DataHub Kafka Sink connection. If I configure it with extra parameters pointing to schema_registry_url, I receive this error:
[2022-03-16, 12:39:38 UTC] {base.py:79} INFO - Using connection to: id: datahub_kafka_default. Host: prerequisites-kafka.datahub-prereqs-prod.svc.cluster.local:9092, Port: None, Schema: , Login: ***, Password: ***, extra: {'schema_registry_url': 'http://prerequisites-cp-schema-registry.datahub-prereqs-prod.svc.cluster.local:8081'}
[2022-03-16, 12:39:38 UTC] {base.py:79} INFO - Using connection to: id: datahub_kafka_default. Host: prerequisites-kafka.datahub-prereqs-prod.svc.cluster.local:9092, Port: None, Schema: , Login: ***, Password: ***, extra: {'schema_registry_url': 'http://prerequisites-cp-schema-registry.datahub-prereqs-prod.svc.cluster.local:8081'}
[2022-03-16, 12:39:38 UTC] {datahub.py:122} ERROR - 1 validation error for KafkaSinkConfig
schema_registry_url
  extra fields not permitted (type=value_error.extra)
And without extra, I get this error:
[2022-03-16, 12:58:42 UTC] {base.py:79} INFO - Using connection to: id: datahub_kafka_default. Host: prerequisites-kafka.datahub-prereqs-prod.svc.cluster.local:9092, Port: None, Schema: , Login: ***, Password: ***, extra: {}
[2022-03-16, 12:58:42 UTC] {base.py:79} INFO - Using connection to: id: datahub_kafka_default. Host: prerequisites-kafka.datahub-prereqs-prod.svc.cluster.local:9092, Port: None, Schema: , Login: ***, Password: ***, extra: {}
[2022-03-16, 12:58:42 UTC] {datahub.py:122} ERROR - KafkaError{code=_VALUE_SERIALIZATION,val=-161,str="HTTPConnectionPool(host='localhost', port=8081): Max retries exceeded with url: /subjects/MetadataChangeEvent_v4-value/versions (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff97e544a50>: Failed to establish a new connection: [Errno 111] Connection refused'))"}
[2022-03-16, 12:58:42 UTC] {datahub.py:123} INFO - Supressing error because graceful_exceptions is set
So, what is the proper way to configure it?
s
Which version of the DataHub Python package are you installing in Airflow?
d
@square-activity-64562 0.8.29
s
Where exactly are you doing that configuration? Can you share the exact command that you ran for setting up the connection (masking secret if any)? Which document were you referring to when setting that connection?
d
The connection config
Configured based on the document https://datahubproject.io/docs/lineage/airflow
When schema_registry_url is set
s
I cannot find schema_registry_url in that document. I am not sure whether that is supported in Airflow lineage or not. @dazzling-judge-80093 any ideas whether schema_registry_url is supported in the Airflow package?
d
schema_registry_url was my attempt to pass a schema registry URL to the config, because I saw that the DatahubKafkaHook class uses datahub.ingestion.sink.datahub_kafka.KafkaSinkConfig.parse_obj(obj) to parse configs. I probably got it wrong, sorry.
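A possible reading of the first error, as a minimal sketch. The names SinkSketch, ConnectionSketch, parse_strict and parse_sink below are hypothetical stand-ins written with plain dataclasses, not DataHub's actual classes. The sketch assumes the sink config nests its Kafka settings under a connection key and rejects undeclared keys, which would explain why a top-level schema_registry_url fails validation:

```python
from dataclasses import dataclass, field, fields


def parse_strict(cls, obj):
    """Build cls from a dict, rejecting keys that are not declared fields."""
    allowed = {f.name for f in fields(cls)}
    unknown = sorted(set(obj) - allowed)
    if unknown:
        raise ValueError(f"extra fields not permitted: {unknown}")
    return cls(**obj)


@dataclass
class ConnectionSketch:
    # Hypothetical stand-in for the Kafka connection settings.
    # The default mirrors the localhost:8081 address seen in the second error.
    schema_registry_url: str = "http://localhost:8081"


@dataclass
class SinkSketch:
    # Hypothetical stand-in for the sink config (assumed nested layout).
    connection: ConnectionSketch = field(default_factory=ConnectionSketch)


def parse_sink(obj):
    """Parse the nested connection first, then the strict top level."""
    obj = dict(obj)
    if "connection" in obj:
        obj["connection"] = parse_strict(ConnectionSketch, obj["connection"])
    return parse_strict(SinkSketch, obj)


# Top-level key: rejected, like the validation error in the log.
try:
    parse_sink({"schema_registry_url": "http://registry:8081"})
except ValueError as err:
    print(err)

# Nested key: accepted under the assumed layout.
cfg = parse_sink({"connection": {"schema_registry_url": "http://registry:8081"}})
print(cfg.connection.schema_registry_url)
```

In this sketch an empty dict falls back to the localhost default, which matches the behaviour of the run without extra.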
s
Got it. Nothing to be sorry about. I can see how that might be confusing.
d
It seems that the default value is http://localhost:8081 in class datahub.configuration.kafka._KafkaConnectionConfig. But I'm not sure
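If the Kafka settings are in fact nested under a connection key (an assumption, not confirmed in this thread or in the linked document), overriding that default might mean putting the registry URL one level deeper in the connection's extra field. A small sketch that builds such a JSON value, using the registry URL from the log above; whether 0.8.29 accepts this layout is untested here:

```python
import json

# Assumed layout: schema_registry_url nested under "connection" instead of
# sitting at the top level of the Airflow connection's extra field.
extra = {
    "connection": {
        "schema_registry_url": "http://prerequisites-cp-schema-registry.datahub-prereqs-prod.svc.cluster.local:8081"
    }
}

# JSON string that could be pasted into the connection's extra field.
extra_json = json.dumps(extra, indent=2)
print(extra_json)
```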