# troubleshooting
l
hey my friends, question: has any of you gotten Spark to partition your data and then upload it to Pinot successfully? I’m trying to get Spark to partition my data, but in Pinot the data keeps appearing as not partitioned
m
Tagging @Grace Lu @Ken Krugler @Kulbir Nijjer for reach.
k
We use Flink, so can’t directly answer your question. But it would be helpful if you explain exactly what you mean by partitioning. Is this just at the segment level, or are you also talking about higher level Pinot partitioning (https://docs.pinot.apache.org/operators/operating-pinot/tuning/routing#partitioning)?
l
at the segment level when ingesting offline data. i’m trying to get pinot to recognize that the data is partitioned, but no luck
exactly what you linked to
i’m trying to get that to work
with data that’s coming from spark, ingested through the standalone job
```json
    "segmentPartitionConfig": {
      "columnPartitionMap": {
        "user_id": {
          "functionName": "Murmur",
          "numPartitions": 16
        }
      }
    },
    "aggregateMetrics": false,
    "nullHandlingEnabled": false
  },
  "metadata": {},
  "quota": {},
  "routing": {
    "segmentPrunerTypes": [
      "partition"
    ]
  },
```
k
Well, you have to make sure your Spark job partitioned the data into segments with the same approach as what Pinot is configured to expect.
l
this is what we have in the offline table configs regarding that
right exactly
m
How did you ensure that the implementation of the Murmur function matches on both sides? Just the name is not enough.
l
yea i think that’s the problem we are having
k
We do this in Flink using a custom partitioner and then process the partitioned results in a custom operator that writes out per-segment CSV files, which we then turn into segments (and store in HDFS) using a Hadoop map-reduce job.
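For illustration, a minimal sketch of the first step of that pipeline, assuming the Flink DataStream API. The class name, the `hash` parameter, and the usage names are hypothetical, not Ken’s actual code; the hash passed in would be the murmur2 lifted from Pinot, as discussed later in the thread.

```scala
import org.apache.flink.api.common.functions.Partitioner

// Hypothetical sketch: route each record to the partition id Pinot expects,
// given a hash function (e.g. the murmur2 copied from Pinot's
// MurmurPartitionFunction).
class PinotStylePartitioner(hash: Array[Byte] => Int) extends Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int =
    (hash(key.getBytes("UTF-8")) & Integer.MAX_VALUE) % numPartitions
}

// Usage (hypothetical stream and key field):
// events.partitionCustom(new PinotStylePartitioner(murmur2), _.userId)
```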
l
```scala
final_representation
  .repartition(16, col("user_id"))
  .sortWithinPartitions("user_id")
  .write // .option("maxPartitionBytes", "128MB")
  .partitionBy("year", "month", "day")
  .mode("append")
  .parquet("gs://pinot_offline/" + output)
```
that’s the code we have, and i’m pretty sure it doesn’t translate to the config we have for the offline table
m
This doesn’t specify the use of the Murmur partition function, or even define it.
l
there’s also murmur3 and murmur2. i think pinot uses the same one as kafka, which is murmur2, yes?
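One way to settle the Murmur2-vs-Kafka question is to compare the two implementations directly on sample inputs. A sketch, assuming `kafka-clients` is on the classpath and that `pinotMurmur2` is the method copied out of Pinot’s `MurmurPartitionFunction` (see below):

```scala
import java.nio.charset.StandardCharsets.UTF_8
import org.apache.kafka.common.utils.Utils

// If Pinot's "Murmur" really is Kafka's murmur2, these two should agree on
// every input. pinotMurmur2 is assumed to be the method copied from Pinot.
val samples = Seq("user-1", "user-42", "")
samples.foreach { s =>
  val bytes = s.getBytes(UTF_8)
  println(s"'$s' -> kafka=${Utils.murmur2(bytes)} pinot=${pinotMurmur2(bytes)}")
}
```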
k
Spark dataset repartition uses the hash of the raw bytes, I believe.
If so, then no surprises why that partitioning doesn’t match Pinot’s partitioning.
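More precisely, Spark’s expression-based repartitioning assigns rows using its built-in `hash()` expression, a Murmur3-style hash with seed 42, taken modulo the partition count, so it generally won’t line up with Pinot’s Murmur function. A sketch that surfaces Spark’s assignment for inspection, reusing the thread’s `final_representation` dataframe:

```scala
import org.apache.spark.sql.functions.{col, hash, lit, pmod}

// Materialize the partition id that repartition(16, col("user_id")) would
// compute: pmod of Spark's built-in Murmur3-based hash() over the column.
val withSparkPartition = final_representation.withColumn(
  "spark_partition",
  pmod(hash(col("user_id")), lit(16))
)
```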
l
i tried all the partition algorithms pinot uses and none of them got it to be partitioned, so then i was like, ok, this is def spark
m
As an experiment, try to copy Pinot impl of MurmurPartitionFunction and see what happens.
s
> As an experiment, try to copy Pinot impl of MurmurPartitionFunction and see what happens.
I can confirm that this works
🙌 2
m
We should probably provide that impl as a standalone jar for clients to include.
l
@Sergii Balganbaiev what do you mean by that? what did you do 😄
m
Lifted Pinot’s implementation and used it on the Spark side
s
I just took the method `org.apache.pinot.segment.spi.partition.MurmurPartitionFunction#getPartition` and pasted it into my code
l
can you share what you do in spark? that’s the main thing i’m having trouble with
```scala
final_representation
  .repartition(16, col("user_id"))
  .sortWithinPartitions("user_id")
  .write // .option("maxPartitionBytes", "128MB")
  .partitionBy("year", "month", "day")
  .mode("append")
  .parquet("gs://pinot_offline/" + output)
```
this is what i’m doing, and i want to give it a custom partitioner that matches what pinot is doing
but that .repartition thing is doing its own thing lol
s
should be something like this:
```scala
import java.nio.charset.StandardCharsets.UTF_8
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf}

private val NUM_PARTITIONS = 8
...
// UDF reproducing Pinot's partition assignment for a column value
// (typed as String: Spark can't derive a schema for Any-typed UDF inputs)
def getPartitionUdf: UserDefinedFunction = {
  udf((valueIn: String) => {
    (murmur2(valueIn.getBytes(UTF_8)) & Integer.MAX_VALUE) % NUM_PARTITIONS
  })
}
...
val partitionUdf = getPartitionUdf

final_representation
  .repartition(NUM_PARTITIONS, partitionUdf(col("column_name")))
  ...
```
where `murmur2` is the function copy-pasted from `org.apache.pinot.segment.spi.partition.MurmurPartitionFunction`. And jfyi, this code example is written in Scala (but overall the idea should be clear)
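For completeness, a Scala transcription of that hash so the snippet above is self-contained. This port follows the classic Murmur2 (seed `0x9747b28c`) used by Pinot’s `MurmurPartitionFunction`; treat it as a sketch and verify against the Pinot source before relying on it.

```scala
// Scala port (transcription -- verify against Pinot's source) of the murmur2
// behind Pinot's MurmurPartitionFunction.
def murmur2(data: Array[Byte]): Int = {
  val length = data.length
  val m = 0x5bd1e995 // mixing constant
  val r = 24
  var h = 0x9747b28c ^ length // seed ^ length

  // body: mix four bytes at a time
  var i = 0
  while (i < length / 4) {
    val i4 = i * 4
    var k = (data(i4) & 0xff) +
      ((data(i4 + 1) & 0xff) << 8) +
      ((data(i4 + 2) & 0xff) << 16) +
      ((data(i4 + 3) & 0xff) << 24)
    k *= m
    k ^= k >>> r
    k *= m
    h *= m
    h ^= k
    i += 1
  }

  // tail: mix in the remaining 1-3 bytes (Java switch fall-through, unrolled)
  val tail = length & ~3
  val rem = length % 4
  if (rem >= 3) h ^= (data(tail + 2) & 0xff) << 16
  if (rem >= 2) h ^= (data(tail + 1) & 0xff) << 8
  if (rem >= 1) { h ^= data(tail) & 0xff; h *= m }

  // final avalanche
  h ^= h >>> 13
  h *= m
  h ^= h >>> 15
  h
}

// Pinot's getPartition then reduces this to a partition id:
//   (murmur2(value.toString.getBytes(UTF_8)) & Integer.MAX_VALUE) % numPartitions
```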
l
thank you so much Sergii let me try this out
👍 1
I wanna thank @Sergii Balganbaiev for all the help. We have finally got this to work in dev and we will be ingesting in prod soon; partitioning helped our response times a great deal 🙂 And thank you to you all for your input, love the support of the community.
❤️ 4
n
GH issue to follow up so we don’t lose this: https://github.com/apache/pinot/issues/8887
👍 1