Ai_GirlFriend/test_dashscope_asr_complete.py

239 lines
8.5 KiB
Python
Raw Permalink Normal View History

2026-03-05 13:34:40 +08:00
"""
完整测试 DashScope ASR 批量识别
按照官方文档要求测试
"""
import os
import sys
import time
import logging
# Put the sibling "lover" directory at the front of the import path.
# NOTE(review): presumably this lets modules inside lover/ import each other
# by bare name; the `lover.*` imports below resolve via this script's own
# directory -- confirm before removing.
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'lover'))
from lover.config import settings
from lover.oss_utils import upload_audio_file, delete_audio_file
import dashscope
from dashscope.audio.asr import Transcription
# Emit INFO-level (and above) log records from this script and its imports.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def create_test_audio(sample_rate: int = 16000,
                      duration: float = 2,
                      frequency: float = 440) -> bytes:
    """Generate a sine tone as raw 16-bit little-endian mono PCM.

    Args:
        sample_rate: Samples per second; must be positive.
        duration: Length of the tone in seconds; must not be negative.
        frequency: Tone frequency in Hz (the default 440 Hz is the note A4).

    Returns:
        ``int(sample_rate * duration)`` signed 16-bit samples packed
        little-endian, at 30% of full scale. With the defaults this is a
        2-second tone (64000 bytes).

    Raises:
        ValueError: If ``sample_rate`` is not positive or ``duration`` is
            negative.
    """
    import math
    import struct

    if sample_rate <= 0:
        raise ValueError(f"sample_rate must be positive, got {sample_rate!r}")
    if duration < 0:
        raise ValueError(f"duration must not be negative, got {duration!r}")

    sample_count = int(sample_rate * duration)
    # 30% of the int16 maximum keeps the tone well clear of clipping.
    peak = 32767 * 0.3
    values = [
        int(peak * math.sin(2 * math.pi * frequency * i / sample_rate))
        for i in range(sample_count)
    ]
    # A single pack call with a repeat count replaces one call per sample.
    return struct.pack(f'<{sample_count}h', *values)
def _pcm_to_wav_bytes(pcm_data, sample_rate=16000):
    """Wrap raw 16-bit mono PCM samples in a WAV container, in memory."""
    import io
    import wave

    buffer = io.BytesIO()
    # wave.open accepts a seekable file-like object, so no temp file is needed
    # and nothing can be left behind on disk if writing fails.
    with wave.open(buffer, 'wb') as wav_file:
        wav_file.setnchannels(1)            # mono
        wav_file.setsampwidth(2)            # 16-bit samples
        wav_file.setframerate(sample_rate)
        wav_file.writeframes(pcm_data)
    return buffer.getvalue()


def _cleanup_uploaded_audio(file_url):
    """Best-effort removal of the uploaded test object; never raises."""
    print("\n🗑️ 清理 OSS 文件...")
    try:
        delete_audio_file(file_url)
        print("✅ 文件已删除")
    except Exception as e:
        print(f"⚠️ 删除失败: {e}")


def _print_transcripts(results):
    """Download and print every transcription referenced in *results*."""
    import requests

    for item in results:
        if not (isinstance(item, dict) and 'transcription_url' in item):
            continue
        transcription_url = item['transcription_url']
        print(f" 转录 URL: {transcription_url}")
        resp = requests.get(transcription_url, timeout=10)
        if resp.status_code != 200:
            # Previously ignored silently; surface it so a missing transcript
            # is explainable from the console output.
            print(f"⚠️ 转录结果下载失败: {resp.status_code}")
            continue
        transcription_data = resp.json()
        print(f" 转录数据: {transcription_data}")
        for transcript in transcription_data.get('transcripts', []):
            if 'text' in transcript:
                print(f" 识别文本: {transcript['text']}")


def _report_failed_task(output):
    """Print diagnostics for a FAILED task.

    Returns True only when the failure is the one expected for the synthetic
    sine tone (no speech detected); any other failure returns False.
    """
    error_code = getattr(output, 'code', 'Unknown')
    error_message = getattr(output, 'message', 'Unknown error')
    print("\n❌ 识别失败")
    print(f" 错误码: {error_code}")
    print(f" 错误信息: {error_message}")
    if error_code == "SUCCESS_WITH_NO_VALID_FRAGMENT":
        print("\n💡 原因分析:")
        print(" - 音频中未检测到有效语音")
        print(" - 这是正常的,因为我们使用的是测试音频(正弦波)")
        print(" - 使用真实语音录音应该可以正常识别")
        return True
    if error_code == "FILE_DOWNLOAD_FAILED":
        print("\n💡 原因分析:")
        print(" - DashScope 无法下载 OSS 文件")
        print(" - 检查 OSS Bucket 权限设置")
        print(" - 确保文件 URL 可公开访问")
    return False


