import asyncio
import logging
from dotenv import load_dotenv
from livekit import rtc
from livekit.agents import AutoSubscribe, JobContext, WorkerOptions, cli, llm
from livekit.agents.multimodal import MultimodalAgent
from livekit.plugins import openai
load_dotenv()
logger = logging.getLogger("home-agent")
logger.setLevel(
logging.INFO)
class HomeAssistantFnc(llm.FunctionContext):
    """Function context exposing agent-dispatch tools to the LLM.

    Holds the job context plus two asyncio.Events that record whether the
    weather/bakery secondary agents are currently active, so repeated tool
    calls don't create duplicate dispatches.
    """

    def __init__(self, ctx, weather_event, bakery_event):
        super().__init__()
        self.ctx = ctx
        self.weather_event = weather_event
        self.bakery_event = bakery_event

    @llm.ai_callable(description="Dispatch the weather agent if not active.")
    async def dispatch_weather_agent(self) -> str:
        # Guard: skip if the weather agent is already in the room.
        if self.weather_event.is_set():
            logger.info("Weather agent already active, skipping dispatch.")
            return "Weather agent already active"
        logger.info("Dispatching weather agent via function call...")
        # Bare name resolves to the module-level coroutine, not this method
        # (class attributes do not shadow globals inside a function body).
        await dispatch_weather_agent(self.ctx.room.name)
        self.weather_event.set()
        return "Weather agent dispatched"

    @llm.ai_callable(description="Dispatch the bakery agent if not active.")
    async def dispatch_bakery_agent(self) -> str:
        # Guard: skip if the bakery agent is already in the room.
        if self.bakery_event.is_set():
            logger.info("Bakery agent already active, skipping dispatch.")
            return "Bakery agent already active"
        logger.info("Dispatching bakery agent via function call...")
        # Calls the module-level dispatch_bakery_agent coroutine (see note above
        # on name resolution).
        await dispatch_bakery_agent(self.ctx.room.name)
        self.bakery_event.set()
        return "Bakery agent dispatched"
def prewarm(proc):
    # Worker prewarm hook: intentionally a no-op — a multimodal realtime agent
    # needs no local VAD model loaded ahead of time.
    logger.info("Prewarming: No VAD needed for multimodal agent.")
