# troubleshooting
l
hey friends, coming with a sql optimization question today. We are trying to query Pinot with larger timespans now that we are migrating our data (for example, "this year" and "last year" filters). For our use case we see that query execution time is way slower now, and we are trying to figure out ways to gain some performance. Given this, do you have any recommendations?
SELECT product_id, SUM(impression_count) as impression_count, SUM(click_count) as click_count, SUM(cost) as spent_total FROM metrics 
WHERE user_id = xx AND serve_time BETWEEN 1641013200 AND 1654092017  
GROUP BY product_id 
LIMIT 100000
this is an example of a query we are running
"numServersQueried": 2,
  "numServersResponded": 2,
  "numSegmentsQueried": 1317,
  "numSegmentsProcessed": 168,
  "numSegmentsMatched": 117,
  "numConsumingSegmentsQueried": 0,
  "numDocsScanned": 69212,
  "numEntriesScannedInFilter": 1165155303,
  "numEntriesScannedPostFilter": 415272,
  "numGroupsLimitReached": false,
  "totalDocs": 10362679599,
  "timeUsedMs": 4623,
these are some of the stats that come back. our data resolution is hourly for this data. Do you all have any idea how to make a query like this perform better? We have an idea of making the records not keep hourly resolution after a certain period of time but roll up to daily resolution instead, so that the data is compressed even further, but wanted to ask if there are any other methods we could use and whether the approach we have in mind makes sense to you all.
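to give an idea of the kind of daily rollup we mean, something roughly like this would run upstream (wherever the data is prepared before ingestion, not in pinot); serve_day is just a made-up name for the truncated column:

-- rough sketch of the daily rollup idea (run upstream before ingestion)
-- serve_time is epoch seconds truncated to the hour; serve_day is a hypothetical column
SELECT
  user_id,
  product_id,
  serve_time - MOD(serve_time, 86400) AS serve_day,
  SUM(impression_count) AS impression_count,
  SUM(click_count)      AS click_count,
  SUM(cost)             AS cost
FROM metrics
GROUP BY user_id, product_id, serve_time - MOD(serve_time, 86400)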
d
Some things I would look at are whether serve_time is either the time column or a column used to partition the segments (so that you can prune segments and avoid searching too many of them), and maybe creating an inverted index on user_id. I don't know, however, if segment partitioning works with realtime ingestion (which I'm guessing is your case).
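For the inverted index it would be something along these lines in tableIndexConfig (just a sketch to merge into your existing config, not the full table spec):

{
    "tableIndexConfig": {
        "invertedIndexColumns": [
            "user_id"
        ],
        ...
    }
}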
l
serve_time is the time column in our config:
"timeColumnName": "serve_time",
in this case we are querying an offline table
d
Ah, cool! Do you perhaps have a column with low cardinality that could be used as a partition marker? E.g. a "region" or "country" one which you could use for partitioning, so that you reduce the number of segments queried?
k
"numEntriesScannedInFilter": 1165155303,
you are missing indexes. add a range index on the serve_time column
l
we have this in the offline table config:
"segmentPartitionConfig": {
        "columnPartitionMap": {
          "user_id": {
            "functionName": "Murmur",
            "numPartitions": 8
          }
        }
      },
"sortedColumn": [
        "user_id"
      ],
      "bloomFilterColumns": [
        "user_id",
        "product_id"
      ],
oh we def don’t have range indexes
"rangeIndexColumns": [],
      "rangeIndexVersion": 2,
is there not a default index on the serve_time column, or is that only on realtime? @Kishore G
d
I thought time columns always had a default index too...
m
Yes, you should add a range index. Also, is the timestamp in seconds? Do you need to query at seconds granularity? If not, store it at the granularity you want to query; that will reduce cardinality and improve range index performance.
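For example, if the data is prepared in something like Spark before it reaches Pinot, the idea is roughly this (just a sketch; df and the column name are assumptions, not your actual job):

// sketch: truncate the epoch-seconds timestamp to the hour before writing,
// so the time column only carries hourly granularity
import org.apache.spark.sql.functions.col

val hourly = df.withColumn("serve_time", col("serve_time") - (col("serve_time") % 3600))
// or store epoch-hours directly: (col("serve_time") / 3600).cast("long")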
l
granularity of this query is hourly
so just to confirm there’s no default index on serve_time
k
right, pinot does not add default index on any column
l
I ran a query for a month where we had data in our production system and compared it with the data ingested through offline, and the query in prod is way faster than against whatever we ingested in offline, which is weird to me
I will add this (the range index on serve_time, that is) to our realtime config as well as to offline
I will report back
k
the only explanation is that when you ingested through offline, the data got mixed up
m
compare metadata
Also in offline you will need to explicitly partition the data
l
we have a spark job that gets data from bigquery, puts it into parquet files somewhere in GCS, and then we ingest this data through the standalone job into pinot
m
Do you partition it though?
Iirc you partition it in real-time
l
this is the config we have in offline, tbh not much different from realtime, but i guess some of these configs don't make sense in offline:
{
  "OFFLINE": {
    "tableName": "metrics_OFFLINE",
    "tableType": "OFFLINE",
    "segmentsConfig": {
      "schemaName": "metrics",
      "retentionTimeUnit": "DAYS",
      "retentionTimeValue": "730",
      "replication": "2",
      "timeColumnName": "serve_time",
      "allowNullTimeValue": false,
      "segmentPushType": "APPEND"
    },
    "tenants": {
      "broker": "DefaultTenant",
      "server": "DefaultTenant"
    },
    "tableIndexConfig": {
      "invertedIndexColumns": [],
      "noDictionaryColumns": [
        "click_count",
        "order_count",
        "impression_count",
        "cost",
        "revenue"
      ],
      "rangeIndexColumns": [],
      "rangeIndexVersion": 2,
      "autoGeneratedInvertedIndex": false,
      "createInvertedIndexDuringSegmentGeneration": false,
      "sortedColumn": [
        "user_id"
      ],
      "bloomFilterColumns": [
        "user_id",
        "product_id"
      ],
      "loadMode": "MMAP",
      "onHeapDictionaryColumns": [],
      "varLengthDictionaryColumns": [],
      "enableDefaultStarTree": false,
      "enableDynamicStarTreeCreation": false,
      "segmentPartitionConfig": {
        "columnPartitionMap": {
          "user_id": {
            "functionName": "Murmur",
            "numPartitions": 16
          }
        }
      },
      "aggregateMetrics": false,
      "nullHandlingEnabled": false
    },
    "metadata": {},
    "quota": {},
    "routing": {
      "segmentPrunerTypes": [
        "partition"
      ]
    },
    "query": {},
    "fieldConfigList": [],
    "ingestionConfig": {},
    "isDimTable": false
  }
}
like the partitioning stuff doesn’t make sense in offline right? only realtime
when you say partition, is that basically separating this data by an id so that stuff clusters together for a given id, with a partition number like user_id % PARTITION_NUMBER, and then having different files like that?
d
Do you also partition your user_ids when doing the batch ingestion, to make sure you don't end up with segments that would contain basically any user_id? Because if this happens, that partitioning scheme is useless - I made this mistake too some time ago.
It works for offline, yes - I'm using that in a project I'm building.
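The rough shape on the Spark side is something like this (a sketch only; df and the output path are assumptions, and 16 just mirrors the numPartitions in the table config you posted):

// sketch: bucket the batch data by user_id before generating segments
import org.apache.spark.sql.functions.col

val bucketed = df.repartition(16, col("user_id"))
bucketed.write.parquet("gs://some-bucket/metrics/")  // hypothetical output path

// caveat: for Pinot's partition pruner to actually help, every output file (and thus every
// segment) must only contain user_ids that map to a single partition under the function
// configured in Pinot ("Murmur" in your config). Spark's built-in repartition hashing is a
// different function, so you may need a UDF replicating Pinot's partitioner instead of
// relying on the default hash.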
l
as in the partition config in the json file, that doesn’t matter for offline right? it’s just how you do it yourself
"exceptions": [],
  "numServersQueried": 2,
  "numServersResponded": 2,
  "numSegmentsQueried": 1423,
  "numSegmentsProcessed": 168,
  "numSegmentsMatched": 117,
  "numConsumingSegmentsQueried": 0,
  "numDocsScanned": 69212,
  "numEntriesScannedInFilter": 1165155303,
  "numEntriesScannedPostFilter": 415272,
  "numGroupsLimitReached": false,
  "totalDocs": 11234231893,
  "timeUsedMs": 1464,
  "offlineThreadCpuTimeNs": 0,
  "realtimeThreadCpuTimeNs": 0,
  "offlineSystemActivitiesCpuTimeNs": 0,
  "realtimeSystemActivitiesCpuTimeNs": 0,
  "offlineResponseSerializationCpuTimeNs": 0,
  "realtimeResponseSerializationCpuTimeNs": 0,
  "offlineTotalCpuTimeNs": 0,
  "realtimeTotalCpuTimeNs": 0,
  "segmentStatistics": [],
  "traceInfo": {},
  "minConsumingFreshnessTimeMs": 0,
  "numRowsResultSet": 1
i added the range index but it seems like a lot of entries still get counted in numEntriesScannedInFilter
also, for some of the queries, i have gotten an error about one of the offline servers not being able to respond, with errorCode 427. anyone know what that's about?
"rangeIndexColumns": [
        "serve_time"
      ],
      "rangeIndexVersion": 2,
added this
m
Because your timestamp is still in seconds (or ms) and hence really high cardinality? Reducing that to hours will help, as I mentioned earlier.
l
our timestamp is in seconds but it’s truncated to the hour if that makes sense
m
I see so for all data in one hour, they all share the same seconds value?
l
yes
exactly that
m
Then I suspect your data is not sorted.
WHERE user_id = xx AND serve_time BETWEEN 1641013200 AND 1654092017
The user id predicate should reduce the numDocsScanned to max docs a user can have.
And I doubt you have users with 1165155303 docs.
Can you check segment metadata to ensure data is sorted?
l
yea it’s weird to me cause even the stats here
"exceptions": [],
  "numServersQueried": 2,
  "numServersResponded": 2,
  "numSegmentsQueried": 1497,
  "numSegmentsProcessed": 168,
  "numSegmentsMatched": 117,
  "numConsumingSegmentsQueried": 0,
  "numDocsScanned": 69212,
  "numEntriesScannedInFilter": 1165155303,
  "numEntriesScannedPostFilter": 415272,
  "numGroupsLimitReached": false,
  "totalDocs": 11854535291,
  "timeUsedMs": 1755,
  "offlineThreadCpuTimeNs": 0,
  "realtimeThreadCpuTimeNs": 0,
  "offlineSystemActivitiesCpuTimeNs": 0,
  "realtimeSystemActivitiesCpuTimeNs": 0,
  "offlineResponseSerializationCpuTimeNs": 0,
  "realtimeResponseSerializationCpuTimeNs": 0,
  "offlineTotalCpuTimeNs": 0,
  "realtimeTotalCpuTimeNs": 0,
  "segmentStatistics": [],
  "traceInfo": {},
  "minConsumingFreshnessTimeMs": 0,
  "numRowsResultSet": 25305
m
Yes, check if the data is sorted
l
it comes back with 25k documents and it’s doing this:
"numEntriesScannedInFilter": 1165155303,
  "numEntriesScannedPostFilter": 415272,
m
can you run select count(*) from metrics where user_id = xxx? If numEntriesScannedInFilter is not zero then the data is not sorted
l
"exceptions": [],
  "numServersQueried": 2,
  "numServersResponded": 2,
  "numSegmentsQueried": 1497,
  "numSegmentsProcessed": 204,
  "numSegmentsMatched": 143,
  "numConsumingSegmentsQueried": 0,
  "numDocsScanned": 83660,
  "numEntriesScannedInFilter": 1435776018,
  "numEntriesScannedPostFilter": 0,
  "numGroupsLimitReached": false,
  "totalDocs": 11854535291,
  "timeUsedMs": 4000,
  "offlineThreadCpuTimeNs": 0,
  "realtimeThreadCpuTimeNs": 0,
  "offlineSystemActivitiesCpuTimeNs": 0,
  "realtimeSystemActivitiesCpuTimeNs": 0,
  "offlineResponseSerializationCpuTimeNs": 0,
  "realtimeResponseSerializationCpuTimeNs": 0,
  "offlineTotalCpuTimeNs": 0,
  "realtimeTotalCpuTimeNs": 0,
  "segmentStatistics": [],
  "traceInfo": {},
  "minConsumingFreshnessTimeMs": 0,
  "numRowsResultSet": 1
for that query, yea numEntriesScannedInFilter is not zero
if you are curious, the count(*) for that user is 83660
how do I fix that?
"sortedColumn": [
        "user_id"
      ],
doesn’t this take care of that?
m
Is this realtime or offline ingestion? In realtime pinot will sort. In offline, your ingestion job needs to sort.
l
i’m afraid that even our realtime tables do not have this data sorted
how can i check?
m
How did you verify RT does not have it sorted
l
I did the SELECT COUNT(*) for the same user in prod and got numEntriesScannedInFilter not zero
m
Hmm, check segment metadata as I mentioned earlier
l
can i check via the endpoint /segments/{tableName}/{segmentName}/metadata ?
it seems like the endpoint doesn’t show that data
but i can confirm things are not sorted by user_id
m
I think there is another endpoint for that. Let me find
l
we sorted the data but it seems like it is not taking effect so far for the segment we are looking at
and add this to the offline table config yes?
{
    "tableIndexConfig": {
        "sortedColumn": [
            "column_name"
        ],
        ...
    }
}
m
For offline, you need to sort the data outside of Pinot (before ingestion).
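Something like this on the Spark side is the usual shape (just a sketch, not your exact job; df, the bucket count, and the output path are assumptions):

// sketch: make sure every output parquet file is itself sorted by user_id, since each
// offline segment only sees the row order of its own input file. A global orderBy alone
// is not enough if the data is reshuffled afterwards (e.g. repartitioned into user_id
// buckets) -- sort within each partition after the repartition.
import org.apache.spark.sql.functions.col

val sortedForIngestion = df
  .repartition(16, col("user_id"))          // 16 mirrors the numPartitions in your table config
  .sortWithinPartitions(col("user_id"))

sortedForIngestion.write.parquet("gs://some-bucket/metrics-sorted/")  // hypothetical output path
// (the earlier caveat about matching Pinot's Murmur partition function still applies
// if you also want partition-based pruning)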
l
right we did that
m
For realtime, Pinot should have done it. I can’t think of a reason why it isn’t for your case.
l
and add the tableIndexConfig as well to the offline table?
yea yea, we checked for realtime and it has indeed done it. for some reason numEntriesScannedInFilter was > 0 tho when we did the COUNT(*), but then i checked the segments and it said isSorted=true for user_id
we sorted in spark, and then we used the standalone job to ingest one day of data, but the segments say they are not sorted
m
If isSorted = true for all segments of a table (which I expect for RT), numEntriesScannedInFilter should be zero.
Is this a hybrid table?
l
this is hybrid yes
"sortedColumn": [
        "user_id"
      ],
      "bloomFilterColumns": [
        "user_id",
        "product_id"
      ],
m
Ok, so that could explain numEntriesScannedInFilter > 0 coming from offline.
If segment says it is not sorted, then for sure the input is not sorted.
l
would that mess things up?
if user_id is in 2 indexes
m
It cannot change the sort order in offline. That is immutable as far as Pinot is concerned.
Order in offline segment == order in what it saw in input file
l
i’m looking at the raw data of the segment (parquet file) and it’s sorted
i see it going in ascending order for user_id
and just to clarify this is just the data that we are migrating into pinot, not what is already there from realtime + RT to offline, that does seem to be sorted when checking the metadata
but the parquet file we are ingesting now does look sorted to me, ASC by user_id
and it doesn’t have to be sorted across segments for the same day
m
Indexes are in the scope of a segment.
Can user id’s be null? If so, that could explain why Pinot thinks it is not sorted.
Also, is this a number or a string on the parquet side, and does that match the Pinot side? (maybe number-sorted vs string-sorted would be different)
l
user_id cannot be null
############ Column(user_id) ############
name: user_id
path: user_id
max_definition_level: 1
max_repetition_level: 0
physical_type: INT64
logical_type: None
converted_type (legacy): NONE
parquet tools in python shows me this
m
All things matching, I am pretty sure that if Pinot sees the input as not sorted, then it is not.
l
i see the parquet file and it's sorted, i'm not sure why it's thinking it's not. anything else i could share to help triage what may be happening?
| 1.90185e+07 |
| 1.90211e+07 |
that's what the shop_id looks like in the parquet file
in the spark job that generates these parquet files, right before we write them out, we do this:
.orderBy(imp_click_receipts_dataframe_final("user_id"))
which should sort it
m
If you can provide a parquet file I can tell you exactly why Pinot thinks it is not sorted
Just to close the thread, we found the data was indeed not sorted.
l
data was not sorted indeed, the problem was with the way we were writing out data to parquet from spark. we have finally fixed this, we are running ingestion again, and will be doing performance testing once we load sufficient data. thank you so much
👍 1
m
Thanks @Luis Fernandez, would be good to see the before and after comparisons
r
tl;dr is that we suck at spark
🤣 1
l
way better results
"exceptions": [],
  "numServersQueried": 2,
  "numServersResponded": 2,
  "numSegmentsQueried": 11445,
  "numSegmentsProcessed": 213,
  "numSegmentsMatched": 116,
  "numConsumingSegmentsQueried": 0,
  "numDocsScanned": 68683,
  "numEntriesScannedInFilter": 0,
  "numEntriesScannedPostFilter": 412098,
  "numGroupsLimitReached": false,
  "totalDocs": 50128023764,
  "timeUsedMs": 581,
  "offlineThreadCpuTimeNs": 0,
  "realtimeThreadCpuTimeNs": 0,
  "offlineSystemActivitiesCpuTimeNs": 0,
  "realtimeSystemActivitiesCpuTimeNs": 0,
  "offlineResponseSerializationCpuTimeNs": 0,
  "realtimeResponseSerializationCpuTimeNs": 0,
  "offlineTotalCpuTimeNs": 0,
  "realtimeTotalCpuTimeNs": 0,
  "segmentStatistics": [],
  "traceInfo": {},
  "minConsumingFreshnessTimeMs": 0,
  "numRowsResultSet": 25228
how do these stats look to you?
this is our sandbox environment
we are going to run this in our dev env now. before i do so, is there anything else we can do to get even faster?
m
Ok, so the stats indicate the data is definitely sorted. But I still feel the latency is quite high. Do you have a lot of group-by columns, or a udf in the sql?
l
SELECT product_id, SUM(impression_count) as impression_count, SUM(click_count) as click_count, SUM(cost) as spent_total FROM metrics 
WHERE user_id = xx AND serve_time BETWEEN 1641013200 AND 1654092017  
GROUP BY product_id 
LIMIT 100000
that's what the query looks like
we do have a group by on product_id but that's it, no udf
m
If you don't care about second or milli granularity when querying, then store at the lowest granularity you will query (hour/day). That will reduce the cardinality of the time column and then you can use the range index
l
we have that already, our resolution is hourly (stored in seconds, truncated to the hour)
and we added range index as well
i posted about partitioning, do you think maybe that will slow it down? or not really
m
Partitioning helps with throughput. If you are planning for thousands of read qps, I recommend you use that.
l
oh i see yeah so we def won’t have thousands
not for this use case
so the only thing left really to make it faster is probably roll up right? so instead of hourly probably do daily
m
Can you share the response metadata for
Select count(*) from metrics WHERE user_id = xx AND serve_time BETWEEN 1641013200 AND 1654092017
and also the specs of the node on which you are running this?
l
"numServersQueried": 2,
  "numServersResponded": 2,
  "numSegmentsQueried": 11445,
  "numSegmentsProcessed": 213,
  "numSegmentsMatched": 116,
  "numConsumingSegmentsQueried": 0,
  "numDocsScanned": 68683,
  "numEntriesScannedInFilter": 0,
  "numEntriesScannedPostFilter": 0,
  "numGroupsLimitReached": false,
  "totalDocs": 50128023764,
  "timeUsedMs": 33,
  "offlineThreadCpuTimeNs": 0,
  "realtimeThreadCpuTimeNs": 0,
  "offlineSystemActivitiesCpuTimeNs": 0,
  "realtimeSystemActivitiesCpuTimeNs": 0,
  "offlineResponseSerializationCpuTimeNs": 0,
  "realtimeResponseSerializationCpuTimeNs": 0,
  "offlineTotalCpuTimeNs": 0,
  "realtimeTotalCpuTimeNs": 0,
  "segmentStatistics": [],
  "traceInfo": {},
  "minConsumingFreshnessTimeMs": 0,
  "numRowsResultSet": 1
2 servers, 4cpu, 32 gigs of ram, 2tb ssd disk, 13gb jvm to the servers heap. 2 brokers, 2cpu, 4 gigs of mem, 2gb jvm to broker heap
m
Ok 33ms looks reasonable
l
It only gets slower when I do group by and whatnot operations, so this is as fast as we can get with this setup, yes?
m
You have 116 segments matching for one user. If that can be reduced, it might come down further. But if this seems to be good enough for your use case, then you can stop further optimizations
l
we are generating 16 segments per day, how do I know if we are generating too many segments?
m
What’s the size of each segment
And is it real-time or offline, the 16 segments?
l
offline 16 segments
m
Size
l
around 130mbs
we have kinda a similar thing in production for realtime, cause we have 16 partitions? and it creates segments per partition
m
If the data is partitioned, then it is fine and only one segment per day is being processed, which is good. If not, then either ensure only one segment is processed for a query that has where user_id=xxx, or increase the segment size.
l
but if it spans across days then it will look up more segments right?
the date range we give it there is from jan till jun so it will def try to hit more than 1 segment
as in this query
SELECT product_id, SUM(impression_count) as impression_count, SUM(click_count) as click_count, SUM(cost) as spent_total FROM metrics 
WHERE user_id = xx AND serve_time BETWEEN 1641013200 AND 1654092017  
GROUP BY product_id 
LIMIT 100000
and again, for realtime, i think it does segments per partition per day. in offline, according to pinot the data is not partitioned, but we do separate the files according to user_id, we have 16 buckets, we upload those 16, and the size is around 130mb for each of those
if i halve the partitions to 8 then each file can go up to ~250mb? (for offline)
"numServersQueried": 2,
  "numServersResponded": 2,
  "numSegmentsQueried": 4608,
  "numSegmentsProcessed": 227,
  "numSegmentsMatched": 126,
  "numConsumingSegmentsQueried": 0,
  "numDocsScanned": 239295,
  "numEntriesScannedInFilter": 0,
  "numEntriesScannedPostFilter": 1435770,
  "numGroupsLimitReached": false,
  "totalDocs": 22271206985,
  "timeUsedMs": 441,
  "offlineThreadCpuTimeNs": 0,
  "realtimeThreadCpuTimeNs": 0,
  "offlineSystemActivitiesCpuTimeNs": 0,
  "realtimeSystemActivitiesCpuTimeNs": 0,
  "offlineResponseSerializationCpuTimeNs": 0,
  "realtimeResponseSerializationCpuTimeNs": 0,
  "offlineTotalCpuTimeNs": 0,
  "realtimeTotalCpuTimeNs": 0,
  "segmentStatistics": [],
  "traceInfo": {},
  "minConsumingFreshnessTimeMs": 0,
  "numRowsResultSet": 682
what i see is that the first time i query, the response times are under a second in general
if i run the query again i get double-digit ms
m
Yeah I think your use case is already optimized
l
the only way to get faster is to throw more hardware at it or mess with our resolution, right?
m
This doesn’t look like day granularity
BETWEEN 1641013200 AND 1654092017
l
oh yea yea, i just grabbed that in seconds, but our data is hourly right now (we don't have day granularity)
that’s just the seconds
m
Yeah, I think you now know all you need to for optimizing your use case :-)
l
i checked with my manager and these response times are more than okay considering that we have hourly resolution of our data
can’t see mysql performing like this with data at this level of granularity and with 2 year look ups
m
Say that in a blog please 😀
l
if we have more segments, can overall query performance in the cluster be impacted? asking cause it seems like in the clusters where we have run the migration script, older queries are taking longer (this is not prod)