#!/usr/bin/env python3 """ 检查唱歌视频生成任务的状态 用法: python check_task.py """ import sys import os # 确保可以导入lover模块 sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) # 使用绝对导入 from config import settings from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from models import GenerationTask, SongSegmentVideo, SongSegment, User, Lover, SongLibrary import json # 创建数据库会话 engine = create_engine( settings.DATABASE_URL, pool_pre_ping=True, pool_recycle=3600, echo=False, ) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) def check_task(task_id: int): """检查任务的详细状态""" db = SessionLocal() try: print("=" * 80) print(f"唱歌视频生成任务{task_id}诊断报告") print("=" * 80) # 1. 查询任务详情 task = db.query(GenerationTask).filter(GenerationTask.id == task_id).first() if not task: print(f"\n❌ 任务{task_id}不存在") return print(f"\n【任务基本信息】") print(f"任务ID: {task.id}") print(f"用户ID: {task.user_id}") print(f"恋人ID: {task.lover_id}") print(f"状态: {task.status}") print(f"错误信息: {task.error_msg or '无'}") print(f"创建时间: {task.created_at}") print(f"更新时间: {task.updated_at}") # 2. 解析payload payload = task.payload or {} print(f"\n【任务详细参数】") print(f"歌曲ID: {payload.get('song_id')}") print(f"歌曲标题: {payload.get('song_title')}") print(f"图片URL: {payload.get('image_url', '')[:80]}...") print(f"音频URL: {payload.get('audio_url', '')[:80]}...") print(f"图片哈希: {payload.get('image_hash')}") print(f"音频哈希: {payload.get('audio_hash')}") print(f"时长(秒): {payload.get('duration_sec')}") print(f"分段数: {payload.get('segment_count')}") print(f"已扣费: {payload.get('deducted', False)}") print(f"内容安全拦截: {payload.get('content_safety_blocked', False)}") # 3. 查询用户信息 user = db.query(User).filter(User.id == task.user_id).first() if user: print(f"\n【用户信息】") print(f"用户ID: {user.id}") print(f"手机号: {user.mobile}") print(f"剩余视频生成次数: {user.video_gen_remaining}") print(f"剩余图片生成次数: {user.image_gen_remaining}") # 4. 查询恋人信息 lover = db.query(Lover).filter(Lover.id == task.lover_id).first() if lover: print(f"\n【恋人信息】") print(f"恋人ID: {lover.id}") print(f"名字: {lover.name}") print(f"性别: {lover.gender}") print(f"形象URL: {(lover.image_url or '')[:80]}...") # 5. 查询歌曲信息 song_id = payload.get('song_id') if song_id: song = db.query(SongLibrary).filter(SongLibrary.id == song_id).first() if song: print(f"\n【歌曲信息】") print(f"歌曲ID: {song.id}") print(f"标题: {song.title}") print(f"艺术家: {song.artist}") print(f"性别: {song.gender}") print(f"时长(秒): {song.duration_sec}") print(f"音频URL: {(song.audio_url or '')[:80]}...") print(f"音频哈希: {song.audio_hash}") print(f"状态: {'正常' if song.status else '已下架'}") # 6. 查询分段视频状态 image_hash = payload.get('image_hash') if song_id and image_hash: segments = ( db.query(SongSegmentVideo, SongSegment) .join(SongSegment, SongSegmentVideo.segment_id == SongSegment.id) .filter( SongSegmentVideo.song_id == song_id, SongSegmentVideo.image_hash == image_hash ) .order_by(SongSegment.segment_index) .all() ) if segments: print(f"\n【分段视频状态】") print(f"共 {len(segments)} 个分段") for seg_video, seg in segments: status_icon = "✅" if seg_video.status == "succeeded" else "❌" if seg_video.status == "failed" else "⏳" print(f"\n {status_icon} 分段 {seg.segment_index + 1}:") print(f" 状态: {seg_video.status}") print(f" 时长: {seg.duration_ms}ms") print(f" DashScope任务ID: {seg_video.dashscope_task_id or '无'}") if seg_video.error_msg: print(f" 错误: {seg_video.error_msg}") if seg_video.video_url: print(f" 视频URL: {seg_video.video_url[:80]}...") else: print(f"\n【分段视频状态】") print(" 未找到分段视频记录") # 7. 总结和建议 print(f"\n{'=' * 80}") print("【诊断建议】") print("=" * 80) if task.status == "failed": if task.error_msg: print(f"\n失败原因: {task.error_msg}") if "内容安全" in task.error_msg or "content" in task.error_msg.lower(): print("\n建议:") print(" 1. 歌词内容可能触发了内容安全审核") print(" 2. 尝试更换其他歌曲") print(" 3. 检查恋人形象是否合规") elif "不足" in task.error_msg: print("\n建议:") print(" 1. 用户视频生成次数不足") print(" 2. 需要充值或购买会员") elif "不存在" in task.error_msg or "未找到" in task.error_msg: print("\n建议:") print(" 1. 检查歌曲或恋人是否已被删除") print(" 2. 重新选择歌曲和恋人") else: print("\n建议:") print(" 1. 检查应用日志获取详细错误堆栈") print(" 2. 验证DashScope API配额") print(" 3. 检查网络连接") print(f" 4. 尝试使用重试接口: POST /sing/retry/{task_id}") else: print("\n未记录具体错误信息,建议:") print(" 1. 查看应用日志文件") print(" 2. 检查分段视频的错误信息") elif task.status == "running": print("\n任务仍在运行中,请稍候...") elif task.status == "pending": print("\n任务等待处理中") elif task.status == "succeeded": print("\n✅ 任务已成功完成") if payload.get('merged_video_url'): print(f"视频URL: {payload['merged_video_url']}") print("\n" + "=" * 80) except Exception as e: print(f"\n❌ 检查过程出错: {e}") import traceback traceback.print_exc() finally: db.close() if __name__ == "__main__": if len(sys.argv) < 2: print("用法: python check_task.py ") print("示例: python check_task.py 382") sys.exit(1) try: task_id = int(sys.argv[1]) check_task(task_id) except ValueError: print("错误: task_id 必须是数字") sys.exit(1)