| | |
| | | # -*- coding: utf-8 -*- |
| | | # @file: doc_processor.py |
| | | # @author: lyg |
| | | # @date: 20250427 |
| | | # @date: 2025-5-13 |
| | | # @version: |
| | | # @description: 处理文档,提取章节信息,提取页码信息,提取实体词,写入图数据库(neo4j)。 |
| | | from knowledgebase.db.neo4j import Neo4jHelper |
| | | from knowledgebase.doc.doc_split import DocSplit |
| | | from knowledgebase.doc.entity_recognition import EntityRecognition |
| | | # @description: 处理文档,拆分文档,将拆分后的章节保存到数据库中。 |
| | | from langchain_core.messages import HumanMessage |
| | | from langchain_core.output_parsers import JsonOutputParser |
| | | from langchain_core.prompts import ChatPromptTemplate, HumanMessagePromptTemplate |
| | | |
| | | from knowledgebase.db.doc_db_models import TEntity |
| | | from knowledgebase.doc.docx_split import DocSplit |
| | | import asyncio |
| | | from knowledgebase.db.doc_db_helper import doc_dbh |
| | | from knowledgebase.doc.entity_helper import entity_helper |
| | | from knowledgebase.doc.entity_recognition import EntityRecognition |
| | | import os.path |
| | | |
| | | from knowledgebase.doc.models import DocInfo, ParagraphInfo, DocType |
| | | from knowledgebase.llm import llm |
| | | from knowledgebase.log import Log |
| | | from knowledgebase import utils |
| | | |
| | | |
| | | class DocProcessor: |
| | | def __init__(self, pdf_file): |
| | | self.doc_split = DocSplit(pdf_file) |
| | | self.entity_recognition = EntityRecognition() |
| | | self.neo4j = Neo4jHelper() |
| | | def __init__(self, docx_file: str): |
| | | """ |
| | | 文档处理 |
| | | :param docx_file: 要处理的文档 |
| | | """ |
| | | Log.info(f'开始处理文档:{docx_file}') |
| | | self.docx_file = docx_file |
| | | self.doc_type = self.get_doc_type() |
| | | self.doc_split = DocSplit(docx_file, self.doc_type) |
| | | self.entity_recognition = EntityRecognition(self.doc_type) |
| | | self.doc_id = 0 |
| | | |
| | | async def gen_page_entities(self, page_info): |
| | | # 获取页面实体词 |
| | | page_entities = await asyncio.to_thread(lambda: self.entity_recognition.run(page_info.text)) |
| | | page_info.entities = page_entities |
| | | def get_doc_type(self): |
| | | Log.info(f'识别文档类型:{self.docx_file}') |
| | | rules = ';\n'.join([f'- {it}:{entity_helper.doc_prompt_map[it]}' for it in entity_helper.doc_prompt_map.keys()]) |
| | | msg = HumanMessage(f''' |
| | | # 指令 |
| | | 请从下面的文件名中识别文档类型,如果识别失败不要输出任何字符。 |
| | | 文件名:{os.path.basename(self.docx_file)} |
| | | # 识别规则 |
| | | {rules} |
| | | # 示例 |
| | | 遥测大纲 |
| | | ''') |
| | | resp = llm.invoke([msg]) |
| | | Log.info(f'识别结果:{resp.content}') |
| | | return resp.content |
| | | def get_tc_info(self, paragraph: ParagraphInfo): |
| | | if self.doc_type not in [DocType.tc_format]: |
| | | return '' |
| | | prompt = HumanMessagePromptTemplate.from_template(''' |
| | | # 指令 |
| | | 请从下面的文本中识别指令信息,如果识别失败不要输出任何字符。 |
| | | 指令信息包括:指令名称。 |
| | | # 识别规则 |
| | | - 文本内容为遥控指令数据域或遥控指令应用数据的定义描述。 |
| | | # 约束 |
| | | - 如果文本内容是目录则不要输出任何字符; |
| | | - 指令名称在章节标题中,提取指令名称要和文本中的严格一致; |
| | | - 如果没有识别到指令信息不要输出任何字符; |
| | | - 识别失败,不要输出任何内容,包括解释性文本; |
| | | - 输出json格式。 |
| | | # 示例 - 识别到指令 |
| | | {{ |
| | | "name": "xxx" |
| | | }} |
| | | # 示例 - 未识别到数据包 |
| | | "" |
| | | # 文本内容: |
| | | {text} |
| | | ''') |
| | | chain = prompt.prompt | llm | JsonOutputParser() |
| | | resp = chain.invoke({"text": paragraph.full_text}) |
| | | return resp |
| | | def get_tm_pkt_info(self, paragraph: ParagraphInfo): |
| | | if self.doc_type not in [DocType.tm_outline, DocType.tm_pkt_design]: |
| | | return '' |
| | | prompt = HumanMessagePromptTemplate.from_template(''' |
| | | # 指令 |
| | | 识别遥测包信息,请从下面的文本中识别遥测包信息,如果识别失败不要输出任何字符。 |
| | | 识别规则:章节标题中包含包名称和代号,章节内容为表格,表格中包括包头定义和包参数定义。 |
| | | 提取的遥测包信息包括:包名称,包代号,APID。 |
| | | # 约束 |
| | | - 如果文本内容是目录则不要输出任何字符; |
| | | - 文本描述的内容是单个遥测包,如果有多个遥测包则不要输出任何字符; |
| | | - 文本结构通常是:包名称、代号和APID在开头,后面紧接着是包头和参数定义表; |
| | | - 如果没有识别到遥测包信息不要输出任何字符; |
| | | - 识别失败,不要输出任何内容,包括解释性文本; |
| | | - 输出json格式。 |
| | | # 复合要求的文本结构 |
| | | 1.1.1 code xxx包(APID=0x123) |
| | | ```json |
| | | 表格内容 |
| | | ``` |
| | | # 示例 - 识别到数据包 |
| | | {{ |
| | | "name": "xxx包", |
| | | "code": "xxx", |
| | | "apid": 123 |
| | | }} |
| | | # 示例 - 未识别到数据包 |
| | | "" |
| | | # 文本内容: |
| | | {text} |
| | | ''') |
| | | chain = prompt.prompt | llm | JsonOutputParser() |
| | | resp = chain.invoke({"text": paragraph.full_text}) |
| | | return resp |
| | | |
| | | async def gen_chapter_entities(self, paragraph: ParagraphInfo): |
| | | # 获取章节实体词 |
| | | entity_names = await asyncio.to_thread(lambda: self.entity_recognition.run(paragraph.full_text)) |
| | | Log.info(f'章节{paragraph.title_num}实体词:{entity_names}') |
| | | if entity_names: |
| | | paragraph.entities = doc_dbh.get_entities_by_names(entity_names) |
| | | # 获取遥测包信息 |
| | | pkt = self.get_tm_pkt_info(paragraph) |
| | | if pkt: |
| | | entity = TEntity(name=pkt['code'], type='遥测包配置', prompts='', doc_type='') |
| | | e = doc_dbh.get_entity(entity) |
| | | if e: |
| | | entity.id = e.id |
| | | else: |
| | | doc_dbh.add_entity(entity) |
| | | Log.info(f"新增Entity:{entity.name},id:{entity.id}") |
| | | paragraph.entities.append(entity) |
| | | # 获取指令信息 |
| | | cmd = self.get_tc_info(paragraph) |
| | | if cmd: |
| | | entity = TEntity(name=cmd['name'], type='指令格式配置', prompts='', doc_type='') |
| | | e = doc_dbh.get_entity(entity) |
| | | if e: |
| | | entity.id = e.id |
| | | else: |
| | | doc_dbh.add_entity(entity) |
| | | Log.info(f"新增Entity:{entity.name},id:{entity.id}") |
| | | paragraph.entities.append(entity) |
| | | |
| | | def process(self): |
| | | # 分批并发处理,每批10页 |
| | | self.doc_split.split() |
| | | # 分批并发处理,每批10个 |
| | | batch_size = 10 |
| | | for i in range(0, len(self.doc_split.page_infos), batch_size): |
| | | batch_page_infos = self.doc_split.page_infos[i:i + batch_size] |
| | | for i in range(0, len(self.doc_split.paragraphs), batch_size): |
| | | batch_paragraphs = self.doc_split.paragraphs[i:i + batch_size] |
| | | tasks = [] |
| | | for page_info in batch_page_infos: |
| | | tasks.append(self.gen_page_entities(page_info)) |
| | | asyncio.run(asyncio.gather(*tasks)) |
| | | self.save_to_neo4j() |
| | | for paragraph in batch_paragraphs: |
| | | tasks.append(self.gen_chapter_entities(paragraph)) |
| | | |
| | | def save_to_neo4j(self): |
| | | async def run(): |
| | | await asyncio.gather(*tasks) |
| | | |
| | | asyncio.run(run()) |
| | | # 保存到数据库 |
| | | self.save_to_db() |
| | | |
| | | def save_to_db(self): |
| | | """ |
| | | 保存页和页实体词到neo4j数据库。 |
| | | |
| | | 1.每一页为一个Node; |
| | | 2.每一个实体词为一个Node; |
| | | 3.页和实体词直接建立关系 页->实体词 |
| | | :return: |
| | | 保存段落和段落实体词关系到数据库。 |
| | | """ |
| | | for page_info in self.doc_split.page_infos: |
| | | # 创建页节点 |
| | | page_node = self.neo4j.create_page_node(page_info) |
| | | entity_nodes = [] |
| | | for entity in page_info.entities: |
| | | # 创建实体词节点 |
| | | entity_node = self.neo4j.create_entity_node(entity) |
| | | # 建立关系 页->实体词 |
| | | self.neo4j.create_page_entity_relationship(page_node, entity_node) |
| | | entity_nodes.append(entity_node) |
| | | if len(entity_nodes) > 0: |
| | | for i in range(len(entity_nodes)): |
| | | prev_entity_node = entity_nodes[i] |
| | | for entity_node in entity_nodes[i + 1:]: |
| | | # 建立关系 一页中的 实体词1->实体词2 |
| | | self.neo4j.create_entity_relationship(prev_entity_node, entity_node) |
| | | |
| | | |
| | | if __name__ == '__main__': |
| | | pdf_file = "D:/workspace/PythonProjects/KnowledgeBase/doc/XA-5D无人机探测大纲(公开)111.pdf" |
| | | doc_processor = DocProcessor(pdf_file) |
| | | doc_processor.process() |
| | | Log.info('保存段落和段落实体词关系到数据库...') |
| | | with open(self.docx_file, 'rb') as f: |
| | | file_bytes = f.read() |
| | | md5 = utils.generate_bytes_md5(file_bytes) |
| | | doc = DocInfo(os.path.basename(self.docx_file), md5, self.doc_type, self.doc_split.paragraph_tree) |
| | | self.doc_id = doc_dbh.add_doc(doc) |
| | | for paragraph in doc.paragraphs: |
| | | doc_dbh.add_paragraph(self.doc_id, None, paragraph) |
| | | Log.info('保存段落和段落实体词关系到数据库完成') |