8cbd6856681a80b7d3336cbd5ccc1f6f003d394e..494637879bc8f5dd9c3d43481927b4a0c07e2f34
2 天以前 lyg
docx文档切割,表格转json,图片内容识别为文本。
494637 对比 | 目录
2 天以前 YM
文档解析,获取章节结构,并获取图片转换为base64
b2bef1 对比 | 目录
1个文件已修改
6个文件已添加
308 ■■■■■ 已修改文件
.gitignore 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
knowledgebase/doc/docx_split.py 201 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
knowledgebase/doc/image_to_text.py 44 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
knowledgebase/llm.py 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
static/doc/ZL格式(公开).docx 补丁 | 查看 | 原始文档 | blame | 历史
static/images/test.png 补丁 | 查看 | 原始文档 | blame | 历史
vision_test.py 42 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.gitignore
@@ -4,4 +4,6 @@
/datas
/.conda
/docs
/out*
/out*
/packages
__pycache__
knowledgebase/doc/docx_split.py
New file
@@ -0,0 +1,201 @@
# -*- coding: utf-8 -*-
#
# @author: lyg, ym
# @date: 2025-5-8
# @version: 1
# @description: docx文档拆分器,根据段落拆分,将图片和表格转换为json数据。
import docx
import docx.table
import json
from dataclasses import dataclass
from PIL import Image
import io
import re
from knowledgebase.doc.image_to_text import ImageToText
@dataclass
class ParagraphInfo:
    """
    A single paragraph extracted from a document.

    :param text: str - paragraph text
    :param level: int - paragraph level: 1-9 for headings, 0 for body text
    :param title_no: str - heading number such as 1.1 or 1.1.1; empty until
        a number has been assigned
    """
    text: str
    level: int
    # Defaulted so the generated __init__ still accepts ParagraphInfo(text, level)
    title_no: str = ''

    @property
    def full_text(self):
        """
        Return the paragraph text prefixed with its heading number.

        :return: str - "<title_no> <text>", or only the text when the
            paragraph has no heading number
        """
        # Without this guard un-numbered paragraphs would start with a stray space
        if self.title_no:
            return f"{self.title_no} {self.text}"
        return self.text
