# -*- coding: utf-8 -*-
#
#
# @author: lyg
# @date: 2025-5-7
# @version: 1

import base64
import json
import mimetypes
import os
import re
from io import BytesIO

from docx import Document
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import HumanMessagePromptTemplate, ChatPromptTemplate
from langchain_openai.chat_models import ChatOpenAI
from PIL import Image


class VisionTest:
    """Ask a chat model about an image and extract a heading outline from a .docx.

    The constructor builds a chat prompt that embeds one image file as a
    base64 data URL; ``run`` sends a question together with that image.
    ``loadDoc`` is independent of the model: it walks a Word document and
    groups its paragraphs (text and embedded pictures) under their headings.
    """

    # (marker substrings, relationship-id pattern) pairs used to locate the
    # relationship id of a picture inside a run's XML:
    #   * legacy VML pictures reference the image through ``r:id``
    #   * DrawingML pictures (inline or anchored) reference it through ``r:embed``
    _IMAGE_REL_PATTERNS = (
        (("v:imagedata",), re.compile(r'r:id="([^"]+)"')),
        (("wp:inline", "wp:anchor"), re.compile(r'r:embed="([^"]+)"')),
    )

    # Style names such as "Heading 1"; the captured group is the level.
    _HEADING_RE = re.compile(r"Heading\s*(\d+)")

    def __init__(self, file, *, model="qwen2.5-72b-instruct",
                 base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
                 api_key=None):
        """Create the chat model client and the image prompt.

        Args:
            file: Path of the image file to embed in the prompt.
            model: Model name passed to ``ChatOpenAI``.
            base_url: Base URL of the OpenAI-compatible endpoint.
            api_key: API key for the endpoint. When omitted, the
                ``DASHSCOPE_API_KEY`` environment variable is used.

        Raises:
            ValueError: If no API key is given and the environment variable
                is unset or empty.
            OSError: If ``file`` cannot be opened or read.
        """
        # SECURITY: the key used to be hard-coded here. A key that has been
        # committed to source control should be treated as leaked and revoked.
        if api_key is None:
            api_key = os.environ.get("DASHSCOPE_API_KEY")
        if not api_key:
            raise ValueError(
                "No API key: pass api_key=... or set the DASHSCOPE_API_KEY "
                "environment variable."
            )

        # NOTE(review): the default model name does not look like a vision
        # ("-vl-") variant -- confirm that it accepts image_url content.
        self.llm = ChatOpenAI(temperature=0,
                              model=model,
                              base_url=base_url,
                              api_key=api_key)

        # Read through a context manager so the handle is always closed.
        with open(file, 'rb') as image_file:
            image = base64.b64encode(image_file.read()).decode()

        # Label the data URL with the real media type; fall back to the
        # previously hard-coded JPEG when the extension is not recognised.
        mime_type = mimetypes.guess_type(str(file))[0] or "image/jpeg"

        # The human turn must be a prompt *template*: a plain HumanMessage
        # instance is passed through unformatted, so "{msg}" would reach the
        # model literally instead of being replaced by the question.
        self.prompt = ChatPromptTemplate.from_messages([
            SystemMessage("你是一个资深软件工程师,请分析图片回答问题。"),
            HumanMessagePromptTemplate.from_template([
                {"type": "text", "text": "{msg}"},
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:{mime_type};base64,{image}"},
                },
            ]),
        ])

    def run(self, msg):
        """Send ``msg`` with the embedded image to the model.

        The reply text is printed (as before) and also returned.
        """
        chain = self.prompt | self.llm
        resp = chain.invoke({"msg": msg})
        print(resp.content)
        return resp.content

    def has_image(self, paragraph):
        """Return True if the paragraph XML contains a picture container.

        Checks for embedded objects (``w:object``), DrawingML drawings
        (``w:drawing``) and legacy VML pictures (``w:pict``); the latter is
        included because ``get_image_blob`` can extract ``v:imagedata`` from it.
        """
        xml = paragraph._element.xml
        return any(tag in xml for tag in ('w:object', 'w:drawing', 'w:pict'))

    def convert_blob_to_png_base64(self, image_blob):
        """Re-encode raw image bytes as PNG and return them base64-encoded.

        Returns:
            The base64 text, or ``None`` if the image cannot be decoded or
            saved as PNG (the error is printed).
        """
        try:
            with Image.open(BytesIO(image_blob)) as image:
                buffer = BytesIO()
                image.save(buffer, format="PNG")
            return base64.b64encode(buffer.getvalue()).decode('utf-8')
        except Exception as e:
            # Deliberately best-effort: one undecodable picture must not abort
            # processing of the whole document.
            print(f"Error: {e}")
            return None

    def get_image_blob(self, paragraph):
        """Return the bytes of the first picture found in the paragraph's runs.

        Each run's XML is searched for a VML (``v:imagedata`` / ``r:id``) or
        DrawingML (``wp:inline`` / ``wp:anchor`` / ``r:embed``) picture; the
        relationship id is then resolved through ``paragraph.part.rels``.

        Returns:
            The image bytes, or ``None`` when no run references a picture
            whose relationship can be resolved.
        """
        for run in paragraph.runs:
            xml = run._element.xml
            for markers, rel_id_pattern in self._IMAGE_REL_PATTERNS:
                if not any(marker in xml for marker in markers):
                    continue
                match = rel_id_pattern.search(xml)
                if match is None:
                    continue
                try:
                    return paragraph.part.rels[match.group(1)].target_part.blob
                except KeyError:
                    # Dangling relationship id: keep looking instead of crashing.
                    continue
        return None

    @classmethod
    def _heading_level(cls, style):
        """Return the heading level of ``style`` or of the styles it is based on.

        Walks ``style`` and then its ``base_style`` chain and returns the
        number from the first name of the form "Heading N"; returns ``None``
        when no such name is found.
        """
        while style is not None:
            match = cls._HEADING_RE.fullmatch(style.name or "")
            if match:
                return int(match.group(1))
            style = style.base_style
        return None

    @staticmethod
    def _attach(titles, entry):
        """Append ``entry`` to the latest heading's children, else to the top level."""
        if titles and "child" in titles[-1]:
            titles[-1]["child"].append(entry)
        else:
            titles.append(entry)

    def _build_outline(self, paragraphs):
        """Group paragraphs under their headings.

        Returns a flat list in document order. A heading becomes
        ``{"level", "text", "child"}``; text becomes ``{"text"}`` and a picture
        becomes ``{"imgBase64"}``, each stored in the ``child`` list of the
        most recent entry when that entry is a heading, otherwise at the top
        level. Empty paragraphs without an extractable picture are skipped.
        """
        titles = []
        for paragraph in paragraphs:
            if paragraph.text != "":
                level = self._heading_level(paragraph.style)
                if level is not None:
                    titles.append({"level": level, "text": paragraph.text, "child": []})
                else:
                    self._attach(titles, {"text": paragraph.text})
                continue

            # Empty text: the paragraph may hold a picture (tables are not
            # handled here yet).
            if not self.has_image(paragraph):
                continue
            blob = self.get_image_blob(paragraph)
            if blob is None:
                continue
            img_base64 = self.convert_blob_to_png_base64(blob)
            if img_base64 is not None:
                self._attach(titles, {"imgBase64": img_base64})
        return titles

    def loadDoc(self, doc_path='./static/doc/ZL格式(公开).docx'):
        """Load a .docx file, print its heading outline and return it.

        Args:
            doc_path: Path of the Word document; defaults to the path that was
                previously hard-coded.

        Returns:
            The outline list produced by ``_build_outline``.
        """
        doc = Document(doc_path)
        titles = self._build_outline(doc.paragraphs)
        print(titles)
        return titles


if __name__ == '__main__':
    # Script entry point: build the helper around a sample image, then print
    # the outline of the default Word document.
    vision = VisionTest("./static/images/test.png")
    # To query the model about the image instead, call: vision.run("问题")
    vision.loadDoc()