# general
n
Hi team
I am having trouble figuring out how to horizontally scale Pinot for upsert tables. I have set up an upsert-enabled realtime table on Pinot. I pushed around 1.6TB of data (Kafka storage size; records are in JSON format) into a Kafka topic with 10 partitions, containing around 4B records. This was historical data that I wanted to load into Pinot. For the new records flowing in via our CDC flow, the ingestion rate would be around 10-15 million records/day. I did not want to set the Kafka partition count too high, otherwise we would not get a high number of records per day per partition.
I set up a Pinot cluster with 10 servers of instance type r5a.4xlarge, each with a max heap of 64GB. Since an upsert table keeps an in-memory map of records, we saw a huge increase in heap usage: it reached around 95% when only half of the historical data (around 2B records) had been ingested. This is with replication set to just 1; we haven't increased replication yet, and doing so should increase heap usage further.
We cannot increase partitions later down the line (as that changes the primary-key-to-Kafka-partition mapping), and if we set the number of partitions too high now, the segments created would be too small. What is the recommended approach for handling such a case?
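(For context on why the partition count is sticky: Kafka picks a partition from the key hash modulo the partition count, so resizing the topic reroutes existing primary keys, roughly as in this illustrative sketch. The hash below is a stand-in, not Kafka's actual murmur2 implementation.)
```java
import java.util.Arrays;

// Illustrative only: shows why resizing a topic re-routes keys and breaks
// the key-to-partition affinity that upsert relies on.
public class PartitionAffinitySketch {
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        int hash = Arrays.hashCode(keyBytes) & 0x7fffffff;  // stand-in hash
        return hash % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "order-12345".getBytes();
        // The same primary key can land on a different partition (and hence a
        // different Pinot server) once the partition count changes.
        System.out.println(partitionFor(key, 10));  // with 10 partitions
        System.out.println(partitionFor(key, 20));  // after a hypothetical resize to 20
    }
}
```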
d
Could the segments be too large, perhaps? I'm not a Pinot developer, but that's what I would start looking at. Also, what's the table index `loadMode`? Is it `MMAP`?
m
How many partitions do you have? How many PKs do you expect per partition?
If total RAM is 64GB, heap should be a small percentage of that. @Jackie @Kartik Khare
k
IMO, increasing the number of partitions should be the way to go. Also, may I know what primary key you are using?
j
For an upsert table to work, all the data for a partition has to be served from a single server. So in order to hold more data (i.e. reduce heap usage), we need to either:
• Reduce the size of the primary key
• Reduce the cardinality of the primary key
• Increase the number of Kafka partitions
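As a rough mental model (illustrative names, not Pinot's actual internals), each server keeps something like this on heap for every partition it serves, which is why key count and key size drive the memory footprint:
```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of the per-partition upsert metadata a server holds on heap.
public class UpsertMetadataSketch {
    // Where the latest copy of a record currently lives.
    record RecordLocation(String segmentName, int docId) {}

    // One entry per distinct primary key in the partition -- this map is what
    // grows with key cardinality and per-entry size.
    final Map<String, RecordLocation> primaryKeyToLocation = new ConcurrentHashMap<>();
}
```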
What is your primary key column? What is its approximate cardinality? Do you use the upsert feature to dedup the records or to update them?
n
> Could the segments be too large, perhaps?
I have set the maximum segment size to 500MB or 4M records.
> Also, what's the table index `loadMode`? Is it `MMAP`?
Yes, it is MMAP.
> How many partitions do you have? How many PKs do you expect per partition?
We currently have 10 partitions, and almost 4B records, so 4B primary keys. I set the partition count to 10 because the 4B is the complete data since inception; the new-record rate would be around 10 million per day, so roughly 1 million per partition. If we fan out to more partitions, the new segments would be smaller.
> If total RAM is 64GB, heap should be a small percentage.
The instance type is r5a.4xlarge with 128GB of RAM. Out of this I allocated 64GB for the heap.
> May I know what primary key you are using?
In this case the primary key is a string of 14 characters.
> What is your primary key column? What is its approximate cardinality? Do you use the upsert feature to dedup the records or to update them?
In this case the primary key is a string of 14 characters. The current PK count is around 4B, and it will grow at roughly 10-15 million per day. We are using upsert mode `FULL`, as we rewrite the complete data for a given record. This should repoint the in-memory map entry to the new segment location.
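Roughly, my mental model of that repointing is something like this hypothetical sketch (not Pinot's actual code):
```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of FULL-mode upsert on ingestion: the new record wins
// wholesale, so the key's entry is simply repointed to the new segment/docId.
public class FullUpsertSketch {
    record RecordLocation(String segmentName, int docId) {}

    final Map<String, RecordLocation> primaryKeyToLocation = new ConcurrentHashMap<>();

    void onIngest(String primaryKey, String segmentName, int docId) {
        RecordLocation previous =
            primaryKeyToLocation.put(primaryKey, new RecordLocation(segmentName, docId));
        if (previous != null) {
            // A real implementation would also mask the superseded docId from
            // query results (Pinot tracks this with valid-doc-id bitmaps).
        }
    }

    public static void main(String[] args) {
        FullUpsertSketch sketch = new FullUpsertSketch();
        sketch.onIngest("pk-1", "segment_0", 42);
        sketch.onIngest("pk-1", "segment_7", 3);  // FULL upsert: entry repointed
        System.out.println(sketch.primaryKeyToLocation.get("pk-1"));  // -> segment_7, docId 3
    }
}
```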
Team, any suggestions on how we can reduce the heap memory usage?
k
@Jackie
j
@Nisheet IIUC, you use upsert to achieve data correction, and most of the records are unique. If the 4B keys are evenly distributed, each partition will contain 400M keys (400M map entries in the upsert metadata map), each entry may take roughly 150B (counting the object overhead), so storing these entries can take up to 60GB of heap memory.
There is an ongoing project to support upsert using an off-heap data structure, and we plan to deliver it by the end of July. Until then, the only way to reduce the heap memory usage is to reduce the keys per partition, i.e. add more partitions to the stream and use more servers.
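Putting numbers on that with the figures from this thread (a back-of-envelope sketch; the 20- and 40-partition rows are hypothetical resizes, and it assumes one partition per server):
```java
// Back-of-envelope: ~4B keys total, ~150B per map entry (incl. object
// overhead), one partition per server. Partition counts beyond 10 are
// hypothetical resizes, not the cluster's actual configuration.
public class UpsertHeapEstimate {
    public static void main(String[] args) {
        long totalKeys = 4_000_000_000L;
        long bytesPerEntry = 150L;
        for (int partitions : new int[]{10, 20, 40}) {
            long keysPerPartition = totalKeys / partitions;
            double heapGb = keysPerPartition * bytesPerEntry / 1e9;
            System.out.printf("%2d partitions -> %3dM keys/partition -> ~%.0f GB of heap/server%n",
                partitions, keysPerPartition / 1_000_000, heapGb);
        }
        // 10 partitions -> 400M keys/partition -> ~60 GB, matching the estimate above.
    }
}
```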
n
Thanks for the info. Can you share the issue/PR where the off-heap approach is being discussed?
Hey @Jackie, can you share the issue where the use of off-heap memory for upserts is being tracked?
One more question: if I set the replication factor to 2 for an upsert table, will it replicate the in-memory primary key map to the other server as well, or just the data? If it does not replicate the primary key data, how will ingestion resume if one of the servers goes down? Does Pinot just wait for the server to come back up, stalling ingestion and only serving read queries?
@Jackie @Kartik Khare If one of you can help with the above query ^^
j
Hi @Nisheet, the upsert metadata map is maintained independently on each server, so the upsert metadata will also be replicated on the servers holding the replicas.
In other words, if you increase the replicas of the upsert table from 1 to 2, the upsert metadata size will also double.
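Concretely, cluster-wide metadata scales linearly with the replication factor. An illustrative calculation, reusing the rough ~150B/entry estimate from above:
```java
// Cluster-wide upsert metadata grows linearly with the replication factor,
// since every replica server rebuilds the full map for its partitions.
// Figures reuse the rough estimates from earlier in this thread.
public class ReplicationMetadataSketch {
    public static void main(String[] args) {
        long keysPerPartition = 400_000_000L;  // 4B keys / 10 partitions
        long bytesPerEntry = 150L;             // rough per-entry estimate
        int partitions = 10;
        for (int replicas = 1; replicas <= 2; replicas++) {
            long totalBytes = keysPerPartition * bytesPerEntry * partitions * replicas;
            System.out.printf("replication=%d -> ~%d GB of metadata cluster-wide%n",
                replicas, totalBytes / 1_000_000_000L);
        }
    }
}
```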
n
👍
j
I've just created a top-level issue to track all the planned upsert enhancements: https://github.com/apache/pinot/issues/8760