# troubleshooting
p
Hi, I got this issue when submitting a Spark job to ingest a batch file.
```
21/04/20 03:03:42 ERROR org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand: Got exception to kick off standalone data ingestion job -
java.lang.RuntimeException: Caught exception during running - org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner
	at org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher.kickoffIngestionJob(IngestionJobLauncher.java:144)
	at org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher.runIngestionJob(IngestionJobLauncher.java:117)
	at org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand.execute(LaunchDataIngestionJobCommand.java:132)
	at org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand.main(LaunchDataIngestionJobCommand.java:67)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1030)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1039)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.nio.file.FileSystemNotFoundException: Provider "gs" not installed
	at java.nio.file.Paths.get(Paths.java:147)
	at org.apache.pinot.plugin.filesystem.GcsPinotFS.copy(GcsPinotFS.java:262)
	at org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner.run(SparkSegmentGenerationJobRunner.java:344)
	at org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher.kickoffIngestionJob(IngestionJobLauncher.java:142)
	... 15 more
```
After some deep dive, I think inside Spark the NIO filesystem only has two providers: file and jar.
Based on the explanation and discussion here: https://stackoverflow.com/questions/39500445/filesystem-provider-disappearing-in-spark I suggest moving from `java.nio.file.Path` to `org.apache.hadoop.fs.Path`.
Any ideas?
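As a sanity check outside Spark: on a JVM where no "gs" NIO provider is on the classpath, `Paths.get(URI)` fails exactly as in the stack trace above. A minimal probe (the bucket and object names are made up):

```java
import java.net.URI;
import java.nio.file.FileSystemNotFoundException;
import java.nio.file.Paths;

public class GsPathProbe {
  public static void main(String[] args) {
    try {
      // Paths.get(URI) looks the scheme up in the installed provider list;
      // if no "gs" provider is visible, it throws FileSystemNotFoundException
      // with the message: Provider "gs" not installed
      Paths.get(URI.create("gs://some-bucket/some/object"));
      System.out.println("gs provider installed");
    } catch (FileSystemNotFoundException e) {
      System.out.println("gs provider missing: " + e.getMessage());
    }
  }
}
```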
j
@Xiang Fu ^^
x
Can you try adding the `google-cloud-nio` dependency to the pinot-gcs pom file?
Hmm, seems there is already one in the pom:
```xml
<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-nio</artifactId>
  <version>0.120.0-alpha</version>
</dependency>
```
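One way to see what the JVM actually resolved: `FileSystemProvider.installedProviders()` is populated once via ServiceLoader from the classloader that first touches it, so a provider jar that only becomes visible later (for example through Spark's plugin classloaders) may never appear, even if it is in the pom. A quick check:

```java
import java.nio.file.spi.FileSystemProvider;

public class ProviderList {
  public static void main(String[] args) {
    // On a stock JVM this prints at least "file" and "jar"; "gs" appears
    // only if google-cloud-nio was visible when providers were first loaded.
    for (FileSystemProvider p : FileSystemProvider.installedProviders()) {
      System.out.println(p.getScheme());
    }
  }
}
```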
p
Yup, the pom already has the dependency.
Maybe we can modify this code to directly use the Google Cloud Storage API?
Does this code work in your standalone application or tests, but only fail on Spark?
p
Yup. This code works in a standalone application but fails on a Spark cluster on Dataproc.
x
Hmm, on the Spark cluster, can you try putting the GCS plugin on the classpath?
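For reference, putting a jar on both the driver and executor classpaths with spark-submit usually looks like the sketch below; the jar path is a placeholder, not the actual layout of this deployment:

```
spark-submit \
  --conf "spark.driver.extraClassPath=/path/to/pinot-plugins/pinot-gcs-shaded.jar" \
  --conf "spark.executor.extraClassPath=/path/to/pinot-plugins/pinot-gcs-shaded.jar" \
  ...
```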
p
Already tried. 😄
x
I assume this is Java 8? Or 11?
p
java 8
x
Another way is to shade pinot-ingestion-spark and pinot-gcs together into one jar.
p
😄 Another way to fix it: remove `java.nio.file.Paths` from pinot-gcs, then write a custom `Paths.get` function?
x
right
Then you need to implement the relativize function there.
I feel it's better to just use the native Google GCS lib.
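For what it's worth, the relativize step itself doesn't strictly need `java.nio.file.Paths`: `java.net.URI#relativize` works on plain URIs and requires no installed FileSystemProvider. A sketch with made-up bucket and object names:

```java
import java.net.URI;

public class RelativizeDemo {
  public static void main(String[] args) {
    URI base = URI.create("gs://bucket/segments/");
    URI child = URI.create("gs://bucket/segments/2020/part-0.avro");
    // relativize returns the path of child relative to base
    URI rel = base.relativize(child);
    System.out.println(rel);  // 2020/part-0.avro
  }
}
```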
p
Well, just got another error, in the sendSegmentUris phase.
```
Caused by: java.lang.IllegalStateException: PinotFS for scheme: gs has not been initialized
	at shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:518)
	at org.apache.pinot.spi.filesystem.PinotFSFactory.create(PinotFSFactory.java:80)
	at org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils.sendSegmentUris(SegmentPushUtils.java:158)
	at org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner$1.call(SparkSegmentUriPushJobRunner.java:122)
	at org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner$1.call(SparkSegmentUriPushJobRunner.java:117)
```
Do you have any idea how to fix it?
x
It's missing the fs init in the code: https://github.com/apache/incubator-pinot/pull/6819
Merged the above PR, can you try it again?
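For context, the IllegalStateException above comes from a checkState-style guard on a scheme registry: registrations done on the Spark driver don't carry over to executor JVMs, so unless the init also runs on each worker, the registry looks empty there. A toy sketch of that failure mode (the class and method names here are made up, not Pinot's actual API):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical scheme -> filesystem registry guarded by a state check,
// similar in spirit to the PinotFSFactory behavior seen in the stack trace.
public class FsRegistry {
  private static final Map<String, String> SCHEMES = new ConcurrentHashMap<>();

  public static void register(String scheme, String fsClassName) {
    SCHEMES.put(scheme, fsClassName);
  }

  public static String create(String scheme) {
    if (!SCHEMES.containsKey(scheme)) {
      // On Spark executors this fires unless register() ran in the worker JVM.
      throw new IllegalStateException(
          "PinotFS for scheme: " + scheme + " has not been initialized");
    }
    return SCHEMES.get(scheme);
  }
}
```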
p
Hmm, I tried it again but still get errors.
I guess I already found the issue: I forgot to add the plugins.dir property in the Spark job. But I found another issue when packing the plugins to the worker nodes.
```
21/04/22 07:25:48 ERROR org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner: Failed to tar plugins directory
java.io.IOException: Request to write '4096' bytes exceeds size in header of '12453302' bytes for entry './pinot-plugins.tar.gz'
	at org.apache.commons.compress.archivers.tar.TarArchiveOutputStream.write(TarArchiveOutputStream.java:449)
```
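Worth noting: the failing tar entry in the log is `./pinot-plugins.tar.gz` itself, which hints that the output archive was created inside the directory being tarred, so its size grows after the tar header was written. A toy reproduction of that self-inclusion hazard (file names are made up):

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;

public class SelfInclusionDemo {
  public static void main(String[] args) throws IOException {
    File dir = Files.createTempDirectory("plugins").toFile();
    new File(dir, "pinot-gcs.jar").createNewFile();

    // Create the archive inside the directory being archived...
    File archive = new File(dir, "pinot-plugins.tar.gz");
    try (FileOutputStream out = new FileOutputStream(archive)) {
      // ...so the directory listing now contains the (growing) archive itself,
      // and any size recorded for it up front becomes stale.
      for (File f : dir.listFiles()) {
        System.out.println(f.getName());
      }
      out.write(new byte[4096]);
    }
  }
}
```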
x
Hmm, seems like an issue with tar-gzipping everything in the plugin dir into a file.
I will take a look
Are you using the default plugin dir, or have you put more files into it?
Do you set `plugins.dir` in your java cmd?
I was testing this util locally:
```java
public static void main(String[] args) {
  try {
    TarGzCompressionUtils.createTarGzFile(
        new File("/Users/xiangfu/workspace/pinot-dev/pinot-distribution/target/apache-pinot-incubating-0.8.0-SNAPSHOT-bin/apache-pinot-incubating-0.8.0-SNAPSHOT-bin/plugins"),
        new File("/tmp/plugin.tar.gz"));
  } catch (IOException e) {
    e.printStackTrace();
  }
}
```
It works and I cannot reproduce the issue.
p
I'm pretty sure I added the plugins.dir env; the log has this INFO:
```
21/04/23 02:33:34 INFO org.apache.pinot.spi.plugin.PluginManager: Plugins root dir is [./]
21/04/23 02:33:34 INFO org.apache.pinot.spi.plugin.PluginManager: Trying to load plugins: [[pinot-gcs]]
```
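Note that `Plugins root dir is [./]` resolves against whatever working directory each JVM happens to have; on the executors that is a per-context scratch dir, not where the plugins were staged. A small probe of that resolution:

```java
import java.io.File;

public class PluginsDirProbe {
  public static void main(String[] args) {
    // "./" resolves against the JVM's working directory (user.dir),
    // which differs between the submitting machine and Spark workers.
    File pluginsRoot = new File("./");
    System.out.println(pluginsRoot.getAbsolutePath());
  }
}
```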
Full log:
```
:: retrieving :: org.apache.spark#spark-submit-parent-adf0fd1c-d000-4782-8499-d41f1396e726
	confs: [default]
	0 artifacts copied, 9 already retrieved (0kB/17ms)
21/04/23 02:33:34 INFO org.apache.pinot.spi.plugin.PluginManager: Plugins root dir is [./]
21/04/23 02:33:34 INFO org.apache.pinot.spi.plugin.PluginManager: Trying to load plugins: [[pinot-gcs]]
21/04/23 02:33:35 INFO org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher: SegmentGenerationJobSpec:
!!org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec
authToken: null
cleanUpOutputDir: false
excludeFileNamePattern: null
executionFrameworkSpec:
  extraConfigs: {stagingDir: 'gs://bucket_name/tmp/'}
  name: spark
  segmentGenerationJobRunnerClassName: org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner
  segmentMetadataPushJobRunnerClassName: null
  segmentTarPushJobRunnerClassName: org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentTarPushJobRunner
  segmentUriPushJobRunnerClassName: org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner
failOnEmptySegment: false
includeFileNamePattern: glob:**/*.avro
inputDirURI: gs://bucket_name/rule_logs/
jobType: SegmentCreationAndUriPush
outputDirURI: gs://bucket_name/data/
overwriteOutput: true
pinotClusterSpecs:
- {controllerURI: 'http://localhost:8080'}
pinotFSSpecs:
- {className: org.apache.pinot.plugin.filesystem.GcsPinotFS, configs: null, scheme: gs}
pushJobSpec: {pushAttempts: 2, pushParallelism: 2, pushRetryIntervalMillis: 1000,
  segmentUriPrefix: null, segmentUriSuffix: null}
recordReaderSpec: {className: org.apache.pinot.plugin.inputformat.avro.AvroRecordReader,
  configClassName: null, configs: null, dataFormat: avro}
segmentCreationJobParallelism: 0
segmentNameGeneratorSpec:
  configs: {segment.name.prefix: rule_logs_uat, exclude.sequence.id: 'true'}
  type: simple
tableSpec: {schemaURI: 'http://localhost:8080/tables/RuleLogsUAT/schema',
  tableConfigURI: 'http://localhost:8080/tables/RuleLogsUAT', tableName: RuleLogsUAT}
tlsSpec: null
```
When I remove `-Dplugins.include=pinot-gcs`, I find other jar files. Maybe that's the root cause?
x
For the plugins root dir, can you try an absolute path?
p
Let me try.
Hmm, because the Spark context working dir is based on the context_id:
```
/tmp/2694644d46744db78cbe27e6dd833f2a
```
so it's hard to get an absolute path.
x
Hmm, does `$(pwd)/plugins` work?
p
`$(pwd)/plugins` will be the current dir on the remote control machine, not the worker exec machine.
x
Hmm, the ingestion job only tar-gzips the plugin dir on the driver and sets it; the worker will untar the plugin dir from the context.
This is how we package the plugin dir and add it to the SparkContext; then each worker will untar the plugin dir from the tar.gz file.
You can check the class `org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner`.
p
Hmm, I will try this option: cluster initialization-actions -> gsutil cp the plugins dir to a cluster folder.
x
I see, so it means the Spark driver has no access to the plugin dir? Or will you download the plugin dir?
p
I will pre-download the plugins dir into a relative folder.
All steps are almost done, but the final step, SegmentUriPushJob, errors out. Left tab: Spark job logging; right tab: pinot-controller logging.
`PinotFS for scheme: gs has not been initialized` again 😞
Oh, I found the issue: I loaded the pinot-batch-ingestion-spark jar from the release branch.
x
So the filesystem was not initialized yet.
I think you found the issue
p
Hmmm. One more issue.
The `v2/segments` API seems to conflict with Spark's sendSegmentUris. Something null makes the API return internal server errors.
Logging in the Spark job request:
```
Start sending table RuleLogsUAT segment URIs: [gs://{bucket}/data/year=2020/RuleLogsUAT_OFFLINE_18316_18627_0.tar.gz] to locations: [org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec@499782c3]

Sending table RuleLogsUAT segment URI: gs://{bucket}/data/year=2020/RuleLogsUAT_OFFLINE_18316_18627_0.tar.gz to location: https://{domain}

Sending request: https://{domain}/v2/segments to controller: pinot-controller-0.pinot-controller-headless.analytics.svc.cluster.local, version: Unknown

Caught temporary exception while pushing table: RuleLogsUAT segment uri: gs://{bucket}/data/year=2020/RuleLogsUAT_OFFLINE_18316_18627_0.tar.gz to https://{domain}, will retry

Got error status code: 500 (Internal Server Error) with reason: "Caught internal server exception while uploading segment" while sending request: https://{domain}/v2/segments to controller: pinot-controller-0.pinot-controller-headless.analytics.svc.cluster.local, version: Unknown
	at org.apache.pinot.common.utils.FileUploadDownloadClient.sendRequest(FileUploadDownloadClient.java:451)
	at org.apache.pinot.common.utils.FileUploadDownloadClient.sendSegmentUri(FileUploadDownloadClient.java:771)
	at org.apache.pinot.segment.local.utils.SegmentPushUtils.lambda$sendSegmentUris$1(SegmentPushUtils.java:178)
	at org.apache.pinot.spi.utils.retry.BaseRetryPolicy.attempt(BaseRetryPolicy.java:50)
	at org.apache.pinot.segment.local.utils.SegmentPushUtils.sendSegmentUris(SegmentPushUtils.java:175)
	at org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner$1.call(SparkSegmentUriPushJobRunner.java:127)
	at org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner$1.call(SparkSegmentUriPushJobRunner.java:117)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$foreach$1(JavaRDDLike.scala:352)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$foreach$1$adapted(JavaRDDLike.scala:352)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1012)
	at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1012)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```
As far as I can tell, maybe the request headers are null, so the default uploadType is SEGMENT, which throws an exception when the multipart file is null.
x
hmm
For your Pinot controller/server, do they have those Pinot filesystems configured?
Also, can you try segment metadata push:
```
segmentMetadataPushJobRunnerClassName: org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentMetadataPushJobRunner
jobType: SegmentCreationAndMetadataPush
```
p
Well, I think I have found the problem. In the nginx-ingress log, I found:
```
4: *519 client sent invalid header line: "DOWNLOAD_URI: gs://my-bucket-test-data-np/data/RuleLogsUAT_OFFLINE_18117_18731_0.tar.gz" while reading client request headers, client: 10.255.160.94,
```
And from the nginx docs:
> When the use of underscores is disabled, request header fields whose names contain underscores are marked as invalid and become subject to the ignore_invalid_headers directive.
I think nginx ingress drops all headers whose names are invalid.
x
Oh, I see. Then you need to configure nginx to keep those headers.
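With ingress-nginx this is typically done via the controller ConfigMap key `enable-underscores-in-headers` (which maps to nginx's `underscores_in_headers` directive). A sketch; the ConfigMap name and namespace below are the common defaults and may differ in your install:

```
apiVersion: v1
kind: ConfigMap
metadata:
  name: ingress-nginx-controller
  namespace: ingress-nginx
data:
  enable-underscores-in-headers: "true"
```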