Ai_GirlFriend/lover/routers/dynamic.py

597 lines
18 KiB
Python
Raw Normal View History

2026-01-31 19:15:41 +08:00
import time
from collections import defaultdict, deque
from datetime import datetime
from typing import Dict, List, Optional, Tuple
import requests
from fastapi import APIRouter, Depends, HTTPException, Path, Query, status
from pydantic import BaseModel, ConfigDict, Field, field_validator
from sqlalchemy import or_
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
from ..db import get_db
from ..deps import AuthedUser, get_current_user
from ..models import (
ChatMessage,
Dynamic,
DynamicComment,
DynamicLike,
FriendRelation,
User,
)
from ..response import ApiResponse, success_response
# Router for the "dynamic" endpoints; every route declared on it is served under /dynamic.
router = APIRouter(prefix="/dynamic", tags=["dynamic"])
# Default page size of the feed endpoint.
FEED_PAGE_SIZE = 10
# Default page size for comment listings; also the number of comments embedded per feed item.
COMMENT_PAGE_SIZE = 5
# Maximum age, in seconds, of an entry in the in-process feed cache.
FEED_CACHE_TTL_SECONDS = 30
# In-process feed cache: (user_id, page, size) -> (time stored, serialized feed payload).
_feed_cache: Dict[Tuple[int, int, int], Tuple[float, dict]] = {}
# In-process rate-limit state: (action, user_id) -> deque of request timestamps.
_rate_buckets: Dict[Tuple[str, int], deque] = defaultdict(deque)
def _check_rate_limit(user_id: int, action: str, limit: int, window_seconds: int):
    """
    Simple in-process throttle against rapid like/comment/share calls.

    State lives in this process only, so there is no distributed guarantee,
    but it is enough to stop a burst of requests against one endpoint.

    Args:
        user_id: ID of the caller being throttled.
        action: Name of the throttled action; part of the bucket key.
        limit: Maximum number of calls allowed inside the window.
        window_seconds: Length of the sliding window in seconds.

    Raises:
        HTTPException: 429 when ``limit`` calls are already recorded for this
            (action, user) pair inside the last ``window_seconds`` seconds.
    """
    current = time.time()
    oldest_allowed = current - window_seconds
    timestamps = _rate_buckets[(action, user_id)]

    # Discard entries that have slid out of the window.
    while timestamps and timestamps[0] < oldest_allowed:
        timestamps.popleft()

    if len(timestamps) < limit:
        timestamps.append(current)
        return

    raise HTTPException(
        status_code=status.HTTP_429_TOO_MANY_REQUESTS,
        detail="操作过于频繁,请稍后再试",
    )
def _check_text_safe(text: Optional[str], field_name: str):
"""调用违禁词检测,检测到风险则抛出 400无法调用时不阻断流程。"""
if not text:
return
try:
resp = requests.post(
"https://uapis.cn/api/v1/text/profanitycheck",
json={"text": text},
headers={"User-Agent": "lover-app/1.0"},
timeout=5,
)
except Exception:
return
if resp.status_code != 200:
return
try:
data = resp.json()
except Exception:
return
status_str = str(data.get("status") or "").lower()
forbidden_words = [str(w).strip() for w in (data.get("forbidden_words") or []) if str(w).strip()]
masked_text = data.get("masked_text")
if status_str == "forbidden" or forbidden_words:
words_str = ", ".join(forbidden_words) if forbidden_words else ""
masked_str = f",屏蔽后:{masked_text}" if masked_text else ""
detail = (
f"{field_name}包含违禁词: {words_str}{masked_str}"
if words_str
else f"{field_name}包含违禁词,请调整后再试{masked_str}"
)
raise HTTPException(status_code=400, detail=detail)
def _get_friend_ids(db: Session, user_id: int) -> List[int]:
    """Return the de-duplicated IDs of users linked to ``user_id`` as friends.

    Only relations with ``status == "1"`` are considered (presumably the
    accepted state; confirm against the FriendRelation model). A relation
    counts in either direction: the user may be stored as ``user_id`` or as
    ``friend_id``, and the ID on the other side is collected.
    """
    involves_user = or_(
        FriendRelation.user_id == user_id,
        FriendRelation.friend_id == user_id,
    )
    relations = (
        db.query(FriendRelation)
        .filter(FriendRelation.status == "1", involves_user)
        .all()
    )

    friend_ids = set()
    for relation in relations:
        if relation.user_id == user_id and relation.friend_id:
            other_side = relation.friend_id
        elif relation.friend_id == user_id and relation.user_id:
            other_side = relation.user_id
        else:
            # Neither side yields a usable counterpart; ignore the row.
            continue
        friend_ids.add(int(other_side))
    return list(friend_ids)
