# pinot-perf-tuning
s
Hey everyone, we have issues with performance, specifically when using EMR for ingestion with Spark 3. Initially we had an issue with file size: anything greater than 13 MB (Parquet files) resulted in a 'No space left on device' error (segmentCreationJobParallelism = 1), so we reduced the file size to under 5 MB. We can now run the ingestion with segmentCreationJobParallelism = 4, but it runs at ~25 sec/segment.
Questions:
1) Is there a way to avoid the memory error when using Parquet files?
2) Is there any config change required in EMR to increase ingestion speed?
3) What's the ideal EMR cluster size for Parquet files up to 100 MB?
Note: ingestion runs at ~10 sec/segment on my laptop (M3 processor, 18 GB RAM).
More details:
Pinot version: 1.0.0
EMR: Hadoop 3.3.6, Hive 3.1.3, JupyterEnterpriseGateway 2.6.0, Livy 0.8.0, Spark 3.5.0
Core: m5.12xlarge, gp2, size 64 GB
Primary: m5.2xlarge, gp2, size 64 GB
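For context, the parallelism mentioned above is set in the batch ingestion job spec. A rough sketch of the fields relevant to this thread, assuming Parquet input; the paths are placeholders, and only segmentCreationJobParallelism = 4 comes from the discussion above:

```yaml
# Sketch of the relevant ingestion job spec fields (placeholder paths)
jobType: SegmentCreationAndMetadataPush
inputDirURI: 's3://my-bucket/input/'            # placeholder input location
includeFileNamePattern: 'glob:**/*.parquet'
recordReaderSpec:
  dataFormat: 'parquet'
  className: 'org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader'
segmentCreationJobParallelism: 4                # the parallelism used in this thread
```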
k
m5.2xlarge is EBS-only storage, and I'm not sure how you'd configure that with EMR to ensure the segment-building job has sufficient disk space. I assume this works the same as the Hadoop MapReduce job I used to run: segments are built on local disk and then pushed to the Pinot controller, or to S3/HDFS if using metadata push. You might want to use an m5d.2xlarge instance, which has a local SSD.
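For anyone following along, a rough sketch of what a metadata-push job spec can look like with S3 as deep store; the bucket, region, and controller address below are placeholders, not values from this thread:

```yaml
# Metadata push: segments land in deep store (S3 here), only metadata goes to the controller
jobType: SegmentCreationAndMetadataPush
outputDirURI: 's3://my-bucket/pinot-segments/airlineStats'   # placeholder deep-store location
pinotFSSpecs:
  - scheme: s3
    className: 'org.apache.pinot.plugin.filesystem.S3PinotFS'
    configs:
      region: 'us-east-1'                                    # placeholder region
pinotClusterSpecs:
  - controllerURI: 'http://pinot-controller:9000'            # placeholder controller address
pushJobSpec:
  pushAttempts: 2
  pushRetryIntervalMillis: 1000
```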
x
The segment build job lets you specify the staging (temp) directory, so you could mount an EBS volume and point the working directory at that:
    // Get staging directory for temporary output pinot segments
    String stagingDir = _spec.getExecutionFrameworkSpec().getExtraConfigs().get(STAGING_DIR);
    URI stagingDirURI = null;
    if (stagingDir != null) {
      stagingDirURI = URI.create(stagingDir);
      if (stagingDirURI.getScheme() == null) {
        stagingDirURI = new File(stagingDir).toURI();
      }
      if (!outputDirURI.getScheme().equals(stagingDirURI.getScheme())) {
        throw new RuntimeException(String
            .format("The scheme of staging directory URI [%s] and output directory URI [%s] has to be same.",
                stagingDirURI, outputDirURI));
      }
      outputDirFS.mkdir(stagingDirURI);
    }
sample job spec:
# executionFrameworkSpec: Defines the execution framework used to run ingestion jobs.
executionFrameworkSpec:

  # name: execution framework name
  name: 'spark'

  # Class to use for segment generation and different push types.
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner'
  segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentMetadataPushJobRunner'

  # extraConfigs: extra configs for execution framework.
  extraConfigs:

    # stagingDir is used on the distributed filesystem to host all the segments; this directory is then moved entirely to the output directory.
    stagingDir: examples/batch/airlineStats/staging
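For the EMR case above, note that the code quoted earlier requires the staging directory's URI scheme to match the output directory's scheme, so with an S3 output the staging directory would also live on S3. A sketch with placeholder bucket/paths:

```yaml
executionFrameworkSpec:
  name: 'spark'
  extraConfigs:
    # Must share the output directory's URI scheme (enforced by the quoted code);
    # bucket and path are placeholders.
    stagingDir: 's3://my-bucket/pinot-segments/staging'
outputDirURI: 's3://my-bucket/pinot-segments/airlineStats'
```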
k
Just note that EBS performance is going to be a lot worse than an SSD, so if you’re trying to optimize segment generation performance, I think an EC2 instance type with SSDs will wind up being a win for you.
x
+1, ensure you get the NVMe SSD configured correctly.