how to save conversation in database
# ask-ai
p
how to save conversation in database
t
Thanks for reaching out, arpan. I'm an AI assistant and I'm here to help. Give me a moment while I research this topic.
Hey there! If you're looking to save conversations in a database using LiveKit, here are a few friendly ways to do it: • Client-Side Storage: You can use the
StorageUtils
class to handle conversations right on your device with IndexedDB. For instance, you can create a conversation, add messages, and grab or update conversations using handy methods like
createConversation
,
appendMsg
, and
getAllConversations
. • Server-Side Recording: LiveKit’s Egress feature lets you record sessions and save them to cloud storage (like S3). This works great if you want centralized storage or need to meet compliance requirements. • Text Transcripts: You can listen for events like
conversation_item_added
to save messages as they happen, or pull the full session history to keep complete transcripts. If you want me to walk you through any of these options in more detail, or if you have specific needs like integrating with a certain database or formatting your data a certain way, just let me know—I’m here to help!
p
code i have database mongodb to store conversation
t
Hey there! If you want to store your LiveKit conversation data in MongoDB, here’s a simple way to get started: 1. Set Up MongoDB: First, connect to your database using a MongoDB client library like
pymongo
if you’re working with Python. 2. Define a Data Structure: Think about how you want to organize your conversations. Here’s an example schema you could use:
Copy code
{
  "conversation_id": "unique_id",
  "messages": [
    {
      "role": "user",
      "content": "Hello",
      "timestamp": ISODate("...")
    },
    {
      "role": "assistant",
      "content": "Hi! How can I help?",
      "timestamp": ISODate("...")
    }
  ],
  "created_at": ISODate("..."),
  "updated_at": ISODate("...")
}
3. Save Messages: You can capture new messages by using triggers or callbacks. For example:
Copy code
def on_conversation_item_added(event):
    message = event.item
    conversations.update_one(
        {"conversation_id": event.conversation_id},
        {"$push": {"messages": message.to_dict()}},
        upsert=True
    )
4. Access Conversation History: To keep things up-to-date, save the session history regularly or listen for events like
conversation_item_added
to update your MongoDB in real-time. If you’d like, I’m happy to help with a full code example or guide you through integrating MongoDB with your LiveKit setup. Just let me know!
p
example
t
Here's a friendly example of how you can store LiveKit conversation data in MongoDB using Python:
Copy code
from pymongo import MongoClient
from datetime import datetime

# Connect to MongoDB
client = MongoClient('<mongodb://localhost:27017>')
db = client['livekit_conversations']
conversations = db['conversations']

# Function to save a new conversation
def save_conversation(conversation_id):
    initial_data = {
        "conversation_id": conversation_id,
        "messages": [],
        "created_at": datetime.now(),
        "updated_at": datetime.now()
    }
    conversations.insert_one(initial_data)

# Function to save a message to a conversation
def save_message(conversation_id, role, content):
    message = {
        "role": role,
        "content": content,
        "timestamp": datetime.now()
    }
    conversations.update_one(
        {"conversation_id": conversation_id},
        {"$push": {"messages": message}, "$set": {"updated_at": datetime.now()}},
        upsert=True
    )

# Example usage
conversation_id = "unique_conversation_id"
save_conversation(conversation_id)
save_message(conversation_id, "user", "Hello, how are you?")
save_message(conversation_id, "assistant", "I am fine, thank you!")
This little example shows you how to: • Create a new conversation document in MongoDB with a unique ID. • Add messages to the conversation, including who said what and when. • Keep the
updated_at
timestamp fresh every time a new message comes in. If you want me to walk you through any part of this or need more examples, just let me know — I’m here to help!
p
how can can i add it in entrypoint
t
Hey there! To help you integrate MongoDB storage into the
entrypoint
of your LiveKit agent, here’s a friendly example you can follow:
Copy code
from pymongo import MongoClient
from datetime import datetime
from livekit import agents
from livekit.agents import Agent, ChatContext, AgentSession

