# general
d
Hello everybody! I finally got my small cluster up and running, thank you all for the support! 🙂 I am doing a final test to understand whether I need to add one more node. However, just to make one thing a little clearer, I would like to know if we can "organize" the data inside a Pinot server by a specific column. For those of you who know Citus, I am referring to the distribution key for shards. Basically, what I am asking is: if we have a specific column that is often used in the GROUP BY clause, how can we store documents that have the same value for that column on the same server? I think this is important because, for example, in my custom aggregation function I need to sort the documents of each segment (in aggregateGroupBySV()) before working on them (I am trying to do something similar to what window functions do). I know that a server holds multiple segments and that the document order within segments can be random... BUT if all the documents for a given key were on the same server, I could avoid sorting everything again in extractFinalResult(), which is called at the broker level. I know there is a merge() method used to merge the results of each segment; if I could do something after that merge, I could shift the whole computation to the server level instead of the broker, which I think matters, because otherwise the broker has to take the results from every server and then sort + compute (in my case).
n
d
@Neha Pawar OK, so the solution is partitions. I know Pinot creates segments and stores them on servers; are the partitions smaller chunks of the segments? If so, I have one more question. In the doc you linked me I see:
When the data is partitioned on a dimension, each segment will contain all the rows with the same partition value for a partitioning dimension
That is fine, but does it really mean that ALL rows with memberId = 101 (the example in the doc) will be stored on the same server? As I wrote, this is important because, to implement a kind of window function, I must (A) see the entire data set, (B) sort it by a specific column, and (C) then compute the logic of my aggregation. If I also need to work with documents coming from different servers, I have to move the logic into extractFinalResult() (so into the broker).
So in my case, what will the merge() method receive, partition data or segment data? And is there a way to add logic before sending the result to the broker, something just after merge()?
n
but does it really mean that ALL rows with memberId = 101 (the example in the doc) will be stored on the same server
- hmm, I don't think so. One key will be restricted to a segment, but that key can still be on multiple servers.
@Seunghyun @Jackie any suggestions for this?
d
@Neha Pawar hmm, so there is no solution; I have to move the logic to the broker and work in extractFinalResult() only, because as I told you I need the entire sequence for each key in order to work with the ordered rows.
j
If you use partition-aware replica-group assignment, and configure 1 server for each replica-group, then all the segments for one partition will always be assigned to a single server:
"instanceAssignmentConfigMap": {
    "OFFLINE": {
      ...
      "replicaGroupPartitionConfig": {
        "replicaGroupBased": true,
        "numReplicaGroups": 3,
        "numInstancesPerReplicaGroup": 4,
        "numPartitions": 4,
        "numInstancesPerPartition": 1
      }
    }
  },
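(For this assignment to line up with the data, the partition column itself also has to be declared in the table config. A minimal sketch, assuming the memberId column from the docs example and Pinot's Murmur partition function, with numPartitions matching the 4 used above:)
"tableIndexConfig": {
    "segmentPartitionConfig": {
      "columnPartitionMap": {
        "memberId": {
          "functionName": "Murmur",
          "numPartitions": 4
        }
      }
    }
  },
If I read the docs right, for offline tables the ingestion job also has to produce segments partitioned with the same function and partition count, and adding "segmentPrunerTypes": ["partition"] under "routing" lets the broker prune segments per partition when a query filters on memberId.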
d
Hmm, I need to investigate this more, it is still a bit confusing to me. Suppose we have many rows with the same memberId: will all the rows with that memberId end up on the same server? Why are we talking about replicas? I am missing something for sure :)
@Jackie ^
j
Let's say you have 3 partitions, each partition has 10 segments
You need to assign the 10 segments for one partition to the same server
In order to do that, you need to use replica-group based assignment; otherwise the segments will be spread across all the servers
You also want to enable the replica-group based routing so that the broker will only query one server for each partition
"routing": {
    "instanceSelectorType": "replicaGroup"
  },
d
OK, does it slow down queries for some reason? As I wrote, I have three servers, so I should create three partitions, one per server, and then, as you said, all the segments for one partition (and hence all the rows for a given memberId) will be on the same server. OK, got it. How can I get all the rows for each memberId before sending the result to the broker?
Is there something that will be called after merge() and before the result is sent to the broker?
n
Jackie, this solution won't scale, right? There can never be more than 1 server in a replica group
j
Wait, I think I missed something
@Damiano You only have 3 servers? How many replicas do you need?
@Neha Pawar You can have multiple replicas, but each server needs to hold all the segments for one partition (which is the requirement)
@Damiano There is no merge() method; the merging of segment results is performed inside the CombineOperator, and then the merged result is serialized back to the broker as a DataTable
n
You're suggesting 3 replica groups (and 3 servers total), so that each replica group will have only 1 server. Is that right? Now consider an offline table. What happens if the use case wants to scale and they increase the number of servers per replica group to 2? In that case, we again cannot guarantee that segments of the same column partition will go to just 1 server, right?
j
Each replica-group can only have 1 server per the requirement. You can split the data into multiple partitions in order to scale
Replica-groups can work on a per-partition basis
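(To make the scaling point concrete, a rough sketch: the one-server-per-partition guarantee inside each replica group comes from keeping numInstancesPerPartition at 1, so capacity is added by raising numPartitions and/or numReplicaGroups rather than numInstancesPerPartition, e.g.:)
"replicaGroupPartitionConfig": {
    "replicaGroupBased": true,
    "numReplicaGroups": 3,
    "numInstancesPerReplicaGroup": 8,
    "numPartitions": 8,
    "numInstancesPerPartition": 1
  }
Same shape as the earlier config, just with more partitions; each partition still lives on exactly one server within each replica group.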
👌 1
d
@Jackie I am referring to the merge() method in my custom aggregation function. That method is called to merge segment results, so I wonder if there is a way to get the complete sequence of rows after that merge (before passing it to the broker).