173 lines
6.1 KiB
Python
173 lines
6.1 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
文件监控服务 - 监控知识库文件夹的变化
|
|
"""
|
|
import os
|
|
import time
|
|
import threading
|
|
from watchdog.observers import Observer
|
|
from watchdog.events import FileSystemEventHandler
|
|
from config import KNOWLEDGE_DIR
|
|
from document_parser import is_supported_file
|
|
|
|
# Module-level registry of file names that are currently being uploaded.
# The file watcher consults it so that it does not process a file a second
# time while an upload of that same file is still in progress.
uploading_files: set = set()

# Guards every read and write of `uploading_files`.
uploading_lock = threading.Lock()
|
|
|
|
def mark_uploading(filename: str) -> None:
    """Register *filename* as currently being uploaded.

    The name is added to the module-level ``uploading_files`` set while
    ``uploading_lock`` is held. Adding a name that is already present is a
    no-op.
    """
    with uploading_lock:
        uploading_files.add(filename)
|
|
|
|
def unmark_uploading(filename: str) -> None:
    """Clear the "being uploaded" mark for *filename*.

    The name is removed from the module-level ``uploading_files`` set while
    ``uploading_lock`` is held. Removing a name that was never marked is a
    no-op (``set.discard`` does not raise).
    """
    with uploading_lock:
        uploading_files.discard(filename)
|
|
|
|
def is_uploading(filename: str) -> bool:
    """Return ``True`` if *filename* is currently marked as being uploaded.

    The membership test on ``uploading_files`` is performed while
    ``uploading_lock`` is held.
    """
    with uploading_lock:
        in_flight = filename in uploading_files
    return in_flight
|
|
|
|
|
|
class KnowledgeFileHandler(FileSystemEventHandler):
    """Watchdog event handler that mirrors file changes into the knowledge index.

    Create, delete and move events for supported files are debounced per
    path and then forwarded to ``knowledge_service``. Files that are
    currently marked as uploading (see ``is_uploading``) are skipped at
    every stage.

    ``knowledge_service`` is used through three calls:

    * ``get_stats()`` - the result is read with ``.get('files', [])``.
    * ``add_document(path, filename)`` - the result is read through the
      keys ``'success'``, ``'chunks'`` and ``'error'``.
    * ``delete_document(filename)`` - the result is ignored.
    """

    def __init__(self, knowledge_service):
        """Store the service and initialise the debounce state."""
        super().__init__()
        self.knowledge_service = knowledge_service
        # Debounce table: path -> the most recent pending event for that path.
        self.pending_files = {}
        # Quiet period (seconds) that must pass after the last event for a
        # path before that event is processed.
        self.debounce_seconds = 3
        # Guards `pending_files` only; it is never held while indexing.
        self._lock = threading.Lock()

    def _should_process(self, path):
        """Return whether *path* is an existing, supported, non-uploading file."""
        if not os.path.isfile(path):
            return False
        filename = os.path.basename(path)
        # Skip files that are currently being uploaded.
        if is_uploading(filename):
            return False
        return is_supported_file(filename)

    def _debounce_process(self, path, action):
        """Schedule *action* for *path* after the debounce delay.

        Args:
            path: Path of the file the event refers to.
            action: One of ``'add'``, ``'delete'`` or ``'modify'``.

        A newer event for the same path replaces the pending one; only the
        timer that belongs to the newest event ends up processing it, so the
        delay is always measured from the last event.
        """
        filename = os.path.basename(path)

        # Skip files that are currently being uploaded.
        if is_uploading(filename):
            print(f"[文件监控] 跳过正在上传的文件: {filename}")
            return

        entry = {
            'action': action,
            'time': time.time(),
        }
        with self._lock:
            self.pending_files[path] = entry

        def delayed_process():
            time.sleep(self.debounce_seconds)
            with self._lock:
                # A newer event replaced ours (or it was already handled):
                # leave the work to the timer that owns the current entry.
                if self.pending_files.get(path) is not entry:
                    return
                del self.pending_files[path]
            # The lock is released before the potentially slow indexing call
            # so that further events are not blocked while it runs.
            # Re-check the upload mark, it may have been set during the delay.
            if not is_uploading(os.path.basename(path)):
                self._do_process(path, entry['action'])

        threading.Thread(target=delayed_process, daemon=True).start()

    def _do_process(self, path, action):
        """Apply *action* (``'add'``, ``'delete'`` or ``'modify'``) for *path*.

        Any exception raised while processing is caught and printed, because
        this runs at the top of a background thread.
        """
        filename = os.path.basename(path)

        # Final check right before touching the index.
        if is_uploading(filename):
            return

        try:
            if action == 'add':
                # Skip files that the index already contains.
                stats = self.knowledge_service.get_stats()
                if filename in stats.get('files', []):
                    print(f"[文件监控] 文件已索引,跳过: {filename}")
                    return

                print(f"[文件监控] 检测到新文件: {filename}")
                result = self.knowledge_service.add_document(path, filename)
                if result['success']:
                    print(f"[文件监控] 已索引: {filename}, {result['chunks']} 个文本块")
                else:
                    print(f"[文件监控] 索引失败: {filename}, {result.get('error')}")

            elif action == 'delete':
                # Only drop the document if the file is really gone.
                if os.path.exists(path):
                    print(f"[文件监控] 文件仍存在,跳过删除: {filename}")
                    return
                print(f"[文件监控] 检测到文件删除: {filename}")
                self.knowledge_service.delete_document(filename)
                print(f"[文件监控] 已从索引删除: {filename}")

            elif action == 'modify':
                # Simplification: modifications are not re-indexed, because
                # the upload path has already indexed the file.
                print(f"[文件监控] 检测到文件修改,跳过: {filename}")

        except Exception as e:
            print(f"[文件监控] 处理失败 {filename}: {e}")

    def on_created(self, event):
        """Queue an ``'add'`` for a newly created, supported file."""
        if not event.is_directory and self._should_process(event.src_path):
            self._debounce_process(event.src_path, 'add')

    def on_deleted(self, event):
        """Queue a ``'delete'`` for a removed, supported file."""
        if not event.is_directory:
            filename = os.path.basename(event.src_path)
            if is_supported_file(filename) and not is_uploading(filename):
                self._debounce_process(event.src_path, 'delete')

    def on_modified(self, event):
        """Ignore modification events.

        Handling is disabled for now to avoid conflicts with uploads.
        """
        pass

    def on_moved(self, event):
        """Treat a move as a delete of the source plus an add of the destination."""
        if not event.is_directory:
            src_filename = os.path.basename(event.src_path)
            # Source side: the file left its old location.
            if is_supported_file(src_filename) and not is_uploading(src_filename):
                self._debounce_process(event.src_path, 'delete')

            # Destination side: the file arrived at its new location.
            if self._should_process(event.dest_path):
                self._debounce_process(event.dest_path, 'add')
|
|
|
|
|
|
class FileWatcher:
    """Service wrapper that runs a watchdog observer over ``KNOWLEDGE_DIR``.

    Attributes:
        knowledge_service: Passed on to ``KnowledgeFileHandler``.
        observer: The running observer, or ``None`` when not watching.
        running: ``True`` between a successful ``start()`` and ``stop()``.
    """

    def __init__(self, knowledge_service):
        """Remember the service; no observer is created until ``start()``."""
        self.knowledge_service = knowledge_service
        self.observer = None
        self.running = False

    def start(self):
        """Start watching ``KNOWLEDGE_DIR`` recursively.

        Does nothing if the watcher is already running. If scheduling or
        starting the observer raises, the exception propagates and the
        watcher stays in its stopped state, so a later ``stop()`` is safe.
        """
        if self.running:
            return

        print(f"[文件监控] 开始监控文件夹: {KNOWLEDGE_DIR}")

        handler = KnowledgeFileHandler(self.knowledge_service)
        observer = Observer()
        observer.schedule(handler, KNOWLEDGE_DIR, recursive=True)
        observer.start()

        # Publish the observer only once it is really running; joining an
        # observer thread that never started would raise RuntimeError.
        self.observer = observer
        self.running = True

    def stop(self):
        """Stop the observer, if any, and wait for its thread to finish."""
        observer, self.observer = self.observer, None
        self.running = False
        if observer is not None:
            observer.stop()
            observer.join()
        print("[文件监控] 已停止")
|