# general
s
Hi 👋, I have some confusion about offline partition-based segment pruning. My understanding is:
1. In order to use the partition-based segment pruner for offline segment data, the data source ingested by Pinot must already be partitioned by the desired partition column, e.g. `s3://bucket/metrics/country=US/files.parquet`.
2. From this thread, it seems there is currently no way to make Pinot aware of columns that only exist in partition filepath metadata, i.e. in the above example, the Pinot table cannot contain a `country` column.
Am I understanding that correctly? If so, how does partition-based segment pruning help in this case if the partition column cannot be part of the query issued to Pinot?
m
@Scott deRegt typically, the partitioning here refers to partitioning within the folder. The typical pattern I see is that folders are time-partitioned (say, daily), and within each day you can further partition by another dimension (say, country).
s
Cool, yeah we are looking to do exactly that. We started with the `time`-based pruner and are looking to evaluate configuring both `time` + `partition` based pruners. Currently we have something like `s3://bucket/metrics/ds=2022-07-13/files.parquet`. For the `ds` partition, upstream of Pinot ingestion, we duplicated the `ds` column as a new column, `date_str`, in the parquet file, since we appear to lose access to the `ds` column on the Pinot side if it only exists in partition filepath metadata. Would I need to do something similar for `country` in this example? i.e. duplicate the `country` column to something like `_country`, and partition the parquet file upstream of Pinot ingestion by `_country`, so that `country` is still available in the parquet file itself? Like `s3://bucket/metrics/ds=2022-07-13/_country=US/files.parquet`, then configure the `time`-based pruner on `date_str` + the `partition`-based pruner on `country`?
m
You don't need to create a sub-folder for `country`. Within the `ds` folder, you need to ensure that individual files contain individual partitions. For example, you may pick 64 partitions, so you can do something like `murmur(country) % 64` and create one file for each of those 64 partitions.
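In other words, the layout stays flat within the day folder; each file just needs to hold rows from exactly one hash bucket (file names here are illustrative):
```
s3://bucket/metrics/ds=2022-07-13/part-00.parquet   <- only rows where murmur(country) % 64 == 0
s3://bucket/metrics/ds=2022-07-13/part-01.parquet   <- only rows where murmur(country) % 64 == 1
...
s3://bucket/metrics/ds=2022-07-13/part-63.parquet   <- only rows where murmur(country) % 64 == 63
```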
s
Ok, that is making more sense. So if the `partition` column we care about is actually a high-cardinality `bigint` (`team_id`), does it make sense to do the following?
• Add a derived column, `team_bucket = team_id % n`.
• Using Spark SQL, I think it would be easiest to add a `partition` column on `team_bucket` such that the path is `s3://bucket/metrics/ds=2022-07-13/team_bucket=n/files.parquet`.
• Use a Pinot config like:
```
"segmentPartitionConfig": {
  "columnPartitionMap": {
    "team_id": {
      "functionName": "Modulo",
      "numPartitions": n
    }
  }
},
"routing": {
  "segmentPrunerTypes": [
    "time", "partition"
  ]
},
```
m
You don't need to create a derived column. You can have all parquet files for the day in `s3://bucket/metrics/ds=2022-07-13`. You just need to ensure that each file has a single partition, `murmur(team_id) % n`, and then specify the partition config to use Murmur (not Modulo).
Also, you need to ensure that the murmur implementation in Pinot matches the implementation on the Spark side (the name doesn't guarantee the same impl): https://github.com/apache/pinot/blob/21632dadb8cd2d8b77aec523a758d73a64f70b07/pino[…]apache/pinot/segment/spi/partition/MurmurPartitionFunction.java
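Applied to the earlier snippet, the partition config would then name Murmur instead of Modulo (a sketch; `n` is still a placeholder for the partition count):
```
"segmentPartitionConfig": {
  "columnPartitionMap": {
    "team_id": {
      "functionName": "Murmur",
      "numPartitions": n
    }
  }
}
```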
a
General question: could I use a custom partitioner for batch and/or real-time data?
m
Yes. The only requirement is that its implementation is available in Pinot.
s
@Mayank do you have any sample scala code for the murmur hash partitioning of source data files?
m
I don't, but I do remember seeing a scala implementation in Kafka a long while ago, you may want to check there
n
Hi @Mayank 👋 I'll be working with @Scott deRegt to have both time-based & partition-based segment pruning for our Pinot tables. I was reading through this thread and wanted to get some guidance around the partition function & `numPartitions`. Based on this doc, I saw that Pinot currently supports Modulo, Murmur, ByteArray and HashCode hash functions. Any guidance around when to choose which function? And also, what's behind your recommendation to use Murmur (not Modulo)?
l
another question, if you have partitioning do you need to add this to the configs?
```
"routing": {
  "segmentPrunerTypes": [
    "time", "partition"
  ]
},
```
this always confuses me… do we need to set this up?
n
@Luis Fernandez Hi Luis! We haven't done this ^^ yet, but I guess we'll need this config to have both time-based & partition-based segment pruning. Regarding this thread - any reason why/how you went with the murmur partitioning function & that number of partitions? Would love to see a sample of your source data partitioning logic as well (which aligns with the corresponding Pinot partitioning function) 🙇‍♀️
@Mayank Just wanted to check your thoughts 🙇‍♀️
m
I’d recommend murmur over modulo for sure (better distribution). Also, iirc, its default implementation matches Kafka partitioning impl.
n
Based on this doc, I saw that Pinot currently supports Modulo, Murmur, ByteArray and HashCode hash functions. Any guidance around when to choose which function & the number of partitions? Also, any sample code for the corresponding murmur-like partitioning at the source would be very helpful (we are planning to partition using Spark SQL at the source & it is an offline Pinot table).
m
I'd recommend Murmur. You can refer to the murmur impl in the Pinot code base for a sample. The number of partitions depends on your event rate.
l
@Neeraja Sridharan ohh we did it that way cause our setup is a hybrid table. We consume from Kafka, and it's my understanding that Kafka's default partitioning is murmur, so we just did the same on the Pinot side, along with the same number of partitions.
n
@Mayank @Jackie Based on the Pinot code for Murmur, it looks like it uses `murmur2`. For partition-based segment pruning of our Pinot offline table using Murmur (as recommended above in this thread), we'll have to partition our source data accordingly, & I was looking at this partitioning scheme using Spark SQL - `HashPartitioning`: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-HashPartitioning.html. This seems to use `murmur3` though. Is this still a correct approach for the source data partitioning to align with Pinot's murmur partitioning? Appreciate your help with this 🙇‍♀️
m
The implementation has to match on both sides
l
Neeraja you can just copy and paste the murmur2 algo, it's Scala, yes?
n
We are planning to use Spark SQL for our source data partitioning, and I am a little unsure about replicating MurmurPartitionFunction.java. Spark SQL seems to support HashPartitioning, hence the check on whether murmur2 vs murmur3 are still compatible.
l
I did this when I was using Spark:
```scala
// The partitioning algorithm Pinot uses by default. Copied straight from source so that our segment data is partitioned in the exact same way Pinot does it.
  def murmur2(data: Array[Byte]): Int = {
    val length = data.length
    val seed = 0x9747b28c
    // 'm' and 'r' are mixing constants generated offline.
    // They're not really 'magic', they just happen to work well.
    val m = 0x5bd1e995
    val r = 24
    // Initialize the hash to a random value
    var h = seed ^ length
    val length4 = length / 4
    for (i <- 0 until length4) {
      val i4 = i * 4
      var k = (data(i4 + 0) & 0xff) + ((data(i4 + 1) & 0xff) << 8) + ((data(i4 + 2) & 0xff) << 16) + ((data(i4 + 3) & 0xff) << 24)
      k *= m
      k ^= k >>> r
      k *= m
      h *= m
      h ^= k
    }
    // Handle the last few bytes of the input array
    // CHECKSTYLE:OFF
    length % 4 match {
      case 3 =>
        h ^= (data((length & ~3) + 2) & 0xff) << 16
        h ^= (data((length & ~3) + 1) & 0xff) << 8
        h ^= data(length & ~3) & 0xff
        h *= m
      case 2 =>
        h ^= (data((length & ~3) + 1) & 0xff) << 8
        h ^= data(length & ~3) & 0xff
        h *= m
      case 1 =>
        h ^= data(length & ~3) & 0xff
        h *= m
      case 0 =>
    }
    // CHECKSTYLE:ON
    h ^= h >>> 13
    h *= m
    h ^= h >>> 15
    h
  }
```
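To turn this hash into a Pinot partition id, the MurmurPartitionFunction linked above (as far as I can tell) hashes the UTF-8 bytes of the stringified column value, masks the sign bit, and takes a modulo; a minimal Spark-side sketch under that assumption (names are illustrative, and `murmur2` is the function above):
```scala
import java.nio.charset.StandardCharsets
import org.apache.spark.sql.functions.udf

// Assumed to mirror Pinot's MurmurPartitionFunction:
// (murmur2(value.toString.getBytes(UTF_8)) & Integer.MAX_VALUE) % numPartitions
// -- verify against the linked Java source before relying on it.
def murmur2Partition(value: String, numPartitions: Int): Int =
  (murmur2(value.getBytes(StandardCharsets.UTF_8)) & Integer.MAX_VALUE) % numPartitions

// Hypothetical usage: derive a bucket column for a bigint team_id, n partitions.
val n = 16
val teamBucket = udf((teamId: Long) => murmur2Partition(teamId.toString, n))
```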
n
@Luis Fernandez Awesome! Thank you so much for sharing these 🙇‍♀️ Appreciate your help here!! I'll review and get back. Couple of questions: • Did Sergii's approach/code (mentioned in the thread you shared) actually work for you? Or was it the Spark Scala code you shared above that worked? The reason I'm asking is to see how we can keep up with the exact Pinot murmur implementation in case it changes in the future, to say murmur3 for example. • Did you arrive at `"numPartitions": 16` based on your Kafka source? It is not a real-time source in our case. • We can definitely try to use Spark Scala instead of Spark SQL to get this implemented. Was your partition column an `id` field as well? Ours is an id field of bigint type.
l
• hi Neeraja I was out, yes the exact Sergii approach worked for us. • yes `numPartitions: 16` is based on our Kafka source; it's an offline table, but we just made it the same as whatever the realtime one has. • and yes it's an id field
n
@Luis Fernandez Is there a way to check whether Pinot is recognizing the source data partitions correctly, i.e. how can we verify that the Pinot data appears as partitioned, other than via the table config that we set?
l
we would upload it to Pinot (one file) and then check metadata.properties on the server for the segment we uploaded; you want to see the file having been assigned one partition. Check the `"segment.partition.metadata"` field; you can do it either thru the UI or by sshing into your server.
You should see it properly getting assigned one partition: `{"numPartitions":8,"partitions":[0]}`
If you see an array with all the partitions, `"partitions":[0,1,2,3,4,5,6,7]`, it means partitioning is not happening correctly.
n
Thank you 🙇‍♀️ Will try this ^^
@Mayank Regarding your guidance here - 1) Is there any recommendation on the max or optimal file size per partition? _Context_: We have time + partition (murmur) based segment pruning implemented & running in production. We are working on a feature enhancement which adds significantly more data to each partition, and we are having to increase the number of partitions on both the source & Pinot. 2) Also, we see a scenario where we set the number of partitions to, say, 50 but get only 43 partitioned files in S3 after partitioning the source data with the murmur logic (implemented the same way as in Pinot). Is this expected, and will it impact the Pinot table where it'll be configured for murmur with the number of partitions as 50? Appreciate any help regarding this 🙇‍♀️
m
1. For offline, you can aim for a segment size anywhere between 300MB and 1GB (toward 1GB if your data size is large). 2. For offline it shouldn't matter. For realtime, you may have empty partitions in the topic. Independently, you should also look into using replica-groups.
n
Thanks @Mayank! Yes, this is for the offline table scenario & we have configured replica groups as well:
```
"routing": {
  "segmentPrunerTypes": [
    "time", "partition"
  ],
  "instanceSelectorType": "replicaGroup"
},
```
@Mayank Just to confirm: if we set 50 as the no. of partitions but the source data murmur partition logic outputs only 43 files, the Pinot table can still be configured for murmur with the no. of partitions as 50 - is that correct?
m
Just to ensure we are on the same page, are you saying that murmur(column) % 50 will be [0-42]? If that is consistent on both Pinot and upstream, then it should work.
n
As of now, source data partitioning is configured for 16 partitions & outputs 16 partitioned files, which also aligns with the corresponding Pinot table configured for 16. As part of the feature enhancement, which leads to more data in a revised table version, we are changing the source data partitioning (with the murmur logic) to 50 partitions (instead of 16), but it outputs only 43 partitioned files (in the S3 source data location). The question is: what should the no. of partitions be set to in the corresponding table config in Pinot? Will there be any issue at the Pinot end for partition-based segment pruning if it is configured for 50?
@Mayank any thoughts on this ^^ 🙇‍♀️
m
What is not clear to me is why your partitioning job is not producing 50 files
n
@Mayank Not sure yet what is causing that. We did not see this when the no. of partitions was set to 16; it is happening with an increased no. of partitions like n=50. Here is the SQL code that partitions the source data with murmur, upstream of Pinot:
```sql
insert overwrite table ${hiveconf:dst_table}
partition (ds='{{ds}}')
select
  <columns list>
from
  ${hiveconf:src_table}
DISTRIBUTE BY murmur2(team_id, ${hiveconf:var.num_buckets})
SORT BY team_id;
```
where murmur2 is the function used for bucketing (which has the same logic as in Pinot) and num_buckets is set to 50.
For offline Pinot tables, is it enough that the partition implementation logic (murmur) is the same on the source & Pinot, or should the number of partitions also match? More details here
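A possible explanation for the missing files (an assumption, not confirmed in the thread): in Spark SQL, `DISTRIBUTE BY` routes rows by Spark's own HashPartitioning (murmur3-based) of the expression value rather than using that value directly as the partition index, so 50 distinct murmur2 bucket values can collide into fewer than 50 shuffle partitions, leaving some output files missing and others holding more than one Pinot partition. One way to guarantee that the Spark partition index equals the Pinot partition id is a custom `Partitioner` (a sketch; table and path names are illustrative, and `murmur2` is the Scala function shared earlier in this thread):
```scala
import org.apache.spark.Partitioner
import org.apache.spark.sql.SparkSession

// Routes each row by the exact bucket Pinot will compute, so every output
// file contains rows from exactly one Pinot partition.
class PinotMurmurPartitioner(n: Int) extends Partitioner {
  override def numPartitions: Int = n
  override def getPartition(key: Any): Int =
    (murmur2(key.toString.getBytes("UTF-8")) & Integer.MAX_VALUE) % n
}

val spark = SparkSession.builder.getOrCreate()
val n = 50
val df = spark.read.table("src_table") // illustrative source

// Key by the partition column, repartition with the custom partitioner,
// then drop the key and write one parquet file per Spark partition.
val repartitioned = df.rdd
  .keyBy(_.getAs[Long]("team_id"))
  .partitionBy(new PinotMurmurPartitioner(n))
  .values

spark.createDataFrame(repartitioned, df.schema)
  .write.mode("overwrite").parquet("s3://bucket/metrics/ds=2022-07-13") // illustrative output
```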