# ask-ai
Hello @icy-policeman-64898 I am facing a problem while running my bot in production. The bot is supposed to converse with a person, and in development it works fine, capturing both audio channels correctly along with the transcript. But when I run the same code in production on AWS in a Fargate instance, the agent is unable to hear the voice of the person it is talking to. The agent is speaking and I can hear it, but it cannot hear the person. I am also recording the audio and video of the person, and that is correctly getting captured in S3.
Thanks for reaching out, Abhishek. I'm an AI assistant and I'm here to help. Give me a moment while I research this topic.
To help get to the bottom of this, could you share a bit more about the following?
• Are there any differences in environment variables, configurations, or dependencies between your development and production setups?
• Have you had a chance to check the network settings in AWS Fargate, like security groups or firewall rules, to make sure the right ports for audio streaming are open? (A quick reachability check you can run from inside the task is sketched below.)
• Do you notice any differences in CPU or memory usage when comparing development and production environments?
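One quick way to test the second point from inside the Fargate task is a small reachability probe. This is only a sketch: it assumes `LIVEKIT_WS_URL` is set and `aiohttp` is installed (both already used by the agent code shared below), and the file name `reachability_probe.py` is purely illustrative. Any HTTP response at all (even 401/404) confirms DNS resolution and outbound TLS connectivity from the task.

```python
# reachability_probe.py - illustrative helper, not part of the agent code.
import asyncio
import os

import aiohttp


async def probe_livekit_url() -> None:
    ws_url = os.getenv("LIVEKIT_WS_URL", "")
    if not ws_url:
        print("LIVEKIT_WS_URL is not set")
        return

    # wss://example.livekit.cloud -> https://example.livekit.cloud
    http_url = ws_url.replace("wss://", "https://").replace("ws://", "http://")

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(http_url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                # Any status code proves DNS + outbound TCP/TLS work from this task.
                print(f"Reached {http_url}: HTTP {resp.status}")
    except Exception as e:
        print(f"Could not reach {http_url}: {e}")


if __name__ == "__main__":
    asyncio.run(probe_livekit_url())
```

Keep in mind that WebRTC media also needs UDP egress (or a TCP/TLS fallback), so a successful HTTPS probe rules out DNS and basic egress problems but not a blocked media path.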
Here is the code.

```python
#!/usr/bin/env python3
import asyncio
import json
import logging
import os
import time

import aiohttp
from typing import Optional

from livekit.agents import (
    Agent,
    AgentSession,
    JobContext,
    WorkerOptions,
    AutoSubscribe,
    cli,
)
from livekit.plugins import openai, silero

# Configure minimal logging
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger("interview-agent")
logger.setLevel(logging.INFO)

# Temporarily enable more detailed LiveKit logging for debugging
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("livekit").setLevel(logging.INFO)  # Enable for debugging
logging.getLogger("livekit.agents").setLevel(logging.INFO)  # Enable for debugging


async def send_transcript_to_server(interview_id: str, segments: list, participant_identity: str = None):
    """Send transcription data to NestJS server"""
    try:
        # Get server URL from environment or use default
        server_url = os.getenv("NESTJS_SERVER_URL", "http://localhost:3000")
        data = {
            "interviewId": interview_id,
            "participant": {"identity": participant_identity} if participant_identity else None,
            "segments": segments,
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(f"{server_url}/api/interview/transcription", json=data) as response:
                if response.status in [200, 201]:  # Both 200 OK and 201 Created are success
                    logger.debug(f"Sent transcript for interview {interview_id} (status: {response.status})")
                else:
                    logger.error(f"Failed to send transcript: {response.status}")
    except Exception as e:
        logger.error(f"Error sending transcript to server: {e}")


class InterviewAgent(Agent):
    """Minimal interview agent focused purely on conversation"""

    def __init__(self, questions: list, candidate_name: str = "candidate", job_title: str = "position"):
        # Build question list for instructions
        questions_text = "\n".join(f"{i+1}. {q.get('content', str(q))}" for i, q in enumerate(questions))

        instructions = f"""You are a professional AI interviewer conducting an interview for {job_title}.

INTERVIEW PROCESS:
1. Greet {candidate_name} warmly and explain you'll be conducting their interview
2. Ask the questions below one at a time, in order
3. Wait for complete answers before moving to next question
4. Keep conversation natural and professional
5. When all questions are answered, thank them and conclude the interview

QUESTIONS TO ASK:
{questions_text}

GUIDELINES:
- Speak with an Australian accent
- Be encouraging and professional
- Ask follow-up questions if answers need clarification
- Keep responses concise and clear
- Maintain natural conversation flow
- Ensure every word you speak is captured for transcription"""

        super().__init__(instructions=instructions)


async def entrypoint(ctx: JobContext):
    """Minimal entrypoint - just join room and start interview"""
    try:
        logger.info(f"🎤 Interview agent joining room: {ctx.room.name}")

        # Connect to the room and subscribe to ALL remote tracks (critical for hearing user audio)
        await ctx.connect(auto_subscribe=AutoSubscribe.SUBSCRIBE_ALL)

        # Wait for at least one participant to ensure audio tracks are available
        try:
            participant = await ctx.wait_for_participant()
            logger.info(f"👤 Participant connected: {getattr(participant, 'identity', 'unknown')}")
        except Exception as e:
            logger.warning(f"No participant seen within 10s; continuing anyway... ({e})")

        # Extract interview context from room metadata
        metadata = {}
        questions = []
        candidate_name = "candidate"
        job_title = "position"
        try:
            if ctx.room.metadata:
                metadata = json.loads(ctx.room.metadata)
                questions = metadata.get("questions", [])
                candidate_name = metadata.get("candidateName", "candidate")
                job_title = metadata.get("jobTitle", "position")
                logger.info(f"📋 Interview setup: {len(questions)} questions for {candidate_name} - {job_title}")
            else:
                logger.warning("⚠️ No room metadata found - using defaults")
        except (json.JSONDecodeError, Exception) as e:
            logger.error(f"Error parsing room metadata: {e}")
            questions = [{"content": "Tell me about yourself and your experience."}]

        # Ensure we have at least one question
        if not questions:
            questions = [{"content": "Tell me about yourself and your experience for this role."}]
            logger.warning("📝 No questions found - using default question")

        # Create the interview agent
        agent = InterviewAgent(
            questions=questions,
            candidate_name=candidate_name,
            job_title=job_title,
        )

        # Extract interview ID from room name for transcription
        interview_id = ctx.room.name.replace("interview-", "") if ctx.room.name.startswith("interview-") else ctx.room.name

        # Create agent session with OpenAI Realtime
        session = AgentSession(
            llm=openai.realtime.RealtimeModel(
                model="gpt-4o-mini-realtime-preview-2024-12-17",
                voice="alloy",  # Clear, professional voice
                temperature=0.6,  # Keep responses focused and consistent (min 0.6 for realtime)
            ),
            vad=silero.VAD.load(),  # Voice activity detection
        )

        # Set up session event handlers for transcription (synchronous callbacks with async tasks)
        @session.on("user_speech_committed")
        def on_user_speech_committed(msg):
            """Handle committed user speech transcripts"""
            async def async_handler():
                try:
                    logger.info(f"📝 User speech committed: {msg}")
                    # Send user transcript to server
                    segments = [{"text": str(msg), "timestamp": time.time()}]
                    await send_transcript_to_server(interview_id, segments, "user")
                except Exception as e:
                    logger.error(f"Error handling user speech: {e}")

            # Create async task from synchronous callback
            asyncio.create_task(async_handler())

        @session.on("agent_speech_committed")
        def on_agent_speech_committed(msg):
            """Handle committed agent speech"""
            async def async_handler():
                try:
                    logger.info(f"🤖 Agent speech committed: {msg}")
                    # Send agent transcript to server
                    segments = [{"text": str(msg), "timestamp": time.time()}]
                    await send_transcript_to_server(interview_id, segments, "agent")
                except Exception as e:
                    logger.error(f"Error handling agent speech: {e}")

            # Create async task from synchronous callback
            asyncio.create_task(async_handler())

        @session.on("conversation_item_added")
        def on_conversation_item_added(item):
            """Handle conversation items for additional transcript capture"""
            async def async_handler():
                try:
                    logger.info(f"💬 Conversation item added: {type(item)}")
                    # Extract content from conversation item (similar to backup code pattern)
                    speaker = "unknown"
                    content = ""
                    if hasattr(item, 'item'):
                        chat_message = item.item
                        if hasattr(chat_message, 'role'):
                            role = chat_message.role
                            speaker = "agent" if role == 'assistant' else "user"
                        if hasattr(chat_message, 'content'):
                            if isinstance(chat_message.content, list):
                                content = ' '.join(str(part) for part in chat_message.content if part)
                            else:
                                content = str(chat_message.content)
                    if content and content.strip():
                        logger.info(f"📝 Conversation item: {speaker} - {content[:50]}...")
                        segments = [{"text": content.strip(), "timestamp": time.time()}]
                        await send_transcript_to_server(interview_id, segments, speaker)
                except Exception as e:
                    logger.error(f"Error handling conversation item: {e}")

            # Create async task from synchronous callback
            asyncio.create_task(async_handler())

        # Start the interview session
        logger.info("🚀 Starting interview session...")
        await session.start(agent=agent, room=ctx.room)

        # Quick diagnostic probe to verify audio subscription
        async def probe_audio_tracks():
            try:
                await asyncio.sleep(1)  # Let tracks settle
                remote_participants = getattr(ctx.room, "remote_participants", {}) or {}
                logger.info(f"🔎 Remote participants after session start: {list(remote_participants.keys())}")
                for pid, participant in remote_participants.items():
                    audio_tracks = [
                        t for t in getattr(participant, "track_publications", {}).values()
                        if getattr(t, "kind", None) == "audio"
                    ]
                    logger.info(f"🎤 Participant {pid} audio tracks: {len(audio_tracks)}")
            except Exception as e:
                logger.warning(f"🔎 Audio track probe failed: {e}")

        asyncio.create_task(probe_audio_tracks())

        # Generate opening greeting
        await session.generate_reply(
            instructions="Greet the candidate professionally, introduce yourself as their AI interviewer, and begin the interview process."
        )

        logger.info("Interview agent active and ready")

        # Keep session alive until room closes or participant leaves
        # The agent will automatically handle the conversation flow
        # No manual monitoring needed - LiveKit handles everything

    except Exception as e:
        logger.error(f"Interview agent error: {e}")
        raise


if __name__ == "__main__":
    """Worker entry point"""
    # Load environment variables
    from dotenv import load_dotenv
    load_dotenv()

    # Validate required environment variables
    required_vars = ["LIVEKIT_WS_URL", "LIVEKIT_API_KEY", "LIVEKIT_API_SECRET", "OPENAI_API_KEY"]
    missing_vars = [var for var in required_vars if not os.getenv(var)]
    if missing_vars:
        logger.error(f"Missing required environment variables: {missing_vars}")
        exit(1)

    logger.info("🏁 Starting minimal interview agent worker...")

    # Create worker options
    options = WorkerOptions(
        entrypoint_fnc=entrypoint,
        ws_url=os.getenv('LIVEKIT_WS_URL'),
        api_key=os.getenv('LIVEKIT_API_KEY'),
        api_secret=os.getenv('LIVEKIT_API_SECRET'),
    )

    logger.info("🔄 Worker ready for interview dispatch...")

    # Start the worker
    cli.run_app(options)
```
Thanks for sharing the code and the issue you’re facing! Here are a few things that might be causing the AI agent to miss user audio in production, along with some ideas on how to fix them:
• Audio Track Publication: Make sure the audio tracks are properly published and subscribed to in your production setup. The `ctx.connect(auto_subscribe=AutoSubscribe.SUBSCRIBE_ALL)` line should take care of this, but it’s worth double-checking that the remote participants’ audio tracks are actually being detected and handled (a small diagnostic sketch follows after this list).
• Environment Variables: Double-check that all the necessary environment variables, such as `LIVEKIT_WS_URL`, `LIVEKIT_API_KEY`, `LIVEKIT_API_SECRET`, and `OPENAI_API_KEY`, are correctly set up in your production environment.
• Network Configuration: Make sure the required ports for LiveKit and related services are open in your AWS Fargate security groups. Network restrictions can block audio streams, so this is worth a look.
• AudioSource Queue Size: If you’re using `AudioSource` for streaming, try increasing `queue_size_ms` to 2000 ms. This can help avoid buffer overflow issues.
• Device Permissions: Check that your app has the right permissions to access microphones and audio devices in the production environment.
• Logs and Debugging: Take a peek at the production logs for any errors related to audio track subscription, speech-to-text processing, or network connectivity; those clues can be really helpful.

If you want, I’m here to help you dig deeper into any of these points or troubleshoot further. Just let me know!

Sources: Understanding "silence detected on local audio track" warning | Camera & microphone | LiveKit Docs | Resolving "InvalidState - failed to capture frame" errors in AudioSource | Deploying to production | LiveKit Docs | Voice Assistants | livekit/livekit_composite | DeepWiki
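To make the first point concrete, one low-risk diagnostic is to log the room's track events and see whether the candidate's microphone track is ever published and subscribed. The sketch below is only an illustration of where you might hook in (for example inside `entrypoint()`, right after `ctx.connect(...)`, called as `install_audio_diagnostics(ctx.room)`); the event names `track_published` and `track_subscribed` come from the LiveKit Python SDK's `rtc.Room`.

```python
import logging

from livekit import rtc

logger = logging.getLogger("interview-agent")


def install_audio_diagnostics(room: rtc.Room) -> None:
    """Log-only probes for remote audio tracks; illustrative sketch."""

    @room.on("track_published")
    def _on_track_published(publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant):
        # If this never fires for the candidate's microphone, the client side is not
        # publishing audio at all (check token grants and browser/device permissions).
        logger.info(f"track_published: {participant.identity} kind={publication.kind}")

    @room.on("track_subscribed")
    def _on_track_subscribed(track: rtc.Track, publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant):
        # If track_published fires but this never does, the agent is failing to subscribe,
        # which often points at the Fargate task's outbound media path rather than the code.
        if track.kind == rtc.TrackKind.KIND_AUDIO:
            logger.info(f"track_subscribed (audio): {participant.identity}")
```

If `track_published` shows up in the production logs but `track_subscribed` never does, focus on the Fargate task's networking (security groups, NAT, UDP egress) rather than the agent logic itself.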