# general
m
Hi everyone, I was trying to use Spark to do batch ingestion, but I got an error like this when I executed it:
ERROR StatusLogger Unrecognized format specifier [d]
ERROR StatusLogger Unrecognized conversion specifier [d] starting at position 16 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [thread]
ERROR StatusLogger Unrecognized conversion specifier [thread] starting at position 25 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [level]
ERROR StatusLogger Unrecognized conversion specifier [level] starting at position 35 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [logger]
ERROR StatusLogger Unrecognized conversion specifier [logger] starting at position 47 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [msg]
ERROR StatusLogger Unrecognized conversion specifier [msg] starting at position 54 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [n]
ERROR StatusLogger Unrecognized conversion specifier [n] starting at position 56 in conversion pattern.
ERROR StatusLogger Reconfiguration failed: No configuration found for '533ddba' at 'null' in 'null'
Exception in thread "main" java.lang.ExceptionInInitializerError
	at org.apache.pinot.tools.admin.command.StartKafkaCommand.<init>(StartKafkaCommand.java:51)
	at org.apache.pinot.tools.admin.PinotAdministrator.<clinit>(PinotAdministrator.java:98)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.spark.util.Utils$.classForName(Utils.scala:237)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:813)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:927)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:936)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.NoSuchElementException
	at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:365)
	at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
	at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
	at org.apache.pinot.tools.utils.KafkaStarterUtils.getKafkaConnectorPackageName(KafkaStarterUtils.java:54)
	at org.apache.pinot.tools.utils.KafkaStarterUtils.<clinit>(KafkaStarterUtils.java:46)
	... 12 more
It seems like Spark couldn't find `org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory` from the Kafka plugin. I built Pinot from source on the `master` branch with the following command (because we use JDK 8 on our machines):
mvn clean install -DskipTests -Pbin-dist -T 4  -Djdk.version=8
My Spark job uses a command like this, where I've set `-Dplugins.dir` according to the documentation:
export PINOT_VERSION=0.10.0-SNAPSHOT
export PINOT_DISTRIBUTION_DIR=/home/xxx/apache-pinot-0.10.0-SNAPSHOT-bin
echo ${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar
cd ${PINOT_DISTRIBUTION_DIR}

${SPARK_HOME}/bin/spark-submit \
  --class org.apache.pinot.tools.admin.PinotAdministrator \
  --master "local[2]" \
  --deploy-mode client \
  --conf "spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.171-8.b10.el7_5.x86_64/jre" \
  --conf "spark.yarn.appMasterEnv.JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.171-8.b10.el7_5.x86_64/jre" \
  --conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins -Dlog4j2.configurationFile=${PINOT_DISTRIBUTION_DIR}/conf/pinot-ingestion-job-log4j2.xml" \
  --conf "spark.driver.extraClassPath=${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \
  ${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar \
  LaunchDataIngestionJob  \
  -jobSpecFile ${PINOT_DISTRIBUTION_DIR}/examples/batch/transcriptData/sparkIngestionJobSpec.yml
Is it because Spark couldn't find my plugins' jars from `plugins.dir`? I'm not familiar with Spark; do I need to add all the plugins' jars to the Spark classpath using `--jars` or something? Could you help me?
k
Hi. Can I ask why you need to use Kafka in batch ingestion? Also, can you share the ingestion spec? In your spark-submit command, the pinot-batch-ingestion-spark plugin is missing. It is located at
${PINOT_DISTRIBUTION_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark/pinot-batch-ingestion-spark-${PINOT_VERSION}-shaded.jar
Also, can you specify the Spark version?
Also, the `--class` needs to be `org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand`.
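For example, applied to the command you posted, the changes would look roughly like this (just a sketch, reusing the same environment variables and the default distribution layout):

# Sketch only: entry class switched to LaunchDataIngestionJobCommand, and the
# pinot-batch-ingestion-spark shaded jar added ahead of pinot-all on the driver classpath.
${SPARK_HOME}/bin/spark-submit \
  --class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
  --master "local[2]" \
  --deploy-mode client \
  --conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins -Dlog4j2.configurationFile=${PINOT_DISTRIBUTION_DIR}/conf/pinot-ingestion-job-log4j2.xml" \
  --conf "spark.driver.extraClassPath=${PINOT_DISTRIBUTION_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark/pinot-batch-ingestion-spark-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \
  ${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar \
  -jobSpecFile ${PINOT_DISTRIBUTION_DIR}/examples/batch/transcriptData/sparkIngestionJobSpec.yml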
m
Hi @User, thank you for your help. I don't need to use Kafka in batch ingestion. I had been using `org.apache.pinot.tools.admin.PinotAdministrator` as the main class, which apparently needs to load the static `SUBCOMMAND_MAP`, and that ultimately caused the error in this question. I've now changed my configuration following the guidance you gave above. My new Spark job command is like this:
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.171-8.b10.el7_5.x86_64/jre
export HADOOP_VERSION=2.7.2U17-11
export HADOOP_GUAVA_VERSION=11.0.2
export HADOOP_GSON_VERSION=2.2.4
export PINOT_VERSION=0.10.0-SNAPSHOT
export PINOT_DISTRIBUTION_DIR=/home/xxx/apache-pinot-0.10.0-SNAPSHOT-bin
cd ${PINOT_DISTRIBUTION_DIR}

${SPARK_HOME}/bin/spark-submit \
  --class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
  --master "local[2]" \
  --deploy-mode client \
  --conf "spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.171-8.b10.el7_5.x86_64/jre" \
  --conf "spark.yarn.appMasterEnv.JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.171-8.b10.el7_5.x86_64/jre" \
  --conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins -Dplugins.include=pinot-hdfs -Dlog4j2.configurationFile=${PINOT_DISTRIBUTION_DIR}/conf/pinot-ingestion-job-log4j2.xml" \
  --conf "spark.driver.extraClassPath=${PINOT_DISTRIBUTION_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark/pinot-batch-ingestion-spark-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar:${PINOT_DISTRIBUTION_DIR}/plugins/pinot-file-system/pinot-hdfs/pinot-hdfs-${PINOT_VERSION}-shaded.jar" \
  local://${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar \
  -jobSpecFile ${PINOT_DISTRIBUTION_DIR}/examples/batch/transcriptData/sparkIngestionJobSpec.yml
My new ingestion spec is like this:
name: 'spark'
segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner'
segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentTarPushJobRunner'
segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner'
segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentMetadataPushJobRunner'
extraConfigs:
  stagingDir: 'hdfs://namenodexxx/pinot/batch/transcript/staging'
  jobType: SegmentCreationAndTarPush
  inputDirURI: 'hdfs://namenode/home/xxx/pinot/examples/batch/transcriptData/'
  includeFileNamePattern: 'glob:**/*.csv'
  outputDirURI: 'hdfs://namenode/home/xxx/pinot/segments'
  overwriteOutput: true
pinotFSSpecs:
  - scheme: hdfs
    className: org.apache.pinot.plugin.filesystem.HadoopPinotFS
recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
  tableName: 'transcript'
pinotClusterSpecs:
  - controllerURI: 'http://localhost:9199'
pushJobSpec:
  pushAttempts: 2
  pushRetryIntervalMillis: 1000
My Spark version is 2.4.5. When I executed it, I got an error like this:
ERROR StatusLogger Unrecognized format specifier [d]
ERROR StatusLogger Unrecognized conversion specifier [d] starting at position 16 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [thread]
ERROR StatusLogger Unrecognized conversion specifier [thread] starting at position 25 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [level]
ERROR StatusLogger Unrecognized conversion specifier [level] starting at position 35 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [logger]
ERROR StatusLogger Unrecognized conversion specifier [logger] starting at position 47 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [msg]
ERROR StatusLogger Unrecognized conversion specifier [msg] starting at position 54 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [n]
ERROR StatusLogger Unrecognized conversion specifier [n] starting at position 56 in conversion pattern.
ERROR StatusLogger Reconfiguration failed: No configuration found for '70dea4e' at 'null' in 'null'
Exception in thread "main" java.lang.NoSuchMethodException: org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand.main([Ljava.lang.String;)
	at java.lang.Class.getMethod(Class.java:1786)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:42)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
It seems like `org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand` doesn't have a `main` method?
k
Yes, that was a bug which has been fixed now. Can you pull the latest code from master and build it?
Or you can simply add the following to the class:
// Entry point so spark-submit can invoke this command class directly.
public static void main(String[] args) {
    // Load the Pinot plugins (from plugins.dir) before executing the command.
    PluginManager.get().init();
    new CommandLine(new LaunchDataIngestionJobCommand()).execute(args);
}
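If you go the rebuild route instead, the same build command from earlier in the thread should still work against the updated master, something like:

# Sketch: assumes your local checkout tracks the apache/pinot master branch.
git pull
mvn clean install -DskipTests -Pbin-dist -T 4 -Djdk.version=8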
m
Hi @User, I add the main method in
LaunchDataIngestionJobCommand
class, it can works. But I find some problems in my job spec now. If I put
extraConfigs
in the same level with
executionFrameworkSpec
(as the documentation mentioned), I would have an error like this:
Caused by: org.yaml.snakeyaml.error.YAMLException: Unable to find property 'extraConfigs' on class: org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec
	at org.yaml.snakeyaml.introspector.PropertyUtils.getProperty(PropertyUtils.java:132)
	at org.yaml.snakeyaml.introspector.PropertyUtils.getProperty(PropertyUtils.java:121)
	at org.yaml.snakeyaml.constructor.Constructor$ConstructMapping.getProperty(Constructor.java:322)
	at org.yaml.snakeyaml.constructor.Constructor$ConstructMapping.constructJavaBean2ndStep(Constructor.java:240)
	... 31 more
If I put `extraConfigs` at the same level as `executionFrameworkSpec.name` (I saw `org.apache.pinot.spi.ingestion.batch.spec.ExecutionFrameworkSpec#_extraConfigs` in the source code), I get an error like this:
mapping values are not allowed here
 in 'string', line 8, column 15:
      extraConfigs:
                  ^

	at org.yaml.snakeyaml.scanner.ScannerImpl.fetchValue(ScannerImpl.java:871)
	at org.yaml.snakeyaml.scanner.ScannerImpl.fetchMoreTokens(ScannerImpl.java:360)
	at org.yaml.snakeyaml.scanner.ScannerImpl.checkToken(ScannerImpl.java:226)
	at org.yaml.snakeyaml.parser.ParserImpl$ParseBlockMappingKey.produce(ParserImpl.java:558)
	at org.yaml.snakeyaml.parser.ParserImpl.peekEvent(ParserImpl.java:158)
	at org.yaml.snakeyaml.parser.ParserImpl.checkEvent(ParserImpl.java:143)
	at org.yaml.snakeyaml.composer.Composer.composeMappingNode(Composer.java:224)
	at org.yaml.snakeyaml.composer.Composer.composeNode(Composer.java:155)
	at org.yaml.snakeyaml.composer.Composer.composeDocument(Composer.java:122)
	at org.yaml.snakeyaml.composer.Composer.getSingleNode(Composer.java:105)
	at org.yaml.snakeyaml.constructor.BaseConstructor.getSingleData(BaseConstructor.java:120)
	at org.yaml.snakeyaml.Yaml.loadFromReader(Yaml.java:450)
	at org.yaml.snakeyaml.Yaml.loadAs(Yaml.java:427)
	at org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher.getSegmentGenerationJobSpec(IngestionJobLauncher.java:94)
	at org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand.execute(LaunchDataIngestionJobCommand.java:102)
	at org.apache.pinot.tools.Command.call(Command.java:33)
	at org.apache.pinot.tools.Command.call(Command.java:29)
	at picocli.CommandLine.executeUserObject(CommandLine.java:1953)
	at picocli.CommandLine.access$1300(CommandLine.java:145)
	at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2352)
	at picocli.CommandLine$RunLast.handle(CommandLine.java:2346)
	at picocli.CommandLine$RunLast.handle(CommandLine.java:2311)
	at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2179)
	at picocli.CommandLine.execute(CommandLine.java:2078)
	at org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand.main(LaunchDataIngestionJobCommand.java:153)
My job spec is like this:
executionFrameworkSpec:
  name: 'spark'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner'
  segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentMetadataPushJobRunner'
jobType: SegmentCreationAndTarPush
  extraConfigs:
    stagingDir: 'hdfs://namenode/home/xxx/pinot/batch/transcript/staging'
inputDirURI: 'hdfs://namenode/home/xxx/pinot/examples/batch/transcriptData/'
outputDirURI: 'hdfs://namenode/home/xxx/pinot/segments'
includeFileNamePattern: 'glob:**/*.csv'
overwriteOutput: true
pinotFSSpecs:
  - scheme: hdfs
    className: org.apache.pinot.plugin.filesystem.HadoopPinotFS
recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
  tableName: 'transcript'
pinotClusterSpecs:
  - controllerURI: 'http://localhost:9199'
pushJobSpec:
  pushAttempts: 2
  pushRetryIntervalMillis: 1000
Are there any problems with my job spec?
k
executionFrameworkSpec:
  name: 'spark'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner'
  segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentMetadataPushJobRunner'
  extraConfigs:
    stagingDir: 'hdfs://namenode/home/xxx/pinot/batch/transcript/staging'
jobType: SegmentCreationAndTarPush
inputDirURI: 'hdfs://namenode/home/xxx/pinot/examples/batch/transcriptData/'
outputDirURI: 'hdfs://namenode/home/xxx/pinot/segments'
includeFileNamePattern: 'glob:**/*.csv'
overwriteOutput: true
pinotFSSpecs:
  - scheme: hdfs
    className: org.apache.pinot.plugin.filesystem.HadoopPinotFS
recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
  tableName: 'transcript'
pinotClusterSpecs:
  - controllerURI: 'http://localhost:9199'
pushJobSpec:
  pushAttempts: 2
  pushRetryIntervalMillis: 1000
this should work
m
It works for me now. Thank you very much💯