<@U084APK40NS> i am tryıng to tyr egress servıce ...
# ask-ai
m
@millions-hair-24956 i am tryıng to tyr egress servıce for outbound calls . how can i get sip participant status , and whehn sip participant active i start egress servıce start record ? give me code snippet
g
It seems likely that you can monitor the SIP participant's status using the
sip.callStatus
attribute. When the status is
active
, it indicates that the call is connected, and you can start the egress service to record the call. Here's a basic code snippet to help you get started:
Copy code
import asyncio
from livekit import api

async def start_egress_when_active(participant):
    if participant.kind == api.ParticipantKind.SIP:
        while True:
            if participant.attributes.get('sip.callStatus') == 'active':
                await start_audio_egress(participant.room_name, participant.identity)
                break
            await asyncio.sleep(1)  # Check every second

async def start_audio_egress(room_name, participant_identity):
    req = api.ParticipantEgressRequest(
        room_name=room_name,
        identity=participant_identity,
        file_outputs=[
            api.EncodedFileOutput(
                filepath=f"{room_name}/{participant_identity}/audio_recording.mp3",
                s3=api.S3Upload(
                    bucket="my_bucket_name",
                    region="my_region_name",
                    access_key="my_access_key",
                    secret="my_secret",
                    force_path_style=True,
                ),
            )
        ]
    )

    try:
        egress_info = await lkapi.egress.start_participant_egress(req)
        print(f"Egress started: {egress_info.egress_id}")
        return egress_info
    except Exception as e:
        print(f"Failed to start egress: {e}")
        raise
This code snippet checks the
sip.callStatus
attribute of a SIP participant and starts the egress service when the status is
active
. Would you like more details on this process? Sources: - SIP participant attributes 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.
m
is it prod -ready becasue chengin it every 1 seconds too obvious .
is it prod -ready becasue chengin it every 1 seconds too obvious . @millions-hair-24956
g
Polling every second might not be the most efficient approach for a production environment. Instead, you can listen for the
participant_attributes_changed
event to detect when the
sip.callStatus
changes to
active
. This event-driven approach is more efficient and reduces unnecessary polling. Here's an updated code snippet using event listeners:
Copy code
import asyncio
from livekit import api

async def monitor_participant_status(ctx):
    call_response_event = asyncio.Event()

    @ctx.room.on("participant_attributes_changed")
    def on_attributes_changed(changed_attributes, participant):
        if changed_attributes.get("sip.callStatus") == "active":
            call_response_event.set()

    await ctx.connect(auto_subscribe=api.AutoSubscribe.AUDIO_ONLY)
    participant = await ctx.wait_for_participant()
    await asyncio.wait_for(call_response_event.wait(), timeout=60)  # Wait for up to 60 seconds

    await start_audio_egress(ctx.room.name, participant.identity)

async def start_audio_egress(room_name, participant_identity):
    req = api.ParticipantEgressRequest(
        room_name=room_name,
        identity=participant_identity,
        file_outputs=[
            api.EncodedFileOutput(
                filepath=f"{room_name}/{participant_identity}/audio_recording.mp3",
                s3=api.S3Upload(
                    bucket="my_bucket_name",
                    region="my_region_name",
                    access_key="my_access_key",
                    secret="my_secret",
                    force_path_style=True,
                ),
            )
        ]
    )

    try:
        egress_info = await lkapi.egress.start_participant_egress(req)
        print(f"Egress started: {egress_info.egress_id}")
        return egress_info
    except Exception as e:
        print(f"Failed to start egress: {e}")
        raise
This approach uses the
participant_attributes_changed
event to trigger the egress service when the call status becomes
active
, making it more suitable for production. Would you like more details on implementing this? Sources: - https://livekit-users.slack.com/archives/C07FRP66XGE/p1731592376.663589 - https://docs.livekit.io/home/client/data/ 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.
m
lets do this way when sip call status actıve start record , when si,p call status hangup stop record . gıve me code snıppet @millions-hair-24956
g
To handle starting and stopping the recording based on the SIP call status, you can listen for the
participant_attributes_changed
event to detect changes in the
sip.callStatus
attribute. Here's a code snippet that demonstrates this approach:
Copy code
import asyncio
from livekit import api

