polite-dog-12152
08/14/2025, 3:47 AMtall-airline-16756
08/14/2025, 3:48 AM
1. Detect Inactivity: Use the user_state_changed event to notice when a user goes inactive (like switching to "away").
2. Handle the Inactivity: Try reaching out to the inactive user a few times. If they don’t respond, it’s probably time to close the session.
3. Wrap Up the Session and Delete the Room: Once the user doesn’t respond, close the session and delete the room to free up resources.
Here’s a simple example in Python to get you started:
async def user_presence_task():
    # Try to ping the user 3 times
    for _ in range(3):
        await session.generate_reply(
            instructions="The user has been inactive. Please respond if you are still present."
        )
        await asyncio.sleep(10)
    # Close the session and delete the room
    await session.aclose()
    ctx.delete_room()

inactivity_task: asyncio.Task | None = None  # defined in the enclosing entrypoint so the handler can use nonlocal

# Monitor user state changes
@session.on("user_state_changed")
def _user_state_changed(ev: UserStateChangedEvent):
    nonlocal inactivity_task
    if ev.new_state == "away":
        # Start inactivity check when user becomes away
        inactivity_task = asyncio.create_task(user_presence_task())
        return
    # Cancel inactivity task if user becomes active again
    if inactivity_task is not None:
        inactivity_task.cancel()
This method helps keep things running smoothly by cleaning up the room when it’s no longer needed. If you want to chat more about it or need a hand, just let me know!
Sources: Plugin System | livekit/livekit_composite | DeepWiki | Agents Core Architecture | livekit/livekit_composite | DeepWiki | Voice Assistants | livekit/livekit_composite | DeepWiki | examples/voice_agents/inactive_user.py | Voice Assistants | livekit/livekit_composite | DeepWikipolite-dog-12152
08/14/2025, 4:16 AMtall-airline-16756
08/14/2025, 4:16 AM
1. Check the Remote Participant Count: You can check whether anyone is left in the room like this:
if len(room.remote_participants) == 0:
    print("No participants left in the room")
2. Keep an eye on Participant Disconnections: By using the participant_disconnected event, you can catch when someone leaves and then see if the room is empty:
@room.on("participant_disconnected")
def on_participant_disconnected(participant):
    if len(room.remote_participants) == 0:
        print("No participants left in the room")
If you want a hand with setting this up or have any questions, just give me a shout—I’m here to help!
Sources: Anatomy of an Agent | LiveKit Docs | livekit-rtc/livekit/rtc/room.pypolite-dog-12152
08/14/2025, 4:17 AMtall-airline-16756
08/14/2025, 4:18 AM
import asyncio
from livekit import rtc

# Create a future that will be set when the room is empty
room_empty_future = asyncio.get_running_loop().create_future()

# Define a callback for when a participant disconnects
def on_participant_disconnected(_: rtc.Participant, fut=room_empty_future):
    if len(ctx.room.remote_participants) == 0 and not fut.done():
        fut.set_result(None)

# Attach the callback to the room's participant_disconnected event
ctx.room.on("participant_disconnected", on_participant_disconnected)

try:
    # Wait for the future to be resolved
    await asyncio.wait([room_empty_future], return_when=asyncio.FIRST_COMPLETED)
finally:
    # Cleanup the event listener
    ctx.room.off("participant_disconnected", on_participant_disconnected)
    # Ensure the session is closed properly
    await session.aclose()
Explanation:
• Future Setup: We create a future to keep an eye on when the room becomes empty.
• Event Listener: We listen for the participant_disconnected
event to check how many participants are still around.
• Room Cleanup: Once the room is empty, the future is resolved, and we remove the event listener to tidy things up.
This way, you’ll know exactly when the room is empty and can handle any cleanup or next steps smoothly. If anything’s unclear or you want to dive deeper, just let me know—I’m here to help!
Sources: Anatomy of an Agent | LiveKit Docspolite-dog-12152
08/21/2025, 10:02 AMtall-airline-16756
08/21/2025, 10:03 AM
You can override the stt_node method in your custom Agent class. This lets you tweak the transcribed text just the way you want before passing it along. Here’s a simple example to get you started:
from livekit import rtc
from livekit.agents import ModelSettings, stt, Agent
from typing import AsyncIterable, Optional
class MaskingAgent(Agent):
def __init__(self):
super().__init__(
instructions="Your agent instructions here",
stt=your_stt_provider,
llm=your_llm_provider,
tts=your_tts_provider,
vad=your_vad_provider
)
async def stt_node(
self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings
) -> Optional[AsyncIterable[stt.SpeechEvent]]:
# First, get the default STT processing
async for event in Agent.default.stt_node(self, audio, model_settings):
# Here’s where you can post-process the transcribed text
if hasattr(event, 'alternatives') and event.alternatives:
# Let’s mask any sensitive info in the transcript
original_text = event.alternatives[0].text
masked_text = self.mask_sensitive_content(original_text)
event.alternatives[0].text = masked_text
yield event
def mask_sensitive_content(self, text: str) -> str:
# This is where your masking magic happens
import re
# Mask credit card numbers
text = re.sub(r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b', '[CREDIT_CARD]', text)
# Mask phone numbers
text = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE_NUMBER]', text)
# Feel free to add more patterns to mask as needed
return text
Here’s a quick rundown:
• Override the stt_node
method to catch and tweak the STT output.
• Create a handy function like mask_sensitive_content
to swap out sensitive info (think credit cards, phone numbers) with placeholders.
• Yield the updated events so the LLM only sees the masked text.
This way, you’ve got full control over what the LLM receives from the STT. If you want me to walk you through anything or have questions, just give me a shout—I’m here to help!
Sources: Pipeline nodes and hooks | LiveKit Docs | Voice pipeline nodes | LiveKit Docspolite-dog-12152
08/21/2025, 10:49 AMtall-airline-16756
08/21/2025, 10:50 AMpolite-dog-12152
08/21/2025, 11:34 AMtall-airline-16756
08/21/2025, 11:35 AM
def setup_langfuse(
metadata: dict[str, AttributeValue] | None = None,
*,
host: str | None = None,
public_key: str | None = None,
secret_key: str | None = None,
) -> TracerProvider:
Just make sure you have these environment variables set up: LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, and LANGFUSE_HOST.
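If it’s easier for local testing, you could also set them in code before calling setup_langfuse. Here’s a minimal sketch with placeholder values (swap in your real Langfuse keys and host):
import os

# Placeholder values for illustration only; use your real Langfuse credentials.
os.environ.setdefault("LANGFUSE_PUBLIC_KEY", "pk-lf-...")
os.environ.setdefault("LANGFUSE_SECRET_KEY", "sk-lf-...")
os.environ.setdefault("LANGFUSE_HOST", "https://cloud.langfuse.com")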
Want to see it in action? Check out the LiveKit repo’s example where they trace agents like:
• Kelly: Powered by GPT-4o-mini, Deepgram STT, and OpenAI TTS.
• Alloy: Running on OpenAI’s real-time model.
The traces capture user turns and session details, giving you a clear, full picture of what’s happening in your voice agent pipeline. If you have any questions or want a hand setting it up, just let me know—I’m here to help!
Sources: Logs, metrics, and telemetry | LiveKit Docs | examples/voice_agents/langfuse_trace.pypolite-dog-12152
08/21/2025, 11:43 AMtall-airline-16756
08/21/2025, 11:44 AM
1. Using before_tts_cb with VoicePipelineAgent: This handy callback lets you catch the text before it goes to the TTS engine. Here's a simple example:
async def _before_tts_cb(agent: VoicePipelineAgent, text: str | AsyncIterable[str]):
if isinstance(text, str):
# For non-streaming text
return text.replace("[thinking]", "")
else:
# For streaming text
async def process_stream():
async for chunk in text:
yield chunk.replace("[thinking]", "")
return process_stream()
agent = VoicePipelineAgent(
vad=ctx.proc.userdata["vad"],
stt=openai.STT(),
llm=openai.LLM.with_groq(model="deepseek-r1-distill-llama-70b"),
tts=openai.TTS(),
before_tts_cb=_before_tts_cb
)
2. Overriding llm_node
in a Custom Agent Class: If you want more control, you can tweak the llm_node
method to clean up the LLM output before it reaches the TTS engine. Here's how:
class CustomAgent(Agent):
async def llm_node(self, chat_ctx, tools, model_settings=None):
async def process_stream():
async with self.llm.chat(chat_ctx=chat_ctx, tools=tools) as stream:
async for chunk in stream:
content = getattr(chunk.delta, 'content', None)
if content:
chunk.delta.content = content.replace("[thinking]", "")
yield chunk
return process_stream()
3. Built-in Thinking Token Handling: LiveKit also has a strip_thinking_tokens function that conveniently removes those "thinking" tags from the content for you (a hand-rolled alternative is sketched after this list).
4. Removing Tool Mentions: You can expand your text-cleaning logic to get rid of tool-related phrases as well:
def clean_text(text: str) -> str:
text = text.replace("[thinking]", "")
tool_phrases = ["I'll use the", "Let me call the", "Using the tool"]
for phrase in tool_phrases:
text = text.replace(phrase, "")
return text.strip()
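If you’d rather not rely on the built-in helper from point 3, here’s a minimal hand-rolled sketch. It assumes the model wraps its reasoning in <think>...</think> tags (as DeepSeek R1-style models do), so adjust the pattern to whatever markers your model actually emits:
import re

THINK_TAG_RE = re.compile(r"<think>.*?</think>", re.DOTALL)

def strip_thinking(text: str) -> str:
    # Remove any <think>...</think> blocks, including multi-line ones.
    return THINK_TAG_RE.sub("", text).strip()

print(strip_thinking("<think>reasoning...</think>The capital of France is Paris."))
# -> "The capital of France is Paris."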
These tips should help make your agent’s responses feel smoother and more natural. If you want a hand implementing any of these or have more questions, just let me know—I’m here to help!
Sources: chain-of-thought-tts/agent.py | Modifying LLM output before TTS | LiveKit Docs | livekit-agents/livekit/agents/llm/utils.pypolite-dog-12152
08/23/2025, 9:02 AMtall-airline-16756
08/23/2025, 9:03 AM
The tts_node method in LiveKit is a super handy, customizable pipeline node that lets you tweak text before it goes to the Text-To-Speech (TTS) model to create audio. It gives you the power to adjust the text output from the Large Language Model (LLM) just the way you want before it’s turned into speech.
How It Works:
1. The method takes in an asynchronous stream of text (AsyncIterable[str]
) and processes it however you need.
2. In the example here, the text gets filtered with a function like filter_markdown
to clean out any markdown formatting:
async def tts_node(
self, text: AsyncIterable[str], model_settings: ModelSettings
) -> AsyncIterable[rtc.AudioFrame]:
# Process the text to remove markdown
filtered_text = filter_markdown(text)
# Pass the processed text to the default TTS implementation
return super().tts_node(filtered_text, model_settings)
Key Features:
• Text Preprocessing: You can change the text before it reaches the TTS model—whether that’s removing markdown, swapping out words, or tweaking pronunciation.
• Custom TTS Logic: Feel free to build your own TTS engines or add special audio processing after the speech is generated.
• Streaming Support: It works smoothly with asynchronous text streams, so you can process text in real-time.
Use Case: In this example, the filter_markdown
function makes sure the TTS model gets clean text without markdown, which helps the speech sound more natural and clear.
If you want me to walk you through anything else or share more examples, just let me know—I’m here to help!
Sources: pipeline-tts/short_replies_only.py | Agents 0.x migration guide | LiveKit Docs | Voice pipeline nodes | LiveKit Docs | livekit-agents/livekit/agents/voice/generation.pypolite-dog-12152
08/23/2025, 9:05 AMtall-airline-16756
08/23/2025, 9:07 AM
You can override the stt_node() or transcription_node() methods in your custom Agent class. Here’s a quick rundown on how to do it:
1. Using the stt_node() Method: This one lets you customize the whole STT process. Here’s an example:
async def stt_node(self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings) -> Optional[AsyncIterable[stt.SpeechEvent]]:
# Add your custom processing before STT here
events = Agent.default.stt_node(self, audio, model_settings)
# Add your custom processing after STT here
return events
2. Using the transcription_node() Method: This method is perfect if you want to modify the transcription text right before it hits the LLM. Check this out:
async def transcription_node(self, text: AsyncIterable[str], model_settings: ModelSettings):
replacements = {
"hello": "👋 HELLO",
"goodbye": "GOODBYE 👋",
}
async def process_text():
async for chunk in text:
modified_chunk = chunk
for word, replacement in replacements.items():
modified_chunk = modified_chunk.replace(word, replacement)
yield modified_chunk
return process_text()
Some handy features to keep in mind:
• You can clean up transcription text by removing filler words, fixing pronunciations, or even adding some fun formatting.
• It works with streaming chunks, so your real-time performance stays smooth.
• Go for stt_node()
if you want to customize the whole STT flow, or transcription_node()
if you just want to tweak the text.
If anything’s unclear or you want more examples, just give me a shout—I’m happy to help!
Sources: Pipeline nodes and hooks | LiveKit Docs | pipeline-llm/replacing_llm_output.pypolite-dog-12152
08/23/2025, 10:05 AMtall-airline-16756
08/23/2025, 10:05 AM
The stt_node method actually handles audio frames, not text, so the parameter should be audio instead of text. Also, the mask_sensitive_content function needs to be applied to the transcribed text, not the raw audio frames. Here's an updated version for you:
async def stt_node(
    self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings
) -> AsyncIterable[stt.SpeechEvent]:
"""Content Filtering"""
async for event in super().stt_node(audio, model_settings):
if hasattr(event, 'alternatives') and event.alternatives:
# Apply masking to the transcribed text
original_text = event.alternatives[0].text
filtered_text = self.mask_sensitive_content(original_text)
event.alternatives[0].text = filtered_text
yield event
Here’s what to keep in mind:
• The stt_node
method works with audio frames, so make sure the input parameter is audio
.
• Apply mask_sensitive_content
to the transcribed text inside the STT events—not the audio frames themselves.
• Don’t forget to yield the updated events back to the pipeline.
If anything’s unclear or you want to dive deeper, just let me know—I’m here to help!polite-dog-12152
08/23/2025, 10:09 AMtall-airline-16756
08/23/2025, 10:09 AM
Here's how to use the stt_node method to tweak the STT output in LiveKit—it’s simpler than it sounds!
What’s the stt_node
for?
The stt_node
method is a key part of the LiveKit voice pipeline. It takes in audio frames (rtc.AudioFrame
) and turns them into speech events (stt.SpeechEvent
) with the transcribed text. By customizing this method, you get to catch and adjust the transcribed text before it moves on to the next step (like feeding into an LLM).
Here’s how you can implement it:
async def stt_node(
    self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings
) -> AsyncIterable[stt.SpeechEvent]:
"""Custom STT Node with Content Filtering"""
# Start by calling the default STT processing to get speech events
async for event in super().stt_node(audio, model_settings):
if hasattr(event, 'alternatives') and event.alternatives:
# Grab the transcribed text from the first alternative
original_text = event.alternatives[0].text
# Apply your own filtering or masking here
filtered_text = self.mask_sensitive_content(original_text)
# Swap out the original text for your filtered version
event.alternatives[0].text = filtered_text
# Send the updated event back into the pipeline
yield event
What’s going on here?
• Audio Input: The audio
parameter is an async stream of audio frames (AsyncIterable[rtc.AudioFrame]
).
• Speech Events: The default stt_node
processes these frames and produces stt.SpeechEvent
objects, which have the transcribed text inside the alternatives
attribute.
• Text Modification: The mask_sensitive_content
function is where you can filter or adjust the text—like hiding phone numbers or credit card info.
• Yielding Events: After you tweak the text, the updated event is sent back to keep the pipeline moving.
Here’s a quick example of `mask_sensitive_content`:
def mask_sensitive_content(self, text: str) -> str:
"""Mask sensitive information in the transcribed text."""
import re
# Hide credit card numbers
text = re.sub(r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b', '[CREDIT_CARD]', text)
# Hide phone numbers
text = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE_NUMBER]', text)
# You can add more patterns here as needed
return text
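If you want a quick sanity check on those patterns, here’s a standalone copy you can run on its own (same regexes, just outside the agent class):
import re

def mask_sensitive_content(text: str) -> str:
    # Standalone copy of the masking logic above, handy for quick testing.
    text = re.sub(r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b', '[CREDIT_CARD]', text)
    text = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE_NUMBER]', text)
    return text

assert mask_sensitive_content("Card 4111-1111-1111-1111") == "Card [CREDIT_CARD]"
assert mask_sensitive_content("Call 555-123-4567") == "Call [PHONE_NUMBER]"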
How it all fits together:
1. The stt_node
method gets audio frames and runs them through the default STT processing.
2. For each speech event, it grabs the transcribed text and runs it through your mask_sensitive_content
function.
3. The updated event—with filtered text—is yielded back, so only the cleaned-up version moves forward to the LLM.
Why this is great:
• You can easily preprocess transcribed text to protect privacy or meet other custom needs.
• It keeps things running smoothly in real-time by handling text in streaming chunks.
• It fits right into the LiveKit voice pipeline without a hitch.
If you want me to go over anything in more detail or need more examples, just let me know—I’m happy to help!polite-dog-12152
08/23/2025, 10:14 AMtall-airline-16756
08/23/2025, 10:14 AM
Your masking class looks great. Here's a quick review of its key features and how it plugs into the stt_node method:
Key Features of Your Masking Class:
• Banned Words: A handy list of sensitive words (like "password," "credit_card") that get masked automatically.
• Regex Patterns: Smart patterns to spot sensitive info such as email addresses, credit card numbers, and government IDs.
• Normalization: A neat method to fix mispronounced or misheard words using a dictionary of replacements.
• Masking: Replaces sensitive details with a default mask (like "****") to keep things safe.
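Here’s one way those features might look in code. This is a hypothetical sketch of the ContentOptimizer (the banned-word list, patterns, and replacement dictionary are illustrative placeholders for your own):
import re

class ContentOptimizer:
    BANNED_WORDS = {"password", "credit_card"}  # words to mask outright
    PATTERNS = {
        "[EMAIL]": re.compile(r"\b[\w.+-]+@[\w-]+\.[\w.]+\b"),
        "[CREDIT_CARD]": re.compile(r"\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b"),
    }
    REPLACEMENTS = {"live kit": "LiveKit"}  # fix common mishearings
    MASK = "****"

    def normalize(self, text: str) -> str:
        for wrong, right in self.REPLACEMENTS.items():
            text = text.replace(wrong, right)
        return text

    def mask(self, text: str) -> str:
        for word in self.BANNED_WORDS:
            text = re.sub(rf"\b{re.escape(word)}\b", self.MASK, text, flags=re.IGNORECASE)
        for placeholder, pattern in self.PATTERNS.items():
            text = pattern.sub(placeholder, text)
        return text

    def process(self, text: str) -> str:
        # Normalize first, then mask anything sensitive.
        return self.mask(self.normalize(text))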
Integration with `stt_node`:
You can easily plug the ContentOptimizer
class into your stt_node
method to clean up the transcribed text. Here’s a simple way to do it:
from typing import AsyncIterable
from livekit import rtc
from livekit.agents import ModelSettings
class CustomAgent(Agent):
def __init__(self):
super().__init__(
instructions="Your agent instructions here",
stt=your_stt_provider,
llm=your_llm_provider,
tts=your_tts_provider,
vad=your_vad_provider
)
self.content_optimizer = ContentOptimizer()
async def stt_node(
    self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings
) -> AsyncIterable[stt.SpeechEvent]:
async for event in super().stt_node(audio, model_settings):
if hasattr(event, 'alternatives') and event.alternatives:
# Grab the transcribed text
original_text = event.alternatives[0].text
# Run it through ContentOptimizer
filtered_text = self.content_optimizer.process(original_text)
# Swap out the original text with the cleaned-up version
event.alternatives[0].text = filtered_text
yield event
How It Works:
1. The stt_node
method takes in audio frames and produces speech events.
2. Each speech event’s transcribed text gets sent to ContentOptimizer.process()
.
3. process()
normalizes the text and masks any sensitive info using your patterns and banned words.
4. The updated text replaces the original in the speech event, which then continues through the pipeline.
Benefits:
• Keeps sensitive info safe before it reaches the LLM.
• Makes the text more accurate by fixing common errors.
• Offers a clean, modular way to handle text processing that you can reuse easily.
If you want to dive deeper or add more features, just let me know—I’m happy to help!polite-dog-12152
08/23/2025, 10:17 AMtall-airline-16756
08/23/2025, 10:17 AMpolite-dog-12152
08/23/2025, 10:19 AMtall-airline-16756
08/23/2025, 10:19 AM
1. STT Module:
This module turns audio into text and applies any masking or preprocessing via the content optimizer.
class STTProcessor:
def __init__(self, stt_provider, content_optimizer):
self.stt_provider = stt_provider
self.content_optimizer = content_optimizer
async def process_audio(self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings) -> str:
async for event in self.stt_provider.stt_node(audio, model_settings):
if hasattr(event, 'alternatives') and event.alternatives:
text = event.alternatives[0].text
# Apply masking or preprocessing
return self.content_optimizer.process(text)
return ""
2. Language Module:
This module takes the text from the STT and chats with the language model to get the response you want.
class LanguageProcessor:
def __init__(self, llm_provider):
self.llm_provider = llm_provider
async def process_text(self, text: str, language: str) -> str:
# Use the selected language model to process the text
response = await self.llm_provider.chat(text, language=language)
return response
3. Integration:
The main agent ties everything together. After you choose the language, it sends the STT output to the right language processor.
class ModularAgent:
def __init__(self, stt_processor, language_processors):
self.stt_processor = stt_processor
self.language_processors = language_processors
async def handle_audio(self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings, language: str):
# Process audio to get text
text = await self.stt_processor.process_audio(audio, model_settings)
# Pass text to the selected language processor
if language in self.language_processors:
response = await self.language_processors[language].process_text(text, language)
return response
else:
raise ValueError(f"Unsupported language: {language}")
Why this is great:
• Modularity: STT and language parts work independently, so you can update or swap them out easily.
• Flexibility: You can support lots of languages by picking the right processor on the fly.
• Scalability: Each module can grow or shrink based on what you need.
If you want, I’m here to help you get this up and running or answer any questions you have!polite-dog-12152
08/23/2025, 10:23 AMtall-airline-16756
08/23/2025, 10:23 AM
class LanguageAgent:
def __init__(self, stt_processor, llm_processor, tools):
self.stt_processor = stt_processor
self.llm_processor = llm_processor
self.tools = tools
async def handle_user_input(self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings):
# Turn the audio into text
text = await self.stt_processor.process_audio(audio, model_settings)
# Send the text to the LLM for processing
response = await self.llm_processor.process_text(text)
# Check the response and decide what to do
if "tool" in response:
tool_name = self.extract_tool_name(response)
result = await self.tools[tool_name].execute(response)
return result
else:
return response
def extract_tool_name(self, response: str) -> str:
# Grab the tool name from the LLM response
return response.split()[1] # Just an example
4. Benefits of Multi-Agent Design:
• Modularity: Each agent has its own job, which makes everything easier to manage and grow.
• Scalability: You can scale language agents up or down depending on how many users you have.
• Flexibility: Agents can handle all kinds of user inputs and take the right actions on the fly.
• Seamless Human Transfer: The primary agent smoothly connects users to humans without messing with the language agents.
If you want, I’m here to help you dive deeper into setting this up or tackling any specific parts. Just let me know!polite-dog-12152
08/23/2025, 10:42 AMtall-airline-16756
08/23/2025, 10:43 AM
1. Structured LLM Output:
Have the LLM return a structured response like this:
{
"action": "call_tool",
"tool_name": "weather_api",
"parameters": {
"location": "New York"
}
}
Your language agent can then read this and know exactly what to do next.
2. Tool Registry:
Keep a handy list of all your tools, each with a name and a way to run it. Something like this:
tools = {
"weather_api": WeatherAPI(),
"calendar": CalendarTool(),
"email": EmailTool()
}
3. Decision-Making Logic:
Inside your language agent, you’d have some logic that looks at the LLM’s response and calls the right tool:
class LanguageAgent:
def __init__(self, stt_processor, llm_processor, tools):
self.stt_processor = stt_processor
self.llm_processor = llm_processor
self.tools = tools
async def handle_user_input(self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings):
# Turn audio into text
text = await self.stt_processor.process_audio(audio, model_settings)
# Send the text to the LLM and get a response
response = await self.llm_processor.process_text(text)
# Check if the response says to call a tool (this assumes the LLM reply has been parsed into a dict)
if "action" in response and response["action"] == "call_tool":
tool_name = response["tool_name"]
parameters = response.get("parameters", {})
# Call the tool if it exists
if tool_name in self.tools:
result = await self.tools[tool_name].execute(**parameters)
return result
else:
return f"Tool {tool_name} not found."
else:
return response
4. Tool Implementation:
Each tool should have an execute
method that takes the right parameters and does its job. For example:
class WeatherAPI:
async def execute(self, location):
# Fetch the weather and return it
return f"The weather in {location} is sunny."
class CalendarTool:
async def execute(self, event_name, date):
# Add an event to the calendar
return f"Event '{event_name}' added to your calendar on {date}."
5. Training the LLM:
To help the LLM get the hang of when to call a tool, you can give it examples and instructions during fine-tuning or prompt engineering. For instance:
• Example Prompt: "If the user asks about the weather, respond with an action to call the weather API."
• Example Input: "What’s the weather in New York?"
• Example Output:{
"action": "call_tool",
"tool_name": "weather_api",
"parameters": {
"location": "New York"
}
}
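Building on point 5, here’s one illustrative way to fold those instructions and examples into a system prompt for the LLM (the wording and constant name are just examples):
TOOL_ROUTING_PROMPT = """You can call tools by replying with JSON only.
If the user asks about the weather, respond with an action to call the weather API.

Example:
User: What's the weather in New York?
Assistant: {"action": "call_tool", "tool_name": "weather_api", "parameters": {"location": "New York"}}

If no tool is needed, reply with plain conversational text instead."""

# Pass TOOL_ROUTING_PROMPT as the system/instructions message when you build the LLM chat context.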
Why this approach is pretty great:
• It’s dynamic and flexible — the LLM decides when a tool is needed based on what the user says.
• It’s scalable — you can easily add new tools without changing the core logic.
• It’s customizable — you can fine-tune the LLM to handle specific tools and use cases.
If you want me to help you dive deeper or get this set up, just let me know! I’m here to help.polite-dog-12152
08/23/2025, 10:49 AMtall-airline-16756
08/23/2025, 10:50 AM
Cold Transfer (SIP REFER):
from livekit import api
from livekit.protocol import sip as proto_sip
async def transfer_call(participant_identity: str, transfer_to: str):
"""Transfer a SIP call to another number using SIP REFER."""
# Initialize LiveKit API client
livekit_api = api.LiveKitAPI(
url=os.getenv('LIVEKIT_URL'),
api_key=os.getenv('LIVEKIT_API_KEY'),
api_secret=os.getenv('LIVEKIT_API_SECRET')
)
# Create transfer request
transfer_request = proto_sip.TransferSIPParticipantRequest(
participant_identity=participant_identity,
room_name="your_room_name", # Replace with the actual room name
transfer_to=transfer_to, # e.g., "tel:+15105550100"
play_dialtone=True
)
# Execute transfer
await livekit_api.sip.transfer_sip_participant(transfer_request)
Warm Transfer (Supervisor Escalation):
If you want to pass the call along with some context, you can first connect to a supervisor and then merge the calls.
class SessionManager:
async def start_transfer(self):
# Put customer on hold
self.start_hold()
# Create supervisor room
supervisor_room_name = self.customer_room.name + "-supervisor"
self.supervisor_room = rtc.Room()
# Connect to supervisor room
await self.supervisor_room.connect(os.getenv("LIVEKIT_URL"), token.to_jwt())
# Dial supervisor
await self.lkapi.sip.create_sip_participant(
api.CreateSIPParticipantRequest(
sip_trunk_id="your_sip_trunk_id",
sip_call_to="supervisor_number",
room_name=supervisor_room_name,
participant_identity="supervisor-sip",
wait_until_answered=True,
)
)
async def merge_calls(self):
# Move supervisor to customer room
await self.lkapi.room.move_participant(
api.MoveParticipantRequest(
room=self.supervisor_room.name,
identity="supervisor-sip",
destination_room=self.customer_room.name,
)
)
self.stop_hold()
await self.customer_session.say("You are on the line with my supervisor.")
DTMF-Based Transfer:
For those phone menu systems, you can listen for DTMF tones to trigger transfers easily.
def _setup_event_handlers(self, room: rtc.Room):
@room.on("sip_dtmf_received")
def handle_dtmf(dtmf_event: rtc.SipDTMF):
digit = dtmf_event.digit
identity = dtmf_event.participant.identity
department_numbers = {
"1": ("BILLING_PHONE_NUMBER", "Billing"),
"2": ("TECH_SUPPORT_PHONE_NUMBER", "Tech Support"),
"3": ("CUSTOMER_SERVICE_PHONE_NUMBER", "Customer Service")
}
if digit in department_numbers:
env_var, dept_name = department_numbers[digit]
transfer_number = f"tel:{os.getenv(env_var)}"
asyncio.create_task(self._handle_transfer(identity, transfer_number, dept_name))
If you want a hand with setting this up or tweaking it to fit your needs, just let me know—I’m happy to help!
Sources: Transferring calls | LiveKit Docs | Company directory phone assistant | LiveKit Docs | examples/warm-transfer/README.md | agent.pypolite-dog-12152
08/23/2025, 1:22 PMtall-airline-16756
08/23/2025, 1:23 PM
1. Using the update_options Method:
You can change the TTS voice on the fly with the update_options method. The new voice will kick in with the next thing that’s spoken.
session.tts.update_options(
voice=hume.VoiceByName(name="Colton Rivers", provider=hume.VoiceProvider.hume),
description="The voice exudes calm, serene, and peaceful qualities, like a gentle stream flowing through a quiet forest.",
speed=2,
)
2. Using Function Tools for Agent Switching:
If you want to have multiple voices ready, you can create different agents and switch between them using function tools. Here’s a quick example:
class RimeAgent(Agent):
def __init__(self):
super().__init__(
tts=rime.TTS(model="mistv2", speaker="abbie"),
# ... other config
)
@function_tool
async def switch_to_elevenlabs(self):
"""Switch to ElevenLabs TTS voice"""
return ElevenLabsAgent()
@function_tool
async def switch_to_cartesia(self):
"""Switch to Cartesia TTS voice"""
return CartesiaAgent()
3. Using Participant Attributes:
You can also listen for changes in participant attributes and update the voice dynamically like this:
@ctx.room.on("participant_attributes_changed")
def on_participant_attributes_changed(changed_attributes: dict[str, str], participant: rtc.Participant):
if "voice" in changed_attributes:
voice_id = participant.attributes.get("voice")
# Update TTS options dynamically
tts._opts.voice = voice_id
asyncio.create_task(agent.say("How do I sound now?", allow_interruptions=True))
4. Language Switching:
Want to switch languages during your session? You can use update_options
for that too:
async def _switch_language(self, language_code: str):
if self.tts is not None:
self.tts.update_options(language=language_code)
if self.stt is not None:
self.stt.update_options(language=deepgram_language)
await self.session.say(self.greetings[language_code])
Just a couple of things to keep in mind:
• Voice changes will apply on the next utterance, not in the middle of speaking.
• Different TTS providers might have different ways they handle voice switching.
• You can mix voice changes with other settings like speed or language for a custom feel.
If you want me to walk you through any of these or need examples tailored to what you’re working on, just let me know—I’m here to help!
Sources: Hume TTS integration guide | LiveKit Docs | pipeline-tts/elevenlabs_change_language.py | agent/main.pypolite-dog-12152
08/23/2025, 1:27 PMtall-airline-16756
08/23/2025, 1:27 PM
With AWS TTS, you can use the update_options method to change the voice on the fly during a session. AWS TTS offers a bunch of voices, and you just need to specify which one you want in the options. Here’s a simple way to do it:
1. Update TTS Options:
Just use the update_options
method to switch voices dynamically. For example:
session.tts.update_options(
voice="Joanna", # Swap this out for whichever AWS Polly voice you like
language="en-US" # Optional: specify the language if you want
)
This will switch the voice to "Joanna" for the next thing it says.
2. Integrate Voice Switching:
If you want to let users switch voices whenever they want, you can set up a function or a command handler like this:
class LanguageAgent:
def __init__(self, tts):
self.tts = tts
async def switch_voice(self, voice_name: str):
"""Dynamically switch the TTS voice."""
self.tts.update_options(voice=voice_name)
await self.session.say(f"Voice switched to {voice_name}.")
3. Example Voice List:
Here are some popular AWS Polly voices you might want to try:
• English (US): Joanna, Matthew, Ivy, Justin
• English (UK): Amy, Brian, Emma
• Spanish: Conchita, Enrique, Lucia
• French: Celine, Mathieu
If you want to explore all the voices, you can check out the AWS Polly documentation.
4. Combine with Language Switching:
If you’re already handling language switching, you can easily add voice options too:
async def switch_language_and_voice(self, language_code: str, voice_name: str):
self.tts.update_options(language=language_code, voice=voice_name)
await self.session.say(f"Language switched to {language_code} and voice to {voice_name}.")
5. User Interaction:
Let your users ask to change voices naturally or through a menu. For example:
if user_input == "Change voice to Joanna":
await agent.switch_voice("Joanna")
If you want a hand with setting this up or tweaking it to fit your needs, just let me know—I’m happy to help!polite-dog-12152
08/24/2025, 3:20 AMtall-airline-16756
08/24/2025, 3:21 AM
1. Input Validation: Make sure the language_code, voice, and speech_engine you get are valid before updating anything. Here’s a simple way to do that:
if language_code not in self.aws_languages:
    raise ValueError(f"Invalid language code: {language_code}")
if voice not in self.aws_voice:
    raise ValueError(f"Invalid voice: {voice}")
if speech_engine not in self.aws_speech_engine:
    raise ValueError(f"Invalid speech engine: {speech_engine}")
2. Feedback for Voice and Engine Changes: Letting the user know about the voice and speech engine in the confirmation message adds a nice touch of clarity: await self.session.say(f"Language switched to {self.language_names[language_code]} with voice {voice} using {speech_engine}.")
3. Default Fallbacks: Just in case the requested voice or speech engine isn’t found, having a default fallback can keep things running smoothly: aws_voice = self.aws_voice.get(voice, "default_voice")
aws_speech_engine = self.aws_speech_engine.get(speech_engine, "standard")
4. Logging Enhancements: Adding more context in your error logs can make troubleshooting a breeze: logging.error(f"Error switching language to {language_code}, voice: {voice}, engine: {speech_engine}: {e}")
5. Asynchronous Safety: To keep your speech output smooth and uninterrupted, you might want to use a lock around self.session.say
calls if multiple async tasks could interfere.
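For point 5, here’s a minimal sketch of guarding speech output with an asyncio.Lock so overlapping tasks can’t talk over each other (the helper name is just an example):
import asyncio

say_lock = asyncio.Lock()  # one lock shared by every task that speaks

async def say_safely(session, text: str):
    # Only one coroutine at a time gets to call session.say,
    # so concurrent language switches won't interrupt each other.
    async with say_lock:
        await session.say(text)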
Example with Improvements:
async def _switch_language(self, language_code: str, voice: str, speech_engine: str):
"""Helper Function To Switch Languages"""
try:
if language_code == self.current_language:
await self.session.say("You are already using this language.")
return
# Validate inputs
if language_code not in self.aws_languages:
raise ValueError(f"Invalid language code: {language_code}")
if voice not in self.aws_voice:
raise ValueError(f"Invalid voice: {voice}")
if speech_engine not in self.aws_speech_engine:
raise ValueError(f"Invalid speech engine: {speech_engine}")
# Update TTS options
if self.tts is not None:
self.tts.update_options(language_code=language_code, voice=voice, speech_engine=speech_engine)
# Update STT options
if self.stt is not None:
aws_languages = self.aws_languages.get(language_code)
aws_voice = self.aws_voice.get(voice)
aws_speech_engine = self.aws_speech_engine.get(speech_engine)
self.stt.update_options(language_code=aws_languages)
# Update current language
self.current_language = language_code
# Confirmation message
await self.session.say(f"Language switched to {self.language_names[language_code]} with voice {voice} using {speech_engine}.")
except Exception as e:
logging.error(f"Error switching language to {language_code}, voice: {voice}, engine: {speech_engine}: {e}")
raise CustomException(e, sys)
If you want me to help with anything else or add more enhancements, just let me know! I’m here to help.polite-dog-12152
08/24/2025, 3:25 AMtall-airline-16756
08/24/2025, 3:25 AM
Your IrishAgent class looks really solid and thoughtfully put together, especially with the language and voice switching features. Here’s a little review along with some friendly suggestions to make it even better:
Strengths:
• Great job supporting multiple languages and voices, complete with mappings for AWS TTS and STT options.
• Nice touch with validation checks to gracefully handle any invalid inputs for language codes, voices, and speech engines.
• Clear, user-friendly confirmation messages after switching language and voice—super helpful!
• Using @function_tool
decorators makes integrating language-switching commands a breeze.
• Thoughtful exception handling paired with detailed logging to make debugging easier.
Suggestions for Improvement:
1. Default Fallbacks: It might be handy to add default fallbacks for voice and speech engine in case the provided values aren’t found. Something like: aws_voice = self.aws_voice.get(voice, "default_voice")
aws_speech_engine = self.aws_speech_engine.get(speech_engine, "standard")
2. Dynamic Language and Voice Switching: Instead of having separate hardcoded methods (like switch_to_en_gb
), how about a single dynamic method that accepts language code, voice, and speech engine as parameters? For example: @function_tool
async def switch_language(self, language_code: str, voice: str, speech_engine: str):
await self._switch_language(language_code, voice, speech_engine)
3. STT Node: It would be great to uncomment and integrate that custom stt_node
method for content filtering. This way, any sensitive info gets masked before the LLM processes it.
4. Greeting Customization: Including the selected voice and speech engine in the greeting message can give users a clearer sense of what’s happening. Something like: await self.session.say(f"Language switched to {self.language_names[language_code]} with voice {voice} using {speech_engine}.")
5. Code Organization: For easier maintenance, consider moving those mappings (like aws_languages
, aws_voice
, etc.) into a config file or constants module.
Example of a Dynamic Language-Switching Method:
@function_tool
async def switch_language(self, language_code: str, voice: str, speech_engine: str):
"""Switch to a specified language, voice, and speech engine."""
await self._switch_language(language_code, voice, speech_engine)
Next Steps:
• Integrate the stt_node
method for content filtering.
• Test the language and voice switching with different combinations to make sure everything runs smoothly.
• Think about adding more languages and voices to your mappings to support even more users.
Feel free to reach out if you want to chat more or need any help tweaking things!polite-dog-12152
08/24/2025, 4:09 AMtall-airline-16756
08/24/2025, 4:10 AM
Here’s how to integrate the stt_node method into your IrishAgent class to help with content filtering and customization:
What’s the stt_node
method for?
The stt_node
method takes in audio frames and turns them into speech events. By customizing this method, you get to peek at and tweak the transcribed text before it heads over to the LLM.
How to implement it:
async def stt_node(
    self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings
) -> AsyncIterable[stt.SpeechEvent]:
"""Custom STT Node with Content Filtering"""
# First, call the default STT processing to grab speech events
async for event in super().stt_node(audio, model_settings):
if hasattr(event, 'alternatives') and event.alternatives:
# Pull out the transcribed text from the first alternative
original_text = event.alternatives[0].text
# Here’s where you can filter or mask the content
filtered_text = content_optimizer.process(original_text)
# Swap the original text with your filtered version
event.alternatives[0].text = filtered_text
# Send the updated event back down the pipeline
yield event
What’s happening step-by-step:
1. We start by calling the usual stt_node
method to turn audio into speech events.
2. Next, we grab the transcribed text from the alternatives
part of the speech event.
3. Then, we use the ContentOptimizer
to clean up or mask anything sensitive in that text.
4. After that, we replace the original text with the cleaned-up version in the event.
5. Finally, we send that updated event back into the pipeline for the next steps.
Why this is great:
• It makes sure any sensitive info is hidden before the LLM sees it.
• You can customize the transcribed text on the fly.
• It fits right in with the existing LiveKit voice pipeline without a hitch.
What to do next:
• Try out the stt_node
method with different audio clips to make sure the filtering works smoothly.
• If you want, expand the ContentOptimizer
to catch more patterns or handle extra cases.
Feel free to reach out if you want a hand with anything else or want to add more features!polite-dog-12152
08/24/2025, 4:23 AMtall-airline-16756
08/24/2025, 4:24 AM
from livekit.plugins import silero
vad = silero.VAD.load(
min_speech_duration=2.0, # At least 2 seconds of speech to start
min_silence_duration=1.5, # Wait 1.5 seconds of silence before ending
max_buffered_speech=10.0 # Buffer up to 10 seconds of speech
)
Why this helps:
• Users get to speak longer without interruptions.
• Short pauses won’t cut off their speech prematurely.
If you want a hand setting this up or have any questions, just let me know—I’m here to help!
Sources: Silero VAD plugin | LiveKit Docspolite-dog-12152
08/24/2025, 4:29 AMtall-airline-16756
08/24/2025, 4:29 AM
1. Create a Multi-Language Agent:
Use the update_options method to easily switch between languages and voices whenever needed.
class MultiLanguageAgent(Agent):
def __init__(self):
super().__init__(
stt=aws.STT(language="en-US", region="us-east-1"),
tts=aws.TTS(region="us-east-1", api_key=os.getenv("AWS_ACCESS_KEY_ID"), api_secret=os.getenv("AWS_SECRET_ACCESS_KEY"))
)
self.current_language = "en-US"
self.language_mappings = {
"en-US": {"voice": "Joanna", "stt_language": "en-US"},
"fr-FR": {"voice": "Celine", "stt_language": "fr-FR"},
"de-DE": {"voice": "Hans", "stt_language": "de-DE"},
"hi-IN": {"voice": "Aditi", "stt_language": "hi-IN"}
}
async def switch_language(self, language_code: str):
if language_code not in self.language_mappings:
raise ValueError(f"Unsupported language: {language_code}")
# Update TTS options
language_settings = self.language_mappings[language_code]
self.tts.update_options(language=language_code, voice=language_settings["voice"])
# Update STT options
self.stt.update_options(language=language_settings["stt_language"])
self.current_language = language_code
await self.session.say(f"Language switched to {language_code}.")
2. Handle User Input:
Give your users an easy way to pick their preferred language, maybe through a command or a menu. For example:
@function_tool
async def switch_to_french(self):
await self.switch_language("fr-FR")
3. Dynamic Language Switching:
Make the language-switching part of your workflow so it adjusts smoothly based on what the user wants or says.
4. AWS Language and Voice Options:
Here are some popular AWS Polly voices you might want to try for different languages:
• English (US): Joanna, Matthew
• French (France): Celine, Mathieu
• German (Germany): Hans, Marlene
• Hindi (India): Aditi
Feel free to check out the AWS Polly documentation for the full list of voices and languages.
5. Testing and Validation:
• Give the language-switching a good test with different inputs to make sure it flows nicely.
• Double-check the STT and TTS results to ensure everything sounds and works just right in each language.
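For that testing step, here’s a quick sketch that loops over the mappings defined in MultiLanguageAgent above and exercises switch_language for each one (purely illustrative, meant to run inside an active session):
async def smoke_test_languages(agent: MultiLanguageAgent):
    # Try every configured language once and confirm the switch took effect.
    for code in agent.language_mappings:
        await agent.switch_language(code)
        assert agent.current_language == code
        await agent.session.say("Testing one, two, three.")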
If you want, I’m here to help with more examples or any questions you have — just let me know!polite-dog-12152
08/24/2025, 4:43 AMtall-airline-16756
08/24/2025, 4:43 AM
The turn_detection=multilingual() setting works with certain STT plugins in LiveKit, but just a heads-up—there’s no clear confirmation yet that AWS Transcribe or AWS Polly support this feature. Turn detection depends on STT output and VAD (Voice Activity Detection), so the STT provider needs to support multilingual or automatic language detection.
Key Points:
• Supported STT Providers: Some great options like Deepgram, Gladia, and AssemblyAI are known to work well with turn_detection=multilingual()
.
• AWS Compatibility: The docs don’t show any evidence that AWS Transcribe or Polly currently support this feature in LiveKit.
• TTS Irrelevance: Since turn detection uses STT output and VAD, TTS providers like AWS Polly aren’t directly involved here.
Recommendations:
1. Take a look at the official LiveKit docs for the list of supported STT plugins to see if AWS is included.
2. Make sure your STT provider supports real-time transcription and language detection if you want to use turn_detection=multilingual()
.
3. If AWS isn’t on the list, no worries—using a confirmed provider like Deepgram or Gladia is a solid choice for multilingual turn detection.
If you want, I’m happy to help you find the official list of supported STT plugins or guide you through integrating a custom STT provider. Just let me know!
Sources: LiveKit turn detector plugin | LiveKit Docs | LiveKit turn detector plugin | LiveKit Docs | examples/voice_agents/basic_agent.py | mcp/agent.py | Turn detection and interruptions | LiveKit Docspolite-dog-12152
08/24/2025, 4:52 AMtall-airline-16756
08/24/2025, 4:53 AM
LiveKit lets you configure AgentSession with a bunch of handy features and settings. Here’s a quick rundown of some great options you might want to explore:
1. Turn Detection and Interruptions:
• Pick from several turn detection methods like stt
, vad
, realtime_llm
, or even manual
.
• Customize how interruptions work with settings such as allow_interruptions
, min_interruption_duration
, and min_interruption_words
.
2. Endpointing Delays:
• Tweak min_endpointing_delay
and max_endpointing_delay
to get the timing just right for when turns end.
3. Preemptive Generation:
Turn on preemptive_generation=True
to kick off LLM and TTS processing as soon as user transcripts come in — this helps cut down response time.
4. Video Sampling:
Try using a video_sampler
function to smartly adjust video frame capture rates based on what the user is doing.
5. Session Timing and State Management:
• Set user_away_timeout
to decide when someone’s marked as “away” after being silent.
• Use agent_false_interruption_timeout
to handle those quiet interruptions smoothly.
6. Tool Execution Limits:
Keep the number of back-to-back tool calls per LLM turn in check with max_tool_steps
.
7. State Synchronization and RPC:
Make the most of LiveKit’s real-time state and data features to build your own custom state and control logic.
8. Performance Optimization:
Speed things up by preloading models using the prewarm function in WorkerOptions, so initialization feels snappier (see the sketch after the example configuration below).
9. Telemetry and Metrics:
Plug in OpenTelemetry for trace-based observability and real-time metrics — perfect for digging into how your sessions are performing.
Example Configuration:
session = AgentSession(
vad=silero.VAD.load(),
stt=deepgram.STT(model="nova-3"),
llm=openai.LLM(model="gpt-4o-mini"),
tts=openai.TTS(voice="ash"),
preemptive_generation=True,
turn_detection=MultilingualModel(),
max_tool_steps=10,
user_away_timeout=30.0,
)
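And for the prewarm tip in point 8, here’s a minimal sketch using the standard prewarm hook in WorkerOptions (adapt it to whichever models you actually load):
from livekit.agents import JobContext, JobProcess, WorkerOptions, cli
from livekit.plugins import silero

def prewarm(proc: JobProcess):
    # Load heavy models once per worker process, before any job arrives.
    proc.userdata["vad"] = silero.VAD.load()

async def entrypoint(ctx: JobContext):
    vad = ctx.proc.userdata["vad"]  # reuse the preloaded model
    ...

if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, prewarm_fnc=prewarm))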
If you’d like a hand setting any of these up or want tips on optimizing your session, just give me a shout — I’m here to help!
Sources: README.md | Turn detection and interruptions | LiveKit Docs | Agent speech and audio | LiveKit Docs | Web and mobile frontends | LiveKit Docs | Silero VAD plugin | LiveKit Docspolite-dog-12152
08/24/2025, 4:55 AMtall-airline-16756
08/24/2025, 4:55 AM
To set up Langfuse tracing for an AgentSession in LiveKit, here’s a friendly step-by-step guide:
1. Set Environment Variables:
• LANGFUSE_PUBLIC_KEY
• LANGFUSE_SECRET_KEY
• LANGFUSE_HOST
(this could be https://cloud.langfuse.com or your own self-hosted setup)
2. Setup Langfuse Tracer Provider:
Use the setup_langfuse()
function to configure OpenTelemetry so your traces get sent to Langfuse:
from livekit.agents.telemetry import set_tracer_provider
import base64
import os
def setup_langfuse(metadata=None, host=None, public_key=None, secret_key=None):
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
public_key = public_key or os.getenv("LANGFUSE_PUBLIC_KEY")
secret_key = secret_key or os.getenv("LANGFUSE_SECRET_KEY")
host = host or os.getenv("LANGFUSE_HOST")
if not public_key or not secret_key or not host:
raise ValueError("LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, and LANGFUSE_HOST must be set")
langfuse_auth = base64.b64encode(f"{public_key}:{secret_key}".encode()).decode()
os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = f"{host.rstrip('/')}/api/public/otel"
os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"Authorization=Basic {langfuse_auth}"
trace_provider = TracerProvider()
trace_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
set_tracer_provider(trace_provider, metadata=metadata)
return trace_provider
3. Initialize in the Entrypoint:
Just call setup_langfuse()
right at the start of your entrypoint function, before kicking off the `AgentSession`:
async def entrypoint(ctx: JobContext):
# Setup Langfuse tracing
trace_provider = setup_langfuse(metadata={"langfuse.session.id": ctx.room.name})
# Optional: flush traces on shutdown
async def flush_trace():
trace_provider.force_flush()
ctx.add_shutdown_callback(flush_trace)
# Start the agent session
session = AgentSession(vad=silero.VAD.load())
await session.start(agent=Kelly(), room=ctx.room)
4. Run the Agent:
Then just run your app using the LiveKit CLI worker system like this:
python langfuse_trace.py
Telemetry Data Collected:
• Session start
• Agent turns
• LLM node execution
• Function tool invocations
• TTS node processing
• End-of-turn detection events
All these traces will show up in your Langfuse dashboard, giving you great insights into how your sessions and agents are performing.
If you have any questions or want some tailored examples, just give me a shout—I’m here to help!
Sources: examples/voice_agents/restaurant_agent.py | Logs, metrics, and telemetry | LiveKit Docspolite-dog-12152
08/24/2025, 5:00 AMtall-airline-16756
08/24/2025, 5:01 AM
The ctx.room object in LiveKit is your go-to for all things related to a room. It gives you access to handy properties and events that help you stay on top of what's happening. Here’s a quick rundown of what you can check out:
1. Room Properties:
• Room Name: You can grab the room’s name using ctx.room.name
.
• Room Metadata: This lets you pull application-specific metadata that’s shared with everyone in the room (usually set via server APIs).
• Connection State: Keep an eye on the connection status, like whether it’s connected, disconnected, or reconnecting.
2. Participants:
• Participant Details: Get info about both local and remote participants, including who they are and their attributes.
• Participant Count: Easily track how many folks are in the room at any time.
3. Room Events:
• Participant Events: Listen for when someone joins or leaves with events like ParticipantConnected
and ParticipantDisconnected
.
• Track Events: Stay updated on media tracks being added or removed via TrackPublished
and TrackUnpublished
.
• Metadata Changes: React to updates with RoomMetadataChanged
events.
• Active Speakers: Keep tabs on who’s talking with ActiveSpeakersChanged
.
All these features make it easier for you to manage the room and keep things running smoothly, perfect for building dynamic, real-time apps. If you want me to walk you through any part or share some examples, just let me know—I’m here to help!
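Here’s a small sketch that reads a few of those properties and wires up the participant events (the event names match the Python rtc API used elsewhere in this thread):
from livekit import rtc

room = ctx.room
print(f"Room name: {room.name}")
print(f"Metadata: {room.metadata}")
print(f"Remote participants: {len(room.remote_participants)}")

@room.on("participant_connected")
def on_joined(participant: rtc.RemoteParticipant):
    print(f"{participant.identity} joined")

@room.on("participant_disconnected")
def on_left(participant: rtc.RemoteParticipant):
    print(f"{participant.identity} left; {len(room.remote_participants)} remaining")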
Sources: Room metadata | LiveKit Docs | Room & Participant Management | livekit/livekit_composite | DeepWiki | packages/livekit-rtc/src/room.ts | livekit-rtc/livekit/rtc/_proto/room_pb2.pyi | Inside a session | LiveKit Docspolite-dog-12152
08/24/2025, 5:02 AMtall-airline-16756
08/24/2025, 5:02 AM
1. Use Unique Room Names:
# Example of unique room names
room_name = f"user_{user_id}_{uuid.uuid4()}"
This way, you avoid reusing the same Room ID and keep each session nicely separated.
2. Route Inbound Calls to Individual Rooms (SIP Use Case):
If you’re working with Twilio Voice, try setting up a dispatch rule that sends each caller into their own room. Here’s an example:
{
"dispatch_rule": {
"rule": {
"dispatchRuleIndividual": {
"roomPrefix": "call-"
}
}
}
}
Then, apply the rule using the LiveKit CLI like this:
lk sip dispatch-rule create dispatch-rule.json
This helps make sure every incoming call gets its own session.
3. Ensure Unique Participant Identity:
It’s important that each participant in a room has a unique identity. If two people join with the same identity, the first one gets disconnected. So, when you generate access tokens, use unique user IDs for each participant.
By using unique room names, dispatch rules, and unique participant identities, you’ll be able to keep sessions nicely separated for every user. If you want, I’m here to help with more examples or any questions you have!
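For point 3, here’s a minimal sketch of minting a token with a unique identity using the livekit-api Python package (the identity format and room name are placeholders):
import uuid
from livekit import api

def make_token(room_name: str, user_id: str) -> str:
    identity = f"user_{user_id}_{uuid.uuid4().hex[:8]}"  # unique per participant
    token = (
        api.AccessToken()  # reads LIVEKIT_API_KEY / LIVEKIT_API_SECRET from the environment
        .with_identity(identity)
        .with_grants(api.VideoGrants(room_join=True, room=room_name))
    )
    return token.to_jwt()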
Sources: Avoiding Room ID Reuse When Re-Creating Rooms With the Same Name | Inbound calls with Twilio Voice | LiveKit Docs | Rooms, participants, and tracks | LiveKit Docspolite-dog-12152
08/24/2025, 5:06 AMtall-airline-16756
08/24/2025, 5:07 AM
1. Initialize the LiveKit API Client:
from livekit import api
lk_api = api.LiveKitAPI(
url=os.getenv("LIVEKIT_URL"),
api_key=os.getenv("LIVEKIT_API_KEY"),
api_secret=os.getenv("LIVEKIT_API_SECRET"),
)
2. Create an Inbound SIP Trunk:
This is where you set up a trunk to handle incoming calls:
inbound_trunk_request = api.CreateSIPInboundTrunkRequest(
trunk=api.SIPInboundTrunkInfo(
name="My Inbound Trunk",
numbers=["+1234567890"], # Don’t forget to replace this with your actual numbers
krisp_enabled=True, # This turns on noise cancellation for clearer calls
),
)
inbound_trunk = await lk_api.sip.create_inbound_trunk(inbound_trunk_request)
3. Create a SIP Dispatch Rule:
This helps route each SIP user to their own unique room:
dispatch_rule_request = api.CreateSIPDispatchRuleRequest(
trunk_ids=[inbound_trunk.sip_trunk_id],
rule=api.SIPDispatchRule(
dispatch_rule_individual=api.SIPDispatchRuleIndividual(room_prefix="call-")
),
)
dispatch_rule = await lk_api.sip.create_sip_dispatch_rule(dispatch_rule_request)
4. Handle Incoming SIP Participants:
Every incoming SIP call will automatically create a new room and add a SIP participant. You’ll have access to details like phone number, call status, and call ID, so you can customize things just how you want.
5. Full Example:
import asyncio
import os
from livekit import api
async def setup_sip_isolation():
lk_api = api.LiveKitAPI(
url=os.getenv("LIVEKIT_URL"),
api_key=os.getenv("LIVEKIT_API_KEY"),
api_secret=os.getenv("LIVEKIT_API_SECRET"),
)
# Create inbound trunk
inbound_trunk = await lk_api.sip.create_inbound_trunk(
api.CreateSIPInboundTrunkRequest(
trunk=api.SIPInboundTrunkInfo(name="Inbound", numbers=["+1234567890"])
)
)
# Create dispatch rule
dispatch_rule = await lk_api.sip.create_sip_dispatch_rule(
api.CreateSIPDispatchRuleRequest(
trunk_ids=[inbound_trunk.sip_trunk_id],
rule=api.SIPDispatchRule(
dispatch_rule_individual=api.SIPDispatchRuleIndividual(room_prefix="user-call")
),
)
)
print(f"Dispatch rule created: {dispatch_rule.sip_dispatch_rule_id}")
await asyncio.Future() # Keep the service running
if __name__ == "__main__":
asyncio.run(setup_sip_isolation())
And that’s it! This setup makes sure each SIP user gets their own unique room, keeping sessions nicely isolated. If you have any questions or want me to walk you through more examples, just let me know—I’m here to help!
Sources: Creating SIP Inbound Trunks and Dispatch Rules with Python SDK | livekit-api/livekit/api/sip_service.py | Inbound calls with Twilio Voice | LiveKit Docs | SIP participant | LiveKit Docs | telephony/make_call/make_call.pypolite-dog-12152
08/24/2025, 5:08 AMtall-airline-16756
08/24/2025, 5:08 AM
1. Room Metadata: Make sure ctx.room.metadata is always initialized with the right session-specific info. If it’s missing, maybe add a fallback or default value to keep things smooth.
2. Dynamic Room Names: If you’re juggling multiple users, it’s a good idea to keep room names unique to prevent any session mix-ups. Something like this works well: room_name = f"session_{uuid.uuid4()}"
3. Langfuse Metadata: Adding extra metadata like user IDs, call types, or timestamps can really help with tracking in Langfuse. For example: setup_langfuse(
metadata={
"langfuse.session.id": ctx.room.name,
"langfuse.user.id": participant.identity,
"langfuse.call.type": "SIP" if is_sip_call else "WEB"
})
4. Graceful Shutdown: It’s a good idea to add a shutdown callback to make sure everything (like MongoDB connections and Langfuse traces) closes properly when you’re done: async def shutdown():
transcription_logger.close()
caller_info.close()
await ctx.disconnect()
ctx.add_shutdown_callback(shutdown)
5. Concurrency: If you’re running multiple sessions at once, double-check that shared resources like MongoDB connections are thread-safe or consider using separate instances per session to avoid hiccups.
6. Error Logging: Adding a bit more context to your error logs can really speed up debugging. Something like this can help: logging.error(f"Error in entrypoint: {e}", exc_info=True)
Next Steps:
• Give the function a test run with both SIP and web calls to make sure all event handlers are firing as expected.
• Check out the Langfuse traces to confirm session metadata is being captured correctly.
• Consider optimizing your MongoDB queries if you expect to handle a large number of sessions for better scalability.
If you want to chat more or need help tweaking anything, just let me know—I’m here to help!polite-dog-12152
08/24/2025, 5:12 AMtall-airline-16756
08/24/2025, 5:12 AMpolite-dog-12152
08/24/2025, 5:13 AMtall-airline-16756
08/24/2025, 5:13 AMimport uuid
room_name = f"session_{uuid.uuid4()}"
This creates a unique ID for each session, so there’s no chance of overlap.
2. Assign Unique Participant Identities:
Make sure every participant joining a room has their own unique identity. This helps avoid any mix-ups where two people might accidentally share the same ID.
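For illustration, here’s one way to mint a join token with a guaranteed-unique identity (the room and identity names are placeholders):
import uuid
from livekit.api import AccessToken, VideoGrants

def token_for_room(room_name: str) -> str:
    # Each caller gets an identity that can't collide with anyone else's,
    # so two participants never end up sharing the same ID in a room.
    identity = f"user-{uuid.uuid4()}"
    return (
        AccessToken()  # reads LIVEKIT_API_KEY / LIVEKIT_API_SECRET from the environment
        .with_identity(identity)
        .with_grants(VideoGrants(room_join=True, room=room_name))
        .to_jwt()
    )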
3. Create a New Session for Each Room:
For every new room, spin up a fresh AgentSession
instance. Like this:
async def start_session(room_name):
session = AgentSession(
use_tts_aligned_transcript=True,
user_away_timeout=30,
max_tool_steps=10
)
agent = IrishAgent(
llm_instance=llm,
tool=[info_tool],
vad=vad
)
await session.start(agent=agent, room=room_name, room_input_options=RoomInputOptions(close_on_disconnect=False))
4. Handle Room Cleanup:
When a session ends, it’s important to clean up the room and any resources to keep things tidy:
@ctx.room.on("disconnected")
def on_room_disconnected():
print(f"Room {ctx.room.name} disconnected. Cleaning up resources.")
# Perform cleanup tasks here
5. SIP Integration (Optional):
If you’re working with SIP calls, you can set dispatch rules to send each caller to their own unique room:
dispatch_rule_request = api.CreateSIPDispatchRuleRequest(
trunk_ids=[inbound_trunk.sip_trunk_id],
rule=api.SIPDispatchRule(
dispatch_rule_individual=api.SIPDispatchRuleIndividual(room_prefix="call-")
),
)
dispatch_rule = await lk_api.sip.create_sip_dispatch_rule(dispatch_rule_request)
6. Monitor Active Sessions:
It’s a good idea to keep track of all active sessions with a session manager. Here’s a quick example:
active_sessions = {}
def add_session(session_id, session):
active_sessions[session_id] = session
def remove_session(session_id):
if session_id in active_sessions:
del active_sessions[session_id]
If you’d like, I’m here to help with any part of this setup—just let me know!polite-dog-12152
08/24/2025, 5:17 AMtall-airline-16756
08/24/2025, 5:18 AMimport uuid
# Generate a unique room name
unique_room_name = f"session_{uuid.uuid4()}"
ctx.room.name = unique_room_name
This little step makes sure every session stays nicely isolated by giving it a unique room name.
2. Track Active Sessions:
Keep a dictionary handy to track all active sessions. Add this near the top of your entrypoint function:
active_sessions = {}
def add_session(session_id, session):
active_sessions[session_id] = session
def remove_session(session_id):
if session_id in active_sessions:
del active_sessions[session_id]
When you start a session, just add it to the dictionary like this:
add_session(unique_room_name, session)
And when the session wraps up, remove it with:
remove_session(unique_room_name)
3. Ensure Room Cleanup:
Use your existing on_room_disconnected
handler to tidy things up for the session that just ended:
@ctx.room.on("disconnected")
def on_room_disconnected():
print(f"🏠 Room {ctx.room.name} disconnected - cleaning up resources.")
remove_session(ctx.room.name)
if current_call_id and transcription_logger:
transcription_logger.close_transcription()
caller_info.end_call(current_call_id)
transcription_logger.remove_txt()
print(f"✅ Call {current_call_id} ended and transcription saved to MongoDB")
4. SIP Integration (Optional):
If you’re working with SIP calls, just ensure each SIP participant gets routed to their own unique room. You can use the dispatch rule logic from earlier to make this happen.
5. No Structural Changes:
The best part? These tweaks don’t require you to overhaul your existing logic. They simply help manage sessions better by keeping room names unique and cleaning up properly.
If anything’s unclear or you want me to walk you through some examples, just give me a shout!polite-dog-12152
08/24/2025, 6:22 AMtall-airline-16756
08/24/2025, 6:23 AMimport uuid
def generate_auth_token():
return str(uuid.uuid4())
2. Assign Room Name: Use that token to make a unique room name for the caller: auth_token = generate_auth_token()
room_name = f"sip_{auth_token}"
3. Pass the Token: Include the token in the SIP participant’s attributes or metadata when you create the session. Like this: participant_request = api.CreateSIPParticipantRequest(
room_name=room_name,
sip_trunk_id=os.getenv("SIP_TRUNK_ID"),
sip_call_to=caller_number,
participant_identity=auth_token,
metadata={"auth_token": auth_token}
)
4. Track Sessions: Use the token to keep an eye on active sessions and clean up properly when the session ends: active_sessions[auth_token] = room_name
5. Cleanup: When the session wraps up, remove the token and any related resources: del active_sessions[auth_token]
Considerations:
• Make sure the token is generated and stored securely to keep unauthorized folks out.
• Use the token to log session details for easier troubleshooting and tracking.
• Check the token on the server side to confirm it’s in the right format and contains the expected data.
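Here’s a minimal sketch of that server-side check, assuming the tokens are the plain UUID4 strings generated above:
import uuid

def is_valid_auth_token(token: str) -> bool:
    # Accept only canonical UUID4 strings; anything else is rejected.
    try:
        return str(uuid.UUID(token, version=4)) == token
    except (ValueError, AttributeError, TypeError):
        return False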
If you’d like, I’m here to help you with any part of setting this up—just let me know!polite-dog-12152
08/24/2025, 6:25 AMtall-airline-16756
08/24/2025, 6:25 AMimport uuid
def generate_auth_token():
"""Generate a unique authentication token."""
return str(uuid.uuid4())
2. Assign a Unique Room Name:
Next, use that token to create a unique room name:
auth_token = generate_auth_token()
room_name = f"sip_{auth_token}"
3. Create a SIP Participant:
When a SIP call starts, create a SIP participant and include the token in the metadata to keep things organized:
from livekit import api
participant_request = api.CreateSIPParticipantRequest(
room_name=room_name,
sip_trunk_id=os.getenv("SIP_TRUNK_ID"),
sip_call_to="+1234567890", # Just swap this with the caller's number
participant_identity=auth_token,
metadata={"auth_token": auth_token}
)
# Use the LiveKit API to create the participant
lk_api = api.LiveKitAPI(
url=os.getenv("LIVEKIT_URL"),
api_key=os.getenv("LIVEKIT_API_KEY"),
api_secret=os.getenv("LIVEKIT_API_SECRET"),
)
await lk_api.sip.create_sip_participant(participant_request)
4. Track Active Sessions:
Keep a handy dictionary to track which sessions are active by their auth token:
active_sessions = {}
# Add the session to the active sessions
active_sessions[auth_token] = room_name
5. Handle Session Cleanup:
When the session wraps up, make sure to remove the token and tidy up the resources:
@ctx.room.on("disconnected")
def on_room_disconnected():
"""Handle room disconnection and clean up resources."""
if auth_token in active_sessions:
print(f"🏠 Room {active_sessions[auth_token]} disconnected. Cleaning up resources.")
del active_sessions[auth_token]
6. Full Example:
Here’s the whole thing put together for you:
import uuid
import os
from livekit import api
# Step 1: Generate a unique auth token
def generate_auth_token():
return str(uuid.uuid4())
# Step 2: Assign a unique room name
auth_token = generate_auth_token()
room_name = f"sip_{auth_token}"
# Step 3: Create a SIP participant
participant_request = api.CreateSIPParticipantRequest(
room_name=room_name,
sip_trunk_id=os.getenv("SIP_TRUNK_ID"),
sip_call_to="+1234567890", # Just swap this with the caller's number
participant_identity=auth_token,
metadata={"auth_token": auth_token}
)
lk_api = api.LiveKitAPI(
url=os.getenv("LIVEKIT_URL"),
api_key=os.getenv("LIVEKIT_API_KEY"),
api_secret=os.getenv("LIVEKIT_API_SECRET"),
)
# Step 4: Track active sessions
active_sessions = {}
active_sessions[auth_token] = room_name
# Step 5: Handle session cleanup
@ctx.room.on("disconnected")
def on_room_disconnected():
if auth_token in active_sessions:
print(f"🏠 Room {active_sessions[auth_token]} disconnected. Cleaning up resources.")
del active_sessions[auth_token]
# Step 6: Create the participant
await lk_api.sip.create_sip_participant(participant_request)
This setup makes sure every SIP caller gets their own unique room with an auth token, and everything gets cleaned up nicely when the session ends. If you have any questions or want to tweak things further, just give me a shout!polite-dog-12152
08/24/2025, 6:28 AMtall-airline-16756
08/24/2025, 6:28 AMimport uuid
# Generate a unique auth token and room name
auth_token = str(uuid.uuid4())
room_name = f"sip_{auth_token}"
ctx.room.name = room_name
This way, each session gets its own unique room name based on the auth token.
2. Create the SIP Participant:
Next, after you’ve generated the room name, create the SIP participant using the LiveKit API like this:
from livekit import api
participant_request = api.CreateSIPParticipantRequest(
room_name=room_name,
sip_trunk_id=os.getenv("SIP_TRUNK_ID"),
sip_call_to="+1234567890", # Swap in the caller's number here
participant_identity=auth_token,
metadata={"auth_token": auth_token}
)
lk_api = api.LiveKitAPI(
url=os.getenv("LIVEKIT_URL"),
api_key=os.getenv("LIVEKIT_API_KEY"),
api_secret=os.getenv("LIVEKIT_API_SECRET"),
)
await lk_api.sip.create_sip_participant(participant_request)
Just pop this in right after you create the auth token and room name.
3. Track Active Sessions:
It’s helpful to keep track of active sessions. Add this at the top of your entrypoint function:
active_sessions = {}
# Add the session to the active sessions
active_sessions[auth_token] = room_name
Make sure to update this dictionary right after you create the SIP participant.
4. Handle Session Cleanup:
Use the existing on_room_disconnected
handler to tidy up resources when a session ends:
@ctx.room.on("disconnected")
def on_room_disconnected():
if auth_token in active_sessions:
print(f"🏠 Room {active_sessions[auth_token]} disconnected. Cleaning up resources.")
del active_sessions[auth_token]
5. Full Integration:
Here’s a complete picture of how your updated entrypoint function could look:
async def entrypoint(ctx: JobContext):
try:
# Generate a unique auth token and room name
auth_token = str(uuid.uuid4())
room_name = f"sip_{auth_token}"
ctx.room.name = room_name
# Set up Langfuse
setup_langfuse(
metadata={
"langfuse.session.id": ctx.room.name,
"langfuse.session": ctx.room.metadata
}
)
print("🚀 Starting LiveKit agent...")
await ctx.connect()
# Create SIP participant
participant_request = api.CreateSIPParticipantRequest(
room_name=room_name,
sip_trunk_id=os.getenv("SIP_TRUNK_ID"),
sip_call_to="+1234567890",
participant_identity=auth_token,
metadata={"auth_token": auth_token}
)
lk_api = api.LiveKitAPI(
url=os.getenv("LIVEKIT_URL"),
api_key=os.getenv("LIVEKIT_API_KEY"),
api_secret=os.getenv("LIVEKIT_API_SECRET"),
)
await lk_api.sip.create_sip_participant(participant_request)
# Track active sessions
active_sessions[auth_token] = room_name
# Handle room disconnect
@ctx.room.on("disconnected")
def on_room_disconnected():
if auth_token in active_sessions:
print(f"🏠 Room {active_sessions[auth_token]} disconnected. Cleaning up resources.")
del active_sessions[auth_token]
# Continue with the rest of your session setup...
except Exception as e:
print(f"❌ Error: {e}")
raise
This setup makes sure every SIP caller gets their own unique room, and that everything gets cleaned up nicely when the session wraps up. If anything’s unclear or you want to dive deeper, just give me a shout!polite-dog-12152
08/24/2025, 6:55 AMtall-airline-16756
08/24/2025, 6:55 AMcall-
). You’ve nailed this part in your code:
rule = api.SIPDispatchRule(
dispatch_rule_individual=api.SIPDispatchRuleIndividual(
room_prefix='call-',
)
)
request = api.CreateSIPDispatchRuleRequest(
dispatch_rule=api.SIPDispatchRuleInfo(
rule=rule,
name='My dispatch rule',
trunk_ids=[],
room_config=api.RoomConfiguration(
agents=[api.RoomAgentDispatch(
agent_name="inbound-agent",
metadata="job dispatch metadata",
)]
)
)
)
dispatch = await lkapi.sip.create_sip_dispatch_rule(request)
print("created dispatch", dispatch)
Just a couple of friendly suggestions:
• Make sure trunk_ids
includes the right SIP trunk IDs so your rule links up perfectly with your inbound trunk.
• Double-check the room_prefix
to avoid any clashes with existing room names.
2. Set Up Langfuse:
You’ve set up Langfuse nicely with session metadata. Just be sure that ctx.room.name
and ctx.room.metadata
are properly set before this step:
setup_langfuse(
metadata={
"langfuse.session.id": ctx.room.name,
"langfuse.sesssion": ctx.room.metadata,
}
)
A couple of quick tips here:
• Take a peek at the spelling of "langfuse.sesssion"
— looks like it might be a typo and should be "langfuse.session"
.
• Think about adding extra metadata like caller ID or call type to make tracing even easier.
3. Connect to the Room:
Once your dispatch rule and Langfuse are set, you connect to the room with:
await ctx.connect()
This step gets your agent all set and ready to handle those incoming calls routed by your dispatch rule.
4. Full Example:
Here’s the full code with a few small tweaks for you:
lkapi = api.LiveKitAPI(
url=os.getenv("LIVEKIT_URL"),
api_key=os.getenv("LIVEKIT_API_KEY"),
api_secret=os.getenv("LIVEKIT_API_SECRET"),
)
# Create a dispatch rule to place each caller in a separate room
rule = api.SIPDispatchRule(
dispatch_rule_individual=api.SIPDispatchRuleIndividual(
room_prefix='call-',
)
)
request = api.CreateSIPDispatchRuleRequest(
dispatch_rule=api.SIPDispatchRuleInfo(
rule=rule,
name='My dispatch rule',
trunk_ids=["your_trunk_id"], # Replace with actual trunk IDs
room_config=api.RoomConfiguration(
agents=[api.RoomAgentDispatch(
agent_name="inbound-agent",
metadata="job dispatch metadata",
)]
)
)
)
dispatch = await lkapi.sip.create_sip_dispatch_rule(request)
print("created dispatch", dispatch)
await lkapi.aclose()
# Set up Langfuse
setup_langfuse(
metadata={
"langfuse.session.id": ctx.room.name,
"langfuse.session": ctx.room.metadata,
}
)
print("🚀 Starting LiveKit agent...")
await ctx.connect()
If anything’s unclear or if you want to dive deeper into any part, just give me a shout—I’m here to help!polite-dog-12152
08/24/2025, 6:57 AMtall-airline-16756
08/24/2025, 6:58 AMdispatch_rule_individual
configuration. Here’s a quick rundown of how it works:
1. Dispatch Rule with `dispatch_rule_individual`:
This rule makes sure every incoming SIP call gets its own unique room. The room_prefix
(like call-
) helps create room names on the fly for each caller. For example:
• First caller: call-12345
• Second caller: call-67890
2. Room Isolation:
Each room created this way is completely separate, so people in different rooms won’t hear or interact with each other. This keeps conversations private and neat.
3. Room Configuration:
The room_config
part of your code assigns an agent to each room, making sure someone’s there to handle the user’s session in that specific room.
4. Validation:
If you want to double-check that separate rooms are being made, you can:
• Log the room names generated for each call.
• Take a peek at the LiveKit dashboard to see active rooms and participants.
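If you’d rather check from code than the dashboard, here’s a quick sketch using the server API (the call- prefix matches the dispatch rule above):
from livekit import api

async def list_active_call_rooms() -> None:
    # Reads LIVEKIT_URL / LIVEKIT_API_KEY / LIVEKIT_API_SECRET from the environment.
    lk_api = api.LiveKitAPI()
    rooms = await lk_api.room.list_rooms(api.ListRoomsRequest())
    for room in rooms.rooms:
        if room.name.startswith("call-"):
            print(f"Active SIP room: {room.name} ({room.num_participants} participants)")
    await lk_api.aclose()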
Example:
When a SIP call comes in, the dispatch rule creates a room named something like call-abc123
, where abc123
is a unique ID for that call. Every new call gets its own new room with a different name.
Feel free to reach out if you want a hand testing or verifying this setup—I’m here to help!polite-dog-12152
08/24/2025, 7:00 AMtall-airline-16756
08/24/2025, 7:00 AMtrunk_ids
in the dispatch rule:
1. What’s a Trunk ID?
Think of a trunk ID as a unique name tag for a SIP trunk you’ve set up in LiveKit. SIP trunks are what handle your inbound and outbound calls. Each trunk comes with its own special ID, which you can find on the LiveKit dashboard or grab programmatically through the API.
2. How to Find Trunk IDs Programmatically:
If you’re not sure what your trunk ID is, no worries! You can easily list all your trunks using the LiveKit API like this:
from livekit import api
lk_api = api.LiveKitAPI(
url=os.getenv("LIVEKIT_URL"),
api_key=os.getenv("LIVEKIT_API_KEY"),
api_secret=os.getenv("LIVEKIT_API_SECRET"),
)
# List all inbound SIP trunks
trunks = await lk_api.sip.list_sip_inbound_trunk(api.ListSIPInboundTrunkRequest())
for trunk in trunks.items:
print(f"Trunk Name: {trunk.name}, Trunk ID: {trunk.sip_trunk_id}")
This will show you the names and IDs of all your SIP trunks. Just use the sip_trunk_id
for the trunk_ids
field when setting things up.
3. Here’s an Example with a Real Trunk ID:
Once you’ve got your trunk ID, swap out "your_trunk_id"
with the actual one like this:
rule = api.SIPDispatchRule(
dispatch_rule_individual=api.SIPDispatchRuleIndividual(
room_prefix='call-',
)
)
request = api.CreateSIPDispatchRuleRequest(
dispatch_rule=api.SIPDispatchRuleInfo(
rule=rule,
name='My dispatch rule',
trunk_ids=["trunk_12345"], # Just replace this with your real trunk ID
room_config=api.RoomConfiguration(
agents=[api.RoomAgentDispatch(
agent_name="inbound-agent",
metadata="job dispatch metadata",
)]
)
)
)
dispatch = await lk_api.sip.create_sip_dispatch_rule(request)
print("Created dispatch rule:", dispatch)
4. Double-Check Your Dispatch Rule:
After you’ve created the rule, you can make sure it’s all set by listing your dispatch rules like this:
dispatch_rules = await lk_api.sip.list_sip_dispatch_rule(api.ListSIPDispatchRuleRequest())
for rule in dispatch_rules.items:
print(f"Dispatch Rule Name: {rule.name}, Trunk IDs: {rule.trunk_ids}")
5. Give It a Test:
When a call comes in on the trunk you specified, the dispatch rule will send it to a unique room with the prefix call-
. For example:
• First call: call-abc123
• Second call: call-def456
If anything’s unclear or you want a hand with this, just let me know—I’m here to help!polite-dog-12152
08/24/2025, 7:02 AMtall-airline-16756
08/24/2025, 7:02 AMCreateSIPInboundTrunkRequest
API to set up a new SIP trunk. Here's an example:
from livekit import api
lk_api = api.LiveKitAPI(
url=os.getenv("LIVEKIT_URL"),
api_key=os.getenv("LIVEKIT_API_KEY"),
api_secret=os.getenv("LIVEKIT_API_SECRET"),
)
# Create a new SIP trunk
inbound_trunk_request = api.CreateSIPInboundTrunkRequest(
trunk=api.SIPInboundTrunkInfo(
name="My Inbound Trunk", # Give your trunk a name
numbers=["+1234567890"], # Swap this out for your actual phone numbers
krisp_enabled=True # Turn on noise cancellation for clearer calls
)
)
inbound_trunk = await lk_api.sip.create_sip_inbound_trunk(inbound_trunk_request)
print(f"Created SIP trunk: {inbound_trunk.sip_trunk_id}")
This will create your SIP trunk and give you its unique ID (sip_trunk_id
).
2. Use the Trunk ID in Dispatch Rules:
After creating the trunk, you can plug its ID into your dispatch rule like this:
rule = api.SIPDispatchRule(
dispatch_rule_individual=api.SIPDispatchRuleIndividual(
room_prefix='call-',
)
)
request = api.CreateSIPDispatchRuleRequest(
dispatch_rule=api.SIPDispatchRuleInfo(
rule=rule,
name='My dispatch rule',
trunk_ids=[inbound_trunk.sip_trunk_id], # Here’s where you use the trunk ID
room_config=api.RoomConfiguration(
agents=[api.RoomAgentDispatch(
agent_name="inbound-agent",
metadata="job dispatch metadata",
)]
)
)
)
dispatch = await lk_api.sip.create_sip_dispatch_rule(request)
print("Created dispatch rule:", dispatch)
3. Verify the Trunk:
You can easily check all your SIP trunks to make sure yours was created:
trunks = await lk_api.sip.list_sip_inbound_trunk(api.ListSIPInboundTrunkRequest())
for trunk in trunks.items:
print(f"Trunk Name: {trunk.name}, Trunk ID: {trunk.sip_trunk_id}")
4. Testing:
Once everything’s set up, try making a call to the phone number you specified (+1234567890
in this example). The call should route to a unique room based on your dispatch rule.
If you have any questions or want me to walk you through any part in more detail, just let me know—I’m here to help!polite-dog-12152
08/24/2025, 7:09 AMpolite-dog-12152
08/24/2025, 7:09 AMtall-airline-16756
08/24/2025, 7:09 AMroom_name = "my-room"
agent_name = "test-agent"
3. Creating the Access Token:
The create_token_with_agent_dispatch
function makes a JWT (JSON Web Token) that includes:
• Identity: The participant’s ID is set to "my_participant"
.
• Grants: It gives permission for the participant to join the specified room (room_name
).
• Room Configuration: The room is set up with an agent dispatch, like this:
RoomConfiguration(
agents=[
RoomAgentDispatch(
agent_name="test-agent",
metadata='{"user_id": "12345"}'
)
],
)
This means the agent test-agent
is assigned to the room, and you can include extra info like user_id
in the metadata.
4. Generating the JWT:
Using the to_jwt()
method, the token is turned into a JWT string that the participant can use to authenticate:
token = (
AccessToken()
.with_identity("my_participant")
.with_grants(VideoGrants(room_join=True, room=room_name))
.with_room_config(RoomConfiguration(...))
.to_jwt()
)
5. Returning the Token:
Finally, the function hands back the generated JWT:
return token
Use Case:
This is handy when you want to:
• Authenticate someone so they can join a specific room.
• Dynamically assign an agent to that room.
• Add metadata for the agent, like user-specific details.
Example Output:
The function returns a JWT string the client can use to join the room and connect with the assigned agent.
If you have any questions or want me to walk you through more examples, just let me know—I'm happy to help!tall-airline-16756
08/24/2025, 7:10 AMfrom livekit.api import (
AccessToken,
RoomAgentDispatch,
RoomConfiguration,
VideoGrants,
)
room_name = "my-room"
agent_name = "test-agent"
def create_token_with_agent_dispatch() -> str:
token = (
AccessToken()
.with_identity("my_participant") # Gives a unique identity to the participant
.with_grants(VideoGrants(room_join=True, room=room_name)) # Allows joining the room
.with_room_config(
RoomConfiguration(
agents=[
RoomAgentDispatch(agent_name="test-agent", metadata='{"user_id": "12345"}')
], # Sets which agent will be dispatched to the room
),
)
.to_jwt() # Converts the token into a JWT string
)
return token
Key Points:
• `AccessToken`: This is what you use to create a token for a participant.
• `VideoGrants`: Grants permission for the participant to join the room.
• `RoomConfiguration`: Lets you set up the room with an agent dispatch.
• `RoomAgentDispatch`: Specifies which agent will be assigned to the room, plus any extra info you want to add.
This token lets a participant join the room and interact with the agent you’ve set up.
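For context, a token like this usually reaches the client through your own backend. Here’s a hypothetical sketch using FastAPI — the framework and endpoint are assumptions, not part of the snippet above:
from fastapi import FastAPI

app = FastAPI()

@app.get("/join-token")
def join_token() -> dict:
    # The client fetches this JWT, then uses it to connect to the LiveKit room.
    # create_token_with_agent_dispatch and room_name come from the snippet above.
    return {"token": create_token_with_agent_dispatch(), "room": room_name}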
2. Explicit Agent Dispatch:
This snippet shows how to explicitly create an agent dispatch for a room using the LiveKit API.
import asyncio
from livekit import api
room_name = "my-room"
agent_name = "test-agent"
async def create_explicit_dispatch():
lkapi = api.LiveKitAPI()
# Create an agent dispatch for the room
dispatch = await lkapi.agent_dispatch.create_dispatch(
api.CreateAgentDispatchRequest(
agent_name=agent_name, room=room_name, metadata='{"user_id": "12345"}'
)
)
print("created dispatch", dispatch)
# List all dispatches in the room
dispatches = await lkapi.agent_dispatch.list_dispatch(room_name=room_name)
print(f"there are {len(dispatches)} dispatches in {room_name}")
await lkapi.aclose()
asyncio.run(create_explicit_dispatch())
Key Points:
• `create_dispatch`: This creates a dispatch for a specific agent and room.
• `list_dispatch`: Lists all the active dispatches in that room.
• `metadata`: Lets you add extra details like a user ID to the dispatch.
This way is great if you want to assign agents to rooms on the fly and keep track of active dispatches.
Use Case:
Both snippets are here to help you manage agent assignments in LiveKit rooms. The first one creates a token for a participant with an agent dispatch set up, while the second one lets you create and list agent dispatches explicitly. You can definitely use them together to keep agent interactions smooth during your real-time communication sessions.
If you want me to walk you through anything else or give you more examples, just let me know!polite-dog-12152
08/24/2025, 7:10 AMtall-airline-16756
08/24/2025, 7:11 AMroom_name
) and assigns an agent (agent_name
) to manage interactions there. The metadata
field is super handy for adding extra info, like details about the user.
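On the agent side, that metadata shows up on the job, so the agent handling the room can read it. A small sketch, assuming the '{"user_id": "12345"}' JSON string from the example:
import json

async def entrypoint(ctx):
    # ctx.job.metadata carries whatever string was set on the dispatch.
    job_metadata = json.loads(ctx.job.metadata or "{}")
    print(f"Handling dispatch for user_id: {job_metadata.get('user_id')}")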
2. Adapting for Inbound Calls:
For inbound calls, you can create a unique room for each call on the fly and assign an agent to handle it. Here’s a simple way to adjust the code:
import asyncio
from livekit import api
async def handle_inbound_call(caller_id):
# Generate a unique room name for the inbound call
room_name = f"inbound-call-{caller_id}"
agent_name = "test-agent"
# Initialize LiveKit API client
lkapi = api.LiveKitAPI(
url=os.getenv("LIVEKIT_URL"),
api_key=os.getenv("LIVEKIT_API_KEY"),
api_secret=os.getenv("LIVEKIT_API_SECRET"),
)
# Create an agent dispatch for the room
dispatch = await lkapi.agent_dispatch.create_dispatch(
api.CreateAgentDispatchRequest(
agent_name=agent_name,
room=room_name,
metadata=f'{{"caller_id": "{caller_id}"}}'
)
)
print("Created dispatch for inbound call:", dispatch)
# List all dispatches in the room
dispatches = await lkapi.agent_dispatch.list_dispatch(room_name=room_name)
print(f"There are {len(dispatches)} dispatches in {room_name}")
await lkapi.aclose()
# Simulate handling an inbound call
asyncio.run(handle_inbound_call("+1234567890"))
3. Key Changes for Inbound Calls:
• Dynamic Room Name: The room name is created dynamically using the caller ID (like inbound-call-+1234567890
).
• Metadata: The metadata now includes the caller ID, which is great for logging or routing.
4. Workflow:
1. When an inbound call comes in, generate a unique room name for it.
2. Create an agent dispatch for that room, assigning an agent to take the call.
3. You can also log or keep track of dispatches to monitor things smoothly.
5. Testing:
To try this out, just simulate an inbound call by running the handle_inbound_call
function with a sample caller ID. Check that a new room pops up and your agent is assigned properly.
If you have any questions or want me to share more examples, just let me know—I’m here to help!polite-dog-12152
08/24/2025, 7:19 AMtall-airline-16756
08/24/2025, 7:19 AMuuid
module to whip up a unique identifier for every call:
import uuid
# Generate a unique ID for the call
unique_id = str(uuid.uuid4())
room_name = f"inbound-call-{unique_id}"
This way, each call gets its own unique room name, even if the caller ID is missing.
2. Updated Code:
Here’s the updated snippet to handle inbound calls without needing a caller ID:
import asyncio
import uuid
from livekit import api
async def handle_inbound_call():
# Generate a unique ID for the call
unique_id = str(uuid.uuid4())
room_name = f"inbound-call-{unique_id}"
agent_name = "test-agent"
# Initialize LiveKit API client
lkapi = api.LiveKitAPI(
url=os.getenv("LIVEKIT_URL"),
api_key=os.getenv("LIVEKIT_API_KEY"),
api_secret=os.getenv("LIVEKIT_API_SECRET"),
)
# Create an agent dispatch for the room
dispatch = await lkapi.agent_dispatch.create_dispatch(
api.CreateAgentDispatchRequest(
agent_name=agent_name,
room=room_name,
metadata=f'{{"unique_id": "{unique_id}"}}'
)
)
print("Created dispatch for inbound call:", dispatch)
# List all dispatches in the room
dispatches = await lkapi.agent_dispatch.list_dispatch(room_name=room_name)
print(f"There are {len(dispatches)} dispatches in {room_name}")
await lkapi.aclose()
# Simulate handling an inbound call
asyncio.run(handle_inbound_call())
3. Key Changes:
• Unique ID: We generate a UUID for each call, so every room name stays unique.
• Metadata: The unique ID is tucked into the metadata to help with tracking.
4. Workflow:
1. When a call comes in, generate a unique ID for it.
2. Create a room name using that ID (like inbound-call-abc123
).
3. Set up an agent dispatch for that room, so someone’s ready to take the call.
5. Benefits:
• Keeps room names unique for every call, no matter what.
• Gives you flexibility if caller ID isn’t available or you want to skip it.
If you want me to walk you through anything else or need more examples, just give me a shout!polite-dog-12152
08/24/2025, 7:21 AMtall-airline-16756
08/24/2025, 7:22 AMgenerate_auth_token()
to assign a unique ID for each inbound call is a smart move. This way, each user gets their own unique room, which keeps conversations nicely separated.
2. Langfuse Integration:
Langfuse is set up just right with session metadata, so you can track each session individually. Just make sure ctx.room.name
and ctx.room.metadata
are properly initialized before this step to keep everything smooth.
3. Transcription Logging:
Initializing the transcription logger for each session and calling start_transcription
with the right session ID means transcription data stays organized per user. Great job on that!
4. Agent Session:
The AgentSession
is configured well, with options like use_tts_aligned_transcript
, user_away_timeout
, and max_tool_steps
tailored to your needs. This really helps customize the session experience.
5. Event Handlers:
You’ve set up thorough event handlers for user speech, agent speech, and conversation items, making sure every interaction is properly logged and handled. That’s key for keeping things running smoothly.
6. Session Cleanup:
The handlers for on_participant_disconnected
and on_room_disconnected
take care of cleaning up resources when sessions end, including closing transcription logs and ending calls. Nice and tidy!
7. Handling Multiple Users:
By creating unique rooms and sessions dynamically for each user, your code effectively keeps conversations separate and avoids overlap. Each user gets their own isolated session with dedicated logging and agent interactions.
Suggestions:
• Just double-check that ctx.room.name
and ctx.room.metadata
are correctly initialized for every session.
• It might help to add more detailed error handling in the handle_inbound_call
function to catch and recover from any hiccups during room creation or dispatch.
• Try testing with multiple users at once to make sure sessions stay isolated and there aren’t any resource conflicts.
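If it helps with that last point, here’s a rough way to simulate several inbound calls at once using the handle_inbound_call helper from earlier:
import asyncio

async def simulate_concurrent_calls(count: int = 3) -> None:
    # Fire several inbound-call setups in parallel and confirm each one ends up
    # in its own room without fighting over shared resources.
    await asyncio.gather(*(handle_inbound_call() for _ in range(count)))

# asyncio.run(simulate_concurrent_calls())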
If you want, I’m here to help with any further questions or improvements you might need!polite-dog-12152
08/24/2025, 7:34 AMtall-airline-16756
08/24/2025, 7:34 AMsid
property like this:
# Example in Python
room_sid = await ctx.room.sid
print(f"Room SID: {room_sid}")
Just a heads-up, the sid
property works asynchronously and will give you the unique room identifier once it’s ready.
2. A Couple of Quick Notes:
• The LiveKit server assigns the room SID when you connect, so each room session gets its own unique ID.
• Make sure your room is connected before trying to grab the SID, or it won’t be available yet.
3. Why You Might Need It:
The room SID comes in handy for things like logging, debugging, or linking specific sessions with other systems you’re using.
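As one example, you could fold the SID into the session metadata you’re already sending to Langfuse — the room.sid key name here is just illustrative:
async def langfuse_metadata_with_sid(ctx) -> dict:
    # sid resolves asynchronously in recent Python SDKs; use it as a plain
    # property instead if your version isn't async here.
    room_sid = await ctx.room.sid
    return {
        "langfuse.session.id": ctx.room.name,
        "room.sid": room_sid,  # illustrative extra key for cross-referencing logs
    }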
If you want me to walk you through this for other SDKs or need more examples, just let me know—I’m here to help!
Sources: livekit-rtc/livekit/rtc/_proto/room_pb2.py | livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt | lib/src/core/room.dart | livekit/src/room/mod.rs | src/room/Room.tspolite-dog-12152
08/24/2025, 9:56 AMtall-airline-16756
08/24/2025, 9:56 AMclass LangfuseTracker:
def __init__(self):
self.logs = []
def log(self, node_type, input_data, output_data):
log_entry = {
"node_type": node_type,
"input": input_data,
"output": output_data
}
self.logs.append(log_entry)
print(f"Logged {node_type}: {log_entry}")
2. Override STT, LLM, and TTS Nodes:
In your custom agent class, override the nodes to log their input and output like this:
from livekit.agents import Agent
class CustomAgent(Agent):
def __init__(self, langfuse_tracker):
super().__init__(instructions="You are a helpful voice assistant.")  # Agent requires instructions
self.langfuse_tracker = langfuse_tracker
async def stt_node(self, audio, model_settings):
async for event in super().stt_node(audio, model_settings):
if hasattr(event, 'alternatives') and event.alternatives:
input_audio = audio # Log raw audio if needed
output_text = event.alternatives[0].text
self.langfuse_tracker.log("STT", input_audio, output_text)
yield event
async def llm_node(self, chat_ctx, tools, model_settings=None):
async for chunk in super().llm_node(chat_ctx, tools, model_settings):
input_text = chat_ctx.messages[-1].content # Last user message
output_text = chunk.delta.content if hasattr(chunk, 'delta') else str(chunk)
self.langfuse_tracker.log("LLM", input_text, output_text)
yield chunk
async def tts_node(self, text, model_settings):
async for audio_frame in super().tts_node(text, model_settings):
input_text = text
output_audio = audio_frame # Log raw audio frames if needed
self.langfuse_tracker.log("TTS", input_text, output_audio)
yield audio_frame
3. Integrate the Custom Agent:
Just pass your Langfuse tracker to your custom agent like this:
langfuse_tracker = LangfuseTracker()
agent = CustomAgent(langfuse_tracker=langfuse_tracker)
4. Benefits:
• You’ll be tracking input and output for STT, LLM, and TTS nodes in real-time.
• The logs come in handy for debugging, analytics, or auditing.
• It’s super flexible—you can add extra metadata or customize the processing however you like.
If you want, I’m here to help with more examples or any questions you have!polite-dog-12152
08/24/2025, 9:58 AMtall-airline-16756
08/24/2025, 9:59 AMLangfuseTracker
class is built to log inputs and outputs from each node. You can even extend it to send these logs to external tools like Langfuse for telemetry and analytics.
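As one illustration of that extension point, here’s a rough subclass that also appends each entry to a JSONL file — the file export is just a stand-in for a real telemetry client:
import json

class FileExportingTracker(LangfuseTracker):
    """Same in-memory logging as above, plus an append-only JSONL export."""

    def __init__(self, path: str = "node_logs.jsonl"):
        super().__init__()
        self.path = path

    def log(self, node_type, input_data, output_data):
        super().log(node_type, input_data, output_data)
        # Audio inputs/outputs aren't JSON-serializable, so only keep a short summary.
        entry = {"node_type": node_type, "output": str(output_data)[:200]}
        with open(self.path, "a") as f:
            f.write(json.dumps(entry) + "\n")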
2. Overridden Node Methods:
In your custom agent class, the stt_node
, llm_node
, and tts_node
methods get overridden so you can catch and log the data:
• STT Node: Captures raw audio input and the transcribed text output.
• LLM Node: Logs the user’s input text and the LLM’s response.
• TTS Node: Logs the text input and the generated audio output.
3. Integration:
You pass the LangfuseTracker
instance into your custom agent, so all nodes share the same tracker for logging. This keeps everything neat and centralized.
4. Use Case:
This setup is super helpful for:
• Debugging: Pinpointing any hiccups in the STT, LLM, or TTS processes.
• Analytics: Understanding user interactions and how the agent responds.
• Auditing: Keeping a clear record of inputs and outputs for compliance or review.
5. Example Workflow:
Here’s what happens when a user speaks:
1. The stt_node
handles the audio and logs the transcribed text.
2. The llm_node
processes that text and logs the LLM’s reply.
3. The tts_node
takes the response text and logs the generated audio.
If you want me to clarify anything or help add more features, just let me know—I’m here to help!polite-dog-12152
08/24/2025, 10:03 AMtall-airline-16756
08/24/2025, 10:04 AMpolite-dog-12152
08/24/2025, 10:04 AMtall-airline-16756
08/24/2025, 10:04 AMclass LangfuseTracker:
def __init__(self):
self.logs = []
def log(self, node_type, input_data, output_data):
log_entry = {
"node_type": node_type,
"input": input_data,
"output": output_data
}
self.logs.append(log_entry)
print(f"Logged {node_type}: {log_entry}")
2. Custom Agent Class:
from livekit.agents import Agent
class CustomAgent(Agent):
def __init__(self, langfuse_tracker):
super().__init__(instructions="You are a helpful voice assistant.")  # Agent requires instructions
self.langfuse_tracker = langfuse_tracker
async def stt_node(self, audio, model_settings):
async for event in super().stt_node(audio, model_settings):
if hasattr(event, 'alternatives') and event.alternatives:
input_audio = audio # Log raw audio if needed
output_text = event.alternatives[0].text
self.langfuse_tracker.log("STT", input_audio, output_text)
yield event
async def llm_node(self, chat_ctx, tools, model_settings=None):
async for chunk in super().llm_node(chat_ctx, tools, model_settings):
input_text = chat_ctx.messages[-1].content # Last user message
output_text = chunk.delta.content if hasattr(chunk, 'delta') else str(chunk)
self.langfuse_tracker.log("LLM", input_text, output_text)
yield chunk
async def tts_node(self, text, model_settings):
async for audio_frame in super().tts_node(text, model_settings):
input_text = text
output_audio = audio_frame # Log raw audio frames if needed
self.langfuse_tracker.log("TTS", input_text, output_audio)
yield audio_frame
3. Integration with Entry Point:
import asyncio
from livekit import api
async def entrypoint(ctx):
try:
# Initialize Langfuse tracker
langfuse_tracker = LangfuseTracker()
# Create the custom agent
agent = CustomAgent(langfuse_tracker=langfuse_tracker)
# Set up the session
session = AgentSession(
use_tts_aligned_transcript=True,
user_away_timeout=30,
max_tool_steps=10
)
# Start the session
await session.start(agent=agent, room=ctx.room, room_input_options=RoomInputOptions(close_on_disconnect=False))
print("✅ Session started")
except Exception as e:
print(f"❌ Error: {e}")
raise
4. Workflow:
1. When a user speaks, the stt_node
logs the audio input and transcribed text.
2. The llm_node
logs the user’s input text and the LLM’s response.
3. The tts_node
logs the response text and the generated audio.
5. Benefits:
• Tracks all interactions in real-time for debugging, analytics, and auditing.
• Centralized logging with the LangfuseTracker
class.
• Customizable to include additional metadata or processing logic.
If you have any questions or want to tweak anything, just let me know! I’m here to help.polite-dog-12152
08/24/2025, 10:27 AMtall-airline-16756
08/24/2025, 10:28 AMLanguageSwitchingAgent
. You can extend it to include the Langfuse tracker like this:
class TrackedLanguageSwitchingAgent(LanguageSwitchingAgent):
def __init__(self, langfuse_tracker, *args, **kwargs):
super().__init__(*args, **kwargs)
self.langfuse_tracker = langfuse_tracker
async def stt_node(self, audio, model_settings):
async for event in super().stt_node(audio, model_settings):
if hasattr(event, 'alternatives') and event.alternatives:
input_audio = audio # Log raw audio if needed
output_text = event.alternatives[0].text
self.langfuse_tracker.log("STT", input_audio, output_text)
yield event
async def llm_node(self, chat_ctx, tools, model_settings=None):
async for chunk in super().llm_node(chat_ctx, tools, model_settings):
input_text = chat_ctx.messages[-1].content # Last user message
output_text = chunk.delta.content if hasattr(chunk, 'delta') else str(chunk)
self.langfuse_tracker.log("LLM", input_text, output_text)
yield chunk
async def tts_node(self, text, model_settings):
async for audio_frame in super().tts_node(text, model_settings):
input_text = text
output_audio = audio_frame # Log raw audio frames if needed
self.langfuse_tracker.log("TTS", input_text, output_audio)
yield audio_frame
2. Initialize the Tracker and Use the New Agent:
When you set up your entrypoint or session, just initialize the Langfuse tracker and use this new agent class like so:
async def entrypoint(ctx):
try:
# Initialize Langfuse tracker
langfuse_tracker = LangfuseTracker()
# Create the tracked language-switching agent
agent = TrackedLanguageSwitchingAgent(
langfuse_tracker=langfuse_tracker,
# Pass other arguments required by your LanguageSwitchingAgent
tools=[info_tool],
llm_instance=llm,
vad=vad
)
# Set up the session
session = AgentSession(
use_tts_aligned_transcript=True,
user_away_timeout=30,
max_tool_steps=10
)
# Start the session
await session.start(agent=agent, room=ctx.room, room_input_options=RoomInputOptions(close_on_disconnect=False))
print("✅ Session started")
except Exception as e:
print(f"❌ Error: {e}")
raise
3. Why This Works Well:
• You get to reuse your existing language-switching logic without any hassle.
• Tracking for STT, LLM, and TTS nodes is added smoothly and seamlessly.
• All your logs are centralized with the Langfuse tracker, making debugging and analytics a breeze.
4. A Quick Testing Checklist:
Before you wrap up, just check that:
• Language switching behaves exactly as you expect.
• Input and output for STT, LLM, and TTS nodes are properly logged.
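And a quick way to eyeball both checklist items after a test call is to dump what the tracker captured:
# Print a short summary of every logged STT / LLM / TTS event from the session.
for entry in langfuse_tracker.logs:
    print(f"{entry['node_type']}: {str(entry['output'])[:80]}")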
If you have any questions or want to explore more enhancements, just let me know—I’m here to help!