lyg
2025-05-14 acde3bd32f07bf02839a21e8fe5b4e69bfca2251
docx文档拆分,文档段落实体词提取,存入mysql数据库。
7个文件已修改
8个文件已添加
1个文件已删除
1355 ■■■■ 已修改文件
.gitignore 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
knowledgebase/db/doc_db_helper.py 121 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
knowledgebase/db/doc_db_models.py 135 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
knowledgebase/db/neo4j.py 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
knowledgebase/doc/doc_convert.py 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
knowledgebase/doc/doc_processor.py 133 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
knowledgebase/doc/docx_split.py 250 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
knowledgebase/doc/entity_helper.py 44 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
knowledgebase/doc/entity_recognition.py 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
knowledgebase/doc/models.py 117 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
knowledgebase/log/__init__.py 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
knowledgebase/utils.py 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
lang_flow.py 147 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
tpl/entities.json 54 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
tpl/tc_pkt_format.json 147 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
tpl/tc_transfer_frame.json 83 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.gitignore
@@ -6,4 +6,5 @@
/docs
/out*
/packages
__pycache__
__pycache__
/static/
knowledgebase/db/doc_db_helper.py
New file
@@ -0,0 +1,121 @@
# -*- coding: utf-8 -*-
#
# @author: lyg
# @date: 2025-5-12
# @version: 1
# @description: 文档数据库助手,mysql数据库
import json
from knowledgebase.db.doc_db_models import init_doc_db, TDoc, TEntity, TParagraph, TParagraphLink, TParagraphRefLink, \
    TParagraphEntityLink
from knowledgebase.doc.models import ParagraphInfo, DocInfo
class DocDbHelper:
    """
    Helper around the document database (MySQL) session.

    Every ``add_*`` method stages a single ORM object and commits right away,
    so the object's generated primary key can be read as soon as the call
    returns. If a commit fails, the session is rolled back before the error
    is re-raised so that the (shared) session can still be used afterwards.
    """

    def __init__(self):
        # init_doc_db() returns the session used by every method below.
        self.session = init_doc_db()

    def _save(self, obj):
        """
        Add ``obj`` to the session and commit it.

        :param obj: ORM object to persist
        :return: the same object, after a successful commit
        :raises Exception: whatever the session raised; the session is rolled
            back first so it does not stay in a failed state
        """
        try:
            self.session.add(obj)
            self.session.commit()
        except Exception:
            self.session.rollback()
            raise
        return obj

    def add_doc(self, doc_info: DocInfo) -> int:
        """
        Add a document row.

        :param doc_info: document information; ``file`` and ``file_name`` are stored
        :return: id of the new document row
        """
        _doc = TDoc(
            file=doc_info.file,
            file_name=doc_info.file_name,
            is_del=0,
        )
        self._save(_doc)
        return _doc.id

    def add_paragraph(self, doc_id: int, parent_id: int, paragraph_info: ParagraphInfo) -> TParagraph:
        """
        Add a paragraph and, recursively, all of its children.

        Besides the paragraph row itself this also stores a parent/child link
        row (when ``parent_id`` is given) and one paragraph/entity link row per
        entity attached to ``paragraph_info``.

        :param doc_id: document id
        :param parent_id: parent paragraph id, or None for a paragraph without parent
        :param paragraph_info: paragraph information
        :return: the stored paragraph row
        """
        _paragraph = TParagraph(
            doc_id=doc_id,
            text=paragraph_info.text,
            title_level=paragraph_info.title_level,
            title_num=paragraph_info.title_num,
            num=paragraph_info.num,
            num_level=paragraph_info.num_level,
            parent_id=parent_id,
            is_del=0,
        )
        # Commit first: the generated id is needed for the link rows below.
        self._save(_paragraph)
        if parent_id is not None:
            paragraph_link = TParagraphLink(parent_id=parent_id, child_id=_paragraph.id)
            self.add_paragraph_link(paragraph_link)
        for entity in paragraph_info.entities or []:
            self.add_paragraph_entity_link(
                TParagraphEntityLink(paragraph_id=_paragraph.id, entity_id=entity.id))
        for child in paragraph_info.children or []:
            self.add_paragraph(doc_id, _paragraph.id, child)
        return _paragraph

    def add_paragraph_link(self, paragraph_link):
        """
        Add a parent/child paragraph link.

        :param paragraph_link: paragraph link row
        :return: id of the stored link row
        """
        return self._save(paragraph_link).id

    def add_paragraph_entity_link(self, paragraph_entity_link):
        """
        Add a paragraph/entity link.

        :param paragraph_entity_link: paragraph/entity link row
        :return: id of the stored link row
        """
        return self._save(paragraph_entity_link).id

    def add_entity(self, entity):
        """
        Add an entity.

        :param entity: entity row
        :return: id of the stored entity row
        """
        return self._save(entity).id

    def add_paragraph_ref_link(self, paragraph_ref_link):
        """
        Add a paragraph reference link.

        :param paragraph_ref_link: paragraph reference link row
        :return: the stored link row itself (not its id, unlike the other ``add_*`` methods)
        """
        return self._save(paragraph_ref_link)

    def get_all_entities(self) -> list[TEntity]:
        """Return every row of the entity table."""
        return self.session.query(TEntity).all()

    def get_docs(self) -> list[TDoc]:
        """Return every row of the document table."""
        return self.session.query(TDoc).all()

    def commit(self):
        """Commit the session; roll back and re-raise if the commit fails."""
        try:
            self.session.commit()
        except Exception:
            self.session.rollback()
            raise
