orange-salesclerk-64602
08/11/2025, 12:36 PM
tall-airline-16756
08/11/2025, 12:36 PM
tall-airline-16756
08/11/2025, 12:38 PM
`autoSubscribe` is turned on if you want subscriptions to happen automatically. If it’s off, you’ll need to manage subscriptions yourself.
2. Set the Subscribed Flag Manually: If you’ve got `autoSubscribe` disabled, don’t forget to set the subscribed flag manually for your tracks. Here’s a quick example:
    if err := pub.SetSubscribed(true); err != nil {
        log.Errorw("cannot subscribe to the track", err)
        return
    }
3. Ensure Room is Ready: Double-check that the room is fully connected before you start handling track subscriptions. Using connection state callbacks can really help confirm this.
4. Handle Manual Subscriptions: If you’re going the manual route with subscriptions, make sure to process them only after the room is ready. Something like this:
    func (r *Room) OnTrackPublished(...) {
        if !r.subscribe.Load() {
            log.Debugw("skipping track publish event - subscribed flag not set")
            return
        }
        // Handle subscription
    }
5. Check for Race Conditions: It’s a good idea to use synchronization tools like `core.Fuse` to keep your events in the right order and avoid any timing issues.
6. Review Room Configuration: Take a moment to verify your `RoomConfig` has all the essentials, like `WsUrl`, `RoomName`, and participant info.
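For steps 3 and 4, here is a rough Python sketch of one way to avoid losing publications that arrive before the room is ready: buffer them, then subscribe once a ready signal fires. `FakePublication` and `SubscriptionGate` are hypothetical stand-ins written for this illustration, not SDK classes. Only the `set_subscribed(True)` call mirrors the snippets in this thread.

```python
from dataclasses import dataclass, field


@dataclass
class FakePublication:
    """Hypothetical stand-in for an SDK track publication."""
    sid: str
    subscribed: bool = False

    def set_subscribed(self, value: bool) -> None:
        self.subscribed = value


@dataclass
class SubscriptionGate:
    """Buffers publications until the room is marked ready, then subscribes."""
    ready: bool = False
    pending: list = field(default_factory=list)

    def on_track_published(self, publication) -> None:
        if not self.ready:
            # Room not ready yet: remember the publication instead of dropping it.
            self.pending.append(publication)
            return
        publication.set_subscribed(True)

    def mark_ready(self) -> None:
        # Intended to be called from a connection-state callback once connected.
        self.ready = True
        for publication in self.pending:
            publication.set_subscribed(True)
        self.pending.clear()


gate = SubscriptionGate()
early = FakePublication("TR_early")
gate.on_track_published(early)   # arrives before the room is ready, gets buffered
gate.mark_ready()                # flushes the buffered publication
late = FakePublication("TR_late")
gate.on_track_published(late)    # room is ready, subscribed immediately
```

The difference from the "skip if the flag is not set" handler above is that early events are kept and replayed rather than skipped. Whether that is what you want depends on your flow.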
Give these a try, and hopefully, that clears things up! If you want to chat more about any of these steps or need a hand, just let me know. I’m here to help.
orange-salesclerk-64602
08/11/2025, 12:40 PM
tall-airline-16756
08/11/2025, 12:40 PM
    // Make sure the room is ready before handling track subscriptions
    func (r *Room) OnTrackPublished(pub *TrackPublication) {
        if !r.subscribe.Load() { // Check if the subscribed flag is set
            log.Debugw("skipping track publish event - subscribed flag not set")
            return
        }
        // Set the subscribed flag manually
        if err := pub.SetSubscribed(true); err != nil {
            log.Errorw("cannot subscribe to the track", err)
            return
        }
        log.Infow("Track successfully subscribed", "trackID", pub.TrackID())
    }
Try plugging this into your track subscription flow and see how it goes. And hey, if anything’s unclear or you want me to walk you through more examples, just give me a shout!
orange-salesclerk-64602
08/11/2025, 1:02 PM
tall-airline-16756
08/11/2025, 1:03 PM
    # connect() does not return the room, so don't reassign `room` here
    await room.connect(url, token, options=rtc.RoomOptions(auto_subscribe=False))
2. Subscribe to Tracks: When tracks get published, you’ll want to subscribe to them using the `TrackPublication` object:
    @room.on("track_published")
    def on_track_published(publication, participant):
        publication.set_subscribed(True)
Don’t forget to also subscribe to any tracks that were published before your bot joined:
    # remote_participants is a dict keyed by identity, so iterate over its values
    for participant in room.remote_participants.values():
        for publication in participant.track_publications.values():
            publication.set_subscribed(True)
3. Handle Subscribed Tracks: Use the `track_subscribed` event to work with the tracks once subscribed. For example:
    @room.on("track_subscribed")
    def on_track_subscribed(track, publication, participant):
        if track.kind == rtc.TrackKind.KIND_AUDIO:
            audio_stream = rtc.AudioStream(track)
            asyncio.create_task(process_audio_stream(audio_stream, participant))
4. Process Audio Tracks: Here’s a handy function to handle those audio streams:
    async def process_audio_stream(audio_stream, participant):
        try:
            async for event in audio_stream:
                # Process the audio frame here
                pass
        except Exception as e:
            print(f"Error processing audio stream from {participant.identity}: {e}")
        finally:
            await audio_stream.aclose()
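One thing worth considering with `asyncio.create_task` in step 3: the event loop only keeps a weak reference to tasks, so the Python docs recommend holding your own reference until the task finishes. Below is a minimal sketch of that idea. The `spawn` helper and `fake_audio_frames` generator are hypothetical names used only for illustration, and the generator stands in for a real audio stream.

```python
import asyncio

# Strong references to in-flight tasks so they are not garbage collected early.
background_tasks: set[asyncio.Task] = set()


def spawn(coro) -> asyncio.Task:
    """Start a task and keep a reference to it until it completes."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    task.add_done_callback(background_tasks.discard)
    return task


async def fake_audio_frames(count: int):
    """Hypothetical stand-in for an audio stream: yields `count` frame indexes."""
    for index in range(count):
        await asyncio.sleep(0)
        yield index


async def process_audio_stream(stream, identity: str, seen: list) -> None:
    # Record each frame together with the participant identity it came from.
    async for frame in stream:
        seen.append((identity, frame))


async def demo() -> list:
    seen: list = []
    spawn(process_audio_stream(fake_audio_frames(3), "caller", seen))
    # Wait for every tracked task before shutting down.
    await asyncio.gather(*background_tasks)
    return seen


result = asyncio.run(demo())
```

In a real handler you would call `spawn(process_audio_stream(...))` where the snippets above call `asyncio.create_task(...)` directly.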
If anything’s unclear or you want a hand with the implementation, just let me know. I’m here to help!
orange-salesclerk-64602
08/11/2025, 1:06 PM
tall-airline-16756
08/11/2025, 1:06 PM
    import asyncio
    import logging

    from livekit import rtc, api


    async def main():
        # .on() expects a synchronous callback, so the async work is scheduled as a task
        def on_track_subscribed(track, publication, participant):
            if track.kind == rtc.TrackKind.KIND_AUDIO:
                audio_stream = rtc.AudioStream(track)
                asyncio.create_task(process_audio_stream(audio_stream, participant))

        async def process_audio_stream(audio_stream, participant):
            try:
                async for event in audio_stream:
                    # Here’s where you can process the audio frame
                    pass
            except Exception as e:
                logging.error(f"Oops! There was an error processing audio from {participant.identity}: {e}")
            finally:
                await audio_stream.aclose()

        # Setting up logging so you can keep an eye on what’s happening
        logging.basicConfig(level=logging.INFO)

        # Create an access token with the right permissions
        # (parentheses are needed for the chained calls to span several lines)
        token = (
            api.AccessToken()
            .with_identity("my-bot")
            .with_grants(api.VideoGrants(room_join=True, room="my-room"))
            .to_jwt()
        )

        # Create the room instance
        room = rtc.Room()

        # Set up event handlers before connecting so early events are not missed
        room.on("track_published")(lambda pub, p: pub.set_subscribed(True))
        room.on("track_subscribed")(on_track_subscribed)
        room.on("track_unsubscribed")(lambda track, pub, p: logging.info(f"Track unsubscribed: {pub.sid} from {p.identity}"))

        # Connect to the room, and we’re disabling auto_subscribe for manual control
        await room.connect(
            "wss://your-livekit-server.com",
            token,
            options=rtc.RoomOptions(auto_subscribe=False)
        )

        # Keep the bot up and running
        while True:
            await asyncio.sleep(1)


    if __name__ == "__main__":
        asyncio.run(main())
Here’s a quick rundown of what this code does:
• Connects to a LiveKit room with `auto_subscribe` turned off, giving you full control.
• Manually subscribes to tracks as they get published.
• Handles the subscribed tracks and processes audio streams in real-time.
• Keeps track of events by logging them, so you always know what’s going on.
Just swap out the placeholders like "wss://your-livekit-server.com" and "my-room" with your actual server URL and room name. If you hit any bumps or want to chat more about this, I’m here to help!
orange-salesclerk-64602
08/11/2025, 1:09 PM
tall-airline-16756
08/11/2025, 1:10 PM
    import asyncio
    import os
    import uuid
    from dotenv import load_dotenv
    from livekit.agents import JobContext, WorkerOptions, cli, AutoSubscribe, RoomInputOptions, RoomOutputOptions
    from livekit.agents.voice import Agent, AgentSession
    from livekit.plugins import openai, deepgram, cartesia, silero, azure
    from livekit import rtc
    from livekit.plugins.turn_detector.multilingual import MultilingualModel

    load_dotenv()

    # Configuration
    LIVEKIT_URL = os.getenv("LIVEKIT_URL")
    LIVEKIT_API_KEY = os.getenv("LIVEKIT_API_KEY")
    LIVEKIT_API_SECRET = os.getenv("LIVEKIT_API_SECRET")
    SIP_TRUNK_ID = os.getenv("SIP_TRUNK_ID")
    SIP_CALL_TO = os.getenv("SIP_CALL_TO", "+1234567890")
    ROOM_NAME = f"room-{uuid.uuid4()}"
    azure_copilotgpt_endpoint = os.getenv("azure_copilotgpt_endpoint")
    azure_copilotgpt_deployment = os.getenv("azure_copilotgpt_deployment")
    api_copilotgpt_key = os.getenv("api_copilotgpt_key")
    api_copilotgpt_version = os.getenv("api_copilotgpt_version")


    class OutboundBot(Agent):
        def __init__(self):
            super().__init__(
                instructions="You are a helpful voice assistant making an outbound call."
            )

        async def on_enter(self):
            await self.session.say("Hello! This is an automated call. How can I help you today?")


    async def entrypoint(ctx: JobContext):
        await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

        session = AgentSession(
            stt=openai.STT.with_azure(model="whisper"),
            llm=openai.LLM.with_azure(
                azure_endpoint=azure_copilotgpt_endpoint,
                azure_deployment=azure_copilotgpt_deployment,
                api_key=api_copilotgpt_key,
                api_version=api_copilotgpt_version
            ),
            tts=azure.TTS(voice="en-IN-AnanyaNeural"),
            vad=silero.VAD.load(),
            turn_detection=MultilingualModel(),
        )

        # Start the agent session to initialize room properly
        await session.start(
            agent=OutboundBot(),
            room=ctx.room,
            room_input_options=RoomInputOptions(audio_enabled=True),
            room_output_options=RoomOutputOptions(audio_enabled=True),
        )

        # Add a small delay to ensure room is fully initialized
        await asyncio.sleep(3)

        # Create outbound call
        await ctx.add_sip_participant(
            call_to=os.getenv("SIP_CALL_TO"),
            trunk_id=os.getenv("SIP_TRUNK_ID"),
            participant_identity="magic-sip-caller",  # Add this required parameter
            participant_name="MAGIC-SIP Caller"
        )

        # Wait for call to connect
        participant = await ctx.wait_for_participant(
            kind=rtc.ParticipantKind.PARTICIPANT_KIND_SIP
        )

        # Handle track subscriptions
        @ctx.room.on("track_published")
        async def on_track_published(publication, participant):
            publication.set_subscribed(True)

        @ctx.room.on("track_subscribed")
        async def on_track_subscribed(track, publication, participant):
            if track.kind == rtc.TrackKind.KIND_AUDIO:
                audio_stream = rtc.AudioStream(track)
                asyncio.create_task(process_audio_stream(audio_stream, participant))

        async def process_audio_stream(audio_stream, participant):
            try:
                async for event in audio_stream:
                    # Process the audio frame here
                    pass
            except Exception as e:
                print(f"Error processing audio stream from {participant.identity}: {e}")
            finally:
                await audio_stream.aclose()


    if __name__ == "__main__":
        cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, agent_name="magicbricks-outbound-bot"))
I hope this helps! If you’d like me to tweak anything or if you have more questions, just let me know. I’m here to help.
orange-salesclerk-64602
08/11/2025, 1:15 PM
`.on()`. Use `asyncio.create_task` within your synchronous callback instead. {"pid": 18534, "job_id": "AJ_fKyTQUEeAhUn"}
tall-airline-16756
08/11/2025, 1:16 PM
`.on()` method expects a synchronous callback, but it looks like an asynchronous function was passed in. No worries though: you can fix this by wrapping your async callback inside a synchronous function and then using `asyncio.create_task` to handle the async part. Here’s a quick example to help you out:
    # Define your async callback
    async def on_track_published_async(publication, participant):
        # Your async logic here
        print(f"Track published: {publication.track_sid}")

    # Wrap the async callback in a synchronous function
    def on_track_published_sync(publication, participant):
        asyncio.create_task(on_track_published_async(publication, participant))

    # Register the synchronous wrapper with .on()
    ctx.room.on("track_published", on_track_published_sync)
Just swap out your current `on_track_published` registration with this setup. If you hit any snags or want me to walk you through it, feel free to ask. I’m here to help!
orange-salesclerk-64602
08/11/2025, 2:01 PM
`.on()`. Use `asyncio.create_task` within your synchronous callback instead. {"pid": 19829, "job_id": "AJ_XAEyM7ydqXV9"}
2025-08-11 19:30:35,095 - ERROR asyncio - Task exception was never retrieved
future: <Task finished name='Task-64' coro=<entrypoint.<locals>.on_track_published_async() done, defined at /Users/subodh.kumar2/Desktop/Ai-voice/aibot-livekit/callout.py:76> exception=AttributeError("'RemoteTrackPublication' object has no attribute 'track_sid'")>
Traceback (most recent call last):
  File "/Users/subodh.kumar2/Desktop/Ai-voice/aibot-livekit/callout.py", line 78, in on_track_published_async
    print(f"Track published: {publication.track_sid}")
AttributeError: 'RemoteTrackPublication' object has no attribute 'track_sid' {"pid": 19829, "job_id": "AJ_XAEyM7ydqXV9"}
tall-airline-16756
08/11/2025, 2:02 PM
`RemoteTrackPublication` object doesn’t have a `track_sid` attribute. Instead, you’ll want to use the `sid` property to get the track ID. Here’s the updated code to help you out:
    # Corrected async callback
    def on_track_published_sync(publication, participant):
        asyncio.create_task(on_track_published_async(publication, participant))

    async def on_track_published_async(publication, participant):
        # Use the 'sid' property to access the track ID
        print(f"Track published: {publication.sid}")
Just swap out `publication.track_sid` for `publication.sid` in your code. If you run into any other issues or have questions, feel free to ask. I’m here to help!
orange-salesclerk-64602
08/11/2025, 2:15 PM
`.on()`. Use `asyncio.create_task` within your synchronous callback instead. {"pid": 20497, "job_id": "AJ_zxTSgfKPGSiF"}
2025-08-11 19:44:45,796 - ERROR asyncio - Task exception was never retrieved
future: <Task finished name='Task-64' coro=<entrypoint.<locals>.on_track_published_async() done, defined at /Users/subodh.kumar2/Desktop/Ai-voice/aibot-livekit/callout.py:76> exception=AttributeError("'RemoteTrackPublication' object has no attribute 'track_sid'")>
Traceback (most recent call last):
  File "/Users/subodh.kumar2/Desktop/Ai-voice/aibot-livekit/callout.py", line 78, in on_track_published_async
    print(f"Track published: {publication.track_sid}")
AttributeError: 'RemoteTrackPublication' object has no attribute 'track_sid' {"pid": 20497, "job_id": "AJ_zxTSgfKPGSiF"}
tall-airline-16756
08/11/2025, 2:16 PM
`asyncio.create_task` to handle the async part smoothly.
    # Corrected callback registration
    def on_track_published_sync(publication, participant):
        asyncio.create_task(on_track_published_async(publication, participant))

    ctx.room.on("track_published", on_track_published_sync)
2. Fix the Attribute Access: Instead of `publication.track_sid`, use `publication.sid` to get the track ID correctly.
    # Corrected async callback
    async def on_track_published_async(publication, participant):
        # Use the 'sid' property to access the track ID
        print(f"Track published: {publication.sid}")
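If it helps to see the wrapper pattern without the SDK in the way, here is a small self-contained sketch. `MiniEmitter` is a hypothetical stand-in that rejects coroutine functions in `.on()`, loosely imitating the behavior behind the error in your logs. It is not the library's actual implementation, and the handler takes a plain string rather than a publication object.

```python
import asyncio
import inspect


class MiniEmitter:
    """Hypothetical emitter whose .on() accepts only synchronous callbacks."""

    def __init__(self) -> None:
        self._handlers: dict[str, list] = {}

    def on(self, event: str, callback) -> None:
        if inspect.iscoroutinefunction(callback):
            raise ValueError("async callbacks are not accepted; wrap them in a sync function")
        self._handlers.setdefault(event, []).append(callback)

    def emit(self, event: str, *args) -> None:
        for callback in self._handlers.get(event, []):
            callback(*args)


async def demo() -> list:
    emitter = MiniEmitter()
    seen: list = []
    tasks: list = []

    async def on_track_published_async(sid: str) -> None:
        await asyncio.sleep(0)
        seen.append(sid)

    def on_track_published_sync(sid: str) -> None:
        # Schedule the coroutine and keep the task so it can be awaited later.
        tasks.append(asyncio.create_task(on_track_published_async(sid)))

    try:
        # Registering the coroutine function directly is rejected.
        emitter.on("track_published", on_track_published_async)
    except ValueError:
        seen.append("rejected")

    # Registering the synchronous wrapper works.
    emitter.on("track_published", on_track_published_sync)
    emitter.emit("track_published", "TR_123")
    await asyncio.gather(*tasks)
    return seen


result = asyncio.run(demo())
```

Awaiting the collected tasks (or attaching a done callback) also surfaces exceptions raised inside the async handler, which is what the "Task exception was never retrieved" line in the log is complaining about.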
Make sure to update both parts in your code. If you hit any other snags or have questions, just let me know. I’m here to help!
orange-salesclerk-64602
08/12/2025, 7:27 AM
tall-airline-16756
08/12/2025, 7:28 AM
`auto_subscribe` is set to `True` in your Room configuration. This way, tracks will automatically subscribe when participants join.
    await room.connect(url, token, options=rtc.RoomOptions(auto_subscribe=True))
2. Verify Room Ready State: Double-check that the room is fully initialized and ready before you handle participant join events.
3. Manually Manage Subscriptions: If you have `auto_subscribe` set to `False`, you’ll want to manually subscribe to tracks when someone joins (the Python SDK emits this as the `participant_connected` event):
    @room.on("participant_connected")
    def on_participant_connected(participant):
        for publication in participant.track_publications.values():
            publication.set_subscribed(True)
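As a rough sketch of the manual route, the same loop can be pulled into a helper that is safe to call more than once, for example once for participants already in the room and again from the join handler. `FakePublication`, `FakeParticipant`, and `subscribe_all` are hypothetical names for this illustration. The sketch assumes the real publication objects expose a `subscribed` flag alongside `set_subscribed`.

```python
from dataclasses import dataclass, field


@dataclass
class FakePublication:
    """Hypothetical stand-in for a remote track publication."""
    sid: str
    subscribed: bool = False

    def set_subscribed(self, value: bool) -> None:
        self.subscribed = value


@dataclass
class FakeParticipant:
    """Hypothetical stand-in for a remote participant."""
    identity: str
    track_publications: dict = field(default_factory=dict)


def subscribe_all(participant) -> int:
    """Subscribe to every not-yet-subscribed publication; return how many changed."""
    changed = 0
    for publication in participant.track_publications.values():
        if not publication.subscribed:
            publication.set_subscribed(True)
            changed += 1
    return changed


caller = FakeParticipant(
    "sip-caller",
    {
        "TR_a": FakePublication("TR_a"),
        "TR_b": FakePublication("TR_b", subscribed=True),
    },
)
first_pass = subscribe_all(caller)   # only TR_a needed subscribing
second_pass = subscribe_all(caller)  # nothing left to do
```

Because the helper skips publications that are already subscribed, calling it from both places should not cause duplicate subscribe requests.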
If you want me to walk you through any of these steps or if something isn’t clear, just let me know—I’m here to help!