class DocSplit:
    """
    Splits a docx document into a flat list of paragraphs.

    Text paragraphs are kept as text, tables are serialised to JSON and
    pictures are turned into text through ``ImageToText``. Heading
    paragraphs additionally receive a multi-level number (1, 1.1, ...).
    """

    # Style names such as "Heading 1"; the group captures the level
    _HEADING_RE = re.compile(r'Heading\s+(\d+)')
    # (XML markers identifying a picture flavour, pattern capturing its relationship id)
    _IMAGE_REFS = (
        (('v:imagedata',), re.compile(r'r:id="([^"]+)"')),
        (('wp:inline', 'wp:anchor'), re.compile(r'r:embed="([^"]+)"')),
    )

    def __init__(self, doc_file):
        """
        :param doc_file: the document to split; handed to ``docx.Document``
        """
        self.doc_file = doc_file
        self.image_to_text = ImageToText()
        self.paragraphs: list[ParagraphInfo] = []

    def table_to_json(self, table: "docx.table.Table"):
        """
        Convert a table into JSON-serialisable data.

        The first row supplies the keys; every following row becomes one
        dict mapping header text to cell content. A cell holding nested
        tables is converted recursively: a single nested table yields its
        row list, several nested tables yield a list of row lists.

        :param table: docx.table.Table - table to convert
        :return: list - one dict per data row; empty when the table has no
            data rows
        """
        rows = iter(table.rows)
        header_row = next(rows, None)
        if header_row is None:
            return []
        headers = [cell.text for cell in header_row.cells]

        table_data = []
        for row in rows:
            row_data = {}
            for col_idx, cell in enumerate(row.cells):
                nested = cell.tables
                if not nested:
                    value = cell.text
                elif len(nested) == 1:
                    value = self.table_to_json(nested[0])
                else:
                    value = [self.table_to_json(tbl) for tbl in nested]
                # A row can be wider than the header row; use a positional
                # key instead of failing with IndexError
                if col_idx < len(headers):
                    key = headers[col_idx]
                else:
                    key = f"column_{col_idx + 1}"
                row_data[key] = value
            table_data.append(row_data)
        return table_data

    def split(self):
        """
        Split the document into paragraphs and append them to ``self.paragraphs``.

        Body elements are visited in document order. A non-empty paragraph
        is stored with its heading level, a picture found in a paragraph is
        stored as level-0 text produced by ``gen_text_from_img`` and a table
        is stored as level-0 JSON text. Heading numbers are assigned last.

        :return: list[ParagraphInfo] - the paragraph list (``self.paragraphs``)
        """
        document = docx.Document(self.doc_file)
        # Fetch both collections once instead of re-reading the properties
        # for every body element
        paragraph_iter = iter(document.paragraphs)
        table_iter = iter(document.tables)
        for element in document.element.body:
            tag = element.tag
            if not isinstance(tag, str):
                continue
            if tag.endswith('}p'):  # paragraph
                paragraph = next(paragraph_iter)
                paragraph_text = paragraph.text
                if paragraph_text:
                    self.paragraphs.append(ParagraphInfo(paragraph_text, self.get_header_level(paragraph)))
                # A paragraph may carry a picture; store its description as body text
                img_data = self.get_image_blob(paragraph)
                if img_data:
                    self.paragraphs.append(ParagraphInfo(self.gen_text_from_img(img_data), 0))
            elif tag.endswith('}tbl'):  # table
                table_data = self.table_to_json(next(table_iter))
                self.paragraphs.append(ParagraphInfo(json.dumps(table_data, indent=4, ensure_ascii=False), 0))
        # Assign heading numbers
        self.gen_title_no(self.paragraphs)
        return self.paragraphs

    @staticmethod
    def get_image_blob(paragraph):
        """
        Return the first picture referenced by the paragraph as PNG bytes.

        Each run's XML is searched for a ``v:imagedata`` element (``r:id``
        reference) and for ``wp:inline`` / ``wp:anchor`` elements
        (``r:embed`` reference), in that order.

        :param paragraph: paragraph whose runs are inspected
        :return: bytes - PNG data of the first picture found, or None
        """
        for run in paragraph.runs:
            xml = run._element.xml
            for markers, rid_re in DocSplit._IMAGE_REFS:
                if not any(marker in xml for marker in markers):
                    continue
                match = rid_re.search(xml)
                if match:
                    # Resolve the relationship id to the stored image part
                    image_part = paragraph.part.rels[match.group(1)].target_part
                    return DocSplit.image_convert(image_part.blob, "png")
        return None

    @staticmethod
    def gen_title_no(paragraphs: "list[ParagraphInfo]"):
        """
        Assign multi-level heading numbers (1, 1.1, 1.2, 2, ...) in place.

        Paragraphs with ``level`` 0 are left untouched and do not interrupt
        the numbering. When a heading skips levels (for example level 1
        followed directly by level 3) the missing ancestors count as 1.

        :param paragraphs: list[ParagraphInfo] - paragraphs in document order
        """
        counters = []
        for paragraph in paragraphs:
            level = paragraph.level
            if level <= 0:
                continue
            missing = level - len(counters)
            if missing > 0:
                # Implicit ancestors start at 1; the new level starts at 0
                # and is incremented just below
                counters.extend([1] * (missing - 1) + [0])
            counters[level - 1] += 1
            # A new heading restarts the numbering of everything nested below it
            del counters[level:]
            paragraph.title_no = '.'.join(str(count) for count in counters)

    @staticmethod
    def get_header_level(paragraph) -> int:
        """
        Return the heading level of a paragraph.

        The paragraph's own style is checked first and then every style it
        is based on, so both a direct "Heading N" style and custom styles
        derived from one are recognised.

        :param paragraph: paragraph to inspect
        :return: int - the N of the first "Heading N" style found, 0 for body text
        """
        style = paragraph.style
        seen_names = set()
        while style is not None:
            name = style.name or ''
            if name in seen_names:
                # Guards against a cyclic based-on chain
                break
            seen_names.add(name)
            match = DocSplit._HEADING_RE.match(name)
            if match:
                return int(match.group(1))
            style = style.base_style
        return 0

    @staticmethod
    def image_convert(_in: bytes, _out_format: str) -> bytes:
        """
        Re-encode image data into another format.

        :param _in: bytes - source image data
        :param _out_format: str - target format name, e.g. "png"
        :return: bytes - the re-encoded image
        """
        out_io = io.BytesIO()
        with Image.open(io.BytesIO(_in)) as img:
            img.save(out_io, _out_format)
        return out_io.getvalue()

    def gen_text_from_img(self, img_data: bytes):
        """
        Describe an image as text.

        :param img_data: bytes - image data
        :return: the text returned by ``self.image_to_text``
        """
        return self.image_to_text.gen_text_from_img(img_data)
if __name__ == '__main__':
    import sys

    # An optional first command-line argument overrides the default sample document
    if len(sys.argv) > 1:
        doc_file = sys.argv[1]
    else:
        doc_file = r'D:\workspace\PythonProjects\KnowledgeBase\doc\ZL格式(公开).docx'
    doc_split = DocSplit(doc_file)
    doc_split.split()
    # Print every paragraph together with its heading number
    print("\n".join([x.full_text for x in doc_split.paragraphs]))
