xinli/rag-python/knowledge_service.py

245 lines
7.5 KiB
Python
Raw Permalink Normal View History

2026-02-24 16:49:05 +08:00
# -*- coding: utf-8 -*-
"""
知识库服务 - 管理文档的添加删除和检索
"""
import os
import shutil
from datetime import datetime
from config import KNOWLEDGE_DIR, UPLOAD_DIR
from document_parser import parse_document, is_supported_file
from text_splitter import split_text
from vector_store import vector_store
class KnowledgeService:
    """Knowledge-base service: manages adding, deleting and searching documents.

    Thin orchestration layer over the module-level ``vector_store``.  Source
    documents live in ``KNOWLEDGE_DIR``; uploads are staged in ``UPLOAD_DIR``.
    """

    def __init__(self):
        # Shared module-level vector store instance (see vector_store.py).
        self.vector_store = vector_store

    def init(self):
        """Initialize the service by loading the persisted vector index."""
        self.vector_store.load_index()

    def scan_and_index_folder(self):
        """Scan KNOWLEDGE_DIR recursively and index documents not yet indexed.

        Used at startup or when manually rebuilding the index.

        Returns:
            dict: ``{'scanned': <new candidate files found>,
                     'indexed': <files successfully indexed>}``
        """
        print(f"开始扫描知识库文件夹: {KNOWLEDGE_DIR}")
        # stats['files'] holds plain basenames (see list_documents), so we
        # must compare basenames here; comparing the relative path would
        # re-index every file that sits in a subfolder on each scan.
        stats = self.vector_store.get_stats()
        indexed_files = set(stats.get('files', []))
        # Collect supported files that are not yet in the index.
        new_files = []
        for root, _dirs, files in os.walk(KNOWLEDGE_DIR):
            for filename in files:
                if not is_supported_file(filename):
                    continue
                file_path = os.path.join(root, filename)
                rel_path = os.path.relpath(file_path, KNOWLEDGE_DIR)
                if filename not in indexed_files:
                    new_files.append((filename, file_path, rel_path))
        # Index the new files; one bad file must not abort the whole scan.
        indexed_count = 0
        for filename, file_path, rel_path in new_files:
            try:
                result = self.add_document(file_path, filename)
                if result['success']:
                    indexed_count += 1
                    print(f" 已索引: {rel_path}")
            except Exception as e:
                print(f" 索引失败 {rel_path}: {e}")
        print(f"扫描完成,新索引 {indexed_count} 个文件")
        return {
            'scanned': len(new_files),
            'indexed': indexed_count
        }

    def add_document(self, file_path, filename=None):
        """Add a single document to the knowledge base.

        Args:
            file_path: Path of the file to parse and index.
            filename: Display name; defaults to the basename of ``file_path``.

        Returns:
            dict: ``{'success': True, 'filename', 'chunks', 'char_count'}`` on
            success, or ``{'success': False, 'error': <reason>}`` on failure.
        """
        if filename is None:
            filename = os.path.basename(file_path)
        if not os.path.exists(file_path):
            return {'success': False, 'error': '文件不存在'}
        if not is_supported_file(filename):
            return {'success': False, 'error': '不支持的文件类型'}
        # Parse the document into plain text.
        # (Fixed: the log line had lost its {filename} placeholder.)
        print(f"正在解析文档: {filename}")
        content = parse_document(file_path)
        if not content or not content.strip():
            return {'success': False, 'error': '文档内容为空'}
        # Split into chunks for embedding.
        chunks = split_text(content)
        if not chunks:
            return {'success': False, 'error': '文档分块失败'}
        # Metadata stored alongside every chunk of this document.
        metadata = {
            'filename': filename,
            'file_path': file_path,
            'indexed_at': datetime.now().isoformat(),
            'char_count': len(content)
        }
        added = self.vector_store.add_documents(chunks, metadata)
        return {
            'success': True,
            'filename': filename,
            'chunks': added,
            'char_count': len(content)
        }

    def upload_and_index(self, file_storage, copy_to_knowledge=True):
        """Persist an uploaded file, index it, and optionally copy it into
        the knowledge folder.

        Args:
            file_storage: Flask ``FileStorage`` object from the request.
            copy_to_knowledge: When True, copy the file into KNOWLEDGE_DIR
                after successful indexing.

        Returns:
            dict: Result of :meth:`add_document`, plus ``'saved_to'`` when
            the file was copied into the knowledge folder.
        """
        from file_watcher import mark_uploading, unmark_uploading
        # Strip any directory components from the client-supplied name to
        # prevent path traversal (e.g. "../../etc/passwd").
        filename = os.path.basename(file_storage.filename or '')
        if not is_supported_file(filename):
            return {'success': False, 'error': '不支持的文件类型'}
        # Mark the file as uploading so the file watcher ignores its events.
        mark_uploading(filename)
        temp_path = os.path.join(UPLOAD_DIR, filename)
        try:
            # Save inside the try block so the watcher mark is always
            # released even if writing the temp file fails.
            file_storage.save(temp_path)
            result = self.add_document(temp_path, filename)
            if result['success'] and copy_to_knowledge:
                dest_path = os.path.join(KNOWLEDGE_DIR, filename)
                shutil.copy2(temp_path, dest_path)
                result['saved_to'] = dest_path
            return result
        finally:
            # Remove the staged temp file.
            if os.path.exists(temp_path):
                os.remove(temp_path)
            # Unmark after a delay so the watcher has enough time to ignore
            # the filesystem events generated above.
            import threading

            def delayed_unmark():
                import time
                time.sleep(5)
                unmark_uploading(filename)

            threading.Thread(target=delayed_unmark, daemon=True).start()

    def delete_document(self, filename):
        """Delete a document from the vector store and the knowledge folder.

        Args:
            filename: Name of the file to delete (directory components are
                stripped so deletions stay inside KNOWLEDGE_DIR).

        Returns:
            dict with 'success', 'chunks_deleted' and 'file_deleted'.
        """
        # Sanitize to keep deletions confined to KNOWLEDGE_DIR.
        filename = os.path.basename(filename)
        deleted = self.vector_store.delete_by_filename(filename)
        file_path = os.path.join(KNOWLEDGE_DIR, filename)
        file_deleted = False
        if os.path.exists(file_path):
            os.remove(file_path)
            file_deleted = True
        return {
            'success': deleted > 0 or file_deleted,
            'chunks_deleted': deleted,
            'file_deleted': file_deleted
        }

    def search(self, query, top_k=5):
        """Search the vector store for chunks relevant to ``query``.

        Args:
            query: Query text.
            top_k: Maximum number of results to return.

        Returns:
            Search results as produced by the vector store.
        """
        return self.vector_store.search(query, top_k)

    def get_stats(self):
        """Return the vector store's statistics dict."""
        return self.vector_store.get_stats()

    def list_documents(self):
        """List all indexed documents with chunk count, size and existence.

        Returns:
            list[dict]: one entry per indexed filename with keys
            'filename', 'chunks', 'size' and 'exists'.
        """
        stats = self.vector_store.get_stats()
        documents = []
        for filename in stats.get('files', []):
            # Count chunks belonging to this file.
            chunk_count = sum(
                1 for doc in self.vector_store.documents
                if doc.get('metadata', {}).get('filename') == filename
            )
            # File-on-disk info; the source file may have been removed.
            file_path = os.path.join(KNOWLEDGE_DIR, filename)
            exists = os.path.exists(file_path)
            documents.append({
                'filename': filename,
                'chunks': chunk_count,
                'size': os.path.getsize(file_path) if exists else 0,
                'exists': exists
            })
        return documents

    def rebuild_index(self):
        """Clear the vector index and re-index everything in KNOWLEDGE_DIR."""
        print("开始重建索引...")
        self.vector_store.clear()
        result = self.scan_and_index_folder()
        return {
            'success': True,
            'indexed': result['indexed']
        }
# Module-level singleton shared by the rest of the application.
knowledge_service = KnowledgeService()