async def monitor_participant_status(ctx):
    egress_info = None

    @ctx.room.on("participant_attributes_changed")
    def on_attributes_changed(changed_attributes, participant):
        nonlocal egress_info
        call_status = changed_attributes.get("sip.callStatus")

        if call_status == "active" and egress_info is None:
            egress_info = asyncio.create_task(start_audio_egress(ctx.room.name, participant.identity))
        elif call_status == "hangup" and egress_info is not None:
            asyncio.create_task(stop_audio_egress(egress_info))

    await ctx.connect(auto_subscribe=api.AutoSubscribe.AUDIO_ONLY)
    await ctx.wait_for_participant()

async def start_audio_egress(room_name, participant_identity):
    req = api.ParticipantEgressRequest(
        room_name=room_name,
        identity=participant_identity,
        file_outputs=[
            api.EncodedFileOutput(
                filepath=f"{room_name}/{participant_identity}/audio_recording.mp3",
                s3=api.S3Upload(
                    bucket="my_bucket_name",
                    region="my_region_name",
                    access_key="my_access_key",
                    secret="my_secret",
                    force_path_style=True,
                ),
            )
        ]
    )

    try:
        egress_info = await lkapi.egress.start_participant_egress(req)
        print(f"Egress started: {egress_info.egress_id}")
        return egress_info
    except Exception as e:
        print(f"Failed to start egress: {e}")
        raise

async def stop_audio_egress(egress_info):
    try:
        await lkapi.egress.stop_egress(egress_info.egress_id)
        print(f"Egress stopped: {egress_info.egress_id}")
    except Exception as e:
        print(f"Failed to stop egress: {e}")
        raise
