# ingestion
s
I used this ingestion file and was able to get the data into Kafka. It shows up in Confluent Cloud, but DataHub's datahub-mce-consumer is unable to consume it:
source:
  type: postgres
  config:
    username: ${DB_USERNAME}
    password: ${DB_PASSWORD}
    host_port: ${DB_HOST}
    database: ${DB_database}
    table_pattern:
      allow:
        - "superset.public.logs"

    schema_pattern:
      deny:
        - "information_schema"

sink:
  type: "datahub-kafka"
  config:
    connection:
      bootstrap: ${BOOTSTARP_URL}
      producer_config:
        security.protocol: sasl_ssl
        sasl.mechanism: PLAIN
        sasl.username: ${KAFKA_KEY_ID}
        sasl.password: ${KAFKA_KEY_SECRET}
      schema_registry_url: https://${SCHEMA_REGISTRY_URL}
      schema_registry_config:
        basic.auth.user.info: "${SCHEMA_REGISTRY_KEY_ID}:${SCHEMA_REGISTRY_KEY_PASSWORD}"
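For context, the ${...} references in this recipe are substituted from the environment when the DataHub CLI runs it; a sketch of the invocation, with a placeholder file name and placeholder values:
```
# Placeholder values; the DataHub CLI expands ${VAR} references in the recipe from the environment.
export DB_USERNAME=superset DB_PASSWORD='...' DB_HOST=postgres:5432 DB_database=superset
export BOOTSTARP_URL=pkc-xxxxx.us-east-2.aws.confluent.cloud:9092
export KAFKA_KEY_ID='...' KAFKA_KEY_SECRET='...'
export SCHEMA_REGISTRY_URL=psrc-xxxxx.us-east-2.aws.confluent.cloud
export SCHEMA_REGISTRY_KEY_ID='...' SCHEMA_REGISTRY_KEY_PASSWORD='...'
datahub ingest -c recipe.yml
```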
In datahub-mce-consumer I get these logs:
07:37:33.443 [mce-consumer-job-client-0-C-1] ERROR o.s.k.listener.LoggingErrorHandler - Error while processing: null
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition MetadataChangeEvent_v4-2 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro unknown schema for id 100002
Caused by: java.net.MalformedURLException: For input string: "REDACTED_HERE"
	at java.net.URL.<init>(URL.java:645)
	at java.net.URL.<init>(URL.java:508)
	at java.net.URL.<init>(URL.java:457)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:257)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:351)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:659)
	at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:641)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:217)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:291)
	at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaById(CachedSchemaRegistryClient.java:276)
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:273)
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:97)
	at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:76)
	at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
	at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324)
	at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129)
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1555)
	at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1290)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:757)
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:708)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: For input string: "REDACTED_HERE"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
	at java.lang.Integer.parseInt(Integer.java:580)
	at java.lang.Integer.parseInt(Integer.java:615)
	at java.net.URLStreamHandler.parseURL(URLStreamHandler.java:222)
	at java.net.URL.<init>(URL.java:640)
	... 28 common frames omitted
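Reading the bottom of the trace: the Integer.parseInt call inside URLStreamHandler.parseURL is the port parse, so a piece of the secret embedded in the schema registry URL is apparently being read as a port number. That tends to happen when the secret contains a reserved character such as '/'. A minimal Java sketch, with a hypothetical key/secret and host, that reproduces the same MalformedURLException/NumberFormatException pair:
```java
import java.net.MalformedURLException;
import java.net.URL;

public class SchemaRegistryUrlCheck {
    public static void main(String[] args) {
        // Hypothetical credentials; Confluent Cloud API secrets can contain '/' or '+'.
        String keyId = "SR_KEY_ID";
        String secret = "abc/defghi+jk";
        String base = "https://" + keyId + ":" + secret + "@psrc-xxxxx.us-east-2.aws.confluent.cloud";
        try {
            // The Confluent RestService appends a request path to the configured base URL
            // and hands the resulting string to java.net.URL.
            URL url = new URL(base + "/schemas/ids/100002");
            System.out.println("host=" + url.getHost() + " port=" + url.getPort());
        } catch (MalformedURLException e) {
            // The '/' inside the secret ends the authority early, so the '@' is never seen
            // and the text after the ':' ("abc") is parsed as a port number.
            e.printStackTrace(); // MalformedURLException: For input string: "abc"
        }
    }
}
```
If the real secret contains a '/', '+', or other URL-reserved character, that would explain why the same credentials work with curl but break when embedded directly in the schema registry URL.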
My Helm chart values have this for connecting to the schema registry:
global:
  graph_service_impl: elasticsearch
  datahub_analytics_enabled: true
  datahub_standalone_consumers_enabled: true

  kafka:
    bootstrap:
      server: "{{ kafka_bootstrap_server_url }}"
    schemaregistry:
      url: "https://{{ kafka_schema_registry_key_id }}:{{ kafka_schema_registry_key_secret }}@{{ kafka_schema_registry_url }}"
In the error, REDACTED_HERE is part of my schema registry secret, which I have added via kafka_schema_registry_key_secret in Helm. The schema registry credentials are correct: using the curl examples in the Confluent docs, I was able to successfully create/view/delete schemas in the Confluent schema registry. https://docs.confluent.io/platform/current/schema-registry/develop/using.html
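(For reference, the curl check from the Confluent docs is along these lines; the endpoint and credentials below are placeholders:)
```
curl -u "<schema_registry_key_id>:<schema_registry_key_secret>" \
  https://<schema-registry-endpoint>/subjects
```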
@big-carpet-38439 In continuation of https://datahubspace.slack.com/archives/CUMUWQU66/p1626457105105400?thread_ts=1626386375.078200&cid=CUMUWQU66 I am finally able to get an error, now that I have switched my ingestion from datahub-rest to datahub-kafka. This shows that the problem is with the schema registry authentication being done inside DataHub.
It also shows that datahub-rest is failing silently when sending this information to Kafka, which contradicts what is documented at https://datahubproject.io/docs/metadata-ingestion/#datahub-rest-datahub-rest:
The advantage of the rest-based interface is that any errors can immediately be reported.
1 message was sent and could not be consumed, so the consumer is 1 offset behind.
b
Got it - yes, you're right. I'm not sure why the REST API vs. the Kafka API shows different errors, since in both cases we need to connect to Kafka. It seems the schema registry client thinks the URL you've provided in the config is malformed. Should we consult the Kafka consumer documentation for further triaging? We've not seen this before.
s
Can you point me to where exactly in https://github.com/linkedin/datahub/tree/master/gms the final requests to the schema registry are sent? I have been unable to find that, so I can't figure out what might be going wrong here.
E.g. the config for the frontend is at https://github.com/linkedin/datahub/blob/master/datahub-frontend/conf/application.conf. Where is the config for gms?
b
It's inside the Kafka producer class, which is pulled from the Kafka client library!
s
@big-carpet-38439 I think you misunderstood what I am asking for. I understand that DataHub is not using the API directly. There has to be some line in gms code which calls a function of the Kafka client library (or Spring's wrapper of Kafka). E.g. https://github.com/linkedin/datahub/blob/master/metadata-dao-impl/kafka-producer/s[…]din/metadata/dao/producer/EntityKafkaMetadataEventProducer.java is where the Kafka API is finally used to send messages. Where is the line that actually interacts with the schema registry APIs (through Kafka or Spring's wrapper of Kafka)?
E.g. in the case of ingestion, this is where the schema registry API is being used: https://github.com/linkedin/datahub/blob/master/metadata-ingestion/src/datahub/emitter/kafka_emitter.py#L32. In the case of gms, where is the code that interacts with the Confluent schema registry?
Seems like this is the starting point of ingestion: https://github.com/linkedin/datahub/blob/master/gms/impl/src/main/java/com/linkedin/metadata/resources/entity/EntityResource.java#L129. Will try to figure out what the problem is.
b
Hi Aseem. So by default, you can provide overrides for all Kafka producer and consumer configurations for the consumers that are constructed by the Spring framework (which is everything in GMS and the standalone containers). The way to do this is to use the SPRING_KAFKA_PROPERTIES_ prefix on any of your Kafka configurations.
So, for example:
SPRING_KAFKA_PROPERTIES_SASL_MECHANISM=PLAIN
And these must be provided as environment variables to GMS on boot up
So there is no single properties file for GMS currently 😞 Instead, we use the docker.env file under docker/gms/env/docker.env as the source of truth for configuration.
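Following that pattern, a hedged sketch of the overrides that would cover both the broker and the schema registry for Confluent Cloud (the values are placeholders, and the basic.auth.* keys are assumed from the Confluent client convention rather than confirmed in this thread):
```
SPRING_KAFKA_PROPERTIES_SECURITY_PROTOCOL=SASL_SSL
SPRING_KAFKA_PROPERTIES_SASL_MECHANISM=PLAIN
SPRING_KAFKA_PROPERTIES_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username='<kafka_key_id>' password='<kafka_key_secret>';
SPRING_KAFKA_PROPERTIES_BASIC_AUTH_CREDENTIALS_SOURCE=USER_INFO
SPRING_KAFKA_PROPERTIES_BASIC_AUTH_USER_INFO=<schema_registry_key_id>:<schema_registry_key_secret>
```
With the credentials passed this way, global.kafka.schemaregistry.url in the Helm values could be left as the bare https://<schema-registry-host> endpoint, so java.net.URL never has to parse a secret embedded in the URL.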