Vik Gamov
03/25/2024, 2:50 PMVansh Goel
03/29/2024, 2:31 PMHarish Bohara
04/03/2024, 8:09 AMVishal Bisht
04/11/2024, 10:08 AMprasanna
04/26/2024, 9:07 AMZach Lorimer
04/30/2024, 9:01 PMayush
05/02/2024, 5:37 AMAshley Allen
05/02/2024, 2:40 PMRashpal Singh
05/02/2024, 6:11 PMpiby
05/12/2024, 6:31 PMYusuf Nar
07/15/2024, 10:23 AMDor Levi
07/30/2024, 9:31 PMPhilippe Noël
08/08/2024, 11:54 PMSumitra Saksham
09/03/2024, 5:30 PMSumitra Saksham
09/04/2024, 2:24 PMHassan Ait Brik
10/09/2024, 12:54 PMAyoade Abel Adegbite
11/09/2024, 11:27 PMCristina Munteanu
11/13/2024, 9:40 PMIndira Vashisth
11/15/2024, 2:20 AMYusuf Nar
11/28/2024, 7:13 PMAkhil Dubey
01/05/2025, 7:38 AMSaurabh Lambe
01/23/2025, 3:24 AMNavdeep Gaur
03/11/2025, 11:18 AMPeter Corless
03/11/2025, 4:34 PMRoshan
03/18/2025, 3:35 PMRoshan
03/18/2025, 4:41 PM
import requests
import json
from pathlib import Path
import time
# Base URL of the Pinot controller; the functions below append paths such as
# /schemas/..., /tables/..., /segments/... and /ingestFromFile to it.
# NOTE(review): the asterisks look like a redacted placeholder — replace with
# the real controller URL before running.
PINOT_CONTROLLER = "********"
# Base URL of the Pinot broker; used for the /query/sql endpoint.
# NOTE(review): also appears to be a redacted placeholder.
PINOT_BROKER = "*********"
# Headers sent with the JSON requests (schema/table/segment lookups and the
# SQL query). The file upload reuses only the Authorization entry.
# NOTE(review): the Basic credential is hard-coded. Base64 is an encoding, not
# encryption, so anyone who can read this file can recover the username and
# password. Consider loading it from the environment or a secret store.
AUTH_HEADERS = {
"Authorization": "Basic YWRtaW46dmVyeXNlY3JldA==",
"Content-Type": "application/json"
}
def verify_segment(tenant_name, segment_name, max_retries=10, retry_interval=2,
                   timeout=10):
    """Poll the controller until the segment's metadata endpoint returns 200.

    Args:
        tenant_name: Table name used in the ``/segments/<table>/...`` path.
        segment_name: Name of the segment whose metadata is requested.
        max_retries: Maximum number of polling attempts (default 10).
        retry_interval: Seconds to sleep between attempts (default 2).
        timeout: Per-request timeout in seconds, so a stalled controller
            cannot block the poll forever (default 10).

    Returns:
        True as soon as one attempt gets HTTP 200; False when every attempt
        failed or ``max_retries`` is not positive.
    """
    for attempt in range(1, max_retries + 1):
        print(f"\nChecking segment status (attempt {attempt}/{max_retries})...")
        try:
            response = requests.get(
                f"{PINOT_CONTROLLER}/segments/{tenant_name}/{segment_name}/metadata",
                headers=AUTH_HEADERS,
                timeout=timeout,
            )
        except requests.RequestException as exc:
            # A transient network error counts as "not ready" so the remaining
            # attempts still run instead of aborting the whole poll.
            print(f"Segment check failed: {exc}")
        else:
            if response.status_code == 200:
                print("Segment is ready!")
                return True
        # Sleeping after the final attempt would only delay the False result.
        if attempt < max_retries:
            print(f"Segment not ready yet, waiting {retry_interval} seconds...")
            time.sleep(retry_interval)
    return False
# Seconds to wait on ordinary controller/broker requests before giving up.
_HTTP_TIMEOUT_SECONDS = 30
# The upload sends the whole CSV in one request, so it gets a longer allowance.
_UPLOAD_TIMEOUT_SECONDS = 300

# Maps each CSV header (key) to the column name sent to Pinot (value).
# NOTE(review): "Time to Resolution" maps to "Time_of_Resolution" ("of", not
# "to") — confirm this matches the column name in the table schema.
_COLUMN_HEADER_MAP = {
    "Ticket ID": "Ticket_ID",
    "Customer Name": "Customer_Name",
    "Customer Email": "Customer_Email",
    "Company_name": "Company_name",
    "Customer Age": "Customer_Age",
    "Customer Gender": "Customer_Gender",
    "Product purchased": "product_purchased",
    "Date of Purchase": "Date_of_Purchase",
    "Ticket Subject": "Ticket_Subject",
    "Description": "Description",
    "Ticket Status": "Ticket_Status",
    "Resolution": "Resolution",
    "Ticket Priority": "Ticket_Priority",
    "Source": "Source",
    "Created date": "Created_date",
    "First Response Time": "First_Response_Time",
    "Time to Resolution": "Time_of_Resolution",
    "Number of conversations": "Number_of_conversations",
    "Customer Satisfaction Rating": "Customer_Satisfaction_Rating",
    "Category": "Category",
    "Intent": "Intent",
    "Type": "Type",
    "Relevance": "Relevance",
    "Escalate": "Escalate",
    "Sentiment": "Sentiment",
    "Tags": "Tags",
    "Agent": "Agent",
    "Agent politeness": "Agent_politeness",
    "Agent communication": "Agent_communication",
    "Agent Patience": "Agent_Patience",
    "Agent overall performance": "Agent_overall_performance",
    "Conversations": "Conversations",
}


