# -*- coding: utf-8 -*-
"""
Document parser supporting multiple file formats, including OCR for scanned PDFs.

Supported OCR engines: EasyOCR (recommended), PaddleOCR and Tesseract.
"""
|
||
import os
|
||
import chardet
|
||
from config import SUPPORTED_EXTENSIONS
|
||
|
||
# Path of the Tesseract executable on Windows; Tesseract is the fallback engine.
TESSERACT_PATH = r'C:\Program Files\Tesseract-OCR\tesseract.exe'

# OCR backend selector: 'easyocr' (recommended), 'paddle' or 'tesseract'.
OCR_ENGINE = 'easyocr'

# EasyOCR reader, created lazily by get_easy_ocr().
_easy_ocr = None

# PaddleOCR instance, created lazily by get_paddle_ocr().
_paddle_ocr = None
|
||
|
||
def get_easy_ocr():
    """Return the shared EasyOCR reader, creating it on first use.

    The reader is configured for simplified Chinese plus English with the GPU
    disabled, and is cached in the module-level ``_easy_ocr`` so later calls
    reuse it.

    Returns:
        The cached reader, or ``None`` when an ``ImportError`` is raised while
        importing or constructing it (a hint on how to install the package is
        printed in that case).
    """
    global _easy_ocr
    if _easy_ocr is not None:
        return _easy_ocr

    try:
        import easyocr
        # Languages: simplified Chinese and English.
        reader = easyocr.Reader(['ch_sim', 'en'], gpu=False)
    except ImportError:
        print(" EasyOCR 未安装,请运行: pip install easyocr")
        return None

    _easy_ocr = reader
    print(" EasyOCR 初始化成功")
    return reader
|
||
|
||
def get_paddle_ocr():
    """Return the shared PaddleOCR instance, creating it on first use.

    The instance is built with angle classification enabled, the ``'ch'``
    language model and logging turned off, then cached in the module-level
    ``_paddle_ocr``.

    Returns:
        The cached instance, or ``None`` when an ``ImportError`` is raised
        while importing or constructing it.
    """
    global _paddle_ocr
    if _paddle_ocr is not None:
        return _paddle_ocr

    try:
        from paddleocr import PaddleOCR
        engine = PaddleOCR(use_angle_cls=True, lang='ch', show_log=False)
    except ImportError:
        print(" PaddleOCR 未安装")
        return None

    _paddle_ocr = engine
    print(" PaddleOCR 初始化成功")
    return engine
|
||
|
||
def detect_encoding(file_path):
    """Guess a file's text encoding from its leading bytes.

    Args:
        file_path: Path of the file to inspect.

    Returns:
        The encoding name reported by ``chardet`` for the first 10000 bytes,
        or ``'utf-8'`` when chardet reports no encoding.
    """
    with open(file_path, 'rb') as fh:
        sample = fh.read(10000)

    guessed = chardet.detect(sample)['encoding']
    return guessed if guessed else 'utf-8'
|
||
|
||
def parse_txt(file_path):
    """Read a plain-text file and return its content.

    The encoding is guessed with ``detect_encoding`` and undecodable bytes
    are skipped (``errors='ignore'``).

    Args:
        file_path: Path of the text file.

    Returns:
        The decoded file content, or ``""`` if detection or reading fails
        (the error is printed rather than raised).
    """
    try:
        # Encoding detection opens the file too, so it lives inside the try:
        # a missing or unreadable file then yields "" like the other parsers
        # instead of raising to the caller.
        encoding = detect_encoding(file_path)
        with open(file_path, 'r', encoding=encoding, errors='ignore') as f:
            return f.read()
    except Exception as e:
        print(f"解析TXT文件失败 {file_path}: {e}")
        return ""
|
||
|
||
def parse_md(file_path):
    """Read a Markdown file as plain text.

    Markdown gets no special treatment; the work is delegated to
    ``parse_txt``.

    Args:
        file_path: Path of the Markdown file.

    Returns:
        Whatever ``parse_txt`` returns for the same path.
    """
    content = parse_txt(file_path)
    return content
|
||
|
||
def _ocr_page_image(img_data):
    """Run the OCR backend selected by ``OCR_ENGINE`` on one page's PNG bytes."""
    if OCR_ENGINE == 'easyocr':
        return ocr_with_easyocr(img_data)
    if OCR_ENGINE == 'paddle':
        return ocr_with_paddle(img_data)
    return ocr_with_tesseract(img_data)


