xinli/rag-python/merge_index.py

166 lines
4.9 KiB
Python
Raw Permalink Normal View History

2026-02-24 16:49:05 +08:00
# -*- coding: utf-8 -*-
"""
索引合并脚本 - 用于合并多人处理的知识库索引
将多个 index_data 文件夹合并成一个统一的索引
使用方法:
1. 将各人处理好的 index_data 文件夹重命名后放到 to_merge/ 目录
例如: to_merge/index_data_张三/, to_merge/index_data_李四/
2. 运行: python merge_index.py
3. 合并后的索引会保存到 index_data/ 目录
"""
import os
import sys
import json
import shutil
# 添加当前目录到路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from config import INDEX_DIR, EMBEDDING_MODEL, OLLAMA_URL
from vector_store import vector_store
# Staging directory holding the per-contributor index folders to be merged
MERGE_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'to_merge')
def load_documents_from_folder(folder_path):
    """Load one contributor's document chunks from *folder_path*.

    Expects a ``documents.json`` file shaped like ``{"documents": [...]}``.

    Args:
        folder_path: Directory containing a single ``documents.json``.

    Returns:
        list: The ``documents`` list from the JSON file, or ``[]`` when the
        file is missing, unreadable, or not valid JSON (a warning/error line
        is printed in that case so the merge can continue best-effort).
    """
    docs_file = os.path.join(folder_path, 'documents.json')
    if not os.path.exists(docs_file):
        print(f" 警告: {folder_path} 中没有 documents.json")
        return []
    try:
        with open(docs_file, 'r', encoding='utf-8') as f:
            data = json.load(f)
        return data.get('documents', [])
    # Narrowed from a bare ``except Exception`` so genuine programming errors
    # surface; I/O and parse failures still degrade gracefully to [].
    except (OSError, json.JSONDecodeError) as e:
        print(f" 错误: 读取 {docs_file} 失败: {e}")
        return []
def merge_indexes():
    """Merge every index folder under ``to_merge/`` into one unified index.

    Workflow:
      1. Scan MERGE_DIR for sub-directories containing a ``documents.json``.
      2. Collect all document chunks, de-duplicating by source filename.
      3. Wipe INDEX_DIR and re-embed every chunk via the shared
         ``vector_store``, grouped per source file.

    Interactive and destructive: prompts for confirmation before
    overwriting the existing ``index_data`` directory. Returns None.
    """
    print("=" * 60)
    print("索引合并工具")
    print("=" * 60)
    # First run: create the staging directory and tell the user what to put in it.
    if not os.path.exists(MERGE_DIR):
        os.makedirs(MERGE_DIR)
        print(f"已创建合并目录: {MERGE_DIR}")
        print(f"请将各人处理好的 index_data 文件夹放到此目录中")
        print(f"例如: {MERGE_DIR}/index_data_张三/")
        return
    # Candidate folders: any sub-directory that actually holds a documents.json.
    folders = []
    for name in os.listdir(MERGE_DIR):
        folder_path = os.path.join(MERGE_DIR, name)
        if os.path.isdir(folder_path):
            docs_file = os.path.join(folder_path, 'documents.json')
            if os.path.exists(docs_file):
                folders.append((name, folder_path))
    if not folders:
        print(f"{MERGE_DIR} 中没有找到有效的索引文件夹")
        print(f"请确保文件夹中包含 documents.json 文件")
        return
    print(f"找到 {len(folders)} 个待合并的索引:")
    for name, path in folders:
        print(f" - {name}")
    print()
    # Destructive from here on — require an explicit 'y' to proceed.
    confirm = input("是否开始合并?这将覆盖现有的 index_data (y/n): ").strip().lower()
    if confirm != 'y':
        print("已取消")
        return
    # Gather all documents; skip any filename already seen so the same file
    # processed by two contributors is only indexed once.
    all_documents = []
    all_files = set()
    print()
    print("正在收集文档...")
    for name, folder_path in folders:
        print(f" 处理: {name}")
        docs = load_documents_from_folder(folder_path)
        for doc in docs:
            filename = doc.get('filename', '')
            if filename and filename not in all_files:
                all_files.add(filename)
                all_documents.append(doc)
        print(f" 文档数: {len(docs)}, 累计: {len(all_documents)}")
    print()
    print(f"共收集 {len(all_documents)} 个文档块")
    print(f"来自 {len(all_files)} 个不同文件")
    print()
    # Start from a clean slate: drop any previous merged index on disk.
    print("清空现有索引...")
    if os.path.exists(INDEX_DIR):
        shutil.rmtree(INDEX_DIR)
    os.makedirs(INDEX_DIR)
    print("重建向量索引...")
    print("这可能需要一些时间,请耐心等待...")
    print()
    # Reset the shared in-memory store before re-adding everything.
    vector_store.documents = []
    vector_store.index = None
    # Group chunks by source file so per-file metadata can be attached.
    file_docs = {}
    for doc in all_documents:
        filename = doc.get('filename', 'unknown')
        if filename not in file_docs:
            file_docs[filename] = []
        file_docs[filename].append(doc)
    total_files = len(file_docs)
    processed = 0
    for filename, docs in file_docs.items():
        processed += 1
        # BUG FIX: this progress line previously printed the literal text
        # "(unknown)" instead of the name of the file being indexed.
        print(f"[{processed}/{total_files}] 索引: {filename} ({len(docs)} 块)")
        # Re-embed this file's text chunks with aggregate metadata.
        chunks = [doc.get('content', '') for doc in docs]
        metadata = {
            'filename': filename,
            'file_path': docs[0].get('file_path', ''),
            'char_count': sum(len(c) for c in chunks)
        }
        try:
            vector_store.add_documents(chunks, metadata)
        except Exception as e:
            # Best-effort: one failed file must not abort the whole merge.
            print(f" 错误: {e}")
    # Persist the rebuilt index to INDEX_DIR.
    print()
    print("保存索引...")
    vector_store.save_index()
    # Report final statistics from the rebuilt store.
    stats = vector_store.get_stats()
    print()
    print("=" * 60)
    print("合并完成!")
    print("=" * 60)
    print(f"总文件数: {stats['total_files']}")
    print(f"总文本块: {stats['total_chunks']}")
    print(f"索引目录: {INDEX_DIR}")
    print()
    print("提示: 合并完成后可以删除 to_merge/ 目录中的文件")
# Script entry point: run the interactive merge when executed directly.
if __name__ == '__main__':
    merge_indexes()