# Connect to MongoDB
client = MongoClient('<mongodb://localhost:27017>')
db = client['livekit_conversations']
conversations = db['conversations']

# Function to save a new conversation
def save_conversation(conversation_id):
    initial_data = {
        "conversation_id": conversation_id,
        "messages": [],
        "created_at": datetime.now(),
        "updated_at": datetime.now()
    }
    conversations.insert_one(initial_data)

# Function to save a message to a conversation
def save_message(conversation_id, role, content):
    message = {
        "role": role,
        "content": content,
        "timestamp": datetime.now()
    }
    conversations.update_one(
        {"conversation_id": conversation_id},
        {"$push": {"messages": message}, "$set": {"updated_at": datetime.now()}},
        upsert=True
    )

class CustomAgent(Agent):
    def __init__(self, chat_ctx: ChatContext) -> None:
        super().__init__(chat_ctx=chat_ctx, instructions="You are a helpful assistant.")

    async def entrypoint(ctx: agents.JobContext):
        # Generate a unique conversation ID
        conversation_id = ctx.job.metadata.get('conversation_id', str(uuid.uuid4()))

        # Save the initial conversation to MongoDB
        save_conversation(conversation_id)

        # Initialize the session
        session = AgentSession(
            chat_context=ChatContext()
        )

        # Add a callback to save messages to MongoDB
        session.chat_context.on_message_added = lambda message: save_message(
            conversation_id, message.role, message.content
        )

        # Start the session
        await session.start()
Here’s a quick rundown: • The
entrypoint
kicks things off by saving a new conversation with a unique ID to MongoDB. • It sets up a handy callback (
on_message_added
) that saves every new message to MongoDB as it comes in. • The
save_conversation
and
save_message
functions take care of the database work behind the scenes. If you want me to help tweak this or if anything’s unclear, just let me know—I’m here to help!
p
if i have generate file from session i want to save that particular id conversation
t
If you'd like to generate a file from a session and save it to a specific conversation ID in MongoDB, here’s a simple way to do it:
Copy code
from pymongo import MongoClient
from datetime import datetime
import json

# Connect to MongoDB
client = MongoClient('<mongodb://localhost:27017>')
db = client['livekit_conversations']
conversations = db['conversations']

# Function to save a file reference to a conversation
def save_file_to_conversation(conversation_id, file_path):
    conversations.update_one(
        {"conversation_id": conversation_id},
        {"$set": {"file_path": file_path, "updated_at": datetime.now()}},
        upsert=True
    )

# Function to generate a file from session data
def generate_file_from_session(conversation_id, session_data):
    file_path = f"/path/to/files/{conversation_id}_session.json"
    with open(file_path, 'w') as file:
        json.dump(session_data, file, indent=4)

    # Save the file reference to MongoDB
    save_file_to_conversation(conversation_id, file_path)

# Example usage
conversation_id = "unique_conversation_id"
session_data = {
    "messages": [
        {"role": "user", "content": "Hello"},
        {"role": "assistant", "content": "Hi! How can I help?"}
    ]
}

generate_file_from_session(conversation_id, session_data)
Here's a quick rundown: • The
generate_file_from_session
function takes your session data, creates a neat JSON file, and saves it in your chosen folder. • The
save_file_to_conversation
function then updates your MongoDB record for that conversation with the file’s location. • You can run these functions whenever you want—whether right after the session ends or anytime you want to save and link the session data. If you have any questions or want me to walk you through more examples, just let me know—I’m here to help!
p
in entrypoint add
t
Hey there! To help you integrate the functionality of generating a file from session data and saving it to a specific conversation ID right into the
entrypoint
, here’s a friendly walkthrough with the updated code:
Copy code
from pymongo import MongoClient
from datetime import datetime
import json
from livekit import agents
from livekit.agents import Agent, ChatContext, AgentSession

