YM
2025-05-12 e22a78a2f2857ff98ec624b7c4f5c15b2c8362dd
Merge branch 'master' of http://182.92.203.7:2001/r/KnowledgeBase
2个文件已修改
6个文件已添加
429 ■■■■ 已修改文件
.gitignore 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
knowledgebase/doc/docx_split.py 201 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
knowledgebase/doc/image_to_text.py 44 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
knowledgebase/gen_base_db/__init__.py 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
knowledgebase/gen_base_db/db_generate.py 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
knowledgebase/gen_base_db/json_generate.py 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
knowledgebase/llm.py 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
vision_test.py 139 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.gitignore
@@ -5,4 +5,5 @@
/.conda
/docs
/out*
/packages
__pycache__
knowledgebase/doc/docx_split.py
New file
@@ -0,0 +1,201 @@
# -*- coding: utf-8 -*-
#
# @author: lyg, ym
# @date: 2025-5-8
# @version: 1
# @description: docx文档拆分器,根据段落拆分,将图片和表格转换为json数据。
import docx
import docx.table
import json
from dataclasses import dataclass
from PIL import Image
import io
import re
from knowledgebase.doc.image_to_text import ImageToText
@dataclass
class ParagraphInfo:
    """
    One entry of a split document.

    :param text: str - paragraph text
    :param level: int - heading level, 1-9 for headings and 0 for body text
    :param title_no: str - heading number such as 1.1 or 1.1.1; empty until a
        number has been assigned
    """
    text: str
    level: int
    # The default keeps ParagraphInfo(text, level) working while letting the
    # dataclass generate __init__ instead of a hand-written duplicate.
    title_no: str = ''

    @property
    def full_text(self):
        """
        Return the paragraph text prefixed with its heading number.

        :return: str - "<title_no> <text>", or the bare text when the entry
            has no heading number (avoids a stray leading space)
        """
        if not self.title_no:
            return self.text
        return f"{self.title_no} {self.text}"