def parse_pdf_with_ocr(file_path):
    """Extract text from a PDF with PyMuPDF, using OCR for scanned pages.

    Each page's embedded text is extracted first. When a page has fewer than
    50 non-blank characters it is treated as a scan: the page is rendered at
    2x zoom and passed to the configured OCR engine. If OCR produces nothing,
    the short embedded text is kept rather than dropped.

    Args:
        file_path: Path of the PDF file.

    Returns:
        The text of all non-empty pages joined by newlines. If PyMuPDF is
        missing or parsing fails, the result of ``parse_pdf_basic`` for the
        same path is returned instead.
    """
    try:
        # Only PyMuPDF is needed here; the OCR helpers import their own
        # dependencies, so a missing Pillow no longer forces the fallback.
        import fitz  # PyMuPDF

        file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
        print(f" PDF文件大小: {file_size_mb:.1f} MB")

        doc = fitz.open(file_path)
        try:
            total_pages = len(doc)
            print(f" PDF总页数: {total_pages}")

            text_parts = []
            ocr_used = False
            # 2x zoom improves OCR accuracy; built once, not once per page.
            zoom = fitz.Matrix(2, 2)

            for page_no, page in enumerate(doc, start=1):
                if page_no == 1 or page_no % 20 == 0:
                    print(f" 解析进度: {page_no}/{total_pages} 页")

                try:
                    # Try the embedded text layer first.
                    text = page.get_text()

                    # Very little text suggests a scanned page: try OCR.
                    if len(text.strip()) < 50:
                        if not ocr_used:
                            print(" 检测到扫描版PDF,启用OCR识别...")
                            ocr_used = True

                        pix = page.get_pixmap(matrix=zoom)
                        ocr_text = _ocr_page_image(pix.tobytes("png"))
                        # Keep the short embedded text when OCR is
                        # unavailable or finds nothing.
                        if ocr_text and ocr_text.strip():
                            text = ocr_text

                    if text and text.strip():
                        text_parts.append(text)
                except Exception as e:
                    # One bad page must not abort the whole document.
                    print(f" 警告: 第 {page_no} 页解析失败: {e}")
                    continue
        finally:
            # Release the document handle on failure as well as success.
            doc.close()

        total_chars = sum(len(part) for part in text_parts)
        print(f" PDF解析完成,提取文本 {total_chars} 字符" + (" (使用OCR)" if ocr_used else ""))
        return "\n".join(text_parts)

    except ImportError as e:
        print(f" 缺少依赖: {e}")
        print(" 请运行: pip install pymupdf easyocr pillow")
        return parse_pdf_basic(file_path)
    except Exception as e:
        print(f" PyMuPDF解析失败: {e},尝试基础解析...")
        return parse_pdf_basic(file_path)
|
||
|
||
def ocr_with_easyocr(img_data):
    """Recognise text in an encoded image with EasyOCR.

    Args:
        img_data: Encoded image bytes that Pillow can open.

    Returns:
        The recognised text fragments joined by newlines, or ``""`` when the
        reader is unavailable or any step fails (the error is printed).
    """
    try:
        from PIL import Image
        import io
        import numpy as np

        reader = get_easy_ocr()
        if reader is None:
            return ""

        # EasyOCR is given the image as a numpy array.
        pixels = np.array(Image.open(io.BytesIO(img_data)))
        detections = reader.readtext(pixels)

        # Element 1 of each detection is used as its text.
        return '\n'.join(detection[1] for detection in detections)
    except Exception as e:
        print(f" EasyOCR 识别失败: {e}")
        return ""
|
||
|
||
def ocr_with_paddle(img_data):
    """Recognise text in an encoded image with PaddleOCR.

    Args:
        img_data: Encoded image bytes that Pillow can open.

    Returns:
        The recognised lines joined by newlines, or ``""`` when the engine is
        unavailable, nothing is recognised, or any step fails (the error is
        printed).
    """
    try:
        from PIL import Image
        import io
        import numpy as np

        engine = get_paddle_ocr()
        if engine is None:
            return ""

        pixels = np.array(Image.open(io.BytesIO(img_data)))
        raw = engine.ocr(pixels, cls=True)

        # Only the first element of the result is read; an empty or missing
        # one means no text was found.
        if not (raw and raw[0]):
            return ''

        # entry[1][0] is taken as the recognised string of each entry.
        recognised = [
            entry[1][0]
            for entry in raw[0]
            if entry and len(entry) >= 2
        ]
        return '\n'.join(recognised)
    except Exception as e:
        print(f" PaddleOCR 识别失败: {e}")
        return ""
|
||
|
||
def ocr_with_tesseract(img_data):
    """Recognise text in an encoded image with Tesseract (fallback engine).

    Args:
        img_data: Encoded image bytes that Pillow can open.

    Returns:
        The text returned by ``pytesseract.image_to_string`` using the
        ``chi_sim+eng`` language setting, or ``""`` if any step fails (the
        error is printed).
    """
    try:
        import pytesseract
        from PIL import Image
        import io

        # Point pytesseract at the configured binary only when it exists;
        # otherwise pytesseract's own setting is left untouched.
        if os.path.exists(TESSERACT_PATH):
            pytesseract.pytesseract.tesseract_cmd = TESSERACT_PATH

        page_image = Image.open(io.BytesIO(img_data))
        return pytesseract.image_to_string(page_image, lang='chi_sim+eng')
    except Exception as e:
        print(f" Tesseract 识别失败: {e}")
        return ""
