# Review note: this file was pasted together with the question "如下代码是否有问题呢" ("does the code below have any problems?"); kept here as a comment so the module stays importable.
import logging
import asyncio
import base64
import os
import traceback
import contextlib
import io
from dotenv import load_dotenv
from PIL import Image
from tenacity import retry, stop_after_attempt, wait_fixed
from livekit import agents
from livekit.agents import Agent, AgentSession, JobContext, RoomInputOptions, cli, get_job_context
from livekit.agents.llm import ImageContent
from livekit.plugins import google, cartesia, noise_cancellation, silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel
# ===== Configuration =====
load_dotenv()

logger = logging.getLogger("vision-assistant")
logging.basicConfig(level=logging.INFO)

# Cartesia TTS voice and model; both overridable via environment variables.
VOICE_ID = os.getenv("VOICE_ID", "0b904166-a29f-4d2e-bb20-41ca302f98e9")
MODEL_NAME = os.getenv("TTS_MODEL", "sonic-2")
# Maximum (width, height) an incoming image is downscaled to before JPEG re-encode.
MAX_IMAGE_SIZE = (1024, 1024)
class VisionAssistant(Agent):
    """Simplified-Chinese multimodal voice assistant.

    Receives images over a LiveKit byte stream (topic ``"test"``), downscales
    and re-encodes them to JPEG, feeds them to the session's vision LLM, and
    asks it to describe them in Chinese.
    """

    def __init__(self) -> None:
        # Strong references to in-flight image tasks so the event loop does
        # not garbage-collect them before they finish.
        self._tasks: set[asyncio.Task] = set()
        super().__init__(
            instructions="""
你是一个中文多模态语音助手,支持图片与语音交互。
始终使用简体中文回答;收到图片时请用简洁的中文描述并给出有帮助的回复。
"""
        )

    async def on_enter(self):
        """Register the incoming-image handler and greet the user."""

        def _image_received_handler(reader, participant_identity):
            # The stream handler must not block: hand off to a background task.
            task = asyncio.create_task(self._image_received(reader, participant_identity))
            self._tasks.add(task)
            task.add_done_callback(self._task_done_callback)

        # NOTE(review): the stream topic is "test" — confirm the client
        # publishes images on this topic.
        get_job_context().room.register_byte_stream_handler("test", _image_received_handler)
        await self._safe_generate_reply(
            "请用中文简短问候用户:说明你能看图片并用中文语音播报结果。"
        )

    def _task_done_callback(self, t: asyncio.Task) -> None:
        """Drop the finished task from the tracking set and log any failure."""
        # discard() is a no-op if the task is already gone; no need to
        # suppress KeyError as the original did.
        self._tasks.discard(t)
        # Task.exception() raises CancelledError on a cancelled task, which
        # would blow up this callback — guard with cancelled() first.
        if not t.cancelled() and t.exception():
            logger.error("任务执行异常: %s", t.exception(), exc_info=True)

    async def _image_received(self, reader, participant_identity):
        """Collect the streamed image bytes, attach them to the chat context,
        and ask the model for a Chinese description."""
        logger.info("收到来自 %s 的图像: '%s'", participant_identity, getattr(
            reader.info, "name", "<unknown>"))
        try:
            image_bytes = bytearray()
            async for chunk in reader:
                image_bytes.extend(chunk)
            # Downscale & normalize to JPEG before base64-embedding.
            image_bytes = self._process_image(image_bytes)
            chat_ctx = self.chat_ctx.copy()
            chat_ctx.add_message(
                role="user",
                content=[
                    ImageContent(
                        image=f"data:image/jpeg;base64,{base64.b64encode(image_bytes).decode('utf-8')}"
                    )
                ],
            )
            await self.update_chat_ctx(chat_ctx)
            await self._safe_generate_reply(
                "请用简洁的简体中文描述这张图片的主要内容,并给出一句友好建议(如果适用)。"
            )
            logger.info("已把图片交给视觉模型并请求生成回复。")
        except Exception:
            # Best-effort: an image failure must not crash the agent session.
            logger.error("处理图像时出错: %s", traceback.format_exc())

    @staticmethod
    def _process_image(image_bytes: bytes) -> bytes:
        """Downscale to MAX_IMAGE_SIZE and re-encode as JPEG.

        Falls back to the original bytes if Pillow cannot process the payload.
        """
        try:
            img = Image.open(io.BytesIO(image_bytes))
            img.thumbnail(MAX_IMAGE_SIZE)
            # JPEG cannot encode alpha or palette images (e.g. RGBA/P PNGs);
            # without this convert(), save() raises and we'd silently fall
            # back to the unprocessed original.
            if img.mode not in ("RGB", "L"):
                img = img.convert("RGB")
            buffer = io.BytesIO()
            img.save(buffer, format="JPEG", quality=85)
            return buffer.getvalue()
        except Exception as e:
            logger.warning("图片处理失败,使用原图: %s", e)
            return image_bytes

    @retry(stop=stop_after_attempt(3), wait=wait_fixed(1))
    async def _safe_generate_reply(self, instructions: str):
        """Generate a reply, retrying up to 3 times with a 1s pause on failure."""
        try:
            await self.session.generate_reply(instructions=instructions)
        except Exception as e:
            # Log, then re-raise so tenacity can retry.
            logger.error("生成回复失败: %s", e)
            raise
async def entrypoint(ctx: JobContext):
    """Job entrypoint: connect to the room and run the vision agent session.

    The pipeline uses Gemini (vision LLM) and Cartesia (TTS), so those are the
    API keys worth warning about — the original checked OPENAI_API_KEY, which
    no component in this file uses.
    """
    if not os.getenv("CARTESIA_API_KEY"):
        logger.warning("未检测到 CARTESIA_API_KEY(用于 Cartesia TTS)")
    if not os.getenv("GOOGLE_API_KEY"):
        logger.warning("未检测到 GOOGLE_API_KEY(用于 Gemini)")

    await ctx.connect()
    logger.info("已连接到 LiveKit 房间:%s", ctx.room.name)

    # Text-only realtime Gemini model: it handles vision understanding,
    # speech synthesis is delegated to the Cartesia TTS below.
    vision_llm = google.beta.realtime.RealtimeModel(
        instructions="你是一个中文视觉理解助手,只返回简体中文文本,不输出语音。",
        voice=None,
        temperature=0.2,
        modalities=["text"],
    )
    tts = cartesia.TTS(model=MODEL_NAME, voice=VOICE_ID)

    session = AgentSession(
        llm=vision_llm,
        tts=tts,
        vad=silero.VAD.load(),
        turn_detection=MultilingualModel(),
    )
    try:
        await session.start(
            agent=VisionAssistant(),
            room=ctx.room,
            room_input_options=RoomInputOptions(
                video_enabled=True,
                noise_cancellation=noise_cancellation.BVC(),
            ),
        )
    finally:
        # NOTE(review): start() returns once the session is up, so this
        # finally runs immediately afterwards and may tear the session down
        # right away; also confirm the installed livekit-agents version
        # exposes stop() (current SDKs document aclose()/drain instead).
        await session.stop()
# The paste mangled this guard into `if` / `name == "__main__":` across two
# lines — a syntax error and the wrong identifier. Restored to the standard form.
if __name__ == "__main__":
    cli.run_app(agents.WorkerOptions(entrypoint_fnc=entrypoint))