# getting-started
  • Peter Corless

    02/27/2025, 7:46 PM
    Right now the only form of "S3 as a tiered storage layer" is StarTree Cloud.
  • Peter Corless

    02/27/2025, 7:47 PM
    Also, in that configuration, I think the S3 used for tiered storage is separate from the deep store.
  • Peter Corless

    02/27/2025, 7:48 PM
    This section of the docs explains why:
    Why can't we use the copy already in deep-store?
    The segment in deep store is compressed, and typically will not have all indexes, especially if you've added indexes or derived columns to the segment later on. We use a separate location and copy for the tiered storage query execution, so we can keep it uncompressed and exactly in sync with what would have been on the server. Think of this S3 bucket as an exact replacement for the storage that would've been on the servers locally, with one major difference that we'll keep only 1 copy in S3 vs replication number of copies locally.
    Source: https://dev.startree.ai/docs/manage-data/set-up-tiered-storage/architecture
  • Ravi Jain

    03/02/2025, 8:53 PM
    I am trying to create a realtime table in Pinot using Avro data from a Kafka topic along with a schema registry, but it is failing without any clear error. Can somebody help me debug this? I am sharing my config below.
    Copy code
    {
      "tableName": "ingestionClientAvro",
      "tableType": "REALTIME",
      "segmentsConfig": {
        "schemaName": "ingestionClientAvro",
        "replication": "1",
        "replicasPerPartition": "1",
        "timeColumnName": "timestamp",
        "minimizeDataMovement": false
      },
      "tenants": {
        "broker": "DefaultTenant",
        "server": "DefaultTenant",
        "tagOverrideConfig": {}
      },
      "tableIndexConfig": {
        "noDictionaryColumns": [],
        "invertedIndexColumns": [],
        "streamConfigs": {
          "streamType": "kafka",
          "stream.kafka.consumer.type": "LOWLEVEL",
          "stream.kafka.topic.name": "genericUserEvents",
          "stream.kafka.broker.list": "localhost:9092",
          "stream.kafka.consumer.prop.auto.offset.reset": "largest",
          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka30.KafkaConsumerFactory",
          "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
          "stream.kafka.decoder.prop.format": "AVRO",
          "realtime.segment.flush.threshold.time": "24h",
          "realtime.segment.flush.threshold.segment.size": "100M",
          "schema.registry.url": "<http://localhost:8081>",
          "schema.registry.subject": "genericUserEvents-value"
        },
        "aggregateMetrics": false,
        "enableDefaultStarTree": false,
        "nullHandlingEnabled": false,
        "bloomFilterColumns": [],
        "onHeapDictionaryColumns": [],
        "rangeIndexColumns": [],
        "sortedColumn": [],
        "varLengthDictionaryColumns": [],
        "rangeIndexVersion": 2,
        "optimizeDictionaryForMetrics": false,
        "optimizeDictionary": false,
        "autoGeneratedInvertedIndex": false,
        "createInvertedIndexDuringSegmentGeneration": false,
        "loadMode": "MMAP",
        "enableDynamicStarTreeCreation": false,
        "columnMajorSegmentBuilderEnabled": true,
        "optimizeDictionaryType": false,
        "noDictionarySizeRatioThreshold": 0.85
      },
      "metadata": {},
      "quota": {},
      "routing": {},
      "query": {},
      "ingestionConfig": {
        "continueOnError": false,
        "rowTimeValueCheck": false,
        "segmentTimeValueCheck": true
      },
      "isDimTable": false
    }
    Schema:
    Copy code
    {
      "schemaName": "ingestionClientAvro",
      "dimensionFieldSpecs": [
        {
          "name": "data",
          "dataType": "STRING"
        },
        {
          "name": "id",
          "dataType": "STRING"
        },
        {
          "name": "schemaName",
          "dataType": "STRING"
        },
        {
          "name": "schemaVersion",
          "dataType": "STRING"
        },
        {
          "name": "tz",
          "dataType": "STRING"
        }
      ],
      "metricFieldSpecs": [],
      "dateTimeFieldSpecs": [
        {
          "name": "timestamp",
          "dataType": "LONG",
          "format": "1:MILLISECONDS:EPOCH",
          "granularity": "1:MILLISECONDS"
        }
      ]
    }
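    For reference, a hedged sketch (not a verified fix) of how the Pinot docs usually wire the Confluent schema-registry decoder, passing the registry settings as decoder props under the stream.kafka.decoder.prop. prefix; the URL here is an assumption:
        "streamConfigs": {
          "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
          "stream.kafka.decoder.prop.schema.registry.rest.url": "http://localhost:8081"
        }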
  • Chao Cao

    03/04/2025, 2:16 AM
    Hi all, I'm trying to perform multi-level flattening on ingestion using Apache Pinot. I've been following the documentation here, but I'm struggling to get deeper than the first level of nested fields. Could anyone provide guidance or examples on how to access and flatten fields that are more than one level deep? Thanks in advance!
  • Rajat

    03/04/2025, 6:45 AM
    Hey team, I came to know that Pinot 1.3.0 has been released, and the release notes mention a feature called 'multi-stream ingestion'. Can anyone explain it or point me to docs where I can read more about it? Really interested in knowing more. @Mayank @Xiang Fu
  • Rajat

    03/04/2025, 8:25 AM
    Hi Team, I am evaluating Apache Pinot for a use case and wanted to understand the best practices for deployment.
    1. POC deployment
    ◦ What is the suggested approach for a quick proof of concept (POC)?
    ◦ Can we deploy all Pinot components on a single instance for a minimal setup, or would you recommend EKS or direct EC2?
    ◦ Are there any pre-built Helm charts or configurations for quick deployment in Kubernetes/EKS? (See the sketch below.)
    ◦ For a POC on EKS, what instance type do you suggest for basic benchmarking on production data and for the other pods?
    ◦ For EKS, do you suggest deploying each component on a separate node, or can a POC run on a single node with distributed memory and CPU allocation?
    2. Production deployment
    ◦ What is the recommended way to deploy Pinot in production?
    ◦ Should we use EKS (Kubernetes) or standalone EC2 instances for better performance and scalability?
    ◦ For small to medium workloads, is it advisable to have separate Controller, Broker, and Server nodes, or can they be combined?
    ◦ Are there any recommended EC2 instance types for different workloads (query-heavy vs ingestion-heavy)?
    @Mayank @Xiang Fu @Jackie
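    For the Helm question above, a rough sketch based on the community Helm chart that ships with the Apache Pinot repository; the namespace and replica counts are placeholder assumptions for a POC, not sized recommendations:
        # Add the Helm repo bundled in the Apache Pinot source tree and install a small cluster
        helm repo add pinot https://raw.githubusercontent.com/apache/pinot/master/helm
        kubectl create ns pinot-quickstart
        helm install pinot pinot/pinot -n pinot-quickstart \
          --set cluster.name=pinot \
          --set server.replicaCount=2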
  • Jonathan Baxter

    03/04/2025, 3:58 PM
    Hello, so cool that this community exists! Hoping to benefit from y'all's experience with Pinot on this early schema design question my team at Etsy is pondering. We're weighing two approaches for storing organic keyword search event data (think: a stats tool showing impressions, clicks, search volume, etc. for search queries that buyers use) in our system and would love your input. One approach uses upserts to maintain one row per search "session" and would let us use AVG() directly to calculate metrics like "average # of total results by search query (over time)". The other is append-only—storing separate rows for a search, impression, click, etc. and computing averages via something like SUM(total_results)/SUM(search_count). So a lot of rows with a lot of zeroes. I'm particularly interested in:
    • How you weigh the write latency and contention risks of an upsert approach. Is this a real thing to worry about at high write volume?
    • Any challenges with the increased storage or the additional processing needed to aggregate at query time with an append-only design.
    Thanks in advance!
  • Chao Cao

    03/04/2025, 5:20 PM
    I figured out 2 levels. You need to combine: 1. chaining transformations, 2. JSON flatten. In the table config file you go into ingestionConfig and then into transformConfigs and set: "field_name": "jsonPathString(outer.middle, '$.inner')"
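    A minimal sketch of that chained approach in a table config, using hypothetical field names (outer, middle, inner); the intermediate column is only for illustration:
        "ingestionConfig": {
          "transformConfigs": [
            { "columnName": "middle_json", "transformFunction": "jsonPathString(outer, '$.middle')" },
            { "columnName": "inner_value", "transformFunction": "jsonPathString(middle_json, '$.inner')" }
          ]
        }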
  • Chao Cao

    03/04/2025, 6:04 PM
    But I'm having some trouble when trying to access something deeply nested like:
    {
      "outer_key_1": {
        "outer_key_2": "[
          {
            \"inner_key_1\": {
              \"inner_key_2\": {
                \"value_key_1\": {\"key\": \"value\"}
              }
            }
          }
        ]"
      }
    }
    I suspect the answer is in chaining transformations, but I don't know how to keep chaining in the transformConfigs when I need to go over to ingestionConfig.complexTypeConfig to put in arrays that need to be un-nested.
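    On the complexTypeConfig side, a hedged sketch of un-nesting an array field; the field path is an assumption based on the example above, and it presumes the array arrives as parsed JSON rather than an escaped string (a string would first need something like jsonPathString to extract it). transformConfigs and complexTypeConfig can sit side by side under ingestionConfig:
        "ingestionConfig": {
          "complexTypeConfig": {
            "delimiter": ".",
            "fieldsToUnnest": ["outer_key_1.outer_key_2"],
            "collectionNotUnnestedToJson": "NON_PRIMITIVE"
          }
        }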
  • Wilson

    03/05/2025, 3:02 AM
    I was performing a table rebalance on a realtime table with includeConsuming=true, and I encountered the following error message.
    Copy code
    Caught exception while calculating target assignment: java.lang.IllegalStateException: There are less instances: 1 in instance partitions: ODS_TIRESIAS_MCD_BJ_SEGMENT_LIST_CONSUMING than the table replication: 2
    My realtime table replication is set to 2, and I have 2 servers. I'm not sure what 'instance partitions' means — where can I find more information about it? When I use GET tables/{tableName}/instancePartitions, it always responds:
    Copy code
    {
      "code": 404,
      "error": "Failed to find the instance partitions"
    }
  • Amit

    03/05/2025, 2:10 PM
    Hi, how is arrayToMv different from VALUEIN? I searched the Pinot docs but could not find much info on arrayToMv. I am working on a multi-value column and want to show the exploded values. I have found that VALUEIN works for equals-type queries and arrayToMv works for not-equals. Can someone please share more insights on this?
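    For context, a rough sketch of the two usages being compared, with a hypothetical table events and multi-value column tags; treat this as illustration rather than a definitive answer:
        -- VALUEIN filters a multi-value column down to the listed values,
        -- handy when the same column is both filtered and grouped on
        SELECT VALUEIN(tags, 'a', 'b') AS tag, COUNT(*)
        FROM events
        WHERE tags IN ('a', 'b')
        GROUP BY VALUEIN(tags, 'a', 'b')

        -- arrayToMv treats an array-valued expression as a multi-value column
        -- (commonly in the multi-stage engine), which also covers not-equals filters
        SELECT COUNT(*)
        FROM events
        WHERE arrayToMv(tags) != 'a'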
  • DvirZaken

    03/06/2025, 8:04 PM
    Hey everyone, what's the best way to query Apache Pinot securely and prevent SQL injection when using TypeScript?
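    Since Pinot's broker HTTP API takes a full SQL string, one common mitigation is to allow-list identifiers and escape string literals before building the query. A minimal TypeScript sketch along those lines; the broker URL, table, and column names are assumptions, and this is not an official client:
        // Escape a user-supplied string literal by doubling single quotes (standard SQL escaping).
        function sqlStringLiteral(value: string): string {
          return `'${value.replace(/'/g, "''")}'`;
        }

        // Never interpolate identifiers from user input; allow-list them instead.
        const SORTABLE_COLUMNS = new Set(["event_time", "event_type"]);

        async function queryByType(eventType: string, orderBy: string): Promise<unknown> {
          if (!SORTABLE_COLUMNS.has(orderBy)) {
            throw new Error(`Unsupported order-by column: ${orderBy}`);
          }
          const sql =
            `SELECT * FROM events ` +
            `WHERE event_type = ${sqlStringLiteral(eventType)} ` +
            `ORDER BY ${orderBy} LIMIT 100`;

          // POST to the broker's /query/sql endpoint (URL assumed for this sketch).
          const response = await fetch("http://localhost:8099/query/sql", {
            method: "POST",
            headers: { "Content-Type": "application/json" },
            body: JSON.stringify({ sql }),
          });
          return response.json();
        }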
  • Kai D

    03/13/2025, 1:20 PM
    Hello Pinot team. I need to compare Pinot 1.3 performance against Pinot 1.0, which we are currently using in production. Do I need to rebuild segments with Pinot 1.3, or can 1.3 load segments built by 1.0? Are there any problems like incompatible formats? I don't see this in the release notes.
  • Roshan

    03/13/2025, 4:36 PM
    Hi Team, I was testing deploying Pinot to AKS using Helm. I have successfully deployed Pinot and also implemented authorization for both admin and user as mentioned in the official Pinot documentation, but now I am facing an issue only when ingesting data via the API. I am able to create the schema and table via the API (authorization headers included), so what can the issue be? Is it something to do with a config file mismatch while using Helm? Has anyone faced a similar issue? Can somebody help please? I am stuck here!
  • Yarden Rokach

    03/14/2025, 11:14 AM
    For those looking to #C01H1S9J5BJ with Contribution, join this call on April 3rd! https://www.meetup.com/apache-pinot/events/306700201/?eventOrigin=your_events
  • Venkat

    03/18/2025, 7:36 AM
    Hi Team, I am trying to create a realtime table in Pinot with the Kafka MySQL Debezium connector using the config below. It is successfully fetching the records, but all of the values are coming through as null. Please help me debug this.
    table.json
    {
      "tableName": "mp_saletransactions",
      "tableType": "REALTIME",
      "segmentsConfig": {
        "replication": "1",
        "schemaName": "mp_saletransactions",
        "timeColumnName": "transactiondate",
        "timeType": "MILLISECONDS"
      },
      "tableIndexConfig": {
        "streamConfigs": {
          "streamType": "kafka",
          "stream.kafka.topic.name": "mysql-saleentry.saleentry.mp_saletransactions",
          "stream.kafka.bootstrap.servers": "localhost:9092",
          "stream.kafka.broker.list": "localhost:9092",
          "stream.kafka.consumer.type": "lowlevel",
          "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
          "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
          "stream.kafka.decoder.type": "debezium",
          "stream.kafka.decoder.debezium.schema.registry.url": ""
        }
      },
      "tenants": {
        "broker": "DefaultTenant",
        "server": "DefaultTenant"
      },
      "routing": {
        "upsertConfig": {
          "mode": "FULL",
          "primaryKeyColumns": ["id"]
        }
      },
      "metadata": {}
    }
    schema.json
    {
      "schemaName": "mp_saletransactions",
      "dimensionFieldSpecs": [
        {"name": "id", "dataType": "INT"},
        {"name": "transactiontypes", "dataType": "STRING"},
        {"name": "status", "dataType": "STRING"},
        {"name": "storenumber", "dataType": "STRING"},
        {"name": "employeenumber", "dataType": "STRING"},
        {"name": "phone", "dataType": "STRING"},
        {"name": "plancode", "dataType": "STRING"},
        {"name": "monthlyaccess", "dataType": "STRING"}
      ],
      "enableColumnBasedNullHandling": true,
      "metricFieldSpecs": [],
      "dateTimeFieldSpecs": [
        {
          "name": "transactiondate",
          "dataType": "LONG",
          "format": "1MILLISECONDSEPOCH",
          "granularity": "1:MILLISECONDS"
        }
      ]
    }
  • Kasinath Reddy

    03/18/2025, 10:53 AM
    Hello everyone, we are planning to build a hybrid table on Pinot where we have 1) a realtime table with upserts enabled and 2) an offline table with a star-tree index. As per my understanding, we cannot set metadataTTL to 0 in the upsertConfig without factoring in heap size.
    1. Assume I have sent a record with key "a" twice, so the metadata map held {"a": (doc_id: 1)} and then {"a": (doc_id: 2)}. If segment compaction drops doc_id 1, is it automatically removed from the metadata map?
    2. If we have configured metadataTTL to 1 day, the timestamp column is configured properly, and I get an initial event {key: "a", value: {"test": 1}} and after a day {key: "a", value: {"test": 2}}, will there be data duplication in this case?
    3. If point 2 is valid, when a user queries data for key "a", will we basically receive 2 entries, even if "a" is also in the offline table, due to the time boundary?
  • Kasinath Reddy

    03/18/2025, 10:53 AM
    Copy code
    Current upsert config
     "upsertConfig": {
          "mode": "PARTIAL",
          "comparisonColumns": [
            "processed_at"
          ],
          "metadataTTL": 0,
          "dropOutOfOrderRecord": false,
          "enableSnapshot": true,
          "deletedKeysTTL": 0,
          "partialUpsertStrategies": {},
          "enablePreload": false,
          "consistencyMode": "SNAPSHOT",
          "upsertViewRefreshIntervalMs": 3000,
          "allowPartialUpsertConsumptionDuringCommit": false,
          "hashFunction": "NONE",
          "defaultPartialUpsertStrategy": "OVERWRITE"
        },
        "isDimTable": false
      }
  • Roshan

    03/18/2025, 3:35 PM
    Hi guys, I am facing a data ingestion issue after adding authentication to Pinot. Does it have anything to do with the table config? I am getting this error:
    Upload response: 500 Error: {"code":500,"error":"Caught exception when ingesting file into table: test_tenant_OFFLINE. Caught exception while uploading segments. Push mode: TAR, segment tars: [[file:/opt/pinot/tmp/ingestion_dir/working_dir_test_tenant_OFFLINE_1742312090194_FRKYyJJrdp/segment_tar_dir/test_tenant_1742312090217.tar.gz]]"}
    table_config = {
      "tableName": f"{tenant_name}",
      "tableType": "OFFLINE",
      "segmentsConfig": { "minimizeDataMovement": False, "deletedSegmentsRetentionPeriod": "7d", "retentionTimeUnit": "DAYS", "retentionTimeValue": "30", "segmentPushType": "REFRESH", "replication": "1" },
      "tenants": { "broker": "DefaultTenant", "server": "DefaultTenant" },
      "tableIndexConfig": {
        "invertedIndexColumns": [ "Time_of_Resolution", "Category", "Description", "Escalate", "Ticket_Status", "Customer_Gender", "Ticket_Subject", "product_purchased", "Source", "Relevance", "Customer_Email", "Customer_Name", "Tags", "Agent", "Number_of_conversations", "Intent", "Ticket_Priority", "Sentiment", "Type", "Created_date", "First_Response_Time", "Customer_Satisfaction_Rating", "Date_of_Purchase", "Resolution", "Created_date_month", "Created_date_year" ],
        "autoGeneratedInvertedIndex": False,
        "varLengthDictionaryColumns": [ "Time_of_Resolution", "Category", "Description", "Escalate", "Ticket_Status", "Customer_Gender", "Ticket_Subject", "product_purchased", "Source", "Relevance", "Customer_Email", "Customer_Name", "Tags", "Agent", "Intent", "Ticket_Priority", "Sentiment", "Type", "Created_date", "First_Response_Time", "Date_of_Purchase", "Resolution" ],
        "enableDefaultStarTree": False,
        "nullHandlingEnabled": True,
        "createInvertedIndexDuringSegmentGeneration": True,
        "rangeIndexVersion": 2,
        "aggregateMetrics": False,
        "optimizeDictionary": False,
        "loadMode": "MMAP",
        "enableDynamicStarTreeCreation": False,
        "columnMajorSegmentBuilderEnabled": False,
        "optimizeDictionaryForMetrics": False,
        "noDictionarySizeRatioThreshold": 0
      },
      "metadata": {},
      "ingestionConfig": {
        "batchIngestionConfig": { "segmentIngestionType": "APPEND", "consistentDataPush": False },
        "segmentTimeValueCheck": False,
        "transformConfigs": [
          { "columnName": "First_Response_Time_epoch_millis", "transformFunction": "CASEWHEN(strcmp(Number_of_conversations,'null')=0,null,DATETIMECONVERT(First_Response_Time, 'SIMPLE_DATE_FORMAT|dd-MM-yyyy HH:mm', '1MILLISECONDSEPOCH', 'MINUTES|1'))" },
          { "columnName": "Time_of_Resolution_epoch_millis", "transformFunction": "CASEWHEN(strcmp(Ticket_Status,'Closed')=0,DATETIMECONVERT(Time_of_Resolution, 'SIMPLE_DATE_FORMAT|dd-MM-yyyy HH:mm', '1MILLISECONDSEPOCH', 'MINUTES|1'),null)" },
          { "columnName": "Created_date_timestamp", "transformFunction": "DATETIMECONVERT(Created_date, 'SIMPLE_DATE_FORMAT|dd-MM-yyyy HH:mm', 'SIMPLE_DATE_FORMAT|yyyy-MM-dd HH:mm', 'MINUTES|1')" },
          { "columnName": "Created_date_epoch_millis", "transformFunction": "DATETIMECONVERT(Created_date, 'SIMPLE_DATE_FORMAT|dd-MM-yyyy HH:mm', '1MILLISECONDSEPOCH', 'MINUTES|1')" },
          { "columnName": "Created_date_month", "transformFunction": "Month(Created_date_epoch_millis, 'UTC')" },
          { "columnName": "Created_date_year", "transformFunction": "YEAR(Created_date_epoch_millis)" },
          { "columnName": "Sentiment_score", "transformFunction": "CASEWHEN(strcmp(Sentiment,'Positive')=0,2,CASEWHEN(strcmp(Sentiment,'Neutral')=0,1,0))" },
          { "columnName": "Sentiment_Positive", "transformFunction": "CASEWHEN(strcmp(Sentiment,'Positive')=0,2,0)" },
          { "columnName": "Sentiment_Neutral", "transformFunction": "CASEWHEN(strcmp(Sentiment,'Neutral')=0,1,0)" },
          { "columnName": "Sentiment_Negative", "transformFunction": "CASEWHEN(strcmp(Sentiment,'Negative')=0,0,0)" },
          { "columnName": "Customer_Satisfaction_Rating", "transformFunction": "\"Customer Satisfaction Rating\"" },
          { "columnName": "Number_of_conversations", "transformFunction": "\"Number of conversations\"" },
          { "columnName": "Ticket_Subject", "transformFunction": "\"Ticket Subject\"" },
          { "columnName": "Customer_Name", "transformFunction": "\"Customer Name\"" },
          { "columnName": "Time_of_Resolution", "transformFunction": "\"Time to Resolution\"" },
          { "columnName": "Customer_Age", "transformFunction": "\"Customer Age\"" },
          { "columnName": "First_Response_Time", "transformFunction": "\"First Response Time\"" },
          { "columnName": "Customer_Email", "transformFunction": "\"Customer Email\"" },
          { "columnName": "Ticket_Status", "transformFunction": "\"Ticket Status\"" },
          { "columnName": "Created_date", "transformFunction": "\"Created date\"" },
          { "columnName": "Date_of_Purchase", "transformFunction": "\"Date of Purchase\"" },
          { "columnName": "Agent_overall_performance", "transformFunction": "\"Agent overall performance\"" },
          { "columnName": "Agent_politeness", "transformFunction": "\"Agent politeness\"" },
          { "columnName": "Customer_Gender", "transformFunction": "\"Customer Gender\"" },
          { "columnName": "Agent_communication", "transformFunction": "\"Agent communication\"" },
          { "columnName": "Agent_Patience", "transformFunction": "\"Agent Patience\"" },
          { "columnName": "Ticket_ID", "transformFunction": "\"Ticket ID\"" },
          { "columnName": "Ticket_Priority", "transformFunction": "\"Ticket Priority\"" }
        ],
        "continueOnError": True,
        "rowTimeValueCheck": True
      },
      "isDimTable": False
    }
  • Roshan

    03/18/2025, 4:40 PM
    Please, can anybody help? I have been stuck for 4 days now! The code I used was working fine with a namespace that didn't have authentication set up; only after I enabled authorization did this issue start. Schema creation and table creation work fine after authentication is enabled in Pinot, but data ingestion is failing. These are the config files I use when I create the schema and table for a sample table. When I try to ingest a sample CSV file I can't, and I get the error mentioned above in the chat: Upload response: 500 Error: {"code":500,"error":"Caught exception when ingesting file into table: test_tenant_OFFLINE. Caught exception while uploading segments. Push mode: TAR, segment tars: [[file:/opt/pinot/tmp/ingestion_dir/working_dir_test_tenant_OFFLINE_1742312090194_FRKYyJJrdp/segment_tar_dir/test_tenant_1742312090217.tar.gz]]"} Ingest Python code:
    Copy code
    import requests
    import json
    from pathlib import Path
    import time
    
    PINOT_CONTROLLER = "********"
    PINOT_BROKER = "*********"
    AUTH_HEADERS = {
        "Authorization": "Basic YWRtaW46dmVyeXNlY3JldA==",
        "Content-Type": "application/json"
    }
    
    def verify_segment(tenant_name, segment_name):
        max_retries = 10
        retry_interval = 2  # seconds
        
        for i in range(max_retries):
            print(f"\nChecking segment status (attempt {i+1}/{max_retries})...")
            
            response = requests.get(f"{PINOT_CONTROLLER}/segments/{tenant_name}/{segment_name}/metadata",headers=AUTH_HEADERS)
            if response.status_code == 200:
                print("Segment is ready!")
                return True
                
            print(f"Segment not ready yet, waiting {retry_interval} seconds...")
            time.sleep(retry_interval)
        
        return False
    
    def simple_ingest(tenant_name):
        # Verify schema and table exist
        schema_response = requests.get(f'{PINOT_CONTROLLER}/schemas/{tenant_name}',headers=AUTH_HEADERS)
        table_response = requests.get(f'{PINOT_CONTROLLER}/tables/{tenant_name}',headers=AUTH_HEADERS)
        
        if schema_response.status_code != 200 or table_response.status_code != 200:
            print(f"Schema or table missing for tenant {tenant_name}. Please run create_schema.py and create_table.py first")
            return
    
        csv_path = Path(f"data/{tenant_name}_data.csv")
        
        print(f"\nUploading data for tenant {tenant_name}...")
        with open(csv_path, 'rb') as f:
            files = {'file': (f'{tenant_name}_data.csv', f, 'text/csv')}
            
            # Using a dictionary for column mapping first
            column_map = {
                "Ticket ID": "Ticket_ID",
                "Customer Name": "Customer_Name",
                "Customer Email": "Customer_Email",
                "Company_name": "Company_name",
                "Customer Age": "Customer_Age",
                "Customer Gender": "Customer_Gender",
                "Product purchased": "product_purchased",
                "Date of Purchase": "Date_of_Purchase",
                "Ticket Subject": "Ticket_Subject",
                "Description": "Description",
                "Ticket Status": "Ticket_Status",
                "Resolution": "Resolution",
                "Ticket Priority": "Ticket_Priority",
                "Source": "Source",
                "Created date": "Created_date",
                "First Response Time": "First_Response_Time",
                "Time to Resolution": "Time_of_Resolution",
                "Number of conversations": "Number_of_conversations",
                "Customer Satisfaction Rating": "Customer_Satisfaction_Rating",
                "Category": "Category",
                "Intent": "Intent",
                "Type": "Type",
                "Relevance": "Relevance",
                "Escalate": "Escalate",
                "Sentiment": "Sentiment",
                "Tags": "Tags",
                "Agent": "Agent",
                "Agent politeness": "Agent_politeness",
                "Agent communication": "Agent_communication",
                "Agent Patience": "Agent_Patience",
                "Agent overall performance": "Agent_overall_performance",
                "Conversations": "Conversations"
            }
    
            config = {
                "inputFormat": "csv",
                "header": "true",
                "delimiter": ",",
                "fileFormat": "csv",
                "multiValueDelimiter": ";",
                "skipHeader": "false",
                # Convert dictionary to proper JSON string
                "columnHeaderMap": json.dumps(column_map)
            }
            
            params = {
                'tableNameWithType': f'{tenant_name}_OFFLINE',
                'batchConfigMapStr': json.dumps(config)
            }
    
            upload_headers = {
                "Authorization": AUTH_HEADERS["Authorization"]
            }
            
            response = requests.post(
                f'{PINOT_CONTROLLER}/ingestFromFile',
                files=files,
                params=params,
                headers=upload_headers
            )
        
        print(f"Upload response: {response.status_code}")
        if response.status_code != 200:
            print(f"Error: {response.text}")
            return
    
        if response.status_code == 200:
            try:
                response_data = json.loads(response.text)
                print(f"Response data: {response_data}")
                segment_name = response_data["status"].split("segment: ")[1]
                print(f"\nWaiting for segment {segment_name} to be ready...")
                
                if verify_segment(tenant_name, segment_name):
                    query = {
                        "sql": f"SELECT COUNT(*) FROM {tenant_name}_OFFLINE",
                        "trace": False
                    }
                    
                    query_response = requests.post(
                        f"{PINOT_BROKER}/query/sql",
                        json=query,
                        headers=AUTH_HEADERS
                    )
                    
                    print("\nQuery response:", query_response.status_code)
                    if query_response.status_code == 200:
                        print(json.dumps(query_response.json(), indent=2))
                else:
                    print("Segment verification timed out")
            except Exception as e:
                print(f"Error processing segment: {e}")
                print(f"Full response text: {response.text}")
    
    if __name__ == "__main__":
        simple_ingest("test_tenant")
  • Sandeep Jain

    03/20/2025, 4:33 AM
    Hi there! Quick question: Is it possible to visualise data stored in Pinot using Grafana?
  • Kasinath Reddy

    03/20/2025, 4:58 AM
    Hello Pinot team, this is a realtime segment URI and we need to backfill it. There is an option to upload segments to a realtime table, right? So do I need to upload the corrected data with the same URI, or what is the procedure?
    Copy code
    segment uri : "<http://pinot-controller:9000/segments/test/test__0__101__20250315T0506Z>"
  • Mike Stewart

    03/20/2025, 5:08 PM
    Hi All! I've been experimenting with Star Tree indexes today and am struggling to understand why the index is ignored in certain cases when it seems like it should be used. Please could someone help explain this behavior and whether there’s anything I need to adjust to optimize it for my use case? Pinot may not work for my needs without it. Any insights would be greatly appreciated—thank you! I've set up a very simple Star Tree Index on my table with the following configuration:
    Copy code
    "starTreeIndexConfigs": [
      {
        "dimensionsSplitOrder": ["event_type"],
        "functionColumnPairs": ["SUM__event_currency_amount"],
        "maxLeafRecords": 10000
      }
    ]
    When I execute the following query:
    Copy code
    SELECT event_type, SUM(event_currency_amount)  
    FROM [table]  
    GROUP BY event_type;
    the Star Tree index is used, and only 9 documents are read. However, when I introduce a time filter of any kind:
    Copy code
    SELECT event_type, SUM(event_currency_amount)  
    FROM [table]  
    WHERE event_time >= 0 AND event_time < 1800000000000  
    GROUP BY event_type;
    the Star Tree index is ignored, resulting in 14 million documents being scanned instead. Since the event_time range in the WHERE clause fully encompasses the segment's event_time range, I expected the Star Tree index to still be utilized. Based on previous discussions, I was under the impression that this should be the case. I also reviewed the link below, which I understood might specifically address this scenario in the product. For fuller context, I currently have a single offline table with a single segment. Initially, I had a hybrid table, but I removed it to minimize variables while troubleshooting this issue. Any guidance on why this is happening and how I might ensure the Star Tree index is used would be greatly appreciated. Thanks in advance!
    Improve star-tree to use star-node when the predicate matches all the non-star nodes by Jackie-Jiang · Pull Request #9667 · apache/pinot · GitHub
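    One detail worth noting: a star-tree index is generally only eligible when every predicate column in the query is part of its dimensionsSplitOrder, so a filter on event_time falls back to a regular scan with the config above. A hedged sketch of a config that adds a coarser, derived time bucket to the split order; the event_time_hour column name is an assumption (it would come from an ingestion transform or derived column):
        "starTreeIndexConfigs": [
          {
            "dimensionsSplitOrder": ["event_type", "event_time_hour"],
            "functionColumnPairs": ["SUM__event_currency_amount"],
            "maxLeafRecords": 10000
          }
        ]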
  • Manish G

    03/22/2025, 3:04 PM
    My Pinot server is running, and I have created a schema and a table for it. How do I insert data into the table from the UI?
  • Manish G

    03/22/2025, 3:46 PM
    I have defined a schema as:
    Copy code
    {
      "schemaName": "my-test-schema",
      "enableColumnBasedNullHandling": true,
      "dimensionFieldSpecs": [
        {
          "name": "field",
          "dataType": "FLOAT",
          "fieldType": "DIMENSION"
        }
      ]
    }
    I want to insert a null value into the column 'field' (for example, rows with the values 1.43 and null). It throws this error:
    Copy code
    Caused by: java.lang.NumberFormatException: For input string: "null"
            at java.base/jdk.internal.math.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2054)
            at java.base/jdk.internal.math.FloatingDecimal.parseFloat(FloatingDecimal.java:122)
            at java.base/java.lang.Float.parseFloat(Float.java:570)
            at org.apache.pinot.common.utils.PinotDataType$11.toFloat(PinotDataType.java:617)
            at org.apache.pinot.common.utils.PinotDataType$7.convert(PinotDataType.java:425)
            at org.apache.pinot.common.utils.PinotDataType$7.convert(PinotDataType.java:375)
            at org.apache.pinot.segment.local.recordtransformer.DataTypeTransformer.transform(DataTypeTransformer.java:118)
    What is the correct way of handling null values?
  • baarath

    04/23/2025, 6:02 AM
    Hi All, could someone share a job spec or relevant documentation for setting up Pinot offline batch ingestion from CSV files? I tried to follow the job spec given in the official docs but got stuck with the error below.
    Copy code
    java.lang.RuntimeException: Caught exception during running - org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner
            at org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher.kickoffIngestionJob(IngestionJobLauncher.java:152)
            at org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher.runIngestionJob(IngestionJobLauncher.java:121)
            at org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand.execute(LaunchDataIngestionJobCommand.java:132)
            at org.apache.pinot.tools.Command.call(Command.java:33)
            at org.apache.pinot.tools.Command.call(Command.java:29)
            at picocli.CommandLine.executeUserObject(CommandLine.java:2045)
            at picocli.CommandLine.access$1500(CommandLine.java:148)
            at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2465)
            at picocli.CommandLine$RunLast.handle(CommandLine.java:2457)
            at picocli.CommandLine$RunLast.handle(CommandLine.java:2419)
            at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2277)
            at picocli.CommandLine$RunLast.execute(CommandLine.java:2421)
            at picocli.CommandLine.execute(CommandLine.java:2174)
            at org.apache.pinot.tools.admin.PinotAdministrator.execute(PinotAdministrator.java:173)
            at org.apache.pinot.tools.admin.PinotAdministrator.main(PinotAdministrator.java:204)
    Caused by: java.lang.IllegalArgumentException
            at java.base/sun.nio.fs.UnixFileSystem.getPathMatcher(UnixFileSystem.java:286)
            at org.apache.pinot.common.segment.generation.SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(SegmentGenerationUtils.java:263)
            at org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner.run(SegmentGenerationJobRunner.java:177)
            at org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher.kickoffIngestionJob(IngestionJobLauncher.java:150)
            ... 14 more
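    Not a verified fix, but for reference a minimal standalone job-spec sketch for CSV ingestion; paths, table name, and controller URI are placeholders. The IllegalArgumentException from getPathMatcher in the trace above is the kind of failure you can get when includeFileNamePattern is missing its glob: prefix:
        executionFrameworkSpec:
          name: 'standalone'
          segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
          segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
        jobType: SegmentCreationAndTarPush
        inputDirURI: '/path/to/input/'
        includeFileNamePattern: 'glob:**/*.csv'   # note the glob: prefix
        outputDirURI: '/path/to/output/'
        overwriteOutput: true
        pinotFSSpecs:
          - scheme: file
            className: org.apache.pinot.spi.filesystem.LocalPinotFS
        recordReaderSpec:
          dataFormat: 'csv'
          className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
          configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
        tableSpec:
          tableName: 'myTable'
        pinotClusterSpecs:
          - controllerURI: 'http://localhost:9000'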
  • baarath

    04/24/2025, 10:41 AM
    Hi everyone, I'm trying to do batch ingestion using Spark, since standalone is not recommended for production. I'm following this link to implement it: https://dev.startree.ai/docs/pinot/recipes/ingest-parquet-files-from-s3-using-spark. But I'm confused about whether Spark needs to be installed on the same instance as Pinot, or whether the job can be run using EMR, where I usually run my Spark jobs. Can anyone help me understand?
  • baarath

    04/28/2025, 8:54 AM
    Hi All, I have CSV data whose header column names are in CamelCase. I want them to be snake_case in the Pinot offline table. How do I map the columns when doing batch ingestion?
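    One approach that already appears earlier in this channel (see Roshan's table config above) is an ingestion transform per column that simply references the original header name; a small sketch with hypothetical column names:
        "ingestionConfig": {
          "transformConfigs": [
            { "columnName": "customer_name", "transformFunction": "\"CustomerName\"" },
            { "columnName": "ticket_status", "transformFunction": "\"TicketStatus\"" }
          ]
        }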
  • Ram Makireddi

    04/28/2025, 3:17 PM
    Hi, this is Ram. I have been asked to build a data analytics platform in my PG, and after extensive research I decided to go with Pinot. I am curious to know which orchestrator tools it can seamlessly integrate with to orchestrate data pipelines. Any recommendations?