i want to save the chats of livekit rooms in db wh...
# ask-ai
b
i want to save the chats of livekit rooms in db what are the possible ways of doing this
t
Thanks for reaching out, Aman. I'm an AI assistant and I'm here to help. Give me a moment while I research this topic.
LiveKit doesn’t come with built-in chat persistence, but no worries—you can set up chat history storage yourself using a database. Here are a couple of common ways to do it: • Using Key-Value Storage: You might try a key-value store like Vercel KV to save and pull up chat history. For example, you can save messages with a unique key and link them to users or rooms using pipelines. • Using a SQL Database: Another option is creating a database table to hold chat messages, including details like room ID, participant ID, message content, and timestamp. Then you can use SQL queries to add and fetch messages whenever you need. When you’re ready for production, it’s a good idea to use scalable databases like PostgreSQL or MySQL. Also, think about adding features like user authentication, rate limiting, and regular cleanup to keep everything running smoothly and your storage in check.
b
how to implement it . i have a db but hoe to implement the chat i want to save the chats based on the room_name , it will always be unique.
t
Hey there! If you want to save your LiveKit chats in a database using unique room names, here’s a friendly step-by-step guide to help you out: 1. Database Schema: Start by creating tables to keep track of rooms, participants, and messages. Here’s a simple example:
Copy code
CREATE TABLE rooms (
    id SERIAL PRIMARY KEY,
    room_name VARCHAR(255) UNIQUE NOT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

CREATE TABLE messages (
    id SERIAL PRIMARY KEY,
    room_id INT REFERENCES rooms(id),
    participant_id VARCHAR(255),
    message TEXT NOT NULL,
    sent_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
2. Save Room Information: When a new room is created, just pop its unique name into the
rooms
table. 3. Save Chat Messages: Every time someone sends a message, save it in the
messages
table along with the room ID and participant info. 4. Implementation: Use your app’s logic to catch chat events and store them in your database. Here’s a quick Python example:
Copy code
def save_message(room_name, participant_id, message):
    room_id = get_room_id(room_name)  # Grab the room ID from your database
    conn.execute(
        "INSERT INTO messages (room_id, participant_id, message, sent_at) VALUES (?, ?, ?, ?)",
        (room_id, participant_id, message, datetime.now())
    )
If you want, I’m happy to help you dive deeper into any part of this or provide more code examples. Just let me know!
b
okey where to call the save message from .. import asyncio import logging from dotenv import load_dotenv import json import os from time import perf_counter from typing import Annotated from livekit import rtc, api from livekit.agents import ( AutoSubscribe, JobContext, JobProcess, WorkerOptions, cli, llm, ) from livekit.agents.pipeline import VoicePipelineAgent from livekit.plugins import deepgram, openai, silero # load environment variables, this is optional, only used for local development load_dotenv(dotenv_path=".env.local") logger = logging.getLogger("outbound-caller") logger.setLevel(logging.INFO) outbound_trunk_id = os.getenv("SIP_OUTBOUND_TRUNK_ID") _default_instructions = ( "You are a scheduling assistant for a game development studio. Your interface with user will be voice." "You will be on a call with a customer who has an upcoming appointment. Your goal is to confirm the appointment details." "As a customer service representative, you will be polite and professional at all times. Allow user to end the conversation." ) async def entrypoint(ctx: JobContext): global _default_instructions, outbound_trunk_id logger.info(f"connecting to room {ctx.room.name}") await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY) user_identity = "phone_user" # the phone number to dial is provided in the job metadata phone_number = ctx.job.metadata logger.info(f"dialing {phone_number} to room {ctx.room.name}") # look up the user's phone number and appointment details instructions = ( _default_instructions + "The customer's name is Kyle. His appointment is next Tuesday at 3pm." ) #
create_sip_participant
starts dialing the user await ctx.api.sip.create_sip_participant( api.CreateSIPParticipantRequest( room_name=ctx.room.name, sip_trunk_id=outbound_trunk_id, sip_call_to=phone_number, participant_identity=user_identity, ) ) # a participant is created as soon as we start dialing participant = await ctx.wait_for_participant(identity=user_identity) run_voice_pipeline_agent(ctx, participant, instructions) # in addition, you can monitor the call status separately start_time = perf_counter() while perf_counter() - start_time < 30: call_status = participant.attributes.get("sip.callStatus") if call_status == "active": logger.info("user has picked up") return elif call_status == "automation": # if DTMF is used in the
sip_call_to
number, typically used to dial # an extension or enter a PIN. # during DTMF dialing, the participant will be in the "automation" state pass elif participant.disconnect_reason == rtc.DisconnectReason.USER_REJECTED: logger.info("user rejected the call, exiting job") break elif participant.disconnect_reason == rtc.DisconnectReason.USER_UNAVAILABLE: logger.info("user did not pick up, exiting job") break await asyncio.sleep(0.1) logger.info("session timed out, exiting job") ctx.shutdown() class CallActions(llm.FunctionContext): """ Detect user intent and perform actions """ def __init__( self, *, api: api.LiveKitAPI, participant: rtc.RemoteParticipant, room: rtc.Room ): super().__init__() self.api = api self.participant = participant self.room = room async def hangup(self): try: await self.api.room.remove_participant( api.RoomParticipantIdentity( room=self.room.name, identity=self.participant.identity, ) ) except Exception as e: # it's possible that the user has already hung up, this error can be ignored logger.info(f"received error while ending call: {e}") @llm.ai_callable() async def end_call(self): """Called when the user wants to end the call""" logger.info(f"ending the call for {self.participant.identity}") await self.hangup() @llm.ai_callable() async def look_up_availability( self, date: Annotated[str, "The date of the appointment to check availability for"], ): """Called when the user asks about alternative appointment availability""" logger.info( f"looking up availability for {self.participant.identity} on {date}" ) await asyncio.sleep(3) return json.dumps( { "available_times": ["1pm", "2pm", "3pm"], } ) @llm.ai_callable() async def confirm_appointment( self, date: Annotated[str, "date of the appointment"], time: Annotated[str, "time of the appointment"], ): """Called when the user confirms their appointment on a specific date. Use this tool only when they are certain about the date and time.""" logger.info( f"confirming appointment for {self.participant.identity} on {date} at {time}" ) return "reservation confirmed" @llm.ai_callable() async def detected_answering_machine(self): """Called when the call reaches voicemail. Use this tool AFTER you hear the voicemail greeting""" logger.info(f"detected answering machine for {self.participant.identity}") await self.hangup() def run_voice_pipeline_agent( ctx: JobContext, participant: rtc.RemoteParticipant, instructions: str ): logger.info("starting voice pipeline agent") initial_ctx = llm.ChatContext().append( role="system", text=instructions, ) agent = VoicePipelineAgent( vad=ctx.proc.userdata["vad"], stt=deepgram.STT(model="nova-2-phonecall"), llm=openai.LLM(), tts=openai.TTS(), chat_ctx=initial_ctx, fnc_ctx=CallActions(api=ctx.api, participant=participant, room=ctx.room), ) agent.start(ctx.room, participant) def prewarm(proc: JobProcess): proc.userdata["vad"] = silero.VAD.load() if name == "__main__": if not outbound_trunk_id or not outbound_trunk_id.startswith("ST_"): raise ValueError( "SIP_OUTBOUND_TRUNK_ID is not set" ) cli.run_app( WorkerOptions( entrypoint_fnc=entrypoint, # giving this agent a name will allow us to dispatch it via API # automatic dispatch is disabled when
agent_name
is set agent_name="outbound-caller", # prewarm by loading the VAD model, needed only for VoicePipelineAgent prewarm_fnc=prewarm, ) )
t
Hey there! To save chat messages in your LiveKit setup, you can simply call the save message function whenever a new message pops up—whether it’s sent or received. Here’s a friendly little guide on how to weave it into your code: 1. Spot the Event: Keep an eye out for the right event or callback in your LiveKit SDK that tells you when a message is sent or received. For instance, you can listen for chat-related events right in your app’s logic. 2. Call That Save Function: Once you catch the event, just call your save message function with the details it needs, like the room name, participant ID, and the message itself. Something like this:
Copy code
async def on_message_received(room_name, participant_id, message):
    save_message(room_name, participant_id, message)
3. Keep the Chat Context Fresh: If you’re using LiveKit’s chat context, you can add the new message there and make sure it sticks around:
Copy code
turn_ctx.add_message(
    role="assistant",
    content=message
)
await self.update_chat_ctx(turn_ctx)
Does this help? If you want, I’m here to walk you through adding this into your specific codebase—just let me know!