# ask-ai
p
I am using LiveKit and trying to integrate a LangGraph agent instead of an LLM. Now it is giving `KeyError: 'question'`. My state graph:

```python
class GraphState(TypedDict):
    """
    Represents a state of a graph.

    Attributes:
        question: Question
        generation: LLM Generation
        use_web_search: whether to use web search
        documents: List of documents
    """
    question: str
    generation: str
    use_web_search: bool
    documents: List[Document]
    tone: str
    short_term: List[str]
    from_memory: bool
    hallucinated: bool
```
t
Thanks for reaching out, arpan. I'm an AI assistant and I'm here to help. Give me a moment while I research this topic.
Hey there! The error `KeyError: 'question'` pops up because the LangGraph workflow is trying to grab a `question` key from its state, but it hasn't been set yet. No worries though, this usually means you just need to make sure the `question` key is part of the initial state you pass into the LangGraph workflow. Here's a simple way to fix it:

1. Make sure your state includes the `question` key when you define your LangGraph `State`.
2. Inside your `entrypoint` function, pull the question out of `ctx.job.metadata`. Using `.get()` is a handy trick here, since it lets you set a default if `question` isn't there.
3. Before you hand off to the LangGraph workflow, initialize the state dictionary with the question you grabbed and any other keys you need (see the sketch below).

If you want me to walk you through any of these steps or if something's still tricky, just give me a shout, I'm here to help!

Sources: livekit-plugins/livekit-plugins-langchain/livekit/plugins/langchain/__init_... | LangChain integration guide | LiveKit Docs | avatars/tavus/tavus.py
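If it helps, here is a minimal sketch of steps 2 and 3, assuming your compiled LangGraph app is available as `workflow` and that the job metadata is a JSON string containing a `question` field (both names are placeholders, adjust them to your setup):

```python
import json

from livekit.agents import JobContext


async def entrypoint(ctx: JobContext):
    # metadata is an arbitrary string set by whoever dispatched the job;
    # here we assume it is JSON like {"question": "..."}
    metadata = json.loads(ctx.job.metadata or "{}")
    question = metadata.get("question", "")  # .get() avoids the KeyError

    # every key your graph nodes read must exist in the initial state
    initial_state = {
        "question": question,
        "generation": "",
        "use_web_search": False,
        "documents": [],
        "tone": "friendly",
        "short_term": [],
        "from_memory": False,
        "hallucinated": False,
    }
    result = workflow.invoke(initial_state)  # workflow = your compiled graph
```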
p
so can you give an example of the input format for the LLM node?
t
Hey there! Just wanted to share a quick heads-up about how the input format for an LLM node in LangGraph works when you're using it with LiveKit. The `LLMAdapter` takes care of everything behind the scenes: it turns LiveKit chat messages into a neat dictionary with a `"messages"` key that holds a list of message objects. Here's a little example to make it clearer:
```python
{
    "messages": [
        SystemMessage(content="You are a helpful assistant.", id="sys_1"),
        HumanMessage(content="How does photosynthesis work?", id="usr_1"),
        AIMessage(content="Photosynthesis is the process by which plants convert light energy...", id="asst_1")
    ]
}
```
Here's what the `LLMAdapter` does automatically for you:
• It changes `system` and `developer` role messages into `SystemMessage`
• It turns `user` role messages into `HumanMessage`
• It converts `assistant` role messages into `AIMessage`
To get this rolling, just wrap your LangGraph workflow with the `LLMAdapter` when you create an `AgentSession`. If anything's unclear or you want some more examples, just give me a shout, I'm here to help!

Sources: LangChain integration guide | LiveKit Docs | examples/voice_agents/llamaindex-rag/chat_engine.py
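If you want to sanity-check your graph outside of LiveKit first, here is a minimal sketch that feeds it the same `"messages"` shape the adapter produces. The name `graph` is a placeholder for your compiled LangGraph workflow:

```python
from langchain_core.messages import HumanMessage, SystemMessage

# the same input shape the LLMAdapter builds from the chat context
state = {
    "messages": [
        SystemMessage(content="You are a helpful assistant."),
        HumanMessage(content="How does photosynthesis work?"),
    ]
}

result = graph.invoke(state)           # graph = your compiled LangGraph workflow
print(result["messages"][-1].content)  # the last message should be the model's reply
```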
p
what modifications are needed in my state graph?
t
Hey there! Could you tell me a bit more about your current state graph? For example:
• What nodes and edges do you have set up right now?
• What are you hoping the graph will do?
• Are you running into any particular issues or errors with it?
If you share these details, I'll be happy to help you figure out the best changes to make.
p
```python
import os, sys
import asyncio
import boto3
from botocore.config import Config
from typing import Dict, Any, List
from concurrent.futures import ThreadPoolExecutor
from langchain_aws import ChatBedrock
from langchain.schema import Document
from langgraph.graph import StateGraph, END
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_aws.embeddings import BedrockEmbeddings
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_community.vectorstores import FAISS
from langfuse.langchain import CallbackHandler
from dotenv import load_dotenv

load_dotenv()

from src.utils.logger import logging
from src.utils.exception import CustomException
from src.utils.main_utils import measure_time
from src.constants.agent_constants import GraphState
from src.agentic_rag.web_search_tool import GoogleSearchTool
from src.agentic_rag.generation_tool import GenerationTool
from src.agentic_rag.retrieval_tool import RetrieverTool
from src.agentic_rag.grade_tool import HallucinationGrader
from src.agentic_rag.route_tool import QuestionRouter
from src.constants.rag_constants import RAGConfig
from src.agentic_rag.memory_manager import MemoryManager

langfuse_handler = CallbackHandler()


class AgenticRAG:
    def __init__(self) -> None:
        self.config = RAGConfig()
        self.embeddings_model_name = self.config.embedding_model
        llm, vectoreDB = self._load_model()
        logging.info("Models loaded successfully")
        # Initialize tools
        self.question_router = QuestionRouter(vectoreDB)
        self.rag_tool = RetrieverTool(llm, vectoreDB)
        self.web_search_tool = GoogleSearchTool()
        self.generation_tool = GenerationTool(llm)
        self.hallucination_grader = HallucinationGrader(llm)
        self.memory_db = MemoryManager()
        # Build the graph
        self.workflow = self._build_graph()

    @measure_time
    def _load_model(self):
        model_kwargs = {'device': 'cpu'}
        encode_kwargs = {'normalize_embeddings': False}
        bedrock_runtime = boto3.client(
            service_name="bedrock-runtime",
            region_name="eu-central-1",
            aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
            aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
            config=Config(
                connect_timeout=3600,  # 60 minutes
                read_timeout=3600,     # 60 minutes
                retries={'max_attempts': 3}
            )
        )
        with ThreadPoolExecutor(max_workers=4) as executor:
            # Start loading the LLM asynchronously
            llm_future = executor.submit(
                ChatGoogleGenerativeAI,
                model="gemini-1.5-pro",
                model_kwargs={"temperature": 0.6, "verbose": True}
            )
            embedding_future = executor.submit(
                BedrockEmbeddings,
                model_id="amazon.titan-embed-text-v2:0",
                normalize=False
            )
            vectore_db_future = executor.submit(
                FAISS.load_local,
                folder_path=self.config.vectore_db_path,
                embeddings=embedding_future.result(),
                allow_dangerous_deserialization=True
            )
            # Wait for it to complete
            return llm_future.result(), vectore_db_future.result()

    def _build_graph(self):
        workflow = StateGraph(GraphState)
        # Add nodes
        workflow.add_node("memory_node", self.memory_db.memory_node)
        workflow.add_node("route_question", self.question_router.route_question)
        workflow.add_node("rag_tool", self.rag_tool.search)
        workflow.add_node("web_search", self.web_search_tool.web_search)
        workflow.add_node("generate", self.generation_tool.generate)
        workflow.add_node("memory_saver", self.memory_db.save_memory_node)
        workflow.add_node("grade_hallucination", self.hallucination_grader.grade_hallucination)
        # Define the flow
        # Start → memory node
        workflow.set_entry_point("memory_node")
        # After memory check: if found -> grade hallucination, else -> route question
        workflow.add_conditional_edges(
            "memory_node",
            self._decide_after_memory_check,
            {
                "route_question": "route_question",
                "end": END
            }
        )
        # Route to RAG or web search
        workflow.add_conditional_edges(
            "route_question",
            self._decide_next_step,
            {
                "rag_tool": "rag_tool",
                "web_search": "web_search"
            }
        )
        # From rag_tool → generate
        workflow.add_edge("rag_tool", "generate")
        # From web search → generate
        workflow.add_edge("web_search", "generate")
        # After generate → memory saver
        workflow.add_edge("generate", "memory_saver")
        # After save memory → hallucination grader
        workflow.add_edge("memory_saver", "grade_hallucination")
        # After hallucination grading: if hallucinated → web_search again, else → end
        workflow.add_conditional_edges(
            "grade_hallucination",
            self._decide_after_hallucination_check,
            {
                "web_search": "web_search",
                "end": END
            }
        )
        # app = workflow.compile()
        # Save image after compiling
        # app.get_graph().draw_mermaid_png(output_file_path="workflow_graph.png")  # Saves as workflow_graph.png
        return workflow.compile().with_config({"callbacks": [langfuse_handler]})

    def _decide_after_memory_check(self, state: GraphState) -> str:
        if state.get("from_memory", False):
            return "end"
        return "route_question"

    def _decide_next_step(self, state: GraphState) -> str:
        if state.get("use_web_search", False):
            return "web_search"
        return "rag_tool"

    def _decide_after_hallucination_check(self, state: GraphState) -> str:
        if state.get("hallucinated", False):
            return "web_search"
        return "end"

    @measure_time
    def run(self, question: str, documents: List[Document] = None, tone: str = "friendly") -> Dict[str, Any]:
        """Run the RAG agent workflow"""
        print(question)
        logging.info(f"user question from STT: {question}")
        logging.info(f"user doc from STT: {documents}")
        # question = question['user_transcript']
        try:
            initial_state = {
                "question": question,
                "generation": "",
                "use_web_search": False,
                "documents": documents or [],
                "tone": tone,
                "from_memory": False,
                "hallucinated": False
            }
            result = self.workflow.invoke(input=initial_state)
            return result
        except asyncio.TimeoutError as e:
            logging.info(f"Agent Workflow Error: {e}")
            CustomException(e, sys)
```
t
Hey there! To tweak your state graph and handle question input smoothly, here's a simple step-by-step guide:

1. Define State Schema: Make sure your state includes a `messages` key with `add_messages` to keep track of the conversation history. Here's a quick example:
```python
from typing import Annotated, TypedDict
from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages


class State(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
```
2. Use LLMAdapter: Wrap your compiled graph with `LLMAdapter` so it can automatically handle input conversion for you. Check this out:

```python
from livekit.agents import Agent
from livekit.plugins import langchain

agent = Agent(
    instructions="",
    llm=langchain.LLMAdapter(graph),
)
```
3. Initialize State: Make sure `LLMAdapter` initializes the state with a `messages` key. This helps avoid any `KeyError` when nodes try to access `state["messages"]`.
4. Handle User Input: Use `session.generate_reply(user_input="...")` to feed the user's input into your graph.
5. Access State Safely: Inside your nodes, you can safely access `state["messages"]` since the adapter makes sure it's always there (see the sketch below).

Following these steps will help your state graph handle user input without a hitch. If you want me to walk you through any part or share more examples, just let me know, I'm happy to help!

Sources: tests/test_agent_session.py | LangChain integration guide | LiveKit Docs
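To tie this back to your graph: here is a minimal sketch of a node that derives the question from `messages` instead of expecting a separate `question` key. The node name `generate` and the placeholder answer are illustrative only; plug in your own RAG logic:

```python
from typing import Annotated, TypedDict

from langchain_core.messages import AIMessage, BaseMessage
from langgraph.graph.message import add_messages


class State(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]


def generate(state: State) -> dict:
    # the latest message is the user's turn, so treat its content as the question
    question = state["messages"][-1].content
    answer = f"(answer for: {question})"  # placeholder for your RAG pipeline
    # returning an AIMessage appends it to the history via add_messages
    return {"messages": [AIMessage(content=answer)]}
```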
p
Is there anything wrong with my code? It is not changing the params.

```python
import os
import sys
import re
import librosa
import numpy as np
from dotenv import load_dotenv
from livekit.agents.llm import function_tool
from livekit.agents import Agent, RunContext, get_job_context
from livekit.plugins import aws
from livekit.agents.llm import ChatContext, ChatMessage, StopResponse
from src.voice_agent.voice_agent import CustomSTT
from livekit.plugins import openai
from src.utils.logger import logging
from src.utils.exception import CustomException
from src.prompt.voice_agent_prompt import Agent_instructions, Language_Agent

load_dotenv()


class IrishAgent(Agent):
    def __init__(self, tool, llm_instance, vad) -> None:
        super().__init__(
            instructions=Language_Agent,
            ##stt=aws.STT(language="en-US",region="eu-central-1",language_model_name="Eng_US"),
            llm=llm_instance,
            tts=aws.TTS(region="eu-central-1", api_key=os.getenv("AWS_ACCESS_KEY_ID"), api_secret=os.getenv("AWS_SECRET_ACCESS_KEY")),
            tools=tool,
            vad=vad
        )
        self.current_language = "en"
        self.current_language_model = "Eng"
        self.aws_languages = {
            "en-GB": "en-GB",
            "de-DE": "de-DE",
            "hi-IN": "hi-In"
        }
        self.language_names = {
            "en-GB": "English (United Kingdom)",
            "de-DE": "German (Germany)",
            "hi-In": "Hindi"
        }
        self.aws_speech_engine = {
            "en-GB": "neural",
            "de-DE": "standard",
            "hi-IN": "standard"
        }
        self.aws_voice = {
            "en-GB": "Emma",
            "de-DE": "Vicki",
            "hi-In": "Aditi"
        }
        self.custom_language_model = {
            "en-GB": "Eng_Uk",
            "de-DE": "German_data",
        }

    async def on_user_turn_completed(self, turn_ctx: ChatContext, new_message: ChatMessage) -> None:
        # callback before generating a reply after user turn committed
        if not new_message.text_content:
            # raise StopResponse to stop the agent from generating a reply
            logging.info("ignore empty user turn")
            raise StopResponse()

    async def _switch_language(self, language_code: str):
        """Helper Function To Switch Languages"""
        try:
            if language_code == self.current_language:
                await self.session.say("You are already using this language.")
                return
            # Validate inputs
            if language_code not in self.aws_languages:
                raise ValueError(f"Invalid language code: {language_code}")
            # Look up default configs
            voice = self.aws_voice[language_code]
            speech_engine = self.aws_speech_engine[language_code]
            switch_language = self.aws_languages[language_code]
            language_model = self.custom_language_model[language_code]
            # Update TTS options
            if self.tts is not None:
                self.tts.update_options(language_code=switch_language, voice=voice, speech_engine=speech_engine)
            # Update STT options
            if self.stt is not None:
                self.stt.update_options(language_code=switch_language, language_model=language_model)
            # Update current language
            self.current_language = language_code
            # Confirmation message
            await self.session.say(f"Language switched to {self.language_names[language_code]} with voice {voice} using {speech_engine}.")
        except Exception as e:
            logging.error(f"Error switching language to {language_code}: {e}")
            raise CustomException(e, sys)

    @function_tool
    async def switch_to_en_gb(self):
        "switch to english (United Kingdom)"
        await self._switch_language("en-GB")

    @function_tool
    async def switch_to_de_de(self):
        "switch to german (Germany)"
        await self._switch_language("de-DE")
```

```python
import os
import sys
import re
import librosa
import numpy as np
from dotenv import load_dotenv
from livekit.agents.llm import function_tool
from livekit.agents import Agent, RunContext, get_job_context
from livekit.plugins import aws
from livekit.agents.llm import ChatContext, ChatMessage, StopResponse
from src.voice_agent.voice_agent import CustomSTT
from livekit.plugins import openai, sarvam
from src.utils.logger import logging
from src.utils.exception import CustomException
from src.prompt.voice_agent_prompt import Agent_instructions, Language_Agent

load_dotenv()

hi = """
You are a helpful voice agent that always responds in clear and simple Hindi.
Instructions:
1. Always answer in Hindi. Do not use unnecessary English words (keep technical names like "LiveKit", "WiFi", "Server" as is).
2. Keep responses short and natural, like real spoken conversation.
3. If the user greets you, reply politely.
4. If the question is unclear, politely ask for clarification.
5. Explain technical terms in simple Hindi.
6. Your role is to assist the customer, understand requests, and give immediate helpful answers.
Examples:
- User: "Hello"
- Agent: "नमस्ते, मैं आपकी कैसे मदद कर सकता हूँ?"
- User: "My call quality is not good."
- Agent: "ठीक है, क्या आप जाँच सकते हैं कि आपका इंटरनेट सही से काम कर रहा है?"
- User: "How do I connect to the LiveKit server?"
- Agent: "आपको अपने अकाउंट से लॉगिन करना होगा, फिर सर्वर का पता डालना होगा।"
"""

prompt = (
    "This audio may contain English, Hindi, or German. "
    "Transcribe speech exactly in the spoken language. "
    "Do not translate. Keep original script (Devanagari for Hindi, "
    "Latin for English/German)."
)


class SarvamAgent(Agent):
    def __init__(self, tools, llm, vad) -> None:
        super().__init__(
            instructions=Language_Agent,
            stt=openai.STT(language="en", prompt=prompt, api_key=os.getenv("OPENAI_API_KEY"), use_realtime=True),
            llm=llm,
            tts=aws.TTS(region="eu-central-1", api_key=os.getenv("AWS_ACCESS_KEY_ID"), api_secret=os.getenv("AWS_SECRET_ACCESS_KEY")),
            tools=tools,
            vad=vad
        )
        self.current_language = "en"
        self.current_language_model = "Eng"
        self.aws_languages = {
            "en-GB": "en-GB",
            "de-DE": "de-DE",
            "hi-IN": "hi-In"
        }
        self.language_names = {
            "en-GB": "English (United Kingdom)",
            "de-DE": "German (Germany)",
            "hi-IN": "Hindi"
        }
        self.aws_speech_engine = {
            "en-GB": "neural",
            "de-DE": "standard",
            "hi-IN": "standard"
        }
        self.aws_voice = {
            "en-GB": "Emma",
            "de-DE": "Vicki",
            "hi-IN": "Aditi"
        }
        self.custom_language_model = {
            "en-GB": "Eng_Uk",
            "de-DE": "German_data",
        }

    async def on_user_turn_completed(self, turn_ctx: ChatContext, new_message: ChatMessage) -> None:
        # callback before generating a reply after user turn committed
        if not new_message.text_content:
            # raise StopResponse to stop the agent from generating a reply
            logging.info("ignore empty user turn")
            raise StopResponse()

    async def _switch_language(self, language_code: str):
        """Helper Function To Switch Languages"""
        try:
            if language_code == self.current_language:
                await self.session.say("You are already using this language.")
                return
            # Validate inputs
            if language_code not in self.aws_languages:
                raise ValueError(f"Invalid language code: {language_code}")
            # Look up default configs
            voice = self.aws_voice[language_code]
            speech_engine = self.aws_speech_engine[language_code]
            switch_language = self.aws_languages[language_code]
            language_model = self.custom_language_model[language_code]
            if language_code != "hi-IN":
                # Update TTS options
                if self.tts is not None:
                    self.tts.update_options(language_code=switch_language, voice=voice, speech_engine=speech_engine)
                # Update STT options
                if self.stt is not None:
                    self.stt.update_options(language_code=switch_language, language_model=language_model)
            else:
                super().__init__(
                    instructions=hi,
                    stt=sarvam.STT(api_key="sk_gfkxkm91_4zZNvQqc52t3wUknTj0ExTUO", language="hi-IN"),
                    llm=openai.LLM(
                        max_completion_tokens=500,
                        temperature=0,
                        api_key=os.getenv("OPENAI_API_KEY")
                    ),
                    tts=sarvam.TTS(api_key="sk_gfkxkm91_4zZNvQqc52t3wUknTj0ExTUO", target_language_code="hi-IN", speaker="anushka")
                )
                print("+" * 1000)
            # Update current language
            self.current_language = language_code
            # Confirmation message
            await self.session.say(f"Language switched to {self.language_names[language_code]} with voice {voice} using {speech_engine}.")
        except Exception as e:
            logging.error(f"Error switching language to {language_code}: {e}")
            raise CustomException(e, sys)

    @function_tool
    async def switch_to_en_gb(self):
        "switch to english (United Kingdom)"
        await self._switch_language("en-GB")

    @function_tool
    async def switch_to_de_de(self):
        "switch to german (Germany)"
        await self._switch_language("de-DE")

    @function_tool
    async def switch_to_hi_in(self):
        "switch to Hindi(India)"
        await self._switch_language("hi-IN")
```
t
Your code for language switching and parameter updates looks well put together! Here are a few friendly pointers to help make sure everything runs smoothly:

1. Dynamic Parameter Updates: Double-check that the `update_options` method is properly updating the STT and TTS components. It should dynamically adjust settings like language, voice, and speech engine for all active streams.
2. Language Switching Logic: Make sure the `_switch_language` method is handling language-specific settings correctly, for instance, applying Hindi-specific configurations only when `language_code` is "hi-IN" (see the sketch below).
3. Function Tools: The `@function_tool` decorators for switching languages seem spot on. These let the LLM trigger language changes on the fly.
4. Real-Time Updates: If you're working with real-time STT (like OpenAI or Deepgram), just confirm that the new language settings kick in during ongoing sessions without needing a restart.
5. Error Handling: Your use of `CustomException` for error handling is solid! Just be sure all exceptions are logged with enough detail to make debugging easier.

If you're running into any specific hiccups, like parameters not updating or errors while switching languages, feel free to share! I'm here to help with some targeted troubleshooting.

Sources: README.md | livekit-plugins/livekit-plugins-baseten/livekit/plugins/baseten/stt.py | Gladia integration guide | LiveKit Docs
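One more thought on point 2: in the code you pasted, `custom_language_model` only has entries for "en-GB" and "de-DE", so the plain `self.custom_language_model[language_code]` lookup raises a `KeyError` for "hi-IN" before the Hindi branch ever runs. A minimal sketch of a safer lookup, reusing your own dict names and `update_options` calls:

```python
# inside _switch_language, before branching on the language
language_model = self.custom_language_model.get(language_code)  # None for "hi-IN"

if language_code != "hi-IN":
    if self.tts is not None:
        self.tts.update_options(language_code=switch_language, voice=voice, speech_engine=speech_engine)
    # only pass a custom model when one is configured for this language
    if self.stt is not None and language_model is not None:
        self.stt.update_options(language_code=switch_language, language_model=language_model)
```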
p
how can I experiment to see if my class is working or not?
t
To try out and make sure your class is working well, here are a few easy ways you can test it:

1. Local Console Testing: You can run your agent right in the terminal with `python myagent.py console`. This is a great way to check the basics without needing to connect to a LiveKit server (see the sketch after this list for the boilerplate this command needs).
2. Integration Testing: Start your agent in development mode with `python myagent.py dev`, then hop into the LiveKit room using a client SDK or the Agents Playground to see how it interacts in a real scenario.
3. Automated Testing: You can write Python tests using the LiveKit testing framework. Here's a simple example:

```python
import pytest
from livekit.agents import AgentSession


@pytest.mark.asyncio
async def test_agent():
    async with AgentSession() as session:
        await session.start(MyAgent())
        result = await session.run(user_input="Hello")
        assert "Hello" in result.response
```

4. Agent-to-Agent Testing: You can have one agent test another by turning on AGENT participants in your room setup.
5. Load Testing: Use the LiveKit CLI to simulate multiple rooms and see how your setup performs: `lk perf agent-load-test --rooms 5 --agent-name test-agent --duration 5m`

If you want a hand setting up any of these, just let me know, I'm here to help!

Sources: README.md | README.md | README.md | Vision Agent Quickstart | LiveKit Docs | Testing and evaluation | LiveKit Docs
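For option 1 to work, your agent script needs the standard CLI runner at the bottom of the file. A minimal sketch, assuming your entrypoint function is named `entrypoint`:

```python
from livekit.agents import WorkerOptions, cli

if __name__ == "__main__":
    # enables `python myagent.py console`, `dev`, and `start` modes
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
```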
p
how do I test the language switch?
t
Here's a friendly guide to testing the language switching feature in your LiveKit agent:

1. Manual Testing: Start by running your agent in development mode with `python myagent.py dev`. Next, hop into the LiveKit room using a client SDK or the Agents Playground. Try asking the agent to switch languages (like saying "Switch to Spanish") and see if it confirms the change and replies in the new language.
2. Automated Testing: You can also write tests using the LiveKit testing framework. For example:

```python
import pytest
from livekit.agents import AgentSession


@pytest.mark.asyncio
async def test_language_switch():
    async with AgentSession() as session:
        await session.start(MyAgent())
        result = await session.run(user_input="Switch to Spanish")
        assert "Spanish" in result.response
```

3. Agent-to-Agent Testing: Another neat trick is to have one agent simulate user interactions and check how the other agent responds. Just set up the room to include AGENT participants for this.
4. Performance Metrics: Don't forget to enable metrics collection! It helps you keep an eye on things like latency, speech recognition accuracy, and how smooth the language switch feels for users.

If you want a hand setting any of this up or writing specific tests, just let me know, I'm here to help!

Sources: livekit-plugins/livekit-plugins-gladia/README.md | Vision Agent Quickstart | LiveKit Docs | Gladia integration guide | LiveKit Docs | README.md | Testing and evaluation | LiveKit Docs