class DocSplit:
    """
    Splitter for docx documents.

    Walks the document body in order and collects a flat list of ParagraphInfo:
    1. non-empty text paragraphs, tagged with their heading level;
    2. embedded images, replaced by the text ImageToText generates for them;
    3. tables, serialised to JSON text.
    Heading entries are then given multi-level numbers (1, 1.1, 1.1.1, ...).
    """
    # Style names of the form "Heading N"; the captured group is the level.
    _HEADING_STYLE = re.compile(r'Heading\s*(\d+)')
    # How a picture shows up in a run's XML: the markers that reveal it and
    # the attribute that carries its relationship id.
    _IMAGE_REFERENCES = (
        (('v:imagedata',), re.compile(r'r:id="([^"]+)"')),
        (('wp:inline', 'wp:anchor'), re.compile(r'r:embed="([^"]+)"')),
    )

    def __init__(self, doc_file):
        """
        :param doc_file: the document to split; handed unchanged to docx.Document
        """
        self.doc_file = doc_file
        self.image_to_text = ImageToText()
        self.paragraphs: list[ParagraphInfo] = []

    def table_to_json(self, table: "docx.table.Table"):
        """
        Convert a table into JSON-serialisable data.

        The first row supplies the keys; every following row becomes one dict.
        :param table: docx.table.Table - the table to convert
        :return: list - one dict per data row
        """
        headers = None
        table_data = []
        for row in table.rows:
            if headers is None:
                headers = [cell.text for cell in row.cells]
                continue
            row_data = {}
            for index, cell in enumerate(row.cells):
                # A data row can be wider than the header row; give the extra
                # cells a positional key instead of raising IndexError.
                key = headers[index] if index < len(headers) else f"column_{index + 1}"
                row_data[key] = self._cell_value(cell)
            table_data.append(row_data)
        return table_data

    def _cell_value(self, cell):
        """
        Return the value stored for one table cell.

        :return: the cell text, the converted table when the cell nests exactly
            one table, or a list of converted tables when it nests several
        """
        nested = cell.tables
        if not nested:
            return cell.text
        if len(nested) == 1:
            return self.table_to_json(nested[0])
        return [self.table_to_json(tbl) for tbl in nested]

    def split(self):
        """
        Split the document into entries and number the headings.

        The entries are appended to self.paragraphs in document order.
        :return: list[ParagraphInfo] - self.paragraphs
        """
        document = docx.Document(self.doc_file)
        # Fetch both collections once: python-docx builds a new list on each
        # property access, so indexing them inside the loop was quadratic.
        body_paragraphs = document.paragraphs
        body_tables = document.tables
        paragraph_cnt = 0
        table_cnt = 0
        for element in document.element.body:
            # Tags are namespace-qualified ("{uri}p"); matching on "}p" keeps
            # other element names that merely end in "p" from being counted.
            if element.tag.endswith('}p'):
                paragraph = body_paragraphs[paragraph_cnt]
                paragraph_cnt += 1
                paragraph_text = paragraph.text
                if paragraph_text:
                    self.paragraphs.append(ParagraphInfo(paragraph_text, self.get_header_level(paragraph)))
                # A paragraph may also carry a picture; it becomes a body entry.
                img_data = self.get_image_blob(paragraph)
                if img_data:
                    self.paragraphs.append(ParagraphInfo(self.gen_text_from_img(img_data), 0))
            elif element.tag.endswith('}tbl'):
                table = body_tables[table_cnt]
                table_cnt += 1
                table_data = self.table_to_json(table)
                self.paragraphs.append(ParagraphInfo(json.dumps(table_data, indent=4, ensure_ascii=False), 0))
        self.gen_title_no(self.paragraphs)
        return self.paragraphs

    @staticmethod
    def get_image_blob(paragraph):
        """
        Return the first picture referenced by the paragraph, as PNG bytes.

        :param paragraph: the paragraph whose runs are inspected
        :return: bytes - the picture converted to PNG, or None when no run
            references a picture
        """
        for run in paragraph.runs:
            xml = run._element.xml
            for markers, id_pattern in DocSplit._IMAGE_REFERENCES:
                if not any(marker in xml for marker in markers):
                    continue
                match = id_pattern.search(xml)
                if match:
                    image_part = paragraph.part.rels[match.group(1)].target_part
                    return DocSplit.image_convert(image_part.blob, "png")
        return None

    @staticmethod
    def gen_title_no(paragraphs: "list[ParagraphInfo]"):
        """
        Assign multi-level numbers to the heading entries, in place.

        Body entries (level 0) are left untouched and do not interrupt the
        numbering.
        :param paragraphs: entries in document order
        """
        # counters[i] is the current number at heading level i + 1.
        counters = []
        for paragraph in paragraphs:
            level = paragraph.level
            if level <= 0:
                continue
            if len(counters) < level:
                counters.extend([0] * (level - len(counters)))
            counters[level - 1] += 1
            # A new heading restarts the numbering of every deeper level.
            del counters[level:]
            # When a level was skipped (e.g. level 1 followed directly by
            # level 3) the missing ancestor is numbered 1.
            for i in range(level - 1):
                if counters[i] == 0:
                    counters[i] = 1
            paragraph.title_no = '.'.join(str(n) for n in counters)

    @staticmethod
    def get_header_level(paragraph) -> int:
        """
        Return the heading level of a paragraph.

        The paragraph's own style is checked first, then the styles it is
        based on, so both a plain "Heading 2" and a custom style derived from
        it are recognised.
        :param paragraph: the paragraph to classify
        :return: int - the heading level, or 0 for body text
        """
        style = paragraph.style
        while style is not None:
            match = DocSplit._HEADING_STYLE.match(style.name or '')
            if match:
                return int(match.group(1))
            style = style.base_style
        return 0

    @staticmethod
    def image_convert(_in: bytes, _out_format: str) -> bytes:
        """
        Re-encode image data in another format.

        :param _in: the encoded source image
        :param _out_format: target format name understood by PIL, e.g. "png"
        :return: bytes - the re-encoded image
        """
        img = Image.open(io.BytesIO(_in))
        out_io = io.BytesIO()
        # Honour the requested format; it used to be ignored in favour of PNG.
        img.save(out_io, format=_out_format)
        return out_io.getvalue()

    def gen_text_from_img(self, img_data: bytes):
        """
        Describe an image as text.

        :param img_data: the encoded image
        :return: the text produced by ImageToText.gen_text_from_img
        """
        return self.image_to_text.gen_text_from_img(img_data)
if __name__ == '__main__':
    import sys

    # Manual smoke run: split one document and print every entry.
    # The document may be given as the first command-line argument; without
    # one, the original sample path is used.
    default_doc = r'D:\workspace\PythonProjects\KnowledgeBase\doc\ZL格式(公开).docx'
    doc_file = sys.argv[1] if len(sys.argv) > 1 else default_doc
    doc_split = DocSplit(doc_file)
    doc_split.split()
    print("\n".join([x.full_text for x in doc_split.paragraphs]))
