Monica
04/14/2022, 8:47 AM
ERROR StatusLogger Unrecognized format specifier [d]
ERROR StatusLogger Unrecognized conversion specifier [d] starting at position 16 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [thread]
ERROR StatusLogger Unrecognized conversion specifier [thread] starting at position 25 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [level]
ERROR StatusLogger Unrecognized conversion specifier [level] starting at position 35 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [logger]
ERROR StatusLogger Unrecognized conversion specifier [logger] starting at position 47 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [msg]
ERROR StatusLogger Unrecognized conversion specifier [msg] starting at position 54 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [n]
ERROR StatusLogger Unrecognized conversion specifier [n] starting at position 56 in conversion pattern.
ERROR StatusLogger Reconfiguration failed: No configuration found for '533ddba' at 'null' in 'null'
Exception in thread "main" java.lang.ExceptionInInitializerError
at org.apache.pinot.tools.admin.command.StartKafkaCommand.<init>(StartKafkaCommand.java:51)
at org.apache.pinot.tools.admin.PinotAdministrator.<clinit>(PinotAdministrator.java:98)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:237)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:813)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:927)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:936)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.NoSuchElementException
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:365)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at org.apache.pinot.tools.utils.KafkaStarterUtils.getKafkaConnectorPackageName(KafkaStarterUtils.java:54)
at org.apache.pinot.tools.utils.KafkaStarterUtils.<clinit>(KafkaStarterUtils.java:46)
... 12 more
It seems like Spark couldn't find org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory
from the Kafka plugin. I built Pinot from source on the master
branch using this command (because we use JDK 8 on our machines):
mvn clean install -DskipTests -Pbin-dist -T 4 -Djdk.version=8
My Spark job uses a command like the one below; I've set -Dplugins.dir
according to the documentation:
export PINOT_VERSION=0.10.0-SNAPSHOT
export PINOT_DISTRIBUTION_DIR=/home/xxx/apache-pinot-0.10.0-SNAPSHOT-bin
echo ${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar
cd ${PINOT_DISTRIBUTION_DIR}
${SPARK_HOME}/bin/spark-submit \
--class org.apache.pinot.tools.admin.PinotAdministrator \
--master "local[2]" \
--deploy-mode client \
--conf "spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.171-8.b10.el7_5.x86_64/jre" \
--conf "spark.yarn.appMasterEnv.JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.171-8.b10.el7_5.x86_64/jre" \
--conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins -Dlog4j2.configurationFile=${PINOT_DISTRIBUTION_DIR}/conf/pinot-ingestion-job-log4j2.xml" \
--conf "spark.driver.extraClassPath=${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \
${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar \
LaunchDataIngestionJob \
-jobSpecFile ${PINOT_DISTRIBUTION_DIR}/examples/batch/transcriptData/sparkIngestionJobSpec.yml
Is it because Spark couldn't find my plugins' jars in plugins.dir?
I'm not familiar with Spark; do I need to add all the plugins' jars to the Spark classpath using --jars
or something? Could you help me?
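For context: java.util.ServiceLoader discovers implementations through provider files under META-INF/services on the classpath, and its lazy iterator throws exactly this NoSuchElementException when no provider is visible. A minimal sketch, using a hypothetical ConsumerFactory interface standing in for the factory Pinot resolves inside KafkaStarterUtils:

import java.util.ServiceLoader;

// Hypothetical service interface; the real lookup targets a Pinot factory class.
interface ConsumerFactory {}

public class ServiceLoaderSketch {
    public static void main(String[] args) {
        // ServiceLoader scans META-INF/services entries on the classpath.
        ServiceLoader<ConsumerFactory> loader = ServiceLoader.load(ConsumerFactory.class);
        // If no provider jar (here, the Kafka plugin) is on the classpath,
        // the iterator is empty and next() throws java.util.NoSuchElementException,
        // matching the "Caused by" in the stack trace above.
        ConsumerFactory first = loader.iterator().next();
        System.out.println(first.getClass().getName());
    }
}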
Kartik Khare
04/14/2022, 9:02 AM
${PINOT_DISTRIBUTION_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark/pinot-batch-ingestion-spark-${PINOT_VERSION}-shaded.jar
Kartik Khare
04/14/2022, 9:03 AM

Kartik Khare
04/14/2022, 9:07 AM
org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand

Kartik Khare
04/14/2022, 9:08 AM

Monica
04/14/2022, 10:08 AM
I used org.apache.pinot.tools.admin.PinotAdministrator as the main class before, and it seems to need to load the static variable SUBCOMMAND_MAP, which finally caused the error in this question (see the sketch below). Now I've changed my configuration per the full guide you mentioned above.
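A minimal, self-contained sketch (names are illustrative, not Pinot's actual code) of why a failing static initializer surfaces as java.lang.ExceptionInInitializerError, as in the first stack trace:

import java.util.NoSuchElementException;

public class StaticInitSketch {
    // Mirrors the pattern in PinotAdministrator: a static field whose
    // initializer can throw. Any exception thrown during class initialization
    // is wrapped in ExceptionInInitializerError when the class first loads.
    static final String SUBCOMMAND = lookup();

    static String lookup() {
        // Stand-in for the ServiceLoader lookup that failed above.
        throw new NoSuchElementException("no provider found");
    }

    public static void main(String[] args) {
        // Never reached: class initialization fails before main runs, and the
        // NoSuchElementException shows up as the "Caused by" cause.
        System.out.println(SUBCOMMAND);
    }
}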
My new spark job command is like this:
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.171-8.b10.el7_5.x86_64/jre
export HADOOP_VERSION=2.7.2U17-11
export HADOOP_GUAVA_VERSION=11.0.2
export HADOOP_GSON_VERSION=2.2.4
export PINOT_VERSION=0.10.0-SNAPSHOT
export PINOT_DISTRIBUTION_DIR=/home/xxx/apache-pinot-0.10.0-SNAPSHOT-bin
cd ${PINOT_DISTRIBUTION_DIR}
${SPARK_HOME}/bin/spark-submit \
--class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
--master "local[2]" \
--deploy-mode client \
--conf "spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.171-8.b10.el7_5.x86_64/jre" \
--conf "spark.yarn.appMasterEnv.JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.171-8.b10.el7_5.x86_64/jre" \
--conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins -Dplugins.include=pinot-hdfs -Dlog4j2.configurationFile=${PINOT_DISTRIBUTION_DIR}/conf/pinot-ingestion-job-log4j2.xml" \
--conf "spark.driver.extraClassPath=${PINOT_DISTRIBUTION_DIR}/plugins-external/pinot-batch-ingestion/pinot-batch-ingestion-spark/pinot-batch-ingestion-spark-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar:${PINOT_DISTRIBUTION_DIR}/plugins/pinot-file-system/pinot-hdfs/pinot-hdfs-${PINOT_VERSION}-shaded.jar" \
local://${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar \
-jobSpecFile ${PINOT_DISTRIBUTION_DIR}/examples/batch/transcriptData/sparkIngestionJobSpec.yml
My new ingestion spec is like this:
executionFrameworkSpec:
  name: 'spark'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner'
  segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentMetadataPushJobRunner'
  extraConfigs:
    stagingDir: hdfs://namenodexxx/pinot/batch/transcript/staging
jobType: SegmentCreationAndTarPush
inputDirURI: 'hdfs://namenode/home/xxx/pinot/examples/batch/transcriptData/'
includeFileNamePattern: 'glob:**/*.csv'
outputDirURI: 'hdfs://namenode/home/xxx/pinot/segments'
overwriteOutput: true
pinotFSSpecs:
  - scheme: hdfs
    className: org.apache.pinot.plugin.filesystem.HadoopPinotFS
recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
  tableName: 'transcript'
pinotClusterSpecs:
  - controllerURI: 'http://localhost:9199'
pushJobSpec:
  pushAttempts: 2
  pushRetryIntervalMillis: 1000
My Spark version is 2.4.5. When I executed it, I got an error like this:
ERROR StatusLogger Unrecognized format specifier [d]
ERROR StatusLogger Unrecognized conversion specifier [d] starting at position 16 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [thread]
ERROR StatusLogger Unrecognized conversion specifier [thread] starting at position 25 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [level]
ERROR StatusLogger Unrecognized conversion specifier [level] starting at position 35 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [logger]
ERROR StatusLogger Unrecognized conversion specifier [logger] starting at position 47 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [msg]
ERROR StatusLogger Unrecognized conversion specifier [msg] starting at position 54 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [n]
ERROR StatusLogger Unrecognized conversion specifier [n] starting at position 56 in conversion pattern.
ERROR StatusLogger Reconfiguration failed: No configuration found for '70dea4e' at 'null' in 'null'
Exception in thread "main" java.lang.NoSuchMethodException: org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand.main([Ljava.lang.String;)
at java.lang.Class.getMethod(Class.java:1786)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:42)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:167)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:924)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
It seems that org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand
doesn't have a main method?
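Spark's JavaMainApplication (SparkApplication.scala:42 in the trace) looks up the entry point reflectively, so a class without a public static main(String[]) fails exactly this way. A minimal sketch of that lookup, assuming Spark's internals roughly follow this shape:

import java.lang.reflect.Method;

public class MainLookupSketch {
    public static void main(String[] args) throws Exception {
        Class<?> klass = Class.forName(
            "org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand");
        // Throws java.lang.NoSuchMethodException: ...main([Ljava.lang.String;)
        // when the method is absent -- the exact error above.
        Method main = klass.getMethod("main", String[].class);
        main.invoke(null, (Object) args);
    }
}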
Kartik Khare
04/14/2022, 4:22 PM

Kartik Khare
04/14/2022, 4:22 PM
public static void main(String[] args) {
    PluginManager.get().init();
    (new CommandLine(new LaunchDataIngestionJobCommand())).execute(args);
}
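(For reference: PluginManager.get().init() loads the plugin jars from the directory given by -Dplugins.dir, and picocli's CommandLine.execute(args) parses the arguments and invokes the command, so this wrapper is what lets spark-submit call the class directly.)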
Monica
04/15/2022, 4:23 AM
With the main method in the LaunchDataIngestionJobCommand class, it works. But I've found some problems in my job spec now.
If I put extraConfigs at the same level as executionFrameworkSpec (as the documentation mentioned), I get an error like this:
Caused by: org.yaml.snakeyaml.error.YAMLException: Unable to find property 'extraConfigs' on class: org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec
at org.yaml.snakeyaml.introspector.PropertyUtils.getProperty(PropertyUtils.java:132)
at org.yaml.snakeyaml.introspector.PropertyUtils.getProperty(PropertyUtils.java:121)
at org.yaml.snakeyaml.constructor.Constructor$ConstructMapping.getProperty(Constructor.java:322)
at org.yaml.snakeyaml.constructor.Constructor$ConstructMapping.constructJavaBean2ndStep(Constructor.java:240)
... 31 more
If I put extraConfigs at the same level as executionFrameworkSpec.name (I saw `org.apache.pinot.spi.ingestion.batch.spec.ExecutionFrameworkSpec#_extraConfigs` in the source code), I get an error like this:
mapping values are not allowed here
in 'string', line 8, column 15:
extraConfigs:
^
at org.yaml.snakeyaml.scanner.ScannerImpl.fetchValue(ScannerImpl.java:871)
at org.yaml.snakeyaml.scanner.ScannerImpl.fetchMoreTokens(ScannerImpl.java:360)
at org.yaml.snakeyaml.scanner.ScannerImpl.checkToken(ScannerImpl.java:226)
at org.yaml.snakeyaml.parser.ParserImpl$ParseBlockMappingKey.produce(ParserImpl.java:558)
at org.yaml.snakeyaml.parser.ParserImpl.peekEvent(ParserImpl.java:158)
at org.yaml.snakeyaml.parser.ParserImpl.checkEvent(ParserImpl.java:143)
at org.yaml.snakeyaml.composer.Composer.composeMappingNode(Composer.java:224)
at org.yaml.snakeyaml.composer.Composer.composeNode(Composer.java:155)
at org.yaml.snakeyaml.composer.Composer.composeDocument(Composer.java:122)
at org.yaml.snakeyaml.composer.Composer.getSingleNode(Composer.java:105)
at org.yaml.snakeyaml.constructor.BaseConstructor.getSingleData(BaseConstructor.java:120)
at org.yaml.snakeyaml.Yaml.loadFromReader(Yaml.java:450)
at org.yaml.snakeyaml.Yaml.loadAs(Yaml.java:427)
at org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher.getSegmentGenerationJobSpec(IngestionJobLauncher.java:94)
at org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand.execute(LaunchDataIngestionJobCommand.java:102)
at org.apache.pinot.tools.Command.call(Command.java:33)
at org.apache.pinot.tools.Command.call(Command.java:29)
at picocli.CommandLine.executeUserObject(CommandLine.java:1953)
at picocli.CommandLine.access$1300(CommandLine.java:145)
at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2352)
at picocli.CommandLine$RunLast.handle(CommandLine.java:2346)
at picocli.CommandLine$RunLast.handle(CommandLine.java:2311)
at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2179)
at picocli.CommandLine.execute(CommandLine.java:2078)
at org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand.main(LaunchDataIngestionJobCommand.java:153)
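The "line 8, column 15" error is the classic symptom of indenting a key underneath a scalar value. Assuming extraConfigs ended up indented after a top-level key such as jobType, YAML reads the indented line as a continuation of the previous plain scalar, and the trailing colon then becomes illegal. A sketch of the failing shape:

jobType: SegmentCreationAndTarPush
  extraConfigs:                # indented under a scalar value: the parser treats
    stagingDir: 'hdfs://...'   # "extraConfigs" as a continuation of the jobType
                               # scalar, then rejects the ':' with
                               # "mapping values are not allowed here"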
My job spec is like this:
executionFrameworkSpec:
  name: 'spark'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner'
  segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentMetadataPushJobRunner'
jobType: SegmentCreationAndTarPush
extraConfigs:
  stagingDir: 'hdfs://namenode/home/xxx/pinot/batch/transcript/staging'
inputDirURI: 'hdfs://namenode/home/xxx/pinot/examples/batch/transcriptData/'
outputDirURI: 'hdfs://namenode/home/xxx/pinot/segments'
includeFileNamePattern: 'glob:**/*.csv'
overwriteOutput: true
pinotFSSpecs:
  - scheme: hdfs
    className: org.apache.pinot.plugin.filesystem.HadoopPinotFS
recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
  tableName: 'transcript'
pinotClusterSpecs:
  - controllerURI: 'http://localhost:9199'
pushJobSpec:
  pushAttempts: 2
  pushRetryIntervalMillis: 1000
Are there any problems with my job spec?

Kartik Khare
04/15/2022, 6:30 AM
executionFrameworkSpec:
  name: 'spark'
  segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner'
  segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentTarPushJobRunner'
  segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentUriPushJobRunner'
  segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentMetadataPushJobRunner'
  extraConfigs:
    stagingDir: 'hdfs://namenode/home/xxx/pinot/batch/transcript/staging'
jobType: SegmentCreationAndTarPush
inputDirURI: 'hdfs://namenode/home/xxx/pinot/examples/batch/transcriptData/'
outputDirURI: 'hdfs://namenode/home/xxx/pinot/segments'
includeFileNamePattern: 'glob:**/*.csv'
overwriteOutput: true
pinotFSSpecs:
  - scheme: hdfs
    className: org.apache.pinot.plugin.filesystem.HadoopPinotFS
recordReaderSpec:
  dataFormat: 'csv'
  className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
  configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
tableSpec:
  tableName: 'transcript'
pinotClusterSpecs:
  - controllerURI: 'http://localhost:9199'
pushJobSpec:
  pushAttempts: 2
  pushRetryIntervalMillis: 1000
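In short: extraConfigs is a field of ExecutionFrameworkSpec (the _extraConfigs member noted earlier), not of the top-level SegmentGenerationJobSpec, so it must be nested under executionFrameworkSpec. A minimal contrast, with paths abbreviated:

# Fails: SegmentGenerationJobSpec has no 'extraConfigs' property
# ("Unable to find property 'extraConfigs' ...")
jobType: SegmentCreationAndTarPush
extraConfigs:
  stagingDir: 'hdfs://namenode/...'

# Works: extraConfigs belongs under executionFrameworkSpec
executionFrameworkSpec:
  name: 'spark'
  extraConfigs:
    stagingDir: 'hdfs://namenode/...'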
Kartik Khare
04/15/2022, 6:30 AM

Monica
04/15/2022, 7:31 AM