| | |
| | | import docx |
| | | import docx.table |
| | | import json |
| | | from dataclasses import dataclass |
| | | from PIL import Image |
| | | import io |
| | | import re |
| | | import typing |
| | | |
| | | from knowledgebase.doc.image_to_text import ImageToText |
| | | |
| | | |
@dataclass
class ParagraphInfo:
    """
    One paragraph of a document.

    :param text: str - paragraph text
    :param level: int - paragraph level, 1-9 for headings, 0 for body text
    :param title_no: str - heading number such as "1.1" or "1.1.1";
        empty until a number has been assigned
    """
    text: str
    level: int
    # Defaulted so the dataclass-generated __init__ keeps accepting the
    # two-argument form ParagraphInfo(text, level) that callers use, while
    # also allowing the number to be supplied up front.
    title_no: str = ''

    @property
    def full_text(self) -> str:
        """
        Return the paragraph text prefixed with its heading number.

        :return: str - "<title_no> <text>", or just the text when no heading
            number is set (avoids a stray leading space on body paragraphs)
        """
        if not self.title_no:
            return self.text
        return f"{self.title_no} {self.text}"
| | | from knowledgebase.doc.models import ParagraphInfo |
| | | from knowledgebase.log import Log |
| | | |
| | | |
| | | class DocSplit: |
| | | """ |
| | | docx文档拆分器,根据段落拆分,将图片和表格转换为json数据。 |
| | | 1.封装段落信息 |
| | | 2.将图片和表格转换为json |
| | | 3.将段落按照文档标题级别组合成树形结构 |
| | | 2.将图片转换为自然语言描述 |
| | | 3.将表格转换为json格式 |
| | | 4.将段落按照文档标题级别组合成树形结构 |
| | | |
| | | """ |
| | | |
def __init__(self, docx_file: str):
    """
    Set up a splitter for one docx file.

    :param docx_file: path of the docx file to split
    """
    # Result containers start out empty: the flat paragraph list and the
    # tree built from it.
    self.paragraph_tree: list[ParagraphInfo] = []
    self.paragraphs: list[ParagraphInfo] = []
    # Helper object used to turn image bytes into text.
    self.image_to_text = ImageToText()
    self.docx_file = docx_file
