
Ken Krugler

01/26/2021, 3:16 PM
I’m trying to use the map-reduce job to build segments. In HadoopSegmentGenerationJobRunner.packPluginsToDistributedCache, there’s this code:
Copy code
File pluginsTarGzFile = new File(PINOT_PLUGINS_TAR_GZ);
try {
  TarGzCompressionUtils.createTarGzFile(pluginsRootDir, pluginsTarGzFile);
} catch (IOException e) {
  LOGGER.error("Failed to tar plugins directory", e);
  throw new RuntimeException(e);
}
job.addCacheArchive(pluginsTarGzFile.toURI());
This creates a pinot-plugins.tar.gz file in the Pinot distribution directory, which is on my server. But as the Hadoop DistributedCache documentation states, “The DistributedCache assumes that the files specified via urls are already present on the FileSystem at the path specified by the url and are accessible by every machine in the cluster.”
So what you get is this error:
java.io.FileNotFoundException: File file:/path/to/distribution/apache-pinot-incubating-0.7.0-SNAPSHOT-bin/pinot-plugins.tar.gz does not exist
I think the job needs to use the staging directory (in HDFS) for this file (and any others going into the distributed cache).

Kishore G

01/26/2021, 3:19 PM
what is the fix?

Ken Krugler

01/26/2021, 3:25 PM
I think the tar file (in the snippet above) should be generated in a temp dir and then uploaded to the staging directory, and the staging dir URI is what’s added to the distributed cache
I think this error path might only be hit when a plugins dir is explicitly provided…trying without it now

Kishore G

01/26/2021, 3:27 PM
what do you mean by upload to staging directory
I thought the addCacheArchive code is getting executed on the gateway node

Ken Krugler

01/26/2021, 3:28 PM
As part of the job spec file, you include a stagingDir configuration.

Kishore G

01/26/2021, 3:29 PM
so stagingDir should be on HDFS?

Ken Krugler

01/26/2021, 3:29 PM
And yes, addCacheArchive() gets called on the server where you start the job, which is why it has to be given a URI to a file that’s available on every slave server. So it can’t be a file://xxx path.
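Rough sketch of the difference (made-up class name and paths, not the actual Pinot code):
Copy code
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class CacheArchiveVisibility {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "segment-generation");

    // OK: the archive lives on a FileSystem that every worker node can read.
    job.addCacheArchive(URI.create("hdfs://namenode:8020/staging/pinot-plugins.tar.gz"));

    // Fails at task launch with FileNotFoundException: a file:// path only
    // exists on the gateway node where the job was submitted.
    // job.addCacheArchive(URI.create("file:///path/to/pinot-plugins.tar.gz"));
  }
}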

Kishore G

01/26/2021, 3:30 PM
we thought that happens automatically

Ken Krugler

01/26/2021, 3:30 PM
And yes, stagingDir should be on HDFS (when running distributed). And if you don’t specify it as such, the job fails (as it should) because it’s not using the same file system as the input/output directories.
From the DistributedCache JavaDocs: “Applications specify the files, via urls (hdfs:// or http://) to be cached via the JobConf”

Kishore G

01/26/2021, 3:31 PM
got it! is this how it was from day one?

Ken Krugler

01/26/2021, 3:31 PM
It will work if you run locally, of course, because the file:// path is accessible to the mappers
Or if every server has the shared drive mounted that contains the Pinot distribution
Those are the only reasons why I think it could work as-is now
Maybe @Xiang Fu has some insights, I think he wrote this code. I could be reading it wrong, of course…

Kishore G

01/26/2021, 3:37 PM
what you are saying makes sense, but I thought the job launcher pushes this to the worker nodes
looks like it’s more of a pull from the worker node

Ken Krugler

01/26/2021, 3:42 PM
It’s a bit confusing…if you use the standard Hadoop command line -files parameter (as an example), then the standard Hadoop tool framework will copy the file(s) to HDFS first, before adding them to the JobConf as hdfs://blah paths. In the Pinot code, you need to do this first step (of copying to HDFS) yourself.
And then the Hadoop slaves will take care of copying these cache files from HDFS to a local directory (that part you don’t have to do anything special for)
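Roughly like this (made-up class name and paths, just to show the two steps; not the actual Pinot code):
Copy code
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;

public class ManualCacheStaging {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "segment-generation");

    // Step you have to do yourself (what "hadoop jar ... -files" normally does for you):
    // copy the local tarball up to a cluster-visible location.
    Path localTarball = new Path("/tmp/pinot-plugins.tar.gz");
    Path stagedTarball = new Path("hdfs://namenode:8020/staging/pinot-plugins.tar.gz");
    FileSystem fs = stagedTarball.getFileSystem(conf);
    fs.copyFromLocalFile(localTarball, stagedTarball);

    // Step Hadoop does for you: each worker pulls the archive from HDFS into a
    // task-local directory before the mappers start.
    job.addCacheArchive(stagedTarball.toUri());
  }
}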

Kishore G

01/26/2021, 3:48 PM
then the standard Hadoop tool framework will copy the file(s) to HDFS first
that’s what I thought would happen when we do it via code. Do you know which staging directory it will copy it to?

Ken Krugler

01/26/2021, 4:02 PM
Each Hadoop job has a “staging” directory in the cluster
There’s a job-specific directory inside of that, where the archives (jar files), etc get copied
Taking off for a bit, I might file a PR for this

Kishore G

01/26/2021, 4:04 PM
thanks

Xiang Fu

01/26/2021, 6:09 PM
Thanks @Ken Krugler

Ken Krugler

01/26/2021, 9:05 PM
I just filed https://github.com/apache/incubator-pinot/issues/6492, looking at a fix now.

Xiang Fu

01/26/2021, 9:46 PM
Thanks!
in the branch
can you help validate if this one works, and then submit a PR to fix it!

Ken Krugler

01/26/2021, 10:06 PM
Funny, looks very similar to what I’ve done:
Copy code
protected void packPluginsToDistributedCache(Job job, PinotFS outputDirFS, URI stagingDirURI) {
    File pluginsRootDir = new File(PluginManager.get().getPluginsRootDir());
    if (pluginsRootDir.exists()) {
      try {
        File pluginsTarGzFile = File.createTempFile("pinot-plugins", ".tar.gz");
        TarGzCompressionUtils.createTarGzFile(pluginsRootDir, pluginsTarGzFile);
        
        // Copy to staging directory
        Path cachedPluginsTarball = new Path(stagingDirURI.toString(), SegmentGenerationUtils.PINOT_PLUGINS_TAR_GZ);
        outputDirFS.copyFromLocalFile(pluginsTarGzFile, cachedPluginsTarball.toUri());
        job.addCacheArchive(cachedPluginsTarball.toUri());
Working on a way to unit test…

Xiang Fu

01/26/2021, 10:07 PM
👍

Ken Krugler

01/26/2021, 10:07 PM
I’ve also got a change to addDepsJarToDistributedCache, which has the same issue
I’m hoping to try it out tonight. brb
Well, it’s almost ready for a PR: finally got a successful run at scale. I’ve fixed a number of bugs; I’ve got one more fix and one more issue to investigate, then a refactoring to do, some test code to clean up, and some documentation to edit.
@Xiang Fu what’s the expectation for the staging directory (specified in the job yaml) after the job finishes? Currently it has all of the input, output, and segmentTar files (the latter are duplicated in the outputDirUri directory). I would have thought that directory would be deleted when the job completed.

Xiang Fu

01/28/2021, 1:07 AM
I would expect the staging dir to be cleaned up
at least the directories created by the job

Ken Krugler

01/28/2021, 1:08 AM
OK, I’ll fix that too…
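Probably something like this (just a sketch, assuming the runner still has the outputDirFS and stagingDirURI from the snippet above; the actual fix may differ):
Copy code
// Sketch only: delete the job-specific staging directory once the runner finishes,
// whether the Hadoop job succeeded or failed. Assumes the same outputDirFS (PinotFS)
// and stagingDirURI used in packPluginsToDistributedCache.
protected void cleanupStagingDir(PinotFS outputDirFS, URI stagingDirURI) {
  try {
    if (outputDirFS.exists(stagingDirURI)) {
      outputDirFS.delete(stagingDirURI, true /* forceDelete */);
    }
  } catch (IOException e) {
    LOGGER.warn("Failed to clean up staging dir: " + stagingDirURI, e);
  }
}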

Xiang Fu

01/28/2021, 1:09 AM
Many thanks!

Ken Krugler

01/28/2021, 1:15 AM
@Xiang Fu what’s the “push type” when doing a Hadoop batch segment generation? Asking because it looks like NormalizedDateSegmentNameGenerator thinks it’s not APPEND, so you don’t get any timestamp in the generated filename.
And wondering why the /segmentTar sub-dir in the staging dir is even needed - couldn’t the mapper just generate the files directly in the output directory, versus having the main code do this when the job has finished?
Actually the staging dir will be cleaned up; the job just looks like it’s hung while copying 620 files from staging/segmentTar/ to the output dir.
That copy means every single generated segment byte is read from HDFS to the server where the job was run, and then written back to HDFS. Ouch.

Xiang Fu

01/28/2021, 2:50 AM
PushType is the way to push a segment to Pinot. We have 3 right now:
TAR: upload the segment tar file to the Pinot controller.
URI: save the segment to deep store, notify the controller of the URI, and let the controller download and validate it.
METADATA: push both the URI and the metadata, to avoid the controller download.
NormalizedDateSegmentNameGenerator requires extra table configs, like the time column name and type, to help it find the start/end time (event time).
That copy is required because you don’t want partially generated data to be copied to the output dir, especially for the full table replacement use case.
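The idea is basically this (just a sketch with made-up paths; jobSucceeded and outputDirFS are placeholders, and it assumes PinotFS has a copy(srcUri, dstUri) helper):
Copy code
// Sketch only: mappers write segments under <staging>/segmentTar/, and the segments
// are published to the real output dir only after the whole job succeeds, so a
// failed or partial run never leaves half a table's segments in the output.
URI stagedSegment = URI.create("hdfs://namenode:8020/staging/job-123/segmentTar/mytable_0.tar.gz");
URI outputSegment = URI.create("hdfs://namenode:8020/output/mytable/mytable_0.tar.gz");
if (jobSucceeded) {
  outputDirFS.copy(stagedSegment, outputSegment);  // assumed PinotFS copy(src, dst)
}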

Ken Krugler

01/29/2021, 10:41 PM
Hi @Xiang Fu I generated a PR, see https://github.com/apache/incubator-pinot/pull/6506

Xiang Fu

01/29/2021, 11:39 PM
Thanks! I will take a look!