Here is the code.
#!/usr/bin/env python3
import asyncio
import json
import logging
import os
import time
import aiohttp
from typing import Optional
from livekit.agents import (
Agent,
AgentSession,
JobContext,
WorkerOptions,
AutoSubscribe,
cli,
)
from livekit.plugins import openai, silero
def _configure_logging():
    """Set up process-wide logging and return this module's logger.

    The root logger is configured at WARNING; the "interview-agent" logger
    and the LiveKit loggers are raised to INFO, and urllib3 is pinned at
    WARNING.
    """
    # Root configuration: WARNING and above only
    logging.basicConfig(level=logging.WARNING)

    # This module's own logger reports at INFO
    agent_logger = logging.getLogger("interview-agent")
    agent_logger.setLevel(logging.INFO)

    # Per-library levels; the LiveKit entries are raised to INFO for debugging
    library_levels = (
        ("urllib3", logging.WARNING),
        ("livekit", logging.INFO),
        ("livekit.agents", logging.INFO),
    )
    for library_name, library_level in library_levels:
        logging.getLogger(library_name).setLevel(library_level)

    return agent_logger


logger = _configure_logging()
async def send_transcript_to_server(interview_id: str, segments: list, participant_identity: Optional[str] = None):
    """Send transcription data to the NestJS server.

    POSTs a JSON payload to ``{NESTJS_SERVER_URL}/api/interview/transcription``.
    Delivery is best-effort: any failure is logged and swallowed so that a
    transcript upload problem never interrupts the caller.

    Args:
        interview_id: Identifier placed in the payload's ``interviewId`` field.
        segments: Transcript segments placed in the payload's ``segments`` field.
        participant_identity: Optional speaker identity; when falsy the
            payload's ``participant`` field is ``None``.
    """
    try:
        # Get server URL from environment or use default
        server_url = os.getenv("NESTJS_SERVER_URL", "http://localhost:3000")
        data = {
            "interviewId": interview_id,
            "participant": {"identity": participant_identity} if participant_identity else None,
            "segments": segments,
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(f"{server_url}/api/interview/transcription", json=data) as response:
                # Both 200 OK and 201 Created are success
                if response.status in (200, 201):
                    logger.debug(f"✅ Sent transcript for interview {interview_id} (status: {response.status})")
                else:
                    logger.error(f"❌ Failed to send transcript: {response.status}")
    except Exception as e:
        # Deliberate catch-all: transcript delivery must never crash the agent
        logger.error(f"❌ Error sending transcript to server: {e}")
class InterviewAgent(Agent):
    """Minimal interview agent focused purely on conversation.

    All behaviour comes from the instruction prompt built in ``__init__``;
    the class adds no methods beyond what ``Agent`` provides.
    """

    def __init__(self, questions: list, candidate_name: str = "candidate", job_title: str = "position"):
        """Build the interviewer prompt and pass it to ``Agent``.

        Args:
            questions: Questions to ask, in order. Each entry is either a dict
                (its ``content`` value is used, falling back to ``str(entry)``
                when the key is missing) or any other object, which is
                rendered with ``str()``.
            candidate_name: Name interpolated into the greeting step.
            job_title: Role interpolated into the opening line of the prompt.
        """
        # Build question list for instructions
        questions_text = "\n".join(
            f"{number}. {self._question_text(question)}"
            for number, question in enumerate(questions, start=1)
        )
        instructions = f"""You are a professional AI interviewer conducting an interview for {job_title}.
INTERVIEW PROCESS:
1. Greet {candidate_name} warmly and explain you'll be conducting their interview
2. Ask the questions below one at a time, in order
3. Wait for complete answers before moving to next question
4. Keep conversation natural and professional
5. When all questions are answered, thank them and conclude the interview
QUESTIONS TO ASK:
{questions_text}
GUIDELINES:
- Speak with an Australian accent
- Be encouraging and professional
- Ask follow-up questions if answers need clarification
- Keep responses concise and clear
- Maintain natural conversation flow
- Ensure every word you speak is captured for transcription"""
        super().__init__(instructions=instructions)

    @staticmethod
    def _question_text(question):
        """Return the display text for one question entry."""
        # Only dicts support .get(); a plain-string question would otherwise
        # raise AttributeError and abort agent construction.
        if isinstance(question, dict):
            return question.get("content", str(question))
        return str(question)
# Strong references to fire-and-forget tasks. The event loop keeps only weak
# references to tasks, so a task nobody else references may be
# garbage-collected before it finishes.
_background_tasks = set()


def _spawn_background(coro):
    """Schedule *coro* as a task and keep a reference to it until it completes."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task


def _interview_id_from_room_name(room_name):
    """Return *room_name* without a leading ``interview-`` prefix, if it has one."""
    prefix = "interview-"
    # Slice instead of str.replace(): replace() would also strip later
    # occurrences of the prefix text from inside the identifier itself.
    if room_name.startswith(prefix):
        return room_name[len(prefix):]
    return room_name


def _load_interview_context(raw_metadata):
    """Parse room metadata JSON into ``(questions, candidate_name, job_title)``.

    Missing or unparseable metadata falls back to defaults, and the returned
    question list is never empty.
    """
    questions = []
    candidate_name = "candidate"
    job_title = "position"
    try:
        if raw_metadata:
            metadata = json.loads(raw_metadata)
            questions = metadata.get("questions", [])
            # `or` also covers explicit null / empty values in the JSON
            candidate_name = metadata.get("candidateName") or "candidate"
            job_title = metadata.get("jobTitle") or "position"
            logger.info(f"📋 Interview setup: {len(questions)} questions for {candidate_name} - {job_title}")
        else:
            logger.warning("⚠️ No room metadata found - using defaults")
    except Exception as e:
        # Covers invalid JSON as well as JSON that is not an object
        logger.error(f"❌ Error parsing room metadata: {e}")
        questions = [{"content": "Tell me about yourself and your experience."}]

    # Ensure we have at least one question
    if not questions:
        questions = [{"content": "Tell me about yourself and your experience for this role."}]
        logger.warning("📝 No questions found - using default question")
    return questions, candidate_name, job_title


def _register_transcript_handlers(session, interview_id):
    """Attach session event handlers that forward transcript text to the server.

    The callbacks are synchronous; each one schedules its async work as a
    background task.

    NOTE(review): if the session emits both the ``*_speech_committed`` events
    and ``conversation_item_added`` for the same utterance, the server will
    receive that text twice — confirm which events this session type emits.
    """

    @session.on("user_speech_committed")
    def on_user_speech_committed(msg):
        """Handle committed user speech transcripts"""
        async def forward():
            try:
                logger.info(f"📝 User speech committed: {msg}")
                # Send user transcript to server
                segments = [{"text": str(msg), "timestamp": time.time()}]
                await send_transcript_to_server(interview_id, segments, "user")
            except Exception as e:
                logger.error(f"❌ Error handling user speech: {e}")

        _spawn_background(forward())

    @session.on("agent_speech_committed")
    def on_agent_speech_committed(msg):
        """Handle committed agent speech"""
        async def forward():
            try:
                logger.info(f"🤖 Agent speech committed: {msg}")
                # Send agent transcript to server
                segments = [{"text": str(msg), "timestamp": time.time()}]
                await send_transcript_to_server(interview_id, segments, "agent")
            except Exception as e:
                logger.error(f"❌ Error handling agent speech: {e}")

        _spawn_background(forward())

    @session.on("conversation_item_added")
    def on_conversation_item_added(item):
        """Handle conversation items for additional transcript capture"""
        async def forward():
            try:
                logger.info(f"💬 Conversation item added: {type(item)}")
                speaker = "unknown"
                content = ""
                if hasattr(item, 'item'):
                    chat_message = item.item
                    if hasattr(chat_message, 'role'):
                        # Any role other than 'assistant' is reported as "user"
                        speaker = "agent" if chat_message.role == 'assistant' else "user"
                    if hasattr(chat_message, 'content'):
                        if isinstance(chat_message.content, list):
                            content = ' '.join(str(part) for part in chat_message.content if part)
                        else:
                            content = str(chat_message.content)
                if content and content.strip():
                    logger.info(f"📝 Conversation item: {speaker} - {content[:50]}...")
                    segments = [{"text": content.strip(), "timestamp": time.time()}]
                    await send_transcript_to_server(interview_id, segments, speaker)
            except Exception as e:
                logger.error(f"❌ Error handling conversation item: {e}")

        _spawn_background(forward())


async def _probe_audio_tracks(room):
    """Log, after a one-second delay, how many audio tracks each remote participant has."""
    try:
        await asyncio.sleep(1)  # Let tracks settle
        remote_participants = getattr(room, "remote_participants", {}) or {}
        logger.info(f"🔎 Remote participants after session start: {list(remote_participants.keys())}")
        for pid, participant in remote_participants.items():
            audio_tracks = [
                publication
                for publication in getattr(participant, "track_publications", {}).values()
                if getattr(publication, "kind", None) == "audio"
            ]
            logger.info(f"🎤 Participant {pid} audio tracks: {len(audio_tracks)}")
    except Exception as e:
        # Purely diagnostic; never let the probe affect the interview
        logger.warning(f"🔎 Audio track probe failed: {e}")


async def entrypoint(ctx: JobContext):
    """Join the room, build the interview agent, and start the session.

    Steps: connect with all remote tracks subscribed, wait for a participant,
    read the interview context from room metadata, register transcript
    forwarding handlers, start the session, and request the opening greeting.

    Args:
        ctx: Job context supplying the room and connection helpers.

    Raises:
        Exception: Any error outside the individually guarded steps is logged
            and re-raised.
    """
    try:
        logger.info(f"🎤 Interview agent joining room: {ctx.room.name}")

        # Connect to the room and subscribe to ALL remote tracks (critical for hearing user audio)
        await ctx.connect(auto_subscribe=AutoSubscribe.SUBSCRIBE_ALL)

        # Wait for at least one participant to ensure audio tracks are available.
        # No timeout is applied here, so this branch only fails on an error.
        try:
            participant = await ctx.wait_for_participant()
            logger.info(f"👤 Participant connected: {getattr(participant, 'identity', 'unknown')}")
        except Exception as e:
            logger.warning(f"⏳ Waiting for a participant failed; continuing anyway... ({e})")

        # Extract interview context from room metadata
        questions, candidate_name, job_title = _load_interview_context(ctx.room.metadata)

        # Create the interview agent
        agent = InterviewAgent(
            questions=questions,
            candidate_name=candidate_name,
            job_title=job_title,
        )

        # Extract interview ID from room name for transcription
        interview_id = _interview_id_from_room_name(ctx.room.name)

        # Create agent session with OpenAI Realtime
        session = AgentSession(
            llm=openai.realtime.RealtimeModel(
                model="gpt-4o-mini-realtime-preview-2024-12-17",
                voice="alloy",  # Clear, professional voice
                temperature=0.6,  # Keep responses focused and consistent (min 0.6 for realtime)
            ),
            vad=silero.VAD.load(),  # Voice activity detection
        )

        # Set up session event handlers for transcription
        _register_transcript_handlers(session, interview_id)

        # Start the interview session
        logger.info("🚀 Starting interview session...")
        await session.start(agent=agent, room=ctx.room)

        # Quick diagnostic probe to verify audio subscription
        _spawn_background(_probe_audio_tracks(ctx.room))

        # Generate opening greeting
        await session.generate_reply(
            instructions="Greet the candidate professionally, introduce yourself as their AI interviewer, and begin the interview process."
        )
        logger.info("✅ Interview agent active and ready")
        # Nothing further is done here; the function returns once the greeting
        # has been requested.
    except Exception as e:
        logger.error(f"❌ Interview agent error: {e}")
        raise
if __name__ == "__main__":
    # Worker entry point: load configuration, validate it, then hand control
    # to the LiveKit agents CLI.

    # Load environment variables
    from dotenv import load_dotenv

    load_dotenv()

    # Validate required environment variables
    required_vars = ["LIVEKIT_WS_URL", "LIVEKIT_API_KEY", "LIVEKIT_API_SECRET", "OPENAI_API_KEY"]
    missing_vars = [var for var in required_vars if not os.getenv(var)]
    if missing_vars:
        logger.error(f"❌ Missing required environment variables: {missing_vars}")
        # SystemExit rather than exit(): exit() is injected by the site
        # module and is not guaranteed to exist in every runtime.
        raise SystemExit(1)

    logger.info("🏁 Starting minimal interview agent worker...")

    # Create worker options
    options = WorkerOptions(
        entrypoint_fnc=entrypoint,
        ws_url=os.getenv('LIVEKIT_WS_URL'),
        api_key=os.getenv('LIVEKIT_API_KEY'),
        api_secret=os.getenv('LIVEKIT_API_SECRET'),
    )
    logger.info("🔄 Worker ready for interview dispatch...")

    # Start the worker
    cli.run_app(options)