elegant-optician-29026
08/09/2025, 4:08 PM

tall-airline-16756
08/09/2025, 4:08 PM
1. Configure RoomIO Options:
◦ Enable text input by setting text_enabled=True in RoomInputOptions.
◦ Enable transcription output by setting transcription_enabled=True in RoomOutputOptions.
◦ To get real-time streaming, disable syncing transcription with audio by setting sync_transcription=False.
2. Get Your Agent Ready and Running:
◦ Create a MultimodalAgent instance with the right model configuration.
◦ Start the agent session using your configured options so it can handle both text and audio smoothly.
3. Manage Text Input and Craft Responses:
◦ Use the _on_user_text_input callback to catch incoming text messages and respond (a minimal sketch of handling text input follows the example below).
◦ The generate_reply() method helps you create responses in both voice and text formats.
Here’s a simple example to get you started:
from livekit.agents.voice import RoomIO, RoomInputOptions, RoomOutputOptions
from livekit.agents import AutoSubscribe, JobContext
from livekit.agents.multimodal import MultimodalAgent
from livekit.plugins import openai  # provides the realtime model used below
from livekit import rtc

async def run_multimodal_agent(job_context: JobContext, participant: rtc.RemoteParticipant):
    # Set up RoomIO options: accept text input, publish transcriptions, and stream them live
    input_options = RoomInputOptions(text_enabled=True)
    output_options = RoomOutputOptions(
        transcription_enabled=True,
        sync_transcription=False,  # lets transcription stream in real time instead of syncing to audio playback
    )

    # Initialize your agent with a realtime model that supports both text and audio
    model = openai.realtime.RealtimeModel(modalities=["text", "audio"])
    agent = MultimodalAgent(model=model)

    # Start the agent with your options (option support depends on your SDK version)
    agent.start(job_context.room, participant, input_options=input_options, output_options=output_options)

    # Kick things off with a friendly greeting
    agent.generate_reply("Hello! How can I assist you today?")
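The steps above also mention catching incoming text and answering it with generate_reply(), which the example doesn't show. Here's a minimal sketch of one way to wire that up by listening on the room's data channel; the "chat" topic, the JSON payload shape, and the helper name wire_text_input are assumptions for illustration, not a documented LiveKit convention, and the exact generate_reply() signature varies between SDK versions:
import json
from livekit import rtc

def wire_text_input(room: rtc.Room, agent) -> None:
    # Rough, illustrative sketch: listen for text sent over the room's data channel
    # and ask the agent to reply to it.
    @room.on("data_received")
    def _on_data(packet: rtc.DataPacket):
        if packet.topic != "chat":  # hypothetical topic name chosen by your frontend
            return
        try:
            payload = json.loads(packet.data.decode("utf-8"))
        except (UnicodeDecodeError, ValueError):
            return
        message = payload.get("message", "") if isinstance(payload, dict) else ""
        if message:
            # Mirrors the generate_reply() usage shown above; check your SDK version.
            agent.generate_reply(message)
You would call wire_text_input(job_context.room, agent) right after agent.start(...) in the example above.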
If you want to dive deeper, check out these handy resources:
• LiveKit Agents Documentation
• GitHub Examples
This setup makes it easy for your agent to chat in real time, responding with both voice and text as the conversation flows. If you have any questions, just ask. I'm here to help!

elegant-optician-29026
08/09/2025, 4:12 PM

elegant-optician-29026
08/09/2025, 4:13 PM

tall-airline-16756
08/09/2025, 4:13 PM
To build a voice assistant that responds with both speech and text, the VoicePipelineAgent class is your friend. Here's a simple walkthrough:
1. Initialize the Agent:
◦ Start by creating an instance of VoicePipelineAgent with your STT, LLM, and TTS providers. You can also add optional VAD (voice activity detection) if you want.
2. Customize the Pipeline:
◦ If you want to tweak how things flow, you can override nodes like stt_node, llm_node, or tts_node (see the sketch after the example below).
3. Handle Events:
◦ Use lifecycle hooks such as on_enter, on_exit, and on_user_turn_completed to keep the conversation running smoothly.
4. Start the Agent:
◦ Finally, call start() on your VoicePipelineAgent instance, specifying the room and participant you want to listen to.
Here’s an example to get you going:
import logging
from typing import AsyncIterable
from dotenv import load_dotenv
from pathlib import Path
from livekit.agents import (
    AutoSubscribe,
    JobContext,
    JobProcess,
    WorkerOptions,
    cli,
    llm,
    metrics,
)
from livekit.agents.pipeline import VoicePipelineAgent
from livekit.plugins import openai, silero

load_dotenv(dotenv_path=Path(__file__).parent / '.env')
logger = logging.getLogger("voice-assistant")

def prewarm(proc: JobProcess):
    # Load the Silero VAD once per process so each job can reuse it
    proc.userdata["vad"] = silero.VAD.load()

async def entrypoint(ctx: JobContext):
    initial_ctx = llm.ChatContext().append(
        role="system",
        text=(
            "You are a voice assistant created by LiveKit that can respond via both voice and text. "
            "You should use short and concise responses, avoiding unpronounceable punctuation."
        ),
    )

    logger.info(f"Connecting to room {ctx.room.name}")
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    participant = await ctx.wait_for_participant()
    logger.info(f"Starting voice assistant for participant {participant.identity}")

    # Example of modifying text before it reaches TTS
    async def _before_tts_cb(agent: VoicePipelineAgent, text: str | AsyncIterable[str]):
        if isinstance(text, str):
            return text.replace("Example", "Example Modified")
        else:
            async def process_stream():
                async for chunk in text:
                    yield chunk.replace("Example", "Example Modified")
            return process_stream()

    agent = VoicePipelineAgent(
        vad=ctx.proc.userdata["vad"],
        stt=openai.STT.with_groq(),
        llm=openai.LLM.with_groq(model="deepseek-r1-distill-llama-70b"),
        tts=openai.TTS(),
        chat_ctx=initial_ctx,
        before_tts_cb=_before_tts_cb,
    )

    agent.start(ctx.room, participant)

    # Greet the user; say() speaks via TTS and also publishes the transcription as text
    await agent.say("Hello! How can I assist you today?")

    usage_collector = metrics.UsageCollector()

    @agent.on("metrics_collected")
    def _on_metrics_collected(mtrcs: metrics.AgentMetrics):
        metrics.log_metrics(mtrcs)
        usage_collector.collect(mtrcs)

    async def log_usage():
        summary = usage_collector.get_summary()
        logger.info(f"Usage: ${summary}")

    ctx.add_shutdown_callback(log_usage)

if __name__ == "__main__":
    cli.run_app(
        WorkerOptions(
            entrypoint_fnc=entrypoint,
            prewarm_fnc=prewarm,
        ),
    )
With this setup, your agent will listen through STT, process what’s said using the LLM, and respond both with voice via TTS and text messages. If you have any questions or want me to walk you through any part, just let me know—I’m here to help!
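Steps 2 and 3 above mention overridable pipeline nodes and lifecycle hooks. In newer livekit-agents releases those live on the Agent class rather than on VoicePipelineAgent itself, so here's a rough sketch assuming that newer API; treat the method names, signatures, and the Agent.default delegation as version-dependent details to verify against your installed SDK:
from livekit.agents import Agent

class MyAssistant(Agent):
    def __init__(self) -> None:
        super().__init__(instructions="Keep replies short and easy to speak aloud.")

    async def on_enter(self):
        # Lifecycle hook: runs when this agent becomes active in the session.
        pass

    async def on_user_turn_completed(self, turn_ctx, new_message):
        # Lifecycle hook: runs after the user finishes a turn; inspect or edit
        # the chat context here before the LLM responds.
        pass

    async def llm_node(self, chat_ctx, tools, model_settings):
        # Pipeline node override: intercept the LLM step. Here we simply delegate
        # to the default implementation (Agent.default per current docs; check your version).
        return Agent.default.llm_node(self, chat_ctx, tools, model_settings)
If you're on that newer API, you'd pass MyAssistant() to an AgentSession instead of constructing a VoicePipelineAgent directly.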
Sources: Pipeline nodes and hooks | LiveKit Docs | VoicePipelineAgent | LiveKit Agents