knowledgebase/doc/image_to_text.py
New file
@@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
#
# @author: lyg
# @date: 2025-5-8
# @version: 1
# @description: 利用LLM将图片转为文本。
from langchain_core.prompts import HumanMessagePromptTemplate, ChatPromptTemplate
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.output_parsers import JsonOutputParser
import json
import base64
from knowledgebase.llm import vision_llm
class ImageToText:
    """
    Produces a text description of an image by prompting the vision LLM.
    """

    def __init__(self):
        # Shared vision model instance imported from knowledgebase.llm.
        self.llm = vision_llm
        system_message = ("system", "你是一个资深软件工程师,请分析图片中的内容。")
        text_part = {"type": "text", "text": "{msg}"}
        # NOTE(review): the data URL is labelled image/jpeg whatever the bytes
        # are - confirm the endpoint tolerates other formats such as PNG.
        image_part = {
            "type": "image_url",
            "image_url": {"url": "data:image/jpeg;base64,{image}"},
        }
        user_message = ("user", [text_part, image_part])
        # {msg} and {image} are filled in by gen_text_from_img.
        self.prompt = ChatPromptTemplate.from_messages([system_message, user_message])

    def gen_text_from_img(self, image: bytes) -> str:
        """
        Generate text from an image.

        :param image: encoded image data
        :return: the content of the model's response
        """
        payload = {
            "msg": "使用自然语言输出图片中的内容,不要做过多的解释。输出格式为纯文本。",
            "image": base64.b64encode(image).decode(),
        }
        pipeline = self.prompt | self.llm
        answer = pipeline.invoke(payload)
        return answer.content
knowledgebase/gen_base_db/__init__.py
New file
@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
#
# @author:
# @date:
# @version:
# @description:
knowledgebase/gen_base_db/db_generate.py
New file
@@ -0,0 +1,10 @@
# -*- coding: utf-8 -*-
#
# @author:
# @date:
# @version:
# @description:
class DbGenerate:
    """Empty scaffold class: construction takes no arguments and sets no attributes."""

    def __init__(self):
        """Create the instance; there is nothing to initialise."""
knowledgebase/gen_base_db/json_generate.py
New file
@@ -0,0 +1,11 @@
# -*- coding: utf-8 -*-
#
# @author:
# @date:
# @version:
# @description:
from knowledgebase.llm import llm
class JsonGenerate:
    """Holds a reference to the shared ``llm`` instance imported from knowledgebase.llm."""
    def __init__(self):
        # Keep the module-level llm on the instance; nothing else is set up here.
        self.llm = llm
knowledgebase/llm.py
New file
@@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
#
# @author: lyg
# @date: 2025-5-8
# @version: 1
# @description: 公共langchain LLM 实例
from langchain_openai.chat_models import ChatOpenAI
import os

# Both models are served from the same OpenAI-compatible DashScope endpoint.
_DASHSCOPE_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
# The credential comes from the environment so that it is not stored in the
# repository. When DASHSCOPE_API_KEY is unset this is None and ChatOpenAI
# applies its own default key lookup.
# NOTE(review): the key that used to be hard-coded here remains in the
# version-control history and should be revoked.
_DASHSCOPE_API_KEY = os.environ.get("DASHSCOPE_API_KEY")

# Text chat model.
llm = ChatOpenAI(temperature=0,
                 model="qwen2.5-72b-instruct",
                 base_url=_DASHSCOPE_BASE_URL,
                 api_key=_DASHSCOPE_API_KEY)
# Vision-language chat model, used for image inputs.
vision_llm = ChatOpenAI(temperature=0,
                        model="qwen2.5-vl-32b-instruct",
                        base_url=_DASHSCOPE_BASE_URL,
                        api_key=_DASHSCOPE_API_KEY)
