Ken Krugler
01/26/2021, 3:16 PM
File pluginsTarGzFile = new File(PINOT_PLUGINS_TAR_GZ);
try {
  TarGzCompressionUtils.createTarGzFile(pluginsRootDir, pluginsTarGzFile);
} catch (IOException e) {
  LOGGER.error("Failed to tar plugins directory", e);
  throw new RuntimeException(e);
}
job.addCacheArchive(pluginsTarGzFile.toURI());
This creates a pinot-plugins.tar.gz file in the Pinot distribution directory, which is on my server. But as the Hadoop DistributedCache documentation states, “The DistributedCache assumes that the files specified via urls are already present on the FileSystem at the path specified by the url and are accessible by every machine in the cluster.”
Ken Krugler
01/26/2021, 3:16 PM
java.io.FileNotFoundException: File file:/path/to/distribution/apache-pinot-incubating-0.7.0-SNAPSHOT-bin/pinot-plugins.tar.gz does not exist
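That error lines up with the doc quote: a file: URI resolves against the local filesystem of whichever machine reads it, and the tarball only exists on the submitting server. A minimal sketch (not from the thread) of checking how a cache URI resolves:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// file: URIs map to LocalFileSystem, so each cluster node looks on its own
// disk; the tarball only exists on the submitting server, hence the error.
Configuration conf = new Configuration();
URI cacheUri = URI.create("file:/path/to/distribution/apache-pinot-incubating-0.7.0-SNAPSHOT-bin/pinot-plugins.tar.gz");
FileSystem fs = FileSystem.get(cacheUri, conf);
System.out.println(fs.getUri() + " exists=" + fs.exists(new Path(cacheUri)));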
Ken Krugler
01/26/2021, 3:28 PM
stagingDir configuration.
Ken Krugler
01/26/2021, 3:31 PM
JobConf"
Ken Krugler
01/26/2021, 3:42 PM
If you use the -files parameter (as an example), then the standard Hadoop tool framework will copy the file(s) to HDFS first, before adding them to the JobConf as hdfs://blah paths. In the Pinot code, you need to do this first step (of copying to HDFS) yourself.
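In plain Hadoop API terms, that first step looks roughly like this (a sketch; the paths and staging location are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;

// Copy the local tarball to HDFS first, then register the HDFS copy
// (not the local file: path) with the distributed cache.
void stagePluginsTarball(Job job) throws Exception {
  Configuration conf = job.getConfiguration();
  Path localTarball = new Path("file:///local/server/path/pinot-plugins.tar.gz");
  Path stagedTarball = new Path("hdfs:///user/pinot/staging/pinot-plugins.tar.gz");
  FileSystem fs = stagedTarball.getFileSystem(conf);
  fs.copyFromLocalFile(localTarball, stagedTarball);
  job.addCacheArchive(stagedTarball.toUri());
}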
Kishore G
“then the standard Hadoop tool framework will copy the file(s) to HDFS first”
That's what I thought would happen when we do it via code. Do you know which staging directory it will copy it to?
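For reference, when the tool framework does the copy it happens at job submission time, into the MapReduce job staging area governed by the yarn.app.mapreduce.am.staging-dir setting (the default shown below is Hadoop's, if memory serves):

import org.apache.hadoop.conf.Configuration;

// Files passed via -files/-libjars/-archives are uploaded under this root
// (per user, per job) during job submission.
Configuration conf = new Configuration();
String stagingRoot = conf.get("yarn.app.mapreduce.am.staging-dir", "/tmp/hadoop-yarn/staging");
System.out.println("MapReduce staging root: " + stagingRoot);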
Ken Krugler
01/26/2021, 10:06 PM
protected void packPluginsToDistributedCache(Job job, PinotFS outputDirFS, URI stagingDirURI) {
  File pluginsRootDir = new File(PluginManager.get().getPluginsRootDir());
  if (pluginsRootDir.exists()) {
    try {
      File pluginsTarGzFile = File.createTempFile("pinot-plugins", ".tar.gz");
      TarGzCompressionUtils.createTarGzFile(pluginsRootDir, pluginsTarGzFile);
      // Copy to staging directory on the shared FileSystem, so every node can see it
      Path cachedPluginsTarball = new Path(stagingDirURI.toString(), SegmentGenerationUtils.PINOT_PLUGINS_TAR_GZ);
      outputDirFS.copyFromLocalFile(pluginsTarGzFile, cachedPluginsTarball.toUri());
      job.addCacheArchive(cachedPluginsTarball.toUri());
    } catch (Exception e) {
      LOGGER.error("Failed to add plugins to distributed cache", e);
      throw new RuntimeException(e);
    }
  }
}
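On the task side, Hadoop localizes the archive and links it into the container's working directory under its base name, pointing at the unpacked directory. A sketch of picking it up in a mapper (the plugins.dir property name is an assumption about how PluginManager gets pointed at a directory):

import java.io.File;

// In the mapper's setup(): the cached archive has been unpacked, and
// "pinot-plugins.tar.gz" in the working directory links to that directory.
File unpackedPlugins = new File("pinot-plugins.tar.gz");
if (unpackedPlugins.isDirectory()) {
  // Property name assumed; point plugin loading at the unpacked directory.
  System.setProperty("plugins.dir", unpackedPlugins.getAbsolutePath());
}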
Ken Krugler
01/26/2021, 10:07 PM
There's also addDepsJarToDistributedCache, which has the same issue.
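Presumably the same fix works there: stage the jar on the shared FileSystem, then add the staged copy to the task classpath. A sketch mirroring the method above (the signature is assumed):

import java.io.File;
import java.net.URI;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pinot.spi.filesystem.PinotFS;

void addDepsJarToDistributedCache(Job job, PinotFS outputDirFS, URI stagingDirURI, File depsJarFile)
    throws Exception {
  // Stage the jar next to the plugins tarball, then put the staged copy
  // (not the local file) on every task's classpath.
  Path stagedJar = new Path(stagingDirURI.toString(), depsJarFile.getName());
  outputDirFS.copyFromLocalFile(depsJarFile, stagedJar.toUri());
  job.addFileToClassPath(stagedJar);
}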
Ken Krugler
01/28/2021, 1:15 AM
NormalizedDateSegmentNameGenerator thinks it's not APPEND, so you don't get any timestamp in the generated filename.
Xiang Fu
TAR: upload the segment tar file to the Pinot controller.
URI: save the segment to deep store, notify the controller of the URI, and let the controller download and validate it.
METADATA: push both the URI and the metadata, to avoid the controller download.
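To make the three modes concrete, here is a hypothetical sketch of the controller interaction each one implies; ControllerClient and its methods are made up for illustration, not Pinot's actual API:

import java.io.File;
import java.net.URI;

// Hypothetical client, just to make the sketch self-contained.
interface ControllerClient {
  void uploadSegmentTar(File segmentTar);                  // TAR: ship the whole tarball
  void notifySegmentUri(URI segmentUri);                   // URI: controller downloads + validates
  void pushUriAndMetadata(URI segmentUri, File metadata);  // METADATA: no controller download
}

static void pushSegment(String mode, ControllerClient controller, File segmentTar, URI deepStoreUri, File metadata) {
  switch (mode) {
    case "TAR":      controller.uploadSegmentTar(segmentTar); break;
    case "URI":      controller.notifySegmentUri(deepStoreUri); break;
    case "METADATA": controller.pushUriAndMetadata(deepStoreUri, metadata); break;
  }
}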
Xiang Fu
NormalizedDateSegmentNameGenerator requires extra table configs, like the time column name and type, to help it find the start/end time (event time).
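The reason those configs matter: the generator reads the time column's min/max values and embeds the normalized dates in APPEND-mode segment names. A rough sketch of the idea (illustrative, not Pinot's actual implementation):

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// APPEND tables get the event-time range in the segment name, e.g.
// myTable_2021-01-26_2021-01-28_0; REFRESH tables skip the dates.
static String appendSegmentName(String table, long minMillis, long maxMillis, int seqId) {
  DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);
  return String.join("_", table,
      fmt.format(Instant.ofEpochMilli(minMillis)),
      fmt.format(Instant.ofEpochMilli(maxMillis)),
      Integer.toString(seqId));
}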