How to record the telephony calls made using SIP T...
# ask-ai
g
How to record the telephony calls made using SIP Trunk
t
Thanks for reaching out, Pavan. I'm an AI assistant and I'm here to help. Give me a moment while I research this topic.
Hey there! If you want to record telephony calls made using a SIP Trunk in LiveKit, here’s a simple way to do it: 1. Set Up the Dispatch Rule with Auto-Egress: You’ll want to tweak your dispatch rule to turn on automatic recording (Auto-Egress) for your SIP calls. 2. Here’s an Example Configuration: Check out this code snippet to create or update a dispatch rule that kicks off recording automatically:
Copy code
# Build a request for an individual SIP dispatch rule (room_prefix "call") bound
# to the inbound trunk, then submit it through the SIP service client.
# NOTE(review): later in this thread the same construction fails with
# "module 'livekit.api' has no attribute 'RoomConfig'", so `RoomConfig` (and
# presumably `EgressConfig` / `auto=True`) do not match the real SDK names —
# verify against the LiveKit SIP and egress API reference before using this.
# `inbound_trunk` and `lk_api` are assumed to be defined by surrounding code,
# and the `await` below must run inside a coroutine.
dispatch_rule_request = CreateSIPDispatchRuleRequest(
    trunk_ids=[inbound_trunk.sip_trunk_id],
    rule=SIPDispatchRule(
        dispatch_rule_individual=SIPDispatchRuleIndividual(
            room_prefix="call",
            room_config=RoomConfig(
                egress=EgressConfig(
                    auto=True,  # Enable automatic recording
                )
            )
        )
    )
)
dispatch_rule = await lk_api.sip.create_sip_dispatch_rule(dispatch_rule_request)
3. A Bit More About How It Works: ◦ `EgressConfig`: Setting
auto=True
means as soon as a call connects, recording starts without you having to do a thing. ◦ `SIPDispatchRuleIndividual`: This part decides how calls are handled, and the
room_prefix
is the prefix for the name of the room each call is placed in, which in turn helps you organize and identify your recordings. 4. What You’ll Get: Every incoming SIP call that goes through this dispatch rule will be recorded automatically. You can then access these recordings anytime through LiveKit’s APIs or management tools to review or analyze them. If anything’s unclear or you want a hand setting this up, just let me know—I’m here to help!
g
give me the python code to do that
t
Here’s some Python code to help you set up automatic recording for your SIP Trunk calls:
Copy code
import asyncio

from livekit import api

# Define the SIP trunk ID
inbound_trunk_id = "your_inbound_trunk_id"


async def main():
    """Create a SIP dispatch rule whose rooms start a recording automatically.

    The rule is an individual dispatch rule (room names prefixed with
    ``call``) for ``inbound_trunk_id``. Recording is requested through the
    room configuration attached to the request.
    """
    # Initialize the LiveKit API client inside the coroutine so its HTTP
    # session is bound to the running event loop.
    # NOTE(review): the server URL is presumably taken from LIVEKIT_URL when
    # not passed explicitly — confirm for your SDK version.
    lk_api = api.LiveKitAPI(api_key="your_api_key", api_secret="your_api_secret")
    try:
        # Create the dispatch rule request
        dispatch_rule_request = api.CreateSIPDispatchRuleRequest(
            trunk_ids=[inbound_trunk_id],
            rule=api.SIPDispatchRule(
                dispatch_rule_individual=api.SIPDispatchRuleIndividual(
                    room_prefix="call",
                )
            ),
            # Room settings live on the request, not on the individual rule.
            room_config=api.RoomConfiguration(
                egress=api.RoomEgress(
                    room=api.RoomCompositeEgressRequest(
                        audio_only=True,  # phone calls carry audio only
                        file_outputs=[
                            api.EncodedFileOutput(
                                # {room_name}/{time} are egress filename
                                # templates, not Python formatting.
                                # NOTE(review): an upload target (e.g. S3)
                                # likely has to be added here unless the
                                # egress service has default storage — confirm.
                                filepath="recordings/{room_name}-{time}.ogg",
                            )
                        ],
                    )
                )
            ),
        )

        # Create the SIP dispatch rule (the SDK call is a coroutine)
        dispatch_rule = await lk_api.sip.create_sip_dispatch_rule(dispatch_rule_request)
        print("Dispatch rule created:", dispatch_rule)
    finally:
        # Always release the client's HTTP session.
        await lk_api.aclose()