# Shared module-level helper instance. Constructing it calls init_doc_db(),
# so importing this module initialises the database session as a side effect.
doc_dbh = DocDbHelper()
# if __name__ == '__main__':
#     doc_db = DocDbHelper()
#     # doc_db.insert_entities()
#     doc_id = doc_db.add_doc(DocInfo(file='aaa', file_name='test'))  # add_doc returns the new id (int)
#     p1 = doc_db.add_paragraph(doc_id, None, ParagraphInfo(text='test1', title_level=1, num=1, num_level=1))
#     p2 = doc_db.add_paragraph(doc_id, p1.id, ParagraphInfo(text='test2', title_level=2, num=1, num_level=2))
#     p3 = doc_db.add_paragraph(doc_id, p2.id, ParagraphInfo(text='test3', title_level=3, num=1, num_level=3))
#     doc_db.add_paragraph_ref_link(TParagraphRefLink(parent_id=p1.id, child_id=p3.id))
knowledgebase/db/doc_db_models.py
New file
@@ -0,0 +1,135 @@
# -*- coding: utf-8 -*-
#
# @author: lyg
# @date: 2025-5-12
# @version: 1
# @description: 文档数据库模型
from sqlalchemy import create_engine, Column, DateTime, Integer, Text, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy.orm import sessionmaker, scoped_session
from knowledgebase.log import Log
# Declarative base class that every ORM model in this module derives from.
Base = declarative_base()
# Metadata object of that base, exposed under a module-level name.
metadata = Base.metadata
class TParagraph(Base):
    """
    Paragraph table (``t_paragraphs``).

    Parent/child structure is stored in ``t_paragraph_links`` rows, references
    between paragraphs in ``t_paragraph_ref_links`` rows and attached entities
    in ``t_paragraph_entity_links`` rows; the relationships below expose those
    rows and the ``children`` / ``parent`` / ``refs`` accessors resolve them to
    paragraphs.
    """
    __tablename__ = 't_paragraphs'
    id = Column(Integer, primary_key=True)
    text = Column(Text)
    title_level = Column(Integer)
    title_num = Column(Text)
    num_level = Column(Integer)
    num = Column(Integer)
    doc_id = Column(Integer, ForeignKey('t_docs.id'))
    parent_id = Column(Integer, ForeignKey('t_paragraphs.id'))
    # Link row in which this paragraph is the CHILD (joined on child_id), so
    # its counterpart on the link side is TParagraphLink.child.
    parent_link = relationship("TParagraphLink", foreign_keys='TParagraphLink.child_id', back_populates='child',
                               uselist=False)
    # Link rows in which this paragraph is the PARENT (joined on parent_id), so
    # the counterpart on the link side is TParagraphLink.parent.
    children_links = relationship("TParagraphLink", foreign_keys='TParagraphLink.parent_id', back_populates='parent')
    # Reference rows in which this paragraph is the referencing side (parent_id);
    # this matches TParagraphRefLink.parent, which back-populates ref_links.
    ref_links = relationship("TParagraphRefLink", foreign_keys='TParagraphRefLink.parent_id', back_populates='parent')
    # Reciprocal of TParagraphEntityLink.paragraph.
    entity_links = relationship("TParagraphEntityLink", back_populates='paragraph')
    is_del = Column(Integer)

    @property
    def children(self):
        """Child paragraphs, resolved through ``children_links``."""
        return [link.child for link in self.children_links]

    @property
    def parent(self):
        """Parent paragraph resolved through ``parent_link``, or None when there is no link."""
        if self.parent_link:
            return self.parent_link.parent
        return None

    def refs(self):
        """Paragraphs referenced by this paragraph (a plain method, not a property)."""
        return [link.child for link in self.ref_links]
class TParagraphLink(Base):
    """
    Paragraph link table (``t_paragraph_links``): one row per parent/child
    pair of paragraphs.
    """
    __tablename__ = 't_paragraph_links'
    id = Column(Integer, primary_key=True)
    # Parent side of the link; paired with TParagraph.children_links.
    parent_id = Column(Integer, ForeignKey('t_paragraphs.id'))
    parent = relationship("TParagraph", foreign_keys=[parent_id], back_populates="children_links")
    # Child side of the link; paired with TParagraph.parent_link.
    child_id = Column(Integer, ForeignKey('t_paragraphs.id'))
    child = relationship("TParagraph", foreign_keys=[child_id], back_populates="parent_link")
    # presumably a soft-delete flag — TODO confirm
    is_del = Column(Integer)
class TParagraphRefLink(Base):
    """
    Paragraph reference link table (``t_paragraph_ref_links``): one row per
    reference between two paragraphs.
    """
    __tablename__ = 't_paragraph_ref_links'
    id = Column(Integer, primary_key=True)
    # Paragraph on the parent side of the reference; paired with TParagraph.ref_links.
    parent_id = Column(Integer, ForeignKey('t_paragraphs.id'))
    parent = relationship("TParagraph", foreign_keys=[parent_id], back_populates="ref_links")
    # Paragraph on the child side; view-only, so this attribute is not used to persist changes.
    child_id = Column(Integer, ForeignKey('t_paragraphs.id'))
    child = relationship("TParagraph", foreign_keys=[child_id], viewonly=True)
    # presumably a soft-delete flag — TODO confirm
    is_del = Column(Integer)
class TParagraphEntityLink(Base):
    """
    Paragraph/entity link table (``t_paragraph_entity_links``): one row per
    entity attached to a paragraph.
    """
    __tablename__ = 't_paragraph_entity_links'
    id = Column(Integer, primary_key=True)
    # Paragraph side of the link; paired with TParagraph.entity_links.
    paragraph_id = Column(Integer, ForeignKey('t_paragraphs.id'))
    paragraph = relationship("TParagraph", foreign_keys=[paragraph_id], back_populates="entity_links")
    # Entity side of the link (one-directional: no back reference is declared here).
    entity_id = Column(Integer, ForeignKey('t_entities.id'))
    entity = relationship("TEntity", foreign_keys=[entity_id])
    # presumably a soft-delete flag — TODO confirm
    is_del = Column(Integer)
class TDoc(Base):
    """
    Document table (``t_docs``).
    """
    __tablename__ = 't_docs'
    id = Column(Integer, primary_key=True)
    file = Column(Text)
    file_name = Column(Text)
    # Paragraph rows whose doc_id points at this document.
    paragraphs = relationship("TParagraph")
    # presumably a soft-delete flag — TODO confirm
    is_del = Column(Integer)
class TEntity(Base):
    """
    Entity table (``t_entities``).
    """
    __tablename__ = 't_entities'
    id = Column(Integer, primary_key=True)
    # Entity word.
    name = Column(Text)
    # presumably the recognition prompt/rule text for this entity — TODO confirm
    prompts = Column(Text)
    type = Column(Text)
    # Document type the entity is associated with.
    doc_type = Column(Text)
class TEntityLink(Base):
    # Entity link table (``t_entity_links``); each row references one entity.
    # NOTE(review): only one side of the link is stored here — confirm the intended use.
    __tablename__ = 't_entity_links'
    id = Column(Integer, primary_key=True)
    entity_id = Column(Integer, ForeignKey('t_entities.id'))
    # presumably a soft-delete flag — TODO confirm
    is_del = Column(Integer)
