I am getting this error skipping track publish eve...
# ask-ai
o
I am getting this error: "skipping track publish event - subscribed flag not set". I have tried all the suggestions given by Deepwiki, but I am not able to fix it.
t
Thanks for reaching out, Subodh. I'm an AI assistant and I'm here to help. Give me a moment while I research this topic.
Hey there! That error message, "skipping track publish event - subscribed flag not set," usually pops up when the track subscription setup isn’t quite right. No worries though, here are some friendly steps to help you fix it:
1. Check Auto-Subscribe Configuration: Make sure `autoSubscribe` is turned on if you want subscriptions to happen automatically. If it’s off, you’ll need to manage subscriptions yourself.
2. Set the Subscribed Flag Manually: If you’ve got `autoSubscribe` disabled, don’t forget to set the subscribed flag manually for your tracks. Here’s a quick example:
if err := pub.SetSubscribed(true); err != nil {
    log.Errorw("cannot subscribe to the track", err)
    return
}
3. Ensure Room is Ready: Double-check that the room is fully connected before you start handling track subscriptions. Using connection state callbacks can really help confirm this.
4. Handle Manual Subscriptions: If you’re going the manual route with subscriptions, make sure to process them only after the room is ready. Something like this:
func (r *Room) OnTrackPublished(...) {
    if !r.subscribe.Load() {
        log.Debugw("skipping track publish event - subscribed flag not set")
        return
    }
    // Handle subscription
}
5. Check for Race Conditions: It’s a good idea to use synchronization tools like `core.Fuse` to keep your events in the right order and avoid any timing issues.
6. Review Room Configuration: Take a moment to verify your `RoomConfig` has all the essentials, like `WsUrl`, `RoomName`, and participant info.
Give these a try, and hopefully, that clears things up! If you want to chat more about any of these steps or need a hand, just let me know. I’m here to help.
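To make the log message in step 4 a bit more concrete, here is a minimal Python sketch of the same gating idea as the Go snippet above: events that arrive before a "subscribed" flag is set are skipped instead of handled. `GatedRoom` and its methods are hypothetical stand-ins written for illustration and are not part of any LiveKit SDK.

```python
import threading


class GatedRoom:
    """Hypothetical stand-in that mimics the flag check from the Go snippet."""

    def __init__(self):
        self._subscribe = threading.Event()  # plays the role of r.subscribe
        self.handled = []
        self.skipped = []

    def set_subscribed(self, value):
        # Flip the flag that gates event handling.
        if value:
            self._subscribe.set()
        else:
            self._subscribe.clear()

    def on_track_published(self, track_id):
        if not self._subscribe.is_set():
            # This is the branch that would log "skipping track publish event".
            self.skipped.append(track_id)
            return
        self.handled.append(track_id)
```

In this sketch a track published before `set_subscribed(True)` lands in `skipped`, which is the situation the log line describes.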
o
Please share code that fixes this.
t
Here’s a little code snippet that might help you make sure the subscribed flag is set just right:
// Make sure the room is ready before handling track subscriptions
func (r *Room) OnTrackPublished(pub *TrackPublication) {
    if !r.subscribe.Load() { // Check if the subscribed flag is set
        log.Debugw("skipping track publish event - subscribed flag not set")
        return
    }

    // Set the subscribed flag manually
    if err := pub.SetSubscribed(true); err != nil {
        log.Errorw("cannot subscribe to the track", err)
        return
    }

    log.Infow("Track successfully subscribed", "trackID", pub.TrackID())
}
Try plugging this into your track subscription flow and see how it goes. And hey, if anything’s unclear or you want me to walk you through more examples, just give me a shout!
o
How do I use this code in my bot?
t
Here’s a friendly step-by-step guide to help you integrate the provided code into your bot:
1. Set Up Track Subscriptions: When you connect to the room, make sure to disable auto-subscription. It’s as simple as this:
await room.connect(url, token, options=rtc.RoomOptions(auto_subscribe=False))
2. Subscribe to Tracks: When tracks get published, you’ll want to subscribe to them using the `TrackPublication` object:
@room.on("track_published")
def on_track_published(publication, participant):
    publication.set_subscribed(True)
Don’t forget to also subscribe to any tracks that were published before your bot joined:
for participant in room.remote_participants.values():
    for publication in participant.track_publications.values():
        publication.set_subscribed(True)
3. Handle Subscribed Tracks: Use the `track_subscribed` event to work with the tracks once subscribed. For example:
@room.on("track_subscribed")
def on_track_subscribed(track, publication, participant):
    if track.kind == rtc.TrackKind.KIND_AUDIO:
        audio_stream = rtc.AudioStream(track)
        asyncio.create_task(process_audio_stream(audio_stream, participant))
4. Process Audio Tracks: Here’s a handy function to handle those audio streams:
async def process_audio_stream(audio_stream, participant):
    try:
        async for event in audio_stream:
            # Process the audio frame here
            pass
    except Exception as e:
        print(f"Error processing audio stream from {participant.identity}: {e}")
    finally:
        await audio_stream.aclose()
If anything’s unclear or you want a hand with the implementation, just let me know—I’m here to help!
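One caveat about the `asyncio.create_task(...)` call in step 3: the event loop only keeps a weak reference to tasks, so the asyncio documentation recommends holding your own reference until the task finishes. Here is a minimal sketch of that pattern using only the standard library. `process_item` is a hypothetical placeholder standing in for `process_audio_stream`, and no LiveKit objects are involved.

```python
import asyncio

# Strong references to in-flight tasks so they are not garbage collected.
background_tasks = set()


async def process_item(item, results):
    # Hypothetical placeholder for per-stream work.
    await asyncio.sleep(0)
    results.append(item)


def spawn(coro):
    """Schedule a coroutine and keep a reference until it completes."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)
    # Drop the reference automatically once the task is done.
    task.add_done_callback(background_tasks.discard)
    return task


async def demo():
    results = []
    tasks = [spawn(process_item(i, results)) for i in range(3)]
    await asyncio.gather(*tasks)
    return results
```

Inside the `track_subscribed` handler, a helper like `spawn(...)` could take the place of the bare `asyncio.create_task(...)` call.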
o
Can you please share the complete code?
t
Hey there! Here’s a complete example to help you integrate track subscription handling into your LiveKit bot. I’ve broken it down so it’s easy to follow:
import asyncio
import logging
from livekit import rtc, api

async def main():
    # Must be a plain function: room.on() rejects coroutine functions
    def on_track_subscribed(track, publication, participant):
        if track.kind == rtc.TrackKind.KIND_AUDIO:
            audio_stream = rtc.AudioStream(track)
            asyncio.create_task(process_audio_stream(audio_stream, participant))

    async def process_audio_stream(audio_stream, participant):
        try:
            async for event in audio_stream:
                # Here’s where you can process the audio frame
                pass
        except Exception as e:
            logging.error(f"Oops! There was an error processing audio from {participant.identity}: {e}")
        finally:
            await audio_stream.aclose()

    # Setting up logging so you can keep an eye on what’s happening
    logging.basicConfig(level=logging.INFO)

    # Create an access token with the right permissions
    # (assumes LIVEKIT_API_KEY and LIVEKIT_API_SECRET are set in the environment)
    token = (
        api.AccessToken()
        .with_identity("my-bot")
        .with_grants(api.VideoGrants(room_join=True, room="my-room"))
        .to_jwt()
    )

    # Create the room instance
    room = rtc.Room()

    # Connect to the room, and we’re disabling auto_subscribe for manual control
    await room.connect(
        "wss://your-livekit-server.com",
        token,
        options=rtc.RoomOptions(auto_subscribe=False)
    )

    # Set up event handlers to manage tracks smoothly
    room.on("track_published")(lambda pub, p: pub.set_subscribed(True))
    room.on("track_subscribed")(on_track_subscribed)
    room.on("track_unsubscribed")(lambda track, pub, p: logging.info(f"Track unsubscribed: {pub.sid} from {p.identity}"))

    # Keep the bot up and running
    while True:
        await asyncio.sleep(1)

if __name__ == "__main__":
    asyncio.run(main())
Here’s a quick rundown of what this code does:
• Connects to a LiveKit room with `auto_subscribe` turned off, giving you full control.
• Manually subscribes to tracks as they get published.
• Handles the subscribed tracks and processes audio streams in real-time.
• Keeps track of events by logging them, so you always know what’s going on.
Just swap out the placeholders like "wss://your-livekit-server.com" and "my-room" with your actual server URL and room name. If you hit any bumps or want to chat more about this, I’m here to help!
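As an alternative to the `while True` sleep loop, the bot could wait on an `asyncio.Event` and exit cleanly when asked to stop. This is only a sketch under assumptions: `run_until_stopped` and the `cleanup` callback are hypothetical names, and in a real bot the cleanup step would presumably be where the room gets disconnected.

```python
import asyncio


async def run_until_stopped(stop_event, cleanup):
    """Block until stop_event is set, then always run the async cleanup."""
    try:
        await stop_event.wait()
    finally:
        await cleanup()


async def demo():
    stop_event = asyncio.Event()
    calls = []

    async def cleanup():
        # Hypothetical cleanup; a real bot might disconnect from the room here.
        calls.append("cleanup")

    runner = asyncio.create_task(run_until_stopped(stop_event, cleanup))
    await asyncio.sleep(0)  # let the runner start waiting
    stop_event.set()        # for example, triggered from a signal handler
    await runner
    return calls
```

The `try`/`finally` means the cleanup also runs if the waiting task is cancelled, which a bare sleep loop does not give you.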
o
I am sharing my code with you. Can you please integrate this into that code?
import asyncio
import os
import uuid
from dotenv import load_dotenv
from livekit.agents import JobContext, WorkerOptions, cli, AutoSubscribe, RoomInputOptions, RoomOutputOptions
from livekit.agents.voice import Agent, AgentSession
from livekit.plugins import openai, deepgram, cartesia, silero, azure
from livekit import rtc
from livekit.plugins.turn_detector.multilingual import MultilingualModel

load_dotenv()

# Configuration
LIVEKIT_URL = os.getenv("LIVEKIT_URL")
LIVEKIT_API_KEY = os.getenv("LIVEKIT_API_KEY")
LIVEKIT_API_SECRET = os.getenv("LIVEKIT_API_SECRET")
SIP_TRUNK_ID = os.getenv("SIP_TRUNK_ID")
SIP_CALL_TO = os.getenv("SIP_CALL_TO", "+1234567890")
#OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
ROOM_NAME = f"room-{uuid.uuid4()}"
azure_copilotgpt_endpoint = os.getenv("azure_copilotgpt_endpoint")
azure_copilotgpt_deployment = os.getenv("azure_copilotgpt_deployment")
api_copilotgpt_key = os.getenv("api_copilotgpt_key")
api_copilotgpt_version = os.getenv("api_copilotgpt_version")

class OutboundBot(Agent):
    def __init__(self):
        super().__init__(
            instructions="You are a helpful voice assistant making an outbound call."
        )

    async def on_enter(self):
        await self.session.say("Hello! This is an automated call. How can I help you today?")

async def entrypoint(ctx: JobContext):
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    session = AgentSession(
        stt=openai.STT.with_azure(model="whisper"),
        llm=openai.LLM.with_azure(
            azure_endpoint=azure_copilotgpt_endpoint,
            azure_deployment=azure_copilotgpt_deployment,
            api_key=api_copilotgpt_key,
            api_version=api_copilotgpt_version
        ),
        tts=azure.TTS(voice="en-IN-AnanyaNeural"),
        vad=silero.VAD.load(),
        turn_detection=MultilingualModel(),
    )

    # Start the agent session to initialize room properly
    await session.start(
        agent=OutboundBot(),
        room=ctx.room,
        room_input_options=RoomInputOptions(audio_enabled=True),
        room_output_options=RoomOutputOptions(audio_enabled=True),
    )

    # Add a small delay to ensure room is fully initialized
    await asyncio.sleep(3)

    # Create outbound call
    await ctx.add_sip_participant(
        call_to=os.getenv("SIP_CALL_TO"),
        trunk_id=os.getenv("SIP_TRUNK_ID"),
        participant_identity="magic-sip-caller",  # Add this required parameter
        participant_name="MAGIC-SIP Caller"
    )

    # Wait for call to connect
    participant = await ctx.wait_for_participant(
        kind=rtc.ParticipantKind.PARTICIPANT_KIND_SIP
    )

if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, agent_name="magicbricks-outbound-bot"))
t
Thanks so much for sharing your code! I've gone ahead and integrated the track subscription handling logic into your bot. Here’s the updated version:
import asyncio
import os
import uuid
from dotenv import load_dotenv
from livekit.agents import JobContext, WorkerOptions, cli, AutoSubscribe, RoomInputOptions, RoomOutputOptions
from livekit.agents.voice import Agent, AgentSession
from livekit.plugins import openai, deepgram, cartesia, silero, azure
from livekit import rtc
from livekit.plugins.turn_detector.multilingual import MultilingualModel

load_dotenv()

# Configuration
LIVEKIT_URL = os.getenv("LIVEKIT_URL")
LIVEKIT_API_KEY = os.getenv("LIVEKIT_API_KEY")
LIVEKIT_API_SECRET = os.getenv("LIVEKIT_API_SECRET")
SIP_TRUNK_ID = os.getenv("SIP_TRUNK_ID")
SIP_CALL_TO = os.getenv("SIP_CALL_TO", "+1234567890")
ROOM_NAME = f"room-{uuid.uuid4()}"
azure_copilotgpt_endpoint = os.getenv("azure_copilotgpt_endpoint")
azure_copilotgpt_deployment = os.getenv("azure_copilotgpt_deployment")
api_copilotgpt_key = os.getenv("api_copilotgpt_key")
api_copilotgpt_version = os.getenv("api_copilotgpt_version")

class OutboundBot(Agent):
    def __init__(self):
        super().__init__(
            instructions="You are a helpful voice assistant making an outbound call."
        )

    async def on_enter(self):
        await self.session.say("Hello! This is an automated call. How can I help you today?")

async def entrypoint(ctx: JobContext):

    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    session = AgentSession(
        stt=openai.STT.with_azure(model="whisper"),
        llm=openai.LLM.with_azure(
            azure_endpoint=azure_copilotgpt_endpoint,
            azure_deployment=azure_copilotgpt_deployment,
            api_key=api_copilotgpt_key,
            api_version=api_copilotgpt_version
        ),
        tts=azure.TTS(voice="en-IN-AnanyaNeural"),
        vad=silero.VAD.load(),
        turn_detection=MultilingualModel(),
    )

    # Start the agent session to initialize room properly
    await session.start(
        agent=OutboundBot(),
        room=ctx.room,
        room_input_options=RoomInputOptions(audio_enabled=True),
        room_output_options=RoomOutputOptions(audio_enabled=True),
    )

    # Add a small delay to ensure room is fully initialized
    await asyncio.sleep(3)

    # Create outbound call
    await ctx.add_sip_participant(
        call_to=os.getenv("SIP_CALL_TO"),
        trunk_id=os.getenv("SIP_TRUNK_ID"),
        participant_identity="magic-sip-caller",  # Add this required parameter
        participant_name="MAGIC-SIP Caller"
    )

    # Wait for call to connect
    participant = await ctx.wait_for_participant(
        kind=rtc.ParticipantKind.PARTICIPANT_KIND_SIP
    )

    # Handle track subscriptions
    @ctx.room.on("track_published")
    async def on_track_published(publication, participant):
        publication.set_subscribed(True)

    @ctx.room.on("track_subscribed")
    async def on_track_subscribed(track, publication, participant):
        if track.kind == rtc.TrackKind.KIND_AUDIO:
            audio_stream = rtc.AudioStream(track)
            asyncio.create_task(process_audio_stream(audio_stream, participant))

    async def process_audio_stream(audio_stream, participant):
        try:
            async for event in audio_stream:
                # Process the audio frame here
                pass
        except Exception as e:
            print(f"Error processing audio stream from {participant.identity}: {e}")
        finally:
            await audio_stream.aclose()

if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, agent_name="magicbricks-outbound-bot"))
I hope this helps! If you’d like me to tweak anything or if you have more questions, just let me know—I’m here to help.
o
I am getting this error:
2025-08-11 184449,986 - ERROR livekit.agents - unhandled exception while running the job task
Traceback (most recent call last):
  File "/Users/subodh.kumar2/Desktop/Ai-voice/aibot-livekit/.venv/lib/python3.9/site-packages/opentelemetry/util/_decorator.py", line 71, in async_wrapper
    return await func(*args, **kwargs) # type: ignore
  File "/Users/subodh.kumar2/Desktop/Ai-voice/aibot-livekit/.venv/lib/python3.9/site-packages/livekit/agents/ipc/job_proc_lazy_main.py", line 240, in _traceable_entrypoint
    await self._job_entrypoint_fnc(job_ctx)
  File "/Users/subodh.kumar2/Desktop/Ai-voice/aibot-livekit/callout.py", line 77, in entrypoint
    async def on_track_published(publication, participant):
  File "/Users/subodh.kumar2/Desktop/Ai-voice/aibot-livekit/.venv/lib/python3.9/site-packages/livekit/rtc/event_emitter.py", line 172, in decorator
    self.on(event, callback)
  File "/Users/subodh.kumar2/Desktop/Ai-voice/aibot-livekit/.venv/lib/python3.9/site-packages/livekit/rtc/room.py", line 388, in on
    return super().on(event, callback)
  File "/Users/subodh.kumar2/Desktop/Ai-voice/aibot-livekit/.venv/lib/python3.9/site-packages/livekit/rtc/event_emitter.py", line 161, in on
    raise ValueError(
ValueError: Cannot register an async callback with `.on()`. Use `asyncio.create_task` within your synchronous callback instead. {"pid": 18534, "job_id": "AJ_fKyTQUEeAhUn"}
t
Hey there! The error happens because LiveKit's EventEmitter’s `.on()` method expects a synchronous callback, but it looks like an asynchronous function was passed in. No worries though, you can fix this by wrapping your async callback inside a synchronous function and then using `asyncio.create_task` to handle the async part. Here’s a quick example to help you out:
# Define your async callback
async def on_track_published_async(publication, participant):
    # Your async logic here
    print(f"Track published: {publication.track_sid}")

# Wrap the async callback in a synchronous function
def on_track_published_sync(publication, participant):
    asyncio.create_task(on_track_published_async(publication, participant))

# Register the synchronous wrapper with .on()
ctx.room.on("track_published", on_track_published_sync)
Just swap out your current `on_track_published` registration with this setup. If you hit any snags or want me to walk you through it, feel free to ask. I’m here to help!
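Applied to both handlers from the earlier integration, the pattern could look like the sketch below. To keep it runnable without a LiveKit server, `FakeEmitter` is a hypothetical stand-in that only imitates the behavior described in the error message (rejecting coroutine functions); it is not the SDK's `EventEmitter`. The handler bodies are placeholders.

```python
import asyncio
import inspect


class FakeEmitter:
    """Hypothetical stand-in: rejects async callbacks, like the error above."""

    def __init__(self):
        self._handlers = {}

    def on(self, event, callback):
        if inspect.iscoroutinefunction(callback):
            raise ValueError("Cannot register an async callback with .on()")
        self._handlers.setdefault(event, []).append(callback)

    def emit(self, event, *args):
        for callback in self._handlers.get(event, []):
            callback(*args)


async def handle_published(name, seen):
    await asyncio.sleep(0)  # placeholder for real async work
    seen.append(("published", name))


async def handle_subscribed(name, seen):
    await asyncio.sleep(0)  # placeholder for real async work
    seen.append(("subscribed", name))


async def demo():
    emitter, seen, tasks = FakeEmitter(), [], []

    # Synchronous wrappers that schedule the async handlers as tasks.
    emitter.on(
        "track_published",
        lambda name: tasks.append(asyncio.create_task(handle_published(name, seen))),
    )
    emitter.on(
        "track_subscribed",
        lambda name: tasks.append(asyncio.create_task(handle_subscribed(name, seen))),
    )

    emitter.emit("track_published", "mic")
    emitter.emit("track_subscribed", "mic")
    await asyncio.gather(*tasks)
    return seen
```

The key point is that every handler passed to `.on()` is a plain function, and each one only schedules its async counterpart.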
o
2025-08-11 193035,072 - ERROR livekit.agents - unhandled exception while running the job task
Traceback (most recent call last):
  File "/Users/subodh.kumar2/Desktop/Ai-voice/aibot-livekit/.venv/lib/python3.9/site-packages/opentelemetry/util/_decorator.py", line 71, in async_wrapper
    return await func(*args, **kwargs) # type: ignore
  File "/Users/subodh.kumar2/Desktop/Ai-voice/aibot-livekit/.venv/lib/python3.9/site-packages/livekit/agents/ipc/job_proc_lazy_main.py", line 240, in _traceable_entrypoint
    await self._job_entrypoint_fnc(job_ctx)
  File "/Users/subodh.kumar2/Desktop/Ai-voice/aibot-livekit/callout.py", line 88, in entrypoint
    async def on_track_subscribed(track, publication, participant):
  File "/Users/subodh.kumar2/Desktop/Ai-voice/aibot-livekit/.venv/lib/python3.9/site-packages/livekit/rtc/event_emitter.py", line 172, in decorator
    self.on(event, callback)
  File "/Users/subodh.kumar2/Desktop/Ai-voice/aibot-livekit/.venv/lib/python3.9/site-packages/livekit/rtc/room.py", line 388, in on
    return super().on(event, callback)
  File "/Users/subodh.kumar2/Desktop/Ai-voice/aibot-livekit/.venv/lib/python3.9/site-packages/livekit/rtc/event_emitter.py", line 161, in on
    raise ValueError(
ValueError: Cannot register an async callback with `.on()`. Use `asyncio.create_task` within your synchronous callback instead. {"pid": 19829, "job_id": "AJ_XAEyM7ydqXV9"}
2025-08-11 193035,095 - ERROR asyncio - Task exception was never retrieved
future: <Task finished name='Task-64' coro=<entrypoint.<locals>.on_track_published_async() done, defined at /Users/subodh.kumar2/Desktop/Ai-voice/aibot-livekit/callout.py:76> exception=AttributeError("'RemoteTrackPublication' object has no attribute 'track_sid'")>
Traceback (most recent call last):
  File "/Users/subodh.kumar2/Desktop/Ai-voice/aibot-livekit/callout.py", line 78, in on_track_published_async
    print(f"Track published: {publication.track_sid}")
AttributeError: 'RemoteTrackPublication' object has no attribute 'track_sid' {"pid": 19829, "job_id": "AJ_XAEyM7ydqXV9"}
t
Hey there! The error you're seeing happens because the `RemoteTrackPublication` object doesn’t have a `track_sid` attribute. Instead, you’ll want to use the `sid` property to get the track ID. Here’s the updated code to help you out:
# Corrected async callback
def on_track_published_sync(publication, participant):
    asyncio.create_task(on_track_published_async(publication, participant))

async def on_track_published_async(publication, participant):
    # Use the 'sid' property to access the track ID
    print(f"Track published: {publication.sid}")
Just swap out `publication.track_sid` for `publication.sid` in your code. If you run into any other issues or have questions, feel free to ask. I’m here to help!
o
2025-08-11 194445,781 - ERROR livekit.agents - unhandled exception while running the job task
Traceback (most recent call last):
  File "/Users/subodh.kumar2/Desktop/Ai-voice/aibot-livekit/.venv/lib/python3.9/site-packages/opentelemetry/util/_decorator.py", line 71, in async_wrapper
    return await func(*args, **kwargs) # type: ignore
  File "/Users/subodh.kumar2/Desktop/Ai-voice/aibot-livekit/.venv/lib/python3.9/site-packages/livekit/agents/ipc/job_proc_lazy_main.py", line 240, in _traceable_entrypoint
    await self._job_entrypoint_fnc(job_ctx)
  File "/Users/subodh.kumar2/Desktop/Ai-voice/aibot-livekit/callout.py", line 88, in entrypoint
    async def on_track_subscribed(track, publication, participant):
  File "/Users/subodh.kumar2/Desktop/Ai-voice/aibot-livekit/.venv/lib/python3.9/site-packages/livekit/rtc/event_emitter.py", line 172, in decorator
    self.on(event, callback)
  File "/Users/subodh.kumar2/Desktop/Ai-voice/aibot-livekit/.venv/lib/python3.9/site-packages/livekit/rtc/room.py", line 388, in on
    return super().on(event, callback)
  File "/Users/subodh.kumar2/Desktop/Ai-voice/aibot-livekit/.venv/lib/python3.9/site-packages/livekit/rtc/event_emitter.py", line 161, in on
    raise ValueError(
ValueError: Cannot register an async callback with `.on()`. Use `asyncio.create_task` within your synchronous callback instead. {"pid": 20497, "job_id": "AJ_zxTSgfKPGSiF"}
2025-08-11 194445,796 - ERROR asyncio - Task exception was never retrieved
future: <Task finished name='Task-64' coro=<entrypoint.<locals>.on_track_published_async() done, defined at /Users/subodh.kumar2/Desktop/Ai-voice/aibot-livekit/callout.py:76> exception=AttributeError("'RemoteTrackPublication' object has no attribute 'track_sid'")>
Traceback (most recent call last):
  File "/Users/subodh.kumar2/Desktop/Ai-voice/aibot-livekit/callout.py", line 78, in on_track_published_async
    print(f"Track published: {publication.track_sid}")
AttributeError: 'RemoteTrackPublication' object has no attribute 'track_sid' {"pid": 20497, "job_id": "AJ_zxTSgfKPGSiF"}
t
It looks like the error is still happening because the callback registration and the attribute access haven’t been fully fixed yet. No worries though! Here’s the updated version that should do the trick:
1. Fix the Callback Registration: We’ll wrap the async callback in a synchronous function and use `asyncio.create_task` to handle the async part smoothly.
# Corrected callback registration
def on_track_published_sync(publication, participant):
    asyncio.create_task(on_track_published_async(publication, participant))

ctx.room.on("track_published", on_track_published_sync)
2. Fix the Attribute Access: Instead of `publication.track_sid`, use `publication.sid` to get the track ID correctly.
# Corrected async callback
async def on_track_published_async(publication, participant):
    # Use the 'sid' property to access the track ID
    print(f"Track published: {publication.sid}")
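Make sure to update both parts in your code. Note that the first traceback points at `on_track_subscribed` (callout.py, line 88), which is still declared with `async def`, so that handler needs the same synchronous wrapper as `on_track_published`. The second traceback still shows `publication.track_sid` on line 78, so the attribute fix has not reached the running file yet. If you hit any other snags or have questions, just let me know. I’m here to help!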
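The second log entry, "Task exception was never retrieved", shows up because nothing awaited the task created inside the synchronous wrapper. One way to surface such failures sooner is a done-callback that logs the exception. The sketch below uses only the standard library; `spawn_logged` and `failing_handler` are hypothetical names, and the `errors` list exists only so the demo can be checked.

```python
import asyncio
import logging

logger = logging.getLogger("bot")
errors = []  # collected here only so the demo can be checked


def _report(task):
    """Done-callback: retrieve and log any exception raised by the task."""
    if task.cancelled():
        return
    exc = task.exception()
    if exc is not None:
        errors.append(exc)
        logger.error("background task failed: %r", exc)


def spawn_logged(coro):
    """Schedule a coroutine and attach the logging done-callback."""
    task = asyncio.create_task(coro)
    task.add_done_callback(_report)
    return task


async def failing_handler():
    # Hypothetical handler that fails the way the traceback above did.
    raise AttributeError("object has no attribute 'track_sid'")


async def demo():
    task = spawn_logged(failing_handler())
    await asyncio.wait([task])  # wait without re-raising the exception
    await asyncio.sleep(0)      # give the done-callback a chance to run
    return errors
```

Because the callback calls `task.exception()`, the exception counts as retrieved and is logged right when the task finishes.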
o
skipping participant join event - subscribed flag not set
t
That error message, "skipping participant join event - subscribed flag not set," usually pops up when the subscription setup isn’t quite right. No worries though, here are some friendly steps to help you fix it:
1. Enable Auto-Subscribe: Make sure `auto_subscribe` is set to `True` in your Room configuration. This way, tracks will automatically subscribe when participants join.
await room.connect(url, token, options=rtc.RoomOptions(auto_subscribe=True))
2. Verify Room Ready State: Double-check that the room is fully initialized and ready before you handle participant join events.
3. Manually Manage Subscriptions: If you have `auto_subscribe` set to `False`, you’ll want to manually subscribe to tracks when someone joins:
# The Python SDK emits "participant_connected" when a participant joins
@room.on("participant_connected")
def on_participant_connected(participant):
    for publication in participant.track_publications.values():
        publication.set_subscribed(True)
If you want me to walk you through any of these steps or if something isn’t clear, just let me know—I’m here to help!
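Since participants who joined before the bot will not trigger a new join event, manual subscription usually also needs one pass over whoever is already in the room. The sketch below shows that loop with hypothetical stand-in classes (`FakePublication`, `FakeParticipant`) rather than real SDK objects. It assumes participants are held in a dict keyed by identity, each exposing `track_publications` as a dict whose values have a `set_subscribed` method.

```python
from dataclasses import dataclass, field


@dataclass
class FakePublication:
    """Hypothetical stand-in for a track publication."""
    sid: str
    subscribed: bool = False

    def set_subscribed(self, value: bool) -> None:
        self.subscribed = value


@dataclass
class FakeParticipant:
    """Hypothetical stand-in for a remote participant."""
    identity: str
    track_publications: dict = field(default_factory=dict)


def subscribe_existing(remote_participants: dict) -> int:
    """Subscribe to every not-yet-subscribed publication; return how many changed."""
    changed = 0
    for participant in remote_participants.values():
        for publication in participant.track_publications.values():
            if not publication.subscribed:
                publication.set_subscribed(True)
                changed += 1
    return changed
```

Calling a helper like this once right after connecting, in addition to the event handler, would cover both tracks that already exist and tracks published later.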