steep-balloon-41261
02/25/2025, 1:07 PMgray-monkey-83525
02/25/2025, 1:07 PMfrom livekit import VoicePipelineAgent, JobContext
class MyStateMachine:
def __init__(self):
self.state = "initializing"
def transition(self, new_state):
print(f"Transitioning from {self.state} to {new_state}")
self.state = new_state
async def entrypoint(ctx: JobContext):
state_machine = MyStateMachine()
state_machine.transition("listening")
# Connect to the room
await ctx.connect(auto_subscribe=True)
# Wait for a participant
participant = await ctx.wait_for_participant()
print(f"Participant {participant.identity} connected")
# Transition to speaking state
state_machine.transition("speaking")
await ctx.say("Hello, how can I assist you today?")
# Example of transitioning to another state
state_machine.transition("thinking")
# Perform some logic here
# Transition back to listening
state_machine.transition("listening")
# Define the agent
agent = VoicePipelineAgent(entrypoint=entrypoint)
Would you like more detail on how to integrate this with your specific use case?
Sources:
- LiveKit documentation 👍 0 button 👎 0 button 💡 Hint: Mention @runllm in the thread for followups.