def init_doc_db():
    """
    Connect to the document database, create missing tables and open a session.

    The connection URL is taken from the ``DOC_DB_URL`` environment variable
    when it is set; otherwise the previous built-in URL is used, so existing
    deployments keep working unchanged.

    :return: database session instance
    """
    import os  # local import so this function does not depend on the file's import block

    # mysql
    # NOTE(review): the fallback URL embeds a user name and password in source
    # control; set DOC_DB_URL in real deployments and remove the fallback once
    # every environment provides it.
    db_url = os.environ.get('DOC_DB_URL',
                            'mysql+pymysql://root:123456@192.168.3.145:3306/knowledgebase')
    Log.info("连接并初始化文档数据库...")
    engine = create_engine(db_url, echo=False)
    # Create any table declared on Base that does not exist yet.
    Base.metadata.create_all(engine)
    SessionFactory = sessionmaker(bind=engine)
    Session = scoped_session(SessionFactory)
    session = Session()
    return session
knowledgebase/db/neo4j.py
@@ -24,7 +24,15 @@
                    entities=json.dumps(page_info.entities, ensure_ascii=False, indent=2))
        self.graph.create(node)
        return node
    def create_trunk_node(self, trunk, entities):
        """
        创建页面节点
        """
        # 创建节点
        node = Node("Trunk", trunk=trunk,
                    entities=json.dumps(entities, ensure_ascii=False, indent=2))
        self.graph.create(node)
        return node
    def create_entity_node(self, entity: str):
        """
        创建实体节点
@@ -36,6 +44,15 @@
        self.graph.create(node)
        return node
    def create_trunk_entity_relationship(self, page_node, entity_node):
        """
        创建页面和实体节点的关系
        """
        relationship = Relationship(page_node, "trunk_entity", entity_node)
        self.graph.create(relationship)
        return relationship
    def create_page_entity_relationship(self, page_node, entity_node):
        """
        创建页面和实体节点的关系
knowledgebase/doc/doc_convert.py
@@ -69,17 +69,3 @@
        print(f"文件 {docx_file} 已成功转换为 {pdf_file}!")
    except  Exception as e:
        print(f"出现错误: {e}")
def test():
    # doc_to_docx("D:\\projects\\KnowledgeBase\\doc\\XA-5D无人机探测大纲(公开).doc",
    #             "D:\\projects\\KnowledgeBase\\doc\\XA-5D无人机探测大纲(公开)111.docx")
    # docx_to_pdf("D:/workspace/PythonProjects/KnowledgeBase/doc/ZL格式(公开).docx",
    #             "D:/workspace/PythonProjects/KnowledgeBase/doc/ZL格式(公开).pdf")
    import pymupdf4llm
    md_text = pymupdf4llm.to_markdown("D:/workspace/PythonProjects/KnowledgeBase/doc/ZL格式(公开).pdf")
    print(md_text)
if __name__ == '__main__':
    test()
knowledgebase/doc/doc_processor.py
@@ -1,65 +1,106 @@
# -*- coding: utf-8 -*-
# @file: doc_processor.py
# @author: lyg
# @date: 20250427
# @date: 2025-5-13
# @version: 
# @description: 处理文档,提取章节信息,提取页码信息,提取实体词,写入图数据库(neo4j)。
from knowledgebase.db.neo4j import Neo4jHelper
from knowledgebase.doc.doc_split import DocSplit
from knowledgebase.doc.entity_recognition import EntityRecognition
# @description: 处理文档,拆分文档,将拆分后的章节保存到数据库中。
from langchain_core.messages import HumanMessage
from knowledgebase.doc.docx_split import DocSplit
import asyncio
from knowledgebase.db.doc_db_helper import doc_dbh
from knowledgebase.doc.entity_helper import entity_helper
from knowledgebase.doc.entity_recognition import EntityRecognition
import os.path
from knowledgebase.doc.models import DocInfo, ParagraphInfo
from knowledgebase.llm import llm
from knowledgebase.log import Log
from knowledgebase import utils
class DocProcessor:
    def __init__(self, pdf_file):
        self.doc_split = DocSplit(pdf_file)
        self.entity_recognition = EntityRecognition()
        self.neo4j = Neo4jHelper()
    def __init__(self, docx_file: str):
        """
        文档处理
        :param docx_file: 要处理的文档
        """
        Log.info(f'开始处理文档:{docx_file}')
        self.docx_file = docx_file
        self.doc_split = DocSplit(docx_file)
        self.doc_type = self.get_doc_type()
        self.entity_recognition = EntityRecognition(self.doc_type)
        self.doc_id = 0
    async def gen_page_entities(self, page_info):
        # 获取页面实体词
        page_entities = await asyncio.to_thread(lambda: self.entity_recognition.run(page_info.text))
        page_info.entities = page_entities
    def get_doc_type(self):
        Log.info(f'识别文档类型:{self.docx_file}')
        rules = ';\n'.join([f'- {it}:{entity_helper.doc_prompt_map[it]}' for it in entity_helper.doc_prompt_map.keys()])
        msg = HumanMessage(f'''
# 指令
请从下面的文件名中识别文档类型,如果识别失败不要输出任何字符。
文件名:{os.path.basename(self.docx_file)}
# 识别规则
{rules}
# 示例
遥测大纲
''')
        resp = llm.invoke([msg])
        Log.info(f'识别结果:{resp.content}')
        return resp.content
    async def gen_sect_entities(self, paragraph: ParagraphInfo):
        # Log.info(f'生成章节实体词:{paragraph.full_text}')
        # 获取章节实体词
        entities = await asyncio.to_thread(lambda: self.entity_recognition.run(paragraph.full_text))
        Log.info(f'章节实体词:{entities}')
        if entities:
            paragraph.entities = [next(filter(lambda x: x.name == e, entity_helper.entities), None) for e in entities]
            paragraph.entities = [e for e in paragraph.entities if e]
    def process(self):
        # 分批并发处理,每批10页
        self.doc_split.split()
        # 分批并发处理,每批10个
        batch_size = 10
        for i in range(0, len(self.doc_split.page_infos), batch_size):
            batch_page_infos = self.doc_split.page_infos[i:i + batch_size]
        for i in range(0, len(self.doc_split.paragraphs), batch_size):
            batch_paragraphs = self.doc_split.paragraphs[i:i + batch_size]
            tasks = []
            for page_info in batch_page_infos:
                tasks.append(self.gen_page_entities(page_info))
            asyncio.run(asyncio.gather(*tasks))
        self.save_to_neo4j()
            for paragraph in batch_paragraphs:
                tasks.append(self.gen_sect_entities(paragraph))
    def save_to_neo4j(self):
        """
        保存页和页实体词到neo4j数据库。
            async def run():
                await asyncio.gather(*tasks)
        1.每一页为一个Node;
        2.每一个实体词为一个Node;
        3.页和实体词直接建立关系 页->实体词
        :return:
            asyncio.run(run())
        # 保存到数据库
        self.save_to_db()
    def save_to_db(self):
        """
        for page_info in self.doc_split.page_infos:
            # 创建页节点
            page_node = self.neo4j.create_page_node(page_info)
            entity_nodes = []
            for entity in page_info.entities:
                # 创建实体词节点
                entity_node = self.neo4j.create_entity_node(entity)
                # 建立关系 页->实体词
                self.neo4j.create_page_entity_relationship(page_node, entity_node)
                entity_nodes.append(entity_node)
            if len(entity_nodes) > 0:
                for i in range(len(entity_nodes)):
                    prev_entity_node = entity_nodes[i]
                    for entity_node in entity_nodes[i + 1:]:
                        # 建立关系 一页中的 实体词1->实体词2
                        self.neo4j.create_entity_relationship(prev_entity_node, entity_node)
        保存段落和段落实体词关系到数据库。
        """
        Log.info('保存段落和段落实体词关系到数据库...')
        with open(self.docx_file, 'rb') as f:
            file_bytes = f.read()
            md5 = utils.generate_bytes_md5(file_bytes)
        doc = DocInfo(os.path.basename(self.docx_file), md5, self.doc_type, self.doc_split.paragraph_tree)
        self.doc_id = doc_dbh.add_doc(doc)
        for paragraph in doc.paragraphs:
            doc_dbh.add_paragraph(self.doc_id, None, paragraph)
        Log.info('保存段落和段落实体词关系到数据库完成')
if __name__ == '__main__':
    pdf_file = "D:/workspace/PythonProjects/KnowledgeBase/doc/XA-5D无人机探测大纲(公开)111.pdf"
    doc_processor = DocProcessor(pdf_file)
    doc_processor.process()
    files = [
        r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机1553B总线传输通信帧分配(公开).docx",
        r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机分系统遥测源包设计报告(公开).docx",
        r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机软件用户需求(公开).docx",
        r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机遥测大纲(公开).docx",
        r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机遥测信号分配表(公开).docx",
        r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机指令格式与编码定义(公开).docx",
        r"D:\workspace\PythonProjects\KnowledgeBase\doc\指令格式(公开).docx"
    ]
    for file in files:
        doc_processor = DocProcessor(file)
        doc_processor.process()
    # doc_dbh.get_docs()
knowledgebase/doc/docx_split.py
@@ -7,58 +7,35 @@
import docx
import docx.table
import json
from dataclasses import dataclass
from PIL import Image
import io
import re
import typing
from knowledgebase.doc.image_to_text import ImageToText
@dataclass
class ParagraphInfo:
    """
    段落信息
    :param text: str - 段落文本
    :param level: int - 段落级别,1-9级标题,0表示正文
    :param title_no: str - 标题编号,如1.1、1.1.1等
    """
    text: str
    level: int
    title_no: str
    @property
    def full_text(self):
        """
        获取段落完整文本,包含标题编号
        :return: str - 段落完整文本
        """
        return f"{self.title_no} {self.text}"
    def __init__(self, text: str, level: int):
        """
        段落信息
        :param text: str - 段落文本
        :param level: int - 段落级别,1-9级标题,0表示正文
        """
        self.text = text
        self.level = level
        self.title_no = ''
from knowledgebase.doc.models import ParagraphInfo
from knowledgebase.log import Log
class DocSplit:
    """
    docx文档拆分器,根据段落拆分,将图片和表格转换为json数据。
    1.封装段落信息
    2.将图片和表格转换为json
    3.将段落按照文档标题级别组合成树形结构
    2.将图片转换为自然语言描述
    3.将表格转换为json格式
    4.将段落按照文档标题级别组合成树形结构
    """
    def __init__(self, doc_file):
        self.doc_file = doc_file
    def __init__(self, docx_file: str):
        """
        docx文档拆分
        :param docx_file: 要拆分的docx文件路径
        """
        self.docx_file = docx_file
        self.image_to_text = ImageToText()
        self.paragraphs:list[ParagraphInfo] = []
        self.paragraphs: list[ParagraphInfo] = []
        self.paragraph_tree: list[ParagraphInfo] = []
    def table_to_json(self, table: docx.table.Table):
        """
