Alice
04/12/2022, 1:37 AM
Kevin Liu
04/12/2022, 2:08 AM
Alice
04/12/2022, 3:21 AM
Chengxuan Wang
04/12/2022, 3:34 AM
sunny
04/12/2022, 5:51 AM
"tableIndexConfig": {
"segmentPartitionConfig": {
"columnPartitionMap": {
"subject": {
"functionName": "murmur",
"numPartitions": 3
}
}
},
Then I increased the Kafka topic's partition count (3 -> 4) and produced data to the new partition.
But no new segment appears in Pinot, so the data in the new Kafka partition doesn't show up. Even after changing numPartitions (3 -> 4) in the Pinot config and rebalancing the servers, the result is the same.
A realtime table without partitioning seems to have no such problem: after adding a Kafka partition and producing data to it, a new segment is added in Pinot and the new partition's data shows up.
Is this expected behavior? If not, what should I check?
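For reference, a minimal sketch of the numPartitions change described above (only the value changes; the rest of the segmentPartitionConfig stays as posted):
"segmentPartitionConfig": {
  "columnPartitionMap": {
    "subject": {
      "functionName": "murmur",
      "numPartitions": 4
    }
  }
}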
Thanks :)
Satyam Raj
04/12/2022, 7:52 AM
export PINOT_VERSION=0.10.0
export PINOT_DISTRIBUTION_DIR=/Users/satyam.raj/dataplatform/pinot-dist/apache-pinot-0.10.0-bin
bin/spark-submit \
--class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
--master "local[8]" \
--conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins -Dlog4j2.configurationFile=${PINOT_DISTRIBUTION_DIR}/conf/pinot-ingestion-job-log4j2.xml" \
--conf "spark.driver.extraClassPath=${PINOT_DISTRIBUTION_DIR}/plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/pinot-batch-ingestion-spark-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar:${PINOT_DISTRIBUTION_DIR}/plugins/pinot-file-system/pinot-s3/pinot-s3-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/plugins/pinot-input-format/pinot-parquet/pinot-parquet-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/plugins/pinot-file-system/pinot-hdfs/pinot-hdfs-${PINOT_VERSION}-shaded.jar" \
${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar \
-jobSpecFile '/Users/satyam.raj/dataplatform/pinot-dist/batchjob-spec/batch-job-spec.yaml'
Getting this weird error:
Exception in thread "main" java.lang.VerifyError: Bad type on operand stack
Exception Details:
Location:
org/apache/spark/metrics/sink/MetricsServlet.<init>(Ljava/util/Properties;Lcom/codahale/metrics/MetricRegistry;Lorg/apache/spark/SecurityManager;)V @116: invokevirtual
Reason:
Type 'com/codahale/metrics/json/MetricsModule' (current frame, stack[2]) is not assignable to 'com/fasterxml/jackson/databind/Module'
Current Frame:
bci: @116
flags: { }
locals: { 'org/apache/spark/metrics/sink/MetricsServlet', 'java/util/Properties', 'com/codahale/metrics/MetricRegistry', 'org/apache/spark/SecurityManager' }
stack: { 'org/apache/spark/metrics/sink/MetricsServlet', 'com/fasterxml/jackson/databind/ObjectMapper', 'com/codahale/metrics/json/MetricsModule' }
Bytecode:
0000000: 2a2b b500 2a2a 2cb5 002f 2a2d b500 5c2a
0000010: b700 7e2a 1280 b500 322a 1282 b500 342a
0000020: 03b5 0037 2a2b 2ab6 0084 b600 8ab5 0039
0000030: 2ab2 008f 2b2a b600 91b6 008a b600 95bb
0000040: 0014 592a b700 96b6 009c bb00 1659 2ab7
0000050: 009d b600 a1b8 00a7 b500 3b2a bb00 7159
0000060: b700 a8bb 00aa 59b2 00b0 b200 b32a b600
0000070: b5b7 00b8 b600 bcb5 003e b1
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:398)
at org.apache.spark.util.Utils$.classForName(Utils.scala:238)
at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:200)
at org.apache.spark.metrics.MetricsSystem$$anonfun$registerSinks$1.apply(MetricsSystem.scala:196)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:130)
at org.apache.spark.metrics.MetricsSystem.registerSinks(MetricsSystem.scala:196)
at org.apache.spark.metrics.MetricsSystem.start(MetricsSystem.scala:104)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:514)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:117)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2550)
at org.apache.spark.SparkContext.getOrCreate(SparkContext.scala)
at org.apache.pinot.plugin.ingestion.batch.spark.SparkSegmentGenerationJobRunner.run(SparkSegmentGenerationJobRunner.java:196)
at org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher.kickoffIngestionJob(IngestionJobLauncher.java:146)
at org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher.runIngestionJob(IngestionJobLauncher.java:125)
at org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand.execute(LaunchDataIngestionJobCommand.java:121)
at org.apache.pinot.tools.Command.call(Command.java:33)
at org.apache.pinot.tools.Command.call(Command.java:29)
at picocli.CommandLine.executeUserObject(CommandLine.java:1953)
at picocli.CommandLine.access$1300(CommandLine.java:145)
at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2352)
at picocli.CommandLine$RunLast.handle(CommandLine.java:2346)
at picocli.CommandLine$RunLast.handle(CommandLine.java:2311)
at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2179)
at picocli.CommandLine.execute(CommandLine.java:2078)
at org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand.main(LaunchDataIngestionJobCommand.java:153)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:855)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:930)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:939)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
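This VerifyError usually indicates two incompatible Jackson/metrics versions on the driver classpath: Spark's MetricsServlet picks up com.codahale.metrics.json.MetricsModule from a jar whose Jackson does not match Spark's own. A quick way to confirm which jars carry the conflicting class (a hedged sketch in plain shell; adjust SPARK_HOME and the glob to your install):

# List every jar on the driver classpath that bundles the conflicting class.
# MetricsModule normally comes from metrics-json; seeing it in more than one
# jar (e.g. a Spark jar and a Pinot shaded jar) confirms the version clash.
for j in ${SPARK_HOME}/jars/*.jar \
         ${PINOT_DISTRIBUTION_DIR}/lib/*.jar \
         ${PINOT_DISTRIBUTION_DIR}/plugins/*/*/*.jar; do
  if unzip -l "$j" 2>/dev/null | grep -q 'com/codahale/metrics/json/MetricsModule.class'; then
    echo "$j"
  fi
done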
Padma Malladi
04/12/2022, 2:55 PM
Lars-Kristian Svenøy
04/13/2022, 12:37 PM
francoisa
04/13/2022, 2:23 PM
java.lang.RuntimeException: shaded.com.fasterxml.jackson.databind.JsonMappingException: Infinite recursion (StackOverflowError) (through reference chain: org.apache.pinot.spi.data.readers.GenericRow["fieldToValueMap"]->java.util.Collections$UnmodifiableMap["$MULTIPLE_RECORDS_KEY$"]->java.util.ArrayList[0]->org.apache.pinot.spi.data.readers.GenericRow["fieldTo>
at org.apache.pinot.spi.data.readers.GenericRow.toString(GenericRow.java:247) ~[pinot-all-0.10.0-jar-with-dependencies.jar:0.10.0-30c4635bfeee88f88aa9c9f63b93bcd4a650607f]
at java.util.Formatter$FormatSpecifier.printString(Formatter.java:3031) ~[?:?]
at java.util.Formatter$FormatSpecifier.print(Formatter.java:2908) ~[?:?]
at java.util.Formatter.format(Formatter.java:2673) ~[?:?]
at java.util.Formatter.format(Formatter.java:2609) ~[?:?]
at java.lang.String.format(String.java:2897) ~[?:?]
at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.processStreamEvents(LLRealtimeSegmentDataManager.java:543) ~[pinot-all-0.10.0-jar-with-dependencies.jar:0.10.0-30c4635bfeee88f88aa9c9f63b93bcd4a650607f]
at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.consumeLoop(LLRealtimeSegmentDataManager.java:420) ~[pinot-all-0.10.0-jar-with-dependencies.jar:0.10.0-30c4635bfeee88f88aa9c9f63b93bcd4a650607f]
at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager$PartitionConsumer.run(LLRealtimeSegmentDataManager.java:598) [pinot-all-0.10.0-jar-with-dependencies.jar:0.10.0-30c4635bfeee88f88aa9c9f63b93bcd4a650607f]
at java.lang.Thread.run(Thread.java:829) [?:?]
TransformConfig as follows ->
"complexTypeConfig": {
"fieldsToUnnest": [
"data.attributes.regularTimes"
],
"delimiter": ".",
"collectionNotUnnestedToJson": "NON_PRIMITIVE"
}
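To illustrate what this config does (a sketch with a hypothetical record shape, since the actual schema isn't shown): each element of data.attributes.regularTimes becomes its own row, and nested field names are flattened with the "." delimiter.

Input record (hypothetical):
  {"data": {"attributes": {"regularTimes": [{"day": "MON"}, {"day": "TUE"}]}}}
Rows after unnesting:
  {"data.attributes.regularTimes.day": "MON"}
  {"data.attributes.regularTimes.day": "TUE"}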
The other table has the same complexTypeConfig but is based on another field. Any idea?
Bodu Janardhan
04/13/2022, 2:25 PM
Alice
04/14/2022, 4:37 AM
Alice
04/14/2022, 5:29 AM
coco
04/14/2022, 8:13 AM
Monica
04/14/2022, 8:47 AM
ERROR StatusLogger Unrecognized format specifier [d]
ERROR StatusLogger Unrecognized conversion specifier [d] starting at position 16 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [thread]
ERROR StatusLogger Unrecognized conversion specifier [thread] starting at position 25 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [level]
ERROR StatusLogger Unrecognized conversion specifier [level] starting at position 35 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [logger]
ERROR StatusLogger Unrecognized conversion specifier [logger] starting at position 47 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [msg]
ERROR StatusLogger Unrecognized conversion specifier [msg] starting at position 54 in conversion pattern.
ERROR StatusLogger Unrecognized format specifier [n]
ERROR StatusLogger Unrecognized conversion specifier [n] starting at position 56 in conversion pattern.
ERROR StatusLogger Reconfiguration failed: No configuration found for '533ddba' at 'null' in 'null'
Exception in thread "main" java.lang.ExceptionInInitializerError
at org.apache.pinot.tools.admin.command.StartKafkaCommand.<init>(StartKafkaCommand.java:51)
at org.apache.pinot.tools.admin.PinotAdministrator.<clinit>(PinotAdministrator.java:98)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:237)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:813)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:927)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:936)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.NoSuchElementException
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:365)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at org.apache.pinot.tools.utils.KafkaStarterUtils.getKafkaConnectorPackageName(KafkaStarterUtils.java:54)
at org.apache.pinot.tools.utils.KafkaStarterUtils.<clinit>(KafkaStarterUtils.java:46)
... 12 more
It seems like Spark couldn't find org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory from the Kafka plugin. I built Pinot from source on the master branch using this command (because we use JDK 8 on our machines):
mvn clean install -DskipTests -Pbin-dist -T 4 -Djdk.version=8
My Spark job uses the commands below, where I've set -Dplugins.dir according to the documentation:
export PINOT_VERSION=0.10.0-SNAPSHOT
export PINOT_DISTRIBUTION_DIR=/home/xxx/apache-pinot-0.10.0-SNAPSHOT-bin
echo ${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar
cd ${PINOT_DISTRIBUTION_DIR}
${SPARK_HOME}/bin/spark-submit \
--class org.apache.pinot.tools.admin.PinotAdministrator \
--master "local[2]" \
--deploy-mode client \
--conf "spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.171-8.b10.el7_5.x86_64/jre" \
--conf "spark.yarn.appMasterEnv.JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.171-8.b10.el7_5.x86_64/jre" \
--conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins -Dlog4j2.configurationFile=${PINOT_DISTRIBUTION_DIR}/conf/pinot-ingestion-job-log4j2.xml" \
--conf "spark.driver.extraClassPath=${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar" \
${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar \
LaunchDataIngestionJob \
-jobSpecFile ${PINOT_DISTRIBUTION_DIR}/examples/batch/transcriptData/sparkIngestionJobSpec.yml
Is it because Spark couldn't find my plugins' jars from plugins.dir? I'm not familiar with Spark; do I need to add all the plugin jars to the Spark classpath using --jars or something? Could you help me?
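One thing worth trying (a hedged sketch only; the plugin jar paths are assumed from the standard Pinot distribution layout, in the spirit of the spark-submit example earlier in this thread): put the plugin jars the job needs on the driver classpath explicitly, instead of relying on plugins.dir alone, e.g.
${SPARK_HOME}/bin/spark-submit \
--class org.apache.pinot.tools.admin.PinotAdministrator \
--master "local[2]" \
--conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins" \
--conf "spark.driver.extraClassPath=${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar:${PINOT_DISTRIBUTION_DIR}/plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark/pinot-batch-ingestion-spark-${PINOT_VERSION}-shaded.jar:${PINOT_DISTRIBUTION_DIR}/plugins/pinot-stream-ingestion/pinot-kafka-2.0/pinot-kafka-2.0-${PINOT_VERSION}-shaded.jar" \
${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar \
LaunchDataIngestionJob \
-jobSpecFile ${PINOT_DISTRIBUTION_DIR}/examples/batch/transcriptData/sparkIngestionJobSpec.yml
Harish Bohara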
04/14/2022, 10:50 PM
Harish Bohara
04/15/2022, 8:07 AM
Nikhil Varma
04/16/2022, 4:14 AM
Diogo Baeder
04/16/2022, 4:45 AM
inputDirURI: '/foo/bar'
includeFileNamePattern: 'glob:baz/**/*.json'
doesn't work if you want to ingest from JSON files inside /foo/bar/baz. Instead, this should be used:
inputDirURI: '/foo/bar/baz'
includeFileNamePattern: 'glob:**/*.json'
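For example, with a hypothetical layout like this, the spec above picks up both files, while the earlier one matched nothing:
/foo/bar/baz/2022/01/events.json
/foo/bar/baz/2022/02/events.json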
Notice how inputDirURI goes to the deepest possible fixed subdirectory, and then the pattern starts from there.
Kevin Xu
04/18/2022, 9:27 AM
coco
04/19/2022, 4:28 AM
Harish Bohara
04/20/2022, 9:37 AM
Yahya Zuberi
04/20/2022, 12:53 PM
Saumya Upadhyay
04/20/2022, 1:34 PM
Joshua Seagroves
04/21/2022, 5:08 PM
Diana Arnos
04/21/2022, 6:06 PM
Mesut Özen
04/21/2022, 8:48 PM
Nizar Hejazi
04/22/2022, 9:29 AM
erik bergsten
04/22/2022, 2:28 PM
Carl
04/22/2022, 2:44 PM
Tejaswini Edara
04/25/2022, 11:51 AM