def _get_users_map(db: Session, user_ids: List[int]) -> Dict[int, User]:
    """Load the given users with one query and index them by integer ID.

    ``None`` entries and duplicates in ``user_ids`` are ignored; when nothing
    remains, no query is issued and an empty dict is returned.
    """
    wanted = [uid for uid in set(user_ids) if uid is not None]
    if not wanted:
        return {}

    lookup: Dict[int, User] = {}
    for found in db.query(User).filter(User.id.in_(wanted)).all():
        lookup[int(found.id)] = found
    return lookup
def _user_brief(user_row: Optional[User]) -> "UserBrief":
    """Convert a user row into a ``UserBrief``.

    A missing row produces the placeholder ``id=0``, ``nickname=""``,
    ``avatar=None``; a row whose ``id`` is ``None`` also gets ``id=0``.
    """
    if not user_row:
        return UserBrief(id=0, nickname="", avatar=None)

    brief_id = 0 if user_row.id is None else int(user_row.id)
    return UserBrief(
        id=brief_id,
        nickname=user_row.nickname,
        avatar=user_row.avatar,
    )
def _clear_feed_cache():
    """Drop every cached feed page for every user."""
    _feed_cache.clear()
def _get_cached_feed(user_id: int, page: int, size: int) -> Optional[dict]:
    """Return the cached feed payload for this user/page/size, if still fresh.

    An entry older than ``FEED_CACHE_TTL_SECONDS`` is evicted and reported as
    a miss. Returns ``None`` on a miss.
    """
    cache_key = (user_id, page, size)
    entry = _feed_cache.get(cache_key)
    if not entry:
        return None

    stored_at, payload = entry
    age = time.time() - stored_at
    if age <= FEED_CACHE_TTL_SECONDS:
        return payload

    # Expired: remove it so the stale payload is not kept around.
    _feed_cache.pop(cache_key, None)
    return None
def _set_feed_cache(user_id: int, page: int, size: int, data: dict):
    """Store ``data`` for this user/page/size together with the current time."""
    _feed_cache[(user_id, page, size)] = (time.time(), data)
class ShareIn(BaseModel):
    # Request body of POST /dynamic/share.
    # source_message_id: ID (> 0) of the chat message whose video is shared.
    source_message_id: int = Field(..., gt=0, description="来源消息ID需包含视频URL")
    # content: optional caption, at most 50 characters; share_dynamic fills in
    # a default caption when it is empty.
    content: Optional[str] = Field(None, max_length=50, description="动态文案,空则默认填充")
class UserBrief(BaseModel):
    # Minimal view of a user embedded in likes, comments and feed items.
    # _user_brief() uses id=0 / nickname="" / avatar=None when no user row exists.
    id: int
    nickname: str = ""
    avatar: Optional[str] = None
class LikeUserOut(BaseModel):
    # One like on a dynamic: who liked it and the like row's created_at.
    user: UserBrief
    created_at: datetime
class CommentOut(BaseModel):
    # One comment as returned to clients: its ID, author, text and creation time.
    id: int
    user: UserBrief
    content: str
    created_at: datetime
class DynamicItemOut(BaseModel):
    # One entry of the feed returned by GET /dynamic/feed.
    id: int
    # Author of the dynamic.
    user: UserBrief
    content: str
    video_url: str
    visibility: str
    # Counters copied from the dynamic row (0 when the stored value is empty).
    like_count: int
    comment_count: int
    # Non-deleted likes on this dynamic.
    likes: List[LikeUserOut]
    # True when the requesting user has an active like on this dynamic.
    liked: bool
    # First batch of comments (at most COMMENT_PAGE_SIZE) embedded in the feed.
    comments: List[CommentOut]
    # True when more comments exist than were embedded.
    comments_has_more: bool
    created_at: datetime
class FeedOut(BaseModel):
    # Response payload of GET /dynamic/feed: the requested page/size echoed
    # back, whether another page exists, and the items of this page.
    page: int
    size: int
    has_more: bool
    items: List[DynamicItemOut]
class ShareOut(BaseModel):
    # Response payload of POST /dynamic/share: the newly created dynamic.
    id: int
    content: str
    video_url: str
    created_at: datetime
