Let me should you my code, I am using livekit free tier, Facing this error ofently,
from dotenv
import load_dotenv
import json
import asyncio
import logging
import aiohttp
from livekit
import agents
from livekit*.*agents
import AgentSession*,* Agent*,* RoomInputOptions*,* function_tool*,* get_job_context*,* RunContext
from openai*.
types.
beta.
realtime.*session
import TurnDetection
from livekit*.
agents.
voice.*events
import ErrorEvent
from livekit*.*plugins
import (
openai*,*
noise_cancellation*,*
)
load_dotenv()
logger
= logging*.*getLogger("my-agent")
logger*.
setLevel(logging.*WARNING)
class *Assistant*(
Agent)*:*
def __init__(
self)
-> None*:*
super()*.*__init__(*instructions=*"You are a helpful voice AI assistant For Allied Bank, So only answer user about allied bank. Answer in Roman Urdu Language")
@function_tool()
async
def context_lookup(_self_*,*
context: RunContext*,*
query: str)*->* dict[_str_*,*
str]*:*
"""
Call this tool when a user query requires context about Allied bank.
Args:
query (str): The full question asked by the user in Roman Urdu.
"""
api_url
= "
http://localhost:8000/process-text"
input_data
= {
"chat_history"*:* []*,*
"question"*:* query
}
try:
async with aiohttp*.*ClientSession()
as session*:*
async with session*.*post(
api_url*,*
*headers=
{"Content-Type":* "application/json"}*,*
*json=
input_data,*
*timeout=
aiohttp.*ClientTimeout(*total=*100)
)
as response*:*
if response*.*status
!= 200*:*
logger*.
error("RAG API error: HTTP %d",* response*.*status)
return "I apologize, but I'm having trouble accessing the required information."
response_json
= await response*.*json()
return {
"context"*:* response_json*,*
"status"*:* "successful"
}
except aiohttp*.*ClientError
as http_err*:*
logger*.
error("HTTP error occurred while fetching RAG context: %s",* str(http_err))
except asyncio*.
TimeoutError:*
logger*.*error("Timeout error while calling RAG API.")
except Exception
as e*:*
logger*.
error("Unexpected error fetching RAG context: %s",* str(e))
return "I apologize, but there was an error processing your request."
async
def entrypoint(
ctx: agents*.
JobContext):*
await ctx*.*connect()
agent
= AgentSession(
*llm=
openai.
realtime.*RealtimeModel(
*model=
"gpt-4o-mini-realtime-preview",*
*voice=
"coral",*
*turn_detection=*TurnDetection(
*type=
"server_vad",*
*threshold=
0.8,*
*prefix_padding_ms=
300,*
*silence_duration_ms=
500,*
*create_response=
True,*
*interrupt_response=
True,*
)
)
)
await agent*.*start(
*agent=
Assistant(),*
*room=
ctx.
room,*
*room_input_options=*RoomInputOptions(
*noise_cancellation=
noise_cancellation.
BVC(),*
)*,*
)
@agent*.*on("error")
def on_error(
ev: ErrorEvent)*:*
if ev*.
error.
recoverable:*
return
logger*.
warning(f"session is closing due to unrecoverable error: Hassan Mahmood {ev.*error}")
_# asyncio.create_task(_
_# agent.generate_reply(_
# instructions="I'm having trouble connecting right now. Let me transfer your call."
_# # allow_interruptions=False,_
# )
# )
_# await agent.generate_reply(_
# instructions="Greet the user and offer your assistance about Allied Bank in Roman Urdu Langauge."
# )
if name == "__main__"*:*
agents*.
cli.
run_app(agents.*WorkerOptions(*entrypoint_fnc=*entrypoint))