better-horse-7195
08/07/2025, 9:09 PM
from __future__ import annotations
import asyncio
import json
import logging
import os
from typing import Any, Set
from dotenv import load_dotenv
from livekit import api, rtc
from livekit.agents import (
Agent, AgentSession, JobContext, JobProcess, RunContext,
cli, WorkerOptions, RoomInputOptions, function_tool,
BackgroundAudioPlayer, AudioConfig, BuiltinAudioClip, get_job_context,
)
from livekit.plugins import deepgram, openai, cartesia, silero, noise_cancellation
from livekit.plugins.noise_cancellation import BVCTelephony
from livekit.plugins.turn_detector.multilingual import MultilingualModel
from metadata import JobMetadata # your pydantic model
# Load environment variables before any SDK clients are constructed
# (they read their API keys from the environment).
load_dotenv(".env.local")

logger = logging.getLogger("outbound-caller")
logger.setLevel(logging.INFO)  # fixed: was a chat-export link artifact

# Short acknowledgements that should not be forwarded to the LLM chat context.
PLEASANTRIES: Set[str] = {"hi", "hello", "hey", "yes"}
# ───────────────────────────── Agent ──────────────────────────────── #
class OutboundCaller(Agent):
    """Minimal agent – greeting & hang-up handled in entrypoint."""

    def __init__(self, *, instructions: str):
        super().__init__(instructions=instructions)
        # Remote SIP participant this agent is talking to; set after dial-in.
        self.participant: rtc.RemoteParticipant | None = None

    def set_participant(self, participant: rtc.RemoteParticipant):
        """Record the remote participant once they join the room."""
        self.participant = participant

    # ------ LLM-visible tools ----------------------------------------
    async def _hangup(self) -> None:
        """Delete the room, which terminates the SIP call for everyone."""
        job_ctx = get_job_context()
        await job_ctx.api.room.delete_room(
            api.DeleteRoomRequest(room=job_ctx.room.name)
        )

    @function_tool()
    async def end_call(self, ctx: RunContext):
        """Hang up when user or LLM decides the call is over."""
        await self._hangup()

    @function_tool()
    async def detected_answering_machine(self, ctx: RunContext):
        """Hang up if voicemail is detected."""
        logger.info("AMD Detected")  # fixed: was a chat-export link artifact
        await self._hangup()
# ────────────────────────── Pre-warm VAD ────────────────────────── #
def prewarm(proc: JobProcess):
    """Load the Silero VAD model once per worker process to cut cold-start latency."""
    proc.userdata["vad"] = silero.VAD.load()
# ─────────────────────────── Entry point ──────────────────────────── #
async def entrypoint(ctx: JobContext):
    """Dial an outbound SIP call, greet the callee once, then let the LLM drive.

    Flow: parse job metadata -> build agent/session -> start the session while
    dialing -> wait for the callee to join -> play a deterministic greeting ->
    dump the transcript on exit (success or failure).
    """
    session: AgentSession | None = None
    try:
        # 0 Parse metadata & inject API keys
        meta = JobMetadata(**json.loads(ctx.job.metadata))
        os.environ.update(
            DEEPGRAM_API_KEY=meta.deepgram_api_key,
            CARTESIA_API_KEY=meta.cartesia_api_key,
            OPENAI_API_KEY=meta.openai_api_key,
        )

        # 1 Build agent & session
        agent = OutboundCaller(instructions=meta.instructions)
        session = AgentSession(
            vad=ctx.proc.userdata["vad"],
            llm=openai.LLM(model="gpt-4o-mini"),
            stt=deepgram.STT(model="nova-3", interim_results=True, language="multi"),
            tts=cartesia.TTS(model="sonic-2", voice="694f9389-aac1-45b6-b726-9d9369183238"),
            turn_detection="stt",
            preemptive_generation=True,
            allow_interruptions=True,
        )

        # Pleasantry filter & first-turn latch
        first_turn = asyncio.Event()

        @session.on("user_input_transcribed")
        def _filter(ev):
            if ev.is_final:
                if ev.transcript.strip().lower() in PLEASANTRIES:
                    # Don't feed bare pleasantries to the LLM chat context.
                    ev.add_to_chat_ctx = False
                # NOTE(review): latch on any final transcript so the greeting
                # timeout below fires on first speech — confirm the original
                # intent was not pleasantries-only (source indentation lost).
                first_turn.set()

        # Background ambience
        background_audio = BackgroundAudioPlayer(
            ambient_sound=AudioConfig(BuiltinAudioClip.OFFICE_AMBIENCE, volume=0.8),
            thinking_sound=[
                AudioConfig(BuiltinAudioClip.KEYBOARD_TYPING, volume=0.8),
                AudioConfig(BuiltinAudioClip.KEYBOARD_TYPING2, volume=0.7),
            ],
        )

        # 2 Start session as background task & dial
        session_started = asyncio.create_task(
            session.start(
                agent=agent,
                room=ctx.room,
                room_input_options=RoomInputOptions(
                    noise_cancellation=noise_cancellation.BVCTelephony()
                ),
            )
        )
        await ctx.api.sip.create_sip_participant(
            api.CreateSIPParticipantRequest(
                room_name=ctx.room.name,
                sip_trunk_id=meta.sip_outbound_trunk_id,
                sip_call_to=meta.phone_number,
                participant_identity=meta.phone_number,
                wait_until_answered=True,
            )
        )

        # 3 Wait for session start and participant join
        await session_started
        participant = await ctx.wait_for_participant(identity=meta.phone_number)
        agent.set_participant(participant)

        # Start background audio after session is fully established
        await background_audio.start(room=ctx.room, agent_session=session)

        try:
            await asyncio.wait_for(first_turn.wait(), timeout=1.5)
        except asyncio.TimeoutError:
            pass  # silent pick-up — greet anyway

        # 4 Deterministic greeting
        greeting = meta.greeting or "Hello, this is Sara from ABC Finance."
        await session.say(greeting, allow_interruptions=True, add_to_chat_ctx=False)
        # Session will continue running naturally - no session.run() needed
    except Exception:
        # Lazy logging args; logger.exception already appends the traceback.
        logger.exception("Outbound-caller fatal error")
        # best-effort room cleanup
        try:
            await ctx.api.room.delete_room(api.DeleteRoomRequest(room=ctx.room.name))
        except Exception:
            pass
    finally:
        # Dump conversation history (works on all SDK versions)
        if session and getattr(session, "history", None):
            h = session.history
            try:
                out = json.dumps(h.to_dict(), indent=2)  # SDK >= 1.0.2
            except AttributeError:
                out = getattr(h, "to_json", lambda **_: str(h))(indent=2)
            print("\n--- Call Transcript ---")
            print(out)
            print("--- End Transcript ---\n")
