# general
Will Briggs
Has anyone configured realtime Kafka ingest with SASL/JAAS auth (as in how Confluent handles auth for their managed clusters)?
@Elon ^^
Kishore G
You can pass all the properties to the underlying Kafka consumer by putting them in streamConfigs.
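For SASL against a Confluent-managed cluster, that might look something like this (a sketch only; the broker endpoint and the API key/secret are placeholders):
```
"stream.kafka.broker.list": "<bootstrap-server>.confluent.cloud:9092",
"stream.kafka.consumer.prop.security.protocol": "SASL_SSL",
"stream.kafka.consumer.prop.sasl.mechanism": "PLAIN",
"stream.kafka.consumer.prop.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<api-key>\" password=\"<api-secret>\";"
```
Anything under the `stream.kafka.consumer.prop.` prefix gets forwarded to the Kafka consumer as-is.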
Will Briggs
@Kishore G Thanks! How about using basic auth for schema registry? Same mechanism?
Kishore G
@Will Briggs looks like this is not supported, and as you pointed out, there is an issue already. We will add this ASAP
Will Briggs
Thanks!
Elon
We use basic auth. In terms of Confluent properties, you can do something like this:
```
"streamType": "kafka",
"stream.kafka.consumer.type": "LowLevel",
"stream.kafka.topic.name": "XXX",
"stream.kafka.broker.list": "kafka:9092",
"realtime.segment.flush.threshold.time": "6h",
"realtime.segment.flush.threshold.size": "0",
"realtime.segment.flush.desired.size": "200M",
"stream.kafka.consumer.prop.isolation.level": "read_committed",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"stream.kafka.consumer.prop.group.id": "XXX",
"stream.kafka.consumer.prop.client.id": "XXX",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
"stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081"
```
This uses the Confluent Avro message decoder; is that what you wanted?
Will Briggs
yes, where are you configuring the basic auth?
for the schema registry? I can’t find anything in the docs, and the code looks like it’s not passing those properties through to the `CachedSchemaRegistryClient` that you instantiate in the `KafkaConfluentSchemaRegistryAvroMessageDecoder`
but it would certainly make my life easier if it were already supported somehow
Elon
It adds them as properties to the consumer.
The Kafka consumer interacts with the schema registry.
We do not use the auth parameters, but we do use schema registry and it's been working for us.
Does that help?
Will Briggs
Kind of? I’m a little confused, to be honest. You said you are using basic auth, but then you said you aren’t using auth parameters for the schema registry, so I’m trying to figure out how you are authenticating to it.
Elon
Maybe I misunderstood your question?
Will Briggs
What I would expect to be able to do is something like this:
```
"stream.kafka.decoder.prop.schema.registry.rest.url": "https://<schema-registry-url>.confluent.cloud",
"stream.kafka.decoder.prop.schema.registry.basic.auth.credentials.source": "USER_INFO",
"stream.kafka.decoder.prop.schema.registry.basic.auth.user.info": "schema_registry_basic_auth_username:schema_registry_basic_auth_pw",
```
Elon
I was showing how we pass the schema registry and Kafka low-level consumer stream configs
Will Briggs
yes, that part makes sense. When you said “We use basic auth”, it threw me, because I didn’t see that anywhere in the provided example
Elon
Ah, by basic I meant "no auth" 🙂
s/basic/no/ 🙂
So those params do not work for you?
Will Briggs
They don’t, because our schema registry is secured and requires credentials
I believe that if you simply allowed properties under the `DECODER_PROPS_PREFIX` to flow into the config for the `CachedSchemaRegistryClient` (instead of only extracting the SSL properties and tossing the rest), it would properly configure the underlying `RestService`; the logic is already there in the alternate constructors for `CachedSchemaRegistryClient`
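Something like this, roughly (a sketch against the Confluent client API, not the actual Pinot code; `schemaRegistryUrl` and `decoderProps` are stand-ins for what the decoder already has at init time):
```java
import java.util.HashMap;
import java.util.Map;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

class DecoderPropsPassThroughSketch {
  // Hypothetical fix: hand *all* decoder props to the schema registry client
  // instead of extracting only the SSL-related ones and dropping the rest.
  static SchemaRegistryClient buildClient(String schemaRegistryUrl, Map<String, String> decoderProps) {
    Map<String, Object> configs = new HashMap<>(decoderProps);
    // This CachedSchemaRegistryClient constructor passes the config map through
    // to the underlying RestService, which is where settings like
    // basic.auth.credentials.source get picked up.
    return new CachedSchemaRegistryClient(schemaRegistryUrl, 1000, configs);
  }
}
```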
Elon
Sounds reasonable. I would create a GitHub issue; someone can contribute a fix for that.
Will Briggs
There’s an issue, and I’m testing a (hacky) fix locally to see if that actually works
Elon
nice, good luck!
Will Briggs
I didn’t create it, but it’s the same problem I’m hitting