# ingestion
h
Hi Everyone - We have DataHub deployed on an EKS cluster. We are able to use Airflow to do pull-based ingestion. We would like to do push-based ingestion via Kafka. How do we achieve this, and what configurations need to be used?
Copy code
sink:
  type: "datahub-kafka"
  config:
    connection:
      bootstrap: localhost:9092
      schema_registry_url: http://localhost:8081
I believe we need to point the schema registry to something other than the above?
Copy code
kafka:
  bootstrap:
    server: "<bootstrap server>"
  zookeeper:
    server: "<zookeeper server>"
  schemaregistry:
    url: "<http://prerequisites-cp-schema-registry:8081>"
e
h
@early-lamp-41924 - Quick question here - the schema registry, since it is deployed on EKS, might not be accessible to Airflow.
e
In that case, you need to add an ingress on the schema registry to expose it the same way you exposed frontend/gms!
I don’t think Kafka allows you to produce events into an Avro topic without access to the schema registry.
h
This is the error I am getting -
Copy code
confluent_kafka.error.ValueSerializationError: KafkaError{code=_VALUE_SERIALIZATION,val=-161,str="HTTPConnectionPool(host='prerequisites-cp-schema-registry', port=8081): Max retries exceeded with url: /subjects/MetadataChangeEvent_v4-value/versions (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff68649a610>: Failed to establish a new connection: [Errno -2] Name or service not known'))"}
[2022-02-22 21:48:18,978] {taskinstance.py:1532} INFO - Marking task as UP_FOR_RETRY. dag_id=pull_metadata_ingestion_dag, task_id=ingest_metadata, execution_date=20220222T214817, start_date=20220222T214818, end_date=20220222T214818
[2022-02-22 21:48:19,015] {local_task_job.py:146} INFO - Task exited with return code 1
e
Did you expose the schema registry url?
h
Not yet. This is with the schema registry not yet exposed.
e
Yeah. It will not have any idea what that hostname means.
Try exposing it and see how it goes! Seems like they don’t support ingress out of the box, so you will have to add it manually by creating an ingress.yaml.
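A minimal sketch of what such a hand-written ingress.yaml could look like, using the networking.k8s.io/v1beta1 API that appears later in this thread; the hostname is a hypothetical placeholder and the annotations assume an ALB ingress controller:
Copy code
# Hypothetical minimal ingress for the cp-schema-registry service;
# adjust annotations for your ingress controller (ALB shown here).
apiVersion: networking.k8s.io/v1beta1
kind: Ingress
metadata:
  name: prerequisites-cp-schema-registry
  annotations:
    kubernetes.io/ingress.class: alb
    alb.ingress.kubernetes.io/scheme: internal
spec:
  rules:
    - host: "schema-registry.example.com"  # hypothetical hostname
      http:
        paths:
          - path: /*
            backend:
              serviceName: prerequisites-cp-schema-registry
              servicePort: 8081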
h
@early-lamp-41924 - Will try to, and will update once I get some results, or will reach out in case I am stuck.
@early-lamp-41924 - Which component would be the right place to make the configurations mentioned above? I think datahub-upgrade would be the right place to have it, but it seems to have a different structure compared to other components.
e
which configurations?
h
Exposing the schema registry via ingress.
e
I don’t think that should be part of anything
since it should ideally be part of the schema registry chart
but unfortunately they don’t support it natively
so you would have to just add the ingress yourself
h
ok.
Is there any alternative to cp-schema-registry that can be used by Kafka ingestion without exposing an ingress?
e
unfortunately there is none 😞
h
@early-lamp-41924 - I have set up the ingress for the schema registry in the prerequisites. When I try to execute the DAG to ingest metadata via Kafka, I get this -
Copy code
File "/home/ec2-user/.venvs/airflow/lib64/python3.7/site-packages/confluent_kafka/serializing_producer.py", line 174, in produce
    raise ValueSerializationError(se)
confluent_kafka.error.ValueSerializationError: KafkaError{code=_VALUE_SERIALIZATION,val=-161,str="Unknown Schema Registry Error: b'<html>\r\n<head><title>502 Bad Gateway</title></head>\r\n<body>\r\n<center><h1>502 Bad Gateway</h1></center>\r\n</body>\r\n</html>\r\n' (HTTP status code 502, SR code -1)"}
[2022-03-01 22:29:23,304] {taskinstance.py:1532} INFO - Marking task as FAILED. dag_id=pull_metadata_ingestion_dag, task_id=ingest_metadata, execution_date=20220301T222420, start_date=20220301T222922, end_date=20220301T222923
[2022-03-01 22:29:23,360] {local_task_job.py:146} INFO - Task exited with return code 1
e
Can you access the exposed schema registry url from your local?
h
Hmm, getting 502 Bad Gateway from local as well.
The ingress seems OK -
Copy code
# Source: prerequisites/templates/ingress.yaml
apiVersion: networking.k8s.io/v1beta1
kind: Ingress
metadata:
  name: prerequisites-cp-schema-registry
  labels:
    helm.sh/chart: prerequisites-0.0.4
    app.kubernetes.io/name: prerequisites
    app.kubernetes.io/instance: prerequisites
    app.kubernetes.io/version: "0.3.1"
    app.kubernetes.io/managed-by: Helm
  annotations:
    alb.ingress.kubernetes.io/actions.ssl-redirect: '{"Type": "redirect", "RedirectConfig": { "Protocol": "HTTPS", "Port": "443", "StatusCode": "HTTP_301"}}'
    alb.ingress.kubernetes.io/backend-protocol: HTTP
    alb.ingress.kubernetes.io/certificate-arn: certarn
    alb.ingress.kubernetes.io/inbound-cidrs: 0.0.0.0/0
    alb.ingress.kubernetes.io/listen-ports: '[{"HTTP": 80}, {"HTTPS":443}]'
    alb.ingress.kubernetes.io/scheme: internal
    alb.ingress.kubernetes.io/subnets: subnet-abc
    alb.ingress.kubernetes.io/target-type: instance
    kubernetes.io/ingress.class: alb
spec:
  rules:
    - host: "hostname"
      http:
        paths:
          - path: /*
            backend:
              serviceName: ssl-redirect
              servicePort: use-annotation
          - path: /*
            backend:
              serviceName: prerequisites-cp-schema-registry
              servicePort: 8081
@early-lamp-41924 - Any suggestions on the above issue? I have created a feature request for this as well - https://feature-requests.datahubproject.io/b/feedback/p/enable-kafka-based-ingestion-for-k8-deployments
e
Can you post results of
Copy code
kubectl get ingress -n <<namespace>>
Since it's throwing 502, there is definitely something wrong with the ingress, and the controller is failing to create the load balancer.
h
@early-lamp-41924 Results of kubectl get ingress -n <<namespace>>
Yes, agreed, there is definitely some issue, though I'm not sure about the creation of the load balancer.
e
So you do see the address section
for the schema registry ingress?
How are you managing access to the internal load balancer?
Are your other ingresses also internal?
h
Yes Dexter, all the ingresses are internal,
created for GMS, frontend, and schema registry. Access to the internal load balancer is via Route 53.
@early-lamp-41924 - Let me know if you have some time to go over the issue.
@early-lamp-41924 - Any suggestions, or someone I can connect with on this? The svc and ingress look fine, but I am unable to access it -
e
I don’t have much insight here. To me, it looks correct, and this is just the vanilla schema registry chart from Confluent, so it's hard to do further debugging. I would suggest you get some assistance from someone on the cloud infra team on this.
h
No problem. We got the ingress set up by packaging our own version of the cp-helm charts with ingress and service configurations and using that for the Helm deployment.
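One note for anyone packaging a service configuration alongside the ingress above: with alb.ingress.kubernetes.io/target-type: instance, the ALB routes to node ports on the worker nodes, so the backing service must be of type NodePort rather than the chart's default ClusterIP. A minimal sketch of such a service; the selector labels and ports here are illustrative and should match your actual chart:
Copy code
# Hypothetical NodePort service for the schema registry; with
# target-type: instance the ALB registers nodes, so a plain
# ClusterIP service is not reachable. Names/labels are illustrative.
apiVersion: v1
kind: Service
metadata:
  name: prerequisites-cp-schema-registry
spec:
  type: NodePort
  selector:
    app: cp-schema-registry
    release: prerequisites
  ports:
    - name: http
      port: 8081
      targetPort: 8081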