# -*- coding: utf-8 -*- # @file: doc_processor.py # @author: lyg # @date: 2025-5-13 # @version: # @description: 处理文档,拆分文档,将拆分后的章节保存到数据库中。 from langchain_core.messages import HumanMessage from langchain_core.output_parsers import JsonOutputParser from langchain_core.prompts import ChatPromptTemplate, HumanMessagePromptTemplate from knowledgebase.db.doc_db_models import TEntity from knowledgebase.doc.docx_split import DocSplit import asyncio from knowledgebase.db.doc_db_helper import doc_dbh from knowledgebase.doc.entity_helper import get_entity_helper from knowledgebase.doc.entity_recognition import EntityRecognition import os.path from knowledgebase.doc.models import DocInfo, ParagraphInfo, DocType from knowledgebase.llm import llm from knowledgebase.log import Log from knowledgebase import utils class DocProcessor: def __init__(self, docx_file: str): """ 文档处理 :param docx_file: 要处理的文档 """ Log.info(f'开始处理文档:{docx_file}') self.docx_file = docx_file self.doc_type = self.get_doc_type() self.doc_split = DocSplit(docx_file, self.doc_type) self.entity_recognition = EntityRecognition(self.doc_type) self.doc_id = 0 def get_doc_type(self): entity_helper = get_entity_helper() Log.info(f'识别文档类型:{self.docx_file}') rules = ';\n'.join([f'- {it}:{entity_helper.doc_prompt_map[it]}' for it in entity_helper.doc_prompt_map.keys()]) msg = HumanMessage(f''' # 指令 请从下面的文件名中识别文档类型,如果识别失败不要输出任何字符。 文件名:{os.path.basename(self.docx_file)} # 识别规则 {rules} # 示例 遥测大纲 ''') resp = llm.invoke([msg]) Log.info(f'识别结果:{resp.content}') return resp.content async def get_tc_info(self, paragraph: ParagraphInfo): if self.doc_type not in [DocType.tc_format]: return '' prompt = HumanMessagePromptTemplate.from_template(''' # 指令 请从下面的文本中识别指令信息,如果识别失败不要输出任何字符。 指令信息包括:指令名称。 # 识别规则 - 文本内容为遥控指令数据域或遥控指令应用数据的定义描述。 # 约束 - 如果文本内容是目录则不要输出任何字符; - 指令名称在章节标题中,提取指令名称要和文本中的严格一致; - 如果没有识别到指令信息不要输出任何字符; - 识别失败,不要输出任何内容,包括解释性文本; - 输出json格式。 # 示例 - 识别到指令 {{ "name": "xxx" }} # 示例 - 未识别到指令 "" # 文本内容: {text} ''') chain = prompt.prompt | llm | JsonOutputParser() resp = await chain.ainvoke({"text": paragraph.full_text}) import json # Log.info(f'>>>>>>指令识别:\n{paragraph.full_text}') # Log.info(f'<<<<<<指令:{json.dumps(resp, ensure_ascii=False)}') return resp async def get_tm_pkt_info(self, paragraph: ParagraphInfo): if self.doc_type not in [DocType.tm_outline, DocType.tm_pkt_design]: return '' prompt = HumanMessagePromptTemplate.from_template(''' # 指令 识别遥测包信息,请从下面的文本中识别遥测包信息,如果识别失败不要输出任何字符。 识别规则:章节标题中包含包名称和代号,章节内容为表格,表格中包括包头定义和包参数定义。 提取的遥测包信息包括:包名称,包代号。 # 约束 - 如果文本内容是目录则不要输出任何字符; - 文本描述的内容是单个遥测包,如果有多个遥测包则不要输出任何字符; - 文本结构通常是:包名称、代号和APID(应用过程标识)在开头(应用过程标识也有可能在表格中),后面紧接着是包头和参数定义表; - 如果没有识别到遥测包信息不要输出任何字符; - 识别失败,不要输出任何内容,包括解释性文本; - 输出json格式。 # 符合要求的文本结构1 1.1.1 code xxx包(APID=0x123) ```json 表格内容 ``` # 符合要求的文本结构2 1.1.1 code xxx包 ```json 表格内容 应用过程标识 ... ``` # 示例 - 识别到数据包 {{ "name": "xxx包", "code": "TMS001" }} # 示例 - 未识别到数据包 "" # 文本内容: {text} ''') chain = prompt.prompt | llm | JsonOutputParser() resp = await chain.ainvoke({"text": paragraph.full_text}) return resp async def get_chapter_refs(self, paragraph: ParagraphInfo, toc: [str]) -> [str]: if self.doc_type not in [DocType.tc_format]: return '' toc_text = '\n'.join(toc) prompt = HumanMessagePromptTemplate.from_template(f''' # 角色 你是一名资深的软件工程师。 # 指令 帮助我完成对文本中引用关系的抽取,判断当前文本中是否包含了引用信息,例如包含以下关键字:“详见1.1”、“见1.1”、“具体见1.1”、“见附录”等。 如果包含引用,将引用与“目录内容”中的目录条目进行匹配。 将匹配到的目录条目输出,输出格式为json格式。 # 约束 - 是否包含引用的判断条件中必须包含引用相关的描述,例如:“详见1.1”、“见1.1”、“具体见1.1”、“见附录”等; - 注意不要自己引用自己; - 仅提取目录内容中包含的条目,如果目录内容不包含则不提取; - 如果仅靠标题号码无法确定目录条目的,根据文本内容匹配对应的目录条目; - 输出的内容必须是目录中的条目; - 输出json格式,不要输出任何json以外的字符。 # 输出案例 ["1.1 xxx"] # 目录内容: {toc_text} # 文本内容: {{text}} ''') chain = prompt.prompt | llm | JsonOutputParser() resp = await chain.ainvoke({"text": paragraph.full_text}) return resp async def gen_chapter_entities(self, paragraph: ParagraphInfo, paragraphs: [ParagraphInfo], toc: [str]): # 获取章节实体词 entity_names_task = self.entity_recognition.run(paragraph.full_text) # 获取指令信息 cmd_task = self.get_tc_info(paragraph) # 获取遥测包信息 pkt_task = self.get_tm_pkt_info(paragraph) # 获取文档引用 refs_task = self.get_chapter_refs(paragraph, toc) entity_names, cmd, pkt, chapter_refs = await asyncio.gather(entity_names_task, cmd_task, pkt_task, refs_task) Log.info(f'章节{paragraph.title_num}实体词:{entity_names}') Log.info(f'章节{paragraph.title_num}引用:{chapter_refs}') if entity_names: paragraph.entities = doc_dbh.get_entities_by_names(entity_names) if pkt: entity = TEntity(name=pkt['code'], type='遥测包配置', prompts='', doc_type='') e = doc_dbh.get_entity(entity) if e: entity.id = e.id else: doc_dbh.add_entity(entity) Log.info(f"新增Entity:{entity.name},id:{entity.id}") paragraph.entities.append(entity) if cmd: entity = TEntity(name=cmd['name'], type='指令格式配置', prompts='', doc_type='') e = doc_dbh.get_entity(entity) if e: entity.id = e.id else: doc_dbh.add_entity(entity) Log.info(f"新增Entity:{entity.name},id:{entity.id}") paragraph.entities.append(entity) # 获取引用信息 if chapter_refs: for ref in chapter_refs: _p = next(filter(lambda p: ref == p.title, self.doc_split.paragraphs), None) if _p: if paragraph != _p: paragraph.refs.append(_p) def process(self): self.doc_split.split() # 分批并发处理,每批10个 tasks = [] toc = [] for p in self.doc_split.paragraphs: if p.title_level: toc.append(p.title) for paragraph in self.doc_split.paragraphs: tasks.append(self.gen_chapter_entities(paragraph, self.doc_split.paragraphs, toc)) async def run(): await asyncio.gather(*tasks) asyncio.run(run()) # 保存到数据库 self.save_to_db() def save_to_db(self): """ 保存段落和段落实体词关系到数据库。 """ Log.info('保存段落和段落实体词关系到数据库...') with open(self.docx_file, 'rb') as f: file_bytes = f.read() md5 = utils.generate_bytes_md5(file_bytes) doc = DocInfo(os.path.basename(self.docx_file), md5, self.doc_type, self.doc_split.paragraph_tree) self.doc_id = doc_dbh.add_doc(doc) for paragraph in doc.paragraphs: doc_dbh.add_paragraph(self.doc_id, None, paragraph) for paragraph in self.doc_split.paragraphs: for ref_paragraph in paragraph.refs: doc_dbh.add_paragraph_ref_link(paragraph.id, ref_paragraph.id) Log.info(f"{paragraph.title} 引用了-> {ref_paragraph.title}") Log.info('保存段落和段落实体词关系到数据库完成')