# Connect to MongoDB
client = MongoClient('<mongodb://localhost:27017>')
db = client['livekit_conversations']
conversations = db['conversations']

# Function to save a file reference to a conversation
def save_file_to_conversation(conversation_id, file_path):
    conversations.update_one(
        {"conversation_id": conversation_id},
        {"$set": {"file_path": file_path, "updated_at": datetime.now()}},
        upsert=True
    )

# Function to generate a file from session data
def generate_file_from_session(conversation_id, session_data):
    file_path = f"/path/to/files/{conversation_id}_session.json"
    with open(file_path, 'w') as file:
        json.dump(session_data, file, indent=4)

    # Save the file reference to MongoDB
    save_file_to_conversation(conversation_id, file_path)

class CustomAgent(Agent):
    def __init__(self, chat_ctx: ChatContext) -> None:
        super().__init__(chat_ctx=chat_ctx, instructions="You are a helpful assistant.")

    async def entrypoint(ctx: agents.JobContext):
        # Generate a unique conversation ID
        conversation_id = ctx.job.metadata.get('conversation_id', str(uuid.uuid4()))

        # Initialize the session
        session = AgentSession(
            chat_context=ChatContext()
        )

        # Add a callback to save messages to MongoDB
        session.chat_context.on_message_added = lambda message: save_message(
            conversation_id, message.role, message.content
        )

        # Start the session
        await session.start()

        # After the session ends, generate a file and save it to MongoDB
        session_data = {
            "messages": [
                {"role": message.role, "content": message.content} for message in session.chat_context.messages
            ]
        }
        generate_file_from_session(conversation_id, session_data)
