# general
k
Hi All, I have a Realtime table consuming from Kafka. As of now it has 5 billion records. I’m performing lookups [predicates] on the inverted index columns [userid, eventcategory, eventlabel], and using the “metricFieldSpecs” column timestampist for the range condition. My query is taking too long to finish, almost > 10 seconds. How can I configure it for the best optimised setup? Query:
select userid,eventlabel,sessionid, MIN(timestampist) as mint, MAX(timestampist) as maxt, (MAX(timestampist) - MIN(timestampist)) as diff_time  from default.click_stream where eventlabel !='null' and timestampist between 1615833000000 and 1616225312000 group by userid,eventlabel,sessionid
m
How long does it take if you just do count(*) without the group by but using the same where clause? And how many records does it select? My guess is your query selects too many rows, so the group by is slow. In this case, maybe try star-tree indexing.
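For reference, a minimal sketch of that diagnostic, reusing the table name, filter columns, and literal bounds from the original query above:
-- same where clause as the slow query, but without the group by
select count(*)
from default.click_stream
where eventlabel != 'null'
  and timestampist between 1615833000000 and 1616225312000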
k
@User: select count(*) from click_stream is taking 250ms on average
@User: It’s taking 5 seconds for: select MIN(timestampist) as mint, MAX(timestampist) as maxt, (MAX(timestampist) - MIN(timestampist)) as diff_time from click_stream where timestampist between 1612264456000 and 1619962345000. numDocsScanned: 442963466
select userid,eventlabel, MIN(timestampist) as mint, MAX(timestampist) as maxt, (MAX(timestampist) - MIN(timestampist)) as diff_time  from click_stream where  timestampist between 1612264456000 and 1619962345000 group by userid,eventlabel
This is taking 60 seconds. numDocsScanned: 221866025
m
Yeah, you are trying to group by 442M records. How many nodes, and what’s the JVM heap size? Your query is essentially very expensive.
k
I have re-created the table with a star-tree index:
{
  "tableName": "click_stream_REALTIME",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
    "timeColumnName": "created_at",
    "retentionTimeUnit": "DAYS",
    "retentionTimeValue": "90",
    "segmentPushType": "APPEND",
    "timeType": "DAYS",
    "replication": "1",
    "replicasPerPartition": "1",
    "schemaName": "click_stream"
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  },
  "tableIndexConfig": {
    "autoGeneratedInvertedIndex": true,
    "loadMode": "MMAP",
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "simple",
      "stream.kafka.topic.name": "vedantu.dp.click_stream.click_stream",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
      "stream.kafka.hlc.zk.connect.string": "zk1:2181/kafka",
      "stream.kafka.zk.broker.url": "zk1:2181/kafka",
      "stream.kafka.broker.list": "kafka01:9092",
      "realtime.segment.flush.threshold.time": "3600000",
      "realtime.segment.flush.threshold.size": "50000",
      "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
    },
    "aggregateMetrics": false,
    "enableDefaultStarTree": false,
    "enableDynamicStarTreeCreation": false,
    "nullHandlingEnabled": true,
    "createInvertedIndexDuringSegmentGeneration": false,
    "starTreeIndexConfigs": [
      {
        "dimensionsSplitOrder": [
          "eventcategory",
          "eventlabel",
          "userid",
          "sessionid"
        ],
        "skipStarNodeCreationForDimensions": [],
        "functionColumnPairs": [
          "MIN__timestampist",
          "MAX__timestampist"
        ]
      }
    ],
    "rangeIndexColumns": [
      "timestampist",
      "created_at"
    ]
  },
  "metadata": {
    "customConfigs": {}
  },
  "routing": {
    "instanceSelectorType": "replicaGroup"
  },
  "isDimTable": false
}
@User but it doesn’t help here. What should I do next? Please suggest.
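For context, a star-tree index with this dimensionsSplitOrder and these functionColumnPairs can generally only serve queries whose group-by columns and predicates stay within the listed dimensions and whose aggregations match the configured MIN/MAX pairs; the range filter on timestampist (a metric, not in the split order) likely prevents the index from kicking in. A sketch of a query shape this particular config could serve (the eventcategory value is a hypothetical example, not from the source):
-- group-by and filter columns taken from dimensionsSplitOrder,
-- aggregations from functionColumnPairs (MIN/MAX on timestampist);
-- 'page_view' is an illustrative value only
select userid, eventlabel, sessionid,
       MIN(timestampist) as mint,
       MAX(timestampist) as maxt
from click_stream
where eventcategory = 'page_view'
group by userid, eventlabel, sessionid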
m
The query is really expensive.
How many servers, and what’s the JVM heap?
k
Yeah, there are 2 servers with 100GB heap space
m
What’s the use case? Is this going to be a real production query? Or are you experimenting
k
This is going to be a prod query:
select userid,eventlabel,sessionid, MIN(timestampist) as mint, MAX(timestampist) as maxt, (MAX(timestampist) - MIN(timestampist)) as diff_time  from default.click_stream where sessionid !='null' and timestampist between 1615833000000 and 1616225312000 group by userid,eventlabel,sessionid
Scanning 3 months of data.
m
What’s your time column granularity?
k
1 minute
m
Hmm, can you live with 1 day?
Since you are going back 3 months?
And use aggregateMetrics. This will reduce the number of rows and improve latency.
k
I’ll try now
"dateTimeFieldSpecs": [
    {
      "name": "created_at",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:DAY"
    }
  ]
@User should I try this…
m
Also enable metrics aggregation.
k
OK
"aggregateMetrics": false ---> true
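Putting the two suggestions together, a minimal sketch of the fragments being changed, assuming the same schema and table config shown above (during realtime consumption, aggregateMetrics sums the metric columns of rows that share identical dimension and time values, so a coarser time granularity increases how many rows collapse):
In the schema's dateTimeFieldSpecs:
  "granularity": "1:DAYS"
In tableIndexConfig:
  "aggregateMetrics": true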
Let’s move to troubleshooting
k
@User yes
@User The only aggregation supported right now with “aggregateMetrics”: true is SUM, which we aren’t using as part of the query. Now my queries are not even responding within 60 seconds with the new table definition with the star-tree index.