acceptable-dream-85180
08/06/2025, 6:06 PM

tall-airline-16756
08/06/2025, 6:06 PM
To access word-level timestamps, override the transcription_node method in your agent. This method processes transcription chunks, including word-level timestamps via TimedString objects.
Here’s how to implement it:
1. Enable the use_tts_aligned_transcript flag when creating an AgentSession. This synchronizes transcription timing with the TTS output.
2. In the transcription_node method, process each transcription chunk to access word-level timestamps (e.g., chunk.start_time and chunk.end_time).
Note that this feature is experimental and currently supported by specific TTS providers like Cartesia and ElevenLabs. Let me know if you need further details or code examples!
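For reference, here’s a minimal sketch of step 1, just enabling the flag when the session is constructed (the plugin choices are only placeholders):
from livekit.agents import AgentSession
from livekit.plugins import cartesia, deepgram, openai, silero

# Minimal sketch: turn on TTS-aligned transcripts so transcription_node
# receives timing-aware chunks (experimental; requires a supported TTS
# such as Cartesia or ElevenLabs).
session = AgentSession(
    stt=deepgram.STT(),
    llm=openai.LLM(),
    tts=cartesia.TTS(),
    vad=silero.VAD.load(),
    use_tts_aligned_transcript=True,
)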
Sources: Text and transcriptions | LiveKit Docs

acceptable-dream-85180
08/06/2025, 6:23 PM

tall-airline-16756
08/06/2025, 6:24 PM
The transcription_node method is your go-to for handling transcription chunks in your custom agent. Here’s how you can smoothly fit it into your workflow:
1. Make sure your agent subclasses the base Agent class and overrides the transcription_node method.
2. Turn on TTS-aligned transcriptions by setting use_tts_aligned_transcript=True in your AgentSession configuration.
3. Inside the transcription_node, you can work with each chunk of text—like tweaking the text (for example, delta.replace("😘", "")) or grabbing timing info if the chunk is a TimedString.
4. Double-check that your custom agent is plugged into your application pipeline so the transcription_node can do its thing during transcription processing (see the short sketch below).
If anything’s unclear or you’d like some examples, just give me a shout—I’m here to help!
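If it helps, here’s a minimal sketch of steps 1 and 4 together: a custom agent that overrides transcription_node and gets passed into the session when it starts (the Assistant name and empty instructions are just placeholders):
from typing import AsyncIterable

from livekit.agents import Agent, ModelSettings


class Assistant(Agent):
    def __init__(self) -> None:
        super().__init__(instructions="")

    # Runs for each transcription chunk before it is published.
    async def transcription_node(self, text: AsyncIterable[str], model_settings: ModelSettings) -> AsyncIterable[str]:
        async for delta in text:
            yield delta.replace("😘", " ")


# In your entrypoint, pass the custom agent to the session so this
# override actually participates in transcription processing:
#   await session.start(room=ctx.room, agent=Assistant(), ...)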
Sources: Text and transcriptions | LiveKit Docs | examples/voice_agents/structured_output.py | Pipeline nodes and hooks | LiveKit Docs

acceptable-dream-85180
08/06/2025, 6:28 PM
import json
import os
from datetime import datetime

from dotenv import load_dotenv
from livekit import agents
from livekit import api as livekit_api
from livekit.agents import Agent, AgentSession, RoomInputOptions, ModelSettings
from livekit.plugins import cartesia, deepgram, noise_cancellation, openai, silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel
from typing import AsyncIterable
import boto3
import asyncio

load_dotenv()


class Assistant(Agent):
    def __init__(self) -> None:
        super().__init__(instructions="")


async def entrypoint(ctx: agents.JobContext) -> None:
    session: AgentSession = None

    async def write_transcript() -> None:
        try:
            current_date = datetime.now().strftime("%Y%m%d_%H%M%S")
            # Upload transcript to S3 with timeout
            bucket = "staging-interview-recordings"
            key = f"transcripts/{ctx.room.name}_{current_date}.json"
            # Check if session exists and has history
            if session and hasattr(session, "history"):
                transcript_data = json.dumps(session.history.to_dict(), indent=2)
            else:
                # Fallback with empty items if session is not available
                transcript_data = json.dumps({"items": []}, indent=2)
                print("Warning: Session history not available, saving empty transcript")

            # Use asyncio.wait_for to timeout the S3 upload
            async def upload_to_s3():
                loop = asyncio.get_event_loop()
                s3_client = boto3.client(
                    "s3",
                    aws_access_key_id=os.getenv("S3_UPLOAD_ACCESS_KEY"),
                    aws_secret_access_key=os.getenv("S3_UPLOAD_SECRET"),
                    region_name=os.getenv("AWS_REGION", "us-east-1"),
                )
                # Run S3 upload in thread pool to avoid blocking
                await loop.run_in_executor(
                    None,
                    lambda: s3_client.put_object(
                        Bucket=bucket, Key=key, Body=transcript_data, ContentType="application/json"
                    ),
                )

            # Timeout after 5 seconds to prevent blocking shutdown
            await asyncio.wait_for(upload_to_s3(), timeout=5.0)
            print(f"Transcript for {ctx.room.name} saved to s3://{bucket}/{key}")
        except asyncio.TimeoutError:
            print(f"S3 upload timed out for {ctx.room.name}, skipping...")
        except Exception as e:
            print(f"Failed to upload transcript to S3: {e}")

    async def transcription_node(self, text: AsyncIterable[str], model_settings: ModelSettings) -> AsyncIterable[str]:
        async for delta in text:
            yield delta.replace("😘", " ")

    ctx.add_shutdown_callback(write_transcript)
    room = ctx.room
    # Note: Room context metadata is empty by design - so we need to fetch the room from the API in order to get the metadata
    agent = Assistant()

    # Create LiveKit API client to fetch fresh room info
    livekit_url = os.getenv("LIVEKIT_URL")
    livekit_api_key = os.getenv("LIVEKIT_API_KEY")
    livekit_api_secret = os.getenv("LIVEKIT_API_SECRET")
    lk_api = livekit_api.LiveKitAPI(url=livekit_url, api_key=livekit_api_key, api_secret=livekit_api_secret)
    try:
        # Use list_rooms with name filter
        room_request = livekit_api.ListRoomsRequest(names=[room.name])
        room_info = await lk_api.room.list_rooms(room_request)
        if not room_info.rooms:
            raise Exception(f"Room {room.name} not found in API")
        target_room = room_info.rooms[0]  # Should be the only one with this name

        # Parse metadata from API
        if not target_room.metadata:
            raise Exception("API room has no metadata!")
        metadata = json.loads(target_room.metadata)
        prompt = metadata.get("prompt", "")
        if not prompt:
            raise Exception("No prompt was provided for the agent")
        await agent.update_instructions(prompt)
        print("=== AGENT READY ===")

        language = metadata.get("language", "en-US")
        tts_model = metadata.get("tts_model", "sonic-2")
        tts_voice = metadata.get("tts_voice", "f786b574-daa5-4673-aa0c-cbe3e8534c02")

        session = AgentSession(
            stt=deepgram.STT(model="nova-3", language=language),
            llm=openai.LLM(model="gpt-4o-mini"),
            tts=cartesia.TTS(model=tts_model, voice=tts_voice, language=language),
            vad=silero.VAD.load(),
            turn_detection=MultilingualModel(),
            use_tts_aligned_transcript=True,
        )
        await session.start(
            room=ctx.room,
            agent=agent,
            room_input_options=RoomInputOptions(
                # - For telephony applications, use `BVCTelephony` for best results
                noise_cancellation=noise_cancellation.BVC(),
            ),
        )
    finally:
        await lk_api.aclose()


if __name__ == "__main__":
    agents.cli.run_app(agents.WorkerOptions(entrypoint_fnc=entrypoint))
tall-airline-16756
08/06/2025, 6:28 PM
To get the transcription_node method working smoothly in your workflow, here’s what you’ll want to do:
1. Subclass the Agent class and override the transcription_node method, just like you’ve done. This method handles transcription chunks as they come in, in real time.
2. Make sure your AgentSession is set up with use_tts_aligned_transcript=True so transcription processing is enabled.
3. When you start the session, pass your custom agent (like your Assistant) to the AgentSession. For example:
session = AgentSession(
    agent=Assistant(),
    room=ctx.room,
    use_tts_aligned_transcript=True
)
await session.start()
Just double-check that your Assistant agent is being passed in and that the session starts properly. If anything’s unclear or you want to dive deeper, I’m here to help!

acceptable-dream-85180
08/06/2025, 6:30 PM
class Assistant(Agent):
    def __init__(self) -> None:
        super().__init__(instructions="")

    async def transcription_node(self, text: AsyncIterable[str], model_settings: ModelSettings) -> AsyncIterable[str]:
        async for delta in text:
            yield delta.replace("😘", " ")
tall-airline-16756
08/06/2025, 6:30 PM
Your transcription_node method looks spot on for handling transcription chunks. It smartly replaces "😘" with a space in the text. To make sure everything runs smoothly, just double-check these few things:
1. That the Assistant class is passed to the AgentSession when you start the session:
session = AgentSession(
    agent=Assistant(),
    room=ctx.room,
    use_tts_aligned_transcript=True
)
await session.start()
2. The use_tts_aligned_transcript flag is set to True in the AgentSession if you want those TTS-aligned transcripts.
3. And that the transcription_node method is being called properly within your transcription pipeline.
If you’ve checked all this and it’s still giving you trouble, just give me a shout! I’m here to help you figure it out.

acceptable-dream-85180
08/06/2025, 6:32 PM
import json
import os
from datetime import datetime

from dotenv import load_dotenv
from livekit import agents
from livekit import api as livekit_api
from livekit.agents import Agent, AgentSession, RoomInputOptions, ModelSettings
from livekit.plugins import cartesia, deepgram, noise_cancellation, openai, silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel
from typing import AsyncIterable
import boto3
import asyncio

load_dotenv()


class Assistant(Agent):
    def __init__(self) -> None:
        super().__init__(instructions="")

    async def transcription_node(self, text: AsyncIterable[str], model_settings: ModelSettings) -> AsyncIterable[str]:
        async for delta in text:
            yield delta.replace("😘", " ")


async def entrypoint(ctx: agents.JobContext) -> None:
    session: AgentSession = None

    async def write_transcript() -> None:
        try:
            current_date = datetime.now().strftime("%Y%m%d_%H%M%S")
            # Upload transcript to S3 with timeout
            bucket = "staging-interview-recordings"
            key = f"transcripts/{ctx.room.name}_{current_date}.json"
            # Check if session exists and has history
            if session and hasattr(session, "history"):
                transcript_data = json.dumps(session.history.to_dict(), indent=2)
            else:
                # Fallback with empty items if session is not available
                transcript_data = json.dumps({"items": []}, indent=2)
                print("Warning: Session history not available, saving empty transcript")

            # Use asyncio.wait_for to timeout the S3 upload
            async def upload_to_s3():
                loop = asyncio.get_event_loop()
                s3_client = boto3.client(
                    "s3",
                    aws_access_key_id=os.getenv("S3_UPLOAD_ACCESS_KEY"),
                    aws_secret_access_key=os.getenv("S3_UPLOAD_SECRET"),
                    region_name=os.getenv("AWS_REGION", "us-east-1"),
                )
                # Run S3 upload in thread pool to avoid blocking
                await loop.run_in_executor(
                    None,
                    lambda: s3_client.put_object(
                        Bucket=bucket, Key=key, Body=transcript_data, ContentType="application/json"
                    ),
                )

            # Timeout after 5 seconds to prevent blocking shutdown
            await asyncio.wait_for(upload_to_s3(), timeout=5.0)
            print(f"Transcript for {ctx.room.name} saved to s3://{bucket}/{key}")
        except asyncio.TimeoutError:
            print(f"S3 upload timed out for {ctx.room.name}, skipping...")
        except Exception as e:
            print(f"Failed to upload transcript to S3: {e}")

    ctx.add_shutdown_callback(write_transcript)
    room = ctx.room
    # Note: Room context metadata is empty by design - so we need to fetch the room from the API in order to get the metadata
    agent = Assistant()

    # Create LiveKit API client to fetch fresh room info
    livekit_url = os.getenv("LIVEKIT_URL")
    livekit_api_key = os.getenv("LIVEKIT_API_KEY")
    livekit_api_secret = os.getenv("LIVEKIT_API_SECRET")
    lk_api = livekit_api.LiveKitAPI(url=livekit_url, api_key=livekit_api_key, api_secret=livekit_api_secret)
    try:
        # Use list_rooms with name filter
        room_request = livekit_api.ListRoomsRequest(names=[room.name])
        room_info = await lk_api.room.list_rooms(room_request)
        if not room_info.rooms:
            raise Exception(f"Room {room.name} not found in API")
        target_room = room_info.rooms[0]  # Should be the only one with this name

        # Parse metadata from API
        if not target_room.metadata:
            raise Exception("API room has no metadata!")
        metadata = json.loads(target_room.metadata)
        prompt = metadata.get("prompt", "")
        if not prompt:
            raise Exception("No prompt was provided for the agent")
        await agent.update_instructions(prompt)
        print("=== AGENT READY ===")

        language = metadata.get("language", "en-US")
        tts_model = metadata.get("tts_model", "sonic-2")
        tts_voice = metadata.get("tts_voice", "f786b574-daa5-4673-aa0c-cbe3e8534c02")

        session = AgentSession(
            agent=agent,
            stt=deepgram.STT(model="nova-3", language=language),
            llm=openai.LLM(model="gpt-4o-mini"),
            tts=cartesia.TTS(model=tts_model, voice=tts_voice, language=language),
            vad=silero.VAD.load(),
            turn_detection=MultilingualModel(),
            use_tts_aligned_transcript=True,
        )
        await session.start(
            room=ctx.room,
            room_input_options=RoomInputOptions(
                # - For telephony applications, use `BVCTelephony` for best results
                noise_cancellation=noise_cancellation.BVC(),
            ),
        )
    finally:
        await lk_api.aclose()


if __name__ == "__main__":
    agents.cli.run_app(agents.WorkerOptions(entrypoint_fnc=entrypoint))
tall-airline-16756
08/06/2025, 6:32 PM
Nice work moving the transcription_node method into the Assistant class and passing it along to the AgentSession. Plus, setting use_tts_aligned_transcript=True means transcription processing is all set and ready to go.
Just a heads-up—give the workflow a test run to make sure the transcription_node method gets called and the text changes (like swapping out "😘" for a space) happen just right; a quick way to verify that is sketched below. If anything feels off or you hit a snag, don’t hesitate to reach out—I’m here to help you figure it out!
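One low-effort way to confirm the override is being hit during a test run is to add a temporary log line inside it (the logger name and message format here are just illustrative):
import logging
from typing import AsyncIterable

from livekit.agents import Agent, ModelSettings

logger = logging.getLogger("assistant")


class Assistant(Agent):
    def __init__(self) -> None:
        super().__init__(instructions="")

    async def transcription_node(self, text: AsyncIterable[str], model_settings: ModelSettings) -> AsyncIterable[str]:
        async for delta in text:
            cleaned = delta.replace("😘", " ")
            # Temporary debug log: confirms the node is invoked and shows what it changed
            logger.debug("transcription_node: %r -> %r", delta, cleaned)
            yield cleaned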
acceptable-dream-85180
08/06/2025, 7:33 PM

tall-airline-16756
08/06/2025, 7:34 PM
There are two places you can get word-level timestamps:
1. Agent’s Word-Level Timestamps:
◦ Make sure you have use_tts_aligned_transcript=True set in your AgentSession configuration.
◦ In the transcription_node method, you’ll want to handle TimedString objects—they come with start_time and end_time for each word. There’s a quick example just after this list.
2. User’s Word-Level Timestamps:
◦ Choose an STT provider that supports word-level timestamps—Sarvam is a great option.
◦ You can then get the user’s transcriptions with timestamps directly from the STT results.
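Here’s one minimal sketch of that handling; it uses getattr so it doesn’t depend on the exact TimedString import path in your SDK version, and the logger is only for illustration:
import logging
from typing import AsyncIterable

from livekit.agents import Agent, ModelSettings

logger = logging.getLogger("assistant")


class Assistant(Agent):
    def __init__(self) -> None:
        super().__init__(instructions="")

    async def transcription_node(self, text: AsyncIterable[str], model_settings: ModelSettings) -> AsyncIterable[str]:
        async for chunk in text:
            # With use_tts_aligned_transcript=True, chunks may be TimedString
            # objects carrying word-level timing; plain str chunks have neither.
            start = getattr(chunk, "start_time", None)
            end = getattr(chunk, "end_time", None)
            if start is not None and end is not None:
                logger.info("word %r spans %s -> %s", str(chunk), start, end)
            yield chunk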
If you want me to walk you through any part of this or need more examples, just let me know—I’m happy to help!

acceptable-dream-85180
08/06/2025, 8:05 PM

tall-airline-16756
08/06/2025, 8:05 PM
To get word-level timestamps with Deepgram, set interim_results=True when you initialize the Deepgram STT plugin. This way, you’ll get real-time transcription updates along with timestamps for each word—super handy!
Here’s a quick example to show you how:
from livekit.plugins.deepgram import stt

deepgram_stt = stt.STT(
    model="nova-2-general",
    interim_results=True,  # This turns on word-level timestamps
    punctuate=True,
    filler_words=True,
    language="en-US",
)
Then, inside your agent’s transcription_node method, you can handle the transcription stream to grab the start_time and end_time for each word like this:
async def transcription_node(self, text: AsyncIterable[str | TimedString], model_settings: ModelSettings) -> AsyncGenerator[str | TimedString, None]:
    async for chunk in text:
        if isinstance(chunk, TimedString):
            logger.info(f"Word: '{chunk.text}' (Start: {chunk.start_time}, End: {chunk.end_time})")
        yield chunk
If you have any questions or want me to walk you through it, just give me a shout—I’m here to help!
Sources: livekit-plugins/livekit-plugins-elevenlabs/livekit/plugins/elevenlabs/tts.p... | Text and transcriptions | LiveKit Docs