Ai_GirlFriend/lover/routers/music_library.py

555 lines
17 KiB
Python
Raw Normal View History

"""
音乐库路由
"""
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form
from sqlalchemy.orm import Session
from pydantic import BaseModel, Field
from datetime import datetime
import os
import uuid
import hashlib
import time
from lover.db import get_db
from lover.deps import get_current_user
from lover.models import User, MusicLibrary, MusicLike, SongLibrary, Lover
from lover.response import success_response, error_response, ApiResponse
from lover.config import settings
router = APIRouter(prefix="/music", tags=["音乐库"])
# ========== Pydantic 模型 ==========
class MusicOut(BaseModel):
id: int
user_id: int
username: str = ""
user_avatar: str = ""
title: str
artist: Optional[str] = None
music_url: str
cover_url: Optional[str] = None
duration: Optional[int] = None
upload_type: str
external_platform: Optional[str] = None
external_id: Optional[str] = None
external_url: Optional[str] = None
play_count: int = 0
like_count: int = 0
is_liked: bool = False
created_at: datetime
class Config:
from_attributes = True
class MusicListResponse(BaseModel):
total: int
list: List[MusicOut]
class MusicAddLinkRequest(BaseModel):
title: str = Field(..., min_length=1, max_length=255, description="歌曲标题")
artist: Optional[str] = Field(None, max_length=255, description="艺术家")
music_url: str = Field(..., min_length=1, max_length=500, description="音乐链接")
cover_url: Optional[str] = Field(None, max_length=500, description="封面图链接")
duration: Optional[int] = Field(None, description="时长(秒)")
class MusicAddExternalRequest(BaseModel):
title: str = Field(..., min_length=1, max_length=255, description="歌曲标题")
artist: Optional[str] = Field(None, max_length=255, description="艺术家")
platform: str = Field(..., description="外部平台: netease=网易云, qq=QQ音乐, kugou=酷狗, kuwo=酷我")
external_id: str = Field(..., min_length=1, max_length=100, description="外部平台歌曲ID")
external_url: str = Field(..., min_length=1, max_length=500, description="外部平台完整链接")
cover_url: Optional[str] = Field(None, max_length=500, description="封面图链接")
duration: Optional[int] = Field(None, description="时长(秒)")
# ========== 辅助函数 ==========
def _cdnize(url: str) -> str:
"""将相对路径转换为 CDN 完整路径"""
if not url:
return ""
if url.startswith("http://") or url.startswith("https://"):
return url
cdn_domain = getattr(settings, "CDN_DOMAIN", "")
if cdn_domain:
return f"{cdn_domain.rstrip('/')}/{url.lstrip('/')}"
return url
def _save_upload_file(file: UploadFile, folder: str = "music") -> str:
"""保存上传的文件"""
# 生成唯一文件名
ext = os.path.splitext(file.filename)[1]
filename = f"{uuid.uuid4().hex}{ext}"
# 创建保存路径
upload_dir = os.path.join("public", folder)
os.makedirs(upload_dir, exist_ok=True)
file_path = os.path.join(upload_dir, filename)
# 保存文件
with open(file_path, "wb") as f:
f.write(file.file.read())
# 返回相对路径
return f"{folder}/{filename}"
# ========== API 路由 ==========
@router.get("/library", response_model=ApiResponse[MusicListResponse])
def get_music_library(
page: int = 1,
page_size: int = 20,
keyword: Optional[str] = None,
user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""
获取音乐库列表所有公开的音乐
"""
query = db.query(MusicLibrary).filter(
MusicLibrary.deleted_at.is_(None),
MusicLibrary.is_public == 1,
MusicLibrary.status == "approved"
)
# 关键词搜索
if keyword:
query = query.filter(
(MusicLibrary.title.like(f"%{keyword}%")) |
(MusicLibrary.artist.like(f"%{keyword}%"))
)
# 总数
total = query.count()
# 分页
offset = (page - 1) * page_size
music_list = query.order_by(MusicLibrary.created_at.desc()).offset(offset).limit(page_size).all()
# 获取用户点赞信息
liked_music_ids = set()
if user:
likes = db.query(MusicLike.music_id).filter(MusicLike.user_id == user.id).all()
liked_music_ids = {like.music_id for like in likes}
# 获取上传用户信息
user_ids = [m.user_id for m in music_list]
users_map = {}
if user_ids:
users = db.query(User).filter(User.id.in_(user_ids)).all()
users_map = {u.id: u for u in users}
# 构建返回数据
result = []
for music in music_list:
uploader = users_map.get(music.user_id)
result.append(MusicOut(
id=music.id,
user_id=music.user_id,
username=uploader.username if uploader else "未知用户",
user_avatar=_cdnize(uploader.avatar) if uploader and uploader.avatar else "",
title=music.title,
artist=music.artist,
music_url=_cdnize(music.music_url) if music.upload_type != 'external' else music.music_url,
cover_url=_cdnize(music.cover_url) if music.cover_url else "",
duration=music.duration,
upload_type=music.upload_type,
external_platform=music.external_platform,
external_id=music.external_id,
external_url=music.external_url,
play_count=music.play_count,
like_count=music.like_count,
is_liked=music.id in liked_music_ids,
created_at=music.created_at
))
return success_response(MusicListResponse(total=total, list=result))
@router.post("/add-link", response_model=ApiResponse[MusicOut])
def add_music_link(
data: MusicAddLinkRequest,
user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""
添加音乐链接到音乐库
"""
# 创建音乐记录
music = MusicLibrary(
user_id=user.id,
title=data.title,
artist=data.artist,
music_url=data.music_url,
cover_url=data.cover_url,
duration=data.duration,
upload_type="link",
is_public=1,
status="approved"
)
db.add(music)
db.commit()
db.refresh(music)
return success_response(MusicOut(
id=music.id,
user_id=music.user_id,
username=user.username,
user_avatar=_cdnize(user.avatar) if user.avatar else "",
title=music.title,
artist=music.artist,
music_url=_cdnize(music.music_url),
cover_url=_cdnize(music.cover_url) if music.cover_url else "",
duration=music.duration,
upload_type=music.upload_type,
external_platform=None,
external_id=None,
external_url=None,
play_count=music.play_count,
like_count=music.like_count,
is_liked=False,
created_at=music.created_at
))
@router.post("/add-external", response_model=ApiResponse[MusicOut])
def add_external_music(
data: MusicAddExternalRequest,
user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""
添加外部平台音乐链接网易云QQ音乐等
"""
# 验证平台
allowed_platforms = ["netease", "qq", "kugou", "kuwo"]
if data.platform not in allowed_platforms:
return error_response(f"不支持的平台,仅支持: {', '.join(allowed_platforms)}")
# 创建音乐记录
music = MusicLibrary(
user_id=user.id,
title=data.title,
artist=data.artist,
music_url=data.external_url,
cover_url=data.cover_url,
duration=data.duration,
upload_type="external",
external_platform=data.platform,
external_id=data.external_id,
external_url=data.external_url,
is_public=1,
status="approved"
)
db.add(music)
db.commit()
db.refresh(music)
return success_response(MusicOut(
id=music.id,
user_id=music.user_id,
username=user.username,
user_avatar=_cdnize(user.avatar) if user.avatar else "",
title=music.title,
artist=music.artist,
music_url=music.music_url,
cover_url=_cdnize(music.cover_url) if music.cover_url else "",
duration=music.duration,
upload_type=music.upload_type,
external_platform=music.external_platform,
external_id=music.external_id,
external_url=music.external_url,
play_count=music.play_count,
like_count=music.like_count,
is_liked=False,
created_at=music.created_at
))
@router.post("/upload", response_model=ApiResponse[MusicOut])
async def upload_music_file(
title: str = Form(...),
artist: Optional[str] = Form(None),
duration: Optional[int] = Form(None),
music_file: UploadFile = File(...),
cover_file: Optional[UploadFile] = File(None),
user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""
上传音乐文件到音乐库
"""
# 检查文件类型
allowed_audio = [".mp3", ".wav", ".m4a", ".flac", ".ogg"]
allowed_image = [".jpg", ".jpeg", ".png", ".gif", ".webp"]
music_ext = os.path.splitext(music_file.filename)[1].lower()
if music_ext not in allowed_audio:
return error_response("不支持的音频格式")
# 保存音乐文件
music_path = _save_upload_file(music_file, "music")
# 保存封面文件
cover_path = None
if cover_file:
cover_ext = os.path.splitext(cover_file.filename)[1].lower()
if cover_ext in allowed_image:
cover_path = _save_upload_file(cover_file, "music/covers")
# 创建音乐记录
music = MusicLibrary(
user_id=user.id,
title=title,
artist=artist,
music_url=music_path,
cover_url=cover_path,
duration=duration,
upload_type="file",
is_public=1,
status="approved"
)
db.add(music)
db.commit()
db.refresh(music)
return success_response(MusicOut(
id=music.id,
user_id=music.user_id,
username=user.username,
user_avatar=_cdnize(user.avatar) if user.avatar else "",
title=music.title,
artist=music.artist,
music_url=_cdnize(music.music_url),
cover_url=_cdnize(music.cover_url) if music.cover_url else "",
duration=music.duration,
upload_type=music.upload_type,
external_platform=None,
external_id=None,
external_url=None,
play_count=music.play_count,
like_count=music.like_count,
is_liked=False,
created_at=music.created_at
))
@router.post("/{music_id}/like", response_model=ApiResponse[dict])
def like_music(
music_id: int,
user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""
点赞音乐
"""
# 检查音乐是否存在
music = db.query(MusicLibrary).filter(
MusicLibrary.id == music_id,
MusicLibrary.deleted_at.is_(None)
).first()
if not music:
return error_response("音乐不存在")
# 检查是否已点赞
existing_like = db.query(MusicLike).filter(
MusicLike.user_id == user.id,
MusicLike.music_id == music_id
).first()
if existing_like:
# 取消点赞
db.delete(existing_like)
music.like_count = max(0, music.like_count - 1)
db.commit()
return success_response({"is_liked": False, "like_count": music.like_count})
else:
# 添加点赞
like = MusicLike(user_id=user.id, music_id=music_id)
db.add(like)
music.like_count += 1
db.commit()
return success_response({"is_liked": True, "like_count": music.like_count})
@router.post("/{music_id}/play", response_model=ApiResponse[dict])
def record_play(
music_id: int,
db: Session = Depends(get_db),
):
"""
记录播放次数
"""
music = db.query(MusicLibrary).filter(
MusicLibrary.id == music_id,
MusicLibrary.deleted_at.is_(None)
).first()
if not music:
return error_response("音乐不存在")
music.play_count += 1
db.commit()
return success_response({"play_count": music.play_count})
@router.delete("/{music_id}", response_model=ApiResponse[dict])
def delete_music(
music_id: int,
user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""
删除音乐仅限上传者
"""
music = db.query(MusicLibrary).filter(
MusicLibrary.id == music_id,
MusicLibrary.deleted_at.is_(None)
).first()
if not music:
return error_response("音乐不存在")
if music.user_id != user.id:
return error_response("无权删除此音乐")
music.deleted_at = datetime.utcnow()
db.commit()
return success_response({"message": "删除成功"})
@router.get("/my", response_model=ApiResponse[MusicListResponse])
def get_my_music(
page: int = 1,
page_size: int = 20,
user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""
获取我上传的音乐
"""
query = db.query(MusicLibrary).filter(
MusicLibrary.user_id == user.id,
MusicLibrary.deleted_at.is_(None)
)
total = query.count()
offset = (page - 1) * page_size
music_list = query.order_by(MusicLibrary.created_at.desc()).offset(offset).limit(page_size).all()
# 获取点赞信息
liked_music_ids = set()
likes = db.query(MusicLike.music_id).filter(MusicLike.user_id == user.id).all()
liked_music_ids = {like.music_id for like in likes}
result = []
for music in music_list:
result.append(MusicOut(
id=music.id,
user_id=music.user_id,
username=user.username,
user_avatar=_cdnize(user.avatar) if user.avatar else "",
title=music.title,
artist=music.artist,
music_url=_cdnize(music.music_url) if music.upload_type != 'external' else music.music_url,
cover_url=_cdnize(music.cover_url) if music.cover_url else "",
duration=music.duration,
upload_type=music.upload_type,
external_platform=music.external_platform,
external_id=music.external_id,
external_url=music.external_url,
play_count=music.play_count,
like_count=music.like_count,
is_liked=music.id in liked_music_ids,
created_at=music.created_at
))
return success_response(MusicListResponse(total=total, list=result))
@router.post("/convert-to-song", response_model=ApiResponse[dict])
def convert_music_to_song(
music_id: int,
user: User = Depends(get_current_user),
db: Session = Depends(get_db),
):
"""
将音乐库音乐转换为系统歌曲用于生成唱歌视频
"""
# 1. 检查音乐
music = db.query(MusicLibrary).filter(
MusicLibrary.id == music_id,
MusicLibrary.deleted_at.is_(None)
).first()
if not music:
return error_response("音乐不存在")
# 2. 检查音乐类型
if music.upload_type == 'external':
return error_response("外部平台音乐无法生成视频,请使用直链或上传的音乐")
# 3. 获取音乐 URL
music_url = _cdnize(music.music_url)
if not music_url:
return error_response("音乐地址不可用")
# 4. 检查是否已转换(避免重复)
audio_hash = hashlib.md5(music_url.encode()).hexdigest()
existing_song = db.query(SongLibrary).filter(
SongLibrary.audio_hash == audio_hash,
SongLibrary.deletetime.is_(None)
).first()
if existing_song:
return success_response({
"song_id": existing_song.id,
"title": existing_song.title,
"from_cache": True
})
# 5. 获取恋人性别
lover = db.query(Lover).filter(Lover.user_id == user.id).first()
if not lover:
return error_response("请先创建恋人")
# 6. 创建系统歌曲记录
now_ts = int(time.time())
song = SongLibrary(
title=music.title or "未命名",
artist=music.artist or "",
gender=lover.gender,
audio_url=music.music_url, # 存储原始路径(不含 CDN
status=True,
weigh=0,
createtime=now_ts,
updatetime=now_ts,
audio_hash=audio_hash,
duration_sec=music.duration
)
db.add(song)
db.commit()
db.refresh(song)
return success_response({
"song_id": song.id,
"title": song.title,
"from_cache": False
})