Edgaras Kryževičius
09/30/2022, 9:10 AM
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (10.240.0.12 executor 1): com.azure.storage.file.datalake.models.DataLakeStorageException: Status code 409, "{"error":{"code":"PathAlreadyExists","message":"The specified path already exists.\nRequestId:2afa0318-501f-0004-38aa-d4c373000000\nTime:2022-09-30T08:57:09.6163232Z"}}"
In the sparkIngestionJobSpec.yaml file I have outputDirURI set to:
outputDirURI: 'adl2://fs@ac.dfs.core.windows.net/qa/pinot/controller-data/spireStatsV2/'
In the controller configuration:
controller.data.dir=adl2://fs@ac.dfs.core.windows.net/qa/pinot/controller-data
I can see that after I started the Spark job, after some time it created the segment file spireStatsV2_batch.tar.gz in adl2://fs@ac.dfs.core.windows.net/qa/pinot/controller-data/spireStatsV2/event_date=2022-08-20/event_type=other/. I imagine the same Spark job then tries to create a file with the same name on the same path and fails. How could I fix it?
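One thing that stands out in this setup: outputDirURI points inside controller.data.dir, which the controller treats as its deep store and writes into itself on every push. A minimal sketch of separating the two, where the /qa/pinot/staging/ path is hypothetical:

# Job spec: stage generated segments outside the deep store
outputDirURI: 'adl2://fs@ac.dfs.core.windows.net/qa/pinot/staging/spireStatsV2/'  # hypothetical staging path
# Controller config keeps owning the deep store tree:
# controller.data.dir=adl2://fs@ac.dfs.core.windows.net/qa/pinot/controller-data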
Edgaras Kryževičius
09/30/2022, 9:13 AM
executionFrameworkSpec:
  name: 'spark'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentUriPushJobRunner'
  segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentMetadataPushJobRunner'
  extraConfigs:
jobType: SegmentCreationAndTarPush
inputDirURI: 'adl2://fs@ac.dfs.core.windows.net/qa/silver_onedayother/spire/'
includeFileNamePattern: 'glob:**/*.parquet'
outputDirURI: 'adl2://fs@ac.dfs.core.windows.net/qa/pinot/controller-data/spireStatsV2/'
# outputDirURI: 'examples/batch/airlineStats/output'
overwriteOutput: true
pinotFSSpecs:
  - scheme: adl2
    className: org.apache.pinot.plugin.filesystem.ADLSGen2PinotFS
    configs:
      accountName: 'ac'
      accessKey: '.'
      fileSystemName: 'fs'
recordReaderSpec:
  dataFormat: 'parquet'
  className: 'org.apache.pinot.plugin.inputformat.parquet.ParquetNativeRecordReader'
tableSpec:
  tableName: 'spireStatsV2'
  schemaURI: 'http://20.93.55.246:9000/tables/spireStatsV2/schema'
  tableConfigURI: 'http://20.93.55.246:9000/tables/spireStatsV2'
segmentNameGeneratorSpec:
  type: normalizedDate
  configs:
    segment.name.prefix: 'spireStatsV2_batch'
    exclude.sequence.id: true
pinotClusterSpecs:
  - controllerURI: 'http://20.93.55.246:9000'
pushJobSpec:
  pushParallelism: 2
  pushAttempts: 2
  pushRetryIntervalMillis: 1000
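A note on the naming above: with type: normalizedDate and exclude.sequence.id: true, segment names are built only from the prefix and the time range of the data, so two tasks whose input files cover the same dates can produce the same name and collide on write. An illustrative example (file names hypothetical, date taken from the partition path above):

# fileA.parquet and fileB.parquet both contain rows for 2022-08-20
# -> both tasks generate a segment named spireStatsV2_batch_2022-08-20_2022-08-20
# -> the second writer hits 409 PathAlreadyExists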
Edgaras Kryževičius
09/30/2022, 9:16 AM
/opt/spark/bin/spark-submit \
--class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
--deploy-mode cluster \
--master k8s://api \
--name spark-pinot \
--conf spark.kubernetes.executor.podTemplateFile=/home/user/apache-pinot-0.11.0-bin/executor_pod_template.yaml \
--conf spark.kubernetes.namespace=spark-pinot \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark-sa \
--conf spark.kubernetes.executor.node.selector.workload=sparkmem \
--conf spark.executor.instances=1 \
--conf spark.kubernetes.container.image=rek8spoc.azurecr.io/spark:p0.11.0-s3.2.2-v6 \
--conf "spark.driver.extraJavaOptions=-Dplugins.dir=/opt/pinot/plugins" \
--conf "spark.driver.extraClassPath=/opt/pinot/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/pinot-batch-ingestion-spark-3.2-0.11.0-shaded.jar:/opt/pinot/lib/pinot-all-0.11.0-jar-with-dependencies.jar:/opt/pinot/plugins/pinot-file-system/pinot-adls/pinot-adls-0.11.0-shaded.jar:/opt/pinot/plugins/pinot-input-format/pinot-parquet/pinot-parquet-0.11.0-shaded.jar" \
--conf "spark.executor.extraClassPath=/opt/pinot/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/pinot-batch-ingestion-spark-3.2-0.11.0-shaded.jar:/opt/pinot/lib/pinot-all-0.11.0-jar-with-dependencies.jar:/opt/pinot/plugins/pinot-file-system/pinot-adls/pinot-adls-0.11.0-shaded.jar:/opt/pinot/plugins/pinot-input-format/pinot-parquet/pinot-parquet-0.11.0-shaded.jar" \
local:///opt/pinot/lib/pinot-all-0.11.0-jar-with-dependencies.jar -jobSpecFile /opt/pinot/jobs/sparkIngestionJobSpec_spire.yaml
Edgaras Kryževičius
09/30/2022, 1:03 PM
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (172.29.200.0 executor driver): java.io.IOException: com.azure.storage.file.datalake.models.DataLakeStorageException: Status code 412, "{"error":{"code":"ConditionNotMet","message":"The condition specified using HTTP conditional header(s) is not met.\nRequestId:c486c3dd-301f-0014-69cc-d4f595000000\nTime:2022-09-30T13:01:39.7657613Z"}}"
Edgaras Kryževičius
10/03/2022, 12:33 PM
data:
  dir: adl2://fs@ac.dfs.core.windows.net/qa/pinot/controller-data
After running
/opt/spark/bin/spark-submit \
--class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
--master local \
--deploy-mode client \
--conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins" \
--conf "spark.driver.extraClassPath=${PINOT_DISTRIBUTION_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/pinot-batch-ingestion-spark-3.2-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar:${PINOT_DISTRIBUTION_DIR}/plugins/pinot-file-system/pinot-adls/pinot-adls-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/plugins/pinot-input-format/pinot-avro/pinot-avro-${PINOT_VERSION}-shaded.jar" \
--conf "spark.executor.extraClassPath=${PINOT_DISTRIBUTION_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark-3.2/pinot-batch-ingestion-spark-3.2-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar:${PINOT_DISTRIBUTION_DIR}/plugins/pinot-file-system/pinot-adls/pinot-adls-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/plugins/pinot-input-format/pinot-avro/pinot-avro-${PINOT_VERSION}-shaded.jar" \
local://${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar -jobSpecFile ${PINOT_DISTRIBUTION_DIR}/sparkIngestionJobSpec.yaml
I am getting this error:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 31) (172.27.12.40 executor driver): java.lang.RuntimeException: org.apache.pinot.spi.utils.retry.AttemptsExceededException: Operation failed after 2 attempts
at org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentTarPushJobRunner$1.call(SparkSegmentTarPushJobRunner.java:125)
at org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentTarPushJobRunner$1.call(SparkSegmentTarPushJobRunner.java:112)
at org.apache.spark.api.java.JavaRDDLike.$anonfun$foreach$1(JavaRDDLike.scala:352)
at org.apache.spark.api.java.JavaRDDLike.$anonfun$foreach$1$adapted(JavaRDDLike.scala:352)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1012)
at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1012)
at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2254)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.pinot.spi.utils.retry.AttemptsExceededException: Operation failed after 2 attempts
at org.apache.pinot.spi.utils.retry.BaseRetryPolicy.attempt(BaseRetryPolicy.java:65)
at org.apache.pinot.segment.local.utils.SegmentPushUtils.pushSegments(SegmentPushUtils.java:127)
at org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentTarPushJobRunner$1.call(SparkSegmentTarPushJobRunner.java:122)
... 17 more
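The two attempts in this trace come straight from the pushJobSpec (pushAttempts: 2). If the failures turn out to be transient, a more forgiving retry policy is a cheap experiment, though it cannot fix a deterministic conflict; a sketch, with the values chosen arbitrarily:

pushJobSpec:
  pushParallelism: 1           # serialize pushes to rule out concurrent writers on the same path
  pushAttempts: 5              # retry transient failures more times
  pushRetryIntervalMillis: 10000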
Edgaras Kryževičius
10/03/2022, 1:05 PM
Exception while uploading segment: com.azure.storage.file.datalake.models.DataLakeStorageException: Status code 412, "{"error":{"code":"ConditionNotMet","message":"The condition specified using HTTP conditional header(s) is not met.\nRequestId:e7d2fdd8-a01f-001a-1c28-d7199e000000\nTime:2022-10-03T13:03:20.6602451Z"}}"
java.io.IOException: com.azure.storage.file.datalake.models.DataLakeStorageException: Status code 412, "{"error":{"code":"ConditionNotMet","message":"The condition specified using HTTP conditional header(s) is not met.\nRequestId:e7d2fdd8-a01f-001a-1c28-d7199e000000\nTime:2022-10-03T13:03:20.6602451Z"}}"
at org.apache.pinot.plugin.filesystem.ADLSGen2PinotFS.copyInputStreamToDst(ADLSGen2PinotFS.java:654) ~[pinot-adls-0.12.0-SNAPSHOT-shaded.jar:0.12.0-SNAPSHOT-3f2547195c36937cffd4ed8332d2e691b008ab2c]
at org.apache.pinot.plugin.filesystem.ADLSGen2PinotFS.copyFromLocalFile(ADLSGen2PinotFS.java:511) ~[pinot-adls-0.12.0-SNAPSHOT-shaded.jar:0.12.0-SNAPSHOT-3f2547195c36937cffd4ed8332d2e691b008ab2c]
at org.apache.pinot.controller.api.upload.ZKOperator.copyFromSegmentFileToDeepStore(ZKOperator.java:337) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-3f2547195c36937cffd4ed8332d2e691b008ab2c]
at org.apache.pinot.controller.api.upload.ZKOperator.copySegmentToDeepStore(ZKOperator.java:328) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-3f2547195c36937cffd4ed8332d2e691b008ab2c]
at org.apache.pinot.controller.api.upload.ZKOperator.processNewSegment(ZKOperator.java:281) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-3f2547195c36937cffd4ed8332d2e691b008ab2c]
at org.apache.pinot.controller.api.upload.ZKOperator.completeSegmentOperations(ZKOperator.java:82) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-3f2547195c36937cffd4ed8332d2e691b008ab2c]
at org.apache.pinot.controller.api.resources.PinotSegmentUploadDownloadRestletResource.uploadSegment(PinotSegmentUploadDownloadRestletResource.java:365) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-3f2547195c36937cffd4ed8332d2e691b008ab2c]
at org.apache.pinot.controller.api.resources.PinotSegmentUploadDownloadRestletResource.uploadSegmentAsMultiPartV2(PinotSegmentUploadDownloadRestletResource.java:605) ~[pinot-all-0.12.0-SNAPSHOT-jar-with-dependencies.jar:0.12.0-SNAPSHOT-3f2547195c36937cffd4ed8332d2e691b008ab2c]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
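This trace shows the controller failing inside copyFromSegmentFileToDeepStore, i.e. while copying the uploaded tar into controller.data.dir, where the job already wrote the same file, which would explain the 412 ConditionNotMet. Since the segments already sit in the deep store, a metadata push would sidestep that copy entirely; a sketch, assuming the rest of the job spec stays unchanged:

# Push only segment metadata; the controller records the existing deep-store URI
# instead of copying the tar into controller.data.dir again
jobType: SegmentCreationAndMetadataPush
outputDirURI: 'adl2://fs@ac.dfs.core.windows.net/qa/pinot/controller-data/spireStatsV2/'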
Kartik Khare
10/04/2022, 6:14 AM
segmentNameGeneratorSpec:
  type: normalizedDate
  configs:
    segment.name.prefix: 'spireStatsV2_batch'
    exclude.sequence.id: true
    append.uuid.to.segment.name: true
For the second case, it is able to create the segments successfully but fails when pushing them. It seems like some AzureFS issue. I will take a look at the plugin and update you.
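With the UUID appended, each task should emit a unique name even when date ranges overlap, so writes land on distinct paths. An illustrative resulting name (the UUID here is made up):

spireStatsV2_batch_2022-08-20_2022-08-20_6f9619ff-8b86-d011-b42d-00c04fc964ff.tar.gz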