
Elon

12/21/2020, 9:40 PM
Hi, anyone familiar with setting generic kafka properties, e.g. stream.kafka.consumer.prop.isolation.level, group_id, client_id, etc.? It looks like only a specific list of properties is honored, like stream.kafka.topic.name, stream.kafka.decoder.class.name... I can create a github issue, lmk.

Xiang Fu

12/21/2020, 10:19 PM
I think all the configs will be passed to the kafka consumer. E.g. you can put
"ssl.truststore.password": "${KAFKA_TRUSTSTORE_PASSWORD}",
"ssl.keystore.password": "${KAFKA_KEYSTORE_PASSWORD}",
into your stream configs
this is a full sample table conf just fyi
"tableIndexConfig": {
    "streamConfigs": {
        "streamType": "kafka",
        "stream.kafka.consumer.type": "LowLevel",
        "stream.kafka.topic.name": "my-events",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.broker.list": "my-kafka:9090",
        "security.protocol": "SSL",
        "ssl.truststore.location": "/opt/pinot/kafka.client.truststore.jks",
        "ssl.keystore.location": "/opt/pinot/kafka.client.keystore.jks",
        "ssl.truststore.password": "$KAFKA_TRUSTSTORE_PASSWORD",
        "ssl.keystore.password": "$KAFKA_KEYSTORE_PASSWORD",
        "ssl.endpoint.identification.algorithm": "",
        "stream.kafka.decoder.prop.schema.registry.url": "https://confluent-schema-registry.my-schema-registry",
        "stream.kafka.decoder.prop.schema.registry.rest.url": "https://confluent-schema-registry.my-schema-registry",
        "realtime.segment.flush.threshold.rows": "5000000",
        "realtime.segment.flush.threshold.time": "1d",
        "realtime.segment.flush.threshold.segment.size": "500m",
        "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
        "stream.kafka.fetch.timeout.millis": "40000"
    }
}

Elon

12/21/2020, 11:06 PM
Thanks @Xiang Fu! So I see code that strips the prefix, but it doesn't seem to honor the following properties (unless this is incorrect):
"streamConfigs": {
    ...
    "stream.kafka.consumer.prop.isolation.level": "read_committed",
    "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
    "stream.kafka.consumer.prop.group.id": "a2938a5b-747c-4a2a-90e6-2eaddf81164d",
    "stream.kafka.consumer.prop.client.id": "7100f4b4-f15e-4624-881c-7949c807addf",
    ...
}
those specific properties do not seem to be set when we check the ConsumerConfig's in the server logs

Xiang Fu

12/21/2020, 11:16 PM
then I think you can directly set them without the prefix?

Elon

12/22/2020, 1:23 AM
Ah, I'll try that, thanks!
@Xiang Fu, that worked! no prefix. Thanks so much!
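For reference, a sketch of what the working streamConfigs looks like per this thread: the Kafka consumer properties are set as bare keys without the stream.kafka.consumer.prop. prefix (the group.id and client.id values shown are illustrative placeholders):

    "streamConfigs": {
        "streamType": "kafka",
        "stream.kafka.topic.name": "my-events",
        "isolation.level": "read_committed",
        "group.id": "my-pinot-consumer-group",
        "client.id": "my-pinot-client",
        "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
    }

Note that auto.offset.reset still works with the prefix, which is the inconsistency discussed below.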

Xiang Fu

12/22/2020, 2:35 AM
great!

Chun Zhang

01/05/2021, 4:04 PM
@Xiang Fu do you know why some properties work with the prefix but some do not? Based on the code here, the prefix should be required. Is it a bug somewhere? This kind of inconsistency is very misleading because we don't know when to add the prefix and when not to.