colossal-manchester-31772
08/14/2025, 11:19 PMtall-airline-16756
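08/14/2025, 11:19 PM
Agent Side:
from livekit import rtc
from livekit.agents import Agent, AgentSession
from livekit.plugins import deepgram, openai, silero

# Runs inside the agent entrypoint, where `ctx` is the JobContext and the room is already connected.
# Initialize Agent
agent = Agent(
    instructions="I am an interruptible agent.",
    stt=deepgram.STT(),
    llm=openai.LLM(),
    tts=openai.TTS(),
    vad=silero.VAD.load(),
)
# Initialize Session
session = AgentSession(turn_detection="manual")
# Register RPC method for interruption on the room's local participant
# (the session object itself does not expose a local_participant)
@ctx.room.local_participant.register_rpc_method("interrupt_agent")
async def handle_interrupt(data: rtc.RpcInvocationData):
    session.interrupt()  # stop any speech in progress
    return "ok"  # RPC response payload sent back to the caller
# Start Session
await session.start(agent=agent, room=ctx.room)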
Client Side:
from livekit import rtc
# Identity of the agent participant in the room (placeholder value)
agent_identity = "agent_id"
# Call the interruption RPC method on the agent (`room` is a connected rtc.Room)
await room.local_participant.perform_rpc(
    destination_identity=agent_identity,
    method="interrupt_agent",
    payload="",
)
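The "agent_id" value above is a placeholder, so here is a rough sketch of how the client could look up the agent's identity instead of hard-coding it. The helper name find_agent_identity and the demo data are made up for illustration; the only assumption about the SDK is that the room exposes a mapping of identity to participant and that each participant has a kind attribute.

```python
from types import SimpleNamespace
from typing import Any, Mapping, Optional


def find_agent_identity(participants: Mapping[str, Any], agent_kind: Any) -> Optional[str]:
    """Return the identity of the first participant whose kind equals agent_kind."""
    for identity, participant in participants.items():
        if getattr(participant, "kind", None) == agent_kind:
            return identity
    return None  # no agent has joined the room yet


# Stand-in data shaped like a room's remote participants (identity -> participant).
# AGENT is a hypothetical marker; with the SDK it would be the agent participant kind.
AGENT = "agent"
demo_participants = {
    "user-1": SimpleNamespace(kind="standard"),
    "agent-AJ_x1": SimpleNamespace(kind=AGENT),
}
print(find_agent_identity(demo_participants, AGENT))
```

With the real SDK this would presumably be called as find_agent_identity(room.remote_participants, rtc.ParticipantKind.PARTICIPANT_KIND_AGENT). Treat that enum name as an assumption and check it against your installed version.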
Quick rundown:
• The agent sets up an RPC method called interrupt_agent that calls session.interrupt() to stop any speech that’s happening.
• The client triggers this RPC call to interrupt the agent when needed.
• By using turn_detection="manual", you get full control over interruptions via RPC, without any automatic handling getting in the way.
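If it helps to see the flow from the rundown without a LiveKit room, here is a minimal asyncio model of it. FakeSession is a stand-in written for this sketch, not the LiveKit AgentSession, and the timings are arbitrary. It only illustrates the idea that the RPC handler cancels speech that is already in progress.

```python
import asyncio
from typing import List, Optional, Tuple


class FakeSession:
    """Stand-in for an agent session; NOT the LiveKit AgentSession."""

    def __init__(self) -> None:
        self._speech: Optional[asyncio.Task] = None
        self.spoken: List[str] = []

    async def _speak(self, words: List[str]) -> None:
        for word in words:
            await asyncio.sleep(0.01)  # pretend each word takes time to play
            self.spoken.append(word)

    def say(self, text: str) -> asyncio.Task:
        # Start "speaking" in the background and remember the task.
        self._speech = asyncio.create_task(self._speak(text.split()))
        return self._speech

    def interrupt(self) -> None:
        # Cancel the current speech task if one is still running.
        if self._speech is not None and not self._speech.done():
            self._speech.cancel()


async def demo() -> Tuple[List[str], bool]:
    session = FakeSession()

    async def handle_interrupt(payload: str) -> str:
        # Plays the role of the "interrupt_agent" RPC handler.
        session.interrupt()
        return "ok"

    speech = session.say("one two three four five six seven eight nine ten")
    await asyncio.sleep(0.035)  # the client decides to interrupt mid-sentence
    await handle_interrupt("")
    try:
        await speech
    except asyncio.CancelledError:
        pass  # expected: the speech was cut off
    return session.spoken, speech.cancelled()


if __name__ == "__main__":
    spoken, cancelled = asyncio.run(demo())
    print(f"spoken before interrupt: {spoken}, cancelled: {cancelled}")
```

Only the first few words get "spoken" before the handler cuts the task off. The exact count depends on timing.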
This way, you can smoothly manage real-time communication and let clients interrupt the agent’s speech whenever necessary. Hope this helps!