@@ -67,8 +44,8 @@
           :param table: docx.table.Table - 要转换的表格对象
           :return list - 表格数据,以 JSON 格式表示
        """
        table_data = []
        headers = []
        table_data = [headers]
        first_row = True
        row: docx.table._Row
        for row in table.rows:
@@ -77,7 +54,7 @@
                    headers.append(cell.text)
                first_row = False
                continue
            row_data = {}
            row_data = []
            row_idx = 0
            for cell in row.cells:
                if cell.tables:
@@ -92,7 +69,8 @@
                else:
                    # 单元格文本获取
                    text = cell.text
                row_data[headers[row_idx]] = text
                # row_data[headers[row_idx]] = text
                row_data.append(text)
                row_idx += 1
            table_data.append(row_data)
@@ -104,7 +82,8 @@
        :return: list[ParagraphInfo] - 段落列表
        """
        document = docx.Document(self.doc_file)
        Log.info(f"开始拆分文档:{self.docx_file}")
        document = docx.Document(self.docx_file)
        table_cnt = 0
        paragraph_cnt = 0
@@ -112,27 +91,45 @@
            if element.tag.endswith('p'):  # 段落
                # 获取标题多级编号
                paragraph = document.paragraphs[paragraph_cnt]
                paragraph_text = paragraph.text
                if paragraph_text:
                    self.paragraphs.append(ParagraphInfo(paragraph_text, self.get_header_level(paragraph)))
                p_text = paragraph.text
                try:
                    num = element.pPr.numPr.numId.val
                    level = element.pPr.numPr.ilvl.val
                except:
                    num = 0
                    level = 0
                if p_text:
                    title_level = self.get_title_level(paragraph)
                    self.paragraphs.append(ParagraphInfo(p_text, title_level, num, level))
                # 检查是否是图片,如果是图片则转换为文本
                img_data = self.get_image_blob(paragraph)
                img_data = self.get_image_text(paragraph)
                if img_data:
                    text = self.gen_text_from_img(img_data)
                    self.paragraphs.append(ParagraphInfo(text, 0))
                    text = f"```图片(以下内容为图片描述)\n{text}\n```"
                    self.paragraphs.append(ParagraphInfo(text, 0, num, level))
                paragraph_cnt += 1
            elif element.tag.endswith('tbl'):  # 表格
                table = document.tables[table_cnt]  # 获取当前表格对象
                table_cnt += 1
                table_data = self.table_to_json(table)
                self.paragraphs.append(ParagraphInfo(json.dumps(table_data, indent=4, ensure_ascii=False), 0))
                self.paragraphs.append(
                    ParagraphInfo("```json\n" + json.dumps(table_data, indent=4, ensure_ascii=False) + "\n```", 0))
            else:
                continue
        # 生成标题编号
        self.gen_title_no(self.paragraphs)
        Log.info(f"开始生成标题编号和列表编号")
        self.gen_title_num(self.paragraphs)
        # 生成树形结构
        Log.info(f"开始生成树形结构")
        self.gen_paragraph_tree(self.paragraphs)
    @staticmethod
    def get_image_blob(paragraph):
    def get_image_text(paragraph):
        """
        获取段落中的图片描述
        :param paragraph: 段落
        :return: 图片内容描述信息
        """
        # 遍历段落中的所有Run对象(图片通常在单独的Run中)
        for run in paragraph.runs:
            xml = run._element.xml
