# troubleshooting
I am working with Pinot 0.11.0 and Spark 3.2. I am running a Spark ingestion job and getting this error:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (10.240.0.12 executor 1): com.azure.storage.file.datalake.models.DataLakeStorageException: Status code 409, "{"error":{"code":"PathAlreadyExists","message":"The specified path already exists.\nRequestId:2afa0318-501f-0004-38aa-d4c373000000\nTime:2022-09-30T08:57:09.6163232Z"}}"
In sparkIngestionJobSpec.yaml file I have outputDirUri set to:
outputDirUri='adl2://fs@ac.dfs.core.windows.net/qa/pinot/controller-data/spireStatsV2/'
In controller configurations:
controller.data.dir=adl2://fs@ac.dfs.core.windows.net/qa/pinot/controller-data
I can see that some time after I started the Spark job, it created the segment file spireStatsV2_batch.tar.gz in
adl2://fs@ac.dfs.core.windows.net/qa/pinot/controller-data/spireStatsV2/event_date=2022-08-20/event_type=other/
I assume the same Spark job then tries to create a file with the same name on the same path and fails. How can I fix this?
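My guess at the collision, as a toy sketch (plain Python with hypothetical helper names, not Pinot's actual segment-name code): with exclude.sequence.id: true and no unique suffix, two tasks covering the same date derive identical segment names, so the second writer hits PathAlreadyExists.

```python
# Illustration only: how two Spark tasks producing data for the same date
# can derive the SAME segment name when the sequence id is excluded and
# no unique suffix is appended. Hypothetical helper, not Pinot's code.

def segment_name(prefix, date, sequence_id=None, uuid_suffix=None):
    parts = [prefix, date]
    if sequence_id is not None:
        parts.append(str(sequence_id))
    if uuid_suffix is not None:
        parts.append(uuid_suffix)
    return "_".join(parts)

# Two tasks, same date partition, exclude.sequence.id: true -> collision:
task_a = segment_name("spireStatsV2_batch", "2022-08-20")
task_b = segment_name("spireStatsV2_batch", "2022-08-20")
print(task_a == task_b)  # True: both try to write the same .tar.gz path
```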
sparkIngestionJobSpec.yml:
executionFrameworkSpec:
  name: 'spark'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentUriPushJobRunner'
  segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentMetadataPushJobRunner'

  extraConfigs:

jobType: SegmentCreationAndTarPush

inputDirURI: 'adl2://fs@ac.dfs.core.windows.net/qa/silver_onedayother/spire/'
includeFileNamePattern: 'glob:**/*.parquet'

outputDirURI: 'adl2://fs@ac.dfs.core.windows.net/qa/pinot/controller-data/spireStatsV2/'
# outputDirURI: 'examples/batch/airlineStats/output'

overwriteOutput: true
pinotFSSpecs:
    - scheme: adl2
      className: org.apache.pinot.plugin.filesystem.ADLSGen2PinotFS
      configs:
        accountName: 'ac'
        accessKey: '.'
        fileSystemName: 'fs'

recordReaderSpec:
  dataFormat: 'parquet'
  className: 'org.apache.pinot.plugin.inputformat.parquet.ParquetNativeRecordReader'

tableSpec:
  tableName: 'spireStatsV2'
  schemaURI: 'http://20.93.55.246:9000/tables/spireStatsV2/schema'
  tableConfigURI: 'http://20.93.55.246:9000/tables/spireStatsV2'

segmentNameGeneratorSpec:
  type: normalizedDate
  configs:
    segment.name.prefix: 'spireStatsV2_batch'
    exclude.sequence.id: true

pinotClusterSpecs:
  - controllerURI: 'http://20.93.55.246:9000'

pushJobSpec:
  pushParallelism: 2
  pushAttempts: 2
  pushRetryIntervalMillis: 1000
Spark-Submit:
/opt/spark/bin/spark-submit \
--class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
--deploy-mode cluster \
--master k8s://api \
--name spark-pinot \
--conf spark.kubernetes.executor.podTemplateFile=/home/user/apache-pinot-0.11.0-bin/executor_pod_template.yaml \
--conf spark.kubernetes.namespace=spark-pinot \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-sa \
--conf spark.kubernetes.executor.node.selector.workload=sparkmem \
--conf spark.executor.instances=1 \
--conf spark.kubernetes.container.image=rek8spoc.azurecr.io/spark:p0.11.0-s3.2.2-v6 \
--conf "spark.driver.extraJavaOptions=-Dplugins.dir=/opt/pinot/plugins" \
--conf "spark.driver.extraClassPath=/opt/pinot/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/pinot-batch-ingestion-spark-3.2-0.11.0-shaded.jar:/opt/pinot/lib/pinot-all-0.11.0-jar-with-dependencies.jar:/opt/pinot/plugins/pinot-file-system/pinot-adls/pinot-adls-0.11.0-shaded.jar:/opt/pinot/plugins/pinot-input-format/pinot-parquet/pinot-parquet-0.11.0-shaded.jar" \
--conf "spark.executor.extraClassPath=/opt/pinot/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/pinot-batch-ingestion-spark-3.2-0.11.0-shaded.jar:/opt/pinot/lib/pinot-all-0.11.0-jar-with-dependencies.jar:/opt/pinot/plugins/pinot-file-system/pinot-adls/pinot-adls-0.11.0-shaded.jar:/opt/pinot/plugins/pinot-input-format/pinot-parquet/pinot-parquet-0.11.0-shaded.jar" \
local:///opt/pinot/lib/pinot-all-0.11.0-jar-with-dependencies.jar -jobSpecFile /opt/pinot/jobs/sparkIngestionJobSpec_spire.yaml
I tried to run the same Spark job, but locally instead of on k8s, and now I get this error:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (172.29.200.0 executor driver): java.io.IOException: com.azure.storage.file.datalake.models.DataLakeStorageException: Status code 412, "{"error":{"code":"ConditionNotMet","message":"The condition specified using HTTP conditional header(s) is not met.\nRequestId:c486c3dd-301f-0014-69cc-d4f595000000\nTime:2022-09-30T13:01:39.7657613Z"}}"
Now I am trying to run the Spark ingestion job on the airlineStats example. The Spark job is executed locally, Pinot 0.11.0 is running on a k8s cluster, and the only thing I edited is the deep store for the controller:
data:
    dir: adl2://fs@ac.dfs.core.windows.net/qa/pinot/controller-data
After running
/opt/spark/bin/spark-submit \
--class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
--master local \
--deploy-mode client \
--conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins" \
--conf "spark.driver.extraClassPath=${PINOT_DISTRIBUTION_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/pinot-batch-ingestion-spark-3.2-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar:${PINOT_DISTRIBUTION_DIR}/plugins/pinot-file-system/pinot-adls/pinot-adls-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/plugins/pinot-input-format/pinot-avro/pinot-avro-${PINOT_VERSION}-shaded.jar" \
--conf "spark.executor.extraClassPath=${PINOT_DISTRIBUTION_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/pinot-batch-ingestion-spark-3.2-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar:${PINOT_DISTRIBUTION_DIR}/plugins/pinot-file-system/pinot-adls/pinot-adls-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/plugins/pinot-input-format/pinot-avro/pinot-avro-${PINOT_VERSION}-shaded.jar" \
local://${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar -jobSpecFile ${PINOT_DISTRIBUTION_DIR}/sparkIngestionJobSpec.yaml
I get this error:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 31) (172.27.12.40 executor driver): java.lang.RuntimeException: org.apache.pinot.spi.utils.retry.AttemptsExceededException: Operation failed after 2 attempts
        at org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentTarPushJobRunner$1.call(SparkSegmentTarPushJobRunner.java:125)
        at org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentTarPushJobRunner$1.call(SparkSegmentTarPushJobRunner.java:112)
        at org.apache.spark.api.java.JavaRDDLike.$anonfun$foreach$1(JavaRDDLike.scala:352)
        at org.apache.spark.api.java.JavaRDDLike.$anonfun$foreach$1$adapted(JavaRDDLike.scala:352)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1012)
        at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1012)
        at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.pinot.spi.utils.retry.AttemptsExceededException: Operation failed after 2 attempts
        at org.apache.pinot.spi.utils.retry.BaseRetryPolicy.attempt(BaseRetryPolicy.java:65)
        at org.apache.pinot.segment.local.utils.SegmentPushUtils.pushSegments(SegmentPushUtils.java:127)
        at org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentTarPushJobRunner$1.call(SparkSegmentTarPushJobRunner.java:122)
        ... 17 more
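The "Operation failed after 2 attempts" line matches pushAttempts: 2 and pushRetryIntervalMillis: 1000 in the pushJobSpec. A hedged sketch of that retry pattern (plain Python, not Pinot's BaseRetryPolicy):

```python
import time

class AttemptsExceededError(Exception):
    """Stands in for Pinot's AttemptsExceededException in this sketch."""
    pass

def push_with_retries(push_fn, attempts=2, retry_interval_millis=1000):
    """Retry `push_fn` up to `attempts` times, sleeping between tries.
    Mirrors the pushAttempts / pushRetryIntervalMillis spec fields."""
    last_error = None
    for i in range(attempts):
        try:
            return push_fn()
        except Exception as exc:  # e.g. an HTTP 412 surfaced by the controller
            last_error = exc
            if i < attempts - 1:
                time.sleep(retry_interval_millis / 1000.0)
    raise AttemptsExceededError(
        f"Operation failed after {attempts} attempts") from last_error

# A push that always fails exhausts both attempts, then the job aborts:
calls = []
def failing_push():
    calls.append(1)
    raise IOError("Status code 412, ConditionNotMet")

try:
    push_with_retries(failing_push, attempts=2, retry_interval_millis=10)
except AttemptsExceededError as err:
    print(err)        # Operation failed after 2 attempts
print(len(calls))     # 2
```

So the underlying push failure (here, the ADLS 412) is the real problem; raising pushAttempts only retries it.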
While the Spark ingestion job runs, I can see these errors popping up in the controller logs:
Exception while uploading segment: com.azure.storage.file.datalake.models.DataLakeStorageException: Status code 412, "{"error":{"code":"ConditionNotMet","message":"The condition specified using HTTP conditional header(s) is not met.\nRequestId:e7d2fdd8-a01f-001a-1c28-d7199e000000\nTime:2022-10-03T13:03:20.6602451Z"}}"
java.io.IOException: com.azure.storage.file.datalake.models.DataLakeStorageException: Status code 412, "{"error":{"code":"ConditionNotMet","message":"The condition specified using HTTP conditional header(s) is not met.\nRequestId:e7d2fdd8-a01f-001a-1c28-d7199e000000\nTime:2022-10-03T13:03:20.6602451Z"}}"
        at org.apache.pinot.plugin.filesystem.ADLSGen2PinotFS.copyInputStreamToDst(ADLSGen2PinotFS.java:654) ~[pinot-adls-0.12.0-SNAPSHOT-shaded.jar:0.12.0-SNAPSHOT-3f2547195c36937cffd4ed8332d2e691b008ab2c]
        at org.apache.pinot.plugin.filesystem.ADLSGen2PinotFS.copyFromLocalFile(ADLSGen2PinotFS.java:511) ~[pinot-adls-0.12.0-SNAPSHOT-shaded.jar:0.12.0-SNAPSHOT-3f2547195c36937cffd4ed8332d2e691b008ab2c]
        at org.apache.pinot.controller.api.upload.ZKOperator.copyFromSegmentFileToDeepStore(ZKOperator.java:337) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-3f2547195c36937cffd4ed8332d2e691b008ab2c]
        at org.apache.pinot.controller.api.upload.ZKOperator.copySegmentToDeepStore(ZKOperator.java:328) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-3f2547195c36937cffd4ed8332d2e691b008ab2c]
        at org.apache.pinot.controller.api.upload.ZKOperator.processNewSegment(ZKOperator.java:281) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-3f2547195c36937cffd4ed8332d2e691b008ab2c]
        at org.apache.pinot.controller.api.upload.ZKOperator.completeSegmentOperations(ZKOperator.java:82) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-3f2547195c36937cffd4ed8332d2e691b008ab2c]
        at org.apache.pinot.controller.api.resources.PinotSegmentUploadDownloadRestletResource.uploadSegment(PinotSegmentUploadDownloadRestletResource.java:365) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-3f2547195c36937cffd4ed8332d2e691b008ab2c]
        at org.apache.pinot.controller.api.resources.PinotSegmentUploadDownloadRestletResource.uploadSegmentAsMultiPartV2(PinotSegmentUploadDownloadRestletResource.java:605) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-3f2547195c36937cffd4ed8332d2e691b008ab2c]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
@Kartik Khare ^^
For the first error, can you try with this?
segmentNameGeneratorSpec:
  type: normalizedDate
  configs:
    segment.name.prefix: 'spireStatsV2_batch'
    exclude.sequence.id: true
    append.uuid.to.segment.name: true
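A toy illustration (plain Python, not Pinot's generator code) of why appending a UUID should avoid the PathAlreadyExists collision: each task derives a distinct name even for the same prefix and date.

```python
import uuid

# Hypothetical sketch of segment naming with a UUID suffix appended,
# analogous to append.uuid.to.segment.name: true.
def segment_name(prefix, date, append_uuid=False):
    name = f"{prefix}_{date}"
    if append_uuid:
        name = f"{name}_{uuid.uuid4().hex}"
    return name

a = segment_name("spireStatsV2_batch", "2022-08-20", append_uuid=True)
b = segment_name("spireStatsV2_batch", "2022-08-20", append_uuid=True)
print(a != b)  # True: distinct output paths, no PathAlreadyExists
```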
For the second case, it is able to create the segments successfully but fails when pushing them. Seems like some AzureFS issue. I will take a look at the plugin and update you.
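For context on the 412: ConditionNotMet means the request carried an HTTP precondition header that the current state of the path violates. For example, a conditional "create only if absent" (If-None-Match: *) behaves roughly like this toy model (not the Azure SDK):

```python
# Toy model of an HTTP conditional create. "If-None-Match: *" asks the
# server to succeed only if the path does not exist yet; if it already
# does, the server answers 412 ConditionNotMet. Not the Azure SDK.
store = {}

def conditional_create(path, data, if_none_match_star=True):
    if if_none_match_star and path in store:
        return 412  # ConditionNotMet: the precondition is violated
    store[path] = data
    return 201      # Created

print(conditional_create("seg.tar.gz", b"first"))   # 201
print(conditional_create("seg.tar.gz", b"second"))  # 412 for a second writer
```

Under this reading, a concurrent or repeated write to the same deep-store path (here, by the controller's copyFromLocalFile) would trip the precondition, which fits the duplicate-segment-name symptom from the first error.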