306 lines
9.2 KiB
Python
306 lines
9.2 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
Vosk 语音识别服务
|
||
轻量级、易安装、支持离线
|
||
"""
|
||
|
||
from flask import Flask, request, jsonify
|
||
from flask_cors import CORS
|
||
import os
|
||
import json
|
||
import wave
|
||
from difflib import SequenceMatcher
|
||
|
||
# Flask application object; the route decorators below register on it.
app = Flask(__name__)
# Apply flask_cors to the whole application.
CORS(app)

# Module-level state, written by init_vosk_model().
# The loaded vosk Model instance, or None while no model is loaded.
vosk_model = None
# True only after init_vosk_model() has loaded the model successfully.
model_loaded = False
|
||
|
||
def init_vosk_model(model_path="./vosk-model-small-cn-0.22"):
    """Load the Vosk model and publish it through module-level globals.

    On success ``vosk_model`` holds the loaded model, ``model_loaded`` is
    True and ``KaldiRecognizer`` is bound at module level so the recognition
    code can construct recognizers without importing vosk again.

    Args:
        model_path: Directory containing the unpacked Vosk model. Defaults to
            the small Chinese model, resolved against the current working
            directory.

    Returns:
        True when the model was loaded. False when vosk cannot be imported,
        the model directory is missing, or loading the model raised.
    """
    global vosk_model, model_loaded, KaldiRecognizer

    # Reset first so that a failed (re)load can never leave a stale model or
    # a stale "loaded" flag behind.
    vosk_model = None
    model_loaded = False

    try:
        # Imported lazily: a missing vosk installation is then reported as a
        # load failure instead of crashing at module import time.
        from vosk import Model, KaldiRecognizer as _KaldiRecognizer

        # Expose the recognizer class as a module-level name for later use.
        KaldiRecognizer = _KaldiRecognizer

        # A Vosk model is a directory, so a plain file at this path is
        # treated the same as a missing model.
        if not os.path.isdir(model_path):
            print(f"[错误] 模型不存在: {model_path}")
            print("请下载模型:https://alphacephei.com/vosk/models/vosk-model-small-cn-0.22.zip")
            return False

        print(f"正在加载模型: {model_path}")
        vosk_model = Model(model_path)
        model_loaded = True
        print("✓ 模型加载成功!")
        return True

    except Exception as e:
        # Top-level boundary: any import or load error means "not loaded".
        print(f"✗ 模型加载失败: {str(e)}")
        vosk_model = None
        model_loaded = False
        return False
|
||
|
||
def convert_audio_to_wav(input_path, output_path):
    """Convert an audio file to mono, 16 kHz, 16-bit WAV using pydub.

    Args:
        input_path: Path of the source audio file; pydub detects its format.
        output_path: Path the converted WAV file is written to.

    Returns:
        ``(True, None)`` on success, otherwise ``(False, message)`` where
        ``message`` is a user-facing description of the failure. When the
        failure looks like a missing ffmpeg installation the message
        contains installation hints.
    """
    try:
        # Imported lazily so a missing pydub is reported as a conversion
        # failure rather than breaking module import.
        from pydub import AudioSegment

        print(f"[转换] 使用pydub转换音频...")

        # Load (format auto-detected) and normalise to mono / 16 kHz / 16-bit.
        audio = (
            AudioSegment.from_file(input_path)
            .set_channels(1)
            .set_frame_rate(16000)
            .set_sample_width(2)
        )

        audio.export(output_path, format='wav')
        print(f"[转换] pydub转换成功")
        return True, None

    except Exception as e:
        detail = str(e)
        lowered = detail.lower()

        mentions_ffmpeg = (
            'ffmpeg' in lowered
            or 'ffprobe' in lowered
            or 'filenotfounderror' in lowered
        )

        # str(e) never contains the exception class name, and on Windows the
        # message of a failed process launch does not name the executable.
        # So also treat a FileNotFoundError as "ffmpeg missing" when both the
        # input file and the output directory exist -- the file that could
        # not be found is then presumably the ffmpeg/ffprobe executable.
        output_dir = os.path.dirname(os.path.abspath(output_path))
        missing_executable = (
            isinstance(e, FileNotFoundError)
            and os.path.exists(input_path)
            and os.path.isdir(output_dir)
        )

        if mentions_ffmpeg or missing_executable:
            return False, (
                "需要安装 ffmpeg 才能转换音频格式。\n"
                "请下载 ffmpeg: https://www.gyan.dev/ffmpeg/builds/\n"
                "或运行: pip install ffmpeg-python\n"
                "错误详情: " + detail
            )

        return False, f"音频转换失败: {detail}"
|
||
|
||
def recognize_audio(audio_path):
    """Transcribe an audio file with the loaded Vosk model.

    The file is first opened as WAV. If that fails it is converted with
    ``convert_audio_to_wav`` into ``<audio_path>.converted.wav`` and the
    converted file is used instead. The WAV must be mono, 16-bit and sampled
    at 8000, 16000, 32000 or 48000 Hz.

    Args:
        audio_path: Path of the audio file to transcribe.

    Returns:
        ``(text, None)`` on success, where ``text`` may be an empty string
        when nothing was recognised, or ``(None, message)`` on failure.
        The function never raises; every exception is turned into a message.
    """
    converted_path = None
    wf = None
    try:
        # Try the file as-is first.
        print(f"[识别] 尝试打开音频文件: {audio_path}")
        try:
            wf = wave.open(audio_path, "rb")
            print(f"[识别] 文件是有效的WAV格式")
        except Exception as e:
            print(f"[识别] 不是有效的WAV格式: {str(e)}")

        # Not a readable WAV: convert it and open the converted copy.
        if wf is None:
            print(f"[识别] 检测到非WAV格式,开始转换...")
            converted_path = audio_path + '.converted.wav'
            success, error = convert_audio_to_wav(audio_path, converted_path)

            if not success:
                print(f"[识别] 转换失败: {error}")
                return None, f"音频格式转换失败: {error}"

            print(f"[识别] 转换成功: {converted_path}")
            wf = wave.open(converted_path, "rb")
            print(f"[识别] 转换后的文件已打开")

        # Validate the audio parameters before creating a recognizer.
        if wf.getnchannels() != 1:
            return None, "音频必须是单声道"

        if wf.getsampwidth() != 2:
            return None, "音频必须是16位"

        frame_rate = wf.getframerate()
        if frame_rate not in (8000, 16000, 32000, 48000):
            return None, f"不支持的采样率: {frame_rate}"

        rec = KaldiRecognizer(vosk_model, frame_rate)
        rec.SetWords(True)

        segments = []

        # Feed the audio in chunks; a truthy AcceptWaveform result means a
        # finished segment is available from Result().
        while True:
            data = wf.readframes(4000)
            if len(data) == 0:
                break

            if rec.AcceptWaveform(data):
                text = json.loads(rec.Result()).get('text', '')
                if text:
                    segments.append(text)

        # Collect whatever FinalResult() still holds after the last chunk.
        final_text = json.loads(rec.FinalResult()).get('text', '')
        if final_text:
            segments.append(final_text)

        return " ".join(segments).strip(), None

    except Exception as e:
        return None, str(e)

    finally:
        # Close the handle on every path; it must be closed before the
        # converted file is deleted, because an open file cannot be removed
        # on Windows.
        if wf is not None:
            try:
                wf.close()
            except Exception as e:
                print(f"[识别] 清理转换文件失败: {e}")

        # Remove the temporary converted file on success and failure alike.
        if converted_path:
            try:
                os.remove(converted_path)
                print(f"[识别] 已清理转换文件: {converted_path}")
            except FileNotFoundError:
                pass  # conversion never produced the file
            except OSError as e:
                print(f"[识别] 清理转换文件失败: {e}")
|
||
|
||
def calculate_similarity(text1, text2):
|
||
"""计算文本相似度(0-100分)"""
|
||
if not text1 or not text2:
|
||
return 0
|
||
|
||
# 去除空格
|
||
text1 = text1.replace(" ", "")
|
||
text2 = text2.replace(" ", "")
|
||
|
||
if not text1 or not text2:
|
||
return 0
|
||
|
||
# 计算相似度
|
||
similarity = SequenceMatcher(None, text1, text2).ratio()
|
||
return round(similarity * 100, 2)
|
||
|
||
@app.route('/api/speech/recognize', methods=['POST'])
def recognize():
    """Speech recognition endpoint (POST /api/speech/recognize).

    Expects a multipart form with an ``audio`` file and an optional
    ``referenceText`` field. The upload is written to a uniquely named
    temporary file, transcribed with ``recognize_audio`` and scored against
    the reference text with ``calculate_similarity``.

    Returns:
        A JSON body with ``code`` and ``msg`` plus an HTTP status:
        200 with a ``data`` object on success, 400 when no audio file was
        uploaded, 500 when the model is not loaded, recognition failed,
        no speech was recognised, or an unexpected error occurred.
    """
    import uuid

    temp_path = None
    try:
        # Refuse to work without a loaded model.
        if not model_loaded:
            return jsonify({
                'code': 500,
                'msg': '模型未加载,请检查服务器日志'
            }), 500

        # The upload must be present under the "audio" field.
        if 'audio' not in request.files:
            return jsonify({
                'code': 400,
                'msg': '未上传音频文件'
            }), 400

        audio_file = request.files['audio']
        reference_text = request.form.get('referenceText', '')

        # exist_ok avoids a race between two requests creating the directory.
        temp_dir = './temp_audio'
        os.makedirs(temp_dir, exist_ok=True)

        # A random name keeps concurrent uploads from overwriting each other,
        # which a millisecond timestamp cannot guarantee.
        temp_path = os.path.join(temp_dir, f'audio_{uuid.uuid4().hex}.wav')
        audio_file.save(temp_path)

        print(f"收到音频: {temp_path}")
        print(f"参考文本: {reference_text}")

        recognized_text, error = recognize_audio(temp_path)

        if error:
            return jsonify({
                'code': 500,
                'msg': f'识别失败: {error}'
            }), 500

        if not recognized_text:
            return jsonify({
                'code': 500,
                'msg': '未识别到有效语音'
            }), 500

        # Sub-scores are derived from the similarity score by fixed offsets.
        score = calculate_similarity(recognized_text, reference_text)
        pronunciation_score = max(0, score - 5)
        fluency_score = max(0, score - 3)

        print(f"识别结果: {recognized_text}")
        print(f"相似度: {score}分")

        return jsonify({
            'code': 200,
            'msg': '成功',
            'data': {
                'recognizedText': recognized_text,
                'score': score,
                'pronunciationScore': pronunciation_score,
                'fluencyScore': fluency_score,
                'status': 'completed'
            }
        })

    except Exception as e:
        print(f"处理错误: {str(e)}")
        return jsonify({
            'code': 500,
            'msg': f'处理失败: {str(e)}'
        }), 500

    finally:
        # Best-effort removal of the upload on every exit path, including
        # exceptions raised after the file was saved.
        if temp_path is not None:
            try:
                os.remove(temp_path)
            except OSError:
                pass
|
||
|
||
@app.route('/api/speech/health', methods=['GET'])
def health():
    """Health check endpoint: report the engine name and model load state."""
    status = {
        'model_loaded': model_loaded,
        'engine': 'vosk'
    }
    payload = {
        'code': 200,
        'msg': '服务正常',
        'data': status
    }
    return jsonify(payload)
|
||
|
||
if __name__ == '__main__':
    divider = "=" * 50

    # Startup banner.
    for line in (divider, "Vosk 语音识别服务", divider, ""):
        print(line)

    # Load the model; the server is only started when that succeeds.
    if not init_vosk_model():
        for line in ("", divider, "服务启动失败!请检查模型文件", divider):
            print(line)
        # Keep the console window open until the user acknowledges.
        input("按回车键退出...")
    else:
        success_lines = (
            "",
            divider,
            "服务启动成功!",
            "访问地址: http://localhost:5000",
            "健康检查: http://localhost:5000/api/speech/health",
            divider,
            "",
        )
        for line in success_lines:
            print(line)

        # Start the HTTP server on all interfaces, port 5000.
        app.run(host='0.0.0.0', port=5000, debug=False)
|