powerful-rose-63872
02/28/2025, 12:51 AMgray-monkey-83525
02/28/2025, 12:51 AMpowerful-rose-63872
02/28/2025, 12:53 AMgray-monkey-83525
02/28/2025, 12:53 AM
from livekit import api
# Initialize LiveKit API
# Module-level client used by start_audio_egress() and monitor_egress().
# LIVEKIT_API_URL, LIVEKIT_API_KEY and LIVEKIT_API_SECRET are not defined in
# this snippet; they must be defined before this line runs.
# NOTE(review): the client is constructed at import time, outside any running
# event loop — confirm the installed livekit-api / aiohttp versions allow that.
lkapi = api.LiveKitAPI(LIVEKIT_API_URL, api_key=LIVEKIT_API_KEY, api_secret=LIVEKIT_API_SECRET)
async def start_audio_egress(room_name: str, participant_identity: str, filepath: str):
    """Start a participant egress that records to a single encoded file on S3.

    Builds a ``ParticipantEgressRequest`` for ``participant_identity`` in
    ``room_name`` with one ``EncodedFileOutput`` written to ``filepath`` and
    submits it through the module-level ``lkapi`` client.

    Returns:
        The egress info object returned by ``start_participant_egress``.

    Raises:
        Exception: whatever the API call raises; it is printed and re-raised.

    NOTE(review): nothing in the request restricts the recording to audio,
    despite the function name — confirm that is intended.
    """
    # Placeholder S3 settings; substitute real values before running.
    s3_target = api.S3Upload(
        bucket="your_bucket",
        region="your_region",
        access_key="your_access_key",
        secret="your_secret",
        force_path_style=True,
    )
    encoded_file = api.EncodedFileOutput(filepath=filepath, s3=s3_target)
    request = api.ParticipantEgressRequest(
        room_name=room_name,
        identity=participant_identity,
        file_outputs=[encoded_file],
    )

    try:
        info = await lkapi.egress.start_participant_egress(request)
        print(f"Egress started: {info.egress_id}")
    except Exception as exc:
        print(f"Failed to start egress: {exc}")
        raise
    return info
async def monitor_egress(egress_id: str, poll_interval: float = 5):
    """Poll an egress until it reaches a terminal status, printing each status.

    Args:
        egress_id: ID of the egress to look up via ``list_egress``.
        poll_interval: Seconds to sleep between polls (default 5).

    The loop ends when the egress is no longer listed or when its status is
    one of the terminal states. Besides ``EGRESS_COMPLETE`` and
    ``EGRESS_FAILED`` this includes ``EGRESS_ABORTED`` and
    ``EGRESS_LIMIT_REACHED``; without them an aborted egress would be polled
    forever.

    Raises:
        Exception: whatever the API call raises; it is printed and re-raised.
    """
    # Compared by enum *name* so that a status missing from an older SDK's
    # EgressStatus enum cannot raise AttributeError here.
    terminal_statuses = {
        "EGRESS_COMPLETE",
        "EGRESS_FAILED",
        "EGRESS_ABORTED",
        "EGRESS_LIMIT_REACHED",
    }
    try:
        while True:
            response = await lkapi.egress.list_egress(
                api.ListEgressRequest(egress_id=egress_id)
            )
            if not response.items:
                print("Egress not found.")
                break

            egress = response.items[0]
            status_name = api.EgressStatus.Name(egress.status)
            print(f"Egress status: {status_name}")

            if status_name in terminal_statuses:
                if status_name == "EGRESS_COMPLETE":
                    print("Egress completed successfully.")
                elif status_name == "EGRESS_FAILED":
                    print("Egress failed.")
                else:
                    print(f"Egress ended without completing: {status_name}")
                break

            await asyncio.sleep(poll_interval)  # Wait before the next poll
    except Exception as e:
        print(f"Error while monitoring egress: {e}")
        raise
# Example usage
async def main():
    """Start one egress for the agent and one for the participant, then monitor both.

    The room name, identities and file paths are example placeholders. The
    shared ``lkapi`` client is closed when this coroutine finishes, whether it
    succeeds or raises, so its underlying HTTP session is not leaked.
    """
    try:
        agent_egress_info = await start_audio_egress("room_name", "agent_identity", "agent_audio.mp4")
        participant_egress_info = await start_audio_egress("room_name", "participant_identity", "participant_audio.mp4")
        await monitor_egress(agent_egress_info.egress_id)
        await monitor_egress(participant_egress_info.egress_id)
    finally:
        # Release the API client's HTTP session.
        await lkapi.aclose()
# Run the main function
# The import stays unconditional: monitor_egress() looks up `asyncio` as a
# module global when it sleeps between polls.
import asyncio

# Only start the egress jobs when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())
gray-monkey-83525
02/28/2025, 12:54 AM
Replace `your_bucket`, `your_region`, etc., with your actual configuration details.
Sources:
- Recording Participants 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.powerful-rose-63872
02/28/2025, 12:55 AMgray-monkey-83525
02/28/2025, 12:55 AMpowerful-rose-63872
02/28/2025, 12:56 AMgray-monkey-83525
02/28/2025, 12:56 AM
You can set the agent's identity through the `JobRequest` object when setting up the agent. The identity is typically set when the agent accepts a job request. Here's how you can dynamically set and retrieve the agent's identity:
1. Set the Agent's Identity: When you accept a job request, you can specify the agent's identity using the `accept` method of the `JobRequest` object.
2. Access the Identity: Once the agent is connected to a room, you can access its identity through the room's local `participant` object (`ctx.room.local_participant.identity`).
Here's an example in Python:
from livekit import rtc
from livekit.agents import JobContext, JobRequest
async def request_fnc(req: JobRequest):
    """Accept the job request with an agent identity derived from the room name."""
    # The identity is "agent_" followed by the name of the requested room.
    room_name = req.room.name
    await req.accept(name="agent_name", identity=f"agent_{room_name}")
async def entrypoint(ctx: JobContext):
    """Connect to the job's room (audio only) and report the first participant.

    Prints the room name and the identity of the first participant returned
    by ``ctx.wait_for_participant()``.
    """
    # AutoSubscribe is exported by livekit.agents, not livekit.rtc, so the
    # original `rtc.AutoSubscribe` lookup fails. Imported locally to keep this
    # function self-contained.
    from livekit.agents import AutoSubscribe

    # Connect to the room
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
    # Wait for the first participant to connect
    # NOTE(review): this is the remote participant; the agent's own identity
    # should be available as ctx.room.local_participant.identity — confirm.
    participant = await ctx.wait_for_participant()
    print(f"Connected to room {ctx.room.name} with participant {participant.identity}")
# Initialize the worker with entrypoint and request_fnc
# WorkerOptions and cli are provided by livekit.agents; livekit.rtc has
# neither, so the original `rtc.WorkerOptions` / `rtc.cli` lookups fail.
from livekit.agents import WorkerOptions, cli

opts = WorkerOptions(
    entrypoint_fnc=entrypoint,
    request_fnc=request_fnc,
)

# Start the worker
# Guarded so the worker CLI only starts when this file is run as a script,
# not when the module is imported (e.g. by a child process).
if __name__ == "__main__":
    cli.run_app(opts)
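In this example, the agent's identity is set based on the room name, and you can read it from `ctx.room.local_participant.identity` once the agent is connected to the room. Note that `participant.identity` in the example is the identity of the remote participant returned by `wait_for_participant()`, not the agent's.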
Sources:
- Setting Agent Identity 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.