102 lines
3.4 KiB
Python
102 lines
3.4 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""
|
||
|
|
测试 OSS URL 是否可以公网访问
|
||
|
|
"""
|
||
|
|
|
||
|
|
import os
|
||
|
|
import requests
|
||
|
|
from dotenv import load_dotenv
|
||
|
|
|
||
|
|
# 加载环境变量
|
||
|
|
load_dotenv()
|
||
|
|
|
||
|
|
def test_oss_url_access():
|
||
|
|
"""测试 OSS URL 公网访问"""
|
||
|
|
try:
|
||
|
|
import oss2
|
||
|
|
|
||
|
|
access_key_id = os.getenv('ALIYUN_OSS_ACCESS_KEY_ID')
|
||
|
|
access_key_secret = os.getenv('ALIYUN_OSS_ACCESS_KEY_SECRET')
|
||
|
|
bucket_name = os.getenv('ALIYUN_OSS_BUCKET_NAME')
|
||
|
|
endpoint = os.getenv('ALIYUN_OSS_ENDPOINT')
|
||
|
|
|
||
|
|
print(f"🔧 测试 OSS URL 公网访问...")
|
||
|
|
|
||
|
|
# 创建认证和 bucket
|
||
|
|
auth = oss2.Auth(access_key_id, access_key_secret)
|
||
|
|
bucket = oss2.Bucket(auth, endpoint, bucket_name)
|
||
|
|
|
||
|
|
# 上传测试文件
|
||
|
|
test_content = b"test audio content for ASR"
|
||
|
|
test_key = "voice_call/test_access.mp3"
|
||
|
|
|
||
|
|
print(f"📤 上传测试文件: {test_key}")
|
||
|
|
result = bucket.put_object(test_key, test_content)
|
||
|
|
|
||
|
|
if result.status != 200:
|
||
|
|
print(f"❌ 上传失败: {result.status}")
|
||
|
|
return False
|
||
|
|
|
||
|
|
# 生成 URL
|
||
|
|
file_url = f"https://{bucket_name}.{endpoint.replace('https://', '')}/{test_key}"
|
||
|
|
print(f"🔗 生成的 URL: {file_url}")
|
||
|
|
|
||
|
|
# 测试公网访问
|
||
|
|
print(f"🌐 测试公网访问...")
|
||
|
|
try:
|
||
|
|
response = requests.get(file_url, timeout=10)
|
||
|
|
print(f"📊 HTTP 状态码: {response.status_code}")
|
||
|
|
|
||
|
|
if response.status_code == 200:
|
||
|
|
print(f"✅ 公网访问成功!")
|
||
|
|
print(f"📦 响应内容长度: {len(response.content)} 字节")
|
||
|
|
|
||
|
|
# 验证内容
|
||
|
|
if response.content == test_content:
|
||
|
|
print(f"✅ 内容验证通过")
|
||
|
|
else:
|
||
|
|
print(f"⚠️ 内容不匹配")
|
||
|
|
|
||
|
|
elif response.status_code == 403:
|
||
|
|
print(f"❌ 访问被拒绝 (403) - Bucket 可能设置为私有")
|
||
|
|
print(f"💡 需要设置 Bucket 为公共读权限")
|
||
|
|
|
||
|
|
else:
|
||
|
|
print(f"❌ 访问失败: HTTP {response.status_code}")
|
||
|
|
|
||
|
|
except requests.exceptions.Timeout:
|
||
|
|
print(f"❌ 请求超时")
|
||
|
|
except requests.exceptions.ConnectionError:
|
||
|
|
print(f"❌ 连接失败")
|
||
|
|
except Exception as e:
|
||
|
|
print(f"❌ 请求异常: {e}")
|
||
|
|
|
||
|
|
# 清理测试文件
|
||
|
|
try:
|
||
|
|
bucket.delete_object(test_key)
|
||
|
|
print(f"🗑️ 测试文件已清理")
|
||
|
|
except Exception as e:
|
||
|
|
print(f"⚠️ 清理失败: {e}")
|
||
|
|
|
||
|
|
return response.status_code == 200 if 'response' in locals() else False
|
||
|
|
|
||
|
|
except Exception as e:
|
||
|
|
print(f"❌ 测试失败: {e}")
|
||
|
|
return False
|
||
|
|
|
||
|
|
def main():
|
||
|
|
print("🚀 开始测试 OSS URL 公网访问...")
|
||
|
|
|
||
|
|
if test_oss_url_access():
|
||
|
|
print("🎉 OSS URL 公网访问正常!")
|
||
|
|
print("💡 问题可能在 DashScope ASR 的其他方面")
|
||
|
|
else:
|
||
|
|
print("💥 OSS URL 无法公网访问!")
|
||
|
|
print("\n🔧 解决方案:")
|
||
|
|
print("1. 登录阿里云 OSS 控制台")
|
||
|
|
print("2. 找到 Bucket: hello12312312")
|
||
|
|
print("3. 设置 Bucket 权限为 '公共读'")
|
||
|
|
print("4. 或者配置 Bucket 策略允许匿名访问")
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
main()
|