Ai_GirlFriend/lover/oss_utils.py

153 lines
5.3 KiB
Python
Raw Normal View History

2026-03-03 19:06:01 +08:00
"""
阿里云 OSS 上传工具
"""
import os
import uuid
from typing import Optional
import oss2
from .config import settings
import logging
logger = logging.getLogger(__name__)
def get_oss_bucket():
"""获取 OSS bucket 实例"""
if not all([
settings.ALIYUN_OSS_ACCESS_KEY_ID,
settings.ALIYUN_OSS_ACCESS_KEY_SECRET,
settings.ALIYUN_OSS_BUCKET_NAME,
settings.ALIYUN_OSS_ENDPOINT
]):
raise ValueError("OSS 配置不完整")
auth = oss2.Auth(
settings.ALIYUN_OSS_ACCESS_KEY_ID,
settings.ALIYUN_OSS_ACCESS_KEY_SECRET
)
bucket = oss2.Bucket(
auth,
settings.ALIYUN_OSS_ENDPOINT,
settings.ALIYUN_OSS_BUCKET_NAME
)
return bucket
def test_oss_connection() -> bool:
"""测试 OSS 连接是否正常"""
try:
logger.info(f"测试 OSS 连接...")
logger.info(f"Bucket: {settings.ALIYUN_OSS_BUCKET_NAME}")
logger.info(f"Endpoint: {settings.ALIYUN_OSS_ENDPOINT}")
logger.info(f"AccessKeyId: {settings.ALIYUN_OSS_ACCESS_KEY_ID[:8]}***")
bucket = get_oss_bucket()
# 尝试列出 bucket 中的对象限制1个
result = bucket.list_objects(max_keys=1)
logger.info(f"OSS 连接测试成功bucket: {settings.ALIYUN_OSS_BUCKET_NAME}")
return True
except Exception as e:
logger.error(f"OSS 连接测试失败: {e}")
logger.error(f"错误类型: {type(e)}")
# 检查是否是权限问题
error_str = str(e)
if "AccessDenied" in error_str:
logger.error("权限被拒绝 - 可能的原因:")
logger.error("1. AccessKey 没有该 Bucket 的访问权限")
logger.error("2. Bucket 不存在或属于其他账户")
logger.error("3. AccessKey 已过期或被禁用")
elif "NoSuchBucket" in error_str:
logger.error("Bucket 不存在 - 请检查 Bucket 名称是否正确")
elif "InvalidAccessKeyId" in error_str:
logger.error("AccessKey 无效 - 请检查 AccessKey 是否正确")
elif "SignatureDoesNotMatch" in error_str:
logger.error("签名不匹配 - 请检查 AccessKeySecret 是否正确")
return False
def upload_audio_file(audio_data: bytes, file_extension: str = "wav") -> str:
"""
上传音频文件到 OSS
Args:
audio_data: 音频二进制数据
file_extension: 文件扩展名不含点
Returns:
公网可访问的文件 URL
"""
try:
bucket = get_oss_bucket()
# 生成唯一文件名
file_id = str(uuid.uuid4())
object_key = f"voice_call/{file_id}.{file_extension}"
# 上传文件
result = bucket.put_object(object_key, audio_data)
if result.status == 200:
# 构建公网访问 URL
if settings.ALIYUN_OSS_CDN_DOMAIN:
# 使用 CDN 域名
2026-03-05 13:34:40 +08:00
cdn_domain = settings.ALIYUN_OSS_CDN_DOMAIN.rstrip('/')
# 确保 CDN 域名包含协议
if not cdn_domain.startswith('http://') and not cdn_domain.startswith('https://'):
cdn_domain = f"https://{cdn_domain}"
file_url = f"{cdn_domain}/{object_key}"
2026-03-03 19:06:01 +08:00
else:
# 使用默认域名 - 修复 URL 格式
endpoint_clean = settings.ALIYUN_OSS_ENDPOINT.replace('https://', '').replace('http://', '').rstrip('/')
file_url = f"https://{settings.ALIYUN_OSS_BUCKET_NAME}.{endpoint_clean}/{object_key}"
logger.info(f"文件上传成功: {object_key} -> {file_url}")
# 验证 URL 格式
2026-03-05 13:34:40 +08:00
if not file_url.startswith('http://') and not file_url.startswith('https://'):
2026-03-03 19:06:01 +08:00
logger.error(f"URL 格式错误: {file_url}")
raise Exception(f"生成的 URL 格式不正确: {file_url}")
return file_url
else:
raise Exception(f"上传失败,状态码: {result.status}")
except Exception as e:
logger.error(f"OSS 上传失败: {e}")
raise
def delete_audio_file(file_url: str) -> bool:
"""
删除 OSS 上的音频文件
Args:
file_url: 文件的公网 URL
Returns:
是否删除成功
"""
try:
bucket = get_oss_bucket()
# 从 URL 提取 object_key
if settings.ALIYUN_OSS_CDN_DOMAIN and file_url.startswith(settings.ALIYUN_OSS_CDN_DOMAIN):
object_key = file_url.replace(settings.ALIYUN_OSS_CDN_DOMAIN.rstrip('/') + '/', '')
else:
# 从默认域名提取
domain_prefix = f"https://{settings.ALIYUN_OSS_BUCKET_NAME}.{settings.ALIYUN_OSS_ENDPOINT.replace('https://', '')}/"
if file_url.startswith(domain_prefix):
object_key = file_url.replace(domain_prefix, '')
else:
logger.warning(f"无法解析文件 URL: {file_url}")
return False
# 删除文件
result = bucket.delete_object(object_key)
logger.info(f"文件删除成功: {object_key}")
return True
except Exception as e:
logger.error(f"OSS 删除失败: {e}")
return False