| | |
| | | # -*- coding: utf-8 -*- |
| | | # @file: doc_processor.py |
| | | # @author: lyg |
| | | # @date: 20250427 |
| | | # @date: 2025-5-13 |
| | | # @version: |
| | | # @description: 处理文档,提取章节信息,提取页码信息,提取实体词,写入图数据库(neo4j)。 |
| | | from knowledgebase.db.neo4j import Neo4jHelper |
| | | from knowledgebase.doc.doc_split import DocSplit |
| | | from knowledgebase.doc.entity_recognition import EntityRecognition |
| | | # @description: 处理文档,拆分文档,将拆分后的章节保存到数据库中。 |
| | | from langchain_core.messages import HumanMessage |
| | | |
| | | from knowledgebase.doc.docx_split import DocSplit |
| | | import asyncio |
| | | from knowledgebase.db.doc_db_helper import doc_dbh |
| | | from knowledgebase.doc.entity_helper import entity_helper |
| | | from knowledgebase.doc.entity_recognition import EntityRecognition |
| | | import os.path |
| | | |
| | | from knowledgebase.doc.models import DocInfo, ParagraphInfo |
| | | from knowledgebase.llm import llm |
| | | from knowledgebase.log import Log |
| | | from knowledgebase import utils |
| | | |
| | | |
class DocProcessor:
    """Process one .docx document end to end.

    The pipeline is: split the document into paragraphs, ask the LLM which
    document type the file name denotes, recognise entities for every
    paragraph (concurrently, in batches) and finally store the document and
    its paragraphs through ``doc_dbh``.
    """

    # Number of paragraphs whose entity recognition runs concurrently.
    BATCH_SIZE = 10

    def __init__(self, docx_file: str):
        """Prepare the processing pipeline for a document.

        Note that construction already calls the LLM once (via
        ``get_doc_type``) because the entity recogniser is built from the
        detected document type.

        :param docx_file: path of the document to process
        """
        Log.info(f'开始处理文档:{docx_file}')
        self.docx_file = docx_file
        self.doc_split = DocSplit(docx_file)
        # get_doc_type() reads self.docx_file, so it must run after the
        # assignment above.
        self.doc_type = self.get_doc_type()
        self.entity_recognition = EntityRecognition(self.doc_type)
        # Filled in by save_to_db() with the value returned by doc_dbh.add_doc().
        self.doc_id = 0

    def get_doc_type(self):
        """Ask the LLM to derive the document type from the file name.

        The prompt lists every entry of ``entity_helper.doc_prompt_map`` as a
        recognition rule and instructs the model to output nothing when it
        cannot decide.

        :return: the raw ``content`` of the LLM response
        """
        Log.info(f'识别文档类型:{self.docx_file}')
        rules = ';\n'.join(
            f'- {name}:{prompt}'
            for name, prompt in entity_helper.doc_prompt_map.items()
        )
        msg = HumanMessage(f'''
# 指令
请从下面的文件名中识别文档类型,如果识别失败不要输出任何字符。
文件名:{os.path.basename(self.docx_file)}
# 识别规则
{rules}
# 示例
遥测大纲
''')
        resp = llm.invoke([msg])
        Log.info(f'识别结果:{resp.content}')
        return resp.content

    async def gen_sect_entities(self, paragraph: "ParagraphInfo"):
        """Recognise the entities of one paragraph and attach them to it.

        Recognition runs in a worker thread so that several paragraphs can be
        handled concurrently. Recognised names are mapped to the first entity
        in ``entity_helper.entities`` with the same ``name``; names without a
        match are dropped. ``paragraph.entities`` is left untouched when
        nothing was recognised.

        :param paragraph: paragraph whose ``full_text`` is analysed
        """
        names = await asyncio.to_thread(self.entity_recognition.run, paragraph.full_text)
        Log.info(f'章节实体词:{names}')
        if not names:
            return
        matched = (
            next((entity for entity in entity_helper.entities if entity.name == name), None)
            for name in names
        )
        paragraph.entities = [entity for entity in matched if entity]

    @staticmethod
    def _batches(items, size):
        """Yield consecutive slices of ``items`` holding at most ``size`` elements."""
        for start in range(0, len(items), size):
            yield items[start:start + size]

    @staticmethod
    def _run_all(coroutines):
        """Run the given coroutines concurrently and return their results in order.

        ``asyncio.run`` only accepts a coroutine, so the ``gather`` call is
        wrapped in one instead of being passed to ``asyncio.run`` directly.
        """
        async def _runner():
            return await asyncio.gather(*coroutines)

        return asyncio.run(_runner())

    def process(self):
        """Split the document, tag every paragraph with entities and store the result.

        Paragraphs are processed in batches of ``BATCH_SIZE``; each batch is
        awaited completely before the next one starts.
        """
        self.doc_split.split()
        for batch in self._batches(self.doc_split.paragraphs, self.BATCH_SIZE):
            self._run_all([self.gen_sect_entities(paragraph) for paragraph in batch])
        # Persist only after every paragraph has been processed.
        self.save_to_db()

    def save_to_db(self):
        """Store the document record and its paragraphs through ``doc_dbh``.

        The document record carries the file's base name, the MD5 of its
        bytes, the detected document type and the paragraph tree produced by
        the splitter. The id returned for the document is kept in
        ``self.doc_id`` and used for every paragraph that is added.
        """
        Log.info('保存段落和段落实体词关系到数据库...')
        with open(self.docx_file, 'rb') as f:
            file_bytes = f.read()
        md5 = utils.generate_bytes_md5(file_bytes)
        doc = DocInfo(os.path.basename(self.docx_file), md5, self.doc_type, self.doc_split.paragraph_tree)
        self.doc_id = doc_dbh.add_doc(doc)
        for paragraph in doc.paragraphs:
            doc_dbh.add_paragraph(self.doc_id, None, paragraph)
        Log.info('保存段落和段落实体词关系到数据库完成')
| | | |
| | | |
if __name__ == '__main__':
    # Manual entry point: process every sample document in turn.
    # Only .docx inputs are listed because DocProcessor works on docx files.
    files = [
        r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机1553B总线传输通信帧分配(公开).docx",
        r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机分系统遥测源包设计报告(公开).docx",
        r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机软件用户需求(公开).docx",
        r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机遥测大纲(公开).docx",
        r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机遥测信号分配表(公开).docx",
        # r"D:\workspace\PythonProjects\KnowledgeBase\doc\XA-5D无人机指令格式与编码定义(公开).docx",
        r"D:\workspace\PythonProjects\KnowledgeBase\doc\指令格式(公开).docx"
    ]
    for file in files:
        doc_processor = DocProcessor(file)
        doc_processor.process()
| | | |
| | | # doc_dbh.get_docs() |