# general
s
Hi, I would like to define a consumer group in the Apache Pinot table config so that I can monitor the lag of the consumer group Pinot uses to consume the Kafka topic. Currently I am running 3K TPS with 3 instances, and I noticed that even after Kafka ingestion stopped I still see a huge lag, which takes about 20 minutes to drain. Is there a way to define the consumer group here so that I can monitor it? Also, if the consumer application doesn't define a consumer group, Kafka will assign a random consumer group name like the ones below, which makes it very hard to track consumer info from the Kafka side, so I would like to define it from Pinot.
console-consumer-32555
console-consumer-37046
console-consumer-3568
console-consumer-11198
table config:
{
  "REALTIME": {
    "tableName": "uapi-testing_REALTIME",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "schemaName": "uapi-testing",
      "timeColumnName": "timestamp",
      "replication": "2",
      "replicasPerPartition": "2"
    },
    "tenants": {
      "broker": "DefaultTenant",
      "server": "DefaultTenant",
      "tagOverrideConfig": {}
    },
    "tableIndexConfig": {
      "streamConfigs": {
        "streamType": "kafka",
        "stream.kafka.topic.name": "uapitranlog2",
        "stream.kafka.broker.list": "localhost:6667",
        "stream.kafka.consumer.type": "lowlevel",
        "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
        "realtime.segment.flush.threshold.rows": "0",
        "realtime.segment.flush.threshold.time": "240h",
        "realtime.segment.flush.segment.size": "100M"
      },
      "createInvertedIndexDuringSegmentGeneration": false,
      "invertedIndexColumns": [],
      "rangeIndexColumns": [],
      "autoGeneratedInvertedIndex": false,
      "sortedColumn": [],
      "bloomFilterColumns": [],
      "loadMode": "MMAP",
      "noDictionaryColumns": [],
      "onHeapDictionaryColumns": [],
      "varLengthDictionaryColumns": [],
      "enableDefaultStarTree": false,
      "enableDynamicStarTreeCreation": false,
      "aggregateMetrics": false,
      "nullHandlingEnabled": false
    },
    "metadata": {},
    "quota": {},
    "routing": {},
    "query": {},
    "ingestionConfig": {},
    "isDimTable": false
  }
}
m
There's an ingestion status API available in Swagger; have you tried that?
s
I checked /tables/{tableName}/status, but I don't see any consumer group info in the table status:
curl -X GET "http://vhldvatlb211:9000/tables/uapi-testing_REALTIME/status?type=realtime" -H "accept: application/json"
Response (HTTP 200):
{
  "ingestionStatus": {
    "ingestionState": "UNHEALTHY",
    "errorMessage": "Not all servers responded for segment: uapi-testing__11__74__20220117T2335Z Missing servers : [Server_10.12.76.14_8098]"
  }
}
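For polling this endpoint from a script instead of Swagger, here is a minimal Python sketch. The path and the `ingestionStatus` / `ingestionState` / `errorMessage` fields come from the response above; the function names are hypothetical, and treating `"HEALTHY"` as the healthy state is an assumption, since only `"UNHEALTHY"` appears in this thread.

```python
import json
from typing import Tuple
from urllib.request import urlopen


def parse_ingestion_status(body: str) -> Tuple[bool, str]:
    """Return (healthy, error_message) from a table status response body."""
    status = json.loads(body).get("ingestionStatus", {})
    # Assumption: "HEALTHY" is the value reported when ingestion is fine.
    healthy = status.get("ingestionState") == "HEALTHY"
    return healthy, status.get("errorMessage", "")


def fetch_ingestion_status(controller: str, table: str) -> Tuple[bool, str]:
    """Call the controller status endpoint shown above and parse the result."""
    url = f"{controller}/tables/{table}/status?type=realtime"
    with urlopen(url, timeout=10) as resp:
        return parse_ingestion_status(resp.read().decode("utf-8"))
```

Usage would look like `fetch_ingestion_status("http://vhldvatlb211:9000", "uapi-testing_REALTIME")`. Note this reports ingestion health only; it says nothing about Kafka lag.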
Response headers:
access-control-allow-origin: *
content-length: 210
content-type: application/json
pinot-controller-host: vhldvatlb211.tvlport.net
pinot-controller-version: Unknown
p
TL;DR: Pinot doesn't use Kafka consumer groups. I haven't personally reached the monitoring stage yet, but please check out this talk for more details on how Pinot consumes from Kafka: https://www.confluent.io/resources/kafka-summit-2020/apache-pinot-case-study-building-distributed-analytics-systems-using-apache-kafka/
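With no consumer group to query, lag can still be reasoned about per partition as the latest offset in Kafka minus the offset Pinot has consumed so far. A rough sketch of that arithmetic, assuming both offset maps have already been collected by some other means (the function and its inputs are hypothetical, not a Pinot or Kafka API):

```python
from typing import Dict


def partition_lag(latest: Dict[int, int], consumed: Dict[int, int]) -> Dict[int, int]:
    """Per-partition lag = latest Kafka offset - offset consumed so far.

    Both arguments map partition id -> offset. Partitions missing from
    either map are skipped, since their lag is unknown.
    """
    lag = {}
    for partition, end_offset in latest.items():
        if partition not in consumed:
            continue  # no consumed offset known for this partition
        # Clamp at zero in case the two maps were sampled at different times.
        lag[partition] = max(end_offset - consumed[partition], 0)
    return lag


# Made-up offsets, for illustration only.
print(partition_lag({0: 100, 1: 50}, {0: 90, 1: 50}))
```

Summing the values gives a total lag figure for the topic that can be tracked over time.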