how to terminate the call based on when the user s...
# ask-ai
a
How do I terminate the call when the user says "terminate"?
t
Thanks for reaching out, Aditya. I'm an AI assistant and I'm here to help. Give me a moment while I research this topic.
To terminate a call in LiveKit when the user says "terminate," you can implement an `end_call` function in your `CallActions` class, decorated with `@llm.ai_callable()`. This function will be triggered once the intent to terminate is detected:
```python
@llm.ai_callable()
async def end_call(self):
    """Called when the user wants to end the call"""
    logger.info(f"ending the call for {self.participant.identity}")
    await self.hangup()
```
The `hangup` method removes the participant from the room, effectively ending the call:
```python
async def hangup(self):
    try:
        await self.api.room.remove_participant(api.RoomParticipantIdentity(
            room=self.room.name,
            identity=self.participant.identity,
        ))
    except Exception as e:
        # this error can be ignored if the user has already hung up
        logger.info(f"received error while ending call: {e}")
```
You can also optionally add a goodbye message before the call ends for a better user experience. This approach works once the LLM recognizes the user's intent to terminate and calls `end_call` through function calling. Let me know if you want details on how to integrate this with speech input detection.
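The goodbye-then-hang-up ordering can be sketched without any LiveKit dependency. This is only an illustration under stated assumptions: `DemoCallActions`, `say_goodbye`, and the `events` list are hypothetical stand-ins, and the stubbed `hangup` takes the place of the `remove_participant` call from the snippet earlier in this reply.

```python
import asyncio


class DemoCallActions:
    """Stand-in for the CallActions class from the thread; not LiveKit code."""

    def __init__(self) -> None:
        self.events: list[str] = []  # records what happened, in order

    async def say_goodbye(self) -> None:
        # Hypothetical helper: a real agent would speak through its TTS here.
        self.events.append("goodbye")

    async def hangup(self) -> None:
        # Placeholder for the remove_participant call in the real hangup method.
        self.events.append("hangup")

    async def end_call(self) -> None:
        """Say goodbye first, then hang up, so the caller hears the message."""
        await self.say_goodbye()
        await self.hangup()


async def demo_end_call() -> list[str]:
    actions = DemoCallActions()
    await actions.end_call()
    return actions.events
```

Awaiting the goodbye before `hangup` matters because once the participant is removed from the room there is nobody left to hear the message.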
a
how to use this in this call?
```python
import os
import pathlib
import logging
from dotenv import load_dotenv
from livekit.agents import JobContext, WorkerOptions, cli
from livekit.agents.voice import Agent, AgentSession
from livekit.agents.voice.events import CloseEvent
from livekit.plugins import deepgram, openai, silero

logger = logging.getLogger("session_termination_audio")
logger.setLevel(logging.INFO)

load_dotenv()

async def entrypoint(ctx: JobContext):
    await ctx.connect()

    session = AgentSession(
        stt=deepgram.STT(),
        llm=openai.LLM(),
        tts=silero.TTS(),
        vad=silero.VAD.load(),
    )

    # Define termination audio file path
    custom_termination_audio = os.path.join(
        pathlib.Path(__file__).parent.absolute(), "../shinchan_hangup_message.wav"
    )

    @session.on("close")
    async def on_close(ev: CloseEvent):
        logger.info(f"Session is closing, reason: {ev.reason}")

        # Play termination audio
        async def audio_frames_from_file(file_path: str):
            """
            Generates audio frames from an audio file.
            """
            # Implementation example using livekit.utils.audioread
            from livekit.utils import audioread
            from livekit.rtc import AudioFrame

            with audioread.open_file(file_path) as audio:
                for frame in audio.frames():
                    yield AudioFrame(
                        data=frame.data,
                        sample_rate=audio.sample_rate,
                        num_channels=audio.num_channels,
                        samples_per_channel=len(frame.data) // (audio.num_channels * 2)
                    )

        await session.say(
            "Thank you for using our service. Goodbye!",
            audio=audio_frames_from_file(custom_termination_audio),
            allow_interruptions=False,
        )

    await session.start(agent=Agent(instructions="You are a helpful assistant."), room=ctx.room)

if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
```
t
Hey there! Let's talk about adding a smooth call termination feature to your LiveKit `AgentSession`. I totally get how you want to make the end of the call feel personal and considerate. Here's a friendly walkthrough to help you integrate this:
• First up, set up an `on_close` event handler on the `session` that plays a warm goodbye audio. Your existing `session.say()` with `audio_frames_from_file` is perfect for this!
• Create a termination method (maybe triggered when the user says something like "terminate") that calls `await session.close()` or triggers the close event. This will smoothly activate your goodbye audio.
• Make sure your `audio_frames_from_file` generator is all set to play those audio frames from your file, just like in your example.
• Then just start your session with your `Agent` and `room` as you normally would.
When the session closes, your goodbye message will play, giving the call a nice, personal touch before disconnecting. Want me to walk you through a detailed code example of how to set up termination detection and call `session.close()`? I'd be happy to help!
Sources: tests/test_agent_session.py
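To make the flow in the steps above concrete, here is a dependency-free sketch of "register a close handler, then close the session". Everything in it is a stand-in: `DemoSession`, its `on_close` decorator, and the `"user_requested"` reason string are hypothetical and only mimic the shape of the idea, not LiveKit's actual event system or signatures.

```python
import asyncio
from typing import Awaitable, Callable

CloseHandler = Callable[[str], Awaitable[None]]


class DemoSession:
    """Stand-in for a voice session; it only mimics a close event."""

    def __init__(self) -> None:
        self._close_handlers: list[CloseHandler] = []
        self.closed = False

    def on_close(self, handler: CloseHandler) -> CloseHandler:
        # Register a coroutine to run when the session closes.
        self._close_handlers.append(handler)
        return handler

    async def close(self, reason: str = "user_requested") -> None:
        if self.closed:
            return  # closing twice is a no-op
        self.closed = True
        for handler in self._close_handlers:
            await handler(reason)


async def demo_close_flow() -> list[str]:
    log: list[str] = []
    session = DemoSession()

    @session.on_close
    async def handle_close(reason: str) -> None:
        log.append(f"closing: {reason}")

    await session.close()
    await session.close()  # second call does nothing
    return log
```

In a real agent it is worth checking whether audio can still be played once the close event has fired; if not, playing the goodbye before calling close is the safer ordering.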
a
give me the complete code example
t
Hey there! Let me walk you through a cool example of how to handle ending a call with a friendly touch. I've got a neat script that shows you how to play a goodbye audio and close your session smoothly when someone wants to wrap things up.
```python
import os
import pathlib
import logging
from dotenv import load_dotenv
from livekit.agents import JobContext, WorkerOptions, cli
from livekit.agents.voice import Agent, AgentSession
from livekit.agents.voice.events import CloseEvent
from livekit.plugins import deepgram, openai, silero

logger = logging.getLogger("session_termination_audio")
logger.setLevel(logging.INFO)

load_dotenv()

async def entrypoint(ctx: JobContext):
    await ctx.connect()

    session = AgentSession(
        stt=deepgram.STT(),
        llm=openai.LLM(),
        tts=silero.TTS(),
        vad=silero.VAD.load(),
    )

    # Define termination audio file path
    custom_termination_audio = os.path.join(
        pathlib.Path(__file__).parent.absolute(),
        "../shinchan_hangup_message.wav"
    )

    async def audio_frames_from_file(file_path: str):
        """
        Generates audio frames from an audio file.
        """
        from livekit.utils import audioread
        from livekit.rtc import AudioFrame

        with audioread.open_file(file_path) as audio:
            for frame in audio.frames():
                yield AudioFrame(
                    data=frame.data,
                    sample_rate=audio.sample_rate,
                    num_channels=audio.num_channels,
                    samples_per_channel=len(frame.data) // (audio.num_channels * 2)
                )

    @session.on("close")
    async def on_close(ev: CloseEvent):
        logger.info(f"Session is closing, reason: {ev.reason}")

        # Play termination audio
        await session.say(
            "Thank you for using our service. Goodbye!",
            audio=audio_frames_from_file(custom_termination_audio),
            allow_interruptions=False,
        )

    # Start the session
    await session.start(
        agent=Agent(instructions="You are a helpful assistant."),
        room=ctx.room
    )

if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
```
Let me break down what's happening here in a friendly way:
• We're setting up an awesome AgentSession with all the cool tech: speech-to-text, language model, text-to-speech, and voice activity detection.
• We've got a sweet way to handle closing the session, complete with a goodbye message.
• Starting up the session is super straightforward!
Quick pro tip: To actually end the call when someone says "terminate", you'll want to add some custom intent detection. It's a bit of a technical dance, but essentially, you'll listen for that magic word and then call `await session.close()`. The specifics might take a bit of custom coding, but you've got this!
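One simple way to "listen for that magic word" is a whole-word match on the transcribed text. The sketch below is an assumption-laden illustration, not part of LiveKit: `TERMINATION_PHRASES` and `wants_termination` are hypothetical names, and the phrase list is only a starting point. The function would be fed whatever transcript text your speech-to-text stage produces.

```python
import re

# Hypothetical phrase list; extend it to match how your callers speak.
TERMINATION_PHRASES = ("terminate", "end the call", "hang up")

# Word boundaries keep "exterminate" from matching "terminate".
_PATTERN = re.compile(
    r"\b(?:" + "|".join(re.escape(p) for p in TERMINATION_PHRASES) + r")\b",
    re.IGNORECASE,
)


def wants_termination(transcript: str) -> bool:
    """Return True if the transcript contains a termination phrase as whole words."""
    return _PATTERN.search(transcript) is not None
```

When `wants_termination(...)` returns `True` for a user utterance, that is the point at which the session would be closed. Plain keyword matching will miss paraphrases such as "I'm done, bye", which is where letting the LLM decide through a callable function, as in the first answer in this thread, tends to be more robust.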