""" 音乐库路由 """ from typing import List, Optional from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form from sqlalchemy.orm import Session from pydantic import BaseModel, Field from datetime import datetime import os import uuid from lover.db import get_db from lover.deps import get_current_user from lover.models import User, MusicLibrary, MusicLike from lover.response import success_response, error_response, ApiResponse from lover.config import settings router = APIRouter(prefix="/music", tags=["音乐库"]) # ========== Pydantic 模型 ========== class MusicOut(BaseModel): id: int user_id: int username: str = "" user_avatar: str = "" title: str artist: Optional[str] = None music_url: str cover_url: Optional[str] = None duration: Optional[int] = None upload_type: str play_count: int = 0 like_count: int = 0 is_liked: bool = False created_at: datetime class Config: from_attributes = True class MusicListResponse(BaseModel): total: int list: List[MusicOut] class MusicAddLinkRequest(BaseModel): title: str = Field(..., min_length=1, max_length=255, description="歌曲标题") artist: Optional[str] = Field(None, max_length=255, description="艺术家") music_url: str = Field(..., min_length=1, max_length=500, description="音乐链接") cover_url: Optional[str] = Field(None, max_length=500, description="封面图链接") duration: Optional[int] = Field(None, description="时长(秒)") # ========== 辅助函数 ========== def _cdnize(url: str) -> str: """将相对路径转换为 CDN 完整路径""" if not url: return "" if url.startswith("http://") or url.startswith("https://"): return url cdn_domain = getattr(settings, "CDN_DOMAIN", "") if cdn_domain: return f"{cdn_domain.rstrip('/')}/{url.lstrip('/')}" return url def _save_upload_file(file: UploadFile, folder: str = "music") -> str: """保存上传的文件""" # 生成唯一文件名 ext = os.path.splitext(file.filename)[1] filename = f"{uuid.uuid4().hex}{ext}" # 创建保存路径 upload_dir = os.path.join("public", folder) os.makedirs(upload_dir, exist_ok=True) file_path = os.path.join(upload_dir, filename) # 保存文件 with open(file_path, "wb") as f: f.write(file.file.read()) # 返回相对路径 return f"{folder}/{filename}" # ========== API 路由 ========== @router.get("/library", response_model=ApiResponse[MusicListResponse]) def get_music_library( page: int = 1, page_size: int = 20, keyword: Optional[str] = None, user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """ 获取音乐库列表(所有公开的音乐) """ query = db.query(MusicLibrary).filter( MusicLibrary.deleted_at.is_(None), MusicLibrary.is_public == 1, MusicLibrary.status == "approved" ) # 关键词搜索 if keyword: query = query.filter( (MusicLibrary.title.like(f"%{keyword}%")) | (MusicLibrary.artist.like(f"%{keyword}%")) ) # 总数 total = query.count() # 分页 offset = (page - 1) * page_size music_list = query.order_by(MusicLibrary.created_at.desc()).offset(offset).limit(page_size).all() # 获取用户点赞信息 liked_music_ids = set() if user: likes = db.query(MusicLike.music_id).filter(MusicLike.user_id == user.id).all() liked_music_ids = {like.music_id for like in likes} # 获取上传用户信息 user_ids = [m.user_id for m in music_list] users_map = {} if user_ids: users = db.query(User).filter(User.id.in_(user_ids)).all() users_map = {u.id: u for u in users} # 构建返回数据 result = [] for music in music_list: uploader = users_map.get(music.user_id) result.append(MusicOut( id=music.id, user_id=music.user_id, username=uploader.username if uploader else "未知用户", user_avatar=_cdnize(uploader.avatar) if uploader and uploader.avatar else "", title=music.title, artist=music.artist, music_url=_cdnize(music.music_url), cover_url=_cdnize(music.cover_url) if music.cover_url else "", duration=music.duration, upload_type=music.upload_type, play_count=music.play_count, like_count=music.like_count, is_liked=music.id in liked_music_ids, created_at=music.created_at )) return success_response(MusicListResponse(total=total, list=result)) @router.post("/add-link", response_model=ApiResponse[MusicOut]) def add_music_link( data: MusicAddLinkRequest, user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """ 添加音乐链接到音乐库 """ # 创建音乐记录 music = MusicLibrary( user_id=user.id, title=data.title, artist=data.artist, music_url=data.music_url, cover_url=data.cover_url, duration=data.duration, upload_type="link", is_public=1, status="approved" ) db.add(music) db.commit() db.refresh(music) return success_response(MusicOut( id=music.id, user_id=music.user_id, username=user.username, user_avatar=_cdnize(user.avatar) if user.avatar else "", title=music.title, artist=music.artist, music_url=_cdnize(music.music_url), cover_url=_cdnize(music.cover_url) if music.cover_url else "", duration=music.duration, upload_type=music.upload_type, play_count=music.play_count, like_count=music.like_count, is_liked=False, created_at=music.created_at )) @router.post("/upload", response_model=ApiResponse[MusicOut]) async def upload_music_file( title: str = Form(...), artist: Optional[str] = Form(None), duration: Optional[int] = Form(None), music_file: UploadFile = File(...), cover_file: Optional[UploadFile] = File(None), user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """ 上传音乐文件到音乐库 """ # 检查文件类型 allowed_audio = [".mp3", ".wav", ".m4a", ".flac", ".ogg"] allowed_image = [".jpg", ".jpeg", ".png", ".gif", ".webp"] music_ext = os.path.splitext(music_file.filename)[1].lower() if music_ext not in allowed_audio: return error_response("不支持的音频格式") # 保存音乐文件 music_path = _save_upload_file(music_file, "music") # 保存封面文件 cover_path = None if cover_file: cover_ext = os.path.splitext(cover_file.filename)[1].lower() if cover_ext in allowed_image: cover_path = _save_upload_file(cover_file, "music/covers") # 创建音乐记录 music = MusicLibrary( user_id=user.id, title=title, artist=artist, music_url=music_path, cover_url=cover_path, duration=duration, upload_type="file", is_public=1, status="approved" ) db.add(music) db.commit() db.refresh(music) return success_response(MusicOut( id=music.id, user_id=music.user_id, username=user.username, user_avatar=_cdnize(user.avatar) if user.avatar else "", title=music.title, artist=music.artist, music_url=_cdnize(music.music_url), cover_url=_cdnize(music.cover_url) if music.cover_url else "", duration=music.duration, upload_type=music.upload_type, play_count=music.play_count, like_count=music.like_count, is_liked=False, created_at=music.created_at )) @router.post("/{music_id}/like", response_model=ApiResponse[dict]) def like_music( music_id: int, user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """ 点赞音乐 """ # 检查音乐是否存在 music = db.query(MusicLibrary).filter( MusicLibrary.id == music_id, MusicLibrary.deleted_at.is_(None) ).first() if not music: return error_response("音乐不存在") # 检查是否已点赞 existing_like = db.query(MusicLike).filter( MusicLike.user_id == user.id, MusicLike.music_id == music_id ).first() if existing_like: # 取消点赞 db.delete(existing_like) music.like_count = max(0, music.like_count - 1) db.commit() return success_response({"is_liked": False, "like_count": music.like_count}) else: # 添加点赞 like = MusicLike(user_id=user.id, music_id=music_id) db.add(like) music.like_count += 1 db.commit() return success_response({"is_liked": True, "like_count": music.like_count}) @router.post("/{music_id}/play", response_model=ApiResponse[dict]) def record_play( music_id: int, db: Session = Depends(get_db), ): """ 记录播放次数 """ music = db.query(MusicLibrary).filter( MusicLibrary.id == music_id, MusicLibrary.deleted_at.is_(None) ).first() if not music: return error_response("音乐不存在") music.play_count += 1 db.commit() return success_response({"play_count": music.play_count}) @router.delete("/{music_id}", response_model=ApiResponse[dict]) def delete_music( music_id: int, user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """ 删除音乐(仅限上传者) """ music = db.query(MusicLibrary).filter( MusicLibrary.id == music_id, MusicLibrary.deleted_at.is_(None) ).first() if not music: return error_response("音乐不存在") if music.user_id != user.id: return error_response("无权删除此音乐") music.deleted_at = datetime.utcnow() db.commit() return success_response({"message": "删除成功"}) @router.get("/my", response_model=ApiResponse[MusicListResponse]) def get_my_music( page: int = 1, page_size: int = 20, user: User = Depends(get_current_user), db: Session = Depends(get_db), ): """ 获取我上传的音乐 """ query = db.query(MusicLibrary).filter( MusicLibrary.user_id == user.id, MusicLibrary.deleted_at.is_(None) ) total = query.count() offset = (page - 1) * page_size music_list = query.order_by(MusicLibrary.created_at.desc()).offset(offset).limit(page_size).all() # 获取点赞信息 liked_music_ids = set() likes = db.query(MusicLike.music_id).filter(MusicLike.user_id == user.id).all() liked_music_ids = {like.music_id for like in likes} result = [] for music in music_list: result.append(MusicOut( id=music.id, user_id=music.user_id, username=user.username, user_avatar=_cdnize(user.avatar) if user.avatar else "", title=music.title, artist=music.artist, music_url=_cdnize(music.music_url), cover_url=_cdnize(music.cover_url) if music.cover_url else "", duration=music.duration, upload_type=music.upload_type, play_count=music.play_count, like_count=music.like_count, is_liked=music.id in liked_music_ids, created_at=music.created_at )) return success_response(MusicListResponse(total=total, list=result))