class LikeActionIn(BaseModel):
    # Request body of POST /dynamic/like/{dynamic_id}.
    # action defaults to "like"; unknown extra fields are rejected.
    action: str = Field(default="like", description="like 或 unlike")
    model_config = ConfigDict(extra="forbid")

    @field_validator("action")
    @classmethod
    def validate_action(cls, v: str) -> str:
        # Accept exactly the two supported actions; anything else is invalid.
        if v in ("like", "unlike"):
            return v
        raise ValueError("action 仅支持 like/unlike")
class LikeOut(BaseModel):
    # Response payload of POST /dynamic/like/{dynamic_id}: the dynamic's
    # like counter after the action and whether the caller now likes it.
    dynamic_id: int
    like_count: int
    liked: bool
class CommentIn(BaseModel):
    # Request body of POST /dynamic/comment/{dynamic_id}.
    # content is required, 1 to 50 characters; unknown extra fields are rejected.
    content: str = Field(..., min_length=1, max_length=50, description="评论内容≤50")
    model_config = ConfigDict(extra="forbid")
class CommentCreateOut(BaseModel):
    # Response payload of POST /dynamic/comment/{dynamic_id}: the created
    # comment and the dynamic's comment counter after the insert.
    dynamic_id: int
    comment: CommentOut
    comment_count: int
class CommentPageOut(BaseModel):
    # Response payload of GET /dynamic/comments/{dynamic_id}: one page of
    # comments plus the paging values that produced it.
    dynamic_id: int
    page: int
    size: int
    has_more: bool
    comments: List[CommentOut]
def _ensure_dynamic_visible(db: Session, dynamic: Dynamic, user: AuthedUser, friend_ids: List[int]):
    """Raise unless ``user`` is allowed to see ``dynamic``.

    Rules applied, in order:
      * a dynamic with ``deleted_at`` set is reported as missing (404);
      * a "private" dynamic is visible to its owner only (403 otherwise);
      * a "friends" dynamic is visible to its owner and to ``friend_ids``
        (403 otherwise).
    Any other visibility value passes without a check. ``db`` is not used.

    Raises:
        HTTPException: 404 or 403 as described above.
    """
    if dynamic.deleted_at:
        raise HTTPException(status_code=404, detail="动态不存在或已删除")

    # Only "friends" is produced today; "public"/"private" are reserved.
    visibility = dynamic.visibility
    if visibility == "private":
        if dynamic.user_id != user.id:
            raise HTTPException(status_code=403, detail="无权查看该动态")
        return

    if visibility == "friends":
        permitted = {*friend_ids, user.id}
        if dynamic.user_id not in permitted:
            raise HTTPException(status_code=403, detail="无权查看该动态")
@router.post("/share", response_model=ApiResponse[ShareOut])
async def share_dynamic(
    payload: ShareIn,
    db: Session = Depends(get_db),
    user: AuthedUser = Depends(get_current_user),
):
    """Publish the video of one of the caller's chat messages as a dynamic.

    Limited to 5 calls per user per 60 seconds. The video URL is read from
    ``message.extra["video_url"]``. A non-empty caption goes through the
    profanity check; an empty caption is replaced by a default one that is
    cut to 50 characters. The dynamic is created with visibility "friends"
    and the feed cache is cleared afterwards.

    The session is only flushed here; committing is presumably handled by
    the ``get_db`` dependency (confirm).

    Raises:
        HTTPException: 429 when rate limited; 404 when the message does not
            exist; 403 when it belongs to another user; 400 when it carries
            no video URL or the caption is rejected by the profanity check.
    """
    _check_rate_limit(user.id, "share", limit=5, window_seconds=60)

    message = (
        db.query(ChatMessage)
        .filter(ChatMessage.id == payload.source_message_id)
        .first()
    )
    if not message:
        raise HTTPException(status_code=404, detail="消息不存在")
    if message.user_id != user.id:
        raise HTTPException(status_code=403, detail="只能分享自己的视频消息")

    extra = message.extra
    video_url = extra.get("video_url") if isinstance(extra, dict) else None
    if not video_url:
        raise HTTPException(status_code=400, detail="该消息没有可分享的视频")

    content = (payload.content or "").strip()
    if content:
        _check_text_safe(content, "动态文案")
    else:
        # Generated caption: not user input, so it skips the profanity check.
        content = f"{user.nickname or '用户'}发布了动态"[:50]

    # One timestamp for both columns so a new row never looks already
    # modified (matches the single `now` used by the like/comment handlers).
    now = datetime.utcnow()
    dynamic = Dynamic(
        user_id=user.id,
        lover_id=message.lover_id,
        source_message_id=message.id,
        video_url=str(video_url),
        content=content,
        visibility="friends",
        created_at=now,
        updated_at=now,
    )
    db.add(dynamic)
    db.flush()  # assigns dynamic.id for the response
    _clear_feed_cache()

    return success_response(
        ShareOut(
            id=dynamic.id,
            content=dynamic.content,
            video_url=dynamic.video_url,
            created_at=dynamic.created_at,
        )
    )
