Hello everyone, Is there any documentation around...
# ask-ai
Hello everyone, Is there any documentation around subscribing to text messages from the livekit room in an agent session to respond to the user via both voice and text in streaming mode? Agent is written in python
Thanks for reaching out, Avinish. I'm an AI assistant and I'm here to help. Give me a moment while I research this topic.
Hey there! If you want to subscribe to text messages from a LiveKit room during an agent session and respond with both voice and text in streaming mode using Python, here’s a friendly step-by-step guide to help you out:

1. Set Up Your Room Input and Output Options:
   ◦ Turn on text input by setting `text_enabled=True` in `RoomInputOptions`.
   ◦ Enable transcription output by setting `transcription_enabled=True` in `RoomOutputOptions`.
   ◦ To get real-time streaming, disable syncing transcription with audio by setting `sync_transcription=False`.
2. Get Your Agent Ready and Running:
   ◦ Create a `MultimodalAgent` instance with the right model configuration.
   ◦ Start the agent session using your configured options so it can handle both text and audio smoothly.
3. Manage Text Input and Craft Responses:
   ◦ Use the `_on_user_text_input` callback to catch incoming text messages and respond.
   ◦ The `generate_reply()` method helps you create responses in both voice and text formats.

Here’s a simple example to get you started:
from livekit.agents.voice import RoomIO, RoomInputOptions, RoomOutputOptions
from livekit.agents import AutoSubscribe, JobContext
from livekit.agents.multimodal import MultimodalAgent
from livekit.plugins import openai  # provides the realtime model used below
from livekit import rtc

async def run_multimodal_agent(job_context: JobContext, participant: rtc.RemoteParticipant):
    # Enable text input from the room and real-time (unsynced) transcription output
    input_options = RoomInputOptions(text_enabled=True)
    output_options = RoomOutputOptions(
        transcription_enabled=True,
        sync_transcription=False  # This lets transcription stream in real-time
    )

    # Initialize your agent with a realtime model that emits both text and audio
    model = openai.realtime.RealtimeModel(modalities=["text", "audio"])
    agent = MultimodalAgent(model=model)

    # Start the agent with your options
    agent.start(job_context.room, participant, input_options=input_options, output_options=output_options)

    # Kick things off with a friendly greeting
    agent.generate_reply("Hello! How can I assist you today?")
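If you want to subscribe to the room’s incoming chat text yourself rather than relying on the agent’s text input option, here’s a minimal sketch using `rtc.ChatManager` from the LiveKit Python SDK (newer SDKs expose text streams instead); the `subscribe_to_chat` helper and the way the text is forwarded into `generate_reply()` are illustrative, so adjust them to your agents version:

from livekit import rtc

def subscribe_to_chat(room: rtc.Room, agent) -> rtc.ChatManager:
    # ChatManager fires "message_received" for chat messages published to the room
    chat = rtc.ChatManager(room)

    @chat.on("message_received")
    def on_message(msg: rtc.ChatMessage):
        if msg.message:
            # Illustrative: hand the text to the agent however your version supports,
            # e.g. generate_reply() as described in the steps above
            agent.generate_reply(msg.message)

    return chat

You would call this once after agent.start() so typed messages and speech share the same session.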
If you want to dive deeper, check out these handy resources:
• LiveKit Agents Documentation
• GitHub Examples
This setup makes it easy for your agent to chat in real time, responding with both voice and text as the conversation flows. If you have any questions, just ask!
I am using the STT-LLM-TTS pipeline
Hey there! If you’re looking to set up the STT-LLM-TTS pipeline in LiveKit so your app can subscribe to text messages and reply using both voice and text, the `VoicePipelineAgent` class is your friend. Here’s a simple walkthrough:

1. Initialize the Agent:
   ◦ Start by creating an instance of `VoicePipelineAgent` with your STT, LLM, and TTS providers. You can also add optional VAD (voice activity detection) if you want.
2. Customize the Pipeline:
   ◦ If you want to tweak how things flow, you can override nodes like `stt_node`, `llm_node`, or `tts_node`.
3. Handle Events:
   ◦ Use lifecycle hooks such as `on_enter`, `on_exit`, and `on_user_turn_completed` to keep the conversation running smoothly.
4. Start the Agent:
   ◦ Finally, call `start()` on your `VoicePipelineAgent` instance, specifying the room and participant you want to listen to.

Here’s an example to get you going:
import logging
from typing import AsyncIterable
from dotenv import load_dotenv
from pathlib import Path
from livekit.agents import (
    AutoSubscribe,
    JobContext,
    JobProcess,
    WorkerOptions,
    cli,
    llm,
    metrics,
)
from livekit.agents.pipeline import VoicePipelineAgent
from livekit.plugins import openai, silero

load_dotenv(dotenv_path=Path(__file__).parent / '.env')
logger = logging.getLogger("voice-assistant")

def prewarm(proc: JobProcess):
    proc.userdata["vad"] = silero.VAD.load()

async def entrypoint(ctx: JobContext):
    initial_ctx = llm.ChatContext().append(
        role="system",
        text=(
            "You are a voice assistant created by LiveKit that can respond via both voice and text. "
            "You should use short and concise responses, avoiding unpronounceable punctuation."
        ),
    )

    <http://logger.info|logger.info>(f"Connecting to room {ctx.room.name}")
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    participant = await ctx.wait_for_participant()
    <http://logger.info|logger.info>(f"Starting voice assistant for participant {participant.identity}")

    agent = VoicePipelineAgent(
        vad=ctx.proc.userdata["vad"],
        stt=openai.STT.with_groq(),
        llm=openai.LLM.with_groq(model="deepseek-r1-distill-llama-70b"),
        tts=openai.TTS(),
        chat_ctx=initial_ctx,
    )

    # Example of modifying text before TTS
    async def _before_tts_cb(agent: VoicePipelineAgent, text: str | AsyncIterable[str]):
        if isinstance(text, str):
            result = text.replace("Example", "Example Modified")
            return result
        else:
            async def process_stream():
                async for chunk in text:
                    processed = chunk.replace("Example", "Example Modified")
                    yield processed
            return process_stream()

    agent.before_tts_cb = _before_tts_cb

    agent.start(ctx.room, participant)

    # Greet the user; say() speaks via TTS and also publishes the reply text
    # to the room as a transcription, so clients receive both voice and text
    await agent.say("Hello! How can I assist you today?")

    usage_collector = metrics.UsageCollector()

    @agent.on("metrics_collected")
    def _on_metrics_collected(mtrcs: metrics.AgentMetrics):
        metrics.log_metrics(mtrcs)
        usage_collector.collect(mtrcs)

    async def log_usage():
        summary = usage_collector.get_summary()
        <http://logger.info|logger.info>(f"Usage: ${summary}")

    ctx.add_shutdown_callback(log_usage)

if __name__ == "__main__":
    cli.run_app(
        WorkerOptions(
            entrypoint_fnc=entrypoint,
            prewarm_fnc=prewarm,
        ),
    )
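If you also want the agent to pick up typed chat messages from the room and answer them through the same pipeline, here’s a minimal sketch, assuming `rtc.ChatManager` from the LiveKit Python SDK and the `VoicePipelineAgent` interface used above (the `wire_text_input` and `answer_from_text` names are just illustrative):

import asyncio
from livekit import rtc
from livekit.agents.pipeline import VoicePipelineAgent

def wire_text_input(room: rtc.Room, agent: VoicePipelineAgent) -> rtc.ChatManager:
    chat = rtc.ChatManager(room)

    async def answer_from_text(txt: str):
        # Reuse the agent's chat context and LLM, then speak the streamed reply;
        # say() also forwards the reply text to the room as a transcription
        chat_ctx = agent.chat_ctx.copy()
        chat_ctx.append(role="user", text=txt)
        stream = agent.llm.chat(chat_ctx=chat_ctx)
        await agent.say(stream)

    @chat.on("message_received")
    def on_chat_received(msg: rtc.ChatMessage):
        if msg.message:
            asyncio.create_task(answer_from_text(msg.message))

    return chat

Calling wire_text_input(ctx.room, agent) right after agent.start(ctx.room, participant) lets typed messages and speech flow through the same chat context.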
With this setup, your agent will listen through STT, process what’s said using the LLM, and respond both with voice via TTS and text messages. If you have any questions or want me to walk you through any part, just let me know—I’m here to help! Sources: Pipeline nodes and hooks | LiveKit Docs | VoicePipelineAgent | LiveKit Agents