# troubleshooting
t
Hello, I am facing issues with setting the consumer configs for Kafka in the table config. I am using the image with the `latest` tag. I tried using both the `stream.kafka` and `stream.kafka.consumer.prop` prefixes; neither worked.
I am trying to read from an SSL-enabled Kafka and am facing this issue now. It works fine with the same Kafka cluster without SSL.
x
@Tanmay Movva can you paste the table conf here
@Neha Pawar can you help check with this ?
t
Table conf
```
{
  "tableName": "rawServiceViewTest_REALTIME",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "schemaName": "rawServiceView",
    "timeType": "MILLISECONDS",
    "timeColumnName": "start_time_millis",
    "retentionTimeUnit": "DAYS",
    "retentionTimeValue": "7",
    "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
    "replicasPerPartition": "1"
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  },
  "tableIndexConfig": {
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "LowLevel",
      "stream.kafka.topic.name": "hypertrace-raw-service-view-events",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.broker.list": "stage-kafka.razorpay.in:9090",
      "stream.kafka.security.protocol": "SSL",
      "stream.kafka.ssl.truststore.location": "/tmp/ktruststore.jks",
      "stream.kafka.ssl.keystore.location": "/tmp/keystore.jks",
      "stream.kafka.ssl.truststore.password": "password",
      "stream.kafka.ssl.keystore.password": "password",
      "stream.kafka.ssl.endpoint.identification.algorithm": "",
      "stream.kafka.decoder.prop.schema.registry.url": "https://confluent-schema-registry-server.int.stage.razorpay.in",
      "stream.kafka.decoder.prop.schema.registry.rest.url": "https://confluent-schema-registry-server.int.stage.razorpay.in",
      "realtime.segment.flush.threshold.rows": "5000000",
      "realtime.segment.flush.threshold.time": "1d",
      "realtime.segment.flush.threshold.segment.size": "500m",
      "stream.kafka.auto.offset.reset": "smallest",
      "stream.kafka.fetch.timeout.millis": "40000"
    },
    "loadMode": "MMAP"
  },
  "metadata": {}
}
```
This is the error I am getting
```
"error": "org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata"
```
d
It feels like network connectivity can't be established to `stage-kafka.razorpay.in:9090`. Is that host and port routable from your Pinot cluster?
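As a quick way to check this (an illustrative sketch; the broker host and port are taken from the table config above), a plain TCP reachability test run from a Pinot node could look like:

```python
import socket


def can_connect(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Hypothetical check against the broker from the table config:
# can_connect("stage-kafka.razorpay.in", 9090)
```

Note this only verifies TCP routing; an SSL handshake or certificate problem would still pass this check and fail later in the Kafka client.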
n
The configs look correct. It might just be that Pinot is not able to reach `"stream.kafka.broker.list": "stage-kafka.razorpay.in:9090"`.
t
Will check the connectivity again and let you know.
Btw, I checked the controller logs and `auto.offset.reset` was set to `latest` even though I specified `smallest` in the config. I checked the consumer configs that are logged.
n
`stream.kafka.consumer.prop.auto.offset.reset` is the right property. Did you see something else in the docs? I'll fix that if you did.
t
That did not work. Passing those configs without any prefix worked:
```
"auto.offset.reset": "earliest"
```
This worked. The SSL configs are also being passed to the Kafka consumer config now (checked from the logs), although we are still not able to connect, probably because of some certificate issue.
n
Even if you see it printed in the logs, it will only be assigned to the Pinot consumer manager if you pass it with the prefix: https://github.com/apache/incubator-pinot/blob/master/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java#L113
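The linked StreamConfig code can be thought of as doing prefix-based property routing: only keys carrying the stream prefix are recognized and forwarded. A minimal illustrative sketch in Python (not Pinot's actual Java implementation, and simplified to a single prefix level):

```python
STREAM_TYPE = "kafka"
PREFIX = f"stream.{STREAM_TYPE}."


def extract_consumer_props(stream_configs: dict) -> dict:
    """Keep only keys starting with the stream prefix, stripping the prefix.

    Illustrative sketch of prefix-based routing similar in spirit to Pinot's
    StreamConfig; not the actual implementation.
    """
    return {
        key[len(PREFIX):]: value
        for key, value in stream_configs.items()
        if key.startswith(PREFIX)
    }


props = extract_consumer_props({
    "streamType": "kafka",                                     # not prefixed: dropped
    "stream.kafka.security.protocol": "SSL",                   # forwarded as security.protocol
    "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
    "auto.offset.reset": "earliest",                           # not prefixed: dropped
})
```

In this sketch, un-prefixed keys like `auto.offset.reset` are simply dropped, which is why the prefix matters for any property you want forwarded to the consumer.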