# troubleshooting
p
I’m using spark-job with jobType: SegmentCreationAndUriPush. It seems there’s a bug in the copy function?
Config.yaml
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

# executionFrameworkSpec: Defines ingestion jobs to be running.
executionFrameworkSpec:

  # name: execution framework name
  name: 'spark'

  # segmentGenerationJobRunnerClassName: class name implements org.apache.pinot.spi.batch.ingestion.runner.SegmentGenerationJobRunner interface.
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner'

  # segmentTarPushJobRunnerClassName: class name implements org.apache.pinot.spi.batch.ingestion.runner.SegmentTarPushJobRunner interface.
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentTarPushJobRunner'

  # segmentUriPushJobRunnerClassName: class name implements org.apache.pinot.spi.batch.ingestion.runner.SegmentUriPushJobRunner interface.
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner'

  # extraConfigs: extra configs for execution framework.
  extraConfigs:

    stagingDir: gs://{bucket_name}/tmp

# jobType: Pinot ingestion job type.
# Supported job types are:
#   'SegmentCreation'
#   'SegmentTarPush'
#   'SegmentUriPush'
#   'SegmentCreationAndTarPush'
#   'SegmentCreationAndUriPush'
jobType: SegmentCreationAndUriPush

# inputDirURI: Root directory of input data, expected to have scheme configured in PinotFS.
inputDirURI: 'gs://{bucket_name}/rule_logs'

# includeFileNamePattern: include file name pattern, supported glob pattern.
# Sample usage:
#   'glob:*.avro' will include all avro files just under the inputDirURI, not sub directories;
#   'glob:**/*.avro' will include all the avro files under inputDirURI recursively.
includeFileNamePattern: 'glob:**/*.avro'

# excludeFileNamePattern: exclude file name pattern, supported glob pattern.
# Sample usage:
#   'glob:*.avro' will exclude all avro files just under the inputDirURI, not sub directories;
#   'glob:**/*.avro' will exclude all the avro files under inputDirURI recursively.
# _excludeFileNamePattern: ''

# outputDirURI: Root directory of output segments, expected to have scheme configured in PinotFS.
outputDirURI: 'gs://{bucket_name}/data'

# overwriteOutput: Overwrite output segments if existed.
overwriteOutput: true

pinotFSSpecs:

  - # scheme: used to identify a PinotFS.
    # E.g. local, hdfs, dbfs, etc
    scheme: gs
    className: org.apache.pinot.plugin.filesystem.GcsPinotFS
    configs:
      'projectId': 'xxxx'
      'gcpKey' : 'xxx.json'

# recordReaderSpec: defines all record reader
recordReaderSpec:

  # dataFormat: Record data format, e.g. 'avro', 'parquet', 'orc', 'csv', 'json', 'thrift' etc.
  dataFormat: 'avro'
  #   org.apache.pinot.plugin.inputformat.avro.AvroRecordReader
  #   org.apache.pinot.plugin.inputformat.csv.CSVRecordReader
  #   org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader
  #   org.apache.pinot.plugin.inputformat.json.JSONRecordReader
  #   org.apache.pinot.plugin.inputformat.orc.ORCRecordReader
  #   org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReader
  className: 'org.apache.pinot.plugin.inputformat.avro.AvroRecordReader'

# tableSpec: defines table name and where to fetch corresponding table config and table schema.
tableSpec:

  # tableName: Table name
  tableName: 'RuleLogsUAT'
  # schemaURI: defines where to read the table schema, supports PinotFS or HTTP.
  schemaURI: 'http://localhost:9000/tables/RuleLogsUAT/schema'
  # Note that the API to read Pinot table config directly from pinot controller contains a JSON wrapper.
  # The real table config is the object under the field 'OFFLINE'.
  tableConfigURI: 'http://localhost:9000/tables/RuleLogsUAT'

# segmentNameGeneratorSpec: defines how to init a SegmentNameGenerator.
segmentNameGeneratorSpec:

  # type: Current supported type is 'simple' and 'normalizedDate'.
  type: normalizedDate

  # configs: Configs to init SegmentNameGenerator.
  configs:
    segment.name.prefix: 'rule_logs_uat'
    exclude.sequence.id: true

# pinotClusterSpecs: defines the Pinot Cluster Access Point.
pinotClusterSpecs:
  - # controllerURI: used to fetch table/schema information and data push.
    controllerURI: 'http://localhost:9000'

# pushJobSpec: defines segment push job related configuration.
pushJobSpec:

  # pushParallelism: push job parallelism, default is 1.
  pushParallelism: 2

  # pushAttempts: number of attempts for push job, default is 1, which means no retry.
  pushAttempts: 2

  # pushRetryIntervalMillis: retry wait Ms, default to 1 second.
  pushRetryIntervalMillis: 1000
k
Without seeing the actual gs://xxx path, or an obfuscated version of the same, it’s hard to know why the normalizeToDirectoryUri method thinks you have a relative path in your absolute URI.
x
my feeling is that the bucket name got dropped during processing?
k
Hmm, maybe it’s a Spark job issue that’s similar to the issue I fixed for Hadoop, with path normalization.
p
seems the normalizeToDirectoryUri function is missing a '/' after uri.getHost()? A mock test with srcUri gs://test/tmp outputs gs:/testtmp/
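For comparison, here is a minimal standalone sketch (assumed behavior only, not the plugin’s actual code) of a normalization that keeps both the bucket and the path separator:

```java
import java.net.URI;

public class NormalizeSketch {
    // Sketch of a correct directory normalization. The real GcsPinotFS
    // implementation may differ; this only shows the expected contract.
    static URI normalizeToDirectoryUri(URI uri) throws Exception {
        String path = uri.getPath().endsWith("/") ? uri.getPath() : uri.getPath() + "/";
        // Use getAuthority() rather than getHost(): buckets with '_' have a null host.
        return new URI(uri.getScheme(), uri.getAuthority(), path, null);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(normalizeToDirectoryUri(new URI("gs://test/tmp")));
        // gs://test/tmp/ (not the gs:/testtmp/ reported above)
    }
}
```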
k
There don’t seem to be any unit tests for the GCS plugin, so not unexpected 🙂 But try adding a trailing ‘/’ to all of your gs://xxx paths, e.g. gs://{bucket_name}/tmp/ (note the trailing ‘/’) for stagingDir. That takes a different path through the code, where no normalization is done. If that works, then please file an issue.
p
I added ‘/’ but now have another problem with the copy() function
[INFO ] 2021-04-19 12:20:06.433 [main] GCSPinotFSTest - Copying uri gs://{bucket_name}/tmp/ to uri gs://{bucket_name}/data/
[INFO ] 2021-04-19 12:20:08.066 [main] GCSPinotFSTest - Listed 1 files from URI: gs://{bucket_name}/tmp/, is recursive: true



java.io.IOException: java.net.URISyntaxException: Relative path in absolute URI: gs://{bucket_name}gs://{bucket_name}/tmp/0ec9c3b7-aba1-4ee8-83e7-79f7688f7d50.png

	at org.apache.pinot.plugin.filesystem.GCSPinotFSTest.copy(GCSPinotFSTest.java:102)
	at org.apache.pinot.plugin.filesystem.GCSPinotFSTest.testTouchFilesInFolder(GCSPinotFSTest.java:71)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:108)
	at org.testng.internal.Invoker.invokeMethod(Invoker.java:661)
	at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:869)
	at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1193)
	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:126)
	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
	at org.testng.TestRunner.privateRun(TestRunner.java:744)
	at org.testng.TestRunner.run(TestRunner.java:602)
	at org.testng.SuiteRunner.runTest(SuiteRunner.java:380)
	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:375)
	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:340)
	at org.testng.SuiteRunner.run(SuiteRunner.java:289)
	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:86)
	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1301)
	at org.testng.TestNG.runSuitesLocally(TestNG.java:1226)
	at org.testng.TestNG.runSuites(TestNG.java:1144)
	at org.testng.TestNG.run(TestNG.java:1115)
	at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
	at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:110)
Caused by: java.net.URISyntaxException: Relative path in absolute URI: gs://{bucket_name}gs://{bucket_name}/tmp/0ec9c3b7-aba1-4ee8-83e7-79f7688f7d50.png
	at java.net.URI.checkPath(URI.java:1823)
	at java.net.URI.<init>(URI.java:672)
	at java.net.URI.<init>(URI.java:774)
	at org.apache.pinot.plugin.filesystem.GCSPinotFSTest.copy(GCSPinotFSTest.java:92)
	... 25 more
x
hmm
p
i already try to fix it
x
can you share the test code?
oh
thanks
I think it’s the part to concat the relative path to absolution path
p
my new copy function
public boolean copy(URI srcUri, URI dstUri) throws IOException {
        LOGGER.info("Copying uri {} to uri {}", srcUri, dstUri);
        checkState(exists(srcUri), "Source URI '%s' does not exist", srcUri);
        if (srcUri.equals(dstUri)) {
            return true;
        }
        if (!isDirectory(srcUri)) {
            delete(dstUri, true);
            return copyFile(srcUri, dstUri);
        }
        dstUri = normalizeToDirectoryUri(dstUri);
        ImmutableList.Builder<URI> builder = ImmutableList.builder();
        Path srcPath = Paths.get(srcUri);
        try {
            boolean copySucceeded = true;
            for (String directoryEntry : listFiles(srcUri, true)) {
                URI src = new URI(directoryEntry);
                String relativeSrcPath = srcPath.relativize(Paths.get(src)).toString();
                String dstPath = dstUri.resolve(relativeSrcPath).getPath();
                URI dst = new URI(dstUri.getScheme(), dstUri.getHost(), dstPath, null);
                LOGGER.info("copy from {}, to {}", src, dst);
                copySucceeded &= copyFile(src, dst);
            }
            return copySucceeded;
        } catch (URISyntaxException e) {
            throw new IOException(e);
        }
    }
I think the problem is
URI src = new URI(srcUri.getScheme(), srcUri.getHost(), directoryEntry, null);
String relativeSrcPath = srcPath.relativize(Paths.get(directoryEntry)).toString();
because directoryEntry outputs the full path - for example, directoryEntry will return gs://{bucket_name}/tmp/file1.xxx
URI srcUri = new URI("gs://bucket_name/tmp/");
String dest = "gs://bucket_name/tmp/file1.xxx";

URI src = new URI(srcUri.getScheme(), srcUri.getHost(), dest, null);
Output is => gs:gs://bucket_name/tmp/file1.xxx
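Run standalone, that constructor call in fact throws the same URISyntaxException seen in the stack trace, because the full gs:// entry is glued onto the scheme as a path (a repro sketch mirroring the mock above):

```java
import java.net.URI;
import java.net.URISyntaxException;

public class DoubleSchemeRepro {
    // Passing a full gs:// URI string as the *path* argument composes
    // scheme + host + "gs://..." into one string, which checkPath rejects.
    static String reproduce() {
        try {
            URI srcUri = new URI("gs://bucket_name/tmp/");
            String directoryEntry = "gs://bucket_name/tmp/file1.xxx"; // full URI, not a relative path
            new URI(srcUri.getScheme(), srcUri.getHost(), directoryEntry, null);
            return "no exception";
        } catch (URISyntaxException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(reproduce());
        // Relative path in absolute URI: gs:gs://bucket_name/tmp/file1.xxx
    }
}
```

Here getHost() is null (because of the underscore), so only "gs:" is prefixed; with a valid hostname the doubled "gs://hostgs://..." form from the log appears instead.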
x
got it
🙌 1
then it’s a URI parsing issue?
srcUri.getHost() is gs://?
p
i think it’s the bucket name
x
hmm, then why there are double gs:
p
😄, I don’t know why, but with the bucket name bucket_name, getHost() returns null 😄
gs://bucket_name -> host is null; gs://test -> host is test
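This is easy to verify standalone: ‘_’ is not a legal character in an RFC 2396 hostname, so the parser falls back to a registry-based authority - getHost() becomes null while getAuthority() still returns the bucket name:

```java
import java.net.URI;

public class HostVsAuthority {
    public static void main(String[] args) throws Exception {
        // Underscore makes the authority an invalid hostname.
        URI withUnderscore = new URI("gs://bucket_name/tmp");
        System.out.println(withUnderscore.getHost());      // null
        System.out.println(withUnderscore.getAuthority()); // bucket_name

        // A plain name parses as a server-based authority with a host.
        URI plain = new URI("gs://test/tmp");
        System.out.println(plain.getHost());               // test
    }
}
```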
x
ic, I think we need to add some tests to ensure this logic is correct
@Elon you don’t see this issue right?
👀 1
e
iirc this was fixed with this commit:
commit 950295ab40844e628c5802649e99fc4feec4af54
Author: Elon Azoulay <elon.azoulay@gmail.com>
Date:   Sun Jan 17 13:21:49 2021 -0800

    Fix gcs listFiles (#6426)
Which version are you using?
p
https://github.com/apache/incubator-pinot/commit/950295ab40844e628c5802649e99fc4feec4af54 I think this commit is missing a part: the public boolean copy(URI srcUri, URI dstUri) function
e
I think you're right. iirc, I was fixing the segment deletion manager, we never used the copy directory -> directory. I can create a pr to fix this. I'll keep you updated, should be tomorrow.
Thanks for catching this @Phúc Huỳnh!
🙌 1
Hi @Phúc Huỳnh - created a related pr for another fix. This one will prevent 429 "rate limit exceeded" errors in gcs: https://github.com/apache/incubator-pinot/pull/6831
cc @Xiang Fu
x
put some comments 😛
🙌 1
e
Thanks!
Ah, was the comment about the path for this issue (i.e. the relativize issue)? I can add that as a separate commit. Found 2 other minor issues also:)
Actually it can be added as another commit, is that ok @Xiang Fu?
the path fix?
x
sure
my comments are just code re-org
👍 1
e
Adding a test - we tried to add one using the local storage test but it doesn’t support the operations, so just adding unit tests for all of those uri/path functions.
also - lmk if my reply makes sense: to minimize calls, keep the isDirectory() on line 194? i.e. just return if it’s /
x
I think that’s good, if we can leverage the sample input/output from gs calls
👍 1
e
hopefully in a newer version google will have a better testing library - or if the “stealth” company has a test bucket we can use it 😃 then we can even have integration tests for gcs.
or maybe a test bucket only accessible from ci/cd? And I can use a local test bucket (I have integration tests I cannot push because there’s no bucket to push them to).
x
I somehow feel google should have some public gs bucket for people to access?
e
Yeah, they can afford it:) Maybe, will see...
Doesn't look like it.
What about this? In the ci script use environment variables for project, key file location, etc. - if they’re set then the test runs, otherwise skip it?
ie. .github/workflows/pinot_tests.yml?
x
for unit test, you can specify some env variable
and your test can examine that
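A minimal sketch of that env-gated pattern - the variable names GCP_PROJECT_ID and GCP_KEY_FILE are illustrative, not the actual CI configuration:

```java
public class EnvGatedTestSketch {
    // True only when GCS credentials are provided via the environment;
    // otherwise the integration test should be skipped.
    static boolean shouldRunGcsTests() {
        return System.getenv("GCP_PROJECT_ID") != null
                && System.getenv("GCP_KEY_FILE") != null;
    }

    public static void main(String[] args) {
        if (!shouldRunGcsTests()) {
            System.out.println("GCS credentials not set; skipping integration tests");
            return;
        }
        // ... run the real GCS integration tests here ...
    }
}
```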
👍 1
e
Wow, URI has a lot of corner cases - the bucket_name isn’t the host because of the _. Looking at URI.Parser - will test all the cases.
Update: using RemoteStorageHelper for tests, should be pushing soon.
👍 2
Made some changes to PinotFS also: https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6587184 getHost() cannot contain `_`'s so it returns null (!!!).
Ok, updated the pr, added tests and javadocs. If you set the environment variables you can run them on your project. So gcs buckets can have `_`'s but s3 and azure buckets cannot (like regular uri's). This was one of the issues. There were a few more, fixed them and added tests to verify.
This also eliminates those 429 errors (mkdir was the cause).
lmk what you think when you have some time. thanks!
also changed to use Path for resolving - @Xiang Fu 🙂
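That combination might look roughly like this - a sketch only (names are illustrative and the actual PR may differ), assuming listFiles() returns full gs:// URIs: relativize against the source path, resolve against the destination path, and rebuild the URI from getAuthority() so underscore bucket names survive:

```java
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;

public class CopyPathSketch {
    // Builds the destination URI for one listed entry during a
    // directory-to-directory copy. Illustrative sketch, not the PR's code.
    static URI destinationFor(URI srcUri, URI dstUri, String directoryEntry) throws Exception {
        Path srcPath = Paths.get(srcUri.getPath());
        Path entryPath = Paths.get(URI.create(directoryEntry).getPath());
        // Relativize on paths only, so the scheme and bucket never leak into the result.
        String relative = srcPath.relativize(entryPath).toString();
        Path dstPath = Paths.get(dstUri.getPath()).resolve(relative);
        // getAuthority(), not getHost(): host is null for buckets containing '_'.
        return new URI(dstUri.getScheme(), dstUri.getAuthority(), dstPath.toString(), null);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(destinationFor(new URI("gs://bucket_name/tmp/"),
                new URI("gs://bucket_name/data/"),
                "gs://bucket_name/tmp/file1.xxx"));
        // gs://bucket_name/data/file1.xxx
    }
}
```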
x
cool
i think you need to create a new pr
😁 1
e
Thanks:) Just saw that, will do!
https://github.com/apache/incubator-pinot/pull/6841 - this will also fix @Phúc Huỳnh's issue (with path.resolve and getAuthority for bucket name w `_`'s)
p
that’s great.