square-activity-64562
07/19/2021, 7:40 AM
datahub-mce-consumer is unable to consume it
source:
  type: postgres
  config:
    username: ${DB_USERNAME}
    password: ${DB_PASSWORD}
    host_port: ${DB_HOST}
    database: ${DB_database}
    table_pattern:
      allow:
        - "superset.public.logs"
    schema_pattern:
      deny:
        - "information_schema"
sink:
  type: "datahub-kafka"
  config:
    connection:
      bootstrap: ${BOOTSTRAP_URL}
      producer_config:
        security.protocol: sasl_ssl
        sasl.mechanism: PLAIN
        sasl.username: ${KAFKA_KEY_ID}
        sasl.password: ${KAFKA_KEY_SECRET}
      schema_registry_url: https://${SCHEMA_REGISTRY_URL}
      schema_registry_config:
        basic.auth.user.info: "${SCHEMA_REGISTRY_KEY_ID}:${SCHEMA_REGISTRY_KEY_PASSWORD}"
square-activity-64562
07/19/2021, 7:41 AM
In datahub-mce-consumer I get these logs:
07:37:33.443 [mce-consumer-job-client-0-C-1] ERROR o.s.k.listener.LoggingErrorHandler - Error while processing: null
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition MetadataChangeEvent_v4-2 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro unknown schema for id 100002
Caused by: java.net.MalformedURLException: For input string: "REDACTED_HERE"
at java.net.URL.<init>(URL.java:645)
at java.net.URL.<init>(URL.java:508)
at java.net.URL.<init>(URL.java:457)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:257)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:351)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:659)
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:641)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:217)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:291)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaById(CachedSchemaRegistryClient.java:276)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:273)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:97)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:76)
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:55)
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:60)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1324)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3400(Fetcher.java:129)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1555)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1391)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:683)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:634)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1290)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:757)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:708)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NumberFormatException: For input string: "REDACTED_HERE"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at java.net.URLStreamHandler.parseURL(URLStreamHandler.java:222)
at java.net.URL.<init>(URL.java:640)
... 28 common frames omitted
square-activity-64562
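The bottom of the trace (NumberFormatException inside URLStreamHandler.parseURL) points at URL parsing, not authentication. A minimal repro sketch of one plausible trigger, with hypothetical key/secret values: if the secret embedded in the URL contains a '/' (Confluent API secrets can), java.net.URL cuts the authority at that '/', never sees the '@', and then tries to parse the text after the last ':' as a port number.

```java
import java.net.MalformedURLException;
import java.net.URL;

public class SchemaRegistryUrlRepro {
    public static void main(String[] args) {
        // Hypothetical values: "KEY_ID" is the schema registry key id and the
        // secret "abc/defSECRET" contains a '/' character.
        String registryUrl = "https://KEY_ID:abc/defSECRET@schema-registry.example.com";
        try {
            new URL(registryUrl);
            System.out.println("parsed OK");
        } catch (MalformedURLException e) {
            // The '/' in the secret terminates the authority early, so no '@'
            // is found; "abc" (the text after the last ':') is then parsed as
            // a port number and Integer.parseInt throws.
            System.out.println("MalformedURLException: " + e.getMessage());
            // → MalformedURLException: For input string: "abc"
        }
    }
}
```

This matches the redacted error above: the "port" string that fails to parse is a fragment of the secret.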
07/19/2021, 7:43 AM
global:
  graph_service_impl: elasticsearch
  datahub_analytics_enabled: true
  datahub_standalone_consumers_enabled: true
  kafka:
    bootstrap:
      server: "{{ kafka_bootstrap_server_url }}"
    schemaregistry:
      url: "https://{{ kafka_schema_registry_key_id }}:{{ kafka_schema_registry_key_secret }}@{{ kafka_schema_registry_url }}"
In the error, REDACTED_HERE is part of my schema registry secret, which I added via kafka_schema_registry_key_secret in helm.
The schema registry credentials themselves are correct: using the curl examples from the Confluent docs I was able to successfully create/view/delete schemas in the Confluent schema registry. https://docs.confluent.io/platform/current/schema-registry/develop/using.html
square-activity-64562
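Since the credentials are valid, one workaround is to keep them out of the URL entirely and pass them as Kafka client properties. A minimal sketch, assuming the DataHub helm chart supports a `springKafkaConfigurationOverrides` block (the property names below are Confluent client configs; verify the block name against your chart version):

```yaml
global:
  kafka:
    schemaregistry:
      # URL only; no userinfo embedded, so java.net.URL parsing cannot break
      url: "https://{{ kafka_schema_registry_url }}"
  springKafkaConfigurationOverrides:
    basic.auth.credentials.source: USER_INFO
    basic.auth.user.info: "{{ kafka_schema_registry_key_id }}:{{ kafka_schema_registry_key_secret }}"
```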
07/19/2021, 7:45 AM
I changed the sink from datahub-rest to datahub-kafka. This shows that the problem is with the schema registry authentication done in DataHub.
square-activity-64562
07/19/2021, 7:47 AM
datahub-rest is failing silently when sending this information to Kafka. This contradicts what is documented at https://datahubproject.io/docs/metadata-ingestion/#datahub-rest-datahub-rest:
"The advantage of the rest-based interface is that any errors can immediately be reported."
square-activity-64562
07/19/2021, 7:52 AM
big-carpet-38439
07/19/2021, 1:23 PM
square-activity-64562
07/19/2021, 1:30 PM
square-activity-64562
07/19/2021, 1:36 PM
square-activity-64562
07/19/2021, 1:42 PM
big-carpet-38439
07/19/2021, 2:11 PM
square-activity-64562
07/19/2021, 3:50 PM
I was looking for the gms code which calls a function of the Kafka client library (or Spring's wrapper around Kafka).
e.g. https://github.com/linkedin/datahub/blob/master/metadata-dao-impl/kafka-producer/s[…]din/metadata/dao/producer/EntityKafkaMetadataEventProducer.java is where the Kafka API is finally used to send messages.
Where is the line that actually interacts with the schema registry APIs (through Kafka or Spring's wrapper of Kafka)?
square-activity-64562
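Context for that question: no DataHub code calls the schema registry REST API directly. The calls happen inside Confluent's KafkaAvroSerializer/KafkaAvroDeserializer, which the Kafka producer/consumer invokes during (de)serialization, which is why the stack trace above fails inside CachedSchemaRegistryClient rather than in DataHub code. The registry client is configured purely through client properties along these lines (a sketch; the keys are Confluent config names, the values are placeholders):

```properties
value.serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url=https://schema-registry.example.com
basic.auth.credentials.source=USER_INFO
basic.auth.user.info=SCHEMA_REGISTRY_KEY_ID:SCHEMA_REGISTRY_KEY_SECRET
```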
07/19/2021, 4:19 PM
square-activity-64562
07/19/2021, 4:42 PM
big-carpet-38439
07/19/2021, 6:43 PM
big-carpet-38439
07/19/2021, 6:43 PM
big-carpet-38439
07/19/2021, 6:43 PM
SPRING_KAFKA_PROPERTIES_SASL_MECHANISM=PLAIN
big-carpet-38439
07/19/2021, 6:43 PM
big-carpet-38439
07/19/2021, 6:44 PM
Use docker/gms/env/docker.env as the source of truth for configuration.
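For reference, a docker.env-style sketch of this approach (assuming the `SPRING_KAFKA_PROPERTIES_*` pass-through naming seen in the message above; hostnames and keys are placeholders). Passing the credentials via `basic.auth.user.info` keeps the schema registry URL free of userinfo, sidestepping the java.net.URL parsing failure:

```properties
KAFKA_BOOTSTRAP_SERVER=broker.example.com:9092
KAFKA_SCHEMAREGISTRY_URL=https://schema-registry.example.com
SPRING_KAFKA_PROPERTIES_SECURITY_PROTOCOL=SASL_SSL
SPRING_KAFKA_PROPERTIES_SASL_MECHANISM=PLAIN
SPRING_KAFKA_PROPERTIES_BASIC_AUTH_CREDENTIALS_SOURCE=USER_INFO
SPRING_KAFKA_PROPERTIES_BASIC_AUTH_USER_INFO=SCHEMA_REGISTRY_KEY_ID:SCHEMA_REGISTRY_KEY_SECRET
```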