399 lines
12 KiB
Python
399 lines
12 KiB
Python
"""
|
|
音乐库路由
|
|
"""
|
|
from typing import List, Optional
|
|
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form
|
|
from sqlalchemy.orm import Session
|
|
from pydantic import BaseModel, Field
|
|
from datetime import datetime
|
|
import os
|
|
import uuid
|
|
|
|
from lover.db import get_db
|
|
from lover.deps import get_current_user
|
|
from lover.models import User, MusicLibrary, MusicLike
|
|
from lover.response import success_response, error_response, ApiResponse
|
|
from lover.config import settings
|
|
|
|
router = APIRouter(prefix="/music", tags=["音乐库"])
|
|
|
|
|
|
# ========== Pydantic 模型 ==========
|
|
class MusicOut(BaseModel):
    # Response schema describing one track as returned by the endpoints in
    # this module. Documented with comments rather than a docstring so the
    # generated schema is unaffected.

    # --- identity and uploader ---
    id: int
    user_id: int
    username: str = ""  # uploader's display name, filled in by the handlers
    user_avatar: str = ""  # uploader's avatar URL, "" when there is none

    # --- track metadata ---
    title: str
    artist: Optional[str] = None
    music_url: str
    cover_url: Optional[str] = None
    duration: Optional[int] = None  # seconds, per MusicAddLinkRequest.duration
    upload_type: str  # handlers in this module write "link" or "file"

    # --- counters and per-viewer state ---
    play_count: int = 0
    like_count: int = 0
    is_liked: bool = False  # whether the requesting user has liked the track
    created_at: datetime

    class Config:
        # NOTE(review): `from_attributes` is the pydantic v2 option name for
        # building a model from attribute-bearing objects - confirm the
        # installed pydantic major version.
        from_attributes = True
|
|
|
|
|
|
class MusicListResponse(BaseModel):
    # Paginated listing payload.

    # Number of rows matching the query across all pages (handlers compute it
    # with `query.count()` before applying offset/limit).
    total: int

    # The tracks on the requested page. The field name shadows the builtin
    # inside this class body, but it is part of the response format and is
    # therefore kept as-is.
    list: List[MusicOut]
|
|
|
|
|
|
class MusicAddLinkRequest(BaseModel):
    # Request body for adding a track by URL (see the /add-link handler).
    # The `description` strings are emitted into the generated schema and are
    # reproduced verbatim.

    # Required, 1-255 characters.
    title: str = Field(
        ...,
        min_length=1,
        max_length=255,
        description="歌曲标题",
    )
    # Optional, at most 255 characters.
    artist: Optional[str] = Field(
        None,
        max_length=255,
        description="艺术家",
    )
    # Required, 1-500 characters. Only the length is validated here.
    music_url: str = Field(
        ...,
        min_length=1,
        max_length=500,
        description="音乐链接",
    )
    # Optional, at most 500 characters.
    cover_url: Optional[str] = Field(
        None,
        max_length=500,
        description="封面图链接",
    )
    # Optional track length in seconds.
    duration: Optional[int] = Field(
        None,
        description="时长(秒)",
    )
|
|
|
|
|
|
# ========== 辅助函数 ==========
|
|
def _cdnize(url: str) -> str:
|
|
"""将相对路径转换为 CDN 完整路径"""
|
|
if not url:
|
|
return ""
|
|
if url.startswith("http://") or url.startswith("https://"):
|
|
return url
|
|
cdn_domain = getattr(settings, "CDN_DOMAIN", "")
|
|
if cdn_domain:
|
|
return f"{cdn_domain.rstrip('/')}/{url.lstrip('/')}"
|
|
return url
|
|
|
|
|
|
def _save_upload_file(file: UploadFile, folder: str = "music") -> str:
    """Persist an uploaded file under ``public/<folder>`` with a random name.

    Args:
        file: The upload to store. Its ``filename`` supplies only the
            extension and may be missing.
        folder: Sub-directory of ``public`` to write into; created on demand.

    Returns:
        The stored location relative to ``public``, as ``"<folder>/<name>"``.

    Raises:
        OSError: If the directory cannot be created or the file written.
    """
    import shutil

    # `filename` can be None for some multipart clients; fall back to "" so
    # the file is stored without an extension instead of raising TypeError.
    ext = os.path.splitext(file.filename or "")[1]
    # A random hex name avoids collisions and keeps the client-supplied name
    # out of the filesystem path.
    filename = f"{uuid.uuid4().hex}{ext}"

    upload_dir = os.path.join("public", folder)
    os.makedirs(upload_dir, exist_ok=True)
    file_path = os.path.join(upload_dir, filename)

    # Copy in chunks rather than read() so a large upload is never held in
    # memory all at once.
    with open(file_path, "wb") as out:
        shutil.copyfileobj(file.file, out)

    return f"{folder}/{filename}"
