# ask-ai
q
Hello all! I'm looking at an integration issue with LiveKit agents and dispatching agents in the same room. The objective is to get the weather agent to enter the same room as the home agent, perform its task (give the weather), and then leave the room when it is no longer needed. Can you give me your feedback on my code? Is it properly implemented? The home agent dispatches the weather agent when certain conditions are met, which makes sense. Below is a revised solution that uses a "hand-back" data signal to reset the main agent's context so that once the weather agent finishes its job the main agent won't re-trigger a weather dispatch unless the user explicitly asks for weather again. In this version the home agent's data handler for the weather agent's done signal clears the previous conversation context and replaces it with a system message that instructs the agent to behave as a "normal" main assistant without the weather dispatch instructions. (Also note that we use a simple keyword check in determine_intent.)

## home_agent.py:

import asyncio
import logging

from dotenv import load_dotenv
from livekit import rtc
from livekit.agents import (
    AutoSubscribe,
    JobContext,
    JobProcess,
    WorkerOptions,
    cli,
    llm,
)
from livekit.agents.pipeline import VoicePipelineAgent
from livekit.plugins import deepgram, openai, silero

load_dotenv()

logger = logging.getLogger("home-agent")
logger.setLevel(logging.INFO)


class HomeAssistantFnc(llm.FunctionContext):
    def __init__(self, ctx, weather_active_event):
        super().__init__()
        self.ctx = ctx
        self.weather_active_event = weather_active_event

    @llm.ai_callable(description="Dispatch the weather agent if not active.")
    async def dispatch_weather_agent(self):
        if not self.weather_active_event.is_set():
            logger.info("Dispatching weather agent via function call...")
            await dispatch_weather_agent(self.ctx.room.name)
            self.weather_active_event.set()
        else:
            logger.info("Weather agent already active, skipping dispatch.")


def prewarm(proc: JobProcess):
    logger.info("Prewarming: Loading VAD...")
    proc.userdata["vad"] = silero.VAD.load()
    logger.info("Prewarming complete.")


async def entrypoint(ctx: JobContext):
    logger.info("Connecting to room %s...", ctx.room.name)
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
    logger.info("Connected to room: %s", ctx.room.name)

    participant = await ctx.wait_for_participant()
    logger.info("Participant '%s' joined. Starting home agent...", participant.identity)

    dg_model = "nova-3-general"
    if participant.kind == rtc.ParticipantKind.PARTICIPANT_KIND_SIP:
        dg_model = "nova-2-phonecall"
        logger.info("Using telephone-optimized STT model.")

    # Create an event to track weather agent state.
    weather_active_event = asyncio.Event()
    fnc_ctx = HomeAssistantFnc(ctx, weather_active_event)

    # Set a system prompt that does NOT include explicit weather dispatch instructions.
    initial_ctx = llm.ChatContext().append(
        role="system",
        text="You are the main assistant. Answer queries normally. Only dispatch weather functions if the user explicitly asks for weather.",
    )

    agent = VoicePipelineAgent(
        vad=ctx.proc.userdata["vad"],
        stt=deepgram.STT(model=dg_model),
        llm=openai.LLM(),
        tts=openai.TTS(),
        chat_ctx=initial_ctx,
        fnc_ctx=fnc_ctx,
    )
    agent.start(ctx.room, participant)
    logger.info("Home agent started.")

    # Audio management helper.
    class AudioManager:
        def __init__(self):
            self.audio_publication = None
            self.subscription_set = asyncio.Event()

        def set_publication(self, publication):
            self.audio_publication = publication
            self.subscription_set.set()

        async def wait_for_subscription(self):
            await self.subscription_set.wait()
            if self.audio_publication is None:
                raise ValueError("Audio publication not set")

    audio_mgr = AudioManager()

    @ctx.room.on("track_subscribed")
    def on_track_subscribed(track: rtc.Track, publication: rtc.TrackPublication, rp: rtc.RemoteParticipant):
        if rp == participant and track.kind == rtc.TrackKind.KIND_AUDIO and audio_mgr.audio_publication is None:
            audio_mgr.set_publication(publication)
            logger.info("Audio track subscribed: %s", track.sid)

    try:
        await asyncio.wait_for(audio_mgr.wait_for_subscription(), timeout=10.0)
    except asyncio.TimeoutError:
        logger.error("Timed out waiting for audio track subscription")
        return

    async def handle_data_received(data: rtc.DataPacket):
        try:
            await audio_mgr.wait_for_subscription()
            if data.data == b"weather_agent_joined":
                logger.info("Weather agent joined, unsubscribing from audio track")
                res = audio_mgr.audio_publication.set_subscribed(False)
                if asyncio.iscoroutine(res):
                    await res
            elif data.data == b"weather_agent_done":
                logger.info("Weather agent done, resubscribing to audio track and resetting conversation context")
                res = audio_mgr.audio_publication.set_subscribed(True)
                if asyncio.iscoroutine(res):
                    await res
                weather_active_event.clear()
                # Reset the chat context so that no weather trigger remains.
                agent.chat_ctx.messages.clear()
                agent.chat_ctx.append(role="system", text="You are the main assistant. Answer queries normally.")
            else:
                logger.debug("Data received: %s", data.data)
        except Exception as e:
            logger.error("Error handling data packet: %s", e)

    @ctx.room.on("data_received")
    def on_data_received(data: rtc.DataPacket):
        asyncio.create_task(handle_data_received(data))

    async def answer_from_text(txt: str):
        chat_ctx = agent.chat_ctx.copy()
        chat_ctx.append(role="user", text=txt)
        logger.info("Answering user text: '%s'", txt)
        stream = agent.llm.chat(chat_ctx=chat_ctx)
        await agent.say(stream)

    async def determine_intent(message: str) -> str:
        logger.info("Determining intent for message: '%s'", message)
        lower_msg = message.lower()
        # If message contains return keywords, force general intent.
        if any(kw in lower_msg for kw in ["return", "main", "back"]):
            intent = "general"
        # Otherwise, if weather agent is not active and the message mentions weather-related terms, mark intent as weather.
        elif not weather_active_event.is_set() and any(kw in lower_msg for kw in ["weather", "forecast"]):
            intent = "weather"
        else:
            intent = "general"
        logger.info("Determined intent: '%s'", intent)
        return intent

    async def handle_incoming_message(message: str):
        logger.info("Handling incoming message: '%s'", message)
        intent = await determine_intent(message)
        if intent == "weather":
            logger.info("Intent 'weather' detected. Initiating dispatch to weather agent.")
            asyncio.create_task(dispatch_weather_agent(ctx.room.name))
            await agent.say("Let me bring in the weather assistant.", allow_interruptions=True)
        else:
            logger.info("Processing message as general query.")
            await answer_from_text(message)

    @ctx.room.on("message_received")
    def on_message_received(msg: rtc.ChatMessage):
        if msg.message:
            logger.info("Chat message received: '%s'", msg.message)
            asyncio.create_task(handle_incoming_message(msg.message))

    await agent.say("Hey, how can I help you today?", allow_interruptions=True)
    logger.info("Initial greeting sent by home agent.")


async def dispatch_weather_agent(room_name: str):
    from livekit.api import CreateAgentDispatchRequest, LiveKitAPI

    logger.info("Dispatching weather agent to room '%s'...", room_name)
    lkapi = LiveKitAPI()
    dispatch_request = CreateAgentDispatchRequest(
        agent_name="weather_agent",
        room=room_name,
        metadata="Weather inquiry dispatched by home agent",
    )
    try:
        dispatch = await lkapi.agent_dispatch.create_dispatch(dispatch_request)
        logger.info("Weather agent dispatched successfully: %s", dispatch)
    except Exception as e:
        logger.error("Failed to dispatch weather agent: %s", e)
    finally:
        await lkapi.aclose()


if __name__ == "__main__":
    cli.run_app(
        WorkerOptions(
            entrypoint_fnc=entrypoint,
            prewarm_fnc=prewarm,
        ),
    )
g
It seems likely that your approach to dispatching a weather agent in the same room as the home agent is on the right track. The use of a "hand-back" data signal to reset the main agent's context after the weather agent completes its task is a practical solution. This ensures that the main agent does not re-trigger the weather dispatch unless explicitly requested by the user. Your implementation of the `HomeAssistantFnc` class and the `dispatch_weather_agent` function appears to be correctly set up to manage the dispatching of the weather agent. The use of an `asyncio.Event` to track the state of the weather agent is a good way to manage the agent's lifecycle. Additionally, the `handle_data_received` function effectively manages the audio track subscription based on the weather agent's state, ensuring that the main agent's context is reset appropriately. If you would like more detailed feedback or have specific concerns about certain parts of the code, please let me know.

Sources:
- https://livekit-users.slack.com/archives/C07FY8WHGPM/p1730325243.151429
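For reference, here is a minimal sketch of the hand-back signal pattern distilled from the code above. It is not a drop-in replacement: the byte payloads and handler names simply mirror your snippet, and the `room` objects are assumed to be `rtc.Room` instances.

import asyncio

# Secondary agent side: announce arrival and completion over data packets.
async def announce_joined(room):
    # The payload values are just conventions shared by both agents.
    await room.local_participant.publish_data(b"weather_agent_joined")

async def announce_done(room):
    await room.local_participant.publish_data(b"weather_agent_done")

# Home agent side: react to the signals in a data_received handler.
def register_handback_handler(room, on_joined, on_done):
    @room.on("data_received")
    def _on_data(data):  # data is an rtc.DataPacket
        if data.data == b"weather_agent_joined":
            asyncio.create_task(on_joined())
        elif data.data == b"weather_agent_done":
            asyncio.create_task(on_done())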
q
@millions-hair-24956 OK, could you also check the other agent that works in combination with the home agent? What would you improve?

weather_agent:

import asyncio
import logging
from typing import Annotated

import aiohttp
from dotenv import load_dotenv
from livekit import rtc
from livekit.agents import (
    AutoSubscribe,
    JobContext,
    JobProcess,
    WorkerOptions,
    cli,
    llm,
)
from livekit.agents.pipeline import VoicePipelineAgent
from livekit.plugins import deepgram, openai, silero

load_dotenv()

logger = logging.getLogger("weather-agent")
logger.setLevel(logging.INFO)


class WeatherAssistantFnc(llm.FunctionContext):
    def __init__(self, room, ctx):
        super().__init__()
        self.room = room
        self.ctx = ctx

    @llm.ai_callable(description="Retrieve weather info for a location.")
    async def get_weather(
        self,
        location: Annotated[str, llm.TypeInfo(description="The location to get the weather for")],
        latitude: Annotated[str, llm.TypeInfo(description="The latitude of the location")],
        longitude: Annotated[str, llm.TypeInfo(description="The longitude of the location")],
    ):
        logger.info("Getting weather for coordinates: %s, %s", latitude, longitude)
        url = f"https://api.open-meteo.com/v1/forecast?latitude={latitude}&longitude={longitude}&current=temperature_2m"
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                if response.status == 200:
                    data = await response.json()
                    weather_data = {
                        "temperature": data["current"]["temperature_2m"],
                        "temperature_unit": "Celsius",
                    }
                    logger.info("Weather data: %s", weather_data)
                    return str(weather_data)  # Return as string
                else:
                    raise Exception(f"Failed to get weather data, status code: {response.status}")

    @llm.ai_callable(description="Signal handback to main assistant and shut down.")
    async def signal_hand_back(self):
        logger.info("Signaling intent to hand back to main assistant")
        try:
            # Publish the handback signal
            await self.room.local_participant.publish_data(b"weather_agent_done")
            logger.info("Successfully published 'weather_agent_done'")
            # Unsubscribe from audio track if available (using the audio_publication set later)
            if hasattr(self, "audio_publication") and self.audio_publication is not None:
                res = self.audio_publication.set_subscribed(False)
                if asyncio.iscoroutine(res):
                    await res
                logger.info("Unsubscribed from audio track")
            # Shutdown the weather agent job context.
            # Note: ctx.shutdown is not an awaitable function, so we call it without await.
            self.ctx.shutdown("Handing back to main assistant")
            logger.info("Weather agent job context shut down successfully")
        except Exception as e:
            logger.error("Error during handback: %s", e, exc_info=True)
        return "Handback initiated"


def prewarm_process(proc: JobProcess):
    logger.info("Weather Agent: Prewarming - Loading VAD...")
    proc.userdata["vad"] = silero.VAD.load()
    logger.info("Weather Agent: Prewarming complete.")


async def entrypoint(ctx: JobContext):
    logger.info("Weather Agent: Connecting to room...")
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
    logger.info("Weather Agent: Connected to room: %s", ctx.room.name)

    participant = await ctx.wait_for_participant()
    logger.info("Weather Agent: Participant joined: %s", participant.identity)

    audio_publication = None
    subscription_set = asyncio.Event()

    @ctx.room.on("track_subscribed")
    def on_track_subscribed(track: rtc.Track, publication: rtc.TrackPublication, rp: rtc.RemoteParticipant):
        nonlocal audio_publication
        if rp == participant and track.kind == rtc.TrackKind.KIND_AUDIO and audio_publication is None:
            audio_publication = publication
            logger.info("Audio track subscribed: %s", track.sid)
            subscription_set.set()

    try:
        await asyncio.wait_for(subscription_set.wait(), timeout=10.0)
    except asyncio.TimeoutError:
        logger.error("Timed out waiting for audio track subscription")
        return

    fnc_ctx = WeatherAssistantFnc(ctx.room, ctx)
    # Save the audio publication so that the function context can use it during handback.
    fnc_ctx.audio_publication = audio_publication

    initial_ctx = llm.ChatContext().append(
        role="system",
        text=(
            "You are a weather assistant created by LiveKit. Your interface with users is voice. "
            "Provide accurate and concise weather information for a given location. "
            "After answering, ask if the user needs anything else or wants to return to the main assistant."
        ),
    )

    agent = VoicePipelineAgent(
        vad=ctx.proc.userdata["vad"],
        stt=deepgram.STT(),
        llm=openai.LLM(model="gpt-4o-mini"),
        tts=openai.TTS(voice="nova"),
        fnc_ctx=fnc_ctx,
        chat_ctx=initial_ctx,
    )
    agent.start(ctx.room, participant)
    logger.info("Weather Agent: Agent started successfully in room '%s'.", ctx.room.name)

    try:
        await ctx.room.local_participant.publish_data(b"weather_agent_joined")
        logger.info("Successfully published 'weather_agent_joined'")
    except Exception as e:
        logger.error("Failed to publish 'weather_agent_joined': %s", e)

    await agent.say("Hello, I'm the weather assistant. Please tell me the location you'd like the weather for.", allow_interruptions=True)


if __name__ == "__main__":
    cli.run_app(
        WorkerOptions(
            entrypoint_fnc=entrypoint,
            prewarm_fnc=prewarm_process,
            agent_name="weather_agent",
        ),
    )
g
It seems likely that your `weather_agent` implementation is well-structured for its intended purpose. Here are a few observations and potential improvements:

1. Error Handling: Your code already includes error handling for network requests and data publishing, which is good. Consider adding more specific error messages or retry logic for network requests to improve robustness.
2. Logging: The use of logging is consistent, which is helpful for debugging. Ensure that sensitive information is not logged, especially if this code is used in a production environment.
3. Function Context: The `WeatherAssistantFnc` class effectively manages the weather retrieval and handback signaling. Ensure that the `get_weather` function handles edge cases, such as invalid or missing data from the API (a defensive sketch follows below).
4. Data Publishing: The use of data packets to signal the main agent is a good approach. Ensure that the main agent is set up to handle these signals correctly, as you have done in the `home_agent`.
5. VoicePipelineAgent: The setup of the `VoicePipelineAgent` with VAD, STT, LLM, and TTS is appropriate. Ensure that the models and voices used are optimal for your use case.
6. Async Operations: The use of `asyncio` for asynchronous operations is appropriate. Ensure that all asynchronous tasks are awaited properly to avoid potential issues with task completion.

If you have specific concerns or areas you'd like more feedback on, please let me know.

Sources:
- https://livekit-users.slack.com/archives/C07FY8WHGPM/p1729242384.877639
- https://docs.livekit.io/agents/voice-agent/function-calling/

Note: I wasn't able to find highly relevant data sources, but above are a few potentially relevant links.
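As an illustration of points 1 and 3, here is a hedged sketch of a more defensive weather lookup, assuming the same Open-Meteo endpoint used in your code. The retry count, backoff, and timeout values are arbitrary choices, not library defaults.

import asyncio
import aiohttp

async def fetch_current_temperature(latitude: str, longitude: str, retries: int = 3) -> dict:
    """Fetch the current temperature with basic retries and missing-data checks."""
    url = (
        "https://api.open-meteo.com/v1/forecast"
        f"?latitude={latitude}&longitude={longitude}&current=temperature_2m"
    )
    last_error = None
    for attempt in range(1, retries + 1):
        try:
            async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10)) as session:
                async with session.get(url) as response:
                    if response.status != 200:
                        raise RuntimeError(f"HTTP {response.status} from weather API")
                    data = await response.json()
                    # Guard against missing keys instead of assuming the payload shape.
                    current = data.get("current") or {}
                    if "temperature_2m" not in current:
                        raise RuntimeError("Weather payload missing 'temperature_2m'")
                    return {"temperature": current["temperature_2m"], "temperature_unit": "Celsius"}
        except Exception as e:
            last_error = e
            await asyncio.sleep(2 ** attempt)  # simple exponential backoff between attempts
    raise RuntimeError(f"Weather lookup failed after {retries} attempts") from last_error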
q
@millions-hair-24956 Do you have any example of using the dispatch approach, with agents entering and exiting the room on multiple occasions?
g
It seems likely that using the dispatch approach to have agents enter and exit a room multiple times can be managed through explicit agent dispatching. This involves creating and deleting agent dispatches as needed. Here's a basic outline of how you might implement this:

1. Explicit Dispatch: Use the `AgentDispatchService` to explicitly dispatch agents to a room. This allows you to control when agents join and leave the room.
2. Create Dispatch: When you need an agent to join a room, create a dispatch using the `create_dispatch` method. This method requires specifying the agent's name and the room name.
3. Delete Dispatch: To remove an agent from a room, use the `delete_dispatch` method. This will terminate the agent's session in the room.

Here's a simplified example in Python:
from livekit import api

async def manage_agent_dispatch(room_name, agent_name):
    lkapi = api.LiveKitAPI()

    # Create a dispatch to add the agent to the room
    dispatch_request = api.CreateAgentDispatchRequest(agent_name=agent_name, room=room_name)
    dispatch = await lkapi.agent_dispatch.create_dispatch(dispatch_request)
    print(f"Agent {agent_name} dispatched to room {room_name}")

    # Perform tasks with the agent in the room...

    # Delete the dispatch to remove the agent from the room
    await lkapi.agent_dispatch.delete_dispatch(dispatch_id=dispatch.id, room_name=room_name)
    print(f"Agent {agent_name} removed from room {room_name}")

    await lkapi.aclose()

# Example usage
# asyncio.run(manage_agent_dispatch("my-room", "weather_agent"))
This example demonstrates how to dispatch an agent to a room and then remove it after completing its task. If you need more detailed guidance or have specific questions, please let me know.

Sources:
- https://livekit-users.slack.com/archives/C07FY8WHGPM/p1732564590.153089
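To illustrate repeated enter/exit with the helper above, here is a hedged usage sketch. The room name, the agent rotation, and the sleep-based placeholder for "agent is working" are assumptions for illustration only.

import asyncio

async def run_agent_rotation(room_name: str):
    # Dispatch different agents one after another into the same room,
    # removing each one (via manage_agent_dispatch above) before the next.
    for agent_name in ["weather_agent", "bakery_agent", "weather_agent"]:
        await manage_agent_dispatch(room_name, agent_name)  # helper defined above
        await asyncio.sleep(1)  # placeholder between dispatch cycles

# Example usage
# asyncio.run(run_agent_rotation("my-room"))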
q
@millions-hair-24956 Is this really scalable? Can I have multiple agents entering and leaving the room when they finish their tasks?
g
Yes, LiveKit's architecture supports scalability, allowing multiple agents to enter and exit rooms as needed. The system is designed to handle a large number of concurrent connections and agent dispatches efficiently. Here are some key points to consider for scalability:

1. Agent Dispatching: LiveKit's dispatch system is optimized for low latency and high concurrency, supporting hundreds of thousands of new connections per second. Agents can be dispatched to rooms dynamically based on demand, and you can manage their lifecycle using the `AgentDispatchService` (source). A small lifecycle-tracking sketch follows below.
2. Load Balancing: LiveKit automatically distributes load across available workers, ensuring that no single worker is overwhelmed. This is managed through a job distribution system embedded within LiveKit servers, which routes jobs to workers based on availability and load (source).
3. Autoscaling: For environments with fluctuating loads, you can configure autoscaling to dynamically adjust the number of workers based on demand. This ensures that resources are efficiently utilized and that the system can handle peak loads without degradation in performance (source).

If you need more detailed guidance on implementing a scalable solution with LiveKit, please let me know.

Sources:
- Dispatching agents
- Deployment and scaling
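As one way to keep track of many agents entering and leaving the same room, here is a hedged sketch of a small dispatch tracker built only on the `create_dispatch` / `list_dispatch` / `delete_dispatch` calls already used in this thread. The class name and structure are illustrative, not a LiveKit API.

import asyncio
from livekit import api

class RoomDispatchTracker:
    """Tracks which helper agents are currently dispatched into a room."""

    def __init__(self, room_name: str):
        self.room_name = room_name
        self._active: set[str] = set()
        self._lock = asyncio.Lock()

    async def ensure_dispatched(self, agent_name: str) -> None:
        async with self._lock:
            if agent_name in self._active:
                return  # already in the room, nothing to do
            lkapi = api.LiveKitAPI()
            try:
                await lkapi.agent_dispatch.create_dispatch(
                    api.CreateAgentDispatchRequest(agent_name=agent_name, room=self.room_name)
                )
                self._active.add(agent_name)
            finally:
                await lkapi.aclose()

    async def release(self, agent_name: str) -> None:
        async with self._lock:
            lkapi = api.LiveKitAPI()
            try:
                dispatches = await lkapi.agent_dispatch.list_dispatch(room_name=self.room_name)
                for d in dispatches:
                    if d.agent_name == agent_name:
                        await lkapi.agent_dispatch.delete_dispatch(dispatch_id=d.id, room_name=self.room_name)
                        break
            finally:
                await lkapi.aclose()
            self._active.discard(agent_name)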
q
@millions-hair-24956 So I made some modifications. Below is the updated home agent code that now handles two types of queries. It dispatches the weather agent for weather-related queries and the bakery agent for queries about croissants (or related bakery topics). When a dispatch signal (either `weather_agent_done` or `bakery_agent_done`) is received, it deletes the corresponding dispatch and resets its "active" flag and conversation context. In your opinion is it the best approach?

import asyncio
import logging

from dotenv import load_dotenv
from livekit import rtc
from livekit.agents import (
    AutoSubscribe,
    JobContext,
    JobProcess,
    WorkerOptions,
    cli,
    llm,
)
from livekit.agents.pipeline import VoicePipelineAgent
from livekit.plugins import deepgram, openai, silero

load_dotenv()

logger = logging.getLogger("home-agent")
logger.setLevel(logging.INFO)


class HomeAssistantFnc(llm.FunctionContext):
    def __init__(self, ctx, weather_active_event, bakery_active_event):
        super().__init__()
        self.ctx = ctx
        self.weather_active_event = weather_active_event
        self.bakery_active_event = bakery_active_event

    @llm.ai_callable(description="Dispatch the weather agent if not active.")
    async def dispatch_weather_agent(self):
        if not self.weather_active_event.is_set():
            logger.info("Dispatching weather agent via function call...")
            await dispatch_weather_agent(self.ctx.room.name)
            self.weather_active_event.set()
        else:
            logger.info("Weather agent already active, skipping dispatch.")

    @llm.ai_callable(description="Dispatch the bakery agent if not active.")
    async def dispatch_bakery_agent(self):
        if not self.bakery_active_event.is_set():
            logger.info("Dispatching bakery agent via function call...")
            await dispatch_bakery_agent(self.ctx.room.name)
            self.bakery_active_event.set()
        else:
            logger.info("Bakery agent already active, skipping dispatch.")


def prewarm(proc: JobProcess):
    logger.info("Prewarming: Loading VAD...")
    proc.userdata["vad"] = silero.VAD.load()
    logger.info("Prewarming complete.")


async def entrypoint(ctx: JobContext):
    logger.info("Connecting to room %s...", ctx.room.name)
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
    logger.info("Connected to room: %s", ctx.room.name)

    participant = await ctx.wait_for_participant()
    logger.info("Participant '%s' joined. Starting home agent...", participant.identity)

    dg_model = "nova-3-general"
    if participant.kind == rtc.ParticipantKind.PARTICIPANT_KIND_SIP:
        dg_model = "nova-2-phonecall"
        logger.info("Using telephone-optimized STT model.")

    # Create events to track agent states.
    weather_active_event = asyncio.Event()
    bakery_active_event = asyncio.Event()
    fnc_ctx = HomeAssistantFnc(ctx, weather_active_event, bakery_active_event)

    initial_ctx = llm.ChatContext().append(
        role="system",
        text=(
            "You are the main assistant. Answer queries normally. "
            "Dispatch the weather assistant if the user asks about weather, "
            "and dispatch the bakery assistant if the user asks about croissants or bakery advice."
        ),
    )

    agent = VoicePipelineAgent(
        vad=ctx.proc.userdata["vad"],
        stt=deepgram.STT(model=dg_model),
        llm=openai.LLM(),
        tts=openai.TTS(),
        chat_ctx=initial_ctx,
        fnc_ctx=fnc_ctx,
    )
    agent.start(ctx.room, participant)
    logger.info("Home agent started.")

    # Audio management helper.
    class AudioManager:
        def __init__(self):
            self.audio_publication = None
            self.subscription_set = asyncio.Event()

        def set_publication(self, publication):
            self.audio_publication = publication
            self.subscription_set.set()

        async def wait_for_subscription(self):
            await self.subscription_set.wait()
            if self.audio_publication is None:
                logger.error("Audio publication is None after subscription event")
                raise ValueError("Audio publication not set")

    audio_mgr = AudioManager()

    @ctx.room.on("track_subscribed")
    def on_track_subscribed(track: rtc.Track, publication: rtc.TrackPublication, rp: rtc.RemoteParticipant):
        if rp == participant and track.kind == rtc.TrackKind.KIND_AUDIO and audio_mgr.audio_publication is None:
            audio_mgr.set_publication(publication)
            logger.info("Audio track subscribed: %s", track.sid)

    try:
        await asyncio.wait_for(audio_mgr.wait_for_subscription(), timeout=10.0)
    except asyncio.TimeoutError:
        logger.error("Timed out waiting for audio track subscription")
        return

    async def handle_data_received(data: rtc.DataPacket):
        try:
            await audio_mgr.wait_for_subscription()
            if data.data == b"weather_agent_joined":
                logger.info("Weather agent joined, unsubscribing from audio track")
                res = audio_mgr.audio_publication.set_subscribed(False)
                if asyncio.iscoroutine(res):
                    await res
            elif data.data == b"weather_agent_done":
                logger.info("Weather agent done signal received. Deleting weather dispatch and resubscribing.")
                from livekit import api
                lkapi = api.LiveKitAPI()
                try:
                    dispatches = await lkapi.agent_dispatch.list_dispatch(room_name=ctx.room.name)
                    for d in dispatches:
                        if d.agent_name == "weather_agent":
                            await lkapi.agent_dispatch.delete_dispatch(dispatch_id=d.id, room_name=ctx.room.name)
                            logger.info("Deleted weather agent dispatch: %s", d.id)
                            break
                except Exception as e:
                    logger.error("Failed to delete weather agent dispatch: %s", e)
                finally:
                    await lkapi.aclose()
                res = audio_mgr.audio_publication.set_subscribed(True)
                if asyncio.iscoroutine(res):
                    await res
                weather_active_event.clear()
                agent.chat_ctx.messages.clear()
                agent.chat_ctx.append(role="system", text="You are the main assistant. Answer queries normally.")
            elif data.data == b"bakery_agent_joined":
                logger.info("Bakery agent joined, unsubscribing from audio track")
                res = audio_mgr.audio_publication.set_subscribed(False)
                if asyncio.iscoroutine(res):
                    await res
            elif data.data == b"bakery_agent_done":
                logger.info("Bakery agent done signal received. Deleting bakery dispatch and resubscribing.")
                from livekit import api
                lkapi = api.LiveKitAPI()
                try:
                    dispatches = await lkapi.agent_dispatch.list_dispatch(room_name=ctx.room.name)
                    for d in dispatches:
                        if d.agent_name == "bakery_agent":
                            await lkapi.agent_dispatch.delete_dispatch(dispatch_id=d.id, room_name=ctx.room.name)
                            logger.info("Deleted bakery agent dispatch: %s", d.id)
                            break
                except Exception as e:
                    logger.error("Failed to delete bakery agent dispatch: %s", e)
                finally:
                    await lkapi.aclose()
                res = audio_mgr.audio_publication.set_subscribed(True)
                if asyncio.iscoroutine(res):
                    await res
                bakery_active_event.clear()
                agent.chat_ctx.messages.clear()
                agent.chat_ctx.append(role="system", text="You are the main assistant. Answer queries normally.")
            else:
                logger.debug("Data received: %s", data.data)
        except Exception as e:
            logger.error("Error handling data packet: %s", e)

    @ctx.room.on("data_received")
    def on_data_received(data: rtc.DataPacket):
        asyncio.create_task(handle_data_received(data))

    async def answer_from_text(txt: str):
        chat_ctx = agent.chat_ctx.copy()
        chat_ctx.append(role="user", text=txt)
        logger.info("Answering user text: '%s'", txt)
        stream = agent.llm.chat(chat_ctx=chat_ctx)
        await agent.say(stream)

    async def determine_intent(message: str) -> str:
        logger.info("Determining intent for message: '%s'", message)
        lower_msg = message.lower()
        if any(kw in lower_msg for kw in ["return", "main", "back"]):
            intent = "general"
        elif not weather_active_event.is_set() and any(kw in lower_msg for kw in ["weather", "forecast"]):
            intent = "weather"
        elif not bakery_active_event.is_set() and any(kw in lower_msg for kw in ["croissant", "bakery", "pastry", "bread"]):
            intent = "bakery"
        else:
            intent = "general"
        logger.info("Determined intent: '%s'", intent)
        return intent

    async def handle_incoming_message(message: str):
        logger.info("Handling incoming message: '%s'", message)
        intent = await determine_intent(message)
        if intent == "weather":
            logger.info("Intent 'weather' detected. Initiating dispatch to weather agent.")
            asyncio.create_task(dispatch_weather_agent(ctx.room.name))
            await agent.say("Let me bring in the weather assistant.", allow_interruptions=True)
        elif intent == "bakery":
            logger.info("Intent 'bakery' detected. Initiating dispatch to bakery agent.")
            asyncio.create_task(dispatch_bakery_agent(ctx.room.name))
            await agent.say("Let me bring in the bakery assistant.", allow_interruptions=True)
        else:
            logger.info("Processing message as general query.")
            await answer_from_text(message)

    @ctx.room.on("message_received")
    def on_message_received(msg: rtc.ChatMessage):
        if msg.message:
            logger.info("Chat message received: '%s'", msg.message)
            asyncio.create_task(handle_incoming_message(msg.message))

    await agent.say("Hey, how can I help you today?", allow_interruptions=True)
    logger.info("Initial greeting sent by home agent.")


async def dispatch_weather_agent(room_name: str):
    from livekit import api

    logger.info("Dispatching weather agent to room '%s'...", room_name)
    lkapi = api.LiveKitAPI()
    dispatch_request = api.CreateAgentDispatchRequest(
        agent_name="weather_agent",
        room=room_name,
        metadata="Weather inquiry dispatched by home agent",
    )
    try:
        dispatch = await lkapi.agent_dispatch.create_dispatch(dispatch_request)
        logger.info("Weather agent dispatched successfully: %s", dispatch)
    except Exception as e:
        logger.error("Failed to dispatch weather agent: %s", e)
    finally:
        await lkapi.aclose()


async def dispatch_bakery_agent(room_name: str):
    from livekit import api

    logger.info("Dispatching bakery agent to room '%s'...", room_name)
    lkapi = api.LiveKitAPI()
    dispatch_request = api.CreateAgentDispatchRequest(
        agent_name="bakery_agent",
        room=room_name,
        metadata="Bakery inquiry dispatched by home agent",
    )
    try:
        dispatch = await lkapi.agent_dispatch.create_dispatch(dispatch_request)
        logger.info("Bakery agent dispatched successfully: %s", dispatch)
    except Exception as e:
        logger.error("Failed to dispatch bakery agent: %s", e)
    finally:
        await lkapi.aclose()


if __name__ == "__main__":
    cli.run_app(
        WorkerOptions(
            entrypoint_fnc=entrypoint,
            prewarm_fnc=prewarm,
        ),
    )
g
Your approach to handling multiple types of queries by dispatching different agents (weather and bakery) and managing their lifecycle through dispatch signals is well-structured. Here are some observations and suggestions for improvement:

1. Event Management: Using `asyncio.Event` to track the active state of each agent is a good approach. It ensures that agents are not dispatched multiple times unnecessarily.
2. Dispatch and Cleanup: The logic for creating and deleting dispatches is sound. Ensure that the `delete_dispatch` method is called correctly to clean up after an agent completes its task. This helps in managing resources efficiently.
3. Error Handling: You have implemented error handling for network requests and dispatch operations, which is crucial for robustness. Consider adding retries for network operations to handle transient failures (see the retry sketch below).
4. Logging: Your use of logging is consistent and informative. Ensure that sensitive information is not logged, especially in production environments.
5. Concurrency: The use of `asyncio.create_task` for handling incoming messages and dispatching agents is appropriate. Ensure that all asynchronous operations are awaited properly to avoid potential issues with task completion.
6. Code Organization: Consider organizing the code into separate modules or classes for better maintainability, especially as the complexity of the application grows.
7. Scalability: The approach should scale well with the number of agents and queries, given LiveKit's architecture. Ensure that the system is tested under load to verify performance and resource utilization.
8. Intent Determination: The `determine_intent` function is straightforward and effective. Consider expanding the list of keywords or using a more sophisticated natural language processing approach if needed.

Overall, your implementation is well thought out and should work effectively for the use case described. If you have specific concerns or areas you'd like more feedback on, please let me know.
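Building on point 3 above, here is a hedged sketch of a generic retry wrapper you could put around the dispatch calls. The attempt count and backoff values are arbitrary, and `dispatch_weather_agent` is the coroutine from your own code.

import asyncio
import logging

logger = logging.getLogger("home-agent")  # reusing the logger name from your code

async def with_retries(coro_factory, attempts: int = 3, base_delay: float = 0.5):
    """Run an async operation, retrying with exponential backoff on failure."""
    for attempt in range(1, attempts + 1):
        try:
            return await coro_factory()
        except Exception as e:
            logger.warning("Attempt %d/%d failed: %s", attempt, attempts, e)
            if attempt == attempts:
                raise
            await asyncio.sleep(base_delay * (2 ** (attempt - 1)))

# Possible usage with the dispatch helper defined in the code above:
# await with_retries(lambda: dispatch_weather_agent(ctx.room.name))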
q
@millions-hair-24956 The issue that I have now is that the main agent keeps talking while the bakery agent talks too, after the dispatch. How can I make sure that the main agent stops talking while the agent that just entered the room is talking?

# Create active events for secondary agents.
active_events = {
    "weather": asyncio.Event(),
    "bakery": asyncio.Event(),
}
fnc_ctx = HomeAssistantFnc(ctx, active_events)

initial_ctx = llm.ChatContext().append(
    role="system",
    text=(
        "You are the main assistant. Answer queries normally. "
        "Dispatch the weather assistant for weather inquiries and the bakery assistant for bakery advice."
    ),
)

agent = VoicePipelineAgent(
    vad=ctx.proc.userdata["vad"],
    stt=deepgram.STT(model=dg_model),
    llm=openai.LLM(),
    tts=openai.TTS(),
    chat_ctx=initial_ctx,
    fnc_ctx=fnc_ctx,
)
agent.start(ctx.room, participant)
logger.info("Home agent started.")

# ---------------- Audio Management ----------------
class AudioManager:
    def __init__(self):
        self.audio_publication = None
        self.subscription_set = asyncio.Event()

    def set_publication(self, publication):
        self.audio_publication = publication
        self.subscription_set.set()

    async def wait_for_subscription(self):
        await self.subscription_set.wait()
        if self.audio_publication is None:
            logger.error("Audio publication is None after subscription event")
            raise ValueError("Audio publication not set")

audio_mgr = AudioManager()

@ctx.room.on("track_subscribed")
def on_track_subscribed(track: rtc.Track, publication: rtc.TrackPublication, rp: rtc.RemoteParticipant):
    if rp == participant and track.kind == rtc.TrackKind.KIND_AUDIO and audio_mgr.audio_publication is None:
        audio_mgr.set_publication(publication)
        logger.info("Audio track subscribed: %s", track.sid)

try:
    await asyncio.wait_for(audio_mgr.wait_for_subscription(), timeout=10.0)
except asyncio.TimeoutError:
    logger.error("Timed out waiting for audio track subscription")
    return

# ---------------- Data Packet Handling ----------------
async def handle_data_received(data: rtc.DataPacket):
    try:
        await audio_mgr.wait_for_subscription()
        if data.data == b"weather_agent_done":
            logger.info("Weather agent done signal received. Deleting weather dispatch and resubscribing.")
            from livekit import api
            lkapi = api.LiveKitAPI()
            try:
                dispatches = await lkapi.agent_dispatch.list_dispatch(room_name=ctx.room.name)
                for d in dispatches:
                    if d.agent_name == "weather_agent":
                        await lkapi.agent_dispatch.delete_dispatch(dispatch_id=d.id, room_name=ctx.room.name)
                        logger.info("Deleted weather agent dispatch: %s", d.id)
                        break
            except Exception as e:
                logger.error("Failed to delete weather agent dispatch: %s", e)
            finally:
                await lkapi.aclose()
            res = audio_mgr.audio_publication.set_subscribed(True)
            if asyncio.iscoroutine(res):
                await res
            active_events["weather"].clear()
            agent.chat_ctx.messages.clear()
            agent.chat_ctx.append(role="system", text="You are the main assistant. Answer queries normally.")
        elif data.data == b"bakery_agent_done":
            logger.info("Bakery agent done signal received. Deleting bakery dispatch and resubscribing.")
            from livekit import api
            lkapi = api.LiveKitAPI()
            try:
                dispatches = await lkapi.agent_dispatch.list_dispatch(room_name=ctx.room.name)
                for d in dispatches:
                    if d.agent_name == "bakery_agent":
                        await lkapi.agent_dispatch.delete_dispatch(dispatch_id=d.id, room_name=ctx.room.name)
                        logger.info("Deleted bakery agent dispatch: %s", d.id)
                        break
            except Exception as e:
                logger.error("Failed to delete bakery agent dispatch: %s", e)
            finally:
                await lkapi.aclose()
            res = audio_mgr.audio_publication.set_subscribed(True)
            if asyncio.iscoroutine(res):
                await res
            active_events["bakery"].clear()
            agent.chat_ctx.messages.clear()
            agent.chat_ctx.append(role="system", text="You are the main assistant. Answer queries normally.")
        else:
            logger.debug("Data received: %s", data.data)
    except Exception as e:
        logger.error("Error handling data packet: %s", e)

@ctx.room.on("data_received")
def on_data_received(data: rtc.DataPacket):
    asyncio.create_task(handle_data_received(data))

# ---------------- Intent Detection via NLP ----------------
async def determine_intent(message: str) -> str:
    prompt = (
        "Classify the following user message into one of these categories: weather, bakery, or general. "
        "Return only one word. Message: \"" + message + "\""
    )
    response = await openai.LLM().chat(
        chat_ctx=llm.ChatContext().append(role="system", text=prompt)
    )
    intent = response.strip().lower()
    if intent not in {"weather", "bakery", "general"}:
        intent = "general"
    logger.info("Determined intent via NLP: '%s'", intent)
    return intent

# ---------------- Handling Incoming Messages ----------------
async def handle_incoming_message(message: str):
    logger.info("Handling incoming message: '%s'", message)
    intent = await determine_intent(message)
    # For secondary intents, dispatch the corresponding agent and return immediately.
    if intent == "weather":
        if not active_events["weather"].is_set():
            logger.info("Intent 'weather' detected. Dispatching weather agent.")
            await agent.say("Let me bring in the weather assistant.", allow_interruptions=True)
            asyncio.create_task(dispatch_weather_agent(ctx.room.name))
        else:
            logger.info("Weather agent already active.")
        return  # Do not answer further.
    elif intent == "bakery":
        if not active_events["bakery"].is_set():
            logger.info("Intent 'bakery' detected. Dispatching bakery agent.")
            await agent.say("Let me bring in the bakery assistant.", allow_interruptions=True)
            asyncio.create_task(dispatch_bakery_agent(ctx.room.name))
        else:
            logger.info("Bakery agent already active.")
        return  # Do not answer further.
    else:
        logger.info("Processing message as general query.")
        await answer_from_text(message)

async def answer_from_text(txt: str):
    chat_ctx = agent.chat_ctx.copy()
    chat_ctx.append(role="user", text=txt)
    logger.info("Answering user text: '%s'", txt)
    stream = agent.llm.chat(chat_ctx=chat_ctx)
    await agent.say(stream)

@ctx.room.on("message_received")
def on_message_received(msg: rtc.ChatMessage):
    if msg.message:
        logger.info("Chat message received: '%s'", msg.message)
        asyncio.create_task(handle_incoming_message(msg.message))

await agent.say("Hey, how can I help you today?", allow_interruptions=True)
logger.info("Initial greeting sent by home agent.")

# ---------------- Dispatch Functions for Home Agent ----------------
async def dispatch_weather_agent(room_name: str):
    from livekit import api

    logger.info("Dispatching weather agent to room '%s'...", room_name)
    lkapi = api.LiveKitAPI()
    dispatch_request = api.CreateAgentDispatchRequest(
        agent_name="weather_agent",
        room=room_name,
        metadata="Weather inquiry dispatched by home agent",
    )
    try:
        dispatch = await lkapi.agent_dispatch.create_dispatch(dispatch_request)
        logger.info("Weather agent dispatched successfully: %s", dispatch)
    except Exception as e:
        logger.error("Failed to dispatch weather agent: %s", e)
    finally:
        await lkapi.aclose()

async def dispatch_bakery_agent(room_name: str):
    from livekit import api

    logger.info("Dispatching bakery agent to room '%s'...", room_name)
    lkapi = api.LiveKitAPI()
    dispatch_request = api.CreateAgentDispatchRequest(
        agent_name="bakery_agent",
        room=room_name,
        metadata="Bakery inquiry dispatched by home agent",
    )
    try:
        dispatch = await lkapi.agent_dispatch.create_dispatch(dispatch_request)
        logger.info("Bakery agent dispatched successfully: %s", dispatch)
    except Exception as e:
        logger.error("Failed to dispatch bakery agent: %s", e)
    finally:
        await lkapi.aclose()
g
To ensure that the main agent stops talking when a secondary agent (like the bakery or weather agent) is dispatched and starts speaking, you can implement a mechanism to pause or interrupt the main agent's speech. Here are some suggestions to achieve this:

1. Interrupt Main Agent Speech: When dispatching a secondary agent, you can interrupt the main agent's speech by using the `allow_interruptions` parameter in the `say` method. This will stop the main agent from speaking when the secondary agent starts.
2. Manage Audio Tracks: Use the `AudioManager` to manage audio subscriptions. When a secondary agent is dispatched, unsubscribe the main agent from the audio track to prevent it from speaking.
3. Use Events to Control Speech: Utilize the `active_events` dictionary to control when the main agent should be silent. If a secondary agent is active, prevent the main agent from speaking.

Here's how you can modify your code to implement these suggestions:
# Modify the handle_incoming_message function
async def handle_incoming_message(message: str):
    logger.info("Handling incoming message: '%s'", message)
    intent = await determine_intent(message)
    if intent == "weather":
        if not active_events["weather"].is_set():
            logger.info("Intent 'weather' detected. Dispatching weather agent.")
            await agent.say("Let me bring in the weather assistant.", allow_interruptions=True)
            active_events["weather"].set()  # Set the event to indicate the weather agent is active
            asyncio.create_task(dispatch_weather_agent(ctx.room.name))
        else:
            logger.info("Weather agent already active.")
        return  # Do not answer further.
    elif intent == "bakery":
        if not active_events["bakery"].is_set():
            logger.info("Intent 'bakery' detected. Dispatching bakery agent.")
            await agent.say("Let me bring in the bakery assistant.", allow_interruptions=True)
            active_events["bakery"].set()  # Set the event to indicate the bakery agent is active
            asyncio.create_task(dispatch_bakery_agent(ctx.room.name))
        else:
            logger.info("Bakery agent already active.")
        return  # Do not answer further.
    else:
        logger.info("Processing message as general query.")
        await answer_from_text(message)

# Modify the handle_data_received function to clear events
async def handle_data_received(data: rtc.DataPacket):
    try:
        await audio_mgr.wait_for_subscription()
        if data.data == b"weather_agent_done":
            logger.info("Weather agent done signal received. Deleting weather dispatch and resubscribing.")
            # Delete dispatch and clear event
            await delete_dispatch("weather_agent", ctx.room.name)
            active_events["weather"].clear()
            agent.chat_ctx.messages.clear()
            agent.chat_ctx.append(role="system", text="You are the main assistant. Answer queries normally.")
        elif data.data == b"bakery_agent_done":
            logger.info("Bakery agent done signal received. Deleting bakery dispatch and resubscribing.")
            # Delete dispatch and clear event
            await delete_dispatch("bakery_agent", ctx.room.name)
            active_events["bakery"].clear()
            agent.chat_ctx.messages.clear()
            agent.chat_ctx.append(role="system", text="You are the main assistant. Answer queries normally.")
        else:
            logger.debug("Data received: %s", data.data)
    except Exception as e:
        logger.error("Error handling data packet: %s", e)

# Helper function to delete dispatch
async def delete_dispatch(agent_name: str, room_name: str):
    from livekit import api
    lkapi = api.LiveKitAPI()
    try:
        dispatches = await lkapi.agent_dispatch.list_dispatch(room_name=room_name)
        for d in dispatches:
            if d.agent_name == agent_name:
                await lkapi.agent_dispatch.delete_dispatch(dispatch_id=d.id, room_name=room_name)
                logger.info("Deleted %s agent dispatch: %s", agent_name, d.id)
                break
    except Exception as e:
        logger.error("Failed to delete %s agent dispatch: %s", agent_name, e)
    finally:
        await lkapi.aclose()
This approach ensures that the main agent stops speaking when a secondary agent is active, and resumes normal operation once the secondary agent completes its task.

Sources:
- https://docs.livekit.io/agents/voice-agent/voice-pipeline/#Interruption-handling
- https://livekit-users.slack.com/archives/C07FY8WHGPM/p1729822736.610669

Note: I wasn't able to find highly relevant data sources, but above are a few potentially relevant links.
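Suggestion 2 above (unsubscribing at dispatch time) is not shown in the snippet, so here is a hedged sketch of what it could look like, reusing the `audio_mgr` and `set_subscribed` calls from your own code. Treat it as one possible placement, not a required pattern.

import asyncio

async def mute_home_agent_while_secondary_speaks(audio_publication):
    """Unsubscribe the home agent from the caller's audio while a secondary agent is active.

    `audio_publication` is the rtc.TrackPublication stored by the AudioManager in the code above.
    """
    if audio_publication is None:
        return
    res = audio_publication.set_subscribed(False)
    if asyncio.iscoroutine(res):
        await res

# Possible call site, right after dispatching a secondary agent:
# await mute_home_agent_while_secondary_speaks(audio_mgr.audio_publication)
# asyncio.create_task(dispatch_weather_agent(ctx.room.name))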
q
@millions-hair-24956 No luck, both agents talk at the same time. In the logs I get:

02-23 19:52:19,276 - INFO livekit.agents.pipeline - max function calls nested depth reached, dropping function context. increase max_nested_fnc_calls to enable additional nesting. {"speech_id": "3267deedb0e9", "fnc_nested_depth": 0}
g
It seems likely that the issue you're encountering is related to the `max_nested_fnc_calls` parameter, which limits the number of nested function calls that can be made by the agent. When this limit is reached, the function context is dropped, which might be causing both agents to speak simultaneously.

To address this, you can increase the `max_nested_fnc_calls` parameter in the `VoicePipelineAgent` configuration. This will allow more nested function calls, which might help in managing the dispatch and speech of multiple agents more effectively. Here's how you can modify the `VoicePipelineAgent` initialization to increase the `max_nested_fnc_calls`:
agent = VoicePipelineAgent(
    vad=ctx.proc.userdata["vad"],
    stt=deepgram.STT(model=dg_model),
    llm=openai.LLM(),
    tts=openai.TTS(),
    chat_ctx=initial_ctx,
    fnc_ctx=fnc_ctx,
    max_nested_fnc_calls=2,  # Increase this value as needed
)
By increasing the `max_nested_fnc_calls`, you allow the agent to handle more complex interactions that involve multiple nested function calls. Adjust the value based on your specific use case and test to see if it resolves the issue. If you need further assistance or have specific questions, please let me know.

Sources:
- https://livekit-users.slack.com/archives/C07FY8WHGPM/p1729822736.610669
q
@millions-hair-24956 The logic for having an agent enter the room, complete its task, and then exit to return control to the main agent is working as expected. However, when it's the main agent's turn to speak again, it remains silent until the user says something. Only then does the home agent start responding. How can I ensure that the home agent automatically resumes speaking as soon as the bakery agent has finished its turn? Here is an isolated version of the bakery transfer logic extracted from the home-agent code. This snippet includes only the parts that handle:

1. Dispatching the Bakery Agent: when an intent is determined to be "bakery", the main assistant calls a function (via an LLM callable function) that dispatches the bakery agent to the room. This is done by creating a dispatch request via the LiveKit API.
2. Handling Bakery Agent Signals: once the bakery agent has joined or finished its task, it publishes signals (via data packets) back to the room. The main assistant's logic listens for these signals. When it receives a "bakery_agent_joined" message, it unsubscribes its own audio track (to avoid overlapping speech). When it receives a "bakery_agent_done" message, it:
   ◦ deletes the bakery agent's dispatch (removing the agent from the room)
   ◦ resubscribes the main agent's audio track
   ◦ clears the "bakery active" event so that future bakery queries can be dispatched
   ◦ optionally resets the conversation context

Below is the code snippet (with inline comments) that isolates and explains these two parts:


# --- Bakery Agent Transfer Logic ---

# 1. Dispatching the Bakery Agent (inside the HomeAssistantFnc function)
class HomeAssistantFnc(llm.FunctionContext):
    def __init__(self, ctx, weather_active_event, bakery_active_event):
        super().__init__()
        self.ctx = ctx
        self.weather_active_event = weather_active_event
        self.bakery_active_event = bakery_active_event

    @llm.ai_callable(description="Dispatch the bakery agent if not active.")
    async def dispatch_bakery_agent(self) -> str:
        if not self.bakery_active_event.is_set():
            logger.info("Dispatching bakery agent via function call...")
            await dispatch_bakery_agent(self.ctx.room.name)
            self.bakery_active_event.set()
            return "Bakery agent dispatched"
        else:
            logger.info("Bakery agent already active, skipping dispatch.")
            return "Bakery agent already active"


# 2. Handling Bakery Agent Signals (inside the data_received handler)
# This logic listens for signals sent from secondary agents.
async def handle_data_received(data: rtc.DataPacket):
    try:
        await audio_mgr.wait_for_subscription()
        if data.data == b"bakery_agent_joined":
            # When the bakery agent joins, we unsubscribe the main agent from the audio track.
            logger.info("Bakery agent joined, unsubscribing from audio track")
            res = audio_mgr.audio_publication.set_subscribed(False)
            if asyncio.iscoroutine(res):
                await res

        elif data.data == b"bakery_agent_done":
            # When the bakery agent signals that it is done,
            # we delete its dispatch (thus removing it from the room),
            # resubscribe the main agent's audio, and clear the active flag.
            logger.info("Bakery agent done signal received. Deleting bakery dispatch and resubscribing.")
            from livekit import api
            lkapi = api.LiveKitAPI()
            try:
                dispatches = await lkapi.agent_dispatch.list_dispatch(room_name=ctx.room.name)
                for d in dispatches:
                    if d.agent_name == "bakery_agent":
                        await lkapi.agent_dispatch.delete_dispatch(dispatch_id=d.id, room_name=ctx.room.name)
                        logger.info("Deleted bakery agent dispatch: %s", d.id)
                        break
            except Exception as e:
                logger.error("Failed to delete bakery agent dispatch: %s", e)
            finally:
                await lkapi.aclose()

            res = audio_mgr.audio_publication.set_subscribed(True)
            if asyncio.iscoroutine(res):
                await res
            bakery_active_event.clear()
            # Optionally, reset the conversation context so the main assistant starts fresh.
            agent.chat_ctx.messages.clear()
            agent.chat_ctx.append(role="system", text="You are the main assistant. Answer queries normally.")
        else:
            logger.debug("Data received: %s", data.data)
    except Exception as e:
        logger.error("Error handling data packet: %s", e)


# Helper function to dispatch the bakery agent.
async def dispatch_bakery_agent(room_name: str):
    from livekit import api
    logger.info("Dispatching bakery agent to room '%s'...", room_name)
    lkapi = api.LiveKitAPI()
    dispatch_request = api.CreateAgentDispatchRequest(
        agent_name="bakery_agent",
        room=room_name,
        metadata="Bakery inquiry dispatched by home agent",
    )
    try:
        dispatch = await lkapi.agent_dispatch.create_dispatch(dispatch_request)
        logger.info("Bakery agent dispatched successfully: %s", dispatch)
    except Exception as e:
        logger.error("Failed to dispatch bakery agent: %s", e)
    finally:
        await lkapi.aclose()


# In the main home-agent code, the handle_data_received function is registered with:
@ctx.room.on("data_received")
def on_data_received(data: rtc.DataPacket):
    asyncio.create_task(handle_data_received(data))


# Additionally, the dispatch function is invoked when an incoming message is determined (via NLP-based intent detection)
# to have the "bakery" intent:
async def determine_intent(message: str) -> str:
    # [Other logic may remain here...]
    # For example, you might call an LLM with a prompt to classify the intent.
    # For now, assume it returns one of "weather", "bakery", or "general".
    # (This portion remains unchanged from your earlier implementation.)
    ...

async def handle_incoming_message(message: str):
    logger.info("Handling incoming message: '%s'", message)
    intent = await determine_intent(message)
    if intent == "bakery":
        logger.info("Intent 'bakery' detected. Initiating dispatch to bakery agent.")
        asyncio.create_task(dispatch_bakery_agent(ctx.room.name))
        await agent.say("Let me bring in the bakery assistant.", allow_interruptions=True)
    elif intent == "weather":
        # Weather dispatch logic (omitted here for brevity)
        ...
    else:
        logger.info("Processing message as general query.")
        await answer_from_text(message)
Explanation

1. Dispatching the Bakery Agent: in the `HomeAssistantFnc` class, the `dispatch_bakery_agent` method checks an event (`bakery_active_event`) to see if the bakery agent is already active. If it isn't, it calls the helper function `dispatch_bakery_agent` with the room name to create a dispatch via the LiveKit API. It then sets the event to mark the bakery agent as active.
2. Handling Data Signals: the `handle_data_received` asynchronous function listens for data packets received in the room.
   ◦ When it receives a packet with the content `b"bakery_agent_joined"`, it unsubscribes the main agent from the audio track (to let the bakery agent speak without interference).
   ◦ When it receives a packet with the content `b"bakery_agent_done"`, it calls a helper function that deletes the dispatch for the bakery agent (using the LiveKit API) to remove it from the room. Then it resubscribes the audio track, clears the `bakery_active_event`, and resets the conversation context so that the main assistant can resume answering normally.
3. Integration with Intent Detection: in the `handle_incoming_message` function, when the detected intent is `"bakery"`, the main agent dispatches the bakery agent (if not already active) and then tells the user that it is bringing in the bakery assistant.
g
To ensure that the main agent automatically resumes speaking as soon as the bakery agent has finished its task, you can implement a mechanism to trigger the main agent to speak immediately after the bakery agent's task is completed. Here are some suggestions to achieve this:

1. Trigger Main Agent Speech: After the bakery agent signals that it is done, you can immediately trigger the main agent to say something, such as a follow-up question or a prompt to the user. This can be done by calling the `agent.say()` method with a predefined message.
2. Use a Callback or Event: You can use an event or a callback function to handle the transition from the bakery agent back to the main agent. This ensures that the main agent resumes control and starts speaking as soon as the bakery agent finishes.

Here's how you can modify your code to implement these suggestions:
# Modify the handle_data_received function to trigger main agent speech
async def handle_data_received(data: rtc.DataPacket):
    try:
        await audio_mgr.wait_for_subscription()
        if data.data == b"bakery_agent_joined":
            logger.info("Bakery agent joined, unsubscribing from audio track")
            res = audio_mgr.audio_publication.set_subscribed(False)
            if asyncio.iscoroutine(res):
                await res

        elif data.data == b"bakery_agent_done":
            logger.info("Bakery agent done signal received. Deleting bakery dispatch and resubscribing.")
            from livekit import api
            lkapi = api.LiveKitAPI()
            try:
                dispatches = await lkapi.agent_dispatch.list_dispatch(room_name=ctx.room.name)
                for d in dispatches:
                    if d.agent_name == "bakery_agent":
                        await lkapi.agent_dispatch.delete_dispatch(dispatch_id=d.id, room_name=ctx.room.name)
                        logger.info("Deleted bakery agent dispatch: %s", d.id)
                        break
            except Exception as e:
                logger.error("Failed to delete bakery agent dispatch: %s", e)
            finally:
                await lkapi.aclose()

            res = audio_mgr.audio_publication.set_subscribed(True)
            if asyncio.iscoroutine(res):
                await res
            bakery_active_event.clear()
            agent.chat_ctx.messages.clear()
            agent.chat_ctx.append(role="system", text="You are the main assistant. Answer queries normally.")

            # Trigger the main agent to speak immediately after the bakery agent is done
            await agent.say("The bakery assistant has completed its task. How else can I assist you today?", allow_interruptions=True)

        else:
            logger.debug("Data received: %s", data.data)
    except Exception as e:
        logger.error("Error handling data packet: %s", e)
In this modification, after the bakery agent signals that it is done, the main agent is triggered to speak immediately with a follow-up message. This ensures that the main agent resumes control and engages with the user without waiting for the user to say something first. If you have specific concerns or need further assistance, please let me know.
q
@millions-hair-24956 I will have several agents, so maybe it will be easier to use option 1 after each agent signals that it is done?

1. Trigger Main Agent Speech: After the bakery agent signals that it is done, you can immediately trigger the main agent to say something, such as a follow-up question or a prompt to the user. This can be done by calling the `agent.say()` method with a predefined message.
g
To efficiently manage multiple agents and ensure the main agent resumes speaking after any secondary agent completes its task, you can implement a unified handler for agent completion signals. This approach will streamline the process and reduce redundancy in your code. Here's how you can achieve this:

1. Unified Signal Handling: Create a single function to handle the completion signals from all secondary agents. This function will manage the resubscription of the main agent's audio track, clear the active event, and trigger the main agent to speak.
2. Dynamic Message Handling: Use a dynamic message or a predefined prompt for the main agent to say after any secondary agent completes its task. This can be customized based on the agent that just finished.

Here's an example implementation:
# Unified function to handle agent completion signals
async def handle_agent_done(agent_name: str, event: asyncio.Event):
    logger.info(f"{agent_name.capitalize()} agent done signal received. Deleting {agent_name} dispatch and resubscribing.")
    from livekit import api
    lkapi = api.LiveKitAPI()
    try:
        dispatches = await lkapi.agent_dispatch.list_dispatch(room_name=ctx.room.name)
        for d in dispatches:
            if d.agent_name == agent_name:
                await lkapi.agent_dispatch.delete_dispatch(dispatch_id=d.id, room_name=ctx.room.name)
                logger.info(f"Deleted {agent_name} agent dispatch: {d.id}")
                break
    except Exception as e:
        logger.error(f"Failed to delete {agent_name} agent dispatch: {e}")
    finally:
        await lkapi.aclose()

    res = audio_mgr.audio_publication.set_subscribed(True)
    if asyncio.iscoroutine(res):
        await res
    event.clear()
    agent.chat_ctx.messages.clear()
    agent.chat_ctx.append(role="system", text="You are the main assistant. Answer queries normally.")

    # Trigger the main agent to speak
    await agent.say(f"The {agent_name} assistant has completed its task. How else can I assist you today?", allow_interruptions=True)

# Modify the handle_data_received function to use the unified handler
async def handle_data_received(data: rtc.DataPacket):
    try:
        await audio_mgr.wait_for_subscription()
        if data.data == b"bakery_agent_joined":
            logger.info("Bakery agent joined, unsubscribing from audio track")
            res = audio_mgr.audio_publication.set_subscribed(False)
            if asyncio.iscoroutine(res):
                await res

        elif data.data == b"bakery_agent_done":
            await handle_agent_done("bakery", bakery_active_event)

        elif data.data == b"weather_agent_done":
            await handle_agent_done("weather", weather_active_event)

        else:
            logger.debug("Data received: %s", data.data)
    except Exception as e:
        logger.error("Error handling data packet: %s", e)
In this implementation, the `handle_agent_done` function is called whenever a secondary agent signals that it is done. This function handles the cleanup and triggers the main agent to speak, ensuring a consistent and efficient process for managing multiple agents.

Sources:
- https://livekit-users.slack.com/archives/C07FY8WHGPM/p1728987953.956549
- https://livekit-users.slack.com/archives/C07FY8WHGPM/p1730355369.957779
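For completeness, the secondary agents' side of these signals could look like the weather agent you shared earlier. Below is a hedged sketch for a bakery agent; the payloads and the `ctx.shutdown` reason string are just conventions carried over from that code, and your actual bakery agent may differ.

import logging
from livekit.agents import JobContext, llm

logger = logging.getLogger("bakery-agent")

class BakeryAssistantFnc(llm.FunctionContext):
    def __init__(self, room, ctx: JobContext):
        super().__init__()
        self.room = room
        self.ctx = ctx

    @llm.ai_callable(description="Signal handback to the main assistant and shut down.")
    async def signal_hand_back(self):
        # Tell the home agent we're finished so it can resubscribe and resume speaking.
        await self.room.local_participant.publish_data(b"bakery_agent_done")
        logger.info("Published 'bakery_agent_done'")
        # ctx.shutdown is not awaitable, mirroring the weather agent.
        self.ctx.shutdown("Handing back to main assistant")
        return "Handback initiated"

# On startup, the bakery agent would announce itself the same way:
# await ctx.room.local_participant.publish_data(b"bakery_agent_joined")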
q
@millions-hair-24956 I am getting this error:

2025-02-25 00:33:02,569 - DEBUG livekit.agents.pipeline - speech playout finished {"speech_id": "9cea28240059", "interrupted": false}
2025-02-25 00:33:02,569 - DEBUG livekit.agents.pipeline - committed agent speech {"agent_transcript": "Ok, I got it.", "interrupted": false, "speech_id": "9cea28240059"}
2025-02-25 00:33:07,892 - ERROR asyncio - _GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
Traceback (most recent call last):
  File "C:\Users\Usuario\my-app\outbound-caller-python-main\venv\Lib\site-packages\livekit\plugins\deepgram\stt.py", line 500, in recv_task
    msg = await ws.receive()
          ^^^^^^^^^^^^^^^^^^
  File "C:\Users\Usuario\my-app\outbound-caller-python-main\venv\Lib\site-packages\aiohttp\client_ws.py", line 334, in receive
    msg = await self._reader.read()
          ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "aiohttp\_websocket\reader_c.py", line 112, in read
  File "aiohttp\_websocket\reader_c.py", line 109, in aiohttp._websocket.reader_c.WebSocketDataQueue.read
asyncio.exceptions.CancelledError
2025-02-25 00:33:07,893 - ERROR asyncio - _GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
Traceback (most recent call last):
g
The error you're encountering seems to be related to an `asyncio.CancelledError`, which typically occurs when an asynchronous task is cancelled. This can happen for various reasons, such as when a task is explicitly cancelled or when the event loop is shutting down. Here are a few steps you can take to address this issue:

1. Check Task Cancellation: Ensure that tasks are not being cancelled prematurely. Review your code to see if there are any explicit calls to `task.cancel()` that might be affecting the task in question.
2. Handle Task Cancellation: If a task is expected to be cancelled, you can handle the `CancelledError` exception gracefully. This can be done by wrapping the task in a try-except block and catching the `CancelledError`.
3. Review WebSocket Connection: The error occurs in the `recv_task` function, which is likely part of a WebSocket connection. Ensure that the WebSocket connection is being managed correctly and that it is not being closed unexpectedly.
4. Debugging: Add additional logging to help identify when and why the task is being cancelled. This can provide more context and help pinpoint the root cause of the issue.

Here's an example of how you might handle the `CancelledError`:
async def recv_task():
    try:
        while True:
            msg = await ws.receive()
            # Process the message
    except asyncio.CancelledError:
        logger.info("recv_task was cancelled")
        # Perform any necessary cleanup here
    except Exception as e:
        logger.error(f"Error in recv_task: {e}")
If you need further assistance or have specific questions, please let me know.

Sources:
- https://docs.livekit.io/python/livekit/plugins/deepgram/index.html#livekit.plugins.deepgram.SpeechStream
- https://livekit-users.slack.com/archives/C07FY8WHGPM/p1732682471.692099
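On the "_GatheringFuture exception was never retrieved" part of the log specifically, one general asyncio pattern is to await gathered tasks with return_exceptions=True (or otherwise consume their results) in your own shutdown paths, so cancellations are retrieved instead of logged as warnings. This is a hedged, library-agnostic sketch applied to tasks you create yourself, not a claim about where the LiveKit plugin's internal gather lives.

import asyncio

async def shutdown_tasks(tasks: list[asyncio.Task]) -> None:
    """Cancel background tasks and retrieve their results so no exception goes unretrieved."""
    for task in tasks:
        task.cancel()
    # return_exceptions=True collects exceptions (including cancellations raised in the
    # child tasks) into the result list, so the gathering future holds no unretrieved error.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for result in results:
        if isinstance(result, BaseException) and not isinstance(result, asyncio.CancelledError):
            # Log anything that is a real failure rather than an expected cancellation.
            print(f"Task failed during shutdown: {result!r}")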