@@ -144,7 +141,7 @@
                    if r_id:
                        # 获取图片信息
                        image_part = paragraph.part.rels[r_id].target_part
                        return DocSplit.image_convert(image_part.blob, "png")
                        return DocSplit.image_convert(image_part.blob)
            if xml.find('wp:inline') != -1 or xml.find('wp:anchor') != -1:
                # 使用正则表达式查找r:embed属性
                match = re.search(r'r:embed="([^"]+)"', xml)
@@ -153,36 +150,88 @@
                    if r_id:
                        # 获取图片信息
                        image_part = paragraph.part.rels[r_id].target_part
                        return DocSplit.image_convert(image_part.blob, "png")
                        return DocSplit.image_convert(image_part.blob)
        return None
    @staticmethod
    def gen_title_no(paragraphs: list[ParagraphInfo]):
        title_levels = [1, 1, 1, 1, 1, 1, 1, 1, 1]
        for i in range(len(paragraphs)):
            if paragraphs[i].level > 0:
                for j in range(paragraphs[i].level - 1):
                    title_levels[j] = 1
                paragraphs[i].title_no = '.'.join([str(x) for x in title_levels[0:paragraphs[i].level]])
                title_levels[paragraphs[i].level - 1] += 1
    def gen_title_num(paragraphs: list[ParagraphInfo]):
        """
        生成标题编号和列表编号
        标题级别从1-9,0表示正文
        :param paragraphs: list[ParagraphInfo] - 段落列表
        :return: None
        """
        MAX_TITLE_LEVEL = 9  # 定义为常量,便于统一管理和扩展
        title_levels = [0] * MAX_TITLE_LEVEL  # 初始化为全0
        list_counters = [0] * MAX_TITLE_LEVEL
        def format_number(level: int, value: int) -> str:
            # 使用映射方式简化逻辑
            if level < 0 or level > 4:
                return str(value)
            formats = {
                0: lambda v: f"({v})",
                1: lambda v: f"{v})",
                2: lambda v: f"({chr(96 + v)})",
                3: lambda v: f"{chr(96 + v)})",
                4: lambda v: chr(96 + v),
            }
            return formats[level](value)
        for p in paragraphs:
            if p.title_level > 0:
                title_levels[p.title_level - 1] += 1
                for i in range(p.title_level, MAX_TITLE_LEVEL):
                    title_levels[i] = 0
                p.title_num = '.'.join([str(x) for x in title_levels[:p.title_level]])
                list_counters = [0] * MAX_TITLE_LEVEL
            else:
                title_levels = [1, 1, 1, 1, 1, 1, 1, 1, 1]
                # 处理列表编号
                if p.num > 0:
                    level = p.num_level
                    # 校验 level 合法性
                    if level < 0 or level >= MAX_TITLE_LEVEL:
                        continue
                    list_counters[level] += 1
                    # 重置当前层级之后的计数器
                    for l in range(level + 1, MAX_TITLE_LEVEL):
                        list_counters[l] = 0
                    # 当前层级递增并赋值
                    p.title_num = format_number(level, list_counters[level])
                else:
                    list_counters = [0] * MAX_TITLE_LEVEL
    @staticmethod
    def get_header_level(paragraph) -> int:
        if paragraph.style.base_style:
            style = paragraph.style.base_style
        else:
            style = paragraph.style
    def get_title_level(paragraph) -> int:
        """
        获取段落标题级别
        :param paragraph: docx.paragraph.Paragraph - 要获取标题级别的段落对象
        :return: int - 标题级别,0 表示非标题
        """
        style = paragraph.style
        if style and style.name.startswith('Heading'):
            # 获取标题级别
            level = int(style.name.split(' ')[1])
            return level
        elif style.base_style and style.base_style.name.startswith('Heading'):
            level = int(style.base_style.name.split(' ')[1])
            return level
        else:
            return 0
    @staticmethod
    def image_convert(_in: bytes, _out_format: str) -> bytes:
    def image_convert(_in: bytes) -> bytes:
        """
        将图片转换为png格式的bytes
        :param _in: bytes - 图片数据
        :return: bytes - png格式的图片数据
        """
        in_io = io.BytesIO()
        in_io.write(_in)
        img = Image.open(in_io, "r")
@@ -191,11 +240,72 @@
        out_io.seek(0)
        return out_io.read()
    def gen_text_from_img(self, img_data:bytes):
    def gen_text_from_img(self, img_data: bytes):
        """
        利用LLM将图片转为文本
        :param img_data: bytes - 图片数据
        :return: str - 文本
        """
        return self.image_to_text.gen_text_from_img(img_data)
    def gen_paragraph_tree(self, paragraphs: typing.List[ParagraphInfo]):
        """
        生成段落树结构,根据title_level划分段落树
        :param paragraphs: list[ParagraphInfo] - 段落列表(会被原地修改)
        """
        if not paragraphs:
            return
        stack = []
        result = []
        _paragraphs = []
        def merge_paragraph_text(info: ParagraphInfo):
            text_nodes = [child for child in info.children if child.title_level == 0]
            info.text += '\n' + '\n'.join([child.full_text for child in text_nodes])
            info.children = [child for child in info.children if child.title_level > 0]
        for p in paragraphs:
            if p.title_level == 1:
                result.append(p)
            # 清理栈顶比当前级别低或相等的节点
            while stack and p.title_level != 0 and stack[-1].title_level >= p.title_level:
                _p = stack.pop()
                merge_paragraph_text(_p)
            if p.title_level > 0:
                if len(stack):
                    stack[-1].children.append(p)
                stack.append(p)
                _paragraphs.append(p)
            elif len(stack):
                stack[-1].children.append(p)
            else:
                # 非标题段落直接加入结果
                result.append(p)
        while stack:
            merge_paragraph_text(stack.pop())
        # 替换原始列表内容,避免多次 remove 操作
        self.paragraphs[:] = _paragraphs
        self.paragraph_tree = result
