YM
2025-05-08 b2bef1e9348fef4010ed713497fdea85fc751c66
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# -*- coding: utf-8 -*-
#
# @author: lyg
# @date: 2025-5-7
# @version: 1
# @description:视觉识别文档内容
 
import base64
import json
import mimetypes
import os
import re
from io import BytesIO

from docx import Document
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import HumanMessagePromptTemplate, ChatPromptTemplate
from langchain_openai.chat_models import ChatOpenAI
from PIL import Image
 
 
class VisionTest:
    def __init__(self, file):
        self.llm = ChatOpenAI(temperature=0, model="qwen2.5-72b-instruct",
                              base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", api_key="sk-15ecf7e273ad4b729c7f7f42b542749e")
        image = base64.b64encode(open(file, 'rb').read()).decode()
        self.prompt = ChatPromptTemplate.from_messages([
            SystemMessage("你是一个资深软件工程师,请分析图片回答问题。"),
            HumanMessage(content=[
                {"type": "text", "text": "{msg}"},
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:image/jpeg;base64,{image}"},
                }
            ])
        ])
 
    def run(self, msg):
        chain = self.prompt | self.llm
        resp = chain.invoke({"msg": msg})
        print(resp.content)
 
    # def get_document_chapters(doc_path):
    #     doc = Document(doc_path)
    #     chapters = []
    #     current_chapter = None
        
    #     for para in doc.paragraphs:
    #         if para.style.name.startswith('Heading'):  # 检查是否为标题样式
    #             level = int(para.style.name.replace('Heading', ''))  # 获取标题级别
    #             current_chapter = {'level': level, 'title': para.text, 'content': []}
    #             chapters.append(current_chapter)
    #         elif current_chapter is not None:
    #             current_chapter['content'].append(para.text)  # 添加内容到当前章节
        
    #     return chapters
 
    def has_image(self, paragraph):
        # 通过检查XML中的嵌入式对象来判断是否有图片
        xml = paragraph._element.xml
        return 'w:object' in xml or 'w:drawing' in xml
    
    def convert_blob_to_png_base64(self, image_blob):
        try:
            # 打开图片
            image = Image.open(BytesIO(image_blob))
            # 创建内存缓冲区
            buffer = BytesIO()
            # 保存为PNG格式
            image.save(buffer, format="PNG")
            # 获取PNG格式的二进制数据
            png_data = buffer.getvalue()
            # 转换为Base64编码
            base64_data = base64.b64encode(png_data).decode('utf-8')
            return base64_data
        except Exception as e:
            print(f"Error: {e}")
            return None
        
    def get_image_blob(self, paragraph):
        # 遍历段落中的所有Run对象(图片通常在单独的Run中)
        for run in paragraph.runs:
            xml = run._element.xml
            if xml.find('v:imagedata') != -1:
                # 使用正则表达式查找r:id属性
                match = re.search(r'r:id="([^"]+)"', xml)
                if match:
                    r_id = match.group(1)
                    if r_id:
                        # 获取图片信息
                        image_part = paragraph.part.rels[r_id].target_part
                        return image_part.blob
            if xml.find('wp:inline') != -1 or xml.find('wp:anchor') != -1:
                # 使用正则表达式查找r:embed属性
                match = re.search(r'r:embed="([^"]+)"', xml)
                if match:
                    r_id = match.group(1)
                    if r_id:
                        # 获取图片信息
                        image_part = paragraph.part.rels[r_id].target_part
                        return image_part.blob
        return None
 
    def loadDoc(self):
        doc = Document('./static/doc/ZL格式(公开).docx')
        # 按照标题获取段落的层级结构
        titles = []
        for paragraph in doc.paragraphs:
            if paragraph.text != "":
                # 文字不为空
                if paragraph.style.base_style is not None:
                    # 有base_style
                    if paragraph.style.base_style.name.startswith('Heading'):
                        # 是标题
                        level = int(paragraph.style.base_style.name.split(' ')[-1])
                        obj = {}
                        obj["level"] = level
                        obj["text"] = paragraph.text
                        obj["child"] = []
                        titles.append(obj)
                    else:
                        length = len(titles)
                        if "child" in titles[length -1]:
                            obj = {}
                            obj["text"] = paragraph.text
                            titles[length -1]['child'].append(obj)
                else:
                    # 没有base_style
                    length = len(titles)
                    obj = {}
                    obj["text"] = paragraph.text
                    if length > 0 and "child" in titles[length -1]:
                        # 如果是标题内的append进标题的child
                        titles[length -1]['child'].append(obj)
                    else:
                        # 非标题内的直接放在第一层
                        titles.append(obj)
            else:
                # 文字为空时,可能是图片或者表格
                if self.has_image(paragraph):
                    # 当前段落为图片
                    obj = {}
                    # 获取图片的blob
                    img = self.get_image_blob(paragraph)
                    if img is not None:
                        imgBase64 = self.convert_blob_to_png_base64(img)
                        if imgBase64 is not None:
                            obj["imgBase64"] = imgBase64
                            titles[length -1]['child'].append(obj)
                # 在这里扩展判断表格
        print(titles)
        # for para in doc.paragraphs:
        #     print(para.text)
        #     print('------------------------')
 
if __name__ == "__main__":
    # Script entry point: wrap the sample image, then parse the bundled
    # document and print its outline.
    vision = VisionTest(file="./images/test.png")
    # vision.run("<question>")  # uncomment to ask the model about the image
    vision.loadDoc()