# ask-ai
r
I want to understand the codebase of the LiveKit Agent SDK. Can you help me understand, quickly and in depth, how it handles large call volumes, orchestrates the voice pipeline, and connects to SIP?
t
Thanks for reaching out, John. I'm an AI assistant and I'm here to help. Give me a moment while I research this topic.
Hey there! The LiveKit Agent SDK is built to handle a ton of calls smoothly and manage the voice pipeline and SIP connections with ease. Here's a quick rundown of how it works:
• Telephony Stack: It can handle thousands of calls at once, with features like HD audio, DTMF support, and noise cancellation to keep things crystal clear.
• Voice Pipeline Orchestration: Tools like `AgentActivity` help coordinate audio processing, LLM inference, and speech generation in real time.
• Scalability: Thanks to the Cloud Egress orchestrator and some handy open-source tools, it manages workloads efficiently so nothing gets overwhelmed.
• SIP Integration: It supports both inbound and outbound calls, with customizable SIP trunks and dispatch rules to keep your telephony apps running smoothly.
• LLM and Media Processing: Real-time speech-to-text, language model processing, and text-to-speech are all part of the package.
• WebRTC: This keeps your communication secure and reliable, even if the network isn't the best.
All these features make the SDK a great fit for large-scale voice AI projects. If you want to dive deeper into any part or have questions, just let me know! A minimal sketch of how these pieces get wired together follows below.
Sources: An open source stack for real-time multimodal AI | Voice Assistants | livekit/livekit_composite | DeepWiki | LiveKit Docs | Building the all-in-one platform for voice AI agents | Agents Core Architecture | livekit/livekit_composite | DeepWiki
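As a minimal sketch of how those pieces are typically wired together (assuming the livekit-agents 1.x `AgentSession` API; the specific STT/LLM/TTS plugin choices below are illustrative, not prescribed by this thread):

```python
from livekit.agents import Agent, AgentSession, JobContext, WorkerOptions, cli
from livekit.plugins import deepgram, openai, silero


async def entrypoint(ctx: JobContext) -> None:
    # connect the agent to the room it was dispatched to
    await ctx.connect()

    # AgentSession orchestrates the voice pipeline: VAD -> STT -> LLM -> TTS
    session = AgentSession(
        vad=silero.VAD.load(),
        stt=deepgram.STT(),
        llm=openai.LLM(model="gpt-4o-mini"),
        tts=openai.TTS(),
    )
    await session.start(
        agent=Agent(instructions="You are a helpful voice assistant."),
        room=ctx.room,
    )


if __name__ == "__main__":
    # the CLI starts a worker that registers with the server and receives jobs
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
```

The same entrypoint serves SIP callers: once a SIP trunk and dispatch rule route an inbound call into a room, the worker is dispatched to that room and the pipeline above handles the caller's audio.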
r
I want to understand the LiveKit server and workers: how exactly do they exchange audio and manage rooms? Please go into detail, with code.
t
Let me walk you through how the LiveKit server and workers handle rooms and audio exchange in a clear and organized way:
• Room Creation: The server sets up a `Room` instance for each session, taking care of participants and their media tracks. The `trackManager` handles track publication and subscription.
• Worker Dispatch: Workers get assigned to rooms to run agents. These agents join rooms and subscribe to audio tracks using `RoomCallback`, keeping everything connected.
• Audio Exchange: Using WebRTC and an SFU (Selective Forwarding Unit), media is routed in real time. Participants publish their tracks, and others subscribe to them.
• Code-Level Details:
◦ `RoomInner` in `room.rs` manages data and event transmission through channels like `data_tx` and `transcription_tx`, keeping communication flowing.
◦ `FfiRoom` and `FfiTrack` wrap handles and track info to safely manage resources behind the scenes.
• Concurrency: To keep everything thread-safe, mutexes and channels are used, like `track_handle_lookup` in `room.rs`, which uses `Arc<Mutex<HashMap>>`. This helps avoid any hiccups when multiple threads are involved.
These parts all work together to make sure room management and audio exchange are efficient and reliable. If you'd like, I'm happy to dive deeper into any specific components; a small code sketch of the subscription side follows below.
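As a small sketch of the subscription side from a participant's point of view (assuming the LiveKit Python `rtc` SDK; URL and token handling are out of scope here):

```python
from livekit import rtc


async def join_and_listen(url: str, token: str) -> rtc.Room:
    room = rtc.Room()

    @room.on("track_subscribed")
    def on_track_subscribed(
        track: rtc.Track,
        publication: rtc.RemoteTrackPublication,
        participant: rtc.RemoteParticipant,
    ) -> None:
        # the SFU forwards the publisher's media to subscribers;
        # audio frames can then be read through an AudioStream
        if track.kind == rtc.TrackKind.KIND_AUDIO:
            stream = rtc.AudioStream(track)
            # elsewhere: `async for frame_event in stream:` to process PCM frames

    # connecting publishes/subscribes over WebRTC through the SFU
    await room.connect(url, token)
    return room
```

This mirrors the Rust-side plumbing mentioned above (`RoomInner`, `FfiRoom`, `FfiTrack`): the Python `rtc` bindings call into that layer over FFI.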
r
```python
async def run(self) -> None:
    if not self._closed:
        raise Exception("worker is already running")

    logger.info(
        "starting worker",
        extra={"version": version, "rtc-version": rtc.version},
    )

    if self._opts.multiprocessing_context == "forkserver":
        plugin_packages = [p.package for p in Plugin.registered_plugins] + ["av"]
        logger.info("preloading plugins", extra={"packages": plugin_packages})
        self._mp_ctx.set_forkserver_preload(plugin_packages)

    if self._inference_executor is not None:
        logger.info("starting inference executor")
        await self._inference_executor.start()
        await self._inference_executor.initialize()

    self._closed = False

    def _update_job_status(proc: ipc.job_executor.JobExecutor) -> None:
        t = self._loop.create_task(self._update_job_status(proc))
        self._tasks.add(t)
        t.add_done_callback(self._tasks.discard)

    await self._http_server.start()

    if self._prometheus_server:
        await self._prometheus_server.start()

    self._proc_pool.on("process_started", _update_job_status)
    self._proc_pool.on("process_closed", _update_job_status)
    self._proc_pool.on("process_job_launched", _update_job_status)
    await self._proc_pool.start()

    self._http_session = aiohttp.ClientSession(proxy=self._opts.http_proxy or None)
    self._api = api.LiveKitAPI(
        self._opts.ws_url, self._opts.api_key, self._opts.api_secret, session=self._http_session
    )
    self._close_future = asyncio.Future(loop=self._loop)

    @utils.log_exceptions(logger=logger)
    async def _load_task() -> None:
        """periodically check load"""
        interval = utils.aio.interval(UPDATE_LOAD_INTERVAL)
        while True:
            await interval.tick()

            def load_fnc() -> float:
                signature = inspect.signature(self._opts.load_fnc)
                parameters = list(signature.parameters.values())
                if len(parameters) == 0:
                    return self._opts.load_fnc()  # type: ignore
                return self._opts.load_fnc(self)  # type: ignore

            self._worker_load = await asyncio.get_event_loop().run_in_executor(None, load_fnc)

            load_threshold = _WorkerEnvOption.getvalue(self._opts.load_threshold, self._devmode)
            default_num_idle_processes = _WorkerEnvOption.getvalue(
                self._opts.num_idle_processes, self._devmode
            )

            # print('================== checking the worker load =========================')
            # print(load_threshold)
            if not math.isinf(load_threshold):
                active_jobs = len(self.active_jobs)
                print(active_jobs)
                if active_jobs > 0:
                    job_load = self._worker_load / len(self.active_jobs)
                    if job_load > 0.0:
                        available_load = max(load_threshold - self._worker_load, 0.0)
                        print(available_load)
                        available_job = min(
                            math.ceil(available_load / job_load), default_num_idle_processes
                        )
                        print(available_job)
                        self._proc_pool.set_target_idle_processes(available_job)
                else:
                    self._proc_pool.set_target_idle_processes(default_num_idle_processes)

    tasks = []
    self._load_task = asyncio.create_task(_load_task(), name="load_task")
    tasks.append(self._load_task)

    # Only register with LiveKit if not using custom triggering
    if self._register:
        print('===================== checking the worker can register ==============================')
        self._conn_task = asyncio.create_task(self._connection_task(), name="worker_conn_task")
        tasks.append(self._conn_task)

    # print('===================== using custom trigger==============================')
    # # Always add custom trigger task (it will handle the logic internally)
    # self._custom_trigger_task = asyncio.create_task(self._custom_trigger_task(), name="custom_trigger_task")
    # tasks.append(self._custom_trigger_task)

    self.emit("worker_started")

    try:
        await asyncio.gather(*tasks)
    finally:
        await utils.aio.cancel_and_wait(*tasks)
        if not self._close_future.done():
            self._close_future.set_result(None)
```
What does this do? In the SIP call lifecycle, when an inbound call comes in, is this the entrypoint for starting the worker? What else does it trigger or keep in listening mode when a call is received?
t
The `run` method in the LiveKit worker takes care of a bunch of important setup steps to get things rolling:
• It checks whether the worker is already running and logs startup info.
• If you're using the forkserver multiprocessing context, it preloads plugin packages.
• It starts and initializes the inference executor, if that's part of your setup.
• It kicks off the HTTP server and the Prometheus server (if you've got that enabled).
• It sets up event handlers on the process pool to keep track of job status updates, then starts the pool.
• It initializes the aiohttp client session and the LiveKit API client for API communication.
• It starts the periodic load task and, when registration is enabled, the connection task that registers the worker with the LiveKit server (the in-code comment notes this is skipped when using custom triggering).
That said, I don't have the exact details on how this method deals with SIP lifecycle events, like inbound calls. If you want, I'm here to help dig into SIP-related workflows or point you toward the right components; a hedged sketch of the hooks that sit on top of this entrypoint follows below. Sources: livekit-agents/livekit/agents/worker.py
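For orientation on the SIP question, here is a hedged sketch (assuming the standard `WorkerOptions` hooks in livekit-agents) of what sits on top of the `run()` loop above: the connection task registers the worker and listens for job offers, an optional `request_fnc` is invoked when a job is offered, and `entrypoint_fnc` runs in one of the pre-warmed processes once the job is accepted. For SIP, that job typically arrives after a dispatch rule places an inbound call into a room.

```python
from livekit.agents import JobContext, JobRequest, WorkerOptions, cli


async def on_request(req: JobRequest) -> None:
    # called when run()'s connection task receives a job offer from the server,
    # e.g. for a room that a SIP dispatch rule created for an inbound call
    await req.accept()


async def entrypoint(ctx: JobContext) -> None:
    # runs inside one of the idle processes pre-warmed by the proc pool in run()
    await ctx.connect()
    # for an inbound SIP call, ctx.room contains the SIP participant for the caller


if __name__ == "__main__":
    # cli.run_app builds the Worker and ultimately awaits the run() coroutine shown earlier
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, request_fnc=on_request))
```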
r
Yes, what does this worker trigger that is used in the SIP call lifecycle?
t
I don't have the answer you're looking for. You could also try asking your question:
• in one of the other Slack channels, or
• to https://deepwiki.com/livekit/livekit_composite which is trained on all LiveKit source code
If you find the answer, please post it here to help others!
r
Can you explain how the SIP call can be disconnected from the agent_session?
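One common approach, sketched under assumptions (using the `livekit-api` Python server SDK; `room_name` and `sip_identity` are placeholders): the SIP leg ends when the SIP participant is removed from the room, and deleting the room disconnects everyone, which also ends the agent session running in that room.

```python
import asyncio

from livekit import api


async def hang_up_sip_call(room_name: str, sip_identity: str) -> None:
    # LiveKitAPI reads LIVEKIT_URL / LIVEKIT_API_KEY / LIVEKIT_API_SECRET from the environment
    lkapi = api.LiveKitAPI()
    try:
        # Option 1: remove only the SIP participant; the SIP leg hangs up,
        # while the agent stays in the room
        await lkapi.room.remove_participant(
            api.RoomParticipantIdentity(room=room_name, identity=sip_identity)
        )
        # Option 2: delete the whole room; every participant (SIP caller and agent)
        # is disconnected and the agent session shuts down
        # await lkapi.room.delete_room(api.DeleteRoomRequest(room=room_name))
    finally:
        await lkapi.aclose()


# asyncio.run(hang_up_sip_call("my-room", "sip-caller-identity"))
```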