一、前言

      为了满足实验课对人机对话系统的设计要求,我们在现有Chat对话模型的基础上进行了功能扩展。当前系统已实现基于文本输入的人机交互功能,现计划通过以下技术升级优化用户体验:首先,集成语音识别(STT)模型,将用户的语音提问实时转换为文本;然后将识别结果输入预训练的聊天大模型进行处理;最后通过语音合成(TTS)技术将模型的文本回复转换为自然语音输出。这种端到端的语音交互方案能够显著提升人机对话的自然度和沉浸感,更贴近真实的人际交流场景。

Flask项目源码:SenseVoice: SenseVoice语音识别简单的flask部署使用

二、STT模型将语音转为文字

        SenseVoice语音大模型专注于 语音识别(ASR)、语音合成(TTS) 以及 端到端语音对话 任务。它能够实现 高准确率的语音转文字(STT) 和 自然流畅的语音生成(TTS),适用于智能助手、实时对话系统等场景。

        项目github地址:

https://github.com/FunAudioLLM/SenseVoice

        代码被我简化过,因为我只需要部分功能,并使用Flask快速构建起服务,语音识别的APi接口如下:


@app.route("/voiceapi",methods=["POST"])
def SenseVoiceApi():

    if "vfile" not in request.files:
        return jsonify({"error": "No voice file part"}),400
    #获取传过来的mp3文件或者是wav音频文件

    input_voice = request.files["vfile"]

    if input_voice.filename == "":
        return jsonify({"error": "No selected file"}), 400
    
    if not (input_voice.filename.lower().endswith('.mp3') or input_voice.filename.lower().endswith('.wav')):
        return jsonify({"error": "Only MP3 and WAV files are allowed"}), 400
    
    detecte_voice = None

    output_dir = "./output_voice/"
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    voice_path = output_dir+input_voice.filename
    input_voice.save(voice_path)
    sv = SenseVoice()
    #params:input_wav 传入的音频文件   language 选择的语言 fs 时长 dafult = 16000
    language = "zh"

    detecte_voice = sv.model_inference(voice_path,language)
     
    return detecte_voice,200

        只需要客户端将录入的语音文件Post请求发送到服务端即可,服务端会保存原始语音,并调用SenseVoice模型识别。

        SenseVoice模型主要代码如下:

        SenseVoice 支持识别多种语言,中文、英语等等,但是考虑到实验课的简单需要,在服务端已经固定language参数为中文了,有需求可以自行修改 Api接口。

# coding=utf-8
import os
import librosa
import base64
import io
import gradio as gr
import re

import numpy as np
import torch
import torchaudio
import re

from funasr import AutoModel

