fierce-jordan-44682
09/11/2025, 2:36 AM
tall-airline-16756
09/11/2025, 2:36 AM
tall-airline-16756
09/11/2025, 2:37 AM
@session.on("conversation_item_added") in LiveKit without any special setup. The LiveKit Agents framework is designed to support asynchronous event handlers seamlessly, so you can easily perform non-blocking tasks like database logging or calling external APIs.
Here’s a friendly example of how an async handler might look:
from livekit.agents import ConversationItemAddedEvent

@session.on("conversation_item_added")
async def on_conversation_item_added(event: ConversationItemAddedEvent):
    print(f"Async handler: Conversation item added from {event.item.role}: {event.item.text_content}")
    # You can do async operations here, for example:
    # await log_to_database(event.item)
    # await publish_to_messaging_queue(event.item)
Just so you know, the conversation_item_added event fires every time a new item is added to the chat history. It gives you full access to the chat message through event.item, which might include text, images, or audio content. If you have any questions about this or want to explore more, I’m here to help!
Sources: livekit-agents/livekit/agents/voice/agent_activity.py | Events and error handling | LiveKit Docs
tall-airline-16756
09/11/2025, 2:37 AM
@session.on("conversation_item_added") in LiveKit without any extra setup. The LiveKit Agents framework is designed to support both synchronous and asynchronous event handlers seamlessly.
Let me share a simple example of an async handler with you:
from livekit.agents import ConversationItemAddedEvent

@session.on("conversation_item_added")
async def on_conversation_item_added(event: ConversationItemAddedEvent):
    print(f"Async handler: Conversation item added from {event.item.role}: {event.item.text_content}")
    # You can perform async operations here, like:
    # await log_to_database(event.item)
    # await publish_to_messaging_queue(event.item)
The conversation_item_added event fires whenever a new conversation item is added to the chat history. It gives you access to the full chat message through event.item, letting you handle tasks like database logging or API calls in a non-blocking, efficient way.
fierce-jordan-44682
09/11/2025, 2:58 AM
.on(). Use asyncio.create_task within your synchronous callback instead.
But I got this error.
tall-airline-16756
09/11/2025, 2:58 AM
.on() method to be synchronous functions, so it doesn’t allow asynchronous functions (those defined with async def) to be used directly as callbacks.
But don’t worry! There’s a simple way to work around this. You can register a synchronous function with .on() and then use asyncio.create_task() inside that function to run your asynchronous code. Here’s an example to help clarify:
import asyncio

# This will cause the error
@room.on("track_subscribed")
async def on_track_subscribed(track, publication, participant):
    # async code here
    pass

# A better way to do it
@room.on("track_subscribed")
def on_track_subscribed(track, publication, participant):
    # Schedule the async function to run
    asyncio.create_task(handle_track_subscribed_async(track, publication, participant))

async def handle_track_subscribed_async(track, publication, participant):
    # Your original async code here
    pass
This approach keeps the event handler synchronous, as required, while still letting you handle asynchronous operations smoothly. If you have any more questions or want to walk through this together, I’m here to help!
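If it helps to see the pattern run end to end without a LiveKit room, here is a minimal sketch. TinyEmitter is a hypothetical stand-in written only for this example; it is not LiveKit's emitter, and it only imitates the behavior described above (rejecting async def callbacks). The event payload is simplified to a single string.

```python
import asyncio
import inspect


class TinyEmitter:
    """Hypothetical stand-in for an event emitter (not LiveKit's class)."""

    def __init__(self):
        self._handlers = {}

    def on(self, event):
        def decorator(fn):
            # Imitate the behavior described above: reject async callbacks.
            if inspect.iscoroutinefunction(fn):
                raise ValueError("async callbacks are not accepted by .on()")
            self._handlers.setdefault(event, []).append(fn)
            return fn
        return decorator

    def emit(self, event, *args):
        for fn in self._handlers.get(event, []):
            fn(*args)


async def main():
    room = TinyEmitter()
    results = []

    async def handle_track_subscribed_async(name):
        await asyncio.sleep(0)  # placeholder for real async work
        results.append(name)

    @room.on("track_subscribed")
    def on_track_subscribed(name):
        # Synchronous callback that only schedules the coroutine.
        asyncio.create_task(handle_track_subscribed_async(name))

    room.emit("track_subscribed", "camera")
    await asyncio.sleep(0.01)  # give the scheduled task time to finish
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Note that asyncio.create_task() needs a running event loop, so the synchronous callback has to be invoked from code that is already running inside that loop, as emit() is here.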
Sources: README.md
tall-airline-16756
09/11/2025, 2:58 AM
async def) directly as event callbacks using the .on() method. This is because the event emitter system expects callbacks to be synchronous.
But don’t worry! You can easily work around this by registering a synchronous function with .on() and then using asyncio.create_task() inside that function to run your asynchronous code. Here’s a little example to help you out:
import asyncio

# This will cause the error
@room.on("track_subscribed")
async def on_track_subscribed(track, publication, participant):
    # async code here
    pass

# Here’s the right way to do it
@room.on("track_subscribed")
def on_track_subscribed(track, publication, participant):
    # Create a task to run the async function
    asyncio.create_task(handle_track_subscribed_async(track, publication, participant))

async def handle_track_subscribed_async(track, publication, participant):
    # Your original async code here
    pass
This way, the event handler you register with .on() stays synchronous, but you can still run your async operations smoothly when the event happens. I hope this helps clear things up! If you have any more questions, feel free to ask. I’m here to help.
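One caveat with fire-and-forget tasks: the event loop keeps only a weak reference to a task, so the asyncio documentation recommends holding your own reference until it finishes, and an exception raised inside the task is easy to miss. The sketch below shows one possible way to handle both. The names spawn, background_tasks, and errors are hypothetical helpers invented for this example, not part of LiveKit or asyncio.

```python
import asyncio

background_tasks = set()  # hypothetical registry holding strong references
errors = []               # hypothetical sink; real code might log instead


def spawn(coro):
    """Schedule coro, keep a reference until it finishes, record failures."""
    task = asyncio.create_task(coro)
    background_tasks.add(task)

    def _done(t):
        background_tasks.discard(t)
        # exception() raises on a cancelled task, so check cancelled() first.
        if not t.cancelled() and t.exception() is not None:
            errors.append(t.exception())

    task.add_done_callback(_done)
    return task


async def failing_handler():
    raise RuntimeError("boom")


async def main():
    spawn(failing_handler())
    await asyncio.sleep(0.01)  # let the task and its done-callback run
    return len(background_tasks), [str(e) for e in errors]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Inside a synchronous .on() callback, you would call spawn(...) in place of a bare asyncio.create_task(...).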