sensevocie webui.py demo页

部署sensevoice small后,正常启动webui.py可以看到如下的web页面
示例截图

python代码说明:

实测效果很好,但不适合API调用。并且sensevoice API调用要对官网下载的api.py进行下修改,否则会出现标点符号丢失的情况,关于sensevoice API.py文件的配置问题,在下一篇文章里再做详细说明,近期正好测试funasr,索性用funasr apil来写个流式转写,进行测试。
关于funasr的部署和使用可以参考官网的文档
首先上运行后的截图:
流式转写截图

修改说明:

根据自己服务器情况自行修改funasr服务器地址和端口。
修改位置如下图:
修改位置如上图
主机不用说了,就是你服务器的地址或域名
端口:我这里服务器使用的是10096,服务器端口配置具体参考funasr官网文档
使用离线模型,免得每次启动要检查模型升级
测试服务器没有配置SSL,所以使用ws协议而不使用wss协议
最后一项是标点设置。

源代码:

免费开放源代码,需要引用的请保留关于版权和版本信息

'''
 * Copyright 2025 ddwangrun Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 1.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * by ddyang yangzhaoqing@mail.yuanxiang.net.cn for more details.
'''

import os
import time
import websockets
import asyncio
from queue import Queue
import argparse
import json
import logging
import ssl  # 添加ssl模块的导入
import sys
from PyQt5.QtWidgets import QApplication, QMainWindow, QPushButton, QVBoxLayout, QWidget, QFileDialog, QLabel, QTextEdit, QMessageBox
from PyQt5.QtCore import Qt, QTimer
import threading

# 设置日志级别为 DEBUG 并记录到文件
logging.basicConfig(level=logging.DEBUG, filename='app.log', filemode='w',
                    format='%(asctime)s - %(levelname)s - %(message)s')

