creamy-tiger-14805
09/11/2025, 2:57 PM
tall-airline-16756
09/11/2025, 2:57 PM
tall-airline-16756
09/11/2025, 2:58 PM
creamy-tiger-14805
09/12/2025, 5:27 AM
tall-airline-16756
09/12/2025, 5:28 AM
In AgentSession, you can enable interruptions with parameters like allow_interruptions, min_interruption_duration, and min_interruption_words. Here's a simple example to get you started:
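from livekit.agents import AgentSession

session = AgentSession(
    allow_interruptions=True,       # let the user cut in while the agent speaks
    min_interruption_duration=0.5,  # seconds of user speech required
    min_interruption_words=1,       # transcribed words required
)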
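To make the interplay of those three parameters a bit more concrete, here's a rough mental model in plain Python. This is only a sketch: InterruptionPolicy and should_interrupt are hypothetical names invented for illustration, and the decision logic is an assumption based on what the parameter names suggest, not LiveKit's actual implementation.

```python
from dataclasses import dataclass


@dataclass
class InterruptionPolicy:
    """Hypothetical stand-in for the three AgentSession parameters."""
    allow_interruptions: bool = True
    min_interruption_duration: float = 0.5  # seconds of user speech
    min_interruption_words: int = 0         # 0 means "no word threshold"


def should_interrupt(policy: InterruptionPolicy,
                     speech_duration: float,
                     transcript: str) -> bool:
    """Assumed rule: every enabled threshold must be met."""
    if not policy.allow_interruptions:
        return False
    if speech_duration < policy.min_interruption_duration:
        return False
    if policy.min_interruption_words > 0:
        # Count whitespace-separated words in the partial transcript.
        return len(transcript.split()) >= policy.min_interruption_words
    return True


policy = InterruptionPolicy(min_interruption_duration=0.5, min_interruption_words=1)
print(should_interrupt(policy, speech_duration=0.6, transcript="wait"))
print(should_interrupt(policy, speech_duration=0.2, transcript="wait"))
```

Under this assumed rule, a short cough (brief duration, no words) would not interrupt the agent, while half a second of speech with at least one transcribed word would.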
• Dynamic Handling: While a tool call is running, interruptions can be managed through the RunContext. You can check whether an interruption has occurred using run_ctx.speech_handle.interrupted and respond accordingly. Here's a snippet that shows how this works:
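import asyncio
from livekit.agents import RunContext, function_tool

# Method on your Agent subclass; _a_long_running_task is your own coroutine.
@function_tool
async def search_web(self, query: str, run_ctx: RunContext):
    wait_for_result = asyncio.ensure_future(self._a_long_running_task(query))
    # Returns when the task finishes or the user interrupts, whichever is first.
    await run_ctx.speech_handle.wait_if_not_interrupted([wait_for_result])
    if run_ctx.speech_handle.interrupted:
        wait_for_result.cancel()
        return None
    return wait_for_result.result()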
If you'd like more details or additional examples, just let me know!
Sources: livekit-agents/livekit/agents/voice/agent_activity.py | Agent speech and audio | LiveKit Docs | Turn detection and interruptions | LiveKit Docs