Scott deRegt
07/14/2022, 12:08 AM
`s3://bucket/metrics/country=US/files.parquet`
2. From this thread, it seems there is currently no way to make Pinot aware of columns that exist only in partition filepath metadata; i.e., in the above example, the Pinot table cannot contain a `country` column.
Am I understanding that correctly? If so, how does partition-based segment pruning help in this case, if the partition column cannot be part of the query issued to Pinot?

Mayank
Scott deRegt
07/14/2022, 2:16 AM
We currently use the `time` based pruner and are looking to evaluate configuring both `time` + `partition` based pruners.
Currently we have something like `s3://bucket/metrics/ds=2022-07-13/files.parquet`. For the `ds` partition, upstream of Pinot ingestion, we duplicated the `ds` column as a new column, `date_str`, in the parquet file, since we appear to lose access to the `ds` column on the Pinot side if it only exists in partition filepath metadata.
Would I need to do something similar for `country` in this example? i.e., duplicate the `country` column to something like `_country`, partition the parquet file upstream of Pinot ingestion by `_country`, so that `country` is still available in the parquet file itself? Like `s3://bucket/metrics/ds=2022-07-13/_country=US/files.parquet` - then configure the `time` based pruner on `date_str` + the `partition` based pruner on `country`?

Mayank
`country`. Within the `ds` folder, you need to ensure that individual files contain individual partitions. For example, you may pick 64 partitions, so you can do something like `murmur(country) % 64` and create one file for each of those 64 partitions.

Scott deRegt
07/14/2022, 4:49 PM
The `partition` column we care about is actually a high-cardinality `bigint` (`team_id`); does it make sense to do the following?
• Add a derived column, `team_bucket = team_id % n`.
• Using sparksql, I think it would be easiest to add a `partition` column on `team_bucket` such that the path is `s3://bucket/metrics/ds=2022-07-13/team_bucket=n/files.parquet`.
• Use a Pinot config like:
"segmentPartitionConfig": {
"columnPartitionMap": {
"team_id": {
"functionName": "Modulo",
"numPartitions": n
}
}
},
"routing": {
"segmentPrunerTypes": [
"time", "partition"
]
},
Mayank
`s3://bucket/metrics/ds=2022-07-13`. You just need to ensure that each file has a single partition, `murmur(team_id) % n`. And then specify the partition config to use Murmur (not Modulo).

Mayank
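That suggestion would presumably translate into a table config like the following (a sketch with the same shape as the config proposed earlier in the thread, with the function swapped to Murmur; `n` remains a placeholder for the chosen number of partitions):

```json
"segmentPartitionConfig": {
  "columnPartitionMap": {
    "team_id": {
      "functionName": "Murmur",
      "numPartitions": n
    }
  }
},
```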
Awadhesh Tanwar
07/15/2022, 6:08 AM

Mayank

suraj sheshadri
07/28/2022, 4:22 PM

Mayank
Neeraja Sridharan
08/10/2022, 6:04 PM
Quick question on the `partition function` & `numPartitions`: based on this doc, I saw that Pinot currently supports `Modulo, Murmur, ByteArray and HashCode hash functions`. Any guidance around when to choose which function, and also on your recommendation to use Murmur (not Modulo)?

Luis Fernandez
08/10/2022, 6:15 PM
"routing": {
  "segmentPrunerTypes": [
    "time", "partition"
  ]
},
this always confuses me… do we need to set this up (?)

Neeraja Sridharan
08/11/2022, 1:25 AM

Neeraja Sridharan
08/11/2022, 3:05 AM

Mayank
Neeraja Sridharan
08/11/2022, 3:20 AM
`Pinot currently supports Modulo, Murmur, ByteArray and HashCode hash functions`. Any guidance around when to choose which function & the number of partitions? Also, any sample code for the corresponding Murmur-like partitioning at the source would be very helpful (we are planning to partition using Spark SQL at the source, and it is an offline Pinot table).

Mayank
Luis Fernandez
08/11/2022, 2:22 PM

Neeraja Sridharan
08/11/2022, 7:44 PM
`murmur2`. For partition-based segment pruning of our Pinot offline table using Murmur (as recommended above in this thread), we'll have to partition our source data accordingly, and I was looking at this partitioning scheme using Spark SQL - `HashPartitioning`: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-HashPartitioning.html. This seems to be using `murmur3`, though. Is this still a correct approach for partitioning the source data to align with Pinot's Murmur partitioning?
Appreciate your help with this 🙇‍♀️

Mayank
Luis Fernandez
08/11/2022, 7:48 PM

Neeraja Sridharan
08/11/2022, 8:03 PM
We're planning to use `spark sql` for our source data partitioning, and I am a little unsure about replicating MurmurPartitionFunction.java. Spark SQL seems to support `HashPartitioning`, hence the check on whether murmur2 vs murmur3 are still compatible.

Luis Fernandez
08/11/2022, 8:07 PM
// The partitioning algorithm Pinot uses by default. Copied straight from source so that our segment data is partitioned in the exact same way Pinot does it.
def murmur2(data: Array[Byte]): Int = {
  val length = data.length
  val seed = 0x9747b28c
  // 'm' and 'r' are mixing constants generated offline.
  // They're not really 'magic', they just happen to work well.
  val m = 0x5bd1e995
  val r = 24
  // Initialize the hash to a random value
  var h = seed ^ length
  val length4 = length / 4
  for (i <- 0 until length4) {
    val i4 = i * 4
    var k = (data(i4 + 0) & 0xff) + ((data(i4 + 1) & 0xff) << 8) + ((data(i4 + 2) & 0xff) << 16) + ((data(i4 + 3) & 0xff) << 24)
    k *= m
    k ^= k >>> r
    k *= m
    h *= m
    h ^= k
  }
  // Handle the last few bytes of the input array
  length % 4 match {
    case 3 =>
      h ^= (data((length & ~3) + 2) & 0xff) << 16
      h ^= (data((length & ~3) + 1) & 0xff) << 8
      h ^= data(length & ~3) & 0xff
      h *= m
    case 2 =>
      h ^= (data((length & ~3) + 1) & 0xff) << 8
      h ^= data(length & ~3) & 0xff
      h *= m
    case 1 =>
      h ^= data(length & ~3) & 0xff
      h *= m
    case 0 =>
  }
  h ^= h >>> 13
  h *= m
  h ^= h >>> 15
  h
}
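Since the remaining question in the thread is how to turn this hash into a per-file partition id, a minimal sketch of that last step may help. It is in Java to mirror `MurmurPartitionFunction.java`; the `murmur2` body is the same algorithm as the copy above, while the sign-bit mask in `partitionId` is an assumption (it follows Kafka's `toPositive` convention) and should be verified against Pinot's `MurmurPartitionFunction` source before relying on it:

```java
import java.nio.charset.StandardCharsets;

public class MurmurBucket {
    // Same murmur2 algorithm as above, ported to Java.
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                  + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Tail bytes; cases intentionally fall through, matching the unrolled cases above.
        switch (length % 4) {
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff; h *= m;
            default:
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Assumed reduction: clear the sign bit, then modulo numPartitions,
    // so the result is always in [0, numPartitions).
    static int partitionId(String value, int numPartitions) {
        return (murmur2(value.getBytes(StandardCharsets.UTF_8)) & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        // e.g. bucket a team_id rendered as a string into one of 16 partitions
        System.out.println(partitionId("12345", 16));
    }
}
```

Every row whose key lands in the same bucket would then be written to the same file, satisfying the one-partition-per-file requirement mentioned earlier in the thread.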
Luis Fernandez
08/11/2022, 8:08 PM

Neeraja Sridharan
08/11/2022, 8:31 PM
• Is `"numPartitions": 16` based on your Kafka source? It is not a real-time source in our case.
• We can definitely try to use Spark Scala instead of Spark SQL to get this implemented. Was your partition column an `id` field as well? Ours is an id field of `bigint` type.

Luis Fernandez
08/16/2022, 2:03 PM
• `numPartitions: 16` is based on our Kafka source; it's an offline table, but we just made it the same as whatever the realtime one has.
• and yes, it's an id field

Neeraja Sridharan
08/17/2022, 9:21 PM

Luis Fernandez
08/17/2022, 9:39 PM
`"segment.partition.metadata"`: to check this field you can do it either through the UI or by sshing in to your server.

Luis Fernandez
08/17/2022, 9:40 PM
`{"numPartitions":8,"partitions":[0]}` - if you see an array with all the partitions, `"partitions":[0,1,2,4,5,6,7]`, it means partitioning is not happening correctly.

Neeraja Sridharan
08/17/2022, 9:54 PM

Neeraja Sridharan
12/07/2022, 3:35 AM

Mayank
Neeraja Sridharan
12/07/2022, 3:58 AM
"routing": {
  "segmentPrunerTypes": [
    "time", "partition"
  ],
  "instanceSelectorType": "replicaGroup"
},
Neeraja Sridharan
12/07/2022, 4:01 AM

Mayank
Neeraja Sridharan
12/07/2022, 8:34 PM

Neeraja Sridharan
12/07/2022, 9:24 PM

Mayank
Neeraja Sridharan
12/08/2022, 2:40 AM
insert overwrite table ${hiveconf:dst_table}
partition (ds='{{ds}}')
select
  <columns list>
from
  ${hiveconf:src_table}
DISTRIBUTE BY murmur2(team_id, ${hiveconf:var.num_buckets})
SORT BY team_id;
where murmur2 is the function used for bucketing (which has the same logic as in Pinot) and num_buckets is set to 50.

Neeraja Sridharan
12/08/2022, 4:12 AM