# -*- coding: utf-8 -*-
"""
File watcher service - monitors the knowledge-base folder for changes.
"""
import os
import time
import threading

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

from config import KNOWLEDGE_DIR
from document_parser import is_supported_file

# Module-level state: names of files that are currently being uploaded.
# The watcher consults this set so it does not process those files again.
uploading_files = set()
uploading_lock = threading.Lock()


def mark_uploading(filename):
    """Mark *filename* as currently being uploaded."""
    with uploading_lock:
        uploading_files.add(filename)


def unmark_uploading(filename):
    """Remove the upload mark for *filename* (no-op if it is not marked)."""
    with uploading_lock:
        uploading_files.discard(filename)


def is_uploading(filename):
    """Return True if *filename* is currently marked as being uploaded."""
    with uploading_lock:
        return filename in uploading_files


class KnowledgeFileHandler(FileSystemEventHandler):
    """Watchdog event handler that forwards file changes to the index.

    The ``knowledge_service`` object is called through ``get_stats()``,
    ``add_document(path, filename)`` and ``delete_document(filename)``.
    """

    def __init__(self, knowledge_service):
        super().__init__()
        self.knowledge_service = knowledge_service
        # Debounce bookkeeping: path -> {'action': ..., 'time': ...}
        self.pending_files = {}
        # Quiet period (seconds) to wait after the last event for a path.
        self.debounce_seconds = 3
        # Guards pending_files, which is touched from timer threads.
        self._lock = threading.Lock()

    def _should_process(self, path):
        """Return True if *path* is an existing, supported, non-uploading file."""
        if not os.path.isfile(path):
            return False

        filename = os.path.basename(path)

        # Skip files that are currently being uploaded.
        if is_uploading(filename):
            return False

        return is_supported_file(filename)

    def _debounce_process(self, path, action):
        """Schedule *action* for *path* after the debounce period.

        A newer event for the same path replaces the pending entry. Only the
        timer thread that belongs to the most recent entry performs the
        processing, so the quiet period is measured from the last event.
        """
        filename = os.path.basename(path)

        # Skip files that are currently being uploaded.
        if is_uploading(filename):
            print(f"[文件监控] 跳过正在上传的文件: {filename}")
            return

        entry = {
            'action': action,
            'time': time.time()
        }
        with self._lock:
            self.pending_files[path] = entry

        def delayed_process():
            time.sleep(self.debounce_seconds)
            with self._lock:
                # A newer event replaced this entry (or it was already
                # consumed); the newer timer thread is responsible for it.
                if self.pending_files.get(path) is not entry:
                    return
                del self.pending_files[path]
            # Run the potentially slow indexing outside the lock so other
            # file events are not blocked while it runs.
            # Re-check the upload mark: it may have been set while sleeping.
            if not is_uploading(os.path.basename(path)):
                self._do_process(path, entry['action'])

        threading.Thread(target=delayed_process, daemon=True).start()

    def _do_process(self, path, action):
        """Apply *action* ('add', 'delete' or 'modify') for *path*.

        Any exception raised while processing is caught and printed, so a
        failure never propagates out of the timer thread.
        """
        filename = os.path.basename(path)

        # Final check before touching the index.
        if is_uploading(filename):
            return

        try:
            if action == 'add':
                # Skip files that are already present in the index.
                stats = self.knowledge_service.get_stats()
                if filename in stats.get('files', []):
                    print(f"[文件监控] 文件已索引,跳过: {filename}")
                    return

                print(f"[文件监控] 检测到新文件: {filename}")
                result = self.knowledge_service.add_document(path, filename)
                if result['success']:
                    print(f"[文件监控] 已索引: {filename}, {result['chunks']} 个文本块")
                else:
                    print(f"[文件监控] 索引失败: {filename}, {result.get('error')}")

            elif action == 'delete':
                # Make sure the file is really gone before dropping it.
                if os.path.exists(path):
                    print(f"[文件监控] 文件仍存在,跳过删除: {filename}")
                    return

                print(f"[文件监控] 检测到文件删除: {filename}")
                self.knowledge_service.delete_document(filename)
                print(f"[文件监控] 已从索引删除: {filename}")

            elif action == 'modify':
                # Simplification: modifications are not re-indexed, because
                # the file was already indexed when it was uploaded.
                print(f"[文件监控] 检测到文件修改,跳过: {filename}")

        except Exception as e:
            print(f"[文件监控] 处理失败 {filename}: {e}")

    def on_created(self, event):
        """Queue an 'add' for a newly created, supported file."""
        if not event.is_directory and self._should_process(event.src_path):
            self._debounce_process(event.src_path, 'add')

    def on_deleted(self, event):
        """Queue a 'delete' for a removed, supported file."""
        if not event.is_directory:
            filename = os.path.basename(event.src_path)
            if is_supported_file(filename) and not is_uploading(filename):
                self._debounce_process(event.src_path, 'delete')

    def on_modified(self, event):
        """Ignore modification events."""
        # Temporarily disabled to avoid conflicts with uploads.
        pass

    def on_moved(self, event):
        """Treat a move as a 'delete' of the source and an 'add' of the target."""
        if not event.is_directory:
            src_filename = os.path.basename(event.src_path)

            # Moved away from the old path.
            if is_supported_file(src_filename) and not is_uploading(src_filename):
                self._debounce_process(event.src_path, 'delete')

            # Moved in at the new path.
            if self._should_process(event.dest_path):
                self._debounce_process(event.dest_path, 'add')


class FileWatcher:
    """File watcher service that observes KNOWLEDGE_DIR recursively."""

    def __init__(self, knowledge_service):
        self.knowledge_service = knowledge_service
        self.observer = None
        self.running = False

    def start(self):
        """Start watching KNOWLEDGE_DIR; does nothing if already running."""
        if self.running:
            return

        print(f"[文件监控] 开始监控文件夹: {KNOWLEDGE_DIR}")

        handler = KnowledgeFileHandler(self.knowledge_service)
        observer = Observer()
        observer.schedule(handler, KNOWLEDGE_DIR, recursive=True)
        observer.start()

        # Store the observer only after it started successfully, so stop()
        # never tries to join a thread that was never started.
        self.observer = observer
        self.running = True

    def stop(self):
        """Stop the observer (if any) and wait for its thread to finish."""
        if self.observer:
            self.observer.stop()
            self.observer.join()
        self.running = False
        print("[文件监控] 已停止")