Luis Fernandez
06/09/2022, 4:05 PM
Mayank
Ken Krugler
06/09/2022, 4:12 PM
Luis Fernandez
06/09/2022, 4:12 PM
Luis Fernandez
06/09/2022, 4:12 PM
Luis Fernandez
06/09/2022, 4:13 PM
Luis Fernandez
06/09/2022, 4:13 PM
Luis Fernandez
06/09/2022, 4:14 PM
    "segmentPartitionConfig": {
      "columnPartitionMap": {
        "user_id": {
          "functionName": "Murmur",
          "numPartitions": 16
        }
      }
    },
    "aggregateMetrics": false,
    "nullHandlingEnabled": false
  },
  "metadata": {},
  "quota": {},
  "routing": {
    "segmentPrunerTypes": [
      "partition"
    ]
  },
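The "partition" segment pruner only pays off if the rows in each segment fall into one (or very few) Murmur partitions of user_id. Pinot records, per segment, which partition ids its rows belong to, and for a query with an equality filter on user_id it can skip segments whose set does not contain that value's partition. A simplified model of this, assuming each Spark output file becomes one segment; `PartitionPruningModel`, `partitionSets`, and `segmentsToScan` are hypothetical names, and `partitionOf` stands in for Pinot's Murmur function:

```scala
// Hypothetical, simplified model of partition-based segment pruning.
// Not Pinot's code: `partitionOf` stands in for the table's configured
// partition function ("Murmur" with numPartitions = 16 above).
object PartitionPruningModel {

  // For each segment (assumed: one Spark output file), collect the set of
  // partition ids of the user_ids it contains.
  def partitionSets(
      rows: Seq[(String, String)], // (segmentName, userId)
      partitionOf: String => Int
  ): Map[String, Set[Int]] =
    rows.groupBy(_._1).map { case (segment, rs) =>
      segment -> rs.map { case (_, userId) => partitionOf(userId) }.toSet
    }

  // Segments a lookup on `userId` still has to scan: only those whose
  // partition set contains the value's partition id. All others are pruned.
  def segmentsToScan(
      sets: Map[String, Set[Int]],
      userId: String,
      partitionOf: String => Int
  ): Set[String] = {
    val target = partitionOf(userId)
    sets.collect { case (segment, ids) if ids.contains(target) => segment }.toSet
  }
}
```

A segment whose set holds many ids survives pruning for all of them, so if Spark and Pinot hash user_id differently, most files end up containing most partitions and pruning skips very little.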
Ken Krugler
06/09/2022, 4:14 PM
Luis Fernandez
06/09/2022, 4:14 PM
Luis Fernandez
06/09/2022, 4:14 PM
Mayank
Luis Fernandez
06/09/2022, 4:15 PM
Ken Krugler
06/09/2022, 4:15 PM
Luis Fernandez
06/09/2022, 4:15 PM
final_representation
  .repartition(16, col("user_id"))
  .sortWithinPartitions("user_id")
  .write
  //.option("maxPartitionBytes", "128MB")
  .partitionBy("year", "month", "day")
  .mode("append")
  .parquet("gs://pinot_offline/" + output)
}
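Here `repartition(16, col("user_id"))` assigns rows with Spark's own hash expression (Murmur3 with seed 42, then a positive modulo), which is a different function from Pinot's murmur2-based "Murmur", so Spark partition i generally does not correspond to Pinot partition i. Even repartitioning on an expression that already computes the Pinot partition id runs that id through Spark's hash again, so two ids can share a Spark partition. One way to get an exact one-to-one mapping, sketched under the assumption that the job can drop to the RDD API, is to key each row by its Pinot partition id and use a partitioner that returns the key unchanged. `PinotIdPartitioner` is a hypothetical name, and `PartitionerLike` is a local stand-in with the same two abstract members as `org.apache.spark.Partitioner`, so the sketch compiles without Spark:

```scala
// Local stand-in with the same two abstract members as org.apache.spark.Partitioner,
// so this sketch compiles without Spark; in a real job, extend that class instead.
trait PartitionerLike extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

// Hypothetical pass-through partitioner: the key is assumed to already be the
// Pinot partition id, so Spark partition i receives exactly Pinot partition i.
final class PinotIdPartitioner(val numPartitions: Int) extends PartitionerLike {
  require(numPartitions > 0, "numPartitions must be positive")

  override def getPartition(key: Any): Int = key match {
    case id: Int if id >= 0 && id < numPartitions => id
    case other =>
      throw new IllegalArgumentException(
        s"expected a partition id in [0, $numPartitions), got $other")
  }
}
```

Wiring it in would look roughly like `df.rdd.keyBy(row => pinotPartitionOf(row)).partitionBy(partitioner).values`, then `spark.createDataFrame(rdd, df.schema)` and the existing `sortWithinPartitions`/`write` chain, where `pinotPartitionOf` is a hypothetical helper and `partitioner` extends `org.apache.spark.Partitioner`. The writer's `.partitionBy("year", "month", "day")` only splits each task's output into directories, so every file still holds a single Pinot partition.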
Luis Fernandez
06/09/2022, 4:16 PM
Mayank
Luis Fernandez
06/09/2022, 4:17 PM
Ken Krugler
06/09/2022, 4:18 PM
Ken Krugler
06/09/2022, 4:19 PM
Luis Fernandez
06/09/2022, 4:20 PM
Mayank
Sergii Balganbaiev
06/09/2022, 5:54 PM
As an experiment, try copying Pinot's implementation of MurmurPartitionFunction and see what happens. I can confirm that this works.
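Copying Pinot's Murmur partition function into Spark code amounts to porting a 32-bit murmur2 hash and the partition formula around it. A Scala sketch, assuming Pinot's implementation uses the same murmur2 variant as Kafka's default partitioner (seed 0x9747b28c) and computes the partition by hashing the UTF-8 bytes of `value.toString`, clearing the sign bit, and taking the result modulo `numPartitions`; compare it against MurmurPartitionFunction in the Pinot version you run before relying on it. `PinotMurmurSketch` is a hypothetical name:

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical helper. Assumption: Pinot's "Murmur" partition function uses the
// same 32-bit murmur2 variant as Kafka's default partitioner; verify against
// MurmurPartitionFunction in the Pinot version you run.
object PinotMurmurSketch {
  private val Seed: Int = 0x9747b28cL.toInt
  private val M: Int = 0x5bd1e995
  private val R: Int = 24

  def murmur2(data: Array[Byte]): Int = {
    val length = data.length
    var h = Seed ^ length

    // Mix the input four bytes at a time, read little-endian.
    val blocks = length / 4
    var i = 0
    while (i < blocks) {
      val i4 = i * 4
      var k = (data(i4) & 0xff) +
        ((data(i4 + 1) & 0xff) << 8) +
        ((data(i4 + 2) & 0xff) << 16) +
        ((data(i4 + 3) & 0xff) << 24)
      k *= M
      k ^= k >>> R
      k *= M
      h *= M
      h ^= k
      i += 1
    }

    // Mix the 1-3 trailing bytes (the Java original uses a fall-through switch).
    val tail = length & ~3
    val rem = length % 4
    if (rem >= 3) h ^= (data(tail + 2) & 0xff) << 16
    if (rem >= 2) h ^= (data(tail + 1) & 0xff) << 8
    if (rem >= 1) {
      h ^= data(tail) & 0xff
      h *= M
    }

    // Final avalanche.
    h ^= h >>> 13
    h *= M
    h ^= h >>> 15
    h
  }

  // Partition id: hash the UTF-8 bytes of value.toString, clear the sign bit,
  // then take the result modulo numPartitions.
  def getPartition(value: Any, numPartitions: Int): Int =
    (murmur2(value.toString.getBytes(UTF_8)) & Integer.MAX_VALUE) % numPartitions
}
```

The partition count passed in has to equal `numPartitions` in the table's `segmentPartitionConfig`, or the ids computed in Spark will not line up with the ones Pinot records for each segment.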
Mayank
Luis Fernandez
06/09/2022, 5:56 PM
Mayank
Sergii Balganbaiev
06/09/2022, 5:57 PM
org.apache.pinot.segment.spi.partition.MurmurPartitionFunction#getPartition and paste it into my code
Luis Fernandez
06/09/2022, 5:58 PM
Luis Fernandez
06/09/2022, 5:58 PM
final_representation
  .repartition(16, col("user_id"))
  .sortWithinPartitions("user_id")
  .write
  //.option("maxPartitionBytes", "128MB")
  .partitionBy("year", "month", "day")
  .mode("append")
  .parquet("gs://pinot_offline/" + output)
}
This is what I’m doing, and I'd like to give it a custom partitioner, maybe one that does what Pinot does.
Luis Fernandez
06/09/2022, 6:00 PM
Sergii Balganbaiev
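06/09/2022, 6:08 PM
import java.nio.charset.StandardCharsets.UTF_8
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf}

private val NUM_PARTITIONS = 8
...
// Mirrors Pinot's MurmurPartitionFunction#getPartition: murmur2 of the value's
// UTF-8 bytes, sign bit cleared, modulo the number of partitions
def getPartitionUdf: UserDefinedFunction = {
  udf((valueIn: Any) => {
    (murmur2(valueIn.toString.getBytes(UTF_8)) & Integer.MAX_VALUE) % NUM_PARTITIONS
  })
}
...
val partitionUdf = getPartitionUdf
final_representation
  // Rows with the same Pinot partition id end up in the same Spark partition
  .repartition(NUM_PARTITIONS, partitionUdf(col("column_name")))
...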
where murmur2 is a function copy-pasted from org.apache.pinot.segment.spi.partition.MurmurPartitionFunction.
And FYI, this example is written in Scala (but the overall idea should be clear).
Luis Fernandez
06/09/2022, 7:14 PM
Luis Fernandez
06/16/2022, 8:59 PM
Neha Pawar