.gitignore | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
knowledgebase/doc/docx_split.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
knowledgebase/doc/image_to_text.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
knowledgebase/llm.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
vision_test.py | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
.gitignore
@@ -5,4 +5,5 @@ /.conda /docs /out* /packages __pycache__ knowledgebase/doc/docx_split.py
# -*- coding: utf-8 -*-
#
# @author: lyg, ym
# @date: 2025-5-8
# @version: 1
# @description: docx document splitter. Walks the document body in order,
#               keeps text paragraphs and converts images and tables to text.
# @file: knowledgebase/doc/docx_split.py (added as a new file in this diff)
import docx
import docx.table
import json
from dataclasses import dataclass
from PIL import Image
import io
import re
import sys

from knowledgebase.doc.image_to_text import ImageToText

# Style names such as "Heading 1" .. "Heading 9"; the digits are the level.
_HEADING_STYLE_RE = re.compile(r'^Heading\s+(\d+)')

# Upper bound for walking a style's base_style chain, so a malformed
# (cyclic) style definition cannot loop forever.
_MAX_STYLE_DEPTH = 32

# (markers that identify a picture flavour in a run's XML,
#  pattern that extracts the relationship id of the image part)
_IMAGE_REL_PATTERNS = (
    # Legacy VML pictures reference the image part through r:id.
    (('v:imagedata',), re.compile(r'r:id="([^"]+)"')),
    # DrawingML pictures (inline or floating) reference it through r:embed.
    (('wp:inline', 'wp:anchor'), re.compile(r'r:embed="([^"]+)"')),
)

# Sample document used when the script is run without arguments.
_DEFAULT_DOC_FILE = r'D:\workspace\PythonProjects\KnowledgeBase\doc\ZL格式(公开).docx'


@dataclass
class ParagraphInfo:
    """
    Paragraph information.

    :param text: str - paragraph text
    :param level: int - paragraph level, 1-9 for headings, 0 for body text
    :param title_no: str - heading number such as 1.1 or 1.1.1; empty until
        numbering has been generated and always empty for body text
    """
    text: str
    level: int
    title_no: str = ''

    @property
    def full_text(self) -> str:
        """
        Return the paragraph text prefixed with its heading number.

        Paragraphs without a heading number are returned unchanged, so body
        text does not start with a stray space.

        :return: str - full paragraph text
        """
        if self.title_no:
            return f"{self.title_no} {self.text}"
        return self.text


