how is this code? ```from __future__ import annota...
# ask-ai
b
how is this code?
Copy code
from __future__ import annotations
import asyncio
import contextlib
import json
import logging
import os
from datetime import datetime
from typing import Any, Dict, List, Optional, Set

from dotenv import load_dotenv
from livekit import api, rtc
from livekit.agents import (Agent, AgentSession, JobContext, JobProcess, RunContext, WorkerOptions, RoomInputOptions, function_tool,
                            cli, BackgroundAudioPlayer, AudioConfig, BuiltinAudioClip, get_job_context,)
from livekit.plugins import deepgram, openai, cartesia, silero
from livekit.plugins.noise_cancellation import BVCTelephony
from supabase import create_client, Client
from metadata import JobMetadata

# ─────────────────────────── Setup ───────────────────────────
# Load environment variables from .env.local before any of them are read.
load_dotenv(".env.local")
logger = logging.getLogger("outbound-caller")
# The pasted line wrapped the level constant in chat-client auto-link markup,
# which is not valid Python; the plain constant is restored here.
logging.basicConfig(level=logging.INFO)

# ─────────────────────── Utilities / helpers ───────────────────────
def build_expected_recording_url(
    *,
    bucket_public_base: Optional[str],
    bucket_name: Optional[str],
    prefix: Optional[str],
    room_name: str,
    filename: Optional[str] = None,
) -> Optional[str]:
    """Build a best-effort URL for the LiveKit auto-egress file in Supabase storage.

    The URL is ``<base>/<bucket>[/<prefix>]/<filename>``; nothing checks that
    the file actually exists.

    Args:
        bucket_public_base: Public base URL; trailing slashes are trimmed.
        bucket_name: Bucket name; surrounding slashes are trimmed.
        prefix: Optional path inside the bucket; surrounding slashes are
            trimmed and the segment is omitted when nothing is left.
        room_name: Used to derive the default file name ``<room_name>.mp3``.
        filename: Explicit file name overriding the default.

    Returns:
        The joined URL, or ``None`` when the base URL or bucket name is missing.
    """
    if not (bucket_public_base and bucket_name):
        return None
    segments = [bucket_public_base.rstrip("/"), bucket_name.strip("/")]
    # A prefix such as "/" is empty once its slashes are trimmed; skip it so
    # the result does not contain an empty "//" path segment.
    trimmed_prefix = (prefix or "").strip("/")
    if trimmed_prefix:
        segments.append(trimmed_prefix)
    segments.append(filename or f"{room_name}.mp3")
    return "/".join(segments)


def transcript_with_timestamps(history_dict: Dict[str, Any]) -> str:
    """Render a chat-history dict as a readable transcript, one line per message.

    Each line is ``[HH:MM:SS] ROLE: text``; the timestamp part is left out when
    the message has no parseable ISO-8601 ``timestamp`` string. Messages whose
    text is empty are skipped.

    Args:
        history_dict: Mapping holding the message list under ``"messages"``
            (or ``"items"``). A message's ``content`` may be a plain value or
            a list mixing plain strings and dicts carrying ``text``,
            ``transcript`` or ``content``.

    Returns:
        The transcript lines joined with newlines (``""`` when there are none).
    """
    lines: List[str] = []
    # NOTE(review): LiveKit's ChatContext.to_dict() appears to emit "items"
    # rather than "messages" — confirm against the installed version. Both
    # keys are accepted so either shape yields a transcript.
    msgs = history_dict.get("messages") or history_dict.get("items") or []
    for m in msgs:
        if not isinstance(m, dict):
            continue  # tolerate malformed entries instead of crashing on .get
        role = str(m.get("role") or "unknown").upper()

        ts_str = ""
        ts = m.get("timestamp")
        if isinstance(ts, str) and ts:
            try:
                # fromisoformat() on older Pythons rejects a trailing "Z".
                ts_obj = datetime.fromisoformat(ts.replace("Z", "+00:00"))
                ts_str = ts_obj.strftime("[%H:%M:%S]")
            except ValueError:
                ts_str = ""

        content = m.get("content")
        if isinstance(content, list):
            chunks: List[str] = []
            for c in content:
                if isinstance(c, dict):
                    c = c.get("text") or c.get("transcript") or c.get("content")
                # Keep plain-string chunks (previously dropped) and ignore
                # None / non-text values that would break the join below.
                if isinstance(c, str) and c:
                    chunks.append(c)
            text = " ".join(chunks).strip()
        else:
            text = str(content or "").strip()

        if text:
            # No leading space when there is no timestamp to print.
            prefix = f"{ts_str} " if ts_str else ""
            lines.append(f"{prefix}{role}: {text}")
    return "\n".join(lines)


# ───────────────────────────── Agent ────────────────────────────────
class OutboundCaller(Agent):
    """LLM drives conversation. Greeting is embedded in system instructions."""

    def __init__(self, *, instructions: str, vm_text: Optional[str]):
        """Create the agent.

        Args:
            instructions: System instructions passed to the base ``Agent``.
            vm_text: Voicemail message to speak when voicemail is detected, or
                ``None`` to hang up without leaving a message.
        """
        super().__init__(instructions=instructions)
        # Identities already passed to set_participant(); used to log each once.
        self.greeted_participants: Set[str] = set()
        self.participant: rtc.RemoteParticipant | None = None
        self.vm_text = vm_text  # already personalized by metadata.py

    def set_participant(self, participant: rtc.RemoteParticipant) -> None:
        """Remember the callee, logging the identity the first time it is seen."""
        pid = getattr(participant, "identity", None)
        if pid and pid not in self.greeted_participants:
            self.greeted_participants.add(pid)
            logger.info("New participant: %s", pid)
        self.participant = participant

    async def _hang_up_call(self) -> None:
        """Delete the job's room (best-effort); a failure is logged, not raised."""
        jc = get_job_context()
        try:
            await jc.api.room.delete_room(api.DeleteRoomRequest(room=jc.room.name))
        except Exception as e:
            logger.warning("Failed to delete room: %s", e)

    # ── LLM tools
    # The docstrings of the tool methods below are left untouched on purpose:
    # they are the descriptions handed to the LLM, so editing them changes
    # runtime behaviour.
    @function_tool()
    async def end_call(self, ctx: RunContext):
        """End the call immediately after a polite goodbye."""
        # NOTE(review): the room is deleted right away, which may cut off a
        # goodbye that is still playing. LiveKit's telephony examples wait for
        # playout first — confirm the right call for the installed version.
        await self._hang_up_call()

    @function_tool()
    async def drop_voicemail_or_hang(self, ctx: RunContext):
        """
        If voicemail is detected:
          - If vm_text provided in metadata, say it once (non-interruptible), then hang up.
          - Otherwise just hang up.
        """
        if self.vm_text:
            try:
                handle = await ctx.session.say(self.vm_text, allow_interruptions=False)
                await handle.wait_for_playout()
            except Exception as e:
                # Playback is best-effort; hang up regardless.
                logger.warning("VM playback failed: %s", e)
        await self._hang_up_call()


# ──────────────────────────  Pre-warm VAD  ──────────────────────────
def prewarm(proc: JobProcess):
    """Load the Silero VAD and stash it in ``proc.userdata`` under ``"vad"``.

    Registered as ``prewarm_fnc`` in the CLI runner; ``entrypoint`` later reads
    the model back through ``ctx.proc.userdata.get("vad")``.
    """
    vad_model = silero.VAD.load()
    proc.userdata["vad"] = vad_model


# ─────────────────────────── Entry point ────────────────────────────
def _harvest_history(session: Optional[AgentSession]) -> Optional[Dict[str, Any]]:
    """Return the session's chat history as a plain dict, or None if unavailable."""
    history = getattr(session, "history", None) if session else None
    if not history:
        return None
    try:
        return history.to_dict()
    except AttributeError:
        # Fallback for history objects that only offer a JSON serializer.
        try:
            return json.loads(history.to_json(indent=2))
        except Exception as e:
            logger.warning("Could not serialize session history: %s", e)
            return None


async def _insert_call_log(client: Client, table: str, payload: Dict[str, Any]) -> None:
    """Insert one call-log row (best-effort); failures are logged, never raised."""
    try:
        # presumably create_client() returns the blocking client — confirm.
        # Running the request in a worker thread keeps the event loop free.
        await asyncio.to_thread(lambda: client.table(table).insert(payload).execute())
        logger.info("Supabase insert OK: table=%s", table)
    except Exception as e:
        logger.warning("Supabase insert failed: %s", e)


async def entrypoint(ctx: JobContext):
    """
    Expects metadata created by JobMetadata (see metadata.py).
    Auto-Egress should be configured when creating the room.

    Flow: parse metadata, build the agent and session, prepare optional call
    logging, start the session, dial the callee over SIP and wait for the
    answer. Transcript harvesting and the call-log insert run in a job
    shutdown callback rather than at the end of this function: this function
    finishes once setup is done, before the conversation has taken place, so
    harvesting here would always see an empty history.
    """
    session: AgentSession | None = None
    meta: Optional[JobMetadata] = None
    supabase: Optional[Client] = None
    recording_url: Optional[str] = None
    call_status = "completed"

    async def _finalize() -> None:
        """Harvest the transcript and write the call log when the job shuts down."""
        # 7) Harvest transcript
        transcript_text = ""
        full_history = _harvest_history(session)
        if full_history:
            transcript_text = transcript_with_timestamps(full_history)
            print("\n--- Transcript ---")
            print(transcript_text)
            print("--- End Transcript ---\n")

        # 8) Insert call log (best-effort)
        if supabase is None or meta is None:
            return
        payload = {
            "call_id": meta.call_id,
            "phone": meta.phone_number,
            "status": call_status,
            "recording_url": recording_url,
            "transcript": transcript_text or None,
            "full_transcript": full_history or None,  # JSON column recommended
        }
        await _insert_call_log(supabase, meta.supabase_table or "call_logs", payload)
        # Demoted from INFO: the full prompt is noisy and may hold caller data.
        logger.debug("LLM Instructions is: %s", meta.get_llm_prompt())

    # Registered first so the call is logged on both the success and failure
    # paths. The closure reads session/meta/supabase/... when it runs, so it
    # sees the values assigned below.
    ctx.add_shutdown_callback(_finalize)

    try:
        # 0) Parse metadata & env
        meta = JobMetadata(**json.loads(ctx.job.metadata or "{}"))
        provided_keys = {
            "DEEPGRAM_API_KEY": meta.deepgram_api_key,
            "CARTESIA_API_KEY": meta.cartesia_api_key,
            "OPENAI_API_KEY": meta.openai_api_key,
        }
        # os.environ rejects None values; export only the keys that were
        # supplied so anything already in the environment stays in effect.
        os.environ.update({name: value for name, value in provided_keys.items() if value})

        # 1) Compose LLM instructions (greeting injected by metadata.py)
        final_prompt = meta.get_llm_prompt()
        vm_text = meta.get_vm_text()

        agent = OutboundCaller(instructions=final_prompt, vm_text=vm_text)

        # 2) Session config — LLM should start speaking greeting right away
        session = AgentSession(
            vad=ctx.proc.userdata.get("vad"),
            llm=openai.LLM(model="gpt-4o"),
            stt=deepgram.STT(model="nova-3", interim_results=True, language="multi"),
            tts=cartesia.TTS(model="sonic-2", voice="694f9389-aac1-45b6-b726-9d9369183238"),
            turn_detection="stt",
            # preemptive_generation=True,   # let LLM start (greeting is in the system prompt)
            allow_interruptions=True,
        )

        # (optional) logging user turns
        @session.on("user_input_transcribed")
        def _log_turn(ev):
            if getattr(ev, "is_final", False):
                logger.info("User said: %s", getattr(ev, "transcript", ""))

        # 3) ambience
        background_audio = BackgroundAudioPlayer(
            ambient_sound=AudioConfig(BuiltinAudioClip.OFFICE_AMBIENCE, volume=0.8),
            thinking_sound=[
                AudioConfig(BuiltinAudioClip.KEYBOARD_TYPING, volume=0.8),
                AudioConfig(BuiltinAudioClip.KEYBOARD_TYPING2, volume=0.7),
            ],
        )

        # 4) Supabase client (optional). Created before dialing and guarded on
        #    its own so a logging-setup problem cannot tear down a live call.
        if meta.supabase_url and meta.supabase_key:
            try:
                supabase = create_client(meta.supabase_url, meta.supabase_key)
            except Exception as e:
                logger.warning("Supabase client setup failed; skipping DB logging: %s", e)
        else:
            logger.info("Supabase creds not provided; skipping DB logging.")

        # 5) Compute expected recording URL (auto-egress best-effort)
        recording_url = build_expected_recording_url(
            bucket_public_base=meta.storage_public_base,
            bucket_name=meta.storage_bucket,
            prefix=meta.storage_prefix,
            room_name=ctx.room.name,
        )

        # 6) Start session & dial. The session starts concurrently with the
        #    SIP dial; both are awaited before continuing.
        session_started = asyncio.create_task(
            session.start(
                agent=agent,
                room=ctx.room,
                room_input_options=RoomInputOptions(noise_cancellation=BVCTelephony()),
            )
        )

        await ctx.api.sip.create_sip_participant(
            api.CreateSIPParticipantRequest(
                room_name=ctx.room.name,
                sip_trunk_id=meta.sip_outbound_trunk_id,
                sip_call_to=meta.phone_number,
                participant_identity=meta.phone_number,
                wait_until_answered=True,
            )
        )

        await session_started
        participant = await ctx.wait_for_participant(identity=meta.phone_number)
        agent.set_participant(participant)

        await background_audio.start(room=ctx.room, agent_session=session)

    except Exception as exc:
        call_status = "failed"
        logger.exception("fatal error: %s", exc)
        try:
            await ctx.api.room.delete_room(api.DeleteRoomRequest(room=ctx.room.name))
        except Exception as e:
            logger.warning("Failed to delete room after setup error: %s", e)
        # End the job explicitly so the shutdown callback records the failure
        # even if the room deletion above did not go through.
        ctx.shutdown(reason="outbound call setup failed")


# ─────────────────────────── CLI runner ────────────────────────────
if __name__ == "__main__":
    # When run as a script, hand the worker configuration to the agents CLI.
    worker_options = WorkerOptions(
        entrypoint_fnc=entrypoint,
        prewarm_fnc=prewarm,
        agent_name="outbound-caller",
    )
    cli.run_app(worker_options)
t
Thanks for reaching out, Abhijeet. I'm an AI assistant and I'm here to help. Give me a moment while I research this topic.
b
Please analyze whether this is built according to LiveKit best practices.
t
Hey there! Just wanted to share how the code is doing a great job following some of LiveKit's best practices for building agents and working with SIP functionality: • SIP Trunking: It’s using SIP trunks for outbound calls, which is definitely the way to go for telephony integration. • Agent Setup: The structure is spot on with LiveKit agents, including using
AgentSession
and AI services for STT, TTS, and LLM. • Call Lifecycle Management: The code handles call states really well, covering voicemail and wrapping up calls smoothly. • Environment Variables: Sensitive info like API keys are kept safe using environment variables, which is awesome. That said, there are a few spots where we could polish things up to match LiveKit’s best practices even better: • Call Metrics: Adding things like call duration and success rates could give you a clearer picture when monitoring. • Advanced Call Routing: Introducing more complex routing, like fallback numbers, would make the system even more reliable. • Error Handling: The code does include error handling, but adding checks for edge cases like invalid phone numbers or network hiccups could help avoid surprises. • Documentation: Making sure the code is well-documented will make future maintenance and scaling a breeze. If you want, I’m happy to dive deeper or offer specific suggestions—just let me know! Sources: Making calls using SIP | LiveKit Docs | LiveKit Agents | LiveKit Docs | SIP Integration | livekit/livekit_composite | DeepWiki | Integrating with Telephony | LiveKit Docs | telephony/sip_lifecycle.py
b
I would love to hear about call metrics.
t
Hey there! If you want to keep an eye on call metrics in LiveKit, you can easily use the built-in Prometheus-based metrics system. Here are some important metrics you might want to track: • Call Duration: The
durCall
histogram helps you measure how long calls last. It’s super handy for spotting those longer calls. • Success Rates: You can figure out success rates by looking at counters like
inviteAccept
(for successful calls) and
inviteError
(for failed ones). Just use this formula:
(inviteAccept / (inviteAccept + inviteError)) * 100
. • Performance Metrics: Keep an eye on RTP packet counts and errors with
packetsRTP
to get a feel for call quality and reliability. Here’s how you can get started: 1. First, initialize the
Monitor
struct in your app to register those metrics with Prometheus. 2. Make sure Prometheus is set up to scrape the metrics endpoint. 3. If you want, you can even extend the
Monitor
struct to add your own custom metrics. If you’d like me to walk you through any part or share some examples, just let me know—I’m happy to help! Sources: pkg/stats/monitor.go | pkg/telemetry/prometheus/rooms.go