async def entrypoint(ctx: JobContext):
    """Main job entrypoint for the home agent.

    Connects to the room audio-only, starts a multimodal (realtime) agent for
    the first participant, and wires up:
      * LLM-callable tools (via HomeAssistantFnc) to dispatch secondary
        weather/bakery agents,
      * data-packet handlers for those agents' joined/done signals,
      * a text-chat path that classifies intent and dispatches accordingly.
    """
    logger.info("Connecting to room %s...", ctx.room.name)
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
    logger.info("Connected to room: %s", ctx.room.name)
    participant = await ctx.wait_for_participant()
    logger.info("Participant '%s' joined. Starting home agent...", participant.identity)

    # Events track the active state of the secondary agents so we never
    # dispatch duplicates.
    weather_event = asyncio.Event()
    bakery_event = asyncio.Event()
    fnc_ctx = HomeAssistantFnc(ctx, weather_event, bakery_event)

    # Initial chat context.
    initial_ctx = llm.ChatContext().append(
        role="system",
        text=(
            "You are the main assistant. Answer queries normally. "
            "Dispatch the weather assistant for weather queries, and the bakery assistant for croissant or bakery advice."
        ),
    )

    # Realtime model for multimodal interactions using the preview model.
    model = openai.realtime.RealtimeModel(
        model="gpt-4o-mini-realtime-preview-2024-12-17",
        instructions=(
            "You are the main assistant. Answer queries normally. "
            "Dispatch the weather assistant for weather queries, and the bakery assistant for croissant or bakery advice."
        ),
        voice="alloy",
        temperature=0.8,
        modalities=["audio", "text"],
        turn_detection=openai.realtime.ServerVadOptions(
            threshold=0.6, prefix_padding_ms=200, silence_duration_ms=500
        ),
    )
    # FIX: pass fnc_ctx so the @llm.ai_callable tools registered on
    # HomeAssistantFnc are actually exposed to the model — previously fnc_ctx
    # was constructed but never used, making the tool-call dispatch path dead.
    agent = MultimodalAgent(model=model, chat_ctx=initial_ctx, fnc_ctx=fnc_ctx)
    agent.start(ctx.room, participant)
    logger.info("Home agent started.")

    async def say_message(message: str):
        """Append *message* as an assistant turn and trigger a spoken reply."""
        logger.info("Home Agent speaking: '%s'", message)
        # Obtain a copy of the current chat context, extend it, push it back.
        chat_ctx = agent.chat_ctx_copy()
        chat_ctx.append(role="assistant", text=message)
        await agent.set_chat_ctx(chat_ctx)
        # Trigger the realtime response.
        agent.generate_reply()

    # --- Handle secondary agent handbacks ---
    async def handle_agent_done(agent_name: str, event: asyncio.Event):
        """Delete a finished secondary agent's dispatch and resume as main assistant.

        *agent_name* is the short name ("weather" / "bakery"); the dispatch
        records use the "<name>_agent" form.
        """
        logger.info(f"{agent_name.capitalize()} agent done signal received. Handling transfer back to main assistant.")
        from livekit import api
        lkapi = api.LiveKitAPI()
        try:
            dispatches = await lkapi.agent_dispatch.list_dispatch(room_name=ctx.room.name)
            logger.debug("Current dispatches: %s", dispatches)
            for d in dispatches:
                # FIX: dispatches are created with agent_name="weather_agent" /
                # "bakery_agent", but this function receives "weather"/"bakery".
                # The old equality test never matched, so stale dispatches were
                # never deleted.
                if d.agent_name == f"{agent_name}_agent":
                    await lkapi.agent_dispatch.delete_dispatch(dispatch_id=d.id, room_name=ctx.room.name)
                    logger.info(f"Deleted {agent_name} agent dispatch: {d.id}")
                    break
        except Exception as e:
            logger.error(f"Failed to delete {agent_name} agent dispatch: {e}")
        finally:
            await lkapi.aclose()
        # Reset conversation context.
        new_ctx = llm.ChatContext().append(
            role="system", text="You are the main assistant. Answer queries normally."
        )
        await agent.set_chat_ctx(new_ctx)
        await asyncio.sleep(2)  # Allow overlapping speech to finish.
        await say_message("Ok, I got it.")
        event.clear()

    async def handle_data_received(data: rtc.DataPacket):
        """Route control packets published by the secondary agents."""
        try:
            if data.data == b"weather_agent_joined":
                logger.info("Weather agent joined; no subscription toggling needed for multimodal agent.")
            elif data.data == b"weather_agent_done":
                await handle_agent_done("weather", weather_event)
            elif data.data == b"bakery_agent_joined":
                logger.info("Bakery agent joined; no subscription toggling needed for multimodal agent.")
            elif data.data == b"bakery_agent_done":
                await handle_agent_done("bakery", bakery_event)
            else:
                logger.debug("Data received: %s", data.data)
        except Exception as e:
            logger.error("Error handling data packet: %s", e)

    @ctx.room.on("data_received")
    def on_data_received(data: rtc.DataPacket):
        # Room callbacks are synchronous; hand the work to the event loop.
        asyncio.create_task(handle_data_received(data))

    async def determine_intent(message: str) -> str:
        """Classify *message* as 'weather', 'bakery', or 'general' via a one-shot LLM call."""
        logger.info("Determining intent via NLP for message: '%s'", message)
        prompt = (
            "Classify the following user message into one of these categories: weather, bakery, or general. "
            "Return only one word as the intent. Message: \"" + message + "\""
        )
        nlp_ctx = llm.ChatContext().append(role="system", text=prompt)
        # FIX: LLM().chat(...) returns a stream of chunks, not a plain string;
        # the old code called .strip() directly on that object. Collect the
        # streamed text first.
        # NOTE(review): assumes livekit-agents LLMStream chunk shape
        # (chunk.choices[0].delta.content) — confirm against installed version.
        stream = openai.LLM().chat(chat_ctx=nlp_ctx)
        pieces = []
        async for chunk in stream:
            if chunk.choices and chunk.choices[0].delta.content:
                pieces.append(chunk.choices[0].delta.content)
        await stream.aclose()
        intent = "".join(pieces).strip().lower()
        if intent not in {"weather", "bakery", "general"}:
            intent = "general"
        logger.info("Determined intent via NLP: '%s'", intent)
        return intent

    async def handle_incoming_message(message: str):
        """Dispatch to a secondary agent or answer directly, based on intent."""
        logger.info("Handling incoming message: '%s'", message)
        intent = await determine_intent(message)
        if intent == "weather":
            logger.info("Intent 'weather' detected. Initiating dispatch to weather agent.")
            asyncio.create_task(dispatch_weather_agent(ctx.room.name))
            await say_message("Ok, I got it.")
        elif intent == "bakery":
            logger.info("Intent 'bakery' detected. Initiating dispatch to bakery agent.")
            asyncio.create_task(dispatch_bakery_agent(ctx.room.name))
            await say_message("Ok, I got it.")
        else:
            logger.info("Processing message as general query.")
            # Generate reply using the multimodal agent.
            agent.generate_reply()

    @ctx.room.on("message_received")
    def on_message_received(msg: rtc.ChatMessage):
        # NOTE(review): rtc.Room itself does not emit "message_received" — chat
        # messages usually arrive via rtc.ChatManager. Verify this event name.
        if msg.message:
            logger.info("Chat message received: '%s'", msg.message)
            asyncio.create_task(handle_incoming_message(msg.message))

    await say_message("Hey, how can I help you today?")
    logger.info("Initial greeting sent by home agent.")
