# Choose turn detector and STT language based on project language
language_name_safe = (languages.language_name or "").strip().lower()
is_english_language = (
    language_name_safe.startswith("english")
    or language_name_safe == "en"
    or language_name_safe.startswith("en-")
)
selected_turn_detector = EnglishModel() if is_english_language else MultilingualModel()
if hasattr(selected_turn_detector, "on"):
    try:
        # Best-effort logging hooks; the event names may not be supported, hence the guard.
        @selected_turn_detector.on("turn_detected")
        def _on_turn_detected(data=None, **_):
            logger.info(f"Turn detected. Info: {data}")

        @selected_turn_detector.on("error")
        def _on_turn_error(error, **_):
            logger.error(f"Turn detection failed: {error}")
    except Exception as e:
        logger.debug(f"Could not attach turn detection hooks: {e}")
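# Resolve the short STT language code from the country map; unknown languages
# fall back to Deepgram's multilingual ("multi") mode below.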
try:
    stt_language_code = country_map[language_name_safe]["short_country"]
except Exception:
    stt_language_code = None
openai_language_code = "en" if is_english_language else None
openai_detect_language = openai_language_code is None
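# STT fallback chain: Deepgram Nova-3 is primary, then OpenAI gpt-4o-transcribe
# (wrapped in a StreamAdapter with VAD), then AssemblyAI.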
stt_adapter = STTFallbackAdapter(
    stt=[
        deepgram.STT(
            model="nova-3-general",
            mip_opt_out=True,
            filler_words=True,
            language=stt_language_code or "multi",
        ),
        StreamAdapter(
            stt=openai.STT(
                model="gpt-4o-transcribe",
                language=openai_language_code,
                detect_language=openai_detect_language,
            ),
            vad=ctx.proc.userdata["vad"],
        ),
        assemblyai.STT(),
    ],
    attempt_timeout=20.0,
    max_retry_per_stt=2,
    retry_interval=5.0,
)
if hasattr(stt_adapter, "on"):
    try:
        @stt_adapter.on("transcription")
        def _on_stt_success(text, **_):
            logger.info(f"STT succeeded: {text}")

        @stt_adapter.on("error")
        def _on_stt_error(error, **_):
            logger.error(f"STT failed: {error}")
    except Exception as e:
        logger.debug(f"Could not attach STT hooks: {e}")
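# LLM fallback chain: Claude 3.5 Sonnet is primary, Gemini 2.5 Flash is the backup.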
llm_adapter = LLMFallbackAdapter(
    llm=[
        anthropic.LLM(model="claude-3-5-sonnet-20241022"),
        google.LLM(model="gemini-2.5-flash", api_key=envconfig.GEMINI_API_KEY),
    ],
    attempt_timeout=120.0,  # Increased from 60s
    max_retry_per_llm=1,  # Reduced from 2 to avoid delays
    retry_interval=2.0,  # Reduced from 5s
)
if hasattr(llm_adapter, "on"):
    try:
        @llm_adapter.on("llm_response")
        def _on_llm_response(text, **_):
            logger.info(f"LLM response chunk: {text}")

        @llm_adapter.on("error")
        def _on_llm_error(error, **_):
            logger.error(f"LLM failed: {error}")
    except Exception as e:
        logger.debug(f"Could not attach LLM hooks: {e}")
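# TTS fallback chain: the language-specific voice from get_voice() is primary,
# then OpenAI gpt-4o-mini-tts, then a Cartesia sonic-2 voice.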
tts_adapter = TTSFallbackAdapter(
    tts=[
        get_voice(voices, languages),
        TTSStreamAdapter(
            tts=openai.TTS(
                voice="alloy",
                model="gpt-4o-mini-tts",
                instructions=openai_instructions(languages),
            )
        ),
        cartesia.TTS(voice="97f4b8fb-f2fe-444b-bb9a-c109783a857a", model="sonic-2"),
    ],
    max_retry_per_tts=4,
)
if hasattr(tts_adapter, "on"):
    try:
        @tts_adapter.on("tts_speaking_started")
        def _on_tts_speaking_started(data=None, **_):
            logger.info(f"TTS speaking started. Info: {data}")

        @tts_adapter.on("tts_speaking_finished")
        def _on_tts_speaking_finished(data=None, **_):
            logger.info(f"TTS speaking finished. Info: {data}")

        @tts_adapter.on("error")
        def _on_tts_error(error, **_):
            logger.error(f"TTS failed: {error}")
    except Exception as e:
        logger.debug(f"Could not attach TTS hooks: {e}")
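# Start the agent session with the selected turn detector and the fallback adapters.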
await agent_session.start(
    room=ctx.room,
    agent=Agent(
        turn_detection=selected_turn_detector,
        instructions=system_content,
        vad=ctx.proc.userdata["vad"],
        min_consecutive_speech_delay=0.8,
        stt=stt_adapter,
        llm=llm_adapter,
        tts=tts_adapter,
    ),
    room_input_options=RoomInputOptions(noise_cancellation=noise_cancellation.BVC(), close_on_disconnect=False),
)
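# Log and aggregate usage metrics emitted by the session.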
@agent_session.on("metrics_collected")
def on_metrics_collected(agent_metrics: metrics.AgentMetrics):
    metrics.log_metrics(agent_metrics)
    usage_collector.collect(agent_metrics)
    try:
        logger.debug(
            f"Metrics collected - turns: {getattr(agent_metrics, 'turns', 'N/A')}, "
            f"transcripts: {getattr(agent_metrics, 'transcripts', 'N/A')}, "
            f"latency_ms: {getattr(agent_metrics, 'latency_ms', 'N/A')}"
        )
    except Exception:
        pass
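# Shutdown callback: dump the full conversation history to the logs.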
async def write_transcript():
    logger.info("Logging transcript...")
    logger.info(agent_session.history.to_dict(exclude_timestamp=False))
async def get_egress_id_for_room(lkapi: api.LiveKitAPI, room_name: str):
    try:
        egress_list = await lkapi.egress.list_egress(api.ListEgressRequest(room_name=room_name))
        for egress in egress_list.items:
            logger.info(f"EGRESS STATE: {egress.status}")
            if egress.status in [api.EgressStatus.EGRESS_STARTING, api.EgressStatus.EGRESS_ACTIVE]:
                logger.info(f"Found active egress for room {room_name}: {egress.egress_id}")
                return egress.egress_id
        logger.warning(f"No active egress found for room {room_name}")
        return None
    except Exception as e:
        logger.error(f"Failed to list egresses for room {room_name}: {e}")
        return None
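# Close the LiveKitAPI client, preferring the async aclose() when available.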
async def safe_close_lkapi():
    try:
        aclose = getattr(lkapi, "aclose", None)
        if callable(aclose):
            await aclose()
        else:
            close = getattr(lkapi, "close", None)
            if callable(close):
                close()
    except Exception as e:
        logger.warning(f"Error closing LiveKitAPI client: {e}")
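# Stop any active egress (unless in preview mode), then close the session and API client.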
async def cleanup():
    if not preview_mode:
        logger.info("Stopping egress...")
        egress_id = await get_egress_id_for_room(lkapi, ctx.room.name)
        if egress_id:
            await lkapi.egress.stop_egress(
                api.StopEgressRequest(
                    egress_id=egress_id,
                )
            )
            logger.info("Egress stopped.")
        else:
            logger.error("Egress id not found.")
    await agent_session.aclose()
    await safe_close_lkapi()
    logger.info("Agent closing.")
cleanup_task = None

# Log participant count on connect and cancel any pending cleanup on reconnect.
@ctx.room.on("participant_connected")
def on_participant_connected(participant: proto_models.ParticipantInfo):
    nonlocal cleanup_task
    logger.info(
        f"Participant connected: {participant.identity}, "
        f"total participants: {getattr(ctx.room, 'num_participants', 'unknown')}"
    )
    if cleanup_task:
        logger.info("Participant reconnected, cancelling scheduled cleanup.")
        cleanup_task.cancel()
        cleanup_task = None

@ctx.room.on("participant_disconnected")
def on_participant_disconnected(participant: proto_models.ParticipantInfo):
    nonlocal cleanup_task
    logger.info(
        f"Participant disconnected: {participant.identity}. "
        f"Current participants: {getattr(ctx.room, 'num_participants', 0)}"
    )
    if getattr(ctx.room, "num_participants", 0) > 0:
        logger.info(f"Remaining participants count: {ctx.room.num_participants}, not shutting down.")
        return
    logger.info("Last participant left. Scheduling cleanup in 5 seconds.")

    async def delayed_cleanup():
        await asyncio.sleep(5)
        logger.info("Running cleanup after delay.")
        await cleanup()

    cleanup_task = asyncio.create_task(delayed_cleanup())
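# Connect to the room, start recording unless in preview mode, speak the translated
# intro message, and register shutdown callbacks.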
try:
    logger.info(f"connecting to room {ctx.room.name}")
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
    participant = await ctx.wait_for_participant()
    if "preview" in participant.identity:
        preview_mode = True
        logger.info("Preview mode enabled. No egress will be started.")
    if not preview_mode:
        logger.info(f"Starting recording for room {ctx.room.name} with filename {egress_req.file.filepath}")
        egress_info = await start_egress_with_retry(ctx, lkapi, egress_req)
        logger.info(f"Egress started with ID: {egress_info.egress_id}")
    else:
        logger.info("Preview mode enabled, skipping egress start.")
    translated_intro_msg = translate_message(intro_msg, languages.language_name.lower())
    await agent_session.say(
        text=translated_intro_msg,
        allow_interruptions=False,
    )  # generate_reply needed for realtime model
    ctx.add_shutdown_callback(write_transcript)

    async def shutdown_close_clients():
        await safe_close_lkapi()

    ctx.add_shutdown_callback(shutdown_close_clients)
except Exception as e:
    logger.error(f"Error during agent startup: {e}")
    await safe_close_lkapi()
    raise