# -*- coding: utf-8 -*-
#
# @author: lyg, ym
# @date: 2025-5-8
# @version: 1
# @description: docx document splitter. Splits a document by paragraph and
#               converts images and tables into JSON/text data.
import docx
import docx.table
import json
from PIL import Image
import io
import re
import typing

from knowledgebase.doc.image_to_text import ImageToText
from knowledgebase.doc.models import ParagraphInfo
from knowledgebase.log import Log
from bs4 import BeautifulSoup

# Matches a real tracked-insertion element (<w:ins ...>). A plain substring
# test for "w:ins" would also hit <w:instrText> and <w:insideH>.
_TRACKED_INSERT_RE = re.compile(r'<w:ins[\s/>]')

# Extracts the numeric level from style names such as "Heading 1".
_HEADING_LEVEL_RE = re.compile(r'^Heading\s*(\d+)')

# (markers that identify a picture run, regex extracting the relationship id)
_IMAGE_REF_PATTERNS = (
    (('v:imagedata',), re.compile(r'r:id="([^"]+)"')),
    (('wp:inline', 'wp:anchor'), re.compile(r'r:embed="([^"]+)"')),
)

# Pixel modes that can be written to PNG without conversion.
_PNG_SAFE_MODES = ('1', 'L', 'LA', 'I', 'I;16', 'P', 'RGB', 'RGBA')