# ─────────────────────────── CLI runner ──────────────────────────── #
if __name__ == "__main__":
    # Run the agent worker; prewarm loads the VAD model per process.
    cli.run_app(
        WorkerOptions(
            entrypoint_fnc=entrypoint,
            agent_name="outbound-caller",
            prewarm_fnc=prewarm,  # drop if cold-start latency isn't a concern
        )
    )
How do I play the greeting only once — and never again — when a participant joins the call?
tall-airline-16756
08/07/2025, 9:09 PMtall-airline-16756
08/07/2025, 9:10 PMclass OutboundCaller(Agent):
def __init__(self, *, instructions: str):
super().__init__(instructions=instructions)
self.greeted_participants = set()
def set_participant(self, participant: rtc.RemoteParticipant):
if participant.identity not in self.greeted_participants:
self.greeted_participants.add(participant.identity)
# Play greeting here
self.session.say(f"Hello, {participant.identity}!")
This ensures that each participant hears the greeting only once, even if they rejoin the call.better-horse-7195
08/07/2025, 9:20 PMtall-airline-16756
08/07/2025, 9:20 PMPLEASANTRIES = {"hi", "hello", "hey"}
@session.on("user_input_transcribed")
def handle_input(event):
if event.is_final:
if event.transcript.strip().lower() in PLEASANTRIES:
# You can say hi back or simply skip
session.say("Hello! How can I assist you today?")
event.add_to_chat_ctx = False
This way, those little greetings get the friendly attention they deserve without interrupting the flow of your call.
Sources: README.md | examples/demo/demo.ts | telephony/warm_handoff.pybetter-horse-7195
08/07/2025, 9:22 PMtall-airline-16756
08/07/2025, 9:23 PMgenerate_reply
method to send a friendly greeting. For example:
await session.generate_reply(
instructions="Greet the user and offer your assistance."
)
This way, your greeting is created on the spot and sent right after the user picks up the call, making the interaction smooth and welcoming.
Sources: Agents telephony integration | LiveKit Docsbetter-horse-7195
08/07/2025, 9:24 PMtall-airline-16756
08/07/2025, 9:24 PMPLEASANTRIES = {"hi", "hello", "hey"}
@session.on("user_input_transcribed")
def handle_input(event):
if event.is_final:
if event.transcript.strip().lower() in PLEASANTRIES:
# Respond to the greeting
session.say("Hello! This is Sara from ABC Finance. How can I assist you today?")
event.add_to_chat_ctx = False
This way, your agent naturally acknowledges the user's greeting before moving on, making the conversation feel more friendly and engaging.better-horse-7195
08/07/2025, 9:26 PMtall-airline-16756
08/07/2025, 9:26 PMfrom livekit.plugins.turn_detector.multilingual import MultilingualModel
session = AgentSession(
turn_detection=MultilingualModel(),
min_endpointing_delay=1000, # Adjust silence threshold
)
1. Wait for the User’s Turn to End: Once the user stops speaking, then play your greeting:
# Wait for the user's turn to end
handle = session.generate_reply(
instructions="Greet the user and offer your assistance."
)
await handle.wait_for_playout()
This way, the greeting plays only after the user’s initial response, no matter what they say. It feels more natural and respectful of their time.better-horse-7195
08/07/2025, 9:27 PMtall-airline-16756
08/07/2025, 9:28 PMgenerate_reply
method in LiveKit: it doesn’t automatically wait for the user to finish their turn before jumping in with a response. By default, it starts crafting a reply as soon as it gets the final transcript, especially if preemptive generation is turned on. This helps keep things snappy, but it might not always match up with waiting for the user to fully finish speaking.
If you’d prefer the greeting to play only after the user’s done talking, no worries! You can simply turn off preemptive generation or use turn detection to make sure generate_reply
kicks in right after the user’s turn wraps up.better-horse-7195
08/07/2025, 9:36 PMfrom __future__ import annotations
import asyncio
import json
import logging
import os
import time
from typing import Any, Set
from dotenv import load_dotenv
from livekit import api, rtc
from livekit.agents import (
Agent, AgentSession, JobContext, JobProcess, RunContext,
cli, WorkerOptions, RoomInputOptions, function_tool,
BackgroundAudioPlayer, AudioConfig, BuiltinAudioClip, get_job_context,
)
from livekit.plugins import deepgram, openai, cartesia, silero, noise_cancellation
from livekit.plugins.noise_cancellation import BVCTelephony
from livekit.plugins.turn_detector.multilingual import MultilingualModel
from metadata import JobMetadata # your pydantic model
# Load environment variables before any SDK clients are constructed
# (they read their API keys from the environment).
load_dotenv(".env.local")

