Ai_GirlFriend/test_oss.py

95 lines
2.9 KiB
Python
Raw Permalink Normal View History

2026-03-03 19:06:01 +08:00
#!/usr/bin/env python3
"""
OSS 连接测试脚本
用于诊断 OSS 配置问题
"""
import os
import sys
from dotenv import load_dotenv
# 加载环境变量
load_dotenv()
def test_oss_basic():
"""基础 OSS 连接测试"""
try:
import oss2
# 从环境变量读取配置
access_key_id = os.getenv('ALIYUN_OSS_ACCESS_KEY_ID')
access_key_secret = os.getenv('ALIYUN_OSS_ACCESS_KEY_SECRET')
bucket_name = os.getenv('ALIYUN_OSS_BUCKET_NAME')
endpoint = os.getenv('ALIYUN_OSS_ENDPOINT')
print(f"🔧 OSS 配置信息:")
print(f" AccessKeyId: {access_key_id[:8]}***")
print(f" Bucket: {bucket_name}")
print(f" Endpoint: {endpoint}")
if not all([access_key_id, access_key_secret, bucket_name, endpoint]):
print("❌ OSS 配置不完整")
return False
# 创建认证对象
auth = oss2.Auth(access_key_id, access_key_secret)
# 创建 Bucket 对象
bucket = oss2.Bucket(auth, endpoint, bucket_name)
print(f"🔍 测试 Bucket 访问权限...")
# 测试1: 列出对象
try:
result = bucket.list_objects(max_keys=1)
print(f"✅ 列出对象成功")
except Exception as e:
print(f"❌ 列出对象失败: {e}")
return False
# 测试2: 上传小文件
try:
test_content = b"test content for voice call"
test_key = "voice_call/test.txt"
result = bucket.put_object(test_key, test_content)
if result.status == 200:
print(f"✅ 上传测试文件成功: {test_key}")
# 测试3: 删除测试文件
bucket.delete_object(test_key)
print(f"✅ 删除测试文件成功")
return True
else:
print(f"❌ 上传失败,状态码: {result.status}")
return False
except Exception as e:
print(f"❌ 上传测试失败: {e}")
return False
except ImportError:
print("❌ oss2 模块未安装,请运行: pip install oss2")
return False
except Exception as e:
print(f"❌ OSS 测试失败: {e}")
return False
def main():
print("🚀 开始 OSS 连接测试...")
if test_oss_basic():
print("🎉 OSS 连接测试通过!")
return 0
else:
print("💥 OSS 连接测试失败!")
print("\n🔧 可能的解决方案:")
print("1. 检查 AccessKey 是否有效")
print("2. 检查 Bucket 名称是否正确")
print("3. 检查 Endpoint 区域是否匹配")
print("4. 检查 AccessKey 是否有该 Bucket 的读写权限")
return 1
if __name__ == "__main__":
sys.exit(main())