@router.get("/feed", response_model=ApiResponse[FeedOut])
async def get_feed(
    page: int = Query(1, ge=1),
    size: int = Query(FEED_PAGE_SIZE, ge=1, le=50),
    db: Session = Depends(get_db),
    user: AuthedUser = Depends(get_current_user),
):
    """Return one page of the caller's feed, newest dynamics first.

    The feed contains non-deleted dynamics with visibility "friends" written
    by the caller or by the caller's friends. Each item carries its active
    likes and its newest comments (at most ``COMMENT_PAGE_SIZE``). Results
    are served from the in-process cache when a fresh entry exists and are
    stored there after being built.

    Rows whose ``user_id`` is NULL are rendered with the placeholder user
    instead of failing the whole request.
    """
    cached = _get_cached_feed(user.id, page, size)
    if cached:
        return success_response(FeedOut(**cached))

    allowed_user_ids = set(_get_friend_ids(db, user.id))
    allowed_user_ids.add(user.id)

    # Fetch one extra row to learn whether another page exists.
    rows = (
        db.query(Dynamic)
        .filter(
            Dynamic.deleted_at.is_(None),
            Dynamic.visibility == "friends",
            Dynamic.user_id.in_(allowed_user_ids),
        )
        # id breaks ties between equal timestamps so that OFFSET/LIMIT pages
        # neither repeat nor skip rows.
        .order_by(Dynamic.created_at.desc(), Dynamic.id.desc())
        .offset((page - 1) * size)
        .limit(size + 1)
        .all()
    )
    has_more = len(rows) > size
    dynamics = rows[:size]
    dynamic_ids = [int(d.id) for d in dynamics]

    # Likes: one query for the whole page, grouped per dynamic in one pass.
    all_likes: List[DynamicLike] = []
    likes_by_dynamic: Dict[int, List[DynamicLike]] = defaultdict(list)
    if dynamic_ids:  # an empty page needs no likes query
        all_likes = (
            db.query(DynamicLike)
            .filter(
                DynamicLike.dynamic_id.in_(dynamic_ids),
                DynamicLike.deleted_at.is_(None),
            )
            .order_by(DynamicLike.created_at.asc())
            .all()
        )
        for like in all_likes:
            likes_by_dynamic[int(like.dynamic_id)].append(like)

    # Comments: paged per dynamic; COMMENT_PAGE_SIZE + 1 rows are fetched to
    # detect whether more exist. NOTE(review): this is one query per dynamic.
    comments_map: Dict[int, List[DynamicComment]] = {}
    for dyn in dynamics:
        comments_map[int(dyn.id)] = (
            db.query(DynamicComment)
            .filter(
                DynamicComment.dynamic_id == dyn.id,
                DynamicComment.deleted_at.is_(None),
            )
            .order_by(DynamicComment.created_at.desc(), DynamicComment.id.desc())
            .limit(COMMENT_PAGE_SIZE + 1)
            .all()
        )

    # Resolve every referenced user with a single lookup.
    user_ids: List[int] = [int(d.user_id) for d in dynamics if d.user_id]
    user_ids.extend(int(lk.user_id) for lk in all_likes if lk.user_id is not None)
    for comment_rows in comments_map.values():
        user_ids.extend(int(c.user_id) for c in comment_rows if c.user_id is not None)
    users_map = _get_users_map(db, user_ids)

    def brief_for(raw_user_id) -> UserBrief:
        # NULL user_id -> placeholder user, same as an unknown user.
        found = users_map.get(int(raw_user_id)) if raw_user_id is not None else None
        return _user_brief(found)

    items: List[DynamicItemOut] = []
    for dyn in dynamics:
        dyn_id = int(dyn.id)
        dyn_likes = likes_by_dynamic.get(dyn_id, [])
        comment_rows = comments_map.get(dyn_id) or []
        items.append(
            DynamicItemOut(
                id=dyn_id,
                user=brief_for(dyn.user_id),
                content=dyn.content,
                video_url=dyn.video_url,
                visibility=dyn.visibility,
                like_count=dyn.like_count or 0,
                comment_count=dyn.comment_count or 0,
                likes=[
                    LikeUserOut(user=brief_for(lk.user_id), created_at=lk.created_at)
                    for lk in dyn_likes
                ],
                liked=any(
                    lk.user_id == user.id and lk.deleted_at is None
                    for lk in dyn_likes
                ),
                comments=[
                    CommentOut(
                        id=c.id,
                        user=brief_for(c.user_id),
                        content=c.content,
                        created_at=c.created_at,
                    )
                    for c in comment_rows[:COMMENT_PAGE_SIZE]
                ],
                comments_has_more=len(comment_rows) > COMMENT_PAGE_SIZE,
                created_at=dyn.created_at,
            )
        )

    resp_data = FeedOut(page=page, size=size, has_more=has_more, items=items)
    _set_feed_cache(user.id, page, size, resp_data.model_dump())
    return success_response(resp_data)