| | | |
| | | def table_to_json(self, table: docx.table.Table): |
| | | """ |
| | |
| | | :param table: docx.table.Table - 要转换的表格对象 |
| | | :return list - 表格数据,以 JSON 格式表示 |
| | | """ |
| | | table_data = [] |
| | | headers = [] |
| | | table_data = [headers] |
| | | first_row = True |
| | | row: docx.table._Row |
| | | for row in table.rows: |
| | |
| | | headers.append(cell.text) |
| | | first_row = False |
| | | continue |
| | | row_data = {} |
| | | row_data = [] |
| | | row_idx = 0 |
| | | for cell in row.cells: |
| | | if cell.tables: |
| | |
| | | else: |
| | | # 单元格文本获取 |
| | | text = cell.text |
| | | row_data[headers[row_idx]] = text |
| | | # row_data[headers[row_idx]] = text |
| | | row_data.append(text) |
| | | row_idx += 1 |
| | | |
| | | table_data.append(row_data) |
| | |
| | | |
| | | :return: list[ParagraphInfo] - 段落列表 |
| | | """ |
| | | document = docx.Document(self.doc_file) |
| | | Log.info(f"开始拆分文档:{self.docx_file}") |
| | | document = docx.Document(self.docx_file) |
| | | table_cnt = 0 |
| | | paragraph_cnt = 0 |
| | | |
| | |
| | | if element.tag.endswith('p'): # 段落 |
| | | # 获取标题多级编号 |
| | | paragraph = document.paragraphs[paragraph_cnt] |
| | | paragraph_text = paragraph.text |
| | | if paragraph_text: |
| | | self.paragraphs.append(ParagraphInfo(paragraph_text, self.get_header_level(paragraph))) |
| | | p_text = paragraph.text |
| | | try: |
| | | num = element.pPr.numPr.numId.val |
| | | level = element.pPr.numPr.ilvl.val |
| | | except: |
| | | num = 0 |
| | | level = 0 |
| | | if p_text: |
| | | title_level = self.get_title_level(paragraph) |
| | | self.paragraphs.append(ParagraphInfo(p_text, title_level, num, level)) |
| | | # 检查是否是图片,如果是图片则转换为文本 |
| | | img_data = self.get_image_blob(paragraph) |
| | | img_data = self.get_image_text(paragraph) |
| | | if img_data: |
| | | text = self.gen_text_from_img(img_data) |
| | | self.paragraphs.append(ParagraphInfo(text, 0)) |
| | | text = f"```图片(以下内容为图片描述)\n{text}\n```" |
| | | self.paragraphs.append(ParagraphInfo(text, 0, num, level)) |
| | | paragraph_cnt += 1 |
| | | elif element.tag.endswith('tbl'): # 表格 |
| | | table = document.tables[table_cnt] # 获取当前表格对象 |
| | | table_cnt += 1 |
| | | table_data = self.table_to_json(table) |
| | | self.paragraphs.append(ParagraphInfo(json.dumps(table_data, indent=4, ensure_ascii=False), 0)) |
| | | self.paragraphs.append( |
| | | ParagraphInfo("```json\n" + json.dumps(table_data, indent=4, ensure_ascii=False) + "\n```", 0)) |
| | | else: |
| | | continue |
| | | # 生成标题编号 |
| | | self.gen_title_no(self.paragraphs) |
| | | Log.info(f"开始生成标题编号和列表编号") |
| | | self.gen_title_num(self.paragraphs) |
| | | # 生成树形结构 |
| | | Log.info(f"开始生成树形结构") |
| | | self.gen_paragraph_tree(self.paragraphs) |
| | | |
| | | @staticmethod |
| | | def get_image_blob(paragraph): |
| | | def get_image_text(paragraph): |
| | | """ |
| | | 获取段落中的图片描述 |
| | | :param paragraph: 段落 |
| | | :return: 图片内容描述信息 |
| | | """ |
| | | # 遍历段落中的所有Run对象(图片通常在单独的Run中) |
| | | for run in paragraph.runs: |
| | | xml = run._element.xml |
| | |
| | | if r_id: |
| | | # 获取图片信息 |
| | | image_part = paragraph.part.rels[r_id].target_part |
| | | return DocSplit.image_convert(image_part.blob, "png") |
| | | return DocSplit.image_convert(image_part.blob) |
| | | if xml.find('wp:inline') != -1 or xml.find('wp:anchor') != -1: |
| | | # 使用正则表达式查找r:embed属性 |
| | | match = re.search(r'r:embed="([^"]+)"', xml) |
| | |
| | | if r_id: |
| | | # 获取图片信息 |
| | | image_part = paragraph.part.rels[r_id].target_part |
| | | return DocSplit.image_convert(image_part.blob, "png") |
| | | return DocSplit.image_convert(image_part.blob) |
| | | return None |
| | | |
| | | @staticmethod |
def gen_title_num(paragraphs: "list[ParagraphInfo]"):
    """
    Assign heading numbers and list-item labels to paragraphs in place.

    Headings (``title_level`` 1-9) get a dotted number such as "2.1.3".
    Body paragraphs (``title_level`` 0) with ``num`` > 0 are treated as
    numbered-list items and get a label chosen by ``num_level``:
    "(1)", "1)", "(a)", "a)", "a" for levels 0-4 and a plain number for
    deeper levels. The result is stored in each paragraph's ``title_num``.

    :param paragraphs: list[ParagraphInfo] - paragraphs in document order
    :return: None
    """
    MAX_TITLE_LEVEL = 9

    def letter(value: int) -> str:
        # 1 -> "a" ... 26 -> "z"; past "z" the letter is repeated
        # (27 -> "aa") so long lists never run into punctuation characters.
        repeat, offset = divmod(value - 1, 26)
        return chr(ord('a') + offset) * (repeat + 1)

    # Built once instead of on every call to format_number.
    formats = {
        0: lambda v: f"({v})",
        1: lambda v: f"{v})",
        2: lambda v: f"({letter(v)})",
        3: lambda v: f"{letter(v)})",
        4: letter,
    }

    def format_number(level: int, value: int) -> str:
        formatter = formats.get(level)
        return formatter(value) if formatter else str(value)

    title_levels = [0] * MAX_TITLE_LEVEL
    list_counters = [0] * MAX_TITLE_LEVEL

    for p in paragraphs:
        if p.title_level > 0:
            title_levels[p.title_level - 1] += 1
            # A new heading restarts the numbering of every deeper level.
            title_levels[p.title_level:] = [0] * (MAX_TITLE_LEVEL - p.title_level)
            p.title_num = '.'.join(str(x) for x in title_levels[:p.title_level])
            # Lists never continue across a heading.
            list_counters = [0] * MAX_TITLE_LEVEL
            continue

        # Body paragraph. The heading counters must stay untouched here:
        # resetting them would corrupt the number of the next heading.
        if p.num <= 0:
            # An unnumbered paragraph ends any running list.
            list_counters = [0] * MAX_TITLE_LEVEL
            continue

        level = p.num_level
        if not 0 <= level < MAX_TITLE_LEVEL:
            # Out-of-range list level: leave the paragraph unlabelled.
            continue

        list_counters[level] += 1
        # Nested lists below this level start over.
        list_counters[level + 1:] = [0] * (MAX_TITLE_LEVEL - level - 1)
        p.title_num = format_number(level, list_counters[level])
| | | |
| | | @staticmethod |
def get_title_level(paragraph) -> int:
    """
    Return the heading level of a paragraph.

    The paragraph's own style is checked first, then every style it is
    based on in turn, so a custom style derived from a heading style is
    recognised however many inheritance steps lie in between.

    :param paragraph: docx.paragraph.Paragraph - paragraph to inspect
    :return: int - the N of the first "Heading N" style name found,
        0 when the paragraph is not a heading
    """
    style = paragraph.style
    while style is not None:
        # A name that merely starts with "Heading" but carries no number
        # is not a heading level; keep looking up the chain instead of
        # failing on it.
        found = re.match(r'Heading\s+(\d+)', style.name or '')
        if found:
            return int(found.group(1))
        style = style.base_style
    return 0
| | | |
| | | @staticmethod |
| | | def image_convert(_in: bytes, _out_format: str) -> bytes: |
| | | def image_convert(_in: bytes) -> bytes: |
| | | """ |
| | | 将图片转换为png格式的bytes |
| | | :param _in: bytes - 图片数据 |
| | | :return: bytes - png格式的图片数据 |
| | | """ |
| | | in_io = io.BytesIO() |
| | | in_io.write(_in) |
| | | img = Image.open(in_io, "r") |
| | |
| | | out_io.seek(0) |
| | | return out_io.read() |
| | | |
def gen_text_from_img(self, img_data: bytes):
    """
    Convert an image to text by delegating to ``self.image_to_text``.

    :param img_data: bytes - image data
    :return: the value returned by ``image_to_text.gen_text_from_img``
        (text describing the image, per the original documentation)
    """
    converter = self.image_to_text
    return converter.gen_text_from_img(img_data)
| | | |
def gen_paragraph_tree(self, paragraphs: "typing.List[ParagraphInfo]"):
    """
    Build the heading tree from a flat list of paragraphs.

    Each heading becomes a child of the nearest preceding heading with a
    smaller ``title_level``. Body paragraphs (``title_level`` 0) are first
    attached to the heading they follow and, once that heading is closed,
    their ``full_text`` is appended to the heading's ``text`` and they are
    removed from its ``children``.

    On return ``self.paragraph_tree`` holds the root nodes (headings with
    no parent, plus body paragraphs that precede every heading) and
    ``self.paragraphs`` is overwritten in place with the headings only.
    An empty input leaves both attributes untouched.

    :param paragraphs: list[ParagraphInfo] - paragraphs in document order
    """
    if not paragraphs:
        return

    open_headings = []  # headings whose section is still being filled
    roots = []
    headings = []

    def merge_paragraph_text(info: "ParagraphInfo"):
        # Fold body-text children into the heading and keep only the
        # sub-headings as children.
        body = [child.full_text for child in info.children if child.title_level == 0]
        if body:
            # Only touch the text when there is something to add, so a
            # heading without body text does not gain a trailing newline.
            info.text = '\n'.join([info.text, *body])
        info.children = [child for child in info.children if child.title_level > 0]

    for p in paragraphs:
        if p.title_level > 0:
            # Close every open heading at the same or a deeper level.
            while open_headings and open_headings[-1].title_level >= p.title_level:
                merge_paragraph_text(open_headings.pop())
            if open_headings:
                open_headings[-1].children.append(p)
            else:
                # No parent left: this heading is a root, whatever its
                # level, so a document that opens with a level-2 heading
                # does not lose it from the tree.
                roots.append(p)
            open_headings.append(p)
            headings.append(p)
        elif open_headings:
            open_headings[-1].children.append(p)
        else:
            # Body text before the first heading goes straight to the roots.
            roots.append(p)

    while open_headings:
        merge_paragraph_text(open_headings.pop())

    # Slice assignment keeps the identity of the existing list object.
    self.paragraphs[:] = headings
    self.paragraph_tree = roots
| | | |
| | | |
if __name__ == '__main__':
    import sys

    # Manual smoke run: split one document and print what was extracted.
    # The sample path below is used unless a path is given as the first
    # command-line argument.
    docx_file = r'D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机1553B总线传输通信帧分配(公开).docx'
    if len(sys.argv) > 1:
        docx_file = sys.argv[1]

    doc_split = DocSplit(docx_file)
    doc_split.split()
    print("\n".join([x.full_text_with_children for x in doc_split.paragraphs]))
    print()