vision_test.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
#
#
# @author: lyg
# @date: 2025-5-7
# @version: 1
@@ -7,25 +7,24 @@
from langchain_openai.chat_models import ChatOpenAI
from langchain_core.prompts import HumanMessagePromptTemplate, ChatPromptTemplate
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.messages import HumanMessage,SystemMessage
from langchain_core.output_parsers import JsonOutputParser
from docx import Document
from PIL import Image
from io import BytesIO
import re
import json
import base64
class VisionTest:
    def __init__(self, file):
        self.llm = ChatOpenAI(temperature=0, model="qwen2.5-72b-instruct",
                              base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", api_key="sk-15ecf7e273ad4b729c7f7f42b542749e")
    def __init__(self,file):
        self.llm = ChatOpenAI(temperature=0,
                              model="qwen2.5-72b-instruct",
                              base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
                              api_key="sk-15ecf7e273ad4b729c7f7f42b542749e")
        image = base64.b64encode(open(file, 'rb').read()).decode()
        self.prompt = ChatPromptTemplate.from_messages([
            SystemMessage("你是一个资深软件工程师,请分析图片回答问题。"),
            HumanMessage(content=[
                {"type": "text", "text": "{msg}"},
                {"type": "text", "text": "describe the weather in this image"},
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:image/jpeg;base64,{image}"},
@@ -33,125 +32,11 @@
            ])
        ])
    def run(self, msg):
    def run(self,msg):
        chain = self.prompt | self.llm
        resp = chain.invoke({"msg": msg})
        print(resp.content)
    # def get_document_chapters(doc_path):
    #     doc = Document(doc_path)
    #     chapters = []
    #     current_chapter = None
    #     for para in doc.paragraphs:
    #         if para.style.name.startswith('Heading'):  # 检查是否为标题样式
    #             level = int(para.style.name.replace('Heading', ''))  # 获取标题级别
    #             current_chapter = {'level': level, 'title': para.text, 'content': []}
    #             chapters.append(current_chapter)
    #         elif current_chapter is not None:
    #             current_chapter['content'].append(para.text)  # 添加内容到当前章节
    #     return chapters
    def has_image(self, paragraph):
        # 通过检查XML中的嵌入式对象来判断是否有图片
        xml = paragraph._element.xml
        return 'w:object' in xml or 'w:drawing' in xml
    def convert_blob_to_png_base64(self, image_blob):
        try:
            # 打开图片
            image = Image.open(BytesIO(image_blob))
            # 创建内存缓冲区
            buffer = BytesIO()
            # 保存为PNG格式
            image.save(buffer, format="PNG")
            # 获取PNG格式的二进制数据
            png_data = buffer.getvalue()
            # 转换为Base64编码
            base64_data = base64.b64encode(png_data).decode('utf-8')
            return base64_data
        except Exception as e:
            print(f"Error: {e}")
            return None
    def get_image_blob(self, paragraph):
        # 遍历段落中的所有Run对象(图片通常在单独的Run中)
        for run in paragraph.runs:
            xml = run._element.xml
            if xml.find('v:imagedata') != -1:
                # 使用正则表达式查找r:id属性
                match = re.search(r'r:id="([^"]+)"', xml)
                if match:
                    r_id = match.group(1)
                    if r_id:
                        # 获取图片信息
                        image_part = paragraph.part.rels[r_id].target_part
                        return image_part.blob
            if xml.find('wp:inline') != -1 or xml.find('wp:anchor') != -1:
                # 使用正则表达式查找r:embed属性
                match = re.search(r'r:embed="([^"]+)"', xml)
                if match:
                    r_id = match.group(1)
                    if r_id:
                        # 获取图片信息
                        image_part = paragraph.part.rels[r_id].target_part
                        return image_part.blob
        return None
    def loadDoc(self):
        doc = Document('./static/doc/ZL格式(公开).docx')
        # 按照标题获取段落的层级结构
        titles = []
        for paragraph in doc.paragraphs:
            if paragraph.text != "":
                # 文字不为空
                if paragraph.style.base_style is not None:
                    # 有base_style
                    if paragraph.style.base_style.name.startswith('Heading'):
                        # 是标题
                        level = int(paragraph.style.base_style.name.split(' ')[-1])
                        obj = {}
                        obj["level"] = level
                        obj["text"] = paragraph.text
                        obj["child"] = []
                        titles.append(obj)
                    else:
                        length = len(titles)
                        if "child" in titles[length -1]:
                            obj = {}
                            obj["text"] = paragraph.text
                            titles[length -1]['child'].append(obj)
                else:
                    # 没有base_style
                    length = len(titles)
                    obj = {}
                    obj["text"] = paragraph.text
                    if length > 0 and "child" in titles[length -1]:
                        # 如果是标题内的append进标题的child
                        titles[length -1]['child'].append(obj)
                    else:
                        # 非标题内的直接放在第一层
                        titles.append(obj)
            else:
                # 文字为空时,可能是图片或者表格
                if self.has_image(paragraph):
                    # 当前段落为图片
                    obj = {}
                    # 获取图片的blob
                    img = self.get_image_blob(paragraph)
                    if img is not None:
                        imgBase64 = self.convert_blob_to_png_base64(img)
                        if imgBase64 is not None:
                            obj["imgBase64"] = imgBase64
                            titles[length -1]['child'].append(obj)
                # 在这里扩展判断表格
        print(titles)
        # for para in doc.paragraphs:
        #     print(para.text)
        #     print('------------------------')
if __name__ == '__main__':
    vision = VisionTest("./static/images/test.png")
    # vision.run("问题")
    vision.loadDoc()
    vision = VisionTest("image_path")
    vision.run("问题")