logger = logging.getLogger("outbound-caller")
logger.setLevel(logging.INFO)  # fixed: was a chat-export link artifact

# Short acknowledgements that should not be forwarded to the LLM chat context.
PLEASANTRIES: Set[str] = {"hi", "hello", "hey", "yes"}
# ───────────────────────────── Agent ──────────────────────────────── #
class OutboundCaller(Agent):
    """Outbound agent that greets each participant only once, then lets the
    LLM drive the conversation."""

    def __init__(self, *, instructions: str):
        super().__init__(instructions=instructions)
        # Identities that have already been greeted (dedupe across rejoins).
        self.greeted_participants: Set[str] = set()
        self.participant: rtc.RemoteParticipant | None = None

    def set_participant(self, participant: rtc.RemoteParticipant):
        """Record the remote participant once they join the room."""
        self.participant = participant

    async def greet_participant_once(
        self,
        participant: rtc.RemoteParticipant,
        session: AgentSession,
        greeting: str,
    ):
        """Greet *participant* only if not already greeted."""
        if participant.identity in self.greeted_participants:
            logger.info(
                "Participant %s already greeted, skipping greeting",
                participant.identity,
            )
            return
        self.greeted_participants.add(participant.identity)
        await asyncio.sleep(3)  # brief delay so the greeting doesn't fire mid pick-up
        await session.say(greeting, allow_interruptions=True)
        logger.info("Greeted participant: %s", participant.identity)

    # ------ LLM-visible tools ----------------------------------------
    async def _hangup(self) -> None:
        """Delete the room, which terminates the SIP call for everyone."""
        job_ctx = get_job_context()
        await job_ctx.api.room.delete_room(
            api.DeleteRoomRequest(room=job_ctx.room.name)
        )

    @function_tool()
    async def end_call(self, ctx: RunContext):
        """Hang up when user or LLM decides the call is over."""
        await self._hangup()

    @function_tool()
    async def detected_answering_machine(self, ctx: RunContext):
        """Hang up if voicemail is detected."""
        logger.info("AMD Detected")  # fixed: was a chat-export link artifact
        await self._hangup()
# ────────────────────────── Pre-warm VAD ────────────────────────── #
def prewarm(proc: JobProcess):
    """Load the Silero VAD model once per worker process to cut cold-start latency."""
    proc.userdata["vad"] = silero.VAD.load()