def _build_ingest_params(tenant_name):
    """Return the query parameters for the controller's /ingestFromFile call."""
    config = {
        "inputFormat": "csv",
        "header": "true",
        "delimiter": ",",
        "fileFormat": "csv",
        "multiValueDelimiter": ";",
        "skipHeader": "false",
        # The map is embedded as a JSON string inside the batch config, which
        # is itself JSON-encoded below.
        "columnHeaderMap": json.dumps(_COLUMN_HEADER_MAP),
    }
    return {
        'tableNameWithType': f'{tenant_name}_OFFLINE',
        'batchConfigMapStr': json.dumps(config),
    }


def simple_ingest(tenant_name):
    """Upload ``data/<tenant_name>_data.csv`` to Pinot and print a row count.

    Steps: check that the schema and table endpoints both answer 200, POST the
    CSV to the controller's ``/ingestFromFile`` endpoint, wait for the created
    segment via ``verify_segment``, then run ``SELECT COUNT(*)`` through the
    broker. Progress and errors are printed; failures after the upload are
    reported rather than raised.

    Args:
        tenant_name: Name used for the schema, the table
            (``<tenant_name>_OFFLINE``) and the CSV file.

    Returns:
        None.
    """
    # Verify schema and table exist
    schema_response = requests.get(
        f'{PINOT_CONTROLLER}/schemas/{tenant_name}',
        headers=AUTH_HEADERS,
        timeout=_HTTP_TIMEOUT_SECONDS,
    )
    table_response = requests.get(
        f'{PINOT_CONTROLLER}/tables/{tenant_name}',
        headers=AUTH_HEADERS,
        timeout=_HTTP_TIMEOUT_SECONDS,
    )
    if schema_response.status_code != 200 or table_response.status_code != 200:
        print(f"Schema or table missing for tenant {tenant_name}. Please run create_schema.py and create_table.py first")
        return

    csv_path = Path(f"data/{tenant_name}_data.csv")
    if not csv_path.is_file():
        # Report a missing input file instead of failing inside open().
        print(f"CSV file not found: {csv_path}")
        return

    print(f"\nUploading data for tenant {tenant_name}...")
    # Only the Authorization header is sent: requests must generate the
    # multipart Content-Type (with its boundary) itself for the file upload.
    upload_headers = {
        "Authorization": AUTH_HEADERS["Authorization"]
    }
    # The POST must happen while the file handle is still open.
    with open(csv_path, 'rb') as f:
        files = {'file': (f'{tenant_name}_data.csv', f, 'text/csv')}
        response = requests.post(
            f'{PINOT_CONTROLLER}/ingestFromFile',
            files=files,
            params=_build_ingest_params(tenant_name),
            headers=upload_headers,
            timeout=_UPLOAD_TIMEOUT_SECONDS,
        )

    print(f"Upload response: {response.status_code}")
    if response.status_code != 200:
        print(f"Error: {response.text}")
        return

    try:
        response_data = json.loads(response.text)
        print(f"Response data: {response_data}")
        # The segment name is taken from the text after "segment: " in the
        # "status" field of the upload response.
        segment_name = response_data["status"].split("segment: ")[1]
    except (ValueError, KeyError, IndexError, TypeError, AttributeError) as e:
        print(f"Error processing segment: {e}")
        print(f"Full response text: {response.text}")
        return

    print(f"\nWaiting for segment {segment_name} to be ready...")
    try:
        if not verify_segment(tenant_name, segment_name):
            print("Segment verification timed out")
            return
        query = {
            "sql": f"SELECT COUNT(*) FROM {tenant_name}_OFFLINE",
            "trace": False
        }
        query_response = requests.post(
            f"{PINOT_BROKER}/query/sql",
            json=query,
            headers=AUTH_HEADERS,
            timeout=_HTTP_TIMEOUT_SECONDS,
        )
        print("\nQuery response:", query_response.status_code)
        if query_response.status_code == 200:
            print(json.dumps(query_response.json(), indent=2))
    except (requests.RequestException, ValueError) as e:
        # Network failures and an unparsable broker reply are reported, not
        # raised, as the original best-effort handler did.
        print(f"Error processing segment: {e}")
        print(f"Full response text: {response.text}")
if __name__ == "__main__":
    # When run as a script, ingest the CSV for the sample tenant.
    simple_ingest("test_tenant")
Slack Conversationtelugu bharadwaj
04/04/2025, 12:43 PMCristina Munteanu
04/30/2025, 8:00 PMSlackbot
05/12/2025, 7:34 PMBlux Chang
05/13/2025, 6:11 PM