lyg
2025-05-22 e60d75228fb161e464ca59fa2526bf0765f4d902
vision_test.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
#
#
# @author: lyg
# @date: 2025-5-7
# @version: 1
@@ -7,25 +7,24 @@
from langchain_openai.chat_models import ChatOpenAI
from langchain_core.prompts import HumanMessagePromptTemplate, ChatPromptTemplate
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.messages import HumanMessage,SystemMessage
from langchain_core.output_parsers import JsonOutputParser
from docx import Document
from PIL import Image
from io import BytesIO
import re
import json
import base64
class VisionTest:
    def __init__(self, file):
        self.llm = ChatOpenAI(temperature=0, model="qwen2.5-72b-instruct",
                              base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", api_key="sk-15ecf7e273ad4b729c7f7f42b542749e")
    def __init__(self,file):
        self.llm = ChatOpenAI(temperature=0,
                              model="qwen2.5-72b-instruct",
                              base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
                              api_key="sk-15ecf7e273ad4b729c7f7f42b542749e")
        image = base64.b64encode(open(file, 'rb').read()).decode()
        self.prompt = ChatPromptTemplate.from_messages([
            SystemMessage("你是一个资深软件工程师,请分析图片回答问题。"),
            HumanMessage(content=[
                {"type": "text", "text": "{msg}"},
                {"type": "text", "text": "describe the weather in this image"},
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:image/jpeg;base64,{image}"},
@@ -33,125 +32,11 @@
            ])
        ])
    def run(self, msg):
    def run(self,msg):
        chain = self.prompt | self.llm
        resp = chain.invoke({"msg": msg})
        print(resp.content)
    # def get_document_chapters(doc_path):
    #     doc = Document(doc_path)
    #     chapters = []
    #     current_chapter = None
    #     for para in doc.paragraphs:
    #         if para.style.name.startswith('Heading'):  # 检查是否为标题样式
    #             level = int(para.style.name.replace('Heading', ''))  # 获取标题级别
    #             current_chapter = {'level': level, 'title': para.text, 'content': []}
    #             chapters.append(current_chapter)
    #         elif current_chapter is not None:
    #             current_chapter['content'].append(para.text)  # 添加内容到当前章节
    #     return chapters
    def has_image(self, paragraph):
        # 通过检查XML中的嵌入式对象来判断是否有图片
        xml = paragraph._element.xml
        return 'w:object' in xml or 'w:drawing' in xml
    def convert_blob_to_png_base64(self, image_blob):
        try:
            # 打开图片
            image = Image.open(BytesIO(image_blob))
            # 创建内存缓冲区
            buffer = BytesIO()
            # 保存为PNG格式
            image.save(buffer, format="PNG")
            # 获取PNG格式的二进制数据
            png_data = buffer.getvalue()
            # 转换为Base64编码
            base64_data = base64.b64encode(png_data).decode('utf-8')
            return base64_data
        except Exception as e:
            print(f"Error: {e}")
            return None
    def get_image_blob(self, paragraph):
        # 遍历段落中的所有Run对象(图片通常在单独的Run中)
        for run in paragraph.runs:
            xml = run._element.xml
            if xml.find('v:imagedata') != -1:
                # 使用正则表达式查找r:id属性
                match = re.search(r'r:id="([^"]+)"', xml)
                if match:
                    r_id = match.group(1)
                    if r_id:
                        # 获取图片信息
                        image_part = paragraph.part.rels[r_id].target_part
                        return image_part.blob
            if xml.find('wp:inline') != -1 or xml.find('wp:anchor') != -1:
                # 使用正则表达式查找r:embed属性
                match = re.search(r'r:embed="([^"]+)"', xml)
                if match:
                    r_id = match.group(1)
                    if r_id:
                        # 获取图片信息
                        image_part = paragraph.part.rels[r_id].target_part
                        return image_part.blob
        return None
    def loadDoc(self):
        doc = Document('./static/doc/ZL格式(公开).docx')
        # 按照标题获取段落的层级结构
        titles = []
        for paragraph in doc.paragraphs:
            if paragraph.text != "":
                # 文字不为空
                if paragraph.style.base_style is not None:
                    # 有base_style
                    if paragraph.style.base_style.name.startswith('Heading'):
                        # 是标题
                        level = int(paragraph.style.base_style.name.split(' ')[-1])
                        obj = {}
                        obj["level"] = level
                        obj["text"] = paragraph.text
                        obj["child"] = []
                        titles.append(obj)
                    else:
                        length = len(titles)
                        if "child" in titles[length -1]:
                            obj = {}
                            obj["text"] = paragraph.text
                            titles[length -1]['child'].append(obj)
                else:
                    # 没有base_style
                    length = len(titles)
                    obj = {}
                    obj["text"] = paragraph.text
                    if length > 0 and "child" in titles[length -1]:
                        # 如果是标题内的append进标题的child
                        titles[length -1]['child'].append(obj)
                    else:
                        # 非标题内的直接放在第一层
                        titles.append(obj)
            else:
                # 文字为空时,可能是图片或者表格
                if self.has_image(paragraph):
                    # 当前段落为图片
                    obj = {}
                    # 获取图片的blob
                    img = self.get_image_blob(paragraph)
                    if img is not None:
                        imgBase64 = self.convert_blob_to_png_base64(img)
                        if imgBase64 is not None:
                            obj["imgBase64"] = imgBase64
                            titles[length -1]['child'].append(obj)
                # 在这里扩展判断表格
        print(titles)
        # for para in doc.paragraphs:
        #     print(para.text)
        #     print('------------------------')
if __name__ == '__main__':
    vision = VisionTest("./static/images/test.png")
    # vision.run("问题")
    vision.loadDoc()
    vision = VisionTest("image_path")
    vision.run("问题")