
Phúc Huỳnh

04/20/2021, 4:40 AM
Hi, I got this issue when submitting a Spark job to ingest a batch file.
Copy code
21/04/20 03:03:42 ERROR org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand: Got exception to kick off standalone data ingestion job -
java.lang.RuntimeException: Caught exception during running - org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner
	at org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher.kickoffIngestionJob(IngestionJobLauncher.java:144)
	at org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher.runIngestionJob(IngestionJobLauncher.java:117)
	at org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand.execute(LaunchDataIngestionJobCommand.java:132)
	at org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand.main(LaunchDataIngestionJobCommand.java:67)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1030)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1039)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.nio.file.FileSystemNotFoundException: Provider "gs" not installed
	at java.nio.file.Paths.get(Paths.java:147)
	at org.apache.pinot.plugin.filesystem.GcsPinotFS.copy(GcsPinotFS.java:262)
	at org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner.run(SparkSegmentGenerationJobRunner.java:344)
	at org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher.kickoffIngestionJob(IngestionJobLauncher.java:142)
	... 15 more
After some deep-diving, I think it's because the NIO filesystem on the Spark classpath only has 2 providers: file and jar.
Based on the explanation and discussion here: https://stackoverflow.com/questions/39500445/filesystem-provider-disappearing-in-spark I suggest moving from
java.nio.file.Path
to
org.apache.hadoop.fs.Path
Any ideas?
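For illustration, a minimal sketch (class name hypothetical) of what the GcsPinotFS copy runs into: java.nio.file.Paths.get(URI) looks up an installed NIO FileSystemProvider for the URI scheme, and if google-cloud-nio is not visible to the classloader running the job, only the built-in file and jar providers are found:
Copy code
import java.net.URI;
import java.nio.file.FileSystemNotFoundException;
import java.nio.file.Paths;
import java.nio.file.spi.FileSystemProvider;

public class GsProviderCheck {
  public static void main(String[] args) {
    // List the NIO providers visible to this classloader (typically just "file" and "jar"
    // under spark-submit when google-cloud-nio is not on the application classpath).
    FileSystemProvider.installedProviders()
        .forEach(p -> System.out.println("installed provider: " + p.getScheme()));
    try {
      // Paths.get(URI) resolves the provider by scheme; without a registered "gs" provider
      // it throws FileSystemNotFoundException: Provider "gs" not installed.
      Paths.get(URI.create("gs://some-bucket/some/object"));
    } catch (FileSystemNotFoundException e) {
      System.out.println("no provider for gs: " + e.getMessage());
    }
  }
}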

Jackie

04/20/2021, 5:20 AM
@Xiang Fu ^^

Xiang Fu

04/20/2021, 6:48 AM
can you try to add
Copy code
google-cloud-nio
dependency to the pinot-gcs pom file?
hmm, seems there is already one in pom:
Copy code
<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-nio</artifactId>
  <version>0.120.0-alpha</version>
</dependency>

Phúc Huỳnh

04/20/2021, 6:52 AM
yup, the pom already has the dependency
maybe we can modify this code to directly use the Google Cloud Storage API
does this code work in your standalone application
or test
but only fail on Spark?

Phúc Huỳnh

04/20/2021, 7:07 AM
yup.
this code works in the standalone application
but fails on the Spark cluster on Dataproc

Xiang Fu

04/20/2021, 7:08 AM
hmm
on the Spark cluster, can you try putting the GCS plugin on the classpath?

Phúc Huỳnh

04/20/2021, 7:09 AM
already tried. 😄

Xiang Fu

04/20/2021, 7:09 AM
I assume this is java 8?
or 11?

Phúc Huỳnh

04/20/2021, 7:09 AM
java 8

Xiang Fu

04/20/2021, 7:13 AM
another way is to shade pinot-ingestion-spark and pinot-gcs together into one jar
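That matches the StackOverflow thread linked above: when jars are merged without combining META-INF/services, the NIO provider registration from google-cloud-nio gets lost. A sketch (plugin configuration only, coordinates omitted) of how such a shaded module could be built with the Maven shade plugin; the important part is ServicesResourceTransformer, which merges the service files so the gs FileSystemProvider survives shading:
Copy code
<!-- Sketch only: shade configuration for a combined spark + gcs jar. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <!-- Merge META-INF/services entries (e.g. java.nio.file.spi.FileSystemProvider)
               instead of letting one jar's copy overwrite another's. -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>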

Phúc Huỳnh

04/20/2021, 7:18 AM
😄 another way to fix it: remove
java.nio.file.Paths
in pinot-gcs, then write a custom Paths.get function?

Xiang Fu

04/20/2021, 7:21 AM
right
then you need to implement the relativize function there
I feel it's better to just use the native Google GCS lib
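For reference, a minimal sketch (bucket and object names are placeholders) of doing the copy with the google-cloud-storage client directly, which sidesteps java.nio.file.Paths and the NIO provider lookup entirely:
Copy code
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class GcsCopySketch {
  public static void main(String[] args) {
    // Uses application-default credentials; bucket/object names are placeholders.
    Storage storage = StorageOptions.getDefaultInstance().getService();
    BlobId source = BlobId.of("source-bucket", "staging/segment.tar.gz");
    BlobId target = BlobId.of("target-bucket", "data/segment.tar.gz");
    // Server-side copy within GCS; no NIO FileSystemProvider involved.
    storage.copy(Storage.CopyRequest.newBuilder().setSource(source).setTarget(target).build())
        .getResult();
  }
}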

Phúc Huỳnh

04/20/2021, 10:38 AM
well, just another error, this time in the sendSegmentUri phase.
Copy code
Caused by: java.lang.IllegalStateException: PinotFS for scheme: gs has not been initialized
	at shaded.com.google.common.base.Preconditions.checkState(Preconditions.java:518)
	at org.apache.pinot.spi.filesystem.PinotFSFactory.create(PinotFSFactory.java:80)
	at org.apache.pinot.plugin.ingestion.batch.common.SegmentPushUtils.sendSegmentUris(SegmentPushUtils.java:158)
	at org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner$1.call(SparkSegmentUriPushJobRunner.java:122)
	at org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner$1.call(SparkSegmentUriPushJobRunner.java:117)
do you have any idea how to fix it?

Xiang Fu

04/20/2021, 5:21 PM
It's missing the fs init in the code: https://github.com/apache/incubator-pinot/pull/6819
I merged the above PR, can you try it again?

Phúc Huỳnh

04/22/2021, 5:11 AM
hmm, I tried it again but there are still errors.
I guess I already found the issue: I forgot to add the plugins.dir property in the Spark job. But I found another issue when packing plugins to the worker nodes.
Copy code
21/04/22 07:25:48 ERROR org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner: Failed to tar plugins directory
java.io.IOException: Request to write '4096' bytes exceeds size in header of '12453302' bytes for entry './pinot-plugins.tar.gz'
	at org.apache.commons.compress.archivers.tar.TarArchiveOutputStream.write(TarArchiveOutputStream.java:449)

Xiang Fu

04/22/2021, 5:25 PM
hmm, seems like an issue with tar/gzipping everything in the plugin dir into a file
I will take a look
are you using the default plugin dir or have you put more files into it?
do you set this:
plugins.dir
? in your java cmd?
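e.g. something along these lines in the driver JVM options (the path is a placeholder):
Copy code
-Dplugins.dir=/path/to/apache-pinot/plugins -Dplugins.include=pinot-gcs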
I was testing this util locally:
Copy code
public static void main(String[] args) {
    try {
      // Tar + gzip the entire local plugins directory into a single archive file.
      TarGzCompressionUtils.createTarGzFile(
          new File("/Users/xiangfu/workspace/pinot-dev/pinot-distribution/target/apache-pinot-incubating-0.8.0-SNAPSHOT-bin/apache-pinot-incubating-0.8.0-SNAPSHOT-bin/plugins"),
          new File("/tmp/plugin.tar.gz"));
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
it works and I cannot reproduce the issue:

Phúc Huỳnh

04/23/2021, 2:36 AM
I'm pretty sure I added the plugins.dir property; the log has this INFO:
Copy code
21/04/23 02:33:34 INFO org.apache.pinot.spi.plugin.PluginManager: Plugins root dir is [./]
21/04/23 02:33:34 INFO org.apache.pinot.spi.plugin.PluginManager: Trying to load plugins: [[pinot-gcs]]
Full log:
Copy code
:: retrieving :: org.apache.spark#spark-submit-parent-adf0fd1c-d000-4782-8499-d41f1396e726
	confs: [default]
	0 artifacts copied, 9 already retrieved (0kB/17ms)
21/04/23 02:33:34 INFO org.apache.pinot.spi.plugin.PluginManager: Plugins root dir is [./]
21/04/23 02:33:34 INFO org.apache.pinot.spi.plugin.PluginManager: Trying to load plugins: [[pinot-gcs]]
21/04/23 02:33:35 INFO org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher: SegmentGenerationJobSpec:
!!org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec
authToken: null
cleanUpOutputDir: false
excludeFileNamePattern: null
executionFrameworkSpec:
  extraConfigs: {stagingDir: 'gs://bucket_name/tmp/'}
  name: spark
  segmentGenerationJobRunnerClassName: org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner
  segmentMetadataPushJobRunnerClassName: null
  segmentTarPushJobRunnerClassName: org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentTarPushJobRunner
  segmentUriPushJobRunnerClassName: org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner
failOnEmptySegment: false
includeFileNamePattern: glob:**/*.avro
inputDirURI: gs://bucket_name/rule_logs/
jobType: SegmentCreationAndUriPush
outputDirURI: gs://bucket_name/data/
overwriteOutput: true
pinotClusterSpecs:
- {controllerURI: 'http://localhost:8080'}
pinotFSSpecs:
- {className: org.apache.pinot.plugin.filesystem.GcsPinotFS, configs: null, scheme: gs}
pushJobSpec: {pushAttempts: 2, pushParallelism: 2, pushRetryIntervalMillis: 1000,
  segmentUriPrefix: null, segmentUriSuffix: null}
recordReaderSpec: {className: org.apache.pinot.plugin.inputformat.avro.AvroRecordReader,
  configClassName: null, configs: null, dataFormat: avro}
segmentCreationJobParallelism: 0
segmentNameGeneratorSpec:
  configs: {segment.name.prefix: rule_logs_uat, exclude.sequence.id: 'true'}
  type: simple
tableSpec: {schemaURI: 'http://localhost:8080/tables/RuleLogsUAT/schema',
  tableConfigURI: 'http://localhost:8080/tables/RuleLogsUAT', tableName: RuleLogsUAT}
tlsSpec: null
when I remove
-Dplugins.include=pinot-gcs
I find other jar files. Maybe that's the root cause of the issue?

Xiang Fu

04/23/2021, 3:50 AM
for the plugins root dir, can you try an absolute path?

Phúc Huỳnh

04/23/2021, 3:55 AM
let me try.
hmm, because the Spark context working dir is based on the context_id
Copy code
/tmp/2694644d46744db78cbe27e6dd833f2a
so it's hard to get an absolute path

Xiang Fu

04/23/2021, 6:57 AM
hmm, does
$(pwd)/plugins
work?

Phúc Huỳnh

04/23/2021, 7:00 AM
$(pwd)/plugins
will be the current dir on the remote control machine, not the worker executor machine

Xiang Fu

04/23/2021, 7:07 AM
hmm
the ingestion job only tars/gzips the plugin dir on the driver and sets it on the Spark context
the workers will untar the plugin dir from the context
this is how we package the plugin dir and add it to the sparkContext
then each worker will untar the plugin dir from the tar.gz file
you can check the class:
org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner
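Roughly, the mechanism looks like this (illustrative only, not the exact Pinot code; the archive path is a placeholder):
Copy code
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkFiles;
import org.apache.spark.api.java.JavaSparkContext;

public class ShipPluginsSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("ship-plugins-sketch").setMaster("local[1]");
    try (JavaSparkContext sc = new JavaSparkContext(conf)) {
      // Driver side: register the (placeholder) plugins archive so Spark ships it to executors.
      sc.addFile("/tmp/pinot-plugins.tar.gz");
      sc.parallelize(Arrays.asList(1, 2)).foreach(i -> {
        // Executor side: resolve the local copy shipped by Spark; the job runner then
        // untars it into the working dir before loading the plugins.
        String localArchive = SparkFiles.get("pinot-plugins.tar.gz");
        System.out.println("plugins archive on executor: " + localArchive);
      });
    }
  }
}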

Phúc Huỳnh

04/23/2021, 7:22 AM
hmm, I will try another option: cluster initialization-actions -> gsutil cp the plugins dir to a folder on the cluster.

Xiang Fu

04/23/2021, 7:38 AM
I see, so that means the Spark driver has no access to the plugin dir?
or will you download the plugin dir?

Phúc Huỳnh

04/23/2021, 7:51 AM
I will pre-download the plugins dir into a relative folder
all steps are almost done, but the final step, SegmentUriPushJob, errors out (left tab: Spark job logging, right tab: pinot-controller logging)
PinotFS for scheme: gs has not been initialized
again 😞
oh, I found the issue. I loaded the jar
pinot-batch-ingestion-spark
from the release branch

Xiang Fu

04/23/2021, 8:36 AM
so the filesystem is not initialized yet
I think you found the issue

Phúc Huỳnh

04/24/2021, 2:32 AM
hmmm. One more issue. The API
v2/segments
seems to conflict with the Spark sendSegmentUris. Something null makes the API return internal server errors.
Logging in the Spark job request:
Copy code
Start sending table RuleLogsUAT segment URIs: [gs://{bucket}/data/year=2020/RuleLogsUAT_OFFLINE_18316_18627_0.tar.gz] to locations: [org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec@499782c3]

Sending table RuleLogsUAT segment URI: gs://{bucket}/data/year=2020/RuleLogsUAT_OFFLINE_18316_18627_0.tar.gz to location: https://{domain} for 

Sending request: https://{domain}/v2/segments to controller: pinot-controller-0.pinot-controller-headless.analytics.svc.cluster.local, version: Unknown

Caught temporary exception while pushing table: RuleLogsUAT segment uri: gs://{bucket}/data/year=2020/RuleLogsUAT_OFFLINE_18316_18627_0.tar.gz to https://{domain}, will retry

Got error status code: 500 (Internal Server Error) with reason: "Caught internal server exception while uploading segment" while sending request: https://{domain}/v2/segments to controller: pinot-controller-0.pinot-controller-headless.analytics.svc.cluster.local, version: Unknown
	at org.apache.pinot.common.utils.FileUploadDownloadClient.sendRequest(FileUploadDownloadClient.java:451)
	at org.apache.pinot.common.utils.FileUploadDownloadClient.sendSegmentUri(FileUploadDownloadClient.java:771)
	at org.apache.pinot.segment.local.utils.SegmentPushUtils.lambda$sendSegmentUris$1(SegmentPushUtils.java:178)
	at org.apache.pinot.spi.utils.retry.BaseRetryPolicy.attempt(BaseRetryPolicy.java:50)
	at org.apache.pinot.segment.local.utils.SegmentPushUtils.sendSegmentUris(SegmentPushUtils.java:175)
	at org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner$1.call(SparkSegmentUriPushJobRunner.java:127)
	at org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner$1.call(SparkSegmentUriPushJobRunner.java:117)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$foreach$1(JavaRDDLike.scala:352)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$foreach$1$adapted(JavaRDDLike.scala:352)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1012)
	at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1012)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2242)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
As far as I can tell, maybe the request headers are null, so the default uploadType is
SEGMENT
-> exception when the multipart file is null

Xiang Fu

04/24/2021, 8:22 AM
hmm
for your Pinot controller/server, do they have those Pinot filesystems configured?
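For reference, a rough sketch of the GCS filesystem config on the controller side (key names per the Pinot GCS docs; values are placeholders, and the server side takes the analogous pinot.server.* keys):
Copy code
pinot.controller.storage.factory.class.gs=org.apache.pinot.plugin.filesystem.GcsPinotFS
pinot.controller.storage.factory.gs.projectId=my-gcp-project
pinot.controller.storage.factory.gs.gcpKey=/path/to/service-account-key.json
pinot.controller.segment.fetcher.protocols=file,http,gs
pinot.controller.segment.fetcher.gs.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher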
also, can you try segmentMetadataPush
Copy code
segmentMetadataPushJobRunnerClassName: org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentMetadataPushJobRunner
jobType: SegmentCreationAndMetadataPush
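In the job spec above, that would look roughly like:
Copy code
executionFrameworkSpec:
  name: spark
  segmentGenerationJobRunnerClassName: org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner
  segmentMetadataPushJobRunnerClassName: org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentMetadataPushJobRunner
jobType: SegmentCreationAndMetadataPush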

Phúc Huỳnh

04/26/2021, 2:48 AM
well, I think I have found the problem. In the nginx-ingress log, I found:
Copy code
4: *519 client sent invalid header line: "DOWNLOAD_URI: gs://my-bucket-test-data-np/data/RuleLogsUAT_OFFLINE_18117_18731_0.tar.gz" while reading client request headers, client: 10.255.160.94,
Copy code
When the use of underscores is disabled, request header fields whose names contain underscores are marked as invalid and become subject to the ignore_invalid_headers directive.
I think nginx ingress removes all headers whose names are considered invalid

Xiang Fu

04/26/2021, 9:03 AM
oic
then you need to change nginx to keep those headers
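For example, with plain nginx the underscores_in_headers directive allows header names containing underscores, and with kubernetes ingress-nginx the same thing is exposed as a ConfigMap option (option names per the nginx / ingress-nginx docs):
Copy code
# nginx.conf (http or server block)
underscores_in_headers on;
Copy code
# ingress-nginx controller ConfigMap
data:
  enable-underscores-in-headers: "true"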