how do i solve this error: home_agent | {"message": "unhandled exception while running the job task\nTraceback (most recent call last):\n File \"/app/home_agent.py\", line 50, in entrypoint\n await ctx.room.local_participant.set_metadata({'role': 'speaker'})\n File \"/usr/local/lib/python3.11/site-packages/livekit/rtc/participant.py\", line 517, in set_metadata\n req.set_local_metadata.metadata = metadata\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\nTypeError: bad argument type for built-in operation", "level": "ERROR", "name": "livekit.agents", "pid": 11, "job_id": "AJ_AL4SHnUGw9Pc", "timestamp": "2025-02-25T20
4428.820136+00:00"} here is my code:
# Configure logging
logger = logging.getLogger("home-agent")
logger.setLevel(
logging.INFO)
class HomeAssistantFnc(llm.FunctionContext):
def __init__(self, ctx, weather_event, bakery_event):
super().__init__()
self.ctx = ctx
self.weather_event = weather_event
self.bakery_event = bakery_event
@llm.ai_callable(description="Dispatch the weather agent if not active.")
async def dispatch_weather_agent(self) -> str:
if not self.weather_event.is_set():
logger.info("Dispatching weather agent via function call...")
await dispatch_weather_agent(self.ctx.room.name)
self.weather_event.set()
return "Weather agent dispatched"
else:
logger.info("Weather agent already active, skipping dispatch.")
return "Weather agent already active"
@llm.ai_callable(description="Dispatch the bakery agent if not active.")
async def dispatch_bakery_agent(self) -> str:
if not self.bakery_event.is_set():
logger.info("Dispatching bakery agent via function call...")
await dispatch_bakery_agent(self.ctx.room.name)
self.bakery_event.set()
return "Bakery agent dispatched"
else:
logger.info("Bakery agent already active, skipping dispatch.")
return "Bakery agent already active"
async def entrypoint(ctx: JobContext):
logger.info("Connecting to room %s...", ctx.room.name)
await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
# Set agent metadata to indicate it's a speaker; pass a JSON string instead of a dict.
await ctx.room.local_participant.set_metadata(json.dumps({'role': 'speaker'}))
logger.info("Set agent metadata to speaker")
logger.info("Connected to room: %s", ctx.room.name)
# Wait for the user participant (not the agent itself)
participant = await ctx.wait_for_participant()
while participant.identity == ctx.room.local_participant.identity:
logger.info("Waiting for a user participant, skipping agent identity: %s", participant.identity)
participant = await ctx.wait_for_participant()
logger.info("User participant '%s' joined. Starting home agent...", participant.identity)
# Create events to track the active state of secondary agents
weather_event = asyncio.Event()
bakery_event = asyncio.Event()
fnc_ctx = HomeAssistantFnc(ctx, weather_event, bakery_event)
# Define the RealtimeModel with manual turn detection
model = openai.realtime.RealtimeModel(
instructions=(
"You are the main assistant. Answer queries normally. "
"Use the dispatch_weather_agent function for weather queries, "
"and dispatch_bakery_agent for bakery-related queries."
),
voice="alloy",
temperature=0.8,
modalities=["audio", "text"],
turn_detection=None, # Manual control over response generation
)
# Initialize chat context with system instructions and initial greeting
initial_ctx = llm.ChatContext().append(
role="system",
text=(
"You are the main assistant. Answer queries normally. "
"Dispatch the weather assistant for weather queries, and the bakery assistant for croissant or bakery advice."
),
).append(
role="assistant",
text="Hello! How can I assist you today?"
)
# Create and start the MultimodalAgent with the user participant
agent = MultimodalAgent(
model=model,
chat_ctx=initial_ctx,
fnc_ctx=fnc_ctx,
)
agent.start(ctx.room, participant)
logger.info("Home agent started for participant: %s", participant.identity)
# Log agent's audio tracks if available
if hasattr(ctx.room.local_participant, 'tracks'):
audio_tracks = [track for track in ctx.room.local_participant.tracks if track.kind == rtc.TrackKind.KIND_AUDIO]
logger.info("Agent audio tracks: %s", audio_tracks)
else:
logger.info("LocalParticipant has no attribute 'tracks'.")
# Flag to control when the home agent is active
is_active = True
# Handle user speech events to trigger responses only when active
@agent.on("user_stopped_speaking")
def on_user_stopped_speaking():
if is_active:
logger.info("User stopped speaking, generating reply.")
agent.generate_reply()
# Trigger the initial greeting
agent.generate_reply()
# Audio management helper
class AudioManager:
def __init__(self):
self.user_audio_track = None
self.subscription_set = asyncio.Event()
def set_user_audio_track(self, track):
self.user_audio_track = track
self.subscription_set.set()
logger.info("User audio track set: %s", track.sid)
async def wait_for_subscription(self):
await self.subscription_set.wait()
if self.user_audio_track is None:
logger.error("User audio track is None after subscription event")
raise ValueError("User audio track not set")
return self.user_audio_track
audio_mgr = AudioManager()
@ctx.room.on("track_subscribed")
def on_track_subscribed(track: rtc.Track, publication: rtc.TrackPublication, rp: rtc.RemoteParticipant):
if rp.identity == participant.identity and track.kind == rtc.TrackKind.KIND_AUDIO:
audio_mgr.set_user_audio_track(track)
logger.info("Subscribed to user audio track: %s for participant: %s", track.sid, rp.identity)
# Wait for the user's audio track subscription
try:
await asyncio.wait_for(audio_mgr.wait_for_subscription(), timeout=10.0)
except asyncio.TimeoutError:
logger.error("Timed out waiting for user audio track subscription")
return
# Unified handler for secondary agent completion
async def handle_agent_done(agent_name: str, event: asyncio.Event):
logger.info(f"{agent_name.capitalize()} agent done signal received. Transferring back to home agent.")
lkapi = api.LiveKitAPI()
try:
dispatches = await lkapi.agent_dispatch.list_dispatch(room_name=ctx.room.name)
for d in dispatches:
if d.agent_name == agent_name:
await lkapi.agent_dispatch.delete_dispatch(dispatch_id=d.id, room_name=ctx.room.name)
logger.info(f"Deleted {agent_name} agent dispatch: %d.id")
break
except Exception as e:
logger.error(f"Failed to delete {agent_name} agent dispatch: %s", e)
finally:
await lkapi.aclose()
# Resubscribe to the user's audio track
user_track = await audio_mgr.wait_for_subscription()
user_track.set_subscribed(True)
logger.info("Resubscribed to user audio track after %s agent completed.", agent_name)
# Clear the active event
event.clear()
# Reset conversation context
agent.chat_ctx.messages.clear()
agent.chat_ctx.append(role="system", text="You are the main assistant. Answer queries normally.")
agent.chat_ctx.append(role="assistant", text=f"Back from the {agent_name} assistant. How can I help you now?")
# Update active status
nonlocal is_active
is_active = True
# Wait to avoid overlapping speech
await asyncio.sleep(2)
# Trigger a follow-up message
agent.generate_reply()
# Data packet handler for agent signals
async def handle_data_received(data: rtc.DataPacket):
nonlocal is_active
try:
await audio_mgr.wait_for_subscription()
if data.data == b"weather_agent_joined":
logger.info("Weather agent joined, pausing home agent.")
if audio_mgr.user_audio_track:
audio_mgr.user_audio_track.set_subscribed(False)
agent.model.session.send({"type": "response.cancel"})
is_active = False
elif data.data == b"weather_agent_done":
await handle_agent_done("weather", weather_event)
elif data.data == b"bakery_agent_joined":
logger.info("Bakery agent joined, pausing home agent.")
if audio_mgr.user_audio_track:
audio_mgr.user_audio_track.set_subscribed(False)
agent.model.session.send({"type": "response.cancel"})
is_active = False
elif data.data == b"bakery_agent_done":
await handle_agent_done("bakery", bakery_event)
else:
logger.debug("Data received: %s", data.data)
except Exception as e:
logger.error("Error handling data packet: %s", e)
@ctx.room.on("data_received")
def on_data_received(data: rtc.DataPacket):
asyncio.create_task(handle_data_received(data))
async def dispatch_weather_agent(room_name: str):
from livekit import api
logger.info("Dispatching weather agent to room '%s'...", room_name)
lkapi = api.LiveKitAPI()
dispatch_request = api.CreateAgentDispatchRequest(
agent_name="weather_agent",
room=room_name,
metadata="Weather inquiry dispatched by home agent",
)
try:
dispatch = await lkapi.agent_dispatch.create_dispatch(dispatch_request)
logger.info("Weather agent dispatched successfully: %s", dispatch)
except Exception as e:
logger.error("Failed to dispatch weather agent: %s", e)
finally:
await lkapi.aclose()
async def dispatch_bakery_agent(room_name: str):
from livekit import api
logger.info("Dispatching bakery agent to room '%s'...", room_name)
lkapi = api.LiveKitAPI()
dispatch_request = api.CreateAgentDispatchRequest(
agent_name="bakery_agent",
room=room_name,
metadata="Bakery inquiry dispatched by home agent",
)
try:
dispatch = await lkapi.agent_dispatch.create_dispatch(dispatch_request)
logger.info("Bakery agent dispatched successfully: %s", dispatch)
except Exception as e:
logger.error("Failed to dispatch bakery agent: %s", e)
finally:
await lkapi.aclose()
if
name == "__main__":
cli.run_app(
WorkerOptions(
entrypoint_fnc=entrypoint,
),
)