@router.post("/like/{dynamic_id}", response_model=ApiResponse[LikeOut])
async def like_dynamic(
    action: LikeActionIn,
    dynamic_id: int = Path(..., gt=0),
    db: Session = Depends(get_db),
    user: AuthedUser = Depends(get_current_user),
):
    """Like or unlike a dynamic on behalf of the current user.

    Limited to 30 calls per user per 60 seconds. Both the dynamic and the
    caller's like row are read with ``SELECT ... FOR UPDATE``. Likes are
    soft-deleted through ``deleted_at``: liking again revives the old row,
    and liking an already-liked dynamic changes nothing. The feed cache is
    cleared before the flush.

    Raises:
        HTTPException: 429 when rate limited; 404/403 when the dynamic is
            missing or not visible to the caller; 400 when unliking without
            an active like; 409 when the flush hits an integrity error.
    """
    _check_rate_limit(user.id, "like", limit=30, window_seconds=60)

    locked_dynamic = (
        db.query(Dynamic)
        .filter(Dynamic.id == dynamic_id, Dynamic.deleted_at.is_(None))
        .with_for_update()
    )
    dynamic = locked_dynamic.first()
    if not dynamic:
        raise HTTPException(status_code=404, detail="动态不存在")

    _ensure_dynamic_visible(db, dynamic, user, _get_friend_ids(db, user.id))

    existing = (
        db.query(DynamicLike)
        .filter(DynamicLike.dynamic_id == dynamic_id, DynamicLike.user_id == user.id)
        .with_for_update()
        .first()
    )
    timestamp = datetime.utcnow()
    wants_like = action.action == "like"
    has_active_like = bool(existing) and existing.deleted_at is None

    if wants_like:
        if not has_active_like:
            if existing:
                # Revive the soft-deleted like.
                existing.deleted_at = None
                existing.created_at = timestamp
                dynamic.like_count = (dynamic.like_count or 0) + 1
                db.add(existing)
            else:
                db.add(
                    DynamicLike(
                        dynamic_id=dynamic_id,
                        user_id=user.id,
                        created_at=timestamp,
                    )
                )
                dynamic.like_count = (dynamic.like_count or 0) + 1
    else:
        # Cannot unlike something that was never liked.
        if not has_active_like:
            raise HTTPException(status_code=400, detail="尚未点赞,无法取消点赞")
        existing.deleted_at = timestamp
        dynamic.like_count = max((dynamic.like_count or 0) - 1, 0)
        db.add(existing)

    db.add(dynamic)
    _clear_feed_cache()
    try:
        db.flush()
    except IntegrityError:
        db.rollback()
        raise HTTPException(status_code=409, detail="点赞请求冲突,请稍后重试")

    result = LikeOut(
        dynamic_id=dynamic_id,
        like_count=dynamic.like_count or 0,
        liked=wants_like,
    )
    return success_response(result)
