millions-soccer-98440
09/15/2021, 8:56 AMfrom datahub.ingestion.run.pipeline import Pipeline
def ingest_metadata(**kwargs):
"""
:param ingest_param: source & sink datahub param
:type ingest_param: json/struct
"""
ingest_param = kwargs.get('ingest_param')
pipeline = Pipeline.create(ingest_param)
pipeline.run()
pipeline.raise_from_status()
kafka_connect = {
"source": {
"type": "kafka-connect",
"config": {
"connect_uri": "<http://127.0.0.1:8083>",
"cluster_name": "ts-connect",
},
},
"sink": {
"type": "datahub-kafka",
"config": {
"connection": {
"bootstrap": "127.0.0.1:19092",
"schema_registry_url": "<http://127.0.0.1:17081>"
}
},
},
}
ingest_metadata(ingest_param=kafka_connect)
this error after run code
Skipping connector saleordering-postcodes. Sink Connector not yet implemented
Skipping connector thestreet-image-receipts. Sink Connector not yet implemented
Traceback (most recent call last):
File "kafkaconnect.py", line 36, in <module>
ingest_metadata(ingest_param=kafka_connect)
File "kafkaconnect.py", line 14, in ingest_metadata
pipeline.run()
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/datahub/ingestion/run/pipeline.py", line 108, in run
for wu in self.source.get_workunits():
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/datahub/ingestion/source/kafka_connect.py", line 468, in get_workunits
connectors_manifest = self.get_connectors_manifest()
File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/datahub/ingestion/source/kafka_connect.py", line 308, in get_connectors_manifest
connector_manifest.topic_names = topics[c]["topics"]
KeyError: 'sales-ordering-prod-v5'
mammoth-bear-12532
witty-state-99511
09/15/2021, 2:18 PMwitty-state-99511
09/15/2021, 2:27 PMwitty-state-99511
09/15/2021, 2:28 PMwitty-state-99511
09/15/2021, 2:32 PMmillions-soccer-98440
09/15/2021, 2:56 PMmillions-soccer-98440
09/16/2021, 3:06 AM