# general
Has anyone configured realtime Kafka ingest with SASL / jaas auth (as in how Confluent handles auth for their managed clusters)?
@Elon ^^
You can pass all the properties to the underlying Kafka consumer by putting them in streamConfigs
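A sketch of what that might look like for SASL/JAAS auth against a Confluent-managed cluster, assuming the `stream.kafka.consumer.prop.` prefix is used to forward properties to the consumer (the broker address, API key, and secret are placeholders):

```json
"streamConfigs": {
  "streamType": "kafka",
  "stream.kafka.topic.name": "my-topic",
  "stream.kafka.broker.list": "<bootstrap-server>.confluent.cloud:9092",
  "stream.kafka.consumer.prop.security.protocol": "SASL_SSL",
  "stream.kafka.consumer.prop.sasl.mechanism": "PLAIN",
  "stream.kafka.consumer.prop.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api-key>\" password=\"<api-secret>\";"
}
```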
@Kishore G Thanks! How about using basic auth for schema registry? Same mechanism?
@Will Briggs looks like this is not supported, and as you pointed out there is an issue already. We will add this ASAP
We use basic auth. In terms of confluent properties you can do something like this:
        "streamType": "kafka",
        "stream.kafka.consumer.type": "LowLevel",
        "stream.kafka.topic.name": "XXX",
        "stream.kafka.broker.list": "kafka:9092",
        "realtime.segment.flush.threshold.time": "6h",
        "realtime.segment.flush.threshold.size": "0",
        "realtime.segment.flush.desired.size": "200M",
        "stream.kafka.consumer.prop.auto.isolation.level": "read_committed",
        "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
        "stream.kafka.consumer.prop.group.id": "XXX",
        "stream.kafka.consumer.prop.client.id": "XXX",
        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
        "stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081"
This is to use the avro confluent message decoder, is that what you wanted?
yes, where are you configuring the basic auth?
for the schema registry? I can’t find anything in the docs, and the code looks like it’s not passing those properties through to the CachedSchemaRegistryClient that you instantiate in the decoder
but it would certainly make my life easier if it were already supported somehow
It adds them as properties to the consumer.
The kafka consumer interacts with the schema registry
We do not use the auth parameters, but we do use schema registry and it's been working for us.
Does that help?
Kind of? I’m a little confused, to be honest. You said you are using basic auth, but then you said you aren’t using auth parameters for the schema registry - so I’m trying to figure out how you are authenticating to it. It’s a little unclear.
Maybe I misunderstood your question?
What I would expect to be able to do would be something like this:
"stream.kafka.decoder.prop.schema.registry.rest.url": "https://<schema-registry-url>.confluent.cloud",
          "stream.kafka.decoder.prop.schema.registry.basic.auth.credentials.source": "USER_INFO",
          "stream.kafka.decoder.prop.schema.registry.basic.auth.user.info": "schema_registry_basic_auth_username:schema_registry_basic_auth_pw",
I was showing how we pass the schema registry and kafka low level consumer class stream configs
yes, that part makes sense - when you said “We use basic auth”, I think it threw me, because I didn’t see that anywhere in the provided example
Ah, by basic I meant "no auth" 🙂
s/basic/no/ 🙂
So those params do not work for you?
They don’t, because our schema registry is secured and requires credentials
I believe that if you simply allowed properties under the schema registry decoder prefix to flow into the config for the CachedSchemaRegistryClient (instead of only extracting the SSL properties and tossing the rest), it would properly configure the underlying RestService; the logic is already there in the alternate constructors for CachedSchemaRegistryClient
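A sketch of what that pass-through could look like (this is not Pinot's actual code; the helper and the placeholder credentials are hypothetical, and it assumes the `stream.kafka.decoder.prop.schema.registry.` prefix has already been stripped from the keys):

```java
import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

class RegistryAuthSketch {

    // Hypothetical helper: forward the whole decoder property map instead of
    // extracting only the SSL properties and discarding the rest.
    static SchemaRegistryClient build(String url, Map<String, Object> decoderProps) {
        // The alternate constructor hands the configs to the underlying
        // RestService, which reads basic.auth.credentials.source and
        // basic.auth.user.info to set up basic auth.
        return new CachedSchemaRegistryClient(url, 1000, decoderProps);
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", "user:password"); // placeholder credentials
        SchemaRegistryClient client = build("https://schema-registry:8081", props);
    }
}
```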
Sounds reasonable. I would create a GitHub issue; someone can contribute a fix for that.
There’s an issue, and I’m testing a (hacky) fix locally to see if that actually works
nice, good luck!
I didn’t create it, but it’s the same problem I’m hitting