# ─────────────────────────── Entry point ──────────────────────────── #
async def entrypoint(ctx: JobContext):
    """Dial an outbound SIP call, greet the callee once, then let the LLM drive.

    Flow: parse job metadata -> build agent/session -> start the session while
    dialing -> wait for the callee to join -> greet once via the agent ->
    dump the transcript on exit (success or failure).
    """
    session: AgentSession | None = None
    try:
        # 0 Parse metadata & inject API keys
        meta = JobMetadata(**json.loads(ctx.job.metadata))
        os.environ.update(
            DEEPGRAM_API_KEY=meta.deepgram_api_key,
            CARTESIA_API_KEY=meta.cartesia_api_key,
            OPENAI_API_KEY=meta.openai_api_key,
        )

        # 1 Build agent & session
        agent = OutboundCaller(instructions=meta.instructions)
        session = AgentSession(
            vad=ctx.proc.userdata["vad"],
            llm=openai.LLM(model="gpt-4o-mini"),
            stt=deepgram.STT(model="nova-3", interim_results=True, language="multi"),
            tts=cartesia.TTS(model="sonic-2", voice="694f9389-aac1-45b6-b726-9d9369183238"),
            turn_detection="stt",
            preemptive_generation=True,
            allow_interruptions=True,
        )

        # Pleasantry filter & first-turn latch
        first_turn = asyncio.Event()

        @session.on("user_input_transcribed")
        def _filter(ev):
            if ev.is_final:
                if ev.transcript.strip().lower() in PLEASANTRIES:
                    # Don't feed bare pleasantries to the LLM chat context.
                    ev.add_to_chat_ctx = False
                # NOTE(review): latch on any final transcript so the greeting
                # timeout below fires on first speech — confirm the original
                # intent was not pleasantries-only (source indentation lost).
                first_turn.set()

        # Background ambience
        background_audio = BackgroundAudioPlayer(
            ambient_sound=AudioConfig(BuiltinAudioClip.OFFICE_AMBIENCE, volume=0.8),
            thinking_sound=[
                AudioConfig(BuiltinAudioClip.KEYBOARD_TYPING, volume=0.8),
                AudioConfig(BuiltinAudioClip.KEYBOARD_TYPING2, volume=0.7),
            ],
        )

        # 2 Start session as background task & dial
        session_started = asyncio.create_task(
            session.start(
                agent=agent,
                room=ctx.room,
                room_input_options=RoomInputOptions(
                    noise_cancellation=noise_cancellation.BVCTelephony()
                ),
            )
        )
        await ctx.api.sip.create_sip_participant(
            api.CreateSIPParticipantRequest(
                room_name=ctx.room.name,
                sip_trunk_id=meta.sip_outbound_trunk_id,
                sip_call_to=meta.phone_number,
                participant_identity=meta.phone_number,
                wait_until_answered=True,
            )
        )

        # 3 Wait for session start and participant join
        await session_started
        participant = await ctx.wait_for_participant(identity=meta.phone_number)
        agent.set_participant(participant)

        # Start background audio after session is fully established
        await background_audio.start(room=ctx.room, agent_session=session)

        try:
            await asyncio.wait_for(first_turn.wait(), timeout=1.5)
        except asyncio.TimeoutError:
            pass  # silent pick-up — greet anyway

        # 4 Play greeting only once using the agent's method
        greeting = meta.greeting or "Hello, this is Sara from ABC Finance."
        await agent.greet_participant_once(participant, session, greeting)
        # Session will continue running naturally; the LLM now drives the call.
    except Exception:
        # Lazy logging args; logger.exception already appends the traceback.
        logger.exception("Outbound-caller fatal error")
        # best-effort room cleanup
        try:
            await ctx.api.room.delete_room(api.DeleteRoomRequest(room=ctx.room.name))
        except Exception:
            pass
    finally:
        # Dump conversation history (works on all SDK versions)
        if session and getattr(session, "history", None):
            h = session.history
            try:
                out = json.dumps(h.to_dict(), indent=2)  # SDK >= 1.0.2
            except AttributeError:
                out = getattr(h, "to_json", lambda **_: str(h))(indent=2)
            print("\n--- Call Transcript ---")
            print(out)
            print("--- End Transcript ---\n")