class DocSplit:
    """
    docx document splitter.

    1. Wraps every paragraph in a ParagraphInfo.
    2. Converts pictures to text and tables to JSON.
    3. Assigns multi-level heading numbers according to the heading levels.
    """

    def __init__(self, doc_file, image_to_text=None):
        """
        :param doc_file: path or file-like object passed to docx.Document
        :param image_to_text: optional object exposing
            gen_text_from_img(bytes) -> str; a new ImageToText is created
            when omitted
        """
        self.doc_file = doc_file
        self.image_to_text = ImageToText() if image_to_text is None else image_to_text
        self.paragraphs: list[ParagraphInfo] = []

    def table_to_json(self, table: docx.table.Table) -> list:
        """
        Convert a table to a list of row dictionaries.

        The first row supplies the keys; every following row becomes one
        dictionary mapping header text to cell content. Nested tables are
        converted recursively.

        :param table: docx.table.Table - table to convert
        :return: list - one dict per data row (empty for a table without rows)
        """
        rows = list(table.rows)
        if not rows:
            return []
        headers = [cell.text for cell in rows[0].cells]
        table_data = []
        for row in rows[1:]:
            row_data = {}
            for col_idx, cell in enumerate(row.cells):
                if col_idx < len(headers):
                    key = headers[col_idx]
                else:
                    # A row wider than the header row would otherwise raise
                    # IndexError; fall back to a positional key instead.
                    key = f"column_{col_idx + 1}"
                row_data[key] = self._cell_value(cell)
            table_data.append(row_data)
        return table_data

    def _cell_value(self, cell):
        """Return the cell text, or the converted nested table(s) if any."""
        nested = cell.tables
        if not nested:
            return cell.text
        if len(nested) == 1:
            return self.table_to_json(nested[0])
        return [self.table_to_json(tbl) for tbl in nested]

    def split(self) -> list[ParagraphInfo]:
        """
        Split the document into paragraphs.

        The result is stored in self.paragraphs (replacing the result of any
        previous call) and also returned.

        :return: list[ParagraphInfo] - paragraphs in document order
        """
        document = docx.Document(self.doc_file)
        # Both properties build a new list on every access, so read them once
        # instead of once per body element.
        body_paragraphs = document.paragraphs
        body_tables = document.tables
        paragraph_cnt = 0
        table_cnt = 0
        self.paragraphs.clear()
        for element in document.element.body:
            tag = element.tag
            if not isinstance(tag, str):
                # XML comments / processing instructions have a non-string tag.
                continue
            # Match the local name exactly ("...}p"), not any tag ending in "p".
            if tag.endswith('}p'):
                paragraph = body_paragraphs[paragraph_cnt]
                paragraph_cnt += 1
                paragraph_text = paragraph.text
                if paragraph_text:
                    self.paragraphs.append(
                        ParagraphInfo(paragraph_text, self.get_header_level(paragraph)))
                # A paragraph may carry a picture; describe it as text.
                img_data = self.get_image_blob(paragraph)
                if img_data:
                    self.paragraphs.append(
                        ParagraphInfo(self.gen_text_from_img(img_data), 0))
            elif tag.endswith('}tbl'):
                table = body_tables[table_cnt]
                table_cnt += 1
                table_data = self.table_to_json(table)
                self.paragraphs.append(
                    ParagraphInfo(json.dumps(table_data, indent=4, ensure_ascii=False), 0))
        # Generate heading numbers.
        self.gen_title_no(self.paragraphs)
        return self.paragraphs

    @staticmethod
    def get_image_blob(paragraph):
        """
        Return the first picture found in the paragraph as PNG bytes.

        :param paragraph: paragraph whose runs are searched
        :return: bytes with the picture converted to PNG, or None when the
            paragraph contains no picture
        """
        # Pictures normally live in a run of their own.
        for run in paragraph.runs:
            xml = run._element.xml
            for markers, rel_pattern in _IMAGE_REL_PATTERNS:
                if not any(marker in xml for marker in markers):
                    continue
                match = rel_pattern.search(xml)
                if match:
                    image_part = paragraph.part.rels[match.group(1)].target_part
                    return DocSplit.image_convert(image_part.blob, "png")
        return None

    @staticmethod
    def gen_title_no(paragraphs: list[ParagraphInfo]):
        """
        Assign multi-level heading numbers (1, 1.1, 1.2, 2, 2.1, ...) in place.

        Body paragraphs (level 0) are skipped and do not interrupt the
        numbering. A heading that skips levels gets "1" for every missing
        ancestor.

        :param paragraphs: list[ParagraphInfo] - paragraphs in document order
        """
        counters: list[int] = []
        for paragraph in paragraphs:
            level = paragraph.level
            if level <= 0:
                continue
            # Moving back up to this level restarts every deeper level.
            del counters[level:]
            missing = level - len(counters)
            if missing > 0:
                counters.extend([1] * (missing - 1) + [0])
            counters[-1] += 1
            paragraph.title_no = '.'.join(str(number) for number in counters)

    @staticmethod
    def get_header_level(paragraph) -> int:
        """
        Return the heading level of the paragraph.

        The paragraph's own style is checked first and then the styles it is
        based on, so both a plain "Heading N" style and a custom style
        derived from it are recognised.

        :param paragraph: paragraph to inspect
        :return: int - level taken from the "Heading N" style name, 0 for
            body text
        """
        style = paragraph.style
        for _ in range(_MAX_STYLE_DEPTH):
            if style is None:
                break
            match = _HEADING_STYLE_RE.match(style.name or '')
            if match:
                return int(match.group(1))
            style = style.base_style
        return 0

    @staticmethod
    def image_convert(_in: bytes, _out_format: str) -> bytes:
        """
        Re-encode image data in another format.

        :param _in: bytes - source image data in any format Pillow can open
        :param _out_format: str - target format name, e.g. "png"
        :return: bytes - re-encoded image data
        """
        out_io = io.BytesIO()
        with Image.open(io.BytesIO(_in)) as img:
            img.save(out_io, _out_format)
        return out_io.getvalue()

    def gen_text_from_img(self, img_data: bytes):
        """
        Describe a picture as text via the configured image-to-text helper.

        :param img_data: bytes - encoded image data
        :return: text produced by self.image_to_text.gen_text_from_img
        """
        return self.image_to_text.gen_text_from_img(img_data)


if __name__ == '__main__':
    # An optional first command-line argument overrides the sample document.
    doc_file = sys.argv[1] if len(sys.argv) > 1 else _DEFAULT_DOC_FILE
    doc_split = DocSplit(doc_file)
    doc_split.split()
    print("\n".join([x.full_text for x in doc_split.paragraphs]))