async def dispatch_weather_agent(room_name: str):
    """Create an agent dispatch for the weather agent in *room_name*.

    Errors are logged rather than raised; the API client is always closed.
    """
    from livekit import api

    logger.info("Dispatching weather agent to room '%s'...", room_name)
    lkapi = api.LiveKitAPI()
    request = api.CreateAgentDispatchRequest(
        agent_name="weather_agent",
        room=room_name,
        metadata="Weather inquiry dispatched by home agent",
    )
    logger.debug("Weather dispatch request: %s", request)
    try:
        result = await lkapi.agent_dispatch.create_dispatch(request)
        logger.info("Weather agent dispatched successfully: %s", result)
    except Exception as exc:
        logger.error("Failed to dispatch weather agent: %s", exc)
    finally:
        await lkapi.aclose()
async def dispatch_bakery_agent(room_name: str):
    """Create an agent dispatch for the bakery agent in *room_name*.

    Errors are logged rather than raised; the API client is always closed.
    """
    from livekit import api

    logger.info("Dispatching bakery agent to room '%s'...", room_name)
    lkapi = api.LiveKitAPI()
    request = api.CreateAgentDispatchRequest(
        agent_name="bakery_agent",
        room=room_name,
        metadata="Bakery inquiry dispatched by home agent",
    )
    logger.debug("Bakery dispatch request: %s", request)
    try:
        result = await lkapi.agent_dispatch.create_dispatch(request)
        logger.info("Bakery agent dispatched successfully: %s", result)
    except Exception as exc:
        logger.error("Failed to dispatch bakery agent: %s", exc)
    finally:
        await lkapi.aclose()
# Script entry point: run the LiveKit worker with our entrypoint/prewarm hooks.
# FIX: the guard was split across two lines and used the bare name `name`
# instead of the `__name__` dunder, which is a SyntaxError/NameError as written.
if __name__ == "__main__":
    cli.run_app(
        WorkerOptions(
            entrypoint_fnc=entrypoint,
            prewarm_fnc=prewarm,
        ),
    )