# ─────────────────────────── CLI runner ──────────────────────────── #
if __name__ == "__main__":
cli.run_app(
WorkerOptions(
entrypoint_fnc=entrypoint,
agent_name="outbound-caller",
prewarm_fnc=prewarm, # drop if cold-start latency isn't a concern
)
)
any issues with this code?better-horse-7195
08/07/2025, 9:36 PMtall-airline-16756
08/07/2025, 9:36 PMLIVEKIT_URL
, LIVEKIT_API_KEY
, LIVEKIT_API_SECRET
) are set up correctly—this helps avoid any hiccups with authentication.
2. SIP Trunk Configuration: Make sure the sip_trunk_id
and other SIP settings are spot-on so your calls connect without a hitch.
3. Greeting Timing: I love that you’re waiting for the user’s turn before playing the greeting. Just confirm that your first_turn
logic reliably knows when the user’s done speaking.
4. Error Handling: It’s a great idea to add strong error handling for things like failed SIP participant creation or room cleanup—this way, you won’t run into unexpected crashes.
5. Third-Party API Integration: Take a moment to double-check your setup with APIs like OpenAI and Deepgram to make sure speech recognition and synthesis are working like a charm.
6. Voicemail Detection: Your detected_answering_machine
method is a smart touch! Be sure to test it well so it catches voicemail situations effectively.
Addressing these points should help your outbound calls run nice and smooth. You’re on the right track!
Sources: Making calls using SIP | LiveKit Docs | README.md | Agents telephony integration | LiveKit Docstall-airline-16756
08/07/2025, 9:36 PMturn_detection="stt"
makes sense, but make sure it fits with how you want to detect when the user has finished speaking.
3. Error Handling: You’ve got error handling for room cleanup, which is great! It might be helpful to add some more detailed error handling around SIP participant creation and session start, so you can catch any issues early on.
4. Voicemail Detection: The detected_answering_machine
method is there but isn’t currently triggered in the flow. If voicemail detection is important for your use case, make sure it’s called at the right time.
5. Greeting Timing: The await asyncio.sleep(3)
delay before the greeting might not always sync perfectly with when the user is ready to hear it. You might want to try using turn detection to decide the best moment to play the greeting dynamically.
6. Logging: Your logging setup looks solid! Just keep an eye on it to avoid accidentally logging sensitive info like API keys or participant details.
All in all, your code is looking strong! Tweaking the greeting timing and beefing up error handling a bit could make it even more reliable. Keep up the great work!better-horse-7195
08/07/2025, 9:37 PMawait asyncio.sleep(3)
delay before the greeting might not always sync perfectly with when the user is ready to hear it. You might want to try using turn detection to decide the best moment to play the greeting dynamically.
Can you help me handle end-of-turn detection before calling `agent.say()`?
tall-airline-16756
08/07/2025, 9:38 PMagent.say()
, LiveKit has some handy turn detection tools. Here’s how you can set it up:
1. Set up VAD (Voice Activity Detection): You can tweak the silence thresholds to figure out when the user stops speaking:
session = AgentSession(
min_endpointing_delay=0.8, # Minimum silence before EOT (e.g., 800ms)
max_endpointing_delay=4.0 # Maximum time to wait for EOT (e.g., 4 seconds)
)
1. Try the Transformer-Based EOU Model: For even better accuracy, go for the multilingual turn detector like this:
from livekit.plugins.turn_detector.multilingual import MultilingualModel
session = AgentSession(
turn_detection=MultilingualModel(),
min_endpointing_delay=0.5, # Adjust as needed
max_endpointing_delay=6.0
)
These settings help your agent wait patiently until the user finishes talking before jumping in. If you want a hand adding this to your code, just give me a shout!
Sources: AI voice agents | LiveKit Docs | Speech & Audio Processing | livekit/livekit_composite | DeepWiki | livekit-agents/livekit/agents/voice/audio_recognition.py | How does end-of-utterance detection work in conversations?better-horse-7195
08/07/2025, 9:39 PMturn_detection="stt",
preemptive_generation=True,
allow_interruptions=True,
i am already using turn detectiontall-airline-16756
08/07/2025, 9:39 PMturn_detection="stt"
, preemptive_generation=True
, and allow_interruptions=True
, here’s a friendly tip to fine-tune things so the greeting plays only after the user’s turn wraps up:
1. Turn Off Preemptive Generation Just for the Greeting: Preemptive generation helps speed things up, but it might jump the gun before the user finishes talking. You can switch it off temporarily when playing the greeting like this:
session = AgentSession(
turn_detection="stt",
preemptive_generation=False, # Disable for greeting
allow_interruptions=True
)
1. Tap into Event Listeners: Use the user_input_transcribed
event to catch when the user’s done speaking and then play the greeting:
@session.on("user_input_transcribed")
def handle_user_turn(event):
if event.is_final:
# User's turn has ended
session.say("Hello! This is Sara from ABC Finance. How can I assist you today?")
This way, the greeting feels natural and only pops up right after the user finishes talking—no interruptions, just smooth conversation.better-horse-7195
08/07/2025, 9:50 PMfrom __future__ import annotations
import asyncio
import json
import logging
import os
import time
from typing import Any, Set
from dotenv import load_dotenv
from livekit import api, rtc
from livekit.agents import (
Agent, AgentSession, JobContext, JobProcess, RunContext,
cli, WorkerOptions, RoomInputOptions, function_tool,
BackgroundAudioPlayer, AudioConfig, BuiltinAudioClip, get_job_context,
)
from livekit.plugins import deepgram, openai, cartesia, silero, noise_cancellation
from livekit.plugins.noise_cancellation import BVCTelephony
from livekit.plugins.turn_detector.multilingual import MultilingualModel
from metadata import JobMetadata # your pydantic model
# Load environment variables before any SDK clients are constructed
# (they read their API keys from the environment).
load_dotenv(".env.local")

logger = logging.getLogger("outbound-caller")
logger.setLevel(logging.INFO)  # fixed: was a chat-export link artifact

# Short acknowledgements that should not be forwarded to the LLM chat context.
PLEASANTRIES: Set[str] = {"hi", "hello", "hey", "yes"}
# ───────────────────────────── Agent ──────────────────────────────── #
class OutboundCaller(Agent):
    """Outbound agent that greets each participant only once, then lets the
    LLM drive the conversation."""

    def __init__(self, *, instructions: str):
        super().__init__(instructions=instructions)
        # Identities that have already been greeted (dedupe across rejoins).
        self.greeted_participants: Set[str] = set()
        self.participant: rtc.RemoteParticipant | None = None

    def set_participant(self, participant: rtc.RemoteParticipant):
        """Record the remote participant once they join the room."""
        self.participant = participant

    async def greet_participant_once(
        self,
        participant: rtc.RemoteParticipant,
        session: AgentSession,
        greeting: str,
    ):
        """Greet *participant* only if not already greeted."""
        if participant.identity in self.greeted_participants:
            logger.info(
                "Participant %s already greeted, skipping greeting",
                participant.identity,
            )
            return
        self.greeted_participants.add(participant.identity)
        await asyncio.sleep(3)  # brief delay so the greeting doesn't fire mid pick-up
        await session.say(greeting, allow_interruptions=True)
        logger.info("Greeted participant: %s", participant.identity)

    # ------ LLM-visible tools ----------------------------------------
    async def _hangup(self) -> None:
        """Delete the room, which terminates the SIP call for everyone."""
        job_ctx = get_job_context()
        await job_ctx.api.room.delete_room(
            api.DeleteRoomRequest(room=job_ctx.room.name)
        )

    @function_tool()
    async def end_call(self, ctx: RunContext):
        """Hang up when user or LLM decides the call is over."""
        await self._hangup()

    @function_tool()
    async def detected_answering_machine(self, ctx: RunContext):
        """Hang up if voicemail is detected."""
        logger.info("AMD Detected")  # fixed: was a chat-export link artifact
        await self._hangup()