@router.post("/comment/{dynamic_id}", response_model=ApiResponse[CommentCreateOut])
async def comment_dynamic(
    payload: CommentIn,
    dynamic_id: int = Path(..., gt=0),
    db: Session = Depends(get_db),
    user: AuthedUser = Depends(get_current_user),
):
    """Add a comment to a dynamic and bump its comment counter.

    Limited to 10 calls per user per 60 seconds. The text is validated and
    sent through the profanity check before the dynamic row is locked, so
    the ``FOR UPDATE`` lock is never held during the outbound HTTP call
    (up to 5 seconds). As a consequence, a rejected text is reported (400)
    before the existence/visibility checks (404/403) run.

    Raises:
        HTTPException: 429 when rate limited; 400 when the text is blank or
            rejected by the profanity check; 404/403 when the dynamic is
            missing or not visible to the caller.
    """
    _check_rate_limit(user.id, "comment", limit=10, window_seconds=60)

    # Everything that does not need the row goes first; see docstring.
    content = payload.content.strip()
    if not content:
        raise HTTPException(status_code=400, detail="评论内容不能为空")
    _check_text_safe(content, "评论内容")

    dynamic = (
        db.query(Dynamic)
        .filter(Dynamic.id == dynamic_id, Dynamic.deleted_at.is_(None))
        .with_for_update()
        .first()
    )
    if not dynamic:
        raise HTTPException(status_code=404, detail="动态不存在")
    friend_ids = _get_friend_ids(db, user.id)
    _ensure_dynamic_visible(db, dynamic, user, friend_ids)

    now = datetime.utcnow()
    comment_row = DynamicComment(
        dynamic_id=dynamic_id,
        user_id=user.id,
        content=content,
        created_at=now,
    )
    db.add(comment_row)
    dynamic.comment_count = (dynamic.comment_count or 0) + 1
    db.add(dynamic)
    db.flush()  # assigns comment_row.id for the response

    author = db.query(User).filter(User.id == user.id).first()
    _clear_feed_cache()

    comment_out = CommentOut(
        id=comment_row.id,
        user=_user_brief(author),
        content=comment_row.content,
        created_at=comment_row.created_at,
    )
    return success_response(
        CommentCreateOut(
            dynamic_id=dynamic_id,
            comment=comment_out,
            comment_count=dynamic.comment_count or 0,
        )
    )
async def _list_comments_impl(
    dynamic_id: int = Path(..., gt=0),
    page: int = Query(1, ge=1),
    size: int = Query(COMMENT_PAGE_SIZE, ge=1, le=50),
    db: Session = Depends(get_db),
    user: AuthedUser = Depends(get_current_user),
):
    """Return one page of a dynamic's non-deleted comments, newest first.

    The dynamic must exist, be non-deleted and be visible to ``user``.
    A comment whose ``user_id`` is NULL is rendered with the placeholder
    user instead of failing the request.

    Raises:
        HTTPException: 404 when the dynamic does not exist; 404/403 from the
            visibility check.
    """
    dynamic = (
        db.query(Dynamic)
        .filter(Dynamic.id == dynamic_id, Dynamic.deleted_at.is_(None))
        .first()
    )
    if not dynamic:
        raise HTTPException(status_code=404, detail="动态不存在")
    friend_ids = _get_friend_ids(db, user.id)
    _ensure_dynamic_visible(db, dynamic, user, friend_ids)

    # Fetch one extra row to learn whether another page exists.
    rows = (
        db.query(DynamicComment)
        .filter(DynamicComment.dynamic_id == dynamic_id, DynamicComment.deleted_at.is_(None))
        # id breaks ties between equal timestamps so that OFFSET/LIMIT pages
        # neither repeat nor skip comments.
        .order_by(DynamicComment.created_at.desc(), DynamicComment.id.desc())
        .offset((page - 1) * size)
        .limit(size + 1)
        .all()
    )
    has_more = len(rows) > size
    comments = rows[:size]

    user_ids = [int(c.user_id) for c in comments if c.user_id is not None]
    users_map = _get_users_map(db, user_ids)

    comments_out = [
        CommentOut(
            id=c.id,
            # NULL user_id -> placeholder user, same as an unknown user.
            user=_user_brief(users_map.get(int(c.user_id)) if c.user_id is not None else None),
            content=c.content,
            created_at=c.created_at,
        )
        for c in comments
    ]
    return success_response(
        CommentPageOut(
            dynamic_id=dynamic_id,
            page=page,
            size=size,
            has_more=has_more,
            comments=comments_out,
        )
    )
@router.get("/comments/{dynamic_id}", response_model=ApiResponse[CommentPageOut])
async def list_comments(
    dynamic_id: int = Path(..., gt=0),
    page: int = Query(1, ge=1),
    size: int = Query(COMMENT_PAGE_SIZE, ge=1, le=50),
    db: Session = Depends(get_db),
    user: AuthedUser = Depends(get_current_user),
):
    """Route for paging through a dynamic's comments; delegates to ``_list_comments_impl``."""
    forwarded = {
        "dynamic_id": dynamic_id,
        "page": page,
        "size": size,
        "db": db,
        "user": user,
    }
    return await _list_comments_impl(**forwarded)