|
|
|
|
|
|
# ========== API 路由 ==========
|
|
@router.get("/library", response_model=ApiResponse[MusicListResponse])
def get_music_library(
    page: int = 1,
    page_size: int = 20,
    keyword: Optional[str] = None,
    user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """List the music library (all public tracks)."""
    # Parameters:
    #   page      - 1-based page number; values below 1 are treated as 1.
    #   page_size - rows per page; values below 1 are treated as 1.
    #   keyword   - optional substring matched against title OR artist.
    #   user      - value produced by the get_current_user dependency.
    #   db        - SQLAlchemy session produced by the get_db dependency.
    # Returns success_response(MusicListResponse) holding the total match
    # count and the requested page, newest first.
    #
    # NOTE(review): page_size has no upper bound; consider capping it.

    # Clamp so a zero or negative page can never produce a negative OFFSET or
    # LIMIT, which databases reject.
    page = max(page, 1)
    page_size = max(page_size, 1)

    query = db.query(MusicLibrary).filter(
        MusicLibrary.deleted_at.is_(None),
        MusicLibrary.is_public == 1,
        MusicLibrary.status == "approved",
    )

    if keyword:
        pattern = f"%{keyword}%"
        query = query.filter(
            MusicLibrary.title.like(pattern) | MusicLibrary.artist.like(pattern)
        )

    # Count before pagination so `total` covers every matching row.
    total = query.count()

    music_list = (
        query.order_by(MusicLibrary.created_at.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
        .all()
    )

    # Look up the viewer's likes for the tracks on this page only, instead of
    # loading every like the user has ever made.
    music_ids = [m.id for m in music_list]
    liked_music_ids = set()
    if user and music_ids:
        likes = (
            db.query(MusicLike.music_id)
            .filter(
                MusicLike.user_id == user.id,
                MusicLike.music_id.in_(music_ids),
            )
            .all()
        )
        liked_music_ids = {like.music_id for like in likes}

    # Fetch all uploaders in one query; de-duplicate ids so the IN list stays
    # small when one user uploaded several tracks on the page.
    user_ids = list({m.user_id for m in music_list})
    users_map = {}
    if user_ids:
        users = db.query(User).filter(User.id.in_(user_ids)).all()
        users_map = {u.id: u for u in users}

    result = []
    for music in music_list:
        uploader = users_map.get(music.user_id)
        result.append(
            MusicOut(
                id=music.id,
                user_id=music.user_id,
                # Placeholder name when the uploader row was not found.
                username=uploader.username if uploader else "未知用户",
                user_avatar=(
                    _cdnize(uploader.avatar) if uploader and uploader.avatar else ""
                ),
                title=music.title,
                artist=music.artist,
                music_url=_cdnize(music.music_url),
                cover_url=_cdnize(music.cover_url) if music.cover_url else "",
                duration=music.duration,
                upload_type=music.upload_type,
                play_count=music.play_count,
                like_count=music.like_count,
                is_liked=music.id in liked_music_ids,
                created_at=music.created_at,
            )
        )

    return success_response(MusicListResponse(total=total, list=result))
|
|
|
|
|
|
@router.post("/add-link", response_model=ApiResponse[MusicOut])
def add_music_link(
    data: MusicAddLinkRequest,
    user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Add a music link to the music library."""
    # Parameters:
    #   data - validated request body (title, artist, URLs, duration).
    #   user - value produced by the get_current_user dependency; recorded as
    #          the uploader.
    #   db   - SQLAlchemy session produced by the get_db dependency.
    # Returns success_response(MusicOut) describing the stored row.

    # New link entries are stored as public and already approved.
    record = MusicLibrary(
        user_id=user.id,
        title=data.title,
        artist=data.artist,
        music_url=data.music_url,
        cover_url=data.cover_url,
        duration=data.duration,
        upload_type="link",
        is_public=1,
        status="approved",
    )
    db.add(record)
    db.commit()
    # Reload so database-populated columns are available on the instance.
    db.refresh(record)

    avatar_url = _cdnize(user.avatar) if user.avatar else ""
    cover_url = _cdnize(record.cover_url) if record.cover_url else ""

    created = MusicOut(
        id=record.id,
        user_id=record.user_id,
        username=user.username,
        user_avatar=avatar_url,
        title=record.title,
        artist=record.artist,
        music_url=_cdnize(record.music_url),
        cover_url=cover_url,
        duration=record.duration,
        upload_type=record.upload_type,
        play_count=record.play_count,
        like_count=record.like_count,
        is_liked=False,  # a brand-new track cannot have been liked yet
        created_at=record.created_at,
    )
    return success_response(created)
|
|
|
|
|
|
@router.post("/upload", response_model=ApiResponse[MusicOut])
async def upload_music_file(
    title: str = Form(...),
    artist: Optional[str] = Form(None),
    duration: Optional[int] = Form(None),
    music_file: UploadFile = File(...),
    cover_file: Optional[UploadFile] = File(None),
    user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Upload a music file to the music library."""
    # Parameters:
    #   title, artist, duration - form fields stored on the new row.
    #   music_file - required audio upload; the extension must be allowed.
    #   cover_file - optional cover image; ignored when its extension is not
    #                an allowed image type.
    #   user       - value produced by the get_current_user dependency.
    #   db         - SQLAlchemy session produced by the get_db dependency.
    # Returns success_response(MusicOut) on success, or error_response when
    # the audio extension is not supported.
    #
    # NOTE(review): file writes and DB calls here are blocking inside an
    # `async def` handler; confirm that is acceptable for this deployment.
    allowed_audio = (".mp3", ".wav", ".m4a", ".flac", ".ogg")
    allowed_image = (".jpg", ".jpeg", ".png", ".gif", ".webp")

    # `filename` may be None; treat that as "no extension" so the request is
    # rejected cleanly instead of raising TypeError.
    music_ext = os.path.splitext(music_file.filename or "")[1].lower()
    if music_ext not in allowed_audio:
        return error_response("不支持的音频格式")

    music_path = _save_upload_file(music_file, "music")

    cover_path = None
    if cover_file:
        cover_ext = os.path.splitext(cover_file.filename or "")[1].lower()
        if cover_ext in allowed_image:
            cover_path = _save_upload_file(cover_file, "music/covers")

    # Uploaded files are stored as public and already approved.
    music = MusicLibrary(
        user_id=user.id,
        title=title,
        artist=artist,
        music_url=music_path,
        cover_url=cover_path,
        duration=duration,
        upload_type="file",
        is_public=1,
        status="approved",
    )
    db.add(music)
    try:
        db.commit()
    except Exception:
        # The row was not stored, so remove the files written above rather
        # than leaving orphans on disk. Paths mirror _save_upload_file, which
        # writes under "public/" and returns the path relative to it.
        db.rollback()
        for saved in (music_path, cover_path):
            if saved:
                try:
                    os.remove(os.path.join("public", saved))
                except OSError:
                    # Best-effort cleanup; the original error matters more.
                    pass
        raise
    db.refresh(music)

    return success_response(
        MusicOut(
            id=music.id,
            user_id=music.user_id,
            username=user.username,
            user_avatar=_cdnize(user.avatar) if user.avatar else "",
            title=music.title,
            artist=music.artist,
            music_url=_cdnize(music.music_url),
            cover_url=_cdnize(music.cover_url) if music.cover_url else "",
            duration=music.duration,
            upload_type=music.upload_type,
            play_count=music.play_count,
            like_count=music.like_count,
            is_liked=False,
            created_at=music.created_at,
        )
    )
|
|
|
|
|
|
@router.post("/{music_id}/like", response_model=ApiResponse[dict])
def like_music(
    music_id: int,
    user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Like a track."""
    # Acts as a toggle: if the user already liked the track, the like is
    # removed; otherwise one is added.
    # Parameters:
    #   music_id - id of the track, taken from the path.
    #   user     - value produced by the get_current_user dependency.
    #   db       - SQLAlchemy session produced by the get_db dependency.
    # Returns success_response with the new `is_liked` state and the track's
    # `like_count`, or error_response when no non-deleted track has that id.
    music = (
        db.query(MusicLibrary)
        .filter(
            MusicLibrary.id == music_id,
            MusicLibrary.deleted_at.is_(None),
        )
        .first()
    )
    if not music:
        return error_response("音乐不存在")

    previous_like = (
        db.query(MusicLike)
        .filter(
            MusicLike.user_id == user.id,
            MusicLike.music_id == music_id,
        )
        .first()
    )

    if previous_like:
        # Un-like: drop the row and decrement, never going below zero.
        db.delete(previous_like)
        music.like_count = max(0, music.like_count - 1)
        now_liked = False
    else:
        # Like: record it and bump the counter.
        db.add(MusicLike(user_id=user.id, music_id=music_id))
        music.like_count += 1
        now_liked = True

    db.commit()
    return success_response({"is_liked": now_liked, "like_count": music.like_count})
|
|
|
|
|
|
@router.post("/{music_id}/play", response_model=ApiResponse[dict])
def record_play(
    music_id: int,
    db: Session = Depends(get_db),
):
    """Record a play of a track."""
    # Parameters:
    #   music_id - id of the track, taken from the path.
    #   db       - SQLAlchemy session produced by the get_db dependency.
    # Returns success_response with the updated `play_count`, or
    # error_response when no non-deleted track has that id.
    # This handler declares no user dependency.
    music = (
        db.query(MusicLibrary)
        .filter(
            MusicLibrary.id == music_id,
            MusicLibrary.deleted_at.is_(None),
        )
        .first()
    )
    if not music:
        return error_response("音乐不存在")

    # Assign a SQL expression so the UPDATE is emitted as
    # `play_count = play_count + 1` and evaluated by the database. A
    # Python-side `+= 1` would write back a value read earlier and could lose
    # increments when two requests overlap.
    music.play_count = MusicLibrary.play_count + 1
    db.commit()

    # Reading the attribute after the commit reloads the stored value.
    return success_response({"play_count": music.play_count})
|
|
|
|
|
|
@router.delete("/{music_id}", response_model=ApiResponse[dict])
def delete_music(
    music_id: int,
    user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Delete a track (uploader only)."""
    # Soft delete: the row is kept and only `deleted_at` is stamped.
    # Parameters:
    #   music_id - id of the track, taken from the path.
    #   user     - value produced by the get_current_user dependency.
    #   db       - SQLAlchemy session produced by the get_db dependency.
    # Returns success_response with a confirmation message, or
    # error_response when the track is missing/already deleted or belongs to
    # a different user.
    not_deleted = MusicLibrary.deleted_at.is_(None)
    music = (
        db.query(MusicLibrary)
        .filter(MusicLibrary.id == music_id, not_deleted)
        .first()
    )

    if not music:
        return error_response("音乐不存在")

    # Only the uploader may remove the track.
    is_owner = music.user_id == user.id
    if not is_owner:
        return error_response("无权删除此音乐")

    music.deleted_at = datetime.utcnow()
    db.commit()

    return success_response({"message": "删除成功"})
|
|
|
|
|
|
@router.get("/my", response_model=ApiResponse[MusicListResponse])
def get_my_music(
    page: int = 1,
    page_size: int = 20,
    user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """List the tracks I uploaded."""
    # Parameters:
    #   page      - 1-based page number; values below 1 are treated as 1.
    #   page_size - rows per page; values below 1 are treated as 1.
    #   user      - value produced by the get_current_user dependency; only
    #               this user's non-deleted tracks are listed.
    #   db        - SQLAlchemy session produced by the get_db dependency.
    # Returns success_response(MusicListResponse) holding the total count and
    # the requested page, newest first. This listing does not filter on
    # `is_public` or `status`.
    #
    # NOTE(review): page_size has no upper bound; consider capping it.

    # Clamp so a zero or negative page can never produce a negative OFFSET or
    # LIMIT, which databases reject.
    page = max(page, 1)
    page_size = max(page_size, 1)

    query = db.query(MusicLibrary).filter(
        MusicLibrary.user_id == user.id,
        MusicLibrary.deleted_at.is_(None),
    )

    # Count before pagination so `total` covers every matching row.
    total = query.count()

    music_list = (
        query.order_by(MusicLibrary.created_at.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
        .all()
    )

    # Look up the user's likes for the tracks on this page only, instead of
    # loading every like the user has ever made.
    music_ids = [m.id for m in music_list]
    liked_music_ids = set()
    if music_ids:
        likes = (
            db.query(MusicLike.music_id)
            .filter(
                MusicLike.user_id == user.id,
                MusicLike.music_id.in_(music_ids),
            )
            .all()
        )
        liked_music_ids = {like.music_id for like in likes}

    # The uploader is always the current user, so resolve the avatar once.
    avatar_url = _cdnize(user.avatar) if user.avatar else ""

    result = [
        MusicOut(
            id=music.id,
            user_id=music.user_id,
            username=user.username,
            user_avatar=avatar_url,
            title=music.title,
            artist=music.artist,
            music_url=_cdnize(music.music_url),
            cover_url=_cdnize(music.cover_url) if music.cover_url else "",
            duration=music.duration,
            upload_type=music.upload_type,
            play_count=music.play_count,
            like_count=music.like_count,
            is_liked=music.id in liked_music_ids,
            created_at=music.created_at,
        )
        for music in music_list
    ]

    return success_response(MusicListResponse(total=total, list=result))
|