# general
  • y

    Yarden Rokach

    03/25/2025, 11:54 AM
    Hey all! Thanks again for attending the meetup last week! As promised, here’s the

    meetup recording

    📖 Slides: Available on Barkha’s LinkedIn here. 📩 Stay updated on future meetups, blogs, and more: *RSVP to our community newsletter* (plus, get a chance to win a t-shirt! 🎉) See you at the next event!
  • s

    San Kumar

    03/25/2025, 1:27 PM
    Hello Team, we are using hourly segments and pushing the data to an offline table. We want to store 5 years of data. Is there any limitation on the number of segments? 5 years is 43,800 hours, so this will create 43,800 segments. Will Pinot allow that many segments?
    m
    • 2
    • 1
  • s

    San Kumar

    03/26/2025, 12:42 PM
    Hello Team
  • s

    San Kumar

    03/26/2025, 12:42 PM
    We created an offline table and are using a job spec to load or update segments, where all input files are in CSV format. Is it necessary to use JSON, Proto, or Avro as the input format in production? Will these formats consume more disk space or cause performance issues? I’ve noticed that CSV uploads segments faster, and we want to process around 3.5GB of data per hour.
    m
    • 2
    • 3
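    For illustration (not from the thread): Pinot converts whatever input it reads into its own columnar segment format, so CSV vs. JSON/Proto/Avro mainly affects ingestion convenience and speed, not how segments are stored. Below is a minimal sketch of the CSV portion of a batch ingestion job spec; the config class and keys follow the CSV plugin's documented names but should be verified against your Pinot version, and the column list is hypothetical.
    Copy code
    recordReaderSpec:
      dataFormat: 'csv'
      className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
      configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
      configs:
        delimiter: ','                                      # assumed config key
        header: 'event_time,source,country,event_type'      # hypothetical column list for header-less files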
  • k

    KC Satish

    03/27/2025, 9:28 AM
    Subject: How to access Pinot system.tables? Hello, I'm able to access our application Pinot tables through the pinot-controller UI; so far, OK. But I'm not able to access tables like "system.connections". Could someone guide me on how I can access system tables? Below is the error I see when I try to access one of the system tables.
  • p

    piby

    03/28/2025, 9:47 AM
    Hey all! Is there a way to mirror traffic (requests and responses) in Pinot? We want to monitor and analyse the traffic and react quickly in case of errors. Broker logs are unstructured and hard to analyse: we don’t know which user made a query, the source IP address, the query latency, or the number of segments scanned for a particular query. From time to time, we have found that Prometheus metrics are not accurate. Simply pushing all requests (with username and all other headers) and responses (just the query stats and error traces in case of errors, no data) to Kafka would be immensely useful to keep an eye on how the Pinot cluster is utilized. We could even connect this Kafka topic to an internal realtime table in Pinot, monitor everything right within Pinot, and analyse traffic using SQL. Can this be done via some kind of plugin? I am willing to work on this if someone can point me in the right direction. Thanks!
  • m

    Mayank

    03/28/2025, 1:53 PM
    Aggregate stats are available via metrics. If you want individual query level stats, the broker query log is very structured
    p
    p
    • 3
    • 2
  • c

    charlie

    03/28/2025, 7:43 PM
    How have folks approached writing a test suite to be used while upgrading? These are the questions I'm most interested in right now: • Do you write tests for all of your queries or a subset? What does it take to gain confidence in the upgrade? • How do you deal with shared state between tests as your test suite grows (they all read from the same underlying table with data added for each test)? How do you reason about tests in a large suite given shared state?
    • 1
    • 1
  • y

    Yarden Rokach

    04/03/2025, 10:56 AM
    Host an Apache Pinot Meetup in Your City with 𝗠𝗲𝗲𝘁𝘂𝗽 𝗶𝗻 𝗮 𝗕𝗼𝘅! 🎁 Whether you’re a seasoned organizer or planning your first-ever event, Meetup-in-a-Box makes it simple and fun to bring the community together! This complete toolkit has everything you need: ✅ Pre-made decks, demos & resource lists ✅ Community presentation template ✅ QR codes & links to Pinot resources ✅ Swag designs, mockups & official logos ✅ Event tracker & logistical support from StarTree (meetup setup, cross-promotion, swag & F&B funding) 𝗥𝗲𝗮𝗱𝘆 𝘁𝗼 𝗴𝗲𝘁 𝘀𝘁𝗮𝗿𝘁𝗲𝗱? Visit the program page, fill out the interest form, and we’ll get back to you shortly for the kickoff call: https://startree.ai/meetupinabox
  • t

    telugu bharadwaj

    04/04/2025, 8:30 AM
    Hello team, I am trying to set up S3 as the deep store for Pinot, but I’m facing issues. The configuration provided in the documentation is for version 0.6.0, and in this version, the joins are not working as expected. I want to use the latest version, but the configuration for that version isn’t working as it does in 0.6.0. Can you please assist me with this?
  • s

    San Kumar

    04/04/2025, 8:46 AM
    Hello Team, if a realtime table is designed for the Proto serializer, i.e. "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.protobuf.ProtoBufMessageDecoder", and it has this minion configuration:
    Copy code
    "taskTypeConfigsMap": {
                "RealtimeToOfflineSegmentsTask": {
                  "schedule": "*/15 * * ** ?",
                  "bucketTimePeriod": "6h",
                  "bufferTimePeriod": "1d"
                }
              }
    When we create an offline table on the same schema, what file format will the segments be stored in? In the offline table we have this config:
    Copy code
    "ingestionConfig": {
              "continueOnError": false,
              "rowTimeValueCheck": false,
              "batchIngestionConfig": {
                "consistentDataPush": false,
                "segmentIngestionType": "APPEND",
                "segmentIngestionFrequency": "HOURLY"
              },
    Is there anything we have to specify for the file format in the offline table when it is part of a hybrid table? How do I ensure that the offline table also uses the same file compression as the realtime one? In summary, for a hybrid table definition where the realtime side uses Protobuf, what will the offline table segment format be: CSV, JSON, or Proto? And how can I confirm, for a hybrid table definition where the realtime side uses Avro, what the offline table segment type will be?
  • s

    San Kumar

    04/04/2025, 8:52 AM
    Hello Team, how do we create multiple tenants in Pinot?
    m
    • 2
    • 1
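    As a rough sketch (names are hypothetical, not from the thread): broker and server instances are tagged with a tenant name, and the table config references those tags in its tenants section.
    Copy code
    {
      "tableName": "orders_OFFLINE",
      "tableType": "OFFLINE",
      "tenants": {
        "broker": "analyticsBroker",
        "server": "analyticsServer"
      }
    }
    In this sketch, servers would carry tags such as analyticsServer_OFFLINE (or analyticsServer_REALTIME) and brokers analyticsBroker_BROKER, so the table is served only by that tenant's instances.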
  • s

    Shiva Sharana

    04/04/2025, 9:35 AM
    Hello all, 1] I wanted to know how to calculate the median value for a particular column in Pinot v1.0.1 (not using Percentile though, as it returns the max - min value). 2] And how to find the diff() between two rows, the current and previous row, without using a join.
    m
    • 2
    • 2
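    For illustration, hedged SQL sketches with hypothetical table and column names: an approximate median can be written as a percentile aggregation, and a row-over-row difference as a window function, which requires the multi-stage query engine and a version that supports LAG.
    Copy code
    -- Approximate median of a column
    SELECT PERCENTILETDIGEST(price, 50) AS median_price
    FROM trades;

    -- Difference between the current and previous row, ordered by time
    -- (multi-stage engine; verify LAG support on your Pinot version)
    SELECT ts,
           price,
           price - LAG(price) OVER (ORDER BY ts) AS price_diff
    FROM trades;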
  • s

    San Kumar

    04/05/2025, 1:10 PM
    Hello Team, can we provide a primary key in an offline APPEND-type table and push the segment so that a record is replaced if it exists and appended otherwise? Is that possible in Pinot?
    m
    • 2
    • 1
  • s

    San Kumar

    04/06/2025, 4:50 AM
    Hello Team I am currently looking for documentation related to segment file compression and storage formats in Apache Pinot. I find the existing documentation somewhat unclear. From my understanding, regardless of whether we use JSON, Proto, or Avro, segments are always stored using some form of file compression. This seems to be independent of the serialization format used when writing messages to the topic. Is my understanding correct? We are planning to implement a Pinot-based solution in the cloud, but without a clear understanding of segment compression, we are unable to determine whether Pinot will be cost-effective in terms of storage. Could someone please provide clarification on this topic? Your insights will greatly assist us in making an informed decision regarding our Pinot-based application.
    x
    • 2
    • 2
  • g

    Georgi Andonov

    04/06/2025, 9:34 PM
    Hello, everyone! I currently have this set up for Kafka ingestion in Pinot - I ingest data from a Kafka topic of the form Id - INT, Price - DOUBLE, UnixTime - LONG into a Pinot REALTIME table. I wanted to ask if there is a way to use the UnixTime timestamp from the ingested data as the timestamp column in the table - to use the value in the ingested message as Timestamp instead of ingestion time?
    m
    x
    • 3
    • 16
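    For illustration, a minimal sketch of using the ingested UnixTime value as the table's time column instead of ingestion time, assuming UnixTime holds epoch milliseconds. In the schema, declare it as a date-time field:
    Copy code
    "dateTimeFieldSpecs": [
      {
        "name": "UnixTime",
        "dataType": "LONG",
        "format": "1:MILLISECONDS:EPOCH",
        "granularity": "1:MILLISECONDS"
      }
    ]
    and in the table config, point the time column at it:
    Copy code
    "segmentsConfig": {
      "timeColumnName": "UnixTime"
    }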
  • k

    kranthi kumar

    04/09/2025, 8:54 AM
    Hello Everyone, I am working on a project to ingest historic data into Pinot. We have nearly 50 petabytes of data stored in S3. We have to process that data to extract metadata and store it in Pinot tables. Previously, we were publishing to Kafka and Pinot was reading from Kafka. Now, since this is historic data, we want to move the flow away from realtime streaming. I have read about batch ingestion via minions and Spark jobs in the documentation. I want to know which of those serves our case better, considering the huge amount of data we have. Any suggestions, and if possible any references, will be very helpful to my work.
    x
    • 2
    • 2
  • k

    kranthi kumar

    04/09/2025, 9:43 AM
    Hello all, if anyone has worked on running Spark jobs to batch ingest data directly into Pinot tables, please explain the steps and share your knowledge on it; any help is appreciated.
  • s

    San Kumar

    04/10/2025, 9:27 AM
    We are currently performing batch ingestion into a Pinot table using the provided example. In our first iteration, we have the following data:
    Copy code
    event_time, source, country, event_type
    epochmilli1, nokia, ind, event1
    epochmilli1, nokia, ind, event2
    epochmilli2, samsung, USA, event2
    We uploaded this data to Pinot, resulting in the following records:
    Copy code
    epochmilli1, nokia, ind, event2
    epochmilli2, samsung, USA, event0
    Subsequently, we received new data:
    Copy code
    epochmilli1, nokia, ind, event3
    epochmilli1, Apple, ind, event0
    Our program processed this new data and prepared it as follows:
    Copy code
    epochmilli1, nokia, ind, event3
    epochmilli1, Apple, ind, event0
    We aim to update the segment to reflect the final records, which should be:
    Copy code
    epochmilli1, nokia, ind, event3
    epochmilli2, samsung, USA, event0
    epochmilli1, Apple, ind, event0
    To clarify, we want to replace the record for epochmilli1 where the device type is nokia, and add any new records for device types that are not already in the database. Could you please provide guidance on how to achieve this update in the Pinot table? We will use an offline table only: no Kafka topic and no upsert table.
    x
    • 2
    • 3
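    For illustration (not from the thread): offline APPEND tables have no primary-key upsert, so a common approach is to regenerate the segment(s) covering the affected time range with the corrected rows and re-push them under the same segment name, which replaces the previously pushed segment. A hedged sketch of pinning the segment name in the ingestion job spec; the 'fixed' generator type and the name shown are assumptions to verify against your Pinot version.
    Copy code
    segmentNameGeneratorSpec:
      type: fixed
      configs:
        segment.name: 'events_epochmilli1_batch'   # hypothetical; re-pushing this name overwrites the prior segment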
  • l

    Lakshya Devani

    04/10/2025, 9:41 AM
    Hi team, I have a cluster which has a realtime table ingesting from Kafka. Every time I delete the segments, the connection between the table and Kafka breaks. Can anyone help me understand the root cause here?
    Copy code
    2025/04/10 09:34:36.948 INFO [HelixInstanceDataManager] [HelixTaskExecutor-message_handle_thread_21] Deleted segment: ruleset_evaluation_shadow_test_realtime_2__2__47__20250410T0520Z from table: ruleset_evaluation_shadow_test_realtime_2_REALTIME
    2025/04/10 09:34:37.177 INFO [RealtimeSegmentDataManager_ruleset_evaluation_shadow_test_realtime_2__2__49__20250410T0716Z] [ruleset_evaluation_shadow_test_realtime_2__2__49__20250410T0716Z] Recreating stream consumer for topic partition ruleset_evaluation_shadow_test_realtime_2_REALTIME-topic.ruleset.evaluation.shadow.test-2, reason: Too many transient errors
    2025/04/10 09:34:37.177 ERROR [KafkaConsumer] [ruleset_evaluation_shadow_test_realtime_2__2__49__20250410T0716Z] [Consumer clientId=ruleset_evaluation_shadow_test_realtime_2_REALTIME-topic.ruleset.evaluation.shadow.test-2, groupId=msk_consumer] Failed to close coordinator
    org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException
            at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeThrowInterruptException(ConsumerNetworkClient.java:520) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:281) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:245) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1026) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.maybeAutoCommitOffsetsSync(ConsumerCoordinator.java:1087) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:919) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2366) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2333) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2283) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.pinot.plugin.stream.kafka20.KafkaPartitionLevelConnectionHandler.close(KafkaPartitionLevelConnectionHandler.java:118) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.closePartitionGroupConsumer(RealtimeSegmentDataManager.java:1256) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.recreateStreamConsumer(RealtimeSegmentDataManager.java:1860) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.handleTransientStreamErrors(RealtimeSegmentDataManager.java:436) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.consumeLoop(RealtimeSegmentDataManager.java:484) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager$PartitionConsumer.run(RealtimeSegmentDataManager.java:766) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
    Caused by: java.lang.InterruptedException
            ... 18 more
    2025/04/10 09:34:37.177 INFO [Metrics] [ruleset_evaluation_shadow_test_realtime_2__2__49__20250410T0716Z] Metrics scheduler closed
    2025/04/10 09:34:37.177 INFO [Metrics] [ruleset_evaluation_shadow_test_realtime_2__2__49__20250410T0716Z] Closing reporter org.apache.kafka.common.metrics.JmxReporter
    2025/04/10 09:34:37.177 INFO [Metrics] [ruleset_evaluation_shadow_test_realtime_2__2__49__20250410T0716Z] Metrics reporters closed
    2025/04/10 09:34:37.178 INFO [AppInfoParser] [ruleset_evaluation_shadow_test_realtime_2__2__49__20250410T0716Z] App info kafka.consumer for ruleset_evaluation_shadow_test_realtime_2_REALTIME-topic.ruleset.evaluation.shadow.test-2 unregistered
    What is happening in these logs?
    x
    • 2
    • 9
  • k

    kranthi kumar

    04/10/2025, 11:36 AM
    Hello Everyone, I am trying to batch ingest data into Pinot via Spark jobs. I am running Spark on Amazon EMR and my Pinot is hosted on an EKS cluster. I gave full S3 access policies to my EMR and EKS IAM roles, but I am still blocked by this error:
    Copy code
    Caused by: software.amazon.awssdk.core.exception.SdkClientException: Unable to load credentials from any of the providers in the chain AwsCredentialsProviderChain(credentialsProviders=[SystemPropertyCredentialsProvider(), EnvironmentVariableCredentialsProvider(), WebIdentityTokenCredentialsProvider(), ProfileCredentialsProvider(profileName=default, profileFile=ProfileFile(sections=[])), ContainerCredentialsProvider(), InstanceProfileCredentialsProvider()]) : [SystemPropertyCredentialsProvider(): Unable to load credentials from system settings. Access key must be specified either via environment variable (AWS_ACCESS_KEY_ID) or system property (aws.accessKeyId)., EnvironmentVariableCredentialsProvider(): Unable to load credentials from system settings. Access key must be specified either via environment variable (AWS_ACCESS_KEY_ID) or system property (aws.accessKeyId)., WebIdentityTokenCredentialsProvider(): Either the environment variable AWS_WEB_IDENTITY_TOKEN_FILE or the javaproperty aws.webIdentityTokenFile must be set., ProfileCredentialsProvider(profileName=default, profileFile=ProfileFile(sections=[])): Profile file contained no credentials for profile 'default': ProfileFile(sections=[]), ContainerCredentialsProvider(): Cannot fetch credentials from container - neither AWS_CONTAINER_CREDENTIALS_FULL_URI or AWS_CONTAINER_CREDENTIALS_RELATIVE_URI environment variables are set., InstanceProfileCredentialsProvider(): Failed to load credentials from IMDS.]
    	at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:111)
    	at software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain.resolveCredentials(AwsCredentialsProviderChain.java:130)
    	at software.amazon.awssdk.auth.credentials.internal.LazyAwsCredentialsProvider.resolveCredentials(LazyAwsCredentialsProvider.java:45)
    	at software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.resolveCredentials(DefaultCredentialsProvider.java:129)
    	at software.amazon.awssdk.auth.credentials.AwsCredentialsProvider.resolveIdentity(AwsCredentialsProvider.java:54)
    	at software.amazon.awssdk.services.s3.auth.scheme.internal.S3AuthSchemeInterceptor.lambda$trySelectAuthScheme$4(S3AuthSchemeInterceptor.java:163)
    	at software.amazon.awssdk.core.internal.util.MetricUtils.reportDuration(MetricUtils.java:77)
    	at software.amazon.awssdk.services.s3.auth.scheme.internal.S3AuthSchemeInterceptor.trySelectAuthScheme(S3AuthSchemeInterceptor.java:163)
    	at software.amazon.awssdk.services.s3.auth.scheme.internal.S3AuthSchemeInterceptor.selectAuthScheme(S3AuthSchemeInterceptor.java:84)
    	at software.amazon.awssdk.services.s3.auth.scheme.internal.S3AuthSchemeInterceptor.beforeExecution(S3AuthSchemeInterceptor.java:64)
    	at software.amazon.awssdk.core.interceptor.ExecutionInterceptorChain.lambda$beforeExecution$1(ExecutionInterceptorChain.java:59)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    	at software.amazon.awssdk.core.interceptor.ExecutionInterceptorChain.beforeExecution(ExecutionInterceptorChain.java:59)
    	at software.amazon.awssdk.awscore.internal.AwsExecutionContextBuilder.runInitialInterceptors(AwsExecutionContextBuilder.java:248)
    	at software.amazon.awssdk.awscore.internal.AwsExecutionContextBuilder.invokeInterceptorsAndCreateExecutionContext(AwsExecutionContextBuilder.java:138)
    	at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.invokeInterceptorsAndCreateExecutionContext(AwsSyncClientHandler.java:67)
    	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$0(BaseSyncClientHandler.java:62)
    	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:182)
    	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:60)
    	at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:52)
    	at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:60)
    	at software.amazon.awssdk.services.s3.DefaultS3Client.getObject(DefaultS3Client.java:5570)
    	at org.apache.pinot.plugin.filesystem.S3PinotFS.copyToLocalFile(S3PinotFS.java:569)
    	at org.apache.pinot.spi.filesystem.NoClosePinotFS.copyToLocalFile(NoClosePinotFS.java:98)
    	at org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentGenerationJobRunner$1.call(SparkSegmentGenerationJobRunner.java:263)
    	at org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentGenerationJobRunner$1.call(SparkSegmentGenerationJobRunner.java:212)
    	at org.apache.spark.api.java.JavaRDDLike.$anonfun$foreach$1(JavaRDDLike.scala:352)
    	at org.apache.spark.api.java.JavaRDDLike.$anonfun$foreach$1$adapted(JavaRDDLike.scala:352)
    	at scala.collection.Iterator.foreach(Iterator.scala:943)
    	at scala.collection.Iterator.foreach$(Iterator.scala:943)
    	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    	at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1047)
    	at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1047)
    	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2539)
    	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
    	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:174)
    	at org.apache.spark.scheduler.Task.run(Task.scala:152)
    	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:632)
    	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:96)
    	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:635)
    	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    	at java.base/java.lang.Thread.run(Thread.java:840)
    My JobSpec:
    Copy code
    executionFrameworkSpec:
      name: 'spark'
      segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentGenerationJobRunner'
      segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentTarPushJobRunner'
      segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentUriPushJobRunner'
      segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentMetadataPushJobRunner'
      extraConfigs:
        stagingDir: s3://aws-logs-686118793080-us-east-1/metadata/staging/
    jobType: SegmentCreationAndMetadataPush
    inputDirURI: 's3://aws-logs-686118793080-us-east-1/metadata/basic-attribute/date=2025-04-10/'
    outputDirURI: 's3://aws-logs-686118793080-us-east-1/metadata/Output_Metadata/'
    overwriteOutput: true
    pinotFSSpecs:
      - scheme: s3
        className: org.apache.pinot.plugin.filesystem.S3PinotFS
        configs:
          region: 'us-east-1'
    recordReaderSpec:
      dataFormat: 'json'
      className: 'org.apache.pinot.plugin.inputformat.json.JSONRecordReader'
    tableSpec:
      tableName: 'bat_backfill_emr_REALTIME'
      schemaURI: 's3://aws-logs-686118793080-us-east-1/metadata/batSchema.json'
      tableConfigURI: 's3://aws-logs-686118793080-us-east-1/metadata/batTable.json'
    pinotClusterSpecs:
      - controllerURI: 'http://localhost:9000'
    pushJobSpec:
      pushParallelism: 2
      pushAttempts: 20
      pushRetryIntervalMillis: 1000
    My spark-submit command:
    Copy code
    spark-submit --deploy-mode cluster \
      --class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
      --master yarn \
      --conf spark.driver.extraJavaOptions=-Dplugins.dir=s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/ \
      --conf spark.driver.extraClassPath=s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/pinot-batch-ingestion-spark-3-1.1.0-shaded.jar:s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/pinot-all-1.1.0-jar-with-dependencies.jar:s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/pinot-s3-1.1.0-shaded.jar:s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/pinot-json-1.1.0-shaded.jar \
      --jars s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/pinot-all-1.1.0-jar-with-dependencies.jar,s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/pinot-batch-ingestion-spark-3-1.1.0-shaded.jar,s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/pinot-json-1.1.0-shaded.jar,s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/pinot-s3-1.1.0-shaded.jar,s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/pinot-tools-1.1.0.jar,s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/pinot-spi-1.1.0.jar \
      --files s3://aws-logs-686118793080-us-east-1/metadata/executionFrameworkSpec4.yaml \
      local://pinot-all-0.11.0-jar-with-dependencies.jar \
      -jobSpecFile executionFrameworkSpec4.yaml
    x
    • 2
    • 5
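    For illustration, two hedged things to check: the exception is thrown on a Spark executor, so the executors (not only the driver) need the Pinot plugin settings and a reachable credential source (e.g. spark.executor.extraJavaOptions=-Dplugins.dir=... alongside the driver options); and the S3PinotFS configs can carry explicit credentials if the instance-profile/IRSA chain is not visible from the executors. The keys below are placeholders, not real credentials, and the accessKey/secretKey config names are assumptions to verify against the S3 plugin docs.
    Copy code
    pinotFSSpecs:
      - scheme: s3
        className: org.apache.pinot.plugin.filesystem.S3PinotFS
        configs:
          region: 'us-east-1'
          accessKey: '<AWS_ACCESS_KEY_ID>'       # or rely on an IAM role the executors can reach
          secretKey: '<AWS_SECRET_ACCESS_KEY>'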
  • g

    Georgi Andonov

    04/11/2025, 2:51 PM
    Hello everyone! I am not 100% sure if my understanding for the merge rollup task is correct and I am looking for some clarification. I have the following table and schema config:
    Copy code
    {
      "REALTIME": {
        "tableName": "TestRollup_REALTIME",
        "tableType": "REALTIME",
        "segmentsConfig": {
          "schemaName": "TestRollup",
          "replication": "1",
          "replicasPerPartition": "1",
          "timeColumnName": "ValueTimestamp",
          "minimizeDataMovement": false
        },
        "tenants": {
          "broker": "DefaultTenant",
          "server": "DefaultTenant",
          "tagOverrideConfig": {}
        },
        "tableIndexConfig": {
          "invertedIndexColumns": [],
          "noDictionaryColumns": [],
          "streamConfigs": {
            "streamType": "kafka",
            "stream.kafka.topic.name": "rollup-price-aggregation",
            "stream.kafka.broker.list": "kafka:9092",
            "stream.kafka.consumer.type": "lowlevel",
            "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
            "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
            "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
            "realtime.segment.flush.threshold.rows": "0",
            "realtime.segment.flush.threshold.segment.rows": "500",
            "realtime.segment.flush.threshold.time": "20m"
          },
          "aggregateMetrics": false,
          "enableDefaultStarTree": false,
          "nullHandlingEnabled": false,
          "bloomFilterColumns": [],
          "onHeapDictionaryColumns": [],
          "rangeIndexColumns": [],
          "sortedColumn": [],
          "varLengthDictionaryColumns": [],
          "rangeIndexVersion": 2,
          "optimizeDictionaryForMetrics": false,
          "optimizeDictionary": false,
          "autoGeneratedInvertedIndex": false,
          "createInvertedIndexDuringSegmentGeneration": false,
          "loadMode": "MMAP",
          "enableDynamicStarTreeCreation": false,
          "columnMajorSegmentBuilderEnabled": true,
          "optimizeDictionaryType": false,
          "noDictionarySizeRatioThreshold": 0.85
        },
        "metadata": {},
        "quota": {},
        "task": {
          "taskTypeConfigsMap": {
            "MergeRollupTask": {
              "5m_2m.mergeType": "rollup",
              "5m_2m.bucketTimePeriod": "5m",
              "5m_2m.bufferTimePeriod": "2m",
              "5m_2m.roundBucketTimePeriod": "1m",
              "10m_2m.mergeType": "rollup",
              "10m_2m.bucketTimePeriod": "10m",
              "10m_2m.bufferTimePeriod": "12m",
              "10m_2m.roundBucketTimePeriod": "5m",
              "Price.aggregationType": "sum",
              "schedule": "0 * * * * ?"
            }
          }
        },
        "routing": {},
        "query": {},
        "ingestionConfig": {
          "continueOnError": false,
          "rowTimeValueCheck": false,
          "segmentTimeValueCheck": true
        },
        "isDimTable": false
      }
    }
    
    {
      "schemaName": "TestRollup",
      "enableColumnBasedNullHandling": false,
      "dimensionFieldSpecs": [
        {
          "name": "Id",
          "dataType": "INT",
          "fieldType": "DIMENSION",
          "notNull": false
        }
      ],
      "metricFieldSpecs": [
        {
          "name": "Price",
          "dataType": "DOUBLE",
          "fieldType": "METRIC",
          "notNull": false
        }
      ],
      "dateTimeFieldSpecs": [
        {
          "name": "ValueTimestamp",
          "dataType": "TIMESTAMP",
          "fieldType": "DATE_TIME",
          "notNull": false,
          "format": "TIMESTAMP",
          "granularity": "1:MILLISECONDS"
        }
      ]
    }
    I am ingesting data from a Kafka topic, and all the records have a ValueTimestamp equal to now. From my understanding, with this configuration the merge rollup tasks will be executed once a minute and will create merged segments based on the provided config: if there are records whose timestamp falls within a bucket (bucketStart -> bucketStart + 5m, or bucketStart -> bucketStart + 10m) and the bucket is older than the 2m or 12m buffer, they will be added to the merged segment. Is that correct? Also, for the rounding of the timestamp: will the records in the merged segment have timestamps rounded to 1m or to 5m (for example, timestamps like 2025-04-11 11:50/11:51 or 2025-04-11 11:50/11:55)?
    m
    r
    • 3
    • 6
  • s

    San Kumar

    04/13/2025, 10:30 AM
    Hello Team, we are planning to build a UI on top of our Pinot table. Can you suggest which will be better: using the Pinot Java driver, or using the Pinot-provided REST API (i.e., the broker endpoint) to retrieve the data? The table is an offline table and has at least 5 years of data.
    x
    • 2
    • 6
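    For illustration (not from the thread): both options run the same SQL against the broker; the Java client is essentially a thin wrapper over the broker's query endpoint, so the choice is mostly about convenience in your UI backend. A minimal sketch using the pinot-java-client, with hypothetical broker hosts, table, and columns.
    Copy code
    import org.apache.pinot.client.Connection;
    import org.apache.pinot.client.ConnectionFactory;
    import org.apache.pinot.client.ResultSetGroup;

    public class PinotUiBackendSketch {
      public static void main(String[] args) {
        // Connect directly to one or more brokers (placeholder host:port values).
        Connection connection = ConnectionFactory.fromHostList("broker-1:8099", "broker-2:8099");
        // The same SQL could be POSTed to the broker's REST query endpoint instead.
        ResultSetGroup results = connection.execute(
            "SELECT country, COUNT(*) FROM events GROUP BY country LIMIT 10");
        System.out.println(results.getResultSet(0));
        connection.close();
      }
    }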
  • s

    San Kumar

    04/13/2025, 10:31 AM
    We also have additional text columns marked as raw index
  • g

    Georgi Varbanov

    04/14/2025, 12:04 PM
    Hello team, we are still in the POC phase with Apache Pinot for our use case, but we are happy to go fully test the use case in production, and I want to ask a few questions regarding capacity planning. I have gone through https://startree.ai/resources/capacity-planning-in-apache-pinot-part-1 a few times, so here is my use case and my questions; I will be really grateful if you can assist.
    Use case:
    • Data retention - infinite, all data is in the hot tier.
    • Current data size - 12B rows with nesting/arrays (around 48B rows unnested); each nested row is around 3 KB of data when serialized as JSON bytes (raw, without compression from Pinot), with roughly 60 columns in Pinot.
    • QPS requirements - 1,500-2,000 for current needs, possibly more in the future; P99 under 500 ms, P90/P50 under 100 ms.
    • Kafka ingestion rate - 1,000 msg/s normally; during spikes we would need 20k-30k msg/s overall for the topic (we currently have 100 partitions for the topic, but can scale further if needed). Per partition that is ~10 msg/s normally and ~300 msg/s during spikes.
    • Daily ingested rows - 12M per day with nesting/arrays.
    • Replication factor - 3.
    • Segment size - 500 MB (around 70k segments once all data is loaded, ×3 replication = 210k segments).
    • Types of queries - 99% are of the form select Col1, count, avg, sum, min, max from tbl where customerId = {customerId} group by Col1, or similar. There are some use cases where we need to fetch raw data of up to 1,000-2,000 rows, but those are rare and not covered by the QPS requirements above.
    • Number of tables - 1, possibly a few more if we add DIM tables for join queries with nomenclatures, but currently we don't need them.
    Questions:
    1. Is it better to have many smaller machines or fewer bigger ones? There are examples that go up to 32 cores and 126 GB of memory per machine.
    2. Is there a benefit to using offline and realtime tables in combination if all data is ingested through Kafka? Our ingestion rate is not as high as in other use cases, and as far as I have researched we should be fine using only a realtime table without retention.
    3. Do you have any recommendations for the cluster?
    a. Controller/Zookeeper - 3x (16 vCPU, 64 GB mem, 200 GB storage each). How do I calculate storage for the controller/Zookeeper? (As far as I understood, Controller and Zookeeper are singleton instances, so multiplying them is just for redundancy.)
    b. Broker - 5x (8 vCPU, 32 GB mem) or 10x (4 vCPU, 16 GB mem)?
    c. Server - 20x (8 vCPU, 64 GB mem, 2 TB storage), or should I go with smaller/bigger machines?
    m
    • 2
    • 5
  • k

    kranthi kumar

    04/15/2025, 12:23 PM
    Hi, I am getting this error while doing batch ingestion via Spark jobs.
    Copy code
    java.util.concurrent.TimeoutException: Futures timed out after [100000 milliseconds]
    	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:259) ~[scala-library-2.12.18.jar:?]
    	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263) ~[scala-library-2.12.18.jar:?]
    	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:48) ~[spark-common-utils_2.12-3.5.3-amzn-0.jar:3.5.3-amzn-0]
    	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310) ~[spark-core_2.12-3.5.3-amzn-0.jar:3.5.3-amzn-0]
    	at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:509) ~[spark-yarn_2.12-3.5.3-amzn-0.jar:3.5.3-amzn-0]
    	at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:268) ~[spark-yarn_2.12-3.5.3-amzn-0.jar:3.5.3-amzn-0]
    	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:937) ~[spark-yarn_2.12-3.5.3-amzn-0.jar:3.5.3-amzn-0]
    	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:936) ~[spark-yarn_2.12-3.5.3-amzn-0.jar:3.5.3-amzn-0]
    	at java.security.AccessController.doPrivileged(AccessController.java:712) [?:?]
    	at javax.security.auth.Subject.doAs(Subject.java:439) [?:?]
    	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1953) [hadoop-client-api-3.4.0-amzn-2.jar:?]
    	at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:936) [spark-yarn_2.12-3.5.3-amzn-0.jar:3.5.3-amzn-0]
    	at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala) [spark-yarn_2.12-3.5.3-amzn-0.jar:3.5.3-amzn-0]
    I have almost 200k files with 5,000 records each. Is there any limitation on the Pinot side that's causing this issue?
    x
    • 2
    • 4
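    For illustration (not from the thread): the Futures timed out after [100000 milliseconds] in ApplicationMaster.runDriver is Spark-on-YARN's application master giving up while waiting for the SparkContext (spark.yarn.am.waitTime, default 100s), not a Pinot-side limit on file or segment counts; with ~200k input files, listing the input can easily take longer than that before the context comes up. A hedged tweak to try, leaving the rest of the existing submit command unchanged:
    Copy code
    spark-submit --deploy-mode cluster \
      --conf spark.yarn.am.waitTime=600s \
      ...   # remaining --class/--conf/--jars/--files options as before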
  • y

    Yarden Rokach

    04/15/2025, 6:30 PM
    Real-Time Analytics Summit: ONE MONTH TO GO! 🔥 We’re thrilled to see the numbers: already over 3,000 registrants! If you haven’t yet, now is the time to SAVE YOUR SEAT; it’s online and free! 🎫 Here are 4 of the great sessions that will be held: 🔹 Flexible Forecasting and Insights with Apache Pinot - Fetch: Discover how Fetch built a high-performance Sales Intelligence Platform using Apache Pinot to deliver real-time shopper insights. 🔹 Data Mesh: Transforming Real-Time Analytics at Netflix: Go behind the scenes with Netflix to explore how they built a transformative data mesh that powers personalized user experiences, from real-time recommendations to game analytics, live events, and ad processing. 🔹 Managing Trade-Offs in System Design: Migrating from Flink to Pinot - 7Signal: Explore how 7Signal transformed its data architecture to deliver real-time, low-latency analytics, enabling Wi-Fi performance insights at scale across industries such as healthcare and sports venues. 🔹 Apache Pinot in Action: Real-Time Analytics at CrowdStrike: Get a firsthand look at how CrowdStrike uses Apache Pinot to process tens of thousands of events per second, enhancing security operations in real time. 👉 See the full agenda here.
  • m

    Mannoj

    04/18/2025, 8:07 AM
    A quick question: I have only enabled Pinot servers without HDFS, and data has been populating on them. Now when I enable HDFS, how does the data get sent to HDFS? 1. Will it automatically send data into HDFS? 2. Or is a restart of the Pinot servers and controllers required? 3. Or will a rebalance or table reload do the magic of sending data into HDFS? 4. Or none of the above: after enabling HDFS, you need to drop the table and recreate it in Pinot so it starts sending data to both Pinot and HDFS?
    x
    • 2
    • 24
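    For illustration, a hedged sketch of the controller-side properties involved in enabling an HDFS deep store; the property names follow the HDFS filesystem plugin, and the namenode URI and paths are placeholders. Changes to these config files take effect only after the corresponding components are restarted.
    Copy code
    controller.data.dir=hdfs://namenode:8020/pinot/segments
    pinot.controller.storage.factory.class.hdfs=org.apache.pinot.plugin.filesystem.HadoopPinotFS
    pinot.controller.storage.factory.hdfs.hadoop.conf.path=/path/to/hadoop/conf
    pinot.controller.segment.fetcher.protocols=file,http,hdfs
    pinot.controller.segment.fetcher.hdfs.class=org.apache.pinot.common.utils.fetcher.PinotFSSegmentFetcher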
  • s

    Saravanan Subburayal

    04/19/2025, 9:01 AM
    I want to use Azure Disk for the deep store instead of S3 or HDFS. Are there any supported file systems for storing the files locally (Azure Disk through a PVC)?
    x
    m
    • 3
    • 14
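    As a rough sketch, assuming an Azure Disk PVC mounted on the controller at /var/pinot/deepstore (path and property usage are assumptions to verify): a locally mounted path can back the deep store through the default local filesystem, with the caveat that it is only as shared and durable as that volume. (For Azure object storage, an ADLS Gen2 PinotFS plugin also exists.)
    Copy code
    controller.data.dir=/var/pinot/deepstore
    pinot.controller.storage.factory.class.file=org.apache.pinot.spi.filesystem.LocalPinotFS
    pinot.controller.segment.fetcher.protocols=file,http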
  • m

    Mannoj

    04/21/2025, 9:12 AM
    What training material does Apache Pinot recommend for SREs or DevOps, and is there any certification for Apache Pinot?
    x
    p
    • 3
    • 3