lyg
2 天以前 22f370322412074174cde20ecfd14ec03657ab63
knowledgebase/doc/docx_split.py
@@ -7,58 +7,37 @@
import docx
import docx.table
import json
from dataclasses import dataclass
from PIL import Image
import io
import re
import typing
from knowledgebase.doc.image_to_text import ImageToText
@dataclass
class ParagraphInfo:
    """
    A single paragraph extracted from a document.

    :param text: paragraph text
    :param level: paragraph level; 1-9 for headings, 0 for body text
    :param title_no: heading number such as "1.1" or "1.1.1"; empty until one is assigned
    """
    text: str
    level: int
    # Defaulted so the generated __init__ still accepts the (text, level) call shape.
    title_no: str = ''

    @property
    def full_text(self) -> str:
        """
        Return the paragraph text prefixed with its heading number.

        Paragraphs without a number are returned as-is, so body text does not
        pick up a stray leading space.

        :return: the full paragraph text
        """
        if self.title_no:
            return f"{self.title_no} {self.text}"
        return self.text
from knowledgebase.doc.models import ParagraphInfo
from knowledgebase.log import Log
from bs4 import BeautifulSoup
class DocSplit:
    """
    Splitter for docx documents; works paragraph by paragraph.
    1. Wraps each paragraph in a ParagraphInfo
    2. Converts images into a natural-language description
    3. Converts tables into JSON
    4. Assembles the paragraphs into a tree according to the document's heading levels
    """
    def __init__(self, doc_file):
        # Keep the document handed in by the caller; it is passed to docx.Document later.
        # NOTE(review): a second __init__ is defined right after this one, and Python
        # keeps only the later definition — confirm whether this one is still needed.
        self.doc_file = doc_file
    def __init__(self, docx_file: str, docx_type: str):
        """
        docx文档拆分
        :param docx_file: 要拆分的docx文件路径
        :param docx_type: 文档类型
        """
        self.docx_file = docx_file
        self.docx_type = docx_type
        self.image_to_text = ImageToText()
        self.paragraphs:list[ParagraphInfo] = []
        self.paragraphs: list[ParagraphInfo] = []
        self.paragraph_tree: list[ParagraphInfo] = []
    def table_to_json(self, table: docx.table.Table):
        """
@@ -67,8 +46,8 @@
           :param table: docx.table.Table - the table object to convert
           :return list - the table data, represented in JSON format
        """
        table_data = []
        headers = []
        # table_data is rebound so that the header list is its first entry.
        table_data = [headers]
        first_row = True
        row: docx.table._Row
        for row in table.rows:
@@ -77,7 +56,7 @@
                    headers.append(cell.text)
                first_row = False
                continue
            row_data = {}
            row_data = []
            row_idx = 0
            for cell in row.cells:
                if cell.tables:
@@ -92,7 +71,11 @@
                else:
                    # Read the text of the cell
                    text = cell.text
                row_data[headers[row_idx]] = text
                    # When the cell XML mentions "w:ins", rebuild the text from every w:t node
                    # (presumably so inserted/tracked text is not lost — TODO confirm).
                    if cell._element.xml.find("w:ins")!=-1:
                        soup = BeautifulSoup(cell._element.xml, "xml")
                        text = ''.join([x.get_text() for x in soup.find_all("w:t")])
                # row_data[headers[row_idx]] = text
                row_data.append(text)
                row_idx += 1
            table_data.append(row_data)
