# general
a
Hi! I’m evaluating Pinot for the following use case and want to know if it’s a good fit, or any best practices to help achieve it.
• Ingest events for ~1B total users at ~100k/second
• Run aggregation queries on events filtered on individual user IDs at ~10k/second, each query completing in < 100ms
What I understand is that the data is organized primarily by time (segments) and secondarily (within a segment) by indexes. In this case, I tried sorting by user ID. To query for a particular user ID, it seems that each segment must be queried, since the data is not consolidated by user. The runtime would be O(s log n), where s is the number of segments in a particular timeframe and n is the number of events per segment. Thus, it seems that Pinot may not scale when there are tens or hundreds of thousands of segments and may not be a good fit here. However, this use case seems similar to the use cases at LinkedIn, such as the “who’s viewed your profile” feature, which also operates on events for individual users. Is my understanding correct, and is there anything I’m missing here? Would appreciate any thoughts or resources you could point me to. Thanks!
m
Hello, if the query is for a single user, you can partition the data on the user ID and achieve very high scalability. The numbers you quoted should be easily achievable. In fact, we are powering very similar use cases with Pinot at LinkedIn.
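A minimal sketch of what user-ID partitioning buys you here (illustrative only: the hash below is a generic stand-in, not Kafka's or Pinot's actual partitioner, and `NUM_PARTITIONS` is a made-up number — in practice the Kafka producer does this when you set the record key to the user ID):

```python
import hashlib

NUM_PARTITIONS = 128  # hypothetical; must match the Kafka topic

def partition_for(user_id: str) -> int:
    # Illustrative stand-in for a keyed partitioner: any deterministic
    # hash works, as long as Pinot is configured with the same function
    # and partition count so the broker can prune to one partition.
    digest = hashlib.md5(user_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % NUM_PARTITIONS

# All events for one user land in the same partition, so a query
# filtered on that user ID only touches segments from that partition.
assert partition_for("10000070d3f29ba15aac40b1") == partition_for("10000070d3f29ba15aac40b1")
```

With this in place, a per-user query no longer scans all s segments in the time range, only the ones belonging to that user's partition.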
a
Thanks! I am reading up on partitioning. So the partitioning must be done by Kafka, and the settings in Pinot must match it, e.g. the number of partitions and the partition function? What happens when you need to repartition the data? Are there any best practices for the number of partitions?
m
Yes, partitioning must be done in Kafka as well as in offline ingestion. By repartitioning, do you mean increasing the number of partitions? That is supported.
If you mean change the partition function, it may have latency impact and we don't recommend that.
Please also read up on replica groups in pinot, that also helps with scaling.
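For reference, a hedged sketch of what partition-aware routing with replica groups can look like in a table config. Field names are taken from the Pinot documentation but may vary by version, and all numbers here are illustrative, not a recommendation:

```json
{
  "segmentsConfig": {
    "replication": "3",
    "replicaGroupStrategyConfig": {
      "partitionColumn": "userId",
      "numInstancesPerPartition": 2
    }
  },
  "routing": {
    "segmentPrunerTypes": ["partition"],
    "instanceSelectorType": "replicaGroup"
  }
}
```

The idea is that each query is routed to one replica group rather than fanning out to every server, so adding replica groups scales QPS roughly linearly.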
BTW, stay tuned for our virtual meetup (date coming soon) where we will also talk about this.
k
+1 to using partitioning and replica group placement etc
message has been deleted
@Andrew First you can see how the latency improved with partition aware + replica group implementation
a
Thanks! This is very helpful info. Yes, I mean increasing the # of partitions, not changing the function. Would you need to increase the partitions in Pinot first, and then in Kafka, since they can't be done at the same time?
k
Pinot does not have a concept of partitions as such; it just derives them from Kafka partitions
each segment's metadata is self-describing:
• total partitions
• hashing function
• partition IDs
so one segment can be (100, MOD, 5) and another can be (110, MOD, [10, 11, 15])
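A hedged sketch of what that per-segment metadata enables on the broker side (structure and names are mine, not Pinot's internals): because each segment records its own (numPartitions, function, partitionIds), segments built with different partition counts can coexist, and pruning just checks the query value against each segment's own metadata.

```python
# Each segment carries its own partition metadata, as in the
# (100, MOD, 5) vs (110, MOD, [10, 11, 15]) example above.
segments = [
    {"name": "seg_1", "numPartitions": 100, "partitionIds": {5}},
    {"name": "seg_2", "numPartitions": 110, "partitionIds": {10, 11, 15}},
]

def prune(segments, query_value: int):
    """Keep only segments that could contain the queried value."""
    kept = []
    for seg in segments:
        # MOD is applied under this segment's own partition count
        if query_value % seg["numPartitions"] in seg["partitionIds"]:
            kept.append(seg["name"])
    return kept
```

For example, `prune(segments, 205)` keeps only `seg_1` (205 % 100 = 5), while `prune(segments, 120)` keeps only `seg_2` (120 % 110 = 10), so increasing the Kafka partition count mid-stream does not break pruning for older segments.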
a
I see, so you don't need to specify any of this in the config, e.g.:
```json
"segmentPartitionConfig": {
  "columnPartitionMap": {
    "memberId": {
      "functionName": "Modulo",
      "numPartitions": 4
    }
  }
}
```
k
I think that's needed when you generate batch segments
a
got it, thanks! will play around with this for a bit
k
in the case of Kafka, we derive that automatically. I might be wrong here, but that was our initial thinking
in any case, you get the idea
a
does the partition field have to be an integer? I partitioned on userId, which is a String, and when I query on userId, e.g.
```sql
select * from events where userId = '10000070d3f29ba15aac40b1' limit 10
```
I get:
```
ProcessingException(errorCode:450, message:InternalError:
java.io.IOException: Failed : HTTP error code : 500
```
In the broker logs:
```
java.lang.NumberFormatException: For input string: "15380218d3181aa3dc2d3c05"
	at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) ~[?:1.8.0_265]
	at java.lang.Integer.parseInt(Integer.java:580) ~[?:1.8.0_265]
	at java.lang.Integer.parseInt(Integer.java:615) ~[?:1.8.0_265]
	at org.apache.pinot.core.data.partition.ModuloPartitionFunction.getPartition(ModuloPartitionFunction.java:56) ~[pinot-all-0.5.0-SNAPSHOT-jar-with-dependencies.jar:0.5.0-SNAPSHOT-1d4d47adfe7abf0c3ed8a3a14929de084e979968]
```
m
I think you need to configure the partition function
a
I configured it this way:
```json
"segmentPartitionConfig": {
  "columnPartitionMap": {
    "userId": {
      "functionName": "Modulo",
      "numPartitions": 128
    }
  }
},
```
m
We recommend MurmurPartitionFunction
Modulo needs numeric values, I think
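That matches the stack trace above: a sketch of why Modulo fails on a string user ID while a hash-based function does not. This is only an illustration (Python's `ValueError` standing in for Java's `NumberFormatException`, and md5 standing in for Pinot's actual Murmur2-based `MurmurPartitionFunction`):

```python
import hashlib

def modulo_partition(value: str, num_partitions: int) -> int:
    # Parses the value as an integer before taking the modulo, which is
    # exactly what blows up on "15380218d3181aa3dc2d3c05".
    return int(value) % num_partitions

def hash_partition(value: str, num_partitions: int) -> int:
    # A hash-based function works for arbitrary strings, since it hashes
    # the bytes instead of parsing them as a number.
    h = int.from_bytes(hashlib.md5(value.encode("utf-8")).digest()[:4], "big")
    return h % num_partitions
```

Whichever function is chosen, the Kafka producer and the Pinot `segmentPartitionConfig` must agree on it, or pruning will route queries to the wrong partition.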
Also, for the scale you mentioned, we are using 32 partitions at LinkedIn.
a
thanks!
m
Correction: 64 partitions for 100k events per second (with 4 dimensions and one metric).