# ingestion
Hi everyone, I'm trying to ingest with Kafka as the sink and I'm getting the following:
```
[2022-03-15 15:59:16,848] {logging_mixin.py:104} INFO -  Pipeline config is {'source': {'type': 'glue', 'config': {'env': 'PROD', 'aws_region': 'us-east-1', 'extract_transforms': 'false', 'table_pattern': {'allow': ['testdb.*'], 'ignoreCase': 'false'}}}, 'transformers': [{'type': 'simple_remove_dataset_ownership', 'config': {}}, {'type': 'simple_add_dataset_ownership', 'config': {'owner_urns': ['urn:li:corpuser:user1']}}, {'type': 'set_dataset_browse_path', 'config': {'path_templates': ['/Platform/PLATFORM/DATASET_PARTS']}}], 'sink': {'type': 'datahub-kafka', 'config': {'connection': {'bootstrap': 'bootstrapserver:9092', 'schema_registry_url': 'https://schemaregistryurl'}}}}
[2022-03-15 16:05:46,022] {pipeline.py:85} ERROR - failed to write record with workunit testdb.person_era with KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"} and info {'error': KafkaError{code=_MSG_TIMED_OUT,val=-192,str="Local: Message timed out"}, 'msg': <cimpl.Message object at 0x7f0863603560>}
[2022-03-15 16:05:46,078] {taskinstance.py:1482} ERROR - Task failed with exception
Traceback (most recent call last):
```
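`_MSG_TIMED_OUT` ("Local: Message timed out") comes from librdkafka, the client underneath `confluent_kafka` (the `cimpl.Message` in the log is a `confluent_kafka` object). It means the produced message could not be delivered before its local message timeout expired, which usually points at an unreachable broker rather than at the record itself. The endpoints the producer dials come from `connection.bootstrap` in the sink config. A minimal sketch for pulling them out of the recipe as logged, so each one can be tested; `bootstrap_endpoints` is a hypothetical helper, and it assumes the value is a comma-separated `host:port` list (the standard Kafka `bootstrap.servers` format).

```python
from typing import List, Tuple


def bootstrap_endpoints(recipe: dict) -> List[Tuple[str, int]]:
    """Return (host, port) pairs from a datahub-kafka sink's bootstrap setting.

    Assumes the recipe layout from the logged pipeline config:
    recipe["sink"]["config"]["connection"]["bootstrap"] is a comma-separated
    "host:port" list. Entries without a port raise ValueError from int().
    """
    bootstrap = recipe["sink"]["config"]["connection"]["bootstrap"]
    endpoints = []
    for entry in bootstrap.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate stray commas
        # rpartition keeps everything before the last ':' as the host
        host, _, port = entry.rpartition(":")
        endpoints.append((host, int(port)))
    return endpoints


# Sink section copied from the logged pipeline config (placeholder values)
recipe = {
    "sink": {
        "type": "datahub-kafka",
        "config": {
            "connection": {
                "bootstrap": "bootstrapserver:9092",
                "schema_registry_url": "https://schemaregistryurl",
            }
        },
    }
}

print(bootstrap_endpoints(recipe))  # [('bootstrapserver', 9092)]
```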
Hey! It seems like a timeout talking to the Kafka broker. Can you make sure your Airflow instance has network access to the broker?
If you can SSH into the Airflow instance, the easiest check is to simply curl the broker address and see what you get.
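curl works as a rough probe, but a Kafka broker does not speak HTTP, so the only meaningful signal is whether the TCP connection opens. A minimal Python equivalent that can be run on the Airflow worker, the same host the ingestion task runs on, is sketched here; `can_connect` is a hypothetical helper, and the host and port are assumed to be whatever the sink's `bootstrap` setting contains.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port opens within `timeout` seconds."""
    try:
        # create_connection resolves the name and tries each returned address in turn
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS resolution failures, refused connections and timeouts
        return False
```

For example, `can_connect("bootstrapserver", 9092)` with the placeholder from the recipe. A successful bootstrap connection is necessary but not sufficient: after bootstrapping, the client connects to the addresses the brokers advertise (their `advertised.listeners`), so those must be reachable from Airflow as well.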
Dexter, I SSHed into the Airflow instance and was able to curl the schema registry. Let me check whether we are able to connect to the Kafka broker from Airflow.
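The schema registry check can also be run from Python on the Airflow instance, using the same host and interpreter as the ingestion task. A sketch, assuming a Confluent-compatible Schema Registry where `GET /subjects` returns a JSON array of subject names; `list_subjects` is a hypothetical helper, not part of DataHub.

```python
import json
import urllib.request


def list_subjects(registry_url: str, timeout: float = 5.0) -> list:
    """GET {registry_url}/subjects and return the decoded JSON array.

    Raises urllib.error.HTTPError on a non-2xx status and
    urllib.error.URLError (or a timeout) if the registry is unreachable.
    """
    url = registry_url.rstrip("/") + "/subjects"
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

For example, `list_subjects("https://schemaregistryurl")` with the placeholder from the recipe. If this succeeds while the broker check fails, the problem is isolated to the Kafka side.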
@early-lamp-41924 I'm getting this:
```
botocore.exceptions.ClientError: An error occurred (ServiceUnavailableException) when calling the GetTables operation (reached max retries: 4): 
[2022-03-21 17:54:13,625] {taskinstance.py:1532} INFO - Marking task as UP_FOR_RETRY. dag_id=metadata_push_ingestion_dag, task_id=ingest_metadata, execution_date=20220321T175240, start_date=20220321T175242, end_date=20220321T175413
[2022-03-21 17:54:13,676] {local_task_job.py:146} INFO - Task exited with return code 1
```
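`(reached max retries: 4)` means botocore had already retried the Glue `GetTables` call four times before raising, which matches botocore's default legacy retry mode (5 attempts in total, including the first). `ServiceUnavailableException` is a transient, server-side error, so the usual remedy is to wait longer between attempts. A plain-Python sketch of capped exponential backoff with full jitter, for illustration only; `call_with_backoff` and `TransientError` are hypothetical names, and this is not botocore's own retry code.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical stand-in for a retryable failure such as ServiceUnavailableException."""


def call_with_backoff(
    fn: Callable[[], T],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 20.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying on TransientError with capped exponential backoff and full jitter.

    max_attempts includes the first call, so the default of 5 allows up to 4 retries.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    attempt = 1
    while True:
        try:
            return fn()
        except TransientError:
            if attempt >= max_attempts:
                raise  # retry budget exhausted: surface the last error
            # Full jitter: wait a random time in [0, min(max_delay, base_delay * 2**(attempt - 1))]
            cap = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(random.uniform(0, cap))
            attempt += 1


# Simulated GetTables-like call that fails twice with a transient error, then succeeds
calls = {"n": 0}


def flaky_get_tables() -> list:
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientError("ServiceUnavailable")
    return ["testdb.person_era"]


# sleep is stubbed out so the example runs instantly
print(call_with_backoff(flaky_get_tables, sleep=lambda s: None))  # ['testdb.person_era']
```

If the goal is simply a larger retry budget for the Glue client, botocore also reads the `AWS_RETRY_MODE` and `AWS_MAX_ATTEMPTS` environment variables; whether setting them on the Airflow worker is appropriate for this DAG is an assumption to verify.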