bumpy-shampoo-17100
02/28/2025, 5:42 PM
gray-monkey-83525
02/28/2025, 5:42 PM
chat method in the LLM class to integrate your custom logic (see the sketch after this list). Refer to the LLM class documentation for details on how to implement this.
3. STT and TTS Setup: For STT and TTS, you can use existing plugins or create custom ones if needed. LiveKit provides plugins for various providers, but you can also implement your own by following the plugin framework documentation.
4. Deploy and Test: Once your custom server is set up, deploy it using a managed container orchestration environment like AWS ECS or fly.io. Ensure that your server is connected to the LiveKit network for real-time media and data exchange.
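As a rough illustration of the custom-logic step above (not the documented LLM subclass API), here is a minimal Node.js sketch of the kind of helper you might put behind the chat step: it forwards the user's text to your own backend and returns the reply. The endpoint URL, payload, and reply field are assumptions for illustration; adapt them to however your service is exposed.
// Minimal sketch of custom "LLM" logic: a helper that forwards the user's
// text to your own backend and returns its reply. Requires Node 18+ for the
// built-in fetch; the URL, payload, and reply field are illustrative.
async function customLLMReply(userText) {
  const res = await fetch('https://your-backend.example.com/chat', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ prompt: userText }),
  });
  if (!res.ok) {
    throw new Error(`Backend returned status ${res.status}`);
  }
  const data = await res.json();
  return data.reply; // assumed response field
}

// Usage sketch: pass in a final transcript, send the reply on to TTS.
// const reply = await customLLMReply('What is the weather today?');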
Would you like more detailed guidance on any specific part of this process?
Sources:
- LiveKit Agents documentation
- Plugin framework documentation
- LLM class documentation
bumpy-shampoo-17100
02/28/2025, 5:45 PM
gray-monkey-83525
02/28/2025, 5:45 PM
bumpy-shampoo-17100
02/28/2025, 5:49 PM
gray-monkey-83525
02/28/2025, 5:49 PM
STTSegmentsForwarder class to handle transcription events. This class allows you to receive real-time transcription updates as they are generated. Here's a brief overview of how you can achieve this:
1. Set Up STT Stream: Use a provider plugin like Deepgram to create an STT stream. Push audio frames to this stream to receive transcription events.
2. Handle Transcription Events: Implement a function to process the transcription events. You can listen for FINAL_TRANSCRIPT and INTERIM_TRANSCRIPT events to get the exact text updates.
3. Use Transcription Events: The STTSegmentsForwarder can be used to forward these transcription events to your application or client.
Here's a basic example in Python:
from livekit import agents, rtc
from livekit.plugins import deepgram
from livekit.agents.stt import SpeechEventType, SpeechEvent
from typing import AsyncIterable

async def process_track(ctx: agents.JobContext, track: rtc.Track):
    stt = deepgram.STT()
    stt_stream = stt.stream()
    audio_stream = rtc.AudioStream(track)
    ctx.create_task(process_text_from_speech(stt_stream))
    async for audio_event in audio_stream:
        stt_stream.push_frame(audio_event.frame)
    stt_stream.end_input()

async def process_text_from_speech(stream: AsyncIterable[SpeechEvent]):
    async for event in stream:
        if event.type == SpeechEventType.FINAL_TRANSCRIPT:
            text = event.alternatives[0].text
            # Do something with text
        elif event.type == SpeechEventType.INTERIM_TRANSCRIPT:
            pass
    await stream.aclose()
Would you like more detailed guidance on implementing this in your application?
Sources:
- Working with plugins
- Transcriptions
bumpy-shampoo-17100
02/28/2025, 5:50 PM
bumpy-shampoo-17100
02/28/2025, 5:52 PM
gray-monkey-83525
02/28/2025, 5:52 PM
SpeechStream class from a provider plugin like Deepgram. Here's a basic example of how you can implement this:
1. Set Up STT Stream: Use a provider plugin to create an STT stream and push audio frames to it.
2. Handle Transcription Events: Listen for transcription events such as FINAL_TRANSCRIPT to get the exact text updates.
Here's a basic example in Node.js:
// Package names below are the published Node packages; the exact streaming
// API (stream(), pushFrame(), endInput()) may differ between versions, so
// treat this as a sketch.
const rtc = require('@livekit/rtc-node'); // provides AudioStream
const { STT } = require('@livekit/agents-plugin-deepgram');

async function processTrack(ctx, track) {
  const stt = new STT();
  const sttStream = stt.stream();
  const audioStream = new rtc.AudioStream(track);
  // Consume transcription events concurrently while audio frames are pushed.
  processTextFromSpeech(sttStream);
  for await (const audioEvent of audioStream) {
    sttStream.pushFrame(audioEvent.frame);
  }
  sttStream.endInput();
}

async function processTextFromSpeech(stream) {
  for await (const event of stream) {
    if (event.type === 'FINAL_TRANSCRIPT') {
      const text = event.alternatives[0].text;
      console.log('Final Transcript:', text);
      // Do something with text
    }
  }
  await stream.close();
}
This example demonstrates how to set up an STT stream and handle transcription events in Node.js. Would you like more detailed guidance on any specific part of this process?
Sources:
- https://docs.livekit.io/agents/openai/multimodal-agent/
- https://docs.livekit.io/agents/openai/multimodalagent/
bumpy-shampoo-17100
02/28/2025, 7:41 PM
gray-monkey-83525
02/28/2025, 7:41 PM
INTERIM_TRANSCRIPT and FINAL_TRANSCRIPT events to get updates on what the user is speaking.
2. Generate Output When User Stops Speaking: Implement logic to detect when the user stops speaking, typically by listening for END_OF_SPEECH events. You can then generate a response based on the final transcript.
3. Send Generated Response as TTS Stream: Use a TTS plugin to convert the generated text response into audio. Implement interruption handling to stop the TTS stream if needed.
Here's a basic example in Node.js:
const rtc = require('@livekit/rtc-node'); // provides AudioStream
const { STT } = require('@livekit/agents-plugin-deepgram');
const { TTS } = require('@livekit/agents-plugin-elevenlabs');

async function processTrack(ctx, track) {
  const stt = new STT();
  const sttStream = stt.stream();
  const audioStream = new rtc.AudioStream(track);
  // Consume transcription events concurrently while audio frames are pushed.
  processTextFromSpeech(sttStream);
  for await (const audioEvent of audioStream) {
    sttStream.pushFrame(audioEvent.frame);
  }
  sttStream.endInput();
}

async function processTextFromSpeech(stream) {
  let finalText = '';
  for await (const event of stream) {
    if (event.type === 'INTERIM_TRANSCRIPT') {
      console.log('Interim:', event.alternatives[0].text);
    } else if (event.type === 'FINAL_TRANSCRIPT') {
      finalText = event.alternatives[0].text;
      console.log('Final:', finalText);
    } else if (event.type === 'END_OF_SPEECH') {
      // Generate response when user stops speaking
      const responseText = generateResponse(finalText);
      await sendResponseAsTTS(responseText);
    }
  }
  await stream.close();
}

function generateResponse(text) {
  // Implement your logic to generate a response based on the final text
  return `You said: ${text}`;
}

async function sendResponseAsTTS(text) {
  const tts = new TTS();
  const ttsStream = tts.stream();
  ttsStream.pushText(text);
  ttsStream.endInput();
  for await (const audioFrame of ttsStream) {
    // Send audio frame to client
    console.log('Sending audio frame:', audioFrame);
  }
  await ttsStream.close();
}

// Example usage
// Assume `ctx` and `track` are provided by your application
// processTrack(ctx, track);
This example demonstrates how to handle real-time speech updates, generate a response when the user stops speaking, and send the response as a TTS stream. Would you like more detailed guidance on any specific part of this process?
Sources:
- Working with plugins
- Transcriptions
bumpy-shampoo-17100
02/28/2025, 7:46 PM
// Example usage
// Assume `ctx` and `track` are provided by your application
// processTrack(ctx, track);
Can you provide this code along with the github repo example?
gray-monkey-83525
02/28/2025, 7:46 PM
ctx and track, you can refer to the following code snippet. This example assumes you have a LiveKit room set up and are using the LiveKit SDK to connect and manage tracks.
Here's a complete example in Node.js:
const { Room, Track, RoomEvent } = require('livekit-client');
const rtc = require('@livekit/rtc-node'); // provides AudioStream
const { STT } = require('@livekit/agents-plugin-deepgram');
const { TTS } = require('@livekit/agents-plugin-elevenlabs');

async function connectToRoom(serverUrl, token) {
  const room = new Room();
  await room.connect(serverUrl, token);
  console.log('Connected to room:', room.name);
  room.on(RoomEvent.TrackSubscribed, (track, publication, participant) => {
    if (track.kind === Track.Kind.Audio) {
      processTrack(room, track);
    }
  });
  return room;
}

async function processTrack(ctx, track) {
  const stt = new STT();
  const sttStream = stt.stream();
  const audioStream = new rtc.AudioStream(track);
  // Consume transcription events concurrently while audio frames are pushed.
  processTextFromSpeech(sttStream);
  for await (const audioEvent of audioStream) {
    sttStream.pushFrame(audioEvent.frame);
  }
  sttStream.endInput();
}

async function processTextFromSpeech(stream) {
  let finalText = '';
  for await (const event of stream) {
    if (event.type === 'INTERIM_TRANSCRIPT') {
      console.log('Interim:', event.alternatives[0].text);
    } else if (event.type === 'FINAL_TRANSCRIPT') {
      finalText = event.alternatives[0].text;
      console.log('Final:', finalText);
    } else if (event.type === 'END_OF_SPEECH') {
      const responseText = generateResponse(finalText);
      await sendResponseAsTTS(responseText);
    }
  }
  await stream.close();
}

function generateResponse(text) {
  return `You said: ${text}`;
}

async function sendResponseAsTTS(text) {
  const tts = new TTS();
  const ttsStream = tts.stream();
  ttsStream.pushText(text);
  ttsStream.endInput();
  for await (const audioFrame of ttsStream) {
    console.log('Sending audio frame:', audioFrame);
  }
  await ttsStream.close();
}

// Example usage
const serverUrl = '<your LiveKit server URL>';
const token = '<your access token>';
connectToRoom(serverUrl, token);
For a GitHub repository example, you can refer to the LiveKit GitHub for various examples and the agents repository for agent-specific implementations. These resources can help you understand how to set up and manage LiveKit rooms and tracks.
Would you like more detailed guidance on any specific part of this process?
Sources:
- https://livekit-users.slack.com/archives/C07FY8WHGPM/p1726915449.973779
- https://docs.livekit.io/agents/plugins/
bumpy-shampoo-17100
02/28/2025, 7:50 PM
gray-monkey-83525
02/28/2025, 7:50 PM
VoicePipelineAgent from LiveKit. This agent allows you to handle STT, LLM, and TTS with interruption support. Here's how you can set it up:
1. Set Up the Agent: Use the VoicePipelineAgent to manage the conversation flow. This agent can handle interruptions by setting the allowInterruptions option.
2. Implement Interruption Handling: Use the allowInterruptions flag to enable interruptions. You can also listen for specific events to manage interruptions.
Here's a complete example in Node.js:
const { Room, Track, RoomEvent } = require('livekit-client');
const { pipeline } = require('@livekit/agents');
const { VAD } = require('@livekit/agents-plugin-silero');
const { STT } = require('@livekit/agents-plugin-deepgram');
const { TTS } = require('@livekit/agents-plugin-elevenlabs');
const { LLM } = require('@livekit/agents-plugin-openai');

async function connectToRoom(serverUrl, token) {
  const room = new Room();
  await room.connect(serverUrl, token);
  console.log('Connected to room:', room.name);
  room.on(RoomEvent.TrackSubscribed, (track, publication, participant) => {
    if (track.kind === Track.Kind.Audio) {
      processTrack(room, track);
    }
  });
  return room;
}

async function processTrack(ctx, track) {
  // Note: check the VoicePipelineAgent API for your @livekit/agents version;
  // recent versions take positional arguments (vad, stt, llm, tts, options),
  // and the silero VAD is loaded asynchronously via VAD.load().
  const agent = new pipeline.VoicePipelineAgent(
    await VAD.load(), // Voice Activity Detection
    new STT(),
    new LLM(),
    new TTS(),
    { allowInterruptions: true }, // Enable interruptions
  );
  agent.on('user_started_speaking', () => {
    console.log('User started speaking');
  });
  agent.on('user_stopped_speaking', () => {
    console.log('User stopped speaking');
  });
  agent.on('agent_speech_interrupted', () => {
    console.log('Agent speech interrupted');
  });
  agent.start(ctx, track); // in the agents framework this is agent.start(room, participant)
  // Illustrative only: with a VoicePipelineAgent the STT -> LLM -> TTS flow is
  // handled for you; agent.say() can be used for manual replies.
  (async () => {
    for await (const event of agent) {
      if (event.type === 'FINAL_TRANSCRIPT') {
        const responseText = generateResponse(event.alternatives[0].text);
        await agent.say(responseText, true);
      }
    }
  })();
}

function generateResponse(text) {
  return `You said: ${text}`;
}

// Example usage
const serverUrl = '<your LiveKit server URL>';
const token = '<your access token>';
connectToRoom(serverUrl, token);
This example demonstrates how to integrate a VoicePipelineAgent into your application, enabling interruption handling. The agent listens for speech events and can interrupt its own speech if needed. Would you like more detailed guidance on any specific part of this process?
Sources:
- Working with plugins
bumpy-shampoo-17100
02/28/2025, 7:53 PM
const { Room, Track, RoomEvent } = require('livekit-client');
const rtc = require('@livekit/rtc-node'); // provides AudioStream
const { STT } = require('@livekit/agents-plugin-deepgram');
const { TTS } = require('@livekit/agents-plugin-elevenlabs');

async function connectToRoom(serverUrl, token) {
  const room = new Room();
  await room.connect(serverUrl, token);
  console.log('Connected to room:', room.name);
  room.on(RoomEvent.TrackSubscribed, (track, publication, participant) => {
    if (track.kind === Track.Kind.Audio) {
      processTrack(room, track);
    }
  });
  return room;
}

async function processTrack(ctx, track) {
  const stt = new STT();
  const sttStream = stt.stream();
  const audioStream = new rtc.AudioStream(track);
  // Consume transcription events concurrently while audio frames are pushed.
  processTextFromSpeech(sttStream);
  for await (const audioEvent of audioStream) {
    sttStream.pushFrame(audioEvent.frame);
  }
  sttStream.endInput();
}

async function processTextFromSpeech(stream) {
  let finalText = '';
  for await (const event of stream) {
    if (event.type === 'INTERIM_TRANSCRIPT') {
      console.log('Interim:', event.alternatives[0].text);
    } else if (event.type === 'FINAL_TRANSCRIPT') {
      finalText = event.alternatives[0].text;
      console.log('Final:', finalText);
    } else if (event.type === 'END_OF_SPEECH') {
      const responseText = generateResponse(finalText);
      await sendResponseAsTTS(responseText);
    }
  }
  await stream.close();
}

function generateResponse(text) {
  return `You said: ${text}`;
}

async function sendResponseAsTTS(text) {
  const tts = new TTS();
  const ttsStream = tts.stream();
  ttsStream.pushText(text);
  ttsStream.endInput();
  for await (const audioFrame of ttsStream) {
    console.log('Sending audio frame:', audioFrame);
  }
  await ttsStream.close();
}

// Example usage
const serverUrl = '<your LiveKit server URL>';
const token = '<your access token>';
connectToRoom(serverUrl, token);
Can I directly pass interruption events in this?
Also I need the transcriptions visible in the frontend client
bumpy-shampoo-17100
02/28/2025, 7:53 PM
gray-monkey-83525
02/28/2025, 7:53 PM
gray-monkey-83525
02/28/2025, 7:54 PM
const { Room, Track, RoomEvent } = require('livekit-client');
const rtc = require('@livekit/rtc-node'); // provides AudioStream
const { STT } = require('@livekit/agents-plugin-deepgram');
const { TTS } = require('@livekit/agents-plugin-elevenlabs');
const WebSocket = require('ws'); // Use WebSocket for real-time communication

const wss = new WebSocket.Server({ port: 8080 }); // WebSocket server for frontend communication

async function connectToRoom(serverUrl, token) {
  const room = new Room();
  await room.connect(serverUrl, token);
  console.log('Connected to room:', room.name);
  room.on(RoomEvent.TrackSubscribed, (track, publication, participant) => {
    if (track.kind === Track.Kind.Audio) {
      processTrack(room, track);
    }
  });
  return room;
}

async function processTrack(ctx, track) {
  const stt = new STT();
  const sttStream = stt.stream();
  const audioStream = new rtc.AudioStream(track);
  // Consume transcription events concurrently while audio frames are pushed.
  processTextFromSpeech(sttStream);
  for await (const audioEvent of audioStream) {
    sttStream.pushFrame(audioEvent.frame);
  }
  sttStream.endInput();
}

async function processTextFromSpeech(stream) {
  let finalText = '';
  for await (const event of stream) {
    if (event.type === 'INTERIM_TRANSCRIPT') {
      console.log('Interim:', event.alternatives[0].text);
      broadcastToClients('Interim:', event.alternatives[0].text);
    } else if (event.type === 'FINAL_TRANSCRIPT') {
      finalText = event.alternatives[0].text;
      console.log('Final:', finalText);
      broadcastToClients('Final:', finalText);
    } else if (event.type === 'END_OF_SPEECH') {
      const responseText = generateResponse(finalText);
      await sendResponseAsTTS(responseText);
    }
  }
  await stream.close();
}

function generateResponse(text) {
  return `You said: ${text}`;
}

async function sendResponseAsTTS(text) {
  const tts = new TTS();
  const ttsStream = tts.stream();
  ttsStream.pushText(text);
  ttsStream.endInput();
  for await (const audioFrame of ttsStream) {
    console.log('Sending audio frame:', audioFrame);
  }
  await ttsStream.close();
}

function broadcastToClients(type, message) {
  wss.clients.forEach((client) => {
    if (client.readyState === WebSocket.OPEN) {
      client.send(JSON.stringify({ type, message }));
    }
  });
}

// Example usage
const serverUrl = '<your LiveKit server URL>';
const token = '<your access token>';
connectToRoom(serverUrl, token);
gray-monkey-83525
02/28/2025, 7:54 PM
broadcastToClients function sends transcription updates to all connected WebSocket clients (a minimal browser-side receiver sketch follows below).
- Interruption Handling: You can add more logic to handle interruptions by listening for specific events and managing the flow of speech processing.
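On the frontend, the client only needs to open a WebSocket to the server above and render the incoming messages. Here is a minimal browser-side sketch; it assumes the ws server from the previous snippet on port 8080, its { type, message } JSON payload, and a placeholder element with id="transcript" in your page:
// Minimal browser-side receiver for the transcription broadcasts above.
const socket = new WebSocket('ws://localhost:8080'); // same port as the ws server
const transcriptEl = document.getElementById('transcript'); // placeholder element in your page
let finalText = '';

socket.onmessage = (event) => {
  const { type, message } = JSON.parse(event.data);
  if (type === 'Interim:') {
    // Show interim text after the confirmed text so it updates live.
    transcriptEl.textContent = `${finalText} ${message}`.trim();
  } else if (type === 'Final:') {
    // Append finalized text and drop the interim portion.
    finalText = `${finalText} ${message}`.trim();
    transcriptEl.textContent = finalText;
  }
};

socket.onclose = () => console.log('Transcription socket closed');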
Would you like more detailed guidance on any specific part of this process?
Sources:
- https://docs.livekit.io/python/livekit/agents/pipeline/human_input.html#livekit.agents.pipeline.human_input.HumanInput
- https://docs.livekit.io/agents/openai/multimodal-agent/
bumpy-shampoo-17100
02/28/2025, 8:13 PM
bumpy-shampoo-17100
02/28/2025, 8:13 PM
gray-monkey-83525
02/28/2025, 8:13 PM
gray-monkey-83525
02/28/2025, 8:14 PM
const { Room, Track, RoomEvent } = require('livekit-client');
const rtc = require('@livekit/rtc-node'); // provides AudioStream
const { STT } = require('@livekit/agents-plugin-deepgram');
const { TTS } = require('@livekit/agents-plugin-elevenlabs');
const WebSocket = require('ws');

const wss = new WebSocket.Server({ port: 8080 });
let currentTTSStream = null; // Track the current TTS stream

async function connectToRoom(serverUrl, token) {
  const room = new Room();
  await room.connect(serverUrl, token);
  console.log('Connected to room:', room.name);
  room.on(RoomEvent.TrackSubscribed, (track, publication, participant) => {
    if (track.kind === Track.Kind.Audio) {
      processTrack(room, track);
    }
  });
  return room;
}

async function processTrack(ctx, track) {
  const stt = new STT();
  const sttStream = stt.stream();
  const audioStream = new rtc.AudioStream(track);
  // Consume transcription events concurrently while audio frames are pushed.
  processTextFromSpeech(sttStream);
  for await (const audioEvent of audioStream) {
    sttStream.pushFrame(audioEvent.frame);
  }
  sttStream.endInput();
}

async function processTextFromSpeech(stream) {
  let finalText = '';
  for await (const event of stream) {
    if (event.type === 'INTERIM_TRANSCRIPT') {
      console.log('Interim:', event.alternatives[0].text);
      broadcastToClients('Interim:', event.alternatives[0].text);
    } else if (event.type === 'FINAL_TRANSCRIPT') {
      finalText = event.alternatives[0].text;
      console.log('Final:', finalText);
      broadcastToClients('Final:', finalText);
    } else if (event.type === 'END_OF_SPEECH') {
      const responseText = generateResponse(finalText);
      await sendResponseAsTTS(responseText);
    }
  }
  await stream.close();
}

function generateResponse(text) {
  return `You said: ${text}`;
}

async function sendResponseAsTTS(text) {
  if (currentTTSStream) {
    currentTTSStream.endInput(); // Interrupt the current TTS stream
  }
  const tts = new TTS();
  currentTTSStream = tts.stream();
  currentTTSStream.pushText(text);
  currentTTSStream.endInput();
  for await (const audioFrame of currentTTSStream) {
    console.log('Sending audio frame:', audioFrame);
  }
  await currentTTSStream.close();
  currentTTSStream = null; // Reset the current TTS stream
}

function broadcastToClients(type, message) {
  wss.clients.forEach((client) => {
    if (client.readyState === WebSocket.OPEN) {
      client.send(JSON.stringify({ type, message }));
    }
  });
}

// Example usage
const serverUrl = '<your LiveKit server URL>';
const token = '<your access token>';
connectToRoom(serverUrl, token);
gray-monkey-83525
02/28/2025, 8:14 PM
currentTTSStream variable tracks the current TTS stream. When a new TTS request is made, the existing stream is interrupted by calling endInput().
- Interruption Logic: The sendResponseAsTTS function checks if there is an active TTS stream and interrupts it before starting a new one.
This setup allows you to interrupt the agent's speech whenever a new command or event requires it. Would you like more detailed guidance on any specific part of this process?
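If you want the frontend to trigger an interruption directly, one option is to reuse the same WebSocket: the browser sends an interrupt message and the server ends the active TTS stream. This is a rough sketch building on the wss and currentTTSStream variables above; the { type: 'interrupt' } message shape is an assumption for illustration:
// Server side: listen for interrupt messages from connected clients and
// end the active TTS stream. Builds on `wss` and `currentTTSStream` above.
wss.on('connection', (ws) => {
  ws.on('message', (raw) => {
    let msg;
    try {
      msg = JSON.parse(raw.toString());
    } catch {
      return; // ignore malformed messages
    }
    if (msg.type === 'interrupt' && currentTTSStream) {
      currentTTSStream.endInput(); // stop feeding the TTS stream
      currentTTSStream = null;
      console.log('TTS interrupted by client request');
    }
  });
});

// Browser side: send an interrupt event, e.g. from a button click.
// socket.send(JSON.stringify({ type: 'interrupt' }));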
Sources:
- https://docs.livekit.io/agents/voice-agent/voice-pipeline/#Interruption-handling
- https://docs.livekit.io/python/livekit/agents/pipeline/agent_output.html#livekit.agents.pipeline.agent_output.AgentOutput.playout
bumpy-shampoo-17100
03/01/2025, 7:59 AM
// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
//
// SPDX-License-Identifier: Apache-2.0
import type { JobProcess } from '@livekit/agents';
import {
  AutoSubscribe,
  type JobContext,
  WorkerOptions,
  cli,
  defineAgent,
  llm,
  pipeline,
} from '@livekit/agents';
import * as deepgram from '@livekit/agents-plugin-deepgram';
import * as livekit from '@livekit/agents-plugin-livekit';
import * as openai from '@livekit/agents-plugin-openai';
import * as silero from '@livekit/agents-plugin-silero';
import { fileURLToPath } from 'node:url';
import { z } from 'zod';

export default defineAgent({
  prewarm: async (proc: JobProcess) => {
    proc.userData.vad = await silero.VAD.load();
  },
  entry: async (ctx: JobContext) => {
    const vad = ctx.proc.userData.vad! as silero.VAD;
    const initialContext = new llm.ChatContext().append({
      role: llm.ChatRole.SYSTEM,
      text:
        'You are a voice assistant created by LiveKit. Your interface with users will be voice. ' +
        'You should use short and concise responses, and avoiding usage of unpronounceable ' +
        'punctuation.',
    });

    await ctx.connect(undefined, AutoSubscribe.AUDIO_ONLY);
    console.log('waiting for participant');
    const participant = await ctx.waitForParticipant();
    console.log(`starting assistant example agent for ${participant.identity}`);

    const fncCtx: llm.FunctionContext = {
      weather: {
        description: 'Get the weather in a location',
        parameters: z.object({
          location: z.string().describe('The location to get the weather for'),
        }),
        execute: async ({ location }) => {
          console.debug(`executing weather function for ${location}`);
          const response = await fetch(`https://wttr.in/${location}?format=%C+%t`);
          if (!response.ok) {
            throw new Error(`Weather API returned status: ${response.status}`);
          }
          const weather = await response.text();
          return `The weather in ${location} right now is ${weather}.`;
        },
      },
    };

    const agent = new pipeline.VoicePipelineAgent(
      vad,
      new deepgram.STT(),
      new openai.LLM(),
      new openai.TTS(),
      { chatCtx: initialContext, fncCtx, turnDetector: new livekit.turnDetector.EOUModel() },
    );
    agent.start(ctx.room, participant);

    await agent.say('Hey, how can I help you today', true);
  },
});

cli.runApp(new WorkerOptions({ agent: fileURLToPath(import.meta.url) }));

What is the entrypoint of this code?
What is the entrypoint of this code?
How to run this code?
bumpy-shampoo-17100
03/01/2025, 8:30 AM
gray-monkey-83525
03/01/2025, 8:30 AM