how to stop a session when no user is present in the room
# ask-ai
p
how to stop a session when no user is present in the room
t
Thanks for reaching out, arpan. I'm an AI assistant and I'm here to help. Give me a moment while I research this topic.
To stop a session when no user is present in a LiveKit room, you can set up a simple inactivity detection system. Here's a friendly way to do it: 1. Keep an Eye on User Status: Use the `user_state_changed` event to notice when a user goes inactive (like switching to "away"). 2. Handle the Inactivity: Try reaching out to the inactive user a few times. If they don't respond, it's probably time to close the session. 3. Wrap Up the Session and Delete the Room: Once the user doesn't respond, close the session and delete the room to free up resources. Here's a simple example in Python to get you started:
async def user_presence_task():
    # Try to ping the user 3 times
    for _ in range(3):
        await session.generate_reply(
            instructions="The user has been inactive. Please respond if you are still present."
        )
        await asyncio.sleep(10)
    # Close the session and delete the room
    await session.aclose()
    ctx.delete_room()

# Monitor user state changes
@session.on("user_state_changed")
def _user_state_changed(ev: UserStateChangedEvent):
    nonlocal inactivity_task
    if ev.new_state == "away":
        # Start inactivity check when user becomes away
        inactivity_task = asyncio.create_task(user_presence_task())
        return
    # Cancel inactivity task if user becomes active again
    if inactivity_task is not None:
        inactivity_task.cancel()
This method helps keep things running smoothly by cleaning up the room when it’s no longer needed. If you want to chat more about it or need a hand, just let me know! Sources: Plugin System | livekit/livekit_composite | DeepWiki | Agents Core Architecture | livekit/livekit_composite | DeepWiki | Voice Assistants | livekit/livekit_composite | DeepWiki | examples/voice_agents/inactive_user.py | Voice Assistants | livekit/livekit_composite | DeepWiki
p
how to check if no participants are left
t
If you want to check whether there are no participants left in a LiveKit room, here are a couple of friendly ways to do it: 1. Take a peek at the Remote Participants List: You can simply check if the list of remote participants is empty like this:
if len(room.remote_participants) == 0:
    print("No participants left in the room")
2. Keep an eye on Participant Disconnections: By using the `participant_disconnected` event, you can catch when someone leaves and then see if the room is empty:
@room.on("participant_disconnected")
def on_participant_disconnected(participant):
    if len(room.remote_participants) == 0:
        print("No participants left in the room")
If you want a hand with setting this up or have any questions, just give me a shout—I’m here to help! Sources: Anatomy of an Agent | LiveKit Docs | livekit-rtc/livekit/rtc/room.py
p
full example
t
Here’s a full example to help you check if there are no participants left in a LiveKit room:
import asyncio
from livekit import rtc

# Create a future that will be set when the room is empty
room_empty_future = asyncio.get_running_loop().create_future()

# Define a callback for when a participant disconnects
def on_participant_disconnected(_: rtc.Participant, fut=room_empty_future):
    if len(ctx.room.remote_participants) == 0 and not fut.done():
        fut.set_result(None)

# Attach the callback to the room's participant_disconnected event
ctx.room.on("participant_disconnected", on_participant_disconnected)

try:
    # Wait for the future to be resolved
    await asyncio.wait([room_empty_future], return_when=asyncio.FIRST_COMPLETED)
finally:
    # Cleanup the event listener
    ctx.room.off("participant_disconnected", on_participant_disconnected)
    # Ensure the session is closed properly
    await session.aclose()
Explanation: • Future Setup: We create a future to keep an eye on when the room becomes empty. • Event Listener: We listen for the `participant_disconnected`
event to check how many participants are still around. • Room Cleanup: Once the room is empty, the future is resolved, and we remove the event listener to tidy things up. This way, you’ll know exactly when the room is empty and can handle any cleanup or next steps smoothly. If anything’s unclear or you want to dive deeper, just let me know—I’m here to help! Sources: Anatomy of an Agent | LiveKit Docs
p
how to mask STT output before it reaches the LLM
t
Hey there! If you want to mask the STT (speech-to-text) output before it gets to an LLM in LiveKit, a great way is to override the `stt_node` method in your custom Agent class. This lets you tweak the transcribed text just the way you want before passing it along. Here's a simple example to get you started:
from livekit import rtc
from livekit.agents import ModelSettings, stt, Agent
from typing import AsyncIterable, Optional

class MaskingAgent(Agent):
    def __init__(self):
        super().__init__(
            instructions="Your agent instructions here",
            stt=your_stt_provider,
            llm=your_llm_provider,
            tts=your_tts_provider,
            vad=your_vad_provider
        )

    async def stt_node(
        self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings
    ) -> Optional[AsyncIterable[stt.SpeechEvent]]:
        # First, get the default STT processing
        async for event in Agent.default.stt_node(self, audio, model_settings):
            # Here’s where you can post-process the transcribed text
            if hasattr(event, 'alternatives') and event.alternatives:
                # Let’s mask any sensitive info in the transcript
                original_text = event.alternatives[0].text
                masked_text = self.mask_sensitive_content(original_text)
                event.alternatives[0].text = masked_text
            yield event

    def mask_sensitive_content(self, text: str) -> str:
        # This is where your masking magic happens
        import re

        # Mask credit card numbers
        text = re.sub(r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b', '[CREDIT_CARD]', text)

        # Mask phone numbers
        text = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE_NUMBER]', text)

        # Feel free to add more patterns to mask as needed
        return text
Here's a quick rundown: • Override the `stt_node` method to catch and tweak the STT output. • Create a handy function like `mask_sensitive_content`
to swap out sensitive info (think credit cards, phone numbers) with placeholders. • Yield the updated events so the LLM only sees the masked text. This way, you’ve got full control over what the LLM receives from the STT. If you want me to walk you through anything or have questions, just give me a shout—I’m here to help! Sources: Pipeline nodes and hooks | LiveKit Docs | Voice pipeline nodes | LiveKit Docs
p
can you provide me some metric dashboard blogs or links
t
Hey there! I've put together some handy resources and examples to help you create metric dashboards in LiveKit. Check them out: 1. Complete Metrics Dashboard Example: You'll find a thorough example in the LiveKit examples repository. It includes: ◦ A Flask-based metrics server with API endpoints to receive and serve your metrics data ◦ A real-time HTML dashboard built with Bootstrap and Chart.js to visualize everything nicely ◦ Support for various metric types like LLM, STT, TTS, EOU, and VAD metrics ◦ Live updates every 2 seconds, showing JSON data so you're always in the loop 2. Documentation Resources: ◦ Logs, metrics, and telemetry guide – a great place to get familiar with the basics ◦ Capturing metrics (v0.x) – if you're working with the older version 3. Metrics Examples Repository: The LiveKit Recipes page has direct links to specific metrics examples, including LLM, STT, TTS, and VAD. Super useful if you want to see things in action! 4. Server-Level Dashboards: There's a Grafana dashboard config ready for tracking server metrics like rooms, participants, message rates, and network stats. Perfect for keeping an eye on the big picture. 5. LiveKit Cloud Analytics: If you're using LiveKit Cloud, it comes with built-in analytics dashboards that offer detailed real-time media insights — think track-level stats like bitrate, frame rate, packet loss, and simulcast layer changes. If you want me to walk you through anything or need some specific examples, just give me a shout. I'm here to help! Sources: Logs, metrics, and telemetry | LiveKit Docs | Capturing metrics | LiveKit Docs | LiveKit | Recipes and examples for voice AI and more. | LiveKit Docs | deploy/README.md | Announcing LiveKit Cloud
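If you want to wire up the agent side of such a dashboard yourself, here's a minimal sketch of the hook those examples build on. It assumes the current livekit-agents metrics API (`metrics.UsageCollector`, the `metrics_collected` session event) and that `session` and `ctx` are the AgentSession and JobContext from your entrypoint; `send_to_dashboard` is a hypothetical helper that would POST to your Flask metrics server:
from livekit.agents import metrics, MetricsCollectedEvent

usage_collector = metrics.UsageCollector()

@session.on("metrics_collected")
def _on_metrics_collected(ev: MetricsCollectedEvent):
    # Log each LLM / STT / TTS / EOU / VAD metric as it arrives
    metrics.log_metrics(ev.metrics)
    # Aggregate usage so you can report totals at shutdown
    usage_collector.collect(ev.metrics)
    # send_to_dashboard(ev.metrics)  # hypothetical: forward to your metrics server

async def log_usage():
    summary = usage_collector.get_summary()
    print(f"Usage summary: {summary}")

ctx.add_shutdown_callback(log_usage)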
p
can you tell me about Langfuse, and does it track voice agent pipeline responses?
t
Hey there! Just wanted to share how Langfuse can be seamlessly integrated with LiveKit to help you keep track of voice agent pipeline responses. It works as a tracer provider, so you can easily trace agent sessions and all the interactions happening within the voice pipeline (think STT → LLM → TTS). Here’s what it can do for you: • Uses OpenTelemetry to send traces over to Langfuse. • Keeps track of session-level info like session IDs, so you get detailed tracing. • Supports tracing voice agent interactions, including function calls, agent transfers, and how conversations flow. Getting started is pretty straightforward:
def setup_langfuse(
    metadata: dict[str, AttributeValue] | None = None,
    *,
    host: str | None = None,
    public_key: str | None = None,
    secret_key: str | None = None,
) -> TracerProvider:
Just make sure you have these environment variables set up: `LANGFUSE_PUBLIC_KEY`, `LANGFUSE_SECRET_KEY`, and `LANGFUSE_HOST`. Want to see it in action? Check out the LiveKit repo's example where they trace agents like: • Kelly: Powered by GPT-4o-mini, Deepgram STT, and OpenAI TTS. • Alloy: Running on OpenAI's real-time model. The traces capture user turns and session details, giving you a clear, full picture of what's happening in your voice agent pipeline. If you have any questions or want a hand setting it up, just let me know—I'm here to help! Sources: Logs, metrics, and telemetry | LiveKit Docs | examples/voice_agents/langfuse_trace.py
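In case it's useful, here's roughly what that setup_langfuse helper does in the example referenced above: it points LiveKit's OpenTelemetry traces at Langfuse's OTLP endpoint. Treat this as a sketch and double-check the import paths against your livekit-agents version:
import base64
import os

from livekit.agents.telemetry import set_tracer_provider
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

def setup_langfuse(host: str | None = None, public_key: str | None = None, secret_key: str | None = None) -> TracerProvider:
    public_key = public_key or os.getenv("LANGFUSE_PUBLIC_KEY")
    secret_key = secret_key or os.getenv("LANGFUSE_SECRET_KEY")
    host = host or os.getenv("LANGFUSE_HOST")

    # Langfuse accepts OTLP traces authenticated with base64("public_key:secret_key")
    auth = base64.b64encode(f"{public_key}:{secret_key}".encode()).decode()
    os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = f"{host.rstrip('/')}/api/public/otel"
    os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"Authorization=Basic {auth}"

    provider = TracerProvider()
    provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
    set_tracer_provider(provider)
    return provider
Call it at the top of your entrypoint, before you create the AgentSession, and the session traces should show up in your Langfuse project.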
p
how to remove thinking responses and mentions of tools
t
If you'd like to remove those 'thinking' responses and mentions of tools in your LiveKit voice agents, here are some friendly ways to do it: 1. Using `before_tts_cb` with VoicePipelineAgent: This handy callback lets you catch the text before it goes to the TTS engine. Here's a simple example:
async def _before_tts_cb(agent: VoicePipelineAgent, text: str | AsyncIterable[str]):
    if isinstance(text, str):
        # For non-streaming text
        return text.replace("[thinking]", "")
    else:
        # For streaming text
        async def process_stream():
            async for chunk in text:
                yield chunk.replace("[thinking]", "")
        return process_stream()

agent = VoicePipelineAgent(
    vad=ctx.proc.userdata["vad"],
    stt=openai.STT(),
    llm=openai.LLM.with_groq(model="deepseek-r1-distill-llama-70b"),
    tts=openai.TTS(),
    before_tts_cb=_before_tts_cb
)
2. Overriding `llm_node` in a Custom Agent Class: If you want more control, you can tweak the `llm_node` method to clean up the LLM output before it reaches the TTS engine. Here's how:
class CustomAgent(Agent):
    async def llm_node(self, chat_ctx, tools, model_settings=None):
        async def process_stream():
            async with self.llm.chat(chat_ctx=chat_ctx, tools=tools) as stream:
                async for chunk in stream:
                    content = getattr(chunk.delta, 'content', None)
                    if content:
                        chunk.delta.content = content.replace("[thinking]", "")
                    yield chunk
        return process_stream()
3. Built-in Thinking Token Handling: LiveKit also has a `strip_thinking_tokens` function that conveniently removes those "thinking" tags from the content for you. 4. Removing Tool Mentions: You can expand your text-cleaning logic to get rid of tool-related phrases as well:
def clean_text(text: str) -> str:
    text = text.replace("[thinking]", "")
    tool_phrases = ["I'll use the", "Let me call the", "Using the tool"]
    for phrase in tool_phrases:
        text = text.replace(phrase, "")
    return text.strip()
These tips should help make your agent’s responses feel smoother and more natural. If you want a hand implementing any of these or have more questions, just let me know—I’m here to help! Sources: chain-of-thought-tts/agent.py | Modifying LLM output before TTS | LiveKit Docs | livekit-agents/livekit/agents/llm/utils.py
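One more option for reasoning models that wrap their chain of thought in <think>...</think> tags (DeepSeek R1, for example): strip everything between the tags inside an `llm_node` override. This is a hand-rolled sketch of the same idea, not the built-in `strip_thinking_tokens` helper, and it assumes a tag never gets split across two streamed chunks:
from livekit.agents import Agent

class ThinkStrippingAgent(Agent):
    async def llm_node(self, chat_ctx, tools, model_settings=None):
        async def process_stream():
            in_think = False
            async with self.llm.chat(chat_ctx=chat_ctx, tools=tools) as stream:
                async for chunk in stream:
                    content = getattr(chunk.delta, "content", None)
                    if not content:
                        yield chunk
                        continue
                    if in_think:
                        if "</think>" in content:
                            # Closing tag arrived: keep only the text after it
                            content = content.split("</think>", 1)[1]
                            in_think = False
                        else:
                            content = ""  # still inside the thinking block
                    if "<think>" in content:
                        before, _, rest = content.partition("<think>")
                        if "</think>" in rest:
                            rest = rest.split("</think>", 1)[1]
                        else:
                            in_think = True
                            rest = ""
                        content = before + rest
                    chunk.delta.content = content
                    yield chunk
        return process_stream()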
p
explain this async def tts_node( self, text: AsyncIterable[str], model_settings: ModelSettings ) -> AsyncIterable[rtc.AudioFrame]: # TTS node allows us to process the text before it's sent to the model # here we'll strip out markdown filtered_text = filter_markdown(text) return super().tts_node(filtered_text, model_settings)
t
The `tts_node` method in LiveKit is a super handy, customizable pipeline node that helps you tweak text before it goes to the Text-To-Speech (TTS) model to create audio. It gives you the power to adjust the text output from the Large Language Model (LLM) just the way you want before it's turned into speech. How It Works: 1. The method takes in an asynchronous stream of text (`AsyncIterable[str]`) and processes it however you need. 2. In the example here, the text gets filtered with a function like `filter_markdown` to clean out any markdown formatting:
async def tts_node(
    self, text: AsyncIterable[str], model_settings: ModelSettings
) -> AsyncIterable[rtc.AudioFrame]:
    # Process the text to remove markdown
    filtered_text = filter_markdown(text)
    # Pass the processed text to the default TTS implementation
    return super().tts_node(filtered_text, model_settings)
Key Features:Text Preprocessing: You can change the text before it reaches the TTS model—whether that’s removing markdown, swapping out words, or tweaking pronunciation. • Custom TTS Logic: Feel free to build your own TTS engines or add special audio processing after the speech is generated. • Streaming Support: It works smoothly with asynchronous text streams, so you can process text in real-time. Use Case: In this example, the
filter_markdown
function makes sure the TTS model gets clean text without markdown, which helps the speech sound more natural and clear. If you want me to walk you through anything else or share more examples, just let me know—I’m here to help! Sources: pipeline-tts/short_replies_only.py | Agents 0.x migration guide | LiveKit Docs | Voice pipeline nodes | LiveKit Docs | livekit-agents/livekit/agents/voice/generation.py
p
i want to modify STT output before it reaches the LLM and the transcription
t
Hey there! If you want to tweak the STT output before it reaches the LLM in LiveKit, you've got two great options: override either the `stt_node()` or `transcription_node()` methods in your custom Agent class. Here's a quick rundown on how to do it: 1. Using the `stt_node()` Method: This one lets you customize the whole STT process. Here's an example:
async def stt_node(self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings) -> Optional[AsyncIterable[stt.SpeechEvent]]:
    # Add your custom processing before STT here
    events = Agent.default.stt_node(self, audio, model_settings)
    # Add your custom processing after STT here
    return events
2. Using the `transcription_node()` Method: This method is perfect if you want to modify the transcription text right before it hits the LLM. Check this out:
async def transcription_node(self, text: AsyncIterable[str], model_settings: ModelSettings):
    replacements = {
        "hello": "👋 HELLO",
        "goodbye": "GOODBYE 👋",
    }

    async def process_text():
        async for chunk in text:
            modified_chunk = chunk
            for word, replacement in replacements.items():
                modified_chunk = modified_chunk.replace(word, replacement)
            yield modified_chunk

    return process_text()
Some handy features to keep in mind: • You can clean up transcription text by removing filler words, fixing pronunciations, or even adding some fun formatting. • It works with streaming chunks, so your real-time performance stays smooth. • Go for `stt_node()` if you want to customize the whole STT flow, or `transcription_node()`
if you just want to tweak the text. If anything’s unclear or you want more examples, just give me a shout—I’m happy to help! Sources: Pipeline nodes and hooks | LiveKit Docs | pipeline-llm/replacing_llm_output.py
p
is this fine async def stt_node( self, text : AsyncIterable[str], model_settings: ModelSettings ) -> AsyncIterable[rtc.AudioFrame]: """Content Filtering""" filtered_text = self.mask_sensitive_content(text) return super().stt_node(filtered_text, model_settings)
t
You're really close with your implementation! Just a couple of tweaks will make it work smoothly. The `stt_node` method actually handles audio frames, not text, so the parameter should be `audio` instead of `text`. Also, the `mask_sensitive_content` function needs to be applied to the transcribed text, not the raw audio frames. Here's an updated version for you:
async def stt_node(
    self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings
) -> AsyncIterable[stt.SpeechEvent]:
    """Content Filtering"""
    async for event in super().stt_node(audio, model_settings):
        if hasattr(event, 'alternatives') and event.alternatives:
            # Apply masking to the transcribed text
            original_text = event.alternatives[0].text
            filtered_text = self.mask_sensitive_content(original_text)
            event.alternatives[0].text = filtered_text
        yield event
Here's what to keep in mind: • The `stt_node` method works with audio frames, so make sure the input parameter is `audio`. • Apply `mask_sensitive_content`
to the transcribed text inside the STT events—not the audio frames themselves. • Don’t forget to yield the updated events back to the pipeline. If anything’s unclear or you want to dive deeper, just let me know—I’m here to help!
p
yes, in more depth
t
Let me walk you through how to properly implement and use the `stt_node` method to tweak the STT output in LiveKit—it's simpler than it sounds! What's the `stt_node` for? The `stt_node` method is a key part of the LiveKit voice pipeline. It takes in audio frames (`rtc.AudioFrame`) and turns them into speech events (`stt.SpeechEvent`) with the transcribed text. By customizing this method, you get to catch and adjust the transcribed text before it moves on to the next step (like feeding into an LLM). Here's how you can implement it:
async def stt_node(
    self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings
) -> AsyncIterable[stt.SpeechEvent]:
    """Custom STT Node with Content Filtering"""
    # Start by calling the default STT processing to get speech events
    async for event in super().stt_node(audio, model_settings):
        if hasattr(event, 'alternatives') and event.alternatives:
            # Grab the transcribed text from the first alternative
            original_text = event.alternatives[0].text

            # Apply your own filtering or masking here
            filtered_text = self.mask_sensitive_content(original_text)

            # Swap out the original text for your filtered version
            event.alternatives[0].text = filtered_text

        # Send the updated event back into the pipeline
        yield event
What's going on here? • Audio Input: The `audio` parameter is an async stream of audio frames (`AsyncIterable[rtc.AudioFrame]`). • Speech Events: The default `stt_node` processes these frames and produces `stt.SpeechEvent` objects, which have the transcribed text inside the `alternatives` attribute. • Text Modification: The `mask_sensitive_content`
function is where you can filter or adjust the text—like hiding phone numbers or credit card info. • Yielding Events: After you tweak the text, the updated event is sent back to keep the pipeline moving. Here’s a quick example of `mask_sensitive_content`:
def mask_sensitive_content(self, text: str) -> str:
    """Mask sensitive information in the transcribed text."""
    import re

    # Hide credit card numbers
    text = re.sub(r'\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b', '[CREDIT_CARD]', text)

    # Hide phone numbers
    text = re.sub(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b', '[PHONE_NUMBER]', text)

    # You can add more patterns here as needed
    return text
How it all fits together: 1. The `stt_node` method gets audio frames and runs them through the default STT processing. 2. For each speech event, it grabs the transcribed text and runs it through your `mask_sensitive_content`
function. 3. The updated event—with filtered text—is yielded back, so only the cleaned-up version moves forward to the LLM. Why this is great: • You can easily preprocess transcribed text to protect privacy or meet other custom needs. • It keeps things running smoothly in real-time by handling text in streaming chunks. • It fits right into the LiveKit voice pipeline without a hitch. If you want me to go over anything in more detail or need more examples, just let me know—I’m happy to help!
p
this is my masking class import re sensitive_words = [ # Credentials "password", "pass", "pwd", "secret", "confidential", "api_key", "token", "jwt", "private_key", "ssh_key", # PII "ssn", "aadhar", "pan", "passport", "driver_license", "dob", "date_of_birth", "address", "home_address", # Finance "credit_card", "debit_card", "bank_account", "iban", "swift", "cvv", "pin", "otp" ] patterns = { # Auth/Secrets "password": r"(?i)\b(?:password|pass|pwd)\b", "api_key": r"(?i)\b(?:api[-]?key|access[-]?token|secret|private[_-]?key)\b", "jwt": r"(?i)\b(?:eyJ[a-zA-Z0-9-]+\.[a-zA-Z0-9-]+\.[a-zA-Z0-9-_]+)\b", # Finance "credit_card": r"\b(?:\d[ -]*?){13,16}\b", "bank_account": r"\b[0-9]{9,18}\b", "iban": r"\b[A-Z]{2}[0-9]{2}[A-Z0-9]{11,30}\b", # Government IDs "ssn": r"\b\d{3}-\d{2}-\d{4}\b", "aadhar": r"\b\d{12}\b", "pan": r"\b[A-Z]{5}[0-9]{4}[A-Z]{1}\b", # Contact "email": r"[\w\.-]+@[\w\.-]+\.\w+", "phone": r"\+?[0-9][0-9\- ]{7,14}", # Network/Infra "ip": r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b", "url": r"https?:\/\/[^\s]+", # Health "mrn": r"\bMRN[: ]?\d{6,10}\b" } class ContentOptimizer: def __init__(self,banned_words=None, patterns=None, Normalize=None, mask="****") -> None: self.banned_words = banned_words or sensitive_words self.patterns = patterns or { "email": r"[\w\.-]+@[\w\.-]+", "credit_card": r"\b(?:\d[ -]*?){13,16}\b", "password": r"(?i)\b(?:password|pass|pwd)\b" } self.Normalize = Normalize or {} self.mask = mask def normalize_text(self, text : str) ->str: """Fixed mispronounced or misheard words""" for wrong, correct in self.Normalize.items(): text = text.replace(wrong, correct) return text def mask_sensitive(self, text: str) ->str: """Fixed Sensitive Information""" if self.banned_words: parttern = re.compile(r'\b(?:' + '|'.join(re.escape(word) for word in self.banned_words) + r')\b', re.IGNORECASE) for _,regex in self.patterns.items(): text = re.sub(regex, self.mask, text) return text def process(self, text: str) ->str: try: text = self.normalize_text(text) text = self.mask_sensitive(text) return text except Exception as e: raise e
t
Your masking class is thoughtfully designed and offers a solid way to handle sensitive information. Let me walk you through how it works and how it fits nicely with the
stt_node
method: Key Features of Your Masking Class:Banned Words: A handy list of sensitive words (like "password," "credit_card") that get masked automatically. • Regex Patterns: Smart patterns to spot sensitive info such as email addresses, credit card numbers, and government IDs. • Normalization: A neat method to fix mispronounced or misheard words using a dictionary of replacements. • Masking: Replaces sensitive details with a default mask (like "****") to keep things safe. Integration with `stt_node`: You can easily plug the
`ContentOptimizer` class into your `stt_node` method to clean up the transcribed text. Here's a simple way to do it:
from typing import AsyncIterable
from livekit import rtc
from livekit.agents import ModelSettings

class CustomAgent(Agent):
    def __init__(self):
        super().__init__(
            instructions="Your agent instructions here",
            stt=your_stt_provider,
            llm=your_llm_provider,
            tts=your_tts_provider,
            vad=your_vad_provider
        )
        self.content_optimizer = ContentOptimizer()

    async def stt_node(
        self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings
    ) -> AsyncIterable[stt.SpeechEvent]:
        async for event in super().stt_node(audio, model_settings):
            if hasattr(event, 'alternatives') and event.alternatives:
                # Grab the transcribed text
                original_text = event.alternatives[0].text

                # Run it through ContentOptimizer
                filtered_text = self.content_optimizer.process(original_text)

                # Swap out the original text with the cleaned-up version
                event.alternatives[0].text = filtered_text

            yield event
How It Works: 1. The `stt_node` method takes in audio frames and produces speech events. 2. Each speech event's transcribed text gets sent to `ContentOptimizer.process()`. 3. `process()`
normalizes the text and masks any sensitive info using your patterns and banned words. 4. The updated text replaces the original in the speech event, which then continues through the pipeline. Benefits: • Keeps sensitive info safe before it reaches the LLM. • Makes the text more accurate by fixing common errors. • Offers a clean, modular way to handle text processing that you can reuse easily. If you want to dive deeper or add more features, just let me know—I’m happy to help!
p
is this agent class fine, or should I separate language and STT?
t
Hey there! Your current agent class, which combines both language processing (LLM) and STT, works well if your setup involves a tightly connected pipeline where the STT output goes straight to the LLM for the next step. But, if you’re looking for more flexibility and modularity, splitting the language and STT parts into separate classes or modules might be a great option. Here’s a quick rundown to help you weigh your choices: Integrated Agent Class:Advantages: ◦ Easy to manage and deploy as one simple unit. ◦ STT and LLM components communicate directly. ◦ Perfect for straightforward pipelines with little need for customization. • Disadvantages: ◦ Harder to scale or swap out individual parts (like changing STT or LLM providers). ◦ Less modular, which can make debugging or adding features trickier. Separate Language and STT Components:Advantages: ◦ More modular, so you can update or replace STT and LLM independently. ◦ Better scalability, since you can deploy and scale each part on its own. ◦ Makes testing and debugging individual components easier. • Disadvantages: ◦ Needs a bit more effort to handle communication between the parts. ◦ Could add some delay if components run on separate systems. My two cents: If you find yourself needing to update or customize the STT or LLM often, going with separate classes or modules is probably the way to go. But if your pipeline is steady and you want to keep things simple, sticking with the integrated agent class should do the trick. Feel free to reach out if you want a hand restructuring your agent class or setting up a modular design—I’m here to help!
p
yes, separate it so it is called after language selection
t
Hey there! To keep things neat and manageable, you can separate the STT and language parts by designing a modular setup. This way, the STT handles the audio-to-text conversion on its own, then passes the text over to the language module once you've picked the language. Here's a friendly breakdown of how you might set it up: 1. STT Module: This part takes care of turning your audio into text. It can also do some preprocessing, like masking sensitive info before passing it along.
class STTProcessor:
    def __init__(self, stt_provider, content_optimizer):
        self.stt_provider = stt_provider
        self.content_optimizer = content_optimizer

    async def process_audio(self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings) -> str:
        async for event in self.stt_provider.stt_node(audio, model_settings):
            if hasattr(event, 'alternatives') and event.alternatives:
                text = event.alternatives[0].text
                # Apply masking or preprocessing
                return self.content_optimizer.process(text)
        return ""
2. Language Module: This module takes the text from the STT and chats with the language model to get the response you want.
class LanguageProcessor:
    def __init__(self, llm_provider):
        self.llm_provider = llm_provider

    async def process_text(self, text: str, language: str) -> str:
        # Use the selected language model to process the text
        response = await self.llm_provider.chat(text, language=language)
        return response
3. Integration: The main agent ties everything together. After you choose the language, it sends the STT output to the right language processor.
class ModularAgent:
    def __init__(self, stt_processor, language_processors):
        self.stt_processor = stt_processor
        self.language_processors = language_processors

    async def handle_audio(self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings, language: str):
        # Process audio to get text
        text = await self.stt_processor.process_audio(audio, model_settings)

        # Pass text to the selected language processor
        if language in self.language_processors:
            response = await self.language_processors[language].process_text(text, language)
            return response
        else:
            raise ValueError(f"Unsupported language: {language}")
Why this is great:Modularity: STT and language parts work independently, so you can update or swap them out easily. • Flexibility: You can support lots of languages by picking the right processor on the fly. • Scalability: Each module can grow or shrink based on what you need. If you want, I’m here to help you get this up and running or answer any questions you have!
p
let's suppose my workflow is: user calls -> greet, choose language or human agent; if human, then transfer, otherwise the language agent goes to the STT node or LLM node. What will be better: creating multiple agents, so that when the user says something, an action or tool call is taken based on that?
t
Hey there! For the workflow you described—where a user calls in, gets greeted, picks a language or a human agent, and then moves on to either STT/LLM processing or a human transfer—I’d suggest going with a multi-agent setup. It’s a neat way to keep things modular, scalable, and flexible, adapting dynamically based on what the user chooses. Here’s a friendly breakdown of how you might set it up: 1. Workflow Overview:Step 1: The user calls and gets a warm greeting from a primary agent. • Step 2: The primary agent helps with language selection or connects the user to a human agent if they prefer. • Step 3: If the user picks a language, they’re routed to a language-specific agent that handles STT and LLM tasks. • Step 4: Based on what the user says, the language agent decides what to do next or calls on tools as needed. 2. Multi-Agent Design: Here’s a simple way to think about the agents: 1. Primary Agent: ◦ Greets the user and helps with choosing a language. ◦ Transfers the call to a human agent when requested. ◦ Routes the user to the right language agent based on their choice. 1. Language Agents: ◦ Each one handles STT and LLM processing for a specific language. ◦ They’re smart about making decisions and calling tools depending on what the user says. 3. Dynamic Action Handling: Inside each language agent, you can build a decision-making system that listens to the user and acts accordingly:
class LanguageAgent:
    def __init__(self, stt_processor, llm_processor, tools):
        self.stt_processor = stt_processor
        self.llm_processor = llm_processor
        self.tools = tools

    async def handle_user_input(self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings):
        # Turn the audio into text
        text = await self.stt_processor.process_audio(audio, model_settings)

        # Send the text to the LLM for processing
        response = await self.llm_processor.process_text(text)

        # Check the response and decide what to do
        if "tool" in response:
            tool_name = self.extract_tool_name(response)
            result = await self.tools[tool_name].execute(response)
            return result
        else:
            return response

    def extract_tool_name(self, response: str) -> str:
        # Grab the tool name from the LLM response
        return response.split()[1]  # Just an example
4. Benefits of Multi-Agent Design:Modularity: Each agent has its own job, which makes everything easier to manage and grow. • Scalability: You can scale language agents up or down depending on how many users you have. • Flexibility: Agents can handle all kinds of user inputs and take the right actions on the fly. • Seamless Human Transfer: The primary agent smoothly connects users to humans without messing with the language agents. If you want, I’m here to help you dive deeper into setting this up or tackling any specific parts. Just let me know!
p
tell me one thing: how does it understand when and which tool call is needed?
t
Hey there! So, when it comes to deciding when and which tool to call, that’s usually handled by the LLM (Large Language Model) inside the language agent. Basically, the LLM looks at what you’ve said and figures out the best response — sometimes that means triggering a tool to help out. Here’s a friendly walkthrough of how you could set this up: 1. LLM Response Parsing: The LLM is set up with instructions to include tool-related actions in its replies when needed. For example, it might respond with something like this:
{
  "action": "call_tool",
  "tool_name": "weather_api",
  "parameters": {
    "location": "New York"
  }
}
Your language agent can then read this and know exactly what to do next. 2. Tool Registry: Keep a handy list of all your tools, each with a name and a way to run it. Something like this:
tools = {
    "weather_api": WeatherAPI(),
    "calendar": CalendarTool(),
    "email": EmailTool()
}
3. Decision-Making Logic: Inside your language agent, you’d have some logic that looks at the LLM’s response and calls the right tool:
class LanguageAgent:
    def __init__(self, stt_processor, llm_processor, tools):
        self.stt_processor = stt_processor
        self.llm_processor = llm_processor
        self.tools = tools

    async def handle_user_input(self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings):
        # Turn audio into text
        text = await self.stt_processor.process_audio(audio, model_settings)

        # Send the text to the LLM and get a response
        response = await self.llm_processor.process_text(text)

        # Check if the response says to call a tool
        if "action" in response and response["action"] == "call_tool":
            tool_name = response["tool_name"]
            parameters = response.get("parameters", {})

            # Call the tool if it exists
            if tool_name in self.tools:
                result = await self.tools[tool_name].execute(**parameters)
                return result
            else:
                return f"Tool {tool_name} not found."
        else:
            return response
4. Tool Implementation: Each tool should have an
execute
method that takes the right parameters and does its job. For example:
class WeatherAPI:
    async def execute(self, location):
        # Fetch the weather and return it
        return f"The weather in {location} is sunny."

class CalendarTool:
    async def execute(self, event_name, date):
        # Add an event to the calendar
        return f"Event '{event_name}' added to your calendar on {date}."
5. Training the LLM: To help the LLM get the hang of when to call a tool, you can give it examples and instructions during fine-tuning or prompt engineering. For instance: • Example Prompt: "If the user asks about the weather, respond with an action to call the weather API." • Example Input: "What’s the weather in New York?" • Example Output:
{
  "action": "call_tool",
  "tool_name": "weather_api",
  "parameters": {
    "location": "New York"
  }
}
Why this approach is pretty great: • It’s dynamic and flexible — the LLM decides when a tool is needed based on what the user says. • It’s scalable — you can easily add new tools without changing the core logic. • It’s customizable — you can fine-tune the LLM to handle specific tools and use cases. If you want me to help you dive deeper or get this set up, just let me know! I’m here to help.
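Worth noting: if you're building on the LiveKit Agents framework end to end, you usually don't need to parse JSON tool calls yourself. Decorating agent methods with @function_tool registers them with the LLM, which then decides when to call them and with which arguments. A rough sketch (the weather lookup itself is hypothetical):
from livekit.agents import Agent
from livekit.agents.llm import function_tool

class AssistantAgent(Agent):
    def __init__(self):
        super().__init__(instructions="You are a helpful voice assistant.")

    @function_tool
    async def get_weather(self, location: str) -> str:
        """Look up the current weather for a location."""
        # The LLM chooses to call this based on the name, docstring and parameters
        return f"The weather in {location} is sunny."
The manual JSON approach above is still handy if you're driving a raw LLM client outside the agents framework.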
p
give me a call transfer tool
t
Here’s a handy example of a call transfer tool you can set up in LiveKit: Cold Transfer (Direct Transfer): This approach uses SIP REFER to send a call straight to another number or endpoint.
from livekit import api
from livekit.protocol import sip as proto_sip

async def transfer_call(participant_identity: str, transfer_to: str):
    """Transfer a SIP call to another number using SIP REFER."""
    # Initialize LiveKit API client
    livekit_api = api.LiveKitAPI(
        url=os.getenv('LIVEKIT_URL'),
        api_key=os.getenv('LIVEKIT_API_KEY'),
        api_secret=os.getenv('LIVEKIT_API_SECRET')
    )

    # Create transfer request
    transfer_request = proto_sip.TransferSIPParticipantRequest(
        participant_identity=participant_identity,
        room_name="your_room_name",  # Replace with the actual room name
        transfer_to=transfer_to,  # e.g., "tel:+15105550100"
        play_dialtone=True
    )

    # Execute transfer
    await livekit_api.sip.transfer_sip_participant(transfer_request)
Warm Transfer (Supervisor Escalation): If you want to pass the call along with some context, you can first connect to a supervisor and then merge the calls.
class SessionManager:
    async def start_transfer(self):
        # Put customer on hold
        self.start_hold()

        # Create supervisor room
        supervisor_room_name = self.customer_room.name + "-supervisor"
        self.supervisor_room = rtc.Room()

        # Connect to supervisor room
        await self.supervisor_room.connect(os.getenv("LIVEKIT_URL"), token.to_jwt())

        # Dial supervisor
        await self.lkapi.sip.create_sip_participant(
            api.CreateSIPParticipantRequest(
                sip_trunk_id="your_sip_trunk_id",
                sip_call_to="supervisor_number",
                room_name=supervisor_room_name,
                participant_identity="supervisor-sip",
                wait_until_answered=True,
            )
        )

    async def merge_calls(self):
        # Move supervisor to customer room
        await self.lkapi.room.move_participant(
            api.MoveParticipantRequest(
                room=self.supervisor_room.name,
                identity="supervisor-sip",
                destination_room=self.customer_room.name,
            )
        )

        self.stop_hold()
        await self.customer_session.say("You are on the line with my supervisor.")
DTMF-Based Transfer: For those phone menu systems, you can listen for DTMF tones to trigger transfers easily.
def _setup_event_handlers(self, room: rtc.Room):
    @room.on("sip_dtmf_received")
    def handle_dtmf(dtmf_event: rtc.SipDTMF):
        digit = dtmf_event.digit
        identity = dtmf_event.participant.identity

        department_numbers = {
            "1": ("BILLING_PHONE_NUMBER", "Billing"),
            "2": ("TECH_SUPPORT_PHONE_NUMBER", "Tech Support"),
            "3": ("CUSTOMER_SERVICE_PHONE_NUMBER", "Customer Service")
        }

        if digit in department_numbers:
            env_var, dept_name = department_numbers[digit]
            transfer_number = f"tel:{os.getenv(env_var)}"
            asyncio.create_task(self._handle_transfer(identity, transfer_number, dept_name))
If you want a hand with setting this up or tweaking it to fit your needs, just let me know—I’m happy to help! Sources: Transferring calls | LiveKit Docs | Company directory phone assistant | LiveKit Docs | examples/warm-transfer/README.md | agent.py
p
switch voice
t
Hey there! If you want to switch voices during a LiveKit session, here are a few easy ways to do it: 1. Using `update_options` Method: You can change the TTS voice on the fly with the `update_options` method. The new voice will kick in with the next thing that's spoken.
session.tts.update_options(
    voice=hume.VoiceByName(name="Colton Rivers", provider=hume.VoiceProvider.hume),
    description="The voice exudes calm, serene, and peaceful qualities, like a gentle stream flowing through a quiet forest.",
    speed=2,
)
2. Using Function Tools for Agent Switching: If you want to have multiple voices ready, you can create different agents and switch between them using function tools. Here’s a quick example:
class RimeAgent(Agent):
    def __init__(self):
        super().__init__(
            tts=rime.TTS(model="mistv2", speaker="abbie"),
            # ... other config
        )

    @function_tool
    async def switch_to_elevenlabs(self):
        """Switch to ElevenLabs TTS voice"""
        return ElevenLabsAgent()

    @function_tool
    async def switch_to_cartesia(self):
        """Switch to Cartesia TTS voice"""
        return CartesiaAgent()
3. Using Participant Attributes: You can also listen for changes in participant attributes and update the voice dynamically like this:
@ctx.room.on("participant_attributes_changed")
def on_participant_attributes_changed(changed_attributes: dict[str, str], participant: rtc.Participant):
    if "voice" in changed_attributes:
        voice_id = participant.attributes.get("voice")
        # Update TTS options dynamically
        tts._opts.voice = voice_id
        asyncio.create_task(agent.say("How do I sound now?", allow_interruptions=True))
4. Language Switching: Want to switch languages during your session? You can use `update_options` for that too:
async def _switch_language(self, language_code: str):
    if self.tts is not None:
        self.tts.update_options(language=language_code)

    if self.stt is not None:
        self.stt.update_options(language=deepgram_language)

    await self.session.say(self.greetings[language_code])
Just a couple of things to keep in mind: • Voice changes will apply on the next utterance, not in the middle of speaking. • Different TTS providers might have different ways they handle voice switching. • You can mix voice changes with other settings like speed or language for a custom feel. If you want me to walk you through any of these or need examples tailored to what you’re working on, just let me know—I’m here to help! Sources: Hume TTS integration guide | LiveKit Docs | pipeline-tts/elevenlabs_change_language.py | agent/main.py
p
i am using AWS TTS; i already have language switching but want to add voice change
t
Hey there! If you want to add voice switching with AWS TTS, you can easily use the `update_options` method to change the voice on the fly during a session. AWS TTS offers a bunch of voices, and you just need to specify which one you want in the options. Here's a simple way to do it: 1. Update TTS Options: Just use the `update_options` method to switch voices dynamically. For example:
session.tts.update_options(
    voice="Joanna",  # Swap this out for whichever AWS Polly voice you like
    language="en-US"  # Optional: specify the language if you want
)
This will switch the voice to "Joanna" for the next thing it says. 2. Integrate Voice Switching: If you want to let users switch voices whenever they want, you can set up a function or a command handler like this:
class LanguageAgent:
    def __init__(self, tts):
        self.tts = tts

    async def switch_voice(self, voice_name: str):
        """Dynamically switch the TTS voice."""
        self.tts.update_options(voice=voice_name)
        await self.session.say(f"Voice switched to {voice_name}.")
3. Example Voice List: Here are some popular AWS Polly voices you might want to try: • English (US): Joanna, Matthew, Ivy, Justin • English (UK): Amy, Brian, Emma • Spanish: Conchita, Enrique, Lucia • French: Celine, Mathieu If you want to explore all the voices, you can check out the AWS Polly documentation. 4. Combine with Language Switching: If you’re already handling language switching, you can easily add voice options too:
async def switch_language_and_voice(self, language_code: str, voice_name: str):
    self.tts.update_options(language=language_code, voice=voice_name)
    await self.session.say(f"Language switched to {language_code} and voice to {voice_name}.")
5. User Interaction: Let your users ask to change voices naturally or through a menu. For example:
if user_input == "Change voice to Joanna":
    await agent.switch_voice("Joanna")
If you want a hand with setting this up or tweaking it to fit your needs, just let me know—I’m happy to help!
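On point 3, if you'd rather fetch the available voices programmatically than keep a hardcoded list, Amazon Polly can report which voices support a given language. A small sketch using boto3 directly (separate from the LiveKit plugin, same AWS credentials; the eu-central-1 region is just an example):
import boto3

def list_polly_voices(language_code: str = "en-US") -> list[str]:
    polly = boto3.client("polly", region_name="eu-central-1")
    response = polly.describe_voices(LanguageCode=language_code)
    # Each entry also includes Gender and SupportedEngines ("standard" / "neural")
    return [voice["Id"] for voice in response["Voices"]]

print(list_polly_voices("en-GB"))  # e.g. ['Amy', 'Emma', 'Brian', ...]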
p
check it async def _switch_language(self,language_code: str, voice: str, speech_engine: str): "Helper Function To Switch Languages" try: if language_code == self.current_language: await self.session.say("You are already using this language.") return if self.tts is not None: self.tts.update_options(language_code=language_code, voice=voice, speech_engine=speech_engine) if self.stt is not None: aws_languages = self.aws_languages.get(language_code) aws_voice = self.aws_voice.get(voice) aws_speech_engine = self.aws_speech_engine.get(speech_engine) self.stt.update_options(language_code=aws_languages) self.current_language = language_code await self.session.say(f"Language switched to {self.language_names[language_code]}.") except Exception as e: logging.error(f"Error switching language: {e}") raise CustomException(e, sys)
t
Your implementation for switching language and voice looks well-structured and functional. Here’s a quick review along with some friendly suggestions to make it even better: Key Features: • Smartly checks if the requested language is already in use to avoid unnecessary updates. • Updates TTS options smoothly with the chosen language, voice, and speech engine. • Adjusts STT options using the right mappings for AWS languages, voices, and speech engines. • Handles exceptions gracefully and logs errors to help with debugging. Suggestions for Improvement: 1. Validation of Inputs: It’s a good idea to double-check that the
`language_code`, `voice`, and `speech_engine` you get are valid before updating anything. Here's a simple way to do that:
if language_code not in self.aws_languages:
    raise ValueError(f"Invalid language code: {language_code}")

if voice not in self.aws_voice:
    raise ValueError(f"Invalid voice: {voice}")

if speech_engine not in self.aws_speech_engine:
    raise ValueError(f"Invalid speech engine: {speech_engine}")
2. Feedback for Voice and Engine Changes: Letting the user know about the voice and speech engine in the confirmation message adds a nice touch of clarity:
await self.session.say(f"Language switched to {self.language_names[language_code]} with voice {voice} using {speech_engine}.")
3. Default Fallbacks: Just in case the requested voice or speech engine isn’t found, having a default fallback can keep things running smoothly:
aws_voice = self.aws_voice.get(voice, "default_voice")
aws_speech_engine = self.aws_speech_engine.get(speech_engine, "standard")
4. Logging Enhancements: Adding more context in your error logs can make troubleshooting a breeze:
logging.error(f"Error switching language to {language_code}, voice: {voice}, engine: {speech_engine}: {e}")
5. Asynchronous Safety: To keep your speech output smooth and uninterrupted, you might want to use a lock around `self.session.say` calls if multiple async tasks could interfere. Example with Improvements:
async def _switch_language(self, language_code: str, voice: str, speech_engine: str):
    """Helper Function To Switch Languages"""
    try:
        if language_code == self.current_language:
            await self.session.say("You are already using this language.")
            return

        # Validate inputs
        if language_code not in self.aws_languages:
            raise ValueError(f"Invalid language code: {language_code}")

        if voice not in self.aws_voice:
            raise ValueError(f"Invalid voice: {voice}")

        if speech_engine not in self.aws_speech_engine:
            raise ValueError(f"Invalid speech engine: {speech_engine}")

        # Update TTS options
        if self.tts is not None:
            self.tts.update_options(language_code=language_code, voice=voice, speech_engine=speech_engine)

        # Update STT options
        if self.stt is not None:
            aws_languages = self.aws_languages.get(language_code)
            aws_voice = self.aws_voice.get(voice)
            aws_speech_engine = self.aws_speech_engine.get(speech_engine)
            self.stt.update_options(language_code=aws_languages)

        # Update current language
        self.current_language = language_code

        # Confirmation message
        await self.session.say(f"Language switched to {self.language_names[language_code]} with voice {voice} using {speech_engine}.")

    except Exception as e:
        logging.error(f"Error switching language to {language_code}, voice: {voice}, engine: {speech_engine}: {e}")
        raise CustomException(e, sys)
If you want me to help with anything else or add more enhancements, just let me know! I’m here to help.
p
import os import sys import re import librosa import numpy as np from dotenv import load_dotenv from typing import AsyncIterable, Optional from livekit import rtc from livekit.agents.llm import function_tool from livekit.agents import Agent, ModelSettings, stt from livekit.plugins import aws,silero from src.utils.logger import logging from src.utils.exception import CustomException from src.prompt.voice_agent_prompt import Agent_instructions from src.voice_agent.masking import ContentOptimizer load_dotenv() content_optimizer = ContentOptimizer() class IrishAgent(Agent): def __init__(self,tool, llm_instance)-> None: super().__init__( instructions=Agent_instructions, stt=aws.STT(language="en-IN",region="eu-central-1"), llm=llm_instance, tts=aws.TTS(region="eu-central-1",api_key=os.getenv("AWS_ACCESS_KEY_ID"),api_secret=os.getenv("AWS_SECRET_ACCESS_KEY")), vad= silero.VAD.load(), tools=tool, #use_tts_aligned_transcript=True, ) self.current_language = "en-US" self.aws_languages ={ "en-GB": "en-US", "en-IN": "en-US", "fr-FR": "fr-FR", "de-DE": "de-DE", "hi-IN": "hi-IN", "de-CH": "de-DE" } self.language_names={ "en-GB": "English (United Kingdom)", "en-IN": "English (India)", "fr-FR": "French (France)", "de-DE": "German (Germany)", "hi-IN": "Hindi (India)", "de-CH": "German (Switzerland)" } self.aws_speech_engine ={ "en-GB": "en-US", "en-IN": "Aditi", "fr-FR": "fr-FR", "de-DE": "de-DE", "hi-IN": "hi-IN", "de-CH": "de-DE" } self.aws_voice ={ "en-GB": "en-US", "en-IN": "Aditi", "fr-FR": "fr-FR", "de-DE": "de-DE", "hi-IN": "hi-IN", "de-CH": "de-DE" } self.greeting ={ "en-GB": "Hello", "en-IN": "Namaste", "fr-FR": "Bonjour", "de-DE": "Hallo", "hi-IN": "नमस्ते", "de-CH": "Grüezi" } async def on_entry(self): self.session.generate_reply(instructions="Hello! How can I assist you today?") async def on_enter(self): await self.session.say( f"{self.greeting[self.current_language]}! 
Please select your preferred language first, then" ) async def _switch_language(self, language_code: str, voice: str, speech_engine: str): """Helper Function To Switch Languages""" try: if language_code == self.current_language: await self.session.say("You are already using this language.") return # Validate inputs if language_code not in self.aws_languages: raise ValueError(f"Invalid language code: {language_code}") if voice not in self.aws_voice: raise ValueError(f"Invalid voice: {voice}") if speech_engine not in self.aws_speech_engine: raise ValueError(f"Invalid speech engine: {speech_engine}") # Update TTS options if self.tts is not None: self.tts.update_options(language_code=language_code, voice=voice, speech_engine=speech_engine) # Update STT options if self.stt is not None: aws_languages = self.aws_languages.get(language_code) aws_voice = self.aws_voice.get(voice) aws_speech_engine = self.aws_speech_engine.get(speech_engine) self.stt.update_options(language_code=aws_languages) # Update current language self.current_language = language_code # Confirmation message await self.session.say(f"Language switched to {self.language_names[language_code]} with voice {voice} using {speech_engine}.") except Exception as e: logging.error(f"Error switching language to {language_code}, voice: {voice}, engine: {speech_engine}: {e}") raise CustomException(e, sys) @function_tool async def switch_to_en_gb(self): "switch to english (United Kingdom)" await self._switch_language("en-GB") @function_tool async def switch_to_en_in(self): "switch to english (India)" await self._switch_language("en-IN") @function_tool async def switch_to_fr_fr(self): "switch to french (France)" await self._switch_language("fr-FR") @function_tool async def switch_to_de_de(self): "switch to german (Germany)" await self._switch_language("de-DE") @function_tool async def switch_to_hi_in(self): "switch to hindi (India)" await self._switch_language("hi-IN") @function_tool async def switch_to_de_ch(self): "switch to german (Switzerland)" await self._switch_language("de-CH") # async def stt_node( # self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings # ) -> AsyncIterable[rtc.AudioFrame]: # """Custom STT Node with Content Filtering""" # # Start by calling the default STT processing to get speech events # async for event in super().stt_node(audio, model_settings): # if hasattr(event, 'alternatives') and event.alternatives: # # Grab the transcribed text from the first alternative # original_text = event.alternatives[0].text # # Apply your own filtering or masking here # filtered_text = content_optimizer.process(original_text) # # Swap out the original text for your filtered version # event.alternatives[0].text = filtered_text # # Send the updated event back into the pipeline # yield event
t
Your implementation of the `IrishAgent`
class looks really solid and thoughtfully put together, especially with the language and voice switching features. Here’s a little review along with some friendly suggestions to make it even better: Strengths: • Great job supporting multiple languages and voices, complete with mappings for AWS TTS and STT options. • Nice touch with validation checks to gracefully handle any invalid inputs for language codes, voices, and speech engines. • Clear, user-friendly confirmation messages after switching language and voice—super helpful! • Using
`@function_tool` decorators makes integrating language-switching commands a breeze. • Thoughtful exception handling paired with detailed logging to make debugging easier. Suggestions for Improvement: 1. Default Fallbacks: It might be handy to add default fallbacks for voice and speech engine in case the provided values aren't found. Something like:
aws_voice = self.aws_voice.get(voice, "default_voice")
aws_speech_engine = self.aws_speech_engine.get(speech_engine, "standard")
2. Dynamic Language and Voice Switching: Instead of having separate hardcoded methods (like `switch_to_en_gb`), how about a single dynamic method that accepts language code, voice, and speech engine as parameters? For example:
@function_tool
async def switch_language(self, language_code: str, voice: str, speech_engine: str):
    await self._switch_language(language_code, voice, speech_engine)
3. STT Node: It would be great to uncomment and integrate that custom `stt_node` method for content filtering. This way, any sensitive info gets masked before the LLM processes it. 4. Greeting Customization: Including the selected voice and speech engine in the greeting message can give users a clearer sense of what's happening. Something like:
await self.session.say(f"Language switched to {self.language_names[language_code]} with voice {voice} using {speech_engine}.")
5. Code Organization: For easier maintenance, consider moving those mappings (like
aws_languages
,
aws_voice
, etc.) into a config file or constants module. Example of a Dynamic Language-Switching Method:
Copy code
@function_tool
async def switch_language(self, language_code: str, voice: str, speech_engine: str):
    """Switch to a specified language, voice, and speech engine."""
    await self._switch_language(language_code, voice, speech_engine)
Next Steps: • Integrate the
stt_node
method for content filtering. • Test the language and voice switching with different combinations to make sure everything runs smoothly. • Think about adding more languages and voices to your mappings to support even more users. Feel free to reach out if you want to chat more or need any help tweaking things!
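Following up on suggestion 5 (moving the mappings into a constants module), here's a minimal sketch of what that could look like — the module name and the exact dictionary contents are assumptions based on the language codes your agent already supports, so adjust them to match your real mappings:
Copy code
# language_config.py — hypothetical constants module
AWS_LANGUAGES = {
    "en-GB": "en-GB",
    "en-IN": "en-IN",
    "fr-FR": "fr-FR",
    "de-DE": "de-DE",
    "hi-IN": "hi-IN",
    "de-CH": "de-CH",
}

LANGUAGE_NAMES = {
    "en-GB": "English (United Kingdom)",
    "en-IN": "English (India)",
    "fr-FR": "French (France)",
    "de-DE": "German (Germany)",
    "hi-IN": "Hindi (India)",
    "de-CH": "German (Switzerland)",
}
The agent can then import these instead of defining them inline, for example with from language_config import AWS_LANGUAGES, LANGUAGE_NAMES.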
p
stt node
t
Here’s a friendly guide on how you can add the
stt_node
method into your
IrishAgent
class to help with content filtering and customization: What’s the
stt_node
method for?
The
stt_node
method takes in audio frames and turns them into speech events. By customizing this method, you get to peek at and tweak the transcribed text before it heads over to the LLM. How to implement it:
Copy code
async def stt_node(
    self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings
) -> AsyncIterable[stt.SpeechEvent]:  # yields speech events, not audio frames (needs: from livekit.agents import stt)
    """Custom STT Node with Content Filtering"""
    # First, call the default STT processing to grab speech events
    async for event in super().stt_node(audio, model_settings):
        if hasattr(event, 'alternatives') and event.alternatives:
            # Pull out the transcribed text from the first alternative
            original_text = event.alternatives[0].text

            # Here’s where you can filter or mask the content
            filtered_text = content_optimizer.process(original_text)

            # Swap the original text with your filtered version
            event.alternatives[0].text = filtered_text

        # Send the updated event back down the pipeline
        yield event
What’s happening step-by-step: 1. We start by calling the usual
stt_node
method to turn audio into speech events. 2. Next, we grab the transcribed text from the
alternatives
part of the speech event. 3. Then, we use the
ContentOptimizer
to clean up or mask anything sensitive in that text. 4. After that, we replace the original text with the cleaned-up version in the event. 5. Finally, we send that updated event back into the pipeline for the next steps. Why this is great: • It makes sure any sensitive info is hidden before the LLM sees it. • You can customize the transcribed text on the fly. • It fits right in with the existing LiveKit voice pipeline without a hitch. What to do next: • Try out the
stt_node
method with different audio clips to make sure the filtering works smoothly. • If you want, expand the
ContentOptimizer
to catch more patterns or handle extra cases. Feel free to reach out if you want a hand with anything else or want to add more features!
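For reference, here's a minimal sketch of what a content_optimizer with a process() method could look like — the class name matches how it's used above, but the regex patterns and masking behavior are assumptions you'd replace with your own rules:
Copy code
import re

class ContentOptimizer:
    """Hypothetical content filter that masks simple sensitive patterns."""

    def __init__(self):
        # Example patterns only — extend these for your own use case
        self._patterns = [
            (re.compile(r"\b\d{16}\b"), "[CARD NUMBER]"),     # 16-digit card numbers
            (re.compile(r"\b\d{3}-\d{2}-\d{4}\b"), "[SSN]"),  # US SSN-style numbers
        ]

    def process(self, text: str) -> str:
        for pattern, replacement in self._patterns:
            text = pattern.sub(replacement, text)
        return text

content_optimizer = ContentOptimizer()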
p
increase user speech time
t
Hey there! If you want to help users speak for longer in LiveKit, tweaking the Voice Activity Detection (VAD) settings is a great place to start. Here are the main ones to play around with: 1. min_speech_duration: This sets the minimum length of speech needed to kick off a new speech segment. Turning this up means only longer bits of speech get counted. 2. min_silence_duration: This controls how long a pause has to be after someone stops talking before the system decides they’re done. Increasing this lets users take longer pauses without cutting off. 3. max_buffered_speech: This is the max amount of speech (in seconds) the system can hold onto before processing. Raising it means longer speech chunks can be handled. Here’s a quick example: Adjusting these settings in the Silero VAD plugin:
Copy code
from livekit.plugins import silero

vad = silero.VAD.load(
    min_speech_duration=2.0,  # At least 2 seconds of speech to start
    min_silence_duration=1.5,  # Wait 1.5 seconds of silence before ending
    max_buffered_speech=10.0  # Buffer up to 10 seconds of speech
)
Why this helps: • Users get to speak longer without interruptions. • Short pauses won’t cut off their speech prematurely. If you want a hand setting this up or have any questions, just let me know—I’m here to help! Sources: Silero VAD plugin | LiveKit Docs
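For completeness, here's a minimal sketch of passing the tuned VAD into a session so the thresholds actually apply to user turns — the session would normally also get your STT/LLM/TTS components, which are omitted here:
Copy code
from livekit.agents import AgentSession
from livekit.plugins import silero

vad = silero.VAD.load(
    min_speech_duration=2.0,   # at least 2 seconds of speech to start
    min_silence_duration=1.5,  # wait 1.5 seconds of silence before ending
    max_buffered_speech=10.0,  # buffer up to 10 seconds of speech
)

# The session uses this VAD when segmenting user speech
session = AgentSession(vad=vad)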
p
multi language with aws turn
t
Hey there! If you’re looking to add multi-language support with AWS TTS and STT in LiveKit, you can totally customize the language, voice, and speech engine for each user interaction on the fly. Here’s a friendly guide to help you get started: 1. Configure AWS TTS and STT: First, set up AWS TTS and STT to handle multiple languages. You can use the
update_options
method to easily switch between languages and voices whenever needed.
Copy code
class MultiLanguageAgent(Agent):
    def __init__(self):
        super().__init__(
            stt=aws.STT(language="en-US", region="us-east-1"),
            tts=aws.TTS(region="us-east-1", api_key=os.getenv("AWS_ACCESS_KEY_ID"), api_secret=os.getenv("AWS_SECRET_ACCESS_KEY"))
        )
        self.current_language = "en-US"
        self.language_mappings = {
            "en-US": {"voice": "Joanna", "stt_language": "en-US"},
            "fr-FR": {"voice": "Celine", "stt_language": "fr-FR"},
            "de-DE": {"voice": "Hans", "stt_language": "de-DE"},
            "hi-IN": {"voice": "Aditi", "stt_language": "hi-IN"}
        }

    async def switch_language(self, language_code: str):
        if language_code not in self.language_mappings:
            raise ValueError(f"Unsupported language: {language_code}")

        # Update TTS options
        language_settings = self.language_mappings[language_code]
        self.tts.update_options(language=language_code, voice=language_settings["voice"])

        # Update STT options
        self.stt.update_options(language=language_settings["stt_language"])

        self.current_language = language_code
        await self.session.say(f"Language switched to {language_code}.")
2. Handle User Input: Give your users an easy way to pick their preferred language, maybe through a command or a menu. For example:
Copy code
@function_tool
async def switch_to_french(self):
    await self.switch_language("fr-FR")
3. Dynamic Language Switching: Make the language-switching part of your workflow so it adjusts smoothly based on what the user wants or says. 4. AWS Language and Voice Options: Here are some popular AWS Polly voices you might want to try for different languages: • English (US): Joanna, Matthew • French (France): Celine, Mathieu • German (Germany): Hans, Marlene • Hindi (India): Aditi Feel free to check out the AWS Polly documentation for the full list of voices and languages. 5. Testing and Validation: • Give the language-switching a good test with different inputs to make sure it flows nicely. • Double-check the STT and TTS results to ensure everything sounds and works just right in each language. If you want, I’m here to help with more examples or any questions you have — just let me know!
p
can we use turn_detection=multilingual() with aws
t
The
turn_detection=multilingual()
setting works with certain STT plugins in LiveKit, but just a heads-up—there’s no clear confirmation yet that AWS Transcribe or AWS Polly support this feature. Turn detection depends on STT output and VAD (Voice Activity Detection), so the STT provider needs to support multilingual or automatic language detection. Key Points:Supported STT Providers: Some great options like Deepgram, Gladia, and AssemblyAI are known to work well with
turn_detection=multilingual()
. • AWS Compatibility: The docs don’t show any evidence that AWS Transcribe or Polly currently support this feature in LiveKit. • TTS Irrelevance: Since turn detection uses STT output and VAD, TTS providers like AWS Polly aren’t directly involved here. Recommendations: 1. Take a look at the official LiveKit docs for the list of supported STT plugins to see if AWS is included. 2. Make sure your STT provider supports real-time transcription and language detection if you want to use
turn_detection=multilingual()
. 3. If AWS isn’t on the list, no worries—using a confirmed provider like Deepgram or Gladia is a solid choice for multilingual turn detection. If you want, I’m happy to help you find the official list of supported STT plugins or guide you through integrating a custom STT provider. Just let me know! Sources: LiveKit turn detector plugin | LiveKit Docs | LiveKit turn detector plugin | LiveKit Docs | examples/voice_agents/basic_agent.py | mcp/agent.py | Turn detection and interruptions | LiveKit Docs
p
apart from adding a model, what else can we add in AgentSession to improve the session?
t
Hey there! To help boost your session’s performance and functionality in LiveKit, you can tweak the
AgentSession
with some handy features and settings. Here’s a quick rundown of some great options you might want to explore: 1. Turn Detection and Interruptions: • Pick from several turn detection methods like
stt
,
vad
,
realtime_llm
, or even
manual
. • Customize how interruptions work with settings such as
allow_interruptions
,
min_interruption_duration
, and
min_interruption_words
. 2. Endpointing Delays: • Tweak
min_endpointing_delay
and
max_endpointing_delay
to get the timing just right for when turns end. 3. Preemptive Generation: Turn on
preemptive_generation=True
to kick off LLM and TTS processing as soon as user transcripts come in — this helps cut down response time. 4. Video Sampling: Try using a
video_sampler
function to smartly adjust video frame capture rates based on what the user is doing. 5. Session Timing and State Management: • Set
user_away_timeout
to decide when someone’s marked as “away” after being silent. • Use
agent_false_interruption_timeout
to handle those quiet interruptions smoothly. 6. Tool Execution Limits: Keep the number of back-to-back tool calls per LLM turn in check with
max_tool_steps
. 7. State Synchronization and RPC: Make the most of LiveKit’s real-time state and data features to build your own custom state and control logic. 8. Performance Optimization: Speed things up by preloading models using the
prewarm
function in
WorkerOptions
, so initialization feels snappier. 9. Telemetry and Metrics: Plug in OpenTelemetry for trace-based observability and real-time metrics — perfect for digging into how your sessions are performing. Example Configuration:
Copy code
session = AgentSession(
    vad=silero.VAD.load(),
    stt=deepgram.STT(model="nova-3"),
    llm=openai.LLM(model="gpt-4o-mini"),
    tts=openai.TTS(voice="ash"),
    preemptive_generation=True,
    turn_detection=MultilingualModel(),
    max_tool_steps=10,
    user_away_timeout=30.0,
)
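Item 8 above mentions preloading models with a prewarm function; here's a minimal sketch of that pattern — the STT/LLM/TTS choices just mirror the example configuration above and are assumptions, not requirements:
Copy code
from livekit import agents
from livekit.agents import AgentSession
from livekit.plugins import deepgram, openai, silero

def prewarm(proc: agents.JobProcess):
    # Runs once per worker process, before any job is assigned:
    # load the heavy VAD model here so every session can reuse it
    proc.userdata["vad"] = silero.VAD.load()

async def entrypoint(ctx: agents.JobContext):
    await ctx.connect()
    session = AgentSession(
        vad=ctx.proc.userdata["vad"],     # reuse the preloaded model
        stt=deepgram.STT(model="nova-3"),
        llm=openai.LLM(model="gpt-4o-mini"),
        tts=openai.TTS(voice="ash"),
    )
    # ... start the session with your agent as usual

if __name__ == "__main__":
    agents.cli.run_app(
        agents.WorkerOptions(entrypoint_fnc=entrypoint, prewarm_fnc=prewarm)
    )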
If you’d like a hand setting any of these up or want tips on optimizing your session, just give me a shout — I’m here to help! Sources: README.md | Turn detection and interruptions | LiveKit Docs | Agent speech and audio | LiveKit Docs | Web and mobile frontends | LiveKit Docs | Silero VAD plugin | LiveKit Docs
p
i have langfuse, where do i add it?
t
Hey there! To get Langfuse working smoothly with your
AgentSession
in LiveKit, here’s a friendly step-by-step guide: 1. Set Environment Variables:
LANGFUSE_PUBLIC_KEY
LANGFUSE_SECRET_KEY
LANGFUSE_HOST
(this could be
<https://cloud.langfuse.com>
or your own self-hosted setup) 2. Setup Langfuse Tracer Provider: Use the
setup_langfuse()
function to configure OpenTelemetry so your traces get sent to Langfuse:
Copy code
from livekit.agents.telemetry import set_tracer_provider
import base64
import os

def setup_langfuse(metadata=None, host=None, public_key=None, secret_key=None):
    from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor

    public_key = public_key or os.getenv("LANGFUSE_PUBLIC_KEY")
    secret_key = secret_key or os.getenv("LANGFUSE_SECRET_KEY")
    host = host or os.getenv("LANGFUSE_HOST")

    if not public_key or not secret_key or not host:
        raise ValueError("LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, and LANGFUSE_HOST must be set")

    langfuse_auth = base64.b64encode(f"{public_key}:{secret_key}".encode()).decode()
    os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = f"{host.rstrip('/')}/api/public/otel"
    os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"Authorization=Basic {langfuse_auth}"

    trace_provider = TracerProvider()
    trace_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
    set_tracer_provider(trace_provider, metadata=metadata)
    return trace_provider
3. Initialize in the Entrypoint: Just call
setup_langfuse()
right at the start of your entrypoint function, before kicking off the `AgentSession`:
Copy code
async def entrypoint(ctx: JobContext):
    # Setup Langfuse tracing
    trace_provider = setup_langfuse(metadata={"langfuse.session.id": ctx.room.name})

    # Optional: flush traces on shutdown
    async def flush_trace():
        trace_provider.force_flush()

    ctx.add_shutdown_callback(flush_trace)

    # Start the agent session
    session = AgentSession(vad=silero.VAD.load())
    await session.start(agent=Kelly(), room=ctx.room)
4. Run the Agent: Then just run your app using the LiveKit CLI worker system like this:
Copy code
python langfuse_trace.py dev
Telemetry Data Collected: • Session start • Agent turns • LLM node execution • Function tool invocations • TTS node processing • End-of-turn detection events All these traces will show up in your Langfuse dashboard, giving you great insights into how your sessions and agents are performing. If you have any questions or want some tailored examples, just give me a shout—I’m here to help! Sources: examples/voice_agents/restaurant_agent.py | Logs, metrics, and telemetry | LiveKit Docs
p
ctx.room: how many things can we track?
t
The
ctx.room
object in LiveKit is your go-to for all things related to a room. It gives you access to handy properties and events that help you stay on top of what's happening. Here’s a quick rundown of what you can check out: 1. Room Properties:Room Name: You can grab the room’s name using
ctx.room.name
. • Room Metadata: This lets you pull application-specific metadata that’s shared with everyone in the room (usually set via server APIs). • Connection State: Keep an eye on the connection status, like whether it’s connected, disconnected, or reconnecting. 2. Participants:Participant Details: Get info about both local and remote participants, including who they are and their attributes. • Participant Count: Easily track how many folks are in the room at any time. 3. Room Events:Participant Events: Listen for when someone joins or leaves with events like
ParticipantConnected
and
ParticipantDisconnected
. • Track Events: Stay updated on media tracks being added or removed via
TrackPublished
and
TrackUnpublished
. • Metadata Changes: React to updates with
RoomMetadataChanged
events. • Active Speakers: Keep tabs on who’s talking with
ActiveSpeakersChanged
. All these features make it easier for you to manage the room and keep things running smoothly, perfect for building dynamic, real-time apps. If you want me to walk you through any part or share some examples, just let me know—I’m here to help! Sources: Room metadata | LiveKit Docs | Room &amp; Participant Management | livekit/livekit_composite | DeepWiki | packages/livekit-rtc/src/room.ts | livekit-rtc/livekit/rtc/_proto/room_pb2.pyi | Inside a session | LiveKit Docs
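To make that concrete, here's a minimal sketch of reading a few of these properties and listening for some of these events inside an entrypoint — the handler names follow the Python rtc SDK's snake_case event names:
Copy code
from livekit import rtc
from livekit.agents import JobContext

async def entrypoint(ctx: JobContext):
    await ctx.connect()

    # Room properties
    print(f"Room name: {ctx.room.name}")
    print(f"Room metadata: {ctx.room.metadata}")
    print(f"Remote participants: {len(ctx.room.remote_participants)}")

    # Room events
    @ctx.room.on("participant_connected")
    def on_participant_connected(participant: rtc.RemoteParticipant):
        print(f"{participant.identity} joined")

    @ctx.room.on("participant_disconnected")
    def on_participant_disconnected(participant: rtc.RemoteParticipant):
        print(f"{participant.identity} left")

    @ctx.room.on("active_speakers_changed")
    def on_active_speakers_changed(speakers: list[rtc.Participant]):
        print(f"Active speakers: {[p.identity for p in speakers]}")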
p
now i want to create a different session for each user so their conversations don't overlap
t
To help you create separate sessions for different users in LiveKit and keep conversations from overlapping, here are some friendly tips you can follow: 1. Use Unique Room Names: Make sure each user joins their own unique room by creating distinct room names. You can do this by adding a timestamp, UUID, or a random string to the room name, like this:
Copy code
# Example of unique room names
room_name = f"user_{user_id}_{uuid.uuid4()}"
This way, you avoid reusing the same Room ID and keep each session nicely separated. 2. Route Inbound Calls to Individual Rooms (SIP Use Case): If you’re working with Twilio Voice, try setting up a dispatch rule that sends each caller into their own room. Here’s an example:
Copy code
{
  "dispatch_rule": {
    "rule": {
      "dispatchRuleIndividual": {
        "roomPrefix": "call-"
      }
    }
  }
}
Then, apply the rule using the LiveKit CLI like this:
Copy code
lk sip dispatch-rule create dispatch-rule.json
This helps make sure every incoming call gets its own session. 3. Ensure Unique Participant Identity: It’s important that each participant in a room has a unique identity. If two people join with the same identity, the first one gets disconnected. So, when you generate access tokens, use unique user IDs for each participant. By using unique room names, dispatch rules, and unique participant identities, you’ll be able to keep sessions nicely separated for every user. If you want, I’m here to help with more examples or any questions you have! Sources: Avoiding Room ID Reuse When Re-Creating Rooms With the Same Name | Inbound calls with Twilio Voice | LiveKit Docs | Rooms, participants, and tracks | LiveKit Docs
p
in sip python
t
Hey there! If you want to create separate SIP sessions for different users in LiveKit using Python, here’s a simple step-by-step guide to help you out: 1. Initialize the LiveKit API Client:
Copy code
from livekit import api

lk_api = api.LiveKitAPI(
    url=os.getenv("LIVEKIT_URL"),
    api_key=os.getenv("LIVEKIT_API_KEY"),
    api_secret=os.getenv("LIVEKIT_API_SECRET"),
)
2. Create an Inbound SIP Trunk: This is where you set up a trunk to handle incoming calls:
Copy code
inbound_trunk_request = api.CreateSIPInboundTrunkRequest(
    trunk=api.SIPInboundTrunkInfo(
        name="My Inbound Trunk",
        numbers=["+1234567890"],  # Don’t forget to replace this with your actual numbers
        krisp_enabled=True,       # This turns on noise cancellation for clearer calls
    ),
)
inbound_trunk = await lk_api.sip.create_inbound_trunk(inbound_trunk_request)
3. Create a SIP Dispatch Rule: This helps route each SIP user to their own unique room:
Copy code
dispatch_rule_request = api.CreateSIPDispatchRuleRequest(
    trunk_ids=[inbound_trunk.sip_trunk_id],
    rule=api.SIPDispatchRule(
        dispatch_rule_individual=api.SIPDispatchRuleIndividual(room_prefix="call-")
    ),
)
dispatch_rule = await lk_api.sip.create_sip_dispatch_rule(dispatch_rule_request)
4. Handle Incoming SIP Participants: Every incoming SIP call will automatically create a new room and add a SIP participant. You’ll have access to details like phone number, call status, and call ID, so you can customize things just how you want. 5. Full Example:
Copy code
import asyncio
import os
from livekit import api

async def setup_sip_isolation():
    lk_api = api.LiveKitAPI(
        url=os.getenv("LIVEKIT_URL"),
        api_key=os.getenv("LIVEKIT_API_KEY"),
        api_secret=os.getenv("LIVEKIT_API_SECRET"),
    )

    # Create inbound trunk
    inbound_trunk = await lk_api.sip.create_inbound_trunk(
        api.CreateSIPInboundTrunkRequest(
            trunk=api.SIPInboundTrunkInfo(name="Inbound", numbers=["+1234567890"])
        )
    )

    # Create dispatch rule
    dispatch_rule = await lk_api.sip.create_sip_dispatch_rule(
        api.CreateSIPDispatchRuleRequest(
            trunk_ids=[inbound_trunk.sip_trunk_id],
            rule=api.SIPDispatchRule(
                dispatch_rule_individual=api.SIPDispatchRuleIndividual(room_prefix="user-call")
            ),
        )
    )
    print(f"Dispatch rule created: {dispatch_rule.sip_dispatch_rule_id}")

    await asyncio.Future()  # Keep the service running

if __name__ == "__main__":
    asyncio.run(setup_sip_isolation())
And that’s it! This setup makes sure each SIP user gets their own unique room, keeping sessions nicely isolated. If you have any questions or want me to walk you through more examples, just let me know—I’m here to help! Sources: Creating SIP Inbound Trunks and Dispatch Rules with Python SDK | livekit-api/livekit/api/sip_service.py | Inbound calls with Twilio Voice | LiveKit Docs | SIP participant | LiveKit Docs | telephony/make_call/make_call.py
p
my code async def entrypoint(ctx: JobContext): """Main entrypoint for the LiveKit agent""" try: # set up the langfuse setup_langfuse( metadata={ "langfuse.session.id": ctx.room.name, "langfuse.session" : ctx.room.metadata } ) print("🚀 Starting LiveKit agent...") logging.info("Starting LiveKit agent...") await ctx.connect() print(" Agent created with tools") print(" Connecting to room") # Create the information tool info_tool = create_tool_function() print(" Information tool created") # Wait for participant print(" Waiting for participant...") participant = await ctx.wait_for_participant() # Initialize transcription logger transcription_logger = TranscriptionLogger( mongo_uri=os.getenv("MONOGO_DB_URI"), db_name=os.getenv("MONGODB_DB_NAME_TRANSCRIPTION") ) # Intilaize call info logger caller_info= CallLogger( mongo_uri=os.getenv("MONOGO_DB_URI"), db_name="caller_info" ) # Create session with error handling for each component print("🔧 Creating session components...") try: vad = silero.VAD.load( min_silence_duration=2, # at least 2s of speech to start min_speech_duration=1.5, # wait 1.5s of silance before ending max_buffered_speech=10 # buffer up to 10s speech ) print(" VAD loaded") llm = aws.LLM( api_key=os.getenv("AWS_API_KEY"), api_secret=os.getenv("AWS_API_SECRET"), region="eu-central-1", model="arnawsbedrockeu central 1914194815954inference profile/eu.amazon.nova pro v10", max_output_tokens=500 ) print(" LLM configured") except Exception as e: print(f" TTS error: {e}") raise CustomException(e,sys) print(" Session created") print(" Transcription logging set up") # Set up transcription logging # Check if this is a SIP call and initialize logging is_sip_call = ( hasattr(participant, 'kind') and participant.kind == rtc.ParticipantKind.PARTICIPANT_KIND_SIP ) current_call_id = caller_info.start_call(participant) # Start transcription logging for both SIP and web calls if current_call_id: call_type = "SIP" if is_sip_call else "WEB" print(f"📞 {call_type} Call detected - Call ID: {current_call_id}") if is_sip_call: print(f"📱 Caller: {participant.attributes.get('sip.phoneNumber', 'Unknown')}") transcription_logger.start_transcription(session_id=current_call_id, session=session, mode=call_type) logging.info(f"Call logging started for call ID: {current_call_id}") else: print("⚠️ Failed to start call logging") session = AgentSession( use_tts_aligned_transcript=True, user_away_timeout=30, max_tool_steps=10 ) agent = IrishAgent( llm_instance=llm, tool=[info_tool], vad=vad ) @session.on("metrics_collected") def _on_metrics_collected(ev: MetricsCollectedEvent): metrics.log_metrics(ev.metrics) # Start the session await session.start(agent=agent, room=ctx.room,room_input_options=RoomInputOptions(close_on_disconnect=False)) print(" Session started") print(" Initial greeting sent") print("🎙️ Agent is now active and listening...") logging.info("Agent is now active and listening...") logging.info("Agent is now active and listening...") # Send initial greeting await session.generate_reply(instructions="Hello! 
How can I assist you today?") # Set up comprehensive transcription event handlers @session.on("user_speech_committed") def on_user_speech(event): """Handle user speech transcription""" if transcription_logger and hasattr(event, 'user_transcript') and event.user_transcript: transcription_logger.log_user_speech(event.user_transcript) @session.on("agent_speech_committed") def on_agent_speech(event): """Handle agent speech transcription""" if transcription_logger and hasattr(event, 'agent_transcript') and event.agent_transcript: transcription_logger.log_agent_response(event.agent_transcript) # Additional event handlers for comprehensive logging @session.on("conversation_item_added") def on_conversation_item(event): """Handle conversation items (covers both user and agent messages)""" if transcription_logger and hasattr(event, 'item'): item = event.item if hasattr(item, 'content') and item.content: # Determine speaker based on item type or role speaker = "USER" if hasattr(item, 'role') and item.role == 'user' else "AGENT" transcription_logger.log_transcription(item.content, is_final=True, speaker=speaker) # Handle session end and participant disconnect @ctx.room.on("participant_disconnected") def on_participant_disconnected(participant): """Handle participant disconnect - save transcription to MongoDB""" if current_call_id and transcription_logger: print(f"📞 Participant disconnected - saving transcription for call {current_call_id}") transcription_logger.close_transcription() caller_info.end_call(current_call_id) print(f" Call {current_call_id} ended and transcription saved to MongoDB") # Handle room disconnect @ctx.room.on("disconnected") def on_room_disconnected(): """Handle room disconnect - ensure transcription is saved""" if current_call_id and transcription_logger: print(f"🏠 Room disconnected - saving transcription for call {current_call_id}") transcription_logger.close_transcription() caller_info.end_call(current_call_id) transcription_logger.remove_txt() print(f" Call {current_call_id} ended and transcription saved to MongoDB") except Exception as e: error_msg = f"Voice agent error: {e}" print(f" {error_msg}") logging.error(error_msg) raise CustomException(e, sys)
t
Your entrypoint function looks really solid and thoughtfully put together, covering all the important bits like handling SIP and web calls, logging transcriptions, and managing sessions smoothly. Here’s a quick review with some friendly suggestions to make it even better: Strengths: • Great job setting up Langfuse integration for telemetry and session tracking with detailed metadata. • Thorough logging for both SIP and web calls, capturing transcription and call details nicely. • Smart dynamic handling of VAD, LLM, and session components, plus error handling for each – very robust! • Event handlers for user and agent speech, conversation items, and participant disconnects keep session management solid. • MongoDB integration for storing transcription and call data is well done and reliable. Suggestions for Improvement: 1. Room Metadata: Just a heads-up to make sure
ctx.room.metadata
is always initialized with the right session-specific info. If it’s missing, maybe add a fallback or default value to keep things smooth. 2. Dynamic Room Names: If you’re juggling multiple users, it’s a good idea to keep room names unique to prevent any session mix-ups. Something like this works well:
room_name = f"session_{uuid.uuid4()}"
3. Langfuse Metadata: Adding extra metadata like user IDs, call types, or timestamps can really help with tracking in Langfuse. For example:
Copy code
setup_langfuse(
    metadata={
        "langfuse.session.id": ctx.room.name,
        "langfuse.user.id": participant.identity,
        "langfuse.call.type": "SIP" if is_sip_call else "WEB"
    })
4. Graceful Shutdown: It’s a good idea to add a shutdown callback to make sure everything (like MongoDB connections and Langfuse traces) closes properly when you’re done:
Copy code
async def shutdown():
    transcription_logger.close()
    caller_info.close()
    await ctx.disconnect()

ctx.add_shutdown_callback(shutdown)
5. Concurrency: If you’re running multiple sessions at once, double-check that shared resources like MongoDB connections are thread-safe or consider using separate instances per session to avoid hiccups. 6. Error Logging: Adding a bit more context to your error logs can really speed up debugging. Something like this can help:
logging.error(f"Error in entrypoint: {e}", exc_info=True)
Next Steps: • Give the function a test run with both SIP and web calls to make sure all event handlers are firing as expected. • Check out the Langfuse traces to confirm session metadata is being captured correctly. • Consider optimizing your MongoDB queries if you expect to handle a large number of sessions for better scalability. If you want to chat more or need help tweaking anything, just let me know—I’m here to help!
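For suggestion 1, here's a minimal sketch of a metadata fallback before the Langfuse call — the empty-JSON default is just an assumption, so use whatever default makes sense for your app:
Copy code
# Fall back to an empty JSON object if the room has no metadata set
room_metadata = ctx.room.metadata or "{}"

setup_langfuse(
    metadata={
        "langfuse.session.id": ctx.room.name,
        "langfuse.session": room_metadata,
    }
)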
p
yes
t
Awesome! If you have any more questions or need a hand as you work through your implementation, just give me a shout. Wishing you the best with your LiveKit project!
p
i want multiple sessions, each session with one user interacting with my agent, and no overlapping conversations
t
To make sure each user gets their own separate session with your agent—without any overlapping conversations—here’s a simple way to do it: 1. Use Unique Room Names: Give each session a unique room name. This way, every user gets their own private space. For example:
Copy code
import uuid
room_name = f"session_{uuid.uuid4()}"
This creates a unique ID for each session, so there’s no chance of overlap. 2. Assign Unique Participant Identities: Make sure every participant joining a room has their own unique identity. This helps avoid any mix-ups where two people might accidentally share the same ID. 3. Create a New Session for Each Room: For every new room, spin up a fresh
AgentSession
instance. Like this:
Copy code
async def start_session(room_name):
    session = AgentSession(
        use_tts_aligned_transcript=True,
        user_away_timeout=30,
        max_tool_steps=10
    )

    agent = IrishAgent(
        llm_instance=llm,
        tool=[info_tool],
        vad=vad
    )

    await session.start(agent=agent, room=room_name, room_input_options=RoomInputOptions(close_on_disconnect=False))
4. Handle Room Cleanup: When a session ends, it’s important to clean up the room and any resources to keep things tidy:
Copy code
@ctx.room.on("disconnected")
def on_room_disconnected():
    print(f"Room {ctx.room.name} disconnected. Cleaning up resources.")
    # Perform cleanup tasks here
5. SIP Integration (Optional): If you’re working with SIP calls, you can set dispatch rules to send each caller to their own unique room:
Copy code
dispatch_rule_request = api.CreateSIPDispatchRuleRequest(
    trunk_ids=[inbound_trunk.sip_trunk_id],
    rule=api.SIPDispatchRule(
        dispatch_rule_individual=api.SIPDispatchRuleIndividual(room_prefix="call-")
    ),
)
dispatch_rule = await lk_api.sip.create_sip_dispatch_rule(dispatch_rule_request)
6. Monitor Active Sessions: It’s a good idea to keep track of all active sessions with a session manager. Here’s a quick example:
Copy code
active_sessions = {}

def add_session(session_id, session):
    active_sessions[session_id] = session

def remove_session(session_id):
    if session_id in active_sessions:
        del active_sessions[session_id]
If you’d like, I’m here to help with any part of this setup—just let me know!
p
async def entrypoint(ctx: JobContext): """Main entrypoint for the LiveKit agent""" try: # set up the langfuse setup_langfuse( metadata={ "langfuse.session.id": ctx.room.name, "langfuse.session" : ctx.room.metadata } ) print("🚀 Starting LiveKit agent...") logging.info("Starting LiveKit agent...") await ctx.connect() print(" Agent created with tools") print(" Connecting to room") # Create the information tool info_tool = create_tool_function() print(" Information tool created") # Wait for participant print(" Waiting for participant...") participant = await ctx.wait_for_participant() # Initialize transcription logger transcription_logger = TranscriptionLogger( mongo_uri=os.getenv("MONOGO_DB_URI"), db_name=os.getenv("MONGODB_DB_NAME_TRANSCRIPTION") ) # Intilaize call info logger caller_info= CallLogger( mongo_uri=os.getenv("MONOGO_DB_URI"), db_name="caller_info" ) # Create session with error handling for each component print("🔧 Creating session components...") try: vad = silero.VAD.load( min_silence_duration=2, # at least 2s of speech to start min_speech_duration=1.5, # wait 1.5s of silance before ending max_buffered_speech=10 # buffer up to 10s speech ) print(" VAD loaded") llm = aws.LLM( api_key=os.getenv("AWS_API_KEY"), api_secret=os.getenv("AWS_API_SECRET"), region="eu-central-1", model="arnawsbedrockeu central 1914194815954inference profile/eu.amazon.nova pro v10", max_output_tokens=500 ) print(" LLM configured") except Exception as e: print(f" TTS error: {e}") raise CustomException(e,sys) print(" Session created") print(" Transcription logging set up") # Set up transcription logging # Check if this is a SIP call and initialize logging is_sip_call = ( hasattr(participant, 'kind') and participant.kind == rtc.ParticipantKind.PARTICIPANT_KIND_SIP ) current_call_id = caller_info.start_call(participant) # Start transcription logging for both SIP and web calls if current_call_id: call_type = "SIP" if is_sip_call else "WEB" print(f"📞 {call_type} Call detected - Call ID: {current_call_id}") if is_sip_call: print(f"📱 Caller: {participant.attributes.get('sip.phoneNumber', 'Unknown')}") transcription_logger.start_transcription(session_id=current_call_id, session=session, mode=call_type) logging.info(f"Call logging started for call ID: {current_call_id}") else: print("⚠️ Failed to start call logging") session = AgentSession( use_tts_aligned_transcript=True, user_away_timeout=30, max_tool_steps=10 ) agent = IrishAgent( llm_instance=llm, tool=[info_tool], vad=vad ) @session.on("metrics_collected") def _on_metrics_collected(ev: MetricsCollectedEvent): metrics.log_metrics(ev.metrics) # Start the session await session.start(agent=agent, room=ctx.room,room_input_options=RoomInputOptions(close_on_disconnect=False)) print(" Session started") print(" Initial greeting sent") print("🎙️ Agent is now active and listening...") logging.info("Agent is now active and listening...") logging.info("Agent is now active and listening...") # Send initial greeting await session.generate_reply(instructions="Hello! 
How can I assist you today?") # Set up comprehensive transcription event handlers @session.on("user_speech_committed") def on_user_speech(event): """Handle user speech transcription""" if transcription_logger and hasattr(event, 'user_transcript') and event.user_transcript: transcription_logger.log_user_speech(event.user_transcript) @session.on("agent_speech_committed") def on_agent_speech(event): """Handle agent speech transcription""" if transcription_logger and hasattr(event, 'agent_transcript') and event.agent_transcript: transcription_logger.log_agent_response(event.agent_transcript) # Additional event handlers for comprehensive logging @session.on("conversation_item_added") def on_conversation_item(event): """Handle conversation items (covers both user and agent messages)""" if transcription_logger and hasattr(event, 'item'): item = event.item if hasattr(item, 'content') and item.content: # Determine speaker based on item type or role speaker = "USER" if hasattr(item, 'role') and item.role == 'user' else "AGENT" transcription_logger.log_transcription(item.content, is_final=True, speaker=speaker) # Handle session end and participant disconnect @ctx.room.on("participant_disconnected") def on_participant_disconnected(participant): """Handle participant disconnect - save transcription to MongoDB""" if current_call_id and transcription_logger: print(f"📞 Participant disconnected - saving transcription for call {current_call_id}") transcription_logger.close_transcription() caller_info.end_call(current_call_id) print(f" Call {current_call_id} ended and transcription saved to MongoDB") # Handle room disconnect @ctx.room.on("disconnected") def on_room_disconnected(): """Handle room disconnect - ensure transcription is saved""" if current_call_id and transcription_logger: print(f"🏠 Room disconnected - saving transcription for call {current_call_id}") transcription_logger.close_transcription() caller_info.end_call(current_call_id) transcription_logger.remove_txt() print(f" Call {current_call_id} ended and transcription saved to MongoDB") except Exception as e: error_msg = f"Voice agent error: {e}" print(f" {error_msg}") logging.error(error_msg) raise CustomException(e, sys) my code whre add without change anything
t
Hey there! To make sure each user gets their own separate session without any overlapping conversations, you can add some simple logic to create unique room names dynamically and manage sessions smoothly—all without changing the core of your existing code. Here’s a friendly walkthrough on how to do it: 1. Generate Unique Room Names: Right before starting a session, create a unique room name for each user. Just pop this at the start of your entrypoint function:
Copy code
import uuid

# Generate a unique room name
unique_room_name = f"session_{uuid.uuid4()}"
# Note: ctx.room.name is read-only once the job is running, so use this value
# when the room is actually created (e.g. in the access token or dispatch rule)
This little step makes sure every session stays nicely isolated by giving it a unique room name. 2. Track Active Sessions: Keep a dictionary handy to track all active sessions. Add this near the top of your entrypoint function:
Copy code
active_sessions = {}

def add_session(session_id, session):
    active_sessions[session_id] = session

def remove_session(session_id):
    if session_id in active_sessions:
        del active_sessions[session_id]
When you start a session, just add it to the dictionary like this:
Copy code
add_session(unique_room_name, session)
And when the session wraps up, remove it with:
Copy code
remove_session(unique_room_name)
3. Ensure Room Cleanup: Use your existing
on_room_disconnected
handler to tidy things up for the session that just ended:
Copy code
@ctx.room.on("disconnected")
def on_room_disconnected():
    print(f"🏠  Room {ctx.room.name} disconnected - cleaning up resources.")
    remove_session(ctx.room.name)
    if current_call_id and transcription_logger:
        transcription_logger.close_transcription()
        caller_info.end_call(current_call_id)
        transcription_logger.remove_txt()
        print(f"✅  Call {current_call_id} ended and transcription saved to MongoDB")
4. SIP Integration (Optional): If you’re working with SIP calls, just ensure each SIP participant gets routed to their own unique room. You can use the dispatch rule logic from earlier to make this happen. 5. No Structural Changes: The best part? These tweaks don’t require you to overhaul your existing logic. They simply help manage sessions better by keeping room names unique and cleaning up properly. If anything’s unclear or you want me to walk you through some examples, just give me a shout!
p
what do you think about using an auth token as a unique id for each sip caller to give them a separate room?
t
Using an authentication token as a unique identifier for SIP callers to assign them to separate rooms is a smart move. Here’s why it works well and how you can set it up: Advantages:Uniqueness: Auth tokens are naturally unique, so each SIP caller gets their own distinct room without any mix-ups. • Traceability: You can use the token to keep track of and log session details, which makes debugging and analyzing call data much easier. • Security: Auth tokens can be encrypted or signed, so only authorized callers can join the session—keeping things safe and sound. • Integration: Tokens can carry extra info (like caller ID or session type) that helps with routing or logging. Implementation: 1. Generate a Unique Token: When a SIP call starts, create a unique token for the caller. For example:
Copy code
import uuid

def generate_auth_token():
    return str(uuid.uuid4())
2. Assign Room Name: Use that token to make a unique room name for the caller:
Copy code
auth_token = generate_auth_token()
room_name = f"sip_{auth_token}"
3. Pass the Token: Include the token in the SIP participant’s attributes or metadata when you create the session. Like this:
Copy code
participant_request = api.CreateSIPParticipantRequest(
    room_name=room_name,
    sip_trunk_id=os.getenv("SIP_TRUNK_ID"),
    sip_call_to=caller_number,
    participant_identity=auth_token,
    metadata={"auth_token": auth_token}
)
4. Track Sessions: Use the token to keep an eye on active sessions and clean up properly when the session ends:
active_sessions[auth_token] = room_name
5. Cleanup: When the session wraps up, remove the token and any related resources:
del active_sessions[auth_token]
Considerations: • Make sure the token is generated and stored securely to keep unauthorized folks out. • Use the token to log session details for easier troubleshooting and tracking. • Check the token on the server side to confirm it’s in the right format and contains the expected data. If you’d like, I’m here to help you with any part of setting this up—just let me know!
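For the last consideration (checking the token format on the server side), here's a minimal sketch assuming the UUID4 tokens generated above:
Copy code
import uuid

def is_valid_auth_token(token: str) -> bool:
    """Check that the token parses as a UUID4 before trusting it."""
    try:
        return uuid.UUID(token).version == 4
    except (ValueError, AttributeError, TypeError):
        return False

# Example
assert is_valid_auth_token(str(uuid.uuid4()))
assert not is_valid_auth_token("not-a-token")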
p
can you provide clean code so i can understand where to add it, step by step?
t
Hey there! Here’s a straightforward, step-by-step guide to using an auth token as a unique identifier for SIP callers, so you can assign each of them to their own separate rooms: 1. Generate a Unique Auth Token: First up, let's create a function that generates a unique token for every SIP caller:
Copy code
import uuid

def generate_auth_token():
    """Generate a unique authentication token."""
    return str(uuid.uuid4())
2. Assign a Unique Room Name: Next, use that token to create a unique room name:
Copy code
auth_token = generate_auth_token()
room_name = f"sip_{auth_token}"
3. Create a SIP Participant: When a SIP call starts, create a SIP participant and include the token in the metadata to keep things organized:
Copy code
from livekit import api

participant_request = api.CreateSIPParticipantRequest(
    room_name=room_name,
    sip_trunk_id=os.getenv("SIP_TRUNK_ID"),
    sip_call_to="+1234567890",  # Just swap this with the caller's number
    participant_identity=auth_token,
    metadata={"auth_token": auth_token}
)

# Use the LiveKit API to create the participant
lk_api = api.LiveKitAPI(
    url=os.getenv("LIVEKIT_URL"),
    api_key=os.getenv("LIVEKIT_API_KEY"),
    api_secret=os.getenv("LIVEKIT_API_SECRET"),
)
await lk_api.sip.create_sip_participant(participant_request)
4. Track Active Sessions: Keep a handy dictionary to track which sessions are active by their auth token:
Copy code
active_sessions = {}

# Add the session to the active sessions
active_sessions[auth_token] = room_name
5. Handle Session Cleanup: When the session wraps up, make sure to remove the token and tidy up the resources:
Copy code
@ctx.room.on("disconnected")
def on_room_disconnected():
    """Handle room disconnection and clean up resources."""
    if auth_token in active_sessions:
        print(f"🏠  Room {active_sessions[auth_token]} disconnected. Cleaning up resources.")
        del active_sessions[auth_token]
6. Full Example: Here’s the whole thing put together for you:
Copy code
import uuid
import os
from livekit import api

# Step 1: Generate a unique auth token
def generate_auth_token():
    return str(uuid.uuid4())

# Step 2: Assign a unique room name
auth_token = generate_auth_token()
room_name = f"sip_{auth_token}"

# Step 3: Create a SIP participant
participant_request = api.CreateSIPParticipantRequest(
    room_name=room_name,
    sip_trunk_id=os.getenv("SIP_TRUNK_ID"),
    sip_call_to="+1234567890",  # Just swap this with the caller's number
    participant_identity=auth_token,
    metadata={"auth_token": auth_token}
)

lk_api = api.LiveKitAPI(
    url=os.getenv("LIVEKIT_URL"),
    api_key=os.getenv("LIVEKIT_API_KEY"),
    api_secret=os.getenv("LIVEKIT_API_SECRET"),
)

# Step 4: Track active sessions
active_sessions = {}
active_sessions[auth_token] = room_name

# Step 5: Handle session cleanup
@ctx.room.on("disconnected")
def on_room_disconnected():
    if auth_token in active_sessions:
        print(f"🏠  Room {active_sessions[auth_token]} disconnected. Cleaning up resources.")
        del active_sessions[auth_token]

# Step 6: Create the participant
await lk_api.sip.create_sip_participant(participant_request)
This setup makes sure every SIP caller gets their own unique room with an auth token, and everything gets cleaned up nicely when the session ends. If you have any questions or want to tweak things further, just give me a shout!
p
where do i add it in the entrypoint?
t
Here’s a friendly step-by-step guide to help you integrate the auth token logic into your existing entrypoint function: 1. Generate the Auth Token and Room Name: Start by adding this at the very beginning of your entrypoint function, before you connect to the room:
Copy code
import uuid

# Generate a unique auth token and room name
auth_token = str(uuid.uuid4())
room_name = f"sip_{auth_token}"
# Note: ctx.room.name can't be reassigned here; pass room_name when the room is
# created (e.g. in the CreateSIPParticipantRequest below)
This way, each session gets its own unique room name based on the auth token. 2. Create the SIP Participant: Next, after you’ve generated the room name, create the SIP participant using the LiveKit API like this:
Copy code
from livekit import api

participant_request = api.CreateSIPParticipantRequest(
    room_name=room_name,
    sip_trunk_id=os.getenv("SIP_TRUNK_ID"),
    sip_call_to="+1234567890",  # Swap in the caller's number here
    participant_identity=auth_token,
    metadata={"auth_token": auth_token}
)

lk_api = api.LiveKitAPI(
    url=os.getenv("LIVEKIT_URL"),
    api_key=os.getenv("LIVEKIT_API_KEY"),
    api_secret=os.getenv("LIVEKIT_API_SECRET"),
)
await lk_api.sip.create_sip_participant(participant_request)
Just pop this in right after you create the auth token and room name. 3. Track Active Sessions: It’s helpful to keep track of active sessions. Add this at the top of your entrypoint function:
Copy code
active_sessions = {}

# Add the session to the active sessions
active_sessions[auth_token] = room_name
Make sure to update this dictionary right after you create the SIP participant. 4. Handle Session Cleanup: Use the existing
on_room_disconnected
handler to tidy up resources when a session ends:
Copy code
@ctx.room.on("disconnected")
def on_room_disconnected():
    if auth_token in active_sessions:
        print(f"🏠  Room {active_sessions[auth_token]} disconnected. Cleaning up resources.")
        del active_sessions[auth_token]
5. Full Integration: Here’s a complete picture of how your updated entrypoint function could look:
Copy code
async def entrypoint(ctx: JobContext):
    try:
        # Generate a unique auth token and room name
        auth_token = str(uuid.uuid4())
        room_name = f"sip_{auth_token}"
        # ctx.room.name can't be reassigned; room_name is used below when the
        # SIP participant (and its room) is created

        # Set up Langfuse
        setup_langfuse(
            metadata={
                "langfuse.session.id": ctx.room.name,
                "langfuse.session": ctx.room.metadata
            }
        )

        print("🚀  Starting LiveKit agent...")
        await ctx.connect()

        # Create SIP participant
        participant_request = api.CreateSIPParticipantRequest(
            room_name=room_name,
            sip_trunk_id=os.getenv("SIP_TRUNK_ID"),
            sip_call_to="+1234567890",
            participant_identity=auth_token,
            metadata={"auth_token": auth_token}
        )

        lk_api = api.LiveKitAPI(
            url=os.getenv("LIVEKIT_URL"),
            api_key=os.getenv("LIVEKIT_API_KEY"),
            api_secret=os.getenv("LIVEKIT_API_SECRET"),
        )
        await lk_api.sip.create_sip_participant(participant_request)

        # Track active sessions
        active_sessions[auth_token] = room_name

        # Handle room disconnect
        @ctx.room.on("disconnected")
        def on_room_disconnected():
            if auth_token in active_sessions:
                print(f"🏠  Room {active_sessions[auth_token]} disconnected. Cleaning up resources.")
                del active_sessions[auth_token]

        # Continue with the rest of your session setup...

    except Exception as e:
        print(f"❌  Error: {e}")
        raise
This setup makes sure every SIP caller gets their own unique room, and that everything gets cleaned up nicely when the session wraps up. If anything’s unclear or you want to dive deeper, just give me a shout!
p
lkapi = api.LiveKitAPI()

# Create a dispatch rule to place each caller in a separate room
rule = api.SIPDispatchRule(
    dispatch_rule_individual=api.SIPDispatchRuleIndividual(
        room_prefix='call-',
    )
)
request = api.CreateSIPDispatchRuleRequest(
    dispatch_rule=api.SIPDispatchRuleInfo(
        rule=rule,
        name='My dispatch rule',
        trunk_ids=[],
        room_config=api.RoomConfiguration(
            agents=[api.RoomAgentDispatch(
                agent_name="inbound-agent",
                metadata="job dispatch metadata",
            )]
        )
    )
)
dispatch = await lkapi.sip.create_sip_dispatch_rule(request)
print("created dispatch", dispatch)
await lkapi.aclose()

# set up the langfuse
setup_langfuse(
    metadata={
        "langfuse.session.id": ctx.room.name,
        "langfuse.sesssion": ctx.room.metadata,
    }
)

print("🚀 Starting LiveKit agent...")
logging.info("Starting LiveKit agent...")
await ctx.connect()
t
Hey there! Your code for creating a SIP dispatch rule and plugging it into the LiveKit entrypoint looks really solid. Let me walk you through it step-by-step and share a few friendly tips to make it even better: 1. Create the Dispatch Rule: This dispatch rule is what makes sure each SIP caller gets routed to their own unique room with a special prefix (like
call-
). You’ve nailed this part in your code:
Copy code
rule = api.SIPDispatchRule(
    dispatch_rule_individual=api.SIPDispatchRuleIndividual(
        room_prefix='call-',
    )
)

request = api.CreateSIPDispatchRuleRequest(
    dispatch_rule=api.SIPDispatchRuleInfo(
        rule=rule,
        name='My dispatch rule',
        trunk_ids=[],
        room_config=api.RoomConfiguration(
            agents=[api.RoomAgentDispatch(
                agent_name="inbound-agent",
                metadata="job dispatch metadata",
            )]
        )
    )
)

dispatch = await lkapi.sip.create_sip_dispatch_rule(request)
print("created dispatch", dispatch)
Just a couple of friendly suggestions: • Make sure
trunk_ids
includes the right SIP trunk IDs so your rule links up perfectly with your inbound trunk. • Double-check the
room_prefix
to avoid any clashes with existing room names. 2. Set Up Langfuse: You’ve set up Langfuse nicely with session metadata. Just be sure that
ctx.room.name
and
ctx.room.metadata
are properly set before this step:
Copy code
setup_langfuse(
    metadata={
        "langfuse.session.id": ctx.room.name,
        "langfuse.sesssion": ctx.room.metadata,
    }
)
A couple of quick tips here: • Take a peek at the spelling of
"langfuse.sesssion"
— looks like it might be a typo and should be
"langfuse.session"
. • Think about adding extra metadata like caller ID or call type to make tracing even easier. 3. Connect to the Room: Once your dispatch rule and Langfuse are set, you connect to the room with:
Copy code
await ctx.connect()
This step gets your agent all set and ready to handle those incoming calls routed by your dispatch rule. 4. Full Example: Here’s the full code with a few small tweaks for you:
Copy code
lkapi = api.LiveKitAPI(
    url=os.getenv("LIVEKIT_URL"),
    api_key=os.getenv("LIVEKIT_API_KEY"),
    api_secret=os.getenv("LIVEKIT_API_SECRET"),
)

# Create a dispatch rule to place each caller in a separate room
rule = api.SIPDispatchRule(
    dispatch_rule_individual=api.SIPDispatchRuleIndividual(
        room_prefix='call-',
    )
)

request = api.CreateSIPDispatchRuleRequest(
    dispatch_rule=api.SIPDispatchRuleInfo(
        rule=rule,
        name='My dispatch rule',
        trunk_ids=["your_trunk_id"],  # Replace with actual trunk IDs
        room_config=api.RoomConfiguration(
            agents=[api.RoomAgentDispatch(
                agent_name="inbound-agent",
                metadata="job dispatch metadata",
            )]
        )
    )
)

dispatch = await lkapi.sip.create_sip_dispatch_rule(request)
print("created dispatch", dispatch)
await lkapi.aclose()

# Set up Langfuse
setup_langfuse(
    metadata={
        "langfuse.session.id": ctx.room.name,
        "langfuse.session": ctx.room.metadata,
    }
)

print("🚀  Starting LiveKit agent...")
await ctx.connect()
If anything’s unclear or if you want to dive deeper into any part, just give me a shout—I’m here to help!
p
does it create a separate room for each user?
t
Absolutely! The code you shared sets up a separate room for each user by using the
dispatch_rule_individual
configuration. Here’s a quick rundown of how it works: 1. Dispatch Rule with `dispatch_rule_individual`: This rule makes sure every incoming SIP call gets its own unique room. The
room_prefix
(like
call-
) helps create room names on the fly for each caller. For example: • First caller:
call-12345
• Second caller:
call-67890
2. Room Isolation: Each room created this way is completely separate, so people in different rooms won’t hear or interact with each other. This keeps conversations private and neat. 3. Room Configuration: The
room_config
part of your code assigns an agent to each room, making sure someone’s there to handle the user’s session in that specific room. 4. Validation: If you want to double-check that separate rooms are being made, you can: • Log the room names generated for each call. • Take a peek at the LiveKit dashboard to see active rooms and participants. Example: When a SIP call comes in, the dispatch rule creates a room named something like
call-abc123
, where
abc123
is a unique ID for that call. Every new call gets its own new room with a different name. Feel free to reach out if you want a hand testing or verifying this setup—I’m here to help!
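If you'd like to verify the isolation from the agent side, here's a minimal sketch that just logs which room each job lands in — it assumes a standard entrypoint and doesn't change any behavior:
Copy code
from livekit.agents import JobContext

async def entrypoint(ctx: JobContext):
    await ctx.connect()
    # With dispatch_rule_individual, each call lands in a fresh room like "call-abc123"
    print(f"Agent joined room: {ctx.room.name}")
    print(f"Participants in this room: {len(ctx.room.remote_participants)}")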
p
give an example i can understand for trunk_ids=["your_trunk_id"]  # Replace with actual trunk IDs
t
Here’s a little example to help you get the hang of using
trunk_ids
in the dispatch rule: 1. What’s a Trunk ID? Think of a trunk ID as a unique name tag for a SIP trunk you’ve set up in LiveKit. SIP trunks are what handle your inbound and outbound calls. Each trunk comes with its own special ID, which you can find on the LiveKit dashboard or grab programmatically through the API. 2. How to Find Trunk IDs Programmatically: If you’re not sure what your trunk ID is, no worries! You can easily list all your trunks using the LiveKit API like this:
Copy code
from livekit import api

lk_api = api.LiveKitAPI(
    url=os.getenv("LIVEKIT_URL"),
    api_key=os.getenv("LIVEKIT_API_KEY"),
    api_secret=os.getenv("LIVEKIT_API_SECRET"),
)

# List all SIP trunks
trunks = await lk_api.sip.list_sip_trunks()
for trunk in trunks.items:
    print(f"Trunk Name: {trunk.name}, Trunk ID: {trunk.sip_trunk_id}")
This will show you the names and IDs of all your SIP trunks. Just use the
sip_trunk_id
for the
trunk_ids
field when setting things up. 3. Here’s an Example with a Real Trunk ID: Once you’ve got your trunk ID, swap out
"your_trunk_id"
with the actual one like this:
Copy code
rule = api.SIPDispatchRule(
    dispatch_rule_individual=api.SIPDispatchRuleIndividual(
        room_prefix='call-',
    )
)

request = api.CreateSIPDispatchRuleRequest(
    dispatch_rule=api.SIPDispatchRuleInfo(
        rule=rule,
        name='My dispatch rule',
        trunk_ids=["trunk_12345"],  # Just replace this with your real trunk ID
        room_config=api.RoomConfiguration(
            agents=[api.RoomAgentDispatch(
                agent_name="inbound-agent",
                metadata="job dispatch metadata",
            )]
        )
    )
)

dispatch = await lk_api.sip.create_sip_dispatch_rule(request)
print("Created dispatch rule:", dispatch)
4. Double-Check Your Dispatch Rule: After you’ve created the rule, you can make sure it’s all set by listing your dispatch rules like this:
Copy code
dispatch_rules = await lk_api.sip.list_sip_dispatch_rule()
for rule in dispatch_rules.items:
    print(f"Dispatch Rule Name: {rule.name}, Trunk IDs: {rule.trunk_ids}")
5. Give It a Test: When a call comes in on the trunk you specified, the dispatch rule will send it to a unique room with the prefix
call-
. For example: • First call:
call-abc123
• Second call:
call-def456
If anything’s unclear or you want a hand with this, just let me know—I’m here to help!
p
is there a way to create the trunk id automatically?
t
Absolutely! You can create a SIP trunk programmatically using the LiveKit API. Let me walk you through how to do it: 1. Create an Inbound SIP Trunk: Start by using the
CreateSIPInboundTrunkRequest
API to set up a new SIP trunk. Here's an example:
Copy code
import os

from livekit import api

lk_api = api.LiveKitAPI(
    url=os.getenv("LIVEKIT_URL"),
    api_key=os.getenv("LIVEKIT_API_KEY"),
    api_secret=os.getenv("LIVEKIT_API_SECRET"),
)

# Create a new inbound SIP trunk
inbound_trunk_request = api.CreateSIPInboundTrunkRequest(
    trunk=api.SIPInboundTrunkInfo(
        name="My Inbound Trunk",  # Give your trunk a name
        numbers=["+1234567890"],  # Swap this out for your actual phone numbers
        krisp_enabled=True  # Turn on noise cancellation for clearer calls
    )
)

inbound_trunk = await lk_api.sip.create_sip_inbound_trunk(inbound_trunk_request)
print(f"Created SIP trunk: {inbound_trunk.sip_trunk_id}")
This will create your SIP trunk and give you its unique ID (
sip_trunk_id
). 2. Use the Trunk ID in Dispatch Rules: After creating the trunk, you can plug its ID into your dispatch rule like this:
Copy code
rule = api.SIPDispatchRule(
    dispatch_rule_individual=api.SIPDispatchRuleIndividual(
        room_prefix='call-',
    )
)

request = api.CreateSIPDispatchRuleRequest(
    dispatch_rule=api.SIPDispatchRuleInfo(
        rule=rule,
        name='My dispatch rule',
        trunk_ids=[inbound_trunk.sip_trunk_id],  # Here’s where you use the trunk ID
        room_config=api.RoomConfiguration(
            agents=[api.RoomAgentDispatch(
                agent_name="inbound-agent",
                metadata="job dispatch metadata",
            )]
        )
    )
)

dispatch = await lk_api.sip.create_sip_dispatch_rule(request)
print("Created dispatch rule:", dispatch)
3. Verify the Trunk: You can easily check all your SIP trunks to make sure yours was created:
Copy code
trunks = await lk_api.sip.list_sip_inbound_trunk(api.ListSIPInboundTrunkRequest())
for trunk in trunks.items:
    print(f"Trunk Name: {trunk.name}, Trunk ID: {trunk.sip_trunk_id}")
4. Testing: Once everything’s set up, try making a call to the phone number you specified (
+1234567890
in this example). The call should route to a unique room based on your dispatch rule; a quick way to confirm that from code is sketched below.
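Here's a small, assumption-based sketch (it reuses the `lk_api` client from above and the standard room-listing call) to check whether the dispatch rule actually created a `call-` room:
Copy code
# List active rooms and look for ones created by the dispatch rule
rooms = await lk_api.room.list_rooms(api.ListRoomsRequest())
for room in rooms.rooms:
    if room.name.startswith("call-"):
        print(f"Inbound call room: {room.name} ({room.num_participants} participants)")
If you have any questions or want me to walk you through any part in more detail, just let me know—I'm here to help!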
p
tell me about this code
from livekit.api import (
    AccessToken,
    RoomAgentDispatch,
    RoomConfiguration,
    VideoGrants,
)

room_name = "my-room"
agent_name = "test-agent"

def create_token_with_agent_dispatch() -> str:
    token = (
        AccessToken()
        .with_identity("my_participant")
        .with_grants(VideoGrants(room_join=True, room=room_name))
        .with_room_config(
            RoomConfiguration(
                agents=[
                    RoomAgentDispatch(agent_name="test-agent", metadata='{"user_id": "12345"}')
                ],
            ),
        )
        .to_jwt()
    )
    return token
import asyncio
from livekit import api

room_name = "my-room"
agent_name = "test-agent"

async def create_explicit_dispatch():
    lkapi = api.LiveKitAPI()
    dispatch = await lkapi.agent_dispatch.create_dispatch(
        api.CreateAgentDispatchRequest(
            agent_name=agent_name, room=room_name, metadata='{"user_id": "12345"}'
        )
    )
    print("created dispatch", dispatch)
    dispatches = await lkapi.agent_dispatch.list_dispatch(room_name=room_name)
    print(f"there are {len(dispatches)} dispatches in {room_name}")
    await lkapi.aclose()

asyncio.run(create_explicit_dispatch())
t
Hey there! This code shows you how to create an access token in LiveKit with agent dispatch and room configuration. Let me walk you through what each part does: 1. Importing Required Classes: Here, the code brings in a few important classes from the LiveKit API: • `AccessToken`: This helps generate a token so participants can authenticate. • `RoomAgentDispatch`: This is used to specify which agent gets assigned to the room. • `RoomConfiguration`: This sets up the room, including assigning agents. • `VideoGrants`: Grants the permissions needed for video actions, like joining a room. 2. Room and Agent Details: We define the room and agent names with these variables:
Copy code
room_name = "my-room"
agent_name = "test-agent"
3. Creating the Access Token: The
create_token_with_agent_dispatch
function makes a JWT (JSON Web Token) that includes: • Identity: The participant’s ID is set to
"my_participant"
. • Grants: It gives permission for the participant to join the specified room (
room_name
). • Room Configuration: The room is set up with an agent dispatch, like this:
Copy code
RoomConfiguration(
    agents=[
        RoomAgentDispatch(
            agent_name="test-agent",
            metadata='{"user_id": "12345"}'
        )
    ],
)
This means the agent
test-agent
is assigned to the room, and you can include extra info like
user_id
in the metadata. 4. Generating the JWT: Using the
to_jwt()
method, the token is turned into a JWT string that the participant can use to authenticate:
Copy code
token = (
    AccessToken()
    .with_identity("my_participant")
    .with_grants(VideoGrants(room_join=True, room=room_name))
    .with_room_config(RoomConfiguration(...))
    .to_jwt()
)
5. Returning the Token: Finally, the function hands back the generated JWT:
Copy code
return token
Use Case: This is handy when you want to: • Authenticate someone so they can join a specific room. • Dynamically assign an agent to that room. • Add metadata for the agent, like user-specific details. Example Output: The function returns a JWT string the client can use to join the room and connect with the assigned agent. If you have any questions or want me to walk you through more examples, just let me know—I'm happy to help!
Here’s a friendly breakdown of the two code snippets you shared: 1. Token Creation with Agent Dispatch: This snippet helps you create an access token for a participant to join a room, and it also sets up an agent dispatch configuration.
Copy code
from livekit.api import (
  AccessToken,
  RoomAgentDispatch,
  RoomConfiguration,
  VideoGrants,
)

room_name = "my-room"
agent_name = "test-agent"

def create_token_with_agent_dispatch() -> str:
    token = (
        AccessToken()
        .with_identity("my_participant")  # Gives a unique identity to the participant
        .with_grants(VideoGrants(room_join=True, room=room_name))  # Allows joining the room
        .with_room_config(
            RoomConfiguration(
                agents=[
                    RoomAgentDispatch(agent_name="test-agent", metadata='{"user_id": "12345"}')
                ],  # Sets which agent will be dispatched to the room
            ),
        )
        .to_jwt()  # Converts the token into a JWT string
    )
    return token
Key Points: • `AccessToken`: This is what you use to create a token for a participant. • `VideoGrants`: Grants permission for the participant to join the room. • `RoomConfiguration`: Lets you set up the room with an agent dispatch. • `RoomAgentDispatch`: Specifies which agent will be assigned to the room, plus any extra info you want to add. This token lets a participant join the room and interact with the agent you’ve set up. 2. Explicit Agent Dispatch: This snippet shows how to explicitly create an agent dispatch for a room using the LiveKit API.
Copy code
import asyncio
from livekit import api

room_name = "my-room"
agent_name = "test-agent"

async def create_explicit_dispatch():
    lkapi = api.LiveKitAPI()

    # Create an agent dispatch for the room
    dispatch = await lkapi.agent_dispatch.create_dispatch(
        api.CreateAgentDispatchRequest(
            agent_name=agent_name, room=room_name, metadata='{"user_id": "12345"}'
        )
    )
    print("created dispatch", dispatch)

    # List all dispatches in the room
    dispatches = await lkapi.agent_dispatch.list_dispatch(room_name=room_name)
    print(f"there are {len(dispatches)} dispatches in {room_name}")

    await lkapi.aclose()

asyncio.run(create_explicit_dispatch())
Key Points: • `create_dispatch`: This creates a dispatch for a specific agent and room. • `list_dispatch`: Lists all the active dispatches in that room. • `metadata`: Lets you add extra details like a user ID to the dispatch. This way is great if you want to assign agents to rooms on the fly and keep track of active dispatches. Use Case: Both snippets are here to help you manage agent assignments in LiveKit rooms. The first one creates a token for a participant with an agent dispatch set up, while the second one lets you create and list agent dispatches explicitly. You can definitely use them together to keep agent interactions smooth during your real-time communication sessions. If you want me to walk you through anything else or give you more examples, just let me know!
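As one more optional illustration (not part of your snippets), here's a rough sketch of how a Python client could use the token from the first snippet to join the room. It assumes `LIVEKIT_URL` is set in the environment and uses the standard `livekit.rtc` connect call:
Copy code
import asyncio
import os

from livekit import rtc

async def join_with_token():
    token = create_token_with_agent_dispatch()  # from the first snippet
    room = rtc.Room()
    # Joining with this token triggers the agent dispatch embedded in the token
    await room.connect(os.getenv("LIVEKIT_URL"), token)
    print("connected to room:", room.name)
    await room.disconnect()

asyncio.run(join_with_token())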
p
can i use it for inbound call room
import asyncio
from livekit import api

room_name = "my-room"
agent_name = "test-agent"

async def create_explicit_dispatch():
    lkapi = api.LiveKitAPI()
    dispatch = await lkapi.agent_dispatch.create_dispatch(
        api.CreateAgentDispatchRequest(
            agent_name=agent_name, room=room_name, metadata='{"user_id": "12345"}'
        )
    )
    print("created dispatch", dispatch)
    dispatches = await lkapi.agent_dispatch.list_dispatch(room_name=room_name)
    print(f"there are {len(dispatches)} dispatches in {room_name}")
    await lkapi.aclose()

asyncio.run(create_explicit_dispatch())
t
Absolutely, you can use this code for inbound call rooms in LiveKit! Let me walk you through how it works and how you can tweak it for inbound calls: 1. Purpose of the Code: This snippet sets up an agent dispatch for a specific room (
room_name
) and assigns an agent (
agent_name
) to manage interactions there. The
metadata
field is super handy for adding extra info, like details about the user. 2. Adapting for Inbound Calls: For inbound calls, you can create a unique room for each call on the fly and assign an agent to handle it. Here’s a simple way to adjust the code:
Copy code
import asyncio
import os

from livekit import api

async def handle_inbound_call(caller_id):
    # Generate a unique room name for the inbound call
    room_name = f"inbound-call-{caller_id}"
    agent_name = "test-agent"

    # Initialize LiveKit API client
    lkapi = api.LiveKitAPI(
        url=os.getenv("LIVEKIT_URL"),
        api_key=os.getenv("LIVEKIT_API_KEY"),
        api_secret=os.getenv("LIVEKIT_API_SECRET"),
    )

    # Create an agent dispatch for the room
    dispatch = await lkapi.agent_dispatch.create_dispatch(
        api.CreateAgentDispatchRequest(
            agent_name=agent_name,
            room=room_name,
            metadata=f'{{"caller_id": "{caller_id}"}}'
        )
    )
    print("Created dispatch for inbound call:", dispatch)

    # List all dispatches in the room
    dispatches = await lkapi.agent_dispatch.list_dispatch(room_name=room_name)
    print(f"There are {len(dispatches)} dispatches in {room_name}")

    await lkapi.aclose()

# Simulate handling an inbound call
asyncio.run(handle_inbound_call("+1234567890"))
3. Key Changes for Inbound Calls: • Dynamic Room Name: The room name is created dynamically using the caller ID (like
inbound-call-+1234567890
). • Metadata: The metadata now includes the caller ID, which is great for logging or routing. 4. Workflow: 1. When an inbound call comes in, generate a unique room name for it. 2. Create an agent dispatch for that room, assigning an agent to take the call. 3. You can also log or keep track of dispatches to monitor things smoothly. 5. Testing: To try this out, just simulate an inbound call by running the
handle_inbound_call
function with a sample caller ID. Check that a new room pops up and your agent is assigned properly. If you have any questions or want me to share more examples, just let me know—I’m here to help!
p
is caller id mandatory? can we create a unique id
t
Hey there! Just to clarify, the caller ID isn’t mandatory. If it’s missing or you’d rather not use it, you can easily generate a unique ID for each inbound call. Here’s a friendly guide on how to tweak your code to create a unique ID on the fly: 1. Generate a Unique ID: You can use Python’s
uuid
module to whip up a unique identifier for every call:
Copy code
import uuid

# Generate a unique ID for the call
unique_id = str(uuid.uuid4())
room_name = f"inbound-call-{unique_id}"
This way, each call gets its own unique room name, even if the caller ID is missing. 2. Updated Code: Here’s the updated snippet to handle inbound calls without needing a caller ID:
Copy code
import asyncio
import os
import uuid

from livekit import api

async def handle_inbound_call():
    # Generate a unique ID for the call
    unique_id = str(uuid.uuid4())
    room_name = f"inbound-call-{unique_id}"
    agent_name = "test-agent"

    # Initialize LiveKit API client
    lkapi = api.LiveKitAPI(
        url=os.getenv("LIVEKIT_URL"),
        api_key=os.getenv("LIVEKIT_API_KEY"),
        api_secret=os.getenv("LIVEKIT_API_SECRET"),
    )

    # Create an agent dispatch for the room
    dispatch = await lkapi.agent_dispatch.create_dispatch(
        api.CreateAgentDispatchRequest(
            agent_name=agent_name,
            room=room_name,
            metadata=f'{{"unique_id": "{unique_id}"}}'
        )
    )
    print("Created dispatch for inbound call:", dispatch)

    # List all dispatches in the room
    dispatches = await lkapi.agent_dispatch.list_dispatch(room_name=room_name)
    print(f"There are {len(dispatches)} dispatches in {room_name}")

    await lkapi.aclose()

# Simulate handling an inbound call
asyncio.run(handle_inbound_call())
3. Key Changes: • Unique ID: We generate a UUID for each call, so every room name stays unique. • Metadata: The unique ID is tucked into the metadata to help with tracking. 4. Workflow: 1. When a call comes in, generate a unique ID for it. 2. Create a room name using that ID (like
inbound-call-abc123
). 3. Set up an agent dispatch for that room, so someone’s ready to take the call. 5. Benefits: • Keeps room names unique for every call, no matter what. • Gives you flexibility if caller ID isn’t available or you want to skip it. If you want me to walk you through anything else or need more examples, just give me a shout!
p
just check it and give some ansewr now my code handel multipel user whiout having conversation overlap issue async def entrypoint(ctx: JobContext): """Main entrypoint for the LiveKit agent""" try: asyncio.run(handle_inbound_call(caller_id=generate_auth_token())) # Set up Langfuse setup_langfuse( metadata={ "langfuse.session.id": ctx.room.name, "langfuse.session": ctx.room.metadata, } ) print("🚀 Starting LiveKit agent...") await ctx.connect() print(" Agent created with tools") print(" Connecting to room") # Create the information tool info_tool = create_tool_function() print(" Information tool created") # Wait for participant print(" Waiting for participant...") participant = await ctx.wait_for_participant() # Initialize transcription logger transcription_logger = TranscriptionLogger( mongo_uri=os.getenv("MONOGO_DB_URI"), db_name=os.getenv("MONGODB_DB_NAME_TRANSCRIPTION") ) # Intilaize call info logger caller_info= CallLogger( mongo_uri=os.getenv("MONOGO_DB_URI"), db_name="caller_info" ) # Create session with error handling for each component print("🔧 Creating session components...") try: vad = silero.VAD.load( min_silence_duration=2, # at least 2s of speech to start min_speech_duration=1.5, # wait 1.5s of silance before ending ) print(" VAD loaded") llm = aws.LLM( api_key=os.getenv("AWS_API_KEY"), api_secret=os.getenv("AWS_API_SECRET"), region="eu-central-1", model="arnawsbedrockeu central 1914194815954inference profile/eu.amazon.nova pro v10", max_output_tokens=500 ) print(" LLM configured") except Exception as e: print(f" TTS error: {e}") raise CustomException(e,sys) print(" Session created") session = AgentSession( use_tts_aligned_transcript=True, user_away_timeout=30, max_tool_steps=10 ) print(" Transcription logging set up") # Set up transcription logging # Check if this is a SIP call and initialize logging is_sip_call = ( hasattr(participant, 'kind') and participant.kind == rtc.ParticipantKind.PARTICIPANT_KIND_SIP ) current_call_id = caller_info.start_call(participant) # Start transcription logging for both SIP and web calls if current_call_id: call_type = "SIP" if is_sip_call else "WEB" print(f"📞 {call_type} Call detected - Call ID: {current_call_id}") if is_sip_call: print(f"📱 Caller: {participant.attributes.get('sip.phoneNumber', 'Unknown')}") transcription_logger.start_transcription(session_id=current_call_id, session=session, mode=call_type) logging.info(f"Call logging started for call ID: {current_call_id}") else: print("⚠️ Failed to start call logging") transcription_logger.start_transcription(session_id=unique_room_name, session=session, mode="WEB") logging.info(f"Call logging started for call ID: {current_call_id}") agent = IrishAgent( llm_instance=llm, tool=[info_tool], vad=vad ) @session.on("metrics_collected") def _on_metrics_collected(ev: MetricsCollectedEvent): metrics.log_metrics(ev.metrics) # Start the session await session.start(agent=agent, room=ctx.room,room_input_options=RoomInputOptions(close_on_disconnect=False)) print(" Session started") print(" Initial greeting sent") print("🎙️ Agent is now active and listening...") logging.info("Agent is now active and listening...") # Send initial greeting await session.generate_reply(instructions="Hello! 
How can I assist you today?") # Set up comprehensive transcription event handlers @session.on("user_speech_committed") def on_user_speech(event): """Handle user speech transcription""" if transcription_logger and hasattr(event, 'user_transcript') and event.user_transcript: transcription_logger.log_user_speech(event.user_transcript) @session.on("agent_speech_committed") def on_agent_speech(event): """Handle agent speech transcription""" if transcription_logger and hasattr(event, 'agent_transcript') and event.agent_transcript: transcription_logger.log_agent_response(event.agent_transcript) # Additional event handlers for comprehensive logging @session.on("conversation_item_added") def on_conversation_item(event): """Handle conversation items (covers both user and agent messages)""" if transcription_logger and hasattr(event, 'item'): item = event.item if hasattr(item, 'content') and item.content: # Determine speaker based on item type or role speaker = "USER" if hasattr(item, 'role') and item.role == 'user' else "AGENT" transcription_logger.log_transcription(item.content, is_final=True, speaker=speaker) # Handle session end and participant disconnect @ctx.room.on("participant_disconnected") def on_participant_disconnected(participant): """Handle participant disconnect - save transcription to MongoDB""" if current_call_id and transcription_logger: print(f"📞 Participant disconnected - saving transcription for call {current_call_id}") transcription_logger.close_transcription() caller_info.end_call(current_call_id) print(f" Call {current_call_id} ended and transcription saved to MongoDB") # Handle room disconnect @ctx.room.on("disconnected") def on_room_disconnected(): """Handle room disconnect - ensure transcription is saved""" if current_call_id and transcription_logger: print(f"🏠 Room disconnected - saving transcription for call {current_call_id}") transcription_logger.close_transcription() caller_info.end_call(current_call_id) transcription_logger.remove_txt() print(f" Call {current_call_id} ended and transcription saved to MongoDB") except Exception as e: error_msg = f"Voice agent error: {e}" print(f" {error_msg}") logging.error(error_msg) raise CustomException(e, sys)
t
Your code looks really well-organized and seems to handle multiple users without mixing up conversations. Let me walk you through a quick review and highlight the key points: 1. Unique Room Creation: Using
generate_auth_token()
to assign a unique ID for each inbound call is a smart move. This way, each user gets their own unique room, which keeps conversations nicely separated. 2. Langfuse Integration: Langfuse is set up just right with session metadata, so you can track each session individually. Just make sure
ctx.room.name
and
ctx.room.metadata
are properly initialized before this step to keep everything smooth. 3. Transcription Logging: Initializing the transcription logger for each session and calling
start_transcription
with the right session ID means transcription data stays organized per user. Great job on that! 4. Agent Session: The
AgentSession
is configured well, with options like
use_tts_aligned_transcript
,
user_away_timeout
, and
max_tool_steps
tailored to your needs. This really helps customize the session experience. 5. Event Handlers: You’ve set up thorough event handlers for user speech, agent speech, and conversation items, making sure every interaction is properly logged and handled. That’s key for keeping things running smoothly. 6. Session Cleanup: The handlers for
on_participant_disconnected
and
on_room_disconnected
take care of cleaning up resources when sessions end, including closing transcription logs and ending calls. Nice and tidy! 7. Handling Multiple Users: By creating unique rooms and sessions dynamically for each user, your code effectively keeps conversations separate and avoids overlap. Each user gets their own isolated session with dedicated logging and agent interactions. Suggestions: • Just double-check that
ctx.room.name
and
ctx.room.metadata
are correctly initialized for every session. • It might help to add more detailed error handling in the
handle_inbound_call
function to catch and recover from any hiccups during room creation or dispatch (a rough sketch is below). • Try testing with multiple users at once to make sure sessions stay isolated and there aren't any resource conflicts.
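For that error-handling suggestion, here's a minimal sketch. It's an assumption-based example built on the earlier `handle_inbound_call` idea, not a drop-in replacement for your code:
Copy code
import logging
import uuid

from livekit import api

async def handle_inbound_call():
    room_name = f"inbound-call-{uuid.uuid4()}"
    # LiveKitAPI() reads LIVEKIT_URL / LIVEKIT_API_KEY / LIVEKIT_API_SECRET from the environment
    lkapi = api.LiveKitAPI()
    try:
        dispatch = await lkapi.agent_dispatch.create_dispatch(
            api.CreateAgentDispatchRequest(agent_name="test-agent", room=room_name)
        )
        logging.info("Created dispatch %s for room %s", dispatch, room_name)
        return room_name
    except Exception:
        logging.exception("Failed to create dispatch for room %s", room_name)
        return None
    finally:
        await lkapi.aclose()
If you want, I'm here to help with any further questions or improvements you might need!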
p
how to get room id
t
Hey there! If you’re looking to grab the room ID (also called the Room SID) in LiveKit, here’s a simple way to do it depending on which SDK you're using: 1. Python SDK: With the Python SDK, you can get the room SID by accessing the
sid
property like this:
Copy code
# Example in Python
room_sid = await ctx.room.sid  # sid is an async property, so it needs to be awaited
print(f"Room SID: {room_sid}")
Just a heads-up, the
sid
property works asynchronously and will give you the unique room identifier once it’s ready. 2. A Couple of Quick Notes: • The LiveKit server assigns the room SID when you connect, so each room session gets its own unique ID. • Make sure your room is connected before trying to grab the SID, or it won’t be available yet. 3. Why You Might Need It: The room SID comes in handy for things like logging, debugging, or linking specific sessions with other systems you’re using. If you want me to walk you through this for other SDKs or need more examples, just let me know—I’m here to help! Sources: livekit-rtc/livekit/rtc/_proto/room_pb2.py | livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt | lib/src/core/room.dart | livekit/src/room/mod.rs | src/room/Room.ts
p
fine, can we create a custom langfuse class and add stt, llm or tts nodes to track their input and output
t
Absolutely, you can create a custom Langfuse class to keep an eye on the input and output of your STT, LLM, and TTS nodes. By overriding these nodes in your custom agent class, you’ll be able to catch and log the data for tracking. Here’s a friendly walkthrough on how to set it up: 1. Create a Custom Langfuse Class: This class will take care of logging and tracking for your STT, LLM, and TTS nodes:
Copy code
class LangfuseTracker:
    def __init__(self):
        self.logs = []

    def log(self, node_type, input_data, output_data):
        log_entry = {
            "node_type": node_type,
            "input": input_data,
            "output": output_data
        }
        self.logs.append(log_entry)
        print(f"Logged {node_type}: {log_entry}")
2. Override STT, LLM, and TTS Nodes: In your custom agent class, override the nodes to log their input and output like this:
Copy code
from livekit.agents import Agent

class CustomAgent(Agent):
    def __init__(self, langfuse_tracker, instructions="You are a helpful voice assistant."):
        # Agent requires instructions; pass in whatever prompt your agent normally uses
        super().__init__(instructions=instructions)
        self.langfuse_tracker = langfuse_tracker

    async def stt_node(self, audio, model_settings):
        async for event in super().stt_node(audio, model_settings):
            if hasattr(event, 'alternatives') and event.alternatives:
                input_audio = audio  # Log raw audio if needed
                output_text = event.alternatives[0].text
                self.langfuse_tracker.log("STT", input_audio, output_text)
            yield event

    async def llm_node(self, chat_ctx, tools, model_settings=None):
        async for chunk in super().llm_node(chat_ctx, tools, model_settings):
            input_text = chat_ctx.messages[-1].content  # Last user message
            output_text = chunk.delta.content if hasattr(chunk, 'delta') else str(chunk)
            self.langfuse_tracker.log("LLM", input_text, output_text)
            yield chunk

    async def tts_node(self, text, model_settings):
        async for audio_frame in super().tts_node(text, model_settings):
            input_text = text
            output_audio = audio_frame  # Log raw audio frames if needed
            self.langfuse_tracker.log("TTS", input_text, output_audio)
            yield audio_frame
3. Integrate the Custom Agent: Just pass your Langfuse tracker to your custom agent like this:
Copy code
langfuse_tracker = LangfuseTracker()
agent = CustomAgent(langfuse_tracker=langfuse_tracker)
4. Benefits: • You’ll be tracking input and output for STT, LLM, and TTS nodes in real-time. • The logs come in handy for debugging, analytics, or auditing. • It’s super flexible—you can add extra metadata or customize the processing however you like. If you want, I’m here to help with more examples or any questions you have!
p
is this function relevant to your code
t
Yes, the function you shared fits right in with the code structure I provided. It helps you keep track of what’s going in and coming out of the STT, LLM, and TTS nodes by overriding their methods in a custom agent class. Here’s how it all ties together: 1. LangfuseTracker Class: The
LangfuseTracker
class is built to log inputs and outputs from each node. You can even extend it to send these logs to external tools like Langfuse for telemetry and analytics. 2. Overridden Node Methods: In your custom agent class, the
stt_node
,
llm_node
, and
tts_node
methods get overridden so you can catch and log the data: • STT Node: Captures raw audio input and the transcribed text output. • LLM Node: Logs the user’s input text and the LLM’s response. • TTS Node: Logs the text input and the generated audio output. 3. Integration: You pass the
LangfuseTracker
instance into your custom agent, so all nodes share the same tracker for logging. This keeps everything neat and centralized. 4. Use Case: This setup is super helpful for: • Debugging: Pinpointing any hiccups in the STT, LLM, or TTS processes. • Analytics: Understanding user interactions and how the agent responds. • Auditing: Keeping a clear record of inputs and outputs for compliance or review. 5. Example Workflow: Here’s what happens when a user speaks: 1. The
stt_node
handles the audio and logs the transcribed text. 2. The
llm_node
processes that text and logs the LLM’s reply. 3. The
tts_node
takes the response text and logs the generated audio. A concrete illustration of what ends up in the tracker is shown below.
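Purely as an illustration (the values are made-up placeholders, not real output), here's what the tracker's in-memory log might hold after a single user turn:
Copy code
langfuse_tracker = LangfuseTracker()
langfuse_tracker.log("STT", "<audio frames>", "what's the weather today?")
langfuse_tracker.log("LLM", "what's the weather today?", "It's sunny and 22 degrees.")
langfuse_tracker.log("TTS", "It's sunny and 22 degrees.", "<synthesized audio frames>")
print(len(langfuse_tracker.logs))  # 3 entries, one per node
If you want me to clarify anything or help add more features, just let me know—I'm here to help!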
p
yes
t
Awesome! If you run into any questions or need a hand while working on this, just give me a shout. Wishing you the best with your project!
p
full code
t
Here’s the full code for creating a custom Langfuse tracker and integrating it with a custom agent to track input and output for STT, LLM, and TTS nodes: 1. LangfuseTracker Class:
Copy code
class LangfuseTracker:
    def __init__(self):
        self.logs = []

    def log(self, node_type, input_data, output_data):
        log_entry = {
            "node_type": node_type,
            "input": input_data,
            "output": output_data
        }
        self.logs.append(log_entry)
        print(f"Logged {node_type}: {log_entry}")
2. Custom Agent Class:
Copy code
from livekit.agents import Agent

class CustomAgent(Agent):
    def __init__(self, langfuse_tracker, instructions="You are a helpful voice assistant."):
        # Agent requires instructions; pass in whatever prompt your agent normally uses
        super().__init__(instructions=instructions)
        self.langfuse_tracker = langfuse_tracker

    async def stt_node(self, audio, model_settings):
        async for event in super().stt_node(audio, model_settings):
            if hasattr(event, 'alternatives') and event.alternatives:
                input_audio = audio  # Log raw audio if needed
                output_text = event.alternatives[0].text
                self.langfuse_tracker.log("STT", input_audio, output_text)
            yield event

    async def llm_node(self, chat_ctx, tools, model_settings=None):
        async for chunk in super().llm_node(chat_ctx, tools, model_settings):
            input_text = chat_ctx.messages[-1].content  # Last user message
            output_text = chunk.delta.content if hasattr(chunk, 'delta') else str(chunk)
            self.langfuse_tracker.log("LLM", input_text, output_text)
            yield chunk

    async def tts_node(self, text, model_settings):
        async for audio_frame in super().tts_node(text, model_settings):
            input_text = text
            output_audio = audio_frame  # Log raw audio frames if needed
            self.langfuse_tracker.log("TTS", input_text, output_audio)
            yield audio_frame
3. Integration with Entry Point:
Copy code
from livekit.agents import AgentSession, JobContext, RoomInputOptions

async def entrypoint(ctx: JobContext):
    try:
        # Connect to the room before starting the session
        await ctx.connect()

        # Initialize Langfuse tracker
        langfuse_tracker = LangfuseTracker()

        # Create the custom agent
        agent = CustomAgent(langfuse_tracker=langfuse_tracker)

        # Set up the session
        session = AgentSession(
            use_tts_aligned_transcript=True,
            user_away_timeout=30,
            max_tool_steps=10
        )

        # Start the session
        await session.start(agent=agent, room=ctx.room, room_input_options=RoomInputOptions(close_on_disconnect=False))

        print("✅  Session started")

    except Exception as e:
        print(f"❌  Error: {e}")
        raise
4. Workflow: 1. When a user speaks, the
stt_node
logs the audio input and transcribed text. 2. The
llm_node
logs the user’s input text and the LLM’s response. 3. The
tts_node
logs the response text and the generated audio. 5. Benefits: • Tracks all interactions in real-time for debugging, analytics, and auditing. • Centralized logging with the
LangfuseTracker
class. • Customizable to include additional metadata or processing logic, as sketched below.
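For instance, here's one hypothetical way to extend the tracker with a session ID and timestamps. It's just an illustration; adjust it to whatever metadata you actually need:
Copy code
import time

class ExtendedLangfuseTracker(LangfuseTracker):
    def __init__(self, session_id=None):
        super().__init__()
        self.session_id = session_id

    def log(self, node_type, input_data, output_data, **extra):
        log_entry = {
            "node_type": node_type,
            "input": input_data,
            "output": output_data,
            "session_id": self.session_id,
            "timestamp": time.time(),
            **extra,
        }
        self.logs.append(log_entry)
        print(f"Logged {node_type}: {log_entry}")
If you have any questions or want to tweak anything, just let me know! I'm here to help.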
p
i already have an agent that switches language, now how to add this custom agent to that agent class
t
Hey there! To smoothly integrate the custom Langfuse tracker into your existing agent class that supports language switching, you can simply extend your current agent class and override the necessary nodes (STT, LLM, TTS) to add the tracking functionality. Here’s a friendly guide on how to do it: 1. Extend Your Existing Agent Class: Let’s say your existing agent class is called
LanguageSwitchingAgent
. You can extend it to include the Langfuse tracker like this:
Copy code
class TrackedLanguageSwitchingAgent(LanguageSwitchingAgent):
    def __init__(self, langfuse_tracker, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.langfuse_tracker = langfuse_tracker

    async def stt_node(self, audio, model_settings):
        async for event in super().stt_node(audio, model_settings):
            if hasattr(event, 'alternatives') and event.alternatives:
                input_audio = audio  # Log raw audio if needed
                output_text = event.alternatives[0].text
                self.langfuse_tracker.log("STT", input_audio, output_text)
            yield event

    async def llm_node(self, chat_ctx, tools, model_settings=None):
        async for chunk in super().llm_node(chat_ctx, tools, model_settings):
            input_text = chat_ctx.messages[-1].content  # Last user message
            output_text = chunk.delta.content if hasattr(chunk, 'delta') else str(chunk)
            self.langfuse_tracker.log("LLM", input_text, output_text)
            yield chunk

    async def tts_node(self, text, model_settings):
        async for audio_frame in super().tts_node(text, model_settings):
            input_text = text
            output_audio = audio_frame  # Log raw audio frames if needed
            self.langfuse_tracker.log("TTS", input_text, output_audio)
            yield audio_frame
2. Initialize the Tracker and Use the New Agent: When you set up your entrypoint or session, just initialize the Langfuse tracker and use this new agent class like so:
Copy code
async def entrypoint(ctx):
    try:
        # Initialize Langfuse tracker
        langfuse_tracker = LangfuseTracker()

        # Create the tracked language-switching agent
        agent = TrackedLanguageSwitchingAgent(
            langfuse_tracker=langfuse_tracker,
            # Pass other arguments required by your LanguageSwitchingAgent
            tools=[info_tool],
            llm_instance=llm,
            vad=vad
        )

        # Set up the session
        session = AgentSession(
            use_tts_aligned_transcript=True,
            user_away_timeout=30,
            max_tool_steps=10
        )

        # Start the session
        await session.start(agent=agent, room=ctx.room, room_input_options=RoomInputOptions(close_on_disconnect=False))

        print("✅  Session started")

    except Exception as e:
        print(f"❌  Error: {e}")
        raise
3. Why This Works Well: • You get to reuse your existing language-switching logic without any hassle. • Tracking for STT, LLM, and TTS nodes is added smoothly and seamlessly. • All your logs are centralized with the Langfuse tracker, making debugging and analytics a breeze. 4. A Quick Testing Checklist: Before you wrap up, just check that: • Language switching behaves exactly as you expect. • Input and output for STT, LLM, and TTS nodes are properly logged. If you have any questions or want to explore more enhancements, just let me know—I’m here to help!
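To make the second checklist item easy to verify, here's a tiny, assumption-based smoke check you could run after a short test conversation; it just inspects the tracker's in-memory log:
Copy code
# Confirm every node type produced at least one log entry
node_types = {entry["node_type"] for entry in langfuse_tracker.logs}
missing = {"STT", "LLM", "TTS"} - node_types
print("All nodes logged!" if not missing else f"Missing logs for: {missing}")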