Peter Corless
02/27/2025, 7:46 PM
Peter Corless
02/27/2025, 7:47 PM
Peter Corless
02/27/2025, 7:48 PM
Why can't we use the copy already in deep-store?
The segment in deep store is compressed and typically will not have all indexes, especially if you've added indexes or derived columns to the segment later on. We use a separate location and copy for tiered storage query execution, so we can keep it uncompressed and exactly in sync with what would have been on the server. Think of this S3 bucket as an exact replacement for the storage that would've been on the servers locally, with one major difference: we keep only one copy in S3 versus the replication number of copies locally.
Source: https://dev.startree.ai/docs/manage-data/set-up-tiered-storage/architecture
Ravi Jain
03/02/2025, 8:53 PM
{
  "tableName": "ingestionClientAvro",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "schemaName": "ingestionClientAvro",
    "replication": "1",
    "replicasPerPartition": "1",
    "timeColumnName": "timestamp",
    "minimizeDataMovement": false
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant",
    "tagOverrideConfig": {}
  },
  "tableIndexConfig": {
    "noDictionaryColumns": [],
    "invertedIndexColumns": [],
    "streamConfigs": {
      "streamType": "kafka",
      "stream.kafka.consumer.type": "LOWLEVEL",
      "stream.kafka.topic.name": "genericUserEvents",
      "stream.kafka.broker.list": "localhost:9092",
      "stream.kafka.consumer.prop.auto.offset.reset": "largest",
      "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka30.KafkaConsumerFactory",
      "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
      "stream.kafka.decoder.prop.format": "AVRO",
      "realtime.segment.flush.threshold.time": "24h",
      "realtime.segment.flush.threshold.segment.size": "100M",
      "schema.registry.url": "http://localhost:8081",
      "schema.registry.subject": "genericUserEvents-value"
    },
    "aggregateMetrics": false,
    "enableDefaultStarTree": false,
    "nullHandlingEnabled": false,
    "bloomFilterColumns": [],
    "onHeapDictionaryColumns": [],
    "rangeIndexColumns": [],
    "sortedColumn": [],
    "varLengthDictionaryColumns": [],
    "rangeIndexVersion": 2,
    "optimizeDictionaryForMetrics": false,
    "optimizeDictionary": false,
    "autoGeneratedInvertedIndex": false,
    "createInvertedIndexDuringSegmentGeneration": false,
    "loadMode": "MMAP",
    "enableDynamicStarTreeCreation": false,
    "columnMajorSegmentBuilderEnabled": true,
    "optimizeDictionaryType": false,
    "noDictionarySizeRatioThreshold": 0.85
  },
  "metadata": {},
  "quota": {},
  "routing": {},
  "query": {},
  "ingestionConfig": {
    "continueOnError": false,
    "rowTimeValueCheck": false,
    "segmentTimeValueCheck": true
  },
  "isDimTable": false
}
Schema:
{
  "schemaName": "ingestionClientAvro",
  "dimensionFieldSpecs": [
    {
      "name": "data",
      "dataType": "STRING"
    },
    {
      "name": "id",
      "dataType": "STRING"
    },
    {
      "name": "schemaName",
      "dataType": "STRING"
    },
    {
      "name": "schemaVersion",
      "dataType": "STRING"
    },
    {
      "name": "tz",
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [],
  "dateTimeFieldSpecs": [
    {
      "name": "timestamp",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
Chao Cao
03/04/2025, 2:16 AM
Rajat
03/04/2025, 6:45 AM
Rajat
03/04/2025, 8:25 AM
Jonathan Baxter
03/04/2025, 3:58 PM
Chao Cao
03/04/2025, 5:20 PM
Chao Cao
03/04/2025, 6:04 PM
{
"outer_key_1": {
"outer_key_2": "
[
{
\"inner_key_1\":
{
{
\"inner_key_2\":
{
\"value_key_1\":
{\"key\" : \"value\"}
}
}
}
}
]"
}
I suspect the answer is in ChainingTransformations, but I don't know how to keep chaining in the TransformConfigs when I need to go over to ingestionConfig.complexTypeConfig to put in arrays that need to be un-nested.
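(For reference, a rough sketch of the two pieces being discussed, with hypothetical column and key names; whether a value produced by transformConfigs can then be fed into complexTypeConfig for un-nesting depends on the order the transformers run in, which is exactly the open question here.)
"ingestionConfig": {
  "transformConfigs": [
    {
      "columnName": "inner_payload",
      "transformFunction": "jsonPathString(outer_key_1, '$.outer_key_2')"
    }
  ],
  "complexTypeConfig": {
    "fieldsToUnnest": ["inner_payload"],
    "delimiter": ".",
    "collectionNotUnnestedToJson": "NON_PRIMITIVE"
  }
}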
Wilson
03/05/2025, 3:02 AM
Caught exception while calculating target assignment: java.lang.IllegalStateException: There are less instances: 1 in instance partitions: ODS_TIRESIAS_MCD_BJ_SEGMENT_LIST_CONSUMING than the table replication: 2
My realtime table replication is set to 2, and I have 2 servers. I’m not sure what 'instance partitions' means — where can I find more information about it?
When I use the GET tables/{tableName}/instancePartitions endpoint, it always responds:
{
"code": 404,
"error": "Failed to find the instance partitions"
}
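(Instance partitions are the controller's mapping of a table's partitions and replicas onto the server instances tagged for its tenant; the error above generally means only one of the two servers carries the tag the table expects, so only one instance is eligible. As a rough, illustrative sketch rather than a drop-in fix, an explicit assignment can also be declared in the table config; all values below are assumptions:)
"instanceAssignmentConfigMap": {
  "CONSUMING": {
    "tagPoolConfig": {
      "tag": "DefaultTenant_REALTIME",
      "poolBased": false
    },
    "replicaGroupPartitionConfig": {
      "replicaGroupBased": false,
      "numInstances": 2
    }
  }
}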
Amit
03/05/2025, 2:10 PM
DvirZaken
03/06/2025, 8:04 PM
Kai D
03/13/2025, 1:20 PM
Roshan
03/13/2025, 4:36 PM
Yarden Rokach
Venkat
03/18/2025, 7:36 AM
Kasinath Reddy
03/18/2025, 10:53 AM
Kasinath Reddy
03/18/2025, 10:53 AM
Current upsert config:
"upsertConfig": {
"mode": "PARTIAL",
"comparisonColumns": [
"processed_at"
],
"metadataTTL": 0,
"dropOutOfOrderRecord": false,
"enableSnapshot": true,
"deletedKeysTTL": 0,
"partialUpsertStrategies": {},
"enablePreload": false,
"consistencyMode": "SNAPSHOT",
"upsertViewRefreshIntervalMs": 3000,
"allowPartialUpsertConsumptionDuringCommit": false,
"hashFunction": "NONE",
"defaultPartialUpsertStrategy": "OVERWRITE"
},
"isDimTable": false
}
Roshan
03/18/2025, 3:35 PM
Roshan
03/18/2025, 4:40 PM
import requests
import json
from pathlib import Path
import time

PINOT_CONTROLLER = "********"
PINOT_BROKER = "*********"
AUTH_HEADERS = {
    "Authorization": "Basic YWRtaW46dmVyeXNlY3JldA==",
    "Content-Type": "application/json"
}

def verify_segment(tenant_name, segment_name):
    max_retries = 10
    retry_interval = 2  # seconds
    for i in range(max_retries):
        print(f"\nChecking segment status (attempt {i+1}/{max_retries})...")
        response = requests.get(f"{PINOT_CONTROLLER}/segments/{tenant_name}/{segment_name}/metadata", headers=AUTH_HEADERS)
        if response.status_code == 200:
            print("Segment is ready!")
            return True
        print(f"Segment not ready yet, waiting {retry_interval} seconds...")
        time.sleep(retry_interval)
    return False

def simple_ingest(tenant_name):
    # Verify schema and table exist
    schema_response = requests.get(f'{PINOT_CONTROLLER}/schemas/{tenant_name}', headers=AUTH_HEADERS)
    table_response = requests.get(f'{PINOT_CONTROLLER}/tables/{tenant_name}', headers=AUTH_HEADERS)
    if schema_response.status_code != 200 or table_response.status_code != 200:
        print(f"Schema or table missing for tenant {tenant_name}. Please run create_schema.py and create_table.py first")
        return
    csv_path = Path(f"data/{tenant_name}_data.csv")
    print(f"\nUploading data for tenant {tenant_name}...")
    with open(csv_path, 'rb') as f:
        files = {'file': (f'{tenant_name}_data.csv', f, 'text/csv')}
        # Using a dictionary for column mapping first
        column_map = {
            "Ticket ID": "Ticket_ID",
            "Customer Name": "Customer_Name",
            "Customer Email": "Customer_Email",
            "Company_name": "Company_name",
            "Customer Age": "Customer_Age",
            "Customer Gender": "Customer_Gender",
            "Product purchased": "product_purchased",
            "Date of Purchase": "Date_of_Purchase",
            "Ticket Subject": "Ticket_Subject",
            "Description": "Description",
            "Ticket Status": "Ticket_Status",
            "Resolution": "Resolution",
            "Ticket Priority": "Ticket_Priority",
            "Source": "Source",
            "Created date": "Created_date",
            "First Response Time": "First_Response_Time",
            "Time to Resolution": "Time_of_Resolution",
            "Number of conversations": "Number_of_conversations",
            "Customer Satisfaction Rating": "Customer_Satisfaction_Rating",
            "Category": "Category",
            "Intent": "Intent",
            "Type": "Type",
            "Relevance": "Relevance",
            "Escalate": "Escalate",
            "Sentiment": "Sentiment",
            "Tags": "Tags",
            "Agent": "Agent",
            "Agent politeness": "Agent_politeness",
            "Agent communication": "Agent_communication",
            "Agent Patience": "Agent_Patience",
            "Agent overall performance": "Agent_overall_performance",
            "Conversations": "Conversations"
        }
        config = {
            "inputFormat": "csv",
            "header": "true",
            "delimiter": ",",
            "fileFormat": "csv",
            "multiValueDelimiter": ";",
            "skipHeader": "false",
            # Convert dictionary to proper JSON string
            "columnHeaderMap": json.dumps(column_map)
        }
        params = {
            'tableNameWithType': f'{tenant_name}_OFFLINE',
            'batchConfigMapStr': json.dumps(config)
        }
        upload_headers = {
            "Authorization": AUTH_HEADERS["Authorization"]
        }
        response = requests.post(
            f'{PINOT_CONTROLLER}/ingestFromFile',
            files=files,
            params=params,
            headers=upload_headers
        )
    print(f"Upload response: {response.status_code}")
    if response.status_code != 200:
        print(f"Error: {response.text}")
        return
    if response.status_code == 200:
        try:
            response_data = json.loads(response.text)
            print(f"Response data: {response_data}")
            segment_name = response_data["status"].split("segment: ")[1]
            print(f"\nWaiting for segment {segment_name} to be ready...")
            if verify_segment(tenant_name, segment_name):
                query = {
                    "sql": f"SELECT COUNT(*) FROM {tenant_name}_OFFLINE",
                    "trace": False
                }
                query_response = requests.post(
                    f"{PINOT_BROKER}/query/sql",
                    json=query,
                    headers=AUTH_HEADERS
                )
                print("\nQuery response:", query_response.status_code)
                if query_response.status_code == 200:
                    print(json.dumps(query_response.json(), indent=2))
            else:
                print("Segment verification timed out")
        except Exception as e:
            print(f"Error processing segment: {e}")
            print(f"Full response text: {response.text}")

if __name__ == "__main__":
    simple_ingest("test_tenant")
Sandeep Jain
03/20/2025, 4:33 AM
Kasinath Reddy
03/20/2025, 4:58 AM
segment uri: "http://pinot-controller:9000/segments/test/test__0__101__20250315T0506Z"
Mike Stewart
03/20/2025, 5:08 PM"starTreeIndexConfigs": [
{
"dimensionsSplitOrder": ["event_type"],
"functionColumnPairs": ["SUM__event_currency_amount"],
"maxLeafRecords": 10000
}
]
When I execute the following query:
SELECT event_type, SUM(event_currency_amount)
FROM [table]
GROUP BY event_type;
the Star Tree index is used, and only 9 documents are read.
However, when I introduce a time filter of any kind:
SELECT event_type, SUM(event_currency_amount)
FROM [table]
WHERE event_time >= 0 AND event_time < 1800000000000
GROUP BY event_type;
the Star Tree index is ignored, resulting in 14 million documents being scanned instead.
Since the event_time range in the WHERE clause fully encompasses the segment's event_time range, I expected the Star Tree index to still be utilized. Based on previous discussions, I was under the impression that this should be the case.
I also reviewed the below link, which I understood might specifically address this scenario in the product.
For fuller context, I currently have a single offline table with a single segment. Initially, I had a Hybrid table, but I removed it to minimize variables while troubleshooting this issue.
Any guidance on why this is happening and how I might ensure the Star Tree index is used would be greatly appreciated. Thanks in advance!
Improve star-tree to use star-node when the predicate matches all the non-star nodes by Jackie-Jiang · Pull Request #9667 · apache/pinot · GitHub
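(A note on the behavior described above: a star-tree index is generally only eligible for a query when every filter column is one of its dimensionsSplitOrder dimensions, so a predicate on event_time, which is not in the split order, disqualifies the index no matter how wide the range is. A rough sketch of a config that would keep the index eligible, assuming a hypothetical coarser-grained derived column such as event_time_day so the split order does not explode in cardinality:)
"starTreeIndexConfigs": [
  {
    "dimensionsSplitOrder": ["event_type", "event_time_day"],
    "functionColumnPairs": ["SUM__event_currency_amount"],
    "maxLeafRecords": 10000
  }
]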
Manish G
03/22/2025, 3:04 PM
Manish G
03/22/2025, 3:46 PM
{
  "schemaName": "my-test-schema",
  "enableColumnBasedNullHandling": true,
  "dimensionFieldSpecs": [
    {
      "name": "field",
      "dataType": "FLOAT",
      "fieldType": "DIMENSION"
    }
  ]
}
I want to insert a null value in the column:
field
1.43
null
It throws an error:
Caused by: java.lang.NumberFormatException: For input string: "null"
at java.base/jdk.internal.math.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:2054)
at java.base/jdk.internal.math.FloatingDecimal.parseFloat(FloatingDecimal.java:122)
at java.base/java.lang.Float.parseFloat(Float.java:570)
at org.apache.pinot.common.utils.PinotDataType$11.toFloat(PinotDataType.java:617)
at org.apache.pinot.common.utils.PinotDataType$7.convert(PinotDataType.java:425)
at org.apache.pinot.common.utils.PinotDataType$7.convert(PinotDataType.java:375)
at org.apache.pinot.segment.local.recordtransformer.DataTypeTransformer.transform(DataTypeTransformer.java:118)
What is the correct way to have null values?
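(A minimal sketch of the distinction, assuming the rows arrive as JSON records: column-based null handling stores a null only when the value is genuinely null or absent; the literal string "null" is still handed to the FLOAT converter, which is what raises the NumberFormatException shown above.)
{"field": 1.43}
{"field": null}
{"field": "null"}
The first two records ingest cleanly, with the second stored as a null, while the third fails the FLOAT conversion.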
baarath
04/23/2025, 6:02 AM
java.lang.RuntimeException: Caught exception during running - org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner
at org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher.kickoffIngestionJob(IngestionJobLauncher.java:152)
at org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher.runIngestionJob(IngestionJobLauncher.java:121)
at org.apache.pinot.tools.admin.command.LaunchDataIngestionJobCommand.execute(LaunchDataIngestionJobCommand.java:132)
at org.apache.pinot.tools.Command.call(Command.java:33)
at org.apache.pinot.tools.Command.call(Command.java:29)
at picocli.CommandLine.executeUserObject(CommandLine.java:2045)
at picocli.CommandLine.access$1500(CommandLine.java:148)
at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2465)
at picocli.CommandLine$RunLast.handle(CommandLine.java:2457)
at picocli.CommandLine$RunLast.handle(CommandLine.java:2419)
at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2277)
at picocli.CommandLine$RunLast.execute(CommandLine.java:2421)
at picocli.CommandLine.execute(CommandLine.java:2174)
at org.apache.pinot.tools.admin.PinotAdministrator.execute(PinotAdministrator.java:173)
at org.apache.pinot.tools.admin.PinotAdministrator.main(PinotAdministrator.java:204)
Caused by: java.lang.IllegalArgumentException
at java.base/sun.nio.fs.UnixFileSystem.getPathMatcher(UnixFileSystem.java:286)
at org.apache.pinot.common.segment.generation.SegmentGenerationUtils.listMatchedFilesWithRecursiveOption(SegmentGenerationUtils.java:263)
at org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner.run(SegmentGenerationJobRunner.java:177)
at org.apache.pinot.spi.ingestion.batch.IngestionJobLauncher.kickoffIngestionJob(IngestionJobLauncher.java:150)
... 14 more
baarath
04/24/2025, 10:41 AM
baarath
04/28/2025, 8:54 AM
Ram Makireddi
04/28/2025, 3:17 PM