if __name__ == '__main__':
    doc_file = r'D:\workspace\PythonProjects\KnowledgeBase\doc\ZL格式(公开).docx'
    doc_split = DocSplit(doc_file)
    docx_file = r'D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机1553B总线传输通信帧分配(公开).docx'
    # docx_file = r'D:\workspace\PythonProjects\KnowledgeBase\doc\table_test.docx'
    doc_split = DocSplit(docx_file)
    doc_split.split()
    print("\n".join([x.full_text for x in doc_split.paragraphs]))
    # er = EntityRecognition()
    # db = Neo4jHelper()
    # for trunk in doc_split.trunks:
    #     print('段落文本:')
    #     print(trunk)
    #     print('实体词:')
    #     print(er.run(trunk))
    # entities = er.run(trunk)
    # db.create_page_node()
    print("\n".join([x.full_text_with_children for x in doc_split.paragraphs]))
    print()
knowledgebase/doc/entity_helper.py
New file
@@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
#
# @author:
# @date:
# @version:
# @description:
import json
import os.path
from knowledgebase.db.doc_db_helper import doc_dbh
from knowledgebase.db.doc_db_models import TEntity
from knowledgebase.log import Log
class EntityHelper:
    """
    Loads the entity definitions and keeps them in sync with the database.

    On construction all entities are read from the database, then
    ``tpl/entities.json`` is read: the prompt of every document type is stored
    in ``doc_prompt_map`` and every entity whose name was not in the database
    yet is inserted. Newly inserted entities are also appended to
    ``entities`` so they are usable immediately, not only after a restart.
    """
    # Map of document type -> prompt used to recognise that document type
    doc_prompt_map: dict
    # All entities
    entities: list[TEntity]

    def __init__(self):
        Log.info("初始化EntityHelper")
        current_dir = os.path.dirname(__file__)
        self.entities = doc_dbh.get_all_entities()
        self.doc_prompt_map = {}
        # Names already stored before this run; a set keeps the membership test cheap.
        entity_names = {entity.name for entity in self.entities}
        with open(f'{current_dir}/../../tpl/entities.json', 'r', encoding='utf-8') as f:
            obj = json.load(f)
        # Layout of the file: {type: {doc_type: {"prompts": ..., "entities": {name: prompts}}}}
        for ty, doc_types in obj.items():
            for doc_ty, spec in doc_types.items():
                self.doc_prompt_map[doc_ty] = spec['prompts']
                for entity, entity_prompts in spec['entities'].items():
                    if entity in entity_names:
                        continue
                    _entity = TEntity(name=entity, type=ty, doc_type=doc_ty,
                                      prompts=entity_prompts)
                    doc_dbh.add_entity(_entity)
                    # Keep the in-memory list complete for lookups during this run.
                    self.entities.append(_entity)
                    Log.info(f"新增Entity:{entity},id:{_entity.id}")
# Shared module-level instance. Constructing it queries the database and reads
# tpl/entities.json, so importing this module triggers that work.
entity_helper = EntityHelper()
knowledgebase/doc/entity_recognition.py
@@ -11,6 +11,12 @@
import json
from knowledgebase import utils
from knowledgebase.doc.entity_helper import entity_helper
# Module-level ChatOpenAI client (temperature 0) pointed at the DashScope
# OpenAI-compatible endpoint.
# NOTE(review): the API key is hard-coded in source; it should be loaded from
# configuration or an environment variable, and this key should be rotated.
llm = ChatOpenAI(temperature=0,
                 model="qwen2.5-72b-instruct",
                 base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
                 api_key="sk-15ecf7e273ad4b729c7f7f42b542749e")
class EntityRecognition:
@@ -21,20 +27,22 @@
    """
    cache_file = "entity_recognition.cache"
    def __init__(self):
        llm = ChatOpenAI(temperature=0,
                         model="qwen2.5-72b-instruct",
                         base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
                         api_key="sk-15ecf7e273ad4b729c7f7f42b542749e")
    def __init__(self, doc_type: str):
        # 实体词列表
        entities = filter(lambda x: x.doc_type == doc_type, entity_helper.entities)
        entity_list = ';\n'.join([f'- {entity.name}:{entity.prompts}' for entity in entities]) + "。"
        msg = HumanMessagePromptTemplate.from_template(template="""
# 指令
请从给定的文本中提取实体词列表。
请从给定的文本中提取实体词列表,实体词列表定义如下:
## 实体词列表及识别规则
""" + entity_list + """
# 约束
- 输出格式为JSON格式;
- 提取的实体词必须是上面列举的实体词;
- 输出数据结构为字符串数组。
# 示例
```json
["实体1","实体2"]
["遥控帧格式","遥控包格式"]
```
# 文本如下:
@@ -65,9 +73,10 @@
    def run(self, in_text: str) -> list[str]:
        """
        运行实体识别抽取。
        :param in_text: str - 输入文本
        """
        # 缓存命中
        text_md5 = utils.generate_md5(in_text)
        text_md5 = utils.generate_text_md5(in_text)
        if text_md5 in self.cache:
            return self.cache[text_md5]
        result = self.chain.invoke({"text": in_text})
knowledgebase/doc/models.py
New file
@@ -0,0 +1,117 @@
# -*- coding: utf-8 -*-
#
# @author: lyg
# @date: 2025-5-12
# @version: 1
# @description: 文档相关数据类
from dataclasses import dataclass
import typing
from knowledgebase.db.doc_db_models import TEntity
@dataclass
class ParagraphInfo:
    """
    A paragraph of a document, optionally holding nested child paragraphs.

    Attributes:
        text: paragraph text.
        title_level: heading level; 1-9 for headings, 0 for body text.
        title_num: heading number such as "1.1" or "1.1.1", or a list marker
            such as "(1)"; empty string when there is none.
        num_level: list-numbering level; 0 for body text.
        num: list item number, when the paragraph is a list item.
        children: child paragraphs (ParagraphInfo instances).
        refs: referenced documents.
        entities: TEntity objects attached to this paragraph.
    """
    text: str
    title_level: int
    title_num: str
    num_level: int
    num: int
    children: typing.List
    refs: typing.List
    # Quoted so the ORM model is not needed just to evaluate the annotation.
    entities: typing.List["TEntity"]

    def __init__(self, text: str, title_level: int, num=0, num_level=0):
        """
        Create a paragraph with no heading number, children, refs or entities.

        :param text: str - paragraph text
        :param title_level: int - heading level; 1-9 for headings, 0 for body text
        :param num: int - list item number
        :param num_level: int - list-numbering level
        """
        self.text = text
        self.title_level = title_level
        self.title_num = ''
        self.num = num
        self.num_level = num_level
        self.children: typing.List["ParagraphInfo"] = []
        # `refs` is a declared dataclass field, so it must always exist:
        # otherwise reading it, or comparing two paragraphs with the
        # dataclass-generated __eq__, raises AttributeError.
        self.refs: typing.List = []
        self.entities: typing.List["TEntity"] = []

    @property
    def full_text(self):
        """
        Paragraph text prefixed with its heading number, when it has one.

        :return: str - "<title_num>. <text>" or just the text
        """
        if self.title_num:
            return f"{self.title_num}. {self.text}"
        return f"{self.text}"

    @property
    def full_text_with_children(self):
        """
        Full text of this paragraph followed by the full text of every
        descendant, depth-first, one paragraph per line.

        :return: str - newline-joined text of the whole subtree
        """
        lines = [self.full_text]
        lines.extend(child.full_text_with_children for child in self.children)
        return "\n".join(lines)

    def __str__(self):
        return f"{self.full_text}"

    def __repr__(self):
        return f"{self.full_text}"
