busy-breakfast-32623
08/14/2025, 10:22 AMtall-airline-16756
08/14/2025, 10:22 AMtall-airline-16756
08/14/2025, 10:23 AMbusy-breakfast-32623
08/14/2025, 10:25 AMtall-airline-16756
08/14/2025, 10:26 AMCREATE TABLE rooms (
id SERIAL PRIMARY KEY,
room_name VARCHAR(255) UNIQUE NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- One row per chat message; room_id points at the owning row in rooms.
CREATE TABLE messages (
id SERIAL PRIMARY KEY,
-- room_id and participant_id carry no NOT NULL constraint, so either may be NULL.
room_id INT REFERENCES rooms(id),
participant_id VARCHAR(255),
message TEXT NOT NULL,
-- sent_at falls back to the insert time when the caller does not supply a value.
sent_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);
2. Save Room Information: When a new room is created, just pop its unique name into the `rooms` table.
3. Save Chat Messages: Every time someone sends a message, save it in the `messages` table along with the room ID and participant info.
4. Implementation: Use your app’s logic to catch chat events and store them in your database. Here’s a quick Python example: def save_message(room_name, participant_id, message):
room_id = get_room_id(room_name) # Grab the room ID from your database
conn.execute(
"INSERT INTO messages (room_id, participant_id, message, sent_at) VALUES (?, ?, ?, ?)",
(room_id, participant_id, message, datetime.now())
)
If you want, I’m happy to help you dive deeper into any part of this or provide more code examples. Just let me know!busy-breakfast-32623
08/15/2025, 3:52 PM# `create_sip_participant` starts dialing the user
await ctx.api.sip.create_sip_participant(
api.CreateSIPParticipantRequest(
room_name=ctx.room.name,
sip_trunk_id=outbound_trunk_id,
sip_call_to=phone_number,
participant_identity=user_identity,
)
)
# a participant is created as soon as we start dialing
participant = await ctx.wait_for_participant(identity=user_identity)
run_voice_pipeline_agent(ctx, participant, instructions)
# in addition, you can monitor the call status separately
start_time = perf_counter()
while perf_counter() - start_time < 30:
call_status = participant.attributes.get("sip.callStatus")
if call_status == "active":
logger.info("user has picked up")
return
elif call_status == "automation":
# if DTMF is used in the `sip_call_to` number, typically used to dial
# an extension or enter a PIN.
# during DTMF dialing, the participant will be in the "automation" state
pass
elif participant.disconnect_reason == rtc.DisconnectReason.USER_REJECTED:
logger.info("user rejected the call, exiting job")
break
elif participant.disconnect_reason == rtc.DisconnectReason.USER_UNAVAILABLE:
logger.info("user did not pick up, exiting job")
break
await asyncio.sleep(0.1)
logger.info("session timed out, exiting job")
ctx.shutdown()
# Tool container passed to the voice agent as its function context (see the
# `fnc_ctx=` argument where the agent is constructed). Methods decorated with
# `@llm.ai_callable()` are presumably exposed to the LLM as callable tools and
# their docstrings used as the tool descriptions -- TODO confirm against the
# LiveKit Agents docs. For that reason the docstrings are kept verbatim and
# extra explanation lives in `#` comments instead.
class CallActions(llm.FunctionContext):
    """
    Detect user intent and perform actions
    """

    def __init__(
        self, *, api: api.LiveKitAPI, participant: rtc.RemoteParticipant, room: rtc.Room
    ):
        # All three arguments are keyword-only and are simply stored for the
        # tool methods below to use.
        super().__init__()
        self.api = api
        self.participant = participant
        self.room = room

    async def hangup(self):
        # Ends the call by removing the stored participant from the stored room.
        # Not decorated with ai_callable: it is a shared helper for the tools.
        try:
            await self.api.room.remove_participant(
                api.RoomParticipantIdentity(
                    room=self.room.name,
                    identity=self.participant.identity,
                )
            )
        except Exception as e:
            # it's possible that the user has already hung up, this error can be ignored
            logger.info(f"received error while ending call: {e}")

    @llm.ai_callable()
    async def end_call(self):
        """Called when the user wants to end the call"""
        logger.info(f"ending the call for {self.participant.identity}")
        await self.hangup()

    @llm.ai_callable()
    async def look_up_availability(
        self,
        date: Annotated[str, "The date of the appointment to check availability for"],
    ):
        """Called when the user asks about alternative appointment availability"""
        logger.info(
            f"looking up availability for {self.participant.identity} on {date}"
        )
        # Placeholder implementation: waits three seconds, then returns the
        # same fixed list of times regardless of `date`.
        await asyncio.sleep(3)
        return json.dumps(
            {
                "available_times": ["1pm", "2pm", "3pm"],
            }
        )

    @llm.ai_callable()
    async def confirm_appointment(
        self,
        date: Annotated[str, "date of the appointment"],
        time: Annotated[str, "time of the appointment"],
    ):
        """Called when the user confirms their appointment on a specific date. Use this tool only when they are certain about the date and time."""
        # Only logs the confirmation; nothing is persisted here.
        logger.info(
            f"confirming appointment for {self.participant.identity} on {date} at {time}"
        )
        return "reservation confirmed"

    @llm.ai_callable()
    async def detected_answering_machine(self):
        """Called when the call reaches voicemail. Use this tool AFTER you hear the voicemail greeting"""
        logger.info(f"detected answering machine for {self.participant.identity}")
        await self.hangup()
def run_voice_pipeline_agent(
    ctx: JobContext, participant: rtc.RemoteParticipant, instructions: str
):
    """Build a VoicePipelineAgent for this call and start it in the job's room.

    Args:
        ctx: Job context. Supplies the room, the API client, and the VAD
            instance read from ``ctx.proc.userdata["vad"]``.
        participant: Remote participant the agent is started with.
        instructions: Text appended to the chat context as the system message.
    """
    logger.info("starting voice pipeline agent")

    # Seed the conversation with the caller-supplied instructions.
    initial_ctx = llm.ChatContext().append(
        role="system",
        text=instructions,
    )

    agent = VoicePipelineAgent(
        # The "vad" entry must already be in userdata; prewarm() stores it there.
        vad=ctx.proc.userdata["vad"],
        stt=deepgram.STT(model="nova-2-phonecall"),
        llm=openai.LLM(),
        tts=openai.TTS(),
        chat_ctx=initial_ctx,
        fnc_ctx=CallActions(api=ctx.api, participant=participant, room=ctx.room),
    )

    agent.start(ctx.room, participant)
def prewarm(proc: JobProcess):
    """Load the Silero VAD model and store it in ``proc.userdata["vad"]``.

    The stored value is what run_voice_pipeline_agent later reads from
    ``ctx.proc.userdata["vad"]``.
    """
    proc.userdata["vad"] = silero.VAD.load()
if __name__ == "__main__":
if not outbound_trunk_id or not outbound_trunk_id.startswith("ST_"):
raise ValueError(
"SIP_OUTBOUND_TRUNK_ID is not set"
)
cli.run_app(
WorkerOptions(
entrypoint_fnc=entrypoint,
# giving this agent a name will allow us to dispatch it via API
# automatic dispatch is disabled when `agent_name` is set
agent_name="outbound-caller",
# prewarm by loading the VAD model, needed only for VoicePipelineAgent
prewarm_fnc=prewarm,
)
)tall-airline-16756
08/15/2025, 3:53 PMasync def on_message_received(room_name, participant_id, message):
save_message(room_name, participant_id, message)
3. Keep the Chat Context Fresh: If you’re using LiveKit’s chat context, you can add the new message there and make sure it sticks around: turn_ctx.add_message(
role="assistant",
content=message
)
await self.update_chat_ctx(turn_ctx)
Does this help? If you want, I’m here to walk you through adding this into your specific codebase—just let me know!