could you provide complete code for this ```I'd l...
# ask-ai
r
could you provide complete code for this
I'd love to help you create a workflow with nodes and conditional connections in LiveKit! Here's a friendly approach you can try:

Define Multiple Agents or Tools: Think of creating separate agents or tools for each part of your workflow, like greeting the user first and then gathering their information.
Use Conditional Handoffs: You can smoothly transition between nodes (agents or tools) based on what the user says or their intent. For example, after saying hello, you can hand off to a tool that collects their details.
Manage State with User Data: Keeping track of user info and choices with a shared state object helps maintain continuity throughout the workflow.
Implement Conditional Branching: Use simple logic to decide the next step depending on user responses. Like, if the user shares their name, move forward; if not, gently ask again.
Use Pipeline Hooks: Customize what happens at different points using hooks like on_enter() and on_exit(). For instance, you can warmly greet the user when they enter the first node.
These steps can help you build a dynamic and thoughtful workflow with conditional logic. If you'd like, I'm here to provide a specific code example or guide you further—just let me know!
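The branching step above ("if the user shares their name, move forward; if not, gently ask again") can be sketched without any framework. This is a minimal illustration, not LiveKit code: `FlowState`, `greet_node`, `collect_name_node`, and `run` are hypothetical names, and each node is just a function that reads the shared state and returns the name of the next node.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional


@dataclass
class FlowState:
    """Shared state carried across nodes."""
    name: Optional[str] = None
    log: List[str] = field(default_factory=list)


def greet_node(state: FlowState, user_input: str) -> str:
    state.log.append("greeted")
    return "collect_name"  # unconditional edge to the next node


def collect_name_node(state: FlowState, user_input: str) -> str:
    cleaned = user_input.strip()
    if cleaned:
        state.name = cleaned
        return "done"  # name given: move forward
    state.log.append("asked again")
    return "collect_name"  # no name: stay on this node and re-ask


NODES: Dict[str, Callable[[FlowState, str], str]] = {
    "greet": greet_node,
    "collect_name": collect_name_node,
}


def run(inputs: List[str]) -> FlowState:
    """Feed one user input per step until the flow reaches 'done'."""
    state, node = FlowState(), "greet"
    for user_input in inputs:
        node = NODES[node](state, user_input)
        if node == "done":
            break
    return state
```

Calling `run(["hi", "", "Umair"])` greets, re-asks once for the empty reply, then stores the name.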
t
Thanks for reaching out, Umair. I'm an AI assistant and I'm here to help. Give me a moment while I research this topic.
I'm excited to share with you a complete example of creating a workflow with nodes and conditional connections in LiveKit. This example walks you through a multi-stage survey workflow that adapts based on user input, making it dynamic and interactive:
import logging
from dotenv import load_dotenv
from dataclasses import dataclass, field
from typing import Annotated, Dict, List
from enum import Enum
from pydantic import Field

from livekit.agents import JobContext, WorkerOptions, cli, AgentSession
from livekit.agents.llm import function_tool
from livekit.agents.voice import Agent
from livekit.plugins import deepgram, openai, cartesia, silero

load_dotenv()
logger = logging.getLogger("conditional-flow")
logger.setLevel(logging.INFO)

# Define choices for each stage
class Stage1Choice(str, Enum):
    OPTION_A = "A"
    OPTION_B = "B"

class Stage2Choice(str, Enum):
    OPTION_X = "X"
    OPTION_Y = "Y"

@dataclass
class SurveyData:
    responses: Dict[str, str] = field(default_factory=dict)
    current_stage: int = 1
    current_branch: str = ""
    path_taken: List[str] = field(default_factory=list)

    def record(self, question: str, answer: str):
        self.responses[question] = answer
        self.path_taken.append(f"Stage {self.current_stage} ('{self.current_branch}') - {question}: {answer}")

class BaseAgent(Agent):
    def __init__(self, job_context: JobContext, instructions: str) -> None:
        self.job_context = job_context
        super().__init__(
            instructions=instructions,
            stt=deepgram.STT(),
            llm=openai.LLM(model="gpt-4o"),
            tts=cartesia.TTS(),
            vad=silero.VAD.load()
        )

# Stage 1: A or B
class Stage1Agent(BaseAgent):
    def __init__(self, job_context: JobContext) -> None:
        super().__init__(job_context=job_context, instructions="Choose Option A or B")

    async def on_enter(self) -> None:
        await self.session.say("Stage 1: Do you prefer Option A or Option B?")

    @function_tool
    async def collect_stage1(
        self,
        choice: Annotated[Stage1Choice, Field(description="A or B")]
    ) -> Agent:
        sd: SurveyData = self.session.userdata
        sd.record("Prefer A or B", choice.value)
        sd.current_branch = choice.value
        if choice == Stage1Choice.OPTION_A:
            return Stage1ABranchAgent(job_context=self.job_context)
        return Stage1BBranchAgent(job_context=self.job_context)

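class Stage1ABranchAgent(BaseAgent):
    def __init__(self, job_context: JobContext) -> None:
        # BaseAgent.__init__ requires instructions, so each subclass must pass them.
        super().__init__(job_context=job_context, instructions="Ask why the user prefers Option A")

    async def on_enter(self) -> None: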
        await self.session.say("Why do you prefer Option A?")

    @function_tool
    async def collect_A(self, answer: str) -> Agent:
        sd: SurveyData = self.session.userdata
        sd.record("Why prefer A", answer)
        sd.current_stage = 2
        return Stage2Agent(job_context=self.job_context)

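class Stage1BBranchAgent(BaseAgent):
    def __init__(self, job_context: JobContext) -> None:
        # BaseAgent.__init__ requires instructions, so each subclass must pass them.
        super().__init__(job_context=job_context, instructions="Ask why the user prefers Option B")

    async def on_enter(self) -> None: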
        await self.session.say("Why do you prefer Option B?")

    @function_tool
    async def collect_B(self, answer: str) -> Agent:
        sd: SurveyData = self.session.userdata
        sd.record("Why prefer B", answer)
        sd.current_stage = 2
        return Stage2Agent(job_context=self.job_context)

# Stage 2: X or Y
class Stage2Agent(BaseAgent):
    def __init__(self, job_context: JobContext) -> None:
        super().__init__(job_context=job_context, instructions="Choose Option X or Y")

    async def on_enter(self) -> None:
        await self.session.say("Stage 2: Do you prefer Option X or Option Y?")

    @function_tool
    async def collect_stage2(
        self,
        choice: Annotated[Stage2Choice, Field(description="X or Y")]
    ) -> Agent:
        sd: SurveyData = self.session.userdata
        sd.record("Prefer X or Y", choice.value)
        sd.current_branch = choice.value
        return SummaryAgent(job_context=self.job_context)

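class SummaryAgent(BaseAgent):
    def __init__(self, job_context: JobContext) -> None:
        # BaseAgent.__init__ requires instructions, so each subclass must pass them.
        super().__init__(job_context=job_context, instructions="Summarize the survey path")

    async def on_enter(self) -> None: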
        sd: SurveyData = self.session.userdata
        summary = "\n".join(sd.path_taken)
        await self.session.say(f"Survey complete. Path taken: {summary}")
        await self.session.aclose()

# Entrypoint
async def entrypoint(ctx: JobContext) -> None:
    # Pass the shared state at construction so every agent can read session.userdata.
    session = AgentSession[SurveyData](userdata=SurveyData())
    await session.start(agent=Stage1Agent(ctx), room=ctx.room)

if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))
This example beautifully illustrates how you can build a flexible, multi-stage workflow that responds to user choices with conditional branching. If you have any questions or would like me to walk you through any part in more detail, please don’t hesitate to ask—I’m here to help! Sources: livekit/livekit_composite | DeepWiki | flows/declarative_flow.py | Pipeline nodes and hooks | LiveKit Docs | LangChain integration guide | LiveKit Docs
r
what are nodes and edges in this code
t
Thank you for sharing the code! Let's walk through how the concepts of "nodes" and "edges" are represented here, so it's clear and easy to follow:
• Nodes: Think of these as the individual stages or steps in your workflow. They're represented by the different `Agent` classes, like `Stage1Agent`, `Stage1ABranchAgent`, `Stage2Agent`, and so on. Each node is responsible for a specific part of the interaction, such as asking a question or handling the user's input.
• Edges: These are the connections or transitions between the nodes. They're defined by the logic inside the `@function_tool` methods. For example, in `Stage1Agent`, the method `collect_stage1` decides whether to move on to `Stage1ABranchAgent` or `Stage1BBranchAgent`, depending on what the user chooses.
So, to sum it up, nodes represent the different stages in your workflow, and edges are the conditional paths that link these stages based on user input or other logic. I hope this helps clarify things! If you have any more questions, feel free to ask. I'm here to help.
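To make the node/edge picture concrete, here is a framework-free sketch of the same survey written as an explicit graph. It does not use LiveKit; `NODES`, `EDGES`, `next_node`, and `walk` are hypothetical names introduced only for illustration, and the node names simply mirror the agent classes in the example.

```python
from typing import Dict, List, Optional, Tuple

# Nodes: one entry per stage (these mirror the Agent subclasses in the example).
NODES: List[str] = [
    "Stage1Agent",
    "Stage1ABranchAgent",
    "Stage1BBranchAgent",
    "Stage2Agent",
    "SummaryAgent",
]

# Edges: (current node, condition) -> next node.
# A condition of None means the transition is unconditional.
EDGES: Dict[Tuple[str, Optional[str]], str] = {
    ("Stage1Agent", "A"): "Stage1ABranchAgent",
    ("Stage1Agent", "B"): "Stage1BBranchAgent",
    ("Stage1ABranchAgent", None): "Stage2Agent",
    ("Stage1BBranchAgent", None): "Stage2Agent",
    ("Stage2Agent", None): "SummaryAgent",
}


def next_node(current: str, choice: Optional[str] = None) -> Optional[str]:
    """Follow the edge that matches the choice, else the unconditional edge."""
    if (current, choice) in EDGES:
        return EDGES[(current, choice)]
    return EDGES.get((current, None))


def walk(choices: Dict[str, str]) -> List[str]:
    """Return the path of nodes visited for the given per-node choices."""
    path = ["Stage1Agent"]
    while (nxt := next_node(path[-1], choices.get(path[-1]))) is not None:
        path.append(nxt)
    return path


if __name__ == "__main__":
    print(walk({"Stage1Agent": "B", "Stage2Agent": "X"}))
```

In the agent version, each `EDGES` entry corresponds to a `return SomeAgent(...)` statement inside a `@function_tool` method; the table form just makes the transitions visible in one place.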
r
I am getting this error:
2025-09-15 03:19:09,284 - INFO dynamic-flow-agent - Routing to topic: loan_application
2025-09-15 03:19:09,284 - DEBUG livekit.agents - tools execution completed {"speech_id": "speech_fa772f7a8c25"}
2025-09-15 03:19:09,593 - ERROR livekit.agents - Error in _update_activity_task
Traceback (most recent call last):
  File "/home/umair/.cache/pypoetry/virtualenvs/livekit-workflow-TFmTJtXu-py3.12/lib/python3.12/site-packages/livekit/agents/utils/log.py", line 16, in async_fn_logs
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/umair/.cache/pypoetry/virtualenvs/livekit-workflow-TFmTJtXu-py3.12/lib/python3.12/site-packages/livekit/agents/voice/agent_session.py", line 947, in _update_activity_task
    await self._update_activity(agent)
  File "/home/umair/.cache/pypoetry/virtualenvs/livekit-workflow-TFmTJtXu-py3.12/lib/python3.12/site-packages/livekit/agents/voice/agent_session.py", line 898, in _update_activity
    raise RuntimeError("cannot start agent: an activity is already running")
RuntimeError: cannot start agent: an activity is already running
2025-09-15 03:19:17,508 - INFO livekit.agents - shutting down worker {"id": "unregistered"}
2025-09-15 03:19:17,509 - DEBUG livekit.agents - shutting down job task {"reason": "", "user_initiated": false}
2025-09-15 03:19:17,510 - DEBUG livekit.agents - job exiting {"reason": "", "tid": 628406, "job_id": "simulated-job-d0dd89ed1516"}
2025-09-15 03:19:17,510 - DEBUG livekit.agents - session closed {"reason": "job_shutdown", "error": null}
2025-09-15 03:19:17,511 - DEBUG livekit.agents - http_session(): closing the httpclient ctx
t
It looks like you're encountering the error "`cannot start agent: an activity is already running`". This happens when you try to start a new `AgentActivity` within an `AgentSession` while another activity is still ongoing. This is actually a helpful safeguard designed to prevent any conflicts with resource usage and state management. Here are a couple of friendly tips to help you resolve this:
• Make sure to properly close any existing agent activity before starting a new one. You can do this by calling the `aclose` method on the current `AgentActivity` instance.
• If you're using `update_agent` to switch between agents, the framework takes care of closing the current activity and starting a new one automatically, so you don't need to worry about manual closure.
Take a moment to review your workflow to ensure the agents and their activities are managed smoothly. If you'd like, I'm here to help you with more detailed guidance or any other questions you might have!
Sources: livekit-agents/livekit/agents/voice/agent.py | Agents Core Architecture | livekit/livekit_composite | DeepWiki
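As a rough illustration of the safeguard described above, here is a toy model. It is not LiveKit's implementation: `ToyAgent`, `ToySession`, and `hand_off` are hypothetical names, and the only thing borrowed from the thread is the error message in the traceback. It shows one way such a guard can trip: asking the session to start an agent that is already the running one.

```python
from typing import Optional


class ToyAgent:
    """Stand-in for an agent; `active` models 'an activity is running'."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.active = False


class ToySession:
    """Hypothetical session that allows one running agent at a time."""

    def __init__(self) -> None:
        self.current: Optional[ToyAgent] = None

    def hand_off(self, agent: ToyAgent) -> None:
        # Guard first: an agent that is already running cannot be started again.
        if agent.active:
            raise RuntimeError("cannot start agent: an activity is already running")
        # Close the previous agent, then start the new one.
        if self.current is not None:
            self.current.active = False
        agent.active = True
        self.current = agent


if __name__ == "__main__":
    session = ToySession()
    router = ToyAgent("router")
    session.hand_off(router)            # first start succeeds
    session.hand_off(ToyAgent("loan"))  # handing off to a fresh agent is fine
    try:
        session.hand_off(session.current)  # re-starting the running agent fails
    except RuntimeError as exc:
        print(exc)
```

Under this model, a hand-off to a newly constructed agent always succeeds, while a hand-off to the agent that is currently running raises the error.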
r
this is my code which is generating this error could you find out what is the issue
t
Could you please share the part of your code where the error happens, especially the sections with the `AgentSession` and how the transitions between agents are handled? That way, I can better understand what's going on and help you find the right solution.
r
@function_tool
async def route_to_topic(
        self,
        topic: Annotated[DynamicTopicChoice, Field(description="Choose the appropriate topic based on user intent")]
) -> Agent:
    """Route user to appropriate topic"""
    logger.info(f"Routing to topic: {topic.value}")

    # Find target node for this topic
    target_node_id = self.get_target_node_for_topic(topic)

    if target_node_id:
        # Update conversation data
        cd: ConversationData = self.session.userdata
        cd.current_node_id = target_node_id
        cd.record_interaction(self.current_node['name'], f"Route to {topic.value}", "Routing")

        # Create new agent for target node
        logger.info(f"Returning DynamicFlowAgent for node: {target_node_id}")
        return DynamicFlowAgent(self.job_context, self.flow_config, target_node_id)
    else:
        # Fallback to current agent
        logger.info("Returning current agent")
        return self
class DynamicFlowAgent(Agent):
    """Dynamic agent that adapts to flow configuration"""

    def __init__(self, job_context: JobContext, flow_config: FlowConfig, current_node_id: str):
        self.job_context = job_context
        self.flow_config = flow_config
        self.current_node_id = current_node_id

        # Get current node
        self.current_node = flow_config.nodes[current_node_id]

        # Configure agent with flow settings
        super().__init__(
            instructions=flow_config.instructions,
            stt=self._setup_stt(),
            llm=self._setup_llm(),
            tts=self._setup_tts(),
            vad=silero.VAD.load()
        )

    def _setup_stt(self):
        """Setup STT from flow configuration"""
        stt_config = self.flow_config.stt_settings
        if stt_config['provider'] == 'deepgram':
            return deepgram.STT(
                model=stt_config.get('model', 'nova-2'),
                language=stt_config.get('language', 'en-US'),
                smart_format=stt_config.get('smart_format', True)
            )
        else:
            raise ValueError(f"Unsupported STT provider: {stt_config['provider']}")

    def _setup_llm(self):
        """Setup LLM from flow configuration"""
        llm_config = self.flow_config.llm_settings
        model = llm_config.get('model', 'gpt-4o-mini')
        if model == 'gpt-4.1':
            model = 'gpt-4o-mini'

        if llm_config['provider'] == 'openai':
            return openai.LLM(
                model=model,
                temperature=llm_config.get('temperature', 0.7)
            )
        else:
            raise ValueError(f"Unsupported LLM provider: {llm_config['provider']}")

    def _setup_tts(self):
        """Setup TTS from flow configuration"""
        tts_config = self.flow_config.tts_settings
        if tts_config['tts_provider'] == 'cartesia':
            return cartesia.TTS(
                voice=tts_config.get('voice_id'),
                model=tts_config.get('speech_engine', 'sonic-2'),
                speed=tts_config.get('speed', 'normal')
            )
        elif tts_config['tts_provider'] == 'openai':
            return openai.TTS(
                voice=tts_config.get('voice_id', 'alloy'),
                model=tts_config.get('model', 'tts-1')
            )
        else:
            raise ValueError(f"Unsupported TTS provider: {tts_config['tts_provider']}")

    def get_outgoing_edges(self) -> List[Dict[str, Any]]:
        """Get outgoing edges from current node"""
        return [edge for edge in self.flow_config.edges if edge['from_node_id'] == self.current_node_id]

    def should_skip_current_node(self) -> bool:
        """Check if current node should be skipped"""
        settings = self.current_node['settings']
        return (not settings.get('on_enter_text', '').strip() and
                settings.get('skip_response', False))

    def is_routing_node(self) -> bool:
        """Check if current node is the main routing node"""
        return self.current_node_id == self.flow_config.start_node_id

    def has_multiple_routing_options(self) -> bool:
        """Check if current node has multiple routing options"""
        outgoing_edges = self.get_outgoing_edges()
        prompt_edges = [e for e in outgoing_edges if e['type'] == 'prompt']
        return len(prompt_edges) > 1

    async def on_enter(self) -> Agent:
        """Handle entering the current node"""
        # Handle skip edges automatically
        outgoing_edges = self.get_outgoing_edges()
        skip_edges = [e for e in outgoing_edges if e['type'] == 'skip']

        if skip_edges:
            # Automatically transition to next node
            next_node_id = skip_edges[0]['to_node_id']
            logger.info(f"Auto-transitioning from {self.current_node_id} to {next_node_id}")

            # Update conversation data
            cd: ConversationData = self.session.userdata
            cd.current_node_id = next_node_id

            # Return new agent for next node
            return DynamicFlowAgent(self.job_context, self.flow_config, next_node_id)

        # Say the on_enter_text if not skipped
        settings = self.current_node['settings']
        if settings.get('on_enter_text') and not settings.get('skip_response', False):
            await self.session.say(settings['on_enter_text'])

        # Return self to continue in current agent
        return self

    def build_routing_description(self) -> str:
        """Build routing description from edges"""
        outgoing_edges = self.get_outgoing_edges()
        descriptions = []

        for edge in outgoing_edges:
            if edge['type'] == 'prompt' and edge.get('settings', {}).get('prompt'):
                descriptions.append(edge['settings']['prompt'])

        return ' '.join(descriptions)

    def map_node_to_topic(self, node_id: str) -> Optional[DynamicTopicChoice]:
        """Map node ID to topic choice"""
        node = self.flow_config.nodes[node_id]
        node_name = node['name'].lower()

        if 'application' in node_name or 'billing' in node_id:
            return DynamicTopicChoice.LOAN_APPLICATION
        elif 'rates' in node_name or 'rates' in node_id:
            return DynamicTopicChoice.RATES_AND_TERMS
        elif 'refinanc' in node_name or 'refinanc' in node_id:
            return DynamicTopicChoice.REFINANCING
        elif 'support' in node_name or 'support' in node_id:
            return DynamicTopicChoice.GENERAL_SUPPORT
        elif 'general' in node_name and 'question' in node_name:
            return DynamicTopicChoice.GENERAL_QUESTIONS

        return None

    def get_target_node_for_topic(self, topic: DynamicTopicChoice) -> Optional[str]:
        """Get target node ID for a given topic"""
        for edge in self.get_outgoing_edges():
            if edge['type'] == 'prompt':
                target_node_id = edge['to_node_id']
                mapped_topic = self.map_node_to_topic(target_node_id)
                if mapped_topic == topic:
                    return target_node_id
        return None

    @function_tool
    async def route_to_topic(
            self,
            topic: Annotated[DynamicTopicChoice, Field(description="Choose the appropriate topic based on user intent")]
    ) -> Agent:
        """Route user to appropriate topic"""
        logger.info(f"Routing to topic: {topic.value}")

        # Find target node for this topic
        target_node_id = self.get_target_node_for_topic(topic)

        if target_node_id:
            # Update conversation data
            cd: ConversationData = self.session.userdata
            cd.current_node_id = target_node_id
            cd.record_interaction(self.current_node['name'], f"Route to {topic.value}", "Routing")

            # Create new agent for target node
            logger.info(f"Returning DynamicFlowAgent for node: {target_node_id}")
            return DynamicFlowAgent(self.job_context, self.flow_config, target_node_id)
        else:
            # Fallback to current agent
            logger.info("Returning current agent")
            return self

    @function_tool
    async def switch_topic(
            self,
            new_topic: Annotated[DynamicTopicChoice, Field(description="New topic the user wants to discuss")]
    ) -> Agent:
        """Switch to a different topic"""
        logger.info(f"Switching to topic: {new_topic.value}")

        # Find the routing node (start node) and route from there
        routing_agent = DynamicFlowAgent(self.job_context, self.flow_config, self.flow_config.start_node_id)
        return await routing_agent.route_to_topic(new_topic)

    @function_tool
    async def provide_assistance(
            self,
            user_question: Annotated[str, Field(description="User's specific question")]
    ) -> str:
        """Provide assistance for the current topic"""
        cd: ConversationData = self.session.userdata
        cd.record_interaction(self.current_node['name'], user_question, "Providing assistance")

        return f"I'll help you with your question about {self.current_node['name'].lower()}."
2025-09-15 03:27:02,074 - INFO dynamic-flow-agent - Routing to topic: loan_application
2025-09-15 03:27:02,074 - INFO dynamic-flow-agent - Returning current agent
2025-09-15 03:27:02,074 - DEBUG livekit.agents - tools execution completed {"speech_id": "speech_db368353ae81"}
2025-09-15 03:27:02,401 - ERROR livekit.agents - Error in _update_activity_task
Traceback (most recent call last):
  File "/home/umair/.cache/pypoetry/virtualenvs/livekit-workflow-TFmTJtXu-py3.12/lib/python3.12/site-packages/livekit/agents/utils/log.py", line 16, in async_fn_logs
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/umair/.cache/pypoetry/virtualenvs/livekit-workflow-TFmTJtXu-py3.12/lib/python3.12/site-packages/livekit/agents/voice/agent_session.py", line 947, in _update_activity_task
    await self._update_activity(agent)
  File "/home/umair/.cache/pypoetry/virtualenvs/livekit-workflow-TFmTJtXu-py3.12/lib/python3.12/site-packages/livekit/agents/voice/agent_session.py", line 898, in _update_activity
    raise RuntimeError("cannot start agent: an activity is already running")
RuntimeError: cannot start agent: an activity is already running
t
Hi there! The error "`cannot start agent: an activity is already running`" usually pops up when a new `AgentActivity` is started while another one is still active. This happens because the framework wants to keep things consistent by allowing only one active activity at a time. Looking at your code, this might be happening because:
• The current activity isn't properly closed before starting a new one. Make sure to call the `aclose()` method on the current activity so that `agent._activity` is set to `None`.
• You're calling `say()` or `start()` methods when the agent isn't in a running state yet. These methods need the activity to be active.
• `update_agent()` is being used incorrectly. This method takes care of closing the current activity and starting a new one automatically, so it's important to use it properly.
To help you debug this, here are a few things to check:
1. Make sure your transitions between agents (like in `route_to_topic`) properly close the current activity before returning a new agent.
2. Double-check that you're not creating multiple `AgentSession` instances or making duplicate `start()` calls.
3. Review your agents' lifecycle to ensure their states are managed correctly.
If you'd like, I'm here to help you pinpoint the exact part of your code causing this or to clarify any of these steps further. Just let me know!
Sources: Still Have Questions | Getting Help | livekit/livekit_composite | DeepWiki | livekit-agents/livekit/agents/voice/agent_activity.py | agents/src/voice/agent_activity.ts
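One detail in the second log may be worth checking: "Returning current agent" is logged immediately before the error, which suggests the fallback `return self` in `route_to_topic` could be what asks the session to start an agent that is already running. A hedged sketch of the alternative is to hand off only when a different target node exists, and otherwise return a plain message. The helper below is framework-free; `RouteDecision` and `decide_route` are hypothetical names, and the edge dictionaries are assumed to use the same `from_node_id`, `to_node_id`, and `type` keys as the code shared in the thread.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class RouteDecision:
    """Result of routing: a node to hand off to, or a message to stay put."""
    target_node_id: Optional[str]
    message: str


def decide_route(
    edges: List[Dict[str, Any]],
    current_node_id: str,
    topic_to_node: Dict[str, str],
    topic: str,
) -> RouteDecision:
    """Pick a hand-off target only if a prompt edge leads to a different node."""
    wanted = topic_to_node.get(topic)
    for edge in edges:
        if (
            edge["from_node_id"] == current_node_id
            and edge["type"] == "prompt"
            and edge["to_node_id"] == wanted
            and wanted != current_node_id
        ):
            return RouteDecision(wanted, f"Routing to {topic}")
    # No valid target: stay on the current node instead of handing off to it.
    return RouteDecision(None, f"No route for {topic}; staying on {current_node_id}")


if __name__ == "__main__":
    edges = [{"from_node_id": "start", "to_node_id": "loan", "type": "prompt"}]
    topics = {"loan_application": "loan"}
    print(decide_route(edges, "start", topics, "loan_application"))
    print(decide_route(edges, "start", topics, "refinancing"))
```

Inside the tool, a decision with a target would return the new agent, while a decision without one would return `decision.message` (a string) rather than `self`. Likewise, the value returned from `on_enter` may not be treated as a hand-off, so the skip-edge transition might need an explicit call such as the `update_agent` method mentioned earlier in the thread. Both points are assumptions to verify against the LiveKit Agents version in use.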