Preface

A desktop AI assistant really needs voice interaction for control. Without it, practicality drops sharply: in the time it takes to type a request to the assistant, I could have clicked through the task myself. So I built out this part of the basic implementation on top of the existing project.

Problems encountered

  • Audio playback: avoid the sounddevice library. It has a bug that pops up an error dialog outright, so I switched to pyaudio. That currently limits the project to Windows, but this assistant is of much less use on Linux and other systems anyway, so I'm not worrying about it for now.
  • The spoken reply prompt gets picked up by recognition. I thought this would be simple to fix with a fixed ignore window, but even though the notification beep clearly plays after the reply prompt, the prompt still shows up in the transcription, and I never found the root cause in the logic. So I added a post-recognition filter that strips the reply phrases out.
  • Audio shortly after the reply prompt gets swallowed. This is tied to the previous issue and I suspect they share a cause. For now I play a notification tone: anything spoken after the "beep" is not swallowed.
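The post-recognition filter from the second bullet can be sketched as a plain substring strip followed by whitespace normalization (a minimal sketch; the phrase list matches the reply prompt used later in this post):

```python
def strip_reply_words(text: str, filter_words=("我在听", "请说")) -> str:
    """Remove the assistant's own reply prompt if STT picked it up."""
    for word in filter_words:
        text = text.replace(word, "")
    # collapse the runs of whitespace left behind by the removal
    return " ".join(text.split())

print(strip_reply_words("我在听 请说 打开浏览器"))  # -> 打开浏览器
```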

Environment versions

pvporcupine>=4.0.1
ctranslate2>=4.6.3
faster-whisper>=1.2.1
piper-tts>=1.3.0
onnxruntime-gpu>=1.23.2
soundfile>=0.13.1
pyaudio>=0.2.14

Custom wake words with Porcupine

Picovoice provides a set of built-in wake words, and also lets you train custom ones:
1. Open the Picovoice Console website (accessing it may require a proxy) and find the "create wake word" button.
2. Choose the language.
3. Enter the wake phrase.
4. Record samples of the wake phrase.
5. Click train.
You then get a .ppn file and a .pv file; point the corresponding fields in the YAML configuration at them.

YAML configuration

voice:
  # Basic settings
  enabled: true                     # enable voice control
  max_history: 10
  device: "cpu"                     # device: cpu, cuda, auto

  # Audio settings
  sample_rate: 16000                # sample rate (Hz)
  chunk_size: 512                   # audio chunk size (frames)
  channels: 1                       # number of audio channels

  # Wake word settings
  enable_wake_word: true            # enable wake word detection
  porcupine_access_key: ""          # Picovoice access key (required)
  wake_words: [ "" ]                # built-in wake word list
  keyword_paths: [ ".ppn" ]         # custom wake word (.ppn) file paths
  model_path: "porcupine_params_zh.pv"  # language model (.pv) path

  # Speech recognition (STT)
  enable_stt: true                  # enable speech recognition
  whisper_model: "medium"           # model size: tiny, base, small, medium, large-v2
  compute_type: "int8"              # compute type: int8, int8_float16

  # Speech synthesis (TTS)
  enable_tts: true                  # enable speech synthesis
  tts_model: "zh_CN-huayan-medium"  # TTS voice model
  use_cuda: false                   # use CUDA for TTS

Downloading the TTS voice model

This script downloads the models from Hugging Face; if a download fails, it is most likely a network issue.
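If huggingface.co is unreachable from your network, one workaround (an assumption, not part of the original script) is to rewrite the download URLs to point at a mirror host before fetching them; hf-mirror.com is a commonly used Hugging Face mirror that serves the same paths:

```python
# Hypothetical helper: swap the Hugging Face host for a mirror host.
MIRROR_HOST = "hf-mirror.com"  # assumption: mirror serves identical paths

def to_mirror(url: str) -> str:
    """Rewrite a huggingface.co URL to use the mirror host instead."""
    return url.replace("huggingface.co", MIRROR_HOST)

print(to_mirror("https://huggingface.co/rhasspy/piper-voices/resolve/main/zh/zh_CN/huayan/medium/zh_CN-huayan-medium.onnx"))
```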

"""
语音模型下载脚本
"""

import os
import sys
from pathlib import Path
import requests
import zipfile
import tarfile
from tqdm import tqdm
import platform


def download_file(url: str, save_path: Path):
    """下载文件并显示进度条"""
    try:
        response = requests.get(url, stream=True)
        total_size = int(response.headers.get('content-length', 0))

        with open(save_path, 'wb') as f:
            with tqdm(total=total_size, unit='B', unit_scale=True, desc=f"下载 {save_path.name}") as pbar:
                for data in response.iter_content(chunk_size=1024):
                    f.write(data)
                    pbar.update(len(data))

        return True
    except Exception as e:
        print(f"下载失败: {e}")
        return False


def download_piper_tts_models():
    """下载 Piper TTS 语音模型"""
    print("=" * 50)
    print("下载 Piper TTS 语音模型")
    print("=" * 50)

    # 语音模型列表
    voices = {
        "zh_CN-huayan-medium": {
            "model": "https://huggingface.co/rhasspy/piper-voices/resolve/main/zh/zh_CN/huayan/medium/zh_CN-huayan-medium.onnx",
            "config": "https://huggingface.co/rhasspy/piper-voices/resolve/main/zh/zh_CN/huayan/medium/zh_CN-huayan-medium.onnx.json"
        },
        "zh_CN-huayan-low": {
            "model": "https://huggingface.co/rhasspy/piper-voices/resolve/main/zh/zh_CN/huayan/low/zh_CN-huayan-low.onnx",
            "config": "https://huggingface.co/rhasspy/piper-voices/resolve/main/zh/zh_CN/huayan/low/zh_CN-huayan-low.onnx.json"
        },
        "en_US-lessac-medium": {
            "model": "https://huggingface.co/rhasspy/piper-voices/resolve/main/en/en_US/lessac/medium/en_US-lessac-medium.onnx",
            "config": "https://huggingface.co/rhasspy/piper-voices/resolve/main/en/en_US/lessac/medium/en_US-lessac-medium.onnx.json"
        }
    }

    # 选择语音
    print("可用语音:")
    for i, voice_name in enumerate(voices.keys(), 1):
        print(f"  {i}. {voice_name}")

    choice = input("\n请选择要下载的语音编号 (默认 1 中文): ").strip()
    if choice == "":
        choice = "1"

    voice_names = list(voices.keys())
    try:
        selected_idx = int(choice) - 1
        if 0 <= selected_idx < len(voice_names):
            voice_name = voice_names[selected_idx]
        else:
            voice_name = "zh_CN-huayan-medium"
    except:
        voice_name = "zh_CN-huayan-medium"

    print(f"将下载语音: {voice_name}")

    project_root = Path(__file__).parent.parent.parent
    # 创建目录
    voice_dir = project_root/"assets"/ "voices"
    voice_dir.mkdir(parents=True, exist_ok=True)

    # 下载模型和配置
    model_url = voices[voice_name]["model"]
    config_url = voices[voice_name]["config"]

    model_file = voice_dir / f"{voice_name}.onnx"
    config_file = voice_dir / f"{voice_name}.json"

    success = True

    # 下载模型文件
    if not model_file.exists():
        print(f"正在下载模型文件...")
        if not download_file(model_url, model_file):
            success = False

    # 下载配置文件
    if not config_file.exists():
        print(f"正在下载配置文件...")
        if not download_file(config_url, config_file):
            success = False

    if success:
        print(f"✅ 语音模型下载完成:")
        print(f"   模型: {model_file}")
        print(f"   配置: {config_file}")
        return str(voice_dir)
    else:
        print("❌ 语音模型下载失败")
        return None


def main():
    """主函数"""
    print("Unsola 语音模型下载工具")
    print("=" * 50)

    print("下载 Piper TTS 语音合成模型")
    download_piper_tts_models()


if __name__ == "__main__":
    main()

To use this, adjust the local voice_dir download path for your project layout. The speech recognition (Whisper) model, by contrast, is checked before use and downloaded automatically if missing.

Core code

The code is split into a control class and an assistant class; the assistant class is a wrapper around the control class.

Control class

"""
语音控制模块
集成唤醒词检测、语音识别、语音合成功能
不使用 sounddevice 库
"""
import threading
import queue
import time
from pathlib import Path
from typing import Optional, Callable, Dict, Any
import numpy as np
import wave

import pvporcupine
import pyaudio
from faster_whisper import WhisperModel
from piper import PiperVoice

from src.utils.config_manager import ConfigManager
from src.utils.logger import LogManager

logger = LogManager().setup_logger("voice_control")

class VoiceControl:
    """语音控制核心类"""

    def __init__(self, config_manager: ConfigManager):
        """
        初始化语音控制

        Args:
            config_manager: 配置管理器
        """
        self.config = config_manager
        self.voice_config=self.config.get('voice', {})
        self.is_running = False
        self.is_listening = False
        self.audio_queue = queue.Queue()
        self.wake_word_detected = threading.Event()

        # 语音组件
        self.porcupine = None  # 唤醒词检测
        self.whisper_model = None  # 语音识别
        self.tts_engine = None  # 语音合成

        # 音频流
        self.audio_stream = None
        self.pyaudio_instance = None

        # 回调函数
        self.on_wake_word_callback = None
        self.on_transcription_callback = None
        self.on_error_callback = None

        # 音频参数
        self.sample_rate = self.voice_config.get('sample_rate', 16000)
        self.channels = self.voice_config.get('channels', 1)
        self.chunk_size = self.voice_config.get('chunk_size', 512)

        # 线程
        self.audio_thread = None
        self.processing_thread = None

        self.project_root = Path(__file__).parent.parent.parent

        # 初始化语音组件
        self._initialize_components()

    def _initialize_components(self):
        """Initialize voice components."""
        try:
            # Wake word detection
            if pvporcupine and self.voice_config.get('enable_wake_word', True):
                self._init_wake_word_detector()

            # Speech recognition
            if WhisperModel and self.voice_config.get('enable_stt', True):
                self._init_speech_recognition()

            # Speech synthesis
            if PiperVoice and self.voice_config.get('enable_tts', True):
                self._init_text_to_speech()

            # Audio capture
            self.pyaudio_instance = pyaudio.PyAudio()
            logger.info("Voice components initialized")

        except Exception as e:
            logger.error(f"Failed to initialize voice components: {e}")

    def _init_wake_word_detector(self):
        """Initialize wake word detection."""
        try:
            access_key = self.voice_config.get('porcupine_access_key', '')
            if not access_key:
                logger.warning("No Porcupine access key configured, skipping wake word detection")
                return

            # Wake word configuration
            wake_words = self.voice_config.get('wake_words', ['porcupine'])
            keyword_paths = self.voice_config.get('keyword_paths', [])
            model_path = self.voice_config.get('model_path', '')

            # Create the Porcupine instance
            if keyword_paths:
                self.porcupine = pvporcupine.create(
                    access_key=access_key,
                    keyword_paths=keyword_paths,
                    model_path=model_path
                )
            else:
                self.porcupine = pvporcupine.create(
                    access_key=access_key,
                    keywords=wake_words
                )

            # Porcupine dictates the sample rate and frame length
            self.sample_rate = self.porcupine.sample_rate
            self.chunk_size = self.porcupine.frame_length
            logger.info(f"Wake word detection ready, sample rate: {self.sample_rate}Hz, frame length: {self.chunk_size}")

        except Exception as e:
            logger.error(f"Failed to initialize wake word detection: {e}")
            self.porcupine = None

    def _init_speech_recognition(self):
        """Initialize speech recognition."""
        try:
            model_size = self.voice_config.get('whisper_model', 'small')
            device = self.voice_config.get('device', 'cpu')
            compute_type = self.voice_config.get('compute_type', 'int8')

            self.whisper_model = WhisperModel(
                model_size,
                device=device,
                compute_type=compute_type,
                num_workers=2,
                download_root=str(self.project_root / "assets" / "models")
            )

            logger.info(f"Speech recognition ready, model: {model_size}")

        except Exception as e:
            logger.error(f"Failed to initialize speech recognition: {e}")
            self.whisper_model = None

    def _init_text_to_speech(self):
        """Initialize speech synthesis."""
        try:
            voice_model_name = self.voice_config.get('tts_model', 'zh_CN-huayan-medium')
            use_cuda = self.voice_config.get('use_cuda', False)

            # Model directory
            model_dir = self.project_root / "assets" / "voices"
            model_dir.mkdir(parents=True, exist_ok=True)

            # Full paths to the model and config files
            model_file = model_dir / f"{voice_model_name}.onnx"
            config_file = model_dir / f"{voice_model_name}.json"

            # Make sure the files exist
            if not model_file.exists():
                print(f"Error: model file not found: {model_file}")
                print("Please download the model files:")
                print(f"1. https://huggingface.co/rhasspy/piper-voices/tree/main/zh/{voice_model_name}")
                print(f"2. Files needed: {voice_model_name}.onnx and {voice_model_name}.json")
                return False

            # Load from the full paths
            self.tts_engine = PiperVoice.load(
                model_path=str(model_file),
                config_path=str(config_file),
                use_cuda=use_cuda
            )

            logger.info(f"Speech synthesis ready, voice model: {voice_model_name}")

        except Exception as e:
            logger.error(f"Failed to initialize speech synthesis: {e}")
            self.tts_engine = None

    def start(self):
        """Start voice control."""
        if self.is_running:
            logger.warning("Voice control is already running")
            return

        self.is_running = True

        # Audio capture thread
        self.audio_thread = threading.Thread(target=self._audio_capture_loop, daemon=True)
        self.audio_thread.start()

        # Processing thread
        self.processing_thread = threading.Thread(target=self._processing_loop, daemon=True)
        self.processing_thread.start()

        logger.info("Voice control started")

    def stop(self):
        """Stop voice control."""
        self.is_running = False

        # Stop the audio stream
        if self.audio_stream:
            try:
                self.audio_stream.stop_stream()
                self.audio_stream.close()
            except Exception:
                pass
            self.audio_stream = None

        # Shut down PyAudio
        if self.pyaudio_instance:
            try:
                self.pyaudio_instance.terminate()
            except Exception:
                pass

        # Wait for the threads to finish
        if self.audio_thread:
            self.audio_thread.join(timeout=2)

        if self.processing_thread:
            self.processing_thread.join(timeout=2)

        # Release Porcupine resources
        if self.porcupine:
            self.porcupine.delete()

        logger.info("Voice control stopped")

    def _audio_capture_loop(self):
        """Audio capture loop using PyAudio."""
        try:
            # Bounded queue so a stalled consumer cannot grow it forever
            self.audio_queue = queue.Queue(maxsize=50)

            # Open the input stream
            self.audio_stream = self.pyaudio_instance.open(
                format=pyaudio.paFloat32,
                channels=self.channels,
                rate=self.sample_rate,
                input=True,
                frames_per_buffer=self.chunk_size,
                stream_callback=self._audio_callback
            )

            logger.info(f"Audio capture started, sample rate: {self.sample_rate}Hz, chunk size: {self.chunk_size}")

            # Keep the stream running
            self.audio_stream.start_stream()

            # Wait until stopped
            while self.is_running and self.audio_stream.is_active():
                time.sleep(0.1)

        except Exception as e:
            logger.error(f"Audio capture error: {e}")
            if self.on_error_callback:
                self.on_error_callback(f"Audio capture error: {e}")

    def _audio_callback(self, in_data, frame_count, time_info, status):
        """PyAudio input callback."""
        try:
            if status:
                if status == pyaudio.paInputOverflowed:
                    logger.warning("Audio input overflowed, lower the input volume")
                else:
                    logger.warning(f"Audio status: {status}")

            # Convert the raw bytes to a numpy array
            audio_data = np.frombuffer(in_data, dtype=np.float32).reshape(-1, self.channels)

            # Enqueue for processing
            try:
                self.audio_queue.put_nowait(audio_data)
            except queue.Full:
                # Queue is full, drop this chunk
                logger.debug("Audio queue full, dropping data")

        except Exception as e:
            logger.error(f"Audio callback error: {e}")

        return in_data, pyaudio.paContinue

    def _processing_loop(self):
        """Processing loop."""
        audio_buffer = []
        is_recording = False
        silence_frames = 0
        max_silence_frames = int(self.sample_rate / self.chunk_size * 2)  # 2 seconds of silence
        ignore_frames = 0  # counter for frames to skip right after wake-up
        ignore_frames_threshold = 10
        is_start = True

        while self.is_running:
            try:
                # Pull audio data from the queue
                audio_data = self.audio_queue.get(timeout=0.1)

                # Sanity-check the chunk length
                if len(audio_data) != self.chunk_size:
                    logger.warning(f"Audio chunk length mismatch: expected {self.chunk_size}, got {len(audio_data)}")
                    continue

                # Wake word detection
                if self.porcupine and not is_recording:
                    # Convert to 16-bit PCM
                    pcm_data = (audio_data.flatten() * 32767).astype(np.int16)

                    # Porcupine needs exactly one frame
                    if len(pcm_data) == self.porcupine.frame_length:
                        keyword_index = self.porcupine.process(pcm_data)

                        if keyword_index >= 0:
                            logger.info("Wake word detected!")
                            is_recording = True
                            is_start = True
                            audio_buffer = []
                            silence_frames = 0
                            ignore_frames = 0

                            if self.on_wake_word_callback:
                                self.on_wake_word_callback()

                            continue

                # Speech recognition
                if is_recording:
                    # Still inside the ignore window: skip this chunk
                    if ignore_frames < ignore_frames_threshold:
                        ignore_frames += 1
                        continue

                    # Play the notification beep once, at the start of recording
                    if is_start:
                        self.play_notification_sound()
                        is_start = False

                    audio_buffer.append(audio_data)

                    # Silence detection (simple energy VAD)
                    energy = np.mean(np.abs(audio_data))
                    if energy < 0.01:  # silence threshold
                        silence_frames += 1
                    else:
                        silence_frames = 0

                    # Too much silence: stop recording
                    if silence_frames > max_silence_frames:
                        logger.info("Silence detected, stopping recording")
                        is_recording = False

                        if audio_buffer:
                            self._process_audio(audio_buffer)

                        audio_buffer = []
                        silence_frames = 0
                        ignore_frames = 0

            except queue.Empty:
                continue
            except Exception as e:
                logger.error(f"Error processing audio data: {e}")
                import traceback
                traceback.print_exc()

    def _process_audio(self, audio_buffer):
        """Transcribe the buffered audio."""
        try:
            # Concatenate the chunks
            audio_np = np.concatenate(audio_buffer, axis=0)

            # Write to a temporary file
            temp_file = self.project_root / "assets" / "temp_audio.wav"
            self._save_audio_to_wav(audio_np, temp_file)

            # Speech recognition
            if self.whisper_model:
                segments, info = self.whisper_model.transcribe(
                    str(temp_file),
                    language=self.voice_config.get('language', 'zh'),
                    beam_size=5,
                    vad_filter=True
                )

                text = " ".join([segment.text for segment in segments])

                if text.strip():
                    # Filter out the assistant's own reply prompt
                    # ("I'm listening, please speak")
                    filter_words = ["我在听", "请说"]
                    filtered_text = text.strip()

                    for word in filter_words:
                        filtered_text = filtered_text.replace(word, "")

                    # Collapse leftover whitespace
                    filtered_text = " ".join(filtered_text.split())

                    logger.info(f"Raw transcription: {text}")
                    logger.info(f"Filtered transcription: {filtered_text}")

                    if self.on_transcription_callback:
                        self.on_transcription_callback(filtered_text)

            # Remove the temporary file
            temp_file.unlink(missing_ok=True)

        except Exception as e:
            logger.error(f"Error processing audio: {e}")

    def _save_audio_to_wav(self, audio_data, filepath):
        """Save audio to a WAV file."""
        try:
            # Convert to 16-bit integers
            audio_int16 = (audio_data * 32767).astype(np.int16)

            with wave.open(str(filepath), 'wb') as wf:
                wf.setnchannels(self.channels)
                wf.setsampwidth(2)  # 16 bits = 2 bytes
                wf.setframerate(self.sample_rate)
                wf.writeframes(audio_int16.tobytes())

        except Exception as e:
            logger.error(f"Error saving audio file: {e}")

    def synthesize_speech(self, text: str) -> Optional[bytes]:
        """
        Speech synthesis (text to speech).
        """
        if not self.tts_engine:
            logger.warning("Speech synthesis is not initialized")
            return None

        try:
            logger.info(f"Synthesizing speech: {text[:50]}...")

            import io

            # In-memory buffer
            buffer = io.BytesIO()

            # Open a wave writer over the in-memory buffer
            with wave.open(buffer, 'wb') as wav_file:
                self.tts_engine.synthesize_wav(text, wav_file)

            # Get the complete WAV data
            wav_data = buffer.getvalue()

            if wav_data and len(wav_data) > 0:
                logger.info(f"Speech synthesis succeeded: {len(wav_data)} bytes")

                # Sanity check: valid WAV data starts with 'RIFF'
                if wav_data.startswith(b'RIFF'):
                    logger.debug("Confirmed valid WAV format")

                return wav_data
            else:
                logger.error("Speech synthesis returned no data")
                return None

        except Exception as e:
            logger.error(f"Speech synthesis failed: {e}")
            import traceback
            traceback.print_exc()
            return None

    def speak(self, text: str) -> bool:
        """
        Synthesize and play speech.
        """
        try:
            # Synthesize the audio
            audio_data = self.synthesize_speech(text)

            if not audio_data:
                logger.error("Speech synthesis failed, no audio data")
                return False

            # Play it
            if self._play_audio_with_pyaudio(audio_data):
                logger.info(f"Speech playback succeeded: {text[:30]}...")

                return True
            else:
                logger.error("Speech playback failed")
                return False

        except Exception as e:
            logger.error(f"Speech synthesis/playback failed: {e}")
            import traceback
            traceback.print_exc()
            return False

    def _play_audio_with_pyaudio(self, audio_data: bytes) -> bool:
        """
        Play audio with PyAudio.
        """
        try:
            import io
            import wave

            # Validate the WAV header
            if not audio_data.startswith(b'RIFF'):
                logger.error("Invalid audio data format")
                return False

            # Wrap the bytes in a file-like object
            buffer = io.BytesIO(audio_data)

            with wave.open(buffer, 'rb') as wf:
                # Audio parameters
                channels = wf.getnchannels()
                sample_width = wf.getsampwidth()
                sample_rate = wf.getframerate()
                n_frames = wf.getnframes()
                audio_format = None

                # Map the sample width to a PyAudio format
                if sample_width == 1:
                    audio_format = pyaudio.paInt8
                elif sample_width == 2:
                    audio_format = pyaudio.paInt16
                elif sample_width == 4:
                    audio_format = pyaudio.paInt32
                else:
                    logger.error(f"Unsupported sample width: {sample_width}")
                    return False

                # Read all the frames
                audio_data_bytes = wf.readframes(n_frames)

                # Create a PyAudio instance if one does not exist
                if not self.pyaudio_instance:
                    self.pyaudio_instance = pyaudio.PyAudio()

                # Open an output stream
                output_stream = self.pyaudio_instance.open(
                    format=audio_format,
                    channels=channels,
                    rate=sample_rate,
                    output=True
                )

                # Play
                output_stream.write(audio_data_bytes)

                # Wait for playback to finish
                time.sleep(n_frames / sample_rate + 0.1)

                # Close the stream
                output_stream.stop_stream()
                output_stream.close()

            return True

        except Exception as e:
            logger.error(f"PyAudio playback failed: {e}")
            return False

    def play_notification_sound(self):
        """Play a notification beep."""
        try:
            # Generate a short sine tone
            duration = 0.1
            frequency = 1000
            t = np.linspace(0, duration, int(self.sample_rate * duration), False)
            tone = 0.3 * np.sin(2 * np.pi * frequency * t)

            # Convert to 16-bit bytes
            tone_bytes = (tone * 32767).astype(np.int16).tobytes()

            # Play the tone
            stream = self.pyaudio_instance.open(
                format=pyaudio.paInt16,
                channels=1,
                rate=self.sample_rate,
                output=True
            )

            stream.write(tone_bytes)
            stream.stop_stream()
            stream.close()

        except Exception as e:
            logger.error(f"Failed to play notification sound: {e}")

    def set_callbacks(self,
                      on_wake_word: Optional[Callable] = None,
                      on_transcription: Optional[Callable] = None,
                      on_error: Optional[Callable] = None):
        """
        Set the callback functions.

        Args:
            on_wake_word: wake word detection callback
            on_transcription: transcription callback
            on_error: error callback
        """
        self.on_wake_word_callback = on_wake_word
        self.on_transcription_callback = on_transcription
        self.on_error_callback = on_error

    def get_status(self) -> Dict[str, Any]:
        """Get voice control status."""
        return {
            'running': self.is_running,
            'listening': self.is_listening,
            'wake_word_ready': self.porcupine is not None,
            'stt_ready': self.whisper_model is not None,
            'tts_ready': self.tts_engine is not None,
            'sample_rate': self.sample_rate
        }
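The float-to-int16 conversion in `_save_audio_to_wav` loses a little precision but round-trips cleanly through the standard-library `wave` module. A minimal, numpy-free sketch of the same save path (clamping added as a safety assumption, since float samples can exceed [-1.0, 1.0]):

```python
import wave
import array

def save_wav(samples, filepath, sample_rate=16000, channels=1):
    """Save float samples in [-1.0, 1.0] as 16-bit PCM, mirroring _save_audio_to_wav."""
    # Clamp, then scale to the int16 range, as in the original conversion
    pcm = array.array("h", (int(max(-1.0, min(1.0, s)) * 32767) for s in samples))
    with wave.open(str(filepath), "wb") as wf:
        wf.setnchannels(channels)
        wf.setsampwidth(2)           # 16 bits = 2 bytes
        wf.setframerate(sample_rate)
        wf.writeframes(pcm.tobytes())

save_wav([0.0, 0.5, -0.5, 1.0], "demo.wav")
```

This is the same WAV layout `_play_audio_with_pyaudio` later maps to `pyaudio.paInt16` via the 2-byte sample width.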

Assistant class

"""
语音助手集成类
将语音控制集成到主助手中
"""

from typing import Optional, Dict, Any
from src.utils.config_manager import ConfigManager
from src.features.voice_control import VoiceControl
from src.utils.logger import LogManager

logger = LogManager().setup_logger("voice_assistant")


class VoiceAssistant:
    """语音助手"""

    def __init__(self, assistant, config_manager: ConfigManager):
        """
        初始化语音助手

        Args:
            assistant: 主助手实例
            config_manager: 配置管理器
        """
        self.assistant = assistant
        self.config = config_manager
        self.voice_config=self.config.get('voice', {})
        self.voice_control = None
        self.is_enabled = self.voice_config.get('enabled', False)

        # 对话历史
        self.conversation_history = []
        self.max_history = self.voice_config.get('max_history', 10)

        # 初始化语音控制
        self._initialize_voice_control()

    def _initialize_voice_control(self):
        """Initialize voice control."""
        try:
            self.voice_control = VoiceControl(self.config)

            # Hook up the callbacks
            self.voice_control.set_callbacks(
                on_wake_word=self._on_wake_word_detected,
                on_transcription=self._on_voice_command,
                on_error=self._on_voice_error
            )

            logger.info("Voice assistant initialized")

        except Exception as e:
            logger.error(f"Failed to initialize voice assistant: {e}")
            self.is_enabled = False

    def _on_wake_word_detected(self):
        """Wake word callback."""
        logger.info("Wake word detected")

        # Spoken reply prompt ("I'm listening, please speak"); kept in Chinese
        # so the STT filter in VoiceControl can strip it back out
        self.speak("我在听,请说")

    def _on_voice_command(self, text: str):
        """Voice command callback."""
        logger.info(f"Received voice command: {text}")

        # Record in the history
        self._add_to_history(f"用户: {text}")

        # Handle the command
        response = self._process_voice_command(text)

        # Speak the reply
        if response:
            self._add_to_history(f"助手: {response}")
            self.speak(response)

    def _on_voice_error(self, error: str):
        """Voice error callback."""
        logger.error(f"Voice error: {error}")

    def _process_voice_command(self, text: str) -> Optional[str]:
        """
        Handle a voice command.

        Args:
            text: transcribed command text

        Returns:
            reply text
        """
        try:
            # Special commands first
            if self._is_special_command(text):
                return self._handle_special_command(text)

            # Otherwise hand off to the main assistant
            if hasattr(self.assistant, 'chat'):
                return self.assistant.chat(text)

            # Default reply ("command received")
            return f"收到命令: {text}"

        except Exception as e:
            logger.error(f"Failed to handle voice command: {e}")
            return "抱歉,处理命令时出错"

    def _is_special_command(self, text: str) -> bool:
        """Check whether this is a special command. The command words are the
        Chinese phrases for exit, stop, disable voice, enable voice, mute, cancel."""
        special_commands = ['退出', '停止', '关闭语音', '开启语音', '静音', '取消']
        text_lower = text.lower()

        for cmd in special_commands:
            if cmd in text_lower:
                return True

        return False

    def _handle_special_command(self, text: str) -> Optional[str]:
        """Handle a special command."""
        text_lower = text.lower()

        if '退出' in text_lower or '停止' in text_lower:  # exit / stop
            self.stop()
            return "语音助手已停止"

        elif '关闭语音' in text_lower:  # disable voice
            self.set_enabled(False)
            return "语音功能已关闭"

        elif '开启语音' in text_lower:  # enable voice
            self.set_enabled(True)
            return "语音功能已开启"

        elif '静音' in text_lower:  # mute
            self.set_muted(True)
            return "已静音"

        elif '取消' in text_lower:  # cancel
            return "好的,已取消"

        return None

    def _add_to_history(self, text: str):
        """Append to the conversation history."""
        self.conversation_history.append(text)

        # Cap the history length
        if len(self.conversation_history) > self.max_history:
            self.conversation_history = self.conversation_history[-self.max_history:]

    def speak(self, text: str):
        """Speak a reply."""
        if not self.is_enabled or not self.voice_control:
            return

        try:
            # Synthesize and play
            success = self.voice_control.speak(text)

            if not success:
                logger.warning("Speech synthesis failed")

        except Exception as e:
            logger.error(f"Speech synthesis error: {e}")

    def start(self):
        """Start the voice assistant; returns whether it succeeded."""
        if not self.is_enabled:
            logger.warning("Voice assistant is not enabled")
            return False

        if not self.voice_control:
            logger.warning("Voice control is not initialized")
            return False

        try:
            self.voice_control.start()
            logger.info("Voice assistant started")
            return True

        except Exception as e:
            logger.error(f"Failed to start voice assistant: {e}")
            return False

    def stop(self):
        """Stop the voice assistant."""
        if self.voice_control:
            self.voice_control.stop()
            logger.info("Voice assistant stopped")

    def set_enabled(self, enabled: bool):
        """Enable/disable the voice assistant."""
        self.is_enabled = enabled

        if enabled and not self.voice_control:
            self._initialize_voice_control()

        if enabled and self.voice_control:
            self.start()
        elif self.voice_control:
            self.stop()

        # Persist the setting. voice_config is a plain dict (it has no
        # set()/save() methods), so update it in place and leave writing the
        # config file to the ConfigManager.
        self.voice_config['enabled'] = enabled

    def get_status(self) -> Dict[str, Any]:
        """获取状态"""
        status = {
            'enabled': self.is_enabled,
            'history_count': len(self.conversation_history)
        }

        if self.voice_control:
            status.update(self.voice_control.get_status())

        return status

    def get_conversation_history(self) -> list:
        """获取对话历史"""
        return self.conversation_history.copy()

    def clear_history(self):
        """清空历史记录"""
        self.conversation_history.clear()

From the outside, you only need to construct the assistant and call start(); it then keeps running in the background.

        if self.voice_enabled:
            try:
                self.voice_assistant = VoiceAssistant(self, config)
                # Start the voice assistant immediately (runs in the background)
                if self.voice_assistant.start():
                    logger.info("Voice assistant started in the background")
                    print("🎤 Voice assistant started (running in background)")
                else:
                    logger.warning("Voice assistant failed to start")
                    print("⚠️  Voice assistant failed to start")

                logger.info("Voice assistant initialization complete")
            except Exception as e:
                logger.error(f"Failed to initialize voice assistant: {e}")
                print(f"❌ Voice assistant initialization failed: {e}")
                self.voice_enabled = False

Summary

Wake-up, recognition, synthesis, and playback all work now, but a single wake-up only allows one exchange; that is something I plan to improve later.
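One way to support multiple exchanges per wake-up (a sketch of an idea, not part of the current code) is a small state machine that, after each spoken reply, drops back into the recording state for a follow-up window instead of requiring another wake word, and only returns to idle once the window times out. The class and window length below are hypothetical:

```python
# Hypothetical follow-up state machine: IDLE -> RECORDING on wake word;
# after each reply, stay in RECORDING until the follow-up window expires.
IDLE, RECORDING = "idle", "recording"

class DialogState:
    def __init__(self, follow_up_window=6.0):
        self.state = IDLE
        self.follow_up_window = follow_up_window  # seconds to wait for a follow-up
        self.deadline = 0.0

    def on_wake_word(self, now):
        self.state = RECORDING
        self.deadline = now + self.follow_up_window

    def on_reply_done(self, now):
        # Re-arm recording instead of requiring another wake word
        self.deadline = now + self.follow_up_window

    def tick(self, now):
        # Fall back to idle once the follow-up window has expired
        if self.state == RECORDING and now >= self.deadline:
            self.state = IDLE
        return self.state

s = DialogState()
s.on_wake_word(now=0.0)
s.on_reply_done(now=2.0)   # deadline moves to 8.0
print(s.tick(5.0))  # -> recording (still inside the window)
print(s.tick(9.0))  # -> idle (window expired)
```

The processing loop's `is_recording` flag would then be driven by `tick()` rather than being cleared unconditionally after each transcription.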