@dataclass
class DocInfo:
    """
    Document information.

    Attributes:
        id: document id; None when the object is created (presumably assigned
            by the caller once the document has been stored - TODO confirm).
        file_name: document name.
        file: document file.
        file_type: document type.
        paragraphs: paragraphs of the document.
    """
    id: typing.Optional[int]
    file_name: str
    # NOTE(review): annotated as str here while __init__ declares bytes and the
    # original docstring said typing.BinaryIO - confirm which one is intended.
    file: str
    file_type: str
    paragraphs: typing.List["ParagraphInfo"]

    def __init__(self, file_name: str, file: bytes, file_type: str, paragraphs: typing.List["ParagraphInfo"]):
        """
        Create a document description.

        :param file_name: str - document name
        :param file: bytes - document file
        :param file_type: str - document type
        :param paragraphs: list of ParagraphInfo - paragraphs of the document
        """
        # `id` is a declared dataclass field, so it must always exist:
        # otherwise reading it, or comparing two documents with the
        # dataclass-generated __eq__, raises AttributeError.
        self.id = None
        self.file_name = file_name
        self.file = file
        self.file_type = file_type
        self.paragraphs: typing.List["ParagraphInfo"] = paragraphs
knowledgebase/log/__init__.py
New file
@@ -0,0 +1,50 @@
# -*- coding: utf-8 -*-
#
# @author:
# @date:
# @version:
# @description:
import logging
# Logger used by the Log helper class; it accepts every level.
logger = logging.getLogger('logs_logger')
logger.setLevel(logging.DEBUG)
# File handler: appends records to logs.log in the current working directory.
# The encoding is fixed to UTF-8 so that non-ASCII (e.g. Chinese) messages are
# written the same way on every platform instead of depending on the locale's
# default codec.
file_handler = logging.FileHandler('logs.log', encoding='utf-8')
file_handler.setLevel(logging.DEBUG)
# Console handler: mirrors the same records to the standard error stream.
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
# One record format shared by both handlers.
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
# Attach both handlers to the logger.
logger.addHandler(file_handler)
logger.addHandler(console_handler)
class Log:
    """Static convenience wrappers that forward a message to the module-level ``logger``."""

    @staticmethod
    def debug(msg):
        """Log ``msg`` at DEBUG level."""
        logger.log(logging.DEBUG, msg)

    @staticmethod
    def info(msg):
        """Log ``msg`` at INFO level."""
        logger.log(logging.INFO, msg)

    @staticmethod
    def warning(msg):
        """Log ``msg`` at WARNING level."""
        logger.log(logging.WARNING, msg)

    @staticmethod
    def error(msg):
        """Log ``msg`` at ERROR level."""
        logger.log(logging.ERROR, msg)

    @staticmethod
    def critical(msg):
        """Log ``msg`` at CRITICAL level."""
        logger.log(logging.CRITICAL, msg)
knowledgebase/utils.py
@@ -15,7 +15,7 @@
    return mask
def generate_md5(input_string):
def generate_text_md5(input_string):
    # 创建一个 md5 哈希对象
    md5_hash = hashlib.md5()
@@ -27,6 +27,17 @@
    return md5_digest
def generate_bytes_md5(input_bytes):
    """
    Return the MD5 digest of a bytes-like object.

    :param input_bytes: bytes-like data to hash
    :return: str - digest as a lowercase hexadecimal string
    """
    hasher = hashlib.md5()
    hasher.update(input_bytes)
    return hasher.hexdigest()
def file_exists(cache_file: str):
    """
    Tell whether the given path exists on disk.

    :param cache_file: str - path to check
    :return: bool - result of os.path.exists for that path
    """
    path_present = os.path.exists(cache_file)
    return path_present
