# getting-started
s
Hi team, I am trying to build an upsert realtime table with a hashing function. I have a composite key. Do I have to implement the same hashing function for partitioning in Kafka, or can I use one individual key out of the composite key for partitioning?
s
Any one of the columns from the composite key can be used to partition the stream. The idea is simply to make sure that all records belonging to a given composite key always reach the same server. Partitioning the stream by any one of the columns from the composite key achieves that. cc: @Kartik Khare
k
Yes, that is correct. Just partitioning by any one of the keys in Kafka should be good enough.
Also, create enough Kafka partitions up front, as it is not possible to increase the number of partitions later for an upsert table.
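To illustrate why partitioning on a single column of the composite key is enough, here is a minimal sketch. The column names (`customer_id`, `order_id`), the sample records, and the partition count are hypothetical, and CRC32 is only a stand-in for whatever hash the Kafka producer's partitioner applies (the default partitioner for keyed records uses murmur2). The point is that any deterministic function of one column sends every record with the same composite key to the same partition.

```python
import zlib

NUM_PARTITIONS = 20  # hypothetical partition count for the topic


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Map a partition key to a partition number.

    CRC32 is a stand-in for the producer's partitioner hash; any
    deterministic hash gives the same co-location property.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Hypothetical records; the composite primary key is (customer_id, order_id).
records = [
    {"customer_id": "c1", "order_id": "o1", "amount": 10},
    {"customer_id": "c1", "order_id": "o2", "amount": 25},
    {"customer_id": "c2", "order_id": "o1", "amount": 7},
    {"customer_id": "c1", "order_id": "o1", "amount": 12},  # update of (c1, o1)
]


def partitions_by_composite_key(rows, partition_column):
    """Collect the set of partitions each composite key lands on."""
    seen = {}
    for row in rows:
        composite_key = (row["customer_id"], row["order_id"])
        seen.setdefault(composite_key, set()).add(partition_for(row[partition_column]))
    return seen


placement = partitions_by_composite_key(records, "customer_id")
# Every composite key should map to exactly one partition.
all_colocated = all(len(parts) == 1 for parts in placement.values())
print(all_colocated)
```

The same check holds when `order_id` is used as the partition column instead, since records sharing a composite key also share each of its component values.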
s
@Kartik Khare @saurabh dubey Thanks for the confirmation. I have implemented the same. However, the upsert table takes up too much heap memory. Is there any way to move the upsert key map to disk?
k
Not for now. That feature is WIP. How many unique keys do you have, and how much memory is it taking?
s
I have five 32 GB servers. The table has a total of 70 million records with a replication of 2. It has consumed about 30-40% of the memory on each server.
k
Hmm, what is the primary key you are using? Based on my calculations, the upsert metadata shouldn't take more than 3-4 GB total (70 million records * (32 + 20) bytes). I have assumed the keys to be 32 bytes on average; the value stored is always 20 bytes. You can multiply the result by 2 to account for replication.
s
The primary key is a composite key, so I have used the MURMUR3 hash function for it.
k
In that case, it shouldn't use more than 10 GB total, since we use the 128-bit version of murmur3.
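The estimates in this thread can be reproduced with a few lines of arithmetic. This is a rough sketch using only the figures quoted above: 70 million keys, a 20-byte value, 32-byte raw keys or 16-byte keys for 128-bit murmur3, and a replication of 2. The function name is made up for illustration, and the result counts payload bytes only; JVM object headers and hash map entry overhead are not included, so actual heap usage would be higher.

```python
def upsert_metadata_bytes(num_keys, key_bytes, value_bytes=20, replication=1):
    """Payload-only estimate: keys * (key size + value size) * replicas.

    Illustrative helper, not part of any library. Ignores JVM object
    headers and map entry overhead.
    """
    return num_keys * (key_bytes + value_bytes) * replication


NUM_KEYS = 70_000_000  # record count quoted in the thread

# Raw keys assumed to average 32 bytes.
raw = upsert_metadata_bytes(NUM_KEYS, key_bytes=32)
raw_replicated = upsert_metadata_bytes(NUM_KEYS, key_bytes=32, replication=2)

# Keys hashed with 128-bit murmur3 are 16 bytes each.
hashed = upsert_metadata_bytes(NUM_KEYS, key_bytes=16)
hashed_replicated = upsert_metadata_bytes(NUM_KEYS, key_bytes=16, replication=2)

for label, size in [
    ("raw keys, 1 replica", raw),
    ("raw keys, 2 replicas", raw_replicated),
    ("murmur3 keys, 1 replica", hashed),
    ("murmur3 keys, 2 replicas", hashed_replicated),
]:
    print(f"{label}: {size / 1e9:.2f} GB")
```

Under these assumptions the hashed-key total across both replicas comes to about 5 GB for the whole cluster, which is within the 10 GB bound mentioned above.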
s
This is the count of segments per server.
The server with the most segments has used 48% of its memory, and the one with the fewest has used 17%.
k
Can you also run the following queries on the table and tell me the counts?
SELECT COUNT(*) FROM table
SELECT COUNT(*) FROM table option(skipUpsert = true)
s
With skipUpsert = true: 83395586. With upsert: 70766256.
k
Cool. Now can you tell me the following:
• Xmx and direct memory values for the server processes
• Is the 48% memory total memory, heap memory, or off-heap memory?
• The number of partitions in the Kafka topic
s
I have set Xmx to 24 GB.
There are 20 partitions in Kafka.
Only the server is running in this VM.
s
Can you try
jmap -histo:live <java pid>
inside the server to check the exact memory usage of each class's instances? Specifically, the bytes used by RecordLocation?
k
do a
| head -10
as well. It will be a long list :P
s
jmap isn't working. I am using Oracle JDK 11. I guess it's not supported there.
k
Does jcmd work?
a
@Sonit Rathi if you have a composite key, then why is murmur3 correct? Asking as we have a composite key too and were not able to find any mention of that in the Pinot docs.