Phúc Huỳnh
04/16/2021, 11:00 AM
copy function?
Phúc Huỳnh
04/16/2021, 11:05 AM
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# executionFrameworkSpec: Defines the execution framework that runs the ingestion job.
executionFrameworkSpec:
  # name: execution framework name
  name: 'spark'
  # segmentGenerationJobRunnerClassName: name of a class implementing the
  # org.apache.pinot.spi.batch.ingestion.runner.SegmentGenerationJobRunner interface.
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner'
  # segmentTarPushJobRunnerClassName: name of a class implementing the
  # org.apache.pinot.spi.batch.ingestion.runner.SegmentTarPushJobRunner interface.
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentTarPushJobRunner'
  # segmentUriPushJobRunnerClassName: name of a class implementing the
  # org.apache.pinot.spi.batch.ingestion.runner.SegmentUriPushJobRunner interface.
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner'
  # extraConfigs: extra configs for the execution framework.
  extraConfigs:
    # NOTE(review): a trailing '/' (gs://{bucket_name}/tmp/) was suggested as a
    # workaround that skips URI normalization in GcsPinotFS.copy. Also, bucket
    # names containing '_' are not valid URI hosts (URI.getHost() returns null).
    # Confirm both against your Pinot version.
    stagingDir: 'gs://{bucket_name}/tmp'
# jobType: Pinot ingestion job type.
# Supported job types are:
#   'SegmentCreation'
#   'SegmentTarPush'
#   'SegmentUriPush'
#   'SegmentCreationAndTarPush'
#   'SegmentCreationAndUriPush'
jobType: SegmentCreationAndUriPush
# inputDirURI: Root directory of input data, expected to have its scheme configured in PinotFS.
inputDirURI: 'gs://{bucket_name}/rule_logs'
# includeFileNamePattern: include file name pattern; glob patterns are supported.
# Sample usage:
#   'glob:*.avro' will include all avro files directly under inputDirURI, not in subdirectories;
#   'glob:**/*.avro' will include all avro files under inputDirURI recursively.
includeFileNamePattern: 'glob:**/*.avro'
# excludeFileNamePattern: exclude file name pattern; glob patterns are supported.
# Sample usage:
#   'glob:*.avro' will exclude all avro files directly under inputDirURI, not in subdirectories;
#   'glob:**/*.avro' will exclude all avro files under inputDirURI recursively.
# _excludeFileNamePattern: ''
# outputDirURI: Root directory of output segments, expected to have its scheme configured in PinotFS.
outputDirURI: 'gs://{bucket_name}/data'
# overwriteOutput: Overwrite output segments if they already exist.
overwriteOutput: true
pinotFSSpecs:
  # scheme: used to identify a PinotFS.
  # E.g. local, hdfs, dbfs, etc.
  - scheme: gs
    className: org.apache.pinot.plugin.filesystem.GcsPinotFS
    configs:
      projectId: 'xxxx'
      gcpKey: 'xxx.json'
# recordReaderSpec: defines the record reader.
recordReaderSpec:
  # dataFormat: Record data format, e.g. 'avro', 'parquet', 'orc', 'csv', 'json', 'thrift' etc.
  dataFormat: 'avro'
  # className: record reader class matching dataFormat, e.g.
  #   org.apache.pinot.plugin.inputformat.avro.AvroRecordReader
  #   org.apache.pinot.plugin.inputformat.csv.CSVRecordReader
  #   org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader
  #   org.apache.pinot.plugin.inputformat.json.JSONRecordReader
  #   org.apache.pinot.plugin.inputformat.orc.ORCRecordReader
  #   org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReader
  className: 'org.apache.pinot.plugin.inputformat.avro.AvroRecordReader'
# tableSpec: defines the table name and where to fetch the corresponding table config and table schema.
tableSpec:
  # tableName: Table name
  tableName: 'RuleLogsUAT'
  # schemaURI: defines where to read the table schema; supports PinotFS or HTTP.
  schemaURI: 'http://localhost:9000/tables/RuleLogsUAT/schema'
  # Note that the API to read the Pinot table config directly from the Pinot controller returns a JSON wrapper.
  # The real table config is the object under the field 'OFFLINE'.
  tableConfigURI: 'http://localhost:9000/tables/RuleLogsUAT'
# segmentNameGeneratorSpec: defines how to init a SegmentNameGenerator.
segmentNameGeneratorSpec:
  # type: Currently supported types are 'simple' and 'normalizedDate'.
  type: normalizedDate
  # configs: Configs to init the SegmentNameGenerator.
  configs:
    segment.name.prefix: 'rule_logs_uat'
    exclude.sequence.id: true