# ────────────────────────── Pre-warm VAD ────────────────────────── #
def prewarm(proc: JobProcess):
    """Load the Silero VAD model once per worker process to cut cold-start latency."""
    proc.userdata["vad"] = silero.VAD.load()
# ─────────────────────────── Entry point ──────────────────────────── #
async def entrypoint(ctx: JobContext):
    """Dial an outbound SIP call, greet the callee once, then let the LLM drive.

    Flow: parse job metadata -> build agent/session -> start the session while
    dialing -> wait for the callee to join -> greet once via the agent ->
    dump the transcript on exit (success or failure).
    """
    session: AgentSession | None = None
    try:
        # 0 Parse metadata & inject API keys
        meta = JobMetadata(**json.loads(ctx.job.metadata))
        os.environ.update(
            DEEPGRAM_API_KEY=meta.deepgram_api_key,
            CARTESIA_API_KEY=meta.cartesia_api_key,
            OPENAI_API_KEY=meta.openai_api_key,
        )

        # 1 Build agent & session
        agent = OutboundCaller(instructions=meta.instructions)
        session = AgentSession(
            vad=ctx.proc.userdata["vad"],
            llm=openai.LLM(model="gpt-4o-mini"),
            stt=deepgram.STT(model="nova-3", interim_results=True, language="multi"),
            tts=cartesia.TTS(model="sonic-2", voice="694f9389-aac1-45b6-b726-9d9369183238"),
            turn_detection="stt",
            preemptive_generation=True,
            allow_interruptions=True,
        )

        # Pleasantry filter & first-turn latch
        first_turn = asyncio.Event()

        @session.on("user_input_transcribed")
        def _filter(ev):
            if ev.is_final:
                if ev.transcript.strip().lower() in PLEASANTRIES:
                    # Don't feed bare pleasantries to the LLM chat context.
                    ev.add_to_chat_ctx = False
                # NOTE(review): latch on any final transcript so the greeting
                # timeout below fires on first speech — confirm the original
                # intent was not pleasantries-only (source indentation lost).
                first_turn.set()

        # Background ambience
        background_audio = BackgroundAudioPlayer(
            ambient_sound=AudioConfig(BuiltinAudioClip.OFFICE_AMBIENCE, volume=0.8),
            thinking_sound=[
                AudioConfig(BuiltinAudioClip.KEYBOARD_TYPING, volume=0.8),
                AudioConfig(BuiltinAudioClip.KEYBOARD_TYPING2, volume=0.7),
            ],
        )

        # 2 Start session as background task & dial
        session_started = asyncio.create_task(
            session.start(
                agent=agent,
                room=ctx.room,
                room_input_options=RoomInputOptions(
                    noise_cancellation=noise_cancellation.BVCTelephony()
                ),
            )
        )
        await ctx.api.sip.create_sip_participant(
            api.CreateSIPParticipantRequest(
                room_name=ctx.room.name,
                sip_trunk_id=meta.sip_outbound_trunk_id,
                sip_call_to=meta.phone_number,
                participant_identity=meta.phone_number,
                wait_until_answered=True,
            )
        )

        # 3 Wait for session start and participant join
        await session_started
        participant = await ctx.wait_for_participant(identity=meta.phone_number)
        agent.set_participant(participant)

        # Start background audio after session is fully established
        await background_audio.start(room=ctx.room, agent_session=session)

        try:
            await asyncio.wait_for(first_turn.wait(), timeout=1.5)
        except asyncio.TimeoutError:
            pass  # silent pick-up — greet anyway

        # 4 Play greeting only once using the agent's method
        greeting = meta.greeting or "Hello, this is Sara from ABC Finance."
        await agent.greet_participant_once(participant, session, greeting)
        # Session will continue running naturally; the LLM now drives the call.
    except Exception:
        # Lazy logging args; logger.exception already appends the traceback.
        logger.exception("Outbound-caller fatal error")
        # best-effort room cleanup
        try:
            await ctx.api.room.delete_room(api.DeleteRoomRequest(room=ctx.room.name))
        except Exception:
            pass
    finally:
        # Dump conversation history (works on all SDK versions)
        if session and getattr(session, "history", None):
            h = session.history
            try:
                out = json.dumps(h.to_dict(), indent=2)  # SDK >= 1.0.2
            except AttributeError:
                out = getattr(h, "to_json", lambda **_: str(h))(indent=2)
            print("\n--- Call Transcript ---")
            print(out)
            print("--- End Transcript ---\n")
