# -*- coding: utf-8 -*-
#
# @author: lyg, ym
# @date: 2025-5-8
# @version: 1
# @description: docx document splitter; splits by paragraph and converts images and tables to JSON data.
import docx
|
import docx.table
|
import json
|
from PIL import Image
|
import io
|
import re
|
import typing
|
|
from knowledgebase.doc.image_to_text import ImageToText
|
from knowledgebase.doc.models import ParagraphInfo
|
from knowledgebase.log import Log
|
|
|
class DocSplit:
    """
    Split a docx document into paragraphs and organise them by heading.

    1. Wrap every paragraph in a ``ParagraphInfo``.
    2. Convert embedded images into a natural-language description.
    3. Convert tables into JSON.
    4. Group the paragraphs into a tree that follows the heading levels.
    """

    # Number of heading levels / list nesting levels tracked while numbering.
    MAX_TITLE_LEVEL = 9

    # Relationship id of a legacy VML picture (<v:imagedata r:id="...">).
    _VML_REL_ID_RE = re.compile(r'r:id="([^"]+)"')
    # Relationship id of a DrawingML picture (<a:blip r:embed="...">).
    _DRAWING_REL_ID_RE = re.compile(r'r:embed="([^"]+)"')
    # Style names such as "Heading 1" or "Heading 2 Char".
    _HEADING_NAME_RE = re.compile(r'Heading\s+(\d+)')

    # (template, use_letters) per list nesting level 0-4; deeper levels fall
    # back to the plain number.
    _LIST_NUMBER_STYLES = (
        ("({})", False),
        ("{})", False),
        ("({})", True),
        ("{})", True),
        ("{}", True),
    )

    def __init__(self, docx_file: str):
        """
        Create a splitter for one docx file.

        :param docx_file: path of the docx file to split
        """
        self.docx_file = docx_file
        self.image_to_text = ImageToText()
        # Flat paragraph list; after ``split`` it holds heading paragraphs only.
        self.paragraphs: list[ParagraphInfo] = []
        # Root nodes of the heading tree built by ``gen_paragraph_tree``.
        self.paragraph_tree: list[ParagraphInfo] = []

    def table_to_json(self, table: "docx.table.Table") -> list:
        """
        Convert a table into a JSON-serialisable list of rows.

        The first row is taken as the header row and only its cell text is
        used. In every following row a cell that contains nested tables is
        converted recursively: a single nested table becomes that table's row
        list, several nested tables become a list of row lists.

        :param table: docx.table.Table - the table to convert
        :return: list - ``[headers, row1, row2, ...]``; ``[[]]`` for a table
            without rows
        """
        headers = []
        table_data = [headers]
        for row_index, row in enumerate(table.rows):
            if row_index == 0:
                headers.extend(cell.text for cell in row.cells)
                continue
            table_data.append([self._cell_value(cell) for cell in row.cells])
        return table_data

    def _cell_value(self, cell):
        """Return the cell text, or the converted nested table(s) if it has any."""
        nested = cell.tables
        if not nested:
            return cell.text
        if len(nested) == 1:
            return self.table_to_json(nested[0])
        return [self.table_to_json(tbl) for tbl in nested]

    @staticmethod
    def _get_numbering(element):
        """
        Read the list numbering id and nesting level of a paragraph element.

        :param element: paragraph XML element exposing ``pPr.numPr``
        :return: tuple ``(numId value, ilvl value)``; ``(0, 0)`` when any part
            of the numbering properties is missing
        """
        num_pr = getattr(getattr(element, "pPr", None), "numPr", None)
        num_id = getattr(num_pr, "numId", None)
        ilvl = getattr(num_pr, "ilvl", None)
        if num_id is None or ilvl is None:
            return 0, 0
        return num_id.val, ilvl.val

    def split(self):
        """
        Split the document into paragraphs, number them and build the tree.

        Body elements are visited in document order. Non-empty paragraphs,
        image descriptions and tables (as fenced JSON) are appended to
        ``self.paragraphs``; afterwards heading/list numbers are generated and
        the heading tree is stored in ``self.paragraph_tree``.

        :return: list[ParagraphInfo] - ``self.paragraphs``, which after tree
            construction contains the heading paragraphs only
        """
        Log.info(f"开始拆分文档:{self.docx_file}")
        document = docx.Document(self.docx_file)
        # Both properties are rebuilt on every access, so read them once
        # instead of indexing them again for each body element.
        doc_paragraphs = document.paragraphs
        doc_tables = document.tables
        table_cnt = 0
        paragraph_cnt = 0

        for element in document.element.body:
            tag = element.tag
            if tag.endswith('}p'):  # paragraph
                paragraph = doc_paragraphs[paragraph_cnt]
                paragraph_cnt += 1
                num, level = self._get_numbering(element)
                p_text = paragraph.text
                if p_text:
                    title_level = self.get_title_level(paragraph)
                    self.paragraphs.append(ParagraphInfo(p_text, title_level, num, level))
                # A paragraph may carry a picture (usually with no text of its
                # own); describe it and store the description as body text.
                img_data = self.get_image_text(paragraph)
                if img_data:
                    text = self.gen_text_from_img(img_data)
                    text = f"```图片(以下内容为图片描述)\n{text}\n```"
                    self.paragraphs.append(ParagraphInfo(text, 0, num, level))
            elif tag.endswith('}tbl'):  # table
                table = doc_tables[table_cnt]
                table_cnt += 1
                table_data = self.table_to_json(table)
                self.paragraphs.append(
                    ParagraphInfo("```json\n" + json.dumps(table_data, indent=4, ensure_ascii=False) + "\n```", 0))

        # Generate heading numbers and list numbers.
        Log.info("开始生成标题编号和列表编号")
        self.gen_title_num(self.paragraphs)
        # Build the tree structure.
        Log.info("开始生成树形结构")
        self.gen_paragraph_tree(self.paragraphs)
        return self.paragraphs

    @staticmethod
    def get_image_text(paragraph):
        """
        Extract the first picture found in a paragraph.

        Each run's XML is inspected for a VML picture (``v:imagedata`` with an
        ``r:id``) and then for a DrawingML picture (``wp:inline`` /
        ``wp:anchor`` with an ``r:embed``). The referenced image part is
        loaded and converted with ``image_convert``.

        :param paragraph: the paragraph to inspect
        :return: bytes - PNG data of the first picture, or None if the
            paragraph has no picture
        """
        sources = (
            (('v:imagedata',), DocSplit._VML_REL_ID_RE),
            (('wp:inline', 'wp:anchor'), DocSplit._DRAWING_REL_ID_RE),
        )
        for run in paragraph.runs:
            xml = run._element.xml
            for markers, rel_id_re in sources:
                if not any(marker in xml for marker in markers):
                    continue
                match = rel_id_re.search(xml)
                if match:
                    image_part = paragraph.part.rels[match.group(1)].target_part
                    return DocSplit.image_convert(image_part.blob)
        return None

    @staticmethod
    def _format_list_number(level: int, value: int) -> str:
        """Format a list counter for the given nesting level, e.g. ``(1)``, ``1)``, ``(a)``."""
        if level < 0 or level >= len(DocSplit._LIST_NUMBER_STYLES):
            return str(value)
        template, use_letters = DocSplit._LIST_NUMBER_STYLES[level]
        # 96 + 1 == ord('a'), so counter 1 maps to 'a'.
        label = chr(96 + value) if use_letters else str(value)
        return template.format(label)

    @staticmethod
    def gen_title_num(paragraphs: "list[ParagraphInfo]"):
        """
        Assign heading numbers and list numbers to ``title_num`` in place.

        Heading levels run from 1 to 9; level 0 means body text. A heading
        gets a dotted number such as ``1.2.3`` and restarts list numbering.
        A body paragraph with ``num > 0`` gets a list number formatted by its
        ``num_level``; any other body paragraph restarts list numbering.

        :param paragraphs: list[ParagraphInfo] - paragraphs in document order
        :return: None
        """
        max_level = DocSplit.MAX_TITLE_LEVEL
        title_counters = [0] * max_level
        list_counters = [0] * max_level

        for p in paragraphs:
            if p.title_level > 0:
                title_counters[p.title_level - 1] += 1
                # A new heading restarts the numbering of all deeper levels.
                for deeper in range(p.title_level, max_level):
                    title_counters[deeper] = 0
                p.title_num = '.'.join(str(x) for x in title_counters[:p.title_level])
                list_counters = [0] * max_level
            elif p.num > 0:
                level = p.num_level
                # Ignore list levels outside the tracked range.
                if level < 0 or level >= max_level:
                    continue
                list_counters[level] += 1
                # Restart the counters of all deeper list levels.
                for deeper in range(level + 1, max_level):
                    list_counters[deeper] = 0
                p.title_num = DocSplit._format_list_number(level, list_counters[level])
            else:
                # Plain body text ends the current list.
                list_counters = [0] * max_level

    @staticmethod
    def get_title_level(paragraph) -> int:
        """
        Determine the heading level of a paragraph from its style.

        The paragraph's own style name is checked first, then the name of its
        direct base style. A name counts as a heading when it starts with
        ``Heading`` followed by a number (e.g. ``Heading 2``).

        :param paragraph: docx.paragraph.Paragraph - the paragraph to inspect
        :return: int - heading level, 0 when the paragraph is not a heading
            (including when it has no style or the name carries no number)
        """
        style = paragraph.style
        if style is None:
            return 0
        for candidate in (style, getattr(style, "base_style", None)):
            name = getattr(candidate, "name", None) or ""
            match = DocSplit._HEADING_NAME_RE.match(name)
            if match:
                return int(match.group(1))
        return 0

    @staticmethod
    def image_convert(_in: bytes) -> bytes:
        """
        Convert image data to PNG.

        :param _in: bytes - source image data
        :return: bytes - the image encoded as PNG
        """
        out_io = io.BytesIO()
        # The context manager releases the decoder once the PNG is written.
        with Image.open(io.BytesIO(_in)) as img:
            img.save(out_io, "png")
        return out_io.getvalue()

    def gen_text_from_img(self, img_data: bytes):
        """
        Turn an image into text via ``self.image_to_text``.

        :param img_data: bytes - image data
        :return: the text produced by ``ImageToText.gen_text_from_img``
        """
        return self.image_to_text.gen_text_from_img(img_data)

    def gen_paragraph_tree(self, paragraphs: "list[ParagraphInfo]"):
        """
        Build the paragraph tree from the ``title_level`` of each paragraph.

        Headings are nested under the closest preceding heading of a lower
        level. Body paragraphs are attached to the current heading and, once
        that heading is closed, merged into its ``text``. Headings without an
        open ancestor and body text that precedes the first heading become
        root nodes. On return ``self.paragraph_tree`` holds the roots and
        ``self.paragraphs`` is replaced in place by the heading paragraphs.
        Nothing is changed when ``paragraphs`` is empty.

        :param paragraphs: list[ParagraphInfo] - paragraphs in document order
        """
        if not paragraphs:
            return

        stack = []      # currently open headings, outermost first
        roots = []
        headings = []

        def merge_paragraph_text(info: "ParagraphInfo"):
            # Fold the body children into the heading text and keep only the
            # sub-headings as children.
            body = [child for child in info.children if child.title_level == 0]
            if body:
                # Only append when there is body text, so a heading without
                # body does not end up with a dangling newline.
                info.text += '\n' + '\n'.join(child.full_text for child in body)
            info.children = [child for child in info.children if child.title_level > 0]

        for p in paragraphs:
            if p.title_level > 0:
                # Close every open heading of the same or a deeper level.
                while stack and stack[-1].title_level >= p.title_level:
                    merge_paragraph_text(stack.pop())
                if stack:
                    stack[-1].children.append(p)
                else:
                    # No open ancestor: top-level node. This also keeps
                    # headings that appear before any level-1 heading.
                    roots.append(p)
                stack.append(p)
                headings.append(p)
            elif stack:
                stack[-1].children.append(p)
            else:
                # Body text before the first heading goes straight to the roots.
                roots.append(p)

        while stack:
            merge_paragraph_text(stack.pop())

        # Replace the list content in place instead of removing items one by one.
        self.paragraphs[:] = headings
        self.paragraph_tree = roots
|
|
|
if __name__ == '__main__':
    import sys

    # Manual smoke run: split one document and print the result.
    # The document path may be given as the first command-line argument;
    # without it the original sample document is used.
    default_docx_file = r'D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机1553B总线传输通信帧分配(公开).docx'
    docx_file = sys.argv[1] if len(sys.argv) > 1 else default_docx_file
    doc_split = DocSplit(docx_file)
    doc_split.split()
    # NOTE(review): after split() ``paragraphs`` holds every heading, so nested
    # headings are presumably printed once per ancestor as well - confirm
    # whether ``paragraph_tree`` was intended here.
    print("\n".join([x.full_text_with_children for x in doc_split.paragraphs]))
    print()
|