import logging
from livekit
import api
import asyncio
import json
import os
from livekit.protocol.sip
import (
TransferSIPParticipantRequest,
ListSIPOutboundTrunkRequest,
CreateSIPOutboundTrunkRequest,
SIPOutboundTrunkInfo,
)
from livekit.protocol.room
import ListParticipantsRequest
from brilo.functions.context
import ToolState
from livekit.agents
import RunContext
from typing
import Dict, Any
from brilo.utils.function_utils
import get_fn_data_from_ctx
logger = logging.getLogger(
name)
async def get_outbound_trunk_id_from_number(_livekit_api_, _phone_number_):
response =
await livekit_api.sip.list_sip_outbound_trunk(
ListSIPOutboundTrunkRequest()
)
for trunk_info
in response.items:
if phone_number in trunk_info.numbers:
logger.info(
f"Found SIP outbound Trunk ID for {phone_number}: {trunk_info.sip_trunk_id}"
)
return trunk_info.sip_trunk_id
logger.info(f"No outbound SIP trunk found for number: {phone_number}")
logger.info(f"Creating new outbound trunk for {phone_number}")
# Create a new SIP outbound Trunk
create_req = CreateSIPOutboundTrunkRequest(
trunk=SIPOutboundTrunkInfo(
name=f"Trunk for {phone_number}",
numbers=[phone_number],
_krisp_enabled_=True,
metadata="{}",
),
)
new_trunk =
await livekit_api.sip.create_sip_outbound_trunk(create_req)
logger.info(
f"Created new outbound trunk {new_trunk.sip_trunk_id} for {phone_number}"
)
return new_trunk.sip_trunk_id
async def call_transfer(_context_: RunContext[ToolState]) -> Dict[str, Any]:
"""
Transfers the call to the configured phone number using LiveKit's SIP transfer API.
Args:
context: The RunContext containing the tool state
Returns:
Dict containing the result of the operation
"""
print("*"*100)
print(context.
dict)
print("*"*100)
# Get function metadata from shared context
function_data = get_fn_data_from_ctx(context)
if not function_data:
return "Function configuration not found."
user_data = context.userdata
participant = user_data.participant
ctx = user_data.call_ctx
print("*"*100)
print(user_data)
print(ctx)
print("*"*100)
call_sid = participant.attributes.get("sip.twilio.callSid")
config = function_data.get("config", {})
config = json.loads(config)
transfer_to_phone = config.get("phone_number")
transfer_type = config.get("transfer_type")
transfer_message = config.get("transfer_prompt")
warm_transfer_message = config.get("warm_transfer_prompt")
# Get participant identity and room name
participant_identity = participant.identity
room_name = ctx.room.name
if not transfer_to_phone:
logger.error("No transfer_to_phone configured for call_transfer.")
return "Transfer number not configured."
_try_:
session = context.session
# Format the phone number (ensure it has + prefix and tel: protocol)
if not transfer_to_phone.startswith("+"):
transfer_to_phone = f"+{transfer_to_phone.lstrip('+')}"
# Add tel: protocol if not present
if not transfer_to_phone.startswith("tel:"):
transfer_to_phone = f"tel:{transfer_to_phone}"
# Inform the caller about the transfer
await session.say(transfer_message, _add_to_chat_ctx_=True)
# Initialize LiveKit API
livekit_api = user_data.lkapi
if transfer_type == "cold":
# Create transfer request
transfer_request = TransferSIPParticipantRequest(
_participant_identity_=participant_identity,
_room_name_=room_name,
_transfer_to_=transfer_to_phone,
_play_dialtone_=True,
)
logger.debug(f"Cold Transfer request: {transfer_request}")
# Transfer caller
await livekit_api.sip.transfer_sip_participant(transfer_request)
logger.info(
f"Successfully transferred participant {participant_identity} to {transfer_to_phone} (cold transfer)"
)
return transfer_message
elif transfer_type in ["warm_message", "warm_summary"]:
target_phone_number = transfer_to_phone.replace("tel:", "")
# Warm transfer - add agent to current room
sip_trunk_id =
await get_outbound_trunk_id_from_number(
livekit_api, target_phone_number
)
# Create SIP participant request to add new agent to room
supervisor_identity = f"supervisor-{participant_identity}"
create_request = api.CreateSIPParticipantRequest(
_sip_trunk_id_=sip_trunk_id,
_sip_call_to_=target_phone_number,
_room_name_=room_name,
_participant_identity_=supervisor_identity,
_krisp_enabled_=True,
_wait_until_answered_=True,
)
logger.debug(f"Warm transfer request: {create_request}")
# Add new agent to the room
await livekit_api.sip.create_sip_participant(create_request)
logger.info(
f"Successfully added new agent to room {room_name} ({transfer_type})"
)
# Wait for the supervisor to actually connect to the room
supervisor_connected = False
max_wait_time = 30
# Maximum 30 seconds to wait
wait_interval = 1
# Check every second
for _
in range(max_wait_time):
# Check if supervisor is in the room
room_participants =
await livekit_api.room.list_participants(
api.ListParticipantsRequest(
room=room_name)
)
supervisor_in_room = any(
p.identity == supervisor_identity
for p
in room_participants.participants
)
if supervisor_in_room:
supervisor_connected = True
logger.info(f"Supervisor {supervisor_identity} connected to room")
break
await asyncio.sleep(wait_interval)
if not supervisor_connected:
logger.warning(
f"Supervisor did not connect within {max_wait_time} seconds"
)
return "Supervisor could not be reached. Please try again."
# Get supervisor's audio track SID
participant_info =
await livekit_api.room.get_participant(
api.RoomParticipantIdentity(
room=room_name,
identity=supervisor_identity
)
)
audio_track_sid = None
for track
in participant_info.tracks:
if track.type == api.TrackType.AUDIO:
audio_track_sid = track.sid
break
if not audio_track_sid:
logger.warning(
f"No audio track found for supervisor {supervisor_identity}"
)
return "Failed to mute supervisor: No audio track found."
# Mute the supervisor's audio track
await livekit_api.room.mute_published_track(
api.MuteRoomTrackRequest(
room=room_name,
identity=supervisor_identity,
_track_sid_=audio_track_sid,
muted=True,
)
)
logger.info(f"Supervisor {supervisor_identity} audio track muted")
# Wait a brief moment for supervisor to settle
await asyncio.sleep(2)
# Deliver the warm transfer message
if warm_transfer_message:
await session.say(warm_transfer_message, _add_to_chat_ctx_=True)
await asyncio.sleep(3)
# Wait for message to complete
# Unmute the supervisor's audio track
await livekit_api.room.mute_published_track(
api.MuteRoomTrackRequest(
room=room_name,
identity=supervisor_identity,
_track_sid_=audio_track_sid,
muted=False,
)
)
logger.info(f"Supervisor {supervisor_identity} audio track unmuted")
# Brief pause before agent disconnects
await asyncio.sleep(1)
# Disconnect the current agent
await session.aclose()
logger.info("Current agent disconnected - supervisor taking over")
logger.info(
f"Successfully transferred participant {participant_identity} to {transfer_to_phone} (warm transfer)"
)
return warm_transfer_message
except Exception
as e:
error_msg = f"Failed to transfer call: {e}"
logger.error(error_msg)
return error_msg
How to get the cusrrent session history .