knowledgebase/doc/image_to_text.py
New file
@@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
#
# @author: lyg
# @date: 2025-5-8
# @version: 1
# @description: 利用LLM将图片转为文本。
from langchain_core.prompts import HumanMessagePromptTemplate, ChatPromptTemplate
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.output_parsers import JsonOutputParser
import json
import base64
from knowledgebase.llm import vision_llm
class ImageToText:
    """
    Turns an image into a textual description using the shared vision LLM.
    """

    # Leading bytes that identify common image formats
    _SIGNATURES = (
        (b"\x89PNG\r\n\x1a\n", "image/png"),
        (b"\xff\xd8\xff", "image/jpeg"),
        (b"GIF87a", "image/gif"),
        (b"GIF89a", "image/gif"),
        (b"BM", "image/bmp"),
    )

    def __init__(self):
        self.llm = vision_llm
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", "你是一个资深软件工程师,请分析图片中的内容。"),
            (
                "user",
                [
                    {"type": "text", "text": "{msg}"},
                    {
                        "type": "image_url",
                        # The complete data URL is supplied per call so its
                        # MIME type can match the actual image bytes
                        "image_url": {"url": "{image_url}"},
                    }
                ],
            )
        ])

    @staticmethod
    def _guess_mime(image: bytes) -> str:
        """
        Guess the MIME type of image data from its leading bytes.

        :param image: image data
        :return: MIME type; "image/jpeg" when the format is not recognised
        """
        for signature, mime in ImageToText._SIGNATURES:
            if image.startswith(signature):
                return mime
        if image[:4] == b"RIFF" and image[8:12] == b"WEBP":
            return "image/webp"
        # Unrecognised data keeps the label that was used before detection existed
        return "image/jpeg"

    def gen_text_from_img(self, image: bytes) -> str:
        """
        Generate text from an image.

        :param image: image data
        :return: the content of the model's response
        """
        encoded = base64.b64encode(image).decode()
        data_url = f"data:{self._guess_mime(image)};base64,{encoded}"
        chain = self.prompt | self.llm
        resp = chain.invoke({"msg": "使用自然语言输出图片中的内容,不要做过多的解释。输出格式为纯文本。", "image_url": data_url})
        return resp.content
knowledgebase/llm.py
New file
@@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
#
# @author: lyg
# @date: 2025-5-8
# @version: 1
# @description: 公共langchain LLM 实例
from langchain_openai.chat_models import ChatOpenAI
import os

# Endpoint shared by both model instances
_DASHSCOPE_BASE_URL = "https://dashscope.aliyuncs.com/compatible-mode/v1"
# The API key is read from the environment so that no credential lives in source control.
# NOTE(review): the key that used to be hard-coded here is exposed in the repository
# history and should be revoked.
_DASHSCOPE_API_KEY = os.environ.get("DASHSCOPE_API_KEY")

# Shared chat model instance
llm = ChatOpenAI(temperature=0,
                 model="qwen2.5-72b-instruct",
                 base_url=_DASHSCOPE_BASE_URL,
                 api_key=_DASHSCOPE_API_KEY)
# Shared model instance for image input (imported by knowledgebase/doc/image_to_text.py)
vision_llm = ChatOpenAI(temperature=0,
                        model="qwen2.5-vl-32b-instruct",
                        base_url=_DASHSCOPE_BASE_URL,
                        api_key=_DASHSCOPE_API_KEY)
static/doc/ZL格式(公开).docx
Binary files differ
static/images/test.png
vision_test.py
New file
@@ -0,0 +1,42 @@
# -*- coding: utf-8 -*-
#
# @author: lyg
# @date: 2025-5-7
# @version: 1
# @description:视觉识别文档内容
from langchain_openai.chat_models import ChatOpenAI
from langchain_core.prompts import HumanMessagePromptTemplate, ChatPromptTemplate
from langchain_core.messages import HumanMessage,SystemMessage
from langchain_core.output_parsers import JsonOutputParser
import json
import base64
class VisionTest:
    """
    Manual check that sends one image together with a question to the model
    and prints the answer.
    """

    def __init__(self, file):
        """
        :param file: path of the image file that accompanies every question
        """
        import mimetypes
        import os

        # NOTE(review): knowledgebase/llm.py uses a separate "-vl-" model for image
        # input; confirm that this model accepts images.
        # The API key is read from the environment instead of being stored in source.
        self.llm = ChatOpenAI(temperature=0,
                              model="qwen2.5-72b-instruct",
                              base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
                              api_key=os.environ.get("DASHSCOPE_API_KEY"))
        # Close the file deterministically instead of leaving the handle open
        with open(file, 'rb') as image_file:
            image = base64.b64encode(image_file.read()).decode()
        # Label the data by file extension; fall back to the previous fixed JPEG label
        mime = mimetypes.guess_type(file)[0] or "image/jpeg"
        # Base64 text contains no braces, so it is safe inside a template string
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", "你是一个资深软件工程师,请分析图片回答问题。"),
            ("user", [
                # {msg} is filled by run(); it used to be a fixed text that ignored the question
                {"type": "text", "text": "{msg}"},
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:{mime};base64,{image}"},
                }
            ])
        ])

    def run(self, msg):
        """
        Ask a question about the image and print the model's answer.

        :param msg: the question to ask
        """
        chain = self.prompt | self.llm
        resp = chain.invoke({"msg": msg})
        print(resp.content)
if __name__ == '__main__':
    # Manual entry point: the image path and the question are placeholders
    tester = VisionTest("image_path")
    tester.run("问题")