@@ -104,7 +87,8 @@
        :return: list[ParagraphInfo] - 段落列表
        """
        document = docx.Document(self.doc_file)
        Log.info(f"开始拆分文档:{self.docx_file}")
        document = docx.Document(self.docx_file)
        table_cnt = 0
        paragraph_cnt = 0
@@ -112,27 +96,61 @@
            if element.tag.endswith('p'):  # 段落
                # 获取标题多级编号
                paragraph = document.paragraphs[paragraph_cnt]
                paragraph_text = paragraph.text
                if paragraph_text:
                    self.paragraphs.append(ParagraphInfo(paragraph_text, self.get_header_level(paragraph)))
                p_text = paragraph.text
                is_toc = paragraph.style.name.startswith('TOC') and '目' in p_text and '录' in p_text
                try:
                    num = element.pPr.numPr.numId.val
                    level = element.pPr.numPr.ilvl.val
                except:
                    num = 0
                    level = 0
                if p_text:
                    title_level = self.get_title_level(paragraph)
                    self.paragraphs.append(ParagraphInfo(p_text, title_level, num, level, is_toc))
                # 检查是否是图片,如果是图片则转换为文本
                img_data = self.get_image_blob(paragraph)
                img_data = self.get_image_text(paragraph)
                if img_data:
                    text = self.gen_text_from_img(img_data)
                    self.paragraphs.append(ParagraphInfo(text, 0))
                    text = f"```图片(以下内容为图片描述)\n{text}\n```"
                    self.paragraphs.append(ParagraphInfo(text, 0, num, level))
                paragraph_cnt += 1
            elif element.tag.endswith('tbl'):  # 表格
                table = document.tables[table_cnt]  # 获取当前表格对象
                table_cnt += 1
                table_data = self.table_to_json(table)
                self.paragraphs.append(ParagraphInfo(json.dumps(table_data, indent=4, ensure_ascii=False), 0))
                self.paragraphs.append(
                    ParagraphInfo("```json\n" + json.dumps(table_data, indent=4, ensure_ascii=False) + "\n```", 0))
            else:
                continue
        # 去除目录
        self.remove_toc(self.paragraphs)
        # 生成标题编号
        self.gen_title_no(self.paragraphs)
        Log.info(f"开始生成标题编号和列表编号")
        self.gen_title_num(self.paragraphs)
        # 生成树形结构
        Log.info(f"开始生成树形结构")
        self.gen_paragraph_tree(self.paragraphs)
    @staticmethod
    def get_image_blob(paragraph):
    def remove_toc(paragraphs: [ParagraphInfo]):
        rm_list = []
        for p in paragraphs:
            if p.is_toc:
                rm_list.append(p)
            elif rm_list and p.title_level == 1:
                break
            elif rm_list:
                rm_list.append(p)
        for p in rm_list:
            paragraphs.remove(p)
    @staticmethod
    def get_image_text(paragraph):
        """
        Look for an image inside the paragraph and return its converted data.
        :param paragraph: the paragraph to inspect
        :return: the result of DocSplit.image_convert for the first image found,
                 or None when the paragraph holds no image
        """
        # Walk every Run of the paragraph (images usually sit in a Run of their own)
        for run in paragraph.runs:
            xml = run._element.xml
@@ -144,7 +162,7 @@
                    if r_id:
                        # Resolve the relationship id to the image part
                        image_part = paragraph.part.rels[r_id].target_part
                        return DocSplit.image_convert(image_part.blob, "png")
                        return DocSplit.image_convert(image_part.blob)
            if xml.find('wp:inline') != -1 or xml.find('wp:anchor') != -1:
                # Use a regular expression to find the r:embed attribute
                match = re.search(r'r:embed="([^"]+)"', xml)
@@ -153,36 +171,88 @@
                    if r_id:
                        # Resolve the relationship id to the image part
                        image_part = paragraph.part.rels[r_id].target_part
                        return DocSplit.image_convert(image_part.blob, "png")
                        return DocSplit.image_convert(image_part.blob)
        return None
    @staticmethod
    def gen_title_no(paragraphs: list[ParagraphInfo]):
        title_levels = [1, 1, 1, 1, 1, 1, 1, 1, 1]
        for i in range(len(paragraphs)):
            if paragraphs[i].level > 0:
                for j in range(paragraphs[i].level - 1):
                    title_levels[j] = 1
                paragraphs[i].title_no = '.'.join([str(x) for x in title_levels[0:paragraphs[i].level]])
                title_levels[paragraphs[i].level - 1] += 1
    def gen_title_num(paragraphs: list[ParagraphInfo]):
        """
        生成标题编号和列表编号
        标题级别从1-9,0表示正文
        :param paragraphs: list[ParagraphInfo] - 段落列表
        :return: None
        """
        MAX_TITLE_LEVEL = 9  # 定义为常量,便于统一管理和扩展
        title_levels = [0] * MAX_TITLE_LEVEL  # 初始化为全0
        list_counters = [0] * MAX_TITLE_LEVEL
        def format_number(level: int, value: int) -> str:
            # 使用映射方式简化逻辑
            if level < 0 or level > 4:
                return str(value)
            formats = {
                0: lambda v: f"({v})",
                1: lambda v: f"{v})",
                2: lambda v: f"({chr(96 + v)})",
                3: lambda v: f"{chr(96 + v)})",
                4: lambda v: chr(96 + v),
            }
            return formats[level](value)
        for p in paragraphs:
            if p.title_level > 0:
                title_levels[p.title_level - 1] += 1
                for i in range(p.title_level, MAX_TITLE_LEVEL):
                    title_levels[i] = 0
                p.title_num = '.'.join([str(x) for x in title_levels[:p.title_level]])
                list_counters = [0] * MAX_TITLE_LEVEL
            else:
                title_levels = [1, 1, 1, 1, 1, 1, 1, 1, 1]
                # 处理列表编号
                if p.num > 0:
                    level = p.num_level
                    # 校验 level 合法性
                    if level < 0 or level >= MAX_TITLE_LEVEL:
                        continue
                    list_counters[level] += 1
                    # 重置当前层级之后的计数器
                    for l in range(level + 1, MAX_TITLE_LEVEL):
                        list_counters[l] = 0
                    # 当前层级递增并赋值
                    p.title_num = format_number(level, list_counters[level])
                else:
                    list_counters = [0] * MAX_TITLE_LEVEL
    @staticmethod
    def get_header_level(paragraph) -> int:
        if paragraph.style.base_style:
            style = paragraph.style.base_style
        else:
            style = paragraph.style
    def get_title_level(paragraph) -> int:
        """
        获取段落标题级别
        :param paragraph: docx.paragraph.Paragraph - 要获取标题级别的段落对象
        :return: int - 标题级别,0 表示非标题
        """
        style = paragraph.style
        if style and style.name.startswith('Heading'):
            # 获取标题级别
            level = int(style.name.split(' ')[1])
            return level
        elif style.base_style and style.base_style.name.startswith('Heading'):
            level = int(style.base_style.name.split(' ')[1])
            return level
        else:
            return 0
    @staticmethod
    def image_convert(_in: bytes, _out_format: str) -> bytes:
    def image_convert(_in: bytes) -> bytes:
        """
        Convert image data into PNG-formatted bytes.
        :param _in: bytes - source image data
        :return: bytes - the content read back from the output buffer
                 (PNG data per the original description; the step that fills
                 the buffer is not among the lines shown here)
        """
        # Load the raw bytes into an in-memory stream and open it as an image.
        in_io = io.BytesIO()
        in_io.write(_in)
        img = Image.open(in_io, "r")
@@ -191,11 +261,55 @@
        # Rewind the output buffer so read() returns everything written to it.
        out_io.seek(0)
        return out_io.read()
    def gen_text_from_img(self, img_data:bytes):
    def gen_text_from_img(self, img_data: bytes):
        """
        利用LLM将图片转为文本
        :param img_data: bytes - 图片数据
        :return: str - 文本
        """
        return ''
        return self.image_to_text.gen_text_from_img(img_data)
if __name__ == '__main__':
    # Manual smoke run: split one local document and print the full text of every paragraph.
    # NOTE(review): the path is a hard-coded local Windows path, and DocSplit is built
    # with a single argument here — confirm this matches the constructor in use.
    doc_file = r'D:\workspace\PythonProjects\KnowledgeBase\doc\ZL格式(公开).docx'
    doc_split = DocSplit(doc_file)
    doc_split.split()
    print("\n".join([x.full_text for x in doc_split.paragraphs]))
    def gen_paragraph_tree(self, paragraphs: typing.List[ParagraphInfo]):
        """
        生成段落树结构,根据title_level划分段落树
        :param paragraphs: list[ParagraphInfo] - 段落列表(会被原地修改)
        """
        if not paragraphs:
            return
        stack = []
        result = []
        _paragraphs = []
        def merge_paragraph_text(info: ParagraphInfo):
            text_nodes = [child for child in info.children if child.title_level == 0]
            info.text += '\n' + '\n'.join([child.full_text for child in text_nodes])
            info.children = [child for child in info.children if child.title_level > 0]
        for p in paragraphs:
            if p.title_level == 1:
                result.append(p)
            # 清理栈顶比当前级别低或相等的节点
            while stack and p.title_level != 0 and stack[-1].title_level >= p.title_level:
                _p = stack.pop()
                merge_paragraph_text(_p)
            if p.title_level > 0:
                if len(stack):
                    stack[-1].children.append(p)
                stack.append(p)
                _paragraphs.append(p)
            elif len(stack):
                stack[-1].children.append(p)
            else:
                # 非标题段落直接加入结果
                result.append(p)
        while stack:
            merge_paragraph_text(stack.pop())
        # 替换原始列表内容,避免多次 remove 操作
        self.paragraphs[:] = _paragraphs
        self.paragraph_tree = result