lyg
2025-05-22 c099e6662b8a6e320ac314d31eda9b40455e5aa7
knowledgebase/doc/doc_processor.py
@@ -1,65 +1,173 @@
# -*- coding: utf-8 -*-
# @file: doc_processor.py
# @author: lyg
# @date: 20250427
# @date: 2025-5-13
# @version: 
# @description: 处理文档,提取章节信息,提取页码信息,提取实体词,写入图数据库(neo4j)。
from knowledgebase.db.neo4j import Neo4jHelper
from knowledgebase.doc.doc_split import DocSplit
from knowledgebase.doc.entity_recognition import EntityRecognition
# @description: 处理文档,拆分文档,将拆分后的章节保存到数据库中。
from langchain_core.messages import HumanMessage
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import ChatPromptTemplate, HumanMessagePromptTemplate
from knowledgebase.db.doc_db_models import TEntity
from knowledgebase.doc.docx_split import DocSplit
import asyncio
from knowledgebase.db.doc_db_helper import doc_dbh
from knowledgebase.doc.entity_helper import entity_helper
from knowledgebase.doc.entity_recognition import EntityRecognition
import os.path
from knowledgebase.doc.models import DocInfo, ParagraphInfo, DocType
from knowledgebase.llm import llm
from knowledgebase.log import Log
from knowledgebase import utils
class DocProcessor:
    def __init__(self, pdf_file):
        self.doc_split = DocSplit(pdf_file)
        self.entity_recognition = EntityRecognition()
        self.neo4j = Neo4jHelper()
    def __init__(self, docx_file: str):
        """
        文档处理
        :param docx_file: 要处理的文档
        """
        Log.info(f'开始处理文档:{docx_file}')
        self.docx_file = docx_file
        self.doc_type = self.get_doc_type()
        self.doc_split = DocSplit(docx_file, self.doc_type)
        self.entity_recognition = EntityRecognition(self.doc_type)
        self.doc_id = 0
    async def gen_page_entities(self, page_info):
        # 获取页面实体词
        page_entities = await asyncio.to_thread(lambda: self.entity_recognition.run(page_info.text))
        page_info.entities = page_entities
    def get_doc_type(self):
        Log.info(f'识别文档类型:{self.docx_file}')
        rules = ';\n'.join([f'- {it}:{entity_helper.doc_prompt_map[it]}' for it in entity_helper.doc_prompt_map.keys()])
        msg = HumanMessage(f'''
# 指令
请从下面的文件名中识别文档类型,如果识别失败不要输出任何字符。
文件名:{os.path.basename(self.docx_file)}
# 识别规则
{rules}
# 示例
遥测大纲
''')
        resp = llm.invoke([msg])
        Log.info(f'识别结果:{resp.content}')
        return resp.content
    def get_tc_info(self,  paragraph: ParagraphInfo):
        if self.doc_type not in [DocType.tc_format]:
            return ''
        prompt = HumanMessagePromptTemplate.from_template('''
# 指令
请从下面的文本中识别指令信息,如果识别失败不要输出任何字符。
指令信息包括:指令名称。
# 识别规则
- 文本内容为遥控指令数据域或遥控指令应用数据的定义描述。
# 约束
- 如果文本内容是目录则不要输出任何字符;
- 指令名称在章节标题中,提取指令名称要和文本中的严格一致;
- 如果没有识别到指令信息不要输出任何字符;
- 识别失败,不要输出任何内容,包括解释性文本;
- 输出json格式。
# 示例 - 识别到指令
{{
    "name": "xxx"
}}
# 示例 - 未识别到数据包
""
# 文本内容:
{text}
''')
        chain = prompt.prompt | llm | JsonOutputParser()
        resp = chain.invoke({"text": paragraph.full_text})
        return resp
    def get_tm_pkt_info(self, paragraph: ParagraphInfo):
        if self.doc_type not in [DocType.tm_outline, DocType.tm_pkt_design]:
            return ''
        prompt = HumanMessagePromptTemplate.from_template('''
# 指令
识别遥测包信息,请从下面的文本中识别遥测包信息,如果识别失败不要输出任何字符。
识别规则:章节标题中包含包名称和代号,章节内容为表格,表格中包括包头定义和包参数定义。
提取的遥测包信息包括:包名称,包代号,APID。
# 约束
- 如果文本内容是目录则不要输出任何字符;
- 文本描述的内容是单个遥测包,如果有多个遥测包则不要输出任何字符;
- 文本结构通常是:包名称、代号和APID在开头,后面紧接着是包头和参数定义表;
- 如果没有识别到遥测包信息不要输出任何字符;
- 识别失败,不要输出任何内容,包括解释性文本;
- 输出json格式。
# 复合要求的文本结构
1.1.1 code xxx包(APID=0x123)
```json
表格内容
```
# 示例 - 识别到数据包
{{
    "name": "xxx包",
    "code": "xxx",
    "apid": 123
}}
# 示例 - 未识别到数据包
""
# 文本内容:
{text}
''')
        chain = prompt.prompt | llm | JsonOutputParser()
        resp = chain.invoke({"text": paragraph.full_text})
        return resp
    async def gen_chapter_entities(self, paragraph: ParagraphInfo):
        # 获取章节实体词
        entity_names = await asyncio.to_thread(lambda: self.entity_recognition.run(paragraph.full_text))
        Log.info(f'章节{paragraph.title_num}实体词:{entity_names}')
        if entity_names:
            paragraph.entities = doc_dbh.get_entities_by_names(entity_names)
        # 获取遥测包信息
        pkt = self.get_tm_pkt_info(paragraph)
        if pkt:
            entity = TEntity(name=pkt['code'], type='遥测包配置', prompts='', doc_type='')
            e = doc_dbh.get_entity(entity)
            if e:
                entity.id = e.id
            else:
                doc_dbh.add_entity(entity)
                Log.info(f"新增Entity:{entity.name},id:{entity.id}")
            paragraph.entities.append(entity)
        # 获取指令信息
        cmd = self.get_tc_info(paragraph)
        if cmd:
            entity = TEntity(name=cmd['name'], type='指令格式配置', prompts='', doc_type='')
            e = doc_dbh.get_entity(entity)
            if e:
                entity.id = e.id
            else:
                doc_dbh.add_entity(entity)
                Log.info(f"新增Entity:{entity.name},id:{entity.id}")
            paragraph.entities.append(entity)
    def process(self):
        # 分批并发处理,每批10页
        self.doc_split.split()
        # 分批并发处理,每批10个
        batch_size = 10
        for i in range(0, len(self.doc_split.page_infos), batch_size):
            batch_page_infos = self.doc_split.page_infos[i:i + batch_size]
        for i in range(0, len(self.doc_split.paragraphs), batch_size):
            batch_paragraphs = self.doc_split.paragraphs[i:i + batch_size]
            tasks = []
            for page_info in batch_page_infos:
                tasks.append(self.gen_page_entities(page_info))
            asyncio.run(asyncio.gather(*tasks))
        self.save_to_neo4j()
            for paragraph in batch_paragraphs:
                tasks.append(self.gen_chapter_entities(paragraph))
    def save_to_neo4j(self):
            async def run():
                await asyncio.gather(*tasks)
            asyncio.run(run())
        # 保存到数据库
        self.save_to_db()
    def save_to_db(self):
        """
        保存页和页实体词到neo4j数据库。
        1.每一页为一个Node;
        2.每一个实体词为一个Node;
        3.页和实体词直接建立关系 页->实体词
        :return:
        保存段落和段落实体词关系到数据库。
        """
        for page_info in self.doc_split.page_infos:
            # 创建页节点
            page_node = self.neo4j.create_page_node(page_info)
            entity_nodes = []
            for entity in page_info.entities:
                # 创建实体词节点
                entity_node = self.neo4j.create_entity_node(entity)
                # 建立关系 页->实体词
                self.neo4j.create_page_entity_relationship(page_node, entity_node)
                entity_nodes.append(entity_node)
            if len(entity_nodes) > 0:
                for i in range(len(entity_nodes)):
                    prev_entity_node = entity_nodes[i]
                    for entity_node in entity_nodes[i + 1:]:
                        # 建立关系 一页中的 实体词1->实体词2
                        self.neo4j.create_entity_relationship(prev_entity_node, entity_node)
if __name__ == '__main__':
    pdf_file = "D:/workspace/PythonProjects/KnowledgeBase/doc/XA-5D无人机探测大纲(公开)111.pdf"
    doc_processor = DocProcessor(pdf_file)
    doc_processor.process()
        Log.info('保存段落和段落实体词关系到数据库...')
        with open(self.docx_file, 'rb') as f:
            file_bytes = f.read()
            md5 = utils.generate_bytes_md5(file_bytes)
        doc = DocInfo(os.path.basename(self.docx_file), md5, self.doc_type, self.doc_split.paragraph_tree)
        self.doc_id = doc_dbh.add_doc(doc)
        for paragraph in doc.paragraphs:
            doc_dbh.add_paragraph(self.doc_id, None, paragraph)
        Log.info('保存段落和段落实体词关系到数据库完成')