Venkat
03/18/2025, 7:36 AMKasinath Reddy
03/18/2025, 10:53 AMKasinath Reddy
03/18/2025, 10:53 AM
Current upsert config
"upsertConfig": {
"mode": "PARTIAL",
"comparisonColumns": [
"processed_at"
],
"metadataTTL": 0,
"dropOutOfOrderRecord": false,
"enableSnapshot": true,
"deletedKeysTTL": 0,
"partialUpsertStrategies": {},
"enablePreload": false,
"consistencyMode": "SNAPSHOT",
"upsertViewRefreshIntervalMs": 3000,
"allowPartialUpsertConsumptionDuringCommit": false,
"hashFunction": "NONE",
"defaultPartialUpsertStrategy": "OVERWRITE"
},
"isDimTable": false
}
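As a quick illustration of what this config does (a sketch only, not Pinot code; the primary key column "ticket_id" and the values are made up): with mode PARTIAL and comparisonColumns ["processed_at"], the copy of a key with the larger processed_at is treated as the latest, and with defaultPartialUpsertStrategy OVERWRITE its column values replace the previously stored ones.
# Sketch: how the comparison column decides which version of a primary key survives.
stored = {"ticket_id": 42, "status": "OPEN", "processed_at": 1710750000000}
incoming = {"ticket_id": 42, "status": "RESOLVED", "processed_at": 1710760000000}

def resolve(existing, new, comparison_column="processed_at"):
    # The record with the larger processed_at wins; ties go to the newly arrived
    # record in this sketch. OVERWRITE then replaces the stored column values.
    return new if new[comparison_column] >= existing[comparison_column] else existing

print(resolve(stored, incoming))  # keeps the RESOLVED record, whose processed_at is newer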
Roshan
03/18/2025, 3:35 PMRoshan
03/18/2025, 4:40 PM
import requests
import json
from pathlib import Path
import time
PINOT_CONTROLLER = "********"
PINOT_BROKER = "*********"
AUTH_HEADERS = {
"Authorization": "Basic YWRtaW46dmVyeXNlY3JldA==",
"Content-Type": "application/json"
}
def verify_segment(tenant_name, segment_name):
max_retries = 10
retry_interval = 2 # seconds
for i in range(max_retries):
print(f"\nChecking segment status (attempt {i+1}/{max_retries})...")
response = requests.get(f"{PINOT_CONTROLLER}/segments/{tenant_name}/{segment_name}/metadata",headers=AUTH_HEADERS)
if response.status_code == 200:
print("Segment is ready!")
return True
print(f"Segment not ready yet, waiting {retry_interval} seconds...")
time.sleep(retry_interval)
return False
def simple_ingest(tenant_name):
# Verify schema and table exist
schema_response = requests.get(f'{PINOT_CONTROLLER}/schemas/{tenant_name}',headers=AUTH_HEADERS)
table_response = requests.get(f'{PINOT_CONTROLLER}/tables/{tenant_name}',headers=AUTH_HEADERS)
if schema_response.status_code != 200 or table_response.status_code != 200:
print(f"Schema or table missing for tenant {tenant_name}. Please run create_schema.py and create_table.py first")
return
csv_path = Path(f"data/{tenant_name}_data.csv")
print(f"\nUploading data for tenant {tenant_name}...")
with open(csv_path, 'rb') as f:
files = {'file': (f'{tenant_name}_data.csv', f, 'text/csv')}
# Using a dictionary for column mapping first
column_map = {
"Ticket ID": "Ticket_ID",
"Customer Name": "Customer_Name",
"Customer Email": "Customer_Email",
"Company_name": "Company_name",
"Customer Age": "Customer_Age",
"Customer Gender": "Customer_Gender",
"Product purchased": "product_purchased",
"Date of Purchase": "Date_of_Purchase",
"Ticket Subject": "Ticket_Subject",
"Description": "Description",
"Ticket Status": "Ticket_Status",
"Resolution": "Resolution",
"Ticket Priority": "Ticket_Priority",
"Source": "Source",
"Created date": "Created_date",
"First Response Time": "First_Response_Time",
"Time to Resolution": "Time_of_Resolution",
"Number of conversations": "Number_of_conversations",
"Customer Satisfaction Rating": "Customer_Satisfaction_Rating",
"Category": "Category",
"Intent": "Intent",
"Type": "Type",
"Relevance": "Relevance",
"Escalate": "Escalate",
"Sentiment": "Sentiment",
"Tags": "Tags",
"Agent": "Agent",
"Agent politeness": "Agent_politeness",
"Agent communication": "Agent_communication",
"Agent Patience": "Agent_Patience",
"Agent overall performance": "Agent_overall_performance",
"Conversations": "Conversations"
}
config = {
"inputFormat": "csv",
"header": "true",
"delimiter": ",",
"fileFormat": "csv",
"multiValueDelimiter": ";",
"skipHeader": "false",
# Convert dictionary to proper JSON string
"columnHeaderMap": json.dumps(column_map)
}
params = {
'tableNameWithType': f'{tenant_name}_OFFLINE',
'batchConfigMapStr': json.dumps(config)
}
upload_headers = {
"Authorization": AUTH_HEADERS["Authorization"]
}
response = requests.post(
f'{PINOT_CONTROLLER}/ingestFromFile',
files=files,
params=params,
headers=upload_headers
)
print(f"Upload response: {response.status_code}")
if response.status_code != 200:
print(f"Error: {response.text}")
return
if response.status_code == 200:
try:
response_data = json.loads(response.text)
print(f"Response data: {response_data}")
segment_name = response_data["status"].split("segment: ")[1]
print(f"\nWaiting for segment {segment_name} to be ready...")
if verify_segment(tenant_name, segment_name):
query = {
"sql": f"SELECT COUNT(*) FROM {tenant_name}_OFFLINE",
"trace": False
}
query_response = requests.post(
f"{PINOT_BROKER}/query/sql",
json=query,
headers=AUTH_HEADERS
)
print("\nQuery response:", query_response.status_code)
if query_response.status_code == 200:
print(json.dumps(query_response.json(), indent=2))
else:
print("Segment verification timed out")
except Exception as e:
print(f"Error processing segment: {e}")
print(f"Full response text: {response.text}")
if __name__ == "__main__":
simple_ingest("test_tenant")
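The segment-name parsing above (split("segment: ")[1]) assumes the controller's status text always contains that exact token; a slightly more defensive version of the same idea (a sketch, same assumption about the status string format):
import re

def extract_segment_name(status_text):
    # Pulls the token after "segment: " out of the controller's status message,
    # returning None instead of raising IndexError when the token is missing.
    match = re.search(r"segment:\s*(\S+)", status_text)
    return match.group(1) if match else None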
Slack Conversation
Sandeep Jain
03/20/2025, 4:33 AMKasinath Reddy
03/20/2025, 4:58 AM
segment uri: "http://pinot-controller:9000/segments/test/test__0__101__20250315T0506Z"
Mike Stewart
03/20/2025, 5:08 PM"starTreeIndexConfigs": [
{
"dimensionsSplitOrder": ["event_type"],
"functionColumnPairs": ["SUM__event_currency_amount"],
"maxLeafRecords": 10000
}
]
When I execute the following query:
SELECT event_type, SUM(event_currency_amount)
FROM [table]
GROUP BY event_type;
the Star Tree index is used, and only 9 documents are read.
However, when I introduce a time filter of any kind:
SELECT event_type, SUM(event_currency_amount)
FROM [table]
WHERE event_time >= 0 AND event_time < 1800000000000
GROUP BY event_type;
the Star Tree index is ignored, resulting in 14 million documents being scanned instead.
Since the event_time range in the WHERE clause fully encompasses the segment's event_time range, I expected the Star Tree index to still be utilized. Based on previous discussions, I was under the impression that this should be the case.
I also reviewed the below link, which I understood might specifically address this scenario in the product.
For fuller context, I currently have a single offline table with a single segment. Initially, I had a Hybrid table, but I removed it to minimize variables while troubleshooting this issue.
Any guidance on why this is happening and how I might ensure the Star Tree index is used would be greatly appreciated. Thanks in advance!
Improve star-tree to use star-node when the predicate matches all the non-star nodes by Jackie-Jiang · Pull Request #9667 · apache/pinot · GitHub
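One thing worth checking (a sketch, not a confirmed fix): a star-tree can only serve filters on columns that are part of dimensionsSplitOrder, so event_time being absent from the split order would explain the fall-back to a full scan. Adding a coarser time bucket to the split order may let the filtered query use the index; the column name below is a placeholder for a derived, day-granularity column.
# Sketch: star-tree config with a time bucket added to the split order so filters
# on event_time can be answered from the tree. "event_time_day" is hypothetical;
# using the raw millisecond column directly would blow up the tree's cardinality.
star_tree_index_configs = [
    {
        "dimensionsSplitOrder": ["event_type", "event_time_day"],
        "functionColumnPairs": ["SUM__event_currency_amount"],
        "maxLeafRecords": 10000,
    }
]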
Manish G
03/22/2025, 3:04 PMManish G
03/22/2025, 3:46 PM
{
"schemaName": "my-test-schema",
"enableColumnBasedNullHandling": true,
"dimensionFieldSpecs": [
{
"name": "field",
"dataType": "FLOAT",
"fieldType": "DIMENSION"
}
]
}
I want to insert a null value in the column:
field
1.43
null
It throws an error:
Caused by: java.lang.NumberFormatException: For input string: "null"
at java.base/jdk.internal.math.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2054)
at java.base/jdk.internal.math.FloatingDecimal.parseFloat(FloatingDecimal.java:122)
at java.base/java.lang.Float.parseFloat(Float.java:570)
at org.apache.pinot.common.utils.PinotDataType$11.toFloat(PinotDataType.java:617)
at org.apache.pinot.common.utils.PinotDataType$7.convert(PinotDataType.java:425)
at org.apache.pinot.common.utils.PinotDataType$7.convert(PinotDataType.java:375)
at org.apache.pinot.segment.local.recordtransformer.DataTypeTransformer.transform(DataTypeTransformer.java:118)
What is the correct way of having null values?
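The NumberFormatException suggests the literal string "null" is reaching the FLOAT converter rather than an actual null value; with enableColumnBasedNullHandling the missing value should arrive as a real null (or be omitted), not as the text "null". A minimal sketch of the difference, assuming records are produced as JSON before ingestion:
import json

bad_record = {"field": "null"}   # the 4-character string "null" -> NumberFormatException above
good_record = {"field": None}    # serialises to a real JSON null
also_fine = {}                   # or omit the field entirely

print(json.dumps(bad_record), json.dumps(good_record), json.dumps(also_fine))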
baarath
04/23/2025, 6:02 AM
java.lang.RuntimeException: Caught exception during running - org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner
at org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher.kickoffIngestionJob(IngestionJobLauncher.java:152)
at org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher.runIngestionJob(IngestionJobLauncher.java:121)
at org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand.execute(LaunchDataIngestionJobCommand.java:132)
at org.apache.pinot.tools.Command.call(Command.java:33)
at org.apache.pinot.tools.Command.call(Command.java:29)
at picocli.CommandLine.executeUserObject(CommandLine.java:2045)
at picocli.CommandLine.access$1500(CommandLine.java:148)
at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2465)
at picocli.CommandLine$RunLast.handle(CommandLine.java:2457)
at picocli.CommandLine$RunLast.handle(CommandLine.java:2419)
at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2277)
at picocli.CommandLine$RunLast.execute(CommandLine.java:2421)
at picocli.CommandLine.execute(CommandLine.java:2174)
at org.apache.pinot.tools.admin.PinotAdministrator.execute(PinotAdministrator.java:173)
at org.apache.pinot.tools.admin.PinotAdministrator.main(PinotAdministrator.java:204)
Caused by: java.lang.IllegalArgumentException
at java.base/sun.nio.fs.UnixFileSystem.getPathMatcher(UnixFileSystem.java:286)
at org.apache.pinot.common.segment.generation.SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(SegmentGenerationUtils.java:263)
at org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner.run(SegmentGenerationJobRunner.java:177)
at org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher.kickoffIngestionJob(IngestionJobLauncher.java:150)
... 14 more
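UnixFileSystem.getPathMatcher throws IllegalArgumentException when the pattern string lacks a syntax prefix, so a likely cause here is an includeFileNamePattern or excludeFileNamePattern in the job spec missing its glob: (or regex:) prefix. A small pre-flight check one could run on the spec values (a sketch; the helper name is made up):
def check_file_name_pattern(pattern):
    # java.nio's getPathMatcher expects "syntax:pattern", e.g. "glob:**/*.parquet";
    # a bare "**/*.parquet" raises exactly this IllegalArgumentException.
    if not pattern.startswith(("glob:", "regex:")):
        raise ValueError(f"pattern {pattern!r} must start with 'glob:' or 'regex:'")

check_file_name_pattern("glob:**/*.parquet")   # passes
# check_file_name_pattern("**/*.parquet")      # would raise ValueError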
baarath
04/24/2025, 10:41 AMbaarath
04/28/2025, 8:54 AMRam Makireddi
04/28/2025, 3:17 PMVivekanand
05/12/2025, 7:17 AMVishruth Raj
06/09/2025, 1:40 AMRoss Morrow
06/09/2025, 1:14 PMAjinkya
06/17/2025, 8:50 AMNithish
07/04/2025, 9:29 AM
• SegmentCreationAndTarPush - getting 413 error for large tar files
• SegmentCreationAndMetadataPush - this works fine, but it has a known issue as per the thread: https://apache-pinot.slack.com/archives/CDRCA57FC/p1715293105121389
jobType: SegmentCreationAndUriPush
inputDirURI: 'gs://bucket-name/warehouse/dataengineering.db/ems_attributes/data'
includeFileNamePattern: 'glob:**/*.parquet'
excludeFileNamePattern: 'glob:**/_SUCCESS,glob:**/*.crc,glob:**/*metadata*,glob:**/*.json'
outputDirURI: 'gs://bucket-name/pinot-segments/poc_ems_attributes'
overwriteOutput: true
# Execution Framework
executionFrameworkSpec:
name: 'spark'
# replace spark with spark3 for versions > 3.2.0
segmentGenerationJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentGenerationJobRunner'
segmentTarPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentTarPushJobRunner'
segmentUriPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentUriPushJobRunner'
segmentMetadataPushJobRunnerClassName: 'org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentMetadataPushJobRunner'
extraConfigs:
stagingDir: 'gs://bucket-name/pinot-batch-ingestion/staging'
# Record Reader Configuration for Parquet
recordReaderSpec:
dataFormat: 'parquet'
className: 'org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader'
# Pinot File System
pinotFSSpecs:
- scheme: 'gs'
className: 'org.apache.pinot.plugin.filesystem.GcsPinotFS'
# Table Configuration
tableSpec:
tableName: 'poc_ems_attributes'
schemaURI: 'https://prod-dp-pinot-controller.in/schemas/poc_ems_attributes'
tableConfigURI: 'https://prod-dp-pinot-controller.in/tables/poc_ems_attributes'
# Segment Name Generation
segmentNameGeneratorSpec:
type: simple
configs:
segment.name.prefix: 'poc_ems_attributes'
segment.name.postfix: 'uri_push'
exclude.sequence.id: false
# Pinot Cluster Configuration
pinotClusterSpecs:
- controllerURI: 'https://prod-dp-pinot-controller.in'
# Push Job Configuration
pushJobSpec:
pushAttempts: 3
pushRetryIntervalMillis: 15000
pushParallelism: 2
ERROR:
java.lang.RuntimeException: org.apache.pinot.spi.utils.retry.AttemptsExceededException: Operation failed after 3 attempts
at org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentUriPushJobRunner$1.call(SparkSegmentUriPushJobRunner.java:130)
at org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentUriPushJobRunner$1.call(SparkSegmentUriPushJobRunner.java:118)
at org.apache.spark.api.java.JavaRDDLike.$anonfun$foreach$1(JavaRDDLike.scala:352)
at org.apache.spark.api.java.JavaRDDLike.$anonfun$foreach$1$adapted(JavaRDDLike.scala:352)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
at org.apache.spark.rdd.RDD.$anonfun$foreach$2(RDD.scala:1028)
at org.apache.spark.rdd.RDD.$anonfun$foreach$2$adapted(RDD.scala:1028)
at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2455)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:141)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:95)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.pinot.spi.utils.retry.AttemptsExceededException: Operation failed after 3 attempts
at org.apache.pinot.spi.utils.retry.BaseRetryPolicy.attempt(BaseRetryPolicy.java:65)
at org.apache.pinot.segment.local.utils.SegmentPushUtils.sendSegmentUris(SegmentPushUtils.java:231)
at org.apache.pinot.segment.local.utils.SegmentPushUtils.sendSegmentUris(SegmentPushUtils.java:115)
at org.apache.pinot.plugin.ingestion.batch.spark3.SparkSegmentUriPushJobRunner$1.call(SparkSegmentUriPushJobRunner.java:128)
... 20 more
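Since AttemptsExceededException swallows the controller's actual HTTP responses, it can help to check what, if anything, was registered after the three attempts; a quick sketch against the controller's segment-listing endpoint (credentials are placeholders):
import requests

CONTROLLER = "https://prod-dp-pinot-controller.in"   # controller from the job spec
AUTH = ("user", "password")                          # placeholder credentials

# Lists the segments the controller currently knows about for the table, to see
# whether any of the URI-push attempts partially succeeded before failing.
resp = requests.get(f"{CONTROLLER}/segments/poc_ems_attributes", auth=AUTH, timeout=30)
print(resp.status_code, resp.json() if resp.ok else resp.text)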
Vitor Mattioli
07/04/2025, 3:03 PMKamil
07/10/2025, 8:23 PMKamil
07/11/2025, 6:38 PMKamil
07/15/2025, 9:02 PMKamil
07/17/2025, 1:18 PMNitish Goyal
07/30/2025, 1:07 AM
/ingest/batch
3. Use flink to do fanout from Kafka topic into multiple Pinot tables and push segments directly
4. Use Spark for options 2 and 3 listed above
I have put more details and the pros and cons of each approach in the attached document. Can someone guide me on the right way forward?
Zhuangda Z
08/03/2025, 3:06 AMBoris Tashkulov
08/11/2025, 2:38 PMSan Kumar
08/12/2025, 3:24 AMYeshwanth
08/14/2025, 10:11 AMBoris Tashkulov
08/15/2025, 8:56 AM