if __name__ == "__main__":
    asyncio.run(main())
Just swap out
your_api_key
,
your_api_secret
, and
your_inbound_trunk_id
with your actual LiveKit API credentials and SIP trunk ID. If you hit any bumps or want to chat more about this, I’m here to help!
g
I am using this code; integrate that into this code only: import asyncio import os import logging from pathlib import Path from dotenv import load_dotenv from livekit import api # Load environment variables load_dotenv() # Set up logging logger = logging.getLogger("make-call") logger.setLevel(logging.INFO) # Configuration room_name = "my-room" agent_name = "test-agent" outbound_trunk_id = os.getenv("SIP_OUTBOUND_TRUNK_ID") async def make_call(phone_number): """Create a dispatch and add a SIP participant to call the phone number""" lkapi = api.LiveKitAPI() # Create agent dispatch logger.info(f"Creating dispatch for agent {agent_name} in room {room_name}") dispatch = await lkapi.agent_dispatch.create_dispatch( api.CreateAgentDispatchRequest( agent_name=agent_name, room=room_name, metadata=phone_number ) ) logger.info(f"Created dispatch: {dispatch}") # Create SIP participant to make the call if not outbound_trunk_id or not outbound_trunk_id.startswith("ST_"): logger.error("SIP_OUTBOUND_TRUNK_ID is not set or invalid") return logger.info(f"Dialing {phone_number} to room {room_name}") try: # Create SIP participant to initiate the call sip_participant = await lkapi.sip.create_sip_participant( api.CreateSIPParticipantRequest( room_name=room_name, sip_trunk_id=outbound_trunk_id, sip_call_to=phone_number, participant_identity="phone_user", ) ) logger.info(f"Created SIP participant: {sip_participant}") except Exception as e: logger.error(f"Error creating SIP participant: {e}") # Close API connection await lkapi.aclose() async def main(): # Replace with the actual phone number including country code phone_number = "+919XXXXXXX" await make_call(phone_number) if __name__ == "__main__": asyncio.run(main())
t
Here’s a friendly way to add automatic recording into your existing code:
Copy code
import asyncio
import os
import logging
from pathlib import Path
from dotenv import load_dotenv
from livekit import api

# Load environment variables from a local .env file, if present
load_dotenv()

# Set up logging. The level must be the plain constant; the chat export had
# wrapped it in link markup, which is not valid Python.
logger = logging.getLogger("make-call")
logger.setLevel(logging.INFO)

# Configuration
room_name = "my-room"  # room the outbound call and the agent are placed in
agent_name = "test-agent"  # agent to dispatch into that room
outbound_trunk_id = os.getenv("SIP_OUTBOUND_TRUNK_ID")  # None when unset
inbound_trunk_id = os.getenv("SIP_INBOUND_TRUNK_ID")  # Add this for inbound trunk

