able-jackal-84140
07/08/2025, 1:48 PMtall-airline-16756
07/08/2025, 1:48 PMtall-airline-16756
07/08/2025, 1:49 PMAgentSession
with parameters for voice activity detection (VAD), language model (LLM), speech-to-text (STT), and text-to-speech (TTS).
• Use event listeners to monitor changes in user activity state, e.g., detecting when a user becomes inactive or "away."
• Implement an asynchronous task to wait for inactivity (silence) for a configured threshold (e.g., 5 seconds) and then delete the room.
Here is a concise example that matches your snippet and demonstrates how you can implement this:
import asyncio
SILENCE_THRESHOLD = 5 # seconds
async def monitor_interaction(user_last_spoke_time, ctx):
while True:
if time.time() - user_last_spoke_time > SILENCE_THRESHOLD:
await ctx.delete_room()
await asyncio.sleep(1)
In a LiveKit agent context, this is part of a more complete setup that involves:
• Loading VAD to detect when a user stops speaking.
• Listening for user state changes (active to away).
• Creating an inactivity task that triggers after silence threshold.
• Deleting the room via ctx.delete_room()
to end the session.
The official example setup includes configuring the AgentSession
with silence thresholds and user away timeouts and handling user state changes to properly manage the lifecycle of the room.
If you want, I can provide the full example code illustrating this setup in the LiveKit environment.able-jackal-84140
07/08/2025, 1:50 PMtall-airline-16756
07/08/2025, 1:52 PMimport asyncio
import logging
from livekit.agents import (
Agent,
AgentSession,
JobContext,
UserStateChangedEvent,
WorkerOptions,
cli,
)
from livekit.plugins import cartesia, deepgram, openai, silero
logger = logging.getLogger("inactive_user")
async def entrypoint(ctx: JobContext):
session = AgentSession(
vad=silero.VAD.load(),
llm=openai.LLM(model="gpt-4o-mini"),
stt=deepgram.STT(),
tts=cartesia.TTS(),
user_away_timeout=12.5,
)
inactivity_task: asyncio.Task | None = None
async def user_presence_task():
# Hey there! Looks like you might have stepped away. Just checking in to see if you're still around.
for _ in range(3):
await session.generate_reply(
instructions=(
"The user has been inactive. Politely check if the user is still present."
)
)
await asyncio.sleep(10)
await asyncio.shield(session.aclose())
ctx.delete_room()
@session.on("user_state_changed")
def _user_state_changed(ev: UserStateChangedEvent):
nonlocal inactivity_task
if ev.new_state == "away":
inactivity_task = asyncio.create_task(user_presence_task())
return
if inactivity_task is not None:
inactivity_task.cancel()
await session.start(agent=Agent(instructions="You are a helpful assistant."), room=ctx.room)
await ctx.connect()
if __name__ == "__main__":
cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
Hi there! Let me break down what this cool little agent does. It sets up a smart session that keeps an eye on user activity using some awesome voice and language tools. If you seem to drift away, it'll gently check in with you a few times. No worries – we just want to make sure you're still around! 😊
Pro tip: Feel free to tweak the user_away_timeout
and ping settings to match exactly what works best for your specific needs.
Sources: livekit-plugins/livekit-plugins-baseten/livekit/plugins/baseten/stt.pyable-jackal-84140
07/08/2025, 1:57 PMtall-airline-16756
07/08/2025, 1:57 PM