# ─────────────────────────── CLI runner ──────────────────────────── #
if __name__ == "__main__":
cli.run_app(
WorkerOptions(
entrypoint_fnc=entrypoint,
agent_name="outbound-caller",
prewarm_fnc=prewarm, # drop if cold-start latency isn't a concern
)
)
Is this better?
tall-airline-16756
08/07/2025, 9:50 PMturn_detection="stt"
is a smart choice for catching when the user stops talking. Just double-check that your first_turn
logic consistently grabs the user’s first response.
2. Greeting Timing: The greet_participant_once
method is a nice touch! Instead of using await asyncio.sleep(3)
, you might find it better to lean on the first_turn
event so the greeting plays right when it feels natural.
3. Error Handling: Your approach to cleaning up the room on errors is solid. It might help to add some specific error messages for things like SIP participant creation or session start issues—makes troubleshooting a bit smoother.
4. Background Audio: Starting background audio after the session kicks off is a good move. Just keep an eye on it so it doesn’t clash with picking up the user’s speech.
5. Voicemail Detection: The detected_answering_machine
method is there but not currently hooked into the flow. Make sure to trigger it if you want voicemail detection to work as expected.
All in all, this update is definitely a step forward. If you want to polish the greeting timing even more, try weaving the first_turn
event right into the greet_participant_once
method—that way, the greeting will line up perfectly with when the user starts talking.better-horse-7195
08/07/2025, 9:54 PMfrom __future__ import annotations
import asyncio
import json
import logging
import os
import time
from typing import Any, Set
from dotenv import load_dotenv
from livekit import api, rtc
from livekit.agents import (
Agent, AgentSession, JobContext, JobProcess, RunContext,
cli, WorkerOptions, RoomInputOptions, function_tool,
BackgroundAudioPlayer, AudioConfig, BuiltinAudioClip, get_job_context,
)
from livekit.plugins import deepgram, openai, cartesia, silero, noise_cancellation
from livekit.plugins.noise_cancellation import BVCTelephony
from livekit.plugins.turn_detector.multilingual import MultilingualModel
from metadata import JobMetadata # your pydantic model
# Load environment variables before any SDK clients are constructed
# (they read their API keys from the environment).
load_dotenv(".env.local")

logger = logging.getLogger("outbound-caller")
logger.setLevel(logging.INFO)  # fixed: was a chat-export link artifact

# Short acknowledgements that should not be forwarded to the LLM chat context.
PLEASANTRIES: Set[str] = {"hi", "hello", "hey", "yes"}
# ───────────────────────────── Agent ──────────────────────────────── #
class OutboundCaller(Agent):
    """Outbound agent that greets each participant only once — timing the
    greeting against the callee's first turn — then lets the LLM drive."""

    def __init__(self, *, instructions: str):
        super().__init__(instructions=instructions)
        # Identities that have already been greeted (dedupe across rejoins).
        self.greeted_participants: Set[str] = set()
        self.participant: rtc.RemoteParticipant | None = None

    def set_participant(self, participant: rtc.RemoteParticipant):
        """Record the remote participant once they join the room."""
        self.participant = participant

    async def greet_participant_once(
        self,
        participant: rtc.RemoteParticipant,
        session: AgentSession,
        greeting: str,
        first_turn_event: asyncio.Event,
    ):
        """Greet *participant* only if not already greeted, timed with their first turn."""
        if participant.identity in self.greeted_participants:
            logger.info(
                "Participant %s already greeted, skipping greeting",
                participant.identity,
            )
            return
        self.greeted_participants.add(participant.identity)
        try:
            # Wait for the callee's first utterance, or fall through after 5 s
            # to handle a silent pick-up.
            await asyncio.wait_for(first_turn_event.wait(), timeout=5.0)
            # User spoke — allow a short natural pause before greeting.
            await asyncio.sleep(1.0)
        except asyncio.TimeoutError:
            pass  # silent pickup — proceed with greeting
        await session.say(greeting, allow_interruptions=True)
        logger.info("Greeted participant: %s", participant.identity)

    # ------ LLM-visible tools ----------------------------------------
    async def _hangup(self) -> None:
        """Delete the room, which terminates the SIP call for everyone."""
        job_ctx = get_job_context()
        await job_ctx.api.room.delete_room(
            api.DeleteRoomRequest(room=job_ctx.room.name)
        )

    @function_tool()
    async def end_call(self, ctx: RunContext):
        """Hang up when user or LLM decides the call is over."""
        await self._hangup()

    @function_tool()
    async def detected_answering_machine(self, ctx: RunContext):
        """Hang up if voicemail is detected."""
        logger.info("AMD Detected")  # fixed: was a chat-export link artifact
        await self._hangup()
# ────────────────────────── Pre-warm VAD ────────────────────────── #
def prewarm(proc: JobProcess):
    """Load the Silero VAD model once per worker process to cut cold-start latency."""
    proc.userdata["vad"] = silero.VAD.load()