async def configure_dispatch_rule():
    """Create a SIP dispatch rule whose rooms are recorded automatically.

    Builds an individual dispatch rule (room names prefixed with ``call``)
    for the trunk named by ``inbound_trunk_id`` and attaches a room
    configuration that requests an audio-only room-composite egress.

    Errors are logged rather than raised, and the API client is always
    closed. Returns ``None``.

    NOTE(review): this rule is bound to the inbound trunk and to rooms
    prefixed ``call``; the outbound call in ``make_call`` uses the fixed
    room ``my-room``, so confirm this rule actually covers that call.
    """
    if not inbound_trunk_id:
        # Mirror the outbound-trunk check in make_call: fail early and clearly
        # instead of sending a request containing a missing trunk id.
        logger.error("SIP_INBOUND_TRUNK_ID is not set")
        return

    lkapi = api.LiveKitAPI()

    try:
        # Create the dispatch rule request. Room settings belong on the
        # request itself; `api.RoomConfig` / `api.EgressConfig` do not exist
        # in livekit.api.
        dispatch_rule_request = api.CreateSIPDispatchRuleRequest(
            trunk_ids=[inbound_trunk_id],
            rule=api.SIPDispatchRule(
                dispatch_rule_individual=api.SIPDispatchRuleIndividual(
                    room_prefix="call",
                )
            ),
            room_config=api.RoomConfiguration(
                egress=api.RoomEgress(
                    room=api.RoomCompositeEgressRequest(
                        audio_only=True,  # phone calls carry audio only
                        file_outputs=[
                            api.EncodedFileOutput(
                                # {room_name}/{time} are egress filename
                                # templates, not Python formatting.
                                # NOTE(review): an upload target (e.g. S3)
                                # likely has to be added here unless the
                                # egress service has default storage — confirm.
                                filepath="recordings/{room_name}-{time}.ogg",
                            )
                        ],
                    )
                )
            ),
        )

        # Create the SIP dispatch rule
        dispatch_rule = await lkapi.sip.create_sip_dispatch_rule(dispatch_rule_request)
        logger.info(f"Dispatch rule created: {dispatch_rule}")
    except Exception as e:
        # Best-effort setup: report the failure and let the caller continue.
        logger.error(f"Oops, there was an error creating the dispatch rule: {e}")
    finally:
        await lkapi.aclose()

async def make_call(phone_number):
    """Create a dispatch and add a SIP participant to call the phone number.

    Args:
        phone_number: Number to dial; also passed as the dispatch metadata.

    First dispatches ``agent_name`` into ``room_name``, then dials
    ``phone_number`` through the outbound trunk as participant
    ``phone_user``. A missing or malformed trunk id, or a failure while
    creating the SIP participant, is logged and the function returns
    ``None``. An error from the agent dispatch itself propagates.
    """
    lkapi = api.LiveKitAPI()

    # One outer try/finally so the client is closed on every exit path,
    # including the early return below and a failing agent dispatch.
    try:
        # Create agent dispatch
        logger.info(f"Creating dispatch for agent {agent_name} in room {room_name}")
        dispatch = await lkapi.agent_dispatch.create_dispatch(
            api.CreateAgentDispatchRequest(
                agent_name=agent_name, room=room_name, metadata=phone_number
            )
        )
        logger.info(f"Created dispatch: {dispatch}")

        # Create SIP participant to make the call
        if not outbound_trunk_id or not outbound_trunk_id.startswith("ST_"):
            logger.error("SIP_OUTBOUND_TRUNK_ID is not set or looks invalid")
            return

        logger.info(f"Dialing {phone_number} to room {room_name}")

        try:
            # Create SIP participant to initiate the call
            sip_participant = await lkapi.sip.create_sip_participant(
                api.CreateSIPParticipantRequest(
                    room_name=room_name,
                    sip_trunk_id=outbound_trunk_id,
                    sip_call_to=phone_number,
                    participant_identity="phone_user",
                )
            )
            logger.info(f"Created SIP participant: {sip_participant}")
        except Exception as e:
            logger.error(f"Something went wrong creating the SIP participant: {e}")
    finally:
        # Close API connection
        await lkapi.aclose()

async def main():
    """Script entry point: set up the recording dispatch rule, then dial out."""
    # The dispatch rule is configured before any call is placed.
    await configure_dispatch_rule()

    # Placeholder destination — substitute a real number with its country code.
    await make_call("+91XXXXXXX")

# Run the async entry point only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())
This update adds a new function,
configure_dispatch_rule
, that sets up automatic recording for your SIP calls. It runs right before the call in the
main
function. If you have any questions or want me to help tweak this further, just let me know!
g
do this for outbound calls
t
I don't have the answer you're looking for. You could also try asking your question: • in one of the other Slack channels or • to https://deepwiki.com/livekit/livekit_composite which is trained on all LiveKit source code If you find the answer, please post it here to help others!
g
getting this error: Oops, there was an error creating the dispatch rule: module 'livekit.api' has no attribute 'RoomConfig'