# all-things-deployment
s
Facing issues with GMS access to Kafka in Confluent Cloud.
```
Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185)
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:894)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:553)
    at org.springframework.web.context.ContextLoader.configureAndRefreshWebApplicationContext(ContextLoader.java:401)
    at org.springframework.web.context.ContextLoader.initWebApplicationContext(ContextLoader.java:292)
    at org.springframework.web.context.ContextLoaderListener.contextInitialized(ContextLoaderListener.java:103)
    at org.eclipse.jetty.server.handler.ContextHandler.callContextInitialized(ContextHandler.java:921)
    at org.eclipse.jetty.servlet.ServletContextHandler.callContextInitialized(ServletContextHandler.java:554)
    at org.eclipse.jetty.server.handler.ContextHandler.startContext(ContextHandler.java:888)
    at org.eclipse.jetty.servlet.ServletContextHandler.startContext(ServletContextHandler.java:357)
    at org.eclipse.jetty.webapp.WebAppContext.startWebapp(WebAppContext.java:1443)
    at org.eclipse.jetty.webapp.WebAppContext.startContext(WebAppContext.java:1407)
    at org.eclipse.jetty.server.handler.ContextHandler.doStart(ContextHandler.java:821)
    at org.eclipse.jetty.servlet.ServletContextHandler.doStart(ServletContextHandler.java:276)
    at org.eclipse.jetty.webapp.WebAppContext.doStart(WebAppContext.java:524)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:72)
    at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:169)
    at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:117)
    at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:106)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:72)
    at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:169)
    at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:117)
    at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:106)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:72)
    at org.eclipse.jetty.util.component.ContainerLifeCycle.start(ContainerLifeCycle.java:169)
    at org.eclipse.jetty.server.Server.start(Server.java:407)
    at org.eclipse.jetty.util.component.ContainerLifeCycle.doStart(ContainerLifeCycle.java:110)
    at org.eclipse.jetty.server.handler.AbstractHandler.doStart(AbstractHandler.java:106)
    at org.eclipse.jetty.server.Server.doStart(Server.java:371)
    at org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:72)
    at org.eclipse.jetty.runner.Runner.run(Runner.java:520)
    at org.eclipse.jetty.runner.Runner.main(Runner.java:565)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
```
My configuration for Kafka is like this. Is there some way to debug this?
```
kafka:
    bootstrap:
      server: "KEY_ID:KEY_SECRET@CLUSTER_ID.gcp.confluent.cloud:9092"
    schemaregistry:
      url: "KEY_ID:KEY_SECRET@CLUSTER_ID.gcp.confluent.cloud"
```
b
And you're sure the URL is pointing to the correct schema registry path?
s
Yes. I did a text search to ensure there is no typo, and it matches exactly.
Here are some other stack traces and errors. The error
Bootstrap broker xxx disconnected
happens a lot at the beginning.
```
13:24:06.042 [main] WARN o.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-mce-consumer-job-client-1, groupId=mce-consumer-job-client] Bootstrap broker IDENTIFIER.gcp.confluent.cloud:9092 (id: -1 rack: null) disconnected
13:24:06.209 [main] WARN o.s.w.c.s.XmlWebApplicationContext - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
13:24:06.211 [main] INFO o.a.k.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
```
This is happening for health checks
```
javax.servlet.ServletException: javax.servlet.UnavailableException: Servlet Not Initialized
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:162)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
    at org.eclipse.jetty.server.Server.handle(Server.java:494)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:374)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:268)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
    at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:129)
    at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:367)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:782)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:918)
    at java.lang.Thread.run(Thread.java:748)
Caused by: javax.servlet.UnavailableException: Servlet Not Initialized
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:822)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:544)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)
    at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:536)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:235)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1581)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1307)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:482)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1549)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1204)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
    at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:221)
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
    at org.eclipse.jetty.server.Server.handle(Server.java:494)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:374)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:268)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
    at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:129)
    at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:367)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:782)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:918)
    at java.lang.Thread.run(Thread.java:748)
```
b
let's try to meet about this 🙂
s
I was able to produce and consume messages using the credentials, so the credentials seem to be correct. I used the commands from https://docs.confluent.io/platform/current/tutorials/examples/clients/docs/python.html
```
~/gogo/tmp/examples/clients/cloud/python latest*  11:37:11 AM
❯ ./producer.py -f /home/aseem/gogo/tmp/tmp/kafka.config -t test
Producing record: alice {"count": 0}
Producing record: alice {"count": 1}
Producing record: alice {"count": 2}
Producing record: alice {"count": 3}
Producing record: alice {"count": 4}
Producing record: alice {"count": 5}
Producing record: alice {"count": 6}
Producing record: alice {"count": 7}
Producing record: alice {"count": 8}
Producing record: alice {"count": 9}
Produced record to topic test partition [0] @ offset 0
Produced record to topic test partition [0] @ offset 1
Produced record to topic test partition [0] @ offset 2
Produced record to topic test partition [0] @ offset 3
Produced record to topic test partition [0] @ offset 4
Produced record to topic test partition [0] @ offset 5
Produced record to topic test partition [0] @ offset 6
Produced record to topic test partition [0] @ offset 7
Produced record to topic test partition [0] @ offset 8
Produced record to topic test partition [0] @ offset 9
10 messages were produced to topic test!

~/gogo/tmp/examples/clients/cloud/python latest*  11:37:47 AM
❯ ./consumer.py -f /home/aseem/gogo/tmp/tmp/kafka.config -t test
Waiting for message or event/error in poll()
Waiting for message or event/error in poll()
Waiting for message or event/error in poll()
Consumed record with key b'alice' and value b'{"count": 0}', and updated total count to 0
Consumed record with key b'alice' and value b'{"count": 1}', and updated total count to 1
Consumed record with key b'alice' and value b'{"count": 2}', and updated total count to 3
Consumed record with key b'alice' and value b'{"count": 3}', and updated total count to 6
Consumed record with key b'alice' and value b'{"count": 4}', and updated total count to 10
Consumed record with key b'alice' and value b'{"count": 5}', and updated total count to 15
Consumed record with key b'alice' and value b'{"count": 6}', and updated total count to 21
Consumed record with key b'alice' and value b'{"count": 7}', and updated total count to 28
Consumed record with key b'alice' and value b'{"count": 8}', and updated total count to 36
Consumed record with key b'alice' and value b'{"count": 9}', and updated total count to 45
Waiting for message or event/error in poll()
Waiting for message or event/error in poll()
Waiting for message or event/error in poll()
```
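(For reference, the kafka.config those example scripts read is a plain librdkafka-style properties file of roughly this shape — the values here are placeholders. The notable contrast with the GMS config above is that the key and secret are passed as SASL properties, not embedded in the broker address:)
```
# Sketch of the Confluent Cloud client config used by producer.py/consumer.py
bootstrap.servers=CLUSTER_ID.gcp.confluent.cloud:9092
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=KEY_ID
sasl.password=KEY_SECRET
```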
Am I supposed to be using this for authentication?
```
kafka:
    bootstrap:
      server: "KEY_ID:KEY_SECRET@CLUSTER_ID.gcp.confluent.cloud:9092"
    schemaregistry:
      url: "KEY_ID:KEY_SECRET@CLUSTER_ID.gcp.confluent.cloud"
```
or am I supposed to be using
springKafkaConfigurationOverrides
(https://github.com/acryldata/datahub-helm/blob/master/charts/datahub/values.yaml#L124)? I found https://docs.confluent.io/platform/current/tutorials/examples/clients/docs/java-springboot.html, which contains a "Template configuration file for Confluent Cloud". Trying to figure out if I can pass these somehow.
It seems the basic auth properties ("Required connection configs for Confluent Cloud Schema Registry") do not have a corresponding property in the configuration at https://github.com/linkedin/datahub/blob/master/datahub-frontend/conf/application.conf#L151. Let me try with just the following to see if the Kafka authentication works:
```
springKafkaConfigurationOverrides:
  security.protocol: "SASL_SSL"
  # note: Kafka requires the JAAS entry to be terminated with a semicolon
  sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username='KEY_ID' password='KEY_SECRET';"
  sasl.mechanism: "PLAIN"
```
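(On the schema registry gap mentioned above: the Confluent serializers take standard client properties for basic auth. A hedged sketch of what the missing overrides would look like if the chart forwarded them to the serializer config — SR_KEY_ID/SR_KEY_SECRET stand in for a schema registry API key, and whether these actually get passed through is exactly the open question in this thread:)
```
springKafkaConfigurationOverrides:
  security.protocol: "SASL_SSL"
  sasl.mechanism: "PLAIN"
  sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username='KEY_ID' password='KEY_SECRET';"
  # standard Confluent schema registry client properties for basic auth
  basic.auth.credentials.source: "USER_INFO"
  basic.auth.user.info: "SR_KEY_ID:SR_KEY_SECRET"
```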
Any idea what I might be doing wrong here? This is datahub-gms on K8s
```
07:15:50.619 [kafka-producer-network-thread | producer-1] ERROR o.a.kafka.common.utils.KafkaThread - Uncaught exception in thread 'kafka-producer-network-thread | producer-1':
java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:113)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:447)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:397)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:678)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:580)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:550)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:324)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
    at java.lang.Thread.run(Thread.java:748)
```
I have passed JAVA_OPTS. Not sure what is causing it to fail. The default is 256m, and I have not done anything with the topics yet, so I am not sure what is consuming all the memory.
```
- name: JAVA_OPTS
  value: -Xms1024m -Xmx1024m
```
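(This OOM signature — an oversized allocation inside NetworkReceive.readFrom — usually indicates a protocol mismatch rather than a real memory shortage: a client speaking PLAINTEXT to a SASL_SSL listener misreads the TLS handshake bytes as a huge message length and tries to allocate that many bytes, so raising -Xmx will not help. A quick standalone probe to confirm the protocol and credentials outside GMS, reusing the confluent_kafka package from the earlier test — the hostname and key names are placeholders:)
```
# Connectivity probe (sketch): fetch topic metadata over SASL_SSL.
from confluent_kafka.admin import AdminClient

conf = {
    "bootstrap.servers": "CLUSTER_ID.gcp.confluent.cloud:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "PLAIN",
    "sasl.username": "KEY_ID",
    "sasl.password": "KEY_SECRET",
}

admin = AdminClient(conf)
metadata = admin.list_topics(timeout=10)  # fails fast if auth or protocol is wrong
print(sorted(metadata.topics))
```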
b
The configuration you are referencing above pertains only to the DataHub frontend (not GMS). I do agree that the values.yaml seems to be missing a few config options, namely "sasl.jaas.config" and "sasl.mechanism". Hey @early-lamp-41924, have we seen anything like this before?
s
You may want to check other places too. E.g., I read the code for the kafka-setup job, and it is configured to set things up for SSL only, not SASL_SSL, which is required for Kafka authentication using basic auth.
And until a few days ago there was an error in the frontend too. I had to disable
datahub_analytics_enabled
in the global properties for that error to go away. Not sure if that has been fixed now.
b
Was it a Kafka error?
We will need to do an audit. I think you are the first person who's come to us using SASL.
s
I think yes, it was a Kafka error. Is there some other way to connect to Confluent Cloud? I am not tied to SASL; I just want to connect to Confluent Cloud and get this working.
b
If you can configure SSL auth, that may be an easier approach for now
i.e., if Confluent allows connecting without SASL
s
Will check. AFAIK it uses SASL only, but I am new, so I might have missed something.
b
Okay, then we will likely need to make fixes to the Helm charts here.
Let me consult with my colleagues
s
Helm charts, and probably code changes for the new configs. I guess I will have to wait at least a week, probably more, for the next release if this makes it in. Please let me know if you need any other information.
e
The above configuration should work for GMS and the frontend. Do you see the settings being set? The Kafka configs are printed in the logs.
Just to understand the input more: for
sasl.jaas.config
are you actually putting the values in the chart instead of templating (as shown in the doc you shared)? Then it should work (at least for GMS).
Let's check the Kafka producer configs shown in the logs and see whether the override is being set. This will tell us whether it's the DataHub code not picking up the config, or the configs not working.
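(For reference, the producer config dump appears once at startup and looks roughly like the excerpt below; if security.protocol still reads PLAINTEXT there, the overrides never reached the client. The values shown are illustrative:)
```
INFO  o.a.k.clients.producer.ProducerConfig - ProducerConfig values:
    bootstrap.servers = [CLUSTER_ID.gcp.confluent.cloud:9092]
    sasl.jaas.config = [hidden]
    sasl.mechanism = PLAIN
    security.protocol = SASL_SSL
    ...
```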
We can also get on a call when you’re free to identify the fix!!
s
@early-lamp-41924 what time zone are you in? As per Slack, it seems to be San Francisco. I am in India. Can we talk today, in your morning, before 11 AM?
e
Sounds good
What about 9:30 AM PST?
s
Sounds good. Will DM you for the meeting invite.
a
@square-activity-64562 Hi! I am an intern at Freenome doing a PoC on DataHub. We're also thinking about using SaaS offerings on GCP for all the dependencies. Would you be willing to share whether you were able to switch to Kafka on Confluent Cloud, and how that's going? Thanks!
s
The Confluent doc in the DataHub docs worked for me.