ancient-king-38046
06/25/2025, 11:35 AMtall-airline-16756
06/25/2025, 11:35 AMtall-airline-16756
06/25/2025, 11:35 AMancient-king-38046
06/25/2025, 11:35 AM
from __future__ import annotations
import collections.abc
import logging
import asyncio
import aiohttp
from dotenv import load_dotenv
from livekit import rtc
from livekit.agents import JobContext, WorkerOptions, cli, llm, voice
from livekit.agents.llm import function_tool
from livekit.plugins import google
import json
# Load variables from a local .env file into the process environment before
# anything else runs (presumably LiveKit / Google credentials — confirm).
load_dotenv()
# Module-wide logger; DEBUG level so the per-item conversation logs emitted
# by the session handlers below are not filtered out by this logger.
logger = logging.getLogger("livekit-agent")
logger.setLevel(logging.DEBUG)
def get_metadata(participant: rtc.RemoteParticipant) -> dict:
    """Parse the participant's metadata into a dictionary.

    The metadata is expected to be a JSON-encoded object. An empty dictionary
    is returned when the metadata is missing, is not valid JSON, or decodes to
    anything other than a JSON object.

    Args:
        participant: The remote participant whose ``metadata`` string is read.

    Returns:
        The decoded metadata dictionary, or ``{}`` for any fallback case above.
    """
    raw_metadata = participant.metadata
    if not raw_metadata:
        logger.info("Participant has no metadata, using empty metadata.")
        return {}

    logger.info(
        f"Participant {participant.identity} joined with metadata: {raw_metadata}"
    )
    try:
        parsed = json.loads(raw_metadata)
    except json.JSONDecodeError:
        logger.warning(
            "Could not parse participant metadata as JSON, using empty metadata."
        )
        return {}

    # Valid JSON is not necessarily an object: json.loads also returns lists,
    # strings, numbers, booleans and None. Callers rely on dict.get(), so
    # anything else falls back to the empty dictionary.
    if not isinstance(parsed, dict):
        logger.warning(
            "Participant metadata is not a JSON object, using empty metadata."
        )
        return {}
    return parsed
async def send_message_to_frontend(
    room: rtc.Room,
    message_type: str,
    message: str,
    extra_data: dict | None = None,
):
    """Publish a structured JSON message to the frontend over the data channel.

    Failures are logged and swallowed: this is a best-effort notification and
    must not break the caller.

    Args:
        room: The LiveKit room instance whose local participant publishes.
        message_type: Type of message (e.g., 'call_ending', 'status_update', 'error').
        message: The message content.
        extra_data: Additional keys merged into the payload. Because they are
            merged last, they override 'type', 'message' and 'timestamp' when
            the names collide.
    """
    try:
        payload = {
            "type": message_type,
            "message": message,
            # NOTE: this is the event loop's monotonic clock, not wall-clock
            # time; it is only meaningful relative to other values from the
            # same loop.
            "timestamp": asyncio.get_running_loop().time(),
        }
        if extra_data:
            payload.update(extra_data)

        await room.local_participant.publish_data(
            json.dumps(payload).encode("utf-8"),
            reliable=True,
        )
        logger.info(f"Sent {message_type} message to frontend: {message}")
    except Exception as e:
        logger.error(f"Failed to send {message_type} message: {e}")
class MyAgent(voice.Agent):
    """Voice agent that exposes a ``finish_call`` tool for ending the call."""

    def __init__(self, ctx: JobContext, **kwargs):
        """Store the job context; remaining kwargs go to ``voice.Agent``.

        Args:
            ctx: The job context, used to reach the room when notifying the
                frontend.
            **kwargs: Forwarded unchanged to the base class constructor.
        """
        super().__init__(**kwargs)
        self.ctx = ctx
        # Strong references to in-flight session-close tasks. The event loop
        # only keeps weak references to tasks, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._pending_close_tasks: set[asyncio.Task] = set()

    # NOTE(review): the docstring below is presumably surfaced to the LLM as
    # the tool description by function_tool — keep its wording stable.
    @function_tool()
    async def finish_call(self):
        """
        AI-callable function to gracefully end the conversation and hang up the call.
        This function will gracefully terminate the current session and disconnect
        the participant from the room. Use this when the conversation has naturally
        concluded or when the user explicitly requests to end the call.
        """
        logger.info("Ending the call.")
        await send_message_to_frontend(
            self.ctx.room,
            "call_ending",
            "The call will end in a few seconds. Thank you for your time!",
        )
        # Short grace period after notifying the frontend, before closing.
        await asyncio.sleep(2)
        # Close in a separate task (presumably so this tool call can return
        # before the session is torn down) and keep a reference until done.
        close_task = asyncio.create_task(self.session.aclose())
        self._pending_close_tasks.add(close_task)
        close_task.add_done_callback(self._pending_close_tasks.discard)
def create_llm_model(metadata: dict) -> google.beta.realtime.RealtimeModel:
    """Create and configure a Gemini RealtimeModel.

    Args:
        metadata: Session configuration. The optional keys 'voice' (non-empty
            string) and 'temperature' (number) override the defaults; missing
            or wrongly-typed values fall back to "Puck" and 0.8.

    Returns:
        An instance of google.beta.realtime.RealtimeModel.
    """
    # Named voice_name so the imported `voice` module is not shadowed.
    # The value is only type-checked here; whether it is a voice the model
    # actually supports is left to the plugin.
    voice_name = metadata.get("voice")
    if not isinstance(voice_name, str) or not voice_name:
        voice_name = "Puck"

    temperature = metadata.get("temperature")
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(temperature, bool) or not isinstance(temperature, (int, float)):
        temperature = 0.8

    return google.beta.realtime.RealtimeModel(
        model="gemini-2.5-flash-preview-native-audio-dialog",
        voice=voice_name,
        temperature=float(temperature),
        # Pass an empty list to disable default tools like Google Search
        _gemini_tools=[],
    )
