rhythmic-jordan-68259
09/14/2025, 1:19 PMI'd love to help you create a workflow with nodes and conditional connections in LiveKit! Here's a friendly approach you can try:
Define Multiple Agents or Tools: Think of creating separate agents or tools for each part of your workflow, like greeting the user first and then gathering their information.
Use Conditional Handoffs: You can smoothly transition between nodes (agents or tools) based on what the user says or their intent. For example, after saying hello, you can hand off to a tool that collects their details.
Manage State with User Data: Keeping track of user info and choices with a shared state object helps maintain continuity throughout the workflow.
Implement Conditional Branching: Use simple logic to decide the next step depending on user responses. Like, if the user shares their name, move forward; if not, gently ask again.
Use Pipeline Hooks: Customize what happens at different points using hooks like on_enter() and on_exit(). For instance, you can warmly greet the user when they enter the first node.
These steps can help you build a dynamic and thoughtful workflow with conditional logic. If you'd like, I'm here to provide a specific code example or guide you further—just let me know!
tall-airline-16756
09/14/2025, 1:19 PMtall-airline-16756
09/14/2025, 1:20 PMimport logging
from dotenv import load_dotenv
from dataclasses import dataclass, field
from typing import Dict, List
from enum import Enum
from pydantic import Field
from livekit.agents import JobContext, WorkerOptions, cli, AgentSession
from livekit.agents.llm import function_tool
from livekit.agents.voice import Agent
from livekit.plugins import deepgram, openai, cartesia, silero
load_dotenv()
logger = logging.getLogger("conditional-flow")
logger.setLevel(<http://logging.INFO|logging.INFO>)
# Define choices for each stage
class Stage1Choice(str, Enum):
OPTION_A = "A"
OPTION_B = "B"
class Stage2Choice(str, Enum):
OPTION_X = "X"
OPTION_Y = "Y"
@dataclass
class SurveyData:
responses: Dict[str, str] = field(default_factory=dict)
current_stage: int = 1
current_branch: str = ""
path_taken: List[str] = field(default_factory=list)
def record(self, question: str, answer: str):
self.responses[question] = answer
self.path_taken.append(f"Stage {self.current_stage} ('{self.current_branch}') - {question}: {answer}")
class BaseAgent(Agent):
def __init__(self, job_context: JobContext, instructions: str) -> None:
self.job_context = job_context
super().__init__(
instructions=instructions,
stt=deepgram.STT(),
llm=openai.LLM(model="gpt-4o"),
tts=cartesia.TTS(),
vad=silero.VAD.load()
)
# Stage 1: A or B
class Stage1Agent(BaseAgent):
def __init__(self, job_context: JobContext) -> None:
super().__init__(job_context=job_context, instructions="Choose Option A or B")
async def on_enter(self) -> None:
await self.session.say("Stage 1: Do you prefer Option A or Option B?")
@function_tool
async def collect_stage1(
self,
choice: Annotated[Stage1Choice, Field(description="A or B")]
) -> Agent:
sd: SurveyData = self.session.userdata
sd.record("Prefer A or B", choice.value)
sd.current_branch = choice.value
if choice == Stage1Choice.OPTION_A:
return Stage1ABranchAgent(job_context=self.job_context)
return Stage1BBranchAgent(job_context=self.job_context)
class Stage1ABranchAgent(BaseAgent):
async def on_enter(self) -> None:
await self.session.say("Why do you prefer Option A?")
@function_tool
async def collect_A(self, answer: str) -> Agent:
sd: SurveyData = self.session.userdata
sd.record("Why prefer A", answer)
sd.current_stage = 2
return Stage2Agent(job_context=self.job_context)
class Stage1BBranchAgent(BaseAgent):
async def on_enter(self) -> None:
await self.session.say("Why do you prefer Option B?")
@function_tool
async def collect_B(self, answer: str) -> Agent:
sd: SurveyData = self.session.userdata
sd.record("Why prefer B", answer)
sd.current_stage = 2
return Stage2Agent(job_context=self.job_context)
# Stage 2: X or Y
class Stage2Agent(BaseAgent):
def __init__(self, job_context: JobContext) -> None:
super().__init__(job_context=job_context, instructions="Choose Option X or Y")
async def on_enter(self) -> None:
await self.session.say("Stage 2: Do you prefer Option X or Option Y?")
@function_tool
async def collect_stage2(
self,
choice: Annotated[Stage2Choice, Field(description="X or Y")]
) -> Agent:
sd: SurveyData = self.session.userdata
sd.record("Prefer X or Y", choice.value)
sd.current_branch = choice.value
return SummaryAgent(job_context=self.job_context)
class SummaryAgent(BaseAgent):
async def on_enter(self) -> None:
sd: SurveyData = self.session.userdata
summary = "\n".join(sd.path_taken)
await self.session.say(f"Survey complete. Path taken: {summary}")
await self.session.aclose()
# Entrypoint
async def entrypoint(ctx: JobContext) -> None:
session = AgentSession()
session.userdata = SurveyData()
await session.start(agent=Stage1Agent(ctx), room=ctx.room)
if __name__ == "__main__":
cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
This example beautifully illustrates how you can build a flexible, multi-stage workflow that responds to user choices with conditional branching. If you have any questions or would like me to walk you through any part in more detail, please don’t hesitate to ask—I’m here to help!
Sources: livekit/livekit_composite | DeepWiki | flows/declarative_flow.py | Pipeline nodes and hooks | LiveKit Docs | LangChain integration guide | LiveKit Docsrhythmic-jordan-68259
09/14/2025, 8:29 PMtall-airline-16756
09/14/2025, 8:29 PMAgent
classes, like Stage1Agent
, Stage1ABranchAgent
, Stage2Agent
, and so on. Each node is responsible for a specific part of the interaction, such as asking a question or handling the user’s input.
• Edges: These are the connections or transitions between the nodes. They’re defined by the logic inside the @function_tool
methods. For example, in Stage1Agent
, the method collect_stage1
decides whether to move on to Stage1ABranchAgent
or Stage1BBranchAgent
, depending on what the user chooses.
So, to sum it up, nodes represent the different stages in your workflow, and edges are the conditional paths that link these stages based on user input or other logic. I hope this helps clarify things! If you have any more questions, feel free to ask—I’m here to help.rhythmic-jordan-68259
09/14/2025, 10:21 PMtall-airline-16756
09/14/2025, 10:23 PMAgentActivity
within an AgentSession
while another activity is still ongoing. This is actually a helpful safeguard designed to prevent any conflicts with resource usage and state management.
Here are a couple of friendly tips to help you resolve this:
• Make sure to properly close any existing agent activity before starting a new one. You can do this by calling the aclose
method on the current AgentActivity
instance.
• If you're using update_agent
to switch between agents, the framework takes care of closing the current activity and starting a new one automatically, so you don’t need to worry about manual closure.
Take a moment to review your workflow to ensure the agents and their activities are managed smoothly. If you’d like, I’m here to help you with more detailed guidance or any other questions you might have!
Sources: livekit-agents/livekit/agents/voice/agent.py | Agents Core Architecture | livekit/livekit_composite | DeepWikirhythmic-jordan-68259
09/14/2025, 10:26 PMtall-airline-16756
09/14/2025, 10:26 PMAgentSession
and how the transitions between agents are handled? That way, I can better understand what's going on and help you find the right solution.rhythmic-jordan-68259
09/14/2025, 10:28 PM@function_tool
async def route_to_topic(
self,
topic: Annotated[DynamicTopicChoice, Field(description="Choose the appropriate topic based on user intent")]
) -> Agent:
"""Route user to appropriate topic"""
<http://logger.info|logger.info>(f"Routing to topic: {topic.value}")
# Find target node for this topic
target_node_id = self.get_target_node_for_topic(topic)
if target_node_id:
# Update conversation data
cd: ConversationData = self.session.userdata
cd.current_node_id = target_node_id
cd.record_interaction(self.current_node['name'], f"Route to {topic.value}", "Routing")
# Create new agent for target node
<http://logger.info|logger.info>(f"Returning DynamicFlowAgent for node: {target_node_id}")
return DynamicFlowAgent(self.job_context, self.flow_config, target_node_id)
else:
# Fallback to current agent
<http://logger.info|logger.info>("Returning current agent")
return self
class DynamicFlowAgent(Agent):
"""Dynamic agent that adapts to flow configuration"""
def __init__(self, job_context: JobContext, flow_config: FlowConfig, current_node_id: str):
self.job_context = job_context
self.flow_config = flow_config
self.current_node_id = current_node_id
# Get current node
self.current_node = flow_config.nodes[current_node_id]
# Configure agent with flow settings
super().__init__(
instructions=flow_config.instructions,
stt=self._setup_stt(),
llm=self._setup_llm(),
tts=self._setup_tts(),
vad=silero.VAD.load()
)
def _setup_stt(self):
"""Setup STT from flow configuration"""
stt_config = self.flow_config.stt_settings
if stt_config['provider'] == 'deepgram':
return deepgram.STT(
model=stt_config.get('model', 'nova-2'),
language=stt_config.get('language', 'en-US'),
smart_format=stt_config.get('smart_format', True)
)
else:
raise ValueError(f"Unsupported STT provider: {stt_config['provider']}")
def _setup_llm(self):
"""Setup LLM from flow configuration"""
llm_config = self.flow_config.llm_settings
model = llm_config.get('model', 'gpt-4o-mini')
if model == 'gpt-4.1':
model = 'gpt-4o-mini'
if llm_config['provider'] == 'openai':
return openai.LLM(
model=model,
temperature=llm_config.get('temperature', 0.7)
)
else:
raise ValueError(f"Unsupported LLM provider: {llm_config['provider']}")
def _setup_tts(self):
"""Setup TTS from flow configuration"""
tts_config = self.flow_config.tts_settings
if tts_config['tts_provider'] == 'cartesia':
return cartesia.TTS(
voice=tts_config.get('voice_id'),
model=tts_config.get('speech_engine', 'sonic-2'),
speed=tts_config.get('speed', 'normal')
)
elif tts_config['tts_provider'] == 'openai':
return openai.TTS(
voice=tts_config.get('voice_id', 'alloy'),
model=tts_config.get('model', 'tts-1')
)
else:
raise ValueError(f"Unsupported TTS provider: {tts_config['tts_provider']}")
def get_outgoing_edges(self) -> List[Dict[str, Any]]:
"""Get outgoing edges from current node"""
return [edge for edge in self.flow_config.edges if edge['from_node_id'] == self.current_node_id]
def should_skip_current_node(self) -> bool:
"""Check if current node should be skipped"""
settings = self.current_node['settings']
return (not settings.get('on_enter_text', '').strip() and
settings.get('skip_response', False))
def is_routing_node(self) -> bool:
"""Check if current node is the main routing node"""
return self.current_node_id == self.flow_config.start_node_id
def has_multiple_routing_options(self) -> bool:
"""Check if current node has multiple routing options"""
outgoing_edges = self.get_outgoing_edges()
prompt_edges = [e for e in outgoing_edges if e['type'] == 'prompt']
return len(prompt_edges) > 1
async def on_enter(self) -> Agent:
"""Handle entering the current node"""
# Handle skip edges automatically
outgoing_edges = self.get_outgoing_edges()
skip_edges = [e for e in outgoing_edges if e['type'] == 'skip']
if skip_edges:
# Automatically transition to next node
next_node_id = skip_edges[0]['to_node_id']
<http://logger.info|logger.info>(f"Auto-transitioning from {self.current_node_id} to {next_node_id}")
# Update conversation data
cd: ConversationData = self.session.userdata
cd.current_node_id = next_node_id
# Return new agent for next node
return DynamicFlowAgent(self.job_context, self.flow_config, next_node_id)
# Say the on_enter_text if not skipped
settings = self.current_node['settings']
if settings.get('on_enter_text') and not settings.get('skip_response', False):
await self.session.say(settings['on_enter_text'])
# Return self to continue in current agent
return self
def build_routing_description(self) -> str:
"""Build routing description from edges"""
outgoing_edges = self.get_outgoing_edges()
descriptions = []
for edge in outgoing_edges:
if edge['type'] == 'prompt' and edge.get('settings', {}).get('prompt'):
descriptions.append(edge['settings']['prompt'])
return ' '.join(descriptions)
def map_node_to_topic(self, node_id: str) -> Optional[DynamicTopicChoice]:
"""Map node ID to topic choice"""
node = self.flow_config.nodes[node_id]
node_name = node['name'].lower()
if 'application' in node_name or 'billing' in node_id:
return DynamicTopicChoice.LOAN_APPLICATION
elif 'rates' in node_name or 'rates' in node_id:
return DynamicTopicChoice.RATES_AND_TERMS
elif 'refinanc' in node_name or 'refinanc' in node_id:
return DynamicTopicChoice.REFINANCING
elif 'support' in node_name or 'support' in node_id:
return DynamicTopicChoice.GENERAL_SUPPORT
elif 'general' in node_name and 'question' in node_name:
return DynamicTopicChoice.GENERAL_QUESTIONS
return None
def get_target_node_for_topic(self, topic: DynamicTopicChoice) -> Optional[str]:
"""Get target node ID for a given topic"""
for edge in self.get_outgoing_edges():
if edge['type'] == 'prompt':
target_node_id = edge['to_node_id']
mapped_topic = self.map_node_to_topic(target_node_id)
if mapped_topic == topic:
return target_node_id
return None
@function_tool
async def route_to_topic(
self,
topic: Annotated[DynamicTopicChoice, Field(description="Choose the appropriate topic based on user intent")]
) -> Agent:
"""Route user to appropriate topic"""
<http://logger.info|logger.info>(f"Routing to topic: {topic.value}")
# Find target node for this topic
target_node_id = self.get_target_node_for_topic(topic)
if target_node_id:
# Update conversation data
cd: ConversationData = self.session.userdata
cd.current_node_id = target_node_id
cd.record_interaction(self.current_node['name'], f"Route to {topic.value}", "Routing")
# Create new agent for target node
<http://logger.info|logger.info>(f"Returning DynamicFlowAgent for node: {target_node_id}")
return DynamicFlowAgent(self.job_context, self.flow_config, target_node_id)
else:
# Fallback to current agent
<http://logger.info|logger.info>("Returning current agent")
return self
@function_tool
async def switch_topic(
self,
new_topic: Annotated[DynamicTopicChoice, Field(description="New topic the user wants to discuss")]
) -> Agent:
"""Switch to a different topic"""
<http://logger.info|logger.info>(f"Switching to topic: {new_topic.value}")
# Find the routing node (start node) and route from there
routing_agent = DynamicFlowAgent(self.job_context, self.flow_config, self.flow_config.start_node_id)
return await routing_agent.route_to_topic(new_topic)
@function_tool
async def provide_assistance(
self,
user_question: Annotated[str, Field(description="User's specific question")]
) -> str:
"""Provide assistance for the current topic"""
cd: ConversationData = self.session.userdata
cd.record_interaction(self.current_node['name'], user_question, "Providing assistance")
return f"I'll help you with your question about {self.current_node['name'].lower()}."
2025-09-15 032702,074 - INFO dynamic-flow-agent - Routing to topic: loan_application
2025-09-15 032702,074 - INFO dynamic-flow-agent - Returning current agent
2025-09-15 032702,074 - DEBUG livekit.agents - tools execution completed {"speech_id": "speech_db368353ae81"}
2025-09-15 032702,401 - ERROR livekit.agents - Error in _update_activity_task
Traceback (most recent call last):
File "/home/umair/.cache/pypoetry/virtualenvs/livekit-workflow-TFmTJtXu-py3.12/lib/python3.12/site-packages/livekit/agents/utils/log.py", line 16, in async_fn_logs
return await fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/umair/.cache/pypoetry/virtualenvs/livekit-workflow-TFmTJtXu-py3.12/lib/python3.12/site-packages/livekit/agents/voice/agent_session.py", line 947, in _update_activity_task
await self._update_activity(agent)
File "/home/umair/.cache/pypoetry/virtualenvs/livekit-workflow-TFmTJtXu-py3.12/lib/python3.12/site-packages/livekit/agents/voice/agent_session.py", line 898, in _update_activity
raise RuntimeError("cannot start agent: an activity is already running")
RuntimeError: cannot start agent: an activity is already runningtall-airline-16756
09/14/2025, 10:29 PMAgentActivity
is started while another one is still active. This happens because the framework wants to keep things consistent by allowing only one active activity at a time.
Looking at your code, this might be happening because:
• The current activity isn’t properly closed before starting a new one. Make sure to call the aclose()
method on the current activity so that agent._activity
is set to None
.
• You’re calling say()
or start()
methods when the agent isn’t in a running state yet. These methods need the activity to be active.
• update_agent()
is being used incorrectly. This method takes care of closing the current activity and starting a new one automatically, so it’s important to use it properly.
To help you debug this, here are a few things to check:
1. Make sure your transitions between agents (like in route_to_topic
) properly close the current activity before returning a new agent.
2. Double-check that you’re not creating multiple AgentSession
instances or making duplicate start()
calls.
3. Review your agents’ lifecycle to ensure their states are managed correctly.
If you’d like, I’m here to help you pinpoint the exact part of your code causing this or to clarify any of these steps further. Just let me know!
Sources: Still Have Questions | Getting Help | livekit/livekit_composite | DeepWiki | livekit-agents/livekit/agents/voice/agent_activity.py | agents/src/voice/agent_activity.ts