# -*- coding: utf-8 -*-
|
#
|
# @author: lyg
|
# @date: 2025-5-7
|
# @version: 1
|
# @description:视觉识别文档内容
|
|
from langchain_openai.chat_models import ChatOpenAI
|
from langchain_core.prompts import HumanMessagePromptTemplate, ChatPromptTemplate
|
from langchain_core.messages import HumanMessage, SystemMessage
|
from langchain_core.output_parsers import JsonOutputParser
|
from docx import Document
|
from PIL import Image
|
from io import BytesIO
|
import re
|
import json
|
import base64
|
|
|
class VisionTest:
    """Ask a chat model about one image and outline the content of .docx files.

    An instance is bound to a single image at construction time; ``run`` sends
    a question about that image to the model.  The remaining methods do not
    use the image: they walk a Word document and collect its headings, body
    text and embedded pictures into a nested list.
    """

    # Environment variable that supplies the API key.  The key must never be
    # committed to the source tree.
    API_KEY_ENV = "DASHSCOPE_API_KEY"
    # NOTE(review): every request carries an image, so the model must accept
    # image input -- confirm that this model id does.
    MODEL = "qwen2.5-72b-instruct"
    BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
    DEFAULT_DOC_PATH = './static/doc/ZL格式(公开).docx'

    # XML element names whose presence in a paragraph indicates a picture.
    _IMAGE_TAGS = ('w:object', 'w:drawing', 'w:pict')
    # Relationship id carried by the VML <v:imagedata> element itself (not by
    # a sibling element such as an OLE object in the same run).
    _VML_IMAGE_RE = re.compile(r'<v:imagedata\b[^>]*?\br:id="([^"]+)"')
    # Relationship id of a DrawingML picture.
    _DML_IMAGE_RE = re.compile(r'r:embed="([^"]+)"')
    # Style names such as "Heading 1"; the number is the heading level.
    _HEADING_RE = re.compile(r'^Heading\s*(\d+)$')
    # Upper bound on the style inheritance chain that is followed, so that a
    # malformed document cannot cause an endless loop.
    _MAX_STYLE_DEPTH = 32

    def __init__(self, file):
        """Create the model client and a prompt bound to the image *file*.

        Args:
            file: Path of the image that every question refers to.

        Raises:
            RuntimeError: If the API key environment variable is not set.
            OSError: If *file* cannot be read.
        """
        import mimetypes
        import os

        api_key = os.environ.get(self.API_KEY_ENV)
        if not api_key:
            raise RuntimeError(
                f"Set the {self.API_KEY_ENV} environment variable to your API key"
            )
        self.llm = ChatOpenAI(
            temperature=0,
            model=self.MODEL,
            base_url=self.BASE_URL,
            api_key=api_key,
        )

        # Close the file deterministically instead of leaking the handle.
        with open(file, 'rb') as fh:
            image = base64.b64encode(fh.read()).decode()
        # Declare the real media type; fall back to the former fixed value
        # when the extension is unknown.
        mime = mimetypes.guess_type(str(file))[0] or "image/jpeg"

        # The human turn has to be a *template*: a concrete HumanMessage is
        # passed through unformatted, so "{msg}" would never be replaced by
        # the question.  Neither the media type nor base64 data contains
        # braces, so embedding them in the template text is safe.
        self.prompt = ChatPromptTemplate.from_messages([
            SystemMessage("你是一个资深软件工程师,请分析图片回答问题。"),
            HumanMessagePromptTemplate.from_template([
                {"type": "text", "text": "{msg}"},
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:{mime};base64,{image}"},
                },
            ]),
        ])

    def run(self, msg):
        """Send the question *msg* about the bound image to the model.

        The answer is printed and also returned.

        Args:
            msg: The question text.

        Returns:
            The content of the model response.
        """
        chain = self.prompt | self.llm
        resp = chain.invoke({"msg": msg})
        print(resp.content)
        return resp.content

    def has_image(self, paragraph):
        """Return True if the paragraph XML contains a picture or object.

        Args:
            paragraph: A python-docx paragraph.
        """
        xml = paragraph._element.xml
        return any(tag in xml for tag in self._IMAGE_TAGS)

    def convert_blob_to_png_base64(self, image_blob):
        """Re-encode raw image bytes as PNG and return them base64 encoded.

        Args:
            image_blob: The bytes of an image in any format Pillow can read.

        Returns:
            The base64 text of the PNG, or None if the image could not be
            read or written (the error is printed).
        """
        try:
            buffer = BytesIO()
            with Image.open(BytesIO(image_blob)) as image:
                image.save(buffer, format="PNG")
            return base64.b64encode(buffer.getvalue()).decode('utf-8')
        except Exception as e:
            # Best effort: an unreadable picture must not abort the whole
            # document walk.
            print(f"Error: {e}")
            return None

    def _image_rel_ids(self, xml):
        """Yield the relationship ids of the pictures referenced by run *xml*."""
        match = self._VML_IMAGE_RE.search(xml)
        if match:
            yield match.group(1)
        if 'wp:inline' in xml or 'wp:anchor' in xml:
            match = self._DML_IMAGE_RE.search(xml)
            if match:
                yield match.group(1)

    def get_image_blob(self, paragraph):
        """Return the bytes of the first embedded picture in *paragraph*.

        Both VML pictures (``v:imagedata``) and DrawingML pictures
        (``r:embed`` inside ``wp:inline`` / ``wp:anchor``) are recognised.

        Args:
            paragraph: A python-docx paragraph.

        Returns:
            The image bytes, or None when the paragraph has no picture whose
            data is stored inside the document.
        """
        # Pictures normally live in a run of their own.
        for run in paragraph.runs:
            for r_id in self._image_rel_ids(run._element.xml):
                rel = paragraph.part.rels.get(r_id)
                # A dangling id or a linked (external) picture has no bytes
                # in the package; skip it rather than raise.
                if rel is None or rel.is_external:
                    continue
                return rel.target_part.blob
        return None

    def _heading_level(self, paragraph):
        """Return the heading level of *paragraph*, or None for body text.

        The paragraph style and the styles it is based on are examined, so
        both "Heading N" itself and custom styles derived from it count.
        """
        style = paragraph.style
        for _ in range(self._MAX_STYLE_DEPTH):
            if style is None:
                return None
            match = self._HEADING_RE.match(style.name or "")
            if match:
                return int(match.group(1))
            style = style.base_style
        return None

    @staticmethod
    def _attach(titles, entry):
        """Put *entry* under the most recent heading, else at the top level."""
        if titles and "child" in titles[-1]:
            titles[-1]["child"].append(entry)
        else:
            titles.append(entry)

    def _build_outline(self, paragraphs):
        """Group *paragraphs* into heading nodes with their text and pictures."""
        titles = []
        for paragraph in paragraphs:
            if paragraph.text != "":
                level = self._heading_level(paragraph)
                if level is not None:
                    titles.append(
                        {"level": level, "text": paragraph.text, "child": []}
                    )
                else:
                    self._attach(titles, {"text": paragraph.text})
                continue

            # An empty paragraph may hold a picture.
            if not self.has_image(paragraph):
                continue
            blob = self.get_image_blob(paragraph)
            if blob is None:
                continue
            encoded = self.convert_blob_to_png_base64(blob)
            if encoded is None:
                continue
            self._attach(titles, {"imgBase64": encoded})
            # TODO: tables could be detected here as well.
        return titles

    def loadDoc(self, doc_path=DEFAULT_DOC_PATH):
        """Read a .docx file and print its outline.

        Each heading becomes ``{"level", "text", "child"}``; body text
        (``{"text"}``) and pictures (``{"imgBase64"}``) are appended to the
        ``child`` list of the heading that precedes them, or to the top
        level when no heading precedes them.

        Args:
            doc_path: Path of the document; defaults to the sample document.

        Returns:
            The outline list that was printed.
        """
        doc = Document(doc_path)
        titles = self._build_outline(doc.paragraphs)
        print(titles)
        return titles
|
|
if __name__ == "__main__":
    # Script entry point: bind a VisionTest to the sample image and print the
    # outline of the sample document.  To question the model about the image
    # instead, call .run("<question>") on the instance.
    VisionTest("./static/images/test.png").loadDoc()
|