clean-lamp-24122
05/13/2025, 6:59 AMtall-airline-16756
05/13/2025, 6:59 AMtall-airline-16756
05/13/2025, 6:59 AMclean-lamp-24122
05/13/2025, 7:01 AMtall-airline-16756
05/13/2025, 7:01 AMclean-lamp-24122
05/13/2025, 7:02 AMtall-airline-16756
05/13/2025, 7:02 AMclean-lamp-24122
05/13/2025, 7:09 AMtall-airline-16756
05/13/2025, 7:10 AMclean-lamp-24122
05/13/2025, 7:29 AMtall-airline-16756
05/13/2025, 7:29 AMclean-lamp-24122
05/13/2025, 7:31 AMtall-airline-16756
05/13/2025, 7:32 AM
import logging

from livekit.agents import JobProcess, JobContext, AgentSession, WorkerOptions, cli, metrics
from livekit.plugins import silero
from livekit.agents.voice import MetricsCollectedEvent

# Module-level logger used by the shutdown callback below
logger = logging.getLogger(__name__)


def prewarm(proc: JobProcess):
    # Preload the VAD model to improve startup time
    proc.userdata["vad"] = silero.VAD.load()


async def entrypoint(ctx: JobContext):
    # Set up logging context
    ctx.log_context_fields = {
        "room": ctx.room.name,
    }
    await ctx.connect()

    # Create session using prewarmed VAD
    session = AgentSession(
        vad=ctx.proc.userdata["vad"],
        # Add other components like llm, stt, tts as needed
    )

    # Set up metrics collection
    usage_collector = metrics.UsageCollector()

    @session.on("metrics_collected")
    def _on_metrics_collected(ev: MetricsCollectedEvent):
        metrics.log_metrics(ev.metrics)
        usage_collector.collect(ev.metrics)

    async def log_usage():
        summary = usage_collector.get_summary()
        logger.info(f"Usage: {summary}")

    # Register shutdown callback to log final usage
    ctx.add_shutdown_callback(log_usage)

    # Rest of your agent code...


if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, prewarm_fnc=prewarm))
This implementation:
• Uses a prewarm function to load the Silero VAD model in advance
• Sets up a metrics collector to track usage
• Registers an event handler for metrics collection
• Adds a shutdown callback to log the final usage summary when the session ends
Let me know if you need further assistance with this!
Sources: Tree View: | dse-workbench
clean-lamp-24122
05/13/2025, 7:34 AMtall-airline-16756
05/13/2025, 7:34 AM
`load_fnc` parameter in `WorkerOptions` along with LiveKit's CPU monitoring utilities. Here's an example implementation:
from livekit.agents import WorkerOptions
from livekit.agents.utils.hw import get_cpu_monitor

# Get the appropriate CPU monitor
cpu_monitor = get_cpu_monitor()


# Define a load function that reports CPU usage
def load_fnc():
    # Returns CPU usage as a value between 0 and 1
    return cpu_monitor.cpu_percent()


# Create worker options with the load function
# (entrypoint is the function from the earlier example)
opts = WorkerOptions(
    entrypoint_fnc=entrypoint,
    load_fnc=load_fnc,
    load_threshold=0.8,  # Stop accepting new jobs when CPU usage is above 80%
)
You can also use `cpu_monitor.cpu_count()` to get the number of available CPU cores (or fractional cores in containerized environments).
This setup will allow you to monitor CPU usage and control job acceptance based on the load threshold. Let me know if you need further assistance!
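If it helps to see the threshold idea in isolation, here is a minimal standard-library sketch. It assumes only that a load function returns a float between 0 and 1. The names `clamp01`, `loadavg_load_fnc`, and `accepts_jobs` are hypothetical helpers for illustration, not part of LiveKit, and `accepts_jobs` is a simplified model of the comparison, not LiveKit's internal logic.

```python
import os


def clamp01(value: float) -> float:
    """Keep a load value inside the 0..1 range a load function should return."""
    return max(0.0, min(1.0, value))


def loadavg_load_fnc() -> float:
    """Hypothetical alternative load function: 1-minute load average per core."""
    try:
        one_minute, _, _ = os.getloadavg()
    except (AttributeError, OSError):
        # Platforms without load averages (e.g. Windows): report no load
        return 0.0
    cores = os.cpu_count() or 1
    return clamp01(one_minute / cores)


def accepts_jobs(load: float, load_threshold: float = 0.8) -> bool:
    """Simplified model: new jobs are accepted only while load is at or below the threshold."""
    return load <= load_threshold


if __name__ == "__main__":
    current = loadavg_load_fnc()
    print(f"load={current:.2f} accepts_jobs={accepts_jobs(current)}")
```

A function shaped like `loadavg_load_fnc` could be passed as `load_fnc` in place of the CPU monitor version, but whether load average is a better signal than CPU percentage depends on your workload.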
Sources: Tree View: | dse-workbench | Tree View: | dse-workbench | Worker options | LiveKit Docs
clean-lamp-24122
05/13/2025, 8:45 AMtall-airline-16756
05/13/2025, 8:46 AM
"process is unresponsive, killing process"
• For initialization timeouts: "initialization timed out, killing process"
Additionally, LiveKit emits events for process lifecycle stages (e.g., `process_created`, `process_started`, `process_ready`, `process_closed`), which you can use to track process status. While these events do not specifically indicate the reason for termination, you can monitor logs or implement custom logic to handle such scenarios.
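One possible way to monitor the logs is a small `logging.Handler` that watches for the two messages quoted above. This is only a sketch: the `KillWatcher` class is a hypothetical helper, and the logger name `"livekit.agents"` is an assumption you should check against your installed version.

```python
import logging

# The two log messages quoted above
KILL_MESSAGES = (
    "process is unresponsive, killing process",
    "initialization timed out, killing process",
)


class KillWatcher(logging.Handler):
    """Hypothetical handler that records which kill message was logged."""

    def __init__(self) -> None:
        super().__init__(level=logging.DEBUG)
        self.reasons: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        message = record.getMessage()
        for known in KILL_MESSAGES:
            if known in message:
                self.reasons.append(known)


watcher = KillWatcher()
# Assumption: the framework logs under a logger named "livekit.agents"
logging.getLogger("livekit.agents").addHandler(watcher)
```

After the worker has run for a while, `watcher.reasons` holds one entry per kill message seen, which you could forward to your own alerting.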
Let me know if you need further assistance!
Sources: Tree View: | dse-workbench | Tree View: | dse-workbench
clean-lamp-24122
05/13/2025, 1:05 PMtall-airline-16756
05/13/2025, 1:05 PM
`on_enter`, `on_exit`, and `on_user_turn_completed` in the context of LiveKit's Agents framework.
If you can provide more details about your specific use case, I can help you create a custom implementation using these hooks. Let me know how you'd like to proceed!
clean-lamp-24122
05/13/2025, 1:08 PMtall-airline-16756
05/13/2025, 1:08 PMclean-lamp-24122
05/13/2025, 1:10 PMtall-airline-16756
05/13/2025, 1:10 PMclean-lamp-24122
05/13/2025, 1:13 PMtall-airline-16756
05/13/2025, 1:13 PM
`WorkerOptions` for your worker, including the entrypoint function and any load or prewarm functions if needed. For example:
from livekit.agents import JobContext, WorkerOptions, cli


async def entrypoint(ctx: JobContext):
    # Worker logic here
    pass


opts = WorkerOptions(entrypoint_fnc=entrypoint)

if __name__ == "__main__":
    # Start the worker through the LiveKit CLI helper
    cli.run_app(opts)
3. *Run the First Worker:* Start the first worker instance by running the worker code. For example, if your worker script is named `worker.py`, you can run it using the command below (`cli.run_app` expects a subcommand such as `dev` or `start`):
`python worker.py dev`
4. *Run the Second Worker:* Start another instance of the same worker code. You can do this by opening a new terminal and running the same command:
`python worker.py dev`
Each instance will act as a separate worker, and LiveKit will automatically manage the load balancing between them.
5. *Verify Worker Pool:* Check the logs or monitoring tools to ensure that both workers are running and are part of the same worker pool. LiveKit will distribute jobs across the workers based on their availability and capacity.
Let me know if you need further assistance with any of these steps!