helpful-machine-32005
06/27/2025, 9:13 AMasync def entrypoint(ctx: agents.JobContext):
req = api.TrackCompositeEgressRequest(
room_name=ctx.room.name,
audio_track_id="",
file_outputs=[
api.EncodedFileOutput(
file_type=api.EncodedFileType.MP4,
filepath="recordings/",
s3=api.S3Upload(
bucket="egress-recordings-fn",
region="us-east-1",
access_key=os.getenv("S3_ACCESS_KEY"),
secret=os.getenv("S3_SECRET_KEY"),
),
)
]
)
lkapi = api.LiveKitAPI()
res = await lkapi.egress.start_track_composite_egress(req)
await lkapi.aclose()
tall-airline-16756
06/27/2025, 9:13 AMtall-airline-16756
06/27/2025, 9:13 AMaudio_track_id
for your TrackCompositeEgressRequest
, you need to obtain the ID of the audio track from your LiveKit room. This ID is required to specify which audio track to composite. Typically, you can retrieve the track ID from the room's track information before making the egress request. If you need further assistance on how to retrieve track information, let me know.
Sources: Egress API | LiveKit Docs | TrackCompositeOptions | LiveKit JS Server SDK - v2.9.7 | EgressClient | LiveKit JS Server SDK - v2.9.7helpful-machine-32005
06/27/2025, 9:14 AMtall-airline-16756
06/27/2025, 9:14 AMdef find_micro_track_id(room: rtc.Room, identity: str) -> str:
p: rtc.RemoteParticipant | rtc.LocalParticipant | None = room.remote_participants.get(identity)
if identity == room.local_participant.identity:
p = room.local_participant
if p is None:
raise ValueError(f"participant {identity} not found")
# find first microphone track
track_id = None
for track in p.track_publications.values():
if track.source == rtc.TrackSource.SOURCE_MICROPHONE:
track_id = track.sid
break
if track_id is None:
raise ValueError(f"participant {identity} does not have a microphone track")
return track_id
I've got you covered! This function is pretty straightforward ā just pass in the LiveKit room and the participant's identity, and it'll hunt down their microphone track's session ID. If something goes wrong (like not finding the participant or their mic track), it'll let you know with a helpful error message. Need me to break it down further or help you with anything else? I'm all ears! š Sources: GitHub::livekit::agents | livekit-agents/livekit/agents/voice/room_io/_outp...helpful-machine-32005
06/27/2025, 9:22 AMasync def entrypoint(ctx: agents.JobContext):
global current_session, current_room_io, all_histories
COMPANY_ID = get_active_company_id()
call_metadata = {
"start_time": None,
"end_time": None,
"caller_number": None,
"agent_names": set(),
"call_id": str(uuid.uuid4())
}
req = api.TrackCompositeEgressRequest(
room_name=ctx.room.name,
audio_track_id=ctx.room.local_participant.track_publications,
file_outputs=[
api.EncodedFileOutput(
file_type=api.EncodedFileType.MP4,
filepath="recordings/",
s3=api.S3Upload(
bucket="egress-recordings-fn",
region="us-east-1",
access_key=os.getenv("S3_ACCESS_KEY"),
secret=os.getenv("S3_SECRET_KEY"),
),
)
]
)
lkapi = api.LiveKitAPI()
# res = await lkapi.egress.start_room_composite_egress(req)
res = await lkapi.egress.start_track_composite_egress(req)
await lkapi.aclose()
async def write_transcript():
print("[Entrypoint] Writing transcript to file...")
current_date = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"./transcriptions/transcript_{ctx.room.name}_{current_date}.json"
transcript = all_histories.copy()
if current_session:
transcript.append(current_session.history.to_dict())
with open(filename, 'w', encoding='utf-8') as f:
json.dump(transcript, f, indent=2, ensure_ascii=False)
print(f"Transcript for {ctx.room.name} saved to {filename}")
# Save to DynamoDB
call_metadata["end_time"] = datetime.now(lebanon_tz).isoformat()
start = datetime.fromisoformat(call_metadata["start_time"])
end = datetime.fromisoformat(call_metadata["end_time"])
duration = Decimal(str((end - start).total_seconds()))
item = {
"company_id": COMPANY_ID,
"call_id": call_metadata["call_id"],
"start_time": call_metadata["start_time"],
"end_time": call_metadata["end_time"],
"duration": duration,
"caller_number": call_metadata["caller_number"],
"agent_names": list(call_metadata["agent_names"]),
"full_transcript": transcript,
}
logs_table.put_item(Item=item)
print(f"[Entrypoint] Call log saved for {COMPANY_ID} - {item['call_id']}")
ctx.add_shutdown_callback(write_transcript)
await ctx.connect()
print(f"Agent Name: {ctx.job.agent_name}")
print(f"[Entrypoint] Model: {model_name}")
call_metadata["start_time"] = datetime.now(lebanon_tz).isoformat()
current_session = AgentSession(
llm=openai.realtime.RealtimeModel(
turn_detection=None,
),
allow_interruptions=False,
)
current_room_io = RoomIO(current_session, room=ctx.room)
await current_room_io.start()
agent = Assistant(company_id=COMPANY_ID, dtmf=None)
call_metadata["agent_names"].add(agent.name)
await current_session.start(
agent=agent,
)
for identity, participant in ctx.room.remote_participants.items():
if identity.startswith("sip_"):
phone_number = identity[4:] # Remove 'sip_' prefix
print(f"Caller Number: {phone_number}")
call_metadata["caller_number"] = phone_number
else:
print(f" - {identity}")
await current_session.generate_reply(instructions=agent.instructions)
print(f"[Entrypoint] Agent: {ctx.agent.name}")
@ctx.room.on("sip_dtmf_received")
def on_dtmf(dtmf_event):
digit = dtmf_event.digit
print(f"[Entrypoint] DTMF received: {digit}")
asyncio.create_task(handle_dtmf_input(digit, ctx, call_metadata))
dtmf_agents_map: Dict[str, Assistant] = {}
async def handle_dtmf_input(digit: str, ctx: agents.JobContext, call_metadata: dict):
global current_session, current_room_io, all_histories, dtmf_agents_map
COMPANY_ID = get_active_company_id()
print(f"CTX: {ctx.room.local_participant}")
print(f"SESSION: {ctx.job.participant.attributes}")
if current_room_io:
print("[Handle DTMF] Stopping current RoomIO...")
await current_room_io.aclose()
current_room_io = None
current_session.interrupt()
current_session._chat_ctx.to_dict()
if current_session:
print("[Handle DTMF] Saving current session history...")
all_histories.append(current_session.history.to_dict())
print("[Handle DTMF] Closing current session...")
await current_session.aclose()
current_session = None
await asyncio.sleep(0.2)
if not dtmf_agents_map:
agents = get_agents_for_company(COMPANY_ID)
dtmf_agents_map = {
agent["dtmf"]: Assistant(company_id=COMPANY_ID, dtmf=agent["dtmf"])
for agent in agents if agent.get("dtmf") is not None
}
new_agent = dtmf_agents_map.get(digit)
if not new_agent:
print(f"[Handle DTMF] Invalid or undefined DTMF input: {digit}")
return
call_metadata["agent_names"].add(new_agent.name)
print(f"[Handle DTMF] Switching to agent for DTMF {digit}...")
current_session = AgentSession(
llm=openai.realtime.RealtimeModel(
turn_detection=TurnDetection(
type="server_vad",
threshold=0.5,
prefix_padding_ms=300,
silence_duration_ms=500,
create_response=True,
interrupt_response=True,
)),
)
current_room_io = RoomIO(current_session, room=ctx.room)
await current_room_io.start()
await current_session.start(agent=new_agent)
print(f"[Handle DTMF] New session started: {current_session}")
print(f"ALL PARTICIOANTS: {ctx.room.remote_participants}")
for identity, participant in ctx.room.remote_participants.items():
if identity.startswith("sip_"):
phone_number = identity[4:] # Remove 'sip_' prefix
print(f"Caller Number: {phone_number}")
else:
print(f" - {identity}")
reply_instuctions = new_agent.instructions
print(f"[Handle DTMF] New agent instructions: {reply_instuctions}")
await current_session.generate_reply(instructions=new_agent.instructions)
if __name__ == "__main__":
agents.cli.run_app(agents.WorkerOptions(
entrypoint_fnc=entrypoint,
agent_name="my-telephony-agent",
))
tall-airline-16756
06/27/2025, 9:22 AMfind_micro_track_id
function in your code. It's actually pretty straightforward! You'll want to retrieve the audio_track_id
for the local participant before creating the TrackCompositeEgressRequest
. Here's a friendly walkthrough of how to modify your entrypoint
function:
async def entrypoint(ctx: agents.JobContext):
global current_session, current_room_io, all_histories
COMPANY_ID = get_active_company_id()
# Retrieve the audio track ID for the local participant
audio_track_id = find_micro_track_id(ctx.room, ctx.room.local_participant.identity)
call_metadata = {
"start_time": None,
"end_time": None,
"caller_number": None,
"agent_names": set(),
"call_id": str(uuid.uuid4())
}
req = api.TrackCompositeEgressRequest(
room_name=ctx.room.name,
audio_track_id=audio_track_id,
file_outputs=[
api.EncodedFileOutput(
file_type=api.EncodedFileType.MP4,
filepath="recordings/",
s3=api.S3Upload(
bucket="egress-recordings-fn",
region="us-east-1",
access_key=os.getenv("S3_ACCESS_KEY"),
secret=os.getenv("S3_SECRET_KEY"),
),
)
]
)
lkapi = api.LiveKitAPI()
res = await lkapi.egress.start_track_composite_egress(req)
await lkapi.aclose()
# Rest of your code remains unchanged
This approach dynamically grabs the audio_track_id
for the local participant before making the egress request. Pretty neat, right? š Need any more help or have questions? I'm here for you! Just ask, and I'll be happy to explain further. šhelpful-machine-32005
06/27/2025, 9:25 AMtall-airline-16756
06/27/2025, 9:25 AMrtc
in the find_micro_track_id
function. If you're working with the LiveKit Python SDK, definitely keep that rtc
- it's super helpful! Think of it like your backstage pass to the room, giving you easy access to all the participants and their track details. Looks like you're diving into some cool video/audio functionality. Anything specific about rtc
you want to chat about? I'm all ears and ready to help you smooth out any bumps in your code! šhelpful-machine-32005
06/27/2025, 9:25 AMtall-airline-16756
06/27/2025, 9:26 AMfrom livekit import rtc
import os
import uuid
from datetime import datetime
import json
from decimal import Decimal
# Function to find the microphone track ID
def find_micro_track_id(room: rtc.Room, identity: str) -> str:
p: rtc.RemoteParticipant | rtc.LocalParticipant | None = room.remote_participants.get(identity)
if identity == room.local_participant.identity:
p = room.local_participant
if p is None:
raise ValueError(f"Participant {identity} not found")
# Find the first microphone track
track_id = None
for track in p.track_publications.values():
if track.source == rtc.TrackSource.SOURCE_MICROPHONE:
track_id = track.sid
break
if track_id is None:
raise ValueError(f"Participant {identity} does not have a microphone track")
return track_id
# Main entrypoint function
async def entrypoint(ctx: agents.JobContext):
global current_session, current_room_io, all_histories
COMPANY_ID = get_active_company_id()
# Retrieve the audio track ID for the local participant
audio_track_id = find_micro_track_id(ctx.room, ctx.room.local_participant.identity)
call_metadata = {
"start_time": None,
"end_time": None,
"caller_number": None,
"agent_names": set(),
"call_id": str(uuid.uuid4())
}
req = api.TrackCompositeEgressRequest(
room_name=ctx.room.name,
audio_track_id=audio_track_id,
file_outputs=[
api.EncodedFileOutput(
file_type=api.EncodedFileType.MP4,
filepath="recordings/",
s3=api.S3Upload(
bucket="egress-recordings-fn",
region="us-east-1",
access_key=os.getenv("S3_ACCESS_KEY"),
secret=os.getenv("S3_SECRET_KEY"),
),
)
]
)
lkapi = api.LiveKitAPI()
res = await lkapi.egress.start_track_composite_egress(req)
await lkapi.aclose()
async def write_transcript():
print("[Entrypoint] Writing transcript to file...")
current_date = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"./transcriptions/transcript_{ctx.room.name}_{current_date}.json"
transcript = all_histories.copy()
if current_session:
transcript.append(current_session.history.to_dict())
with open(filename, 'w', encoding='utf-8') as f:
json.dump(transcript, f, indent=2, ensure_ascii=False)
print(f"Transcript for {ctx.room.name} saved to {filename}")
# Save to DynamoDB
call_metadata["end_time"] = datetime.now(lebanon_tz).isoformat()
start = datetime.fromisoformat(call_metadata["start_time"])
end = datetime.fromisoformat(call_metadata["end_time"])
duration = Decimal(str((end - start).total_seconds()))
item = {
"company_id": COMPANY_ID,
"call_id": call_metadata["call_id"],
"start_time": call_metadata["start_time"],
"end_time": call_metadata["end_time"],
"duration": duration,
"caller_number": call_metadata["caller_number"],
"agent_names": list(call_metadata["agent_names"]),
"full_transcript": transcript,
}
logs_table.put_item(Item=item)
print(f"[Entrypoint] Call log saved for {COMPANY_ID} - {item['call_id']}")
ctx.add_shutdown_callback(write_transcript)
await ctx.connect()
print(f"Agent Name: {ctx.job.agent_name}")
print(f"[Entrypoint] Model: {model_name}")
call_metadata["start_time"] = datetime.now(lebanon_tz).isoformat()
current_session = AgentSession(
llm=openai.realtime.RealtimeModel(
turn_detection=None,
),
allow_interruptions=False,
)
current_room_io = RoomIO(current_session, room=ctx.room)
await current_room_io.start()
agent = Assistant(company_id=COMPANY_ID, dtmf=None)
call_metadata["agent_names"].add(agent.name)
await current_session.start(
agent=agent,
)
for identity, participant in ctx.room.remote_participants.items():
if identity.startswith("sip_"):
phone_number = identity[4:] # Remove 'sip_' prefix
print(f"Caller Number: {phone_number}")
call_metadata["caller_number"] = phone_number
else:
print(f" - {identity}")
await current_session.generate_reply(instructions=agent.instructions)
print(f"[Entrypoint] Agent: {ctx.agent.name}")
@ctx.room.on("sip_dtmf_received")
def on_dtmf(dtmf_event):
digit = dtmf_event.digit
print(f"[Entrypoint] DTMF received: {digit}")
asyncio.create_task(handle_dtmf_input(digit, ctx, call_metadata))
if __name__ == "__main__":
agents.cli.run_app(agents.WorkerOptions(
entrypoint_fnc=entrypoint,
agent_name="my-telephony-agent",
))
Hey there! I've got some exciting news for you. I've integrated the find_micro_track_id
function into the entrypoint
function, which means we can now dynamically retrieve the audio_track_id
super easily. š It's like having a smart helper that automatically finds the right microphone track for us. Pretty cool, right? If you need any more help or have questions, just give me a shout! I'm always here to help make your coding journey smoother. šhelpful-machine-32005
06/27/2025, 9:33 AMFile "C:\Users\user\Desktop\livekit\livekit-sip-agent\agent\agent.py", line 92, in entrypoint
audio_track_id = find_micro_track_id(ctx.room, ctx.room.local_participant.identity)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\user\Desktop\livekit\livekit-sip-agent\livekit-venv\Lib\site-packages\livekit\rtc\room.py", line 181, in local_participant
raise Exception("cannot access local participant before connecting")
Exception: cannot access local participant before connecting
tall-airline-16756
06/27/2025, 9:34 AMlocal_participant
before the room is fully connected - which is like trying to shake hands with someone before they've even walked into the room! š Here's a super simple fix that'll get things working smoothly: 1. Just move the find_micro_track_id
call to right after you connect the room in your entrypoint
function. 2. Updated code snippet:
async def entrypoint(ctx: agents.JobContext):
global current_session, current_room_io, all_histories
COMPANY_ID = get_active_company_id()
call_metadata = {
"start_time": None,
"end_time": None,
"caller_number": None,
"agent_names": set(),
"call_id": str(uuid.uuid4())
}
await ctx.connect() # Ensure the room is connected before accessing local_participant
# Retrieve the audio track ID for the local participant
audio_track_id = find_micro_track_id(ctx.room, ctx.room.local_participant.identity)
req = api.TrackCompositeEgressRequest(
room_name=ctx.room.name,
audio_track_id=audio_track_id,
file_outputs=[
api.EncodedFileOutput(
file_type=api.EncodedFileType.MP4,
filepath="recordings/",
s3=api.S3Upload(
bucket="egress-recordings-fn",
region="us-east-1",
access_key=os.getenv("S3_ACCESS_KEY"),
secret=os.getenv("S3_SECRET_KEY"),
),
)
]
)
lkapi = api.LiveKitAPI()
res = await lkapi.egress.start_track_composite_egress(req)
await lkapi.aclose()
# Rest of your code remains unchanged
By adding the await ctx.connect()
before accessing the local_participant
, you're making sure everything is ready to go. It's like making sure the coffee machine is plugged in before trying to brew a cup! ā Hope this helps, and feel free to reach out if you run into any other bumps along the way. We'll solve them together! š¤helpful-machine-32005
06/27/2025, 9:35 AMtall-airline-16756
06/27/2025, 9:35 AMctx.connect()
, grabbing that local_participant
directly can be tricky. No worries, though! You've got a couple of options to work around this. One quick fix could be using a predefined audio_track_id
if you happen to know it. Or, we might need to get a bit creative and tweak the logic to snag that track ID in a different way. I'd love to help you nail down the perfect solution. Do you have any more details about your specific setup that could give us some extra insight? The more you can share, the better I can help you solve this puzzle! šhelpful-machine-32005
06/27/2025, 9:40 AMtall-airline-16756
06/27/2025, 9:40 AM