# ask-ai
a
```python
import json
import requests
import time
import logging
import asyncio

from livekit import api, rtc
from livekit.agents.llm import function_tool
from livekit.agents import RunContext, Agent

from app.config.config import get_settings

settings = get_settings()


class AgentActions(Agent):
    def __init__(self, lkapi: api.LiveKitAPI, logger: logging.Logger, settings):
        self.lkapi = lkapi
        self.logger = logger
        self.settings = settings
        self.last_dtmf_press = 0

    async def hangup(self, room: rtc.Room, participant: rtc.RemoteParticipant):
        """Disconnects the participant from the room and updates call status."""
        try:
            await asyncio.sleep(3)
            try:
                await self.state.lkapi.room.remove_participant(api.RoomParticipantIdentity(
                    room=room.name,
                    identity=participant.identity,
                ))
            except Exception as e:
                self.state.logger.info(f"Participant already left the call: {e}")
            finally:
                response = requests.post(
                    f"{self.state.settings.WEB_CALLING_BASE_URL}/api/setCallStatus",
                    data=json.dumps({"roomId": room.name, "status": "disconnected"}),
                    headers={"Content-Type": "application/json"},
                )
                self.state.logger.info(f"Update call status to disconnected: {response.status_code}")
        except Exception as e:
            self.state.logger.error(f"Received error while ending call: {e}")

    @function_tool
    async def end_call(self, run_context: RunContext):
        """ONLY call this when the user explicitly wants to end the conversation or says goodbye.
        Do NOT call this after providing information or completing other tasks."""
        state = run_context.agent.userdata
        logger = state.logger
        room = run_context.room
        participant = run_context.participant
        logger.info(f"Ending the call for {participant.identity}")
        run_context.agent.chat_ctx.add_message(
            role="system",
            content="[call ended by AI]",
            created_at=time.time()
        )
        await self.hangup(room, participant)

    @function_tool
    async def delayed_response(self, run_context: RunContext, text: str):
        """Called when there needs to be a delay before the text is said to the user."""
        state = run_context.agent.userdata
        logger = state.logger
        logger.info("Delaying this message")
        await asyncio.sleep(0.1)
        run_context.agent.chat_ctx.add_message(
            role="system",
            content=f"[Delayed Response: {text}]",
        )

    @function_tool
    async def send_dtmf_code(self, run_context: RunContext, code: int):
        """Called when you need to send a DTMF code to the phone number for the current step."""
        state = run_context.agent.userdata
        logger = state.logger
        room = run_context.room
        current_time = time.time()
        if current_time - state.last_dtmf_press < 7:
            logger.info("DTMF code rejected due to cooldown")
            run_context.agent.chat_ctx.add_message(
                role="system",
                content=f"[DTMF Rejected due to cooldown: {code}]",
            )
            return None
        logger.info(f"Sending DTMF code {code} to the phone number for the current step.")
        state.last_dtmf_press = current_time
        try:
            await room.local_participant.publish_dtmf(code=code, digit=str(code))
            run_context.agent.chat_ctx.add_message(
                role="system",
                content=f"[DTMF Pressed: {code}]",
            )
        except Exception as e:
            logger.error(f"Exception in send_dtmf_code function: {e}")
            run_context.agent.chat_ctx.add_message(
                role="system",
                content=f"[DTMF Error: {str(e)}]",
            )
            return None
```

What fixes are needed?
t
Thanks for reaching out, giri. I'm an AI assistant and I'm here to help. Give me a moment while I research this topic.
The provided code for handling LiveKit API methods has several areas that can be improved for better reliability and adherence to best practices:

• Room and Participant Management:
  ◦ Ensure proper initialization of the room with event loop handling and connection options. For example:
```python
room = rtc.Room(loop=asyncio.get_event_loop())
await room.connect(url, token, options=rtc.RoomOptions(auto_subscribe=True))
```
  ◦ Set up event handlers for participant lifecycle events to handle connections and disconnections effectively.
  ◦ Access participants safely, with error handling, to avoid issues when the room is not connected.
• DTMF Code Handling:
  ◦ Implement a cooldown mechanism for sending DTMF codes to prevent rapid presses. For example:
```python
if current_time - self.last_dtmf_press < 3:
    logger.info("DTMF code rejected due to cooldown")
    return None
```
  ◦ Use proper asynchronous patterns for sending DTMF codes and handling errors.
  ◦ Consider publishing DTMF codes as data for additional handling if needed.
• Error Handling and Logging:
  ◦ Ensure comprehensive logging for all operations, including successful and failed attempts.
  ◦ Handle exceptions gracefully to avoid crashes and provide meaningful error messages.
• Resource Management:
  ◦ Ensure proper cleanup of resources, such as unsubscribing from queues or releasing handles when objects are deleted.
• General Improvements:
  ◦ Use structured DTMF processing for menu systems, mapping digits to specific actions or departments (a hedged sketch follows the source list below).
  ◦ Adopt async context management for the room lifecycle to ensure proper handling of connections and disconnections (see the second sketch below).

By implementing these best practices, the code will be more robust, maintainable, and aligned with LiveKit's recommended usage patterns.

Sources: Company directory phone assistant | LiveKit Docs | agent.py | sip-refer/agent.py | livekit-api/livekit/api/room_service.py | complex-agents/ivr-agent/agent.py
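As a rough illustration of the structured DTMF menu idea above, here is a minimal sketch. It reuses the `publish_dtmf()` call from your `send_dtmf_code` tool; the `MENU_OPTIONS` table and `route_to_department` helper are illustrative names, not part of the LiveKit SDK:

```python
# Minimal sketch (assumes publish_dtmf() as used in your send_dtmf_code tool).
# MENU_OPTIONS and route_to_department are hypothetical names for illustration.
MENU_OPTIONS = {
    "sales": 1,
    "support": 2,
    "billing": 3,
}

async def route_to_department(room, department: str, logger) -> bool:
    """Map a menu option to a DTMF digit and send it, returning True on success."""
    digit = MENU_OPTIONS.get(department)
    if digit is None:
        logger.warning(f"Unknown menu option: {department}")
        return False
    try:
        await room.local_participant.publish_dtmf(code=digit, digit=str(digit))
        logger.info(f"Sent DTMF {digit} for {department}")
        return True
    except Exception as e:
        logger.error(f"Failed to send DTMF for {department}: {e}")
        return False
```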
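And a minimal sketch of async context management for the room lifecycle, assuming the `rtc.Room.connect()` / `disconnect()` pattern shown above; `managed_room` is an illustrative helper, not a LiveKit API:

```python
import contextlib

from livekit import rtc

# Minimal sketch: always disconnect the room, even if the body raises.
# managed_room is a hypothetical helper name.
@contextlib.asynccontextmanager
async def managed_room(url: str, token: str):
    room = rtc.Room()
    try:
        await room.connect(url, token, options=rtc.RoomOptions(auto_subscribe=True))
        yield room
    finally:
        await room.disconnect()

# Usage (inside an async function):
#     async with managed_room(url, token) as room:
#         ...  # set up handlers, publish tracks, etc.
```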
a
```python
import os
import sys
import json
from typing import AsyncIterable

import yaml
import httpx
import logging
import asyncio
import requests
import openai as open_ai
from datetime import datetime
import traceback

from livekit import rtc
from livekit.agents import (
    AgentSession,
    AutoSubscribe,
    JobContext,
    JobProcess,
    WorkerOptions,
    RoomInputOptions,
    RoomOutputOptions,
    cli,
    llm,
    Agent,
)
from livekit.plugins import openai, deepgram, silero, turn_detector  # , google
from livekit.api import LiveKitAPI, DirectFileOutput, S3Upload, TrackEgressRequest
from livekit.rtc import RemoteParticipant, LocalTrackPublication, RemoteTrackPublication, Track, Participant
from livekit.agents.llm import chat_context

from app.config.config import get_settings
from app.core.constants import CALL_WAIT_TIME, DEFAULT_LLM_PROVIDER, SIP_MODE
from app.providers.llm.llm_factory import LLMFactory
from app.providers.livekit.agent_actions import AgentActions
from app.utils.prompt_formatter import process_metadata
# from dataclasses import dataclass, field

nc_available = True

try:
    logger = logging.getLogger("voice-agent")
    settings = get_settings()

    def prewarm(proc: JobProcess):
        proc.userdata["vad"] = silero.VAD.load(
            max_buffered_speech=45.0,
            min_silence_duration=1.0,
        )

    async def entrypoint(ctx: JobContext):
        try:
            reconf_parameters_text = None
            local_audio_track_id = None
            remote_audio_track_id = None
            recording_started = False
            lk_api = LiveKitAPI(
                url=settings.LIVEKIT_URL,
                api_key=settings.LIVEKIT_API_KEY,
                api_secret=settings.LIVEKIT_API_SECRET
            )
            recording_file_names = []

            # Store the timeout task globally or per participant
            sip_timeout_tasks = {}

            # loading the prompt from the YAML file
            with open(settings.CHAT_ASSISTANT_PROMPT, 'r') as file:
                prompt = yaml.safe_load(file)

            metadata_str = ctx.job.metadata
            metadata = json.loads(metadata_str)
            booking_id = metadata.get("booking")
            booking_details = metadata.get("bookingDetails")
            calling_mode = metadata.get("mode")
            queued_call = metadata.get("useQueue")
            record_session = metadata.get("record")

            # Process the metadata JSON (assuming metadata_json is defined)
            result = process_metadata(metadata)
            reconf_parameters_text = result["reconfirmation_parameters"]
            final_prompt = prompt["chat_assistant_prompt"].format(**result)

            # initial_ctx = chat_context.ChatContext().add_message(
            #     role="system",
            #     content=final_prompt
            # )

            # creating a object OpenAI instance
            json_formatter = LLMFactory.get_model(
                provider=DEFAULT_LLM_PROVIDER,
                booking_id=booking_id,
                booking_details=booking_details,
            )
            logger.info(f"booking details: {booking_details}")

            # connecting agent to the room
            logger.info(f"connecting to room {ctx.room.name}")
            await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

            # Wait for the first participant to connect
            participant = await ctx.wait_for_participant()
            logger.info(f"starting voice assistant for participant {participant.identity}")

            if (calling_mode):
                # sending API request to web frontend to update that the user has picked up the call
                response = requests.post(
                    f"{settings.WEB_CALLING_BASE_URL}/api/setCallStatus",
                    data=json.dumps({"roomId": ctx.room.name, "status": "connected"}),
                    headers={"Content-Type": "application/json"},
                )
                logger.debug(f"Update call status to connected: {response.status_code}")

            # async def before_tts_callback(agent: AgentSession, text: str | AsyncIterable[str]):
            #     if isinstance(text, str):
            #         if text.strip() == "SEE YOU AGAIN":
            #             logger.info("Ending the call, since the AI wants to end the call")
            #             # llm is indicating to end the call
            #             await AgentActions.hangup(lkapi=lk_api, agent=agent, logger=logger, participant=participant, room=ctx.room)
            #         return text
            #     else:
            #         # This is an async generator since we use yield
            #         async def process_async_text():
            #             text_content = ""
            #             async for chunk in text:
            #                 text_content += chunk
            #                 yield chunk
            #             # Check the complete text after processing
            #             if text_content.strip() == "SEE YOU AGAIN":
            #                 logger.info("Ending the call, since the AI wants to end the call")
            #                 AgentActions.hangup(lkapi=lk_api, agent=agent, logger=logger, participant=participant, room=ctx.room)
            #         return process_async_text()

            session = AgentSession(
                vad=ctx.proc.userdata["vad"],
                stt=deepgram.STT(
                    api_key=settings.DEEPGRAM_KEY,
                    model="nova-3-general",
                    punctuate=True,
                ),
                llm=openai.LLM(
                    model=settings.OPENAI_MODEL_NAME,
                    store=False,
                    temperature=0.1,
                    max_completion_tokens=150,
                    # presence_penalty=1.4,
                    # frequency_penalty=1.2,
                    client=open_ai.AsyncClient(
                        api_key=settings.OPENAI_API_KEY,
                        max_retries=0,
                        http_client=httpx.AsyncClient(
                            timeout=httpx.Timeout(1.8, pool=None),
                            follow_redirects=True,
                            limits=httpx.Limits(
                                max_connections=50,
                                max_keepalive_connections=50,
                                keepalive_expiry=120,
                            ),
                        ),
                    ),
                ),
                # llm=google.LLM(
                #     model=settings.GEMINI_MODEL_NAME,
                #     api_key=settings.GEMINI_CONV_API_KEY,
                #     temperature=0.3,
                #     max_output_tokens=150,
                #     presence_penalty=1.4,
                #     frequency_penalty=1.2,
                # ),
                tts=deepgram.TTS(
                    model="aura-2-amalthea-en",
                    # use_streaming=False,
                    api_key=settings.DEEPGRAM_KEY,
                    # container="none"
                ),
                # _chat_ctx=initial_ctx,
                preemptive_generation=True,
                # interrupt_speech_duration=0.35,
                min_endpointing_delay=0.05,
                max_endpointing_delay=2.0,
                # noise_cancellation=noise_cancellation.BVC() if nc_available else None,
                use_tts_aligned_transcript=True,
                # turn_detector=turn_detector.EOUModel(),
                # agent = AgentActions(lkapi=lk_api, logger=logger, settings=settings)
            )
            session._chat_ctx.add_message(
                role="system",
                content=final_prompt
            )

            await session.start(
                room=ctx.room,
                agent=Agent(instructions=final_prompt),
                room_input_options=RoomInputOptions(audio_enabled=True),
                room_output_options=RoomOutputOptions(audio_enabled=True),
            )
```

Not able to execute the function.
t
• Room Connection Issues:
  ◦ Hey there! Let's make sure your room is set up just right. Here's a friendly tip for connecting:
```python
room = rtc.Room(loop=asyncio.get_event_loop())
await room.connect(url, token, options=rtc.RoomOptions(auto_subscribe=True))
```
  ◦ Keep an eye on those connections and be ready to catch any disconnections – think of it like checking in with a friend.
• Agent Session Execution:
  ◦ Quick check: are all your environment variables playing nice? Double-check the LiveKit URL, API key, and secret – they can be tricky sometimes! (A small startup-check sketch follows the source list at the end of this message.)
  ◦ Let's make sure your participant identity is spot on. It's like making sure you're introducing yourself correctly at a party.
• Error Handling and Logging:
  ◦ Time to play detective! Dive into those logs and look for any hidden clues about connection hiccups.
  ◦ Pro tip for handling those unexpected moments:
```python
try:
    await room.connect(url, token)
except Exception as e:
    logger.error(f"Failed to connect to room: {e}")
    raise
```
• API and Network Configuration:
  ◦ Let's make sure your LiveKit API can actually say hello – check those network connections!
  ◦ For SIP lovers, take a moment to verify NAT traversal, codec compatibility, and SIP trunk setup.
• General Debugging Tips:
  ◦ Library versions can be like complicated relationships – make sure they're all compatible!
  ◦ When you're stuck, the more details, the better. Think of it like telling a friend a story – context is key!

If you're still scratching your head, don't worry! I'm here to help. Could you share a bit more about what's going on? Any error messages, room ID, or recent changes would be super helpful in solving this puzzle together. We'll get to the bottom of this!

Sources: Community & Support | livekit/livekit_composite | DeepWiki | Web and mobile frontends | LiveKit Docs | livekit-plugins/livekit-plugins-hedra/livekit/plugins/hedra/avatar.py | Still Have Questions | SIP Integration | livekit/livekit_composite | DeepWiki
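Here is the small startup-check sketch mentioned above. The variable names mirror the settings used in your entrypoint (LIVEKIT_URL, LIVEKIT_API_KEY, LIVEKIT_API_SECRET); `check_environment` itself is just an illustrative helper, not part of the SDK:

```python
import os
import logging

logger = logging.getLogger("voice-agent")

# Hypothetical helper: fail fast at startup if LiveKit credentials are missing.
REQUIRED_VARS = ("LIVEKIT_URL", "LIVEKIT_API_KEY", "LIVEKIT_API_SECRET")

def check_environment() -> None:
    missing = [name for name in REQUIRED_VARS if not os.getenv(name)]
    if missing:
        raise RuntimeError(f"Missing required environment variables: {missing}")
    logger.info("All required LiveKit environment variables are set")
```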