async def post_transcript(callback_url: str, transcript: list[dict]):
    """Post the conversation transcript to a callback URL as JSON.

    The request body is ``{"transcript": [...]}``. Any 2xx response counts as
    success. Non-2xx responses and transport errors are logged, never raised,
    so a failing callback cannot break session teardown.

    Args:
        callback_url: The URL to post the transcript to.
        transcript: A list of message dictionaries.
    """
    try:
        async with aiohttp.ClientSession() as http_session:
            async with http_session.post(
                callback_url,
                json={"transcript": transcript},
                headers={"Content-Type": "application/json"},
            ) as response:
                # Accept the whole 2xx range: callbacks commonly answer with
                # 201/202/204 rather than exactly 200.
                if 200 <= response.status < 300:
                    logger.info("Successfully posted transcript to callback URL.")
                else:
                    body = await response.text()
                    logger.error(
                        f"Failed to post transcript. Status: {response.status} Body: {body}"
                    )
    except Exception as e:
        logger.error(f"Error posting transcript to callback URL: {e}")
def register_session_handlers(session: voice.AgentSession, metadata: dict):
    """Register event handlers on the agent session.

    Two handlers are attached: one logs every conversation item added, the
    other builds a transcript from the session history on close and posts it
    to ``metadata["transcript_callback_url"]`` when that key is non-empty.

    Args:
        session: The voice.AgentSession instance.
        metadata: The session metadata containing the transcript_callback_url.
    """
    # Strong references to in-flight transcript posts. The event loop only
    # keeps weak references to tasks, so an unreferenced task may be
    # garbage-collected before it completes.
    pending_posts: set[asyncio.Task] = set()

    @session.on("conversation_item_added")
    def on_speech_committed(event: voice.ConversationItemAddedEvent):
        """Log committed conversation items."""
        logger.debug(f"Conversation item added: {event}")

    @session.on("close")
    def on_session_close(event: voice.CloseEvent):
        """
        Handles the session close event by logging, formatting the chat history,
        and posting the transcript to a callback URL if provided.
        """
        logger.info(f"The agent session has closed. Reason: {event.reason}")

        # Order by end_time, then timestamp, then 0. sorted() is stable, so
        # items lacking both attributes keep their history order.
        # NOTE(review): confirm history items actually expose either attribute.
        sorted_items = sorted(
            session.history.items,
            key=lambda item: getattr(item, "end_time", getattr(item, "timestamp", 0)),
        )

        transcript = []
        for item in sorted_items:
            # Only textual messages with content go into the transcript.
            if item.type != "message" or not item.text_content:
                continue
            text = item.text_content
            if item.interrupted:
                text += " (interrupted)"
            transcript.append({"role": str(item.role), "content": text})

        # Post transcript to callback URL
        callback_url = metadata.get("transcript_callback_url", "")
        if callback_url:
            logger.info(f"Posting transcript to callback URL: {callback_url}")
            # The handler is synchronous, so the post runs as a background
            # task; keep a reference until it finishes.
            post_task = asyncio.create_task(post_transcript(callback_url, transcript))
            pending_posts.add(post_task)
            post_task.add_done_callback(pending_posts.discard)
async def entrypoint(ctx: JobContext):
    """
    The main entrypoint for the agent. This function is called by the
    LiveKit agent framework when a new job is created.

    It connects to the room, waits for a participant, builds the agent from
    that participant's metadata, starts the session and triggers the first
    reply. Errors during setup are logged with a traceback, not raised.

    Args:
        ctx: The job context supplied by the framework.
    """
    logger.info("starting agent entrypoint")
    await ctx.connect()
    logger.info("connected to room %s", ctx.room.name)

    try:
        participant = await ctx.wait_for_participant()
        metadata = get_metadata(participant)
        logger.info(f"Participant metadata: {metadata}")

        # The value comes from participant-supplied JSON, so it may be null
        # or a non-string; concatenating onto that would raise TypeError.
        instructions = metadata.get("instructions")
        if not isinstance(instructions, str):
            if instructions is not None:
                logger.warning(
                    "Ignoring non-string 'instructions' in participant metadata."
                )
            instructions = ""
        instructions += " When the conversation is over, say your goodbyes and then call the `finish_call` function."

        llm_model = create_llm_model(metadata)

        # Seed the context with a synthetic user turn so the first generated
        # reply is a greeting.
        chat_ctx = llm.ChatContext.empty()
        chat_ctx.add_message(
            content="Hello! I just joined the call. Please greet me and start our conversation.",
            role="user",
        )

        agent = MyAgent(
            ctx=ctx,
            llm=llm_model,
            chat_ctx=chat_ctx,
            instructions=instructions,
        )

        session = voice.AgentSession()
        # Handlers are registered before start().
        register_session_handlers(session, metadata)

        await session.start(agent, room=ctx.room)
        logger.info("Agent session started")
        await session.generate_reply()
    except Exception as e:
        # Top-level boundary for the job: log with traceback, do not re-raise.
        logger.error("An error occurred during agent execution: %s", e, exc_info=True)
if __name__ == "__main__":
    # Script entry: run the LiveKit agents CLI with `entrypoint` as the
    # worker's entrypoint function.
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
tall-airline-16756
06/25/2025, 11:35 AM