lyg
2025-05-22 c099e6662b8a6e320ac314d31eda9b40455e5aa7
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
# -*- coding: utf-8 -*-
# @file: doc_processor.py
# @author: lyg
# @date: 2025-5-13
# @version: 
# @description: 处理文档,拆分文档,将拆分后的章节保存到数据库中。
from langchain_core.messages import HumanMessage
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import ChatPromptTemplate, HumanMessagePromptTemplate
 
from knowledgebase.db.doc_db_models import TEntity
from knowledgebase.doc.docx_split import DocSplit
import asyncio
from knowledgebase.db.doc_db_helper import doc_dbh
from knowledgebase.doc.entity_helper import entity_helper
from knowledgebase.doc.entity_recognition import EntityRecognition
import os.path
 
from knowledgebase.doc.models import DocInfo, ParagraphInfo, DocType
from knowledgebase.llm import llm
from knowledgebase.log import Log
from knowledgebase import utils
 
 
class DocProcessor:
    def __init__(self, docx_file: str):
        """
        文档处理
        :param docx_file: 要处理的文档
        """
        Log.info(f'开始处理文档:{docx_file}')
        self.docx_file = docx_file
        self.doc_type = self.get_doc_type()
        self.doc_split = DocSplit(docx_file, self.doc_type)
        self.entity_recognition = EntityRecognition(self.doc_type)
        self.doc_id = 0
 
    def get_doc_type(self):
        Log.info(f'识别文档类型:{self.docx_file}')
        rules = ';\n'.join([f'- {it}:{entity_helper.doc_prompt_map[it]}' for it in entity_helper.doc_prompt_map.keys()])
        msg = HumanMessage(f'''
# 指令
请从下面的文件名中识别文档类型,如果识别失败不要输出任何字符。
文件名:{os.path.basename(self.docx_file)}
# 识别规则
{rules}
# 示例
遥测大纲
''')
        resp = llm.invoke([msg])
        Log.info(f'识别结果:{resp.content}')
        return resp.content
    def get_tc_info(self,  paragraph: ParagraphInfo):
        if self.doc_type not in [DocType.tc_format]:
            return ''
        prompt = HumanMessagePromptTemplate.from_template('''
# 指令
请从下面的文本中识别指令信息,如果识别失败不要输出任何字符。
指令信息包括:指令名称。
# 识别规则
- 文本内容为遥控指令数据域或遥控指令应用数据的定义描述。
# 约束
- 如果文本内容是目录则不要输出任何字符;
- 指令名称在章节标题中,提取指令名称要和文本中的严格一致;
- 如果没有识别到指令信息不要输出任何字符;
- 识别失败,不要输出任何内容,包括解释性文本;
- 输出json格式。
# 示例 - 识别到指令
{{
    "name": "xxx"
}}
# 示例 - 未识别到数据包
""
# 文本内容:
{text}
''')
        chain = prompt.prompt | llm | JsonOutputParser()
        resp = chain.invoke({"text": paragraph.full_text})
        return resp
    def get_tm_pkt_info(self, paragraph: ParagraphInfo):
        if self.doc_type not in [DocType.tm_outline, DocType.tm_pkt_design]:
            return ''
        prompt = HumanMessagePromptTemplate.from_template('''
# 指令
识别遥测包信息,请从下面的文本中识别遥测包信息,如果识别失败不要输出任何字符。
识别规则:章节标题中包含包名称和代号,章节内容为表格,表格中包括包头定义和包参数定义。
提取的遥测包信息包括:包名称,包代号,APID。
# 约束
- 如果文本内容是目录则不要输出任何字符;
- 文本描述的内容是单个遥测包,如果有多个遥测包则不要输出任何字符;
- 文本结构通常是:包名称、代号和APID在开头,后面紧接着是包头和参数定义表;
- 如果没有识别到遥测包信息不要输出任何字符;
- 识别失败,不要输出任何内容,包括解释性文本;
- 输出json格式。
# 复合要求的文本结构
1.1.1 code xxx包(APID=0x123)
```json
表格内容
``` 
# 示例 - 识别到数据包
{{
    "name": "xxx包",
    "code": "xxx",
    "apid": 123
}}
# 示例 - 未识别到数据包
""
# 文本内容:
{text}
''')
        chain = prompt.prompt | llm | JsonOutputParser()
        resp = chain.invoke({"text": paragraph.full_text})
        return resp
 
    async def gen_chapter_entities(self, paragraph: ParagraphInfo):
        # 获取章节实体词
        entity_names = await asyncio.to_thread(lambda: self.entity_recognition.run(paragraph.full_text))
        Log.info(f'章节{paragraph.title_num}实体词:{entity_names}')
        if entity_names:
            paragraph.entities = doc_dbh.get_entities_by_names(entity_names)
        # 获取遥测包信息
        pkt = self.get_tm_pkt_info(paragraph)
        if pkt:
            entity = TEntity(name=pkt['code'], type='遥测包配置', prompts='', doc_type='')
            e = doc_dbh.get_entity(entity)
            if e:
                entity.id = e.id
            else:
                doc_dbh.add_entity(entity)
                Log.info(f"新增Entity:{entity.name},id:{entity.id}")
            paragraph.entities.append(entity)
        # 获取指令信息
        cmd = self.get_tc_info(paragraph)
        if cmd:
            entity = TEntity(name=cmd['name'], type='指令格式配置', prompts='', doc_type='')
            e = doc_dbh.get_entity(entity)
            if e:
                entity.id = e.id
            else:
                doc_dbh.add_entity(entity)
                Log.info(f"新增Entity:{entity.name},id:{entity.id}")
            paragraph.entities.append(entity)
 
    def process(self):
        self.doc_split.split()
        # 分批并发处理,每批10个
        batch_size = 10
        for i in range(0, len(self.doc_split.paragraphs), batch_size):
            batch_paragraphs = self.doc_split.paragraphs[i:i + batch_size]
            tasks = []
            for paragraph in batch_paragraphs:
                tasks.append(self.gen_chapter_entities(paragraph))
 
            async def run():
                await asyncio.gather(*tasks)
 
            asyncio.run(run())
        # 保存到数据库
        self.save_to_db()
 
    def save_to_db(self):
        """
        保存段落和段落实体词关系到数据库。
        """
        Log.info('保存段落和段落实体词关系到数据库...')
        with open(self.docx_file, 'rb') as f:
            file_bytes = f.read()
            md5 = utils.generate_bytes_md5(file_bytes)
        doc = DocInfo(os.path.basename(self.docx_file), md5, self.doc_type, self.doc_split.paragraph_tree)
        self.doc_id = doc_dbh.add_doc(doc)
        for paragraph in doc.paragraphs:
            doc_dbh.add_paragraph(self.doc_id, None, paragraph)
        Log.info('保存段落和段落实体词关系到数据库完成')