# Next file in this diff: knowledgebase/doc/image_to_text.py
# -*- coding: utf-8 -*-
#
# @author: lyg
# @date: 2025-5-8
# @version: 1
# @description: Convert an image to text with a vision LLM.
# @file: knowledgebase/doc/image_to_text.py (added as a new file in this diff)
from langchain_core.prompts import HumanMessagePromptTemplate, ChatPromptTemplate
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.output_parsers import JsonOutputParser
import json
import base64

from knowledgebase.llm import vision_llm

# Leading magic bytes of common image formats and the matching MIME type.
_IMAGE_SIGNATURES = (
    (b"\x89PNG\r\n\x1a\n", "image/png"),
    (b"\xff\xd8\xff", "image/jpeg"),
    (b"GIF87a", "image/gif"),
    (b"GIF89a", "image/gif"),
    (b"BM", "image/bmp"),
)

# Used when the format is not recognised; this is the type that was
# previously declared for every image.
_DEFAULT_MIME = "image/jpeg"


def _detect_mime(image: bytes) -> str:
    """Guess the MIME type of encoded image data from its magic bytes."""
    for signature, mime in _IMAGE_SIGNATURES:
        if image.startswith(signature):
            return mime
    # WebP is a RIFF container: "RIFF" <4-byte size> "WEBP".
    if image[:4] == b"RIFF" and image[8:12] == b"WEBP":
        return "image/webp"
    return _DEFAULT_MIME


class ImageToText:
    """Describe images in natural language using the shared vision LLM."""

    def __init__(self):
        self.llm = vision_llm
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", "你是一个资深软件工程师,请分析图片中的内容。"),
            (
                "user",
                [
                    {"type": "text", "text": "{msg}"},
                    {
                        "type": "image_url",
                        # The complete data URL is supplied per call so the
                        # declared MIME type can match the actual image data.
                        "image_url": {"url": "{image_url}"},
                    }
                ],
            )
        ])
        # Prompt and model never change, so compose the chain once.
        self.chain = self.prompt | self.llm

    def gen_text_from_img(self, image: bytes) -> str:
        """
        Generate text from an image.

        :param image: encoded image data (PNG, JPEG, GIF, BMP or WebP are
            recognised; anything else is declared as JPEG)
        :return: text returned by the model
        :raises ValueError: if image is empty
        """
        if not image:
            raise ValueError("image must not be empty")
        encoded = base64.b64encode(image).decode()
        image_url = f"data:{_detect_mime(image)};base64,{encoded}"
        resp = self.chain.invoke({
            "msg": "使用自然语言输出图片中的内容,不要做过多的解释。输出格式为纯文本。",
            "image_url": image_url,
        })
        return resp.content

# Next file in this diff: knowledgebase/llm.py
# -*- coding: utf-8 -*-
#
# @author: lyg
# @date: 2025-5-8
# @version: 1
# @description: Shared langchain LLM instances.
# @file: knowledgebase/llm.py (added as a new file in this diff)
import os

from langchain_openai.chat_models import ChatOpenAI

# OpenAI-compatible endpoint shared by both models.
DASHSCOPE_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"

# Name of the environment variable that holds the API key.
API_KEY_ENV_VAR = "DASHSCOPE_API_KEY"


def _load_api_key() -> str:
    """
    Read the API key from the environment.

    SECURITY: the key used to be hard-coded in this file. A key that has been
    committed must be treated as leaked and rotated; keep credentials out of
    version control.

    :return: the API key
    :raises RuntimeError: if the environment variable is missing or blank
    """
    api_key = os.environ.get(API_KEY_ENV_VAR, "").strip()
    if not api_key:
        raise RuntimeError(
            f"Environment variable {API_KEY_ENV_VAR} is not set; "
            "it must contain the API key for the LLM endpoint."
        )
    return api_key


_API_KEY = _load_api_key()

# Text-only chat model.
llm = ChatOpenAI(temperature=0,
                 model="qwen2.5-72b-instruct",
                 base_url=DASHSCOPE_BASE_URL,
                 api_key=_API_KEY)

# Vision-capable chat model used for image understanding.
vision_llm = ChatOpenAI(temperature=0,
                        model="qwen2.5-vl-32b-instruct",
                        base_url=DASHSCOPE_BASE_URL,
                        api_key=_API_KEY)