def _lower_letter(value: int) -> str:
    """Return the lower-case letter label for a 1-based counter.

    1..26 map to a..z; larger values repeat the letter (27 -> "aa",
    28 -> "bb", ...) instead of running past "z" into punctuation.
    """
    index = max(value, 1) - 1
    return chr(ord('a') + index % 26) * (index // 26 + 1)


# List-number renderers indexed by list nesting level (0-4).
_LIST_NUMBER_FORMATS = (
    lambda v: f"({v})",
    lambda v: f"{v})",
    lambda v: f"({_lower_letter(v)})",
    lambda v: f"{_lower_letter(v)})",
    lambda v: _lower_letter(v),
)


def _format_list_number(level: int, value: int) -> str:
    """Render a list counter for the given nesting level.

    Levels outside 0-4 fall back to the plain decimal number.
    """
    if 0 <= level < len(_LIST_NUMBER_FORMATS):
        return _LIST_NUMBER_FORMATS[level](value)
    return str(value)


class DocSplit:
    """
    docx document splitter: splits by paragraph and converts images and
    tables into text/JSON data.

    1. Wraps paragraph information
    2. Converts images into a natural-language description
    3. Converts tables into JSON
    4. Combines paragraphs into a tree according to heading levels
    """

    def __init__(self, docx_file: str, docx_type: str):
        """
        Create a splitter for one docx file.

        :param docx_file: path of the docx file to split
        :param docx_type: document type
        """
        self.docx_file = docx_file
        self.docx_type = docx_type
        self.image_to_text = ImageToText()
        self.paragraphs: list[ParagraphInfo] = []
        self.paragraph_tree: list[ParagraphInfo] = []

    @staticmethod
    def _cell_text(cell) -> str:
        """Return the text of a table cell, including tracked insertions."""
        xml = cell._element.xml
        if _TRACKED_INSERT_RE.search(xml):
            # Collect every <w:t> node so that text inside <w:ins> is kept.
            soup = BeautifulSoup(xml, "xml")
            return ''.join(node.get_text() for node in soup.find_all("w:t"))
        return cell.text

    def _cell_to_json(self, cell):
        """Convert one body cell: nested tables become lists, else text."""
        if cell.tables:
            nested = [self.table_to_json(tbl) for tbl in cell.tables]
            # A single nested table is returned directly, several as a list.
            return nested[0] if len(nested) == 1 else nested
        return self._cell_text(cell)

    def table_to_json(self, table: docx.table.Table) -> list:
        """
        Convert a table into a JSON-serialisable list of rows.

        The first row is treated as the header row and stored as plain
        text; cells of the remaining rows may contain nested tables, which
        are converted recursively.

        :param table: docx.table.Table - table to convert
        :return: list - ``[headers, row1, row2, ...]``
        """
        rows = iter(table.rows)
        header_row = next(rows, None)
        headers = []
        if header_row is not None:
            headers = [self._cell_text(cell) for cell in header_row.cells]
        table_data = [headers]
        for row in rows:
            table_data.append([self._cell_to_json(cell) for cell in row.cells])
        return table_data

    def split(self):
        """
        Split the document into paragraphs.

        Walks the body in document order, collecting text paragraphs,
        image placeholders and tables into ``self.paragraphs``; then removes
        the table of contents, generates heading/list numbers and builds
        ``self.paragraph_tree``.

        :return: list[ParagraphInfo] - ``self.paragraphs`` after processing
        """
        Log.info(f"开始拆分文档:{self.docx_file}")
        document = docx.Document(self.docx_file)
        # Build the wrapper lists once; the properties re-create them on
        # every access, which made the loop quadratic.
        body_paragraphs = document.paragraphs
        body_tables = document.tables
        table_cnt = 0
        paragraph_cnt = 0
        for element in document.element.body:
            tag = element.tag
            if not isinstance(tag, str):
                # XML comments / processing instructions have no string tag.
                continue
            if tag.endswith('}p'):  # paragraph
                paragraph = body_paragraphs[paragraph_cnt]
                paragraph_cnt += 1
                p_text = paragraph.text
                style = paragraph.style
                style_name = (style.name if style is not None else None) or ''
                is_toc = style_name.startswith('TOC') and '目' in p_text and '录' in p_text
                # Multi-level numbering info; any missing node means "none".
                try:
                    num = element.pPr.numPr.numId.val
                    level = element.pPr.numPr.ilvl.val
                except AttributeError:
                    num = 0
                    level = 0
                if p_text:
                    title_level = self.get_title_level(paragraph)
                    self.paragraphs.append(ParagraphInfo(p_text, title_level, num, level, is_toc))
                # If the paragraph holds a picture, add its description.
                img_data = self.get_image_text(paragraph)
                if img_data:
                    text = self.gen_text_from_img(img_data)
                    text = f"```图片(以下内容为图片描述)\n{text}\n```"
                    self.paragraphs.append(ParagraphInfo(text, 0, num, level))
            elif tag.endswith('}tbl'):  # table
                table = body_tables[table_cnt]
                table_cnt += 1
                table_data = self.table_to_json(table)
                self.paragraphs.append(
                    ParagraphInfo("```json\n" + json.dumps(table_data, indent=4, ensure_ascii=False) + "\n```", 0))
        # Remove the table of contents
        self.remove_toc(self.paragraphs)
        # Generate heading numbers and list numbers
        Log.info("开始生成标题编号和列表编号")
        self.gen_title_num(self.paragraphs)
        # Build the tree structure
        Log.info("开始生成树形结构")
        self.gen_paragraph_tree(self.paragraphs)
        return self.paragraphs

    @staticmethod
    def remove_toc(paragraphs: list[ParagraphInfo]):
        """
        Remove the table of contents from ``paragraphs`` in place.

        The removed run starts at the first paragraph flagged ``is_toc`` and
        ends just before the next level-1 heading that is not itself flagged
        ``is_toc`` (or at the end of the list when there is none).

        :param paragraphs: list[ParagraphInfo] - paragraph list, modified in place
        :return: None
        """
        start = next((i for i, p in enumerate(paragraphs) if p.is_toc), None)
        if start is None:
            return
        end = len(paragraphs)
        for i in range(start + 1, len(paragraphs)):
            candidate = paragraphs[i]
            if not candidate.is_toc and candidate.title_level == 1:
                end = i
                break
        # The run is contiguous, so one slice deletion replaces the repeated
        # equality-based list.remove() calls.
        del paragraphs[start:end]

    @staticmethod
    def get_image_text(paragraph):
        """
        Return the first picture found in the paragraph as PNG bytes.

        :param paragraph: paragraph to inspect
        :return: PNG bytes of the first referenced image, or None
        """
        # Pictures normally live in their own run.
        for run in paragraph.runs:
            xml = run._element.xml
            for markers, ref_re in _IMAGE_REF_PATTERNS:
                if not any(marker in xml for marker in markers):
                    continue
                match = ref_re.search(xml)
                if match:
                    image_part = paragraph.part.rels[match.group(1)].target_part
                    return DocSplit.image_convert(image_part.blob)
        return None

    @staticmethod
    def gen_title_num(paragraphs: list[ParagraphInfo]):
        """
        Generate heading numbers and list numbers.

        Heading levels run from 1 to 9; 0 means body text. Heading numbers
        look like ``1.2.3``. List numbers depend on the list nesting level
        and restart after every heading and after every non-list paragraph.

        :param paragraphs: list[ParagraphInfo] - paragraph list
        :return: None
        """
        MAX_TITLE_LEVEL = 9
        title_levels = [0] * MAX_TITLE_LEVEL
        list_counters = [0] * MAX_TITLE_LEVEL

        for p in paragraphs:
            if p.title_level > 0:
                depth = p.title_level
                title_levels[depth - 1] += 1
                # Deeper heading counters restart under a new parent.
                for i in range(depth, MAX_TITLE_LEVEL):
                    title_levels[i] = 0
                p.title_num = '.'.join(str(x) for x in title_levels[:depth])
                list_counters = [0] * MAX_TITLE_LEVEL
            elif p.num > 0:
                level = p.num_level
                # Ignore list levels outside the supported range.
                if level < 0 or level >= MAX_TITLE_LEVEL:
                    continue
                list_counters[level] += 1
                # Reset the counters of all deeper list levels.
                for deeper in range(level + 1, MAX_TITLE_LEVEL):
                    list_counters[deeper] = 0
                p.title_num = _format_list_number(level, list_counters[level])
            else:
                list_counters = [0] * MAX_TITLE_LEVEL

    @staticmethod
    def get_title_level(paragraph) -> int:
        """
        Return the heading level of a paragraph.

        The level is read from the paragraph style name ("Heading N") or,
        failing that, from the name of its direct base style.

        :param paragraph: docx.paragraph.Paragraph - paragraph to inspect
        :return: int - heading level, 0 when the paragraph is not a heading
        """
        style = paragraph.style
        if style is None:
            return 0
        for candidate in (style, style.base_style):
            if candidate is None:
                continue
            match = _HEADING_LEVEL_RE.match(candidate.name or '')
            if match:
                return int(match.group(1))
        return 0

    @staticmethod
    def image_convert(_in: bytes) -> bytes:
        """
        Convert image data to PNG bytes.

        :param _in: bytes - source image data
        :return: bytes - PNG encoded image data
        """
        with Image.open(io.BytesIO(_in)) as img:
            # PNG cannot store modes such as CMYK; convert those first.
            if img.mode not in _PNG_SAFE_MODES:
                img = img.convert('RGB')
            out_io = io.BytesIO()
            img.save(out_io, "png")
        return out_io.getvalue()

    def gen_text_from_img(self, img_data: bytes):
        """
        Convert an image to text using the LLM.

        :param img_data: bytes - image data
        :return: str - text
        """
        # NOTE(review): this early return disables the image-to-text call
        # below, so every image description is currently empty. Looks like a
        # leftover debug stub - confirm before removing it, since doing so
        # starts real ImageToText calls.
        return ''
        return self.image_to_text.gen_text_from_img(img_data)

    def gen_paragraph_tree(self, paragraphs: typing.List[ParagraphInfo]):
        """
        Build the paragraph tree according to ``title_level``.

        Headings become tree nodes; body paragraphs are merged into the text
        of the nearest open heading. Paragraphs with no open parent (body
        text before the first heading, level-1 headings, and deeper headings
        that appear before any shallower heading) become root nodes.

        Afterwards ``self.paragraph_tree`` holds the root nodes and
        ``self.paragraphs`` is replaced in place by the heading paragraphs.

        :param paragraphs: list[ParagraphInfo] - paragraph list
        """
        if not paragraphs:
            return

        stack = []
        roots = []
        headings = []

        def merge_paragraph_text(info: ParagraphInfo):
            # Fold body-text children into the heading text and keep only
            # sub-headings as children.
            text_nodes = [child for child in info.children if child.title_level == 0]
            info.text += '\n' + '\n'.join([child.full_text for child in text_nodes])
            info.children = [child for child in info.children if child.title_level > 0]

        for p in paragraphs:
            is_heading = p.title_level > 0
            if is_heading:
                # Close every open heading at the same or a deeper level.
                while stack and stack[-1].title_level >= p.title_level:
                    merge_paragraph_text(stack.pop())
            if stack:
                stack[-1].children.append(p)
            else:
                # No open parent: keep the node at the root so it is not lost.
                roots.append(p)
            if is_heading:
                stack.append(p)
                headings.append(p)

        while stack:
            merge_paragraph_text(stack.pop())

        # Replace the list content in one step instead of many removes.
        self.paragraphs[:] = headings
        self.paragraph_tree = roots