# pinotClusterSpecs: defines the Pinot cluster access point.
pinotClusterSpecs:
  # controllerURI: used to fetch table/schema information and to push data.
  - controllerURI: 'http://localhost:9000'
# pushJobSpec: defines segment push job related configuration.
pushJobSpec:
  # pushParallelism: push job parallelism; default is 1.
  pushParallelism: 2
  # pushAttempts: number of attempts for the push job; default is 1, which means no retry.
  pushAttempts: 2
  # pushRetryIntervalMillis: retry wait in milliseconds; defaults to 1 second.
  pushRetryIntervalMillis: 1000
Ken Krugler
04/16/2021, 2:13 PM
Without the actual gs://xxx path, or an obfuscated version of the same, it's hard to know why the normalizeToDirectoryUri
method thinks you have a relative path in your absolute URI.
Xiang Fu
Ken Krugler
04/16/2021, 5:43 PMPhúc Huỳnh
04/17/2021, 4:09 AM
Is the normalizeToDirectoryUri function missing a / after uri.getHost()?
In a mock test with srcUri gs://test/tmp, the output is gs:/testtmp/
Ken Krugler
04/17/2021, 4:02 PM
Try gs://{bucket_name}/tmp/ (note the trailing '/') for stagingDir.
That takes a different path through the code, where no normalization is done.
If that works, then please file an issue.
Phúc Huỳnh
04/19/2021, 5:22 AM[INFO ] 2021-04-19 12:20:06.433 [main] GCSPinotFSTest - Copying uri gs://{bucket_name}/tmp/ to uri gs://{bucket_name}/data/
[INFO ] 2021-04-19 12:20:08.066 [main] GCSPinotFSTest - Listed 1 files from URI: gs://{bucket_name}/tmp/, is recursive: true
java.io.IOException: java.net.URISyntaxException: Relative path in absolute URI: gs://{bucket_name}gs://{bucket_name}/tmp/0ec9c3b7-aba1-4ee8-83e7-79f7688f7d50.png
at org.apache.pinot.plugin.filesystem.GCSPinotFSTest.copy(GCSPinotFSTest.java:102)
at org.apache.pinot.plugin.filesystem.GCSPinotFSTest.testTouchFilesInFolder(GCSPinotFSTest.java:71)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:108)
at org.testng.internal.Invoker.invokeMethod(Invoker.java:661)
at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:869)
at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1193)
at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:126)
at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
at org.testng.TestRunner.privateRun(TestRunner.java:744)
at org.testng.TestRunner.run(TestRunner.java:602)
at org.testng.SuiteRunner.runTest(SuiteRunner.java:380)
at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:375)
at org.testng.SuiteRunner.privateRun(SuiteRunner.java:340)
at org.testng.SuiteRunner.run(SuiteRunner.java:289)
at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:86)
at org.testng.TestNG.runSuitesSequentially(TestNG.java:1301)
at org.testng.TestNG.runSuitesLocally(TestNG.java:1226)
at org.testng.TestNG.runSuites(TestNG.java:1144)
at org.testng.TestNG.run(TestNG.java:1115)
at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:110)
Caused by: java.net.URISyntaxException: Relative path in absolute URI: gs://{bucket_name}gs://{bucket_name}/tmp/0ec9c3b7-aba1-4ee8-83e7-79f7688f7d50.png
at java.net.URI.checkPath(URI.java:1823)
at java.net.URI.<init>(URI.java:672)
at java.net.URI.<init>(URI.java:774)
at org.apache.pinot.plugin.filesystem.GCSPinotFSTest.copy(GCSPinotFSTest.java:92)
... 25 more
Xiang Fu
Phúc Huỳnh
04/19/2021, 7:07 AMXiang Fu
Xiang Fu
Xiang Fu
Xiang Fu
Phúc Huỳnh
04/19/2021, 7:08 AMpublic boolean copy(URI srcUri, URI dstUri) throws IOException {
// Copies srcUri to dstUri. A non-directory source is copied with copyFile after deleting any
// existing dstUri; a directory source is copied entry by entry from listFiles(srcUri, true).
// Returns true when srcUri equals dstUri; otherwise the AND of all copyFile results.
// Throws IOException wrapping any URISyntaxException raised while building per-entry URIs.
// NOTE: "<http://LOGGER.info|LOGGER.info>" below is a Slack link artifact of LOGGER.info.
<http://LOGGER.info|LOGGER.info>("Copying uri {} to uri {}", srcUri, dstUri);
checkState(exists(srcUri), "Source URI '%s' does not exist", srcUri);
if (srcUri.equals(dstUri)) {
return true;
}
if (!isDirectory(srcUri)) {
delete(dstUri, true);
return copyFile(srcUri, dstUri);
}
// NOTE(review): the thread reports that this normalization produced gs:/testtmp/ for
// gs://test/tmp in a mock test, i.e. it appears to drop the '/' after the host. Confirm.
dstUri = normalizeToDirectoryUri(dstUri);
// NOTE(review): builder is never used in this method.
ImmutableList.Builder<URI> builder = ImmutableList.builder();
// NOTE(review): Paths.get(URI) needs an installed NIO FileSystemProvider for the gs scheme;
// presumably one is on the classpath. Confirm.
Path srcPath = Paths.get(srcUri);
try {
boolean copySucceeded = true;
for (String directoryEntry : listFiles(srcUri, true)) {
// Per the thread, directoryEntry is a full URI string such as
// gs://{bucket_name}/tmp/file1.xxx, so it is parsed directly.
URI src = new URI(directoryEntry);
String relativeSrcPath = srcPath.relativize(Paths.get(src)).toString();
String dstPath = dstUri.resolve(relativeSrcPath).getPath();
// NOTE(review): dstUri.getHost() returns null when the bucket name is not a valid
// hostname (e.g. it contains '_'), which yields a malformed destination URI.
// dstUri.getAuthority() may be the safer choice. Confirm.
URI dst = new URI(dstUri.getScheme(), dstUri.getHost(), dstPath, null);
<http://LOGGER.info|LOGGER.info>("copy from {}, to {}", src, dst);
copySucceeded &= copyFile(src, dst);
}
return copySucceeded;
} catch (URISyntaxException e) {
throw new IOException(e);
}
}
Phúc Huỳnh
04/19/2021, 7:10 AMURI src = new URI(srcUri.getScheme(), srcUri.getHost(), directoryEntry, null);
String relativeSrcPath = srcPath.relativize(Paths.get(directoryEntry)).toString();
Phúc Huỳnh
04/19/2021, 7:11 AM
directoryEntry holds the full path; for example, directoryEntry
returns gs://{bucket_name}/tmp/file1.xxx
Phúc Huỳnh
04/19/2021, 7:17 AMURI srcUri = new URI("<gs://bucket_name/tmp/>");
String dest = "<gs://bucket_name/tmp/file1.xxx>";
URI src = new URI(srcUri.getScheme(), srcUri.getHost(), dest, null);
Output is => gs:<gs://bucket_name/tmp/file1.xxx>
Xiang Fu
Xiang Fu
Xiang Fu
Is srcUri.getHost() gs://?
Phúc Huỳnh
04/19/2021, 7:56 AMXiang Fu
Phúc Huỳnh
04/19/2021, 8:05 AM
With bucket_name, getHost() returns null 😄
Phúc Huỳnh
04/19/2021, 8:06 AMXiang Fu
Xiang Fu
Elon
04/19/2021, 8:28 AMcommit 950295ab40844e628c5802649e99fc4feec4af54
Author: Elon Azoulay <elon.azoulay@gmail.com>
Date: Sun Jan 17 13:21:49 2021 -0800
Fix gcs listFiles (#6426)
Elon
04/19/2021, 8:28 AMPhúc Huỳnh
04/19/2021, 8:32 AM
The public boolean copy(URI srcUri, URI dstUri) function.
Elon
04/19/2021, 8:39 AMElon
04/19/2021, 8:42 AMElon
04/21/2021, 9:38 PMElon
04/21/2021, 9:38 PMXiang Fu
Elon
04/21/2021, 9:58 PMElon
04/21/2021, 10:05 PMElon
04/21/2021, 10:10 PMElon
04/21/2021, 10:11 PMXiang Fu
Xiang Fu
Elon
04/21/2021, 10:26 PMElon
04/21/2021, 10:26 PM/
Xiang Fu
Elon
04/21/2021, 10:31 PMElon
04/21/2021, 10:32 PMXiang Fu
Elon
04/21/2021, 10:33 PMElon
04/21/2021, 10:35 PMElon
04/21/2021, 10:36 PMElon
04/21/2021, 10:36 PMXiang Fu
Xiang Fu
Elon
04/22/2021, 12:43 AM
bucket_name isn't parsed as the host because of the _. Looking at URI.Parser; will test all the cases.
Elon
04/22/2021, 6:38 PMElon
04/23/2021, 10:14 AMElon
04/25/2021, 6:03 AMElon
04/25/2021, 6:04 AMElon
04/25/2021, 6:05 AMElon
04/25/2021, 6:06 AMXiang Fu
Xiang Fu
Elon
04/25/2021, 3:55 PMElon
04/25/2021, 7:37 PMPhúc Huỳnh
04/26/2021, 4:12 PM