# ask-ai
a
Is there a way to get the transcript with word level timestamps?
t
Thanks for reaching out, Anne. I'm an AI assistant and I'm here to help. Give me a moment while I research this topic.
To get transcripts with word-level timestamps in LiveKit, you can use the `transcription_node` method in your agent. This method processes transcription chunks, including word-level timestamps via `TimedString` objects. Here's how to implement it:

1. Enable the `use_tts_aligned_transcript` flag when creating an `AgentSession`. This synchronizes transcription timing with the TTS output.
2. In the `transcription_node` method, process each transcription chunk to access word-level timestamps (e.g., `chunk.start_time` and `chunk.end_time`).

Note that this feature is experimental and currently supported by specific TTS providers like Cartesia and ElevenLabs. Let me know if you need further details or code examples!

Sources: Text and transcriptions | LiveKit Docs
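For reference, a minimal sketch of those two steps might look like the following. The `TimestampedAgent` name and the `TimedString` import path are assumptions here rather than verbatim from the docs, so adjust them to your livekit-agents version:

```python
from typing import AsyncGenerator, AsyncIterable

from livekit.agents import Agent, ModelSettings
# Assumption: TimedString may live under a different module in your SDK version.
from livekit.agents.voice.io import TimedString


class TimestampedAgent(Agent):
    """Illustrative agent that surfaces word-level timing from TTS-aligned transcripts."""

    async def transcription_node(
        self,
        text: AsyncIterable[str | TimedString],
        model_settings: ModelSettings,
    ) -> AsyncGenerator[str | TimedString, None]:
        async for chunk in text:
            if isinstance(chunk, TimedString):
                # Word-level timing, available when use_tts_aligned_transcript=True.
                print(f"Word: {chunk.text!r} ({chunk.start_time} -> {chunk.end_time})")
            yield chunk
```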
a
I added

```python
async def transcription_node(self, text: AsyncIterable[str], model_settings: ModelSettings) -> AsyncIterable[str]:
    async for delta in text:
        yield delta.replace("😘", "")
```

to my agent like in the example, but I don't see where I should use the method.
t
The `transcription_node` method is your go-to for handling transcription chunks in your custom agent. Here's how you can smoothly fit it into your workflow:

1. Make sure your agent subclasses the base `Agent` class and overrides the `transcription_node` method.
2. Turn on TTS-aligned transcriptions by setting `use_tts_aligned_transcript=True` in your `AgentSession` configuration.
3. Inside the `transcription_node`, you can work with each chunk of text, like tweaking the text (for example, `delta.replace("😘", "")`) or grabbing timing info if the chunk is a `TimedString`.
4. Double-check that your custom agent is plugged into your application pipeline so the `transcription_node` can do its thing during transcription processing.

If anything's unclear or you'd like some examples, just give me a shout. I'm here to help!

Sources: Text and transcriptions | LiveKit Docs | examples/voice_agents/structured_output.py | Pipeline nodes and hooks | LiveKit Docs
a
Yes, please help me. Here is my implementation and I don't know what I am missing:
```python
import json
import os
from datetime import datetime
from dotenv import load_dotenv
from livekit import agents
from livekit import api as livekit_api
from livekit.agents import Agent, AgentSession, RoomInputOptions , ModelSettings
from livekit.plugins import cartesia, deepgram, noise_cancellation, openai, silero 
from livekit.plugins.turn_detector.multilingual import MultilingualModel 
from typing import AsyncIterable
import boto3
import asyncio
load_dotenv()


class Assistant(Agent):
    def __init__(self) -> None:
        super().__init__(instructions="")


async def entrypoint(ctx: agents.JobContext) -> None:
    session: AgentSession = None
    async def write_transcript() -> None:
        try:
            current_date = datetime.now().strftime("%Y%m%d_%H%M%S")

            # Upload transcript to S3 with timeout
            bucket = "staging-interview-recordings"
            key = f"transcripts/{ctx.room.name}_{current_date}.json"

            # Check if session exists and has history
            if session and hasattr(session, "history"):
                transcript_data = json.dumps(session.history.to_dict(), indent=2)
            else:
                # Fallback with empty items if session is not available
                transcript_data = json.dumps({"items": []}, indent=2)
                print("Warning: Session history not available, saving empty transcript")

            # Use asyncio.wait_for to timeout the S3 upload
            async def upload_to_s3():
                loop = asyncio.get_event_loop()
                s3_client = boto3.client(
                    "s3",
                    aws_access_key_id=os.getenv("S3_UPLOAD_ACCESS_KEY"),
                    aws_secret_access_key=os.getenv("S3_UPLOAD_SECRET"),
                    region_name=os.getenv("AWS_REGION", "us-east-1"),
                )

                # Run S3 upload in thread pool to avoid blocking
                await loop.run_in_executor(
                    None,
                    lambda: s3_client.put_object(
                        Bucket=bucket, Key=key, Body=transcript_data, ContentType="application/json"
                    ),
                )

            # Timeout after 5 seconds to prevent blocking shutdown
            await asyncio.wait_for(upload_to_s3(), timeout=5.0)
            print(f"Transcript for {ctx.room.name} saved to s3://{bucket}/{key}")

        except asyncio.TimeoutError:
            print(f"S3 upload timed out for {ctx.room.name}, skipping...")
        except Exception as e:
            print(f"Failed to upload transcript to S3: {e}")

    async def transcription_node(self, text: AsyncIterable[str], model_settings: ModelSettings) -> AsyncIterable[str]: 
        async for delta in text:
            yield delta.replace("😘", " ")

    ctx.add_shutdown_callback(write_transcript)
    room = ctx.room

    # Note: Room context metadata is empty by design - so we need to fetch the room from the API in order to get the metadata
    agent = Assistant()

    # Create LiveKit API client to fetch fresh room info
    livekit_url = os.getenv("LIVEKIT_URL")
    livekit_api_key = os.getenv("LIVEKIT_API_KEY")
    livekit_api_secret = os.getenv("LIVEKIT_API_SECRET")
    lk_api = livekit_api.LiveKitAPI(url=livekit_url, api_key=livekit_api_key, api_secret=livekit_api_secret)
    try:
        # Use list_rooms with name filter
        room_request = livekit_api.ListRoomsRequest(names=[room.name])
        room_info = await lk_api.room.list_rooms(room_request)

        if not room_info.rooms:
            raise Exception(f"Room {room.name} not found in API")

        target_room = room_info.rooms[0]  # Should be the only one with this name
        # Parse metadata from API
        if not target_room.metadata:
            raise Exception("API room has no metadata!")

        metadata = json.loads(target_room.metadata)
        prompt = metadata.get("prompt", "")
        if not prompt:
            raise Exception("No prompt was provided for the agent")
        await agent.update_instructions(prompt)
        print("=== AGENT READY ===")
        language = metadata.get("language", "en-US")
        tts_model = metadata.get("tts_model", "sonic-2")
        tts_voice = metadata.get("tts_voice", "f786b574-daa5-4673-aa0c-cbe3e8534c02")
        session = AgentSession(
            stt=deepgram.STT(model="nova-3", language=language),
            llm=openai.LLM(model="gpt-4o-mini"),
            tts=cartesia.TTS(model=tts_model, voice=tts_voice, language=language),
            vad=silero.VAD.load(),
            turn_detection=MultilingualModel(),
            use_tts_aligned_transcript=True
        )
        await session.start(
            room=ctx.room,
            agent=agent,
            room_input_options=RoomInputOptions(
                # - For telephony applications, use `BVCTelephony` for best results
                noise_cancellation=noise_cancellation.BVC(),
            ),
        )

    finally:
        await lk_api.aclose()


if __name__ == "__main__":
    agents.cli.run_app(agents.WorkerOptions(entrypoint_fnc=entrypoint))
```
t
To get the `transcription_node` method working smoothly in your workflow, here's what you'll want to do:

1. Subclass the `Agent` class and override the `transcription_node` method, just like you've done. This method handles transcription chunks as they come in, in real time.
2. Make sure your `AgentSession` is set up with `use_tts_aligned_transcript=True` so transcription processing is enabled.
3. When you start the session, pass your custom agent (like your `Assistant`) to the `AgentSession`. For example:

```python
session = AgentSession(
    agent=Assistant(),
    room=ctx.room,
    use_tts_aligned_transcript=True
)
await session.start()
```
Just double-check that your `Assistant` agent is being passed in and that the session starts properly. If anything's unclear or you want to dive deeper, I'm here to help!
a
like so?
```python
class Assistant(Agent):
    def __init__(self) -> None:
        super().__init__(instructions="")
    async def transcription_node(self, text: AsyncIterable[str], model_settings: ModelSettings) -> AsyncIterable[str]: 
        async for delta in text:
            yield delta.replace("😘", " ")
```
t
Hey there! Your implementation of the `transcription_node` method looks spot on for handling transcription chunks. It smartly replaces "😘" with a space in the text. To make sure everything runs smoothly, just double-check these few things:

1. The `Assistant` class is passed to the `AgentSession` when you start the session:

```python
session = AgentSession(
    agent=Assistant(),
    room=ctx.room,
    use_tts_aligned_transcript=True
)
await session.start()
```
2. The `use_tts_aligned_transcript` flag is set to `True` in the `AgentSession` if you want those TTS-aligned transcripts.
3. The `transcription_node` method is being called properly within your transcription pipeline.

If you've checked all this and it's still giving you trouble, just give me a shout! I'm here to help you figure it out.
a
like so?
```python
import json
import os
from datetime import datetime
from dotenv import load_dotenv
from livekit import agents
from livekit import api as livekit_api
from livekit.agents import Agent, AgentSession, RoomInputOptions , ModelSettings
from livekit.plugins import cartesia, deepgram, noise_cancellation, openai, silero 
from livekit.plugins.turn_detector.multilingual import MultilingualModel 
from typing import AsyncIterable
import boto3
import asyncio
load_dotenv()


class Assistant(Agent):
    def __init__(self) -> None:
        super().__init__(instructions="")
    async def transcription_node(self, text: AsyncIterable[str], model_settings: ModelSettings) -> AsyncIterable[str]: 
        async for delta in text:
            yield delta.replace("😘", " ")


async def entrypoint(ctx: agents.JobContext) -> None:
    session: AgentSession = None
    async def write_transcript() -> None:
        try:
            current_date = datetime.now().strftime("%Y%m%d_%H%M%S")

            # Upload transcript to S3 with timeout
            bucket = "staging-interview-recordings"
            key = f"transcripts/{ctx.room.name}_{current_date}.json"

            # Check if session exists and has history
            if session and hasattr(session, "history"):
                transcript_data = json.dumps(session.history.to_dict(), indent=2)
            else:
                # Fallback with empty items if session is not available
                transcript_data = json.dumps({"items": []}, indent=2)
                print("Warning: Session history not available, saving empty transcript")

            # Use asyncio.wait_for to timeout the S3 upload
            async def upload_to_s3():
                loop = asyncio.get_event_loop()
                s3_client = boto3.client(
                    "s3",
                    aws_access_key_id=os.getenv("S3_UPLOAD_ACCESS_KEY"),
                    aws_secret_access_key=os.getenv("S3_UPLOAD_SECRET"),
                    region_name=os.getenv("AWS_REGION", "us-east-1"),
                )

                # Run S3 upload in thread pool to avoid blocking
                await loop.run_in_executor(
                    None,
                    lambda: s3_client.put_object(
                        Bucket=bucket, Key=key, Body=transcript_data, ContentType="application/json"
                    ),
                )

            # Timeout after 5 seconds to prevent blocking shutdown
            await asyncio.wait_for(upload_to_s3(), timeout=5.0)
            print(f"Transcript for {ctx.room.name} saved to s3://{bucket}/{key}")

        except asyncio.TimeoutError:
            print(f"S3 upload timed out for {ctx.room.name}, skipping...")
        except Exception as e:
            print(f"Failed to upload transcript to S3: {e}")



    ctx.add_shutdown_callback(write_transcript)
    room = ctx.room

    # Note: Room context metadata is empty by design - so we need to fetch the room from the API in order to get the metadata
    agent = Assistant()

    # Create LiveKit API client to fetch fresh room info
    livekit_url = os.getenv("LIVEKIT_URL")
    livekit_api_key = os.getenv("LIVEKIT_API_KEY")
    livekit_api_secret = os.getenv("LIVEKIT_API_SECRET")
    lk_api = livekit_api.LiveKitAPI(url=livekit_url, api_key=livekit_api_key, api_secret=livekit_api_secret)
    try:
        # Use list_rooms with name filter
        room_request = livekit_api.ListRoomsRequest(names=[room.name])
        room_info = await lk_api.room.list_rooms(room_request)

        if not room_info.rooms:
            raise Exception(f"Room {room.name} not found in API")

        target_room = room_info.rooms[0]  # Should be the only one with this name
        # Parse metadata from API
        if not target_room.metadata:
            raise Exception("API room has no metadata!")

        metadata = json.loads(target_room.metadata)
        prompt = metadata.get("prompt", "")
        if not prompt:
            raise Exception("No prompt was provided for the agent")
        await agent.update_instructions(prompt)
        print("=== AGENT READY ===")
        language = metadata.get("language", "en-US")
        tts_model = metadata.get("tts_model", "sonic-2")
        tts_voice = metadata.get("tts_voice", "f786b574-daa5-4673-aa0c-cbe3e8534c02")
        session = AgentSession(
            agent=agent,
            stt=deepgram.STT(model="nova-3", language=language),
            llm=openai.LLM(model="gpt-4o-mini"),
            tts=cartesia.TTS(model=tts_model, voice=tts_voice, language=language),
            vad=silero.VAD.load(),
            turn_detection=MultilingualModel(),
            use_tts_aligned_transcript=True
        )
        await session.start(
            room=ctx.room,
            room_input_options=RoomInputOptions(
                # - For telephony applications, use `BVCTelephony` for best results
                noise_cancellation=noise_cancellation.BVC(),
            ),
        )

    finally:
        await lk_api.aclose()


if __name__ == "__main__":
    agents.cli.run_app(agents.WorkerOptions(entrypoint_fnc=entrypoint))
```
t
Hey there! Your updated implementation looks spot on. You've done a great job integrating the `transcription_node` method into the `Assistant` class and passing it along to the `AgentSession`. Plus, setting `use_tts_aligned_transcript=True` means transcription processing is all set and ready to go. Just a heads-up: give the workflow a test run to make sure the `transcription_node` method gets called and the text changes (like swapping out "😘" for a space) happen just right. If anything feels off or you hit a snag, don't hesitate to reach out. I'm here to help you figure it out!
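One low-risk way to run that check (purely illustrative, reusing the override already shown in the code above) is to drop a temporary log line into `transcription_node` and watch for it during a test session:

```python
import logging
from typing import AsyncIterable

from livekit.agents import Agent, ModelSettings

logger = logging.getLogger("assistant")


class Assistant(Agent):
    def __init__(self) -> None:
        super().__init__(instructions="")

    async def transcription_node(
        self, text: AsyncIterable[str], model_settings: ModelSettings
    ) -> AsyncIterable[str]:
        async for delta in text:
            # Temporary visibility check: confirms the node runs for each chunk.
            logger.debug("transcription_node chunk: %r", delta)
            yield delta.replace("😘", " ")
```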
a
This works. How do I access the word-level timestamps for both the user and the agent, though?
t
Here's how you can get word-level timestamps for both the user and the agent:

**Agent's word-level timestamps:**
- Make sure you have `use_tts_aligned_transcript=True` set in your `AgentSession` configuration.
- In the `transcription_node` method, you'll want to handle `TimedString` objects; they come with `start_time` and `end_time` for each word. Here's a quick example:

```python
async def transcription_node(self, text: AsyncIterable[str | TimedString], model_settings: ModelSettings) -> AsyncGenerator[str | TimedString, None]:
    async for chunk in text:
        if isinstance(chunk, TimedString):
            logger.info(f"Word: '{chunk.text}' (Start: {chunk.start_time}, End: {chunk.end_time})")
        yield chunk
```

**User's word-level timestamps:**
- Choose an STT provider that supports word-level timestamps; Sarvam is a great option.
- You can then get the user's transcriptions with timestamps directly from the STT results.

If you want me to walk you through any part of this or need more examples, just let me know. I'm happy to help!
a
Deepgram supports word-level timestamps; is it a setting?
t
Hey there! To get word-level timestamps working in Deepgram STT for LiveKit, just set `interim_results=True` when you initialize the Deepgram STT plugin. This way, you'll get real-time transcription updates along with timestamps for each word; super handy! Here's a quick example to show you how:

```python
from livekit.plugins.deepgram import stt

deepgram_stt = stt.STT(
    model="nova-2-general",
    interim_results=True,  # This turns on word-level timestamps
    punctuate=True,
    filler_words=True,
    language="en-US",
)
```
Then, inside your agent's `transcription_node` method, you can handle the transcription stream to grab the `start_time` and `end_time` for each word like this:

```python
async def transcription_node(self, text: AsyncIterable[str | TimedString], model_settings: ModelSettings) -> AsyncGenerator[str | TimedString, None]:
    async for chunk in text:
        if isinstance(chunk, TimedString):
            logger.info(f"Word: '{chunk.text}' (Start: {chunk.start_time}, End: {chunk.end_time})")
        yield chunk
```
If you have any questions or want me to walk you through it, just give me a shout. I'm here to help!

Sources: livekit-plugins/livekit-plugins-elevenlabs/livekit/plugins/elevenlabs/tts.p... | Text and transcriptions | LiveKit Docs