xinli/rag-python/batch_index.py
2026-02-24 16:49:05 +08:00

156 lines
4.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
"""
批量索引脚本 - 用于处理大文件
直接运行此脚本来索引 knowledge_docs 目录中的所有文件
使用方法:
1. 将 PDF 文件放入 rag-python/knowledge_docs/ 目录
2. 运行: python batch_index.py
"""
import os
import sys
import time
# 添加当前目录到路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from config import KNOWLEDGE_DIR, CHUNK_SIZE
from document_parser import parse_document, is_supported_file
from text_splitter import split_text
from vector_store import vector_store
def format_time(seconds):
    """Format a duration in seconds as a human-readable Chinese string.

    Args:
        seconds: Duration in seconds (int or float).

    Returns:
        "X.X秒" for under a minute, "X.X分钟" for under an hour,
        otherwise "X.X小时" — one decimal place in every branch.
    """
    if seconds < 60:
        # Unit suffix restored for consistency with the minute/hour branches
        # (it appears to have been dropped by a text-extraction step).
        return f"{seconds:.1f}秒"
    elif seconds < 3600:
        return f"{seconds/60:.1f}分钟"
    else:
        return f"{seconds/3600:.1f}小时"
def estimate_time(char_count):
    """Estimate indexing time for a document of *char_count* characters.

    Assumes one chunk per CHUNK_SIZE characters and roughly 1.5 seconds
    of processing per chunk, then renders the result via format_time().
    """
    # (number of chunks) x (seconds per chunk), formatted for display
    return format_time(char_count / CHUNK_SIZE * 1.5)
def batch_index():
    """Index every new supported file in KNOWLEDGE_DIR into the vector store.

    Loads the existing index to skip files already processed, lists the
    pending files with their sizes, asks for interactive confirmation,
    then parses, chunks, embeds and stores each file, printing per-file
    progress and a final success/failure summary. Returns None.
    """
    print("=" * 60)
    print("批量索引工具")
    print("=" * 60)
    print(f"知识库目录: {KNOWLEDGE_DIR}")
    print(f"分块大小: {CHUNK_SIZE} 字符")
    print()

    # Load the existing index so already-indexed files can be skipped.
    print("加载现有索引...")
    vector_store.load_index()
    stats = vector_store.get_stats()
    indexed_files = set(stats.get('files', []))
    print(f"已索引文件: {len(indexed_files)}")
    print()

    # Scan the knowledge directory for supported files not yet in the index.
    files_to_process = []
    for filename in os.listdir(KNOWLEDGE_DIR):
        file_path = os.path.join(KNOWLEDGE_DIR, filename)
        if os.path.isfile(file_path) and is_supported_file(filename):
            if filename not in indexed_files:
                file_size = os.path.getsize(file_path)
                files_to_process.append((filename, file_path, file_size))

    if not files_to_process:
        print("没有新文件需要索引。")
        print(f"如需重新索引,请先删除 index_data 目录中的文件。")
        return

    # Show the pending files and total size before asking for confirmation.
    print(f"发现 {len(files_to_process)} 个新文件:")
    total_size = 0
    for filename, _, size in files_to_process:
        size_mb = size / (1024 * 1024)
        total_size += size
        # BUG FIX: the filename was not interpolated (literal "(unknown)").
        print(f" - {filename} ({size_mb:.1f} MB)")
    print(f"\n总大小: {total_size / (1024 * 1024):.1f} MB")
    print()

    # Interactive confirmation before doing any expensive embedding work.
    confirm = input("是否开始处理?(y/n): ").strip().lower()
    if confirm != 'y':
        print("已取消。")
        return

    print()
    print("=" * 60)
    print("开始处理...")
    print("=" * 60)
    total_start = time.time()
    success_count = 0
    fail_count = 0

    for i, (filename, file_path, file_size) in enumerate(files_to_process):
        print()
        # BUG FIX: the filename was not interpolated (literal "(unknown)").
        print(f"[{i+1}/{len(files_to_process)}] 处理: {filename}")
        print("-" * 40)
        file_start = time.time()
        try:
            # Extract raw text from the document.
            print("解析文档...")
            content = parse_document(file_path)
            if not content or not content.strip():
                print(f" 警告: 文档内容为空,跳过")
                fail_count += 1
                continue
            char_count = len(content)
            print(f" 提取文本: {char_count} 字符")
            print(f" 预计处理时间: {estimate_time(char_count)}")

            # Split the text into chunks sized for embedding.
            print("分块处理...")
            chunks = split_text(content)
            print(f" 生成 {len(chunks)} 个文本块")

            # Embed and persist the chunks with per-file metadata.
            print("向量化处理...")
            metadata = {
                'filename': filename,
                'file_path': file_path,
                'char_count': char_count
            }
            added = vector_store.add_documents(chunks, metadata)
            file_time = time.time() - file_start
            print(f" 完成! 耗时: {format_time(file_time)}")
            success_count += 1
        except Exception as e:
            # Best-effort batch: report the failure and continue with the
            # remaining files rather than aborting the whole run.
            print(f" 错误: {e}")
            fail_count += 1

    # Final summary across all processed files.
    total_time = time.time() - total_start
    print()
    print("=" * 60)
    print("处理完成!")
    print("=" * 60)
    print(f"成功: {success_count} 个文件")
    print(f"失败: {fail_count} 个文件")
    print(f"总耗时: {format_time(total_time)}")

    # Show the index-wide statistics after this run.
    final_stats = vector_store.get_stats()
    print(f"索引总文件数: {final_stats['total_files']}")
    print(f"索引总文本块: {final_stats['total_chunks']}")
# Script entry point: run the interactive batch indexer.
if __name__ == '__main__':
    batch_index()