from typing import Dict, List, Optional import json import time import requests import oss2 from datetime import datetime from fastapi import APIRouter, Depends, HTTPException, Query from pydantic import BaseModel, Field, field_validator from sqlalchemy.orm import Session from sqlalchemy.exc import IntegrityError from ..db import get_db from ..deps import AuthedUser, get_current_user from decimal import Decimal from ..models import Lover, OutfitItem, OutfitLook, User, OutfitPurchaseLog, UserMoneyLog from ..response import ApiResponse, success_response from ..config import settings router = APIRouter(prefix="/outfit", tags=["outfit"]) def _ensure_balance(user_row: User) -> Decimal: try: return Decimal(str(user_row.money or "0")) except Exception: return Decimal("0") def _now_ts() -> int: import time return int(time.time()) def _parse_owned_outfits(raw: Optional[str]) -> set[int]: owned: set[int] = set() if not raw: return owned # 已是列表 if isinstance(raw, list): for v in raw: try: owned.add(int(v)) except Exception: continue return owned # JSON 字符串 if isinstance(raw, str): try: parsed = json.loads(raw) if isinstance(parsed, list): for v in parsed: try: owned.add(int(v)) except Exception: continue return owned except Exception: # 逗号分隔兜底 for part in str(raw).split(","): part = part.strip() if part.isdigit(): owned.add(int(part)) return owned return owned def _serialize_owned_outfits(ids: set[int]) -> list[int]: return sorted(list(ids)) def _cdnize(url: Optional[str]) -> Optional[str]: """ 将以 /uploads 开头的相对路径补全为 CDN/OSS 完整 URL。 """ if not url: return url if url.startswith("http://") or url.startswith("https://"): return url prefix = "https://nvlovers.oss-cn-qingdao.aliyuncs.com" if url.startswith("/"): return prefix + url return f"{prefix}/{url}" def _clean_url(url: Optional[str]) -> Optional[str]: """去掉首尾空白,避免试衣接口因空格拒绝。""" if url is None: return None return url.strip() def _upload_to_oss(file_bytes: bytes, object_name: str) -> str: """上传到 OSS,返回可访问 URL(优先 CDN 域名)。""" auth = oss2.Auth(settings.ALIYUN_OSS_ACCESS_KEY_ID, settings.ALIYUN_OSS_ACCESS_KEY_SECRET) endpoint = settings.ALIYUN_OSS_ENDPOINT.rstrip("/") bucket = oss2.Bucket(auth, endpoint, settings.ALIYUN_OSS_BUCKET_NAME) bucket.put_object(object_name, file_bytes) cdn = settings.ALIYUN_OSS_CDN_DOMAIN if cdn: return f"{cdn.rstrip('/')}/{object_name}" return f"https://{settings.ALIYUN_OSS_BUCKET_NAME}.{endpoint.replace('https://', '').replace('http://', '')}/{object_name}" def _call_aitryon(person_image_url: str, top_url: Optional[str], bottom_url: Optional[str]) -> str: """ 调用 DashScope AI试衣-基础版(aitryon),同步轮询结果,返回 image_url。 """ api_key = (settings.DASHSCOPE_API_KEY or "").strip() if not api_key: raise HTTPException(status_code=500, detail="未配置 DASHSCOPE_API_KEY") headers = { "Content-Type": "application/json", "Accept": "application/json", "Authorization": f"Bearer {api_key}", "X-DashScope-Async": "enable", } payload = { "model": settings.TRYON_MODEL or "aitryon", "input": { "person_image_url": _clean_url(person_image_url), }, "parameters": { "resolution": -1, "restore_face": True, }, } if top_url: payload["input"]["top_garment_url"] = _clean_url(top_url) if bottom_url: payload["input"]["bottom_garment_url"] = _clean_url(bottom_url) try: create_resp = requests.post( "https://dashscope.aliyuncs.com/api/v1/services/aigc/image2image/image-synthesis/", headers=headers, json=payload, timeout=10, ) except Exception as exc: raise HTTPException(status_code=502, detail="试衣任务创建失败") from exc if create_resp.status_code != 200: err_msg = create_resp.text or "试衣任务创建失败" raise HTTPException(status_code=502, detail=f"试衣任务创建失败({create_resp.status_code}): {err_msg}") task_info = create_resp.json() task_id = task_info.get("output", {}).get("task_id") or task_info.get("task_id") if not task_id: raise HTTPException(status_code=502, detail="试衣任务未返回task_id") query_url = f"https://dashscope.aliyuncs.com/api/v1/tasks/{task_id}" deadline = time.time() + 60 while time.time() < deadline: time.sleep(2) try: q_resp = requests.get( query_url, headers={ "Authorization": f"Bearer {api_key}", "Accept": "application/json", }, timeout=8, ) except Exception: continue if q_resp.status_code != 200: continue data = q_resp.json() status_str = str( data.get("output", {}).get("task_status") or data.get("task_status") or data.get("status") ).lower() if status_str in ("succeeded", "success", "succeed", "finished"): image_url = ( data.get("output", {}).get("image_url") or (data.get("output", {}).get("results") or [{}])[0].get("url") ) if image_url: return image_url raise HTTPException(status_code=502, detail="试衣任务返回空结果") if status_str in ("failed", "error", "canceled"): err = data.get("output", {}).get("message") or data.get("message") or "试衣任务失败" raise HTTPException(status_code=502, detail=f"试衣失败: {err}") raise HTTPException(status_code=504, detail="试衣任务超时") class OutfitItemOut(BaseModel): id: int name: str category: str gender: str image_url: str is_free: bool price_gold: int is_vip_only: bool is_owned: bool class OutfitLookOut(BaseModel): id: int image_url: str class OutfitListResponse(BaseModel): top: List[OutfitItemOut] bottom: List[OutfitItemOut] dress: List[OutfitItemOut] top_total: int bottom_total: int dress_total: int page: int size: int balance: float clothes_num: int outfit_slots: int owned_outfit_ids: List[int] = Field(default_factory=list) current_outfit: Optional[dict] = None looks: List[OutfitLookOut] = Field(default_factory=list) class OutfitPurchaseIn(BaseModel): item_id: int # 目前仅小程序,预留字段,未来扩展平台时再打开校验 platform: Optional[str] = Field(default=None) class OutfitPurchaseOut(BaseModel): item_id: int is_owned: bool balance: float owned_outfit_ids: List[int] class OutfitChangeIn(BaseModel): top_item_id: Optional[int] = None bottom_item_id: Optional[int] = None dress_item_id: Optional[int] = None save_to_look: bool = False look_name: Optional[str] = None result_image_url: Optional[str] = None # 若前端已有生成结果,可传入落库 @field_validator("top_item_id", "bottom_item_id", "dress_item_id", mode="before") @classmethod def empty_str_to_none(cls, v): # 兼容前端传空字符串导致 422 的情况 if isinstance(v, str) and v.strip() == "": return None return v class OutfitChangeOut(BaseModel): image_url: Optional[str] = None current_outfit: dict clothes_num: int owned_outfit_ids: List[int] looks: List[OutfitLookOut] = Field(default_factory=list) class OutfitLooksResponse(BaseModel): looks: List[OutfitLookOut] class OutfitUseResponse(BaseModel): image_url: str current_outfit: dict looks: List[OutfitLookOut] class OutfitMallItem(BaseModel): id: int name: str image_url: str price_gold: int gender: str category: str class OutfitMallResponse(BaseModel): items: List[OutfitMallItem] owned_outfit_ids: List[int] balance: float @router.post("/purchase", response_model=ApiResponse[OutfitPurchaseOut]) def purchase_outfit( payload: OutfitPurchaseIn, db: Session = Depends(get_db), user: AuthedUser = Depends(get_current_user), ): """ 购买服饰: - 校验性别匹配(恋人 gender 或 unisex) - 免费不可购买 - 已拥有不可购买 - 校验金币余额 """ lover = db.query(Lover).filter(Lover.user_id == user.id).first() if not lover: raise HTTPException(status_code=404, detail="恋人未找到") item = db.query(OutfitItem).filter(OutfitItem.id == payload.item_id, OutfitItem.status == "1").first() if not item: raise HTTPException(status_code=404, detail="服饰不存在或已下架") # 性别校验 if item.gender not in ("unisex", lover.gender): raise HTTPException(status_code=400, detail="服饰与恋人性别不匹配") # 免费服饰无需购买 if bool(item.is_free): raise HTTPException(status_code=400, detail="该服饰为免费,无需购买") try: user_row = ( db.query(User) .filter(User.id == user.id) .with_for_update() .first() ) except Exception: user_row = None if not user_row: raise HTTPException(status_code=404, detail="用户不存在") balance = _ensure_balance(user_row) owned_ids = _parse_owned_outfits(user_row.owned_outfit_ids) if int(item.id) in owned_ids: raise HTTPException(status_code=400, detail="已拥有该服饰,无需重复购买") price = Decimal(item.price_gold or 0) if price <= 0: raise HTTPException(status_code=400, detail="服饰价格异常") if balance < price: raise HTTPException(status_code=400, detail="余额不足") # 扣减余额,写拥有数据 & 日志(行锁保证并发安全) before_balance = balance balance -= price user_row.money = float(balance) owned_ids.add(int(item.id)) user_row.owned_outfit_ids = _serialize_owned_outfits(owned_ids) db.add(user_row) db.add( OutfitPurchaseLog( user_id=user.id, item_id=item.id, price_gold=int(price), platform="miniapp", # 目前仅小程序 status="success", createtime=_now_ts(), updatetime=_now_ts(), remark=None, ) ) db.add( UserMoneyLog( user_id=user.id, money=-price, before=before_balance, after=Decimal(user_row.money), memo=f"购买服饰:{item.name}", createtime=_now_ts(), ) ) try: db.flush() except IntegrityError: db.rollback() raise HTTPException(status_code=409, detail="购买请求冲突,请重试") return success_response( OutfitPurchaseOut( item_id=item.id, is_owned=True, balance=float(balance), owned_outfit_ids=sorted(list(owned_ids)), ) ) @router.post("/change", response_model=ApiResponse[OutfitChangeOut]) def change_outfit( payload: OutfitChangeIn, db: Session = Depends(get_db), user: AuthedUser = Depends(get_current_user), ): """ 换装接口: - 校验换装次数 `clothes_num` > 0 - 校验所选服饰存在、性别匹配、已拥有或免费 - 连衣裙(dress) 与 上/下装互斥;上/下装可单选或组合 - 可选择保存当前形象到形象栏(需有空位) - 更新当前穿戴服饰 ID,落库传入的 result_image_url(如未提供则沿用旧图) """ lover = ( db.query(Lover) .filter(Lover.user_id == user.id) .with_for_update() .first() ) if not lover: raise HTTPException(status_code=404, detail="恋人未找到") user_row = ( db.query(User) .filter(User.id == user.id) .with_for_update() .first() ) if not user_row: raise HTTPException(status_code=404, detail="用户不存在") # 换装次数校验 clothes_num = int(user_row.clothes_num or 0) if clothes_num <= 0: raise HTTPException(status_code=400, detail="换装次数不足") # 选择校验 top_id = payload.top_item_id bottom_id = payload.bottom_item_id dress_id = payload.dress_item_id if not any([top_id, bottom_id, dress_id]): raise HTTPException(status_code=400, detail="请选择至少一个服饰") if dress_id and (top_id or bottom_id): raise HTTPException(status_code=400, detail="连衣裙/连体服与上/下装不可同时选择") # 获取拥有集合 owned_ids = _parse_owned_outfits(user_row.owned_outfit_ids) def check_item(item_id: int, expect_category: str): item = ( db.query(OutfitItem) .filter( OutfitItem.id == item_id, OutfitItem.category == expect_category, OutfitItem.status == "1", ) .first() ) if not item: raise HTTPException(status_code=404, detail="服饰不存在或已下架") if item.gender not in ("unisex", lover.gender): raise HTTPException(status_code=400, detail="服饰与恋人性别不匹配") if not bool(item.is_free) and int(item.id) not in owned_ids: raise HTTPException(status_code=400, detail="该服饰未拥有") return item top_item = check_item(top_id, "top") if top_id else None bottom_item = check_item(bottom_id, "bottom") if bottom_id else None dress_item = check_item(dress_id, "dress") if dress_id else None # 保存前的形象信息(成功后若需要再写入形象栏) prev_image = lover.image_url prev_top = lover.outfit_top_id prev_bottom = lover.outfit_bottom_id prev_dress = lover.outfit_dress_id slot_limit = int(user_row.outfit_slots or 0) or 0 if payload.save_to_look: if slot_limit <= 0: raise HTTPException(status_code=400, detail="形象栏位已满") existing_count = ( db.query(OutfitLook) .filter(OutfitLook.user_id == user.id, OutfitLook.deleted_at.is_(None)) .with_for_update() .count() ) if existing_count >= slot_limit: raise HTTPException(status_code=400, detail="形象栏位已满") # 如需生成,调用试衣模型 result_url = payload.result_image_url if not result_url: person_image = _cdnize(_clean_url(lover.image_url)) if not person_image: raise HTTPException(status_code=400, detail="缺少模特图") top_url = _cdnize(_clean_url(top_item.image_url)) if top_item else None bottom_url = _cdnize(_clean_url(bottom_item.image_url)) if bottom_item else None # 连衣裙走 top_garment_url if dress_item: top_url = _cdnize(_clean_url(dress_item.image_url)) bottom_url = None result_url = _call_aitryon(person_image, top_url, bottom_url) # 扣减换装次数 user_row.clothes_num = max(0, clothes_num - 1) # 更新当前穿戴 lover.outfit_top_id = top_item.id if top_item else None lover.outfit_bottom_id = bottom_item.id if bottom_item else None lover.outfit_dress_id = dress_item.id if dress_item else None # 下载试衣结果并转存 OSS,避免临时 URL 失效 try: img_resp = requests.get(result_url, timeout=15) if img_resp.status_code != 200: raise HTTPException(status_code=502, detail="生成图片下载失败") object_name = f"lover/{lover.id}/images/{int(time.time())}_outfit.jpg" oss_url = _upload_to_oss(img_resp.content, object_name) result_url = oss_url except HTTPException: raise except Exception as exc: raise HTTPException(status_code=502, detail="生成图片保存失败") from exc lover.image_url = _cdnize(result_url) db.add(user_row) db.add(lover) # 成功后按需保存先前形象到形象栏 if payload.save_to_look: db.add( OutfitLook( user_id=user.id, lover_id=lover.id, name=payload.look_name or "当前形象", image_url=_cdnize(prev_image) or "", top_item_id=prev_top, bottom_item_id=prev_bottom, dress_item_id=prev_dress, createtime=datetime.utcnow(), updatetime=datetime.utcnow(), ) ) db.flush() looks = ( db.query(OutfitLook) .filter(OutfitLook.user_id == user.id, OutfitLook.deleted_at.is_(None)) .order_by(OutfitLook.id.desc()) .all() ) return success_response( OutfitChangeOut( image_url=lover.image_url, current_outfit={ "top_id": lover.outfit_top_id, "bottom_id": lover.outfit_bottom_id, "dress_id": lover.outfit_dress_id, }, clothes_num=user_row.clothes_num, owned_outfit_ids=_serialize_owned_outfits(owned_ids), looks=[ OutfitLookOut( id=lk.id, image_url=_cdnize(lk.image_url), ) for lk in looks ], ) ) @router.delete("/looks/{look_id}", response_model=ApiResponse[OutfitLooksResponse]) def delete_look( look_id: int, db: Session = Depends(get_db), user: AuthedUser = Depends(get_current_user), ): """ 删除形象栏记录(软删),返回删除后的形象栏列表。 """ look = ( db.query(OutfitLook) .filter(OutfitLook.id == look_id, OutfitLook.user_id == user.id, OutfitLook.deleted_at.is_(None)) .first() ) if not look: raise HTTPException(status_code=404, detail="形象不存在") look.deleted_at = datetime.utcnow() db.add(look) db.flush() looks = ( db.query(OutfitLook) .filter(OutfitLook.user_id == user.id, OutfitLook.deleted_at.is_(None)) .order_by(OutfitLook.id.desc()) .all() ) return success_response( OutfitLooksResponse( looks=[ OutfitLookOut( id=lk.id, image_url=_cdnize(lk.image_url), ) for lk in looks ] ) ) @router.post("/looks/{look_id}/use", response_model=ApiResponse[OutfitUseResponse]) @router.post("/looks/use/{look_id}", response_model=ApiResponse[OutfitUseResponse]) def use_look( look_id: int, db: Session = Depends(get_db), user: AuthedUser = Depends(get_current_user), ): """ 将形象栏的服饰组合和图片应用为当前形象。 """ lover = db.query(Lover).filter(Lover.user_id == user.id).first() if not lover: raise HTTPException(status_code=404, detail="恋人未找到") # 记录当前恋人形象,用于互换 prev_image = lover.image_url prev_top = lover.outfit_top_id prev_bottom = lover.outfit_bottom_id prev_dress = lover.outfit_dress_id look = ( db.query(OutfitLook) .filter(OutfitLook.id == look_id, OutfitLook.user_id == user.id, OutfitLook.deleted_at.is_(None)) .first() ) if not look: raise HTTPException(status_code=404, detail="形象不存在") if not look.image_url: raise HTTPException(status_code=400, detail="形象缺少图片") lover.outfit_top_id = look.top_item_id lover.outfit_bottom_id = look.bottom_item_id lover.outfit_dress_id = look.dress_item_id lover.image_url = _cdnize(look.image_url) # 将原恋人形象写回该形象栏记录,实现互换 look.top_item_id = prev_top look.bottom_item_id = prev_bottom look.dress_item_id = prev_dress look.image_url = _cdnize(prev_image) if prev_image else look.image_url db.add(lover) db.add(look) db.flush() looks = ( db.query(OutfitLook) .filter(OutfitLook.user_id == user.id, OutfitLook.deleted_at.is_(None)) .order_by(OutfitLook.id.desc()) .all() ) return success_response( OutfitUseResponse( image_url=lover.image_url, current_outfit={ "top_id": lover.outfit_top_id, "bottom_id": lover.outfit_bottom_id, "dress_id": lover.outfit_dress_id, }, looks=[ OutfitLookOut( id=lk.id, image_url=_cdnize(lk.image_url), ) for lk in looks ], ) ) @router.get("/list", response_model=ApiResponse[OutfitListResponse]) def list_outfits( page: int = Query(1, ge=1), size: int = Query(7, ge=1), db: Session = Depends(get_db), user: AuthedUser = Depends(get_current_user), ): """ 根据恋人性别 + unisex 返回三类服饰列表(上装/下装/连衣裙),分页(默认每页 7),并回传当前穿戴的服饰 ID 与形象栏信息。 """ lover = db.query(Lover).filter(Lover.user_id == user.id).first() if not lover: raise HTTPException(status_code=404, detail="恋人未找到") gender_filter = [lover.gender, "unisex"] # 用户已拥有的服饰集合:仅使用 nf_user.owned_outfit_ids(逗号或 JSON) owned_ids: set[int] = set() user_row = db.query(User).filter(User.id == user.id).first() balance_val = 0.0 clothes_num_val = 0 outfit_slots_val = 0 if user_row: balance_val = float(_ensure_balance(user_row)) owned_ids = _parse_owned_outfits(user_row.owned_outfit_ids) clothes_num_val = int(user_row.clothes_num or 0) outfit_slots_val = int(user_row.outfit_slots or 0) def fetch_category(cat: str): q = ( db.query(OutfitItem) .filter( OutfitItem.category == cat, OutfitItem.gender.in_(gender_filter), OutfitItem.status == "1", ) .order_by(OutfitItem.weigh.desc(), OutfitItem.id.asc()) ) total_count = q.count() records = q.offset((page - 1) * size).limit(size).all() return total_count, records looks = ( db.query(OutfitLook) .filter(OutfitLook.user_id == user.id, OutfitLook.deleted_at.is_(None)) .order_by(OutfitLook.id.desc()) .all() ) top_total, top_items = fetch_category("top") bottom_total, bottom_items = fetch_category("bottom") dress_total, dress_items = fetch_category("dress") return success_response( OutfitListResponse( top=[ OutfitItemOut( id=item.id, name=item.name, category=item.category, gender=item.gender, image_url=_cdnize(item.image_url), is_free=bool(item.is_free), price_gold=item.price_gold or 0, is_vip_only=bool(item.is_vip_only), is_owned=item.id in owned_ids, ) for item in top_items ], bottom=[ OutfitItemOut( id=item.id, name=item.name, category=item.category, gender=item.gender, image_url=_cdnize(item.image_url), is_free=bool(item.is_free), price_gold=item.price_gold or 0, is_vip_only=bool(item.is_vip_only), is_owned=item.id in owned_ids, ) for item in bottom_items ], dress=[ OutfitItemOut( id=item.id, name=item.name, category=item.category, gender=item.gender, image_url=_cdnize(item.image_url), is_free=bool(item.is_free), price_gold=item.price_gold or 0, is_vip_only=bool(item.is_vip_only), is_owned=item.id in owned_ids, ) for item in dress_items ], top_total=top_total, bottom_total=bottom_total, dress_total=dress_total, page=page, size=size, balance=balance_val, clothes_num=clothes_num_val, outfit_slots=outfit_slots_val, owned_outfit_ids=sorted(list(owned_ids)), current_outfit={ "top_id": lover.outfit_top_id, "bottom_id": lover.outfit_bottom_id, "dress_id": lover.outfit_dress_id, }, looks=[ OutfitLookOut( id=lk.id, image_url=_cdnize(lk.image_url), ) for lk in looks ], ) ) @router.get("/mall", response_model=ApiResponse[OutfitMallResponse]) def list_paid_outfits( db: Session = Depends(get_db), user: AuthedUser = Depends(get_current_user), ): """ 金币商场:返回当前恋人性别(含 unisex)的所有付费服饰、已拥有列表和金币余额。 """ lover = db.query(Lover).filter(Lover.user_id == user.id).first() if not lover: raise HTTPException(status_code=404, detail="恋人未找到") user_row = db.query(User).filter(User.id == user.id).first() if not user_row: raise HTTPException(status_code=404, detail="用户不存在") balance = float(_ensure_balance(user_row)) owned_ids = _parse_owned_outfits(user_row.owned_outfit_ids) gender_filter = [lover.gender, "unisex"] items = ( db.query(OutfitItem) .filter( OutfitItem.status == "1", OutfitItem.is_free.is_(False), OutfitItem.price_gold > 0, OutfitItem.gender.in_(gender_filter), ) .order_by(OutfitItem.weigh.desc(), OutfitItem.id.asc()) .all() ) return success_response( OutfitMallResponse( items=[ OutfitMallItem( id=item.id, name=item.name, image_url=_cdnize(item.image_url), price_gold=item.price_gold or 0, gender=item.gender, category=item.category, ) for item in items ], owned_outfit_ids=sorted(list(owned_ids)), balance=balance, ) )