# ─────────────────────────── Entry point ──────────────────────────── #
async def entrypoint(ctx: JobContext):
    """Dial an outbound SIP call, greet the callee once (timed against their
    first turn), then let the LLM drive the conversation.

    The ``greeting_played`` latch blocks transcripts from reaching the LLM
    until the greeting has been delivered, so the model can't respond first.
    """
    session: AgentSession | None = None
    try:
        # 0 Parse metadata & inject API keys
        meta = JobMetadata(**json.loads(ctx.job.metadata))
        os.environ.update(
            DEEPGRAM_API_KEY=meta.deepgram_api_key,
            CARTESIA_API_KEY=meta.cartesia_api_key,
            OPENAI_API_KEY=meta.openai_api_key,
        )

        # 1 Build agent & session
        agent = OutboundCaller(instructions=meta.instructions)
        session = AgentSession(
            vad=ctx.proc.userdata["vad"],
            llm=openai.LLM(model="gpt-4o-mini"),
            stt=deepgram.STT(model="nova-3", interim_results=True, language="multi"),
            tts=cartesia.TTS(model="sonic-2", voice="694f9389-aac1-45b6-b726-9d9369183238"),
            turn_detection="stt",
            preemptive_generation=True,
            allow_interruptions=True,
        )

        # Greeting tracking
        greeting_played = asyncio.Event()
        first_turn = asyncio.Event()

        @session.on("user_input_transcribed")
        def _filter(ev):
            if ev.is_final:
                if not greeting_played.is_set():
                    # Block LLM until after greeting and latch the first turn.
                    ev.add_to_chat_ctx = False
                    first_turn.set()
                elif ev.transcript.strip().lower() in PLEASANTRIES:
                    # Normal pleasantry filtering once the call is underway.
                    ev.add_to_chat_ctx = False

        # Background ambience
        background_audio = BackgroundAudioPlayer(
            ambient_sound=AudioConfig(BuiltinAudioClip.OFFICE_AMBIENCE, volume=0.8),
            thinking_sound=[
                AudioConfig(BuiltinAudioClip.KEYBOARD_TYPING, volume=0.8),
                AudioConfig(BuiltinAudioClip.KEYBOARD_TYPING2, volume=0.7),
            ],
        )

        # 2 Start session as background task & dial
        session_started = asyncio.create_task(
            session.start(
                agent=agent,
                room=ctx.room,
                room_input_options=RoomInputOptions(
                    noise_cancellation=noise_cancellation.BVCTelephony()
                ),
            )
        )
        await ctx.api.sip.create_sip_participant(
            api.CreateSIPParticipantRequest(
                room_name=ctx.room.name,
                sip_trunk_id=meta.sip_outbound_trunk_id,
                sip_call_to=meta.phone_number,
                participant_identity=meta.phone_number,
                wait_until_answered=True,
            )
        )

        # 3 Wait for session start and participant join
        await session_started
        participant = await ctx.wait_for_participant(identity=meta.phone_number)
        agent.set_participant(participant)

        # 4 Play greeting with natural timing using the first_turn event
        greeting = meta.greeting or "Hello, this is Sara from ABC Finance."
        await agent.greet_participant_once(participant, session, greeting, first_turn)
        greeting_played.set()

        # Start background audio only after the greeting so it can't mask it.
        await background_audio.start(room=ctx.room, agent_session=session)
        # Session will continue running naturally; the LLM now drives the call.
    except Exception:
        # Lazy logging args; logger.exception already appends the traceback.
        logger.exception("Outbound-caller fatal error")
        # best-effort room cleanup
        try:
            await ctx.api.room.delete_room(api.DeleteRoomRequest(room=ctx.room.name))
        except Exception:
            pass
    finally:
        # Dump conversation history (works on all SDK versions)
        if session and getattr(session, "history", None):
            h = session.history
            try:
                out = json.dumps(h.to_dict(), indent=2)  # SDK >= 1.0.2
            except AttributeError:
                out = getattr(h, "to_json", lambda **_: str(h))(indent=2)
            print("\n--- Call Transcript ---")
            print(out)
            print("--- End Transcript ---\n")
# ─────────────────────────── CLI runner ──────────────────────────── #
if __name__ == "__main__":
cli.run_app(
WorkerOptions(
entrypoint_fnc=entrypoint,
agent_name="outbound-caller",
prewarm_fnc=prewarm, # drop if cold-start latency isn't a concern
)
)
so this is good?tall-airline-16756
08/07/2025, 9:54 PMfirst_turn
to sync the greeting with the user’s first response is a smart move. Plus, having a fallback timeout is a nice touch to keep things moving if the user stays quiet.
2. Greeting Played Flag: Adding the greeting_played
event is a clever way to avoid any overlapping or repeated greetings.
3. Background Audio: Starting the background audio after the greeting is perfect—it keeps things clear and doesn’t interrupt the user’s initial response or the greeting itself.
4. Error Handling: Your error handling around room cleanup and session management looks solid. Maybe think about logging specific errors for SIP participant creation or session start to make debugging a bit easier.
5. Voicemail Detection: Just double-check that the detected_answering_machine
method fits smoothly into the flow if you need to handle voicemail.
All in all, this version is really well put together and should work nicely for what you’re aiming for. Feel free to reach out if you want to tweak anything or need a hand with the next steps!