lang_flow.py
File was deleted
tpl/entities.json
New file
@@ -0,0 +1,54 @@
{
  "型号基础信息": {
    "用户需求": {
      "prompts": "文件名包含“需求”,如果有多份需求文件,那么选择包含“星务管理”类似关键字的,内容包含遥控遥测相关功能描述",
      "entities": {
        "系统概述": "一般在第一章节,类似于“前言”或“概述”的章节,这里描述了型号基本信息及分系统组成",
        "总线管理": "一般是“星务管理”的子章节,章节名包含“总线”,内容主要描述总线相关功能或引用文件"
      }
    }
  },
  "遥测包配置": {
    "遥测大纲": {
      "prompts": "文件名通常包含“遥测” “大纲”等关键字,内容包含对遥测帧及遥测包格式的定义",
      "entities": {
        "遥测格式定义": "一般在“遥测格式”章节,内容包含”遥测帧“ ”遥测包“具体格式的定义",
        "虚拟信道定义": "章节名包含“虚拟信道”,内容包含虚拟信道的划分,各源包在各虚拟信道下传分配",
        "插入域": "章节名包含“插入域”,内容为一张表格,定义了插入域中的遥测参数",
        "源包参数表": "章节名包含“源包设计”,内容为多个源包具体参数的表格,每个源包单独一张表格"
      }
    },
    "源包设计": {
      "prompts": "文件名通常包含“源包”关键字",
      "entities": {
        "源包参数表": "通常为叶子节点,章节名通常为 “xxx包”,内容为源包参数表格,定义了包头、数据域具体内容"
      }
    }
  },
  "总线配置": {
    "总线通信协议": {
      "prompts": "文件名中包含“总线”关键字,内容为各分系统源包在总线上传输的定义",
      "entities": {
        "RT地址分配": "章节名包含“RT地址”,内容为各分系统和RT地址分配关系的表格",
        "分系统源包": "通常在叶子章节中,内容为该分系统各源包在总线上传输时所使用的“传输服务”、“子地址”、“通信帧号”等,并描述了源包名称、APID、包长等信息",
        "源包参数表": "章节名包含“源包设计”,内容为多个源包具体参数的表格,每个源包单独一张表格"
      }
    }
  },
  "指令格式配置": {
    "指令格式": {
      "prompts": "文件名中包含“指令格式”关键字,内容为指令格式的定义",
      "entities": {
        "遥控帧格式": "章节名包含“遥控”和“帧”关键字,内容为遥控帧各字段的定义",
        "遥控包格式": "章节名包含“遥控”和“包”关键字,内容为遥控包各字段的定义",
        "APID分配": "章节名包含“APID”或“应用过程标识”关键字,内容为APID值的枚举表达,在遥控包数据域或指令单元的定义章节中,包含有对于APID值的描述"
      }
    },
    "遥控指令表": {
      "prompts": "文件名中包含“遥控指令”和“表”关键字,内容为遥控指令代号、通道号和指令名称(意义)的描述",
      "entities": {
        "开关指令代号": "章节名包含“遥控”和“指令”关键字,内容为遥控指令代号、通道号和指令名称(意义)的描述。一般间接ONOFF指令等同于间接指令,直接指令等同于遥控板ONOFF指令"
      }
    }
  }
}
tpl/tc_pkt_format.json
New file
@@ -0,0 +1,147 @@
{
  "name": "遥控包",
  "type": "pkt",
  "children": [
    {
      "name": "主导头",
      "code": "primaryHeader",
      "length": 48,
      "type": "combPkt",
      "children": [
        {
          "name": "包识别",
          "code": "packetIdentifier",
          "length": 16,
          "type": "combPkt",
          "children": [
            {
              "name": "包版本号",
              "code": "packetVersionNumber",
              "length": 3,
              "value": "{{包版本号}}",
              "type": "const"
            },
            {
              "name": "包类型",
              "code": "packetType",
              "length": 1,
              "value": "{{包类型}}",
              "type": "const"
            },
            {
              "name": "数据区头标志",
              "code": "dataFieldHeaderFlag",
              "length": 1,
              "value": "{{数据区头标志}}",
              "type": "const"
            },
            {
              "name": "应用过程标识符",
              "code": "apid",
              "length": 11,
              "value": "{{应用过程标识符}}",
              "type": "const"
            }
          ]
        },
        {
          "name": "包序列控制",
          "code": "sequenceControl",
          "length": 16,
          "type": "combPkt",
          "children": [
            {
              "name": "序列标志",
              "code": "sequenceFlags",
              "length": 2,
              "value": "{{序列标志}}",
              "type": "const"
            },
            {
              "name": "包序列计数",
              "code": "packetSequenceCount",
              "length": 14,
              "type": "const",
              "value": "0"
            }
          ]
        },
        {
          "name": "包长",
          "code": "packetLength",
          "length": 16,
          "type": "length",
          "value": {
            "start": "secondaryHeader",
            "end": "packetDataEnd",
            "formula": "N-1"
          }
        }
      ]
    },
    {
      "name": "副导头",
      "code": "secondaryHeader",
      "length": 8,
      "type": "combPkt",
      "children": [
        {
          "name": "副导头标志",
          "code": "ccsdsSecondaryHeaderFlag",
          "length": 1,
          "value": "{{副导头标志}}",
          "type": "const"
        },
        {
          "name": "遥控包版本号",
          "code": "tcPktVersionNumber",
          "length": 3,
          "value": "{{遥控包版本号}}",
          "type": "const"
        },
        {
          "name": "命令正确应答",
          "code": "acknowledgmentFlag",
          "length": 4,
          "type": "const",
          "value": "{{命令正确应答}}"
        },
        {
          "name": "服务类型",
          "code": "serviceType",
          "length": 8,
          "type": "const",
          "value": "{{服务类型}}"
        },
        {
          "name": "服务子类型",
          "code": "serviceSubtype",
          "length": 8,
          "type": "const",
          "value": "{{服务子类型}}"
        },
        {
          "name": "源地址",
          "code": "sourceAddr",
          "length": 8,
          "value": "{{源地址}}",
          "type": "const"
        }
      ]
    },
    {
      "name": "应用数据区",
      "code": "data",
      "length": null,
      "type": "insUnitList",
      "children": []
    },
    {
      "name": "包差错控制域",
      "code": "pktCheckSum",
      "length": 16,
      "type": "checkSum"
    }
  ],
  "subPkts": []
}
tpl/tc_transfer_frame.json
New file
@@ -0,0 +1,83 @@
{
    "name": "遥控帧",
    "type": "pkt",
    "children": [
        {
            "name": "主导头",
            "code": "primaryHeader",
            "length": 40,
            "type": "combPkt",
            "children": [
                {
                    "name": "版本号",
                    "code": "versionNumber",
                    "length": 2,
                    "value": "{{版本号}}",
                    "type": "const"
                },
                {
                    "name": "通过标志",
                    "code": "passFlag",
                    "length": 1,
                    "type": "const",
                    "value": "{{通过标志}}"
                },
                {
                    "name": "控制命令标志",
                    "code": "controlCommandFlag",
                    "length": 1,
                    "value": "{{控制命令标志}}",
                    "type": "const"
                },
                {
                    "name": "空闲位",
                    "code": "idleBits",
                    "length": 2,
                    "value": "{{空闲位}}",
                    "type": "const"
                },
                {
                    "name": "航天器标识",
                    "code": "staID",
                    "length": 10,
                    "value": "{{航天器标识}}",
                    "type": "const"
                },
                {
                    "name": "虚拟信道标识",
                    "code": "vcid",
                    "length": 6,
                    "type": "enum",
                    "enums": "{{虚拟信道标识}}"
                },
                {
                    "name": "帧长",
                    "code": "frameLength",
                    "length": 10,
                    "type": "length",
                    "value": {"start": "START", "end": "END", "formula": "N-1"}
                },
                {
                    "name": "帧序列号",
                    "code": "frameSequenceNumber",
                    "length": 8,
                    "type": "const",
                    "value": "0"
                }
            ]
        },
        {
            "name": "传送帧数据域",
            "code": "dataField",
            "length": 8136,
            "type": "subPkt"
        },
        {
            "name": "帧差错控制域",
            "code": "frameCRC",
            "length": 16,
            "type": "checkSum"
        }
    ],
    "subPkts": []
}