# troubleshooting

Luis Fernandez

06/09/2022, 4:05 PM
hey my friends, question: has any of you gotten Spark to partition your data and then upload it to Pinot successfully? I’m trying to get Spark to partition my data, but in Pinot the data keeps appearing as not partitioned

Mayank

06/09/2022, 4:08 PM
Tagging @Grace Lu @Ken Krugler @Kulbir Nijjer for reach.

Ken Krugler

06/09/2022, 4:12 PM
We use Flink, so can’t directly answer your question. But it would be helpful if you explain exactly what you mean by partitioning. Is this just at the segment level, or are you also talking about higher level Pinot partitioning (https://docs.pinot.apache.org/operators/operating-pinot/tuning/routing#partitioning)?

Luis Fernandez

06/09/2022, 4:12 PM
at the segment level when ingesting offline data, i’m trying to get pinot to recognize that data is partitioned but no luck
exactly what you linked to
i’m trying to get that to work
with data that’s coming from spark ingested thru the standalone job
```json
  "segmentPartitionConfig": {
    "columnPartitionMap": {
      "user_id": {
        "functionName": "Murmur",
        "numPartitions": 16
      }
    }
  },
  "aggregateMetrics": false,
  "nullHandlingEnabled": false
},
"metadata": {},
"quota": {},
"routing": {
  "segmentPrunerTypes": [
    "partition"
  ]
},
```

Ken Krugler

06/09/2022, 4:14 PM
Well, you have to make sure your Spark job partitioned the data into segments with the same approach as what Pinot is configured to expect.
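Ken’s requirement can be sketched as a concrete check. This is an illustrative Scala sketch, not code from the thread: `PartitionCheck` and its helpers are hypothetical names, and `hashFn` is a placeholder for whichever byte-level hash both sides agree on; it assumes the `(hash & Integer.MAX_VALUE) % numPartitions` scheme that comes up later in the thread.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object PartitionCheck {
  // Partition id for one value. The Spark writer and Pinot must agree on
  // numPartitions AND on the exact byte-level hash, or segments won't line up.
  def partitionOf(value: String, numPartitions: Int, hashFn: Array[Byte] => Int): Int =
    (hashFn(value.getBytes(UTF_8)) & Integer.MAX_VALUE) % numPartitions

  // The two sides "match" only if they assign every key to the same partition.
  def sidesMatch(keys: Seq[String], numPartitions: Int,
                 writerHash: Array[Byte] => Int, readerHash: Array[Byte] => Int): Boolean =
    keys.forall { k =>
      partitionOf(k, numPartitions, writerHash) == partitionOf(k, numPartitions, readerHash)
    }
}
```

Running `sidesMatch` over a sample of keys with the writer’s hash and a copy of Pinot’s hash is a quick way to catch a mismatch like the one in this thread before loading segments.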

Luis Fernandez

06/09/2022, 4:14 PM
this is what we have in the offline table configs regarding that
right exactly

Mayank

06/09/2022, 4:14 PM
How did you ensure that the implementation of the Murmur function matches on both sides? Just the name is not enough.

Luis Fernandez

06/09/2022, 4:15 PM
yea i think that’s the problem we are having

Ken Krugler

06/09/2022, 4:15 PM
We do this in Flink using a custom partitioner and then process the partitioned results in a custom operator that writes out per-segment CSV files, which we then turn into segments (and store in HDFS) using a Hadoop map-reduce job.

Luis Fernandez

06/09/2022, 4:15 PM
```scala
final_representation
  .repartition(16, col("user_id"))
  .sortWithinPartitions("user_id")
  .write // .option("maxPartitionBytes", "128MB")
  .partitionBy("year", "month", "day")
  .mode("append")
  .parquet("gs://pinot_offline/" + output)
```
that’s the code we have, and i’m pretty sure it doesn’t translate to the config we have for the offline table

Mayank

06/09/2022, 4:16 PM
This doesn’t specify to use Murmur partition function, or even define it.

Luis Fernandez

06/09/2022, 4:17 PM
there’s also murmur3 and murmur2, i think pinot uses the same one as kafka, which is murmur2, yes?

Ken Krugler

06/09/2022, 4:18 PM
Spark dataset repartition uses the hash of the raw bytes, I believe.
If so, then no surprises why that partitioning doesn’t match Pinot’s partitioning.

Luis Fernandez

06/09/2022, 4:20 PM
i tried all the partition algorithms pinot uses and none of them got it to be partitioned so then i was like ok this is def spark

Mayank

06/09/2022, 4:21 PM
As an experiment, try to copy Pinot impl of MurmurPartitionFunction and see what happens.

Sergii Balganbaiev

06/09/2022, 5:54 PM
> As an experiment, try to copy Pinot impl of MurmurPartitionFunction and see what happens.
I can confirm that this works

Mayank

06/09/2022, 5:55 PM
We should probably provide that impl as a standalone jar for clients to include.

Luis Fernandez

06/09/2022, 5:56 PM
@Sergii Balganbaiev what do you mean by that? what did you do 😄

Mayank

06/09/2022, 5:57 PM
Lifted Pinot’s implementation, and used it on Spark side

Sergii Balganbaiev

06/09/2022, 5:57 PM
I just took the method `org.apache.pinot.segment.spi.partition.MurmurPartitionFunction#getPartition` and pasted it into my code

Luis Fernandez

06/09/2022, 5:58 PM
can u share what you do in spark? that’s the main thing i’m having trouble with
```scala
final_representation
  .repartition(16, col("user_id"))
  .sortWithinPartitions("user_id")
  .write // .option("maxPartitionBytes", "128MB")
  .partitionBy("year", "month", "day")
  .mode("append")
  .parquet("gs://pinot_offline/" + output)
```
this is what i’m doing and i want to sort of give it a custom partitioner that may be what pinot is doing
but that .repartition thing is doing its own thing lol

Sergii Balganbaiev

06/09/2022, 6:08 PM
should be something like this:
```scala
private val NUM_PARTITIONS = 8
...
def getPartitionUdf: UserDefinedFunction = {
  udf((valueIn: Any) => {
    (murmur2(valueIn.toString.getBytes(UTF_8)) & Integer.MAX_VALUE) % NUM_PARTITIONS
  })
}
...
val partitionUdf = getPartitionUdf

final_representation
  .repartition(NUM_PARTITIONS, partitionUdf(col("column_name")))
  ...
```
where `murmur2` is the function copy-pasted from `org.apache.pinot.segment.spi.partition.MurmurPartitionFunction`. And JFYI, this code example is written in Scala (but the overall idea should be clear)
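The thread suggests Pinot’s `MurmurPartitionFunction` matches the classic Kafka-style murmur2 hash; a Scala transcription of that hash might look like the sketch below. Treat the constants as an assumption and diff against the actual Pinot class Sergii names before trusting the partition ids.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object Murmur2 {
  // Kafka-style murmur2 (assumption: Pinot's MurmurPartitionFunction uses the
  // same constants; verify against the Pinot source before relying on this).
  def murmur2(data: Array[Byte]): Int = {
    val seed = 0x9747b28c
    val m = 0x5bd1e995
    val length = data.length
    var h = seed ^ length
    var i = 0
    // Mix 4 bytes at a time into the hash
    while (i + 4 <= length) {
      var k = (data(i) & 0xff) | ((data(i + 1) & 0xff) << 8) |
              ((data(i + 2) & 0xff) << 16) | ((data(i + 3) & 0xff) << 24)
      k *= m
      k ^= k >>> 24
      k *= m
      h *= m
      h ^= k
      i += 4
    }
    // Handle the last 1-3 bytes (the fall-through cases of the Java switch)
    val rem = length - i
    if (rem >= 3) h ^= (data(i + 2) & 0xff) << 16
    if (rem >= 2) h ^= (data(i + 1) & 0xff) << 8
    if (rem >= 1) { h ^= data(i) & 0xff; h *= m }
    // Final avalanche
    h ^= h >>> 13
    h *= m
    h ^= h >>> 15
    h
  }

  // Same scheme as Sergii's UDF body
  def getPartition(value: String, numPartitions: Int): Int =
    (murmur2(value.getBytes(UTF_8)) & Integer.MAX_VALUE) % numPartitions
}
```

With a copy of this on the Spark side, `getPartition(value, 16)` reproduces the `(murmur2(bytes) & Integer.MAX_VALUE) % NUM_PARTITIONS` expression from Sergii’s UDF.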

Luis Fernandez

06/09/2022, 7:14 PM
thank you so much Sergii let me try this out
I wanna thank @Sergii Balganbaiev for all the help. We have finally got this to work in dev and we will be ingesting in prod soon; partitioning helped our response times a great deal 🙂 and also thank you to you all for your input, love the support of this community.

Neha Pawar

06/28/2022, 8:10 PM
GH issue to follow up so we don’t lose this: https://github.com/apache/pinot/issues/8887