# ask-ai
p
Provide an example of using llm_node. I have put guardrails logic in it, and if it is triggered I want to stop the response from getting to the TTS and then to the user. If the LLM response triggered the guardrail, I want to take the user input + a custom system prompt and resend it to the LLM to make sure the second response is not violating the guardrails. Can you show me an example of it?
t
Thanks for reaching out, Yasser. I'm an AI assistant and I'm here to help. Give me a moment while I research this topic.
Hey there! If you want to add guardrails in `llm_node` to make sure responses that break the rules don't get sent to TTS, here's a simple way to do it:

1. Override the `llm_node` method: Tweak `llm_node` so it keeps an eye on the LLM output and takes action when something's off.
2. Add guardrail logic: Build in checks to spot when the output isn't following the rules. If a problem is detected, stop that response and resend the user input with a special system prompt.
3. Stop the response: Use `self.session.interrupt()` to pause the current response before it goes any further.
4. Resend the user input: Make a new `ChatContext` with your custom system prompt and send the user input again.

Here's a quick example:

```python
async def llm_node(self, chat_ctx, tools, model_settings):
    async for chunk in Agent.default.llm_node(self, chat_ctx, tools, model_settings):
        if should_apply_guardrail(chunk):
            # Stop the current response before it reaches TTS
            self.session.interrupt()
            # Build a new context: custom system prompt + the original messages
            new_context = ChatContext(
                messages=[
                    ChatMessage(type="message", role="system", content="Your custom system prompt here."),
                    *chat_ctx.messages
                ]
            )
            # Clear the current context
            await self.update_chat_ctx(ChatContext())
            # Regenerate and stream the checked response
            async for new_chunk in Agent.default.llm_node(self, new_context, tools, model_settings):
                yield new_chunk
            # Stop streaming the original, blocked response
            return
        else:
            yield chunk
```
Here’s an example of the guardrail condition:
```python
def should_apply_guardrail(chunk):
    banned_words = ["prohibited_keyword"]
    content = str(chunk) if isinstance(chunk, llm.ChatChunk) else chunk
    return any(word in content.lower() for word in banned_words)
```

This way, any response that doesn't follow the guardrails gets caught early, and the user input is processed again with your custom prompt to keep everything on track. Hope this helps!

Sources: pipeline-llm/interrupt_user.py | livekit-agents/livekit/agents/voice/agent_activity.py
p
I want the LLM response that gets checked to be the latest response only, without accumulating the whole conversation.
t
Hey there! To check only the latest LLM response without piling up the whole conversation, you can trim the conversation context so it keeps just the most recent message. Here's a neat way to do that using the `before_llm_cb` callback:

```python
async def before_llm_cb(assistant: VoicePipelineAgent, chat_ctx: llm.ChatContext):
    """
    Callback to only keep the latest message in the context before LLM response.
    """
    # Keep only the last message to focus on the latest response
    if len(chat_ctx.messages) > 1:
        chat_ctx.messages = [chat_ctx.messages[-1]]
```
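For context, here's a minimal sketch of wiring that callback into the agent. It assumes the older VoicePipelineAgent (pre-1.0) API, where `before_llm_cb` is a constructor argument, and the Silero/Deepgram/OpenAI plugins are just placeholder choices:

```python
from livekit.agents.pipeline import VoicePipelineAgent
from livekit.plugins import deepgram, openai, silero  # placeholder plugin choices

# Wire the callback defined above into the voice pipeline
agent = VoicePipelineAgent(
    vad=silero.VAD.load(),
    stt=deepgram.STT(),
    llm=openai.LLM(),
    tts=openai.TTS(),
    before_llm_cb=before_llm_cb,  # runs right before each LLM inference
)
```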
Here's what's going on:

- `before_llm_cb` callback: This runs right before the LLM replies, giving you a chance to adjust the `ChatContext`.
- Truncating the context: By keeping only the latest message, the LLM focuses on the newest input without getting bogged down by the whole chat history.

If you'd like me to walk you through it more or share some extra examples, just let me know; I'm here to help!

Sources: VoicePipelineAgent | LiveKit Docs | Vision Agent Quickstart | LiveKit Docs | Handling RAG Delays in Voice Agents | LiveKit Docs
p
put it together with llm_node
t
Here's a complete example of using `llm_node` to check only the latest response and apply guardrails:

```python
async def llm_node(
    self, 
    chat_ctx: ChatContext,
    tools: list[FunctionTool],
    model_settings: ModelSettings
) -> AsyncIterable[llm.ChatChunk]:
    # Keep only the latest message in the context
    if len(chat_ctx.messages) > 1:
        chat_ctx.messages = [chat_ctx.messages[-1]]

    async for chunk in Agent.default.llm_node(self, chat_ctx, tools, model_settings):
        # Define guardrail condition
        def should_apply_guardrail():
            banned_words = ["prohibited_keyword"]
            content = str(chunk) if isinstance(chunk, llm.ChatChunk) else chunk
            return any(word in content.lower() for word in banned_words)

        if should_apply_guardrail():
            # Stop the current response
            self.session.interrupt()

            # Create new context with custom system prompt
            new_context = ChatContext(
                messages=[
                    ChatMessage(
                        type="message",
                        role="system",
                        content="Your custom system prompt here."
                    ),
                    *chat_ctx.messages
                ]
            )

            # Clear current context
            await self.update_chat_ctx(ChatContext())

            # Resend user input with new context
            async for new_chunk in Agent.default.llm_node(self, new_context, tools, model_settings):
                yield new_chunk
            # Stop streaming the original, blocked response
            return
        else:
            yield chunk
```

Key points:

- Latest message only: We trim the context down to just the most recent message before processing, keeping things focused.
- Guardrails: The response is checked for banned words. If something's off, we stop the response and try again with a custom system prompt.
- Custom system prompt: We create a new `ChatContext` to gently steer the LLM towards a response that stays within the rules.

If you have any questions or want me to tweak anything, just let me know; I'm here to help!

Sources: BeforeLLMCallback | LiveKit Agents
p
This error occurred:

```
2025-08-08 01:46:43,558 - ERROR livekit.agents - Error in _llm_inference_task
Traceback (most recent call last):
  in llm_node
    for msg in chat_ctx.messages:
               ^^^^^^^^^^^^^^^^^
AttributeError: 'ChatContext' object has no attribute 'messages'
```
t
Hey there! The error you're seeing happens because the `ChatContext` object in your version of LiveKit doesn't have a `messages` attribute anymore; in newer versions it was replaced by `items`. No worries though! To get things working, just update your code like this:

```python
# Instead of:
chat_ctx.messages

# Use:
chat_ctx.items
```

If you want to grab only the messages, you can filter for `ChatMessage` items like this:

```python
from livekit.agents import ChatMessage

# Filter for ChatMessage items
messages = [item for item in chat_ctx.items if isinstance(item, ChatMessage)]
```

This change came with LiveKit Agents v1.0 to better support different LLM providers and their needs. If you hit any bumps or want me to walk you through it, just let me know; I'm here to help!
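To connect that change to the earlier `llm_node` example, here's a minimal sketch (assuming LiveKit Agents 1.x, where the context exposes `items`; `GuardrailAgent` is just an illustrative name) of trimming the context to the latest message before calling the default node:

```python
from livekit.agents import Agent, ChatContext, ChatMessage

class GuardrailAgent(Agent):
    async def llm_node(self, chat_ctx, tools, model_settings):
        # Keep only the most recent message item instead of the full history
        messages = [item for item in chat_ctx.items if isinstance(item, ChatMessage)]
        if messages:
            chat_ctx = ChatContext(items=[messages[-1]])

        async for chunk in Agent.default.llm_node(self, chat_ctx, tools, model_settings):
            yield chunk
```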
p
how to pass it to the tts to get to the user?
t
Here's a simple way to pass the LLM response to TTS in LiveKit:

1. Process the LLM output: Make sure the LLM response is formatted the way you want. You can use a callback function to tweak the response before it goes to TTS. For example:

```python
def _before_tts_cb(text: str | AsyncIterable[str]) -> str | AsyncIterable[str]:
    if isinstance(text, str):
        # Handle non-streaming text
        return text.replace("unwanted_token", "")
    else:
        # Handle streaming text
        async def process_stream():
            async for chunk in text:
                yield chunk.replace("unwanted_token", "")
        return process_stream()

agent = VoicePipelineAgent(
    before_tts_cb=_before_tts_cb,
    # ... other configurations ...
)
```

2. Update TTS settings: If your LLM response includes voice instructions, you can update the TTS options on the fly. Here's how:

```python
class MyAgent(Agent):
    async def tts_node(self, text: AsyncIterable[str], model_settings: ModelSettings):
        instruction_updated = False

        def output_processed(resp):
            nonlocal instruction_updated
            if resp.get("voice_instructions") and not instruction_updated:
                instruction_updated = True
                self.tts.update_options(instructions=resp["voice_instructions"])

        return Agent.default.tts_node(
            self, process_structured_output(text, callback=output_processed), model_settings
        )
```

If you want me to walk you through any part of this or need more examples, just let me know; I'm happy to help!
p
can i do it within llm_node?
t
Just a heads-up: TTS processing can't be done directly inside the `llm_node` in LiveKit. The `llm_node` takes care of generating text responses, and a separate `tts_node` handles turning that text into audio. If you want to tweak the LLM response before it reaches the TTS, you can do that within the `llm_node`; the updated text then flows on to the `tts_node` for audio conversion, as in the sketch below. Feel free to reach out if you'd like a hand setting this up!
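If it helps, here's a minimal sketch of that pattern (assuming the LiveKit Agents 1.x `Agent`/`llm_node` API; `RedactingAgent` and `unwanted_token` are just illustrative names). Whatever `llm_node` yields is what the pipeline later hands to `tts_node`:

```python
from livekit.agents import Agent

class RedactingAgent(Agent):
    async def llm_node(self, chat_ctx, tools, model_settings):
        async for chunk in Agent.default.llm_node(self, chat_ctx, tools, model_settings):
            # Plain string chunks can be rewritten before they continue downstream
            if isinstance(chunk, str):
                yield chunk.replace("unwanted_token", "")
            else:
                # ChatChunk objects are passed through unchanged in this sketch
                yield chunk
```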
p
Can I use `session.say` within `llm_node` to speak the regenerated output from the LLM? Or what do you suggest?
t
Hey there! You can use `session.say` inside `llm_node` to send the regenerated LLM output to TTS, but it's usually better not to. Calling `session.say` directly in `llm_node` skips over the natural STT-LLM-TTS pipeline flow, which could lead to hiccups or duplicated speech. A friendlier approach: instead of calling `session.say`, tweak the LLM output right inside `llm_node` (by yielding the regenerated chunks, as in the earlier guardrail example) and let the pipeline take care of the TTS part. This way everything runs smoothly without interruptions. If you'd like a hand setting this up, just give me a shout; I'm happy to help!
p
What do you think of this as a retry to the LLM?

```python
async for chunk in super().llm_node(safe_chat_ctx, tools, model_settings):
    safe_chunk_count += 1
    logger.debug(f"[Safe Response] Processing safe chunk #{safe_chunk_count}")
```
t
Using `super().llm_node` with a `safe_chat_ctx` and chunk processing can definitely be a good way to add custom retry logic in LiveKit. A couple of things to keep in mind:

- Built-in retry mechanism: LiveKit already has a retry system with exponential backoff for recoverable errors. It is built into the default LLM processing pipeline and retries up to the `max_retry` limit you set.
- Custom retry logic: If you have needs the built-in retry doesn't cover, you can override `llm_node` and call `super().llm_node` to process chunks while adding your own retry logic, as sketched below.

Your approach of logging and processing chunks with `safe_chat_ctx` makes sense if you want to manage retries or errors in a way that fits your use case. Just double-check that you're not unintentionally duplicating the built-in retry process. Feel free to ask if you'd like an example of custom retry logic; I'm here to help!

Sources: pipeline-llm/replacing_llm_output.py | livekit-plugins/livekit-plugins-langchain/livekit/plugins/langchain/langgra... | Pipeline nodes and hooks | LiveKit Docs
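For reference, here's a minimal sketch of a single guarded retry inside `llm_node` built around your snippet. It is not LiveKit's built-in retry; `should_apply_guardrail` and `build_safe_chat_ctx` are hypothetical helpers standing in for your own guardrail check and sanitized-context builder:

```python
import logging

from livekit.agents import Agent

logger = logging.getLogger("guardrail-agent")

class SafeAgent(Agent):
    async def llm_node(self, chat_ctx, tools, model_settings):
        # First pass: stream the normal response and watch for violations
        async for chunk in super().llm_node(chat_ctx, tools, model_settings):
            if self.should_apply_guardrail(chunk):  # hypothetical helper
                # Chunks yielded before this point have already gone downstream;
                # call self.session.interrupt() here if you need to cut them off.
                logger.warning("Guardrail triggered, regenerating with safe context")
                break
            yield chunk
        else:
            return  # no violation: the full response was already streamed

        # Second pass: one retry against the sanitized context
        safe_chat_ctx = self.build_safe_chat_ctx(chat_ctx)  # hypothetical helper
        safe_chunk_count = 0
        async for chunk in super().llm_node(safe_chat_ctx, tools, model_settings):
            safe_chunk_count += 1
            logger.debug(f"[Safe Response] Processing safe chunk #{safe_chunk_count}")
            yield chunk
```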
p
"LiveKit already has a retry system": what is it and how does it work? Can you provide an example of it?
t
LiveKit comes with a handy built-in retry system for LLMs that helps keep things running smoothly by handling temporary hiccups. Here's a quick rundown of how it works:

- Exponential backoff: It retries failed requests with gradually longer waits, so it doesn't overload the service.
- FallbackAdapter: If the main LLM provider has issues, it automatically switches over to a backup one.
- Error differentiation: It distinguishes errors that can be fixed by retrying from ones that can't, and only retries the recoverable ones.
- Background recovery: It keeps an eye on failed LLM instances and brings them back into the rotation once they're healthy again.
- Configurable policies: You can tweak the number of retries, timeouts, and retry intervals to fit your needs.

Example:

```python
# Configure retry options
retry_options = APIConnectOptions(
    max_retry=3,    # Maximum retries per LLM
    timeout=10.0,   # Timeout per attempt (seconds)
    retry_interval=1.0  # Base interval for exponential backoff
)

# Initialize the FallbackAdapter
adapter = FallbackAdapter(
    llm_instances=[PrimaryLLM(), SecondaryLLM(), TertiaryLLM()],
    max_retry_per_llm=retry_options.max_retry,
    retry_interval=retry_options.retry_interval
)

# Example of handling a request
async def _try_generate(self, llm: LLM) -> AsyncIterable[ChatChunk]:
    try:
        async with llm.chat(
            chat_ctx=self.chat_ctx,
            tools=self.tools,
            conn_options=dataclasses.replace(
                self.conn_options,
                max_retry=self.fallback_adapter.max_retry_per_llm,
                timeout=self.fallback_adapter.attempt_timeout,
                retry_interval=self.fallback_adapter.retry_interval
            )
        ) as stream:
            async for chunk in stream:
                yield chunk
    except APIError as e:
        if e.retryable:
            logger.warning(f"{llm.label} failed, switching to next LLM")
            raise
```

This setup keeps things reliable and available by retrying requests and switching to backups when needed. If you want me to walk through anything in more detail or share more examples, just let me know; I'm here to help!
p
No, that's not what I need. I need `llm_node`.
t
Hey there! Here's how the retry logic behind `llm_node` works; it's managed by the `LLMStream` class and handles transient errors using exponential backoff:

- Exponential backoff: When a request fails, the system waits a bit longer before each new attempt, for example 1 second before the first retry, 2 seconds before the second, 4 seconds before the third.
- Recoverable errors: It only retries temporary hiccups like network issues. If the error is permanent, like an invalid request, it won't keep retrying.
- Configurable options: You control max retries, timeout, and retry intervals through `APIConnectOptions`.

Here's a quick example:

```python
# Set up your retry options
conn_options = APIConnectOptions(
    max_retry=3,  # How many times to retry
    retry_interval=1.0,  # Starting wait time between retries (seconds)
    timeout=30.0  # How long to wait for each attempt (seconds)
)

# llm_node itself takes (chat_ctx, tools, model_settings); the conn_options above
# apply to the underlying LLM request (e.g. llm.chat(..., conn_options=conn_options))
async for chunk in super().llm_node(chat_ctx, tools, model_settings):
    # Handle each chunk as it comes in
    print(chunk)
```

This way, `llm_node` smartly retries transient errors without you having to worry. If anything's unclear or you want more examples, just give me a shout; I'm happy to help!

Sources: livekit-plugins/livekit-plugins-anthropic/livekit/plugins/anthropic/llm.py
p
```python
safe_messages = [
    ChatMessage(role="system", content=[self._pii_safe_system_prompt]),
    latest_user_message
]
safe_chat_ctx = ChatContext(items=safe_messages)

async for chunk in super().llm_node(safe_chat_ctx, tools, model_settings):
```

Is this way correct?
t
Absolutely, creating a `safe_chat_ctx` with specific messages and then passing it to `super().llm_node` is the right approach. Here's a quick rundown of why this works well:

- ChatContext usage: The `ChatContext` class is built to hold conversation items. Initializing it with your specific messages, like `safe_messages`, is exactly how it's meant to be used.
- Message roles: Adding messages with roles such as `system` and `user` keeps the conversation structured and clear.
- Passing to `llm_node`: By overriding `llm_node` and passing your custom `safe_chat_ctx`, you make sure the LLM only works with the safe messages you've set up.

Here's what your implementation looks like:

```python
safe_messages = [
    ChatMessage(role="system", content=[self._pii_safe_system_prompt]),
    latest_user_message
]

safe_chat_ctx = ChatContext(items=safe_messages)

async for chunk in super().llm_node(safe_chat_ctx, tools, model_settings):
    yield chunk  # process/forward each chunk here
```

This is spot on and fits with LiveKit's design. If you want to dive deeper or tweak anything, just let me know; I'm here to help!

Sources: livekit-plugins/livekit-plugins-openai/livekit/plugins/openai/llm.py
p
The code gets stuck here and never executes the for loop. Why is that?
t
Hey there! If the `super().llm_node` call with your custom `ChatContext` isn't running the for loop like you expect, here are a few things that might be causing that:

- Custom ChatContext setup: Double-check that `safe_chat_ctx` is properly initialized and actually contains valid messages. If it's empty or set up incorrectly, the LLM might not produce any responses.
- LLM response handling: The `llm_node` method streams responses from the LLM. If the LLM isn't returning anything (for example due to an error or misconfiguration), the loop won't run.
- Error handling: Look for any errors raised during LLM processing. If something throws an exception, it can stop the loop from executing.
- Tool calls: If your LLM responses include tool calls, make sure those tools are set up correctly and working. A malfunctioning tool can break the flow.

Some friendly debugging tips (see the logging sketch below):

1. Add logging before and after the `super().llm_node` call to confirm it's actually running.
2. Log what's inside `safe_chat_ctx` to make sure it has the messages you expect.
3. Review your LLM setup and verify it's configured properly to handle the context.

If you want, I'm here to help you debug or explain anything further; just let me know!

Sources: livekit-plugins/livekit-plugins-langchain/livekit/plugins/langchain/langgra...
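Here's a minimal logging sketch for those tips (`DebuggingAgent` and `build_safe_chat_ctx` are hypothetical names standing in for your own agent class and context-building helper):

```python
import logging

from livekit.agents import Agent

logger = logging.getLogger("guardrail-agent")

class DebuggingAgent(Agent):
    async def llm_node(self, chat_ctx, tools, model_settings):
        # Log the context contents so you can confirm it isn't empty
        safe_chat_ctx = self.build_safe_chat_ctx(chat_ctx)  # hypothetical helper
        logger.debug("safe_chat_ctx items: %s", safe_chat_ctx.items)

        logger.debug("calling super().llm_node ...")
        chunk_count = 0
        async for chunk in super().llm_node(safe_chat_ctx, tools, model_settings):
            chunk_count += 1
            logger.debug("received chunk #%d: %r", chunk_count, chunk)
            yield chunk
        logger.debug("llm_node stream finished after %d chunks", chunk_count)
```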
p
What other params does `async def llm_node(self, chat_ctx, tools, model_settings):` take? If I have my custom safe_chat_ctx, do I need to write it in the params?
t
Hey there! The `async def llm_node` function in LiveKit takes these parameters:

- `chat_ctx`: the conversation context, of type `llm.ChatContext`.
- `tools`: a list of `FunctionTool` or `RawFunctionTool`.
- `model_settings`: configuration parameters of type `ModelSettings`.

Got a custom ChatContext? No worries! If you have a custom `safe_chat_ctx`, you don't need to add it as a parameter in the function definition. Instead, create and use it right inside the function. Here's a quick example:

```python
async def llm_node(self, chat_ctx, tools, model_settings):
    safe_chat_ctx = ChatContext(items=[
        ChatMessage(role="system", content=["Your custom prompt"]),  # content is a list of content parts
        *chat_ctx.items
    ])

    async for chunk in super().llm_node(safe_chat_ctx, tools, model_settings):
        yield chunk
```

If anything's unclear or you want more examples, just let me know; I'm here to help!
p
This error occurred: `AttributeError: 'tuple' object has no attribute 'type'` {"pid": 970424, "job_id": "AJ_4dEb9JPCPo9e"}
t
Hey there! Could you share a bit more about when this error pops up? For example:

- What does the data you're passing to `llm_node` look like?
- Are you using any custom tools or tweaks in your pipeline?
- Which part of the code seems to be causing the error?

These details will help me understand what's going on and find the best way to fix it for you.

Sources: livekit-agents/livekit/agents/voice/agent_activity.py
Hey there! The error `AttributeError: 'tuple' object has no attribute 'type'` usually pops up when your code expects an object with a `type` attribute but gets a tuple instead. Here's a guide to help you fix it:

1. Check the yielded types: Make sure your `llm_node` function only yields `str` or `ChatChunk` objects. If it's yielding a tuple, that's likely the culprit.
2. Review the implementation: Look at all the `yield` statements inside `llm_node` and confirm they return the expected types. For example:

```python
async def llm_node(chat_ctx: ChatContext, tools: list[FunctionTool], model_settings: ModelSettings):
    yield "Generated text"
    yield ChatChunk(id="1", delta=...)
```

3. Debugging: Add some logging or breakpoints to inspect the types of the objects being yielded. If you spot any tuples, change them to return the right types instead.

If you're still stuck or want me to take a closer look, just share a bit more about your `llm_node` setup; I'm here to help!

Sources: livekit-agents/livekit/agents/voice/agent_activity.py