# getting-started
  • v

    Venkat

    03/18/2025, 7:36 AM
    Hi Team, I am trying to create a realtime table in Pinot with the Kafka MySQL Debezium connector using the config below. It is successfully fetching the records, but all the values come out as null. Please help me debug this.
    table.json
    {
      "tableName": "mp_saletransactions",
      "tableType": "REALTIME",
      "segmentsConfig": {
        "replication": "1",
        "schemaName": "mp_saletransactions",
        "timeColumnName": "transactiondate",
        "timeType": "MILLISECONDS"
      },
      "tableIndexConfig": {
        "streamConfigs": {
          "streamType": "kafka",
          "stream.kafka.topic.name": "mysql-saleentry.saleentry.mp_saletransactions",
          "stream.kafka.bootstrap.servers": "localhost:9092",
          "stream.kafka.broker.list": "localhost:9092",
          "stream.kafka.consumer.type": "lowlevel",
          "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
          "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
          "stream.kafka.decoder.type": "debezium",
          "stream.kafka.decoder.debezium.schema.registry.url": ""
        }
      },
      "tenants": {
        "broker": "DefaultTenant",
        "server": "DefaultTenant"
      },
      "routing": {
        "upsertConfig": {
          "mode": "FULL",
          "primaryKeyColumns": ["id"]
        }
      },
      "metadata": {}
    }
    schema.json
    {
      "schemaName": "mp_saletransactions",
      "dimensionFieldSpecs": [
        {"name": "id", "dataType": "INT"},
        {"name": "transactiontypes", "dataType": "STRING"},
        {"name": "status", "dataType": "STRING"},
        {"name": "storenumber", "dataType": "STRING"},
        {"name": "employeenumber", "dataType": "STRING"},
        {"name": "phone", "dataType": "STRING"},
        {"name": "plancode", "dataType": "STRING"},
        {"name": "monthlyaccess", "dataType": "STRING"}
      ],
      "enableColumnBasedNullHandling": true,
      "metricFieldSpecs": [],
      "dateTimeFieldSpecs": [
        {
          "name": "transactiondate",
          "dataType": "LONG",
          "format": "1MILLISECONDSEPOCH",
          "granularity": "1:MILLISECONDS"
        }
      ]
    }
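    One common reason Debezium-fed columns come out null is that the CDC record nests the row values under an envelope (before/after) while the Pinot schema expects them at the top level. A minimal, hedged sketch of flattening them with ingestion transforms, assuming the decoded message exposes the envelope as a JSON field named after (that field name, the chosen columns, and the default values are illustrative, not taken from the config above):
    "ingestionConfig": {
      "transformConfigs": [
        {"columnName": "id", "transformFunction": "jsonPathLong(after, '$.id', -1)"},
        {"columnName": "status", "transformFunction": "jsonPathString(after, '$.status', 'UNKNOWN')"},
        {"columnName": "transactiondate", "transformFunction": "jsonPathLong(after, '$.transactiondate', 0)"}
      ]
    }
    Comparing one raw Kafka message against the schema column names is usually the quickest way to confirm whether this kind of mismatch is the cause.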
  • k

    Kasinath Reddy

    03/18/2025, 10:53 AM
    Hello everyone, we are planning to build a hybrid table in Pinot where we have: 1) a realtime table with upserts enabled, and 2) an offline table with a star-tree index. As per my understanding, we cannot set metadataTTL to 0 in the upsertConfig without factoring in heap size.
    1) Assume I have sent a record with key "a" twice and the metadata map had {"a": (doc_id: 1), "a": (doc_id: 2)}. If segment compaction drops doc_id: 1, is it automatically removed from the metadata map?
    2) If we have configured metadataTTL to 1 day, and assuming the timestamp column is configured properly, and I got an initial event {key: "a", value: {"test": 1}} and after a day {key: "a", value: {"test": 2}}, will there be data duplication in this case?
    3) If point 2 is valid, when a user queries data for key "a", will we basically receive 2 entries, even if we have "a" in the offline table, due to the time boundary?
  • k

    Kasinath Reddy

    03/18/2025, 10:53 AM
    Current upsert config
     "upsertConfig": {
          "mode": "PARTIAL",
          "comparisonColumns": [
            "processed_at"
          ],
          "metadataTTL": 0,
          "dropOutOfOrderRecord": false,
          "enableSnapshot": true,
          "deletedKeysTTL": 0,
          "partialUpsertStrategies": {},
          "enablePreload": false,
          "consistencyMode": "SNAPSHOT",
          "upsertViewRefreshIntervalMs": 3000,
          "allowPartialUpsertConsumptionDuringCommit": false,
          "hashFunction": "NONE",
          "defaultPartialUpsertStrategy": "OVERWRITE"
        },
        "isDimTable": false
      }
  • r

    Roshan

    03/18/2025, 3:35 PM
    Hi guys, I am facing a data ingestion issue after adding authentication to Pinot. Does it have anything to do with the table config? I am getting this error:
    Upload response: 500
    Error: {"code":500,"error":"Caught exception when ingesting file into table: test_tenant_OFFLINE. Caught exception while uploading segments. Push mode: TAR, segment tars: [[file:/opt/pinot/tmp/ingestion_dir/working_dir_test_tenant_OFFLINE_1742312090194_FRKYyJJrdp/segment_tar_dir/test_tenant_1742312090217.tar.gz]]"}
    table_config = {
        "tableName": f"{tenant_name}",
        "tableType": "OFFLINE",
        "segmentsConfig": {
            "minimizeDataMovement": False,
            "deletedSegmentsRetentionPeriod": "7d",
            "retentionTimeUnit": "DAYS",
            "retentionTimeValue": "30",
            "segmentPushType": "REFRESH",
            "replication": "1"
        },
        "tenants": {"broker": "DefaultTenant", "server": "DefaultTenant"},
        "tableIndexConfig": {
            "invertedIndexColumns": ["Time_of_Resolution", "Category", "Description", "Escalate", "Ticket_Status", "Customer_Gender", "Ticket_Subject", "product_purchased", "Source", "Relevance", "Customer_Email", "Customer_Name", "Tags", "Agent", "Number_of_conversations", "Intent", "Ticket_Priority", "Sentiment", "Type", "Created_date", "First_Response_Time", "Customer_Satisfaction_Rating", "Date_of_Purchase", "Resolution", "Created_date_month", "Created_date_year"],
            "autoGeneratedInvertedIndex": False,
            "varLengthDictionaryColumns": ["Time_of_Resolution", "Category", "Description", "Escalate", "Ticket_Status", "Customer_Gender", "Ticket_Subject", "product_purchased", "Source", "Relevance", "Customer_Email", "Customer_Name", "Tags", "Agent", "Intent", "Ticket_Priority", "Sentiment", "Type", "Created_date", "First_Response_Time", "Date_of_Purchase", "Resolution"],
            "enableDefaultStarTree": False,
            "nullHandlingEnabled": True,
            "createInvertedIndexDuringSegmentGeneration": True,
            "rangeIndexVersion": 2,
            "aggregateMetrics": False,
            "optimizeDictionary": False,
            "loadMode": "MMAP",
            "enableDynamicStarTreeCreation": False,
            "columnMajorSegmentBuilderEnabled": False,
            "optimizeDictionaryForMetrics": False,
            "noDictionarySizeRatioThreshold": 0
        },
        "metadata": {},
        "ingestionConfig": {
            "batchIngestionConfig": {"segmentIngestionType": "APPEND", "consistentDataPush": False},
            "segmentTimeValueCheck": False,
            "transformConfigs": [
                {"columnName": "First_Response_Time_epoch_millis", "transformFunction": "CASEWHEN(strcmp(Number_of_conversations,'null')=0,null,DATETIMECONVERT(First_Response_Time, 'SIMPLE_DATE_FORMAT|dd-MM-yyyy HH:mm', '1MILLISECONDSEPOCH', 'MINUTES|1'))"},
                {"columnName": "Time_of_Resolution_epoch_millis", "transformFunction": "CASEWHEN(strcmp(Ticket_Status,'Closed')=0,DATETIMECONVERT(Time_of_Resolution, 'SIMPLE_DATE_FORMAT|dd-MM-yyyy HH:mm', '1MILLISECONDSEPOCH', 'MINUTES|1'),null)"},
                {"columnName": "Created_date_timestamp", "transformFunction": "DATETIMECONVERT(Created_date, 'SIMPLE_DATE_FORMAT|dd-MM-yyyy HH:mm', 'SIMPLE_DATE_FORMAT|yyyy-MM-dd HH:mm', 'MINUTES|1')"},
                {"columnName": "Created_date_epoch_millis", "transformFunction": "DATETIMECONVERT(Created_date, 'SIMPLE_DATE_FORMAT|dd-MM-yyyy HH:mm', '1MILLISECONDSEPOCH', 'MINUTES|1')"},
                {"columnName": "Created_date_month", "transformFunction": "Month(Created_date_epoch_millis, 'UTC')"},
                {"columnName": "Created_date_year", "transformFunction": "YEAR(Created_date_epoch_millis)"},
                {"columnName": "Sentiment_score", "transformFunction": "CASEWHEN(strcmp(Sentiment,'Positive')=0,2,CASEWHEN(strcmp(Sentiment,'Neutral')=0,1,0))"},
                {"columnName": "Sentiment_Positive", "transformFunction": "CASEWHEN(strcmp(Sentiment,'Positive')=0,2,0)"},
                {"columnName": "Sentiment_Neutral", "transformFunction": "CASEWHEN(strcmp(Sentiment,'Neutral')=0,1,0)"},
                {"columnName": "Sentiment_Negative", "transformFunction": "CASEWHEN(strcmp(Sentiment,'Negative')=0,0,0)"},
                {"columnName": "Customer_Satisfaction_Rating", "transformFunction": "\"Customer Satisfaction Rating\""},
                {"columnName": "Number_of_conversations", "transformFunction": "\"Number of conversations\""},
                {"columnName": "Ticket_Subject", "transformFunction": "\"Ticket Subject\""},
                {"columnName": "Customer_Name", "transformFunction": "\"Customer Name\""},
                {"columnName": "Time_of_Resolution", "transformFunction": "\"Time to Resolution\""},
                {"columnName": "Customer_Age", "transformFunction": "\"Customer Age\""},
                {"columnName": "First_Response_Time", "transformFunction": "\"First Response Time\""},
                {"columnName": "Customer_Email", "transformFunction": "\"Customer Email\""},
                {"columnName": "Ticket_Status", "transformFunction": "\"Ticket Status\""},
                {"columnName": "Created_date", "transformFunction": "\"Created date\""},
                {"columnName": "Date_of_Purchase", "transformFunction": "\"Date of Purchase\""},
                {"columnName": "Agent_overall_performance", "transformFunction": "\"Agent overall performance\""},
                {"columnName": "Agent_politeness", "transformFunction": "\"Agent politeness\""},
                {"columnName": "Customer_Gender", "transformFunction": "\"Customer Gender\""},
                {"columnName": "Agent_communication", "transformFunction": "\"Agent communication\""},
                {"columnName": "Agent_Patience", "transformFunction": "\"Agent Patience\""},
                {"columnName": "Ticket_ID", "transformFunction": "\"Ticket ID\""},
                {"columnName": "Ticket_Priority", "transformFunction": "\"Ticket Priority\""}
            ],
            "continueOnError": True,
            "rowTimeValueCheck": True
        },
        "isDimTable": False
    }
  • r

    Roshan

    03/18/2025, 4:40 PM
    Please, can anybody help? I have been stuck for 4 days now! The code I used was working fine with a namespace that didn't have authentication set up; I am only facing this issue after enabling authorization. Schema creation and table creation work fine after authentication was enabled in Pinot, but data ingestion has issues. These are the config files for creating the schema and table for a sample table; when I try to ingest a sample CSV file I can't, and I get the error mentioned above in the chat. Upload response: 500 Error: {"code":500,"error":"Caught exception when ingesting file into table: test_tenant_OFFLINE. Caught exception while uploading segments. Push mode: TAR, segment tars: [[file:/opt/pinot/tmp/ingestion_dir/working_dir_test_tenant_OFFLINE_1742312090194_FRKYyJJrdp/segment_tar_dir/test_tenant_1742312090217.tar.gz]]"} Ingest Python code:
    import requests
    import json
    from pathlib import Path
    import time
    
    PINOT_CONTROLLER = "********"
    PINOT_BROKER = "*********"
    AUTH_HEADERS = {
        "Authorization": "Basic YWRtaW46dmVyeXNlY3JldA==",
        "Content-Type": "application/json"
    }
    
    def verify_segment(tenant_name, segment_name):
        max_retries = 10
        retry_interval = 2  # seconds
        
        for i in range(max_retries):
            print(f"\nChecking segment status (attempt {i+1}/{max_retries})...")
            
            response = requests.get(f"{PINOT_CONTROLLER}/segments/{tenant_name}/{segment_name}/metadata",headers=AUTH_HEADERS)
            if response.status_code == 200:
                print("Segment is ready!")
                return True
                
            print(f"Segment not ready yet, waiting {retry_interval} seconds...")
            time.sleep(retry_interval)
        
        return False
    
    def simple_ingest(tenant_name):
        # Verify schema and table exist
        schema_response = requests.get(f'{PINOT_CONTROLLER}/schemas/{tenant_name}',headers=AUTH_HEADERS)
        table_response = requests.get(f'{PINOT_CONTROLLER}/tables/{tenant_name}',headers=AUTH_HEADERS)
        
        if schema_response.status_code != 200 or table_response.status_code != 200:
            print(f"Schema or table missing for tenant {tenant_name}. Please run create_schema.py and create_table.py first")
            return
    
        csv_path = Path(f"data/{tenant_name}_data.csv")
        
        print(f"\nUploading data for tenant {tenant_name}...")
        with open(csv_path, 'rb') as f:
            files = {'file': (f'{tenant_name}_data.csv', f, 'text/csv')}
            
            # Using a dictionary for column mapping first
            column_map = {
                "Ticket ID": "Ticket_ID",
                "Customer Name": "Customer_Name",
                "Customer Email": "Customer_Email",
                "Company_name": "Company_name",
                "Customer Age": "Customer_Age",
                "Customer Gender": "Customer_Gender",
                "Product purchased": "product_purchased",
                "Date of Purchase": "Date_of_Purchase",
                "Ticket Subject": "Ticket_Subject",
                "Description": "Description",
                "Ticket Status": "Ticket_Status",
                "Resolution": "Resolution",
                "Ticket Priority": "Ticket_Priority",
                "Source": "Source",
                "Created date": "Created_date",
                "First Response Time": "First_Response_Time",
                "Time to Resolution": "Time_of_Resolution",
                "Number of conversations": "Number_of_conversations",
                "Customer Satisfaction Rating": "Customer_Satisfaction_Rating",
                "Category": "Category",
                "Intent": "Intent",
                "Type": "Type",
                "Relevance": "Relevance",
                "Escalate": "Escalate",
                "Sentiment": "Sentiment",
                "Tags": "Tags",
                "Agent": "Agent",
                "Agent politeness": "Agent_politeness",
                "Agent communication": "Agent_communication",
                "Agent Patience": "Agent_Patience",
                "Agent overall performance": "Agent_overall_performance",
                "Conversations": "Conversations"
            }
    
            config = {
                "inputFormat": "csv",
                "header": "true",
                "delimiter": ",",
                "fileFormat": "csv",
                "multiValueDelimiter": ";",
                "skipHeader": "false",
                # Convert dictionary to proper JSON string
                "columnHeaderMap": json.dumps(column_map)
            }
            
            params = {
                'tableNameWithType': f'{tenant_name}_OFFLINE',
                'batchConfigMapStr': json.dumps(config)
            }
    
            upload_headers = {
                "Authorization": AUTH_HEADERS["Authorization"]
            }
            
            response = requests.post(
                f'{PINOT_CONTROLLER}/ingestFromFile',
                files=files,
                params=params,
                headers=upload_headers
            )
    
        print(f"Upload response: {response.status_code}")
        if response.status_code != 200:
            print(f"Error: {response.text}")
            return
    
        if response.status_code == 200:
            try:
                response_data = json.loads(response.text)
                print(f"Response data: {response_data}")
                segment_name = response_data["status"].split("segment: ")[1]
                print(f"\nWaiting for segment {segment_name} to be ready...")
    
                if verify_segment(tenant_name, segment_name):
                    query = {
                        "sql": f"SELECT COUNT(*) FROM {tenant_name}_OFFLINE",
                        "trace": False
                    }
    
                    query_response = requests.post(
                        f"{PINOT_BROKER}/query/sql",
                        json=query,
                        headers=AUTH_HEADERS
                    )
    
                    print("\nQuery response:", query_response.status_code)
                    if query_response.status_code == 200:
                        print(json.dumps(query_response.json(), indent=2))
                else:
                    print("Segment verification timed out")
            except Exception as e:
                print(f"Error processing segment: {e}")
                print(f"Full response text: {response.text}")
    
    if __name__ == "__main__":
        simple_ingest("test_tenant")
  • s

    Sandeep Jain

    03/20/2025, 4:33 AM
    Hi there! Quick question: Is it possible to visualise data stored in Pinot using Grafana?
  • k

    Kasinath Reddy

    03/20/2025, 4:58 AM
    Hello Pinot team, this is a realtime segment URI and we need to backfill it. There is an option to upload segments to a realtime table, right? So should I upload the corrected data with the same URI, or what is the procedure?
    segment uri: "http://pinot-controller:9000/segments/test/test__0__101__20250315T0506Z"
  • m

    Mike Stewart

    03/20/2025, 5:08 PM
    Hi All! I've been experimenting with Star Tree indexes today and am struggling to understand why the index is ignored in certain cases when it seems like it should be used. Please could someone help explain this behavior and whether there’s anything I need to adjust to optimize it for my use case? Pinot may not work for my needs without it. Any insights would be greatly appreciated—thank you! I've set up a very simple Star Tree Index on my table with the following configuration:
    "starTreeIndexConfigs": [
      {
        "dimensionsSplitOrder": ["event_type"],
        "functionColumnPairs": ["SUM__event_currency_amount"],
        "maxLeafRecords": 10000
      }
    ]
    When I execute the following query:
    SELECT event_type, SUM(event_currency_amount)  
    FROM [table]  
    GROUP BY event_type;
    the Star Tree index is used, and only 9 documents are read. However, when I introduce a time filter of any kind:
    SELECT event_type, SUM(event_currency_amount)  
    FROM [table]  
    WHERE event_time >= 0 AND event_time < 1800000000000  
    GROUP BY event_type;
    the Star Tree index is ignored, resulting in 14 million documents being scanned instead. Since the event_time range in the WHERE clause fully encompasses the segment's event_time range, I expected the Star Tree index to still be utilized. Based on previous discussions, I was under the impression that this should be the case. I also reviewed the link below, which I understood might specifically address this scenario in the product. For fuller context, I currently have a single offline table with a single segment. Initially, I had a hybrid table, but I removed it to minimize variables while troubleshooting this issue. Any guidance on why this is happening and how I might ensure the Star Tree index is used would be greatly appreciated. Thanks in advance!
    Improve star-tree to use star-node when the predicate matches all the non-star nodes by Jackie-Jiang · Pull Request #9667 · apache/pinot · GitHub
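    A star-tree index is generally only applied when every column referenced in the filter (and the group-by) is covered by its configuration, so a predicate on event_time, which is not in the dimensionsSplitOrder above, tends to fall back to a regular scan. A hedged sketch of one way to cover time-filtered group-bys, assuming a coarser derived time column (here called event_time_day, a hypothetical name) is acceptable as a star-tree dimension:
    "starTreeIndexConfigs": [
      {
        "dimensionsSplitOrder": ["event_type", "event_time_day"],
        "functionColumnPairs": ["SUM__event_currency_amount"],
        "maxLeafRecords": 10000
      }
    ]
    Filtering on the raw millisecond event_time as a star-tree dimension would blow up the tree's cardinality, which is why a bucketed column is sketched here instead.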
  • m

    Manish G

    03/22/2025, 3:04 PM
    My Pinot server is running, and I have created a schema and a table for it. How do I insert data into the table from the UI?
  • m

    Manish G

    03/22/2025, 3:46 PM
    I have defined a schema as:
    {
      "schemaName": "my-test-schema",
      "enableColumnBasedNullHandling": true,
      "dimensionFieldSpecs": [
        {
          "name": "field",
          "dataType": "FLOAT",
          "fieldType": "DIMENSION"
        }
      ]
    }
    I want to insert a null value into the column "field", e.g. the values 1.43 and null. It throws an error:
    Caused by: java.lang.NumberFormatException: For input string: "null"
            at java.base/jdk.internal.math.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2054)
            at java.base/jdk.internal.math.FloatingDecimal.parseFloat(FloatingDecimal.java:122)
            at java.base/java.lang.Float.parseFloat(Float.java:570)
            at org.apache.pinot.common.utils.PinotDataType$11.toFloat(PinotDataType.java:617)
            at org.apache.pinot.common.utils.PinotDataType$7.convert(PinotDataType.java:425)
            at org.apache.pinot.common.utils.PinotDataType$7.convert(PinotDataType.java:375)
            at org.apache.pinot.segment.local.recordtransformer.DataTypeTransformer.transform(DataTypeTransformer.java:118)
    What is the correct way of handling null values?
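    For what it's worth, with enableColumnBasedNullHandling the incoming value generally has to be an actual null (or the field omitted), not the literal string "null", which is what Float.parseFloat rejects in the stack trace above. A small illustrative pair of JSON records (not from the original message):
    {"field": 1.43}
    {"field": null}
    If the source can only emit the string "null", one hedged workaround is to ingest it into a differently named raw string column and derive "field" with a CASEWHEN/strcmp ingestion transform, similar to the transformConfigs used earlier in this channel.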
  • b

    baarath

    04/23/2025, 6:02 AM
    Hi All, could someone share a job spec or relevant documentation for setting up Pinot offline batch ingestion from CSV files? I tried to follow the job spec given in the official docs but got stuck with the error below.
    java.lang.RuntimeException: Caught exception during running - org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner
            at org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher.kickoffIngestionJob(IngestionJobLauncher.java:152)
            at org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher.runIngestionJob(IngestionJobLauncher.java:121)
            at org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand.execute(LaunchDataIngestionJobCommand.java:132)
            at org.apache.pinot.tools.Command.call(Command.java:33)
            at org.apache.pinot.tools.Command.call(Command.java:29)
            at picocli.CommandLine.executeUserObject(CommandLine.java:2045)
            at picocli.CommandLine.access$1500(CommandLine.java:148)
            at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2465)
            at picocli.CommandLine$RunLast.handle(CommandLine.java:2457)
            at picocli.CommandLine$RunLast.handle(CommandLine.java:2419)
            at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2277)
            at picocli.CommandLine$RunLast.execute(CommandLine.java:2421)
            at picocli.CommandLine.execute(CommandLine.java:2174)
            at org.apache.pinot.tools.admin.PinotAdministrator.execute(PinotAdministrator.java:173)
            at org.apache.pinot.tools.admin.PinotAdministrator.main(PinotAdministrator.java:204)
    Caused by: java.lang.IllegalArgumentException
            at java.base/sun.nio.fs.UnixFileSystem.getPathMatcher(UnixFileSystem.java:286)
            at org.apache.pinot.common.segment.generation.SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(SegmentGenerationUtils.java:263)
            at org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner.run(SegmentGenerationJobRunner.java:177)
            at org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher.kickoffIngestionJob(IngestionJobLauncher.java:150)
            ... 14 more
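    The IllegalArgumentException thrown by UnixFileSystem.getPathMatcher is typically what appears when includeFileNamePattern or excludeFileNamePattern is missing its 'glob:' (or 'regex:') prefix. A hedged sketch of a standalone CSV job spec, with directory paths, table name and controller address as placeholders to adapt:
    executionFrameworkSpec:
      name: 'standalone'
      segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner'
      segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner'
    jobType: SegmentCreationAndTarPush
    inputDirURI: '/path/to/csv'
    includeFileNamePattern: 'glob:**/*.csv'   # note the 'glob:' prefix
    outputDirURI: '/path/to/segments'
    overwriteOutput: true
    pinotFSSpecs:
      - scheme: file
        className: org.apache.pinot.spi.filesystem.LocalPinotFS
    recordReaderSpec:
      dataFormat: 'csv'
      className: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReader'
      configClassName: 'org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig'
    tableSpec:
      tableName: 'myTable'
      schemaURI: 'http://localhost:9000/tables/myTable/schema'
      tableConfigURI: 'http://localhost:9000/tables/myTable'
    pinotClusterSpecs:
      - controllerURI: 'http://localhost:9000'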
  • b

    baarath

    04/24/2025, 10:41 AM
    Hi everyone, I'm trying to do batch ingestion using Spark, since standalone is not recommended for production. I'm following the link below to implement it: https://dev.startree.ai/docs/pinot/recipes/ingest-parquet-files-from-s3-using-spark. But I'm confused about whether Spark needs to be installed on the same instance as Pinot, or whether it can be run on EMR, where I usually run Spark jobs. Can anyone help me understand?
  • b

    baarath

    04/28/2025, 8:54 AM
    Hi All, I have CSV data whose header has column names in CamelCase. I want them to be snake_case in the Pinot offline table. How do I map the columns when doing batch ingestion?
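    One approach, used in the table config earlier in this channel, is to keep the CSV header names as the source columns and derive the snake_case columns via ingestion transforms that quote the original header name. A hedged sketch with hypothetical column names:
    "ingestionConfig": {
      "transformConfigs": [
        {"columnName": "customer_name", "transformFunction": "\"CustomerName\""},
        {"columnName": "order_date", "transformFunction": "\"OrderDate\""}
      ]
    }
    The /ingestFromFile script earlier in this thread takes a similar approach with a columnHeaderMap in its batch config.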
  • r

    Ram Makireddi

    04/28/2025, 3:17 PM
    Hi, this is Ram. I have been asked to build a data analytics platform in my PG, and after extensive research I decided to go with Pinot. I am curious to know which orchestrator tools it can seamlessly integrate with to orchestrate data pipelines. Any recommendations?
  • v

    Vivekanand

    05/12/2025, 7:17 AM
    Hi, how does Pinot compare to Cassandra? Just curious thanks
  • v

    Vishruth Raj

    06/09/2025, 1:40 AM
    Hi, I'm interested in deploying a Ceph cluster as my deep storage layer for Apache Pinot. Are there any docs on how to do this?
  • r

    Ross Morrow

    06/09/2025, 1:14 PM
    Hey all, are there any general sizing guidelines/practices published? I'm playing around with tables I can grow in stages through orders like 1M, 10M, 100M (the last one in progress), and modifying PVC size, heap/pod memory size, segment size, server count, etc. to try to build some intuition. Thought I would ask if there are docs about how to think about this in general.
  • a

    Ajinkya

    06/17/2025, 8:50 AM
    Hello Team, I am exploring whether Apache Pinot would be a good fit for our use case and would appreciate your insights please :) We are currently on AWS and use S3 as our data lake. After cleaning and transforming the data with Spark, we write the output to MySQL RDS tables, which are then used in various API calls. The challenge we are facing is CPU spike on RDS when Spark writes to it, especially since these jobs run every 3 hours. Slowing down the writes helps somewhat, but as we add more of these jobs, it becomes a bottleneck ... both on RDS and EMR. Scaling RDS isn’t a viable option due to cost concerns. Would it make sense to write the output from Spark directly to Pinot instead of MySQL, and serve the API reads from Pinot? Could this be a cost-effective and scalable strategy? Thanks in advance!
  • n

    Nithish

    07/04/2025, 9:29 AM
    Hey Team, I am trying Pinot offline table ingestion using the spec file below. The tar file is getting created, but the Spark job is failing after 3 attempts, as specified in the config below.
    Note: it has worked when tried with:
    • SegmentCreationAndTarPush, but I get a 413 error for large tar files
    • SegmentCreationAndMetadataPush: this works fine, but it has a known issue as per the thread https://apache-pinot.slack.com/archives/CDRCA57FC/p1715293105121389
    jobType: SegmentCreationAndUriPush
    
    inputDirURI: 'gs://bucket-name/warehouse/dataengineering.db/ems_attributes/data'
    includeFileNamePattern: 'glob:**/*.parquet'
    excludeFileNamePattern: 'glob:**/_SUCCESS,glob:**/*.crc,glob:**/*metadata*,glob:**/*.json'
    outputDirURI: 'gs://bucket-name/pinot-segments/poc_ems_attributes'
    overwriteOutput: true
    
    # Execution Framework
    executionFrameworkSpec:
      name: 'spark'
    
      # replace spark with spark3 for versions > 3.2.0
      segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentGenerationJobRunner'
      segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentTarPushJobRunner'
      segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentUriPushJobRunner'
      segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentMetadataPushJobRunner'
      extraConfigs:
        stagingDir: 'gs://bucket-name/pinot-batch-ingestion/staging'
    
    # Record Reader Configuration for Parquet
    recordReaderSpec:
      dataFormat: 'parquet'
      className: 'org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader'
    
    # Pinot File System
    pinotFSSpecs:
      - scheme: 'gs'
        className: 'org.apache.pinot.plugin.filesystem.GcsPinotFS'
    
    # Table Configuration
    tableSpec:
      tableName: 'poc_ems_attributes'
      schemaURI: 'https://prod-dp-pinot-controller.in/schemas/poc_ems_attributes'
      tableConfigURI: 'https://prod-dp-pinot-controller.in/tables/poc_ems_attributes'
    
    # Segment Name Generation
    segmentNameGeneratorSpec:
      type: simple
      configs:
        segment.name.prefix: 'poc_ems_attributes'
        segment.name.postfix: 'uri_push'
        exclude.sequence.id: false
    
    # Pinot Cluster Configuration
    pinotClusterSpecs:
      - controllerURI: 'https://prod-dp-pinot-controller.in'
    
    # Push Job Configuration
    pushJobSpec:
      pushAttempts: 3
      pushRetryIntervalMillis: 15000
      pushParallelism: 2
    ERROR:
    java.lang.RuntimeException: org.apache.pinot.spi.utils.retry.AttemptsExceededException: Operation failed after 3 attempts
    	at org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentUriPushJobRunner$1.call(SparkSegmentUriPushJobRunner.java:130)
    	at org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentUriPushJobRunner$1.call(SparkSegmentUriPushJobRunner.java:118)
    	at org.apache.spark.api.java.JavaRDDLike.$anonfun$foreach$1(JavaRDDLike.scala:352)
    	at org.apache.spark.api.java.JavaRDDLike.$anonfun$foreach$1$adapted(JavaRDDLike.scala:352)
    	at scala.collection.Iterator.foreach(Iterator.scala:943)
    	at scala.collection.Iterator.foreach$(Iterator.scala:943)
    	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    	at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1028)
    	at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1028)
    	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2455)
    	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
    	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    	at org.apache.spark.scheduler.Task.run(Task.scala:141)
    	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:95)
    	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    	at java.base/java.lang.Thread.run(Thread.java:829)
    Caused by: org.apache.pinot.spi.utils.retry.AttemptsExceededException: Operation failed after 3 attempts
    	at org.apache.pinot.spi.utils.retry.BaseRetryPolicy.attempt(BaseRetryPolicy.java:65)
    	at org.apache.pinot.segment.local.utils.SegmentPushUtils.sendSegmentUris(SegmentPushUtils.java:231)
    	at org.apache.pinot.segment.local.utils.SegmentPushUtils.sendSegmentUris(SegmentPushUtils.java:115)
    	at org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentUriPushJobRunner$1.call(SparkSegmentUriPushJobRunner.java:128)
    	... 20 more
  • v

    Vitor Mattioli

    07/04/2025, 3:03 PM
    Hi all, regarding the use of "stream.kafka.consumer.type": the Pinot documentation states that there is no support for the high-level Kafka consumer (HLC), however we believe it works. Is there any other documentation about the usage of the high-level consumer in any application? If it actually works, what limitations would we have?
  • k

    Kamil

    07/10/2025, 8:23 PM
    Hi, does Pinot automatically drop nodes from discovery when they become unreachable? What I have been seeing is that dead nodes are kept in the cluster; they are part of the ideal state of an offline table, and I can't drop them because of that. I tried to rebalance to move segments from the dead nodes to the alive ones (segments are ultimately stored in the deep store) and so remove the dead nodes from the ideal state, but it didn't work: the cluster didn't want to rebalance, saying that segments are already well balanced across the cluster.
  • k

    Kamil

    07/11/2025, 6:38 PM
    Hi, I am curious: do segments usually weigh 10x more bytes on servers compared to what we have in the deep store? Is there a default compression applied to dimensions? I read that they should use LZ4 by default, but something makes my data explode in size on the servers.
  • k

    Kamil

    07/15/2025, 9:02 PM
    Hi, how fragile is MergeRollupTask? I have run this task twice and both attempts failed with a timeout; that is fine, with some tuning it would get better, but in the former case the table ended up with too much data, and in the latter some data went missing.
  • k

    Kamil

    07/17/2025, 1:18 PM
    Hi, what is the easiest way to copy or back up a table in Pinot? I want to perform some experiments on my table, but I don't want to load it from the source again. Is there an easy way to duplicate a table?
  • n

    Nitish Goyal

    07/30/2025, 1:07 AM
    Hi. Going to start a POC in Pinot and have questions regarding the setup. Can anyone help answer the query below? We want to start ingesting from a single Kafka topic into 100s of Pinot tables (1 table per tenant). Data for all the tables comes into a single Kafka topic for ease of managing a single topic. What are our options for the above use case?
    1. Re-ingest the data from the single Kafka topic into 100s of Kafka topics and then do 1-1 ingestion from Kafka topic to Pinot table
    2. Use Flink to do fanout from the Kafka topic into multiple Pinot tables via HTTP requests to /ingest/batch
    3. Use Flink to do fanout from the Kafka topic into multiple Pinot tables and push segments directly
    4. Use Spark for options 2 and 3 listed above
    I have put more details and the pros and cons of each approach in the attached document. Can someone guide us on the right way forward?
    Pinot-Ingestion-Approaches.pdf
  • z

    Zhuangda Z

    08/03/2025, 3:06 AM
    Hi team, I would like to hear your thoughts on our table design: when does a table become too big? Currently, we use a single table for all customers, and the daily volume is approaching 1B. With a segment size of 1M-1.5M, we are generating ~1K segments a day, and if we were to keep 2 years of data, the table would have 730K segments at the current traffic. We have been exploring the idea of sharding it into smaller tables by customer hash, but this would introduce new operation/maintenance load, e.g., one Kafka topic for each shard and expanding/shrinking the number of shards (consistent hashing). Wonder if there is any prior example with a similar design, or what your recommendations are on simplifying the design if possible? 🙏
  • b

    Boris Tashkulov

    08/11/2025, 2:38 PM
    Hi team, I have a local Docker Compose Pinot setup; Zookeeper, the controller and the server have mounted volumes. I restarted the Docker machine (I'm using a Mac) and now the cluster contains each component twice (dead and alive). Queries don't work because all the data was on the dead server. The alive server has the same data because of the mounted volume, but Pinot doesn't know about that. After that I set static IPs for each cluster node, did the same thing, and everything works well: the cluster started, all data is OK, and ingestion works well too. It looks like I have a misconfiguration, could you help me with it please? What is the best practice?
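    Each Pinot component registers itself in Zookeeper under an instance ID derived from its host, so if a container comes back with a different hostname/IP after a restart, it appears as a new instance while the old one lingers as dead, which matches the symptom described. A hedged docker-compose sketch (service names and command flags are illustrative) that pins stable hostnames instead of static IPs:
    services:
      pinot-controller:
        image: apachepinot/pinot:latest
        hostname: pinot-controller
        command: "StartController -zkAddress zookeeper:2181"
      pinot-server:
        image: apachepinot/pinot:latest
        hostname: pinot-server
        command: "StartServer -zkAddress zookeeper:2181"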
  • s

    San Kumar

    08/12/2025, 3:24 AM
    Hello Team, we want to replace/create a segment named with the combination dd-mm-yy-hh-<productid>-<country> in an offline table. Is it possible to do so, and if so, how can I define the segment name?
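    For reference, the batch ingestion job spec accepts a segmentNameGeneratorSpec, so the segment name can be set to whatever convention is needed, and a later push with the same name replaces that segment. A hedged sketch, assuming the 'fixed' generator type is available in your version (the name below is a placeholder for dd-mm-yy-hh-<productid>-<country>):
    segmentNameGeneratorSpec:
      type: fixed
      configs:
        segment.name: '12-08-25-03-PRODUCT-COUNTRY'
    If 'fixed' is not an option, the 'simple' type with segment.name.prefix, as in the Spark spec shared earlier in this channel, gets close to the same effect.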
  • y

    Yeshwanth

    08/14/2025, 10:11 AM
    Hi Team, does Pinot publish a list of recommended alerts to be configured for proper observability? I couldn't find anything in the docs.
  • b

    Boris Tashkulov

    08/15/2025, 8:56 AM
    Hey team, how can I make sure the star-tree index is being used for a query? To double-check, I used the star-tree recipe, and here are the plans for queries on the inverted-index table (FILTER_INVERTED_INDEX) and the star-tree table (FILTER_FULL_SCAN).
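    A quick way to double-check is to run EXPLAIN PLAN on the query and look at which filter operator the plan picks; a hedged sketch with hypothetical table and column names (the exact operator labels can vary by Pinot version, but a star-tree hit shows up as a star-tree filter rather than FILTER_FULL_SCAN or FILTER_INVERTED_INDEX):
    EXPLAIN PLAN FOR
    SELECT event_type, SUM(event_amount)
    FROM mytable
    GROUP BY event_type
    Comparing numDocsScanned in the broker response metadata against the total document count, as in the star-tree discussion above, is another sanity check.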