Quick recap: • The
entrypoint
function kicks things off by initializing the session and setting up a handy callback to save messages as they come in. • Once the session wraps up, it gathers all the session data, creates a neat JSON file, and saves the file path to MongoDB linked to the conversation ID. If anything’s unclear or you want me to tweak it further, just let me know—I’m here to help!
p
is it fine import sys import os import json from datetime import datetime from livekit.agents import Agent, AgentSession, JobContext, llm,ChatContext from livekit.plugins import silero, groq, cartesia, deepgram from dotenv import load_dotenv # Load environment variables load_dotenv() # Import your custom modules with error handling try: from src.utils.logger import logging from src.utils.exception import CustomException from src.pipeline.agentic_rag_pipeline import AgenticRAG print(" Successfully imported custom modules") except ImportError as e: print(f" Import error: {e}") # Use basic logging as fallback import logging logging.basicConfig(level=logging.INFO) # Create a mock Tool class for testing class Tool: def run(self, input_data): return {"generation": f"Mock response for: {input_data.get('user_transcript', 'No query')}"} class CustomException(Exception): pass # Model configuration - with fallbacks STT_MODEL = os.getenv("STT_MODEL", "nova-2") LLM_MODEL = os.getenv("LLM_MODEL", "llama3-70b-8192") TTS_MODEL = os.getenv("TTS_MODEL", "sonic-english") # Initialize tool agent with error handling try: tool_agent = AgenticRAG() print(" Tool agent initialized successfully") except Exception as e: print(f" Failed to initialize Tool agent: {e}") # Create a mock agent for testing class MockTool: def run(self, input_data): return {"generation": f"Mock response for: {input_data.get('user_transcript', 'No query')}"} tool_agent = MockTool() def create_tool_function(): """ Create a custom LangChain tool that wraps the Tool workflow instance. """ @llm.function_tool( name="information_tool", description="Tool for retrieving factual information and answering questions" ) async def information_tool(query: str) -> str: """ Tool for information retrieval and research. Args: query: The user's question that needs information Returns: Response with relevant information """ try: logging.info(f"Information tool invoked with query: '{query}'") # Prepare input for the tool input_data = {"user_transcript": query} print(input_data) # Invoke the tool workflow result = tool_agent.run(query) # Extract response if isinstance(result, dict) and 'generation' in result: response = result['generation'] else: response = str(result) logging.info(f"Tool response: {response}") return response except Exception as e: error_msg = f"Error processing query: {str(e)}" logging.error(error_msg) return error_msg return information_tool async def entrypoint(ctx: JobContext): """Main entrypoint for the LiveKit agent""" try: print("🚀 Starting LiveKit agent...") logging.info("Starting LiveKit agent...") # Connect to the room await ctx.connect() print(" Connected to room") # Create the information tool info_tool = create_tool_function() print(" Information tool created") # Create session with error handling for each component print("🔧 Creating session components...") try: vad = silero.VAD.load() print(" VAD loaded") stt = deepgram.STT( model=STT_MODEL, language="multi", api_key=os.getenv("DEEPGRAM_API_KEY") ) print(" STT configured") llm_instance = groq.LLM( model=LLM_MODEL, api_key=os.getenv("GROQ_API_KEY") ) print(" LLM configured") tts = cartesia.TTS( model=TTS_MODEL, voice="f786b574-daa5-4673-aa0c-cbe3e8534c02", api_key=os.getenv("CARTESIA_API_KEY") ) print(" TTS configured") except Exception as e: print(f" TTS error: {e}") raise CustomException(e,sys) # Create session session = AgentSession( vad=vad, stt=stt, llm=llm_instance, tts=tts, chat_context=ChatContext() ) print(" Session created") conversation_id = ctx.job.metadata.get('conversation_id', str(uuid.uuid4())) print(" Transcription logging set up") # Create agent with tools agent = Agent( llm=llm_instance, instructions="""You are a helpful AI assistant named Irish. Use the information_tool only when you don't have knowledge of user questions that require factual information or research. Keep responses conversational and friendly.""", tools=[info_tool] ) print(" Agent created with tools") session._chat_ctx.add_message = lambda msg: save_message( conversation_id, msg.role, msg.content ) # Start the session await session.start(agent=agent, room=ctx.room) print(" Session started") # Send initial greeting await session.generate_reply( instructions="Greet the user briefly and ask how you can help them today.Tell Your Name" ) print(" Initial greeting sent") print("🎙️ Agent is now active and listening...") # After the session ends, generate a file and save it to MongoDB session_data = { "messages": [ {"role": message.role, "content": message.content} for message in session._chat_ctx.merge ] } generate_file_from_session(conversation_id, session_data) except Exception as e: error_msg = f"Voice agent error: {e}" print(f" {error_msg}") logging.error(error_msg) raise CustomException(e, sys)
t
Your code structure for the
entrypoint
looks really solid and well put together! I can see you’ve included error handling, environment variable loading, and integrated tools like VAD, STT, LLM, and TTS. Plus, the way you’re saving session data to MongoDB and generating files fits nicely into the flow. Just a few friendly pointers to help everything run smoothly: • MongoDB Integration: Make sure the
save_message
and
generate_file_from_session
functions are properly defined and imported into your script. These should handle saving messages and session data to MongoDB, just like in the earlier examples. • Session Data Collection: When you’re generating the session data file, double-check that
session._chat_ctx.merge
is actually pulling all the messages from the session. If it’s not quite right, you might want to try
session.chat_context.messages
or something similar instead. • Error Handling: Using
CustomException
is a smart move for consistent error reporting. Just be sure that the
CustomException
class is properly defined and imported. • Environment Variables: Take a moment to verify that all your needed environment variables (like API keys for Deepgram, Groq, and Cartesia) are correctly set up in your environment or
.env
file. • File Path: Don’t forget to update the
/path/to/files/
directory in your
generate_file_from_session
function to a valid folder on your system where your app can write files. If you want to dive deeper into any part of the code or need more examples, just give me a shout—I’m happy to help!
p
if any mistacke generate full code
t
Here is the complete and corrected code for your
entrypoint
function, including MongoDB integration, session data saving, and error handling:
Copy code
import sys
import os
import json
import uuid
from datetime import datetime
from livekit.agents import Agent, AgentSession, JobContext, llm, ChatContext
from livekit.plugins import silero, groq, cartesia, deepgram
from dotenv import load_dotenv
from pymongo import MongoClient

