写一个类似sensevoice demo的音频转写python代码
·
sensevoice webui.py demo页
部署sensevoice small后,正常启动webui.py可以看到如下的web页面
python代码说明:
实测效果很好,但不适合API调用。并且sensevoice API调用要对官网下载的api.py进行一下修改,否则会出现标点符号丢失的情况。关于sensevoice api.py文件的配置问题,在下一篇文章里再做详细说明。近期正好测试funasr,索性用funasr api来写个流式转写,进行测试。
关于funasr的部署和使用可以参考官网的文档
首先上运行后的截图:
修改说明:
根据自己服务器情况自行修改funasr服务器地址和端口。
修改位置如下图:
主机不用说了,就是你服务器的地址或域名
端口:我这里服务器使用的是10096,服务器端口配置具体参考funasr官网文档
使用离线模型,免得每次启动要检查模型升级
测试服务器没有配置SSL,所以使用ws协议而不使用wss协议
最后一项是标点设置。
源代码:
免费开放源代码,需要引用的请保留关于版权和版本信息
'''
* Copyright 2025 ddwangrun Technologies Co., Ltd
*
* Licensed under the Apache License, Version 1.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* by ddyang yangzhaoqing@mail.yuanxiang.net.cn for more details.
'''
import os
import time
import websockets
import asyncio
from queue import Queue
import argparse
import json
import logging
import ssl # 添加ssl模块的导入
import sys
from PyQt5.QtWidgets import QApplication, QMainWindow, QPushButton, QVBoxLayout, QWidget, QFileDialog, QLabel, QTextEdit, QMessageBox
from PyQt5.QtCore import Qt, QTimer
import threading
# Send every record from DEBUG upwards to app.log; filemode 'w' starts the
# file afresh on each run instead of appending to the previous one.
logging.basicConfig(
    filename='app.log',
    filemode='w',
    level=logging.DEBUG,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
class WebSocketClient:
    """Send audio files to a FunASR websocket server and collect the transcript.

    ``record_from_scp`` pushes each input file to the server as a JSON header
    followed by binary chunks; ``message_handler`` receives the server's JSON
    replies and accumulates the recognised text.  Both coroutines are started
    together by ``main``.

    Attributes read by callers:
        text_print: text intended for display (only the tail is kept in the
            streaming modes).
        final_text: the complete accumulated text.
        offline_msg_done: set from the server's ``is_final`` field, and forced
            to True once an ``offline`` result has been stored.
    """

    # Values sent in the "chunk_size" / "chunk_interval" header fields; they
    # also determine how many bytes go into each binary frame.
    CHUNK_SIZE = (5, 10, 5)
    CHUNK_INTERVAL = 10
    # Sample rate announced for inputs that carry no header of their own.
    DEFAULT_SAMPLE_RATE = 16000
    # Number of trailing characters of text_print kept in online/2pass mode.
    MAX_DISPLAY_CHARS = 10000

    def __init__(self, host, port, mode, audio_in, output_dir, ssl_flag, use_itn):
        """Store the connection settings and reset the result buffers.

        Args:
            host: server host name or address.
            port: server port.
            mode: recognition mode sent to the server; "offline" and "online"
                are handled specially, anything else is treated as 2pass.
            audio_in: path of an audio file, or of a ``.scp`` list file.
            output_dir: directory for the ``text.<id>`` result file, or None
                to skip writing it.
            ssl_flag: use ``wss://`` with a default SSL context when true,
                plain ``ws://`` otherwise.
            use_itn: value of the "itn" header field (coerced to bool).
        """
        self.host = host
        self.port = port
        self.mode = mode
        self.audio_in = audio_in
        self.output_dir = output_dir
        self.ssl_flag = ssl_flag
        self.use_itn = use_itn
        self.uri = f"wss://{host}:{port}" if ssl_flag else f"ws://{host}:{port}"
        self.websocket = None
        self.offline_msg_done = False
        self.text_print = ""
        self.final_text = ""
        # Set once message_handler has exited, so the sender does not wait
        # forever for a final result that can no longer arrive.
        self._receiver_stopped = False

    @staticmethod
    def _parse_entry(wav, from_scp):
        """Return ``(wav_name, wav_path)`` for one input entry.

        A plain audio path is used as-is (it may contain spaces).  A ``.scp``
        line is ``<name> <path>`` or just ``<path>``; only the first run of
        whitespace separates the two fields.  The path is "" for a blank entry.
        """
        entry = wav.strip()
        if not from_scp:
            return "demo", entry
        fields = entry.split(maxsplit=1)
        if len(fields) == 2:
            return fields[0], fields[1]
        return "demo", entry

    @classmethod
    def _load_audio(cls, wav_path):
        """Read *wav_path* and return ``(audio_bytes, sample_rate, wav_format)``.

        ``.wav`` files contribute their frames and their own frame rate; any
        other file is sent byte-for-byte, labelled "pcm" for ``.pcm`` and
        "others" for everything else.
        """
        if wav_path.endswith(".wav"):
            import wave
            with wave.open(wav_path, "rb") as wav_file:
                sample_rate = wav_file.getframerate()
                frames = wav_file.readframes(wav_file.getnframes())
            return frames, sample_rate, "pcm"
        wav_format = "pcm" if wav_path.endswith(".pcm") else "others"
        with open(wav_path, "rb") as f:
            return f.read(), cls.DEFAULT_SAMPLE_RATE, wav_format

    async def record_from_scp(self, websocket, chunk_begin, chunk_size):
        """Send the configured input to the server, then close the connection.

        Args:
            websocket: open connection; must provide awaitable ``send`` and
                ``close``.
            chunk_begin: index of the first input entry to send.
            chunk_size: number of entries to send; values <= 0 send them all.
        """
        from_scp = self.audio_in.endswith(".scp")
        if from_scp:
            with open(self.audio_in) as f_scp:
                wavs = f_scp.readlines()
        else:
            wavs = [self.audio_in]
        if chunk_size > 0:
            wavs = wavs[chunk_begin:chunk_begin + chunk_size]

        hotword_msg = ""
        use_itn = bool(self.use_itn)
        # Pause between frames: near zero for offline, otherwise the chunk
        # duration implied by CHUNK_SIZE / CHUNK_INTERVAL.
        if self.mode == "offline":
            sleep_duration = 0.001
        else:
            sleep_duration = 60 * self.CHUNK_SIZE[1] / self.CHUNK_INTERVAL / 1000

        for wav in wavs:
            wav_name, wav_path = self._parse_entry(wav, from_scp)
            if not wav_path:
                continue
            # Rate and format are determined per file so one entry cannot
            # leak its settings into the next.
            audio_bytes, sample_rate, wav_format = self._load_audio(wav_path)

            # Bytes per frame; the trailing "* 2" presumably reflects 2 bytes
            # per sample (16-bit mono) - TODO confirm against the server docs.
            stride = int(60 * self.CHUNK_SIZE[1] / self.CHUNK_INTERVAL / 1000 * sample_rate * 2)
            chunk_num = (len(audio_bytes) - 1) // stride + 1

            message = json.dumps({"mode": self.mode, "chunk_size": list(self.CHUNK_SIZE),
                                  "chunk_interval": self.CHUNK_INTERVAL, "audio_fs": sample_rate,
                                  "wav_name": wav_name, "wav_format": wav_format, "is_speaking": True,
                                  "hotwords": hotword_msg, "itn": use_itn})
            await websocket.send(message)
            logging.debug(f"Sent initial message: {message}")

            for i in range(chunk_num):
                beg = i * stride
                await websocket.send(audio_bytes[beg:beg + stride])
                logging.debug(f"Sent audio chunk {i+1}/{chunk_num}")
                if i == chunk_num - 1:
                    # Tell the server this utterance is complete.
                    message = json.dumps({"is_speaking": False})
                    await websocket.send(message)
                    logging.debug(f"Sent final speaking status: {message}")
                await asyncio.sleep(sleep_duration)

        if self.mode != "offline":
            await asyncio.sleep(2)
        else:
            # Offline results arrive after the upload; keep the connection
            # open until one is stored or the receiver has given up.
            while not self.offline_msg_done and not self._receiver_stopped:
                await asyncio.sleep(1)
        await websocket.close()
        logging.info("Websocket closed after processing.")

    def _append_result_line(self, id, wav_name, text, timestamp):
        """Append one tab-separated result line to ``<output_dir>/text.<id>``."""
        if timestamp != "":
            text_write_line = "{}\t{}\t{}\n".format(wav_name, text, timestamp)
        else:
            text_write_line = "{}\t{}\n".format(wav_name, text)
        # Create the directory on demand and close the file after every line
        # so no handle is left open per message.
        os.makedirs(self.output_dir, exist_ok=True)
        result_path = os.path.join(self.output_dir, "text.{}".format(id))
        with open(result_path, "a", encoding="utf-8") as ibest_writer:
            ibest_writer.write(text_write_line)
        logging.debug(f"Wrote to file: {text_write_line}")

    async def message_handler(self, websocket, id):
        """Receive server replies until the connection closes.

        Args:
            websocket: open connection; must provide awaitable ``recv``.
            id: suffix of the ``text.<id>`` result file (the name shadows the
                builtin but is kept for compatibility with keyword callers).

        Errors other than a normal close are logged and appended to both text
        buffers instead of being raised.
        """
        try:
            while True:
                meg = json.loads(await websocket.recv())
                wav_name = meg.get("wav_name", "demo")
                text = meg["text"]
                timestamp = meg.get("timestamp", "")
                is_final = meg.get("is_final", False)

                if self.output_dir is not None:
                    self._append_result_line(id, wav_name, text, timestamp)

                if "mode" in meg:
                    if meg["mode"] == "online":
                        self.text_print += "{}".format(text)
                        self.final_text += "{}".format(text)
                        self.text_print = self.text_print[-self.MAX_DISPLAY_CHARS:]
                        logging.debug(f"Updated online text: {self.text_print}")
                    elif meg["mode"] == "offline":
                        if timestamp != "":
                            self.text_print += "{} timestamp: {}\n".format(text, timestamp)
                            self.final_text += "{} timestamp: {}\n".format(text, timestamp)
                        else:
                            self.text_print += "{}\n".format(text)
                            self.final_text += "{}\n".format(text)
                        is_final = True
                        logging.debug(f"Updated offline text: {self.text_print}")
                    else:
                        self.text_print += "{}".format(text)
                        self.final_text += "{}".format(text)
                        self.text_print = self.text_print[-self.MAX_DISPLAY_CHARS:]
                        logging.debug(f"Updated 2pass text: {self.text_print}")

                # Publish the flag only after the buffers are updated, so a
                # poller that sees it set also sees the text it refers to.
                self.offline_msg_done = is_final
        except websockets.ConnectionClosedOK:
            logging.info("Connection closed normally.")
        except Exception as e:
            logging.error(e)
            self.text_print += f"\nError: {e}\n"
            self.final_text += f"\nError: {e}\n"
        finally:
            self._receiver_stopped = True

    async def main(self):
        """Connect to the server and run the sender and receiver together.

        Connection and processing errors are logged and appended to both text
        buffers rather than raised.
        """
        try:
            ssl_context = ssl.create_default_context() if self.ssl_flag else None
            async with websockets.connect(self.uri, ssl=ssl_context) as websocket:
                self.websocket = websocket
                logging.info(f"Connected to server at {self.uri}")
                tasks = []
                if self.audio_in:
                    tasks.append(asyncio.create_task(self.record_from_scp(websocket, 0, -1)))
                tasks.append(asyncio.create_task(self.message_handler(websocket, 1)))
                await asyncio.gather(*tasks)
        except ConnectionResetError as e:
            logging.error(f"Connection reset by peer: {e}")
            self.text_print += f"\nConnection reset by peer: {e}\n"
            self.final_text += f"\nConnection reset by peer: {e}\n"
        except websockets.ConnectionClosedOK:
            logging.info("Connection closed normally.")
        except Exception as e:
            logging.error(f"An error occurred: {e}")
            self.text_print += f"\nAn error occurred: {e}\n"
            self.final_text += f"\nAn error occurred: {e}\n"
class MainWindow(QMainWindow):
    """Main window: choose an audio file, transcribe it, show and save the text.

    The websocket client runs on its own asyncio event loop in a worker
    thread; a QTimer polls the client's buffers from the GUI thread.
    """

    # Server settings - edit these to match your own deployment.
    SERVER_HOST = "xxx.xxxxxx.xxxx.cn"
    SERVER_PORT = 10096
    MODE = "offline"
    OUTPUT_DIR = "./results"
    USE_SSL = False  # set to False if the server does not support SSL
    USE_ITN = True
    # Interval, in milliseconds, at which the client's buffers are polled.
    POLL_INTERVAL_MS = 100
    # Longest time, in seconds, that closing the window waits for the worker.
    JOIN_TIMEOUT_S = 5

    def __init__(self):
        """Build the widgets and initialise the worker bookkeeping."""
        super().__init__()
        self.setWindowTitle("wav文件流式转写for dd-wangrun")
        self.setGeometry(100, 100, 600, 400)

        layout = QVBoxLayout()
        self.label = QLabel("Select an audio file:", self)
        layout.addWidget(self.label)
        self.file_button = QPushButton("Browse...", self)
        self.file_button.clicked.connect(self.browse_audio)
        layout.addWidget(self.file_button)
        self.start_button = QPushButton("Start Processing", self)
        self.start_button.clicked.connect(self.start_processing)
        layout.addWidget(self.start_button)
        self.status_label = QLabel("", self)
        layout.addWidget(self.status_label)
        self.result_text = QTextEdit(self)
        self.result_text.setReadOnly(True)
        layout.addWidget(self.result_text)

        container = QWidget()
        container.setLayout(layout)
        self.setCentralWidget(container)

        # Everything the other methods read is defined up front, so none of
        # them can hit a missing attribute before processing has started.
        self.client = None
        self.audio_path = None
        self.event_loop = None
        self.task = None
        self.thread = None

        self.timer = QTimer(self)
        self.timer.timeout.connect(self.update_result_text)
        # Stop the worker when the window goes away.
        # NOTE(review): a slot bound to the object being destroyed may never
        # receive `destroyed`; closeEvent below is the hook relied upon - confirm.
        self.setAttribute(Qt.WA_DeleteOnClose)
        self.destroyed.connect(self.on_destroyed)

    def browse_audio(self):
        """Let the user pick an audio file and remember its path."""
        options = QFileDialog.Options()
        filename, _ = QFileDialog.getOpenFileName(
            self, "QFileDialog.getOpenFileName()", "",
            "Audio Files (*.wav *.pcm);;All Files (*)", options=options)
        if filename:
            self.audio_path = filename
            self.label.setText(f"Selected file: {filename}")

    def start_processing(self):
        """Start transcribing the selected file on a worker thread."""
        if not self.audio_path:
            self.label.setText("Please select an audio file first.")
            QMessageBox.warning(self, "Warning", "Please select an audio file first.")
            return
        if self.thread is not None and self.thread.is_alive():
            # A transcription is already running; a second worker would race
            # with it over the display and the output file.
            return

        self.client = WebSocketClient(
            host=self.SERVER_HOST,
            port=self.SERVER_PORT,
            mode=self.MODE,
            audio_in=self.audio_path,
            output_dir=self.OUTPUT_DIR,
            ssl_flag=self.USE_SSL,
            use_itn=self.USE_ITN
        )
        self.event_loop = asyncio.new_event_loop()
        self.task = self.event_loop.create_task(self.client.main())
        self.thread = threading.Thread(
            target=self._run_worker, args=(self.event_loop, self.task), daemon=True)
        self.thread.start()
        self.timer.start(self.POLL_INTERVAL_MS)
        self.status_label.setText("请等待,正在处理文件和转写文本......")

    @staticmethod
    def _run_worker(loop, task):
        """Run *task* to completion on *loop* in this thread, then close the loop."""
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(task)
        except asyncio.CancelledError:
            logging.info("Transcription task cancelled.")
        finally:
            # Cancel whatever is still pending before closing, so no task is
            # destroyed while it is mid-flight.
            leftovers = asyncio.all_tasks(loop)
            for leftover in leftovers:
                leftover.cancel()
            if leftovers:
                loop.run_until_complete(asyncio.gather(*leftovers, return_exceptions=True))
            loop.close()

    def _output_file_path(self):
        """Return the ``.txt`` path that sits next to the selected audio file."""
        base_name = os.path.splitext(os.path.basename(self.audio_path))[0]
        return os.path.join(os.path.dirname(self.audio_path), f"{base_name}.txt")

    def update_result_text(self):
        """Timer slot: refresh the display and finish up once the worker is done."""
        client = self.client
        if not client:
            return
        # Sample the thread state first: if it has already exited, the flags
        # read below are final.
        worker_alive = self.thread is not None and self.thread.is_alive()
        if client.text_print:
            self.result_text.setText(client.text_print)
        if client.offline_msg_done and client.final_text:
            # Stop polling before saving: an error dialog runs a nested event
            # loop, which would otherwise let this slot fire again.
            self.timer.stop()
            self.status_label.setText("")
            if self.save_transcription_to_file(client.final_text):
                self.result_text.setText(f"转写文件完成:文件路径 {self._output_file_path()} 请查看")
        elif not worker_alive:
            # The worker ended without a final result (for example on a
            # connection error); its message is already on display.
            self.timer.stop()
            self.status_label.setText("")

    def save_transcription_to_file(self, text):
        """Write *text* next to the audio file.

        Returns:
            True on success; False after logging the failure and showing an
            error dialog.
        """
        output_file_path = self._output_file_path()
        try:
            with open(output_file_path, "w", encoding="utf-8") as file:
                file.write(text)
        except Exception as e:
            logging.error(f"Failed to save transcription to {output_file_path}: {e}")
            QMessageBox.critical(self, "Error", f"Failed to save transcription to {output_file_path}: {e}")
            return False
        logging.info(f"Transcription saved to {output_file_path}")
        return True

    def _stop_worker(self):
        """Cancel the running transcription, if any, and wait briefly for it."""
        loop, task, thread = self.event_loop, self.task, self.thread
        if loop is not None and task is not None and not loop.is_closed():
            try:
                # The loop belongs to the worker thread, so the cancellation
                # has to be handed over rather than called directly.
                loop.call_soon_threadsafe(task.cancel)
            except RuntimeError:
                # The loop was closed between the check and the call.
                pass
        if thread is not None and thread.is_alive():
            thread.join(timeout=self.JOIN_TIMEOUT_S)

    def closeEvent(self, event):
        """Stop polling and shut the worker down when the window is closed."""
        self.timer.stop()
        self._stop_worker()
        super().closeEvent(event)

    def on_destroyed(self):
        """Slot for ``destroyed``: make sure the worker is not left running."""
        self._stop_worker()
def run_app():
    """Create the Qt application, show the main window and exit with Qt's status."""
    qt_app = QApplication(sys.argv)
    main_window = MainWindow()
    main_window.show()
    # exec_() blocks until the GUI event loop ends; its result becomes the
    # process exit status.
    sys.exit(qt_app.exec_())
if __name__ == "__main__":
    # Script entry point: launch the GUI; on Ctrl+C print a short notice and
    # exit with status 1.
    try:
        run_app()
    except KeyboardInterrupt:
        print("\nProgram interrupted by user.")
        raise SystemExit(1)
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐



所有评论(0)