import hashlib import math import os import subprocess import tempfile import threading import time from contextlib import contextmanager from datetime import datetime, timedelta from typing import List, Optional import oss2 import requests from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException from pydantic import BaseModel, ConfigDict, Field from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session from ..config import settings from ..db import SessionLocal, get_db from ..deps import AuthedUser, get_current_user from ..models import ( ChatMessage, ChatSession, EmoDetectCache, GenerationTask, Lover, SingBaseVideo, SingSongVideo, SongLibrary, SongSegment, SongSegmentVideo, User, ) from ..response import ApiResponse, success_response from ..task_queue import sing_task_queue router = APIRouter(prefix="/sing", tags=["sing"]) SING_BASE_MODEL = "wan2.5-i2v-preview" SING_BASE_RESOLUTION = "480P" SING_WAN26_MODEL = "wan2.6-i2v-flash" SING_WAN26_RESOLUTION = "720P" SING_BASE_DURATION = 5 SING_BASE_PROMPT = ( "front-facing full-body, modest outfit; camera locked on tripod; " "head and neck fixed, body still; " "natural singing with soft lip articulation and varied mouth shapes, small-to-medium opening, smooth transitions; " "subtle jaw motion only; gentle expressive hand gestures near chest; " "realistic lighting, sharp details; pose consistent from start to end" ) SING_BASE_NEGATIVE_PROMPT = ( "nudity, cleavage, lingerie, bikini, see-through, sexualized, provocative; " "camera movement, zoom, pan, shake, head turning, head bobbing, body swaying, dancing, walking; " "exaggerated expressions; chewing, biting, eating, fish mouth, O-shaped mouth, " "mouth stuck open, mouth stuck closed, repetitive mechanical mouth motion, extreme mouth shapes, lip deformation; " "blur, artifacts, watermark, text, bad hands, extra fingers" ) SING_WAN26_PROMPT = ( "front-facing full-body, modest outfit; camera locked on tripod; " "head and neck fixed, body 
still; " "lip-sync to the provided audio lyrics with natural mouth articulation and varied mouth shapes, " "clear closures between syllables, smooth transitions; " "subtle jaw motion only; gentle expressive hand gestures near chest; " "realistic lighting, sharp details; pose consistent from start to end" ) SING_WAN26_NEGATIVE_PROMPT = ( "nudity, cleavage, lingerie, bikini, see-through, sexualized, provocative; " "camera movement, zoom, pan, shake, head turning, head bobbing, body swaying, dancing, walking; " "exaggerated expressions; chewing, biting, eating, fish mouth, O-shaped mouth, " "mouth stuck open, mouth stuck closed, out-of-sync mouth motion, repetitive mechanical mouth motion, " "extreme mouth shapes, lip deformation; " "blur, artifacts, watermark, text, bad hands, extra fingers" ) SING_BASE_PROMPT_EXTEND = False SING_BASE_WAIT_SECONDS = 180 SING_MERGE_WAIT_SECONDS = 60 SING_REQUEUE_COOLDOWN_SECONDS = 5 EMO_DETECT_MODEL = "emo-detect-v1" EMO_MODEL = "emo-v1" EMO_RATIO = "3:4" EMO_STYLE_LEVEL = "normal" EMO_SEGMENT_SECONDS = 60 EMO_TASK_TIMEOUT_SECONDS = 1800 EMO_BACKFILL_STALE_SECONDS = 60 EMO_BACKFILL_MIN_INTERVAL_SECONDS = 20 EMO_CONTENT_SAFETY_CODE = "DataInspectionFailed" EMO_CONTENT_SAFETY_MESSAGE = "歌词内容触发了阿里云的内容安全审核机制" _sing_merge_semaphore = threading.BoundedSemaphore(max(1, settings.SING_MERGE_MAX_CONCURRENCY or 1)) _sing_enqueue_lock = threading.Lock() _sing_last_enqueue_at: dict[int, float] = {} _emo_task_semaphore = threading.BoundedSemaphore(max(1, settings.EMO_MAX_CONCURRENCY or 1)) @contextmanager def _semaphore_guard(semaphore: threading.BoundedSemaphore): semaphore.acquire() try: yield finally: semaphore.release() def _cdnize(url: Optional[str]) -> Optional[str]: """ 将相对路径补全为可访问 URL。优先使用 CDN,其次 bucket+endpoint,最后兜底固定域名。 """ if not url: return url cleaned = url.strip() if cleaned.startswith("http://") or cleaned.startswith("https://"): return cleaned # 去掉首个斜杠,防止双斜杠 cleaned = cleaned.lstrip("/") if settings.ALIYUN_OSS_CDN_DOMAIN: return 
f"{settings.ALIYUN_OSS_CDN_DOMAIN.rstrip('/')}/{cleaned}" if settings.ALIYUN_OSS_BUCKET_NAME and settings.ALIYUN_OSS_ENDPOINT: endpoint = settings.ALIYUN_OSS_ENDPOINT.rstrip("/").replace("https://", "").replace("http://", "") return f"https://{settings.ALIYUN_OSS_BUCKET_NAME}.{endpoint}/{cleaned}" # 兜底:项目历史使用的公开域名 return f"https://nvlovers.oss-cn-qingdao.aliyuncs.com/{cleaned}" def _extract_error_text(exc: Exception) -> str: if isinstance(exc, HTTPException): detail = exc.detail return detail if isinstance(detail, str) else str(detail) return str(exc) def _is_content_safety_error(value: Optional[str]) -> bool: if not value: return False return EMO_CONTENT_SAFETY_CODE.lower() in value.lower() def _build_sing_message_content(video_url: str, content_safety_blocked: bool) -> str: if content_safety_blocked: if video_url: return f"{EMO_CONTENT_SAFETY_MESSAGE},已生成部分视频,点击查看:{video_url}" return EMO_CONTENT_SAFETY_MESSAGE if video_url: return f"为你生成了一段唱歌视频,点击查看:{video_url}" return "为你生成了一段唱歌视频" def _upload_to_oss(file_bytes: bytes, object_name: str) -> str: """上传到 OSS,返回可访问 URL(优先 CDN 域名)。""" if not settings.ALIYUN_OSS_ACCESS_KEY_ID or not settings.ALIYUN_OSS_ACCESS_KEY_SECRET: raise HTTPException(status_code=500, detail="未配置 OSS Key") if not settings.ALIYUN_OSS_BUCKET_NAME or not settings.ALIYUN_OSS_ENDPOINT: raise HTTPException(status_code=500, detail="未配置 OSS Bucket/Endpoint") auth = oss2.Auth(settings.ALIYUN_OSS_ACCESS_KEY_ID, settings.ALIYUN_OSS_ACCESS_KEY_SECRET) endpoint = settings.ALIYUN_OSS_ENDPOINT.rstrip("/") bucket = oss2.Bucket(auth, endpoint, settings.ALIYUN_OSS_BUCKET_NAME) bucket.put_object(object_name, file_bytes) cdn = settings.ALIYUN_OSS_CDN_DOMAIN if cdn: return f"{cdn.rstrip('/')}/{object_name}" return f"https://{settings.ALIYUN_OSS_BUCKET_NAME}.{endpoint.replace('https://', '').replace('http://', '')}/{object_name}" def _hash_text(text: str) -> str: return hashlib.sha256(text.encode("utf-8")).hexdigest() def _hash_file(path: str) -> str: hash_obj = 
def _download_to_path(url: str, target_path: str):
    """Stream the resource at *url* into the file *target_path*.

    The body is fetched with ``stream=True`` and written in 1 MiB chunks so
    large media files are never held in memory as a whole.

    Raises:
        HTTPException: 502 when the request cannot be made, the server does
            not answer with status 200, or the transfer breaks mid-stream.
    """
    try:
        resp = requests.get(url, stream=True, timeout=30)
    except Exception as exc:
        raise HTTPException(status_code=502, detail="文件下载失败") from exc
    try:
        # The status check lives inside the try so that the streamed
        # connection is released on the error path as well.
        if resp.status_code != 200:
            raise HTTPException(status_code=502, detail="文件下载失败")
        with open(target_path, "wb") as file_handle:
            for chunk in resp.iter_content(chunk_size=1024 * 1024):
                if chunk:
                    file_handle.write(chunk)
    except requests.RequestException as exc:
        # Failures while reading the body are reported the same way as
        # failures while connecting.
        raise HTTPException(status_code=502, detail="文件下载失败") from exc
    finally:
        resp.close()
raise HTTPException(status_code=502, detail=f"EMO检测失败: {msg}") try: data = resp.json() except Exception as exc: raise HTTPException(status_code=502, detail="EMO检测返回解析失败") from exc output = data.get("output") or {} return { "check_pass": bool(output.get("check_pass")), "face_bbox": output.get("face_bbox"), "ext_bbox": output.get("ext_bbox"), "raw": data, } def _ensure_emo_detect_cache( db: Session, lover_id: int, image_url: str, image_hash: str, ratio: str, ) -> EmoDetectCache: cached = ( db.query(EmoDetectCache) .filter(EmoDetectCache.image_hash == image_hash, EmoDetectCache.ratio == ratio) .first() ) if cached: return cached result = _emo_detect(image_url, ratio) cached = EmoDetectCache( lover_id=lover_id, image_url=image_url, image_hash=image_hash, ratio=ratio, check_pass=result.get("check_pass", False), face_bbox=result.get("face_bbox"), ext_bbox=result.get("ext_bbox"), raw_response=result.get("raw"), ) db.add(cached) try: db.flush() except IntegrityError: db.rollback() cached = ( db.query(EmoDetectCache) .filter(EmoDetectCache.image_hash == image_hash, EmoDetectCache.ratio == ratio) .first() ) if cached: return cached raise return cached def _probe_media_duration(path: str) -> Optional[float]: command = [ "ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", path, ] try: result = subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) except FileNotFoundError: return None except subprocess.CalledProcessError: return None raw = result.stdout.decode("utf-8", errors="ignore").strip() if not raw: return None try: duration = float(raw) except ValueError: return None if duration <= 0: return None return duration def _run_ffmpeg_merge(video_path: str, audio_path: str, output_path: str): audio_duration = _probe_media_duration(audio_path) command = [ "ffmpeg", "-y", "-loglevel", "error", "-stream_loop", "-1", "-i", video_path, "-i", audio_path, ] if audio_duration: command.extend(["-t", 
def _strip_video_audio(video_bytes: bytes) -> bytes:
    """Return *video_bytes* with every audio stream removed.

    The video is written to a temporary directory and passed through
    ffmpeg. The first attempt copies the video stream as-is; if ffmpeg
    rejects that, a second attempt re-encodes it with libx264.

    Raises:
        HTTPException: 500 when the ``ffmpeg`` executable cannot be found,
            502 when the re-encode attempt fails as well.
    """
    with tempfile.TemporaryDirectory() as workdir:
        src_path = os.path.join(workdir, "input.mp4")
        dst_path = os.path.join(workdir, "output.mp4")
        with open(src_path, "wb") as writer:
            writer.write(video_bytes)

        head = ["ffmpeg", "-y", "-loglevel", "error", "-i", src_path, "-map", "0:v:0"]
        tail = ["-an", "-movflags", "+faststart", dst_path]
        stream_copy = head + ["-c:v", "copy"] + tail
        reencode = (
            head
            + ["-c:v", "libx264", "-preset", "veryfast", "-crf", "23", "-pix_fmt", "yuv420p"]
            + tail
        )
        attempts = (stream_copy, reencode)

        for attempt_no, ffmpeg_cmd in enumerate(attempts, start=1):
            try:
                with _semaphore_guard(_sing_merge_semaphore):
                    subprocess.run(
                        ffmpeg_cmd,
                        check=True,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                    )
            except FileNotFoundError as exc:
                raise HTTPException(status_code=500, detail="ffmpeg 未安装或不可用") from exc
            except subprocess.CalledProcessError as exc:
                if attempt_no < len(attempts):
                    # Stream copy failed; fall through to the re-encode attempt.
                    continue
                stderr = exc.stderr.decode("utf-8", errors="ignore") if exc.stderr else ""
                raise HTTPException(status_code=502, detail=f"ffmpeg 去除音轨失败: {stderr[:200]}") from exc
            break

        with open(dst_path, "rb") as reader:
            return reader.read()
exc.stderr.decode("utf-8", errors="ignore") if exc.stderr else "" raise HTTPException(status_code=502, detail=f"音频补齐失败: {stderr[:200]}") from exc def _trim_video_duration(input_path: str, target_duration_sec: float, output_path: str): command = [ "ffmpeg", "-y", "-loglevel", "error", "-i", input_path, "-t", f"{target_duration_sec:.3f}", "-c", "copy", "-movflags", "+faststart", output_path, ] try: with _semaphore_guard(_sing_merge_semaphore): subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) return except FileNotFoundError as exc: raise HTTPException(status_code=500, detail="ffmpeg 未安装或不可用") from exc except subprocess.CalledProcessError: pass fallback = [ "ffmpeg", "-y", "-loglevel", "error", "-i", input_path, "-t", f"{target_duration_sec:.3f}", "-c:v", "libx264", "-preset", "veryfast", "-crf", "23", "-pix_fmt", "yuv420p", "-c:a", "aac", "-movflags", "+faststart", output_path, ] try: with _semaphore_guard(_sing_merge_semaphore): subprocess.run(fallback, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) except subprocess.CalledProcessError as exc: stderr = exc.stderr.decode("utf-8", errors="ignore") if exc.stderr else "" raise HTTPException(status_code=502, detail=f"视频裁剪失败: {stderr[:200]}") from exc def _trim_video_bytes(video_bytes: bytes, target_duration_sec: float) -> bytes: if target_duration_sec <= 0: return video_bytes with tempfile.TemporaryDirectory() as tmpdir: input_path = os.path.join(tmpdir, "input.mp4") output_path = os.path.join(tmpdir, "trimmed.mp4") with open(input_path, "wb") as file_handle: file_handle.write(video_bytes) _trim_video_duration(input_path, target_duration_sec, output_path) with open(output_path, "rb") as file_handle: return file_handle.read() def _concat_video_files(video_paths: list[str], output_path: str): if not video_paths: raise HTTPException(status_code=500, detail="合成视频列表为空") list_path = None try: with tempfile.NamedTemporaryFile("w", delete=False) as list_file: list_path = 
def _submit_sing_task(
    prompt: str,
    image_url: str,
    negative_prompt: Optional[str],
    audio_url: Optional[str],
    model: str,
    resolution: str,
    duration: int,
) -> str:
    """Submit an asynchronous image-to-video job to DashScope.

    Args:
        prompt: Positive prompt; ``SING_BASE_PROMPT`` is used when empty.
        image_url: URL of the source image (sent as ``img_url``).
        negative_prompt: Negative prompt, forwarded as given.
        audio_url: Optional audio URL; only sent when non-empty.
        model: DashScope model name.
        resolution: Value for the ``resolution`` parameter.
        duration: Value for the ``duration`` parameter.

    Returns:
        The DashScope task id as a string.

    Raises:
        HTTPException: 500 when ``DASHSCOPE_API_KEY`` is not configured;
            502 when the request fails, is rejected, cannot be parsed, or
            carries no task id.
    """
    if not settings.DASHSCOPE_API_KEY:
        raise HTTPException(status_code=500, detail="未配置 DASHSCOPE_API_KEY")

    input_obj = {
        "prompt": prompt or SING_BASE_PROMPT,
        "negative_prompt": negative_prompt,
        "img_url": image_url,
    }
    if audio_url:
        input_obj["audio_url"] = audio_url

    parameters = {
        "resolution": resolution,
        "duration": duration,
        "prompt_extend": SING_BASE_PROMPT_EXTEND,
    }
    if model == SING_WAN26_MODEL:
        parameters["audio"] = True

    input_payload = {
        "model": model,
        "input": input_obj,
        "parameters": parameters,
    }
    headers = {
        "X-DashScope-Async": "enable",
        "Authorization": f"Bearer {settings.DASHSCOPE_API_KEY}",
        "Content-Type": "application/json",
    }

    try:
        resp = requests.post(
            "https://dashscope.aliyuncs.com/api/v1/services/aigc/video-generation/video-synthesis",
            headers=headers,
            json=input_payload,
            timeout=10,
        )
    except Exception as exc:
        raise HTTPException(status_code=502, detail="调用视频生成接口失败") from exc

    if resp.status_code != 200:
        msg = resp.text
        try:
            msg = resp.json().get("message") or msg
        except Exception:
            # Best effort only: keep the raw body when it is not a JSON object.
            pass
        raise HTTPException(status_code=502, detail=f"视频任务提交失败: {msg}")

    try:
        data = resp.json()
    except Exception as exc:
        raise HTTPException(status_code=502, detail="视频任务返回解析失败") from exc

    # A JSON body that is not an object, or an explicit `"output": null`,
    # must surface as "no task id" rather than as an AttributeError.
    if not isinstance(data, dict):
        data = {}
    output = data.get("output")
    if not isinstance(output, dict):
        output = {}

    task_id = output.get("task_id") or data.get("task_id") or output.get("id")
    if not task_id:
        raise HTTPException(status_code=502, detail="视频任务未返回 task_id")
    return str(task_id)
def _poll_video_url(task_id: str, timeout_seconds: int = 360) -> str:
    """Poll a DashScope task until it finishes and return the video URL.

    The task is queried every 3 seconds for at least 60 seconds (longer if
    *timeout_seconds* is larger). Request errors, non-200 answers and
    unparsable bodies are treated as transient and retried.

    Raises:
        HTTPException: 500 when ``DASHSCOPE_API_KEY`` is not configured;
            502 when the task reports ``FAILED``/``CANCELED`` or succeeds
            without a video URL; 504 when the deadline passes first.
    """
    if not settings.DASHSCOPE_API_KEY:
        # Fail fast instead of polling with an invalid token until timeout.
        raise HTTPException(status_code=500, detail="未配置 DASHSCOPE_API_KEY")

    headers = {"Authorization": f"Bearer {settings.DASHSCOPE_API_KEY}"}
    query_url = f"https://dashscope.aliyuncs.com/api/v1/tasks/{task_id}"
    # Video tasks may queue for a long time, so never wait less than 60 s.
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + max(60, timeout_seconds)

    while time.monotonic() < deadline:
        time.sleep(3)
        try:
            resp = requests.get(query_url, headers=headers, timeout=8)
        except Exception:
            continue
        if resp.status_code != 200:
            continue
        try:
            data = resp.json()
        except Exception:
            continue
        if not isinstance(data, dict):
            continue

        # `"output": null` or a non-object value must not crash the loop.
        output = data.get("output")
        if not isinstance(output, dict):
            output = {}

        status_str = str(
            output.get("task_status")
            or data.get("task_status")
            or data.get("status")
            or ""
        ).upper()

        if status_str == "SUCCEEDED":
            results = output.get("results")
            if not isinstance(results, dict):
                results = {}
            url = (
                results.get("video_url")
                or output.get("video_url")
                or data.get("video_url")
            )
            if not url:
                raise HTTPException(status_code=502, detail="视频生成成功但未返回结果 URL")
            return url

        # A canceled task will never succeed; stop instead of waiting out
        # the whole timeout.
        if status_str in ("FAILED", "CANCELED"):
            code = output.get("code") or data.get("code")
            msg = output.get("message") or data.get("message") or "生成失败"
            if code:
                msg = f"{code}: {msg}"
            raise HTTPException(status_code=502, detail=f"视频生成失败: {msg}")

    raise HTTPException(status_code=504, detail="视频生成超时,请稍后重试")
status_str = str( output.get("task_status") or data.get("task_status") or data.get("status") or "" ).upper() if status_str == "SUCCEEDED": results = output.get("results") or {} url = ( results.get("video_url") or output.get("video_url") or data.get("video_url") or data.get("output", {}).get("video_url") ) if not url: return "FAILED", None, "视频生成成功但未返回结果 URL" return "SUCCEEDED", url, None if status_str == "FAILED": code = output.get("code") or data.get("code") msg = output.get("message") or data.get("message") or "生成失败" if code: msg = f"{code}: {msg}" return "FAILED", None, msg if status_str: return status_str, None, None return "UNKNOWN", None, None def _try_backfill_segment_video(segment_video_id: int, dashscope_task_id: Optional[str] = None) -> Optional[SongSegmentVideo]: with SessionLocal() as db: segment_video = ( db.query(SongSegmentVideo) .filter(SongSegmentVideo.id == segment_video_id) .first() ) if not segment_video or segment_video.status != "running": return None task_id = dashscope_task_id or segment_video.dashscope_task_id if not task_id: return None segment = ( db.query(SongSegment) .filter(SongSegment.id == segment_video.segment_id) .first() ) segment_index = segment.segment_index if segment else segment_video.segment_id info = { "task_id": task_id, "segment_index": segment_index, "lover_id": segment_video.lover_id, "song_id": segment_video.song_id, "image_hash": segment_video.image_hash, "style_level": segment_video.style_level, "duration_ms": segment.duration_ms if segment else None, } status, dash_video_url, error_msg = _query_dashscope_task_status(info["task_id"]) if status == "SUCCEEDED" and dash_video_url: try: video_bytes = _download_binary(dash_video_url) min_len_ms = int((settings.EMO_MIN_SEGMENT_SECONDS or 0) * 1000) duration_ms = info.get("duration_ms") or 0 if min_len_ms > 0 and duration_ms > 0 and duration_ms < min_len_ms: video_bytes = _trim_video_bytes(video_bytes, duration_ms / 1000.0) object_name = ( 
f"lover/{info['lover_id']}/sing/segments/" f"{info['song_id']}_{info['image_hash']}_{info['style_level']}_{info['segment_index']}.mp4" ) segment_video_url = _upload_to_oss(video_bytes, object_name) except Exception as exc: with SessionLocal() as db: segment_video = ( db.query(SongSegmentVideo) .filter(SongSegmentVideo.id == segment_video_id) .with_for_update() .first() ) if segment_video and segment_video.status == "running": segment_video.status = "failed" segment_video.error_msg = (_extract_error_text(exc) or "生成失败")[:255] segment_video.updated_at = datetime.utcnow() db.add(segment_video) db.commit() return None with SessionLocal() as db: segment_video = ( db.query(SongSegmentVideo) .filter(SongSegmentVideo.id == segment_video_id) .with_for_update() .first() ) if segment_video and segment_video.status == "running": segment_video.video_url = segment_video_url segment_video.status = "succeeded" segment_video.error_msg = None segment_video.updated_at = datetime.utcnow() db.add(segment_video) db.commit() return segment_video return None if status == "FAILED": with SessionLocal() as db: segment_video = ( db.query(SongSegmentVideo) .filter(SongSegmentVideo.id == segment_video_id) .with_for_update() .first() ) if segment_video and segment_video.status == "running": segment_video.status = "failed" segment_video.error_msg = str(error_msg or "生成失败")[:255] segment_video.updated_at = datetime.utcnow() db.add(segment_video) db.commit() return segment_video return None return None def _wait_for_base_video(base_id: int, timeout: int) -> Optional[SingBaseVideo]: deadline = time.time() + timeout while time.time() < deadline: with SessionLocal() as db: base = ( db.query(SingBaseVideo) .filter(SingBaseVideo.id == base_id) .first() ) if base and base.status in ("succeeded", "failed"): return base time.sleep(3) return None def _wait_for_merge_video(merge_id: int, timeout: int) -> Optional[SingSongVideo]: deadline = time.time() + timeout while time.time() < deadline: with 
SessionLocal() as db: merge = ( db.query(SingSongVideo) .filter(SingSongVideo.id == merge_id) .first() ) if merge and merge.status in ("succeeded", "failed"): return merge time.sleep(3) return None def _wait_for_segment_video(segment_video_id: int, timeout: int) -> Optional[SongSegmentVideo]: deadline = time.time() + timeout last_backfill_at = 0.0 while time.time() < deadline: with SessionLocal() as db: segment_video = ( db.query(SongSegmentVideo) .filter(SongSegmentVideo.id == segment_video_id) .first() ) if segment_video and segment_video.status in ("succeeded", "failed"): return segment_video dash_task_id = segment_video.dashscope_task_id if segment_video else None now = time.time() if dash_task_id and now - last_backfill_at >= EMO_BACKFILL_MIN_INTERVAL_SECONDS: updated = _try_backfill_segment_video(segment_video_id, dash_task_id) last_backfill_at = now if updated and updated.status in ("succeeded", "failed"): return updated time.sleep(3) return None def _backfill_running_segments(payload: dict): song_id = payload.get("song_id") image_hash = payload.get("image_hash") if not song_id or not image_hash: return ratio = payload.get("ratio") or EMO_RATIO style_level = payload.get("style_level") or EMO_STYLE_LEVEL cutoff = datetime.utcnow() - timedelta(seconds=EMO_BACKFILL_STALE_SECONDS) with SessionLocal() as db: running_segments = ( db.query(SongSegmentVideo) .filter( SongSegmentVideo.song_id == song_id, SongSegmentVideo.image_hash == image_hash, SongSegmentVideo.ratio == ratio, SongSegmentVideo.style_level == style_level, SongSegmentVideo.model == EMO_MODEL, SongSegmentVideo.status == "running", SongSegmentVideo.dashscope_task_id.isnot(None), SongSegmentVideo.updated_at <= cutoff, ) .order_by(SongSegmentVideo.id.asc()) .limit(2) .all() ) segment_ids = [segment.id for segment in running_segments] segment_tasks = [segment.dashscope_task_id for segment in running_segments] for segment_id, task_id in zip(segment_ids, segment_tasks): 
_try_backfill_segment_video(segment_id, task_id) def _download_binary(url: str) -> bytes: try: resp = requests.get(url, timeout=30) except Exception as exc: raise HTTPException(status_code=502, detail="唱歌视频下载失败") from exc if resp.status_code != 200: raise HTTPException(status_code=502, detail="唱歌视频下载失败") return resp.content def _build_emo_segment_plan(duration: float) -> list[dict]: if duration <= 0: return [] max_len = float(EMO_SEGMENT_SECONDS) segment_count = max(1, math.ceil(duration / max_len)) segments = [] for idx in range(segment_count): start_sec = idx * max_len remaining = max(0.0, duration - start_sec) segment_duration = min(max_len, remaining) if segment_duration <= 0: break segments.append({"start_sec": start_sec, "duration_sec": segment_duration}) return segments def _fetch_complete_segments(song_id: int, audio_hash: str, expected_count: int) -> Optional[list[dict]]: if expected_count <= 0: return None min_len_ms = int((settings.EMO_MIN_SEGMENT_SECONDS or 0) * 1000) with SessionLocal() as db: segments = ( db.query(SongSegment) .filter(SongSegment.song_id == song_id, SongSegment.audio_hash == audio_hash) .order_by(SongSegment.segment_index.asc()) .all() ) if len(segments) != expected_count: return None segments_map = {segment.segment_index: segment for segment in segments} for idx in range(1, expected_count + 1): segment = segments_map.get(idx) if not segment or segment.status != "succeeded" or not segment.audio_url: return None return [ { "id": segment.id, "segment_index": segment.segment_index, "audio_url": segment.audio_url, "duration_ms": segment.duration_ms, "emo_duration_ms": max(segment.duration_ms, min_len_ms) if min_len_ms > 0 else segment.duration_ms, } for segment in segments ] def _ensure_song_segments( song_id: int, audio_url: str, audio_hash_hint: Optional[str], duration_sec_hint: Optional[int], ) -> tuple[list[dict], str, int]: if audio_hash_hint and duration_sec_hint: expected_count = max(1, math.ceil(duration_sec_hint / 
EMO_SEGMENT_SECONDS)) existing = _fetch_complete_segments(song_id, audio_hash_hint, expected_count) if existing: return existing, audio_hash_hint, duration_sec_hint with tempfile.TemporaryDirectory() as tmpdir: input_path = os.path.join(tmpdir, "song_audio") _download_to_path(audio_url, input_path) audio_hash = _hash_file(input_path) duration = _probe_media_duration(input_path) if not duration: raise HTTPException(status_code=502, detail="音频时长获取失败") duration_sec = int(math.ceil(duration)) segment_plan = _build_emo_segment_plan(duration) expected_count = len(segment_plan) or 1 with SessionLocal() as db: song = db.query(SongLibrary).filter(SongLibrary.id == song_id).with_for_update().first() if song: song.audio_hash = audio_hash song.duration_sec = duration_sec db.add(song) db.commit() existing_segments = {} with SessionLocal() as db: segments = ( db.query(SongSegment) .filter(SongSegment.song_id == song_id, SongSegment.audio_hash == audio_hash) .all() ) existing_segments = {segment.segment_index: segment for segment in segments} output_segments: list[dict] = [] min_len_ms = int((settings.EMO_MIN_SEGMENT_SECONDS or 0) * 1000) for idx, plan in enumerate(segment_plan, start=1): start_sec = plan["start_sec"] segment_duration = plan["duration_sec"] if segment_duration <= 0: continue segment = existing_segments.get(idx) if segment and segment.status == "succeeded" and segment.audio_url: expected_start_ms = int(start_sec * 1000) expected_duration_ms = int(segment_duration * 1000) if segment.start_ms == expected_start_ms and segment.duration_ms == expected_duration_ms: emo_duration_ms = segment.duration_ms if min_len_ms > 0 and segment.duration_ms < min_len_ms: emo_duration_ms = min_len_ms output_segments.append( { "id": segment.id, "segment_index": segment.segment_index, "audio_url": segment.audio_url, "duration_ms": segment.duration_ms, "emo_duration_ms": emo_duration_ms, } ) continue start_ms = int(start_sec * 1000) duration_ms = int(segment_duration * 1000) output_path 
= os.path.join(tmpdir, f"segment_{idx}.mp3") with SessionLocal() as db: segment = ( db.query(SongSegment) .filter( SongSegment.song_id == song_id, SongSegment.audio_hash == audio_hash, SongSegment.segment_index == idx, ) .with_for_update() .first() ) if not segment: segment = SongSegment( song_id=song_id, audio_hash=audio_hash, segment_index=idx, start_ms=start_ms, duration_ms=duration_ms, audio_url="", status="running", ) db.add(segment) db.flush() else: segment.status = "running" segment.error_msg = None segment.start_ms = start_ms segment.duration_ms = duration_ms db.add(segment) db.query(SongSegmentVideo).filter(SongSegmentVideo.segment_id == segment.id).delete( synchronize_session=False ) db.commit() try: _extract_audio_segment(input_path, start_sec, segment_duration, output_path) with open(output_path, "rb") as file_handle: segment_bytes = file_handle.read() object_name = f"song/{song_id}/segments/{audio_hash}_{idx}.mp3" segment_url = _upload_to_oss(segment_bytes, object_name) audio_size = os.path.getsize(output_path) with SessionLocal() as db: segment = ( db.query(SongSegment) .filter( SongSegment.song_id == song_id, SongSegment.audio_hash == audio_hash, SongSegment.segment_index == idx, ) .with_for_update() .first() ) if not segment: raise HTTPException(status_code=500, detail="分段记录丢失") segment.audio_url = segment_url segment.audio_size = audio_size segment.status = "succeeded" segment.error_msg = None db.add(segment) db.flush() segment_id = segment.id db.commit() output_segments.append( { "id": segment_id, "segment_index": idx, "audio_url": segment_url, "duration_ms": duration_ms, "emo_duration_ms": max(duration_ms, min_len_ms) if min_len_ms > 0 else duration_ms, } ) except Exception as exc: with SessionLocal() as db: segment = ( db.query(SongSegment) .filter( SongSegment.song_id == song_id, SongSegment.audio_hash == audio_hash, SongSegment.segment_index == idx, ) .with_for_update() .first() ) if segment: segment.status = "failed" segment.error_msg = 
def _should_enqueue_task(task_id: int) -> bool:
    """Throttle re-enqueueing of the same task.

    Returns ``False`` when *task_id* was enqueued less than
    ``SING_REQUEUE_COOLDOWN_SECONDS`` ago; otherwise records the current
    time for the task and returns ``True``. When the bookkeeping dict
    grows past 2000 entries, entries older than one hour are dropped.
    """
    current = time.time()
    with _sing_enqueue_lock:
        previous = _sing_last_enqueue_at.get(task_id)
        if previous and current - previous < SING_REQUEUE_COOLDOWN_SECONDS:
            return False
        _sing_last_enqueue_at[task_id] = current
        if len(_sing_last_enqueue_at) > 2000:
            expire_before = current - 3600
            # Snapshot first so the dict is not mutated while it is iterated.
            stale_ids = [
                known_id
                for known_id, stamp in list(_sing_last_enqueue_at.items())
                if stamp < expire_before
            ]
            for known_id in stale_ids:
                _sing_last_enqueue_at.pop(known_id, None)
    return True


def _enqueue_sing_task(task_id: int):
    """Submit the task to the sing queue unless it is still cooling down.

    Returns ``False`` when throttled, otherwise whatever
    ``sing_task_queue.enqueue_unique`` returns.
    """
    if _should_enqueue_task(task_id):
        return sing_task_queue.enqueue_unique(f"sing:{task_id}", _process_sing_task, task_id)
    return False


def _next_seq(db: Session, session_id: int) -> int:
    """Return the sequence number that follows the highest ``seq`` in the session (1 when empty)."""
    newest = (
        db.query(ChatMessage)
        .filter(ChatMessage.session_id == session_id)
        .order_by(ChatMessage.seq.desc())
        .first()
    )
    if newest and newest.seq:
        return newest.seq + 1
    return 1
def _record_task_failure(
    task_id: int,
    msg: str,
    *,
    content: str,
    content_safety_blocked: bool = False,
) -> None:
    """Persist a failed state for a generation task and its placeholder chat message.

    Shared implementation behind ``_mark_task_failed`` and
    ``_mark_task_content_blocked``. Runs in its own session so it does not
    depend on any session owned by the caller.

    Args:
        task_id: Primary key of the ``GenerationTask`` row.
        msg: Error text stored in the message ``extra`` and, truncated to
            255 characters, in ``task.error_msg``.
        content: Text written to the lover placeholder message.
        content_safety_blocked: When true, also flags the message ``extra``
            and the task payload with ``content_safety_blocked``.
    """
    with SessionLocal() as db:
        task = (
            db.query(GenerationTask)
            .filter(GenerationTask.id == task_id)
            .with_for_update()
            .first()
        )
        if not task:
            return
        payload = task.payload or {}
        lover_msg_id = payload.get("lover_message_id")
        session_id = payload.get("session_id")
        if lover_msg_id:
            lover_msg = (
                db.query(ChatMessage)
                .filter(ChatMessage.id == lover_msg_id)
                .with_for_update()
                .first()
            )
            if lover_msg:
                extra = {
                    **(lover_msg.extra or {}),
                    "generation_status": "failed",
                    "error_msg": msg,
                    "generation_task_id": task_id,
                }
                if content_safety_blocked:
                    extra["content_safety_blocked"] = True
                lover_msg.content = content
                lover_msg.extra = extra
                lover_msg.tts_status = lover_msg.tts_status or "pending"
                db.add(lover_msg)
        if session_id:
            session = (
                db.query(ChatSession)
                .filter(ChatSession.id == session_id)
                .with_for_update()
                .first()
            )
            if session:
                session.last_message_at = datetime.utcnow()
                db.add(session)
        task.status = "failed"
        task.error_msg = msg[:255]
        if content_safety_blocked:
            task.payload = {**payload, "content_safety_blocked": True}
        task.updated_at = datetime.utcnow()
        db.add(task)
        db.commit()


def _mark_task_failed(task_id: int, msg: str):
    """Mark the task as failed and update its placeholder message.

    Uses an independent session to avoid Detached errors caused by an
    already-closed session. The placeholder text is the failure prefix
    followed by *msg*; an empty or ``None`` *msg* falls back to the generic
    failure text so the user never sees a blank reason.
    """
    reason = msg or "生成失败"
    _record_task_failure(task_id, reason, content=f"唱歌视频生成失败:{reason}")


def _mark_task_content_blocked(task_id: int, msg: str):
    """Content-safety block: show *msg* as-is, without the failure prefix.

    Also flags the message ``extra`` and the task payload with
    ``content_safety_blocked``.
    """
    reason = msg or "生成失败"
    _record_task_failure(task_id, reason, content=reason, content_safety_blocked=True)


def _sanitize_resolution(resolution: Optional[str]) -> str:
    """Normalize a resolution label to 480P, 720P or 1080P.

    Matching is case-insensitive and ignores surrounding whitespace.
    Empty, ``None`` or unrecognized values fall back to ``"480P"``.
    """
    if not resolution:
        return "480P"
    normalized = str(resolution).strip().upper()
    if normalized in ("480P", "720P", "1080P"):
        return normalized
    return "480P"
# Response item describing one song from the song library.
class SongOut(BaseModel):
    id: int
    title: str
    artist: Optional[str] = None
    gender: str
    audio_url: str

    # Allows building the model directly from ORM rows.
    model_config = ConfigDict(from_attributes=True)


# Response body of the song-list endpoint.
class SongListResponse(BaseModel):
    songs: List[SongOut]


# Request body of the generate endpoint.
class SingGenerateIn(BaseModel):
    song_id: int = Field(..., description="歌曲ID(nf_song_library.id)")


# Status snapshot of one sing-video generation task.
class SingTaskStatusOut(BaseModel):
    generation_task_id: int
    status: str = Field(..., description="pending|running|succeeded|failed")
    dashscope_task_id: str = ""
    video_url: str = ""
    session_id: int = 0
    user_message_id: int = 0
    lover_message_id: int = 0
    error_msg: Optional[str] = None


@router.get("/songs", response_model=ApiResponse[SongListResponse])
def list_songs_for_lover(
    db: Session = Depends(get_db),
    user: AuthedUser = Depends(get_current_user),
):
    """
    Return the songs available for the gender of the current user's lover.
    """
    lover = db.query(Lover).filter(Lover.user_id == user.id).first()
    if not lover:
        raise HTTPException(status_code=404, detail="恋人未找到")
    if lover.gender not in ("male", "female"):
        raise HTTPException(status_code=400, detail="恋人性别异常,请重新选择性别")

    # Only enabled, non-deleted songs that match the lover's gender.
    song_query = db.query(SongLibrary).filter(
        SongLibrary.gender == lover.gender,
        SongLibrary.status.is_(True),
        SongLibrary.deletetime.is_(None),
    )
    songs = song_query.order_by(SongLibrary.weigh.desc(), SongLibrary.id.desc()).all()
    if not songs:
        raise HTTPException(status_code=404, detail="暂无可用歌曲,请稍后重试")
    return success_response(SongListResponse(songs=songs), msg="歌曲列表获取成功")


def _get_or_create_session(db: Session, user: AuthedUser, lover: Lover, session_id: Optional[int]) -> ChatSession:
    """Resolve the chat session to attach sing messages to.

    With *session_id*, the session must exist and belong to *user*,
    otherwise a 404 is raised. Without it, the most recently created active
    session for the user/lover pair is returned and any other active
    sessions are archived. If none exists, a new active session is added
    and flushed (not committed).
    """
    if session_id:
        owned = (
            db.query(ChatSession)
            .filter(ChatSession.id == session_id, ChatSession.user_id == user.id)
            .first()
        )
        if not owned:
            raise HTTPException(status_code=404, detail="会话不存在")
        return owned

    candidates = (
        db.query(ChatSession)
        .filter(ChatSession.user_id == user.id, ChatSession.lover_id == lover.id, ChatSession.status == "active")
        .with_for_update()
        .order_by(ChatSession.created_at.desc())
        .all()
    )
    if candidates:
        newest, *older = candidates
        for duplicate in older:
            if duplicate.status == "active":
                duplicate.status = "archived"
                db.add(duplicate)
        return newest

    stamp = datetime.utcnow()
    created = ChatSession(
        user_id=user.id,
        lover_id=lover.id,
        model=settings.LLM_MODEL or "qwen-flash",
        status="active",
        last_message_at=stamp,
        created_at=stamp,
        updated_at=stamp,
        inner_voice_enabled=False,
    )
    db.add(created)
    db.flush()
    return created
def _close_session_quietly(db) -> None:
    """Roll back and close *db*, ignoring errors; ``None`` is a no-op."""
    if db is None:
        return
    try:
        db.rollback()
    except Exception:
        pass
    try:
        db.close()
    except Exception:
        pass


def _process_sing_task(task_id: int):
    """
    Background worker for a sing-video task: split audio into segments ->
    generate an EMO video per segment -> concatenate into the full song.

    Phase 1 validates the task under row locks and marks it ``running``.
    Phase 2 generates (or reuses) segment videos, merges them, then writes
    the chat messages, quota deduction and task result in one transaction.
    Every failure is recorded through ``_mark_task_failed``; nothing is
    raised to the caller.
    """
    song_title: str = ""
    image_url: str = ""
    audio_url: str = ""
    session_id: Optional[int] = None
    user_message_id: Optional[int] = None
    lover_message_id: Optional[int] = None
    user_id: Optional[int] = None
    lover_id: Optional[int] = None
    song_id: Optional[int] = None
    image_hash: str = ""
    merged_video_url: str = ""
    merge_id: Optional[int] = None
    ratio: str = EMO_RATIO
    style_level: str = EMO_STYLE_LEVEL
    face_bbox: Optional[list] = None
    ext_bbox: Optional[list] = None
    audio_hash_hint: Optional[str] = None
    duration_sec_hint: Optional[int] = None

    # ---- Phase 1: validate the task and mark it running -------------------
    db = None
    try:
        db = SessionLocal()
        task = (
            db.query(GenerationTask)
            .filter(GenerationTask.id == task_id)
            .with_for_update()
            .first()
        )
        if not task or task.status in ("succeeded", "failed"):
            db.rollback()
            return
        user_id = task.user_id
        lover_id = task.lover_id
        payload = task.payload or {}
        song_id = payload.get("song_id")
        song_title = payload.get("song_title") or ""
        image_url = payload.get("image_url") or ""
        audio_url = payload.get("audio_url") or ""
        session_id = payload.get("session_id")
        user_message_id = payload.get("user_message_id")
        lover_message_id = payload.get("lover_message_id")
        ratio = payload.get("ratio") or EMO_RATIO
        style_level = payload.get("style_level") or EMO_STYLE_LEVEL
        face_bbox = payload.get("face_bbox")
        ext_bbox = payload.get("ext_bbox")
        image_hash = payload.get("image_hash") or _hash_text(image_url or "")

        lover = db.query(Lover).filter(Lover.id == lover_id).first()
        user_row = db.query(User).filter(User.id == user_id).with_for_update().first()
        song = None
        if song_id:
            song = (
                db.query(SongLibrary)
                .filter(
                    SongLibrary.id == song_id,
                    SongLibrary.status.is_(True),
                    SongLibrary.deletetime.is_(None),
                )
                .first()
            )
        if not lover:
            raise HTTPException(status_code=404, detail="恋人不存在,请重新创建")
        if not lover.image_url:
            raise HTTPException(status_code=400, detail="请先生成并确认恋人形象")
        if not user_row:
            raise HTTPException(status_code=404, detail="用户不存在")
        if not song:
            raise HTTPException(status_code=404, detail="歌曲未找到或已下架")
        if song.gender != lover.gender:
            raise HTTPException(status_code=400, detail="歌曲版本性别与恋人不匹配")
        song_id = song.id
        if (user_row.video_gen_remaining or 0) <= 0:
            raise HTTPException(status_code=400, detail="视频生成次数不足")

        image_url = _cdnize(image_url or lover.image_url)
        audio_url = _cdnize(audio_url or song.audio_url)
        if not audio_url:
            raise HTTPException(status_code=400, detail="音频地址不可用")
        if not image_url:
            raise HTTPException(status_code=400, detail="形象地址不可用")
        if not image_hash:
            image_hash = _hash_text(image_url)
        if not song_title:
            song_title = song.title or "点播歌曲"
        audio_hash_hint = song.audio_hash
        duration_sec_hint = song.duration_sec

        # Older payloads may lack the detection boxes; resolve them now.
        if not face_bbox or not ext_bbox:
            detect = _ensure_emo_detect_cache(db, lover_id, image_url, image_hash, ratio)
            if not detect.check_pass:
                raise HTTPException(status_code=400, detail="恋人形象未通过EMO检测")
            face_bbox = detect.face_bbox
            ext_bbox = detect.ext_bbox
            if not face_bbox or not ext_bbox:
                raise HTTPException(status_code=502, detail="EMO检测返回缺少人脸或动态区域")

        task.status = "running"
        task.updated_at = datetime.utcnow()
        task.payload = {
            **payload,
            "song_id": song.id,
            "song_title": song_title,
            "image_url": image_url,
            "audio_url": audio_url,
            "image_hash": image_hash,
            "ratio": ratio,
            "style_level": style_level,
            "face_bbox": face_bbox,
            "ext_bbox": ext_bbox,
        }
        db.add(task)
        db.commit()
    except HTTPException as exc:
        # Release this session's FOR UPDATE locks first: _mark_task_failed
        # locks the same task row from a separate session and would
        # otherwise block on our still-open transaction.
        _close_session_quietly(db)
        try:
            _mark_task_failed(task_id, str(exc.detail) if hasattr(exc, "detail") else str(exc))
        except Exception:
            pass  # best effort: the worker must not raise
        return
    except Exception as exc:
        _close_session_quietly(db)
        try:
            _mark_task_failed(task_id, str(exc)[:255])
        except Exception:
            pass  # best effort: the worker must not raise
        return
    finally:
        _close_session_quietly(db)

    # ---- Phase 2: segments -> per-segment videos -> merged video ----------
    try:
        segments, audio_hash, duration_sec = _ensure_song_segments(
            song_id,
            audio_url,
            audio_hash_hint,
            duration_sec_hint,
        )
        with SessionLocal() as db:
            task_row = (
                db.query(GenerationTask)
                .filter(GenerationTask.id == task_id)
                .with_for_update()
                .first()
            )
            if task_row:
                task_row.payload = {
                    **(task_row.payload or {}),
                    "audio_hash": audio_hash,
                    "duration_sec": duration_sec,
                    "segment_count": len(segments),
                }
                task_row.updated_at = datetime.utcnow()
                db.add(task_row)
                db.commit()

        content_safety_blocked = False
        # Reuse a previously merged video when one exists.
        # NOTE(review): this lookup does not filter on image_hash, unlike the
        # merge_row lookup further down - confirm that is intended.
        with SessionLocal() as db:
            cached_merge = (
                db.query(SingSongVideo)
                .filter(
                    SingSongVideo.lover_id == lover_id,
                    SingSongVideo.song_id == song_id,
                    SingSongVideo.audio_hash == audio_hash,
                    SingSongVideo.ratio == ratio,
                    SingSongVideo.style_level == style_level,
                    SingSongVideo.status == "succeeded",
                )
                .order_by(SingSongVideo.id.desc())
                .first()
            )
            if cached_merge and cached_merge.merged_video_url:
                merged_video_url = cached_merge.merged_video_url
                merge_id = cached_merge.id
                content_safety_blocked = _is_content_safety_error(cached_merge.error_msg)

        if not merged_video_url:
            segment_video_urls: list[tuple[int, str]] = []
            content_safety_triggered = False
            for segment in segments:
                segment_id = segment["id"]
                segment_index = segment["segment_index"]
                segment_audio_url = segment.get("audio_url") or ""
                segment_duration_ms = int(segment.get("duration_ms") or 0)
                emo_duration_ms = int(segment.get("emo_duration_ms") or segment_duration_ms)
                existing_running = False
                segment_video_id = None
                segment_video_url = ""

                # Claim the segment-video row, or learn its current state.
                with SessionLocal() as db:
                    segment_video = (
                        db.query(SongSegmentVideo)
                        .filter(
                            SongSegmentVideo.segment_id == segment_id,
                            SongSegmentVideo.image_hash == image_hash,
                            SongSegmentVideo.style_level == style_level,
                            SongSegmentVideo.model == EMO_MODEL,
                        )
                        .with_for_update()
                        .first()
                    )
                    if segment_video and segment_video.status == "succeeded" and segment_video.video_url:
                        segment_video_url = segment_video.video_url
                        segment_video_id = segment_video.id
                    elif segment_video and segment_video.status == "running":
                        existing_running = True
                        segment_video_id = segment_video.id
                    elif (
                        segment_video
                        and segment_video.status == "failed"
                        and _is_content_safety_error(segment_video.error_msg)
                    ):
                        content_safety_triggered = True
                    else:
                        if not segment_video:
                            segment_video = SongSegmentVideo(
                                user_id=user_id,
                                lover_id=lover_id,
                                song_id=song_id,
                                segment_id=segment_id,
                                image_hash=image_hash,
                                model=EMO_MODEL,
                                ratio=ratio,
                                style_level=style_level,
                                status="running",
                            )
                            db.add(segment_video)
                            db.flush()
                        else:
                            segment_video.status = "running"
                            segment_video.error_msg = None
                            db.add(segment_video)
                        db.commit()
                        segment_video_id = segment_video.id

                if content_safety_triggered:
                    break

                # Another worker is already generating this segment: wait for it.
                if not segment_video_url and existing_running and segment_video_id:
                    waited = _wait_for_segment_video(segment_video_id, EMO_TASK_TIMEOUT_SECONDS)
                    if waited and waited.status == "succeeded" and waited.video_url:
                        segment_video_url = waited.video_url
                    elif waited and waited.status == "failed" and _is_content_safety_error(waited.error_msg):
                        content_safety_triggered = True

                if content_safety_triggered:
                    break

                if not segment_video_url:
                    try:
                        with _semaphore_guard(_emo_task_semaphore):
                            emo_audio_url = segment_audio_url
                            # Pad the audio when the EMO duration exceeds the
                            # real segment length; the video is trimmed back
                            # to the segment length after generation.
                            if (
                                emo_duration_ms > segment_duration_ms
                                and segment_duration_ms > 0
                                and segment_audio_url
                            ):
                                with tempfile.TemporaryDirectory() as tmpdir:
                                    input_path = os.path.join(tmpdir, f"segment_{segment_index}.mp3")
                                    padded_path = os.path.join(tmpdir, f"segment_{segment_index}_emo.mp3")
                                    _download_to_path(segment_audio_url, input_path)
                                    pad_sec = (emo_duration_ms - segment_duration_ms) / 1000.0
                                    target_sec = emo_duration_ms / 1000.0
                                    _pad_audio_segment(input_path, pad_sec, target_sec, padded_path)
                                    with open(padded_path, "rb") as file_handle:
                                        padded_bytes = file_handle.read()
                                    object_name = (
                                        f"song/{song_id}/segments/"
                                        f"{audio_hash}_{segment_index}_emo.mp3"
                                    )
                                    emo_audio_url = _upload_to_oss(padded_bytes, object_name)

                            dash_task_id = _submit_emo_video(
                                image_url=image_url,
                                audio_url=emo_audio_url,
                                face_bbox=face_bbox or [],
                                ext_bbox=ext_bbox or [],
                                style_level=style_level,
                            )
                            with SessionLocal() as db:
                                segment_video = (
                                    db.query(SongSegmentVideo)
                                    .filter(SongSegmentVideo.id == segment_video_id)
                                    .with_for_update()
                                    .first()
                                )
                                if segment_video:
                                    segment_video.dashscope_task_id = dash_task_id
                                    segment_video.status = "running"
                                    segment_video.error_msg = None
                                    segment_video.updated_at = datetime.utcnow()
                                    db.add(segment_video)
                                    db.commit()

                            dash_video_url = _poll_video_url(dash_task_id, EMO_TASK_TIMEOUT_SECONDS)
                            video_bytes = _download_binary(dash_video_url)
                            if emo_duration_ms > segment_duration_ms and segment_duration_ms > 0:
                                video_bytes = _trim_video_bytes(video_bytes, segment_duration_ms / 1000.0)
                            object_name = (
                                f"lover/{lover_id}/sing/segments/"
                                f"{song_id}_{image_hash}_{style_level}_{segment_index}.mp4"
                            )
                            segment_video_url = _upload_to_oss(video_bytes, object_name)
                            with SessionLocal() as db:
                                segment_video = (
                                    db.query(SongSegmentVideo)
                                    .filter(SongSegmentVideo.id == segment_video_id)
                                    .with_for_update()
                                    .first()
                                )
                                if segment_video:
                                    segment_video.video_url = segment_video_url
                                    segment_video.status = "succeeded"
                                    segment_video.error_msg = None
                                    segment_video.updated_at = datetime.utcnow()
                                    db.add(segment_video)
                                    db.commit()
                    except Exception as exc:
                        with SessionLocal() as db:
                            segment_video = (
                                db.query(SongSegmentVideo)
                                .filter(SongSegmentVideo.id == segment_video_id)
                                .with_for_update()
                                .first()
                            )
                            if segment_video:
                                segment_video.status = "failed"
                                segment_video.error_msg = (_extract_error_text(exc) or "生成失败")[:255]
                                segment_video.updated_at = datetime.utcnow()
                                db.add(segment_video)
                                db.commit()
                        # A content-safety rejection stops generation but keeps
                        # the segments produced so far; other errors fail the task.
                        if _is_content_safety_error(_extract_error_text(exc)):
                            content_safety_triggered = True
                            break
                        raise

                if content_safety_triggered:
                    break
                if segment_video_url:
                    segment_video_urls.append((segment_index, segment_video_url))

            if content_safety_triggered and not segment_video_urls:
                _mark_task_content_blocked(task_id, EMO_CONTENT_SAFETY_MESSAGE)
                return
            if content_safety_triggered:
                content_safety_blocked = True

            segment_video_urls.sort(key=lambda item: item[0])
            with tempfile.TemporaryDirectory() as tmpdir:
                video_paths = []
                for idx, url in segment_video_urls:
                    local_path = os.path.join(tmpdir, f"segment_{idx}.mp4")
                    _download_to_path(url, local_path)
                    video_paths.append(local_path)
                output_path = os.path.join(tmpdir, "merged.mp4")
                _concat_video_files(video_paths, output_path)
                with open(output_path, "rb") as file_handle:
                    merged_bytes = file_handle.read()
                object_name = f"lover/{lover_id}/sing/{int(time.time())}_{song_id}.mp4"
                merged_video_url = _upload_to_oss(merged_bytes, object_name)

        # ---- Persist merge record, chat messages, quota and task result ---
        with SessionLocal() as db:
            task_row = (
                db.query(GenerationTask)
                .filter(GenerationTask.id == task_id)
                .with_for_update()
                .first()
            )
            if not task_row:
                return
            payload = task_row.payload or {}
            if not session_id:
                session_id = payload.get("session_id")
            if not user_message_id:
                user_message_id = payload.get("user_message_id")
            if not lover_message_id:
                lover_message_id = payload.get("lover_message_id")

            merge_row = (
                db.query(SingSongVideo)
                .filter(
                    SingSongVideo.lover_id == lover_id,
                    SingSongVideo.song_id == song_id,
                    SingSongVideo.audio_hash == audio_hash,
                    SingSongVideo.image_hash == image_hash,
                    SingSongVideo.ratio == ratio,
                    SingSongVideo.style_level == style_level,
                )
                .with_for_update()
                .first()
            )
            if not merge_row:
                merge_row = SingSongVideo(
                    user_id=user_id,
                    lover_id=lover_id,
                    song_id=song_id,
                    base_video_id=None,
                    audio_url=audio_url,
                    audio_hash=audio_hash,
                    image_hash=image_hash,
                    ratio=ratio,
                    style_level=style_level,
                    merged_video_url=merged_video_url,
                    status="succeeded",
                    error_msg=EMO_CONTENT_SAFETY_CODE if content_safety_blocked else None,
                    generation_task_id=task_row.id,
                )
                db.add(merge_row)
                db.flush()
            else:
                merge_row.user_id = user_id
                merge_row.audio_url = audio_url
                merge_row.audio_hash = audio_hash
                merge_row.image_hash = image_hash
                merge_row.ratio = ratio
                merge_row.style_level = style_level
                merge_row.merged_video_url = merged_video_url
                merge_row.status = "succeeded"
                merge_row.error_msg = EMO_CONTENT_SAFETY_CODE if content_safety_blocked else None
                merge_row.generation_task_id = task_row.id
                db.add(merge_row)
            merge_id = merge_row.id

            session = None
            if session_id:
                session = (
                    db.query(ChatSession)
                    .filter(ChatSession.id == session_id, ChatSession.user_id == user_id)
                    .with_for_update()
                    .first()
                )
            if session is None:
                # No session recorded, or the recorded one no longer exists:
                # fall back to the user's active session instead of failing
                # after all generation work has already been done.
                session = _get_or_create_session(db, AuthedUser(id=user_id), Lover(id=lover_id), None)

            now = datetime.utcnow()
            user_msg = None
            if user_message_id:
                user_msg = db.query(ChatMessage).filter(ChatMessage.id == user_message_id).first()
            if not user_msg:
                user_msg = ChatMessage(
                    session_id=session.id,
                    user_id=user_id,
                    lover_id=lover_id,
                    role="user",
                    content_type="text",
                    content=song_title or "点播歌曲",
                    seq=_next_seq(db, session.id),
                    created_at=now,
                    model=settings.LLM_MODEL or "qwen-flash",
                )
                db.add(user_msg)
                db.flush()

            lover_content = _build_sing_message_content(merged_video_url, content_safety_blocked)
            lover_msg = None
            if lover_message_id:
                lover_msg = (
                    db.query(ChatMessage)
                    .filter(ChatMessage.id == lover_message_id)
                    .with_for_update()
                    .first()
                )
            if lover_msg:
                lover_msg.content = lover_content
            else:
                lover_msg = ChatMessage(
                    session_id=session.id,
                    user_id=user_id,
                    lover_id=lover_id,
                    role="lover",
                    content_type="text",
                    content=lover_content,
                    seq=_next_seq(db, session.id),
                    created_at=datetime.utcnow(),
                    model=settings.LLM_MODEL or "qwen-flash",
                    extra={
                        "video_url": merged_video_url,
                        "generation_task_id": task_row.id,
                        "song_title": song_title,
                        "generation_status": "succeeded",
                        "content_safety_blocked": content_safety_blocked,
                    },
                )
                db.add(lover_msg)
                db.flush()
            lover_msg.extra = {
                **(lover_msg.extra or {}),
                "video_url": merged_video_url,
                "generation_status": "succeeded",
                "generation_task_id": task_row.id,
                "song_title": song_title,
                "content_safety_blocked": content_safety_blocked,
            }
            lover_msg.tts_status = lover_msg.tts_status or "pending"
            db.add(lover_msg)

            # Deduct one generation credit unless this task already did.
            already_deducted = (task_row.payload or {}).get("deducted")
            user_row = db.query(User).filter(User.id == user_id).with_for_update().first()
            # "or 0" guards against a NULL counter, as the earlier checks do.
            remaining = (user_row.video_gen_remaining or 0) if user_row else 0
            if user_row and remaining > 0 and not already_deducted:
                user_row.video_gen_remaining = remaining - 1
                db.add(user_row)

            session.last_message_at = datetime.utcnow()
            db.add(session)

            task_row.status = "succeeded"
            task_row.result_url = merged_video_url
            task_row.payload = {
                **(task_row.payload or {}),
                "merged_video_url": merged_video_url,
                "merge_id": merge_id,
                "deducted": True,
                "content_safety_blocked": content_safety_blocked,
                "session_id": session.id,
                "user_message_id": user_msg.id,
                "lover_message_id": lover_msg.id,
            }
            task_row.updated_at = datetime.utcnow()
            db.add(task_row)
            db.commit()
    except HTTPException as exc:
        try:
            _mark_task_failed(task_id, str(exc.detail) if hasattr(exc, "detail") else str(exc))
        except Exception:
            pass  # best effort: the worker must not raise
    except Exception as exc:
        try:
            _mark_task_failed(task_id, str(exc)[:255])
        except Exception:
            pass  # best effort: the worker must not raise
@router.post("/generate", response_model=ApiResponse[SingTaskStatusOut])
def generate_sing_video(
    payload: SingGenerateIn,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    user: AuthedUser = Depends(get_current_user),
):
    # Create a sing-video generation task for the current user's lover.
    #
    # Flow:
    #   1. Validate lover, song, remaining quota and the absence of another
    #      in-flight video task.
    #   2. If a succeeded merged video already exists for this lover/song/
    #      audio/ratio/style, record a succeeded task, write both chat
    #      messages, deduct one credit and return immediately.
    #   3. Otherwise run EMO detection, create a pending task plus
    #      placeholder chat messages, commit, and enqueue the task.
    #
    # Raises HTTPException 404/400/409/502 for the validation failures below.
    # `background_tasks` is accepted but not used in this function.
    # NOTE(review): block nesting was reconstructed from a
    # whitespace-collapsed source - confirm against the original file.
    lover = db.query(Lover).filter(Lover.user_id == user.id).first()
    if not lover:
        raise HTTPException(status_code=404, detail="恋人不存在,请先完成创建流程")
    if not lover.image_url:
        raise HTTPException(status_code=400, detail="请先生成并确认恋人形象")

    song = (
        db.query(SongLibrary)
        .filter(
            SongLibrary.id == payload.song_id,
            SongLibrary.status.is_(True),
            SongLibrary.deletetime.is_(None),
        )
        .first()
    )
    if not song:
        raise HTTPException(status_code=404, detail="歌曲未找到或已下架")
    if song.gender != lover.gender:
        raise HTTPException(status_code=400, detail="歌曲版本性别与恋人不匹配")

    # Lock the user row so the quota check and later deduction are consistent.
    user_row = (
        db.query(User)
        .filter(User.id == user.id)
        .with_for_update()
        .first()
    )
    if not user_row:
        raise HTTPException(status_code=404, detail="用户不存在")
    if (user_row.video_gen_remaining or 0) <= 0:
        raise HTTPException(status_code=400, detail="视频生成次数不足")

    # Only one pending/running video task per user at a time.
    pending_task = (
        db.query(GenerationTask)
        .filter(
            GenerationTask.user_id == user.id,
            GenerationTask.task_type == "video",
            GenerationTask.status.in_(["pending", "running"]),
        )
        .first()
    )
    if pending_task:
        raise HTTPException(status_code=409, detail="已有视频生成任务进行中,请稍后再试")

    if not song.audio_url:
        raise HTTPException(status_code=400, detail="该歌曲无可用音频")

    audio_url = _cdnize(song.audio_url)
    if not audio_url:
        raise HTTPException(status_code=400, detail="音频地址不可用")
    audio_url_hash = _hash_text(audio_url)
    audio_hash = song.audio_hash or audio_url_hash
    ratio = EMO_RATIO
    style_level = EMO_STYLE_LEVEL

    # Look for a cached merge under either the stored audio hash or the
    # hash of the audio URL.
    audio_hash_candidates = [audio_hash]
    if audio_url_hash not in audio_hash_candidates:
        audio_hash_candidates.append(audio_url_hash)
    cached_merge = (
        db.query(SingSongVideo)
        .filter(
            SingSongVideo.lover_id == lover.id,
            SingSongVideo.song_id == song.id,
            SingSongVideo.audio_hash.in_(audio_hash_candidates),
            SingSongVideo.ratio == ratio,
            SingSongVideo.style_level == style_level,
            SingSongVideo.status == "succeeded",
        )
        .order_by(SingSongVideo.id.desc())
        .first()
    )
    cached_merge_url = cached_merge.merged_video_url if cached_merge else ""
    cached_merge_id = cached_merge.id if cached_merge else None
    cached_merge_audio_hash = cached_merge.audio_hash if cached_merge else None
    cached_content_safety = _is_content_safety_error(cached_merge.error_msg) if cached_merge else False

    image_url = _cdnize(lover.image_url)
    if not image_url:
        raise HTTPException(status_code=400, detail="形象地址不可用")
    image_hash = _hash_text(image_url)

    idem_key_src = f"sing:{user.id}:{song.id}:{audio_hash}:{ratio}:{style_level}"
    idem_key = hashlib.sha256(idem_key_src.encode("utf-8")).hexdigest()

    # ---- Fast path: a merged video already exists -------------------------
    if cached_merge_url:
        task = GenerationTask(
            user_id=user.id,
            lover_id=lover.id,
            task_type="video",
            status="succeeded",
            idempotency_key=idem_key,
            result_url=cached_merge_url,
            payload={
                "image_url": image_url,
                "audio_url": audio_url,
                "song_id": song.id,
                "song_title": song.title,
                "image_hash": image_hash,
                "audio_hash": cached_merge_audio_hash or audio_hash,
                "ratio": ratio,
                "style_level": style_level,
                "merged_video_url": cached_merge_url,
                "merge_id": cached_merge_id,
                "deducted": True,
                "content_safety_blocked": cached_content_safety,
            },
            created_at=datetime.utcnow(),
            updated_at=datetime.utcnow(),
        )
        db.add(task)
        try:
            db.flush()
        except IntegrityError:
            # The flush failed with an IntegrityError; the code treats this
            # as an idempotency-key collision. Reject if the existing task is
            # still in flight, otherwise retry under a time-salted key.
            # NOTE(review): this rollback also ends the transaction that
            # held the user-row lock - confirm that is acceptable.
            db.rollback()
            existing = (
                db.query(GenerationTask)
                .filter(GenerationTask.idempotency_key == idem_key)
                .first()
            )
            if existing and existing.status in ("pending", "running"):
                raise HTTPException(status_code=409, detail="已有视频生成任务进行中,请稍后再试")
            retry_key = hashlib.sha256(f"{idem_key}:{time.time()}".encode()).hexdigest()
            task.idempotency_key = retry_key
            db.add(task)
            db.flush()

        session = _get_or_create_session(db, user, lover, None)
        now = datetime.utcnow()
        next_seq = _next_seq(db, session.id)
        user_msg = ChatMessage(
            session_id=session.id,
            user_id=user.id,
            lover_id=lover.id,
            role="user",
            content_type="text",
            content=song.title or "点播歌曲",
            seq=next_seq,
            created_at=now,
            model=settings.LLM_MODEL or "qwen-flash",
        )
        db.add(user_msg)
        db.flush()

        lover_content = _build_sing_message_content(cached_merge_url, cached_content_safety)
        lover_msg = ChatMessage(
            session_id=session.id,
            user_id=user.id,
            lover_id=lover.id,
            role="lover",
            content_type="text",
            content=lover_content,
            seq=next_seq + 1,
            created_at=datetime.utcnow(),
            model=settings.LLM_MODEL or "qwen-flash",
            extra={
                "generation_task_id": task.id,
                "generation_status": "succeeded",
                "song_title": song.title,
                "video_url": cached_merge_url,
                "content_safety_blocked": cached_content_safety,
            },
            tts_status="pending",
        )
        db.add(lover_msg)
        db.flush()

        session.last_message_at = datetime.utcnow()
        db.add(session)
        # Record the message ids on the task for later status queries.
        task.payload = {
            **(task.payload or {}),
            "session_id": session.id,
            "user_message_id": user_msg.id,
            "lover_message_id": lover_msg.id,
        }
        db.add(task)

        # A cached result still costs one generation credit.
        remaining = user_row.video_gen_remaining if user_row else 0
        if user_row and remaining > 0:
            user_row.video_gen_remaining = remaining - 1
            db.add(user_row)
        db.commit()

        return success_response(
            SingTaskStatusOut(
                generation_task_id=task.id,
                status="succeeded",
                dashscope_task_id="",
                video_url=cached_merge_url,
                session_id=session.id,
                user_message_id=user_msg.id,
                lover_message_id=lover_msg.id,
                error_msg=None,
            ),
            msg="视频生成成功",
        )

    # ---- Slow path: queue a new generation task ---------------------------
    detect = _ensure_emo_detect_cache(db, lover.id, image_url, image_hash, ratio)
    if not detect.check_pass:
        raise HTTPException(status_code=400, detail="恋人形象未通过EMO检测")
    if not detect.face_bbox or not detect.ext_bbox:
        raise HTTPException(status_code=502, detail="EMO检测返回缺少人脸或动态区域")

    task = GenerationTask(
        user_id=user.id,
        lover_id=lover.id,
        task_type="video",
        status="pending",
        idempotency_key=idem_key,
        payload={
            "image_url": image_url,
            "audio_url": audio_url,
            "song_id": song.id,
            "song_title": song.title,
            "image_hash": image_hash,
            "audio_hash": audio_hash,
            "ratio": ratio,
            "style_level": style_level,
            "face_bbox": detect.face_bbox,
            "ext_bbox": detect.ext_bbox,
        },
        created_at=datetime.utcnow(),
        updated_at=datetime.utcnow(),
    )
    db.add(task)
    try:
        db.flush()
    except IntegrityError:
        # Same collision handling as in the cached path above.
        db.rollback()
        existing = (
            db.query(GenerationTask)
            .filter(GenerationTask.idempotency_key == idem_key)
            .first()
        )
        if existing and existing.status in ("pending", "running"):
            raise HTTPException(status_code=409, detail="已提交相同的唱歌视频生成任务,请稍后查看结果")
        retry_key = hashlib.sha256(f"{idem_key}:{time.time()}".encode()).hexdigest()
        task.idempotency_key = retry_key
        db.add(task)
        db.flush()

    # Reserve the session and message sequence numbers up front so that
    # later messages cannot "jump the queue" once generation finishes.
    session = _get_or_create_session(db, user, lover, None)
    now = datetime.utcnow()
    next_seq = _next_seq(db, session.id)
    user_msg = ChatMessage(
        session_id=session.id,
        user_id=user.id,
        lover_id=lover.id,
        role="user",
        content_type="text",
        content=song.title or "点播歌曲",
        seq=next_seq,
        created_at=now,
        model=settings.LLM_MODEL or "qwen-flash",
    )
    db.add(user_msg)
    db.flush()

    # Placeholder reply; its content and extra are updated in place later.
    lover_msg = ChatMessage(
        session_id=session.id,
        user_id=user.id,
        lover_id=lover.id,
        role="lover",
        content_type="text",
        content="正在为你生成唱歌视频,完成后会自动更新此消息",
        seq=next_seq + 1,
        created_at=datetime.utcnow(),
        model=settings.LLM_MODEL or "qwen-flash",
        extra={
            "generation_task_id": task.id,
            "generation_status": "pending",
            "song_title": song.title,
        },
        tts_status="pending",
    )
    db.add(lover_msg)
    db.flush()

    session.last_message_at = datetime.utcnow()
    db.add(session)
    task.payload = {
        **(task.payload or {}),
        "session_id": session.id,
        "user_message_id": user_msg.id,
        "lover_message_id": lover_msg.id,
    }
    db.add(task)
    db.commit()

    # Enqueue only after the commit above has persisted the task row.
    _enqueue_sing_task(task.id)

    return success_response(
        SingTaskStatusOut(
            generation_task_id=task.id,
            status="pending",
            dashscope_task_id="",
            video_url="",
            session_id=session.id,
            user_message_id=user_msg.id,
            lover_message_id=lover_msg.id,
            error_msg=None,
        ),
        msg="视频生成任务已提交,正在生成",
    )
@router.get("/generate/{task_id}", response_model=ApiResponse[SingTaskStatusOut])
def get_sing_task(
    task_id: int,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    user: AuthedUser = Depends(get_current_user),
):
    # Return the status of one of the current user's video generation tasks.
    #
    # For a pending/running task this also re-enqueues it (subject to the
    # enqueue cooldown), backfills running segments, and - when the payload
    # already points at a succeeded merge record - promotes the task to
    # succeeded, updates the placeholder chat message and deducts one credit
    # if that has not happened yet.
    #
    # Raises HTTPException 404 when the task does not exist for this user.
    # `background_tasks` is accepted but not used in this function.
    # NOTE(review): block nesting was reconstructed from a
    # whitespace-collapsed source - confirm against the original file.
    task = (
        db.query(GenerationTask)
        .filter(
            GenerationTask.id == task_id,
            GenerationTask.user_id == user.id,
            GenerationTask.task_type == "video",
        )
        .first()
    )
    if not task:
        raise HTTPException(status_code=404, detail="任务不存在")

    status_msg_map = {
        "pending": "视频生成中",
        "running": "视频生成中",
        "succeeded": "视频生成成功",
        "failed": "视频生成失败",
    }
    resp_msg = status_msg_map.get(task.status or "", "查询成功")

    if task.status in ("pending", "running"):
        _enqueue_sing_task(task.id)
        payload = task.payload or {}
        _backfill_running_segments(payload)
        merge_id = payload.get("merge_id")
        if merge_id:
            # Use a separate session so the row locks taken here are
            # independent of the request-scoped `db` session.
            with SessionLocal() as tmp:
                merge = (
                    tmp.query(SingSongVideo)
                    .filter(SingSongVideo.id == merge_id)
                    .first()
                )
                if merge and merge.status == "succeeded" and merge.merged_video_url:
                    current = (
                        tmp.query(GenerationTask)
                        .filter(GenerationTask.id == task.id, GenerationTask.user_id == user.id)
                        .with_for_update()
                        .first()
                    )
                    if current:
                        content_safety_blocked = _is_content_safety_error(merge.error_msg) or bool(
                            (current.payload or {}).get("content_safety_blocked")
                        )
                        current.status = "succeeded"
                        current.result_url = merge.merged_video_url
                        current.payload = {
                            **(current.payload or {}),
                            "merged_video_url": merge.merged_video_url,
                            "content_safety_blocked": content_safety_blocked,
                        }
                        current.updated_at = datetime.utcnow()
                        # Update the chat placeholder message and deduct the
                        # credit (if not deducted yet).
                        try:
                            lover_msg_id = (current.payload or {}).get("lover_message_id")
                            session_id = (current.payload or {}).get("session_id")
                            if lover_msg_id:
                                lover_msg = (
                                    tmp.query(ChatMessage)
                                    .filter(ChatMessage.id == lover_msg_id)
                                    .with_for_update()
                                    .first()
                                )
                                if lover_msg:
                                    lover_msg.content = _build_sing_message_content(
                                        merge.merged_video_url, content_safety_blocked
                                    )
                                    lover_msg.extra = {
                                        **(lover_msg.extra or {}),
                                        "video_url": merge.merged_video_url,
                                        "generation_status": "succeeded",
                                        "content_safety_blocked": content_safety_blocked,
                                    }
                                    lover_msg.tts_status = lover_msg.tts_status or "pending"
                                    tmp.add(lover_msg)
                            if session_id:
                                session = (
                                    tmp.query(ChatSession)
                                    .filter(ChatSession.id == session_id, ChatSession.user_id == user.id)
                                    .with_for_update()
                                    .first()
                                )
                                if session:
                                    session.last_message_at = datetime.utcnow()
                                    tmp.add(session)
                            if not (current.payload or {}).get("deducted"):
                                user_row = (
                                    tmp.query(User)
                                    .filter(User.id == current.user_id)
                                    .with_for_update()
                                    .first()
                                )
                                if user_row:
                                    remaining = user_row.video_gen_remaining or 0
                                    if remaining > 0:
                                        user_row.video_gen_remaining = remaining - 1
                                        current.payload = {**(current.payload or {}), "deducted": True}
                                        tmp.add(user_row)
                        except Exception:
                            # Best effort: an error in the message/quota
                            # update does not stop the task from being
                            # marked succeeded below.
                            pass
                        tmp.add(current)
                        tmp.commit()
                        # Report from the freshly updated row.
                        # NOTE(review): `current` belongs to `tmp`; if
                        # SessionLocal expires attributes on commit, the
                        # reads below must happen before `tmp` closes -
                        # confirm.
                        task = current
                        resp_msg = status_msg_map.get(task.status or "", resp_msg)

    payload = task.payload or {}
    return success_response(
        SingTaskStatusOut(
            generation_task_id=task.id,
            status=task.status,
            dashscope_task_id=str(payload.get("dashscope_task_id") or ""),
            video_url=task.result_url or payload.get("merged_video_url") or "",
            session_id=int(payload.get("session_id") or 0),
            user_message_id=int(payload.get("user_message_id") or 0),
            lover_message_id=int(payload.get("lover_message_id") or 0),
            error_msg=task.error_msg,
        ),
        msg=resp_msg,
    )