|
||
|
||
def parse_pdf_basic(file_path):
    """Extract the embedded text of a PDF with PyPDF2 (no OCR).

    Pages whose extraction raises are reported and skipped; pages that yield
    no text are omitted.

    Args:
        file_path: Path of the PDF file.

    Returns:
        The page texts joined by newlines, or ``""`` if PyPDF2 is missing or
        the file cannot be read (the error is printed).
    """
    try:
        from PyPDF2 import PdfReader

        size_mb = os.path.getsize(file_path) / (1024 * 1024)
        print(f" PDF文件大小: {size_mb:.1f} MB (基础模式)")

        pages = PdfReader(file_path).pages
        page_count = len(pages)
        print(f" PDF总页数: {page_count}")

        chunks = []
        for page_no, page in enumerate(pages, start=1):
            # Report progress on the first page and on every 50th page.
            if page_no == 1 or page_no % 50 == 0:
                print(f" 解析进度: {page_no}/{page_count} 页")
            try:
                extracted = page.extract_text()
            except Exception as e:
                print(f" 警告: 第 {page_no} 页解析失败: {e}")
                continue
            if extracted:
                chunks.append(extracted)

        char_count = sum(len(chunk) for chunk in chunks)
        print(f" PDF解析完成,提取文本 {char_count} 字符")
        return "\n".join(chunks)
    except Exception as e:
        print(f"解析PDF文件失败 {file_path}: {e}")
        return ""
|
||
|
||
def parse_pdf(file_path):
    """Parse a PDF file, preferring the OCR-capable path.

    Args:
        file_path: Path of the PDF file.

    Returns:
        Whatever ``parse_pdf_with_ocr`` returns for the same path.
    """
    extracted = parse_pdf_with_ocr(file_path)
    return extracted
|
||
|
||
def parse_docx(file_path):
    """Extract paragraph text from a Word document.

    Only ``Document.paragraphs`` is read; paragraphs that are blank after
    stripping are skipped.

    Args:
        file_path: Path of the Word document.

    Returns:
        The non-blank paragraph texts joined by newlines, or ``""`` if the
        document cannot be loaded (the error is printed).
    """
    try:
        from docx import Document

        paragraphs = Document(file_path).paragraphs
        return "\n".join(
            paragraph.text for paragraph in paragraphs if paragraph.text.strip()
        )
    except Exception as e:
        print(f"解析DOCX文件失败 {file_path}: {e}")
        return ""
|
||
|
||
def parse_document(file_path):
    """Parse a document with the parser matching its file extension.

    The extension is compared case-insensitively. Extensions listed in
    ``SUPPORTED_EXTENSIONS`` but without a dedicated parser are read as
    plain text.

    NOTE(review): ``.doc`` is routed to ``parse_docx``; confirm the
    underlying library can actually read legacy ``.doc`` files.

    Args:
        file_path: Path of the document.

    Returns:
        The extracted text, or ``""`` when the extension is not in
        ``SUPPORTED_EXTENSIONS`` (a message is printed).
    """
    _, suffix = os.path.splitext(file_path)
    suffix = suffix.lower()

    if suffix not in SUPPORTED_EXTENSIONS:
        print(f"不支持的文件类型: {suffix}")
        return ""

    if suffix == '.pdf':
        handler = parse_pdf
    elif suffix in ('.docx', '.doc'):
        handler = parse_docx
    elif suffix == '.md':
        handler = parse_md
    else:
        # '.txt' and any other supported extension are read as plain text.
        handler = parse_txt

    return handler(file_path)
|
||
|
||
def is_supported_file(filename):
    """Tell whether a file name has a supported extension.

    Args:
        filename: File name or path; only its extension is examined.

    Returns:
        ``True`` when the lower-cased extension is in
        ``SUPPORTED_EXTENSIONS``, otherwise ``False``.
    """
    suffix = os.path.splitext(filename)[1]
    return suffix.lower() in SUPPORTED_EXTENSIONS
|
||
|
||
def check_ocr_available():
    """Check whether Tesseract OCR can be used.

    Only Tesseract is probed here, whatever ``OCR_ENGINE`` is set to. If
    ``TESSERACT_PATH`` exists, pytesseract is pointed at it before the
    version query.

    Returns:
        ``True`` when the Tesseract version can be queried (it is printed),
        ``False`` otherwise (the error and an install hint are printed).
    """
    try:
        import pytesseract

        if os.path.exists(TESSERACT_PATH):
            pytesseract.pytesseract.tesseract_cmd = TESSERACT_PATH
        print(f"Tesseract OCR 版本: {pytesseract.get_tesseract_version()}")
    except Exception as e:
        print(f"OCR不可用: {e}")
        print("请安装 Tesseract OCR: https://github.com/UB-Mannheim/tesseract/wiki")
        return False
    return True
|