melodic-vegetable-78787
02/28/2025, 5:43 PMgray-monkey-83525
02/28/2025, 5:44 PMsip.callStatus
attribute. When the status is active
, it indicates that the call is connected, and you can start the egress service to record the call. Here's a basic code snippet to help you get started:
import asyncio
from livekit import api
async def start_egress_when_active(participant):
if participant.kind == api.ParticipantKind.SIP:
while True:
if participant.attributes.get('sip.callStatus') == 'active':
await start_audio_egress(participant.room_name, participant.identity)
break
await asyncio.sleep(1) # Check every second
async def start_audio_egress(room_name, participant_identity):
req = api.ParticipantEgressRequest(
room_name=room_name,
identity=participant_identity,
file_outputs=[
api.EncodedFileOutput(
filepath=f"{room_name}/{participant_identity}/audio_recording.mp3",
s3=api.S3Upload(
bucket="my_bucket_name",
region="my_region_name",
access_key="my_access_key",
secret="my_secret",
force_path_style=True,
),
)
]
)
try:
egress_info = await lkapi.egress.start_participant_egress(req)
print(f"Egress started: {egress_info.egress_id}")
return egress_info
except Exception as e:
print(f"Failed to start egress: {e}")
raise
This code snippet checks the sip.callStatus
attribute of a SIP participant and starts the egress service when the status is active
. Would you like more details on this process?
Sources:
- SIP participant attributes 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.melodic-vegetable-78787
02/28/2025, 5:47 PMmelodic-vegetable-78787
02/28/2025, 5:47 PMgray-monkey-83525
02/28/2025, 5:47 PMparticipant_attributes_changed
event to detect when the sip.callStatus
changes to active
. This event-driven approach is more efficient and reduces unnecessary polling.
Here's an updated code snippet using event listeners:
import asyncio
from livekit import api
async def monitor_participant_status(ctx):
call_response_event = asyncio.Event()
@ctx.room.on("participant_attributes_changed")
def on_attributes_changed(changed_attributes, participant):
if changed_attributes.get("sip.callStatus") == "active":
call_response_event.set()
await ctx.connect(auto_subscribe=api.AutoSubscribe.AUDIO_ONLY)
participant = await ctx.wait_for_participant()
await asyncio.wait_for(call_response_event.wait(), timeout=60) # Wait for up to 60 seconds
await start_audio_egress(ctx.room.name, participant.identity)
async def start_audio_egress(room_name, participant_identity):
req = api.ParticipantEgressRequest(
room_name=room_name,
identity=participant_identity,
file_outputs=[
api.EncodedFileOutput(
filepath=f"{room_name}/{participant_identity}/audio_recording.mp3",
s3=api.S3Upload(
bucket="my_bucket_name",
region="my_region_name",
access_key="my_access_key",
secret="my_secret",
force_path_style=True,
),
)
]
)
try:
egress_info = await lkapi.egress.start_participant_egress(req)
print(f"Egress started: {egress_info.egress_id}")
return egress_info
except Exception as e:
print(f"Failed to start egress: {e}")
raise
This approach uses the participant_attributes_changed
event to trigger the egress service when the call status becomes active
, making it more suitable for production. Would you like more details on implementing this?
Sources:
- https://livekit-users.slack.com/archives/C07FRP66XGE/p1731592376.663589
- https://docs.livekit.io/home/client/data/ 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.melodic-vegetable-78787
02/28/2025, 5:49 PMgray-monkey-83525
02/28/2025, 5:49 PMparticipant_attributes_changed
event to detect changes in the sip.callStatus
attribute. Here's a code snippet that demonstrates this approach:
import asyncio
from livekit import api
async def monitor_participant_status(ctx):
egress_info = None
@ctx.room.on("participant_attributes_changed")
def on_attributes_changed(changed_attributes, participant):
nonlocal egress_info
call_status = changed_attributes.get("sip.callStatus")
if call_status == "active" and egress_info is None:
egress_info = asyncio.create_task(start_audio_egress(ctx.room.name, participant.identity))
elif call_status == "hangup" and egress_info is not None:
asyncio.create_task(stop_audio_egress(egress_info))
await ctx.connect(auto_subscribe=api.AutoSubscribe.AUDIO_ONLY)
await ctx.wait_for_participant()
async def start_audio_egress(room_name, participant_identity):
req = api.ParticipantEgressRequest(
room_name=room_name,
identity=participant_identity,
file_outputs=[
api.EncodedFileOutput(
filepath=f"{room_name}/{participant_identity}/audio_recording.mp3",
s3=api.S3Upload(
bucket="my_bucket_name",
region="my_region_name",
access_key="my_access_key",
secret="my_secret",
force_path_style=True,
),
)
]
)
try:
egress_info = await lkapi.egress.start_participant_egress(req)
print(f"Egress started: {egress_info.egress_id}")
return egress_info
except Exception as e:
print(f"Failed to start egress: {e}")
raise
async def stop_audio_egress(egress_info):
try:
await lkapi.egress.stop_egress(egress_info.egress_id)
print(f"Egress stopped: {egress_info.egress_id}")
except Exception as e:
print(f"Failed to stop egress: {e}")
raise
This code listens for changes in the sip.callStatus
attribute and starts recording when the status is active
and stops when it is hangup
. Would you like more details on this implementation?
Sources:
- https://livekit-users.slack.com/archives/C07FRP66XGE/p1726232886.061249 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.melodic-vegetable-78787
02/28/2025, 5:59 PM.on()
. Use asyncio.create_task
within your synchronous callback instead.
20-3-egress | 2025-02-28 175801,056 [ERROR] unhandled exception while running the job task
20-3-egress | Traceback (most recent call last):
20-3-egress | File "/app/agent.py", line 113, in entrypoint
20-3-egress | @ctx.room.on("participant_attributes_changed")
20-3-egress | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
20-3-egress | File "/usr/local/lib/python3.11/site-packages/livekit/rtc/event_emitter.py", line 172, in decorator
20-3-egress | self.on(event, callback)
20-3-egress | File "/usr/local/lib/python3.11/site-packages/livekit/rtc/room.py", line 313, in on
20-3-egress | return super().on(event, callback)
20-3-egress | ^^^^^^^^^^^^^^^^^^^^^^^^^^^
20-3-egress | File "/usr/local/lib/python3.11/site-packages/livekit/rtc/event_emitter.py", line 161, in on
20-3-egress | raise ValueError(
20-3-egress | ValueError: Cannot register an async callback with .on()
. Use asyncio.create_task
within your synchronous callback instead.
20-3-egress | {"message": "unhandled exception while running the job task\nTraceback (most recent call last):\n File \"/app/agent.py\", line 113, in entrypoint\n @ctx.room.on(\"participant_attributes_changed\")\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File \"/usr/local/lib/python3.11/site-packages/livekit/rtc/event_emitter.py\", line 172, in decorator\n self.on(event, callback)\n File \"/usr/local/lib/python3.11/site-packages/livekit/rtc/room.py\", line 313, in on\n return super().on(event, callback)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File \"/usr/local/lib/python3.11/site-packages/livekit/rtc/event_emitter.py\", line 161, in on\n raise ValueError(\nValueError: Cannot register an async callback with .on()
. Use asyncio.create_task
within your synchronous callback instead.", "level": "ERROR", "name": "livekit.agents", "pid": 23, "job_id": "AJ_Y2W2XF3dDmZ9", "timestamp": "2025-02-28T175801.056996+00:00"}
20-3-egress | 2025-02-28 175802,410 [INFO] initializing job process
20-3-egress | 2025-02-28 175802,410 [INFO] initializing job process
20-3-egress | {"message": "initializing job process", "level": "INFO", "name": "livekit.agents", "pid": 135, "timestamp": "2025-02-28T175802.410337+00:00"}
20-3-egress | 2025-02-28 175802,513 [INFO] job process initialized
20-3-egress | 2025-02-28 175802,513 [DEBUG] Using selector: EpollSelector
20-3-egress | 2025-02-28 175802,513 [INFO] job process initialized
20-3-egress | {"message": "job process initialized", "level": "INFO", "name": "livekit.agents", "pid": 135, "timestamp": "2025-02-28T175802.513013+00:00"}
20-3-egress | 2025-02-28 175822,085 [DEBUG] tungstenite:protocol666tungstenite:protocol - Received close frame: Some(CloseFrame { code: Normal, reason: "" })
20-3-egress | 2025-02-28 175822,086 [DEBUG] shutting down job task
20-3-egress | 2025-02-28 175822,086 [DEBUG] tungstenite:protocol683tungstenite:protocol - Replying to close with Frame { header: FrameHeader { is_final: true, rsv1: false, rsv2: false, rsv3: false, opcode: Control(Close), mask: None }, payload: [3, 232] }
20-3-egress | 2025-02-28 175822,086 [DEBUG] http_session(): closing the httpclient ctx
20-3-egress | 2025-02-28 175822,086 [INFO] process exiting
20-3-egress | 2025-02-28 175822,087 [DEBUG] http_session(): creating a new httpclient ctx
20-3-egress | {"message": "process exiting", "level": "INFO", "name": "livekit.agents", "reason": "room disconnected", "pid": 23, "job_id": "AJ_Y2W2XF3dDmZ9", "timestamp": "2025-02-28T175822.086925+00:00"}
20-3-egress | 2025-02-28 175822,088 [WARNING] livekit:rtc engine446livekit:rtc_engine - received session close: "signal client closed: \"stream closed\"" UnknownReason Resume @millions-hair-24956gray-monkey-83525
02/28/2025, 5:59 PM.on()
method, which only supports synchronous callbacks. To resolve this, you should use asyncio.create_task
within your synchronous callback to handle any asynchronous operations. Here's how you can modify your code:
import asyncio
from livekit import api
async def monitor_participant_status(ctx):
egress_info = None
def on_attributes_changed(changed_attributes, participant):
nonlocal egress_info
call_status = changed_attributes.get("sip.callStatus")
if call_status == "active" and egress_info is None:
egress_info = asyncio.create_task(start_audio_egress(ctx.room.name, participant.identity))
elif call_status == "hangup" and egress_info is not None:
asyncio.create_task(stop_audio_egress(egress_info))
ctx.room.on("participant_attributes_changed", on_attributes_changed)
await ctx.connect(auto_subscribe=api.AutoSubscribe.AUDIO_ONLY)
await ctx.wait_for_participant()
async def start_audio_egress(room_name, participant_identity):
req = api.ParticipantEgressRequest(
room_name=room_name,
identity=participant_identity,
file_outputs=[
api.EncodedFileOutput(
filepath=f"{room_name}/{participant_identity}/audio_recording.mp3",
s3=api.S3Upload(
bucket="my_bucket_name",
region="my_region_name",
access_key="my_access_key",
secret="my_secret",
force_path_style=True,
),
)
]
)
try:
egress_info = await lkapi.egress.start_participant_egress(req)
print(f"Egress started: {egress_info.egress_id}")
return egress_info
except Exception as e:
print(f"Failed to start egress: {e}")
raise
async def stop_audio_egress(egress_info):
try:
await lkapi.egress.stop_egress(egress_info.egress_id)
print(f"Egress stopped: {egress_info.egress_id}")
except Exception as e:
print(f"Failed to stop egress: {e}")
raise
This code uses a synchronous callback for the participant_attributes_changed
event and creates tasks for asynchronous operations. Would you like more details on this implementation?
Sources:
- https://livekit-users.slack.com/archives/C07FRP66XGE/p1730288245.597009 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.melodic-vegetable-78787
03/01/2025, 12:52 PMgray-monkey-83525
03/01/2025, 12:52 PMmelodic-vegetable-78787
03/01/2025, 1:44 PMgray-monkey-83525
03/01/2025, 1:44 PM