xinli/rag-python/document_parser.py

98 lines
2.8 KiB
Python
Raw Normal View History

2025-12-20 12:08:33 +08:00
# -*- coding: utf-8 -*-
"""
文档解析器 - 支持多种文件格式
"""
import os
import chardet
from config import SUPPORTED_EXTENSIONS
def detect_encoding(file_path):
    """Guess the text encoding of a file from its leading bytes.

    Only the first 10000 bytes are sampled and handed to ``chardet``.

    Args:
        file_path: Path of the file to inspect.

    Returns:
        The encoding name reported by ``chardet``, or ``'utf-8'`` when no
        encoding could be determined. A result of ``'ascii'`` is reported
        as ``'utf-8'`` (see the comment in the body).

    Raises:
        OSError: If the file cannot be opened or read.
    """
    with open(file_path, 'rb') as f:
        raw_data = f.read(10000)
    result = chardet.detect(raw_data)
    encoding = result['encoding'] or 'utf-8'
    # Only a prefix of the file is sampled, so an "ascii" verdict says nothing
    # about the remainder. ASCII is a strict subset of UTF-8, so widening the
    # answer is lossless for pure-ASCII files and avoids dropping non-ASCII
    # characters that appear after the sampled prefix.
    if encoding.lower() == 'ascii':
        return 'utf-8'
    return encoding
def parse_txt(file_path):
    """Read a plain-text file and return its contents as a string.

    The encoding is guessed with ``detect_encoding``; bytes that cannot be
    decoded are dropped (``errors='ignore'``). Any failure, including a
    missing or unreadable file during encoding detection, is reported on
    stdout and an empty string is returned.

    Args:
        file_path: Path of the text file to read.

    Returns:
        The decoded file contents, or ``""`` if the file could not be read.
    """
    try:
        # Detection opens the file as well, so it has to sit inside the guard;
        # otherwise a missing file raises instead of yielding "".
        encoding = detect_encoding(file_path)
        try:
            with open(file_path, 'r', encoding=encoding, errors='ignore') as f:
                return f.read()
        except LookupError:
            # The detected name may not correspond to a codec Python knows.
            # Fall back to UTF-8 rather than discarding the whole file.
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                return f.read()
    except Exception as e:
        print(f"解析TXT文件失败 {file_path}: {e}")
        return ""
def parse_md(file_path):
    """Parse a Markdown file.

    Markdown is treated as plain text: the call is forwarded to
    ``parse_txt`` and its result is returned unchanged.
    """
    markdown_text = parse_txt(file_path)
    return markdown_text
def parse_pdf(file_path):
    """Extract the text of a PDF file page by page.

    Prints the file size, the page count and periodic progress (first page
    and every 50th page). A page whose extraction raises is reported and
    skipped; pages that yield no text are omitted. Any other failure is
    reported and results in an empty string.

    Args:
        file_path: Path of the PDF file.

    Returns:
        The extracted page texts joined by newlines, or ``""`` on failure.
    """
    try:
        from PyPDF2 import PdfReader

        size_mb = os.path.getsize(file_path) / (1024 * 1024)
        print(f" PDF文件大小: {size_mb:.1f} MB")

        pdf = PdfReader(file_path)
        page_count = len(pdf.pages)
        print(f" PDF总页数: {page_count}")

        chunks = []
        for page_no, page in enumerate(pdf.pages, start=1):
            # Report progress on the first page and on every 50th page.
            if page_no == 1 or page_no % 50 == 0:
                print(f" 解析进度: {page_no}/{page_count}")
            try:
                page_text = page.extract_text()
            except Exception as e:
                # One bad page must not abort the whole document.
                print(f" 警告: 第 {page_no} 页解析失败: {e}")
            else:
                if page_text:
                    chunks.append(page_text)

        # The reported count excludes the newline separators added below.
        char_count = len("".join(chunks))
        print(f" PDF解析完成提取文本 {char_count} 字符")
        return "\n".join(chunks)
    except Exception as e:
        print(f"解析PDF文件失败 {file_path}: {e}")
        return ""
def parse_docx(file_path):
    """Extract the paragraph text of a Word document.

    Paragraphs whose text is empty or whitespace-only are skipped. Any
    failure is reported on stdout and results in an empty string.

    Args:
        file_path: Path of the Word document.

    Returns:
        The remaining paragraph texts joined by newlines, or ``""`` on
        failure.
    """
    try:
        from docx import Document

        paragraphs = Document(file_path).paragraphs
        non_blank = [p.text for p in paragraphs if p.text.strip()]
        return "\n".join(non_blank)
    except Exception as e:
        print(f"解析DOCX文件失败 {file_path}: {e}")
        return ""
def parse_document(file_path):
    """Parse a document with the parser that matches its file extension.

    The extension is compared case-insensitively. Extensions not listed in
    ``SUPPORTED_EXTENSIONS`` are reported and yield an empty string. A
    supported extension without a dedicated parser is read as plain text.

    Args:
        file_path: Path of the document to parse.

    Returns:
        The text produced by the selected parser, or ``""`` for an
        unsupported extension.
    """
    suffix = os.path.splitext(file_path)[1].lower()

    if suffix in SUPPORTED_EXTENSIONS:
        dispatch = {
            '.txt': parse_txt,
            '.md': parse_md,
            '.pdf': parse_pdf,
            '.docx': parse_docx,
            '.doc': parse_docx,
        }
        # Supported extensions with no dedicated entry are read as text.
        handler = dispatch.get(suffix, parse_txt)
        return handler(file_path)

    print(f"不支持的文件类型: {suffix}")
    return ""
def is_supported_file(filename):
    """Tell whether a file name has a supported extension.

    Args:
        filename: File name or path; only its extension is examined.

    Returns:
        ``True`` if the lower-cased extension is in
        ``SUPPORTED_EXTENSIONS``, otherwise ``False``.
    """
    _, suffix = os.path.splitext(filename)
    return suffix.lower() in SUPPORTED_EXTENSIONS