I want to call generate_reply() inside a function ...
# ask-ai
b
I want to call generate_reply() inside a function tool while a async process is running. I want the generate_reply function to be executed in parallell as the async process, but I want to wait_for_playout() before returning the value of the tool call
t
Thanks for reaching out, Petter. I'm an AI assistant and I'm here to help. Give me a moment while I research this topic.
If you want to call
generate_reply()
inside a function tool while an async process is running, you can use the
SpeechHandle
that
generate_reply()
gives you to manage speech alongside your async task. Here’s a simple way to do it: 1. Kick off the speech generation with
generate_reply()
. This won’t block your code and returns a
SpeechHandle
. 2. Run your async process at the same time using
asyncio.create_task()
. 3. Use
await speech_handle.wait_for_playout()
to wait until the speech finishes before moving on. 4. Keep an eye out for interruptions or errors by checking
speech_handle.interrupted
or using try-except blocks. Here’s a quick example to illustrate:
Copy code
from livekit.agents import function_tool, RunContext
import asyncio

@function_tool
async def process_with_speech(context: RunContext, task_description: str):
    # Start speech generation
    speech_handle = context.session.generate_reply(
        instructions=f"Starting to process: {task_description}"
    )

    # Start async process
    async def long_running_process():
        await asyncio.sleep(5)  # Simulate a long-running task
        return "Process completed successfully"

    process_task = asyncio.create_task(long_running_process())

    # Wait for speech to complete
    await speech_handle.wait_for_playout()

    # Wait for async process to complete
    result = await process_task

    # Provide final speech feedback
    await context.session.generate_reply(
        instructions=f"Process completed with result: {result}"
    )

    return result
This way, your speech and async tasks run side-by-side smoothly, keeping everything nicely coordinated. Sources: tests/test_evals.py | Agent speech and audio | LiveKit Docs
b
how do i handle race conditions where process_task is finished before speech_handle and vice versa?
t
To handle race conditions between
process_task
and
speech_handle
, here are some friendly tips you can try out: 1. Use Callbacks: You can add a done callback to the
SpeechHandle
so that something happens right after the speech finishes:
speech_handle.add_done_callback(lambda _: process_task())
2. Await Completion: Simply use
await
to make sure things happen in the right order:
Copy code
await speech_handle
process_task()
3. Check State: Feel free to check if the speech is done or interrupted before moving on:
Copy code
if speech_handle.done():
    process_task()
4. Prevent Interruptions: You can set
allow_interruptions=False
to keep the speech from being interrupted:
Copy code
speech_handle = context.session.generate_reply(
    instructions="...",
    allow_interruptions=False
)
5. Coordinate with RunResult: Use
RunResult
to keep an eye on multiple handles and make sure everything finishes nicely:
Copy code
run_result = RunResult()
run_result._watch_handle(speech_handle)
run_result._watch_handle(process_task)
await run_result
These little tricks should help you manage concurrency smoothly and avoid those pesky race conditions. If you have any questions or want to chat more about it, I’m here to help!
b
Please update with the following: • Make sure speech_handle and process_task is running in parallel • However, only return the results of process_Task when both processes are finished (process_task has resolved its promise and speech_handle has finished wait_for_playout())
t
To make sure
speech_handle
and
process_task
run side by side but only give you results once they’re both done, you can use
asyncio.gather()
. Let me show you how to do it: 1. Create both tasks using `asyncio.create_task()`:
Copy code
speech_task = asyncio.create_task(speech_handle.wait_for_playout())
process_task = asyncio.create_task(long_running_process())
2. Then, use
asyncio.gather()
to wait for both tasks to finish:
results = await asyncio.gather(speech_task, process_task)
3. Once they’re done, grab the results like this:
Copy code
speech_result = results[0]
process_result = results[1]
Here’s a full example to make it clearer:
Copy code
import asyncio

async def long_running_process():
    await asyncio.sleep(5)  # Simulate a long-running task
    return "Process completed successfully"

async def main():
    # Kick off speech and process tasks
    speech_task = asyncio.create_task(speech_handle.wait_for_playout())
    process_task = asyncio.create_task(long_running_process())

    # Wait until both are done
    results = await asyncio.gather(speech_task, process_task)

    # Pull out the results
    speech_result = results[0]
    process_result = results[1]

    return process_result

# Run the main function
asyncio.run(main())
This way, both tasks run at the same time, and you get the
process_task
result only after everything’s wrapped up. Hope that helps!
b
Add this into your implementation and return a fully working function tool
t
Here's a fully working example of a function tool that makes sure
speech_handle
and
process_task
run side by side, and only gives you the result of
process_task
once both are done:
Copy code
from livekit.agents import function_tool, RunContext
import asyncio

@function_tool
async def process_with_speech(context: RunContext, task_description: str):
    """Run a process while providing speech feedback, making sure both tasks finish before returning."""

    # Kick off speech generation
    speech_handle = context.session.generate_reply(
        instructions=f"Starting to process: {task_description}"
    )

    # Define the async process task
    async def long_running_process():
        await asyncio.sleep(5)  # Simulate a long-running task
        return "Process completed successfully"

    # Create tasks for speech and process
    speech_task = asyncio.create_task(speech_handle.wait_for_playout())
    process_task = asyncio.create_task(long_running_process())

    # Wait for both tasks to finish
    results = await asyncio.gather(speech_task, process_task)

    # Grab the process result
    process_result = results[1]

    # Give final speech feedback
    await context.session.generate_reply(
        instructions=f"Process completed with result: {process_result}"
    )

    return process_result
This function tool makes sure the speech and process run together smoothly, and it only returns the process result once both are wrapped up. Hope this helps!