def _poll_transcription(task_id, max_wait=30, poll_interval=2):
    """Poll a transcription task until it finishes or *max_wait* elapses.

    Returns True when the task succeeded (or failed only because the test
    tone contains no speech), False on query error, real failure or timeout.
    """
    # monotonic() is immune to wall-clock adjustments during the wait.
    deadline = time.monotonic() + max_wait
    while time.monotonic() < deadline:
        # fetch() returns the current status immediately; the blocking wait()
        # call would make this loop's timeout and sleep ineffective.
        result = Transcription.fetch(task=task_id)
        if result.status_code != 200:
            print(f"❌ 查询失败: {result.status_code}")
            return False
        task_status = result.output.task_status
        print(f" 任务状态: {task_status}")
        if task_status == "SUCCEEDED":
            print("\n✅ 识别成功")
            results = getattr(result.output, 'results', None)
            if results:
                print("\n📝 识别结果:")
                _print_transcripts(results)
            else:
                print("⚠️ 未找到识别结果")
                print(f" 输出: {result.output}")
            return True
        if task_status == "FAILED":
            return _report_failed_task(result.output)
        # Still pending/running: wait before asking again.
        time.sleep(poll_interval)
    print(f"\n⏰ 等待超时({max_wait}秒)")
    return False


def test_dashscope_asr():
    """Run the full DashScope ASR flow against a synthetic test tone.

    Steps: check configuration, generate a sine tone, wrap it as WAV, upload
    it via ``upload_audio_file``, submit a ``paraformer-v2`` transcription
    task, poll for the outcome, and finally delete the uploaded object.

    Returns:
        True when the pipeline worked end to end (including the expected
        "no valid speech" outcome for the sine tone); False on missing
        configuration, upload failure, malformed URL, task creation or query
        failure, any other task failure, timeout, or an unexpected exception.
    """
    print("=" * 60)
    print("测试 DashScope ASR 批量识别")
    print("=" * 60)

    # Configuration check.
    print("\n📋 检查配置:")
    if not settings.DASHSCOPE_API_KEY:
        print("❌ 未配置 DASHSCOPE_API_KEY")
        return False
    print(f" API Key: {settings.DASHSCOPE_API_KEY[:10]}***")
    print(f" Bucket: {settings.ALIYUN_OSS_BUCKET_NAME}")
    dashscope.api_key = settings.DASHSCOPE_API_KEY

    # Synthetic audio.
    print("\n🎵 创建测试音频...")
    audio_data = create_test_audio()
    print(f" 音频大小: {len(audio_data)} 字节")
    print(" 预期时长: 2 秒")

    # Upload to OSS.
    print("\n📤 上传音频到 OSS...")
    try:
        wav_data = _pcm_to_wav_bytes(audio_data)
        file_url = upload_audio_file(wav_data, "wav")
        print("✅ 上传成功")
        print(f" URL: {file_url}")
        url_ok = file_url.startswith(('https://', 'http://'))
    except Exception as e:
        print(f"❌ 上传失败: {e}")
        import traceback
        traceback.print_exc()
        return False
    if not url_ok:
        print(f"❌ URL 格式错误: {file_url}")
        # The object was uploaded, so try to remove it rather than leak it.
        _cleanup_uploaded_audio(file_url)
        return False

    # Submit the task and wait for the outcome.
    print("\n🎤 调用 DashScope ASR...")
    try:
        print(" 模型: paraformer-v2")
        print(f" 文件: {file_url}")
        task_response = Transcription.async_call(
            model='paraformer-v2',
            file_urls=[file_url],
            parameters={
                'format': 'wav',
                'sample_rate': 16000,
                'enable_words': False,
            },
        )
        print("\n📋 任务响应:")
        print(f" 状态码: {task_response.status_code}")
        if task_response.status_code != 200:
            error_msg = getattr(task_response, 'message', 'Unknown error')
            print(f"❌ 任务创建失败: {error_msg}")
            if hasattr(task_response, 'output'):
                print(f" 错误详情: {task_response.output}")
            return False
        task_id = task_response.output.task_id
        print("✅ 任务创建成功")
        print(f" 任务 ID: {task_id}")

        print("\n⏳ 等待识别完成...")
        if not _poll_transcription(task_id, max_wait=30, poll_interval=2):
            # Query errors, real task failures and timeouts must not be
            # reported as success to the caller / process exit code.
            return False
    except Exception as e:
        print(f"❌ ASR 调用失败: {e}")
        import traceback
        traceback.print_exc()
        return False
    finally:
        _cleanup_uploaded_audio(file_url)

    print("\n" + "=" * 60)
    print("🎉 DashScope ASR 测试完成!")
    print("=" * 60)
    print("\n📚 官方文档要求总结:")
    print(" 1. ✅ 文件必须通过 HTTPS URL 访问")
    print(" 2. ✅ 支持的格式: WAV, MP3, PCM 等")
    print(" 3. ✅ 推荐采样率: 16kHz")
    print(" 4. ✅ 推荐声道: 单声道")
    print(" 5. ⚠️ 音频必须包含有效语音内容")
    return True
if __name__ == "__main__":
    # Script entry point: run the check and translate its boolean outcome
    # into a process exit status (0 = passed, 1 = failed).
    success = test_dashscope_asr()
    exit_code = 0 if success else 1
    sys.exit(exit_code)