# Next file in this diff: vision_test.py
@@ -9,23 +9,22 @@ from langchain_core.prompts import HumanMessagePromptTemplate, ChatPromptTemplate from langchain_core.messages import HumanMessage, SystemMessage from langchain_core.output_parsers import JsonOutputParser from docx import Document from PIL import Image from io import BytesIO import re import json import base64 class VisionTest: def __init__(self, file): self.llm = ChatOpenAI(temperature=0, model="qwen2.5-72b-instruct", base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", api_key="sk-15ecf7e273ad4b729c7f7f42b542749e") self.llm = ChatOpenAI(temperature=0, model="qwen2.5-72b-instruct", base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", api_key="sk-15ecf7e273ad4b729c7f7f42b542749e") image = base64.b64encode(open(file, 'rb').read()).decode() self.prompt = ChatPromptTemplate.from_messages([ SystemMessage("你是一个资深软件工程师,请分析图片回答问题。"), HumanMessage(content=[ {"type": "text", "text": "{msg}"}, {"type": "text", "text": "describe the weather in this image"}, { "type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{image}"}, @@ -38,120 +37,6 @@ resp = chain.invoke({"msg": msg}) print(resp.content) # def get_document_chapters(doc_path): # doc = Document(doc_path) # chapters = [] # current_chapter = None # for para in doc.paragraphs: # if para.style.name.startswith('Heading'): # 检查是否为标题样式 # level = int(para.style.name.replace('Heading', '')) # 获取标题级别 # current_chapter = {'level': level, 'title': para.text, 'content': []} # chapters.append(current_chapter) # elif current_chapter is not None: # current_chapter['content'].append(para.text) # 添加内容到当前章节 # return chapters def has_image(self, paragraph): # 通过检查XML中的嵌入式对象来判断是否有图片 xml = paragraph._element.xml return 'w:object' in xml or 'w:drawing' in xml def convert_blob_to_png_base64(self, image_blob): try: # 打开图片 image = Image.open(BytesIO(image_blob)) # 创建内存缓冲区 buffer = BytesIO() # 保存为PNG格式 image.save(buffer, format="PNG") # 获取PNG格式的二进制数据 png_data = buffer.getvalue() # 转换为Base64编码 
base64_data = base64.b64encode(png_data).decode('utf-8') return base64_data except Exception as e: print(f"Error: {e}") return None def get_image_blob(self, paragraph): # 遍历段落中的所有Run对象(图片通常在单独的Run中) for run in paragraph.runs: xml = run._element.xml if xml.find('v:imagedata') != -1: # 使用正则表达式查找r:id属性 match = re.search(r'r:id="([^"]+)"', xml) if match: r_id = match.group(1) if r_id: # 获取图片信息 image_part = paragraph.part.rels[r_id].target_part return image_part.blob if xml.find('wp:inline') != -1 or xml.find('wp:anchor') != -1: # 使用正则表达式查找r:embed属性 match = re.search(r'r:embed="([^"]+)"', xml) if match: r_id = match.group(1) if r_id: # 获取图片信息 image_part = paragraph.part.rels[r_id].target_part return image_part.blob return None def loadDoc(self): doc = Document('./static/doc/ZL格式(公开).docx') # 按照标题获取段落的层级结构 titles = [] for paragraph in doc.paragraphs: if paragraph.text != "": # 文字不为空 if paragraph.style.base_style is not None: # 有base_style if paragraph.style.base_style.name.startswith('Heading'): # 是标题 level = int(paragraph.style.base_style.name.split(' ')[-1]) obj = {} obj["level"] = level obj["text"] = paragraph.text obj["child"] = [] titles.append(obj) else: length = len(titles) if "child" in titles[length -1]: obj = {} obj["text"] = paragraph.text titles[length -1]['child'].append(obj) else: # 没有base_style length = len(titles) obj = {} obj["text"] = paragraph.text if length > 0 and "child" in titles[length -1]: # 如果是标题内的append进标题的child titles[length -1]['child'].append(obj) else: # 非标题内的直接放在第一层 titles.append(obj) else: # 文字为空时,可能是图片或者表格 if self.has_image(paragraph): # 当前段落为图片 obj = {} # 获取图片的blob img = self.get_image_blob(paragraph) if img is not None: imgBase64 = self.convert_blob_to_png_base64(img) if imgBase64 is not None: obj["imgBase64"] = imgBase64 titles[length -1]['child'].append(obj) # 在这里扩展判断表格 print(titles) # for para in doc.paragraphs: # print(para.text) # print('------------------------') if __name__ == '__main__': vision = VisionTest("./images/test.png") 
# vision.run("问题") vision.loadDoc() vision = VisionTest("image_path") vision.run("问题")