# general

    San Kumar

    04/04/2025, 8:46 AM
    Hello Team, if a realtime table is designed with a protobuf decoder, i.e. "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.protobuf.ProtoBufMessageDecoder", and it has this minion configuration:
    "taskTypeConfigsMap": {
                "RealtimeToOfflineSegmentsTask": {
                  "schedule": "*/15 * * ** ?",
                  "bucketTimePeriod": "6h",
                  "bufferTimePeriod": "1d"
                }
              }
    When we create an offline table on the same schema, in what file format will the data be stored? In the offline table we have this config:
    "ingestionConfig": {
              "continueOnError": false,
              "rowTimeValueCheck": false,
              "batchIngestionConfig": {
                "consistentDataPush": false,
                "segmentIngestionType": "APPEND",
                "segmentIngestionFrequency": "HOURLY"
              },
    Is there anything we have to specify for the file format in the offline table when it is part of a hybrid table? How do I ensure that the offline table uses the same file compression as the realtime one? In summary, for a hybrid table definition: if the realtime side is protobuf, what will the offline table's segment format be - CSV, JSON, or proto? Likewise, if the realtime side is Avro, what will the offline segment type be?
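    For context, a hedged sketch (the column name and codec below are illustrative, assuming Pinot's fieldConfigList raw-column options) of pinning the same raw-column compression codec in both the REALTIME and OFFLINE table configs of a hybrid table, so segments produced by the RealtimeToOfflineSegmentsTask and by batch ingestion use the same encoding:
    "fieldConfigList": [
      {
        "name": "payload",
        "encodingType": "RAW",
        "compressionCodec": "ZSTANDARD"
      }
    ]
    Here "payload" is a placeholder column name; putting an identical fieldConfigList in both table configs keeps the on-disk encoding consistent. Regardless of the input serialization (proto, Avro, JSON), segments are stored in Pinot's own columnar segment format rather than in the source format.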

    San Kumar

    04/04/2025, 8:52 AM
    Hello Team, how do we create multiple tenants in Pinot?
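    For reference, a hedged sketch of the usual flow (tenant names and instance counts are illustrative): tag broker/server instances into tenants via the controller's /tenants REST endpoint, then point the table config at those tenants. For example, POST bodies like:
    {"tenantRole": "BROKER", "tenantName": "analyticsBroker", "numberOfInstances": 2}
    {"tenantRole": "SERVER", "tenantName": "analyticsServer", "offlineInstances": 2, "realtimeInstances": 2}
    and in the table config:
    "tenants": {
      "broker": "analyticsBroker",
      "server": "analyticsServer"
    }
    Note that tenants other than DefaultTenant are only honored when the controller runs with cluster.tenant.isolation.enable=false.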

    Shiva Sharana

    04/04/2025, 9:35 AM
    Hello all, 1] I wanted to know how to calculate the median value for a particular column in Pinot v1.0.1 (not using Percentile, though, as it returns a max - min value). 2] And how to find the diff() between the current and previous rows without using a join.
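    For reference, a hedged SQL sketch (the table and column names orders, price, event_time are illustrative; PERCENTILETDIGEST returns an approximate median, and LAG needs the multi-stage query engine, which may require a release newer than 1.0.1):
    -- approximate median via the 50th percentile
    SELECT PERCENTILETDIGEST(price, 50) AS median_price
    FROM orders;

    -- difference between current and previous row without a join (window function)
    SELECT event_time,
           price,
           price - LAG(price) OVER (ORDER BY event_time) AS price_diff
    FROM orders;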

    San Kumar

    04/05/2025, 1:10 PM
    Hello Team, can we provide a primary key in an offline APPEND-type table and push segments so that a record is replaced if it exists and appended otherwise? Is that possible in Pinot?

    San Kumar

    04/06/2025, 4:50 AM
    Hello Team I am currently looking for documentation related to segment file compression and storage formats in Apache Pinot. I find the existing documentation somewhat unclear. From my understanding, regardless of whether we use JSON, Proto, or Avro, segments are always stored using some form of file compression. This seems to be independent of the serialization format used when writing messages to the topic. Is my understanding correct? We are planning to implement a Pinot-based solution in the cloud, but without a clear understanding of segment compression, we are unable to determine whether Pinot will be cost-effective in terms of storage. Could someone please provide clarification on this topic? Your insights will greatly assist us in making an informed decision regarding our Pinot-based application.

    Georgi Andonov

    04/06/2025, 9:34 PM
    Hello, everyone! I currently have this set up for Kafka ingestion in Pinot - I ingest data from a Kafka topic of the form Id - INT, Price - DOUBLE, UnixTime - LONG into a Pinot REALTIME table. I wanted to ask if there is a way to use the UnixTime timestamp from the ingested data as the timestamp column in the table - to use the value in the ingested message as Timestamp instead of ingestion time?
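    For reference, a hedged sketch (assuming UnixTime is epoch milliseconds; adjust the format otherwise) of declaring the ingested UnixTime field as the table's time column so Pinot uses the event's own timestamp rather than ingestion time. In the schema:
    "dateTimeFieldSpecs": [
      {
        "name": "UnixTime",
        "dataType": "LONG",
        "format": "1:MILLISECONDS:EPOCH",
        "granularity": "1:MILLISECONDS"
      }
    ]
    and in the table config's segmentsConfig:
    "timeColumnName": "UnixTime"
    If the source values were epoch seconds, the format would be 1:SECONDS:EPOCH instead.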

    kranthi kumar

    04/09/2025, 8:54 AM
    Hello Everyone, I am working on a project to ingest historic data into Pinot. We have nearly 50 petabytes of data stored in S3. We have to process that data to extract metadata and store it in Pinot tables. Previously we were publishing to Kafka and Pinot was reading from Kafka. Now, since this is historic data, we want to move the flow away from real-time streaming. I have read about batch ingestion via minions and Spark jobs in the documentation. I want to know which of those serves our case better, considering the huge amount of data we have. Any suggestions, and if possible any references, would be very helpful to my work.

    kranthi kumar

    04/09/2025, 9:43 AM
    Hello all, if anyone has worked on running Spark jobs to batch ingest data directly into Pinot tables, please explain the steps and share your knowledge on it; any help is appreciated.

    San Kumar

    04/10/2025, 9:27 AM
    We are currently performing batch ingestion into a Pinot table using the provided example. In our first iteration, we have the following data:
    event_time, source, country, event_type
    epochmilli1, nokia, ind, event1
    epochmilli1, nokia, ind, event2
    epochmilli2, samsung, USA, event2
    We uploaded this data to Pinot, resulting in the following records:
    epochmilli1, nokia, ind, event2
    epochmilli2, samsung, USA, event0
    Subsequently, we received new data:
    epochmilli1, nokia, ind, event3
    epochmilli1, Apple, ind, event0
    Our program processed this new data and prepared it as follows:
    epochmilli1, nokia, ind, event3
    epochmilli1, Apple, ind, event0
    We aim to update the segment to reflect the final records, which should be:
    epochmilli1, nokia, ind, event3
    epochmilli2, samsung, USA, event0
    epochmilli1, Apple, ind, event0
    To clarify, we want to replace the record for epochmilli1 where the device type is nokia, and add any new records for device types that are not already in the database. Could you please provide guidance on how to achieve this update in the Pinot table? We will use an offline table only - no Kafka topic and no upsert table.

    Lakshya Devani

    04/10/2025, 9:41 AM
    Hi team, I have a cluster with a realtime table ingesting from Kafka. Every time I delete the segments, the connection between the table and Kafka breaks. Can anyone help me understand the root cause here?
    2025/04/10 09:34:36.948 INFO [HelixInstanceDataManager] [HelixTaskExecutor-message_handle_thread_21] Deleted segment: ruleset_evaluation_shadow_test_realtime_2__2__47__20250410T0520Z from table: ruleset_evaluation_shadow_test_realtime_2_REALTIME
    2025/04/10 09:34:37.177 INFO [RealtimeSegmentDataManager_ruleset_evaluation_shadow_test_realtime_2__2__49__20250410T0716Z] [ruleset_evaluation_shadow_test_realtime_2__2__49__20250410T0716Z] Recreating stream consumer for topic partition ruleset_evaluation_shadow_test_realtime_2_REALTIME-topic.ruleset.evaluation.shadow.test-2, reason: Too many transient errors
    2025/04/10 09:34:37.177 ERROR [KafkaConsumer] [ruleset_evaluation_shadow_test_realtime_2__2__49__20250410T0716Z] [Consumer clientId=ruleset_evaluation_shadow_test_realtime_2_REALTIME-topic.ruleset.evaluation.shadow.test-2, groupId=msk_consumer] Failed to close coordinator
    org.apache.kafka.common.errors.InterruptException: java.lang.InterruptedException
            at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeThrowInterruptException(ConsumerNetworkClient.java:520) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:281) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:245) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1026) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.maybeAutoCommitOffsetsSync(ConsumerCoordinator.java:1087) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.close(ConsumerCoordinator.java:919) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2366) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2333) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2283) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.pinot.plugin.stream.kafka20.KafkaPartitionLevelConnectionHandler.close(KafkaPartitionLevelConnectionHandler.java:118) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.closePartitionGroupConsumer(RealtimeSegmentDataManager.java:1256) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.recreateStreamConsumer(RealtimeSegmentDataManager.java:1860) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.handleTransientStreamErrors(RealtimeSegmentDataManager.java:436) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager.consumeLoop(RealtimeSegmentDataManager.java:484) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager$PartitionConsumer.run(RealtimeSegmentDataManager.java:766) ~[pinot-all-1.4.0-SNAPSHOT-jar-with-dependencies.jar:1.4.0-SNAPSHOT-eb9c759344502969c80e3e9ec00fe67bd24d2965]
            at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
    Caused by: java.lang.InterruptedException
            ... 18 more
    2025/04/10 09:34:37.177 INFO [Metrics] [ruleset_evaluation_shadow_test_realtime_2__2__49__20250410T0716Z] Metrics scheduler closed
    2025/04/10 09:34:37.177 INFO [Metrics] [ruleset_evaluation_shadow_test_realtime_2__2__49__20250410T0716Z] Closing reporter org.apache.kafka.common.metrics.JmxReporter
    2025/04/10 09:34:37.177 INFO [Metrics] [ruleset_evaluation_shadow_test_realtime_2__2__49__20250410T0716Z] Metrics reporters closed
    2025/04/10 09:34:37.178 INFO [AppInfoParser] [ruleset_evaluation_shadow_test_realtime_2__2__49__20250410T0716Z] App info kafka.consumer for ruleset_evaluation_shadow_test_realtime_2_REALTIME-topic.ruleset.evaluation.shadow.test-2 unregistered
    What is happening in these logs?

    kranthi kumar

    04/10/2025, 11:36 AM
    Hello Everyone, I am trying to batch ingest data into Pinot via Spark jobs. I am running Spark on Amazon EMR and my Pinot cluster is hosted on EKS. I gave S3 full-access policies to my EMR and EKS IAM roles, but I am still blocked by this error:
    Caused by: software.amazon.awssdk.core.exception.SdkClientException: Unable to load credentials from any of the providers in the chain AwsCredentialsProviderChain(credentialsProviders=[SystemPropertyCredentialsProvider(), EnvironmentVariableCredentialsProvider(), WebIdentityTokenCredentialsProvider(), ProfileCredentialsProvider(profileName=default, profileFile=ProfileFile(sections=[])), ContainerCredentialsProvider(), InstanceProfileCredentialsProvider()]) : [SystemPropertyCredentialsProvider(): Unable to load credentials from system settings. Access key must be specified either via environment variable (AWS_ACCESS_KEY_ID) or system property (aws.accessKeyId)., EnvironmentVariableCredentialsProvider(): Unable to load credentials from system settings. Access key must be specified either via environment variable (AWS_ACCESS_KEY_ID) or system property (aws.accessKeyId)., WebIdentityTokenCredentialsProvider(): Either the environment variable AWS_WEB_IDENTITY_TOKEN_FILE or the javaproperty aws.webIdentityTokenFile must be set., ProfileCredentialsProvider(profileName=default, profileFile=ProfileFile(sections=[])): Profile file contained no credentials for profile 'default': ProfileFile(sections=[]), ContainerCredentialsProvider(): Cannot fetch credentials from container - neither AWS_CONTAINER_CREDENTIALS_FULL_URI or AWS_CONTAINER_CREDENTIALS_RELATIVE_URI environment variables are set., InstanceProfileCredentialsProvider(): Failed to load credentials from IMDS.]
    	at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:111)
    	at software.amazon.awssdk.auth.credentials.AwsCredentialsProviderChain.resolveCredentials(AwsCredentialsProviderChain.java:130)
    	at software.amazon.awssdk.auth.credentials.internal.LazyAwsCredentialsProvider.resolveCredentials(LazyAwsCredentialsProvider.java:45)
    	at software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider.resolveCredentials(DefaultCredentialsProvider.java:129)
    	at software.amazon.awssdk.auth.credentials.AwsCredentialsProvider.resolveIdentity(AwsCredentialsProvider.java:54)
    	at software.amazon.awssdk.services.s3.auth.scheme.internal.S3AuthSchemeInterceptor.lambda$trySelectAuthScheme$4(S3AuthSchemeInterceptor.java:163)
    	at software.amazon.awssdk.core.internal.util.MetricUtils.reportDuration(MetricUtils.java:77)
    	at software.amazon.awssdk.services.s3.auth.scheme.internal.S3AuthSchemeInterceptor.trySelectAuthScheme(S3AuthSchemeInterceptor.java:163)
    	at software.amazon.awssdk.services.s3.auth.scheme.internal.S3AuthSchemeInterceptor.selectAuthScheme(S3AuthSchemeInterceptor.java:84)
    	at software.amazon.awssdk.services.s3.auth.scheme.internal.S3AuthSchemeInterceptor.beforeExecution(S3AuthSchemeInterceptor.java:64)
    	at software.amazon.awssdk.core.interceptor.ExecutionInterceptorChain.lambda$beforeExecution$1(ExecutionInterceptorChain.java:59)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    	at software.amazon.awssdk.core.interceptor.ExecutionInterceptorChain.beforeExecution(ExecutionInterceptorChain.java:59)
    	at software.amazon.awssdk.awscore.internal.AwsExecutionContextBuilder.runInitialInterceptors(AwsExecutionContextBuilder.java:248)
    	at software.amazon.awssdk.awscore.internal.AwsExecutionContextBuilder.invokeInterceptorsAndCreateExecutionContext(AwsExecutionContextBuilder.java:138)
    	at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.invokeInterceptorsAndCreateExecutionContext(AwsSyncClientHandler.java:67)
    	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$0(BaseSyncClientHandler.java:62)
    	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:182)
    	at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:60)
    	at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:52)
    	at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:60)
    	at software.amazon.awssdk.services.s3.DefaultS3Client.getObject(DefaultS3Client.java:5570)
    	at org.apache.pinot.plugin.filesystem.S3PinotFS.copyToLocalFile(S3PinotFS.java:569)
    	at org.apache.pinot.spi.filesystem.NoClosePinotFS.copyToLocalFile(NoClosePinotFS.java:98)
    	at org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentGenerationJobRunner$1.call(SparkSegmentGenerationJobRunner.java:263)
    	at org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentGenerationJobRunner$1.call(SparkSegmentGenerationJobRunner.java:212)
    	at org.apache.spark.api.java.JavaRDDLike.$anonfun$foreach$1(JavaRDDLike.scala:352)
    	at org.apache.spark.api.java.JavaRDDLike.$anonfun$foreach$1$adapted(JavaRDDLike.scala:352)
    	at scala.collection.Iterator.foreach(Iterator.scala:943)
    	at scala.collection.Iterator.foreach$(Iterator.scala:943)
    	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    	at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1047)
    	at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1047)
    	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2539)
    	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
    	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:174)
    	at org.apache.spark.scheduler.Task.run(Task.scala:152)
    	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:632)
    	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:96)
    	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:635)
    	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    	at java.base/java.lang.Thread.run(Thread.java:840)
    My JobSpec:
    executionFrameworkSpec:
      name: 'spark'
      segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentGenerationJobRunner'
      segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentTarPushJobRunner'
      segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentUriPushJobRunner'
      segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentMetadataPushJobRunner'
      extraConfigs:
        stagingDir: s3://aws-logs-686118793080-us-east-1/metadata/staging/
    jobType: SegmentCreationAndMetadataPush
    inputDirURI: 's3://aws-logs-686118793080-us-east-1/metadata/basic-attribute/date=2025-04-10/'
    outputDirURI: 's3://aws-logs-686118793080-us-east-1/metadata/Output_Metadata/'
    overwriteOutput: true
    pinotFSSpecs:
      - scheme: s3
        className: org.apache.pinot.plugin.filesystem.S3PinotFS
        configs:
          region: 'us-east-1'
    recordReaderSpec:
      dataFormat: 'json'
      className: 'org.apache.pinot.plugin.inputformat.json.JSONRecordReader'
    tableSpec:
      tableName: 'bat_backfill_emr_REALTIME'
      schemaURI: 's3://aws-logs-686118793080-us-east-1/metadata/batSchema.json'
      tableConfigURI: 's3://aws-logs-686118793080-us-east-1/metadata/batTable.json'
    pinotClusterSpecs:
      - controllerURI: 'http://localhost:9000'
    pushJobSpec:
      pushParallelism: 2
      pushAttempts: 20
      pushRetryIntervalMillis: 1000
    My spark-submit command:
    spark-submit --deploy-mode cluster \
      --class org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand \
      --master yarn \
      --conf spark.driver.extraJavaOptions=-Dplugins.dir=s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/ \
      --conf spark.driver.extraClassPath=s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/pinot-batch-ingestion-spark-3-1.1.0-shaded.jar:s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/pinot-all-1.1.0-jar-with-dependencies.jar:s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/pinot-s3-1.1.0-shaded.jar:s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/pinot-json-1.1.0-shaded.jar \
      --jars s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/pinot-all-1.1.0-jar-with-dependencies.jar,s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/pinot-batch-ingestion-spark-3-1.1.0-shaded.jar,s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/pinot-json-1.1.0-shaded.jar,s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/pinot-s3-1.1.0-shaded.jar,s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/pinot-tools-1.1.0.jar,s3://aws-logs-686118793080-us-east-1/BatchIngestionJars/pinot-spi-1.1.0.jar \
      --files s3://aws-logs-686118793080-us-east-1/metadata/executionFrameworkSpec4.yaml \
      local://pinot-all-0.11.0-jar-with-dependencies.jar \
      -jobSpecFile executionFrameworkSpec4.yaml
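    As an aside, a hedged workaround sketch (assuming S3PinotFS's accessKey/secretKey configs; the placeholder values are illustrative, and instance profiles or IRSA remain preferable): credentials can be supplied directly in the pinotFSSpecs so the Spark executors do not depend on the default AWS credentials chain:
    pinotFSSpecs:
      - scheme: s3
        className: org.apache.pinot.plugin.filesystem.S3PinotFS
        configs:
          region: 'us-east-1'
          accessKey: '<AWS_ACCESS_KEY_ID>'       # placeholder
          secretKey: '<AWS_SECRET_ACCESS_KEY>'   # placeholder
    Alternatively, exporting AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY to the executors (for example via spark.executorEnv.*) or attaching a role the executors can actually reach should satisfy the default credentials chain.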

    Georgi Andonov

    04/11/2025, 2:51 PM
    Hello everyone! I am not 100% sure my understanding of the merge rollup task is correct, and I am looking for some clarification. I have the following table and schema config:
    {
      "REALTIME": {
        "tableName": "TestRollup_REALTIME",
        "tableType": "REALTIME",
        "segmentsConfig": {
          "schemaName": "TestRollup",
          "replication": "1",
          "replicasPerPartition": "1",
          "timeColumnName": "ValueTimestamp",
          "minimizeDataMovement": false
        },
        "tenants": {
          "broker": "DefaultTenant",
          "server": "DefaultTenant",
          "tagOverrideConfig": {}
        },
        "tableIndexConfig": {
          "invertedIndexColumns": [],
          "noDictionaryColumns": [],
          "streamConfigs": {
            "streamType": "kafka",
            "stream.kafka.topic.name": "rollup-price-aggregation",
            "stream.kafka.broker.list": "kafka:9092",
            "stream.kafka.consumer.type": "lowlevel",
            "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
            "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
            "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
            "realtime.segment.flush.threshold.rows": "0",
            "realtime.segment.flush.threshold.segment.rows": "500",
            "realtime.segment.flush.threshold.time": "20m"
          },
          "aggregateMetrics": false,
          "enableDefaultStarTree": false,
          "nullHandlingEnabled": false,
          "bloomFilterColumns": [],
          "onHeapDictionaryColumns": [],
          "rangeIndexColumns": [],
          "sortedColumn": [],
          "varLengthDictionaryColumns": [],
          "rangeIndexVersion": 2,
          "optimizeDictionaryForMetrics": false,
          "optimizeDictionary": false,
          "autoGeneratedInvertedIndex": false,
          "createInvertedIndexDuringSegmentGeneration": false,
          "loadMode": "MMAP",
          "enableDynamicStarTreeCreation": false,
          "columnMajorSegmentBuilderEnabled": true,
          "optimizeDictionaryType": false,
          "noDictionarySizeRatioThreshold": 0.85
        },
        "metadata": {},
        "quota": {},
        "task": {
          "taskTypeConfigsMap": {
            "MergeRollupTask": {
              "5m_2m.mergeType": "rollup",
              "5m_2m.bucketTimePeriod": "5m",
              "5m_2m.bufferTimePeriod": "2m",
              "5m_2m.roundBucketTimePeriod": "1m",
              "10m_2m.mergeType": "rollup",
              "10m_2m.bucketTimePeriod": "10m",
              "10m_2m.bufferTimePeriod": "12m",
              "10m_2m.roundBucketTimePeriod": "5m",
              "Price.aggregationType": "sum",
              "schedule": "0 * * * * ?"
            }
          }
        },
        "routing": {},
        "query": {},
        "ingestionConfig": {
          "continueOnError": false,
          "rowTimeValueCheck": false,
          "segmentTimeValueCheck": true
        },
        "isDimTable": false
      }
    }
    
    {
      "schemaName": "TestRollup",
      "enableColumnBasedNullHandling": false,
      "dimensionFieldSpecs": [
        {
          "name": "Id",
          "dataType": "INT",
          "fieldType": "DIMENSION",
          "notNull": false
        }
      ],
      "metricFieldSpecs": [
        {
          "name": "Price",
          "dataType": "DOUBLE",
          "fieldType": "METRIC",
          "notNull": false
        }
      ],
      "dateTimeFieldSpecs": [
        {
          "name": "ValueTimestamp",
          "dataType": "TIMESTAMP",
          "fieldType": "DATE_TIME",
          "notNull": false,
          "format": "TIMESTAMP",
          "granularity": "1:MILLISECONDS"
        }
      ]
    }
    I am ingesting data from a Kafka topic and all the records have a ValueTimestamp equal to now. From my understanding, with that configuration the merge rollup tasks will be executed once a minute and create merged segments based on the provided configuration: if there are records whose timestamp falls within a bucket's timeframe (bucketStart -> bucketStart + 5m, or bucketStart -> bucketStart + 10m) and that are older than 2m (or 12m, respectively), they will be added to the merged segment. Is that correct? Also, regarding the rounding of the timestamp: will the records in the merged segment have timestamps rounded to 1m or to 5m (for example, timestamps like 2025-04-11 1150/1151 or 2025-04-11 1150/1155)?

    San Kumar

    04/13/2025, 10:30 AM
    Hello Team, we are planning to build a UI on top of our Pinot table. Can you suggest which will be better: using the Pinot Java driver, or the Pinot-provided REST API (i.e. the broker endpoint API) to retrieve the data? The table is an offline table with at least 5 years of data.
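    For reference, a hedged sketch of the pinot-java-client usage (the broker address, table, and query are illustrative); the Java client ultimately issues its queries to the broker endpoint, so either approach hits the same query path:
    import org.apache.pinot.client.Connection;
    import org.apache.pinot.client.ConnectionFactory;
    import org.apache.pinot.client.ResultSet;
    import org.apache.pinot.client.ResultSetGroup;

    public class PinotQueryExample {
      public static void main(String[] args) {
        // Connect via the broker list (a Zookeeper-based factory method also exists)
        Connection connection = ConnectionFactory.fromHostList("localhost:8099");

        // Run a SQL query against the broker
        ResultSetGroup resultSetGroup =
            connection.execute("SELECT country, COUNT(*) FROM myTable GROUP BY country LIMIT 10");

        // Iterate over the first result set
        ResultSet resultSet = resultSetGroup.getResultSet(0);
        for (int row = 0; row < resultSet.getRowCount(); row++) {
          System.out.println(resultSet.getString(row, 0) + " -> " + resultSet.getLong(row, 1));
        }
        connection.close();
      }
    }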

    San Kumar

    04/13/2025, 10:31 AM
    We also have additional text columns marked as raw index

    Georgi Varbanov

    04/14/2025, 12:04 PM
    Hello team, we are still in the POC phase with Apache Pinot for our use case, but we are happy to go and fully test the use case in production, and I want to ask a few questions regarding capacity planning. I have gone through https://startree.ai/resources/capacity-planning-in-apache-pinot-part-1 a few times, so here is my use case and my questions; I will be really grateful if you can assist.
    Use case:
    Data retention - infinite, all data is in the hot tier.
    Current data size - 12B rows with nesting/arrays (around 48B rows unnested); each nested row is around 3KB of data when serialized in JSON bytes (raw, without compression from Pinot), and around 60 columns in Pinot.
    QPS requirements - 1500-2000 for current needs, but possibly more in the future; P99 under 500ms, P90/P50 under 100ms.
    Kafka ingestion rate - 1000 msg/s normally; during spikes we would need 20k-30k msg/s overall for a given topic (we currently have 100 partitions for the topic, but can scale further if needed). Per partition that is about 10 msg/s normally and 300 msg/s during spikes.
    Daily ingested rows - 12M per day with nesting/arrays.
    Replication factor - 3.
    Segment size - 500MB (around 70k segments when all data is loaded, * 3 (replication) = 210k segments).
    Types of queries - 99% of the queries are select Col1, count, avg, sum, min, max from tbl where customerId = {customerId} group by col1, or similar; there are some use cases where we need to fetch raw data of up to 1000-2000 rows, but those are rare and not under the QPS requirements described above.
    Number of tables - 1, possibly a few more if we add DIM tables for join queries with nomenclatures, but currently we don't have/need them.
    Questions:
    1. Is it better to have many smaller machines or fewer bigger ones, given that there are examples going up to 32 cores and 126GB of memory per machine?
    2. Is there a benefit to using offline and realtime tables in combination if all data is ingested through Kafka, given that our ingestion rate is not as high as in other use cases? As far as I have researched, we should be fine using only a realtime table without retention.
    3. Do you have any recommendations for the cluster:
    a. Controller/Zookeeper - 3x (16 vCPU, 64GB mem, 200GB storage each) - how do I calculate storage for the controller/Zookeeper (as far as I understood, the controller and Zookeeper are singleton instances, so multiplying them is just for redundancy)?
    b. Broker - 5x (8 vCPU, 32GB mem) or 10x (4 vCPU, 16GB mem)?
    c. Server - 20x (8 vCPU, 64GB mem, 2TB storage), or should I go with smaller/bigger machines?

    kranthi kumar

    04/15/2025, 12:23 PM
    Hi, I am getting this error while doing batch ingestion via Spark jobs:
    java.util.concurrent.TimeoutException: Futures timed out after [100000 milliseconds]
    	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:259) ~[scala-library-2.12.18.jar:?]
    	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263) ~[scala-library-2.12.18.jar:?]
    	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:48) ~[spark-common-utils_2.12-3.5.3-amzn-0.jar:3.5.3-amzn-0]
    	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310) ~[spark-core_2.12-3.5.3-amzn-0.jar:3.5.3-amzn-0]
    	at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:509) ~[spark-yarn_2.12-3.5.3-amzn-0.jar:3.5.3-amzn-0]
    	at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:268) ~[spark-yarn_2.12-3.5.3-amzn-0.jar:3.5.3-amzn-0]
    	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:937) ~[spark-yarn_2.12-3.5.3-amzn-0.jar:3.5.3-amzn-0]
    	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:936) ~[spark-yarn_2.12-3.5.3-amzn-0.jar:3.5.3-amzn-0]
    	at java.security.AccessController.doPrivileged(AccessController.java:712) [?:?]
    	at javax.security.auth.Subject.doAs(Subject.java:439) [?:?]
    	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1953) [hadoop-client-api-3.4.0-amzn-2.jar:?]
    	at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:936) [spark-yarn_2.12-3.5.3-amzn-0.jar:3.5.3-amzn-0]
    	at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala) [spark-yarn_2.12-3.5.3-amzn-0.jar:3.5.3-amzn-0]
    I have almost 200k files with 5000 records each. Is there any limitation on the Pinot side that's causing this issue?

    Yarden Rokach

    04/15/2025, 6:30 PM
    Real-Time Analytics Summit: ONE MONTH TO GO! 🔥 We're thrilled to see the numbers - already over 3000 registrants! If you haven't yet, now is the time to SAVE YOUR SEAT; it's online and free! 🎫 Here are 4 great sessions that will be held:
    🔹 Flexible Forecasting and Insights with Apache Pinot - Fetch: Discover how Fetch built a high-performance Sales Intelligence Platform using Apache Pinot to deliver real-time shopper insights.
    🔹 Data Mesh: Transforming Real-Time Analytics at Netflix: Go behind the scenes with Netflix to explore how they built a transformative data mesh that powers personalized user experiences, from real-time recommendations to game analytics, live events, and ad processing.
    🔹 Managing Trade-Offs in System Design: Migrating from Flink to Pinot - 7Signal: Explore how 7Signal transformed its data architecture to deliver real-time, low-latency analytics, enabling Wi-Fi performance insights at scale across industries such as healthcare and sports venues.
    🔹 Apache Pinot in Action: Real-Time Analytics at CrowdStrike: Get a firsthand look at how CrowdStrike uses Apache Pinot to process tens of thousands of events per second, enhancing security operations in real time.
    👉 See the full agenda here>

    Mannoj

    04/18/2025, 8:07 AM
    A quick question: I have only enabled Pinot servers without HDFS, and data has been populating on them. Now when I enable HDFS, how does the data get sent to HDFS?
    1. Will it automatically send data into HDFS?
    2. Or is a Pinot-server and Pinot-controller restart required?
    3. Or will a rebalance or a table reload do the magic of sending data into HDFS?
    4. NOPE: nothing will do it after enabling HDFS; you need to drop the table and recreate it on Pinot so it starts sending data to Pinot and HDFS.

    Saravanan Subburayal

    04/19/2025, 9:01 AM
    I want to use Azure Disk for the deep store instead of S3 or HDFS. Are there any supported file systems for storing the files locally (Azure Disk through a PVC)?

    Mannoj

    04/21/2025, 9:12 AM
    What training material can Apache Pinot recommend for SREs or DevOps, and is there any Apache Pinot certification to go through?

    Peter Corless

    04/21/2025, 6:33 PM
    Folks, sound off in this thread to let me know you're registered for rtasummit.com, and what sparked you to register (a specific talk, speaker, networking opportunity, etc.)! Feel free to chime in!

    Peter Corless

    04/22/2025, 8:13 PM
    Hi folks! We are going to be announcing the premier StarTree “Real-Time Revolutionaries” Awards at RTA Summit 2025. 4,500+ attendees have already registered, yet only a dozen or so companies will be recognized in this year's inaugural awards. As one of the people organizing the awards, one of the things that limited me greatly was simply knowing about a real-time analytics use case. For StarTree customers, we have more direct communication and insights, yet for many great Apache Pinot OSS stories, I haven't heard what you're up to! So for next year, please take the opportunity to tell us what you're up to - you might just be a winner in 2026! • Apache Pinot User Story. Meanwhile, remember to register for RTA Summit, coming up this 14 May: rtasummit.com

    jamangstangs

    04/23/2025, 7:54 AM
    Hi, may I ask when Apache Pinot 1.4.0 is expected to be released?

    San Kumar

    04/25/2025, 4:57 AM
    Hello, is it recommended to set the properties below for multi-tenant enablement in a Pinot cluster?
    cluster.tenant.isolation.enable=false
    pinot.set.instance.id.to.hostname=true
    As I see from the documentation https://docs.pinot.apache.org/basics/concepts/components/cluster/tenant, and as far as I know, cluster.tenant.isolation.enable=false is not recommended.

    San Kumar

    04/25/2025, 4:58 AM
    Can you please advise on these properties for a production setup?

    San Kumar

    04/25/2025, 4:59 AM
    Or what is the best approach to enable multiple tenants?

    Peter Corless

    04/25/2025, 8:27 PM
    Have you folks found the deepwiki for Apache Pinot yet? I'd love folks' feedback on the quality and accuracy: https://deepwiki.com/apache/pinot

    Yarden Rokach

    04/29/2025, 8:54 AM
    🚀 How are Uber, Netflix, and Spotify scaling real-time analytics - for themselves and their users? Join us at #RTASummit on May 14th - the must-attend (and free!) event for data architects and engineers. 💡 One highlight you won't want to miss: Building Real-Time GenAI Pipelines with Apache Pinot and AWS. Discover how to build a real-time social media analysis pipeline using Amazon Managed Service for Apache Flink, Amazon Bedrock, and Apache Pinot as a vector database. See how this architecture powers real-time RAG (Retrieval Augmented Generation), enabling instant insight into social media trends and unlocking next-gen GenAI search and analysis capabilities. Perfect for teams looking to harness GenAI + streaming data for real-time, data-driven decisions. RSVP>

    San Kumar

    04/29/2025, 9:52 AM
    Hello Team, is the Pinot Java client API a wrapper over the HTTP REST API?

    Yarden Rokach

    05/02/2025, 11:23 AM
    CONGRATS @Gonzalo Ortiz and @Sonam Mandal 💥