#!/usr/bin/env python3 """ OSS 连接测试脚本 用于诊断 OSS 配置问题 """ import os import sys from dotenv import load_dotenv # 加载环境变量 load_dotenv() def test_oss_basic(): """基础 OSS 连接测试""" try: import oss2 # 从环境变量读取配置 access_key_id = os.getenv('ALIYUN_OSS_ACCESS_KEY_ID') access_key_secret = os.getenv('ALIYUN_OSS_ACCESS_KEY_SECRET') bucket_name = os.getenv('ALIYUN_OSS_BUCKET_NAME') endpoint = os.getenv('ALIYUN_OSS_ENDPOINT') print(f"🔧 OSS 配置信息:") print(f" AccessKeyId: {access_key_id[:8]}***") print(f" Bucket: {bucket_name}") print(f" Endpoint: {endpoint}") if not all([access_key_id, access_key_secret, bucket_name, endpoint]): print("❌ OSS 配置不完整") return False # 创建认证对象 auth = oss2.Auth(access_key_id, access_key_secret) # 创建 Bucket 对象 bucket = oss2.Bucket(auth, endpoint, bucket_name) print(f"🔍 测试 Bucket 访问权限...") # 测试1: 列出对象 try: result = bucket.list_objects(max_keys=1) print(f"✅ 列出对象成功") except Exception as e: print(f"❌ 列出对象失败: {e}") return False # 测试2: 上传小文件 try: test_content = b"test content for voice call" test_key = "voice_call/test.txt" result = bucket.put_object(test_key, test_content) if result.status == 200: print(f"✅ 上传测试文件成功: {test_key}") # 测试3: 删除测试文件 bucket.delete_object(test_key) print(f"✅ 删除测试文件成功") return True else: print(f"❌ 上传失败,状态码: {result.status}") return False except Exception as e: print(f"❌ 上传测试失败: {e}") return False except ImportError: print("❌ oss2 模块未安装,请运行: pip install oss2") return False except Exception as e: print(f"❌ OSS 测试失败: {e}") return False def main(): print("🚀 开始 OSS 连接测试...") if test_oss_basic(): print("🎉 OSS 连接测试通过!") return 0 else: print("💥 OSS 连接测试失败!") print("\n🔧 可能的解决方案:") print("1. 检查 AccessKey 是否有效") print("2. 检查 Bucket 名称是否正确") print("3. 检查 Endpoint 区域是否匹配") print("4. 检查 AccessKey 是否有该 Bucket 的读写权限") return 1 if __name__ == "__main__": sys.exit(main())