# ask-ai
h
How can I get the audio_track_id here:
Copy code
async def entrypoint(ctx: agents.JobContext):
    
    req = api.TrackCompositeEgressRequest(
        room_name=ctx.room.name,
        audio_track_id="",
        file_outputs=[
            api.EncodedFileOutput(
                file_type=api.EncodedFileType.MP4,
                filepath="recordings/",
                s3=api.S3Upload(
                    bucket="egress-recordings-fn",
                    region="us-east-1",
                    access_key=os.getenv("S3_ACCESS_KEY"),
                    secret=os.getenv("S3_SECRET_KEY"),
                ),
            )
        ]
    )

    lkapi = api.LiveKitAPI()
    res = await lkapi.egress.start_track_composite_egress(req)

    await lkapi.aclose()
t
Thanks for reaching out, Fatima. I'm an AI assistant and I'm here to help. Give me a moment while I research this topic.
To get the audio_track_id for your TrackCompositeEgressRequest, you need to obtain the ID of the audio track from your LiveKit room. This ID is required to specify which audio track to composite. Typically, you can retrieve the track ID from the room's track information before making the egress request. If you need further assistance on how to retrieve track information, let me know. Sources: Egress API | LiveKit Docs | TrackCompositeOptions | LiveKit JS Server SDK - v2.9.7 | EgressClient | LiveKit JS Server SDK - v2.9.7
h
How do I make it in code?
t
Hey there! šŸ‘‹ Let me help you find that microphone track ID in a super straightforward way. Here's a handy Python function that'll do the trick for you:
Copy code
def find_micro_track_id(room: rtc.Room, identity: str) -> str:
    p: rtc.RemoteParticipant | rtc.LocalParticipant | None = room.remote_participants.get(identity)
    if identity == room.local_participant.identity:
        p = room.local_participant

    if p is None:
        raise ValueError(f"participant {identity} not found")

    # find first microphone track
    track_id = None
    for track in p.track_publications.values():
        if track.source == rtc.TrackSource.SOURCE_MICROPHONE:
            track_id = track.sid
            break

    if track_id is None:
        raise ValueError(f"participant {identity} does not have a microphone track")

    return track_id
I've got you covered! This function is pretty straightforward – just pass in the LiveKit room and the participant's identity, and it'll hunt down their microphone track's SID (the track's server-generated ID, which is what egress expects). If something goes wrong (like not finding the participant or their mic track), it'll let you know with a helpful error message. Need me to break it down further or help you with anything else? I'm all ears! 😊 Sources: GitHub::livekit::agents | livekit-agents/livekit/agents/voice/room_io/_outp...
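Since that helper can't be exercised without a live room, here's a standalone sanity check of the same lookup logic, a sketch using duck-typed stand-ins (plain SimpleNamespace objects and string constants, not the real rtc types or rtc.TrackSource enum):

```python
import types

# Stand-in source constants; real code compares rtc.TrackSource.SOURCE_MICROPHONE
SOURCE_MICROPHONE = "microphone"
SOURCE_CAMERA = "camera"

def make_participant(identity, pubs):
    # SimpleNamespace mimics the attributes the lookup reads
    return types.SimpleNamespace(identity=identity, track_publications=pubs)

def find_micro_track_id(room, identity):
    """Duck-typed copy of the helper above, minus the rtc type annotations."""
    p = room.remote_participants.get(identity)
    if identity == room.local_participant.identity:
        p = room.local_participant
    if p is None:
        raise ValueError(f"participant {identity} not found")
    # find first microphone track
    for pub in p.track_publications.values():
        if pub.source == SOURCE_MICROPHONE:
            return pub.sid
    raise ValueError(f"participant {identity} does not have a microphone track")

# Hypothetical SIP caller with a camera and a microphone publication
caller = make_participant("sip_+96170000000", {
    "TR_cam": types.SimpleNamespace(sid="TR_cam", source=SOURCE_CAMERA),
    "TR_mic": types.SimpleNamespace(sid="TR_mic", source=SOURCE_MICROPHONE),
})
room = types.SimpleNamespace(
    remote_participants={"sip_+96170000000": caller},
    local_participant=make_participant("agent", {}),
)

print(find_micro_track_id(room, "sip_+96170000000"))  # TR_mic
```

The control flow (remote lookup first, local-participant override, first-microphone match, two distinct errors) is identical to the function above, so this is a quick way to convince yourself of its behavior before wiring it into a real room.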
h
How do I apply it here:
Copy code
async def entrypoint(ctx: agents.JobContext):
    
    global current_session, current_room_io, all_histories
    
    COMPANY_ID = get_active_company_id()
    
    call_metadata = {
        "start_time": None,
        "end_time": None,
        "caller_number": None,
        "agent_names": set(),
        "call_id": str(uuid.uuid4())
    }
    
    req = api.TrackCompositeEgressRequest(
        room_name=ctx.room.name,
        audio_track_id=ctx.room.local_participant.track_publications,
        file_outputs=[
            api.EncodedFileOutput(
                file_type=api.EncodedFileType.MP4,
                filepath="recordings/",
                s3=api.S3Upload(
                    bucket="egress-recordings-fn",
                    region="us-east-1",
                    access_key=os.getenv("S3_ACCESS_KEY"),
                    secret=os.getenv("S3_SECRET_KEY"),
                ),
            )
        ]
    )

    
    lkapi = api.LiveKitAPI()
    # res = await lkapi.egress.start_room_composite_egress(req)
    res = await lkapi.egress.start_track_composite_egress(req)

    await lkapi.aclose()
    
    async def write_transcript():
        print("[Entrypoint] Writing transcript to file...")
        current_date = datetime.now().strftime("%Y%m%d_%H%M%S")

        filename = f"./transcriptions/transcript_{ctx.room.name}_{current_date}.json"
        
        transcript = all_histories.copy()
        if current_session:
            transcript.append(current_session.history.to_dict())
        
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(transcript, f, indent=2, ensure_ascii=False)
            
        print(f"Transcript for {ctx.room.name} saved to {filename}")
        
        # Save to DynamoDB
        call_metadata["end_time"] = datetime.now(lebanon_tz).isoformat()
        start = datetime.fromisoformat(call_metadata["start_time"])
        end = datetime.fromisoformat(call_metadata["end_time"])
        duration = Decimal(str((end - start).total_seconds()))

        item = {
            "company_id": COMPANY_ID,
            "call_id": call_metadata["call_id"],
            "start_time": call_metadata["start_time"],
            "end_time": call_metadata["end_time"],
            "duration": duration,
            "caller_number": call_metadata["caller_number"],
            "agent_names": list(call_metadata["agent_names"]),
            "full_transcript": transcript,
        }

        logs_table.put_item(Item=item)
        print(f"[Entrypoint] Call log saved for {COMPANY_ID} - {item['call_id']}")

    ctx.add_shutdown_callback(write_transcript)
    
    await ctx.connect()
    
    print(f"Agent Name: {ctx.job.agent_name}")
    print(f"[Entrypoint] Model: {model_name}")
    
    call_metadata["start_time"] = datetime.now(lebanon_tz).isoformat()
    
    current_session = AgentSession(
        llm=openai.realtime.RealtimeModel(
            turn_detection=None,
        ),
        allow_interruptions=False,
    )  
    
    current_room_io = RoomIO(current_session, room=ctx.room)
    await current_room_io.start()
    
    agent = Assistant(company_id=COMPANY_ID, dtmf=None)
    call_metadata["agent_names"].add(agent.name)
    
    await current_session.start(
        agent=agent,
    )
    
    for identity, participant in ctx.room.remote_participants.items():
        if identity.startswith("sip_"):
            phone_number = identity[4:]  # Remove 'sip_' prefix
            print(f"Caller Number: {phone_number}")
            call_metadata["caller_number"] = phone_number
        else:
            print(f"  - {identity}")

    await current_session.generate_reply(instructions=agent.instructions)
    print(f"[Entrypoint] Agent: {ctx.agent.name}")
    
    @ctx.room.on("sip_dtmf_received")
    def on_dtmf(dtmf_event):
        digit = dtmf_event.digit
        print(f"[Entrypoint] DTMF received: {digit}")
        asyncio.create_task(handle_dtmf_input(digit, ctx, call_metadata))
        
dtmf_agents_map: Dict[str, Assistant] = {}   

async def handle_dtmf_input(digit: str, ctx: agents.JobContext, call_metadata: dict):

    global current_session, current_room_io, all_histories, dtmf_agents_map
    
    COMPANY_ID = get_active_company_id()

    print(f"CTX: {ctx.room.local_participant}")
    print(f"SESSION: {ctx.job.participant.attributes}")
    
    if current_room_io:
        print("[Handle DTMF] Stopping current RoomIO...")
        await current_room_io.aclose()
        current_room_io = None

    current_session.interrupt()
    current_session._chat_ctx.to_dict()
    
    if current_session:
        print("[Handle DTMF] Saving current session history...")
        all_histories.append(current_session.history.to_dict())
        
        print("[Handle DTMF] Closing current session...")
        await current_session.aclose()
        current_session = None

    await asyncio.sleep(0.2)
    
    if not dtmf_agents_map:
        agents = get_agents_for_company(COMPANY_ID)
        dtmf_agents_map = {
            agent["dtmf"]: Assistant(company_id=COMPANY_ID, dtmf=agent["dtmf"])
            for agent in agents if agent.get("dtmf") is not None
        }

    new_agent = dtmf_agents_map.get(digit)
    
    if not new_agent:
        print(f"[Handle DTMF] Invalid or undefined DTMF input: {digit}")
        return

    call_metadata["agent_names"].add(new_agent.name)
    print(f"[Handle DTMF] Switching to agent for DTMF {digit}...")
    
    current_session = AgentSession(
        llm=openai.realtime.RealtimeModel(
            turn_detection=TurnDetection(
            type="server_vad",
            threshold=0.5,
            prefix_padding_ms=300,
            silence_duration_ms=500,
            create_response=True,
            interrupt_response=True,
        )),
    )        
    
    current_room_io = RoomIO(current_session, room=ctx.room)
    await current_room_io.start()
    
    await current_session.start(agent=new_agent)
    print(f"[Handle DTMF] New session started: {current_session}")
    
    print(f"ALL PARTICIPANTS: {ctx.room.remote_participants}")
    
    for identity, participant in ctx.room.remote_participants.items():
        if identity.startswith("sip_"):
            phone_number = identity[4:]  # Remove 'sip_' prefix
            print(f"Caller Number: {phone_number}")
        else:
            print(f"  - {identity}")
    reply_instructions = new_agent.instructions
    print(f"[Handle DTMF] New agent instructions: {reply_instructions}")
    await current_session.generate_reply(instructions=new_agent.instructions)

if __name__ == "__main__":
    agents.cli.run_app(agents.WorkerOptions(
        entrypoint_fnc=entrypoint,
        agent_name="my-telephony-agent",
        ))
t
Hey there! 😊 Let me help you understand how to apply the find_micro_track_id function in your code. It's actually pretty straightforward! You'll want to retrieve the audio_track_id for the local participant before creating the TrackCompositeEgressRequest. Here's a friendly walkthrough of how to modify your entrypoint function:
Copy code
async def entrypoint(ctx: agents.JobContext):
    global current_session, current_room_io, all_histories

    COMPANY_ID = get_active_company_id()

    # Retrieve the audio track ID for the local participant
    audio_track_id = find_micro_track_id(ctx.room, ctx.room.local_participant.identity)

    call_metadata = {
        "start_time": None,
        "end_time": None,
        "caller_number": None,
        "agent_names": set(),
        "call_id": str(uuid.uuid4())
    }

    req = api.TrackCompositeEgressRequest(
        room_name=ctx.room.name,
        audio_track_id=audio_track_id,
        file_outputs=[
            api.EncodedFileOutput(
                file_type=api.EncodedFileType.MP4,
                filepath="recordings/",
                s3=api.S3Upload(
                    bucket="egress-recordings-fn",
                    region="us-east-1",
                    access_key=os.getenv("S3_ACCESS_KEY"),
                    secret=os.getenv("S3_SECRET_KEY"),
                ),
            )
        ]
    )

    lkapi = api.LiveKitAPI()
    res = await lkapi.egress.start_track_composite_egress(req)

    await lkapi.aclose()

    # Rest of your code remains unchanged
This approach dynamically grabs the audio_track_id for the local participant before making the egress request. Pretty neat, right? šŸŽ‰ Need any more help or have questions? I'm here for you! Just ask, and I'll be happy to explain further. šŸ‘
h
Do I keep the rtc in the function (find_micro_track_id)?
t
Hey there! šŸ‘‹ Great question. Yes – keep the rtc in find_micro_track_id. In the LiveKit Python SDK, rtc is the realtime module (from livekit import rtc): the function uses it for the rtc.Room and participant type hints and for the rtc.TrackSource.SOURCE_MICROPHONE enum it matches against, so it won't run without that import. Think of it like your backstage pass to the room, giving you easy access to all the participants and their track details. Anything specific about rtc you want to chat about? I'm all ears! 😊
h
Now give me the full code
t
Copy code
from livekit import rtc, api, agents
import asyncio
import os
import uuid
from datetime import datetime
import json
from decimal import Decimal

# NOTE: AgentSession, RoomIO, Assistant, the openai plugin, lebanon_tz,
# logs_table, model_name, get_active_company_id, and handle_dtmf_input are
# assumed to be defined elsewhere in your project, as in your original file.

# Function to find the microphone track ID
def find_micro_track_id(room: rtc.Room, identity: str) -> str:
    p: rtc.RemoteParticipant | rtc.LocalParticipant | None = room.remote_participants.get(identity)
    if identity == room.local_participant.identity:
        p = room.local_participant

    if p is None:
        raise ValueError(f"Participant {identity} not found")

    # Find the first microphone track
    track_id = None
    for track in p.track_publications.values():
        if track.source == rtc.TrackSource.SOURCE_MICROPHONE:
            track_id = track.sid
            break

    if track_id is None:
        raise ValueError(f"Participant {identity} does not have a microphone track")

    return track_id

# Main entrypoint function
async def entrypoint(ctx: agents.JobContext):
    global current_session, current_room_io, all_histories

    COMPANY_ID = get_active_company_id()

    # Retrieve the audio track ID for the local participant
    audio_track_id = find_micro_track_id(ctx.room, ctx.room.local_participant.identity)

    call_metadata = {
        "start_time": None,
        "end_time": None,
        "caller_number": None,
        "agent_names": set(),
        "call_id": str(uuid.uuid4())
    }

    req = api.TrackCompositeEgressRequest(
        room_name=ctx.room.name,
        audio_track_id=audio_track_id,
        file_outputs=[
            api.EncodedFileOutput(
                file_type=api.EncodedFileType.MP4,
                filepath="recordings/",
                s3=api.S3Upload(
                    bucket="egress-recordings-fn",
                    region="us-east-1",
                    access_key=os.getenv("S3_ACCESS_KEY"),
                    secret=os.getenv("S3_SECRET_KEY"),
                ),
            )
        ]
    )

    lkapi = api.LiveKitAPI()
    res = await lkapi.egress.start_track_composite_egress(req)

    await lkapi.aclose()

    async def write_transcript():
        print("[Entrypoint] Writing transcript to file...")
        current_date = datetime.now().strftime("%Y%m%d_%H%M%S")

        filename = f"./transcriptions/transcript_{ctx.room.name}_{current_date}.json"

        transcript = all_histories.copy()
        if current_session:
            transcript.append(current_session.history.to_dict())

        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(transcript, f, indent=2, ensure_ascii=False)

        print(f"Transcript for {ctx.room.name} saved to {filename}")

        # Save to DynamoDB
        call_metadata["end_time"] = datetime.now(lebanon_tz).isoformat()
        start = datetime.fromisoformat(call_metadata["start_time"])
        end = datetime.fromisoformat(call_metadata["end_time"])
        duration = Decimal(str((end - start).total_seconds()))

        item = {
            "company_id": COMPANY_ID,
            "call_id": call_metadata["call_id"],
            "start_time": call_metadata["start_time"],
            "end_time": call_metadata["end_time"],
            "duration": duration,
            "caller_number": call_metadata["caller_number"],
            "agent_names": list(call_metadata["agent_names"]),
            "full_transcript": transcript,
        }

        logs_table.put_item(Item=item)
        print(f"[Entrypoint] Call log saved for {COMPANY_ID} - {item['call_id']}")

    ctx.add_shutdown_callback(write_transcript)

    await ctx.connect()

    print(f"Agent Name: {ctx.job.agent_name}")
    print(f"[Entrypoint] Model: {model_name}")

    call_metadata["start_time"] = datetime.now(lebanon_tz).isoformat()

    current_session = AgentSession(
        llm=openai.realtime.RealtimeModel(
            turn_detection=None,
        ),
        allow_interruptions=False,
    )

    current_room_io = RoomIO(current_session, room=ctx.room)
    await current_room_io.start()

    agent = Assistant(company_id=COMPANY_ID, dtmf=None)
    call_metadata["agent_names"].add(agent.name)

    await current_session.start(
        agent=agent,
    )

    for identity, participant in ctx.room.remote_participants.items():
        if identity.startswith("sip_"):
            phone_number = identity[4:]  # Remove 'sip_' prefix
            print(f"Caller Number: {phone_number}")
            call_metadata["caller_number"] = phone_number
        else:
            print(f"  - {identity}")

    await current_session.generate_reply(instructions=agent.instructions)
    print(f"[Entrypoint] Agent: {ctx.agent.name}")

    @ctx.room.on("sip_dtmf_received")
    def on_dtmf(dtmf_event):
        digit = dtmf_event.digit
        print(f"[Entrypoint] DTMF received: {digit}")
        asyncio.create_task(handle_dtmf_input(digit, ctx, call_metadata))

if __name__ == "__main__":
    agents.cli.run_app(agents.WorkerOptions(
        entrypoint_fnc=entrypoint,
        agent_name="my-telephony-agent",
    ))
Hey there! I've got some exciting news for you. I've integrated the find_micro_track_id function into the entrypoint function, which means we can now dynamically retrieve the audio_track_id super easily. 😊 It's like having a smart helper that automatically finds the right microphone track for us. Pretty cool, right? If you need any more help or have questions, just give me a shout! I'm always here to help make your coding journey smoother. šŸš€
h
I got this error:
Copy code
File "C:\Users\user\Desktop\livekit\livekit-sip-agent\agent\agent.py", line 92, in entrypoint
    audio_track_id = find_micro_track_id(ctx.room, ctx.room.local_participant.identity)
                                                   ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\user\Desktop\livekit\livekit-sip-agent\livekit-venv\Lib\site-packages\livekit\rtc\room.py", line 181, in local_participant
    raise Exception("cannot access local participant before connecting")
Exception: cannot access local participant before connecting
t
Hey there! šŸ‘‹ I totally understand the frustration with this error. Don't worry, I've got a friendly solution that'll help you smooth out this little hiccup in your code. The issue is that you're trying to grab the local_participant before the room is fully connected - which is like trying to shake hands with someone before they've even walked into the room! šŸ˜„ Here's a super simple fix that'll get things working smoothly: 1. Move the find_micro_track_id call to right after you connect the room in your entrypoint function. 2. Updated code snippet:
Copy code
async def entrypoint(ctx: agents.JobContext):
    global current_session, current_room_io, all_histories

    COMPANY_ID = get_active_company_id()

    call_metadata = {
        "start_time": None,
        "end_time": None,
        "caller_number": None,
        "agent_names": set(),
        "call_id": str(uuid.uuid4())
    }

    await ctx.connect()  # Ensure the room is connected before accessing local_participant

    # Retrieve the audio track ID for the local participant
    audio_track_id = find_micro_track_id(ctx.room, ctx.room.local_participant.identity)

    req = api.TrackCompositeEgressRequest(
        room_name=ctx.room.name,
        audio_track_id=audio_track_id,
        file_outputs=[
            api.EncodedFileOutput(
                file_type=api.EncodedFileType.MP4,
                filepath="recordings/",
                s3=api.S3Upload(
                    bucket="egress-recordings-fn",
                    region="us-east-1",
                    access_key=os.getenv("S3_ACCESS_KEY"),
                    secret=os.getenv("S3_SECRET_KEY"),
                ),
            )
        ]
    )

    lkapi = api.LiveKitAPI()
    res = await lkapi.egress.start_track_composite_egress(req)

    await lkapi.aclose()

    # Rest of your code remains unchanged
By adding the await ctx.connect() before accessing the local_participant, you're making sure everything is ready to go. It's like making sure the coffee machine is plugged in before trying to brew a cup! ā˜• Hope this helps, and feel free to reach out if you run into any other bumps along the way. We'll solve them together! šŸ¤
h
But in the docs, they said that the egress request should be sent before ctx.connect()
t
Hey there! I totally get the challenge you're facing with the egress request timing. If you're sending the request before ctx.connect(), grabbing that local_participant directly can be tricky. No worries, though! You've got a couple of options to work around this. One quick fix could be using a predefined audio_track_id if you happen to know it. Or, we might need to get a bit creative and tweak the logic to snag that track ID in a different way. I'd love to help you nail down the perfect solution. Do you have any more details about your specific setup that could give us some extra insight? The more you can share, the better I can help you solve this puzzle! 😊
h
Can you help me on how to do it
t
I don't have the answer you're looking for. You could also try asking your question:
• in one of the other Slack channels, or
• to https://deepwiki.com/livekit/livekit_composite which is trained on all LiveKit source code
If you find the answer, please post it here to help others!
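The thread ends unresolved, so here is one pattern worth trying, as a sketch only and not verified against any particular SDK version: connect first, then wait for the first audio track to be published before building the TrackCompositeEgressRequest. FakeRoom and FakePublication below are hypothetical stand-ins so the sketch runs standalone; in a real agent you would register the same style of handler on ctx.room's "track_published" event and pass the resulting sid as audio_track_id.

```python
import asyncio
from dataclasses import dataclass
from typing import Callable, List

@dataclass
class FakePublication:
    sid: str
    kind: str  # "audio" or "video"

class FakeRoom:
    """Minimal emitter standing in for a room's "track_published" event."""
    def __init__(self) -> None:
        self._handlers: List[Callable[[FakePublication], None]] = []

    def on(self, event: str, handler: Callable[[FakePublication], None]) -> None:
        if event == "track_published":
            self._handlers.append(handler)

    def publish(self, pub: FakePublication) -> None:
        for h in self._handlers:
            h(pub)

async def wait_for_audio_track(room: FakeRoom, timeout: float = 10.0) -> str:
    """Resolve with the sid of the first audio track published to the room."""
    fut: asyncio.Future = asyncio.get_running_loop().create_future()

    def on_published(pub: FakePublication) -> None:
        if pub.kind == "audio" and not fut.done():
            fut.set_result(pub.sid)

    room.on("track_published", on_published)
    return await asyncio.wait_for(fut, timeout)

async def demo() -> str:
    room = FakeRoom()
    # Simulate the SIP caller's microphone arriving shortly after connect
    asyncio.get_running_loop().call_later(
        0.01, room.publish, FakePublication(sid="TR_audio123", kind="audio"))
    # In the agent: await ctx.connect(), then wait here, then start egress
    return await wait_for_audio_track(room)

print(asyncio.run(demo()))  # TR_audio123
```

With this ordering, the "cannot access local participant before connecting" error cannot occur, and the egress only starts once a real audio track sid exists. Whether egress must be requested before or after connect depends on which egress type you use, so check the current Egress docs for your SDK version before adopting this.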