quick-art-43720
02/23/2025, 1:25 PMgray-monkey-83525
02/23/2025, 1:25 PMHomeAssistantFnc
class and the dispatch_weather_agent
function appears to be correctly set up to manage the dispatching of the weather agent. The use of an asyncio.Event
to track the state of the weather agent is a good way to manage the agent's lifecycle. Additionally, the handle_data_received
function effectively manages the audio track subscription based on the weather agent's state, ensuring that the main agent's context is reset appropriately.
If you would like more detailed feedback or have specific concerns about certain parts of the code, please let me know.
Sources:
- https://livekit-users.slack.com/archives/C07FY8WHGPM/p1730325243.151429 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.quick-art-43720
02/23/2025, 1:34 PMgray-monkey-83525
02/23/2025, 1:34 PMweather_agent
implementation is well-structured for its intended purpose. Here are a few observations and potential improvements:
1. Error Handling: Your code already includes error handling for network requests and data publishing, which is good. Consider adding more specific error messages or retry logic for network requests to improve robustness.
2. Logging: The use of logging is consistent, which is helpful for debugging. Ensure that sensitive information is not logged, especially if this code is used in a production environment.
3. Function Context: The WeatherAssistantFnc
class effectively manages the weather retrieval and handback signaling. Ensure that the get_weather
function handles edge cases, such as invalid or missing data from the API.
4. Data Publishing: The use of data packets to signal the main agent is a good approach. Ensure that the main agent is set up to handle these signals correctly, as you have done in the home_agent
.
5. VoicePipelineAgent: The setup of the VoicePipelineAgent
with VAD, STT, LLM, and TTS is appropriate. Ensure that the models and voices used are optimal for your use case.
6. Async Operations: The use of asyncio
for asynchronous operations is appropriate. Ensure that all asynchronous tasks are awaited properly to avoid potential issues with task completion.
If you have specific concerns or areas you'd like more feedback on, please let me know.
Sources:
- https://livekit-users.slack.com/archives/C07FY8WHGPM/p1729242384.877639
- https://docs.livekit.io/agents/voice-agent/function-calling/
Note: I wasn’t able to find highly relevant data sources, but above are a few potentially relevant links. 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.quick-art-43720
02/23/2025, 1:37 PMgray-monkey-83525
02/23/2025, 1:37 PMAgentDispatchService
to explicitly dispatch agents to a room. This allows you to control when agents join and leave the room.
2. Create Dispatch: When you need an agent to join a room, create a dispatch using the create_dispatch
method. This method requires specifying the agent's name and the room name.
3. Delete Dispatch: To remove an agent from a room, use the delete_dispatch
method. This will terminate the agent's session in the room.
Here's a simplified example in Python:
from livekit import api
async def manage_agent_dispatch(room_name, agent_name):
lkapi = api.LiveKitAPI()
# Create a dispatch to add the agent to the room
dispatch_request = api.CreateAgentDispatchRequest(agent_name=agent_name, room=room_name)
dispatch = await lkapi.agent_dispatch.create_dispatch(dispatch_request)
print(f"Agent {agent_name} dispatched to room {room_name}")
# Perform tasks with the agent in the room...
# Delete the dispatch to remove the agent from the room
await lkapi.agent_dispatch.delete_dispatch(dispatch_id=dispatch.id, room_name=room_name)
print(f"Agent {agent_name} removed from room {room_name}")
await lkapi.aclose()
# Example usage
# asyncio.run(manage_agent_dispatch("my-room", "weather_agent"))
This example demonstrates how to dispatch an agent to a room and then remove it after completing its task. If you need more detailed guidance or have specific questions, please let me know.
Sources:
- https://livekit-users.slack.com/archives/C07FY8WHGPM/p1732564590.153089 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.quick-art-43720
02/23/2025, 1:42 PMgray-monkey-83525
02/23/2025, 1:42 PMAgentDispatchService
source.
2. Load Balancing: LiveKit automatically distributes load across available workers, ensuring that no single worker is overwhelmed. This is managed through a job distribution system embedded within LiveKit servers, which routes jobs to workers based on availability and load source.
3. Autoscaling: For environments with fluctuating loads, you can configure autoscaling to dynamically adjust the number of workers based on demand. This ensures that resources are efficiently utilized and that the system can handle peak loads without degradation in performance source.
If you need more detailed guidance on implementing a scalable solution with LiveKit, please let me know.
Sources:
- Dispatching agents
- Deployment and scaling 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.quick-art-43720
02/23/2025, 3:18 PMweather_agent_done
or bakery_agent_done
) is received, it deletes the corresponding dispatch and resets its “active” flag and conversation context. In your opinion is it the best approach? : import asyncio
import logging
from dotenv import load_dotenv
from livekit import rtc
from livekit.agents import (
AutoSubscribe,
JobContext,
JobProcess,
WorkerOptions,
cli,
llm,
)
from livekit.agents.pipeline import VoicePipelineAgent
from livekit.plugins import deepgram, openai, silero
load_dotenv()
logger = logging.getLogger("home-agent")
logger.setLevel(logging.INFO)
class HomeAssistantFnc(llm.FunctionContext):
def __init__(self, ctx, weather_active_event, bakery_active_event):
super().__init__()
self.ctx = ctx
self.weather_active_event = weather_active_event
self.bakery_active_event = bakery_active_event
@llm.ai_callable(description="Dispatch the weather agent if not active.")
async def dispatch_weather_agent(self):
if not self.weather_active_event.is_set():
logger.info("Dispatching weather agent via function call...")
await dispatch_weather_agent(self.ctx.room.name)
self.weather_active_event.set()
else:
logger.info("Weather agent already active, skipping dispatch.")
@llm.ai_callable(description="Dispatch the bakery agent if not active.")
async def dispatch_bakery_agent(self):
if not self.bakery_active_event.is_set():
logger.info("Dispatching bakery agent via function call...")
await dispatch_bakery_agent(self.ctx.room.name)
self.bakery_active_event.set()
else:
logger.info("Bakery agent already active, skipping dispatch.")
def prewarm(proc: JobProcess):
logger.info("Prewarming: Loading VAD...")
proc.userdata["vad"] = silero.VAD.load()
logger.info("Prewarming complete.")
async def entrypoint(ctx: JobContext):
logger.info("Connecting to room %s...", ctx.room.name)
await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
logger.info("Connected to room: %s", ctx.room.name)
participant = await ctx.wait_for_participant()
logger.info("Participant '%s' joined. Starting home agent...", participant.identity)
dg_model = "nova-3-general"
if participant.kind == rtc.ParticipantKind.PARTICIPANT_KIND_SIP:
dg_model = "nova-2-phonecall"
logger.info("Using telephone-optimized STT model.")
# Create events to track agent states.
weather_active_event = asyncio.Event()
bakery_active_event = asyncio.Event()
fnc_ctx = HomeAssistantFnc(ctx, weather_active_event, bakery_active_event)
initial_ctx = llm.ChatContext().append(
role="system",
text=(
"You are the main assistant. Answer queries normally. "
"Dispatch the weather assistant if the user asks about weather, "
"and dispatch the bakery assistant if the user asks about croissants or bakery advice."
),
)
agent = VoicePipelineAgent(
vad=ctx.proc.userdata["vad"],
stt=deepgram.STT(model=dg_model),
llm=openai.LLM(),
tts=openai.TTS(),
chat_ctx=initial_ctx,
fnc_ctx=fnc_ctx,
)
agent.start(ctx.room, participant)
logger.info("Home agent started.")
# Audio management helper.
class AudioManager:
def __init__(self):
self.audio_publication = None
self.subscription_set = asyncio.Event()
def set_publication(self, publication):
self.audio_publication = publication
self.subscription_set.set()
async def wait_for_subscription(self):
await self.subscription_set.wait()
if self.audio_publication is None:
logger.error("Audio publication is None after subscription event")
raise ValueError("Audio publication not set")
audio_mgr = AudioManager()
@ctx.room.on("track_subscribed")
def on_track_subscribed(track: rtc.Track, publication: rtc.TrackPublication, rp: rtc.RemoteParticipant):
if rp == participant and track.kind == rtc.TrackKind.KIND_AUDIO and audio_mgr.audio_publication is None:
audio_mgr.set_publication(publication)
logger.info("Audio track subscribed: %s", track.sid)
try:
await asyncio.wait_for(audio_mgr.wait_for_subscription(), timeout=10.0)
except asyncio.TimeoutError:
logger.error("Timed out waiting for audio track subscription")
return
async def handle_data_received(data: rtc.DataPacket):
try:
await audio_mgr.wait_for_subscription()
if data.data == b"weather_agent_joined":
logger.info("Weather agent joined, unsubscribing from audio track")
res = audio_mgr.audio_publication.set_subscribed(False)
if asyncio.iscoroutine(res):
await res
elif data.data == b"weather_agent_done":
logger.info("Weather agent done signal received. Deleting weather dispatch and resubscribing.")
from livekit import api
lkapi = api.LiveKitAPI()
try:
dispatches = await lkapi.agent_dispatch.list_dispatch(room_name=ctx.room.name)
for d in dispatches:
if d.agent_name == "weather_agent":
await lkapi.agent_dispatch.delete_dispatch(dispatch_id=d.id, room_name=ctx.room.name)
logger.info("Deleted weather agent dispatch: %s", d.id)
break
except Exception as e:
logger.error("Failed to delete weather agent dispatch: %s", e)
finally:
await lkapi.aclose()
res = audio_mgr.audio_publication.set_subscribed(True)
if asyncio.iscoroutine(res):
await res
weather_active_event.clear()
agent.chat_ctx.messages.clear()
agent.chat_ctx.append(role="system", text="You are the main assistant. Answer queries normally.")
elif data.data == b"bakery_agent_joined":
logger.info("Bakery agent joined, unsubscribing from audio track")
res = audio_mgr.audio_publication.set_subscribed(False)
if asyncio.iscoroutine(res):
await res
elif data.data == b"bakery_agent_done":
logger.info("Bakery agent done signal received. Deleting bakery dispatch and resubscribing.")
from livekit import api
lkapi = api.LiveKitAPI()
try:
dispatches = await lkapi.agent_dispatch.list_dispatch(room_name=ctx.room.name)
for d in dispatches:
if d.agent_name == "bakery_agent":
await lkapi.agent_dispatch.delete_dispatch(dispatch_id=d.id, room_name=ctx.room.name)
logger.info("Deleted bakery agent dispatch: %s", d.id)
break
except Exception as e:
logger.error("Failed to delete bakery agent dispatch: %s", e)
finally:
await lkapi.aclose()
res = audio_mgr.audio_publication.set_subscribed(True)
if asyncio.iscoroutine(res):
await res
bakery_active_event.clear()
agent.chat_ctx.messages.clear()
agent.chat_ctx.append(role="system", text="You are the main assistant. Answer queries normally.")
else:
logger.debug("Data received: %s", data.data)
except Exception as e:
logger.error("Error handling data packet: %s", e)
@ctx.room.on("data_received")
def on_data_received(data: rtc.DataPacket):
asyncio.create_task(handle_data_received(data))
async def answer_from_text(txt: str):
chat_ctx = agent.chat_ctx.copy()
chat_ctx.append(role="user", text=txt)
logger.info("Answering user text: '%s'", txt)
stream = agent.llm.chat(chat_ctx=chat_ctx)
await agent.say(stream)
async def determine_intent(message: str) -> str:
logger.info("Determining intent for message: '%s'", message)
lower_msg = message.lower()
if any(kw in lower_msg for kw in ["return", "main", "back"]):
intent = "general"
elif not weather_active_event.is_set() and any(kw in lower_msg for kw in ["weather", "forecast"]):
intent = "weather"
elif not bakery_active_event.is_set() and any(kw in lower_msg for kw in ["croissant", "bakery", "pastry", "bread"]):
intent = "bakery"
else:
intent = "general"
logger.info("Determined intent: '%s'", intent)
return intent
async def handle_incoming_message(message: str):
logger.info("Handling incoming message: '%s'", message)
intent = await determine_intent(message)
if intent == "weather":
logger.info("Intent 'weather' detected. Initiating dispatch to weather agent.")
asyncio.create_task(dispatch_weather_agent(ctx.room.name))
await agent.say("Let me bring in the weather assistant.", allow_interruptions=True)
elif intent == "bakery":
logger.info("Intent 'bakery' detected. Initiating dispatch to bakery agent.")
asyncio.create_task(dispatch_bakery_agent(ctx.room.name))
await agent.say("Let me bring in the bakery assistant.", allow_interruptions=True)
else:
logger.info("Processing message as general query.")
await answer_from_text(message)
@ctx.room.on("message_received")
def on_message_received(msg: rtc.ChatMessage):
if msg.message:
logger.info("Chat message received: '%s'", msg.message)
asyncio.create_task(handle_incoming_message(msg.message))
await agent.say("Hey, how can I help you today?", allow_interruptions=True)
logger.info("Initial greeting sent by home agent.")
async def dispatch_weather_agent(room_name: str):
from livekit import api
logger.info("Dispatching weather agent to room '%s'...", room_name)
lkapi = api.LiveKitAPI()
dispatch_request = api.CreateAgentDispatchRequest(
agent_name="weather_agent",
room=room_name,
metadata="Weather inquiry dispatched by home agent",
)
try:
dispatch = await lkapi.agent_dispatch.create_dispatch(dispatch_request)
logger.info("Weather agent dispatched successfully: %s", dispatch)
except Exception as e:
logger.error("Failed to dispatch weather agent: %s", e)
finally:
await lkapi.aclose()
async def dispatch_bakery_agent(room_name: str):
from livekit import api
logger.info("Dispatching bakery agent to room '%s'...", room_name)
lkapi = api.LiveKitAPI()
dispatch_request = api.CreateAgentDispatchRequest(
agent_name="bakery_agent",
room=room_name,
metadata="Bakery inquiry dispatched by home agent",
)
try:
dispatch = await lkapi.agent_dispatch.create_dispatch(dispatch_request)
logger.info("Bakery agent dispatched successfully: %s", dispatch)
except Exception as e:
logger.error("Failed to dispatch bakery agent: %s", e)
finally:
await lkapi.aclose()
if name == "__main__":
cli.run_app(
WorkerOptions(
entrypoint_fnc=entrypoint,
prewarm_fnc=prewarm,
),
)gray-monkey-83525
02/23/2025, 3:18 PMasyncio.Event
to track the active state of each agent is a good approach. It ensures that agents are not dispatched multiple times unnecessarily.
2. Dispatch and Cleanup: The logic for creating and deleting dispatches is sound. Ensure that the delete_dispatch
method is called correctly to clean up after an agent completes its task. This helps in managing resources efficiently.
3. Error Handling: You have implemented error handling for network requests and dispatch operations, which is crucial for robustness. Consider adding retries for network operations to handle transient failures.
4. Logging: Your use of logging is consistent and informative. Ensure that sensitive information is not logged, especially in production environments.
5. Concurrency: The use of asyncio.create_task
for handling incoming messages and dispatching agents is appropriate. Ensure that all asynchronous operations are awaited properly to avoid potential issues with task completion.
6. Code Organization: Consider organizing the code into separate modules or classes for better maintainability, especially as the complexity of the application grows.
7. Scalability: The approach should scale well with the number of agents and queries, given LiveKit's architecture. Ensure that the system is tested under load to verify performance and resource utilization.
8. Intent Determination: The determine_intent
function is straightforward and effective. Consider expanding the list of keywords or using a more sophisticated natural language processing approach if needed.
Overall, your implementation is well thought out and should work effectively for the use case described. If you have specific concerns or areas you'd like more feedback on, please let me know. 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.quick-art-43720
02/23/2025, 6:44 PMgray-monkey-83525
02/23/2025, 6:44 PMallow_interruptions
parameter in the say
method. This will stop the main agent from speaking when the secondary agent starts.
2. Manage Audio Tracks: Use the AudioManager
to manage audio subscriptions. When a secondary agent is dispatched, unsubscribe the main agent from the audio track to prevent it from speaking.
3. Use Events to Control Speech: Utilize the active_events
dictionary to control when the main agent should be silent. If a secondary agent is active, prevent the main agent from speaking.
Here's how you can modify your code to implement these suggestions:gray-monkey-83525
02/23/2025, 6:45 PM# Modify the handle_incoming_message function
async def handle_incoming_message(message: str):
<http://logger.info|logger.info>("Handling incoming message: '%s'", message)
intent = await determine_intent(message)
if intent == "weather":
if not active_events["weather"].is_set():
<http://logger.info|logger.info>("Intent 'weather' detected. Dispatching weather agent.")
await agent.say("Let me bring in the weather assistant.", allow_interruptions=True)
active_events["weather"].set() # Set the event to indicate the weather agent is active
asyncio.create_task(dispatch_weather_agent(ctx.room.name))
else:
<http://logger.info|logger.info>("Weather agent already active.")
return # Do not answer further.
elif intent == "bakery":
if not active_events["bakery"].is_set():
<http://logger.info|logger.info>("Intent 'bakery' detected. Dispatching bakery agent.")
await agent.say("Let me bring in the bakery assistant.", allow_interruptions=True)
active_events["bakery"].set() # Set the event to indicate the bakery agent is active
asyncio.create_task(dispatch_bakery_agent(ctx.room.name))
else:
<http://logger.info|logger.info>("Bakery agent already active.")
return # Do not answer further.
else:
<http://logger.info|logger.info>("Processing message as general query.")
await answer_from_text(message)
# Modify the handle_data_received function to clear events
async def handle_data_received(data: rtc.DataPacket):
try:
await audio_mgr.wait_for_subscription()
if data.data == b"weather_agent_done":
<http://logger.info|logger.info>("Weather agent done signal received. Deleting weather dispatch and resubscribing.")
# Delete dispatch and clear event
await delete_dispatch("weather_agent", ctx.room.name)
active_events["weather"].clear()
agent.chat_ctx.messages.clear()
agent.chat_ctx.append(role="system", text="You are the main assistant. Answer queries normally.")
elif data.data == b"bakery_agent_done":
<http://logger.info|logger.info>("Bakery agent done signal received. Deleting bakery dispatch and resubscribing.")
# Delete dispatch and clear event
await delete_dispatch("bakery_agent", ctx.room.name)
active_events["bakery"].clear()
agent.chat_ctx.messages.clear()
agent.chat_ctx.append(role="system", text="You are the main assistant. Answer queries normally.")
else:
logger.debug("Data received: %s", data.data)
except Exception as e:
logger.error("Error handling data packet: %s", e)
# Helper function to delete dispatch
async def delete_dispatch(agent_name: str, room_name: str):
from livekit import api
lkapi = api.LiveKitAPI()
try:
dispatches = await lkapi.agent_dispatch.list_dispatch(room_name=room_name)
for d in dispatches:
if d.agent_name == agent_name:
gray-monkey-83525
02/23/2025, 6:45 PMawait lkapi.agent_dispatch.delete_dispatch(dispatch_id=d.id, room_name=room_name)
<http://logger.info|logger.info>("Deleted %s agent dispatch: %s", agent_name, d.id)
break
except Exception as e:
logger.error("Failed to delete %s agent dispatch: %s", agent_name, e)
finally:
await lkapi.aclose()
gray-monkey-83525
02/23/2025, 6:45 PMquick-art-43720
02/23/2025, 6:56 PMgray-monkey-83525
02/23/2025, 6:56 PMmax_nested_fnc_calls
parameter, which limits the number of nested function calls that can be made by the agent. When this limit is reached, the function context is dropped, which might be causing both agents to speak simultaneously.
To address this, you can increase the max_nested_fnc_calls
parameter in the VoicePipelineAgent
configuration. This will allow more nested function calls, which might help in managing the dispatch and speech of multiple agents more effectively.
Here's how you can modify the VoicePipelineAgent
initialization to increase the `max_nested_fnc_calls`:
agent = VoicePipelineAgent(
vad=ctx.proc.userdata["vad"],
stt=deepgram.STT(model=dg_model),
llm=openai.LLM(),
tts=openai.TTS(),
chat_ctx=initial_ctx,
fnc_ctx=fnc_ctx,
max_nested_fnc_calls=2, # Increase this value as needed
)
By increasing the max_nested_fnc_calls
, you allow the agent to handle more complex interactions that involve multiple nested function calls. Adjust the value based on your specific use case and test to see if it resolves the issue.
If you need further assistance or have specific questions, please let me know.
Sources:
- https://livekit-users.slack.com/archives/C07FY8WHGPM/p1729822736.610669 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.quick-art-43720
02/24/2025, 4:53 PMpython
# --- Bakery Agent Transfer Logic ---
# 1. Dispatching the Bakery Agent (inside the HomeAssistantFnc function)
class HomeAssistantFnc(llm.FunctionContext):
def __init__(self, ctx, weather_active_event, bakery_active_event):
super().__init__()
self.ctx = ctx
self.weather_active_event = weather_active_event
self.bakery_active_event = bakery_active_event
@llm.ai_callable(description="Dispatch the bakery agent if not active.")
async def dispatch_bakery_agent(self) -> str:
if not self.bakery_active_event.is_set():
<http://logger.info|logger.info>("Dispatching bakery agent via function call...")
await dispatch_bakery_agent(self.ctx.room.name)
self.bakery_active_event.set()
return "Bakery agent dispatched"
else:
<http://logger.info|logger.info>("Bakery agent already active, skipping dispatch.")
return "Bakery agent already active"
# 2. Handling Bakery Agent Signals (inside the data_received handler)
# This logic listens for signals sent from secondary agents.
async def handle_data_received(data: rtc.DataPacket):
try:
await audio_mgr.wait_for_subscription()
if data.data == b"bakery_agent_joined":
# When the bakery agent joins, we unsubscribe the main agent from the audio track.
<http://logger.info|logger.info>("Bakery agent joined, unsubscribing from audio track")
res = audio_mgr.audio_publication.set_subscribed(False)
if asyncio.iscoroutine(res):
await res
elif data.data == b"bakery_agent_done":
# When the bakery agent signals that it is done,
# we delete its dispatch (thus removing it from the room),
# resubscribe the main agent's audio, and clear the active flag.
<http://logger.info|logger.info>("Bakery agent done signal received. Deleting bakery dispatch and resubscribing.")
from livekit import api
lkapi = api.LiveKitAPI()
try:
dispatches = await lkapi.agent_dispatch.list_dispatch(room_name=ctx.room.name)
for d in dispatches:
if d.agent_name == "bakery_agent":
await lkapi.agent_dispatch.delete_dispatch(dispatch_id=d.id, room_name=ctx.room.name)
<http://logger.info|logger.info>("Deleted bakery agent dispatch: %s", d.id)
break
except Exception as e:
logger.error("Failed to delete bakery agent dispatch: %s", e)
finally:
await lkapi.aclose()
res = audio_mgr.audio_publication.set_subscribed(True)
if asyncio.iscoroutine(res):
await res
bakery_active_event.clear()
# Optionally, reset the conversation context so the main assistant starts fresh.
agent.chat_ctx.messages.clear()
agent.chat_ctx.append(role="system", text="You are the main assistant. Answer queries normally.")
else:
logger.debug("Data received: %s", data.data)
except Exception as e:
logger.error("Error handling data packet: %s", e)
# Helper function to dispatch the bakery agent.
async def dispatch_bakery_agent(room_name: str):
from livekit import api
<http://logger.info|logger.info>("Dispatching bakery agent to room '%s'...", room_name)
lkapi = api.LiveKitAPI()
dispatch_request = api.CreateAgentDispatchRequest(
agent_name="bakery_agent",
room=room_name,
metadata="Bakery inquiry dispatched by home agent",
)
try:
dispatch = await lkapi.agent_dispatch.create_dispatch(dispatch_request)
<http://logger.info|logger.info>("Bakery agent dispatched successfully: %s", dispatch)
except Exception as e:
logger.error("Failed to dispatch bakery agent: %s", e)
finally:
await lkapi.aclose()
# In the main home-agent code, the handle_data_received function is registered with:
@ctx.room.on("data_received")
def on_data_received(data: rtc.DataPacket):
asyncio.create_task(handle_data_received(data))
# Additionally, the dispatch function is invoked when an incoming message is determined (via NLP-based intent detection)
# to have the "bakery" intent:
async def determine_intent(message: str) -> str:
# [Other logic may remain here...]
# For example, you might call an LLM with a prompt to classify the intent.
# For now, assume it returns one of "weather", "bakery", or "general".
# (This portion remains unchanged from your earlier implementation.)
...
async def handle_incoming_message(message: str):
<http://logger.info|logger.info>("Handling incoming message: '%s'", message)
intent = await determine_intent(message)
if intent == "bakery":
<http://logger.info|logger.info>("Intent 'bakery' detected. Initiating dispatch to bakery agent.")
asyncio.create_task(dispatch_bakery_agent(ctx.room.name))
await agent.say("Let me bring in the bakery assistant.", allow_interruptions=True)
elif intent == "weather":
# Weather dispatch logic (omitted here for brevity)
...
else:
<http://logger.info|logger.info>("Processing message as general query.")
await answer_from_text(message)
Explanation
1. Dispatching the Bakery Agent
2. In the HomeAssistantFnc
class, the dispatch_bakery_agent
method checks an event (bakery_active_event
) to see if the bakery agent is already active. If it isn’t, it calls the helper function dispatch_bakery_agent
with the room name to create a dispatch via the LiveKit API. It then sets the event to mark the bakery agent as active.
3. Handling Data Signals
4. The handle_data_received
asynchronous function listens for data packets received in the room.
◦ When it receives a packet with the content b"bakery_agent_joined"
, it unsubscribes the main agent from the audio track (to let the bakery agent speak without interference).
◦ When it receives a packet with the content b"bakery_agent_done"
, it calls a helper function that deletes the dispatch for the bakery agent (using the LiveKit API) to remove it from the room. Then it resubscribes the audio track, clears the bakery_active_event
, and resets the conversation context so that the main assistant can resume answering normally.
5. Integration with Intent Detection
6. In the handle_incoming_message
function, when the detected intent is "bakery"
, the main agent dispatches the bakery agent (if not already active) and then tells the user that it is bringing in the bakery assistant.gray-monkey-83525
02/24/2025, 4:53 PMagent.say()
method with a predefined message.
2. Use a Callback or Event: You can use an event or a callback function to handle the transition from the bakery agent back to the main agent. This ensures that the main agent resumes control and starts speaking as soon as the bakery agent finishes.
Here's how you can modify your code to implement these suggestions:
# Modify the handle_data_received function to trigger main agent speech
async def handle_data_received(data: rtc.DataPacket):
try:
await audio_mgr.wait_for_subscription()
if data.data == b"bakery_agent_joined":
logger.info("Bakery agent joined, unsubscribing from audio track")
res = audio_mgr.audio_publication.set_subscribed(False)
if asyncio.iscoroutine(res):
await res
elif data.data == b"bakery_agent_done":
logger.info("Bakery agent done signal received. Deleting bakery dispatch and resubscribing.")
from livekit import api
lkapi = api.LiveKitAPI()
try:
dispatches = await lkapi.agent_dispatch.list_dispatch(room_name=ctx.room.name)
for d in dispatches:
if d.agent_name == "bakery_agent":
await lkapi.agent_dispatch.delete_dispatch(dispatch_id=d.id, room_name=ctx.room.name)
logger.info("Deleted bakery agent dispatch: %s", d.id)
break
except Exception as e:
logger.error("Failed to delete bakery agent dispatch: %s", e)
finally:
await lkapi.aclose()
res = audio_mgr.audio_publication.set_subscribed(True)
if asyncio.iscoroutine(res):
await res
bakery_active_event.clear()
agent.chat_ctx.messages.clear()
agent.chat_ctx.append(role="system", text="You are the main assistant. Answer queries normally.")
# Trigger the main agent to speak immediately after the bakery agent is done
await agent.say("The bakery assistant has completed its task. How else can I assist you today?", allow_interruptions=True)
else:
logger.debug("Data received: %s", data.data)
except Exception as e:
logger.error("Error handling data packet: %s", e)
gray-monkey-83525
02/24/2025, 4:53 PMquick-art-43720
02/24/2025, 4:58 PMagent.say()
method with a predefined message.gray-monkey-83525
02/24/2025, 4:58 PM# Unified function to handle agent completion signals
async def handle_agent_done(agent_name: str, event: asyncio.Event):
logger.info(f"{agent_name.capitalize()} agent done signal received. Deleting {agent_name} dispatch and resubscribing.")
from livekit import api
lkapi = api.LiveKitAPI()
try:
dispatches = await lkapi.agent_dispatch.list_dispatch(room_name=ctx.room.name)
for d in dispatches:
if d.agent_name == agent_name:
await lkapi.agent_dispatch.delete_dispatch(dispatch_id=d.id, room_name=ctx.room.name)
logger.info(f"Deleted {agent_name} agent dispatch: {d.id}")
break
except Exception as e:
logger.error(f"Failed to delete {agent_name} agent dispatch: {e}")
finally:
await lkapi.aclose()
res = audio_mgr.audio_publication.set_subscribed(True)
if asyncio.iscoroutine(res):
await res
event.clear()
agent.chat_ctx.messages.clear()
agent.chat_ctx.append(role="system", text="You are the main assistant. Answer queries normally.")
# Trigger the main agent to speak
await agent.say(f"The {agent_name} assistant has completed its task. How else can I assist you today?", allow_interruptions=True)
# Modify the handle_data_received function to use the unified handler
async def handle_data_received(data: rtc.DataPacket):
try:
await audio_mgr.wait_for_subscription()
if data.data == b"bakery_agent_joined":
logger.info("Bakery agent joined, unsubscribing from audio track")
res = audio_mgr.audio_publication.set_subscribed(False)
if asyncio.iscoroutine(res):
await res
elif data.data == b"bakery_agent_done":
await handle_agent_done("bakery", bakery_active_event)
elif data.data == b"weather_agent_done":
await handle_agent_done("weather", weather_active_event)
else:
logger.debug("Data received: %s", data.data)
except Exception as e:
logger.error("Error handling data packet: %s", e)
gray-monkey-83525
02/24/2025, 4:59 PMhandle_agent_done
function is called whenever a secondary agent signals that it is done. This function handles the cleanup and triggers the main agent to speak, ensuring a consistent and efficient process for managing multiple agents.
Sources:
- https://livekit-users.slack.com/archives/C07FY8WHGPM/p1728987953.956549
- https://livekit-users.slack.com/archives/C07FY8WHGPM/p1730355369.957779 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.quick-art-43720
02/24/2025, 11:44 PMgray-monkey-83525
02/24/2025, 11:44 PMasyncio.CancelledError
, which typically occurs when an asynchronous task is cancelled. This can happen for various reasons, such as when a task is explicitly cancelled or when the event loop is shutting down.
Here are a few steps you can take to address this issue:
1. Check Task Cancellation: Ensure that tasks are not being cancelled prematurely. Review your code to see if there are any explicit calls to task.cancel()
that might be affecting the task in question.
2. Handle Task Cancellation: If a task is expected to be cancelled, you can handle the CancelledError
exception gracefully. This can be done by wrapping the task in a try-except block and catching the CancelledError
.
3. Review WebSocket Connection: The error occurs in the recv_task
function, which is likely part of a WebSocket connection. Ensure that the WebSocket connection is being managed correctly and that it is not being closed unexpectedly.
4. Debugging: Add additional logging to help identify when and why the task is being cancelled. This can provide more context and help pinpoint the root cause of the issue.
Here's an example of how you might handle the `CancelledError`:
async def recv_task():
try:
while True:
msg = await ws.receive()
# Process the message
except asyncio.CancelledError:
logger.info("recv_task was cancelled")
# Perform any necessary cleanup here
except Exception as e:
logger.error(f"Error in recv_task: {e}")
If you need further assistance or have specific questions, please let me know.
Sources:
- https://docs.livekit.io/python/livekit/plugins/deepgram/index.html#livekit.plugins.deepgram.SpeechStream
- https://livekit-users.slack.com/archives/C07FY8WHGPM/p1732682471.692099 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.