class WebSocketClient:
    def __init__(self, host, port, mode, audio_in, output_dir, ssl_flag, use_itn):
        self.host = host
        self.port = port
        self.mode = mode
        self.audio_in = audio_in
        self.output_dir = output_dir
        self.ssl_flag = ssl_flag
        self.use_itn = use_itn
        self.uri = f"wss://{host}:{port}" if ssl_flag else f"ws://{host}:{port}"
        self.websocket = None
        self.offline_msg_done = False
        self.text_print = ""
        self.final_text = ""

    async def record_from_scp(self, websocket, chunk_begin, chunk_size):
        if self.audio_in.endswith(".scp"):
            with open(self.audio_in) as f_scp:
                wavs = f_scp.readlines()
        else:
            wavs = [self.audio_in]

        fst_dict = {}
        hotword_msg = ""
        sample_rate = 16000
        wav_format = "pcm"
        use_itn = bool(self.use_itn)

        if chunk_size > 0:
            wavs = wavs[chunk_begin:chunk_begin + chunk_size]
        for wav in wavs:
            wav_splits = wav.strip().split()
            wav_name = wav_splits[0] if len(wav_splits) > 1 else "demo"
            wav_path = wav_splits[1] if len(wav_splits) > 1 else wav_splits[0]
            if not wav_path.strip():
                continue
            if wav_path.endswith(".pcm"):
                with open(wav_path, "rb") as f:
                    audio_bytes = f.read()
            elif wav_path.endswith(".wav"):
                import wave
                with wave.open(wav_path, "rb") as wav_file:
                    params = wav_file.getparams()
                    sample_rate = wav_file.getframerate()
                    frames = wav_file.readframes(wav_file.getnframes())
                    audio_bytes = bytes(frames)
            else:
                wav_format = "others"
                with open(wav_path, "rb") as f:
                    audio_bytes = f.read()

            stride = int(60 * 10 / 10 / 1000 * sample_rate * 2)
            chunk_num = (len(audio_bytes) - 1) // stride + 1

            message = json.dumps({"mode": self.mode, "chunk_size": [5, 10, 5], "chunk_interval": 10, "audio_fs": sample_rate,
                                  "wav_name": wav_name, "wav_format": wav_format, "is_speaking": True, "hotwords": hotword_msg, "itn": use_itn})

            await websocket.send(message)
            logging.debug(f"Sent initial message: {message}")
            is_speaking = True
            for i in range(chunk_num):
                beg = i * stride
                data = audio_bytes[beg:beg + stride]
                await websocket.send(data)
                logging.debug(f"Sent audio chunk {i+1}/{chunk_num}")

                if i == chunk_num - 1:
                    is_speaking = False
                    message = json.dumps({"is_speaking": is_speaking})
                    await websocket.send(message)
                    logging.debug(f"Sent final speaking status: {message}")

                sleep_duration = 0.001 if self.mode == "offline" else 60 * 10 / 10 / 1000
                await asyncio.sleep(sleep_duration)

            if not self.mode == "offline":
                await asyncio.sleep(2)

            if self.mode == "offline":
                while not self.offline_msg_done:
                    await asyncio.sleep(1)

            await websocket.close()
            logging.info("Websocket closed after processing.")

    async def message_handler(self, websocket, id):
        ibest_writer = None
        try:
            while True:
                meg = await websocket.recv()
                meg = json.loads(meg)
                wav_name = meg.get("wav_name", "demo")
                text = meg["text"]
                timestamp = ""
                self.offline_msg_done = meg.get("is_final", False)
                if "timestamp" in meg:
                    timestamp = meg["timestamp"]

                if self.output_dir is not None:
                    ibest_writer = open(os.path.join(self.output_dir, "text.{}".format(id)), "a", encoding="utf-8")
                    if timestamp != "":
                        text_write_line = "{}\t{}\t{}\n".format(wav_name, text, timestamp)
                    else:
                        text_write_line = "{}\t{}\n".format(wav_name, text)
                    ibest_writer.write(text_write_line)
                    logging.debug(f"Wrote to file: {text_write_line}")

                if 'mode' not in meg:
                    continue
                if meg["mode"] == "online":
                    self.text_print += "{}".format(text)
                    self.final_text += "{}".format(text)
                    self.text_print = self.text_print[-10000:]
                    logging.debug(f"Updated online text: {self.text_print}")
                elif meg["mode"] == "offline":
                    if timestamp != "":
                        self.text_print += "{} timestamp: {}\n".format(text, timestamp)
                        self.final_text += "{} timestamp: {}\n".format(text, timestamp)
                    else:
                        self.text_print += "{}\n".format(text)
                        self.final_text += "{}\n".format(text)
                    self.offline_msg_done = True
                    logging.debug(f"Updated offline text: {self.text_print}")
                else:
                    self.text_print += "{}".format(text)
                    self.final_text += "{}".format(text)
                    self.text_print = self.text_print[-10000:]
                    logging.debug(f"Updated 2pass text: {self.text_print}")
        except websockets.ConnectionClosedOK:
            logging.info("Connection closed normally.")
        except Exception as e:
            logging.error(e)
            self.text_print += f"\nError: {e}\n"
            self.final_text += f"\nError: {e}\n"

    async def main(self):
        try:
            async with websockets.connect(self.uri, ssl=ssl.create_default_context() if self.ssl_flag else None) as websocket:
                self.websocket = websocket
                logging.info(f"Connected to server at {self.uri}")
                
                tasks = []
                if self.audio_in:
                    tasks.append(asyncio.create_task(self.record_from_scp(websocket, 0, -1)))
                tasks.append(asyncio.create_task(self.message_handler(websocket, 1)))

                await asyncio.gather(*tasks)
        except ConnectionResetError as e:
            logging.error(f"Connection reset by peer: {e}")
            self.text_print += f"\nConnection reset by peer: {e}\n"
            self.final_text += f"\nConnection reset by peer: {e}\n"
        except websockets.ConnectionClosedOK:
            logging.info("Connection closed normally.")
        except Exception as e:
            logging.error(f"An error occurred: {e}")
            self.text_print += f"\nAn error occurred: {e}\n"
            self.final_text += f"\nAn error occurred: {e}\n"