class SenseVoice:

    def __init__(self):
        
        self.m = "iic/SenseVoiceSmall"
        self.model = AutoModel(model=self.m,
				  vad_model="iic/speech_fsmn_vad_zh-cn-16k-common-pytorch",
				  vad_kwargs={"max_single_segment_time": 30000},
				  trust_remote_code=True,
				  )
        self.set_dict()

    def model_inference(self,input_wav, language, fs=16000):
        # task_abbr = {"Speech Recognition": "ASR", "Rich Text Transcription": ("ASR", "AED", "SER")}
        language_abbr = {"auto": "auto", "zh": "zh", "en": "en", "yue": "yue", "ja": "ja", "ko": "ko",
                        "nospeech": "nospeech"}
        
        # task = "Speech Recognition" if task is None else task
        language = "auto" if len(language) < 1 else language
        selected_language = language_abbr[language]
        # selected_task = task_abbr.get(task)
        
        # print(f"input_wav: {type(input_wav)}, {input_wav[1].shape}, {input_wav}")
        
        if isinstance(input_wav, tuple):
            fs, input_wav = input_wav
            input_wav = input_wav.astype(np.float32) / np.iinfo(np.int16).max
            if len(input_wav.shape) > 1:
                input_wav = input_wav.mean(-1)
            if fs != 16000:
                print(f"audio_fs: {fs}")
                resampler = torchaudio.transforms.Resample(fs, 16000)
                input_wav_t = torch.from_numpy(input_wav).to(torch.float32)
                input_wav = resampler(input_wav_t[None, :])[0, :].numpy()
        
        
        merge_vad = True #False if selected_task == "ASR" else True
        print(f"language: {language}, merge_vad: {merge_vad}")
        text = self.model.generate(input=input_wav,
                            cache={},
                            language=language,
                            use_itn=True,
                            batch_size_s=60, merge_vad=merge_vad)
        
        print(text)
        text = text[0]["text"]
        text = self.format_str_v3(text)
        
        print(text)
        
        return text
    def format_str(self,s):
        for sptk in self.emoji_dict:
            s = s.replace(sptk, self.emoji_dict[sptk])
        return s


    def format_str_v2(self,s):
        sptk_dict = {}
        for sptk in self.emoji_dict:
            sptk_dict[sptk] = s.count(sptk)
            s = s.replace(sptk, "")
        emo = "<|NEUTRAL|>"
        for e in self.emo_dict:
            if sptk_dict[e] > sptk_dict[emo]:
                emo = e
        for e in self.event_dict:
            if sptk_dict[e] > 0:
                s = self.event_dict[e] + s
        s = s + self.emo_dict[emo]

        for emoji in self.emo_set.union(self.event_set):
            s = s.replace(" " + emoji, emoji)
            s = s.replace(emoji + " ", emoji)
        return s.strip()

    def format_str_v3(self,s):
        def get_emo(s):
            return s[-1] if s[-1] in self.emo_set else None
        def get_event(s):
            return s[0] if s[0] in self.event_set else None

        s = s.replace("<|nospeech|><|Event_UNK|>", "❓")
        for lang in self.lang_dict:
            s = s.replace(lang, "<|lang|>")
        s_list = [self.format_str_v2(s_i).strip(" ") for s_i in s.split("<|lang|>")]
        new_s = " " + s_list[0]
        cur_ent_event = get_event(new_s)
        for i in range(1, len(s_list)):
            if len(s_list[i]) == 0:
                continue
            if get_event(s_list[i]) == cur_ent_event and get_event(s_list[i]) != None:
                s_list[i] = s_list[i][1:]
            #else:
            cur_ent_event = get_event(s_list[i])
            if get_emo(s_list[i]) != None and get_emo(s_list[i]) == get_emo(new_s):
                new_s = new_s[:-1]
            new_s += s_list[i].strip().lstrip()
        new_s = new_s.replace("The.", " ")
        return new_s.strip()




    def set_dict(self):
        self.emo_dict = {
	"<|HAPPY|>": "😊",
	"<|SAD|>": "😔",
	"<|ANGRY|>": "😡",
	"<|NEUTRAL|>": "",
	"<|FEARFUL|>": "😰",
	"<|DISGUSTED|>": "🤢",
	"<|SURPRISED|>": "😮",
}
        
        self.event_dict = {
	"<|BGM|>": "🎼",
	"<|Speech|>": "",
	"<|Applause|>": "👏",
	"<|Laughter|>": "😀",
	"<|Cry|>": "😭",
	"<|Sneeze|>": "🤧",
	"<|Breath|>": "",
	"<|Cough|>": "🤧",
}

        self.emoji_dict = {
	"<|nospeech|><|Event_UNK|>": "❓",
	"<|zh|>": "",
	"<|en|>": "",
	"<|yue|>": "",
	"<|ja|>": "",
	"<|ko|>": "",
	"<|nospeech|>": "",
	"<|HAPPY|>": "😊",
	"<|SAD|>": "😔",
	"<|ANGRY|>": "😡",
	"<|NEUTRAL|>": "",
	"<|BGM|>": "🎼",
	"<|Speech|>": "",
	"<|Applause|>": "👏",
	"<|Laughter|>": "😀",
	"<|FEARFUL|>": "😰",
	"<|DISGUSTED|>": "🤢",
	"<|SURPRISED|>": "😮",
	"<|Cry|>": "😭",
	"<|EMO_UNKNOWN|>": "",
	"<|Sneeze|>": "🤧",
	"<|Breath|>": "",
	"<|Cough|>": "😷",
	"<|Sing|>": "",
	"<|Speech_Noise|>": "",
	"<|withitn|>": "",
	"<|woitn|>": "",
	"<|GBG|>": "",
	"<|Event_UNK|>": "",
}

        self.lang_dict =  {
    "<|zh|>": "<|lang|>",
    "<|en|>": "<|lang|>",
    "<|yue|>": "<|lang|>",
    "<|ja|>": "<|lang|>",
    "<|ko|>": "<|lang|>",
    "<|nospeech|>": "<|lang|>",
}

        self.emo_set = {"😊", "😔", "😡", "😰", "🤢", "😮"}
        self.event_set = {"🎼", "👏", "😀", "😭", "🤧", "😷",}



        self.audio_examples = [
    ["example/zh.mp3", "zh"],
    ["example/yue.mp3", "yue"],
    ["example/en.mp3", "en"],
    ["example/ja.mp3", "ja"],
    ["example/ko.mp3", "ko"],
    ["example/emo_1.wav", "auto"],
    ["example/emo_2.wav", "auto"],
    ["example/emo_3.wav", "auto"],
    #["example/emo_4.wav", "auto"],
    #["example/event_1.wav", "auto"],
    #["example/event_2.wav", "auto"],
    #["example/event_3.wav", "auto"],
    ["example/rich_1.wav", "auto"],
    ["example/rich_2.wav", "auto"],
    #["example/rich_3.wav", "auto"],
    ["example/longwav_1.wav", "auto"],
    ["example/longwav_2.wav", "auto"],
    ["example/longwav_3.wav", "auto"],
    #["example/longwav_4.wav", "auto"],
]

        初次调用APi会下载模型,这可能导致第一次请求超时或者失败,所以等模型下载完成,再试一次即可。

三、通义千问语音合成模型(TTS)

        阿里百炼平台提供了多个语音合成模型,项目使用的是基础语音合成模型 ,使用请Api,只需要去申请一个Api即可,非常方便。传送门:qwen-tts

qwen-tts

 服务端代码如下:

@app.route("/tts",methods=["GET"])
def tts():

    text = request.args.get("text", "")
    response = dashscope.audio.qwen_tts.SpeechSynthesizer.call(
        model="qwen-tts",
        api_key="keyxxxxxxxxxxxxxxxxxxxx",
        text=text,
        voice="Cherry",
    )
    
    save_dir = "/dir/"
    audio_url = response.output.audio["url"]
    random_Word = str(uuid.uuid4())[:8]+".wav"
    save_path = save_dir + random_Word  # 自定义保存路径

    try:
        response = requests.get(audio_url)
        response.raise_for_status()  # 检查请求是否成功
        with open(save_path, 'wb') as f:
            f.write(response.content)
        print(f"音频文件已保存至:{save_path}")
    except Exception as e:
        print(f"下载失败:{str(e)}")
    
    return save_path,200

        这里简单使用了Get请求,text的关键内容就是Chat模型聊天返回的结果,通过保存的位置,用户端去请求这个语音合成文件,调用audio标签自动播放聊天模型回答的内容,从而实现整个人机对话过程。

四、项目启动

配置好自己的Conda虚拟环境,并安装好所需要的包以后,运行

python flaskapp.py

flask服务默认工作再本地5005端口。

安装所需包,cd到根目录

pip install -r requirements.txt

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