# Load environment variables
load_dotenv()

# Connect to MongoDB
client = MongoClient('<mongodb://localhost:27017>')
db = client['livekit_conversations']
conversations = db['conversations']

# Function to save a new message to MongoDB
def save_message(conversation_id, role, content):
    message = {
        "role": role,
        "content": content,
        "timestamp": datetime.now()
    }
    conversations.update_one(
        {"conversation_id": conversation_id},
        {"$push": {"messages": message}, "$set": {"updated_at": datetime.now()}},
        upsert=True
    )

# Function to generate a file from session data and save it to MongoDB
def generate_file_from_session(conversation_id, session_data):
    file_path = f"/path/to/files/{conversation_id}_session.json"
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    with open(file_path, 'w') as file:
        json.dump(session_data, file, indent=4)

    # Save the file reference to MongoDB
    conversations.update_one(
        {"conversation_id": conversation_id},
        {"$set": {"file_path": file_path, "updated_at": datetime.now()}},
        upsert=True
    )

# Main entrypoint function
async def entrypoint(ctx: JobContext):
    try:
        print("🚀  Starting LiveKit agent...")

        # Connect to the room
        await ctx.connect()
        print("✅  Connected to room")

        # Initialize session components
        vad = silero.VAD.load()
        stt = deepgram.STT(
            model=os.getenv("STT_MODEL", "nova-2"),
            language="multi",
            api_key=os.getenv("DEEPGRAM_API_KEY")
        )
        llm_instance = groq.LLM(
            model=os.getenv("LLM_MODEL", "llama3-70b-8192"),
            api_key=os.getenv("GROQ_API_KEY")
        )
        tts = cartesia.TTS(
            model=os.getenv("TTS_MODEL", "sonic-english"),
            voice="f786b574-daa5-4673-aa0c-cbe3e8534c02",
            api_key=os.getenv("CARTESIA_API_KEY")
        )

        # Create session
        session = AgentSession(
            vad=vad,
            stt=stt,
            llm=llm_instance,
            tts=tts,
            chat_context=ChatContext()
        )
        print("✅  Session created")

        # Generate a unique conversation ID
        conversation_id = ctx.job.metadata.get('conversation_id', str(uuid.uuid4()))

        # Add a callback to save messages to MongoDB
        session.chat_context.on_message_added = lambda message: save_message(
            conversation_id, message.role, message.content
        )

        # Create the agent
        agent = Agent(
            llm=llm_instance,
            instructions="""You are a helpful AI assistant named Irish.
            Use the information_tool only when you don't have knowledge of user questions that require factual information or research.
            Keep responses conversational and friendly."""
        )
        print("✅  Agent created")

        # Start the session
        await session.start(agent=agent, room=ctx.room)
        print("✅  Session started")

        # Send initial greeting
        await session.generate_reply(
            instructions="Greet the user briefly and ask how you can help them today. Tell Your Name"
        )
        print("✅  Initial greeting sent")

        # After the session ends, generate a file and save it to MongoDB
        session_data = {
            "messages": [
                {"role": message.role, "content": message.content} for message in session.chat_context.messages
            ]
        }
        generate_file_from_session(conversation_id, session_data)

    except Exception as e:
        error_msg = f"Voice agent error: {e}"
        print(f"❌  {error_msg}")
        raise
Key Features: • Handles MongoDB integration for saving messages and session data. • Generates a JSON file for session data and saves its path to MongoDB. • Includes error handling for all critical operations. • Ensures directories for file storage are created if they don’t exist. If you have any questions or want me to tweak anything, just let me know—I’m here to help!