Ai_GirlFriend/xuniYou/check_task_382.py

172 lines
7.1 KiB
Python
Raw Permalink Normal View History

2026-03-02 18:57:11 +08:00
#!/usr/bin/env python3
"""
检查唱歌视频生成任务382的状态
"""
import sys
import os
# 添加lover目录到路径
lover_path = os.path.join(os.path.dirname(__file__), '..', 'lover')
sys.path.insert(0, lover_path)
# 切换到lover目录以便正确加载配置
os.chdir(lover_path)
from lover.db import SessionLocal
from lover.models import GenerationTask, SongSegmentVideo, SongSegment, User, Lover, SongLibrary
import json
def check_task_382():
"""检查任务382的详细状态"""
db = SessionLocal()
try:
print("=" * 80)
print("唱歌视频生成任务382诊断报告")
print("=" * 80)
# 1. 查询任务详情
task = db.query(GenerationTask).filter(GenerationTask.id == 382).first()
if not task:
print("\n❌ 任务382不存在")
return
print(f"\n【任务基本信息】")
print(f"任务ID: {task.id}")
print(f"用户ID: {task.user_id}")
print(f"恋人ID: {task.lover_id}")
print(f"状态: {task.status}")
print(f"错误信息: {task.error_msg or ''}")
print(f"创建时间: {task.created_at}")
print(f"更新时间: {task.updated_at}")
# 2. 解析payload
payload = task.payload or {}
print(f"\n【任务详细参数】")
print(f"歌曲ID: {payload.get('song_id')}")
print(f"歌曲标题: {payload.get('song_title')}")
print(f"图片URL: {payload.get('image_url', '')[:80]}...")
print(f"音频URL: {payload.get('audio_url', '')[:80]}...")
print(f"图片哈希: {payload.get('image_hash')}")
print(f"音频哈希: {payload.get('audio_hash')}")
print(f"时长(秒): {payload.get('duration_sec')}")
print(f"分段数: {payload.get('segment_count')}")
print(f"已扣费: {payload.get('deducted', False)}")
print(f"内容安全拦截: {payload.get('content_safety_blocked', False)}")
# 3. 查询用户信息
user = db.query(User).filter(User.id == task.user_id).first()
if user:
print(f"\n【用户信息】")
print(f"用户ID: {user.id}")
print(f"手机号: {user.mobile}")
print(f"剩余视频生成次数: {user.video_gen_remaining}")
print(f"剩余图片生成次数: {user.image_gen_remaining}")
# 4. 查询恋人信息
lover = db.query(Lover).filter(Lover.id == task.lover_id).first()
if lover:
print(f"\n【恋人信息】")
print(f"恋人ID: {lover.id}")
print(f"名字: {lover.name}")
print(f"性别: {lover.gender}")
print(f"形象URL: {(lover.image_url or '')[:80]}...")
# 5. 查询歌曲信息
song_id = payload.get('song_id')
if song_id:
song = db.query(SongLibrary).filter(SongLibrary.id == song_id).first()
if song:
print(f"\n【歌曲信息】")
print(f"歌曲ID: {song.id}")
print(f"标题: {song.title}")
print(f"艺术家: {song.artist}")
print(f"性别: {song.gender}")
print(f"时长(秒): {song.duration_sec}")
print(f"音频URL: {(song.audio_url or '')[:80]}...")
print(f"音频哈希: {song.audio_hash}")
print(f"状态: {'正常' if song.status else '已下架'}")
# 6. 查询分段视频状态
image_hash = payload.get('image_hash')
if song_id and image_hash:
segments = (
db.query(SongSegmentVideo, SongSegment)
.join(SongSegment, SongSegmentVideo.segment_id == SongSegment.id)
.filter(
SongSegmentVideo.song_id == song_id,
SongSegmentVideo.image_hash == image_hash
)
.order_by(SongSegment.segment_index)
.all()
)
if segments:
print(f"\n【分段视频状态】")
print(f"{len(segments)} 个分段")
for seg_video, seg in segments:
status_icon = "" if seg_video.status == "succeeded" else "" if seg_video.status == "failed" else ""
print(f"\n {status_icon} 分段 {seg.segment_index + 1}:")
print(f" 状态: {seg_video.status}")
print(f" 时长: {seg.duration_ms}ms")
print(f" DashScope任务ID: {seg_video.dashscope_task_id or ''}")
if seg_video.error_msg:
print(f" 错误: {seg_video.error_msg}")
if seg_video.video_url:
print(f" 视频URL: {seg_video.video_url[:80]}...")
else:
print(f"\n【分段视频状态】")
print(" 未找到分段视频记录")
# 7. 总结和建议
print(f"\n{'=' * 80}")
print("【诊断建议】")
print("=" * 80)
if task.status == "failed":
if task.error_msg:
print(f"\n失败原因: {task.error_msg}")
if "内容安全" in task.error_msg or "content" in task.error_msg.lower():
print("\n建议:")
print(" 1. 歌词内容可能触发了内容安全审核")
print(" 2. 尝试更换其他歌曲")
print(" 3. 检查恋人形象是否合规")
elif "不足" in task.error_msg:
print("\n建议:")
print(" 1. 用户视频生成次数不足")
print(" 2. 需要充值或购买会员")
elif "不存在" in task.error_msg or "未找到" in task.error_msg:
print("\n建议:")
print(" 1. 检查歌曲或恋人是否已被删除")
print(" 2. 重新选择歌曲和恋人")
else:
print("\n建议:")
print(" 1. 检查应用日志获取详细错误堆栈")
print(" 2. 验证DashScope API配额")
print(" 3. 检查网络连接")
print(" 4. 尝试使用重试接口: POST /sing/retry/382")
else:
print("\n未记录具体错误信息,建议:")
print(" 1. 查看应用日志文件")
print(" 2. 检查分段视频的错误信息")
elif task.status == "running":
print("\n任务仍在运行中,请稍候...")
elif task.status == "pending":
print("\n任务等待处理中")
elif task.status == "succeeded":
print("\n✅ 任务已成功完成")
if payload.get('merged_video_url'):
print(f"视频URL: {payload['merged_video_url']}")
print("\n" + "=" * 80)
except Exception as e:
print(f"\n❌ 检查过程出错: {e}")
import traceback
traceback.print_exc()
finally:
db.close()
if __name__ == "__main__":
check_task_382()