This code listens for changes in the
sip.callStatus
attribute and starts recording when the status is
active
and stops when it is
hangup
. Would you like more details on this implementation? Sources: - https://livekit-users.slack.com/archives/C07FRP66XGE/p1726232886.061249 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.
m
hey ım usıng pyhton and your code ıs wrong . 0-3-egress | 2025-02-28 175800,673 [INFO] livekit_api:signal clientsignal stream96livekit apisignal client:signal_stream - connecting to ws://livekit:7880/rtc?sdk=python&amp;protocol=15&amp;auto_subscribe=0&amp;adaptive_stream=0&amp;version=0.20.3&amp;access_token=... 20-3-egress | 2025-02-28 175800,673 [INFO] livekit_api:signal clientsignal stream96livekit apisignal client:signal_stream - connecting to ws://livekit:7880/rtc?sdk=python&amp;protocol=15&amp;auto_subscribe=0&amp;adaptive_stream=0&amp;version=0.20.3&amp;access_token=... 20-3-egress | {"message": "livekit_api:signal clientsignal stream96livekit apisignal client:signal_stream - connecting to ws://livekit:7880/rtc?sdk=python&amp;protocol=15&amp;auto_subscribe=0&amp;adaptive_stream=0&amp;version=0.20.3&amp;access_token=...", "level": "INFO", "name": "livekit", "pid": 23, "job_id": "AJ_Y2W2XF3dDmZ9", "timestamp": "2025-02-28T175800.673643+00:00"} 20-3-egress | 2025-02-28 175800,677 [DEBUG] tungstenite:🤝client95tungstenite🤝:client - Client handshake done. 20-3-egress | 2025-02-28 175800,833 [INFO] Successfully connected to room 20-3-egress | 2025-02-28 175800,833 [INFO] Successfully connected to room 20-3-egress | {"message": "Successfully connected to room", "level": "INFO", "name": "agent-20-outbound", "pid": 23, "job_id": "AJ_Y2W2XF3dDmZ9", "timestamp": "2025-02-28T175800.833289+00:00"} 20-3-egress | 2025-02-28 175801,056 [INFO] SIP participant created: phone_user 20-3-egress | 2025-02-28 175801,056 [INFO] Participant found: phone_user 20-3-egress | 2025-02-28 175801,056 [INFO] SIP participant created: phone_user 20-3-egress | {"message": "SIP participant created: phone_user", "level": "INFO", "name": "agent-20-outbound", "pid": 23, "job_id": "AJ_Y2W2XF3dDmZ9", "timestamp": "2025-02-28T175801.056544+00:00"} 20-3-egress | 2025-02-28 175801,056 [INFO] Participant found: phone_user 20-3-egress | {"message": "Participant found: phone_user", "level": "INFO", "name": "agent-20-outbound", "pid": 23, "job_id": "AJ_Y2W2XF3dDmZ9", "timestamp": "2025-02-28T175801.056833+00:00"} 20-3-egress | 2025-02-28 175801,056 [ERROR] unhandled exception while running the job task 20-3-egress | Traceback (most recent call last): 20-3-egress | File "/app/agent.py", line 113, in entrypoint 20-3-egress | @ctx.room.on("participant_attributes_changed") 20-3-egress | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 20-3-egress | File "/usr/local/lib/python3.11/site-packages/livekit/rtc/event_emitter.py", line 172, in decorator 20-3-egress | self.on(event, callback) 20-3-egress | File "/usr/local/lib/python3.11/site-packages/livekit/rtc/room.py", line 313, in on 20-3-egress | return super().on(event, callback) 20-3-egress | ^^^^^^^^^^^^^^^^^^^^^^^^^^^ 20-3-egress | File "/usr/local/lib/python3.11/site-packages/livekit/rtc/event_emitter.py", line 161, in on 20-3-egress | raise ValueError( 20-3-egress | ValueError: Cannot register an async callback with
.on()
. Use
asyncio.create_task
within your synchronous callback instead. 20-3-egress | 2025-02-28 175801,056 [ERROR] unhandled exception while running the job task 20-3-egress | Traceback (most recent call last): 20-3-egress | File "/app/agent.py", line 113, in entrypoint 20-3-egress | @ctx.room.on("participant_attributes_changed") 20-3-egress | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 20-3-egress | File "/usr/local/lib/python3.11/site-packages/livekit/rtc/event_emitter.py", line 172, in decorator 20-3-egress | self.on(event, callback) 20-3-egress | File "/usr/local/lib/python3.11/site-packages/livekit/rtc/room.py", line 313, in on 20-3-egress | return super().on(event, callback) 20-3-egress | ^^^^^^^^^^^^^^^^^^^^^^^^^^^ 20-3-egress | File "/usr/local/lib/python3.11/site-packages/livekit/rtc/event_emitter.py", line 161, in on 20-3-egress | raise ValueError( 20-3-egress | ValueError: Cannot register an async callback with
.on()
. Use
asyncio.create_task
within your synchronous callback instead. 20-3-egress | {"message": "unhandled exception while running the job task\nTraceback (most recent call last):\n File \"/app/agent.py\", line 113, in entrypoint\n @ctx.room.on(\"participant_attributes_changed\")\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File \"/usr/local/lib/python3.11/site-packages/livekit/rtc/event_emitter.py\", line 172, in decorator\n self.on(event, callback)\n File \"/usr/local/lib/python3.11/site-packages/livekit/rtc/room.py\", line 313, in on\n return super().on(event, callback)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File \"/usr/local/lib/python3.11/site-packages/livekit/rtc/event_emitter.py\", line 161, in on\n raise ValueError(\nValueError: Cannot register an async callback with
.on()
. Use
asyncio.create_task
within your synchronous callback instead.", "level": "ERROR", "name": "livekit.agents", "pid": 23, "job_id": "AJ_Y2W2XF3dDmZ9", "timestamp": "2025-02-28T175801.056996+00:00"} 20-3-egress | 2025-02-28 175802,410 [INFO] initializing job process 20-3-egress | 2025-02-28 175802,410 [INFO] initializing job process 20-3-egress | {"message": "initializing job process", "level": "INFO", "name": "livekit.agents", "pid": 135, "timestamp": "2025-02-28T175802.410337+00:00"} 20-3-egress | 2025-02-28 175802,513 [INFO] job process initialized 20-3-egress | 2025-02-28 175802,513 [DEBUG] Using selector: EpollSelector 20-3-egress | 2025-02-28 175802,513 [INFO] job process initialized 20-3-egress | {"message": "job process initialized", "level": "INFO", "name": "livekit.agents", "pid": 135, "timestamp": "2025-02-28T175802.513013+00:00"} 20-3-egress | 2025-02-28 175822,085 [DEBUG] tungstenite:protocol666tungstenite:protocol - Received close frame: Some(CloseFrame { code: Normal, reason: "" }) 20-3-egress | 2025-02-28 175822,086 [DEBUG] shutting down job task 20-3-egress | 2025-02-28 175822,086 [DEBUG] tungstenite:protocol683tungstenite:protocol - Replying to close with Frame { header: FrameHeader { is_final: true, rsv1: false, rsv2: false, rsv3: false, opcode: Control(Close), mask: None }, payload: [3, 232] } 20-3-egress | 2025-02-28 175822,086 [DEBUG] http_session(): closing the httpclient ctx 20-3-egress | 2025-02-28 175822,086 [INFO] process exiting 20-3-egress | 2025-02-28 175822,087 [DEBUG] http_session(): creating a new httpclient ctx 20-3-egress | {"message": "process exiting", "level": "INFO", "name": "livekit.agents", "reason": "room disconnected", "pid": 23, "job_id": "AJ_Y2W2XF3dDmZ9", "timestamp": "2025-02-28T175822.086925+00:00"} 20-3-egress | 2025-02-28 175822,088 [WARNING] livekit:rtc engine446livekit:rtc_engine - received session close: "signal client closed: \"stream closed\"" UnknownReason Resume @millions-hair-24956
g
It seems likely that the error you're encountering is due to attempting to register an asynchronous callback using the
.on()
method, which only supports synchronous callbacks. To resolve this, you should use
asyncio.create_task
within your synchronous callback to handle any asynchronous operations. Here's how you can modify your code:
Copy code
import asyncio
from livekit import api

async def monitor_participant_status(ctx):
    egress_info = None

    def on_attributes_changed(changed_attributes, participant):
        nonlocal egress_info
        call_status = changed_attributes.get("sip.callStatus")

        if call_status == "active" and egress_info is None:
            egress_info = asyncio.create_task(start_audio_egress(ctx.room.name, participant.identity))
        elif call_status == "hangup" and egress_info is not None:
            asyncio.create_task(stop_audio_egress(egress_info))

    ctx.room.on("participant_attributes_changed", on_attributes_changed)

    await ctx.connect(auto_subscribe=api.AutoSubscribe.AUDIO_ONLY)
    await ctx.wait_for_participant()

async def start_audio_egress(room_name, participant_identity):
    req = api.ParticipantEgressRequest(
        room_name=room_name,
        identity=participant_identity,
        file_outputs=[
            api.EncodedFileOutput(
                filepath=f"{room_name}/{participant_identity}/audio_recording.mp3",
                s3=api.S3Upload(
                    bucket="my_bucket_name",
                    region="my_region_name",
                    access_key="my_access_key",
                    secret="my_secret",
                    force_path_style=True,
                ),
            )
        ]
    )

    try:
        egress_info = await lkapi.egress.start_participant_egress(req)
        print(f"Egress started: {egress_info.egress_id}")
        return egress_info
    except Exception as e:
        print(f"Failed to start egress: {e}")
        raise

async def stop_audio_egress(egress_info):
    try:
        await lkapi.egress.stop_egress(egress_info.egress_id)
        print(f"Egress stopped: {egress_info.egress_id}")
    except Exception as e:
        print(f"Failed to stop egress: {e}")
        raise
This code uses a synchronous callback for the
participant_attributes_changed
event and creates tasks for asynchronous operations. Would you like more details on this implementation? Sources: - https://livekit-users.slack.com/archives/C07FRP66XGE/p1730288245.597009 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.
m
how to catch sip call status ? give me code snıppet @millions-hair-24956
g
If you're looking for further help, please follow-up on one of the other channels available for community support!
m
is sip call status changıng dynamıcally or we need to check ıt ? @millions-hair-24956
g
If you're looking for further help, please follow-up on one of the other channels available for community support!