# -*- coding: utf-8 -*-
|
# @file: doc_processor.py
|
# @author: lyg
|
# @date: 2025-5-13
|
# @version:
|
# @description: 处理文档,拆分文档,将拆分后的章节保存到数据库中。
|
from langchain_core.messages import HumanMessage
|
from langchain_core.output_parsers import JsonOutputParser
|
from langchain_core.prompts import ChatPromptTemplate, HumanMessagePromptTemplate
|
|
from knowledgebase.db.doc_db_models import TEntity
|
from knowledgebase.doc.docx_split import DocSplit
|
import asyncio
|
from knowledgebase.db.doc_db_helper import doc_dbh
|
from knowledgebase.doc.entity_helper import get_entity_helper
|
from knowledgebase.doc.entity_recognition import EntityRecognition
|
import os.path
|
|
from knowledgebase.doc.models import DocInfo, ParagraphInfo, DocType
|
from knowledgebase.llm import llm
|
from knowledgebase.log import Log
|
from knowledgebase import utils
|
|
|
class DocProcessor:
|
def __init__(self, docx_file: str):
|
"""
|
文档处理
|
:param docx_file: 要处理的文档
|
"""
|
Log.info(f'开始处理文档:{docx_file}')
|
self.docx_file = docx_file
|
self.doc_type = self.get_doc_type()
|
self.doc_split = DocSplit(docx_file, self.doc_type)
|
self.entity_recognition = EntityRecognition(self.doc_type)
|
self.doc_id = 0
|
|
def get_doc_type(self):
|
entity_helper = get_entity_helper()
|
Log.info(f'识别文档类型:{self.docx_file}')
|
rules = ';\n'.join([f'- {it}:{entity_helper.doc_prompt_map[it]}' for it in entity_helper.doc_prompt_map.keys()])
|
msg = HumanMessage(f'''
|
# 指令
|
请从下面的文件名中识别文档类型,如果识别失败不要输出任何字符。
|
文件名:{os.path.basename(self.docx_file)}
|
# 识别规则
|
{rules}
|
# 示例
|
遥测大纲
|
''')
|
resp = llm.invoke([msg])
|
Log.info(f'识别结果:{resp.content}')
|
return resp.content
|
|
async def get_tc_info(self, paragraph: ParagraphInfo):
|
if self.doc_type not in [DocType.tc_format]:
|
return ''
|
prompt = HumanMessagePromptTemplate.from_template('''
|
# 指令
|
请从下面的文本中识别指令信息,如果识别失败不要输出任何字符。
|
指令信息包括:指令名称。
|
# 识别规则
|
- 文本内容为遥控指令数据域或遥控指令应用数据的定义描述。
|
# 约束
|
- 如果文本内容是目录则不要输出任何字符;
|
- 指令名称在章节标题中,提取指令名称要和文本中的严格一致;
|
- 如果没有识别到指令信息不要输出任何字符;
|
- 识别失败,不要输出任何内容,包括解释性文本;
|
- 输出json格式。
|
# 示例 - 识别到指令
|
{{
|
"name": "xxx"
|
}}
|
# 示例 - 未识别到指令
|
""
|
# 文本内容:
|
{text}
|
''')
|
chain = prompt.prompt | llm | JsonOutputParser()
|
resp = await chain.ainvoke({"text": paragraph.full_text})
|
import json
|
# Log.info(f'>>>>>>指令识别:\n{paragraph.full_text}')
|
# Log.info(f'<<<<<<指令:{json.dumps(resp, ensure_ascii=False)}')
|
return resp
|
|
async def get_tm_pkt_info(self, paragraph: ParagraphInfo):
|
if self.doc_type not in [DocType.tm_outline, DocType.tm_pkt_design]:
|
return ''
|
prompt = HumanMessagePromptTemplate.from_template('''
|
# 指令
|
识别遥测包信息,请从下面的文本中识别遥测包信息,如果识别失败不要输出任何字符。
|
识别规则:章节标题中包含包名称和代号,章节内容为表格,表格中包括包头定义和包参数定义。
|
提取的遥测包信息包括:包名称,包代号。
|
# 约束
|
- 如果文本内容是目录则不要输出任何字符;
|
- 文本描述的内容是单个遥测包,如果有多个遥测包则不要输出任何字符;
|
- 文本结构通常是:包名称、代号和APID(应用过程标识)在开头(应用过程标识也有可能在表格中),后面紧接着是包头和参数定义表;
|
- 如果没有识别到遥测包信息不要输出任何字符;
|
- 识别失败,不要输出任何内容,包括解释性文本;
|
- 输出json格式。
|
# 符合要求的文本结构1
|
1.1.1 code xxx包(APID=0x123)
|
```json
|
表格内容
|
```
|
# 符合要求的文本结构2
|
1.1.1 code xxx包
|
```json
|
表格内容
|
应用过程标识
|
...
|
```
|
# 示例 - 识别到数据包
|
{{
|
"name": "xxx包",
|
"code": "TMS001"
|
}}
|
# 示例 - 未识别到数据包
|
""
|
# 文本内容:
|
{text}
|
''')
|
chain = prompt.prompt | llm | JsonOutputParser()
|
resp = await chain.ainvoke({"text": paragraph.full_text})
|
return resp
|
|
async def get_chapter_refs(self, paragraph: ParagraphInfo, toc: [str]) -> [str]:
|
if self.doc_type not in [DocType.tc_format]:
|
return ''
|
toc_text = '\n'.join(toc)
|
prompt = HumanMessagePromptTemplate.from_template(f'''
|
# 角色
|
你是一名资深的软件工程师。
|
# 指令
|
帮助我完成对文本中引用关系的抽取,判断当前文本中是否包含了引用信息,例如包含以下关键字:“详见1.1”、“见1.1”、“具体见1.1”、“见附录”等。
|
如果包含引用,将引用与“目录内容”中的目录条目进行匹配。
|
将匹配到的目录条目输出,输出格式为json格式。
|
# 约束
|
- 是否包含引用的判断条件中必须包含引用相关的描述,例如:“详见1.1”、“见1.1”、“具体见1.1”、“见附录”等;
|
- 注意不要自己引用自己;
|
- 仅提取目录内容中包含的条目,如果目录内容不包含则不提取;
|
- 如果仅靠标题号码无法确定目录条目的,根据文本内容匹配对应的目录条目;
|
- 输出的内容必须是目录中的条目;
|
- 输出json格式,不要输出任何json以外的字符。
|
# 输出案例
|
["1.1 xxx"]
|
# 目录内容:
|
{toc_text}
|
# 文本内容:
|
{{text}}
|
''')
|
chain = prompt.prompt | llm | JsonOutputParser()
|
resp = await chain.ainvoke({"text": paragraph.full_text})
|
return resp
|
|
async def gen_chapter_entities(self, paragraph: ParagraphInfo, paragraphs: [ParagraphInfo], toc: [str]):
|
# 获取章节实体词
|
entity_names_task = self.entity_recognition.run(paragraph.full_text)
|
# 获取指令信息
|
cmd_task = self.get_tc_info(paragraph)
|
# 获取遥测包信息
|
pkt_task = self.get_tm_pkt_info(paragraph)
|
# 获取文档引用
|
refs_task = self.get_chapter_refs(paragraph, toc)
|
entity_names, cmd, pkt, chapter_refs = await asyncio.gather(entity_names_task, cmd_task, pkt_task, refs_task)
|
|
Log.info(f'章节{paragraph.title_num}实体词:{entity_names}')
|
Log.info(f'章节{paragraph.title_num}引用:{chapter_refs}')
|
if entity_names:
|
paragraph.entities = doc_dbh.get_entities_by_names(entity_names)
|
|
if pkt:
|
entity = TEntity(name=pkt['code'], type='遥测包配置', prompts='', doc_type='')
|
e = doc_dbh.get_entity(entity)
|
if e:
|
entity.id = e.id
|
else:
|
doc_dbh.add_entity(entity)
|
Log.info(f"新增Entity:{entity.name},id:{entity.id}")
|
paragraph.entities.append(entity)
|
|
if cmd:
|
entity = TEntity(name=cmd['name'], type='指令格式配置', prompts='', doc_type='')
|
e = doc_dbh.get_entity(entity)
|
if e:
|
entity.id = e.id
|
else:
|
doc_dbh.add_entity(entity)
|
Log.info(f"新增Entity:{entity.name},id:{entity.id}")
|
paragraph.entities.append(entity)
|
# 获取引用信息
|
if chapter_refs:
|
for ref in chapter_refs:
|
_p = next(filter(lambda p: ref == p.title, self.doc_split.paragraphs), None)
|
if _p:
|
if paragraph != _p:
|
paragraph.refs.append(_p)
|
|
def process(self):
|
self.doc_split.split()
|
# 分批并发处理,每批10个
|
tasks = []
|
toc = []
|
for p in self.doc_split.paragraphs:
|
if p.title_level:
|
toc.append(p.title)
|
for paragraph in self.doc_split.paragraphs:
|
tasks.append(self.gen_chapter_entities(paragraph, self.doc_split.paragraphs, toc))
|
|
async def run():
|
await asyncio.gather(*tasks)
|
|
asyncio.run(run())
|
# 保存到数据库
|
self.save_to_db()
|
|
def save_to_db(self):
|
"""
|
保存段落和段落实体词关系到数据库。
|
"""
|
Log.info('保存段落和段落实体词关系到数据库...')
|
with open(self.docx_file, 'rb') as f:
|
file_bytes = f.read()
|
md5 = utils.generate_bytes_md5(file_bytes)
|
doc = DocInfo(os.path.basename(self.docx_file), md5, self.doc_type, self.doc_split.paragraph_tree)
|
self.doc_id = doc_dbh.add_doc(doc)
|
for paragraph in doc.paragraphs:
|
doc_dbh.add_paragraph(self.doc_id, None, paragraph)
|
for paragraph in self.doc_split.paragraphs:
|
for ref_paragraph in paragraph.refs:
|
doc_dbh.add_paragraph_ref_link(paragraph.id, ref_paragraph.id)
|
Log.info(f"{paragraph.title} 引用了-> {ref_paragraph.title}")
|
Log.info('保存段落和段落实体词关系到数据库完成')
|