# general
l
hey friends, our team is trying to query pinot from the query console, and we are trying to understand some of the latency in our queries. we are currently executing a query like the following:
select * from table where user_id = x
when we first hit a query like this we get more than 500ms; after we hit it again we get good results. i guess it’s because the segment gets closer to memory, but i was wondering why something like this would happen. 500ms is def out of our expectations for query latency. our current configuration of the table has indexing and it’s a real-time table. our current config for noDictionaryColumns:
"noDictionaryColumns": [
        "click_count",
       "impression_count",      
],
so that we can aggregate the metrics across our dimensions using “aggregateMetrics”: true. segment flushing configuration:
"realtime.segment.flush.threshold.rows": "0",
"realtime.segment.flush.threshold.time": "24h",
"realtime.segment.flush.segment.size": "250M"
we have a rangeIndex on serve_time, which is an epoch timestamp truncated to the hour. we have an invertedIndex on user_id, which is also the sortedColumn, as well as a partition map with 4 partitions using modulo. we chose 4 partitions because the consuming topic has 4 partitions, and the topic is getting around 5k messages a second. finally, we currently have 2 servers with 4G of java heap and 10G of memory on the machine itself, 4 CPUs, and 500G of disk space. at the moment of writing this message we have 96 segments in this table. metrics from when we issue a query like the one seen above:
timeUsedMs: 264
numDocsScanned: 40
totalDocs: 401325330
numServersQueried: 2
numServersResponded: 2
numSegmentsQueried: 93
numSegmentsProcessed: 93
numSegmentsMatched: 4
numConsumingSegmentsQueried: 1
numEntriesScannedInFilter: 0
numEntriesScannedPostFilter: 320
numGroupsLimitReached: false
partialResponse: -
minConsumingFreshnessTimeMs: 1634050463550
offlineThreadCpuTimeNs: 0
realtimeThreadCpuTimeNs: 159743463
could anyone direct me on what to look into? even these queries, going by the troubleshooting steps, don’t seem to have much numDocsScanned or numEntriesScannedPostFilter
r
is this 500ms cold start consistent even when the JVMs have been running for a long time?
l
it’s 3-digit latency
r
I'm asking whether this is JVM warmup
l
i don’t think it is; once i grab a different user_id i again get 3-digit latency
r
ok
l
pinot has been running for a while in our cluster if that’s what you are asking
r
what's the cardinality of `user_id`? if it's very high, there will be a lot of bitmaps in the inverted index on `user_id`, which will incur some setup costs (what version are you using, by the way)? Which dimension are you sorting by? Would it be possible to sort by `user_id` so you can use a sorted index instead?
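For reference, a minimal sketch of how a sorted column is declared in a Pinot table's tableIndexConfig; the column names come from this thread, the rest of the table config is assumed to stay as shared above, and this is illustrative rather than the poster's actual config:
"tableIndexConfig": {
  "sortedColumn": ["user_id"],
  "noDictionaryColumns": ["click_count", "impression_count"]
}
In a realtime table the rows are sorted on this column when the consuming segment is committed, so the equality filter on user_id can then use the sorted index instead of the inverted-index bitmaps.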
l
it’s in the hundreds of thousands? we usually sort by impression_count or click_count
so would sorting by user_id, impression_count help at all?
i’m on 0.8.0
r
the fastest index you can have is a sorted index, so it depends what the most important queries are. If this is a really important query, I would investigate sorting by `user_id`, and in 0.9.0 the new range index should have much lower latency for `impression_count` and `click_count`.
high cardinalities can cause issues for inverted indexes in most systems (we don't know if this is the root cause yet), because while each of the bitmaps is highly selective, you have hundreds of thousands of them.
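A minimal sketch of how the range index mentioned above could be declared in tableIndexConfig alongside the existing one on serve_time, assuming an upgrade to 0.9.0; the column names come from this thread and the snippet is illustrative, not the poster's actual config:
"rangeIndexColumns": [
  "serve_time",
  "impression_count",
  "click_count"
]
Note this only pays off for range filters (>, <, BETWEEN) on those columns; the user_id equality filter in the query above is not affected by it.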
could you run queries for `user_id`s you haven't queried before with tracing and use the JSON format view and paste the trace at the bottom here, so we can see which operators take most time?
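If it is easier to script than to use the console, tracing can also be requested through the broker's standard SQL endpoint (POST /query/sql on the broker, port 8099 by default) by adding a trace flag to the request body; a sketch, where the user_id value is just a placeholder:
{
  "sql": "select * from table where user_id = 12345",
  "trace": true
}
The trace then comes back in the traceInfo field of the response, the same view the console shows in JSON format.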
l
"exceptions": [],
  "numServersQueried": 2,
  "numServersResponded": 2,
  "numSegmentsQueried": 93,
  "numSegmentsProcessed": 93,
  "numSegmentsMatched": 4,
  "numConsumingSegmentsQueried": 1,
  "numDocsScanned": 40,
  "numEntriesScannedInFilter": 0,
  "numEntriesScannedPostFilter": 320,
  "numGroupsLimitReached": false,
  "totalDocs": 404198690,
  "timeUsedMs": 505,
  "offlineThreadCpuTimeNs": 0,
  "realtimeThreadCpuTimeNs": 294557175,
  "segmentStatistics": [],
  "traceInfo": {},
  "numRowsResultSet": 10,
  "minConsumingFreshnessTimeMs": 1634052628512
that’s without order by user_id
oh wait forgot tracing lol
r
Also, it would be interesting to get a cpu profile by installing async-profiler as a native agent as outlined here, setting
-agentpath:/path/to/libasyncProfiler.so=start,event=cpu,file=cpu.html
while you run a load of queries which always query a user_id for the first time (ideally automated) for about a minute or so, so we can see exactly what's going on
l
"exceptions": [],
  "numServersQueried": 2,
  "numServersResponded": 2,
  "numSegmentsQueried": 93,
  "numSegmentsProcessed": 93,
  "numSegmentsMatched": 4,
  "numConsumingSegmentsQueried": 1,
  "numDocsScanned": 40,
  "numEntriesScannedInFilter": 0,
  "numEntriesScannedPostFilter": 320,
  "numGroupsLimitReached": false,
  "totalDocs": 404233610,
  "timeUsedMs": 452,
  "offlineThreadCpuTimeNs": 0,
  "realtimeThreadCpuTimeNs": 181968373,
  "segmentStatistics": [],
  "traceInfo": {
    "pinot-server-1.pinot-server-headless.pinot.svc.cluster.local": "[{\"0\":[{\"SelectionOnlyCombineOperator Time\":181},{\"InstanceResponseOperator Time\":182}]},{\"0_0\":[]},{\"0_1\":[]},{\"0_2\":[{\"SortedIndexBasedFilterOperator Time\":0},{\"DocIdSetOperator Time\":0},{\"ProjectionOperator Time\":0},{\"PassThroughTransformOperator Time\":0},{\"SelectionOnlyOperator Time\":51}]},{\"0_3\":[{\"SortedIndexBasedFilterOperator Time\":0},{\"DocIdSetOperator Time\":0},{\"ProjectionOperator Time\":0},{\"PassThroughTransformOperator Time\":0},{\"SelectionOnlyOperator Time\":180}]}]",
    "pinot-server-0.pinot-server-headless.pinot.svc.cluster.local": "[{\"0\":[{\"SelectionOnlyCombineOperator Time\":1},{\"InstanceResponseOperator Time\":1}]},{\"0_0\":[]},{\"0_1\":[]},{\"0_3\":[{\"SortedIndexBasedFilterOperator Time\":0},{\"DocIdSetOperator Time\":0},{\"ProjectionOperator Time\":0},{\"PassThroughTransformOperator Time\":0},{\"SelectionOnlyOperator Time\":1}]},{\"0_2\":[{\"SortedIndexBasedFilterOperator Time\":0},{\"DocIdSetOperator Time\":0},{\"ProjectionOperator Time\":0},{\"PassThroughTransformOperator Time\":0},{\"SelectionOnlyOperator Time\":1}]}]"
  },
  "numRowsResultSet": 10,
  "minConsumingFreshnessTimeMs": 1634052785732
r
ok so it looks like `SelectionOnlyCombineOperator` and `SelectionOnlyOperator` are the problems in this query, forget everything I said about indexes
getting profiles of your system doing the slow query would help to figure this out at this point
k
what's the query for the above response?
l
select * from table where user_id = x
for my own education, what are `SelectionOnlyCombineOperator` and `SelectionOnlyOperator`?
r
Is this on AWS by the way? What instance type are you using?
k
"numSegmentsQueried": 93,
  "numSegmentsProcessed": 93,
  "numSegmentsMatched": 4,
l
this is on kubernetes, google instances.
k
you can do a lot better with pruning
partitioning/bloomfilter etc will help a lot
l
?
to decrease "numSegmentsQueried": 93?
k
no, to reduce "numSegmentsProcessed": 93,
l
we have partitioning with 4 partitions only tho; i thought it had to match the kafka topic but that’s not the case, yea?
and you mean adding a bloomfilter on user_id, yea?
k
yes, if you do either partitioning + sorting or bloomfilter + sorting, you don't need the inverted index for this
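A minimal sketch of the bloomfilter + sorting option in tableIndexConfig, with user_id dropped from invertedIndexColumns as suggested; the column name comes from this thread and the snippet is illustrative, not the poster's actual config:
"tableIndexConfig": {
  "sortedColumn": ["user_id"],
  "bloomFilterColumns": ["user_id"],
  "invertedIndexColumns": []
}
The bloom filter lets a server skip segments that cannot contain the queried user_id, which is what drives numSegmentsProcessed down, while the sorted index keeps the per-segment lookup cheap.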
l
and if i add this now it will only apply to new segments yea? even if i reload segments
and so with either partitioning + sorting or bloomfilter + sorting, the inverted index is not required?
k
Bloomfilter will be applied to existing segments as well
l
i just have to push reload segments yea?
k
Yes, invoke the reload all segments API
l
also one question, the number of partitions we set up in pinot doesn’t have to be equal to the one in kafka, yea?
k
Pinot does not need any setup for partitions
l
doesn’t it need this?
"segmentPartitionConfig": {
      "columnPartitionMap": {
k
It derives it automatically from Kafka; in batch mode, you can partition the data and push it to Pinot
Yes.. but the number of partitions is not needed
l
ohh cause it already partitions by whatever kafka is partitioning by?
and in this case i’m consuming from kafka
k
Right
l
this wouldn’t be needed either?
"routing": {
    "segmentPrunerTypes": [
      "partition"
    ]
  },
k
Hey, you probably tried this @User, but can you put a `LIMIT` clause on your query? Let's see how it affects the query response time. I've noticed some wonkiness lately related to this.
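For example, the same query as above with an explicit limit (the table name and x are the same placeholders used earlier in the thread):
select * from table where user_id = x limit 10
The numRowsResultSet: 10 in the earlier responses suggests the default limit of 10 was already in effect for the unbounded select.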
k
@User Maybe, I am not sure, but we should be able to apply that automatically cc @User
m
Glanced over the thread, here's my initial input:
- Definitely sort on user_id if you are not doing so.
- If high throughput, partition on user_id as well.
- The partition function does need to be specified and matched for partition based pruning.
@User can you confirm if the data is sorted on user_id or not?
l
do we add bloom filter too?
you mean just by issuing a regular select yea?
also @User does the partition count have to match the kafka partitioning count? or does it not matter
m
No, I meant: are you sorting on user_id in offline, and is it specified as the sorted column for RT?
@User The count may not need to match, the function impl needs to (we use Murmur that matches Kafka)
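Putting the snippets from this thread together, a sketch of what partition-aware pruning could look like, assuming the Kafka producer keys messages by user_id with its default (murmur2) partitioner and the topic keeps its 4 partitions; this is illustrative, not the poster's actual config:
"tableIndexConfig": {
  "segmentPartitionConfig": {
    "columnPartitionMap": {
      "user_id": {
        "functionName": "Murmur",
        "numPartitions": 4
      }
    }
  }
},
"routing": {
  "segmentPrunerTypes": ["partition"]
}
With this in place the broker can route the query only to the segments whose partition matches the queried user_id, instead of fanning out to all 93 segments.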
l
yes, sortedColumn is user_id, but in the query i’m not ordering by user_id per se
m
What's your read/write qps?
l
150qps
m
do you have offline?
l
approx that, i think it’s even less
m
And what's the typical number of rows selected for a user?
And is the slow start specific to the select query, or does it happen with count(*) as well?