xinli/rag-python/app.py

285 lines
9.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
RAG 知识库服务 - Flask API
支持与 jar 包同级目录部署
"""
import os
import sys
import logging
from logging.handlers import RotatingFileHandler
from flask import Flask, request, jsonify
from flask_cors import CORS
from config import HOST, PORT, KNOWLEDGE_DIR, BASE_DIR
from knowledge_service import knowledge_service
from file_watcher import FileWatcher
from event_store import get_events, record_event
app = Flask(__name__)
CORS(app) # Allow cross-origin requests
def _setup_logging():
    """Attach a rotating UTF-8 file handler to the root logger.

    The ``logs`` directory under ``BASE_DIR`` is always created. The handler
    (``rag-python.log``, 10 MiB per file, 10 backups) is only installed when
    the root logger has no handlers yet, so an existing logging configuration
    is left untouched.
    """
    logs_folder = os.path.join(BASE_DIR, "logs")
    os.makedirs(logs_folder, exist_ok=True)

    root_logger = logging.getLogger()
    if not root_logger.handlers:
        root_logger.setLevel(logging.INFO)
        rotating = RotatingFileHandler(
            os.path.join(logs_folder, "rag-python.log"),
            maxBytes=10 * 1024 * 1024,
            backupCount=10,
            encoding="utf-8",
        )
        rotating.setFormatter(
            logging.Formatter(
                fmt="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
                datefmt="%Y-%m-%d %H:%M:%S",
            )
        )
        root_logger.addHandler(rotating)
# Configure logging at import time, before the module logger is created.
_setup_logging()
log = logging.getLogger("rag.app")
# File watcher instance; stays None until init_service() creates and starts it.
file_watcher = None
@app.route('/api/health', methods=['GET'])
def health_check():
    """Health check: report a static OK status plus the configured directories."""
    payload = dict(
        status='ok',
        service='RAG Knowledge Service',
        knowledge_dir=KNOWLEDGE_DIR,
        base_dir=BASE_DIR,
    )
    return jsonify(payload)
@app.route('/api/events', methods=['GET'])
def list_events():
    """Return recent index events, newest first.

    Lets callers inspect index updates when the service window is hidden.
    The optional ``limit`` query parameter is parsed as an int; when it is
    absent or not an integer, ``None`` is passed to ``get_events``.
    """
    try:
        # MultiDict.get falls back to the default (None) if int() conversion fails.
        max_items = request.args.get('limit', type=int)
        recent = get_events(limit=max_items, newest_first=True)
        return jsonify(success=True, data=recent)
    except Exception as exc:
        log.exception("Failed to list events")
        return jsonify(success=False, error=str(exc)), 500
@app.route('/api/documents', methods=['GET'])
def list_documents():
    """List all documents known to the knowledge service.

    Returns:
        ``{'success': True, 'data': <documents>}`` on success, or
        ``{'success': False, 'error': <message>}`` with HTTP 500 on failure.
    """
    try:
        documents = knowledge_service.list_documents()
        return jsonify({'success': True, 'data': documents})
    except Exception as e:
        # Record the traceback in the log file (as list_events does); the
        # client only receives the short message.
        log.exception("Failed to list documents")
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/documents/upload', methods=['POST'])
def upload_document():
    """Upload a document and index it.

    Expects a multipart form with a ``file`` part.

    Returns:
        HTTP 200 with ``{'success': True, 'data': <result>}`` when indexing
        succeeds; HTTP 400 when the file part is missing, has no filename, or
        the knowledge service reports a failure; HTTP 500 on an unexpected
        exception.
    """
    try:
        if 'file' not in request.files:
            return jsonify({'success': False, 'error': '没有上传文件'}), 400

        file = request.files['file']
        # `not` also covers a filename of None, not just the empty string.
        if not file.filename:
            return jsonify({'success': False, 'error': '文件名为空'}), 400

        result = knowledge_service.upload_and_index(file)
        # .get() so a result without a 'success' key is reported as a failed
        # upload (400) rather than surfacing as a KeyError (500).
        if result.get('success'):
            return jsonify({'success': True, 'data': result})

        log.warning("Upload rejected for %s: %s", file.filename, result.get('error'))
        return jsonify({'success': False, 'error': result.get('error')}), 400
    except Exception as e:
        log.exception("Failed to upload document")
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/documents/<filename>', methods=['DELETE'])
def delete_document(filename):
    """Delete a document by filename.

    Args:
        filename: Name of the document, taken from the URL path.

    Returns:
        ``{'success': <result['success']>, 'data': <result>}`` (HTTP 200 even
        when the service reports failure, as before), or HTTP 500 with an
        error message on an unexpected exception.
    """
    try:
        result = knowledge_service.delete_document(filename)
        succeeded = result['success']
        if not succeeded:
            # Status code is unchanged for existing clients; the failure is
            # now at least visible in the log file.
            log.warning("Delete reported failure for %s: %s", filename, result)
        return jsonify({'success': succeeded, 'data': result})
    except Exception as e:
        log.exception("Failed to delete document %s", filename)
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/search', methods=['POST'])
def search():
    """Search the knowledge base.

    Expects a JSON object body with:
        query: Non-empty search string (required).
        top_k: Positive integer number of results to return (default 5).

    Returns:
        HTTP 200 with ``{'success': True, 'data': <results>}``; HTTP 400 when
        the body is not a JSON object, the query is empty or not a string, or
        ``top_k`` is not a positive integer; HTTP 500 on an unexpected
        exception.
    """
    try:
        # silent=True returns None for a missing or malformed JSON body instead
        # of raising, so the client gets a 400 below rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': '请求体必须是 JSON 对象'}), 400

        query = data.get('query', '')
        if not query or not isinstance(query, str):
            return jsonify({'success': False, 'error': '查询内容不能为空'}), 400

        try:
            top_k = int(data.get('top_k', 5))
        except (TypeError, ValueError):
            top_k = 0
        if top_k < 1:
            return jsonify({'success': False, 'error': 'top_k 必须是正整数'}), 400

        results = knowledge_service.search(query, top_k)

        # Debug summary goes to the log file instead of stdout.
        log.info("[Search] Query: %s..., Results: %d", query[:50], len(results))
        for i, r in enumerate(results, start=1):
            log.info(
                "  [%d] filename: %s, content_len: %d",
                i,
                r.get('filename'),
                len(r.get('content') or ''),  # tolerate a None content
            )

        return jsonify({'success': True, 'data': results})
    except Exception as e:
        log.exception("Search failed")
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/stats', methods=['GET'])
def get_stats():
    """Return statistics reported by the knowledge service.

    Returns:
        ``{'success': True, 'data': <stats>}`` on success, or
        ``{'success': False, 'error': <message>}`` with HTTP 500 on failure.
    """
    try:
        stats = knowledge_service.get_stats()
        return jsonify({'success': True, 'data': stats})
    except Exception as e:
        # Keep the traceback in the log file; the client only gets the message.
        log.exception("Failed to get stats")
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/rebuild', methods=['POST'])
def rebuild_index():
    """Rebuild the knowledge-base index.

    Returns:
        ``{'success': True, 'data': <result>}`` on success, or
        ``{'success': False, 'error': <message>}`` with HTTP 500 on failure.
    """
    try:
        result = knowledge_service.rebuild_index()
        return jsonify({'success': True, 'data': result})
    except Exception as e:
        # Keep the traceback in the log file; the client only gets the message.
        log.exception("Failed to rebuild index")
        return jsonify({'success': False, 'error': str(e)}), 500
@app.route('/api/scan', methods=['POST'])
def scan_folder():
    """Scan the knowledge folder and index new files.

    Returns:
        ``{'success': True, 'data': <result>}`` on success, or
        ``{'success': False, 'error': <message>}`` with HTTP 500 on failure.
    """
    try:
        result = knowledge_service.scan_and_index_folder()
        return jsonify({'success': True, 'data': result})
    except Exception as e:
        # Keep the traceback in the log file; the client only gets the message.
        log.exception("Failed to scan knowledge folder")
        return jsonify({'success': False, 'error': str(e)}), 500
def _build_rag_query(report_content):
    """Derive a knowledge-base search query from report HTML.

    Args:
        report_content: Report body as a string; HTML tags are stripped first.

    Returns:
        A space-separated string of up to five factor phrases followed by up
        to three severity levels, de-duplicated in first-seen order, or a
        generic fallback query when nothing matches.
    """
    import re  # kept function-local, as in the original implementation

    # Strip HTML tags so the patterns below only see text.
    text_content = re.sub(r'<[^>]*>', '', report_content)

    # Factor names: a run of CJK characters containing one of the listed terms.
    factor_matches = re.findall(
        r'([\u4e00-\u9fa5]+(?:焦虑|抑郁|压力|情绪|睡眠|躯体|认知|人格|心理)[\u4e00-\u9fa5]*)',
        text_content,
    )
    # Severity levels.
    level_matches = re.findall(r'(正常|轻度|中度|重度|严重)', text_content)

    keywords = factor_matches[:5] + level_matches[:3]
    if not keywords:
        return '心理测评 分析 建议'
    # dict.fromkeys de-duplicates while keeping insertion order, so the same
    # report always yields the same query (a set's order is not stable).
    return ' '.join(dict.fromkeys(keywords))


def _collect_knowledge(search_results, max_results=5):
    """Convert search hits into a prompt context string and a source list.

    Args:
        search_results: Iterable of dict hits (may be None or empty).
        max_results: Maximum number of hits to consider.

    Returns:
        ``(knowledge_context, sources)``: the context joins up to 500
        characters of each non-empty hit; each source holds the filename, a
        preview of at most 200 characters, and the similarity.
    """
    knowledge_parts = []
    sources = []
    for i, result in enumerate((search_results or [])[:max_results]):
        content = result.get('content', '')
        if not content:
            continue
        filename = result.get('filename', '未知来源')
        knowledge_parts.append(f"【参考资料{i+1}】({filename})\n{content[:500]}")
        sources.append({
            'filename': filename,
            'content': content[:200] + '...' if len(content) > 200 else content,
            'similarity': result.get('similarity', 0),
        })
    return '\n\n'.join(knowledge_parts), sources


@app.route('/api/rag-analyze', methods=['POST'])
def rag_analyze():
    """RAG-enhanced analysis: retrieve knowledge-base context for a report.

    Expects a JSON object body with ``reportContent`` (non-empty string).
    The endpoint only performs retrieval; the caller is expected to invoke
    the AI model with the returned context.

    Returns:
        HTTP 200 with ``{'success': True, 'data': {'knowledgeContext',
        'sources', 'query'}}``; HTTP 400 when the body is not a JSON object or
        ``reportContent`` is empty or not a string; HTTP 500 on an unexpected
        exception.
    """
    try:
        # silent=True: a missing or malformed JSON body yields None (-> 400)
        # instead of an exception that would be reported as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({'success': False, 'error': '请求体必须是 JSON 对象'}), 400

        report_content = data.get('reportContent', '')
        if not report_content:
            return jsonify({'success': False, 'error': '报告内容不能为空'}), 400
        if not isinstance(report_content, str):
            return jsonify({'success': False, 'error': '报告内容必须是字符串'}), 400

        # 1. Extract keywords from the report to use as the search query.
        query = _build_rag_query(report_content)

        # 2. Retrieve related knowledge.
        search_results = knowledge_service.search(query, top_k=5)

        # 3. Build the knowledge context and source list.
        knowledge_context, sources = _collect_knowledge(search_results)

        # Debug summary goes to the UTF-8 log file; printing non-ASCII text to
        # a redirected stdout can raise UnicodeEncodeError and fail the request.
        log.info("RAG-Analyze 检索结果 | 查询关键词: %s | 检索到文档数: %d", query, len(sources))
        for i, s in enumerate(sources, start=1):
            log.info("  [%d] %s (相似度: %.4f)", i, s['filename'], s.get('similarity', 0))

        # 4. Return the retrieval result so the frontend can call the AI.
        return jsonify({
            'success': True,
            'data': {
                'knowledgeContext': knowledge_context,
                'sources': sources,
                'query': query,
            },
        })
    except Exception as e:
        log.exception("RAG analyze failed")
        return jsonify({'success': False, 'error': str(e)}), 500
def init_service():
    """Initialise the service before the HTTP server starts.

    In order: initialises the knowledge service, scans and indexes the
    knowledge folder, and starts the file watcher (stored in the module-level
    ``file_watcher``). A ``record_event`` call marks each stage.
    """
    global file_watcher

    banner = "=" * 50

    log.info("%s", banner)
    log.info("RAG 知识库服务启动中...")
    log.info("%s", banner)
    record_event("service_start", knowledge_dir=KNOWLEDGE_DIR, base_dir=BASE_DIR)

    # Initialise the knowledge service.
    knowledge_service.init()
    record_event("index_loaded")

    # Scan the folder and index new files.
    startup_scan = knowledge_service.scan_and_index_folder()
    record_event("startup_scan", result=startup_scan)

    # Start the file watcher.
    file_watcher = FileWatcher(knowledge_service)
    file_watcher.start()
    record_event("watcher_started", path=KNOWLEDGE_DIR)

    log.info("%s", banner)
    log.info("服务已启动: http://%s:%s", HOST, PORT)
    log.info("知识库文件夹: %s", KNOWLEDGE_DIR)
    log.info("%s", banner)
    record_event("service_ready", host=HOST, port=PORT)
# Script entry point: initialise the service, then start the Flask server.
if __name__ == '__main__':
    init_service()
    app.run(
        debug=False,
        threaded=True,
        host=HOST,
        port=PORT,
    )