class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()

        self.setWindowTitle("wav文件流式转写for dd-wangrun")
        self.setGeometry(100, 100, 600, 400)

        layout = QVBoxLayout()

        self.label = QLabel("Select an audio file:", self)
        layout.addWidget(self.label)

        self.file_button = QPushButton("Browse...", self)
        self.file_button.clicked.connect(self.browse_audio)
        layout.addWidget(self.file_button)

        self.start_button = QPushButton("Start Processing", self)
        self.start_button.clicked.connect(self.start_processing)
        layout.addWidget(self.start_button)

        self.status_label = QLabel("", self)
        layout.addWidget(self.status_label)

        self.result_text = QTextEdit(self)
        self.result_text.setReadOnly(True)
        layout.addWidget(self.result_text)

        container = QWidget()
        container.setLayout(layout)
        self.setCentralWidget(container)

        self.client = None
        self.timer = QTimer(self)
        self.timer.timeout.connect(self.update_result_text)

        # Connect the close event to stop the event loop
        self.setAttribute(Qt.WA_DeleteOnClose)
        self.destroyed.connect(self.on_destroyed)

    def browse_audio(self):
        options = QFileDialog.Options()
        filename, _ = QFileDialog.getOpenFileName(self, "QFileDialog.getOpenFileName()", "", "Audio Files (*.wav *.pcm);;All Files (*)", options=options)
        if filename:
            self.audio_path = filename
            self.label.setText(f"Selected file: {filename}")

    def start_processing(self):
        if hasattr(self, 'audio_path'):
            self.client = WebSocketClient(
                host="xxx.xxxxxx.xxxx.cn",
                port=10096,
                mode="offline",
                audio_in=self.audio_path,
                output_dir="./results",
                ssl_flag=False,  # 设置为 False 如果服务器不支持 SSL
                use_itn=True
            )
            self.timer.start(100)
            self.event_loop = asyncio.new_event_loop()
            asyncio.set_event_loop(self.event_loop)
            self.task = self.event_loop.create_task(self.client.main())
            self.thread = threading.Thread(target=self.event_loop.run_until_complete, args=(self.task,))
            self.thread.start()
            self.status_label.setText("请等待,正在处理文件和转写文本......")
        else:
            self.label.setText("Please select an audio file first.")
            QMessageBox.warning(self, "Warning", "Please select an audio file first.")

    def update_result_text(self):
        if self.client:
            if self.client.text_print:
                self.result_text.setText(self.client.text_print)
            if self.client.offline_msg_done and self.client.final_text:
                self.status_label.setText("")
                base_name = os.path.splitext(os.path.basename(self.audio_path))[0]
                output_file_path = os.path.join(os.path.dirname(self.audio_path), f"{base_name}.txt")
                self.result_text.setText(f"转写文件完成:文件路径 {output_file_path} 请查看")
                self.save_transcription_to_file(self.client.final_text)
                self.timer.stop()  # 停止定时器

    def save_transcription_to_file(self, text):
        base_name = os.path.splitext(os.path.basename(self.audio_path))[0]
        output_file_path = os.path.join(os.path.dirname(self.audio_path), f"{base_name}.txt")
        try:
            with open(output_file_path, "w", encoding="utf-8") as file:
                file.write(text)
            logging.info(f"Transcription saved to {output_file_path}")
        except Exception as e:
            logging.error(f"Failed to save transcription to {output_file_path}: {e}")
            QMessageBox.critical(self, "Error", f"Failed to save transcription to {output_file_path}: {e}")

    def on_destroyed(self):
        if self.event_loop.is_running():
            self.event_loop.stop()
        if self.thread.is_alive():
            self.thread.join()

def run_app():
    app = QApplication(sys.argv)
    window = MainWindow()
    window.show()
    return_code = app.exec_()
    sys.exit(return_code)

if __name__ == "__main__":
    try:
        run_app()
    except KeyboardInterrupt:
        print("\nProgram interrupted by user.")
        sys.exit(1)



Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