xinli/rag-python/file_watcher.py

173 lines
6.1 KiB
Python
Raw Normal View History

2026-02-24 16:49:05 +08:00
# -*- coding: utf-8 -*-
"""
文件监控服务 - 监控知识库文件夹的变化
"""
import os
import time
import threading
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from config import KNOWLEDGE_DIR
from document_parser import is_supported_file
# Filenames currently being uploaded. The file watcher consults this set so it
# does not process the same file a second time while an upload handles it.
uploading_files = set()
# Guards every read/write of uploading_files across threads.
uploading_lock = threading.Lock()


def mark_uploading(filename):
    """Record *filename* as an upload in progress."""
    with uploading_lock:
        uploading_files.add(filename)


def unmark_uploading(filename):
    """Clear the upload-in-progress mark for *filename* (no error if absent)."""
    with uploading_lock:
        uploading_files.discard(filename)


def is_uploading(filename):
    """Return True while *filename* is marked as being uploaded."""
    with uploading_lock:
        marked = filename in uploading_files
    return marked
class KnowledgeFileHandler(FileSystemEventHandler):
    """Watchdog event handler that mirrors knowledge-folder changes into the index.

    Events are debounced per path. Each new event for a path replaces the
    pending one, and only the most recent action runs, once the path has
    seen no further events for ``debounce_seconds``. Files currently marked
    as uploading are ignored.

    NOTE(review): the index is addressed by basename
    (``add_document(path, filename)`` / ``delete_document(filename)``) while
    the observer is scheduled recursively. Same-named files in different
    sub-folders therefore presumably collide in the index — confirm against
    the knowledge service.
    """

    def __init__(self, knowledge_service, debounce_seconds=3):
        """
        Args:
            knowledge_service: object providing ``get_stats()``,
                ``add_document(path, filename)`` and ``delete_document(filename)``.
            debounce_seconds: quiet period (seconds) a path must see before its
                pending action is executed. Defaults to 3.
        """
        super().__init__()
        self.knowledge_service = knowledge_service
        # path -> {'action': ..., 'time': ...}: the latest pending event per path.
        self.pending_files = {}
        self.debounce_seconds = debounce_seconds
        # Guards pending_files only; never held while calling the knowledge service.
        self._lock = threading.Lock()
        # Serializes calls into knowledge_service (one action at a time) without
        # blocking the watchdog event thread, which only needs self._lock.
        self._process_lock = threading.Lock()

    def _should_process(self, path):
        """Return True if *path* is an existing, supported file not being uploaded."""
        if not os.path.isfile(path):
            return False
        filename = os.path.basename(path)
        # Skip files that are currently being uploaded.
        if is_uploading(filename):
            return False
        return is_supported_file(filename)

    def _debounce_process(self, path, action):
        """Schedule *action* for *path* after ``debounce_seconds`` of quiet.

        A later event for the same path supersedes this one. Only the thread
        whose entry is still the current pending entry when it wakes up runs
        the action. A burst of events therefore collapses into a single
        action, executed ``debounce_seconds`` after the *last* event.
        """
        filename = os.path.basename(path)
        # Skip files that are currently being uploaded.
        if is_uploading(filename):
            print(f"[文件监控] 跳过正在上传的文件: {filename}")
            return

        entry = {'action': action, 'time': time.time()}
        with self._lock:
            self.pending_files[path] = entry

        def delayed_process():
            time.sleep(self.debounce_seconds)
            with self._lock:
                # Identity check: if a newer event replaced our entry, that
                # event's own thread will handle the path, so this one stops.
                if self.pending_files.get(path) is not entry:
                    return
                del self.pending_files[path]
            # Check again: an upload may have started during the quiet period.
            if is_uploading(filename):
                return
            # Run outside self._lock so new events can still be queued while a
            # potentially slow index update is in progress.
            with self._process_lock:
                self._do_process(path, action)

        threading.Thread(target=delayed_process, daemon=True).start()

    def _do_process(self, path, action):
        """Apply one debounced *action* ('add', 'delete' or 'modify') for *path*.

        Any exception is printed and swallowed, so one bad file does not
        propagate out of the background thread.
        """
        filename = os.path.basename(path)
        # Last upload check right before touching the index.
        if is_uploading(filename):
            return
        try:
            if action == 'add':
                # Skip files whose name is already listed in the index stats.
                stats = self.knowledge_service.get_stats()
                if filename in stats.get('files', []):
                    print(f"[文件监控] 文件已索引,跳过: {filename}")
                    return
                print(f"[文件监控] 检测到新文件: {filename}")
                result = self.knowledge_service.add_document(path, filename)
                if result['success']:
                    print(f"[文件监控] 已索引: {filename}, {result['chunks']} 个文本块")
                else:
                    print(f"[文件监控] 索引失败: {filename}, {result.get('error')}")
            elif action == 'delete':
                # The path may have been re-created during the debounce window.
                if os.path.exists(path):
                    print(f"[文件监控] 文件仍存在,跳过删除: {filename}")
                    return
                print(f"[文件监控] 检测到文件删除: {filename}")
                self.knowledge_service.delete_document(filename)
                print(f"[文件监控] 已从索引删除: {filename}")
            elif action == 'modify':
                # Modifications are not re-indexed; per the original design the
                # file was already indexed when it was uploaded.
                print(f"[文件监控] 检测到文件修改,跳过: {filename}")
        except Exception as e:
            print(f"[文件监控] 处理失败 {filename}: {e}")

    def on_created(self, event):
        """Queue an 'add' for a newly created supported file."""
        if not event.is_directory and self._should_process(event.src_path):
            self._debounce_process(event.src_path, 'add')

    def on_deleted(self, event):
        """Queue a 'delete' for a removed supported file.

        The file no longer exists, so ``_should_process`` (which requires
        ``os.path.isfile``) cannot be used; only the type and upload checks apply.
        """
        if not event.is_directory:
            filename = os.path.basename(event.src_path)
            if is_supported_file(filename) and not is_uploading(filename):
                self._debounce_process(event.src_path, 'delete')

    def on_modified(self, event):
        """Ignore modification events for now to avoid clashing with uploads."""
        pass

    def on_moved(self, event):
        """Treat a move as a 'delete' of the source plus an 'add' of the destination."""
        if not event.is_directory:
            src_filename = os.path.basename(event.src_path)
            # Moved away from the source path.
            if is_supported_file(src_filename) and not is_uploading(src_filename):
                self._debounce_process(event.src_path, 'delete')
            # Moved into the destination path.
            if self._should_process(event.dest_path):
                self._debounce_process(event.dest_path, 'add')
class FileWatcher:
    """Service that watches KNOWLEDGE_DIR and feeds its changes into the index.

    Wraps a watchdog ``Observer`` with a ``KnowledgeFileHandler`` scheduled
    recursively on ``KNOWLEDGE_DIR``.
    """

    def __init__(self, knowledge_service):
        # Service object handed to the event handler when start() runs.
        self.knowledge_service = knowledge_service
        # Watchdog observer; stays None until start() creates one.
        self.observer = None
        # True after start() succeeds and until stop() is called.
        self.running = False

    def start(self):
        """Start watching KNOWLEDGE_DIR; does nothing if already running."""
        if self.running:
            return
        print(f"[文件监控] 开始监控文件夹: {KNOWLEDGE_DIR}")
        event_handler = KnowledgeFileHandler(self.knowledge_service)
        watcher = self.observer = Observer()
        watcher.schedule(event_handler, KNOWLEDGE_DIR, recursive=True)
        watcher.start()
        self.running = True

    def stop(self):
        """Stop and join the observer (if one was created) and mark as not running."""
        current = self.observer
        if current:
            current.stop()
            current.join()
        self.running = False
        print("[文件监控] 已停止")