lyg
2 天以前 22f370322412074174cde20ecfd14ec03657ab63
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
# -*- coding: utf-8 -*-
# @author: lyg, ym
# @date: 2025-5-8
# @version: 1
# @description: docx文档拆分器,根据段落拆分,将图片和表格转换为json数据。
import docx
import docx.table
import json
from PIL import Image
import io
import re
import typing
 
from knowledgebase.doc.image_to_text import ImageToText
from knowledgebase.doc.models import ParagraphInfo
from knowledgebase.log import Log
from bs4 import BeautifulSoup
 
class DocSplit:
    """
    docx文档拆分器,根据段落拆分,将图片和表格转换为json数据。
    1.封装段落信息
    2.将图片转换为自然语言描述
    3.将表格转换为json格式
    4.将段落按照文档标题级别组合成树形结构
 
    """
 
    def __init__(self, docx_file: str, docx_type: str):
        """
        docx文档拆分
        :param docx_file: 要拆分的docx文件路径
        :param docx_type: 文档类型
        """
        self.docx_file = docx_file
        self.docx_type = docx_type
        self.image_to_text = ImageToText()
        self.paragraphs: list[ParagraphInfo] = []
        self.paragraph_tree: list[ParagraphInfo] = []
 
    def table_to_json(self, table: docx.table.Table):
        """
           将表格转换为 JSON 格式
 
           :param table: docx.table.Table - 要转换的表格对象
           :return list - 表格数据,以 JSON 格式表示
        """
        headers = []
        table_data = [headers]
        first_row = True
        row: docx.table._Row
        for row in table.rows:
            if first_row:
                for cell in row.cells:
                    headers.append(cell.text)
                first_row = False
                continue
            row_data = []
            row_idx = 0
            for cell in row.cells:
                if cell.tables:
                    # 嵌套表格处理
                    if len(cell.tables) == 1:
                        text = self.table_to_json(cell.tables[0])
                    else:
                        text = []
                        for tbl in cell.tables:
                            tbl_json = self.table_to_json(tbl)
                            text.append(tbl_json)
                else:
                    # 单元格文本获取
                    text = cell.text
                    if cell._element.xml.find("w:ins")!=-1:
                        soup = BeautifulSoup(cell._element.xml, "xml")
                        text = ''.join([x.get_text() for x in soup.find_all("w:t")])
                # row_data[headers[row_idx]] = text
                row_data.append(text)
                row_idx += 1
 
            table_data.append(row_data)
        return table_data
 
    def split(self):
        """
        将文档拆分成段落,并返回段落列表
 
        :return: list[ParagraphInfo] - 段落列表
        """
        Log.info(f"开始拆分文档:{self.docx_file}")
        document = docx.Document(self.docx_file)
        table_cnt = 0
        paragraph_cnt = 0
 
        for element in document.element.body:
            if element.tag.endswith('p'):  # 段落
                # 获取标题多级编号
                paragraph = document.paragraphs[paragraph_cnt]
                p_text = paragraph.text
                is_toc = paragraph.style.name.startswith('TOC') and '目' in p_text and '录' in p_text
                try:
                    num = element.pPr.numPr.numId.val
                    level = element.pPr.numPr.ilvl.val
                except:
                    num = 0
                    level = 0
                if p_text:
                    title_level = self.get_title_level(paragraph)
                    self.paragraphs.append(ParagraphInfo(p_text, title_level, num, level, is_toc))
                # 检查是否是图片,如果是图片则转换为文本
                img_data = self.get_image_text(paragraph)
                if img_data:
                    text = self.gen_text_from_img(img_data)
                    text = f"```图片(以下内容为图片描述)\n{text}\n```"
                    self.paragraphs.append(ParagraphInfo(text, 0, num, level))
                paragraph_cnt += 1
            elif element.tag.endswith('tbl'):  # 表格
                table = document.tables[table_cnt]  # 获取当前表格对象
                table_cnt += 1
                table_data = self.table_to_json(table)
                self.paragraphs.append(
                    ParagraphInfo("```json\n" + json.dumps(table_data, indent=4, ensure_ascii=False) + "\n```", 0))
            else:
                continue
        # 去除目录
        self.remove_toc(self.paragraphs)
        # 生成标题编号
        Log.info(f"开始生成标题编号和列表编号")
        self.gen_title_num(self.paragraphs)
        # 生成树形结构
        Log.info(f"开始生成树形结构")
        self.gen_paragraph_tree(self.paragraphs)
 
    @staticmethod
    def remove_toc(paragraphs: [ParagraphInfo]):
        rm_list = []
        for p in paragraphs:
            if p.is_toc:
                rm_list.append(p)
            elif rm_list and p.title_level == 1:
                break
            elif rm_list:
                rm_list.append(p)
        for p in rm_list:
            paragraphs.remove(p)
 
    @staticmethod
    def get_image_text(paragraph):
        """
        获取段落中的图片描述
        :param paragraph: 段落
        :return: 图片内容描述信息
        """
        # 遍历段落中的所有Run对象(图片通常在单独的Run中)
        for run in paragraph.runs:
            xml = run._element.xml
            if xml.find('v:imagedata') != -1:
                # 使用正则表达式查找r:id属性
                match = re.search(r'r:id="([^"]+)"', xml)
                if match:
                    r_id = match.group(1)
                    if r_id:
                        # 获取图片信息
                        image_part = paragraph.part.rels[r_id].target_part
                        return DocSplit.image_convert(image_part.blob)
            if xml.find('wp:inline') != -1 or xml.find('wp:anchor') != -1:
                # 使用正则表达式查找r:embed属性
                match = re.search(r'r:embed="([^"]+)"', xml)
                if match:
                    r_id = match.group(1)
                    if r_id:
                        # 获取图片信息
                        image_part = paragraph.part.rels[r_id].target_part
                        return DocSplit.image_convert(image_part.blob)
        return None
 
    @staticmethod
    def gen_title_num(paragraphs: list[ParagraphInfo]):
        """
        生成标题编号和列表编号
        标题级别从1-9,0表示正文
 
        :param paragraphs: list[ParagraphInfo] - 段落列表
        :return: None
        """
        MAX_TITLE_LEVEL = 9  # 定义为常量,便于统一管理和扩展
        title_levels = [0] * MAX_TITLE_LEVEL  # 初始化为全0
 
        list_counters = [0] * MAX_TITLE_LEVEL
 
        def format_number(level: int, value: int) -> str:
            # 使用映射方式简化逻辑
            if level < 0 or level > 4:
                return str(value)
            formats = {
                0: lambda v: f"({v})",
                1: lambda v: f"{v})",
                2: lambda v: f"({chr(96 + v)})",
                3: lambda v: f"{chr(96 + v)})",
                4: lambda v: chr(96 + v),
            }
            return formats[level](value)
 
        for p in paragraphs:
            if p.title_level > 0:
                title_levels[p.title_level - 1] += 1
                for i in range(p.title_level, MAX_TITLE_LEVEL):
                    title_levels[i] = 0
                p.title_num = '.'.join([str(x) for x in title_levels[:p.title_level]])
                list_counters = [0] * MAX_TITLE_LEVEL
            else:
                # 处理列表编号
                if p.num > 0:
                    level = p.num_level
 
                    # 校验 level 合法性
                    if level < 0 or level >= MAX_TITLE_LEVEL:
                        continue
                    list_counters[level] += 1
 
                    # 重置当前层级之后的计数器
                    for l in range(level + 1, MAX_TITLE_LEVEL):
                        list_counters[l] = 0
 
                    # 当前层级递增并赋值
                    p.title_num = format_number(level, list_counters[level])
                else:
                    list_counters = [0] * MAX_TITLE_LEVEL
 
    @staticmethod
    def get_title_level(paragraph) -> int:
        """
        获取段落标题级别
 
        :param paragraph: docx.paragraph.Paragraph - 要获取标题级别的段落对象
        :return: int - 标题级别,0 表示非标题
        """
        style = paragraph.style
        if style and style.name.startswith('Heading'):
            # 获取标题级别
            level = int(style.name.split(' ')[1])
            return level
        elif style.base_style and style.base_style.name.startswith('Heading'):
            level = int(style.base_style.name.split(' ')[1])
            return level
        else:
            return 0
 
    @staticmethod
    def image_convert(_in: bytes) -> bytes:
        """
        将图片转换为png格式的bytes
        :param _in: bytes - 图片数据
        :return: bytes - png格式的图片数据
        """
        in_io = io.BytesIO()
        in_io.write(_in)
        img = Image.open(in_io, "r")
        out_io = io.BytesIO()
        img.save(out_io, "png")
        out_io.seek(0)
        return out_io.read()
 
    def gen_text_from_img(self, img_data: bytes):
        """
        利用LLM将图片转为文本
        :param img_data: bytes - 图片数据
        :return: str - 文本
        """
        return ''
        return self.image_to_text.gen_text_from_img(img_data)
 
    def gen_paragraph_tree(self, paragraphs: typing.List[ParagraphInfo]):
        """
        生成段落树结构,根据title_level划分段落树
 
        :param paragraphs: list[ParagraphInfo] - 段落列表(会被原地修改)
        """
        if not paragraphs:
            return
 
        stack = []
        result = []
        _paragraphs = []
 
        def merge_paragraph_text(info: ParagraphInfo):
            text_nodes = [child for child in info.children if child.title_level == 0]
            info.text += '\n' + '\n'.join([child.full_text for child in text_nodes])
            info.children = [child for child in info.children if child.title_level > 0]
 
        for p in paragraphs:
            if p.title_level == 1:
                result.append(p)
            # 清理栈顶比当前级别低或相等的节点
            while stack and p.title_level != 0 and stack[-1].title_level >= p.title_level:
                _p = stack.pop()
                merge_paragraph_text(_p)
 
            if p.title_level > 0:
                if len(stack):
                    stack[-1].children.append(p)
                stack.append(p)
                _paragraphs.append(p)
            elif len(stack):
                stack[-1].children.append(p)
            else:
                # 非标题段落直接加入结果
                result.append(p)
 
        while stack:
            merge_paragraph_text(stack.pop())
 
        # 替换原始列表内容,避免多次 remove 操作
        self.paragraphs[:] = _paragraphs
        self.paragraph_tree = result